Skip to content

Commit

Permalink
[FLINK-36115][pipeline-connector][mysql] Introduce scan.newly-added-t…
Browse files Browse the repository at this point in the history
…able.enabled option for MySQL Source

This closes  #3560.
  • Loading branch information
lvyanquan committed Aug 22, 2024
1 parent 6205a5a commit 565032e
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 19 deletions.
11 changes: 11 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,17 @@ pipeline:
<td>Boolean</td>
<td>是否启用动态加表特性,默认关闭。 此配置项只有作业从savepoint/checkpoint启动时才生效。</td>
</tr>
<tr>
<td>scan.binlog.newly-added-table.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>在 binlog 读取阶段,是否读取新增表的表结构变更和数据变更,默认值是 false。 <br>
scan.newly-added-table.enabled 和 scan.binlog.newly-added-table.enabled 参数的不同在于: <br>
scan.newly-added-table.enabled: 在作业重启后,对新增表的全量和增量数据进行读取; <br>
scan.binlog.newly-added-table.enabled: 只在 binlog 读取阶段读取新增表的增量数据。
</td>
</tr>
</tbody>
</table>
</div>
Expand Down
11 changes: 11 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,17 @@ pipeline:
<td>Boolean</td>
<td>Whether to enable scan the newly added tables feature or not, by default is false. This option is only useful when we start the job from a savepoint/checkpoint.</td>
</tr>
<tr>
<td>scan.binlog.newly-added-table.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>In binlog reading stage, whether to scan the ddl and dml statements of newly added tables or not, by default is false. <br>
The difference between scan.newly-added-table.enabled and scan.binlog.newly-added-table.enabled options is: <br>
scan.newly-added-table.enabled: do re-snapshot & binlog-reading for newly added table when restored; <br>
scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase.
</td>
</tr>
</tbody>
</table>
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource;
import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange;
Expand All @@ -38,6 +39,7 @@
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectPath;

import io.debezium.relational.Tables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -61,6 +63,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
Expand Down Expand Up @@ -128,6 +131,8 @@ public DataSource createDataSource(Context context) {
int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
boolean scanBinlogNewlyAddedTableEnabled =
config.get(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);

validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
Expand Down Expand Up @@ -166,26 +171,32 @@ public DataSource createDataSource(Context context) {
.jdbcProperties(getJdbcProperties(configMap))
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);

Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tables).build();
List<String> capturedTables = getTableList(configFactory.createConfig(0), selectors);
if (capturedTables.isEmpty()) {
throw new IllegalArgumentException(
"Cannot find any table by the option 'tables' = " + tables);
}
if (tablesExclude != null) {
Selectors selectExclude =
new Selectors.SelectorsBuilder().includeTables(tablesExclude).build();
List<String> excludeTables = getTableList(configFactory.createConfig(0), selectExclude);
if (!excludeTables.isEmpty()) {
capturedTables.removeAll(excludeTables);
}
if (scanBinlogNewlyAddedTableEnabled) {
String newTables = validateTableAndReturnDebeziumStyle(tables);
configFactory.tableList(newTables);
} else {
Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tables).build();
List<String> capturedTables = getTableList(configFactory.createConfig(0), selectors);
if (capturedTables.isEmpty()) {
throw new IllegalArgumentException(
"Cannot find any table with by the option 'tables.exclude' = "
+ tablesExclude);
"Cannot find any table by the option 'tables' = " + tables);
}
if (tablesExclude != null) {
Selectors selectExclude =
new Selectors.SelectorsBuilder().includeTables(tablesExclude).build();
List<String> excludeTables =
getTableList(configFactory.createConfig(0), selectExclude);
if (!excludeTables.isEmpty()) {
capturedTables.removeAll(excludeTables);
}
if (capturedTables.isEmpty()) {
throw new IllegalArgumentException(
"Cannot find any table with by the option 'tables.exclude' = "
+ tablesExclude);
}
}
configFactory.tableList(capturedTables.toArray(new String[0]));
}
configFactory.tableList(capturedTables.toArray(new String[0]));

String chunkKeyColumns = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
if (chunkKeyColumns != null) {
Expand Down Expand Up @@ -256,6 +267,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(CHUNK_META_GROUP_SIZE);
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
options.add(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
return options;
}

Expand Down Expand Up @@ -410,6 +422,33 @@ && doubleCompare(distributionFactorLower, 1.0d) <= 0,
distributionFactorLower));
}

/**
* Currently, The supported regular syntax is not exactly the same in {@link Selectors} and
* {@link Tables.TableFilter}.
*
* <p>The main distinction are :
*
* <p>1) {@link Selectors} use `,` to split table names and {@link Tables.TableFilter} use use
* `|` to split table names.
*
* <p>2) If there is a need to use a dot (.) in a regular expression to match any character, it
* is necessary to escape the dot with a backslash, refer to {@link
* MySqlDataSourceOptions#TABLES}.
*/
private String validateTableAndReturnDebeziumStyle(String tables) {
// MySQL table names are not allowed to have `,` character.
if (tables.contains(",")) {
throw new IllegalArgumentException(
"the `,` in "
+ tables
+ " is not supported when "
+ SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED
+ " was enabled.");
}

return tables.replace("\\.", ".");
}

