Continued consumption with next event after exception was thrown #346

Open
ePaul opened this issue Jul 17, 2019 · 4 comments

ePaul commented Jul 17, 2019

We are using nakadi-java-client 0.9.17.

We've observed a problematic case where we had two events in the same partition.
Due to a batch size of 1, they were both handed to the StreamObserver's onNext() method in separate batches.
Processing the first event threw an exception (in our code), which nakadi-java logged as StreamBatchRecordSubscriber.detected_retryable_exception, without committing any cursor changes.

But then the StreamObserver's onNext() method was called again with the second event (in a new 1-event batch). (We have max_uncommitted_events set higher than 1 – the default is 10, I think.) This one was processed without problems, and our code committed the cursor. Since that cursor lies after the first one, both events are now committed, and Nakadi won't resend either of them. The first, failed event is effectively lost.
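
To make the sequence concrete, here is a rough sketch of the kind of onNext() we run (names follow the nakadi-java README as far as I remember them, so take them with a grain of salt; MyEvent and processEvent() are stand-ins for our own types and code):

    // Sketch only, not our production code. Assumes the StreamObserver /
    // StreamOffsetObserver API roughly as shown in the nakadi-java README.
    @Override
    public void onNext(StreamBatchRecord<MyEvent> record) {
      for (MyEvent event : record.streamBatch().events()) {
        // Batch with event 1: this throws, nakadi-java logs
        // detected_retryable_exception, and we never reach the commit below.
        processEvent(event);
      }
      // Batch with event 2 (later offset, same partition): processing succeeds,
      // so we commit this cursor – which lies after the failed event's cursor.
      record.streamOffsetObserver().onNext(record.streamCursorContext());
    }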

This does not seem to happen if there is no later event in the partition – in that case the first event is retried a bit later.

(I didn't manage to dig into nakadi-java's code to see what happens when a retryable exception is caught and more events are available on the same partition.)

Is this behavior expected? What should we have done differently?

dehora (Owner) commented Jul 18, 2019

@ePaul thanks for reporting; let me do some digging – this one might be tricky to debug. In the meantime, can you add the stream connection parameters as details?

ePaul (Author) commented Jul 18, 2019

nakadiClient
    .resources()
    .streamBuilder()
    .streamConfiguration(
        new StreamConfiguration()
            .subscriptionId(subscriptionId)
            // batch limit comes from our configuration (50 in this setup)
            .batchLimit(eventProcessingConfiguration.batchSize())
    )
    .streamObserverFactory(
        new EventStreamObserverProvider(processingService, eventParser, eventProcessingConfiguration))
    .build()

The batch size here is 50; all other parameters are at their default values.

ePaul (Author) commented Jun 5, 2020

We just got another case of this (internal link).

ePaul (Author) commented Jun 5, 2020

I guess a workaround would be to always set max_uncommitted_events to 1, but this would reduce the possible throughput quite a lot (no parallelization possible).
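
For reference, the workaround would look roughly like this (sketch only – I'm assuming StreamConfiguration exposes a maxUncommittedEvents setter mirroring Nakadi's max_uncommitted_events stream parameter):

    // Sketch of the workaround, not tested. Capping uncommitted events at 1
    // means a later batch can never be committed past a failed one, at the
    // cost of throughput.
    new StreamConfiguration()
        .subscriptionId(subscriptionId)
        .batchLimit(eventProcessingConfiguration.batchSize())
        .maxUncommittedEvents(1)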
