
Kafka pubsub message gets dropped despite default retry forever policy #3529

Closed · passuied opened this issue Sep 3, 2024 · 8 comments
Labels: kind/bug Something isn't working

@passuied (Contributor) commented Sep 3, 2024

Expected Behavior

An invalid Kafka pubsub message is retried forever when the default retry policy (retry forever) is in place.

Actual Behavior

The invalid Kafka pubsub message is dropped when the pod terminates.

Steps to Reproduce the Problem

  1. Use the default retry policy (retry forever).
  2. Publish a valid message to topic my_topic.
  3. Publish an invalid message and another valid message to the same topic.
  4. Set up a consumer consuming my_topic that returns 500 when consuming the invalid message.
  5. Observe that Dapr retries processing the invalid message over and over.
  6. Terminate the pod.
  7. The log shows the following message: "Too many failed attempts at processing Kafka message:..."
  8. Upon restart, the consumer moves on to the next message, i.e. the invalid message was dropped...

Release Note

RELEASE NOTE: FIX bug in Kafka pubsub where a poison-pill message would get dropped upon pod termination despite the retry forever policy.

@passuied passuied added the kind/bug Something isn't working label Sep 3, 2024
@passuied (Contributor, Author) commented Sep 3, 2024

Based on research, this issue is the result of this code:

  • retry.NotifyRecover() will retry per the retry policy unless a Permanent error is encountered or the context is Done (see doRetryNotify())
  • When bubbled up, the ConsumeClaim() code will just assume the error is a permanent error and return nil, causing the message to be ack'ed.

The solution is not super clear here, as there is a lot of ping-pong between the dapr and components-contrib code. An expert eye would be welcome. Specifically, it is not clear what acks the message when a permanent error occurs in general... The only place that seems to ack the message is in doCallBack(), and only when the handler doesn't return an error...
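To make the described flow concrete, here is a minimal, hedged sketch of the failure mode. It is not the actual components-contrib source: it uses the underlying cenkalti/backoff retry loop as a stand-in for retry.NotifyRecover(), and handleMessage/consumeOne are hypothetical names. The point is that the same error return covers both "permanent failure" and "retry aborted because the context was cancelled", and the caller acks the message either way.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

// handleMessage is a stand-in for the per-message callback (doCallBack):
// the app keeps returning 500 for the poison message, so it always errors.
func handleMessage() error {
	return errors.New("app returned 500")
}

// consumeOne mimics the loop body described above: retry "forever", but the
// backoff is bound to the session context, so pod shutdown cancels it.
func consumeOne(sessionCtx context.Context) {
	bo := backoff.NewExponentialBackOff()
	bo.MaxElapsedTime = 0 // no time limit, i.e. retry forever as with the default policy

	err := backoff.RetryNotify(
		handleMessage,
		backoff.WithContext(bo, sessionCtx),
		func(err error, d time.Duration) {
			fmt.Printf("handler failed (%v), retrying in %s\n", err, d)
		},
	)
	if err != nil {
		// The caller cannot tell "permanent failure" apart from "retry aborted
		// because the context is done". It logs the "Too many failed attempts..."
		// message and returns, and the message ends up marked/acked, i.e. dropped.
		fmt.Println("Too many failed attempts at processing Kafka message:", err)
	}
}

func main() {
	// Simulate pod termination after two seconds of retrying.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	consumeOne(ctx)
}
```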

@yaron2 (Member) commented Sep 3, 2024

> Based on research, this issue is the result of this code:
>
> • retry.NotifyRecover() will retry per the retry policy unless a Permanent error is encountered or the context is Done (see doRetryNotify())
> • When bubbled up, the ConsumeClaim() code will just assume the error is a permanent error and return nil, causing the message to be ack'ed.
>
> The solution is not super clear here, as there is a lot of ping-pong between the dapr and components-contrib code. An expert eye would be welcome. Specifically, it is not clear what acks the message when a permanent error occurs in general... The only place that seems to ack the message is in doCallBack(), and only when the handler doesn't return an error...

cc @halspang can you take a look?

@passuied (Contributor, Author) commented Sep 3, 2024

> cc @halspang can you take a look?

I can provide a log of the event that got dropped... Dapr retried the same message for 18h (as expected); then, upon pod termination, the message "Too many failed attempts at processing Kafka message:..." showed up and the message got skipped...

Please note this happened in version 1.13.4, as our production cluster hasn't been upgraded to 1.14.x yet (we were going to upgrade last week but are now waiting for 1.14.2).

@passuied passuied closed this as completed Sep 3, 2024
@passuied passuied reopened this Sep 3, 2024
@yaron2 (Member) commented Sep 5, 2024

I can reproduce this locally as well, but I suspect something is off with my local Docker-based Kafka, as this reproduces all the way back to Dapr 1.10.x (I didn't test further).

@passuied (Contributor, Author) commented Sep 5, 2024

@yaron2 I think I figured it out. The problem occurs because a valid message exists after the invalid one: retry.NotifyRecover() exits (since the session context is done), but the next loop iteration processes the next message instead of exiting...
The solution should be simple: move the case session.Context().Done() check up so the loop exits directly instead of processing the next message!

Testing it right now...
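For illustration, here is a minimal sketch of the kind of change described above, assuming the consume loop has roughly the usual sarama ConsumeClaim shape. This is not the actual components-contrib code or the final fix: the consumer struct, logf, and processWithRetry are hypothetical stand-ins. The idea is to check the session context before pulling (and later marking) the next message, so a retry aborted by shutdown does not cause the failed message's offset to be committed.

```go
package kafka

import (
	"github.com/IBM/sarama"
)

// consumer is a hypothetical stand-in for the components-contrib Kafka consumer.
type consumer struct {
	processWithRetry func(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) error
	logf             func(format string, args ...any)
}

func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		// Exit as soon as the session is shutting down, before touching the next
		// message; otherwise a retry aborted by shutdown falls through, and the
		// following message gets processed and marked, skipping the failed one.
		if session.Context().Err() != nil {
			return nil
		}
		select {
		case <-session.Context().Done():
			return nil
		case msg, ok := <-claim.Messages():
			if !ok {
				return nil
			}
			// processWithRetry stands in for the retry.NotifyRecover wrapper
			// around the app callback.
			if err := c.processWithRetry(session, msg); err != nil {
				if session.Context().Err() != nil {
					// Retry was cut short by shutdown, not by a permanent error:
					// leave the offset uncommitted so the message is redelivered.
					return nil
				}
				// A genuinely permanent error: log and (per current behavior) move on.
				c.logf("Too many failed attempts at processing Kafka message: %v", err)
			}
			session.MarkMessage(msg, "")
		}
	}
}
```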

@passuied (Contributor, Author) commented Sep 5, 2024

> I can reproduce this locally as well, but I suspect something is off with my local Docker-based Kafka, as this reproduces all the way back to Dapr 1.10.x (I didn't test further).

Your local issue is probably because (like me earlier) the first message you're processing is invalid, and therefore the offset was NEVER committed for this consumer group. So on the next restart it defaults to the newest offset... which is normal behavior...

It took me a while to figure this one out! :D

@yaron2 (Member) commented Sep 5, 2024

> I can reproduce this locally as well, but I suspect something is off with my local Docker-based Kafka, as this reproduces all the way back to Dapr 1.10.x (I didn't test further).
>
> Your local issue is probably because (like me earlier) the first message you're processing is invalid, and therefore the offset was NEVER committed for this consumer group. So on the next restart it defaults to the newest offset... which is normal behavior...
>
> It took me a while to figure this one out! :D

I literally just now noticed this. Setting initialOffset: oldest resolves it.
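For anyone hitting the same confusion when reproducing locally, here is a minimal sketch of a Dapr Kafka pubsub component with that setting. The component name, broker address, and consumer group below are placeholders, not values from this issue:

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub            # placeholder component name
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    - name: brokers
      value: "localhost:9092"   # placeholder broker address
    - name: consumerGroup
      value: "my-group"         # placeholder consumer group
    - name: initialOffset
      value: "oldest"           # default is "newest"; "oldest" reads from the beginning when no offset has been committed
```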

@passuied (Contributor, Author)

This issue is now resolved.
