Improved forking implementation #364

Draft
wants to merge 1 commit into
base: master
3 changes: 3 additions & 0 deletions lib/racecar.rb
@@ -8,6 +8,7 @@
require "racecar/consumer_set"
require "racecar/runner"
require "racecar/parallel_runner"
require "racecar/forking_runner"
require "racecar/producer"
require "racecar/config"
require "racecar/version"
@@ -74,6 +75,8 @@ def self.runner(processor)

if config.parallel_workers && config.parallel_workers > 1
ParallelRunner.new(runner: runner, config: config, logger: logger)
elsif config.forks && config.forks > 0
ForkingRunner.new(runner: runner, config: config, logger: logger)
else
runner
end
13 changes: 13 additions & 0 deletions lib/racecar/config.rb
@@ -191,11 +191,24 @@ class Config < KingKonf::Config
desc "Strategy for switching topics when there are multiple subscriptions. `exhaust-topic` will only switch when the consumer poll returns no messages. `round-robin` will switch after each poll regardless.\nWarning: `round-robin` will be the default in Racecar 3.x"
string :multi_subscription_strategy, allowed_values: %w(round-robin exhaust-topic), default: "exhaust-topic"

desc "How many worker processes to fork"
integer :forks, default: 0

# The error handler must be set directly on the object.
attr_reader :error_handler

attr_accessor :subscriptions, :logger, :parallel_workers

# Readers are defined explicitly below so they can default to a no-op lambda.
attr_writer :prefork, :postfork

def prefork
@prefork ||= lambda { |*_| }
end

def postfork
@postfork ||= lambda { |*_| }
end

def statistics_interval_ms
if Rdkafka::Config.statistics_callback
statistics_interval * 1000
132 changes: 132 additions & 0 deletions lib/racecar/forking_runner.rb
@@ -0,0 +1,132 @@
# frozen_string_literal: true

module Racecar
class ForkingRunner
def initialize(runner:, config:, logger:, parent_monitor: ParentProcessMonitor.new)
@runner = runner
@config = config
@logger = logger
@pids = []
@parent_monitor = parent_monitor
@running = false
end

attr_reader :config, :runner, :logger, :pids, :parent_monitor
private :config, :runner, :logger, :pids, :parent_monitor

def run
config.prefork.call
install_signal_handlers

@running = true

@pids = config.forks.times.map do |n|
pid = fork do
parent_monitor.child_post_fork
config.postfork.call

parent_monitor.on_parent_exit do
logger.warn("Supervisor dead, exiting.")
runner.stop
end

runner.run
end
logger.debug("Racecar forked consumer process #{pid}.")

pid
end

parent_monitor.parent_post_fork

wait_for_child_processes
end

def stop
@running = false
logger.debug("Racecar::ForkingRunner runner stopping #{Process.pid}.")
terminate_workers
end

def running?
!!@running
end

private

def terminate_workers
pids.each do |pid|
begin
Process.kill("TERM", pid)
rescue Errno::ESRCH
logger.debug("Racecar::ForkingRunner Process not found #{pid}.")
end
end
end

def check_workers
pids.each do |pid|
unless worker_running?(pid)
logger.debug("A forked worker has exited unexpectedly. Shutting everything down.")
stop
Contributor

@HeyNonster HeyNonster Feb 13, 2024

Are we killing the ForkingRunner here and all of its children if one child exits? Maybe I'm following that incorrectly.

Contributor Author

Yes, it would be unexpected for this to happen, so rather than try to recover from a potentially bad state, the whole thing shuts down so Kubernetes can bring up a new replica.

I'm not against trying to recover and replace a lost worker process but that is quite a bit more complex.
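
For illustration only, and not part of this PR: a minimal sketch of what replacing a lost worker might involve, which hints at the extra complexity mentioned above. The class `RespawningSupervisor` and its `worker_count:` and `max_restarts:` parameters are hypothetical names, not Racecar APIs. The restart cap is an assumption added so that a crash-looping worker cannot be respawned forever.

```ruby
# Hypothetical sketch, not Racecar code: a supervisor that replaces workers
# that have exited instead of shutting everything down.
class RespawningSupervisor
  attr_reader :pids

  def initialize(worker_count:, max_restarts:, &work)
    @worker_count = worker_count
    @max_restarts = max_restarts # assumed cap to avoid an endless crash loop
    @work = work
    @pids = []
    @restarts = 0
  end

  def start
    @worker_count.times { spawn_worker }
  end

  # Reaps exited children without blocking and replaces each one while the
  # restart budget lasts. Returns the number of workers replaced in this pass.
  def reap_and_respawn
    replaced = 0
    @pids.dup.each do |pid|
      reaped, _status = Process.waitpid2(pid, Process::WNOHANG)
      next if reaped.nil? # child is still running

      @pids.delete(pid)
      next if @restarts >= @max_restarts

      spawn_worker
      @restarts += 1
      replaced += 1
    end
    replaced
  end

  def stop
    @pids.each do |pid|
      begin
        Process.kill("TERM", pid)
        Process.wait(pid)
      rescue Errno::ESRCH, Errno::ECHILD
        # The worker is already gone; nothing to clean up.
      end
    end
    @pids.clear
  end

  private

  def spawn_worker
    @pids << fork { @work.call }
  end
end
```

Even this small version has to decide when to give up restarting, and it leaves out concerns a real consumer would face, such as partition rebalancing while a worker is being replaced.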

return
end
end
end

def worker_running?(pid)
_, status = Process.waitpid2(pid, Process::WNOHANG)
status.nil?
rescue Errno::ECHILD
false
end

def wait_for_child_processes
pids.each do |pid|
begin
Process.wait(pid)
rescue Errno::ECHILD
end
end
end

def install_signal_handlers
Signal.trap("CHLD") do |sid|
logger.warn("Received SIGCHLD")
check_workers if running?
end
Signal.trap("TERM") do |sid|
stop
end
Signal.trap("INT") do |sid|
stop
end
end

class ParentProcessMonitor
def initialize(pipe_ends = IO.pipe)
@read_end, @write_end = pipe_ends
@monitor_thread = nil
end

attr_reader :read_end, :write_end, :monitor_thread
private :read_end, :write_end, :monitor_thread

def on_parent_exit(&block)
child_post_fork
@monitor_thread = Thread.new do
IO.select([read_end])
block.call
end
end

def parent_post_fork
read_end.close
end

def child_post_fork
write_end.close
end
end
end
end
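
A standalone sketch of the pipe technique that `ParentProcessMonitor` appears to rely on, written here as an illustration rather than taken from the PR. The method name `demo_parent_exit_detection` and the second "report" pipe are hypothetical additions, used only so the outcome can be observed from the calling process. The idea is that the worker closes its copy of the write end, so the read end reaches EOF (and `IO.select` returns) only once the supervisor, the last holder of the write end, has exited.

```ruby
# Hypothetical sketch: detect the death of a parent process through a pipe.
# Returns the text written by the worker once it notices its parent is gone.
def demo_parent_exit_detection
  report_r, report_w = IO.pipe # lets the worker report back to the caller

  supervisor_pid = fork do
    report_r.close
    life_r, life_w = IO.pipe # "lifeline": only the supervisor keeps life_w open

    fork do
      # The worker must drop its inherited copy of the write end,
      # otherwise the read end would never reach EOF.
      life_w.close
      IO.select([life_r]) # blocks until every writer has gone away
      report_w.write("supervisor gone")
      report_w.close
      exit!(0)
    end

    life_r.close
    report_w.close
    sleep 0.2
    exit!(0) # supervisor exits; the kernel closes life_w for it
  end

  report_w.close
  Process.wait(supervisor_pid)
  report_r.read # returns once the worker has closed its report_w
ensure
  report_r&.close
end
```

Calling `demo_parent_exit_detection` forks a short-lived supervisor and a worker; the worker only writes its message after the supervisor has exited.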
Binary file added racecar-2.10.beta.3.3ad7680.gem
Binary file not shown.
11 changes: 6 additions & 5 deletions spec/integration/cooperative_sticky_assignment_spec.rb
@@ -38,6 +38,7 @@
start_consumer

wait_for_assignments(2)
reset_consumer_events
publish_messages
wait_for_a_few_messages

@@ -46,8 +47,8 @@
wait_for_all_messages

aggregate_failures do
expect_consumer0_did_not_have_partitions_revoked_but_consumer1_did
expect_consumer0_took_over_processing_from_consumer1
expect_consumer0_did_not_have_partitions_revoked_but_consumer1_did
end
end

@@ -87,6 +88,10 @@ def start_consumer
consumer_index_by_id["#{Process.pid}-#{thread.object_id}"] = consumers.index(runner)
end

def reset_consumer_events
@received_consumer_events = []
end

def terminate_consumer1
consumers[1].stop
end
@@ -105,10 +110,6 @@ def wait_for_all_messages
end

def set_config
Racecar.config.fetch_messages = 1
Racecar.config.max_wait_time = 0.1
Racecar.config.session_timeout = 6 # minimum allowed by default broker config
Racecar.config.heartbeat_interval = 1.5
Racecar.config.partition_assignment_strategy = "cooperative-sticky"
Racecar.config.load_consumer_class(consumer_class)
end