Skip to content

Commit

Permalink
Manage webhook state from background job (#14)
Browse files Browse the repository at this point in the history
Instead of requiring the webhook handler to manage the state of the
webhook, manage it from the background job. Atomically lock the webhook
for processing, and skip the job if the webhook has already been taken
by a different worker. This removes the need for handlers to manage the
processing state, and also enables locking - removing the need for us to
have locks on the level of the job.
  • Loading branch information
julik authored Jul 24, 2024
1 parent d98041f commit 869907b
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 122 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ This format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

## Unreleased

- Manage the `state` of the `ReceivedWebhook` from the background job itself. This frees up the handler to actually do the work associated with processing only. The job will manage the rest.
- Use `valid?` in the background job instead of the controller. Most common configuration issue is an incorrectly specified signing secret, or an incorrectly implemented input validation. When these happen, it is better to allow the webhook to be reprocessed
- Use instance methods in handlers instead of class methods, as they are shorter to define. Assume a handler module supports `.new` - with a module using singleton methods it may return `self` from `new`.
- In the config, allow the handlers specified as strings. Module resolution in Rails happens after the config gets loaded, because the config may alter the Zeitwerk load paths. To allow the config to get loaded and to allow handlers to be autoloaded using Zeitwerk, the handler modules have to be resolved lazily. This also permits the handlers to be reloadable, like any module under Rails' autoloading control.
Expand Down
8 changes: 1 addition & 7 deletions example/app/webhooks/webhook_test_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,11 @@

# This handler accepts webhooks from our integration tests. This webhook gets dispatched
# if a banking provider test fails, indicating that the bank might be having an incident

class WebhookTestHandler < Munster::BaseHandler
def valid?(request) = true

def process(webhook)
return unless webhook.received?
webhook.update!(status: "processing")
webhook.update!(status: "processed")
rescue
webhook.update!(status: "error")
raise
Rails.logger.info { webhook.request.params.fetch(:payment_id) }
end

def expose_errors_to_sender? = true
Expand Down
19 changes: 16 additions & 3 deletions lib/munster/jobs/processing_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,28 @@

module Munster
class ProcessingJob < ActiveJob::Base
class WebhookPayloadInvalid < StandardError
end

def perform(webhook)
Rails.error.set_context(munster_handler_module_name: webhook.handler_module_name, **Munster.configuration.error_context)

webhook.with_lock do
return unless webhook.received?
webhook.processing!
end

if webhook.handler.valid?(webhook.request)
# TODO: we are going to add some default state lifecycle managed
# by the background job later
webhook.handler.process(webhook)
webhook.processed! if webhook.processing?
else
Rails.logger.info { "Webhook #{webhook.inspect} did not pass validation and was skipped" }
e = WebhookPayloadInvalid.new("#{webhook.class} #{webhook.id} did not pass validation and was skipped")
Rails.error.report(e, handled: true, severity: :error)
webhook.failed_validation!
end
rescue => e
webhook.error!
raise e
end
end
end
2 changes: 1 addition & 1 deletion lib/munster/models/received_webhook.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class ReceivedWebhook < ActiveRecord::Base

state_machine_enum :status do |s|
s.permit_transition(:received, :processing)
s.permit_transition(:received, :failed_validation)
s.permit_transition(:processing, :failed_validation)
s.permit_transition(:processing, :skipped)
s.permit_transition(:processing, :processed)
s.permit_transition(:processing, :error)
Expand Down
Empty file removed test/integration/.keep
Empty file.
99 changes: 0 additions & 99 deletions test/integration/webhooks_controller_test.rb

This file was deleted.

177 changes: 176 additions & 1 deletion test/munster_test.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,184 @@
# frozen_string_literal: true

require "test_helper"
require_relative "test_app"

class TestMunster < Minitest::Test
class TestMunster < ActionDispatch::IntegrationTest
def test_that_it_has_a_version_number
refute_nil ::Munster::VERSION
end

def webhook_body
<<~JSON
{
"provider_id": "musterbank-flyio",
"starts_at": "<%= Time.now.utc %>",
"external_source": "The Forge Of Downtime",
"external_ticket_title": "DOWN-123",
"internal_description_markdown": "A test has failed"
}
JSON
end

Munster.configure do |config|
config.active_handlers = {
test: WebhookTestHandler,
inactive: "InactiveHandler",
invalid: "InvalidHandler",
private: "PrivateHandler",
"failing-with-exposed-errors": "FailingWithExposedErrors",
"failing-with-concealed-errors": "FailingWithConcealedErrors",
extract_id: "ExtractIdHandler"
}
end
self.app = MunsterTestApp

def self.xtest(msg)
test(msg) { skip }
end

test "accepts a webhook, stores and processes it" do
Munster::ReceivedWebhook.delete_all

tf = Tempfile.new
body = {isValid: true, outputToFilename: tf.path}
body_json = body.to_json

post "/munster/test", params: body_json, headers: {"CONTENT_TYPE" => "application/json"}
assert_response 200

webhook = Munster::ReceivedWebhook.last!

assert_predicate webhook, :received?
assert_equal "WebhookTestHandler", webhook.handler_module_name
assert_equal webhook.status, "received"
assert_equal webhook.body, body_json

perform_enqueued_jobs
assert_predicate webhook.reload, :processed?
tf.rewind
assert_equal tf.read, body_json
end

test "accepts a webhook but does not process it if it is invalid" do
Munster::ReceivedWebhook.delete_all

tf = Tempfile.new
body = {isValid: false, outputToFilename: tf.path}
body_json = body.to_json

post "/munster/test", params: body_json, headers: {"CONTENT_TYPE" => "application/json"}
assert_response 200

webhook = Munster::ReceivedWebhook.last!

assert_predicate webhook, :received?
assert_equal "WebhookTestHandler", webhook.handler_module_name
assert_equal webhook.status, "received"
assert_equal webhook.body, body_json

perform_enqueued_jobs
assert_predicate webhook.reload, :failed_validation?

tf.rewind
assert_predicate tf.read, :empty?
end

test "marks a webhook as errored if it raises during processing" do
Munster::ReceivedWebhook.delete_all

tf = Tempfile.new
body = {isValid: true, raiseDuringProcessing: true, outputToFilename: tf.path}
body_json = body.to_json

post "/munster/test", params: body_json, headers: {"CONTENT_TYPE" => "application/json"}
assert_response 200

webhook = Munster::ReceivedWebhook.last!

assert_predicate webhook, :received?
assert_equal "WebhookTestHandler", webhook.handler_module_name
assert_equal webhook.status, "received"
assert_equal webhook.body, body_json

assert_raises(StandardError) { perform_enqueued_jobs }
assert_predicate webhook.reload, :error?

tf.rewind
assert_predicate tf.read, :empty?
end

test "does not try to process a webhook if it is not in `received' state" do
Munster::ReceivedWebhook.delete_all

tf = Tempfile.new
body = {isValid: true, raiseDuringProcessing: true, outputToFilename: tf.path}
body_json = body.to_json

post "/munster/test", params: body_json, headers: {"CONTENT_TYPE" => "application/json"}
assert_response 200

webhook = Munster::ReceivedWebhook.last!
webhook.processing!

perform_enqueued_jobs
assert_predicate webhook.reload, :processing?

tf.rewind
assert_predicate tf.read, :empty?
end

test "raises an error if the service_id is not known" do
post "/munster/missing_service", params: webhook_body, headers: {"CONTENT_TYPE" => "application/json"}
assert_response 404
end

test "returns a 503 when a handler is inactive" do
post "/munster/inactive", params: webhook_body, headers: {"CONTENT_TYPE" => "application/json"}

assert_response 503
assert_equal 'Webhook handler "inactive" is inactive', response.parsed_body["error"]
end

test "returns a 200 status and error message if the handler does not expose errors" do
post "/munster/failing-with-concealed-errors", params: webhook_body, headers: {"CONTENT_TYPE" => "application/json"}

assert_response 200
assert_equal false, response.parsed_body["ok"]
assert response.parsed_body["error"]
end

test "returns a 500 status and error message if the handler does not expose errors" do
post "/munster/failing-with-exposed-errors", params: webhook_body, headers: {"CONTENT_TYPE" => "application/json"}

assert_response 500
# The response generation in this case is done by Rails, through the
# common Rails error page
end

test "deduplicates received webhooks based on the event ID" do
body = {event_id: SecureRandom.uuid, body: "test"}.to_json

assert_changes_by -> { Munster::ReceivedWebhook.count }, exactly: 1 do
3.times do
post "/munster/extract_id", params: body, headers: {"CONTENT_TYPE" => "application/json"}
assert_response 200
end
end
end

test "preserves the route params and the request params in the serialised request stored with the webhook" do
body = {user_name: "John", number_of_dependents: 14}.to_json

Munster::ReceivedWebhook.delete_all
post "/per-user-munster/123/private", params: body, headers: {"CONTENT_TYPE" => "application/json"}
assert_response 200

received_webhook = Munster::ReceivedWebhook.first!
assert_predicate received_webhook, :received?
assert_equal body, received_webhook.request.body.read
assert_equal "John", received_webhook.request.params["user_name"]
assert_equal 14, received_webhook.request.params["number_of_dependents"]
assert_equal "123", received_webhook.request.params["user_id"]
end
end
17 changes: 6 additions & 11 deletions test/test-webhook-handlers/webhook_test_handler.rb
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
# frozen_string_literal: true

# This handler accepts webhooks from our integration tests. This webhook gets dispatched
# if a banking provider test fails, indicating that the bank might be having an incident

class WebhookTestHandler < Munster::BaseHandler
def valid?(request) = true
def valid?(request)
request.params.fetch(:isValid, false)
end

def process(webhook)
return unless webhook.received?
webhook.update!(status: "processing")
webhook.update!(status: "processed")
rescue
webhook.update!(status: "error")
raise "Oops, failed" if webhook.request.params[:raiseDuringProcessing]
filename = webhook.request.params.fetch(:outputToFilename)
File.binwrite(filename, webhook.body)
end

def expose_errors_to_sender? = true
Expand Down

0 comments on commit 869907b

Please sign in to comment.