From d8adc6f9327d3d8dc46e0709f57d57c92c8e0f9c Mon Sep 17 00:00:00 2001 From: ikiler Date: Thu, 29 Aug 2024 11:23:17 +0800 Subject: [PATCH 01/10] support mysql cdc pipline record rate limit --- .../cdc/common/pipeline/PipelineOptions.java | 7 +++ .../mysql/factory/MySqlDataSourceFactory.java | 2 +- .../mysql/source/MySqlDataSource.java | 27 +++++++++- .../mysql/rate/SnapshotGuavaRateLimiter.java | 52 +++++++++++++++++++ .../connectors/mysql/source/MySqlSource.java | 20 +++++-- .../source/reader/MySqlRecordEmitter.java | 1 + 6 files changed, 103 insertions(+), 6 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/rate/SnapshotGuavaRateLimiter.java diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java index 48a4fbb13a..bddec45bca 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java @@ -39,6 +39,13 @@ public class PipelineOptions { .defaultValue("Flink CDC Pipeline Job") .withDescription("The name of the pipeline"); + public static final ConfigOption RATE_LIMIT = + ConfigOptions.key("rate.limit") + .doubleType() + .defaultValue(0d) + .withDescription( + "Limits the number of records per second, default 0 is disabled"); + public static final ConfigOption PIPELINE_PARALLELISM = ConfigOptions.key("parallelism") .intType() diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index 7f7691961a..b9f6d2fe3f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -216,7 +216,7 @@ public DataSource createDataSource(Context context) { configFactory.chunkKeyColumn(chunkKeyColumnMap); } - return new MySqlDataSource(configFactory); + return new MySqlDataSource(configFactory, context.getPipelineConfiguration()); } @Override diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java index d1dc487c04..fa11643c79 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java @@ -17,47 +17,70 @@ package org.apache.flink.cdc.connectors.mysql.source; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; import 
org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.annotation.VisibleForTesting; +import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.pipeline.PipelineOptions; import org.apache.flink.cdc.common.source.DataSource; import org.apache.flink.cdc.common.source.EventSourceProvider; import org.apache.flink.cdc.common.source.FlinkSourceProvider; import org.apache.flink.cdc.common.source.MetadataAccessor; +import org.apache.flink.cdc.connectors.mysql.rate.SnapshotGuavaRateLimiter; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEmitter; import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode; -/** A {@link DataSource} for mysql cdc connector. */ +/** + * A {@link DataSource} for mysql cdc connector. + */ @Internal public class MySqlDataSource implements DataSource { private final MySqlSourceConfigFactory configFactory; private final MySqlSourceConfig sourceConfig; + private final Configuration piplineConfig; public MySqlDataSource(MySqlSourceConfigFactory configFactory) { + this(configFactory, new Configuration()); + } + + public MySqlDataSource(MySqlSourceConfigFactory configFactory, Configuration piplineConfig) { this.configFactory = configFactory; this.sourceConfig = configFactory.createConfig(0); + this.piplineConfig = piplineConfig; } @Override public EventSourceProvider getEventSourceProvider() { + double maxRatePeer = + piplineConfig + .getOptional(PipelineOptions.RATE_LIMIT) + .orElse(PipelineOptions.RATE_LIMIT.defaultValue()); + MySqlEventDeserializer deserializer = new MySqlEventDeserializer( DebeziumChangelogMode.ALL, sourceConfig.isIncludeSchemaChanges()); + RateLimiterStrategy rateLimiterStrategy = maxRatePeer > 0 ? 
createRateLimiterStrategy(maxRatePeer) : RateLimiterStrategy.noOp(); MySqlSource source = new MySqlSource<>( configFactory, deserializer, (sourceReaderMetrics, sourceConfig) -> new MySqlPipelineRecordEmitter( - deserializer, sourceReaderMetrics, sourceConfig)); + deserializer, sourceReaderMetrics, sourceConfig), + rateLimiterStrategy); return FlinkSourceProvider.of(source); } + private RateLimiterStrategy createRateLimiterStrategy(double recordsPerSecond) { + return parallelism -> new SnapshotGuavaRateLimiter(recordsPerSecond, parallelism); + } + @Override public MetadataAccessor getMetadataAccessor() { return new MySqlMetadataAccessor(sourceConfig); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/rate/SnapshotGuavaRateLimiter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/rate/SnapshotGuavaRateLimiter.java new file mode 100644 index 0000000000..17efad53b7 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/rate/SnapshotGuavaRateLimiter.java @@ -0,0 +1,52 @@ +package org.apache.flink.cdc.connectors.mysql.rate; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.RateLimiter; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The SnapshotGuavaRateLimiter class implements Flink's RateLimiter interface, which is used to limit the read rate of data sources. + * It uses Guava's RateLimiter to control the number of requests allowed to pass per second, + * ensuring that parallel tasks can adjust the rate limit after the checkpoint is completed. + */ +public class SnapshotGuavaRateLimiter implements org.apache.flink.api.connector.source.util.ratelimit.RateLimiter { + private static final Logger LOG = LoggerFactory.getLogger(SnapshotGuavaRateLimiter.class); + + private final Executor limiter = + Executors.newSingleThreadExecutor(new ExecutorThreadFactory("flink-snapshot-rate-limiter")); + private RateLimiter rateLimiter; + + private final double maxPerSecond; + private int parallelism; + + public SnapshotGuavaRateLimiter(double recordsPerSecond, int parallelism) { + this.rateLimiter = RateLimiter.create(recordsPerSecond / parallelism); + this.maxPerSecond = recordsPerSecond; + this.parallelism = parallelism; + } + + /** + * Due to the CDC feature, the degree of parallelism will change to 1 after the checkpoint is completed, + * so you need to reset the speed limiter. 
+ * + */ + @Override + public void notifyCheckpointComplete(long checkpointId) { + if (parallelism > 1) { + LOG.info("检查点完成,重装限速器:"+checkpointId); + rateLimiter = RateLimiter.create(maxPerSecond / parallelism); + parallelism = 1; + } + } + + @Override + public CompletionStage acquire() { + LOG.info("获取限速许可========================================================"); + return CompletableFuture.runAsync(rateLimiter::acquire, limiter); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java index 47c3af92b6..208edc985a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java @@ -24,6 +24,9 @@ import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimitedSourceReader; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.annotation.PublicEvolving; @@ -64,6 +67,8 @@ import java.lang.reflect.Method; import java.util.ArrayList; import java.util.function.Supplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The MySQL CDC Source based on FLIP-27 and Watermark Signal Algorithm which supports parallel @@ -111,6 +116,8 @@ public class MySqlSource // hook for generating changes. private SnapshotPhaseHooks snapshotHooks = SnapshotPhaseHooks.empty(); + private final RateLimiterStrategy rateLimiterStrategy; + /** * Get a MySqlParallelSourceBuilder to build a {@link MySqlSource}. 
* @@ -131,16 +138,19 @@ public static MySqlSourceBuilder builder() { new MySqlRecordEmitter<>( deserializationSchema, sourceReaderMetrics, - sourceConfig.isIncludeSchemaChanges())); + sourceConfig.isIncludeSchemaChanges()), + RateLimiterStrategy.noOp()); } MySqlSource( MySqlSourceConfigFactory configFactory, DebeziumDeserializationSchema deserializationSchema, - RecordEmitterSupplier recordEmitterSupplier) { + RecordEmitterSupplier recordEmitterSupplier, + RateLimiterStrategy rateLimiterStrategy) { this.configFactory = configFactory; this.deserializationSchema = deserializationSchema; this.recordEmitterSupplier = recordEmitterSupplier; + this.rateLimiterStrategy = rateLimiterStrategy; } public MySqlSourceConfigFactory getConfigFactory() { @@ -156,6 +166,7 @@ public Boundedness getBoundedness() { return Boundedness.CONTINUOUS_UNBOUNDED; } } + private static final Logger LOG = LoggerFactory.getLogger(MySqlSource.class); @Override public SourceReader createReader(SourceReaderContext readerContext) @@ -182,13 +193,16 @@ public SourceReader createReader(SourceReaderContext readerContex readerContext.getIndexOfSubtask(), mySqlSourceReaderContext, snapshotHooks); - return new MySqlSourceReader<>( + MySqlSourceReader sourceReader = new MySqlSourceReader<>( elementsQueue, splitReaderSupplier, recordEmitterSupplier.get(sourceReaderMetrics, sourceConfig), readerContext.getConfiguration(), mySqlSourceReaderContext, sourceConfig); + int parallelism = readerContext.currentParallelism(); + RateLimiter rateLimiter = rateLimiterStrategy.createRateLimiter(parallelism); + return new RateLimitedSourceReader<>(sourceReader,rateLimiter); } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java index e3c504113c..61b34edd7c 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java @@ -140,6 +140,7 @@ private static class OutputCollector implements Collector { @Override public void collect(T record) { + LOG.info("收到数据!!!!!!: {}", record); if (currentMessageTimestamp != null && currentMessageTimestamp > 0) { // Only binlog event contains a valid timestamp. 
We use the output with timestamp to // report the event time and let the source operator to report From 194559ae3bd1567a545162b60cad8ecca727607c Mon Sep 17 00:00:00 2001 From: ikiler Date: Thu, 12 Sep 2024 18:09:54 +0800 Subject: [PATCH 02/10] fix mysql rate missing token --- .../mysql/rate/SnapshotGuavaRateLimiter.java | 27 ++++++-------- .../connectors/mysql/source/MySqlSource.java | 35 +++++++++++++------ .../source/reader/MySqlRecordEmitter.java | 21 +++++++---- 3 files changed, 49 insertions(+), 34 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/rate/SnapshotGuavaRateLimiter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/rate/SnapshotGuavaRateLimiter.java index 17efad53b7..fd86a20097 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/rate/SnapshotGuavaRateLimiter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/rate/SnapshotGuavaRateLimiter.java @@ -6,8 +6,6 @@ import java.util.concurrent.Executors; import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.RateLimiter; import org.apache.flink.util.concurrent.ExecutorThreadFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * The SnapshotGuavaRateLimiter class implements Flink's RateLimiter interface, which is used to limit the read rate of data sources. @@ -15,38 +13,33 @@ * ensuring that parallel tasks can adjust the rate limit after the checkpoint is completed. */ public class SnapshotGuavaRateLimiter implements org.apache.flink.api.connector.source.util.ratelimit.RateLimiter { - private static final Logger LOG = LoggerFactory.getLogger(SnapshotGuavaRateLimiter.class); private final Executor limiter = Executors.newSingleThreadExecutor(new ExecutorThreadFactory("flink-snapshot-rate-limiter")); - private RateLimiter rateLimiter; + private final RateLimiter rateLimiter; - private final double maxPerSecond; - private int parallelism; + private int getTokenCountAtOnce; public SnapshotGuavaRateLimiter(double recordsPerSecond, int parallelism) { - this.rateLimiter = RateLimiter.create(recordsPerSecond / parallelism); - this.maxPerSecond = recordsPerSecond; - this.parallelism = parallelism; + this.rateLimiter = RateLimiter.create(recordsPerSecond); + this.getTokenCountAtOnce = parallelism; } /** * Due to the CDC feature, the degree of parallelism will change to 1 after the checkpoint is completed, * so you need to reset the speed limiter. 
- * */ @Override public void notifyCheckpointComplete(long checkpointId) { - if (parallelism > 1) { - LOG.info("检查点完成,重装限速器:"+checkpointId); - rateLimiter = RateLimiter.create(maxPerSecond / parallelism); - parallelism = 1; - } + getTokenCountAtOnce = 1; + } + + public void getAcquire() { + rateLimiter.acquire(getTokenCountAtOnce); } @Override public CompletionStage acquire() { - LOG.info("获取限速许可========================================================"); - return CompletableFuture.runAsync(rateLimiter::acquire, limiter); + return CompletableFuture.runAsync(this::getAcquire, limiter); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java index 208edc985a..9a61f77e42 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java @@ -17,6 +17,11 @@ package org.apache.flink.cdc.connectors.mysql.source; +import io.debezium.jdbc.JdbcConnection; +import java.io.Serializable; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.function.Supplier; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; @@ -24,6 +29,7 @@ import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.util.ratelimit.NoOpRateLimiter; import org.apache.flink.api.connector.source.util.ratelimit.RateLimitedSourceReader; import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter; import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; @@ -60,13 +66,6 @@ import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.util.FlinkRuntimeException; - -import io.debezium.jdbc.JdbcConnection; - -import java.io.Serializable; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -166,6 +165,7 @@ public Boundedness getBoundedness() { return Boundedness.CONTINUOUS_UNBOUNDED; } } + private static final Logger LOG = LoggerFactory.getLogger(MySqlSource.class); @Override @@ -193,16 +193,29 @@ public SourceReader createReader(SourceReaderContext readerContex readerContext.getIndexOfSubtask(), mySqlSourceReaderContext, snapshotHooks); + RecordEmitter recordEmitter = recordEmitterSupplier.get(sourceReaderMetrics, sourceConfig); MySqlSourceReader sourceReader = new MySqlSourceReader<>( elementsQueue, splitReaderSupplier, - recordEmitterSupplier.get(sourceReaderMetrics, sourceConfig), + recordEmitter, readerContext.getConfiguration(), mySqlSourceReaderContext, sourceConfig); int parallelism = readerContext.currentParallelism(); + RateLimiter rateLimiter = rateLimiterStrategy.createRateLimiter(parallelism); - return new RateLimitedSourceReader<>(sourceReader,rateLimiter); + + if (rateLimiter == null || rateLimiter 
instanceof NoOpRateLimiter) { + return sourceReader; + } + + SourceReader rateSoureceReader = new RateLimitedSourceReader<>(sourceReader, rateLimiter); + + if (recordEmitter instanceof MySqlRecordEmitter) { + MySqlRecordEmitter mySqlRecordEmitter = (MySqlRecordEmitter) recordEmitter; + mySqlRecordEmitter.setRateLimiter(rateLimiter); + } + return rateSoureceReader; } @Override @@ -280,7 +293,9 @@ public void setSnapshotHooks(SnapshotPhaseHooks snapshotHooks) { this.snapshotHooks = snapshotHooks; } - /** Create a {@link RecordEmitter} for {@link MySqlSourceReader}. */ + /** + * Create a {@link RecordEmitter} for {@link MySqlSourceReader}. + */ @Internal @FunctionalInterface interface RecordEmitterSupplier extends Serializable { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java index 61b34edd7c..9d4a55c6b7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java @@ -17,7 +17,12 @@ package org.apache.flink.cdc.connectors.mysql.source.reader; +import io.debezium.document.Array; +import io.debezium.relational.history.HistoryRecord; +import io.debezium.relational.history.TableChanges; +import java.util.Iterator; import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter; import org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics; import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitState; @@ -27,16 +32,10 @@ import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer; import org.apache.flink.connector.base.source.reader.RecordEmitter; import org.apache.flink.util.Collector; - -import io.debezium.document.Array; -import io.debezium.relational.history.HistoryRecord; -import io.debezium.relational.history.TableChanges; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Iterator; - /** * The {@link RecordEmitter} implementation for {@link MySqlSourceReader}. * @@ -54,6 +53,8 @@ public class MySqlRecordEmitter implements RecordEmitter outputCollector; + private RateLimiter rateLimiter; + public MySqlRecordEmitter( DebeziumDeserializationSchema debeziumDeserializationSchema, MySqlSourceReaderMetrics sourceReaderMetrics, @@ -71,6 +72,9 @@ public void emitRecord( final Iterator elementIterator = sourceRecords.iterator(); while (elementIterator.hasNext()) { processElement(elementIterator.next(), output, splitState); + if (rateLimiter != null) { + rateLimiter.acquire().toCompletableFuture().join(); + } } } @@ -140,7 +144,6 @@ private static class OutputCollector implements Collector { @Override public void collect(T record) { - LOG.info("收到数据!!!!!!: {}", record); if (currentMessageTimestamp != null && currentMessageTimestamp > 0) { // Only binlog event contains a valid timestamp. 
We use the output with timestamp to // report the event time and let the source operator to report @@ -159,4 +162,8 @@ public void close() { // do nothing } } + + public void setRateLimiter(RateLimiter rateLimiter) { + this.rateLimiter = rateLimiter; + } } From 839136f263b145370b5e2f859c8cbdb4597eaa9d Mon Sep 17 00:00:00 2001 From: ikiler Date: Thu, 19 Sep 2024 16:05:58 +0800 Subject: [PATCH 03/10] Modify the rate description --- .../org/apache/flink/cdc/common/pipeline/PipelineOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java index 79607fdca1..51f3d95375 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java @@ -44,7 +44,7 @@ public class PipelineOptions { .doubleType() .defaultValue(0d) .withDescription( - "Limits the number of records per second, default 0 is disabled"); + "Limits the number of records per second, default value 0 means no limit"); public static final ConfigOption PIPELINE_PARALLELISM = ConfigOptions.key("parallelism") From a2578d0c7380cd8a02a012ea9817f153eeaa6add Mon Sep 17 00:00:00 2001 From: ikiler Date: Thu, 19 Sep 2024 16:09:29 +0800 Subject: [PATCH 04/10] rename confusing rate class --- .../flink/cdc/connectors/mysql/source/MySqlDataSource.java | 4 ++-- ...hotGuavaRateLimiter.java => MysqlCdcGuavaRateLimiter.java} | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) rename flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/rate/{SnapshotGuavaRateLimiter.java => MysqlCdcGuavaRateLimiter.java} (92%) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java index fa11643c79..aa6c959337 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java @@ -27,7 +27,7 @@ import org.apache.flink.cdc.common.source.EventSourceProvider; import org.apache.flink.cdc.common.source.FlinkSourceProvider; import org.apache.flink.cdc.common.source.MetadataAccessor; -import org.apache.flink.cdc.connectors.mysql.rate.SnapshotGuavaRateLimiter; +import org.apache.flink.cdc.connectors.mysql.rate.MysqlCdcGuavaRateLimiter; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEmitter; @@ -78,7 +78,7 @@ public EventSourceProvider getEventSourceProvider() { } private RateLimiterStrategy createRateLimiterStrategy(double recordsPerSecond) { - return parallelism -> new SnapshotGuavaRateLimiter(recordsPerSecond, parallelism); + return parallelism -> new MysqlCdcGuavaRateLimiter(recordsPerSecond, parallelism); } @Override diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/rate/SnapshotGuavaRateLimiter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/rate/MysqlCdcGuavaRateLimiter.java similarity index 92% rename from flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/rate/SnapshotGuavaRateLimiter.java rename to flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/rate/MysqlCdcGuavaRateLimiter.java index fd86a20097..c724ace1f3 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/rate/SnapshotGuavaRateLimiter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/rate/MysqlCdcGuavaRateLimiter.java @@ -12,7 +12,7 @@ * It uses Guava's RateLimiter to control the number of requests allowed to pass per second, * ensuring that parallel tasks can adjust the rate limit after the checkpoint is completed. */ -public class SnapshotGuavaRateLimiter implements org.apache.flink.api.connector.source.util.ratelimit.RateLimiter { +public class MysqlCdcGuavaRateLimiter implements org.apache.flink.api.connector.source.util.ratelimit.RateLimiter { private final Executor limiter = Executors.newSingleThreadExecutor(new ExecutorThreadFactory("flink-snapshot-rate-limiter")); @@ -20,7 +20,7 @@ public class SnapshotGuavaRateLimiter implements org.apache.flink.api.connector. private int getTokenCountAtOnce; - public SnapshotGuavaRateLimiter(double recordsPerSecond, int parallelism) { + public MysqlCdcGuavaRateLimiter(double recordsPerSecond, int parallelism) { this.rateLimiter = RateLimiter.create(recordsPerSecond); this.getTokenCountAtOnce = parallelism; } From c916cb7a636fd040424d15c6e0b456ae424109fa Mon Sep 17 00:00:00 2001 From: ikiler Date: Thu, 19 Sep 2024 16:21:50 +0800 Subject: [PATCH 05/10] formate code --- .../mysql/source/MySqlDataSource.java | 9 +++-- .../mysql/rate/MysqlCdcGuavaRateLimiter.java | 23 +++++++----- .../connectors/mysql/source/MySqlSource.java | 37 ++++++++++--------- .../source/reader/MySqlRecordEmitter.java | 10 +++-- 4 files changed, 45 insertions(+), 34 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java index aa6c959337..c6587abdeb 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java @@ -33,9 +33,7 @@ import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEmitter; import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode; -/** - * A {@link DataSource} for mysql cdc connector. - */ +/** A {@link DataSource} for mysql cdc connector. 
*/ @Internal public class MySqlDataSource implements DataSource { @@ -64,7 +62,10 @@ public EventSourceProvider getEventSourceProvider() { new MySqlEventDeserializer( DebeziumChangelogMode.ALL, sourceConfig.isIncludeSchemaChanges()); - RateLimiterStrategy rateLimiterStrategy = maxRatePeer > 0 ? createRateLimiterStrategy(maxRatePeer) : RateLimiterStrategy.noOp(); + RateLimiterStrategy rateLimiterStrategy = + maxRatePeer > 0 + ? createRateLimiterStrategy(maxRatePeer) + : RateLimiterStrategy.noOp(); MySqlSource source = new MySqlSource<>( configFactory, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/rate/MysqlCdcGuavaRateLimiter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/rate/MysqlCdcGuavaRateLimiter.java index c724ace1f3..18de0601d7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/rate/MysqlCdcGuavaRateLimiter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/rate/MysqlCdcGuavaRateLimiter.java @@ -1,21 +1,26 @@ package org.apache.flink.cdc.connectors.mysql.rate; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; + +import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.RateLimiter; + import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; import java.util.concurrent.Executors; -import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.RateLimiter; -import org.apache.flink.util.concurrent.ExecutorThreadFactory; /** - * The SnapshotGuavaRateLimiter class implements Flink's RateLimiter interface, which is used to limit the read rate of data sources. - * It uses Guava's RateLimiter to control the number of requests allowed to pass per second, - * ensuring that parallel tasks can adjust the rate limit after the checkpoint is completed. + * The SnapshotGuavaRateLimiter class implements Flink's RateLimiter interface, which is used to + * limit the read rate of data sources. It uses Guava's RateLimiter to control the number of + * requests allowed to pass per second, ensuring that parallel tasks can adjust the rate limit after + * the checkpoint is completed. */ -public class MysqlCdcGuavaRateLimiter implements org.apache.flink.api.connector.source.util.ratelimit.RateLimiter { +public class MysqlCdcGuavaRateLimiter + implements org.apache.flink.api.connector.source.util.ratelimit.RateLimiter { private final Executor limiter = - Executors.newSingleThreadExecutor(new ExecutorThreadFactory("flink-snapshot-rate-limiter")); + Executors.newSingleThreadExecutor( + new ExecutorThreadFactory("flink-snapshot-rate-limiter")); private final RateLimiter rateLimiter; private int getTokenCountAtOnce; @@ -26,8 +31,8 @@ public MysqlCdcGuavaRateLimiter(double recordsPerSecond, int parallelism) { } /** - * Due to the CDC feature, the degree of parallelism will change to 1 after the checkpoint is completed, - * so you need to reset the speed limiter. + * Due to the CDC feature, the degree of parallelism will change to 1 after the checkpoint is + * completed, so you need to reset the speed limiter. 
*/ @Override public void notifyCheckpointComplete(long checkpointId) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java index 9a61f77e42..d20fe36dc7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java @@ -17,11 +17,6 @@ package org.apache.flink.cdc.connectors.mysql.source; -import io.debezium.jdbc.JdbcConnection; -import java.io.Serializable; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.function.Supplier; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; @@ -66,9 +61,16 @@ import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.util.FlinkRuntimeException; + +import io.debezium.jdbc.JdbcConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.function.Supplier; + /** * The MySQL CDC Source based on FLIP-27 and Watermark Signal Algorithm which supports parallel * reading snapshot of table and then continue to capture data change from binlog. @@ -193,14 +195,16 @@ public SourceReader createReader(SourceReaderContext readerContex readerContext.getIndexOfSubtask(), mySqlSourceReaderContext, snapshotHooks); - RecordEmitter recordEmitter = recordEmitterSupplier.get(sourceReaderMetrics, sourceConfig); - MySqlSourceReader sourceReader = new MySqlSourceReader<>( - elementsQueue, - splitReaderSupplier, - recordEmitter, - readerContext.getConfiguration(), - mySqlSourceReaderContext, - sourceConfig); + RecordEmitter recordEmitter = + recordEmitterSupplier.get(sourceReaderMetrics, sourceConfig); + MySqlSourceReader sourceReader = + new MySqlSourceReader<>( + elementsQueue, + splitReaderSupplier, + recordEmitter, + readerContext.getConfiguration(), + mySqlSourceReaderContext, + sourceConfig); int parallelism = readerContext.currentParallelism(); RateLimiter rateLimiter = rateLimiterStrategy.createRateLimiter(parallelism); @@ -209,7 +213,8 @@ public SourceReader createReader(SourceReaderContext readerContex return sourceReader; } - SourceReader rateSoureceReader = new RateLimitedSourceReader<>(sourceReader, rateLimiter); + SourceReader rateSoureceReader = + new RateLimitedSourceReader<>(sourceReader, rateLimiter); if (recordEmitter instanceof MySqlRecordEmitter) { MySqlRecordEmitter mySqlRecordEmitter = (MySqlRecordEmitter) recordEmitter; @@ -293,9 +298,7 @@ public void setSnapshotHooks(SnapshotPhaseHooks snapshotHooks) { this.snapshotHooks = snapshotHooks; } - /** - * Create a {@link RecordEmitter} for {@link MySqlSourceReader}. - */ + /** Create a {@link RecordEmitter} for {@link MySqlSourceReader}. 
*/ @Internal @FunctionalInterface interface RecordEmitterSupplier extends Serializable { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java index 9d4a55c6b7..e9c4ffa632 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java @@ -17,10 +17,6 @@ package org.apache.flink.cdc.connectors.mysql.source.reader; -import io.debezium.document.Array; -import io.debezium.relational.history.HistoryRecord; -import io.debezium.relational.history.TableChanges; -import java.util.Iterator; import org.apache.flink.api.connector.source.SourceOutput; import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter; import org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics; @@ -32,10 +28,16 @@ import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer; import org.apache.flink.connector.base.source.reader.RecordEmitter; import org.apache.flink.util.Collector; + +import io.debezium.document.Array; +import io.debezium.relational.history.HistoryRecord; +import io.debezium.relational.history.TableChanges; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Iterator; + /** * The {@link RecordEmitter} implementation for {@link MySqlSourceReader}. * From 94bd9fa9786b02bbc1f2d93293fb173f6b07da6c Mon Sep 17 00:00:00 2001 From: ikiler Date: Thu, 19 Sep 2024 16:45:46 +0800 Subject: [PATCH 06/10] update rate limit docs --- docs/content.zh/docs/core-concept/data-pipeline.md | 3 ++- docs/content/docs/core-concept/data-pipeline.md | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/content.zh/docs/core-concept/data-pipeline.md b/docs/content.zh/docs/core-concept/data-pipeline.md index 7b286bfbf5..5ec76eb0f2 100644 --- a/docs/content.zh/docs/core-concept/data-pipeline.md +++ b/docs/content.zh/docs/core-concept/data-pipeline.md @@ -115,4 +115,5 @@ The following config options of Data Pipeline level are supported: |-----------------|-----------------------------------------------------------------------------------------|-------------------| | name | The name of the pipeline, which will be submitted to the Flink cluster as the job name. | optional | | parallelism | The global parallelism of the pipeline. Defaults to 1. | optional | -| local-time-zone | The local time zone defines current session time zone id. | optional | \ No newline at end of file +| local-time-zone | The local time zone defines current session time zone id. 
| optional | +| rate.limit | 限制数据源读取速率,防止下游压力过大。单位为条/秒。(目前仅支持mysql数据源) | 可选 | \ No newline at end of file diff --git a/docs/content/docs/core-concept/data-pipeline.md b/docs/content/docs/core-concept/data-pipeline.md index 759a47b245..68ced1ca2c 100644 --- a/docs/content/docs/core-concept/data-pipeline.md +++ b/docs/content/docs/core-concept/data-pipeline.md @@ -133,4 +133,5 @@ The following config options of Data Pipeline level are supported: |-----------------|-----------------------------------------------------------------------------------------|-------------------| | name | The name of the pipeline, which will be submitted to the Flink cluster as the job name. | optional | | parallelism | The global parallelism of the pipeline. Defaults to 1. | optional | -| local-time-zone | The local time zone defines current session time zone id. | optional | \ No newline at end of file +| local-time-zone | The local time zone defines current session time zone id. | optional | +| rate.limit | Limit the reading rate of data sources. (Only the MySQL data source is supported.) | optional | \ No newline at end of file From fd932fad6a37aa952f94eb70d7cf136cfa327362 Mon Sep 17 00:00:00 2001 From: ikiler Date: Thu, 19 Sep 2024 17:19:53 +0800 Subject: [PATCH 07/10] update rate limit key name --- docs/content.zh/docs/core-concept/data-pipeline.md | 12 ++++++------ docs/content/docs/core-concept/data-pipeline.md | 12 ++++++------ .../flink/cdc/common/pipeline/PipelineOptions.java | 2 +- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/docs/content.zh/docs/core-concept/data-pipeline.md b/docs/content.zh/docs/core-concept/data-pipeline.md index 5ec76eb0f2..7fc86d1508 100644 --- a/docs/content.zh/docs/core-concept/data-pipeline.md +++ b/docs/content.zh/docs/core-concept/data-pipeline.md @@ -111,9 +111,9 @@ We could use following yaml file to define a complicated Data Pipeline describin # Pipeline Configurations The following config options of Data Pipeline level are supported: -| parameter | meaning | optional/required | -|-----------------|-----------------------------------------------------------------------------------------|-------------------| -| name | The name of the pipeline, which will be submitted to the Flink cluster as the job name. | optional | -| parallelism | The global parallelism of the pipeline. Defaults to 1. | optional | -| local-time-zone | The local time zone defines current session time zone id. | optional | -| rate.limit | 限制数据源读取速率,防止下游压力过大。单位为条/秒。(目前仅支持mysql数据源) | 可选 | \ No newline at end of file +| parameter | meaning | optional/required | +|-------------------|-----------------------------------------------------------------------------------------|-------------------| +| name | The name of the pipeline, which will be submitted to the Flink cluster as the job name. | optional | +| parallelism | The global parallelism of the pipeline. Defaults to 1. | optional | +| local-time-zone | The local time zone defines current session time zone id. 
| optional | +| source-rate-limit | 限制数据源读取速率,防止下游压力过大。单位为条/秒。(目前仅支持mysql数据源) | 可选 | \ No newline at end of file diff --git a/docs/content/docs/core-concept/data-pipeline.md b/docs/content/docs/core-concept/data-pipeline.md index 68ced1ca2c..99e55ee463 100644 --- a/docs/content/docs/core-concept/data-pipeline.md +++ b/docs/content/docs/core-concept/data-pipeline.md @@ -129,9 +129,9 @@ We could use following yaml file to define a complicated Data Pipeline describin # Pipeline Configurations The following config options of Data Pipeline level are supported: -| parameter | meaning | optional/required | -|-----------------|-----------------------------------------------------------------------------------------|-------------------| -| name | The name of the pipeline, which will be submitted to the Flink cluster as the job name. | optional | -| parallelism | The global parallelism of the pipeline. Defaults to 1. | optional | -| local-time-zone | The local time zone defines current session time zone id. | optional | -| rate.limit | Limit the reading rate of data sources. (Only the MySQL data source is supported.) | optional | \ No newline at end of file +| parameter | meaning | optional/required | +|-------------------|-----------------------------------------------------------------------------------------|-------------------| +| name | The name of the pipeline, which will be submitted to the Flink cluster as the job name. | optional | +| parallelism | The global parallelism of the pipeline. Defaults to 1. | optional | +| local-time-zone | The local time zone defines current session time zone id. | optional | +| source-rate-limit | Limit the reading rate of data sources. (Only the MySQL data source is supported.) | optional | \ No newline at end of file diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java index 51f3d95375..39bf4bbe3b 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java @@ -40,7 +40,7 @@ public class PipelineOptions { .withDescription("The name of the pipeline"); public static final ConfigOption RATE_LIMIT = - ConfigOptions.key("rate.limit") + ConfigOptions.key("source-rate-limit") .doubleType() .defaultValue(0d) .withDescription( From 8319e7f99f35a0bd9da99cd8a1dee5aa0aac24c2 Mon Sep 17 00:00:00 2001 From: ikiler Date: Thu, 19 Sep 2024 17:21:26 +0800 Subject: [PATCH 08/10] update rate limit key name --- .../org/apache/flink/cdc/common/pipeline/PipelineOptions.java | 2 +- .../flink/cdc/connectors/mysql/source/MySqlDataSource.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java index 39bf4bbe3b..c203a7c054 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java @@ -39,7 +39,7 @@ public class PipelineOptions { .defaultValue("Flink CDC Pipeline Job") .withDescription("The name of the pipeline"); - public static final ConfigOption RATE_LIMIT = + public static final ConfigOption SOURCE_RATE_LIMIT = ConfigOptions.key("source-rate-limit") .doubleType() .defaultValue(0d) 
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java index c6587abdeb..de3a60f13c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java @@ -55,8 +55,8 @@ public MySqlDataSource(MySqlSourceConfigFactory configFactory, Configuration pip public EventSourceProvider getEventSourceProvider() { double maxRatePeer = piplineConfig - .getOptional(PipelineOptions.RATE_LIMIT) - .orElse(PipelineOptions.RATE_LIMIT.defaultValue()); + .getOptional(PipelineOptions.SOURCE_RATE_LIMIT) + .orElse(PipelineOptions.SOURCE_RATE_LIMIT.defaultValue()); MySqlEventDeserializer deserializer = new MySqlEventDeserializer( From 3b183bda328299a5d9e9e0f53d16008b54eb7839 Mon Sep 17 00:00:00 2001 From: ikiler Date: Thu, 19 Sep 2024 17:28:14 +0800 Subject: [PATCH 09/10] Add MysqlCdcGuavaRateLimiter Licensed header --- .../mysql/rate/MysqlCdcGuavaRateLimiter.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/rate/MysqlCdcGuavaRateLimiter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/rate/MysqlCdcGuavaRateLimiter.java index 18de0601d7..c6e3279b12 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/rate/MysqlCdcGuavaRateLimiter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/rate/MysqlCdcGuavaRateLimiter.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.flink.cdc.connectors.mysql.rate; import org.apache.flink.util.concurrent.ExecutorThreadFactory; From b3c8758b7e76a755d776269153afbf6480bb8866 Mon Sep 17 00:00:00 2001 From: ikiler Date: Thu, 19 Sep 2024 17:30:36 +0800 Subject: [PATCH 10/10] Add MysqlCdcGuavaRateLimiter Licensed header --- .../cdc/connectors/mysql/rate/MysqlCdcGuavaRateLimiter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/rate/MysqlCdcGuavaRateLimiter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/rate/MysqlCdcGuavaRateLimiter.java index c6e3279b12..83b87a2a0a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/rate/MysqlCdcGuavaRateLimiter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/rate/MysqlCdcGuavaRateLimiter.java @@ -37,7 +37,7 @@ public class MysqlCdcGuavaRateLimiter private final Executor limiter = Executors.newSingleThreadExecutor( - new ExecutorThreadFactory("flink-snapshot-rate-limiter")); + new ExecutorThreadFactory("mysql-cdc-rate-limiter")); private final RateLimiter rateLimiter; private int getTokenCountAtOnce;