Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-6264 Upgrade Pravega to 0.13.0 #14

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ It is thus recommended to execute integration tests per-module and set-up necess
Note: running these tests against external infrastructure may incur cost with your cloud provider.
We're not going to pay your AWS/GCP/Azure bill.

## Table of Contents

* [Amazon Kinesis](#amazon-kinesis)
* [Google Cloud Pub/Sub](#google-cloud-pubsub)
* [Azure Event Hubs](#azure-event-hubs)
* [Pravega](#pravega)

## Amazon Kinesis

* Execute `aws configure` as described in AWS CLI [getting started](https://github.com/aws/aws-cli#getting-started) guide and setup the account.
Expand Down Expand Up @@ -93,3 +100,28 @@ Delete the Event Hubs namespace and log out, e.g. on the CLI:
az group delete -n eventhubstest
az logout
```

## Pravega

[Pravega](https://pravega.io/) is a cloud-native storage system for event streams and data streams. This sink offers two modes: non-transactional and transactional. The non-transactional mode individually writes each event in a Debezium batch to Pravega. The transactional mode writes the Debezium batch to a Pravega transaction that commits when the batch is completed.

The Pravega sink expects destination scope and streams to already be created.

The Pravega sink uses Pravega 0.13.0 Client API and supports Pravega versions 0.11.0 and above.

### `conf/application.properties` Configuration

|Property|Default|Description|
|--------|-------|-----------|
|`debezium.sink.type`||Must be set to `pravega`.|
|`debezium.sink.pravega.controller.uri`|`tcp://localhost:9090`|The connection string to a Controller in the Pravega cluster.|
|`debezium.sink.pravega.scope`||The name of the scope in which to find the destination streams.|
|`debezium.sink.pravega.transaction`|`false`|Set to `true` to have the sink use Pravega transactions for each Debezium batch.|

### CDI Injection Points

Pravega sink behavior can be modified by custom logic providing alternative implementations for specific functionalities. When the alternative implementations are not available then the default ones are used.

|Interface|Description|
|---------|-----------|
|`io.debezium.server.StreamNameMapper`|A custom implementation maps the planned destination stream name into a physical Pravega stream name. By default the same name is used.|
2 changes: 1 addition & 1 deletion debezium-server-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<version.pubsub>25.0.0</version.pubsub>
<version.pulsar>2.10.1</version.pulsar>
<version.eventhubs>5.12.1</version.eventhubs>
<version.pravega>0.9.1</version.pravega>
<version.pravega>0.13.0</version.pravega>
<version.nats>2.16.3</version.nats>
<version.stan>2.2.3</version.stan>
<version.commons.logging>1.2</version.commons.logging>
Expand Down
133 changes: 80 additions & 53 deletions debezium-server-pravega/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
<properties>
<!-- IMPORTANT: This must be aligned with the netty shipped with Quarkus, specified in debezium-build-parent -->
<netty.version>4.1.86.Final</netty.version>
<protobuf.version>3.21.7</protobuf.version>
<grpc.version>1.47.0</grpc.version>
</properties>
<dependencyManagement>
<dependencies>
Expand All @@ -21,6 +23,20 @@
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-bom</artifactId>
<version>${protobuf.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-bom</artifactId>
<version>${grpc.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
Expand All @@ -34,51 +50,6 @@
<groupId>io.pravega</groupId>
<artifactId>pravega-client</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-socks</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler-proxy</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>${netty.version}</version>
</dependency>

<!-- Testing -->
<dependency>
Expand All @@ -92,12 +63,6 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt</artifactId>
<version>0.9.1</version>
<scope>test</scope>
</dependency>

</dependencies>
<build>
Expand Down Expand Up @@ -131,7 +96,67 @@
<artifactId>maven-failsafe-plugin</artifactId>
<executions>
<execution>
<id>integration-test-pravega</id>
<id>integration-test-pravega-0.11</id>
<goals>
<goal>integration-test</goal>
</goals>
<configuration>
<systemPropertyVariables>
<debezium.sink.type>pravega</debezium.sink.type>
<debezium.sink.pravega.scope>testc.inventory.customers</debezium.sink.pravega.scope>
<debezium.sink.pravega.transaction>false</debezium.sink.pravega.transaction>
<pravega.docker.version>0.11.0</pravega.docker.version>
</systemPropertyVariables>
<runOrder>${runOrder}</runOrder>
</configuration>
</execution>
<execution>
<id>integration-test-pravega-0.11-txn</id>
<goals>
<goal>integration-test</goal>
</goals>
<configuration>
<systemPropertyVariables>
<debezium.sink.type>pravega</debezium.sink.type>
<debezium.sink.pravega.scope>testc.inventory.customers</debezium.sink.pravega.scope>
<debezium.sink.pravega.transaction>true</debezium.sink.pravega.transaction>
<pravega.docker.version>0.11.0</pravega.docker.version>
</systemPropertyVariables>
<runOrder>${runOrder}</runOrder>
</configuration>
</execution>
<execution>
<id>integration-test-pravega-0.12</id>
<goals>
<goal>integration-test</goal>
</goals>
<configuration>
<systemPropertyVariables>
<debezium.sink.type>pravega</debezium.sink.type>
<debezium.sink.pravega.scope>testc.inventory.customers</debezium.sink.pravega.scope>
<debezium.sink.pravega.transaction>false</debezium.sink.pravega.transaction>
<pravega.docker.version>0.12.0</pravega.docker.version>
</systemPropertyVariables>
<runOrder>${runOrder}</runOrder>
</configuration>
</execution>
<execution>
<id>integration-test-pravega-0.12-txn</id>
<goals>
<goal>integration-test</goal>
</goals>
<configuration>
<systemPropertyVariables>
<debezium.sink.type>pravega</debezium.sink.type>
<debezium.sink.pravega.scope>testc.inventory.customers</debezium.sink.pravega.scope>
<debezium.sink.pravega.transaction>true</debezium.sink.pravega.transaction>
<pravega.docker.version>0.12.0</pravega.docker.version>
</systemPropertyVariables>
<runOrder>${runOrder}</runOrder>
</configuration>
</execution>
<execution>
<id>integration-test-pravega-0.13</id>
<goals>
<goal>integration-test</goal>
</goals>
Expand All @@ -140,12 +165,13 @@
<debezium.sink.type>pravega</debezium.sink.type>
<debezium.sink.pravega.scope>testc.inventory.customers</debezium.sink.pravega.scope>
<debezium.sink.pravega.transaction>false</debezium.sink.pravega.transaction>
<pravega.docker.version>0.13.0-rc0</pravega.docker.version>
</systemPropertyVariables>
<runOrder>${runOrder}</runOrder>
</configuration>
</execution>
<execution>
<id>integration-test-pravega-txn</id>
<id>integration-test-pravega-0.13-txn</id>
<goals>
<goal>integration-test</goal>
</goals>
Expand All @@ -154,6 +180,7 @@
<debezium.sink.type>pravega</debezium.sink.type>
<debezium.sink.pravega.scope>testc.inventory.customers</debezium.sink.pravega.scope>
<debezium.sink.pravega.transaction>true</debezium.sink.pravega.transaction>
<pravega.docker.version>0.13.0-rc0</pravega.docker.version>
</systemPropertyVariables>
<runOrder>${runOrder}</runOrder>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
*/
public class PravegaTestResource implements QuarkusTestResourceLifecycleManager {

private static final String PRAVEGA_VERSION = "0.9.0";
private static final String PRAVEGA_VERSION = ConfigProvider.getConfig().getValue("pravega.docker.version", String.class);
public static final int CONTROLLER_PORT = 9090;
public static final int SEGMENT_STORE_PORT = 12345;
public static final String PRAVEGA_IMAGE = "pravega/pravega:" + PRAVEGA_VERSION;
Expand All @@ -38,7 +38,7 @@ public class PravegaTestResource implements QuarkusTestResourceLifecycleManager
.withFixedExposedPort(CONTROLLER_PORT, CONTROLLER_PORT)
.withFixedExposedPort(SEGMENT_STORE_PORT, SEGMENT_STORE_PORT)
.withStartupTimeout(Duration.ofSeconds(90))
.waitingFor(Wait.forLogMessage(".*Starting gRPC server listening on port: 9090.*", 1))
.waitingFor(Wait.forLogMessage(".*Pravega Sandbox is running locally now.*", 1))
.withCommand("standalone");

@Override
Expand Down
4 changes: 4 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
</properties>

<repositories>
<repository>
<id>v0.13.0-rc0</id>
<url>https://oss.sonatype.org/content/repositories/iopravega-1206</url>
</repository>
<repository>
<id>oss</id>
<name>OSS Sonatype Nexus</name>
Expand Down