feat: optimize disk buffer writes #77

Merged (3 commits) on Sep 9, 2024

@zmstone zmstone commented Sep 7, 2024

Previously, wolff_producer enqueued newly received calls eagerly, even when max_linger_ms was set to a non-zero value,
because the linger was implemented at the popping end of the queue.

This change moves the linger timer to the pushing end of the queue: the process delays the enqueue to collect more concurrent calls, so the batch written to disk can be larger.

This also means that when max_linger_ms is non-zero, concurrent callers are blocked while lingering.
For a completely non-blocking API, call cast/3 or cast/4 instead.

The cast APIs have no backpressure at all;
they do not even wait for the messages to be queued.
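
The pushing-end linger described above can be sketched as a receive loop that keeps collecting calls until a byte limit or a deadline is reached. This is a minimal sketch under stated assumptions, not the wolff_producer implementation: the module name `linger_sketch`, the `{send_req, Batch}` message shape, and `batch_bytes/1` operating on a list of binaries are all hypothetical.

```erlang
-module(linger_sketch).
-export([collect/2, batch_bytes/1]).

%% Hypothetical size function: a batch is assumed to be a list of binaries.
batch_bytes(Batch) ->
    lists:sum([byte_size(B) || B <- Batch]).

%% Collect {send_req, Batch} messages from the caller's mailbox until either
%% MaxBytes have been gathered or MaxLingerMs has elapsed.
%% Returns {Batches, Bytes} with batches in arrival order.
collect(MaxLingerMs, MaxBytes) ->
    Deadline = erlang:monotonic_time(millisecond) + MaxLingerMs,
    collect([], 0, Deadline, MaxBytes).

%% Stop as soon as the byte limit is reached (continue only while below it).
collect(Acc, Bytes, _Deadline, MaxBytes) when Bytes >= MaxBytes ->
    {lists:reverse(Acc), Bytes};
collect(Acc, Bytes, Deadline, MaxBytes) ->
    %% Remaining linger time, never negative.
    Timeout = max(0, Deadline - erlang:monotonic_time(millisecond)),
    receive
        {send_req, Batch} ->
            collect([Batch | Acc], Bytes + batch_bytes(Batch), Deadline, MaxBytes)
    after Timeout ->
        {lists:reverse(Acc), Bytes}
    end.
```

With `MaxLingerMs` set to 0 the loop still drains whatever is already in the mailbox, because a receive with a zero timeout checks the mailbox before timing out. The byte limit bounds how long a steady stream of senders can keep the loop collecting.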
src/wolff.erl (outdated)
@@ -130,6 +136,9 @@
%% exact max allowed message size configured in kafka.
%% * `max_linger_ms': Age in milliseconds a batch can stay in queue when the connection
%% is idle (as in no pending acks). Default: 0 (as in send immediately)
%% * `max_linger_bytes': Number of bytes to collect before sending it to Kafka.
Contributor

Q: what is the difference between max_linger_bytes and max_batch_bytes? Just looking at the descriptions, they seem the same to me. 🙈

From the code, it seems that this is the number of bytes that are collected in the producer mailbox before it's enqueued (to disk or memory), before even sending to kafka?

Contributor Author

yes, max_batch_bytes is for the popping end of the queue.
max_linger_bytes is at the pushing end.
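
To make the distinction concrete, a producer config could set both kinds of limit side by side. This is only a sketch: the option names `max_linger_ms`, `max_linger_bytes`, and `max_batch_bytes` come from this thread, while the module name `linger_config_sketch`, the function `producer_config/0`, and all values are hypothetical and are not wolff defaults or recommendations.

```erlang
-module(linger_config_sketch).
-export([producer_config/0]).

%% Illustrative values only; they are not recommendations or wolff defaults.
producer_config() ->
    #{
        %% Pushing end: how long and how many bytes to collect before enqueueing.
        max_linger_ms => 5,
        max_linger_bytes => 1048576,
        %% Popping end: limit applied when taking a batch off the queue.
        max_batch_bytes => 524288
    }.
```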

     receive
         ?SEND_REQ(_, Batch, _) = Call ->
-            collect_send_calls([Call | Calls], Size + batch_bytes(Batch), Limit)
+            Bytes = batch_bytes(Batch),
+            collect_send_calls(Call, Bytes, Calls)
Contributor

Should we add a count or byte limit before recursing, to avoid being blocked for a long time collecting calls that keep arriving when multiple clients are sending requests?

collect_send_calls(Call, Bytes, #{ts := Ts, bytes := Bytes0, batch_r := BatchR} = Calls, Max) ->
    Sum = Bytes0 + Bytes,
    R = Calls#{ts => Ts, bytes => Sum, batch_r => [Call | BatchR]},
    case Sum < Max of
Contributor

Nit:

Suggested change
-    case Sum < Max of
+    case Sum =< Max of

?

Contributor Author

less than max, then continue collecting.

is_linger_continue(#{calls := Calls, config := Config}) ->
    #{max_linger_ms := MaxLingerMs, max_linger_bytes := MaxLingerBytes} = Config,
    #{ts := Ts, bytes := Bytes} = Calls,
    case Bytes < MaxLingerBytes of
Contributor

Suggested change
-    case Bytes < MaxLingerBytes of
+    case Bytes =< MaxLingerBytes of

?

Contributor Author

less than max, then continue lingering.
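
The two comparisons discussed in these threads differ only when the collected size exactly equals the limit. A small sketch of that boundary case follows; the module name `linger_boundary` and both function names are hypothetical and exist only for illustration.

```erlang
-module(linger_boundary).
-export([continue_lt/2, continue_le/2]).

%% Comparison used in the PR: keep going only while strictly below the limit.
continue_lt(Bytes, Max) -> Bytes < Max.

%% Suggested alternative: also keep going when exactly at the limit.
continue_le(Bytes, Max) -> Bytes =< Max.
```

With a limit of 100 and exactly 100 bytes collected, `continue_lt(100, 100)` returns `false` (stop and enqueue), while `continue_le(100, 100)` returns `true` (wait for one more call).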

@zmstone zmstone merged commit 40deb8a into kafka4beam:main Sep 9, 2024
6 checks passed