Leonard Xu edited this page May 12, 2021 · 12 revisions

Flink CDC Connectors is a set of source connectors for Apache Flink that ingest changes from different databases using change data capture (CDC). The connectors integrate Debezium as the engine that captures data changes, so they can fully leverage Debezium's capabilities. See more about what Debezium is.

Supported Connectors

| Connector | Database | Database Version | Flink Version |
| --- | --- | --- | --- |
| MySQL CDC | MySQL | Database: 5.7, 8.0.x; JDBC Driver: 8.0.16 | 1.11+ |
| Postgres CDC | PostgreSQL | Database: 9.6, 10, 11, 12; JDBC Driver: 42.2.12 | 1.11+ |

| Format | Supported Connector | Flink Version |
| --- | --- | --- |
| Changelog Json | Apache Kafka | 1.11+ |

The following table maps Flink CDC connector versions to Flink versions.

| Flink CDC Connector Version | Flink Version |
| --- | --- |
| 1.0.0 | 1.11.* |
| 1.1.0 | 1.11.* |
| 1.2.0 | 1.12.* |
| 1.3.0 | 1.12.* |
| 1.4.0 | 1.13.* |
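
As a quick way to check a deployment against the version mapping, the table can be encoded as a lookup. This is only a sketch: `CdcVersionMapping` and `isCompatible` are hypothetical names introduced here for illustration and are not part of the connector. The entries are copied from the table above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper, not part of the connector: encodes the version mapping table. */
public class CdcVersionMapping {
    // Connector version -> supported Flink release line, copied from the table.
    private static final Map<String, String> FLINK_LINE = new LinkedHashMap<>();
    static {
        FLINK_LINE.put("1.0.0", "1.11");
        FLINK_LINE.put("1.1.0", "1.11");
        FLINK_LINE.put("1.2.0", "1.12");
        FLINK_LINE.put("1.3.0", "1.12");
        FLINK_LINE.put("1.4.0", "1.13");
    }

    /** Returns true if flinkVersion (e.g. "1.12.2") falls in the line mapped to connectorVersion. */
    public static boolean isCompatible(String connectorVersion, String flinkVersion) {
        String line = FLINK_LINE.get(connectorVersion);
        if (line == null) {
            return false; // connector version not listed in the table
        }
        // Match "1.12" itself or any patch release such as "1.12.2", but not "1.120.0".
        return flinkVersion.equals(line) || flinkVersion.startsWith(line + ".");
    }

    public static void main(String[] args) {
        System.out.println(isCompatible("1.1.0", "1.11.3")); // true
        System.out.println(isCompatible("1.4.0", "1.12.0")); // false
    }
}
```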

Features

  1. Supports reading a database snapshot and then continuing to read binlogs, with exactly-once processing even when failures happen.
  2. CDC connectors for the DataStream API: users can consume changes from multiple databases and tables in a single job without deploying Debezium and Kafka.
  3. CDC connectors for the Table/SQL API: users can use SQL DDL to create a CDC source that monitors changes on a single table.
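
To make the first feature more concrete, the following toy model replays a snapshot followed by binlog-style changes into an in-memory table. It is a sketch under stated assumptions, not connector code: the `ChangelogReplay` and `Change` classes are hypothetical, and the only thing borrowed from Debezium is its operation codes (`r` for a snapshot read, `c` for create, `u` for update, `d` for delete).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of replaying a snapshot plus later changes; hypothetical, not connector code. */
public class ChangelogReplay {

    /** One change event. op uses Debezium's codes: r = snapshot read, c = create, u = update, d = delete. */
    public static final class Change {
        final String op;
        final int id;
        final String name; // null for deletes

        public Change(String op, int id, String name) {
            this.op = op;
            this.id = id;
            this.name = name;
        }
    }

    /** Applies the events in order and returns the resulting table state keyed by id. */
    public static Map<Integer, String> apply(List<Change> events) {
        Map<Integer, String> table = new TreeMap<>();
        for (Change e : events) {
            switch (e.op) {
                case "r": // row read during the initial snapshot
                case "c": // row inserted after the snapshot
                case "u": // row updated: keep the latest value
                    table.put(e.id, e.name);
                    break;
                case "d": // row deleted
                    table.remove(e.id);
                    break;
                default:
                    throw new IllegalArgumentException("unknown op: " + e.op);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> events = new ArrayList<>();
        events.add(new Change("r", 1, "scooter"));   // snapshot phase
        events.add(new Change("r", 2, "hammer"));
        events.add(new Change("u", 1, "e-scooter")); // binlog phase
        events.add(new Change("d", 2, null));
        events.add(new Change("c", 3, "jacket"));
        System.out.println(apply(events)); // {1=e-scooter, 3=jacket}
    }
}
```

The snapshot rows establish the starting state, and the later events bring it up to date, which is why the connector reads the snapshot first and only then continues with the binlog.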

Downloads

You can download the released connector jars from here.

Usage for Table/SQL API

Setting up a Flink cluster with the provided connector takes several steps.

  1. Set up a Flink cluster with version 1.11+ and Java 8+ installed.
  2. Download the connector SQL jars from the Download page (or build them yourself).
  3. Put the downloaded jars under FLINK_HOME/lib/.
  4. Restart the Flink cluster.

The following example shows how to create a MySQL CDC source in the Flink SQL Client and run queries on it.

-- creates a mysql cdc table source
CREATE TABLE mysql_binlog (
 id INT NOT NULL,
 name STRING,
 description STRING,
 weight DECIMAL(10,3)
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'localhost',
 'port' = '3306',
 'username' = 'flinkuser',
 'password' = 'flinkpw',
 'database-name' = 'inventory',
 'table-name' = 'products'
);

-- read snapshot and binlog data from mysql, and do some transformation, and show on the client
SELECT id, UPPER(name), description, weight FROM mysql_binlog;

Usage for DataStream API

Include the following Maven dependency (available through Maven Central):

<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <!-- add the dependency matching your database -->
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <version>1.1.0</version>
</dependency>

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;

public class MySqlBinlogSourceExample {
  public static void main(String[] args) throws Exception {
    SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
      .hostname("localhost")
      .port(3306)
      .databaseList("inventory") // monitor all tables under inventory database
      .username("flinkuser")
      .password("flinkpw")
      .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
      .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env
      .addSource(sourceFunction)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

    env.execute();
  }
}