diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java index aa9d7d5315d..fe80f4897f6 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java @@ -73,6 +73,7 @@ public Mono>> getTopicMessages(String SeekTypeDTO seekType, List seekTo, Integer limit, + Integer page, String q, MessageFilterTypeDTO filterQueryType, SeekDirectionDTO seekDirection, @@ -85,6 +86,7 @@ public Mono>> getTopicMessages(String .topicActions(MESSAGES_READ) .build()); + page = page != null ? page : 0; seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING; seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD; filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS; @@ -98,7 +100,7 @@ public Mono>> getTopicMessages(String ResponseEntity.ok( messagesService.loadMessages( getCluster(clusterName), topicName, positions, q, filterQueryType, - limit, seekDirection, keySerde, valueSerde) + limit, page, seekDirection, keySerde, valueSerde) ) ); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java index 9ea0526bac8..072a2daac58 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java @@ -59,4 +59,8 @@ protected void sendFinishStatsAndCompleteSink(FluxSink sin messagesProcessing.sendFinishEvent(sink); sink.complete(); } + + protected Integer getPageOffset() { + return messagesProcessing.getPageOffset(); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java index ccd24e85680..3254391eee7 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java @@ -57,7 +57,9 @@ public void accept(FluxSink sink) { return; //fast return in case of sink cancellation } long beginOffset = seekOperations.getBeginOffsets().get(tp); - long readFromOffset = Math.max(beginOffset, readToOffset - msgsToPollPerPartition); + long readFromOffset = Math.max(beginOffset, readToOffset - msgsToPollPerPartition - this.getPageOffset()); + readToOffset = readToOffset - this.getPageOffset(); + partitionPollIteration(tp, readFromOffset, readToOffset, consumer, sink) .forEach(r -> sendMessage(sink, r)); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java index 8f85e0a8ba2..4b33565e2a0 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java @@ -37,6 +37,10 @@ public void accept(FluxSink sink) { var seekOperations = SeekOperations.create(consumer, position); seekOperations.assignAndSeekNonEmptyPartitions(); + seekOperations.getOffsetsForSeek().forEach((p, o) -> { + consumer.seek(p, o + getPageOffset()); + }); + EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter(); while (!sink.isCancelled() && !sendLimitReached() diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java index b6d23bc90d3..cddf275a5fa 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java @@ -23,18 +23,26 @@ public class MessagesProcessing { private final Predicate filter; private final @Nullable Integer limit; + private final @Nullable Integer page; + public MessagesProcessing(ConsumerRecordDeserializer deserializer, Predicate filter, - @Nullable Integer limit) { + @Nullable Integer limit, + @Nullable Integer page) { this.deserializer = deserializer; this.filter = filter; this.limit = limit; + this.page = page; } boolean limitReached() { return limit != null && sentMessages >= limit; } + Integer getPageOffset() { + return this.limit * this.page; + } + void sendMsg(FluxSink sink, ConsumerRecord rec) { if (!sink.isCancelled() && !limitReached()) { TopicMessageDTO topicMessage = deserializer.deserialize(rec); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java index f6ad42c1107..d50fed00de0 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java @@ -160,6 +160,7 @@ public Flux loadMessages(KafkaCluster cluster, String topi @Nullable String query, MessageFilterTypeDTO filterQueryType, @Nullable Integer pageSize, + Integer page, SeekDirectionDTO seekDirection, @Nullable String keySerde, @Nullable String valueSerde) { @@ -167,7 +168,8 @@ public Flux loadMessages(KafkaCluster cluster, String topi .flux() .publishOn(Schedulers.boundedElastic()) .flatMap(td -> loadMessagesImpl(cluster, topic, consumerPosition, query, - filterQueryType, fixPageSize(pageSize), seekDirection, keySerde, valueSerde)); + filterQueryType, fixPageSize(pageSize), page, + seekDirection, keySerde, valueSerde)); } private int fixPageSize(@Nullable Integer pageSize) { @@ -182,6 +184,7 @@ private Flux loadMessagesImpl(KafkaCluster cluster, @Nullable String query, MessageFilterTypeDTO filterQueryType, int limit, + int page, SeekDirectionDTO seekDirection, @Nullable String keySerde, @Nullable String valueSerde) { @@ -191,7 +194,8 @@ private Flux loadMessagesImpl(KafkaCluster cluster, var processing = new MessagesProcessing( deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde), getMsgFilter(query, filterQueryType), - seekDirection == SeekDirectionDTO.TAILING ? null : limit + seekDirection == SeekDirectionDTO.TAILING ? null : limit, + page ); if (seekDirection.equals(SeekDirectionDTO.FORWARD)) { diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/TailingEmitterTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/TailingEmitterTest.java index 2798bd213fe..fa784181899 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/TailingEmitterTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/TailingEmitterTest.java @@ -115,6 +115,7 @@ private Flux createTailingFlux( query, MessageFilterTypeDTO.STRING_CONTAINS, 0, + 0, SeekDirectionDTO.TAILING, "String", "String"); diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/MessagesServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/MessagesServiceTest.java index 75a69adec71..a7067ed009e 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/MessagesServiceTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/MessagesServiceTest.java @@ -55,7 +55,7 @@ void sendMessageReturnsExceptionWhenTopicNotFound() { @Test void loadMessagesReturnsExceptionWhenTopicNotFound() { StepVerifier.create(messagesService - .loadMessages(cluster, NON_EXISTING_TOPIC, null, null, null, 1, null, "String", "String")) + .loadMessages(cluster, NON_EXISTING_TOPIC, null, null, null, 1, 0, null, "String", "String")) .expectError(TopicNotFoundException.class) .verify(); } @@ -75,6 +75,7 @@ void maskingAppliedOnConfiguredClusters() throws Exception { null, null, 100, + 0, SeekDirectionDTO.FORWARD, StringSerde.name(), StringSerde.name() diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java index 78c111cdd19..6ae836be0b9 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java @@ -508,6 +508,7 @@ public void doAssert(Consumer msgAssert) { null, null, 1, + 0, SeekDirectionDTO.FORWARD, msgToSend.getKeySerde().get(), msgToSend.getValueSerde().get() diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 590e324abd8..7421ad4c061 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -658,6 +658,10 @@ paths: in: query schema: type: integer + - name: page + in: query + schema: + type: integer - name: q in: query schema: