diff --git a/debezium-server-http/src/main/java/io/debezium/server/http/HttpChangeConsumer.java b/debezium-server-http/src/main/java/io/debezium/server/http/HttpChangeConsumer.java index 67abc1d2..01e3f977 100644 --- a/debezium-server-http/src/main/java/io/debezium/server/http/HttpChangeConsumer.java +++ b/debezium-server-http/src/main/java/io/debezium/server/http/HttpChangeConsumer.java @@ -39,7 +39,7 @@ import io.debezium.engine.DebeziumEngine; import io.debezium.server.BaseChangeConsumer; import io.debezium.server.http.jwt.JWTAuthenticatorBuilder; -import io.debezium.server.http.standard_webhooks.StandardWebhooksAuthenticatorBuilder; +import io.debezium.server.http.webhooks.StandardWebhooksAuthenticatorBuilder; import io.debezium.util.Clock; import io.debezium.util.Metronome; @@ -136,23 +136,7 @@ void initWithConfig(Config config) throws URISyntaxException { contentType = "application/json"; } - // Need to be able to throw an exception - // so not using ifPresent() syntax - Optional authenticationType = config.getOptionalValue(PROP_AUTHENTICATION_PREFIX + PROP_AUTHENTICATION_TYPE, String.class); - if (authenticationType.isPresent()) { - String t = authenticationType.get(); - if (t.equalsIgnoreCase(JWT_AUTHENTICATION)) { - JWTAuthenticatorBuilder builder = JWTAuthenticatorBuilder.fromConfig(config, PROP_AUTHENTICATION_PREFIX); - authenticator = builder.build(); - } - else if (t.equalsIgnoreCase(STANDARD_WEBHOOKS_AUTHENTICATION)) { - StandardWebhooksAuthenticatorBuilder builder = StandardWebhooksAuthenticatorBuilder.fromConfig(config, PROP_AUTHENTICATION_PREFIX); - authenticator = builder.build(); - } - else { - throw new DebeziumException("Unknown value '" + t + "' encountered for property " + PROP_AUTHENTICATION_PREFIX + PROP_AUTHENTICATION_TYPE); - } - } + authenticator = buildAuthenticator(config); LOGGER.info("Using http content-type type {}", contentType); LOGGER.info("Using sink URL: {}", sinkUrl); @@ -185,6 +169,28 @@ public void handleBatch(List> records, DebeziumEngin committer.markBatchFinished(); } + private Authenticator buildAuthenticator(Config config) { + // Need to be able to throw an exception + // so not using ifPresent() syntax + Optional authenticationType = config.getOptionalValue(PROP_AUTHENTICATION_PREFIX + PROP_AUTHENTICATION_TYPE, String.class); + if (authenticationType.isPresent()) { + String t = authenticationType.get(); + if (t.equalsIgnoreCase(JWT_AUTHENTICATION)) { + JWTAuthenticatorBuilder builder = JWTAuthenticatorBuilder.fromConfig(config, PROP_AUTHENTICATION_PREFIX); + return builder.build(); + } + else if (t.equalsIgnoreCase(STANDARD_WEBHOOKS_AUTHENTICATION)) { + StandardWebhooksAuthenticatorBuilder builder = StandardWebhooksAuthenticatorBuilder.fromConfig(config, PROP_AUTHENTICATION_PREFIX); + return builder.build(); + } + else { + throw new DebeziumException("Unknown value '" + t + "' encountered for property " + PROP_AUTHENTICATION_PREFIX + PROP_AUTHENTICATION_TYPE); + } + } + + return null; + } + private boolean recordSent(ChangeEvent record, UUID messageId) throws InterruptedException { boolean sent = false; HttpResponse r; diff --git a/debezium-server-http/src/main/java/io/debezium/server/http/standard_webhooks/StandardWebhooksAuthenticator.java b/debezium-server-http/src/main/java/io/debezium/server/http/webhooks/StandardWebhooksAuthenticator.java similarity index 61% rename from debezium-server-http/src/main/java/io/debezium/server/http/standard_webhooks/StandardWebhooksAuthenticator.java rename to debezium-server-http/src/main/java/io/debezium/server/http/webhooks/StandardWebhooksAuthenticator.java index f490d65b..6c3a8bf9 100644 --- a/debezium-server-http/src/main/java/io/debezium/server/http/standard_webhooks/StandardWebhooksAuthenticator.java +++ b/debezium-server-http/src/main/java/io/debezium/server/http/webhooks/StandardWebhooksAuthenticator.java @@ -3,12 +3,13 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.server.http.standard_webhooks; +package io.debezium.server.http.webhooks; import java.net.http.HttpRequest.Builder; import java.nio.charset.StandardCharsets; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; +import java.time.Clock; import java.time.Instant; import java.util.Base64; import java.util.UUID; @@ -27,11 +28,19 @@ public class StandardWebhooksAuthenticator implements Authenticator { static final String UNBRANDED_MSG_TIMESTAMP_KEY = "webhook-timestamp"; private static final String HMAC_SHA256 = "HmacSHA256"; - private final byte[] key; + private final Clock clock; + private final Mac sha512Hmac; public StandardWebhooksAuthenticator(final String secret) { + this(secret, Clock.systemUTC()); + } + + @VisibleForTesting + StandardWebhooksAuthenticator(final String secret, Clock clock) { super(); + this.clock = clock; + String sec = secret; if (sec.startsWith(StandardWebhooksAuthenticator.SECRET_PREFIX)) { sec = sec.substring(StandardWebhooksAuthenticator.SECRET_PREFIX.length()); @@ -44,17 +53,25 @@ public StandardWebhooksAuthenticator(final String secret) { throw new DebeziumException("Webhook secret must be between 24 and 64 bytes"); } - this.key = key; + try { + this.sha512Hmac = Mac.getInstance(HMAC_SHA256); + SecretKeySpec keySpec = new SecretKeySpec(key, HMAC_SHA256); + sha512Hmac.init(keySpec); + } + catch (InvalidKeyException | NoSuchAlgorithmException e) { + throw new DebeziumException("Failed to initialize HMAC-SHA256 signing algorithm", e); + } + } @Override public void setAuthorizationHeader(Builder httpRequestBuilder, final String bodyContent, final UUID messageId) { - final long timestamp = Instant.now().getEpochSecond(); + final long timestamp = Instant.now(this.clock).getEpochSecond(); final String msgId = "msg_" + messageId; final String signature = sign(msgId, timestamp, bodyContent); - httpRequestBuilder.header(StandardWebhooksAuthenticator.UNBRANDED_MSG_ID_KEY, msgId); - httpRequestBuilder.header(StandardWebhooksAuthenticator.UNBRANDED_MSG_SIGNATURE_KEY, signature); - httpRequestBuilder.header(StandardWebhooksAuthenticator.UNBRANDED_MSG_TIMESTAMP_KEY, Long.toString(timestamp)); + httpRequestBuilder.setHeader(StandardWebhooksAuthenticator.UNBRANDED_MSG_ID_KEY, msgId); + httpRequestBuilder.setHeader(StandardWebhooksAuthenticator.UNBRANDED_MSG_SIGNATURE_KEY, signature); + httpRequestBuilder.setHeader(StandardWebhooksAuthenticator.UNBRANDED_MSG_TIMESTAMP_KEY, Long.toString(timestamp)); } @Override @@ -64,17 +81,10 @@ public boolean authenticate() throws InterruptedException { @VisibleForTesting String sign(final String msgId, final long timestamp, final String payload) { - try { - String toSign = String.format("%s.%s.%s", msgId, timestamp, payload); - Mac sha512Hmac = Mac.getInstance(HMAC_SHA256); - SecretKeySpec keySpec = new SecretKeySpec(this.key, HMAC_SHA256); - sha512Hmac.init(keySpec); - byte[] macData = sha512Hmac.doFinal(toSign.getBytes(StandardCharsets.UTF_8)); - String signature = Base64.getEncoder().encodeToString(macData); - return String.format("v1,%s", signature); - } - catch (InvalidKeyException | NoSuchAlgorithmException e) { - throw new DebeziumException(e.getMessage()); - } + // https://github.com/standard-webhooks/standard-webhooks/blob/main/spec/standard-webhooks.md#signature-scheme + final String toSign = String.format("%s.%s.%s", msgId, timestamp, payload); + byte[] macData = sha512Hmac.doFinal(toSign.getBytes(StandardCharsets.UTF_8)); + final String signature = Base64.getEncoder().encodeToString(macData); + return String.format("v1,%s", signature); } } \ No newline at end of file diff --git a/debezium-server-http/src/main/java/io/debezium/server/http/standard_webhooks/StandardWebhooksAuthenticatorBuilder.java b/debezium-server-http/src/main/java/io/debezium/server/http/webhooks/StandardWebhooksAuthenticatorBuilder.java similarity index 89% rename from debezium-server-http/src/main/java/io/debezium/server/http/standard_webhooks/StandardWebhooksAuthenticatorBuilder.java rename to debezium-server-http/src/main/java/io/debezium/server/http/webhooks/StandardWebhooksAuthenticatorBuilder.java index d309279f..69ae7997 100644 --- a/debezium-server-http/src/main/java/io/debezium/server/http/standard_webhooks/StandardWebhooksAuthenticatorBuilder.java +++ b/debezium-server-http/src/main/java/io/debezium/server/http/webhooks/StandardWebhooksAuthenticatorBuilder.java @@ -3,14 +3,14 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.server.http.standard_webhooks; - -import java.util.NoSuchElementException; +package io.debezium.server.http.webhooks; import org.eclipse.microprofile.config.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.DebeziumException; + public class StandardWebhooksAuthenticatorBuilder { private static final Logger LOGGER = LoggerFactory.getLogger(StandardWebhooksAuthenticatorBuilder.class); @@ -34,7 +34,7 @@ public StandardWebhooksAuthenticator build() { if (secret == null) { String msg = "Cannot build StandardWebhooksAuthenticator. Secret must be set."; LOGGER.error(msg); - throw new NoSuchElementException(msg); + throw new DebeziumException(msg); } return new StandardWebhooksAuthenticator(secret); diff --git a/debezium-server-http/src/test/java/io/debezium/server/http/standard_webhooks/StandardWebhooksAuthenticatorTest.java b/debezium-server-http/src/test/java/io/debezium/server/http/standard_webhooks/StandardWebhooksAuthenticatorTest.java deleted file mode 100644 index d856179d..00000000 --- a/debezium-server-http/src/test/java/io/debezium/server/http/standard_webhooks/StandardWebhooksAuthenticatorTest.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.server.http.standard_webhooks; - -import static org.mockito.Mockito.mockStatic; -import static org.mockito.Mockito.withSettings; - -import java.net.URI; -import java.net.URISyntaxException; -import java.net.http.HttpHeaders; -import java.net.http.HttpRequest; -import java.time.Instant; -import java.util.Optional; -import java.util.UUID; - -import org.junit.Test; -import org.junit.jupiter.api.Assertions; -import org.mockito.MockedStatic; - -public class StandardWebhooksAuthenticatorTest { - - @Test - public void addAuthorizationHeader() throws URISyntaxException { - UUID messageId = UUID.fromString("22bd292a-71ab-46fe-a460-8632d6754ac6"); - Instant instant = Instant.ofEpochSecond(1234); - - try ( - MockedStatic mockedInstant = mockStatic(Instant.class, - withSettings().defaultAnswer(invocation -> invocation.callRealMethod()))) { - mockedInstant.when(Instant::now).thenReturn(instant); - - StandardWebhooksAuthenticator authenticator = new StandardWebhooksAuthenticator( - "whsec_MfKQ9r8GKYqrTwjUPD8ILPZIo2LaLaSw"); - - URI testURI = new URI("http://example.com"); - String testEventContent = "{\"hello\":\"world\"}"; - HttpRequest.Builder builder = HttpRequest.newBuilder(testURI); - builder.POST(HttpRequest.BodyPublishers.ofString(testEventContent)); - authenticator.setAuthorizationHeader(builder, testEventContent, messageId); - HttpRequest request = builder.build(); - - HttpHeaders headers = request.headers(); - - Optional idHeader = headers.firstValue("webhook-id"); - Assertions.assertTrue(idHeader.isPresent()); - Assertions.assertEquals("msg_22bd292a-71ab-46fe-a460-8632d6754ac6", idHeader.get()); - - Optional timestampHeader = headers.firstValue("webhook-timestamp"); - Assertions.assertTrue(timestampHeader.isPresent()); - Assertions.assertEquals("1234", timestampHeader.get()); - - Optional signatureHeader = headers.firstValue("webhook-signature"); - Assertions.assertTrue(signatureHeader.isPresent()); - String[] sigParts = signatureHeader.get().split(","); - Assertions.assertEquals(2, sigParts.length); - Assertions.assertEquals("v1", sigParts[0]); - - // https://www.standardwebhooks.com/verify - String expected = "v1,qCVBRIv6rKQVhSJBAmUSE9GkdCdPe2j6xzzkm89UcoA="; - - Assertions.assertEquals(expected, signatureHeader.get()); - } - } -} diff --git a/debezium-server-http/src/test/java/io/debezium/server/http/standard_webhooks/StandardWebhooksAuthenticatorBuilderTest.java b/debezium-server-http/src/test/java/io/debezium/server/http/webhooks/StandardWebhooksAuthenticatorBuilderTest.java similarity index 79% rename from debezium-server-http/src/test/java/io/debezium/server/http/standard_webhooks/StandardWebhooksAuthenticatorBuilderTest.java rename to debezium-server-http/src/test/java/io/debezium/server/http/webhooks/StandardWebhooksAuthenticatorBuilderTest.java index 4b72c37c..fc031bef 100644 --- a/debezium-server-http/src/test/java/io/debezium/server/http/standard_webhooks/StandardWebhooksAuthenticatorBuilderTest.java +++ b/debezium-server-http/src/test/java/io/debezium/server/http/webhooks/StandardWebhooksAuthenticatorBuilderTest.java @@ -3,7 +3,7 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.server.http.standard_webhooks; +package io.debezium.server.http.webhooks; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -15,6 +15,7 @@ import java.util.Optional; import org.eclipse.microprofile.config.Config; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; public class StandardWebhooksAuthenticatorBuilderTest { @@ -24,12 +25,14 @@ public void verifyBuild() throws URISyntaxException { StandardWebhooksAuthenticatorBuilder builder = new StandardWebhooksAuthenticatorBuilder(); builder.setSecret("whsec_MfKQ9r8GKYqrTwjUPD8ILPZIo2LaLaSw"); - StandardWebhooksAuthenticator authenticator = builder.build(); + Assertions.assertDoesNotThrow(() -> { + builder.build(); + }); } @Test public void verifyBuildFromConfig() throws URISyntaxException { - Map configValues = Map.of("debezium.sink.http.webhook.secret", "whsec_MfKQ9r8GKYqrTwjUPD8ILPZIo2LaLaSw"); + Map configValues = Map.of("debezium.sink.http.authentication.webhook.secret", "whsec_MfKQ9r8GKYqrTwjUPD8ILPZIo2LaLaSw"); Config result = mock(Config.class); @@ -39,9 +42,10 @@ public void verifyBuildFromConfig() throws URISyntaxException { when(result.getOptionalValue(eq(entry.getKey()), any())).thenReturn(Optional.of(value)); } - StandardWebhooksAuthenticatorBuilder builder = StandardWebhooksAuthenticatorBuilder.fromConfig(result, "debezium.sink.http."); - builder.setSecret("whsec_MfKQ9r8GKYqrTwjUPD8ILPZIo2LaLaSw"); + StandardWebhooksAuthenticatorBuilder builder = StandardWebhooksAuthenticatorBuilder.fromConfig(result, "debezium.sink.http.authentication."); - StandardWebhooksAuthenticator authenticator = builder.build(); + Assertions.assertDoesNotThrow(() -> { + builder.build(); + }); } } diff --git a/debezium-server-http/src/test/java/io/debezium/server/http/webhooks/StandardWebhooksAuthenticatorTest.java b/debezium-server-http/src/test/java/io/debezium/server/http/webhooks/StandardWebhooksAuthenticatorTest.java new file mode 100644 index 00000000..df1721e6 --- /dev/null +++ b/debezium-server-http/src/test/java/io/debezium/server/http/webhooks/StandardWebhooksAuthenticatorTest.java @@ -0,0 +1,58 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.server.http.webhooks; + +import java.net.URI; +import java.net.URISyntaxException; +import java.net.http.HttpHeaders; +import java.net.http.HttpRequest; +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneOffset; +import java.util.Optional; +import java.util.UUID; + +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +public class StandardWebhooksAuthenticatorTest { + + @Test + public void addAuthorizationHeader() throws URISyntaxException { + Clock clock = Clock.fixed(Instant.ofEpochSecond(1234), ZoneOffset.UTC); + UUID messageId = UUID.fromString("22bd292a-71ab-46fe-a460-8632d6754ac6"); + StandardWebhooksAuthenticator authenticator = new StandardWebhooksAuthenticator( + "whsec_MfKQ9r8GKYqrTwjUPD8ILPZIo2LaLaSw", clock); + + URI testURI = new URI("http://example.com"); + String testEventContent = "{\"hello\":\"world\"}"; + HttpRequest.Builder builder = HttpRequest.newBuilder(testURI); + builder.POST(HttpRequest.BodyPublishers.ofString(testEventContent)); + authenticator.setAuthorizationHeader(builder, testEventContent, messageId); + HttpRequest request = builder.build(); + + HttpHeaders headers = request.headers(); + + Optional idHeader = headers.firstValue("webhook-id"); + Assertions.assertTrue(idHeader.isPresent()); + Assertions.assertEquals("msg_22bd292a-71ab-46fe-a460-8632d6754ac6", idHeader.get()); + + Optional timestampHeader = headers.firstValue("webhook-timestamp"); + Assertions.assertTrue(timestampHeader.isPresent()); + Assertions.assertEquals("1234", timestampHeader.get()); + + Optional signatureHeader = headers.firstValue("webhook-signature"); + Assertions.assertTrue(signatureHeader.isPresent()); + String[] sigParts = signatureHeader.get().split(","); + Assertions.assertEquals(2, sigParts.length); + Assertions.assertEquals("v1", sigParts[0]); + + // https://www.standardwebhooks.com/verify + String expected = "v1,qCVBRIv6rKQVhSJBAmUSE9GkdCdPe2j6xzzkm89UcoA="; + + Assertions.assertEquals(expected, signatureHeader.get()); + } +}