Support MySQL pipeline rate limit #3607

Open
wants to merge 12 commits into base: master
@@ -39,6 +39,13 @@ public class PipelineOptions {
.defaultValue("Flink CDC Pipeline Job")
.withDescription("The name of the pipeline");

public static final ConfigOption<Double> RATE_LIMIT =
ConfigOptions.key("rate.limit")
.doubleType()
.defaultValue(0d)
.withDescription(
"Limits the number of records per second, default 0 is disabled");

public static final ConfigOption<Integer> PIPELINE_PARALLELISM =
ConfigOptions.key("parallelism")
.intType()
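A minimal sketch, not part of this PR, of how the new option could be set programmatically. It assumes Flink CDC's Configuration exposes set/get accessors for a ConfigOption the way Flink's own Configuration does; the class name RateLimitExample is hypothetical:

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;

public class RateLimitExample {
    public static void main(String[] args) {
        Configuration pipelineConfig = new Configuration();
        // Throttle the whole pipeline to roughly 1000 records per second.
        pipelineConfig.set(PipelineOptions.RATE_LIMIT, 1000d);
        // The default value 0 leaves rate limiting disabled.
        System.out.println(pipelineConfig.get(PipelineOptions.RATE_LIMIT)); // prints 1000.0
    }
}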
@@ -235,7 +235,7 @@ public DataSource createDataSource(Context context) {
configFactory.chunkKeyColumn(chunkKeyColumnMap);
}

return new MySqlDataSource(configFactory);
return new MySqlDataSource(configFactory, context.getPipelineConfiguration());
}

@Override
@@ -17,47 +17,70 @@

package org.apache.flink.cdc.connectors.mysql.source;

import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.common.source.EventSourceProvider;
import org.apache.flink.cdc.common.source.FlinkSourceProvider;
import org.apache.flink.cdc.common.source.MetadataAccessor;
import org.apache.flink.cdc.connectors.mysql.rate.SnapshotGuavaRateLimiter;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEmitter;
import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;

/** A {@link DataSource} for mysql cdc connector. */
/**
* A {@link DataSource} for the MySQL CDC connector.
*/
@Internal
public class MySqlDataSource implements DataSource {

private final MySqlSourceConfigFactory configFactory;
private final MySqlSourceConfig sourceConfig;
private final Configuration pipelineConfig;

public MySqlDataSource(MySqlSourceConfigFactory configFactory) {
this(configFactory, new Configuration());
}

public MySqlDataSource(MySqlSourceConfigFactory configFactory, Configuration pipelineConfig) {
this.configFactory = configFactory;
this.sourceConfig = configFactory.createConfig(0);
this.pipelineConfig = pipelineConfig;
}

@Override
public EventSourceProvider getEventSourceProvider() {
double maxRatePerSecond =
pipelineConfig
.getOptional(PipelineOptions.RATE_LIMIT)
.orElse(PipelineOptions.RATE_LIMIT.defaultValue());

MySqlEventDeserializer deserializer =
new MySqlEventDeserializer(
DebeziumChangelogMode.ALL, sourceConfig.isIncludeSchemaChanges());

RateLimiterStrategy rateLimiterStrategy =
maxRatePerSecond > 0
? createRateLimiterStrategy(maxRatePerSecond)
: RateLimiterStrategy.noOp();
MySqlSource<Event> source =
new MySqlSource<>(
configFactory,
deserializer,
(sourceReaderMetrics, sourceConfig) ->
new MySqlPipelineRecordEmitter(
deserializer, sourceReaderMetrics, sourceConfig));
deserializer, sourceReaderMetrics, sourceConfig),
rateLimiterStrategy);

return FlinkSourceProvider.of(source);
}

private RateLimiterStrategy createRateLimiterStrategy(double recordsPerSecond) {
return parallelism -> new SnapshotGuavaRateLimiter(recordsPerSecond, parallelism);
}

@Override
public MetadataAccessor getMetadataAccessor() {
return new MySqlMetadataAccessor(sourceConfig);
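The strategy above gives every source subtask its own Guava-backed limiter created with the full records-per-second budget, while each record costs parallelism permits, so job-wide throughput stays near rate.limit regardless of parallelism. A standalone sketch of that token arithmetic using plain Guava (not part of the PR; the class name is hypothetical):

import com.google.common.util.concurrent.RateLimiter;

public class TokenArithmeticDemo {
    public static void main(String[] args) {
        int parallelism = 4;
        // Each of the 4 subtasks owns a limiter issuing 1000 permits per second...
        RateLimiter limiter = RateLimiter.create(1000d);
        long start = System.nanoTime();
        for (int i = 0; i < 500; i++) {
            // ...but every record costs `parallelism` permits, so one subtask emits
            // ~250 records/second and the 4 subtasks together ~1000 records/second.
            limiter.acquire(parallelism);
        }
        double seconds = (System.nanoTime() - start) / 1e9;
        System.out.printf("500 records in %.1fs (~%.0f records/s)%n", seconds, 500 / seconds);
    }
}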
@@ -0,0 +1,45 @@
package org.apache.flink.cdc.connectors.mysql.rate;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

/**
 * A rate limiter implementing Flink's RateLimiter interface to throttle how fast a data source
 * is read. It delegates to Guava's {@link RateLimiter} to bound how many permits are issued per
 * second, and rescales the per-record permit cost once the snapshot phase completes and the job
 * collapses to a single binlog reading task.
 */
public class SnapshotGuavaRateLimiter implements org.apache.flink.api.connector.source.util.ratelimit.RateLimiter {

private final Executor limiterExecutor =
Executors.newSingleThreadExecutor(
new ExecutorThreadFactory("flink-snapshot-rate-limiter"));
private final RateLimiter rateLimiter;

private volatile int tokensPerRecord;

public SnapshotGuavaRateLimiter(double recordsPerSecond, int parallelism) {
this.rateLimiter = RateLimiter.create(recordsPerSecond);
this.tokensPerRecord = parallelism;
}

/**
 * MySQL CDC collapses to a single binlog reading task once the snapshot phase finishes, which
 * is signaled by the first completed checkpoint, so the per-record permit cost is reset to 1.
 */
@Override
public void notifyCheckpointComplete(long checkpointId) {
tokensPerRecord = 1;
}

private void acquireBlocking() {
rateLimiter.acquire(tokensPerRecord);
}

@Override
public CompletionStage<Void> acquire() {
return CompletableFuture.runAsync(this::acquireBlocking, limiterExecutor);
}
}
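A minimal usage sketch of the limiter's two phases, following the semantics above (the rates in the comments assume four snapshot subtasks sharing a budget of 100 records per second):

SnapshotGuavaRateLimiter limiter = new SnapshotGuavaRateLimiter(100d, 4);

// Snapshot phase: each record costs 4 permits, holding each of the
// 4 subtasks to ~25 records/second, ~100 records/second job-wide.
limiter.acquire().toCompletableFuture().join();

// Checkpoint completion signals the end of the snapshot phase; the cost
// drops to 1 permit per record for the single binlog reading task.
limiter.notifyCheckpointComplete(1L);
limiter.acquire().toCompletableFuture().join(); // now ~100 records/second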
@@ -17,13 +17,22 @@

package org.apache.flink.cdc.connectors.mysql.source;

import io.debezium.jdbc.JdbcConnection;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.function.Supplier;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.util.ratelimit.NoOpRateLimiter;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimitedSourceReader;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.PublicEvolving;
@@ -57,13 +66,8 @@
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.FlinkRuntimeException;

import io.debezium.jdbc.JdbcConnection;

import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The MySQL CDC Source based on FLIP-27 and Watermark Signal Algorithm which supports parallel
@@ -111,6 +115,8 @@ public class MySqlSource<T>
// hook for generating changes.
private SnapshotPhaseHooks snapshotHooks = SnapshotPhaseHooks.empty();

private final RateLimiterStrategy rateLimiterStrategy;

/**
* Get a MySqlParallelSourceBuilder to build a {@link MySqlSource}.
*
@@ -131,16 +137,19 @@ public static <T> MySqlSourceBuilder<T> builder() {
new MySqlRecordEmitter<>(
deserializationSchema,
sourceReaderMetrics,
sourceConfig.isIncludeSchemaChanges()));
sourceConfig.isIncludeSchemaChanges()),
RateLimiterStrategy.noOp());
}

MySqlSource(
MySqlSourceConfigFactory configFactory,
DebeziumDeserializationSchema<T> deserializationSchema,
RecordEmitterSupplier<T> recordEmitterSupplier) {
RecordEmitterSupplier<T> recordEmitterSupplier,
RateLimiterStrategy rateLimiterStrategy) {
this.configFactory = configFactory;
this.deserializationSchema = deserializationSchema;
this.recordEmitterSupplier = recordEmitterSupplier;
this.rateLimiterStrategy = rateLimiterStrategy;
}

public MySqlSourceConfigFactory getConfigFactory() {
@@ -157,6 +166,8 @@ public Boundedness getBoundedness() {
}
}

private static final Logger LOG = LoggerFactory.getLogger(MySqlSource.class);

@Override
public SourceReader<T, MySqlSplit> createReader(SourceReaderContext readerContext)
throws Exception {
@@ -182,13 +193,29 @@ public SourceReader<T, MySqlSplit> createReader(SourceReaderContext readerContext)
readerContext.getIndexOfSubtask(),
mySqlSourceReaderContext,
snapshotHooks);
return new MySqlSourceReader<>(
RecordEmitter<SourceRecords, T, MySqlSplitState> recordEmitter =
recordEmitterSupplier.get(sourceReaderMetrics, sourceConfig);
MySqlSourceReader<T> sourceReader = new MySqlSourceReader<>(
elementsQueue,
splitReaderSupplier,
recordEmitterSupplier.get(sourceReaderMetrics, sourceConfig),
recordEmitter,
readerContext.getConfiguration(),
mySqlSourceReaderContext,
sourceConfig);
int parallelism = readerContext.currentParallelism();

RateLimiter rateLimiter = rateLimiterStrategy.createRateLimiter(parallelism);

if (rateLimiter == null || rateLimiter instanceof NoOpRateLimiter) {
return sourceReader;
}

SourceReader<T, MySqlSplit> rateLimitedSourceReader =
new RateLimitedSourceReader<>(sourceReader, rateLimiter);

if (recordEmitter instanceof MySqlRecordEmitter) {
MySqlRecordEmitter<T> mySqlRecordEmitter = (MySqlRecordEmitter<T>) recordEmitter;
mySqlRecordEmitter.setRateLimiter(rateLimiter);
}
return rateLimitedSourceReader;
}
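For comparison, Flink already ships a fixed-rate strategy; the custom strategy exists because the per-record permit cost must drop to 1 once the job collapses to a single binlog reader, which a fixed limiter does not account for. A sketch, assuming Flink's RateLimiterStrategy factory methods:

// Built-in: a fixed permits-per-second budget, unaware of the CDC
// snapshot-to-binlog parallelism change.
RateLimiterStrategy builtIn = RateLimiterStrategy.perSecond(1000d);

// This PR: each subtask's limiter starts at `parallelism` permits per record
// and is reset to 1 permit per record when the snapshot phase completes.
RateLimiterStrategy snapshotAware =
        parallelism -> new SnapshotGuavaRateLimiter(1000d, parallelism);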

@Override
@@ -266,7 +293,9 @@ public void setSnapshotHooks(SnapshotPhaseHooks snapshotHooks) {
this.snapshotHooks = snapshotHooks;
}

/** Create a {@link RecordEmitter} for {@link MySqlSourceReader}. */
/**
* Create a {@link RecordEmitter} for {@link MySqlSourceReader}.
*/
@Internal
@FunctionalInterface
interface RecordEmitterSupplier<T> extends Serializable {
@@ -17,7 +17,12 @@

package org.apache.flink.cdc.connectors.mysql.source.reader;

import io.debezium.document.Array;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.TableChanges;
import java.util.Iterator;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
import org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitState;
@@ -27,16 +32,10 @@
import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.util.Collector;

import io.debezium.document.Array;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.TableChanges;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;

/**
* The {@link RecordEmitter} implementation for {@link MySqlSourceReader}.
*
@@ -54,6 +53,8 @@ public class MySqlRecordEmitter<T> implements RecordEmitter<SourceRecords, T, MySqlSplitState>
private final boolean includeSchemaChanges;
private final OutputCollector<T> outputCollector;

private RateLimiter rateLimiter;

public MySqlRecordEmitter(
DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
MySqlSourceReaderMetrics sourceReaderMetrics,
@@ -71,6 +72,9 @@ public void emitRecord(
final Iterator<SourceRecord> elementIterator = sourceRecords.iterator();
while (elementIterator.hasNext()) {
processElement(elementIterator.next(), output, splitState);
if (rateLimiter != null) {
rateLimiter.acquire().toCompletableFuture().join();
}
}
}

@@ -158,4 +162,8 @@ public void close() {
// do nothing
}
}

public void setRateLimiter(RateLimiter rateLimiter) {
this.rateLimiter = rateLimiter;
}
}
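Throttling inside the emitter complements the RateLimitedSourceReader wrapper: the wrapper meters pollNext() calls, but the reader hands the emitter an entire SourceRecords batch per poll, so without this per-record hook a single poll could emit an arbitrarily large burst. A hypothetical standalone demo of the blocking handoff used in emitRecord (assuming the SnapshotGuavaRateLimiter above is on the classpath):

import java.util.concurrent.CompletionStage;

public class EmitterThrottleDemo {
    public static void main(String[] args) {
        // acquire() runs Guava's blocking acquire on the limiter's single-thread
        // executor; join() makes the emitting thread wait for the permits before
        // the next record, here pacing the loop to ~5 records per second.
        SnapshotGuavaRateLimiter limiter = new SnapshotGuavaRateLimiter(5d, 1);
        long start = System.nanoTime();
        for (int i = 0; i < 10; i++) {
            CompletionStage<Void> permit = limiter.acquire();
            permit.toCompletableFuture().join();
        }
        System.out.printf("10 records in %.1fs%n", (System.nanoTime() - start) / 1e9);
    }
}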