Skip to content

Commit

Permalink
fix: aggregate_send_calls in wolff_producer
Browse files Browse the repository at this point in the history
  • Loading branch information
terry-xiaoyu committed Jul 8, 2023
1 parent e47ef30 commit 0f75393
Showing 1 changed file with 12 additions and 11 deletions.
23 changes: 12 additions & 11 deletions src/wolff_producer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -295,16 +295,16 @@ handle_info(_Info, St) ->
handle_cast(_Cast, St) ->
{noreply, St}.

handle_batch(BatchCalls, #{client_id := ClientId,
topic := Topic,
partition := Partition,
config := #{max_batch_bytes := Limit}
} = St0) ->
handle_batch(BatchCalls, #{ client_id := ClientId,
topic := Topic,
partition := Partition,
config := #{max_batch_bytes := Limit}
} = St0) ->
{ok, lists:foldl(fun({Calls, Cnt, Oct}, St1) ->
ok = wolff_stats:recv(ClientId, Topic, Partition, #{cnt => Cnt, oct => Oct}),
St2 = enqueue_calls(Calls, St1),
maybe_send_to_kafka(St2)
end, St0, aggregate_send_calls(BatchCalls, Limit))}.
ok = wolff_stats:recv(ClientId, Topic, Partition, #{cnt => Cnt, oct => Oct}),
St2 = enqueue_calls(Calls, St1),
maybe_send_to_kafka(St2)
end, St0, aggregate_send_calls(BatchCalls, Limit))}.

code_change(_OldVsn, St, _Extra) ->
{ok, St}.
Expand All @@ -317,8 +317,9 @@ terminate(_, _) ->
aggregate_send_calls(BatchCalls, Limit) ->
aggregate_send_calls(BatchCalls, Limit, {[], 0, 0}, []).

aggregate_send_calls([], _, {_, _, _}, Res) ->
lists:reverse(Res);
aggregate_send_calls([], _, {CallsAcc, Cnt, Size}, Res) ->
Res1 = [{lists:reverse(CallsAcc), Cnt, Size} | Res],
lists:reverse(Res1);
aggregate_send_calls([?SEND_REQ(_, Batch, _) = Call | Calls], Limit, {CallsAcc, Cnt, Size}, Res) ->
case Size + batch_bytes(Batch) of
Size1 when Size1 > Limit, Size == 0 ->
Expand Down

0 comments on commit 0f75393

Please sign in to comment.