From 6583cb29b75310f92556b44c7427f0d61c5b63c7 Mon Sep 17 00:00:00 2001 From: mahaotian's debian Date: Wed, 4 Sep 2024 22:11:23 +0800 Subject: [PATCH 1/2] [INLONG-11013][Sort] A demo of reporting sort logs through OpenTelemetry --- .../sort-connectors/mysql-cdc/pom.xml | 74 ++++++++++++++++--- .../source/reader/MySqlSourceReader.java | 32 +++++++- 2 files changed, 93 insertions(+), 13 deletions(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml index 77f03595fd1..a59a1623b42 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml @@ -7,9 +7,7 @@ to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - +http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,22 +18,73 @@ 4.0.0 - org.apache.inlong sort-connectors-v1.15 1.14.0-SNAPSHOT - sort-connector-mysql-cdc-v1.15 jar Apache InLong - Sort-connector-mysql-cdc - ${project.parent.parent.parent.parent.parent.basedir} - + + org.apache.logging.log4j + log4j-core + ${log4j2.version} + + + org.apache.logging.log4j + log4j-api + ${log4j2.version} + + + io.opentelemetry + opentelemetry-api + + + io.opentelemetry.instrumentation + opentelemetry-instrumentation-api + 1.28.0 + + + io.opentelemetry.instrumentation + opentelemetry-log4j-appender-2.17 + 1.28.0-alpha + + + io.opentelemetry + opentelemetry-sdk-trace + + + io.opentelemetry + opentelemetry-exporter-otlp + + + io.opentelemetry + opentelemetry-sdk + + + io.opentelemetry + opentelemetry-semconv + 1.28.0-alpha + + + io.opentelemetry + opentelemetry-sdk-logs + 1.28.0 + + + com.squareup.okhttp3 + okhttp + + + com.squareup.okio + okio + 1.17.2 + com.ververica @@ -78,7 +127,6 @@ ${project.version} - @@ -92,6 +140,7 @@ package + false org.apache.inlong:* @@ -113,6 +162,11 @@ org.apache.flink:flink-shaded-guava com.google.protobuf:* + io.opentelemetry + io.opentelemetry.* + io.opentelemetry-sdk.* + com.squareup.okhttp3 + com.squareup.okio @@ -137,10 +191,6 @@ - - org.apache.inlong.sort.base - org.apache.inlong.sort.cdc.mysql.shaded.org.apache.inlong.sort.base - org.apache.inlong.sort.cdc.base org.apache.inlong.sort.cdc.mysql.shaded.org.apache.inlong.sort.cdc.base diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java index 01f34f28b15..62326b71811 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java @@ -47,6 +47,13 @@ import io.debezium.connector.mysql.MySqlConnection; import io.debezium.relational.TableId; import io.debezium.relational.history.TableChanges; +import io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporter; +import io.opentelemetry.instrumentation.log4j.appender.v2_17.OpenTelemetryAppender; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.logs.SdkLoggerProvider; +import io.opentelemetry.sdk.logs.export.BatchLogRecordProcessor; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.RecordEmitter; @@ -88,6 +95,7 @@ public class MySqlSourceReader private final MySqlSourceReaderContext mySqlSourceReaderContext; private MySqlBinlogSplit suspendedBinlogSplit; private final DebeziumDeserializationSchema metricSchema; + private OpenTelemetrySdk SDK; public MySqlSourceReader( FutureCompletingBlockingQueue> elementQueue, @@ -113,11 +121,33 @@ public MySqlSourceReader( @Override public void start() { + this.SDK = OpenTelemetrySdk.builder() + .setLoggerProvider( + SdkLoggerProvider.builder() + .setResource( + Resource.getDefault().toBuilder() + .put(ResourceAttributes.SERVICE_NAME, "log4j-example") + .build()) + .addLogRecordProcessor( + BatchLogRecordProcessor.builder( + OtlpGrpcLogRecordExporter.builder() + .setEndpoint("http://127.0.0.1:4317") + .build()) + .build()) + .build()) + .build(); + OpenTelemetryAppender.install(SDK); if (getNumberOfCurrentlyAssignedSplits() == 0) { context.sendSplitRequest(); } } + @Override + public void close() throws Exception { + super.close(); + SDK.close(); + } + @Override protected MySqlSplitState initializedState(MySqlSplit split) { if (split.isSnapshotSplit()) { @@ -380,4 +410,4 @@ private void logCurrentBinlogOffsets(List splits, long checkpointId) protected MySqlSplit toSplitType(String splitId, MySqlSplitState splitState) { return splitState.toMySqlSplit(); } -} +} \ No newline at end of file From fa88f994602fdf16f6ba31eb35edc439940311fb Mon Sep 17 00:00:00 2001 From: mahaotian's debian Date: Wed, 4 Sep 2024 22:45:01 +0800 Subject: [PATCH 2/2] [INLONG-11013][Sort] Corrected license format --- .../sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml index a59a1623b42..533e631cdf9 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml @@ -7,7 +7,9 @@ to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at -http://www.apache.org/licenses/LICENSE-2.0 + + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY