Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed Messages Paging #3955

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String
SeekTypeDTO seekType,
List<String> seekTo,
Integer limit,
Integer page,
String q,
MessageFilterTypeDTO filterQueryType,
SeekDirectionDTO seekDirection,
Expand All @@ -85,6 +86,7 @@ public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String
.topicActions(MESSAGES_READ)
.build());

page = page != null ? page : 0;
seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS;
Expand All @@ -98,7 +100,7 @@ public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String
ResponseEntity.ok(
messagesService.loadMessages(
getCluster(clusterName), topicName, positions, q, filterQueryType,
limit, seekDirection, keySerde, valueSerde)
limit, page, seekDirection, keySerde, valueSerde)
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,8 @@ protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sin
messagesProcessing.sendFinishEvent(sink);
sink.complete();
}

protected Integer getPageOffset() {
return messagesProcessing.getPageOffset();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ public void accept(FluxSink<TopicMessageEventDTO> sink) {
return; //fast return in case of sink cancellation
}
long beginOffset = seekOperations.getBeginOffsets().get(tp);
long readFromOffset = Math.max(beginOffset, readToOffset - msgsToPollPerPartition);
long readFromOffset = Math.max(beginOffset, readToOffset - msgsToPollPerPartition - this.getPageOffset());
readToOffset = readToOffset - this.getPageOffset();


partitionPollIteration(tp, readFromOffset, readToOffset, consumer, sink)
.forEach(r -> sendMessage(sink, r));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public void accept(FluxSink<TopicMessageEventDTO> sink) {
var seekOperations = SeekOperations.create(consumer, position);
seekOperations.assignAndSeekNonEmptyPartitions();

seekOperations.getOffsetsForSeek().forEach((p, o) -> {
consumer.seek(p, o + getPageOffset());
});

EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
while (!sink.isCancelled()
&& !sendLimitReached()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,26 @@ public class MessagesProcessing {
private final Predicate<TopicMessageDTO> filter;
private final @Nullable Integer limit;

private final @Nullable Integer page;

public MessagesProcessing(ConsumerRecordDeserializer deserializer,
Predicate<TopicMessageDTO> filter,
@Nullable Integer limit) {
@Nullable Integer limit,
@Nullable Integer page) {
this.deserializer = deserializer;
this.filter = filter;
this.limit = limit;
this.page = page;
}

boolean limitReached() {
return limit != null && sentMessages >= limit;
}

Integer getPageOffset() {
return this.limit * this.page;
}

void sendMsg(FluxSink<TopicMessageEventDTO> sink, ConsumerRecord<Bytes, Bytes> rec) {
if (!sink.isCancelled() && !limitReached()) {
TopicMessageDTO topicMessage = deserializer.deserialize(rec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,16 @@ public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topi
@Nullable String query,
MessageFilterTypeDTO filterQueryType,
@Nullable Integer pageSize,
Integer page,
SeekDirectionDTO seekDirection,
@Nullable String keySerde,
@Nullable String valueSerde) {
return withExistingTopic(cluster, topic)
.flux()
.publishOn(Schedulers.boundedElastic())
.flatMap(td -> loadMessagesImpl(cluster, topic, consumerPosition, query,
filterQueryType, fixPageSize(pageSize), seekDirection, keySerde, valueSerde));
filterQueryType, fixPageSize(pageSize), page,
seekDirection, keySerde, valueSerde));
}

private int fixPageSize(@Nullable Integer pageSize) {
Expand All @@ -182,6 +184,7 @@ private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
@Nullable String query,
MessageFilterTypeDTO filterQueryType,
int limit,
int page,
SeekDirectionDTO seekDirection,
@Nullable String keySerde,
@Nullable String valueSerde) {
Expand All @@ -191,7 +194,8 @@ private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
var processing = new MessagesProcessing(
deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde),
getMsgFilter(query, filterQueryType),
seekDirection == SeekDirectionDTO.TAILING ? null : limit
seekDirection == SeekDirectionDTO.TAILING ? null : limit,
page
);

if (seekDirection.equals(SeekDirectionDTO.FORWARD)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ private Flux<TopicMessageEventDTO> createTailingFlux(
query,
MessageFilterTypeDTO.STRING_CONTAINS,
0,
0,
SeekDirectionDTO.TAILING,
"String",
"String");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ void sendMessageReturnsExceptionWhenTopicNotFound() {
@Test
void loadMessagesReturnsExceptionWhenTopicNotFound() {
StepVerifier.create(messagesService
.loadMessages(cluster, NON_EXISTING_TOPIC, null, null, null, 1, null, "String", "String"))
.loadMessages(cluster, NON_EXISTING_TOPIC, null, null, null, 1, 0, null, "String", "String"))
.expectError(TopicNotFoundException.class)
.verify();
}
Expand All @@ -75,6 +75,7 @@ void maskingAppliedOnConfiguredClusters() throws Exception {
null,
null,
100,
0,
SeekDirectionDTO.FORWARD,
StringSerde.name(),
StringSerde.name()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ public void doAssert(Consumer<TopicMessageDTO> msgAssert) {
null,
null,
1,
0,
SeekDirectionDTO.FORWARD,
msgToSend.getKeySerde().get(),
msgToSend.getValueSerde().get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,10 @@ paths:
in: query
schema:
type: integer
- name: page
in: query
schema:
type: integer
- name: q
in: query
schema:
Expand Down
Loading