diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml index 77f03595fd1..533e631cdf9 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml @@ -20,22 +20,73 @@ 4.0.0 - org.apache.inlong sort-connectors-v1.15 1.14.0-SNAPSHOT - sort-connector-mysql-cdc-v1.15 jar Apache InLong - Sort-connector-mysql-cdc - ${project.parent.parent.parent.parent.parent.basedir} - + + org.apache.logging.log4j + log4j-core + ${log4j2.version} + + + org.apache.logging.log4j + log4j-api + ${log4j2.version} + + + io.opentelemetry + opentelemetry-api + + + io.opentelemetry.instrumentation + opentelemetry-instrumentation-api + 1.28.0 + + + io.opentelemetry.instrumentation + opentelemetry-log4j-appender-2.17 + 1.28.0-alpha + + + io.opentelemetry + opentelemetry-sdk-trace + + + io.opentelemetry + opentelemetry-exporter-otlp + + + io.opentelemetry + opentelemetry-sdk + + + io.opentelemetry + opentelemetry-semconv + 1.28.0-alpha + + + io.opentelemetry + opentelemetry-sdk-logs + 1.28.0 + + + com.squareup.okhttp3 + okhttp + + + com.squareup.okio + okio + 1.17.2 + com.ververica @@ -78,7 +129,6 @@ ${project.version} - @@ -92,6 +142,7 @@ package + false org.apache.inlong:* @@ -113,6 +164,11 @@ org.apache.flink:flink-shaded-guava com.google.protobuf:* + io.opentelemetry + io.opentelemetry.* + io.opentelemetry-sdk.* + com.squareup.okhttp3 + com.squareup.okio @@ -137,10 +193,6 @@ - - org.apache.inlong.sort.base - org.apache.inlong.sort.cdc.mysql.shaded.org.apache.inlong.sort.base - org.apache.inlong.sort.cdc.base org.apache.inlong.sort.cdc.mysql.shaded.org.apache.inlong.sort.cdc.base diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java index 01f34f28b15..62326b71811 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java @@ -47,6 +47,13 @@ import io.debezium.connector.mysql.MySqlConnection; import io.debezium.relational.TableId; import io.debezium.relational.history.TableChanges; +import io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporter; +import io.opentelemetry.instrumentation.log4j.appender.v2_17.OpenTelemetryAppender; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.logs.SdkLoggerProvider; +import io.opentelemetry.sdk.logs.export.BatchLogRecordProcessor; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.RecordEmitter; @@ -88,6 +95,7 @@ public class MySqlSourceReader private final MySqlSourceReaderContext mySqlSourceReaderContext; private MySqlBinlogSplit suspendedBinlogSplit; private final DebeziumDeserializationSchema metricSchema; + private OpenTelemetrySdk SDK; public MySqlSourceReader( FutureCompletingBlockingQueue> elementQueue, @@ -113,11 +121,33 @@ public MySqlSourceReader( @Override public void start() { + this.SDK = OpenTelemetrySdk.builder() + .setLoggerProvider( + SdkLoggerProvider.builder() + .setResource( + Resource.getDefault().toBuilder() + .put(ResourceAttributes.SERVICE_NAME, "log4j-example") + .build()) + .addLogRecordProcessor( + BatchLogRecordProcessor.builder( + OtlpGrpcLogRecordExporter.builder() + .setEndpoint("http://127.0.0.1:4317") + .build()) + .build()) + .build()) + .build(); + OpenTelemetryAppender.install(SDK); if (getNumberOfCurrentlyAssignedSplits() == 0) { context.sendSplitRequest(); } } + @Override + public void close() throws Exception { + super.close(); + SDK.close(); + } + @Override protected MySqlSplitState initializedState(MySqlSplit split) { if (split.isSnapshotSplit()) { @@ -380,4 +410,4 @@ private void logCurrentBinlogOffsets(List splits, long checkpointId) protected MySqlSplit toSplitType(String splitId, MySqlSplitState splitState) { return splitState.toMySqlSplit(); } -} +} \ No newline at end of file