From 869907b462f7f3bc97b0644d3cfb192263ce4757 Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Wed, 24 Jul 2024 13:48:02 +0100 Subject: [PATCH] Manage webhook state from background job (#14) Instead of requiring the webhook handler to manage the state of the webhook, manage it from the background job. Atomically lock the webhook for processing, and skip the job if the webhook has already been taken by a different worker. This removes the need for handlers to manage the processing state, and also enables locking - removing the need for us to have locks on the level of the job. --- CHANGELOG.md | 1 + example/app/webhooks/webhook_test_handler.rb | 8 +- lib/munster/jobs/processing_job.rb | 19 +- lib/munster/models/received_webhook.rb | 2 +- test/integration/.keep | 0 test/integration/webhooks_controller_test.rb | 99 ---------- test/munster_test.rb | 177 +++++++++++++++++- .../webhook_test_handler.rb | 17 +- 8 files changed, 201 insertions(+), 122 deletions(-) delete mode 100644 test/integration/.keep delete mode 100644 test/integration/webhooks_controller_test.rb diff --git a/CHANGELOG.md b/CHANGELOG.md index f7fb3a1..b37140a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ This format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ## Unreleased +- Manage the `state` of the `ReceivedWebhook` from the background job itself. This frees up the handler to actually do the work associated with processing only. The job will manage the rest. - Use `valid?` in the background job instead of the controller. Most common configuration issue is an incorrectly specified signing secret, or an incorrectly implemented input validation. When these happen, it is better to allow the webhook to be reprocessed - Use instance methods in handlers instead of class methods, as they are shorter to define. Assume a handler module supports `.new` - with a module using singleton methods it may return `self` from `new`. - In the config, allow the handlers specified as strings. Module resolution in Rails happens after the config gets loaded, because the config may alter the Zeitwerk load paths. To allow the config to get loaded and to allow handlers to be autoloaded using Zeitwerk, the handler modules have to be resolved lazily. This also permits the handlers to be reloadable, like any module under Rails' autoloading control. diff --git a/example/app/webhooks/webhook_test_handler.rb b/example/app/webhooks/webhook_test_handler.rb index d69bcc4..8b13534 100644 --- a/example/app/webhooks/webhook_test_handler.rb +++ b/example/app/webhooks/webhook_test_handler.rb @@ -2,17 +2,11 @@ # This handler accepts webhooks from our integration tests. This webhook gets dispatched # if a banking provider test fails, indicating that the bank might be having an incident - class WebhookTestHandler < Munster::BaseHandler def valid?(request) = true def process(webhook) - return unless webhook.received? - webhook.update!(status: "processing") - webhook.update!(status: "processed") - rescue - webhook.update!(status: "error") - raise + Rails.logger.info { webhook.request.params.fetch(:payment_id) } end def expose_errors_to_sender? = true diff --git a/lib/munster/jobs/processing_job.rb b/lib/munster/jobs/processing_job.rb index 8e09e59..70052f2 100644 --- a/lib/munster/jobs/processing_job.rb +++ b/lib/munster/jobs/processing_job.rb @@ -4,15 +4,28 @@ module Munster class ProcessingJob < ActiveJob::Base + class WebhookPayloadInvalid < StandardError + end + def perform(webhook) + Rails.error.set_context(munster_handler_module_name: webhook.handler_module_name, **Munster.configuration.error_context) + + webhook.with_lock do + return unless webhook.received? + webhook.processing! + end + if webhook.handler.valid?(webhook.request) - # TODO: we are going to add some default state lifecycle managed - # by the background job later webhook.handler.process(webhook) + webhook.processed! if webhook.processing? else - Rails.logger.info { "Webhook #{webhook.inspect} did not pass validation and was skipped" } + e = WebhookPayloadInvalid.new("#{webhook.class} #{webhook.id} did not pass validation and was skipped") + Rails.error.report(e, handled: true, severity: :error) webhook.failed_validation! end + rescue => e + webhook.error! + raise e end end end diff --git a/lib/munster/models/received_webhook.rb b/lib/munster/models/received_webhook.rb index d23c7b1..71645dd 100644 --- a/lib/munster/models/received_webhook.rb +++ b/lib/munster/models/received_webhook.rb @@ -11,7 +11,7 @@ class ReceivedWebhook < ActiveRecord::Base state_machine_enum :status do |s| s.permit_transition(:received, :processing) - s.permit_transition(:received, :failed_validation) + s.permit_transition(:processing, :failed_validation) s.permit_transition(:processing, :skipped) s.permit_transition(:processing, :processed) s.permit_transition(:processing, :error) diff --git a/test/integration/.keep b/test/integration/.keep deleted file mode 100644 index e69de29..0000000 diff --git a/test/integration/webhooks_controller_test.rb b/test/integration/webhooks_controller_test.rb deleted file mode 100644 index 49c92d3..0000000 --- a/test/integration/webhooks_controller_test.rb +++ /dev/null @@ -1,99 +0,0 @@ -require "test_helper" -require_relative "../test_app" - -class WebhooksControllerTest < ActionDispatch::IntegrationTest - def webhook_body - <<~JSON - { - "provider_id": "musterbank-flyio", - "starts_at": "<%= Time.now.utc %>", - "external_source": "The Forge Of Downtime", - "external_ticket_title": "DOWN-123", - "internal_description_markdown": "A test has failed" - } - JSON - end - - Munster.configure do |config| - config.active_handlers = { - test: WebhookTestHandler, - inactive: "InactiveHandler", - invalid: "InvalidHandler", - private: "PrivateHandler", - "failing-with-exposed-errors": "FailingWithExposedErrors", - "failing-with-concealed-errors": "FailingWithConcealedErrors", - extract_id: "ExtractIdHandler" - } - end - self.app = MunsterTestApp - - def self.xtest(msg) - test(msg) { skip } - end - - test "accepts a webhook without an event ID and stores it" do - Munster::ReceivedWebhook.delete_all - - post "/munster/test", params: webhook_body, headers: {"CONTENT_TYPE" => "application/json"} - assert_response 200 - - webhook = Munster::ReceivedWebhook.last! - - assert_equal "WebhookTestHandler", webhook.handler_module_name - assert_equal webhook.status, "received" - assert_equal webhook.body, webhook_body - end - - test "raises an error if the service_id is not known" do - post "/munster/missing_service", params: webhook_body, headers: {"CONTENT_TYPE" => "application/json"} - assert_response 404 - end - - test "inactive handlers" do - post "/munster/inactive", params: webhook_body, headers: {"CONTENT_TYPE" => "application/json"} - - assert_response 503 - assert_equal 'Webhook handler "inactive" is inactive', response.parsed_body["error"] - end - - test "returns a 200 status and error message if the handler does not expose errors" do - post "/munster/failing-with-concealed-errors", params: webhook_body, headers: {"CONTENT_TYPE" => "application/json"} - - assert_response 200 - assert_equal false, response.parsed_body["ok"] - assert response.parsed_body["error"] - end - - test "returns a 500 status and error message if the handler does not expose errors" do - post "/munster/failing-with-exposed-errors", params: webhook_body, headers: {"CONTENT_TYPE" => "application/json"} - - assert_response 500 - # The response generation in this case is done by Rails, through the - # common Rails error page - end - - test "deduplicates received webhooks based on the event ID" do - body = {event_id: SecureRandom.uuid, body: "test"}.to_json - - assert_changes_by -> { Munster::ReceivedWebhook.count }, exactly: 1 do - 3.times do - post "/munster/extract_id", params: body, headers: {"CONTENT_TYPE" => "application/json"} - assert_response 200 - end - end - end - - test "preserves the route params and the request params in the serialised request stored with the webhook" do - body = {user_name: "John", number_of_dependents: 14}.to_json - - Munster::ReceivedWebhook.delete_all - post "/per-user-munster/123/private", params: body, headers: {"CONTENT_TYPE" => "application/json"} - assert_response 200 - - received_webhook = Munster::ReceivedWebhook.first! - assert_equal body, received_webhook.request.body.read - assert_equal "John", received_webhook.request.params["user_name"] - assert_equal 14, received_webhook.request.params["number_of_dependents"] - assert_equal "123", received_webhook.request.params["user_id"] - end -end diff --git a/test/munster_test.rb b/test/munster_test.rb index 0c89ad9..9f3e525 100644 --- a/test/munster_test.rb +++ b/test/munster_test.rb @@ -1,9 +1,184 @@ # frozen_string_literal: true require "test_helper" +require_relative "test_app" -class TestMunster < Minitest::Test +class TestMunster < ActionDispatch::IntegrationTest def test_that_it_has_a_version_number refute_nil ::Munster::VERSION end + + def webhook_body + <<~JSON + { + "provider_id": "musterbank-flyio", + "starts_at": "<%= Time.now.utc %>", + "external_source": "The Forge Of Downtime", + "external_ticket_title": "DOWN-123", + "internal_description_markdown": "A test has failed" + } + JSON + end + + Munster.configure do |config| + config.active_handlers = { + test: WebhookTestHandler, + inactive: "InactiveHandler", + invalid: "InvalidHandler", + private: "PrivateHandler", + "failing-with-exposed-errors": "FailingWithExposedErrors", + "failing-with-concealed-errors": "FailingWithConcealedErrors", + extract_id: "ExtractIdHandler" + } + end + self.app = MunsterTestApp + + def self.xtest(msg) + test(msg) { skip } + end + + test "accepts a webhook, stores and processes it" do + Munster::ReceivedWebhook.delete_all + + tf = Tempfile.new + body = {isValid: true, outputToFilename: tf.path} + body_json = body.to_json + + post "/munster/test", params: body_json, headers: {"CONTENT_TYPE" => "application/json"} + assert_response 200 + + webhook = Munster::ReceivedWebhook.last! + + assert_predicate webhook, :received? + assert_equal "WebhookTestHandler", webhook.handler_module_name + assert_equal webhook.status, "received" + assert_equal webhook.body, body_json + + perform_enqueued_jobs + assert_predicate webhook.reload, :processed? + tf.rewind + assert_equal tf.read, body_json + end + + test "accepts a webhook but does not process it if it is invalid" do + Munster::ReceivedWebhook.delete_all + + tf = Tempfile.new + body = {isValid: false, outputToFilename: tf.path} + body_json = body.to_json + + post "/munster/test", params: body_json, headers: {"CONTENT_TYPE" => "application/json"} + assert_response 200 + + webhook = Munster::ReceivedWebhook.last! + + assert_predicate webhook, :received? + assert_equal "WebhookTestHandler", webhook.handler_module_name + assert_equal webhook.status, "received" + assert_equal webhook.body, body_json + + perform_enqueued_jobs + assert_predicate webhook.reload, :failed_validation? + + tf.rewind + assert_predicate tf.read, :empty? + end + + test "marks a webhook as errored if it raises during processing" do + Munster::ReceivedWebhook.delete_all + + tf = Tempfile.new + body = {isValid: true, raiseDuringProcessing: true, outputToFilename: tf.path} + body_json = body.to_json + + post "/munster/test", params: body_json, headers: {"CONTENT_TYPE" => "application/json"} + assert_response 200 + + webhook = Munster::ReceivedWebhook.last! + + assert_predicate webhook, :received? + assert_equal "WebhookTestHandler", webhook.handler_module_name + assert_equal webhook.status, "received" + assert_equal webhook.body, body_json + + assert_raises(StandardError) { perform_enqueued_jobs } + assert_predicate webhook.reload, :error? + + tf.rewind + assert_predicate tf.read, :empty? + end + + test "does not try to process a webhook if it is not in `received' state" do + Munster::ReceivedWebhook.delete_all + + tf = Tempfile.new + body = {isValid: true, raiseDuringProcessing: true, outputToFilename: tf.path} + body_json = body.to_json + + post "/munster/test", params: body_json, headers: {"CONTENT_TYPE" => "application/json"} + assert_response 200 + + webhook = Munster::ReceivedWebhook.last! + webhook.processing! + + perform_enqueued_jobs + assert_predicate webhook.reload, :processing? + + tf.rewind + assert_predicate tf.read, :empty? + end + + test "raises an error if the service_id is not known" do + post "/munster/missing_service", params: webhook_body, headers: {"CONTENT_TYPE" => "application/json"} + assert_response 404 + end + + test "returns a 503 when a handler is inactive" do + post "/munster/inactive", params: webhook_body, headers: {"CONTENT_TYPE" => "application/json"} + + assert_response 503 + assert_equal 'Webhook handler "inactive" is inactive', response.parsed_body["error"] + end + + test "returns a 200 status and error message if the handler does not expose errors" do + post "/munster/failing-with-concealed-errors", params: webhook_body, headers: {"CONTENT_TYPE" => "application/json"} + + assert_response 200 + assert_equal false, response.parsed_body["ok"] + assert response.parsed_body["error"] + end + + test "returns a 500 status and error message if the handler does not expose errors" do + post "/munster/failing-with-exposed-errors", params: webhook_body, headers: {"CONTENT_TYPE" => "application/json"} + + assert_response 500 + # The response generation in this case is done by Rails, through the + # common Rails error page + end + + test "deduplicates received webhooks based on the event ID" do + body = {event_id: SecureRandom.uuid, body: "test"}.to_json + + assert_changes_by -> { Munster::ReceivedWebhook.count }, exactly: 1 do + 3.times do + post "/munster/extract_id", params: body, headers: {"CONTENT_TYPE" => "application/json"} + assert_response 200 + end + end + end + + test "preserves the route params and the request params in the serialised request stored with the webhook" do + body = {user_name: "John", number_of_dependents: 14}.to_json + + Munster::ReceivedWebhook.delete_all + post "/per-user-munster/123/private", params: body, headers: {"CONTENT_TYPE" => "application/json"} + assert_response 200 + + received_webhook = Munster::ReceivedWebhook.first! + assert_predicate received_webhook, :received? + assert_equal body, received_webhook.request.body.read + assert_equal "John", received_webhook.request.params["user_name"] + assert_equal 14, received_webhook.request.params["number_of_dependents"] + assert_equal "123", received_webhook.request.params["user_id"] + end end diff --git a/test/test-webhook-handlers/webhook_test_handler.rb b/test/test-webhook-handlers/webhook_test_handler.rb index 1a56ad6..fd14238 100644 --- a/test/test-webhook-handlers/webhook_test_handler.rb +++ b/test/test-webhook-handlers/webhook_test_handler.rb @@ -1,17 +1,12 @@ -# frozen_string_literal: true - -# This handler accepts webhooks from our integration tests. This webhook gets dispatched -# if a banking provider test fails, indicating that the bank might be having an incident - class WebhookTestHandler < Munster::BaseHandler - def valid?(request) = true + def valid?(request) + request.params.fetch(:isValid, false) + end def process(webhook) - return unless webhook.received? - webhook.update!(status: "processing") - webhook.update!(status: "processed") - rescue - webhook.update!(status: "error") + raise "Oops, failed" if webhook.request.params[:raiseDuringProcessing] + filename = webhook.request.params.fetch(:outputToFilename) + File.binwrite(filename, webhook.body) end def expose_errors_to_sender? = true