/** Replaces the default timezone placeholder with session timezone, if applicable. */
private static ZoneId getServerTimeZone(Configuration config) {
final String serverTimeZone = config.get(SERVER_TIME_ZONE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,4 +261,15 @@ public class MySqlDataSourceOptions {
+ "If there is a need to use a dot (.) in a regular expression to match any character, "
+ "it is necessary to escape the dot with a backslash."
+ "eg. db0.\\.*, db1.user_table_[0-9]+, db[1-2].[app|web]_order_\\.*");

@Experimental
public static final ConfigOption<Boolean> SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED =
ConfigOptions.key("scan.binlog.newly-added-table.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"In binlog reading stage, whether to scan the ddl and dml statements of newly added tables or not, by default is false. \n"
+ "The difference between scan.newly-added-table.enabled and scan.binlog.newly-added-table.enabled options is: \n"
+ "scan.newly-added-table.enabled: do re-snapshot & binlog-reading for newly added table when restored; \n"
+ "scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
Expand Down Expand Up @@ -83,7 +84,10 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_ID;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
Expand Down Expand Up @@ -149,6 +153,40 @@ private MySqlConnection getConnection() {
return DebeziumUtils.createMySqlConnection(configuration, new Properties());
}

@Test
public void testScanBinlogNewlyAddedTableEnabled() throws Exception {
List<String> tables = Collections.singletonList("address_\\.*");
Map<String, String> options = new HashMap<>();
options.put(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
options.put(SCAN_STARTUP_MODE.key(), "timestamp");
options.put(
SCAN_STARTUP_TIMESTAMP_MILLIS.key(), String.valueOf(System.currentTimeMillis()));

FlinkSourceProvider sourceProvider = getFlinkSourceProvider(tables, 4, options);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
env.enableCheckpointing(200);
DataStreamSource<Event> source =
env.fromSource(
sourceProvider.getSource(),
WatermarkStrategy.noWatermarks(),
MySqlDataSourceFactory.IDENTIFIER,
new EventTypeInfo());

TypeSerializer<Event> serializer =
source.getTransformation().getOutputType().createSerializer(env.getConfig());
CheckpointedCollectResultBuffer<Event> resultBuffer =
new CheckpointedCollectResultBuffer<>(serializer);
String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
CollectResultIterator<Event> iterator =
addCollector(env, source, resultBuffer, serializer, accumulatorName);
env.executeAsync("AddNewlyTablesWhenReadingBinlog");
initialAddressTables(getConnection(), Collections.singletonList("address_beijing"));
List<Event> actual = fetchResults(iterator, 4);
assertThat(((ChangeEvent) actual.get(0)).tableId())
.isEqualTo(TableId.tableId(customDatabase.getDatabaseName(), "address_beijing"));
}

@Test
public void testAddNewTableOneByOneSingleParallelism() throws Exception {
TestParam testParam =
Expand Down Expand Up @@ -228,7 +266,7 @@ private void testAddNewTable(TestParam testParam, int parallelism) throws Except
List<String> listenTablesFirstRound = testParam.getFirstRoundListenTables();

FlinkSourceProvider sourceProvider =
getFlinkSourceProvider(listenTablesFirstRound, parallelism);
getFlinkSourceProvider(listenTablesFirstRound, parallelism, new HashMap<>());
DataStreamSource<Event> source =
env.fromSource(
sourceProvider.getSource(),
Expand Down Expand Up @@ -272,7 +310,7 @@ private void testAddNewTable(TestParam testParam, int parallelism) throws Except
getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
List<String> listenTablesSecondRound = testParam.getSecondRoundListenTables();
FlinkSourceProvider restoredSourceProvider =
getFlinkSourceProvider(listenTablesSecondRound, parallelism);
getFlinkSourceProvider(listenTablesSecondRound, parallelism, new HashMap<>());
DataStreamSource<Event> restoreSource =
restoredEnv.fromSource(
restoredSourceProvider.getSource(),
Expand Down Expand Up @@ -432,7 +470,8 @@ private void initialAddressTables(JdbcConnection connection, List<String> addres
}
}

private FlinkSourceProvider getFlinkSourceProvider(List<String> tables, int parallelism) {
private FlinkSourceProvider getFlinkSourceProvider(
List<String> tables, int parallelism, Map<String, String> additionalOptions) {
List<String> fullTableNames =
tables.stream()
.map(table -> customDatabase.getDatabaseName() + "." + table)
Expand All @@ -446,6 +485,7 @@ private FlinkSourceProvider getFlinkSourceProvider(List<String> tables, int para
options.put(TABLES.key(), StringUtils.join(fullTableNames, ","));
options.put(SERVER_ID.key(), getServerId(parallelism));
options.put(SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
options.putAll(additionalOptions);
Factory.Context context =
new FactoryHelper.DefaultContext(
org.apache.flink.cdc.common.configuration.Configuration.fromMap(options),
Expand Down

0 comments on commit 565032e

Please sign in to comment.