diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/build.gradle b/dd-java-agent/instrumentation/kafka-clients-3.8/build.gradle new file mode 100644 index 00000000000..7ac6d5d39b4 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/build.gradle @@ -0,0 +1,81 @@ +ext { + minJavaVersionForTests = JavaVersion.VERSION_17 +} +muzzle { + pass { + group = "org.apache.kafka" + module = "kafka-clients" + versions = "[3.8.0,)" + assertInverse = false + } +} + +apply from: "$rootDir/gradle/java.gradle" + +addTestSuite('latestDepTest') + +//java { +// toolchain { +// languageVersion.set(JavaLanguageVersion.of(17)) +// } +//} + + +//project.afterEvaluate { +// tasks.withType(Test).configureEach { +// if (javaLauncher.get().metadata.languageVersion.asInt() >= 16) { +// jvmArgs += ['--add-opens', 'java.base/java.util=ALL-UNNAMED'] +// } +// } +//} +[compileMain_java17Java, compileTestJava, compileLatestDepTestJava].each { + it.configure { + setJavaVersion(it, 17) + sourceCompatibility = JavaVersion.VERSION_1_8 + targetCompatibility = JavaVersion.VERSION_1_8 + } +} +tasks.withType(JavaCompile).each { + it.configure { + setJavaVersion(it, 17) + sourceCompatibility = JavaVersion.VERSION_1_8 + targetCompatibility = JavaVersion.VERSION_1_8 + } +} +tasks.withType(GroovyCompile) { + javaLauncher = getJavaLauncherFor(17) +} + +dependencies { + // compileOnly group: 'org.apache.kafka', name: 'kafka-clients', version: '3.8.0' + main_java17CompileOnly group: 'org.apache.kafka', name: 'kafka-clients', version: '3.8.0' + implementation project(':dd-java-agent:instrumentation:kafka-common') + main_java17Implementation project(':dd-java-agent:instrumentation:kafka-common') + + testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.8.0' + testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.1.0' + testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '3.1.0' + testImplementation group: 'org.testcontainers', name: 'kafka', version: '1.17.0' + testImplementation group: 'javax.xml.bind', name: 'jaxb-api', version: '2.2.3' + testImplementation group: 'org.assertj', name: 'assertj-core', version: '2.9.+' + testImplementation group: 'org.mockito', name: 'mockito-core', version: '2.19.0' + testRuntimeOnly project(':dd-java-agent:instrumentation:spring-scheduling-3.1') + + + // Include latest version of kafka itself along with latest version of client libs. + // This seems to help with jar compatibility hell. + latestDepTestImplementation group: 'org.apache.kafka', name: 'kafka_2.13', version: '2.+' + latestDepTestImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.+' + latestDepTestImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.+' + latestDepTestImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '3.+' + latestDepTestImplementation group: 'org.assertj', name: 'assertj-core', version: '3.19.+' + latestDepTestImplementation libs.guava + +} + +configurations.testRuntimeClasspath { + // spock-core depends on assertj version that is not compatible with kafka-clients + resolutionStrategy.force 'org.assertj:assertj-core:2.9.1' +} + + diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/ConsumerCoordinatorInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/ConsumerCoordinatorInstrumentation.java new file mode 100644 index 00000000000..0f1cec8f17a --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/ConsumerCoordinatorInstrumentation.java @@ -0,0 +1,46 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.*; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import java.util.HashMap; +import java.util.Map; + +@AutoService(InstrumenterModule.class) +public final class ConsumerCoordinatorInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForSingleType { + + public ConsumerCoordinatorInstrumentation() { + super("kafka"); + } + + @Override + public Map contextStore() { + Map contextStores = new HashMap<>(); + contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String"); + contextStores.put( + "org.apache.kafka.clients.consumer.internals.ConsumerCoordinator", + KafkaConsumerInfo.class.getName()); + return contextStores; + } + + @Override + public String instrumentedType() { + return "org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; + } + + @Override + public String[] helperClassNames() { + return new String[] {packageName + ".KafkaConsumerInfo"}; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod().and(named("sendOffsetCommitRequest")).and(takesArguments(1)), + packageName + ".ConsumerCoordinatorAdvice"); + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInfoInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInfoInstrumentation.java new file mode 100644 index 00000000000..4e8d32e8541 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInfoInstrumentation.java @@ -0,0 +1,82 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.declaresField; +import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.implementsInterface; +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import java.util.HashMap; +import java.util.Map; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +/** + * This instrumentation saves additional information from the KafkaConsumer, such as consumer group + * and cluster ID, in the context store for later use. + */ +@AutoService(InstrumenterModule.class) +public final class KafkaConsumerInfoInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForTypeHierarchy { + + public KafkaConsumerInfoInstrumentation() { + super("kafka"); + } + + @Override + public Map contextStore() { + Map contextStores = new HashMap<>(); + contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String"); + contextStores.put( + "org.apache.kafka.clients.consumer.ConsumerRecords", KafkaConsumerInfo.class.getName()); + contextStores.put( + "org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker", + KafkaConsumerInfo.class.getName()); + contextStores.put( + "org.apache.kafka.clients.consumer.internals.ConsumerDelegate", + KafkaConsumerInfo.class.getName()); + return contextStores; + } + + @Override + public String hierarchyMarkerType() { + return "org.apache.kafka.clients.consumer.internals.ConsumerDelegate"; + } + + @Override + public ElementMatcher hierarchyMatcher() { + return implementsInterface(named(hierarchyMarkerType())) + .and(declaresField(named("offsetCommitCallbackInvoker"))); + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".KafkaDecorator", packageName + ".KafkaConsumerInfo", + }; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isConstructor() + .and(takesArgument(0, named("org.apache.kafka.clients.consumer.ConsumerConfig"))) + .and(takesArgument(1, named("org.apache.kafka.common.serialization.Deserializer"))) + .and(takesArgument(2, named("org.apache.kafka.common.serialization.Deserializer"))), + packageName + ".ConstructorAdvice"); + transformer.applyAdvice( + isMethod() + .and(isPublic()) + .and(named("poll")) + .and(takesArguments(1)) + .and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecords"))), + packageName + ".RecordsAdvice"); + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInstrumentation.java new file mode 100644 index 00000000000..1c0537aa6cc --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInstrumentation.java @@ -0,0 +1,85 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +@AutoService(InstrumenterModule.class) +public final class KafkaConsumerInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForSingleType { + + public KafkaConsumerInstrumentation() { + super("kafka"); + } + + @Override + public Map contextStore() { + Map contextStores = new HashMap<>(); + contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String"); + contextStores.put( + "org.apache.kafka.clients.consumer.ConsumerRecords", + "datadog.trace.instrumentation.kafka_clients38.KafkaConsumerInfo"); + return Collections.unmodifiableMap(contextStores); + } + + @Override + public String instrumentedType() { + return "org.apache.kafka.clients.consumer.ConsumerRecords"; + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".TextMapInjectAdapterInterface", + packageName + ".KafkaConsumerInfo", + packageName + ".KafkaConsumerInstrumentationHelper", + packageName + ".KafkaDecorator", + packageName + ".TextMapExtractAdapter", + packageName + ".TracingIterableDelegator", + packageName + ".TracingIterable", + packageName + ".TracingIterator", + packageName + ".TracingList", + packageName + ".TracingListIterator", + packageName + ".TextMapInjectAdapter", + "datadog.trace.instrumentation.kafka_common.Utils", + "datadog.trace.instrumentation.kafka_common.StreamingContext", + }; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod() + .and(isPublic()) + .and(named("records")) + .and(takesArgument(0, String.class)) + .and(returns(Iterable.class)), + packageName + ".IterableAdvice"); + transformer.applyAdvice( + isMethod() + .and(isPublic()) + .and(named("records")) + .and(takesArgument(0, named("org.apache.kafka.common.TopicPartition"))) + .and(returns(List.class)), + packageName + ".ListAdvice"); + transformer.applyAdvice( + isMethod() + .and(isPublic()) + .and(named("iterator")) + .and(takesArguments(0)) + .and(returns(Iterator.class)), + packageName + ".IteratorAdvice"); + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java new file mode 100644 index 00000000000..7bca67754f7 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java @@ -0,0 +1,64 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static java.util.Collections.singletonMap; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPrivate; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import java.util.Map; + +@AutoService(InstrumenterModule.class) +public final class KafkaProducerInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForSingleType { + + public KafkaProducerInstrumentation() { + super("kafka"); + } + + @Override + public String instrumentedType() { + return "org.apache.kafka.clients.producer.KafkaProducer"; + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".KafkaDecorator", + packageName + ".TextMapInjectAdapterInterface", + packageName + ".TextMapInjectAdapter", + packageName + ".NoopTextMapInjectAdapter", + packageName + ".KafkaProducerCallback", + "datadog.trace.instrumentation.kafka_common.StreamingContext", + packageName + ".AvroSchemaExtractor", + }; + } + + @Override + public Map contextStore() { + return singletonMap("org.apache.kafka.clients.Metadata", "java.lang.String"); + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod() + .and(isPublic()) + .and(named("send")) + .and(takesArgument(0, named("org.apache.kafka.clients.producer.ProducerRecord"))) + .and(takesArgument(1, named("org.apache.kafka.clients.producer.Callback"))), + packageName + ".ProducerAdvice"); + + transformer.applyAdvice( + isMethod() + .and(isPrivate()) + .and(takesArgument(0, int.class)) + .and(named("ensureValidRecordSize")), // intercepting this call allows us to see the + // estimated message size + packageName + ".PayloadSizeAdvice"); + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/LegacyKafkaConsumerInfoInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/LegacyKafkaConsumerInfoInstrumentation.java new file mode 100644 index 00000000000..133e3ada750 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/LegacyKafkaConsumerInfoInstrumentation.java @@ -0,0 +1,82 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.declaresField; +import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.implementsInterface; +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import java.util.HashMap; +import java.util.Map; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +/** + * This instrumentation saves additional information from the KafkaConsumer, such as consumer group + * and cluster ID, in the context store for later use. + */ +@AutoService(InstrumenterModule.class) +public final class LegacyKafkaConsumerInfoInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForTypeHierarchy { + + public LegacyKafkaConsumerInfoInstrumentation() { + super("kafka"); + } + + @Override + public Map contextStore() { + Map contextStores = new HashMap<>(); + contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String"); + contextStores.put( + "org.apache.kafka.clients.consumer.ConsumerRecords", KafkaConsumerInfo.class.getName()); + contextStores.put( + "org.apache.kafka.clients.consumer.internals.ConsumerCoordinator", + KafkaConsumerInfo.class.getName()); + contextStores.put( + "org.apache.kafka.clients.consumer.internals.ConsumerDelegate", + KafkaConsumerInfo.class.getName()); + return contextStores; + } + + @Override + public String hierarchyMarkerType() { + return "org.apache.kafka.clients.consumer.internals.ConsumerDelegate"; + } + + @Override + public ElementMatcher hierarchyMatcher() { + return implementsInterface(named(hierarchyMarkerType())) + .and(declaresField(named("coordinator"))); + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".KafkaDecorator", packageName + ".KafkaConsumerInfo", + }; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isConstructor() + .and(takesArgument(0, named("org.apache.kafka.clients.consumer.ConsumerConfig"))) + .and(takesArgument(1, named("org.apache.kafka.common.serialization.Deserializer"))) + .and(takesArgument(2, named("org.apache.kafka.common.serialization.Deserializer"))), + packageName + ".LegacyConstructorAdvice"); + transformer.applyAdvice( + isMethod() + .and(isPublic()) + .and(named("poll")) + .and(takesArguments(1)) + .and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecords"))), + packageName + ".RecordsAdvice"); + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/MetadataInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/MetadataInstrumentation.java new file mode 100644 index 00000000000..f67c5cc8101 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/MetadataInstrumentation.java @@ -0,0 +1,97 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.extendsClass; +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static java.util.Collections.singletonMap; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.InstrumentationContext; +import java.util.Map; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.requests.MetadataResponse; + +@AutoService(InstrumenterModule.class) +public class MetadataInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForTypeHierarchy { + + public MetadataInstrumentation() { + super("kafka"); + } + + @Override + public String hierarchyMarkerType() { + return "org.apache.kafka.clients.Metadata"; + } + + @Override + public ElementMatcher hierarchyMatcher() { + return extendsClass(named(hierarchyMarkerType())); + } + + @Override + public String[] helperClassNames() { + return new String[] {packageName + ".KafkaDecorator"}; + } + + @Override + public Map contextStore() { + return singletonMap("org.apache.kafka.clients.Metadata", "java.lang.String"); + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod() + .and(named("update")) + .and(takesArgument(0, named("org.apache.kafka.common.Cluster"))), + MetadataInstrumentation.class.getName() + "$MetadataUpdateBefore22Advice"); + transformer.applyAdvice( + isMethod() + .and(named("update")) + .and(takesArgument(1, named("org.apache.kafka.common.requests.MetadataResponse"))), + MetadataInstrumentation.class.getName() + "$MetadataUpdate22AndAfterAdvice"); + } + + public static class MetadataUpdateBefore22Advice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.This final Metadata metadata, @Advice.Argument(0) final Cluster newCluster) { + if (newCluster != null && !newCluster.isBootstrapConfigured()) { + InstrumentationContext.get(Metadata.class, String.class) + .put(metadata, newCluster.clusterResource().clusterId()); + } + } + + public static void muzzleCheck(ConsumerRecord record) { + // KafkaConsumerInstrumentation only applies for kafka versions with headers + // Make an explicit call so MetadataInstrumentation does the same + record.headers(); + } + } + + public static class MetadataUpdate22AndAfterAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.This final Metadata metadata, @Advice.Argument(1) final MetadataResponse response) { + if (response != null) { + InstrumentationContext.get(Metadata.class, String.class) + .put(metadata, response.clusterId()); + } + } + + public static void muzzleCheck(ConsumerRecord record) { + // KafkaConsumerInstrumentation only applies for kafka versions with headers + // Make an explicit call so MetadataInstrumentation does the same + record.headers(); + } + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/OffsetCommitCallbackInvokerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/OffsetCommitCallbackInvokerInstrumentation.java new file mode 100644 index 00000000000..66411944a6f --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/OffsetCommitCallbackInvokerInstrumentation.java @@ -0,0 +1,26 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; + +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; + +public class OffsetCommitCallbackInvokerInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForSingleType { + public OffsetCommitCallbackInvokerInstrumentation() { + super("kafka"); + } + + @Override + public String instrumentedType() { + return "org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod().and(named("enqueueUserCallbackInvocation")), + packageName + ".OffsetCommitCallbackInvokerAdvice"); + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/AvroSchemaExtractor.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/AvroSchemaExtractor.java new file mode 100644 index 00000000000..6ca8bc81aa3 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/AvroSchemaExtractor.java @@ -0,0 +1,51 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import datadog.trace.api.DDTags; +import datadog.trace.bootstrap.instrumentation.api.AgentDataStreamsMonitoring; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import datadog.trace.util.FNV64Hash; +import java.lang.reflect.Field; +import org.apache.kafka.clients.producer.ProducerRecord; + +public class AvroSchemaExtractor { + public static void tryExtractProducer(ProducerRecord record, AgentSpan span) { + AgentDataStreamsMonitoring dsm = AgentTracer.get().getDataStreamsMonitoring(); + if (!dsm.canSampleSchema(record.topic())) { + return; + } + Integer prio = span.forceSamplingDecision(); + if (prio == null || prio <= 0) { + // don't extract schema if span is not sampled + return; + } + int weight = AgentTracer.get().getDataStreamsMonitoring().trySampleSchema(record.topic()); + if (weight == 0) { + return; + } + String schema = extract(record.value()); + if (schema != null) { + span.setTag(DDTags.SCHEMA_DEFINITION, schema); + span.setTag(DDTags.SCHEMA_WEIGHT, weight); + span.setTag(DDTags.SCHEMA_TYPE, "avro"); + span.setTag(DDTags.SCHEMA_OPERATION, "serialization"); + span.setTag( + DDTags.SCHEMA_ID, + Long.toUnsignedString(FNV64Hash.generateHash(schema, FNV64Hash.Version.v1A))); + } + } + + public static String extract(Object value) { + if (value == null) { + return null; + } + try { + Class clazz = value.getClass(); + Field field = clazz.getDeclaredField("schema"); + field.setAccessible(true); + return field.get(value).toString(); + } catch (NoSuchFieldException | IllegalAccessException e) { + return null; + } + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ConstructorAdvice.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ConstructorAdvice.java new file mode 100644 index 00000000000..38a616fe276 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ConstructorAdvice.java @@ -0,0 +1,60 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import datadog.trace.api.Config; +import datadog.trace.bootstrap.InstrumentationContext; +import java.util.List; +import net.bytebuddy.asm.Advice; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.internals.ConsumerDelegate; +import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker; + +public class ConstructorAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void captureGroup( + @Advice.This ConsumerDelegate consumer, + @Advice.Argument(0) ConsumerConfig consumerConfig, + @Advice.FieldValue("offsetCommitCallbackInvoker") + OffsetCommitCallbackInvoker offsetCommitCallbackInvoker, + @Advice.FieldValue("metadata") Metadata metadata) { + ConsumerGroupMetadata groupMetadata = consumer.groupMetadata(); + String consumerGroup = consumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG); + String normalizedConsumerGroup = + consumerGroup != null && !consumerGroup.isEmpty() ? consumerGroup : null; + if (normalizedConsumerGroup == null) { + if (groupMetadata != null) { + normalizedConsumerGroup = groupMetadata.groupId(); + } + } + List bootstrapServersList = + consumerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); + String bootstrapServers = null; + if (bootstrapServersList != null && !bootstrapServersList.isEmpty()) { + bootstrapServers = String.join(",", bootstrapServersList); + } + KafkaConsumerInfo kafkaConsumerInfo; + if (Config.get().isDataStreamsEnabled()) { + kafkaConsumerInfo = + new KafkaConsumerInfo(normalizedConsumerGroup, metadata, bootstrapServers); + } else { + kafkaConsumerInfo = new KafkaConsumerInfo(normalizedConsumerGroup, bootstrapServers); + } + + if (kafkaConsumerInfo.getConsumerGroup() != null || kafkaConsumerInfo.getmetadata() != null) { + InstrumentationContext.get(ConsumerDelegate.class, KafkaConsumerInfo.class) + .put(consumer, kafkaConsumerInfo); + } + if (offsetCommitCallbackInvoker != null) { + InstrumentationContext.get(OffsetCommitCallbackInvoker.class, KafkaConsumerInfo.class) + .put(offsetCommitCallbackInvoker, kafkaConsumerInfo); + } + } + + public static void muzzleCheck(ConsumerRecord record) { + // KafkaConsumerInstrumentation only applies for kafka versions with headers + // Make an explicit call so KafkaConsumerGroupInstrumentation does the same + record.headers(); + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ConsumerCoordinatorAdvice.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ConsumerCoordinatorAdvice.java new file mode 100644 index 00000000000..ecec8bd1e77 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ConsumerCoordinatorAdvice.java @@ -0,0 +1,74 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import static datadog.trace.core.datastreams.TagsProcessor.CONSUMER_GROUP_TAG; +import static datadog.trace.core.datastreams.TagsProcessor.KAFKA_CLUSTER_ID_TAG; +import static datadog.trace.core.datastreams.TagsProcessor.PARTITION_TAG; +import static datadog.trace.core.datastreams.TagsProcessor.TOPIC_TAG; +import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG; + +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import java.util.LinkedHashMap; +import java.util.Map; +import net.bytebuddy.asm.Advice; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator; +import org.apache.kafka.clients.consumer.internals.RequestFuture; +import org.apache.kafka.common.TopicPartition; + +public class ConsumerCoordinatorAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void trackCommitOffset( + @Advice.This ConsumerCoordinator coordinator, + @Advice.Return RequestFuture requestFuture, + @Advice.Argument(0) final Map offsets) { + if (requestFuture == null || requestFuture.failed()) { + return; + } + if (offsets == null) { + return; + } + KafkaConsumerInfo kafkaConsumerInfo = + InstrumentationContext.get(ConsumerCoordinator.class, KafkaConsumerInfo.class) + .get(coordinator); + + if (kafkaConsumerInfo == null) { + return; + } + + String consumerGroup = kafkaConsumerInfo.getConsumerGroup().get(); + Metadata consumerMetadata = kafkaConsumerInfo.getmetadata().get(); + String clusterId = null; + if (consumerMetadata != null) { + clusterId = InstrumentationContext.get(Metadata.class, String.class).get(consumerMetadata); + } + + for (Map.Entry entry : offsets.entrySet()) { + if (consumerGroup == null) { + consumerGroup = ""; + } + if (entry.getKey() == null || entry.getValue() == null) { + continue; + } + LinkedHashMap sortedTags = new LinkedHashMap<>(); + sortedTags.put(CONSUMER_GROUP_TAG, consumerGroup); + if (clusterId != null) { + sortedTags.put(KAFKA_CLUSTER_ID_TAG, clusterId); + } + sortedTags.put(PARTITION_TAG, String.valueOf(entry.getKey().partition())); + sortedTags.put(TOPIC_TAG, entry.getKey().topic()); + sortedTags.put(TYPE_TAG, "kafka_commit"); + AgentTracer.get() + .getDataStreamsMonitoring() + .trackBacklog(sortedTags, entry.getValue().offset()); + } + } + + public static void muzzleCheck(ConsumerRecord record) { + // KafkaConsumerInstrumentation only applies for kafka versions with headers + // Make an explicit call so ConsumerCoordinatorInstrumentation does the same + record.headers(); + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/DDOffsetCommitCallback.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/DDOffsetCommitCallback.java new file mode 100644 index 00000000000..65b2b94b26b --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/DDOffsetCommitCallback.java @@ -0,0 +1,60 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import static datadog.trace.core.datastreams.TagsProcessor.CONSUMER_GROUP_TAG; +import static datadog.trace.core.datastreams.TagsProcessor.KAFKA_CLUSTER_ID_TAG; +import static datadog.trace.core.datastreams.TagsProcessor.PARTITION_TAG; +import static datadog.trace.core.datastreams.TagsProcessor.TOPIC_TAG; +import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG; + +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import java.util.LinkedHashMap; +import java.util.Map; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.common.TopicPartition; + +public class DDOffsetCommitCallback implements OffsetCommitCallback { + OffsetCommitCallback callback; + KafkaConsumerInfo kafkaConsumerInfo; + + public DDOffsetCommitCallback( + OffsetCommitCallback callback, KafkaConsumerInfo kafkaConsumerInfo) { + this.callback = callback; + this.kafkaConsumerInfo = kafkaConsumerInfo; + } + + @Override + public void onComplete(Map map, Exception e) { + if (callback != null) { + callback.onComplete(map, e); + } + for (Map.Entry entry : map.entrySet()) { + if (entry.getKey() == null || entry.getValue() == null) { + continue; + } + LinkedHashMap sortedTags = new LinkedHashMap<>(); + if (kafkaConsumerInfo != null) { + String consumerGroup = kafkaConsumerInfo.getConsumerGroup().get(); + Metadata consumerMetadata = kafkaConsumerInfo.getmetadata().get(); + String clusterId = null; + if (consumerMetadata != null) { + clusterId = + InstrumentationContext.get(Metadata.class, String.class).get(consumerMetadata); + } + sortedTags.put(CONSUMER_GROUP_TAG, consumerGroup); + if (clusterId != null) { + sortedTags.put(KAFKA_CLUSTER_ID_TAG, clusterId); + } + } + + sortedTags.put(PARTITION_TAG, String.valueOf(entry.getKey().partition())); + sortedTags.put(TOPIC_TAG, entry.getKey().topic()); + sortedTags.put(TYPE_TAG, "kafka_commit"); + AgentTracer.get() + .getDataStreamsMonitoring() + .trackBacklog(sortedTags, entry.getValue().offset()); + } + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/IterableAdvice.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/IterableAdvice.java new file mode 100644 index 00000000000..d9471fbbc8f --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/IterableAdvice.java @@ -0,0 +1,34 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import datadog.trace.bootstrap.InstrumentationContext; +import net.bytebuddy.asm.Advice; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; + +public class IterableAdvice { + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void wrap( + @Advice.Return(readOnly = false) Iterable> iterable, + @Advice.This ConsumerRecords records) { + if (iterable != null) { + KafkaConsumerInfo kafkaConsumerInfo = + InstrumentationContext.get(ConsumerRecords.class, KafkaConsumerInfo.class).get(records); + String group = KafkaConsumerInstrumentationHelper.extractGroup(kafkaConsumerInfo); + String clusterId = + KafkaConsumerInstrumentationHelper.extractClusterId( + kafkaConsumerInfo, InstrumentationContext.get(Metadata.class, String.class)); + String bootstrapServers = + KafkaConsumerInstrumentationHelper.extractBootstrapServers(kafkaConsumerInfo); + iterable = + new TracingIterable( + iterable, + KafkaDecorator.KAFKA_CONSUME, + KafkaDecorator.CONSUMER_DECORATE, + group, + clusterId, + bootstrapServers); + } + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/IteratorAdvice.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/IteratorAdvice.java new file mode 100644 index 00000000000..e0021f2f024 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/IteratorAdvice.java @@ -0,0 +1,35 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import datadog.trace.bootstrap.InstrumentationContext; +import java.util.Iterator; +import net.bytebuddy.asm.Advice; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; + +public class IteratorAdvice { + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void wrap( + @Advice.Return(readOnly = false) Iterator> iterator, + @Advice.This ConsumerRecords records) { + if (iterator != null) { + KafkaConsumerInfo kafkaConsumerInfo = + InstrumentationContext.get(ConsumerRecords.class, KafkaConsumerInfo.class).get(records); + String group = KafkaConsumerInstrumentationHelper.extractGroup(kafkaConsumerInfo); + String clusterId = + KafkaConsumerInstrumentationHelper.extractClusterId( + kafkaConsumerInfo, InstrumentationContext.get(Metadata.class, String.class)); + String bootstrapServers = + KafkaConsumerInstrumentationHelper.extractBootstrapServers(kafkaConsumerInfo); + iterator = + new TracingIterator( + iterator, + KafkaDecorator.KAFKA_CONSUME, + KafkaDecorator.CONSUMER_DECORATE, + group, + clusterId, + bootstrapServers); + } + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInfo.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInfo.java new file mode 100644 index 00000000000..947bbd623ae --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInfo.java @@ -0,0 +1,50 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.clients.Metadata; + +public class KafkaConsumerInfo { + private final String consumerGroup; + private final Metadata metadata; + private final String bootstrapServers; + + public KafkaConsumerInfo(String consumerGroup, Metadata metadata, String bootstrapServers) { + this.consumerGroup = consumerGroup; + this.metadata = metadata; + this.bootstrapServers = bootstrapServers; + } + + public KafkaConsumerInfo(String consumerGroup, String bootstrapServers) { + this.consumerGroup = consumerGroup; + this.metadata = null; + this.bootstrapServers = bootstrapServers; + } + + public Optional getConsumerGroup() { + return Optional.ofNullable(consumerGroup); + } + + public Optional getmetadata() { + return Optional.ofNullable(metadata); + } + + public Optional getBootstrapServers() { + return Optional.ofNullable(bootstrapServers); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + KafkaConsumerInfo consumerInfo = (KafkaConsumerInfo) o; + return Objects.equals(consumerGroup, consumerInfo.consumerGroup) + && Objects.equals(metadata, consumerInfo.metadata); + } + + @Override + public int hashCode() { + return 31 * (null == consumerGroup ? 0 : consumerGroup.hashCode()) + + (null == metadata ? 0 : metadata.hashCode()); + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInstrumentationHelper.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInstrumentationHelper.java new file mode 100644 index 00000000000..1791aa7e308 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInstrumentationHelper.java @@ -0,0 +1,29 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import datadog.trace.api.Config; +import datadog.trace.bootstrap.ContextStore; +import org.apache.kafka.clients.Metadata; + +public class KafkaConsumerInstrumentationHelper { + public static String extractGroup(KafkaConsumerInfo kafkaConsumerInfo) { + if (kafkaConsumerInfo != null) { + return kafkaConsumerInfo.getConsumerGroup().get(); + } + return null; + } + + public static String extractClusterId( + KafkaConsumerInfo kafkaConsumerInfo, ContextStore metadataContextStore) { + if (Config.get().isDataStreamsEnabled() && kafkaConsumerInfo != null) { + Metadata metadata = kafkaConsumerInfo.getmetadata().get(); + if (metadata != null) { + return metadataContextStore.get(metadata); + } + } + return null; + } + + public static String extractBootstrapServers(KafkaConsumerInfo kafkaConsumerInfo) { + return kafkaConsumerInfo == null ? null : kafkaConsumerInfo.getBootstrapServers().get(); + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaDecorator.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaDecorator.java new file mode 100644 index 00000000000..16ff81ab572 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaDecorator.java @@ -0,0 +1,165 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.CONSUMER_GROUP; +import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS; +import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.OFFSET; +import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.PARTITION; +import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.RECORD_QUEUE_TIME_MS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +import datadog.trace.api.Config; +import datadog.trace.api.Functions; +import datadog.trace.api.cache.DDCache; +import datadog.trace.api.cache.DDCaches; +import datadog.trace.api.naming.SpanNaming; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes; +import datadog.trace.bootstrap.instrumentation.api.Tags; +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; +import datadog.trace.bootstrap.instrumentation.decorator.MessagingClientDecorator; +import java.util.function.Function; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.record.TimestampType; + +public class KafkaDecorator extends MessagingClientDecorator { + private static final String KAFKA = "kafka"; + public static final CharSequence JAVA_KAFKA = UTF8BytesString.create("java-kafka"); + public static final CharSequence KAFKA_CONSUME = + UTF8BytesString.create( + SpanNaming.instance().namingSchema().messaging().inboundOperation(KAFKA)); + + public static final CharSequence KAFKA_POLL = UTF8BytesString.create("kafka.poll"); + public static final CharSequence KAFKA_PRODUCE = + UTF8BytesString.create( + SpanNaming.instance().namingSchema().messaging().outboundOperation(KAFKA)); + public static final CharSequence KAFKA_DELIVER = UTF8BytesString.create("kafka.deliver"); + public static final boolean KAFKA_LEGACY_TRACING = Config.get().isKafkaLegacyTracingEnabled(); + public static final boolean TIME_IN_QUEUE_ENABLED = + Config.get().isTimeInQueueEnabled(!KAFKA_LEGACY_TRACING, KAFKA); + public static final String KAFKA_PRODUCED_KEY = "x_datadog_kafka_produced"; + private final String spanKind; + private final CharSequence spanType; + private final String serviceName; + + private static final DDCache PRODUCER_RESOURCE_NAME_CACHE = + DDCaches.newFixedSizeCache(32); + private static final Functions.Prefix PRODUCER_PREFIX = new Functions.Prefix("Produce Topic "); + private static final DDCache CONSUMER_RESOURCE_NAME_CACHE = + DDCaches.newFixedSizeCache(32); + private static final DDCache PRODUCER_BOOSTRAP_SERVERS_CACHE = + DDCaches.newFixedSizeWeakKeyCache(16); + private static final Function BOOTSTRAP_SERVERS_JOINER = + pc -> String.join(",", pc.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + private static final Functions.Prefix CONSUMER_PREFIX = new Functions.Prefix("Consume Topic "); + + public static final KafkaDecorator PRODUCER_DECORATE = + new KafkaDecorator( + Tags.SPAN_KIND_PRODUCER, + InternalSpanTypes.MESSAGE_PRODUCER, + SpanNaming.instance() + .namingSchema() + .messaging() + .outboundService(KAFKA, KAFKA_LEGACY_TRACING)); + + public static final KafkaDecorator CONSUMER_DECORATE = + new KafkaDecorator( + Tags.SPAN_KIND_CONSUMER, + InternalSpanTypes.MESSAGE_CONSUMER, + SpanNaming.instance() + .namingSchema() + .messaging() + .inboundService(KAFKA, KAFKA_LEGACY_TRACING)); + + public static final KafkaDecorator BROKER_DECORATE = + new KafkaDecorator( + Tags.SPAN_KIND_BROKER, + InternalSpanTypes.MESSAGE_BROKER, + SpanNaming.instance().namingSchema().messaging().timeInQueueService(KAFKA)); + + protected KafkaDecorator(String spanKind, CharSequence spanType, String serviceName) { + this.spanKind = spanKind; + this.spanType = spanType; + this.serviceName = serviceName; + } + + @Override + protected CharSequence spanType() { + return spanType; + } + + @Override + protected String[] instrumentationNames() { + return new String[] {"kafka"}; + } + + @Override + protected String service() { + return serviceName; + } + + @Override + protected CharSequence component() { + return JAVA_KAFKA; + } + + @Override + protected String spanKind() { + return spanKind; + } + + public void onConsume( + final AgentSpan span, + final ConsumerRecord record, + String consumerGroup, + String bootstrapServers) { + if (record != null) { + final String topic = record.topic() == null ? "kafka" : record.topic(); + span.setResourceName(CONSUMER_RESOURCE_NAME_CACHE.computeIfAbsent(topic, CONSUMER_PREFIX)); + span.setTag(PARTITION, record.partition()); + span.setTag(OFFSET, record.offset()); + if (consumerGroup != null) { + span.setTag(CONSUMER_GROUP, consumerGroup); + } + + if (bootstrapServers != null) { + span.setTag(KAFKA_BOOTSTRAP_SERVERS, bootstrapServers); + } + // TODO - do we really need both? This mechanism already adds a lot of... baggage. + // check to not record a duration if the message was sent from an old Kafka client + if (record.timestampType() != TimestampType.NO_TIMESTAMP_TYPE) { + long consumeTime = NANOSECONDS.toMillis(span.getStartTime()); + final long produceTime = record.timestamp(); + span.setTag(RECORD_QUEUE_TIME_MS, Math.max(0L, consumeTime - produceTime)); + } + } + } + + public void onTimeInQueue(final AgentSpan span, final ConsumerRecord record) { + if (record != null) { + String topic = record.topic() == null ? "kafka" : record.topic(); + span.setResourceName(topic); + if (Config.get().isMessageBrokerSplitByDestination()) { + span.setServiceName(topic); + } + } + } + + public void onProduce( + final AgentSpan span, final ProducerRecord record, final ProducerConfig producerConfig) { + if (record != null) { + if (record.partition() != null) { + span.setTag(PARTITION, record.partition()); + } + if (producerConfig != null) { + span.setTag( + KAFKA_BOOTSTRAP_SERVERS, + PRODUCER_BOOSTRAP_SERVERS_CACHE.computeIfAbsent( + producerConfig, BOOTSTRAP_SERVERS_JOINER)); + } + final String topic = record.topic() == null ? "kafka" : record.topic(); + span.setResourceName(PRODUCER_RESOURCE_NAME_CACHE.computeIfAbsent(topic, PRODUCER_PREFIX)); + } + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaProducerCallback.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaProducerCallback.java new file mode 100644 index 00000000000..d2575af5a2d --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaProducerCallback.java @@ -0,0 +1,61 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.core.datastreams.TagsProcessor.KAFKA_CLUSTER_ID_TAG; +import static datadog.trace.core.datastreams.TagsProcessor.PARTITION_TAG; +import static datadog.trace.core.datastreams.TagsProcessor.TOPIC_TAG; +import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG; +import static datadog.trace.instrumentation.kafka_clients38.KafkaDecorator.PRODUCER_DECORATE; + +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import java.util.LinkedHashMap; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.RecordMetadata; + +public class KafkaProducerCallback implements Callback { + private final Callback callback; + private final AgentSpan parent; + private final AgentSpan span; + private final String clusterId; + + public KafkaProducerCallback( + final Callback callback, + final AgentSpan parent, + final AgentSpan span, + final String clusterId) { + this.callback = callback; + this.parent = parent; + this.span = span; + this.clusterId = clusterId; + } + + @Override + public void onCompletion(final RecordMetadata metadata, final Exception exception) { + PRODUCER_DECORATE.onError(span, exception); + PRODUCER_DECORATE.beforeFinish(span); + span.finish(); + if (callback != null) { + if (parent != null) { + try (final AgentScope scope = activateSpan(parent)) { + scope.setAsyncPropagation(true); + callback.onCompletion(metadata, exception); + } + } else { + callback.onCompletion(metadata, exception); + } + } + if (metadata == null) { + return; + } + LinkedHashMap sortedTags = new LinkedHashMap<>(); + if (clusterId != null) { + sortedTags.put(KAFKA_CLUSTER_ID_TAG, clusterId); + } + sortedTags.put(PARTITION_TAG, String.valueOf(metadata.partition())); + sortedTags.put(TOPIC_TAG, metadata.topic()); + sortedTags.put(TYPE_TAG, "kafka_produce"); + AgentTracer.get().getDataStreamsMonitoring().trackBacklog(sortedTags, metadata.offset()); + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/LegacyConstructorAdvice.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/LegacyConstructorAdvice.java new file mode 100644 index 00000000000..4aa27f3a251 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/LegacyConstructorAdvice.java @@ -0,0 +1,53 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import datadog.trace.bootstrap.InstrumentationContext; +import java.util.List; +import net.bytebuddy.asm.Advice; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator; +import org.apache.kafka.clients.consumer.internals.ConsumerDelegate; + +public class LegacyConstructorAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void captureGroup( + @Advice.This ConsumerDelegate consumer, + @Advice.Argument(0) ConsumerConfig consumerConfig, + @Advice.FieldValue("coordinator") ConsumerCoordinator coordinator, + @Advice.FieldValue("metadata") Metadata metadata) { + ConsumerGroupMetadata groupMetadata = consumer.groupMetadata(); + String consumerGroup = consumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG); + String normalizedConsumerGroup = + consumerGroup != null && !consumerGroup.isEmpty() ? consumerGroup : null; + if (normalizedConsumerGroup == null) { + if (groupMetadata != null) { + normalizedConsumerGroup = groupMetadata.groupId(); + } + } + List bootstrapServersList = + consumerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); + String bootstrapServers = null; + if (bootstrapServersList != null && !bootstrapServersList.isEmpty()) { + bootstrapServers = String.join(",", bootstrapServersList); + } + KafkaConsumerInfo kafkaConsumerInfo; + kafkaConsumerInfo = new KafkaConsumerInfo(normalizedConsumerGroup, metadata, bootstrapServers); + + if (kafkaConsumerInfo.getConsumerGroup() != null || kafkaConsumerInfo.getmetadata() != null) { + InstrumentationContext.get(ConsumerDelegate.class, KafkaConsumerInfo.class) + .put(consumer, kafkaConsumerInfo); + if (coordinator != null) { + InstrumentationContext.get(ConsumerCoordinator.class, KafkaConsumerInfo.class) + .put(coordinator, kafkaConsumerInfo); + } + } + } + + public static void muzzleCheck(ConsumerRecord record) { + // KafkaConsumerInstrumentation only applies for kafka versions with headers + // Make an explicit call so KafkaConsumerGroupInstrumentation does the same + record.headers(); + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ListAdvice.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ListAdvice.java new file mode 100644 index 00000000000..8ac0cf4d307 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ListAdvice.java @@ -0,0 +1,33 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import static datadog.trace.instrumentation.kafka_clients38.KafkaDecorator.CONSUMER_DECORATE; +import static datadog.trace.instrumentation.kafka_clients38.KafkaDecorator.KAFKA_CONSUME; + +import datadog.trace.bootstrap.InstrumentationContext; +import java.util.List; +import net.bytebuddy.asm.Advice; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; + +public class ListAdvice { + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void wrap( + @Advice.Return(readOnly = false) List> iterable, + @Advice.This ConsumerRecords records) { + if (iterable != null) { + KafkaConsumerInfo kafkaConsumerInfo = + InstrumentationContext.get(ConsumerRecords.class, KafkaConsumerInfo.class).get(records); + String group = KafkaConsumerInstrumentationHelper.extractGroup(kafkaConsumerInfo); + String clusterId = + KafkaConsumerInstrumentationHelper.extractClusterId( + kafkaConsumerInfo, InstrumentationContext.get(Metadata.class, String.class)); + String bootstrapServers = + KafkaConsumerInstrumentationHelper.extractBootstrapServers(kafkaConsumerInfo); + iterable = + new TracingList( + iterable, KAFKA_CONSUME, CONSUMER_DECORATE, group, clusterId, bootstrapServers); + } + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/NoopTextMapInjectAdapter.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/NoopTextMapInjectAdapter.java new file mode 100644 index 00000000000..bad09090337 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/NoopTextMapInjectAdapter.java @@ -0,0 +1,16 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import org.apache.kafka.common.header.Headers; + +public class NoopTextMapInjectAdapter implements TextMapInjectAdapterInterface { + + public static final NoopTextMapInjectAdapter NOOP_SETTER = new NoopTextMapInjectAdapter(); + + @Override + public void set(final Headers headers, final String key, final String value) {} + + @Override + public void set(Headers headers, String key, byte[] value) {} + + public void injectTimeInQueue(Headers headers) {} +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/OffsetCommitCallbackInvokerAdvice.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/OffsetCommitCallbackInvokerAdvice.java new file mode 100644 index 00000000000..fc58fff319d --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/OffsetCommitCallbackInvokerAdvice.java @@ -0,0 +1,18 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import datadog.trace.bootstrap.InstrumentationContext; +import net.bytebuddy.asm.Advice; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker; + +public class OffsetCommitCallbackInvokerAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnqueue( + @Advice.Argument(value = 0, readOnly = false) OffsetCommitCallback callback, + @Advice.This OffsetCommitCallbackInvoker callbackInvoker) { + KafkaConsumerInfo kafkaConsumerInfo = + InstrumentationContext.get(OffsetCommitCallbackInvoker.class, KafkaConsumerInfo.class) + .get(callbackInvoker); + callback = new DDOffsetCommitCallback(callback, kafkaConsumerInfo); + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/PayloadSizeAdvice.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/PayloadSizeAdvice.java new file mode 100644 index 00000000000..47a2e6d0201 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/PayloadSizeAdvice.java @@ -0,0 +1,35 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; + +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import datadog.trace.bootstrap.instrumentation.api.StatsPoint; +import net.bytebuddy.asm.Advice; + +public class PayloadSizeAdvice { + + /** + * Instrumentation for the method KafkaProducer.ensureValidRecordSize that is called as part of + * sending a kafka payload. This gives us access to an estimate of the payload size "for free", + * that we send as a metric. + */ + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter(@Advice.Argument(value = 0) int estimatedPayloadSize) { + StatsPoint saved = activeSpan().context().getPathwayContext().getSavedStats(); + if (saved != null) { + // create new stats including the payload size + StatsPoint updated = + new StatsPoint( + saved.getEdgeTags(), + saved.getHash(), + saved.getParentHash(), + saved.getAggregationHash(), + saved.getTimestampNanos(), + saved.getPathwayLatencyNano(), + saved.getEdgeLatencyNano(), + estimatedPayloadSize); + // then send the point + AgentTracer.get().getDataStreamsMonitoring().add(updated); + } + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java new file mode 100644 index 00000000000..0d245d16bf4 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java @@ -0,0 +1,117 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; +import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT; +import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG; +import static datadog.trace.core.datastreams.TagsProcessor.KAFKA_CLUSTER_ID_TAG; +import static datadog.trace.core.datastreams.TagsProcessor.TOPIC_TAG; +import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG; +import static datadog.trace.instrumentation.kafka_clients38.KafkaDecorator.KAFKA_PRODUCE; +import static datadog.trace.instrumentation.kafka_clients38.KafkaDecorator.PRODUCER_DECORATE; +import static datadog.trace.instrumentation.kafka_clients38.KafkaDecorator.TIME_IN_QUEUE_ENABLED; +import static datadog.trace.instrumentation.kafka_common.StreamingContext.STREAMING_CONTEXT; + +import datadog.trace.api.Config; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags; +import java.util.LinkedHashMap; +import net.bytebuddy.asm.Advice; +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.internals.Sender; +import org.apache.kafka.common.record.RecordBatch; + +public class ProducerAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static AgentScope onEnter( + @Advice.FieldValue("apiVersions") final ApiVersions apiVersions, + @Advice.FieldValue("producerConfig") ProducerConfig producerConfig, + @Advice.FieldValue("sender") Sender sender, + @Advice.FieldValue("metadata") Metadata metadata, + @Advice.Argument(value = 0, readOnly = false) ProducerRecord record, + @Advice.Argument(value = 1, readOnly = false) Callback callback) { + String clusterId = InstrumentationContext.get(Metadata.class, String.class).get(metadata); + final AgentSpan parent = activeSpan(); + final AgentSpan span = startSpan(KAFKA_PRODUCE); + PRODUCER_DECORATE.afterStart(span); + PRODUCER_DECORATE.onProduce(span, record, producerConfig); + + callback = new KafkaProducerCallback(callback, parent, span, clusterId); + + if (record.value() == null) { + span.setTag(InstrumentationTags.TOMBSTONE, true); + } + + TextMapInjectAdapterInterface setter = NoopTextMapInjectAdapter.NOOP_SETTER; + // Do not inject headers for batch versions below 2 + // This is how similar check is being done in Kafka client itself: + // https://github.com/apache/kafka/blob/05fcfde8f69b0349216553f711fdfc3f0259c601/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java#L411-L412 + // Also, do not inject headers if specified by JVM option or environment variable + // This can help in mixed client environments where clients < 0.11 that do not support + // headers attempt to read messages that were produced by clients > 0.11 and the magic + // value of the broker(s) is >= 2 + if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2 + && Config.get().isKafkaClientPropagationEnabled() + && !Config.get().isKafkaClientPropagationDisabledForTopic(record.topic())) { + setter = TextMapInjectAdapter.SETTER; + } + LinkedHashMap sortedTags = new LinkedHashMap<>(); + sortedTags.put(DIRECTION_TAG, DIRECTION_OUT); + if (clusterId != null) { + sortedTags.put(KAFKA_CLUSTER_ID_TAG, clusterId); + } + sortedTags.put(TOPIC_TAG, record.topic()); + sortedTags.put(TYPE_TAG, "kafka"); + try { + propagate().inject(span, record.headers(), setter); + if (STREAMING_CONTEXT.isDisabledForTopic(record.topic()) + || STREAMING_CONTEXT.isSinkTopic(record.topic())) { + // inject the context in the headers, but delay sending the stats until we know the + // message size. + // The stats are saved in the pathway context and sent in PayloadSizeAdvice. + propagate() + .injectPathwayContextWithoutSendingStats(span, record.headers(), setter, sortedTags); + AvroSchemaExtractor.tryExtractProducer(record, span); + } + } catch (final IllegalStateException e) { + // headers must be read-only from reused record. try again with new one. + record = + new ProducerRecord<>( + record.topic(), + record.partition(), + record.timestamp(), + record.key(), + record.value(), + record.headers()); + + propagate().inject(span, record.headers(), setter); + if (STREAMING_CONTEXT.isDisabledForTopic(record.topic()) + || STREAMING_CONTEXT.isSinkTopic(record.topic())) { + propagate() + .injectPathwayContextWithoutSendingStats(span, record.headers(), setter, sortedTags); + AvroSchemaExtractor.tryExtractProducer(record, span); + } + } + if (TIME_IN_QUEUE_ENABLED) { + setter.injectTimeInQueue(record.headers()); + } + return activateSpan(span); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void stopSpan( + @Advice.Enter final AgentScope scope, @Advice.Thrown final Throwable throwable) { + PRODUCER_DECORATE.onError(scope, throwable); + PRODUCER_DECORATE.beforeFinish(scope); + scope.close(); + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/RecordsAdvice.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/RecordsAdvice.java new file mode 100644 index 00000000000..01f89b66368 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/RecordsAdvice.java @@ -0,0 +1,61 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; +import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.KAFKA_RECORDS_COUNT; +import static datadog.trace.instrumentation.kafka_clients38.KafkaDecorator.KAFKA_POLL; + +import datadog.trace.api.Config; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import net.bytebuddy.asm.Advice; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.internals.ConsumerDelegate; + +/** + * this method transfers the consumer group from the KafkaConsumer class key to the ConsumerRecords + * key. This is necessary because in the poll method, we don't have access to the KafkaConsumer + * class. + */ +public class RecordsAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static AgentScope onEnter() { + boolean dataStreamsEnabled; + if (activeSpan() != null) { + dataStreamsEnabled = activeSpan().traceConfig().isDataStreamsEnabled(); + } else { + dataStreamsEnabled = Config.get().isDataStreamsEnabled(); + } + if (dataStreamsEnabled) { + final AgentSpan span = startSpan(KAFKA_POLL); + return activateSpan(span); + } + return null; + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void captureGroup( + @Advice.Enter final AgentScope scope, + @Advice.This ConsumerDelegate consumer, + @Advice.Return ConsumerRecords records) { + int recordsCount = 0; + if (records != null) { + KafkaConsumerInfo kafkaConsumerInfo = + InstrumentationContext.get(ConsumerDelegate.class, KafkaConsumerInfo.class).get(consumer); + if (kafkaConsumerInfo != null) { + InstrumentationContext.get(ConsumerRecords.class, KafkaConsumerInfo.class) + .put(records, kafkaConsumerInfo); + } + recordsCount = records.count(); + } + if (scope == null) { + return; + } + AgentSpan span = scope.span(); + span.setTag(KAFKA_RECORDS_COUNT, recordsCount); + span.finish(); + scope.close(); + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TextMapExtractAdapter.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TextMapExtractAdapter.java new file mode 100644 index 00000000000..f879c202c46 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TextMapExtractAdapter.java @@ -0,0 +1,73 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import datadog.trace.api.Config; +import datadog.trace.bootstrap.instrumentation.api.AgentPropagation; +import java.nio.ByteBuffer; +import java.util.Base64; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TextMapExtractAdapter + implements AgentPropagation.ContextVisitor, + AgentPropagation.BinaryContextVisitor { + + private static final Logger log = LoggerFactory.getLogger(TextMapExtractAdapter.class); + + public static final TextMapExtractAdapter GETTER = + new TextMapExtractAdapter(Config.get().isKafkaClientBase64DecodingEnabled()); + + private final Base64.Decoder base64; + + public TextMapExtractAdapter(boolean base64DecodeHeaders) { + this.base64 = base64DecodeHeaders ? Base64.getDecoder() : null; + } + + @Override + public void forEachKey(Headers carrier, AgentPropagation.KeyClassifier classifier) { + for (Header header : carrier) { + String key = header.key(); + byte[] value = header.value(); + if (null != value) { + String string = + base64 != null + ? new String(base64.decode(header.value()), UTF_8) + : new String(header.value(), UTF_8); + if (!classifier.accept(key, string)) { + return; + } + } + } + } + + @Override + public void forEachKey(Headers carrier, AgentPropagation.BinaryKeyClassifier classifier) { + for (Header header : carrier) { + String key = header.key(); + byte[] value = header.value(); + if (null != value) { + if (!classifier.accept(key, value)) { + return; + } + } + } + } + + public long extractTimeInQueueStart(Headers carrier) { + Header header = carrier.lastHeader(KafkaDecorator.KAFKA_PRODUCED_KEY); + if (null != header) { + try { + ByteBuffer buf = ByteBuffer.allocate(8); + buf.put(base64 != null ? base64.decode(header.value()) : header.value()); + buf.flip(); + return buf.getLong(); + } catch (Exception e) { + log.debug("Unable to get kafka produced time", e); + } + } + return 0; + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TextMapInjectAdapter.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TextMapInjectAdapter.java new file mode 100644 index 00000000000..4c53707b289 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TextMapInjectAdapter.java @@ -0,0 +1,25 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import org.apache.kafka.common.header.Headers; + +public class TextMapInjectAdapter implements TextMapInjectAdapterInterface { + public static final TextMapInjectAdapter SETTER = new TextMapInjectAdapter(); + + @Override + public void set(final Headers headers, final String key, final String value) { + headers.remove(key).add(key, value.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public void set(Headers headers, String key, byte[] value) { + headers.remove(key).add(key, value); + } + + public void injectTimeInQueue(Headers headers) { + ByteBuffer buf = ByteBuffer.allocate(8); + buf.putLong(System.currentTimeMillis()); + headers.add(KafkaDecorator.KAFKA_PRODUCED_KEY, buf.array()); + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TextMapInjectAdapterInterface.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TextMapInjectAdapterInterface.java new file mode 100644 index 00000000000..e911c1a6df9 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TextMapInjectAdapterInterface.java @@ -0,0 +1,8 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import datadog.trace.bootstrap.instrumentation.api.AgentPropagation; +import org.apache.kafka.common.header.Headers; + +public interface TextMapInjectAdapterInterface extends AgentPropagation.BinarySetter { + public void injectTimeInQueue(Headers headers); +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterable.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterable.java new file mode 100644 index 00000000000..59b53b1c363 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterable.java @@ -0,0 +1,40 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import java.util.Iterator; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +public class TracingIterable implements Iterable>, TracingIterableDelegator { + private final Iterable> delegate; + private final CharSequence operationName; + private final KafkaDecorator decorator; + private final String group; + private final String clusterId; + private final String bootstrapServers; + + public TracingIterable( + final Iterable> delegate, + final CharSequence operationName, + final KafkaDecorator decorator, + String group, + String clusterId, + String bootstrapServers) { + this.delegate = delegate; + this.operationName = operationName; + this.decorator = decorator; + this.group = group; + this.clusterId = clusterId; + this.bootstrapServers = bootstrapServers; + } + + @Override + public Iterator> iterator() { + // every iteration will add spans. Not only the very first one + return new TracingIterator( + delegate.iterator(), operationName, decorator, group, clusterId, bootstrapServers); + } + + @Override + public Iterable> getDelegate() { + return delegate; + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterableDelegator.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterableDelegator.java new file mode 100644 index 00000000000..8b9f94a3e4f --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterableDelegator.java @@ -0,0 +1,8 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import org.apache.kafka.clients.consumer.ConsumerRecord; + +public interface TracingIterableDelegator { + // Used by the streams instrumentation to unwrap (disable) the iteration advice. + Iterable> getDelegate(); +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterator.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterator.java new file mode 100644 index 00000000000..e30bc02ecd3 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterator.java @@ -0,0 +1,146 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateNext; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; +import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_IN; +import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG; +import static datadog.trace.core.datastreams.TagsProcessor.GROUP_TAG; +import static datadog.trace.core.datastreams.TagsProcessor.KAFKA_CLUSTER_ID_TAG; +import static datadog.trace.core.datastreams.TagsProcessor.TOPIC_TAG; +import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG; +import static datadog.trace.instrumentation.kafka_clients38.TextMapExtractAdapter.GETTER; +import static datadog.trace.instrumentation.kafka_clients38.TextMapInjectAdapter.SETTER; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import datadog.trace.api.Config; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan.Context; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags; +import datadog.trace.instrumentation.kafka_common.StreamingContext; +import datadog.trace.instrumentation.kafka_common.Utils; +import java.util.Iterator; +import java.util.LinkedHashMap; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TracingIterator implements Iterator> { + + private static final Logger log = LoggerFactory.getLogger(TracingIterator.class); + + private final Iterator> delegateIterator; + private final CharSequence operationName; + private final KafkaDecorator decorator; + private final String group; + private final String clusterId; + private final String bootstrapServers; + + public TracingIterator( + final Iterator> delegateIterator, + final CharSequence operationName, + final KafkaDecorator decorator, + String group, + String clusterId, + String bootstrapServers) { + this.delegateIterator = delegateIterator; + this.operationName = operationName; + this.decorator = decorator; + this.group = group; + this.clusterId = clusterId; + this.bootstrapServers = bootstrapServers; + } + + @Override + public boolean hasNext() { + boolean moreRecords = delegateIterator.hasNext(); + if (!moreRecords) { + // no more records, use this as a signal to close the last iteration scope + closePrevious(true); + } + return moreRecords; + } + + @Override + public ConsumerRecord next() { + final ConsumerRecord next = delegateIterator.next(); + startNewRecordSpan(next); + return next; + } + + protected void startNewRecordSpan(ConsumerRecord val) { + try { + closePrevious(true); + AgentSpan span, queueSpan = null; + if (val != null) { + if (!Config.get().isKafkaClientPropagationDisabledForTopic(val.topic())) { + final Context spanContext = propagate().extract(val.headers(), GETTER); + long timeInQueueStart = GETTER.extractTimeInQueueStart(val.headers()); + if (timeInQueueStart == 0 || !KafkaDecorator.TIME_IN_QUEUE_ENABLED) { + span = startSpan(operationName, spanContext); + } else { + queueSpan = + startSpan( + KafkaDecorator.KAFKA_DELIVER, + spanContext, + MILLISECONDS.toMicros(timeInQueueStart)); + KafkaDecorator.BROKER_DECORATE.afterStart(queueSpan); + KafkaDecorator.BROKER_DECORATE.onTimeInQueue(queueSpan, val); + span = startSpan(operationName, queueSpan.context()); + KafkaDecorator.BROKER_DECORATE.beforeFinish(queueSpan); + // The queueSpan will be finished after inner span has been activated to ensure that + // spans are written out together by TraceStructureWriter when running in strict mode + } + + LinkedHashMap sortedTags = new LinkedHashMap<>(); + sortedTags.put(DIRECTION_TAG, DIRECTION_IN); + sortedTags.put(GROUP_TAG, group); + if (clusterId != null) { + sortedTags.put(KAFKA_CLUSTER_ID_TAG, clusterId); + } + sortedTags.put(TOPIC_TAG, val.topic()); + sortedTags.put(TYPE_TAG, "kafka"); + + final long payloadSize = + span.traceConfig().isDataStreamsEnabled() ? Utils.computePayloadSizeBytes(val) : 0; + if (StreamingContext.STREAMING_CONTEXT.isDisabledForTopic(val.topic())) { + AgentTracer.get() + .getDataStreamsMonitoring() + .setCheckpoint(span, sortedTags, val.timestamp(), payloadSize); + } else { + // when we're in a streaming context we want to consume only from source topics + if (StreamingContext.STREAMING_CONTEXT.isSourceTopic(val.topic())) { + // We have to inject the context to headers here, + // since the data received from the source may leave the topology on + // some other instance of the application, breaking the context propagation + // for DSM users + propagate() + .injectPathwayContext( + span, val.headers(), SETTER, sortedTags, val.timestamp(), payloadSize); + } + } + } else { + span = startSpan(operationName, null); + } + if (val.value() == null) { + span.setTag(InstrumentationTags.TOMBSTONE, true); + } + decorator.afterStart(span); + decorator.onConsume(span, val, group, bootstrapServers); + activateNext(span); + if (null != queueSpan) { + queueSpan.finish(); + } + } + } catch (final Exception e) { + log.debug("Error starting new record span", e); + } + } + + @Override + public void remove() { + delegateIterator.remove(); + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingList.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingList.java new file mode 100644 index 00000000000..24cd318d175 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingList.java @@ -0,0 +1,161 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +public class TracingList implements List>, TracingIterableDelegator { + + private final List> delegate; + private final CharSequence operationName; + private final KafkaDecorator decorator; + private final String group; + private final String clusterId; + private final String bootstrapServers; + + public TracingList( + final List> delegate, + final CharSequence operationName, + final KafkaDecorator decorator, + String group, + String clusterId, + String bootstrapServers) { + this.operationName = operationName; + this.decorator = decorator; + this.delegate = delegate; + this.group = group; + this.clusterId = clusterId; + this.bootstrapServers = bootstrapServers; + } + + @Override + public int size() { + return delegate.size(); + } + + @Override + public boolean isEmpty() { + return delegate.isEmpty(); + } + + @Override + public boolean contains(final Object o) { + return delegate.contains(o); + } + + @Override + public Iterator> iterator() { + return listIterator(0); + } + + @Override + public Object[] toArray() { + return delegate.toArray(); + } + + @Override + public T[] toArray(final T[] a) { + return delegate.toArray(a); + } + + @Override + public boolean add(final ConsumerRecord consumerRecord) { + return delegate.add(consumerRecord); + } + + @Override + public boolean remove(final Object o) { + return delegate.remove(o); + } + + @Override + public boolean containsAll(final Collection c) { + return delegate.containsAll(c); + } + + @Override + public boolean addAll(final Collection> c) { + return delegate.addAll(c); + } + + @Override + public boolean addAll(final int index, final Collection> c) { + return delegate.addAll(index, c); + } + + @Override + public boolean removeAll(final Collection c) { + return delegate.removeAll(c); + } + + @Override + public boolean retainAll(final Collection c) { + return delegate.retainAll(c); + } + + @Override + public void clear() { + delegate.clear(); + } + + @Override + public ConsumerRecord get(final int index) { + // TODO: should this be instrumented as well? + return delegate.get(index); + } + + @Override + public ConsumerRecord set(final int index, final ConsumerRecord element) { + return delegate.set(index, element); + } + + @Override + public void add(final int index, final ConsumerRecord element) { + delegate.add(index, element); + } + + @Override + public ConsumerRecord remove(final int index) { + return delegate.remove(index); + } + + @Override + public int indexOf(final Object o) { + return delegate.indexOf(o); + } + + @Override + public int lastIndexOf(final Object o) { + return delegate.lastIndexOf(o); + } + + @Override + public ListIterator> listIterator() { + return listIterator(0); + } + + @Override + public ListIterator> listIterator(final int index) { + // every iteration will add spans. Not only the very first one + return new TracingListIterator( + delegate.listIterator(index), operationName, decorator, group, clusterId, bootstrapServers); + } + + @Override + public List> subList(final int fromIndex, final int toIndex) { + return new TracingList( + delegate.subList(fromIndex, toIndex), + operationName, + decorator, + group, + clusterId, + bootstrapServers); + } + + @Override + public List> getDelegate() { + return delegate; + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingListIterator.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingListIterator.java new file mode 100644 index 00000000000..9014ff51966 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingListIterator.java @@ -0,0 +1,65 @@ +package datadog.trace.instrumentation.kafka_clients38; + +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious; + +import java.util.ListIterator; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +public class TracingListIterator extends TracingIterator + implements ListIterator> { + + private final ListIterator> delegateIterator; + + public TracingListIterator( + ListIterator> delegateIterator, + CharSequence operationName, + KafkaDecorator decorator, + String group, + String clusterId, + String bootstrapServers) { + super(delegateIterator, operationName, decorator, group, clusterId, bootstrapServers); + this.delegateIterator = delegateIterator; + } + + @Override + public boolean hasPrevious() { + boolean moreRecords = delegateIterator.hasPrevious(); + if (!moreRecords) { + // no more records, use this as a signal to close the last iteration scope + closePrevious(true); + } + return moreRecords; + } + + @Override + public ConsumerRecord previous() { + final ConsumerRecord prev = delegateIterator.previous(); + startNewRecordSpan(prev); + return prev; + } + + @Override + public int nextIndex() { + return delegateIterator.nextIndex(); + } + + @Override + public int previousIndex() { + return delegateIterator.previousIndex(); + } + + /* + * org.apache.kafka.clients.consumer.ConsumerRecords::records(TopicPartition) always returns + * UnmodifiableList. Modifiable operations will lead to exception + */ + + @Override + public void set(ConsumerRecord consumerRecord) { + delegateIterator.set(consumerRecord); + } + + @Override + public void add(ConsumerRecord consumerRecord) { + delegateIterator.add(consumerRecord); + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/AvroMock.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/AvroMock.java new file mode 100644 index 00000000000..5bea93a1875 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/AvroMock.java @@ -0,0 +1,7 @@ +public class AvroMock { + private final String schema; + + AvroMock(String schema) { + this.schema = schema; + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/AvroMockSerializer.java b/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/AvroMockSerializer.java new file mode 100644 index 00000000000..244359cae98 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/AvroMockSerializer.java @@ -0,0 +1,15 @@ +import java.util.Map; +import org.apache.kafka.common.serialization.Serializer; + +class AvroMockSerializer implements Serializer { + @Override + public void configure(Map configs, boolean isKey) {} + + @Override + public byte[] serialize(String topic, AvroMock data) { + return new byte[0]; + } + + @Override + public void close() {} +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/KafkaClientCustomPropagationConfigTest.groovy b/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/KafkaClientCustomPropagationConfigTest.groovy new file mode 100644 index 00000000000..8b51f35214f --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/KafkaClientCustomPropagationConfigTest.groovy @@ -0,0 +1,305 @@ +import datadog.trace.agent.test.AgentTestRunner +import datadog.trace.api.config.TraceInstrumentationConfig +import datadog.trace.bootstrap.instrumentation.api.AgentSpan +import datadog.trace.test.util.Flaky +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.header.Headers +import org.apache.kafka.common.header.internals.RecordHeaders +import org.junit.Rule +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.listener.KafkaMessageListenerContainer +import org.springframework.kafka.listener.MessageListener +import org.springframework.kafka.listener.ContainerProperties + +import org.testcontainers.containers.KafkaContainer +import org.testcontainers.utility.DockerImageName +import org.springframework.kafka.test.utils.ContainerTestUtils +import org.springframework.kafka.test.utils.KafkaTestUtils + +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit + +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan +import static datadog.trace.instrumentation.kafka_clients38.KafkaDecorator.KAFKA_PRODUCE + +class KafkaClientCustomPropagationConfigTest extends AgentTestRunner { + static final SHARED_TOPIC = ["topic1", "topic2", "topic3", "topic4"] + static final MESSAGE = "Testing without headers for certain topics" + + static final dataTable() { + [ + ["topic1,topic2,topic3,topic4", false, false, false, false], + ["topic1,topic2", false, false, true, true], + ["topic1", false, true, true, true], + ["", true, true, true, true], + ["randomTopic", true, true, true, true] + ] + } + + @Override + boolean useStrictTraceWrites() { + // TODO fix this by making sure that spans get closed properly + return false + } + + @Rule + public KafkaContainer embeddedKafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")) + .withEmbeddedZookeeper() + + @Override + void configurePreAgent() { + super.configurePreAgent() + + injectSysConfig("dd.kafka.e2e.duration.enabled", "true") + } + @Flaky + def "test kafka client header propagation with topic filters"() { + setup: + injectSysConfig(TraceInstrumentationConfig.KAFKA_CLIENT_PROPAGATION_DISABLED_TOPICS, value as String) + KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")).withEmbeddedZookeeper().withEnv("KAFKA_CREATE_TOPICS","topic1,topic2,topic3,topic4") + kafkaContainer.start() + + def senderProps = KafkaTestUtils.producerProps(kafkaContainer.getBootstrapServers()) + def producerFactory = new DefaultKafkaProducerFactory(senderProps) + def kafkaTemplate = new KafkaTemplate(producerFactory) + + // set up the Kafka consumer properties + def consumerProperties = KafkaTestUtils.consumerProps( kafkaContainer.getBootstrapServers(),"sender", "false") + + // create a Kafka consumer factory + def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) + + // set the topic that needs to be consumed + def containerProperties1 = new ContainerProperties(SHARED_TOPIC[0]) + def containerProperties2 = new ContainerProperties(SHARED_TOPIC[1]) + def containerProperties3 = new ContainerProperties(SHARED_TOPIC[2]) + def containerProperties4 = new ContainerProperties(SHARED_TOPIC[3]) + + // create a Kafka MessageListenerContainer + def container1 = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties1) + def container2 = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties2) + def container3 = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties3) + def container4 = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties4) + + // create a thread safe queue to store the received message + def records1 = new LinkedBlockingQueue>() + def records2 = new LinkedBlockingQueue>() + def records3 = new LinkedBlockingQueue>() + def records4 = new LinkedBlockingQueue>() + + // setup a Kafka message listener + container1.setupMessageListener(new MessageListener() { + @Override + void onMessage(ConsumerRecord record) { + TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces + records1.add(record) + } + }) + + container2.setupMessageListener(new MessageListener() { + @Override + void onMessage(ConsumerRecord record) { + TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces + records2.add(record) + } + }) + + container3.setupMessageListener(new MessageListener() { + @Override + void onMessage(ConsumerRecord record) { + TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces + records3.add(record) + } + }) + + container4.setupMessageListener(new MessageListener() { + @Override + void onMessage(ConsumerRecord record) { + TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces + records4.add(record) + } + }) + + // start the container and underlying message listener + container1.start() + container2.start() + container3.start() + container4.start() + + // wait until the container has the required number of assigned partitions + ContainerTestUtils.waitForAssignment(container1, container1.assignedPartitions.size()) + ContainerTestUtils.waitForAssignment(container2, container2.assignedPartitions.size()) + ContainerTestUtils.waitForAssignment(container3, container3.assignedPartitions.size()) + ContainerTestUtils.waitForAssignment(container4, container4.assignedPartitions.size()) + + when: + for (String topic : SHARED_TOPIC) { + kafkaTemplate.send(topic, MESSAGE) + } + + then: + // check that the message was received + def received1 = records1.poll(5, TimeUnit.SECONDS) + def received2 = records2.poll(5, TimeUnit.SECONDS) + def received3 = records3.poll(5, TimeUnit.SECONDS) + def received4 = records4.poll(5, TimeUnit.SECONDS) + + received1.headers().iterator().hasNext() == expected1 + received2.headers().iterator().hasNext() == expected2 + received3.headers().iterator().hasNext() == expected3 + received4.headers().iterator().hasNext() == expected4 + + cleanup: + producerFactory.stop() + container1?.stop() + container2?.stop() + container3?.stop() + container4?.stop() + + where: + [value, expected1, expected2, expected3, expected4]<< dataTable() + } + + @Flaky + def "test consumer with topic filters"() { + setup: + injectSysConfig(TraceInstrumentationConfig.KAFKA_CLIENT_PROPAGATION_DISABLED_TOPICS, value as String) + KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")).withEmbeddedZookeeper().withEnv("KAFKA_CREATE_TOPICS","topic1,topic2,topic3,topic4") + kafkaContainer.start() + def senderProps = KafkaTestUtils.producerProps(kafkaContainer.getBootstrapServers()) + def producerFactory = new DefaultKafkaProducerFactory(senderProps) + def kafkaTemplate = new KafkaTemplate(producerFactory) + + // set up the Kafka consumer properties + def consumerProperties = KafkaTestUtils.consumerProps( kafkaContainer.getBootstrapServers(),"sender", "false") + + // create a Kafka consumer factory + def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) + + // set the topic that needs to be consumed + def containerProperties1 = new ContainerProperties(SHARED_TOPIC[0]) + def containerProperties2 = new ContainerProperties(SHARED_TOPIC[1]) + def containerProperties3 = new ContainerProperties(SHARED_TOPIC[2]) + def containerProperties4 = new ContainerProperties(SHARED_TOPIC[3]) + + // create a Kafka MessageListenerContainer + def container1 = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties1) + def container2 = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties2) + def container3 = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties3) + def container4 = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties4) + + // create a thread safe queue to store the received message + def records1 = new LinkedBlockingQueue() + def records2 = new LinkedBlockingQueue() + def records3 = new LinkedBlockingQueue() + def records4 = new LinkedBlockingQueue() + + // setup a Kafka message listener + container1.setupMessageListener(new MessageListener() { + @Override + void onMessage(ConsumerRecord record) { + TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces + records1.add(activeSpan()) + } + }) + + container2.setupMessageListener(new MessageListener() { + @Override + void onMessage(ConsumerRecord record) { + TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces + records2.add(activeSpan()) + } + }) + + container3.setupMessageListener(new MessageListener() { + @Override + void onMessage(ConsumerRecord record) { + TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces + records3.add(activeSpan()) + } + }) + + container4.setupMessageListener(new MessageListener() { + @Override + void onMessage(ConsumerRecord record) { + TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces + records4.add(activeSpan()) + } + }) + + // start the container and underlying message listener + container1.start() + container2.start() + container3.start() + container4.start() + + // wait until the container has the required number of assigned partitions + ContainerTestUtils.waitForAssignment(container1, container1.assignedPartitions.size()) + ContainerTestUtils.waitForAssignment(container2, container2.assignedPartitions.size()) + ContainerTestUtils.waitForAssignment(container3, container3.assignedPartitions.size()) + ContainerTestUtils.waitForAssignment(container4, container4.assignedPartitions.size()) + + when: + Headers header = new RecordHeaders() + + AgentSpan span = startSpan(KAFKA_PRODUCE) + activateSpan(span).withCloseable { + for (String topic : SHARED_TOPIC) { + ProducerRecord record = new ProducerRecord<>( + topic, + 0, + null, + MESSAGE, + header + ) + kafkaTemplate.send(record as ProducerRecord) + } + } + span.finish() + + then: + // check that the message was received + def received1 = records1.poll(5, TimeUnit.SECONDS) + def received2 = records2.poll(5, TimeUnit.SECONDS) + def received3 = records3.poll(5, TimeUnit.SECONDS) + def received4 = records4.poll(5, TimeUnit.SECONDS) + + if (expected1) { + assert received1.getTraceId() == span.getTraceId() + } else { + assert received1.getTraceId() != span.getTraceId() + } + if (expected2) { + assert received2.getTraceId() == span.getTraceId() + } else { + assert received2.getTraceId() != span.getTraceId() + } + if (expected3) { + assert received3.getTraceId() == span.getTraceId() + } else { + assert received3.getTraceId() != span.getTraceId() + } + if (expected4) { + assert received4.getTraceId() == span.getTraceId() + } else { + assert received4.getTraceId() != span.getTraceId() + } + + cleanup: + producerFactory.stop() + container1?.stop() + container2?.stop() + container3?.stop() + container4?.stop() + + + where: + [value, expected1, expected2, expected3, expected4]<< dataTable() + } + +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy new file mode 100644 index 00000000000..9beecf540f3 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy @@ -0,0 +1,1207 @@ +import datadog.trace.agent.test.asserts.TraceAssert +import datadog.trace.agent.test.naming.VersionedNamingTestBase +import datadog.trace.api.Config +import datadog.trace.api.DDTags +import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags +import datadog.trace.bootstrap.instrumentation.api.Tags +import datadog.trace.common.writer.ListWriter +import datadog.trace.core.DDSpan +import datadog.trace.core.datastreams.StatsGroup +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.Producer +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.producer.RecordMetadata +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.serialization.StringSerializer +import org.junit.Rule +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.listener.ContainerProperties +import org.springframework.kafka.listener.KafkaMessageListenerContainer +import org.springframework.kafka.listener.MessageListener +import org.springframework.kafka.test.utils.ContainerTestUtils +import org.springframework.kafka.test.utils.KafkaTestUtils +import org.testcontainers.containers.KafkaContainer +import org.testcontainers.utility.DockerImageName + +import java.util.concurrent.ExecutionException +import java.util.concurrent.Future + +import static datadog.trace.agent.test.utils.TraceUtils.basicSpan + +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit + +import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan + +abstract class KafkaClientTestBase extends VersionedNamingTestBase { + static final SHARED_TOPIC = "shared.topic" + static final String MESSAGE = "Testing without headers for certain topics" + + static final dataTable() { + [ + ["topic1,topic2,topic3,topic4", false, false, false, false], + ["topic1,topic2", false, false, true, true], + ["topic1", false, true, true, true], + ["", true, true, true, true], + ["randomTopic", true, true, true, true] + ] + } + + @Override + boolean useStrictTraceWrites() { + // TODO fix this by making sure that spans get closed properly + return false + } + + @Rule + public KafkaContainer embeddedKafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")) + .withEmbeddedZookeeper() + + @Override + void configurePreAgent() { + super.configurePreAgent() + + injectSysConfig("dd.kafka.e2e.duration.enabled", "true") + } + + public static final LinkedHashMap PRODUCER_PATHWAY_EDGE_TAGS + + // filter out Kafka poll, since the function is called in a loop, giving inconsistent results + final ListWriter.Filter dropKafkaPoll = new ListWriter.Filter() { + @Override + boolean accept(List trace) { + return !(trace.size() == 1 && + trace.get(0).getResourceName().toString().equals("kafka.poll")) + } + } + + final ListWriter.Filter dropEmptyKafkaPoll = new ListWriter.Filter() { + @Override + boolean accept(List trace) { + return !(trace.size() == 1 && + trace.get(0).getResourceName().toString().equals("kafka.poll") && + trace.get(0).getTag(InstrumentationTags.KAFKA_RECORDS_COUNT).equals(0)) + } + } + + // TraceID, start times & names changed based on the configuration, so overriding the sort to give consistent test results + private static class SortKafkaTraces implements Comparator> { + @Override + int compare(List o1, List o2) { + return rootSpanTrace(o1) - rootSpanTrace(o2) + } + + int rootSpanTrace(List trace) { + assert !trace.isEmpty() + def rootSpan = trace.get(0).localRootSpan + switch (rootSpan.operationName.toString()) { + case "parent": + return 3 + case "kafka.poll": + return 2 + default: + return 1 + } + } + } + + static { + PRODUCER_PATHWAY_EDGE_TAGS = new LinkedHashMap<>(3) + PRODUCER_PATHWAY_EDGE_TAGS.put("direction", "out") + PRODUCER_PATHWAY_EDGE_TAGS.put("topic", SHARED_TOPIC) + PRODUCER_PATHWAY_EDGE_TAGS.put("type", "kafka") + } + + def setup() { + TEST_WRITER.setFilter(dropKafkaPoll) + } + + @Override + int version() { + 0 + } + + @Override + String operation() { + return null + } + + String operationForProducer() { + "kafka.produce" + } + + String operationForConsumer() { + "kafka.consume" + } + + String serviceForTimeInQueue() { + "kafka" + } + + abstract boolean hasQueueSpan() + + abstract boolean splitByDestination() + + @Override + protected boolean isDataStreamsEnabled() { + return true + } + + def "test kafka produce and consume"() { + setup: + // Create and start a Kafka container using Testcontainers + KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")).withEmbeddedZookeeper().withEnv("KAFKA_CREATE_TOPICS", SHARED_TOPIC) + kafkaContainer.start() + + def senderProps = KafkaTestUtils.producerProps(kafkaContainer.getBootstrapServers()) + if (isDataStreamsEnabled()) { + senderProps.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 1000) + } + TEST_WRITER.setFilter(dropEmptyKafkaPoll) + KafkaProducer producer = new KafkaProducer<>(senderProps, new StringSerializer(), new StringSerializer()) + String clusterId = "" + if (isDataStreamsEnabled()) { + producer.flush() + clusterId = producer.metadata.fetch().clusterResource().clusterId() + while (clusterId == null || clusterId.isEmpty()) { + Thread.sleep(1500) + clusterId = producer.metadata.fetch().clusterResource().clusterId() + } + } + + // set up the Kafka consumer properties + def consumerProperties = KafkaTestUtils.consumerProps( kafkaContainer.getBootstrapServers(),"sender", "false") + // create a Kafka consumer factory + def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) + // set the topic that needs to be consumed + def containerProperties = new ContainerProperties(SHARED_TOPIC) + // create a Kafka MessageListenerContainer + def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) + // create a thread safe queue to store the received message + def records = new LinkedBlockingQueue>() + // setup a Kafka message listener + container.setupMessageListener(new MessageListener() { + @Override + void onMessage(ConsumerRecord record) { + TEST_WRITER.waitForTraces(1) + // ensure consistent ordering of traces + records.add(record) + } + }) + // start the container and underlying message listener + container.start() + // wait until the container has the required number of assigned partitions + ContainerTestUtils.waitForAssignment(container, container.assignedPartitions.size()) + when: + String greeting = "Hello Spring Kafka Sender!" + runUnderTrace("parent") { + producer.send(new ProducerRecord(SHARED_TOPIC,greeting)) { meta, ex -> + assert activeScope().isAsyncPropagating() + if (ex == null) { + runUnderTrace("producer callback") {} + } else { + runUnderTrace("producer exception: " + ex) {} + } + } + blockUntilChildSpansFinished(2) + } + if (isDataStreamsEnabled()) { + TEST_DATA_STREAMS_WRITER.waitForGroups(2) + // wait for produce offset 0, commit offset 0 on partition 0 and 1, and commit offset 1 on 1 partition + //TODO + TEST_DATA_STREAMS_WRITER.waitForBacklogs(2) + + } + + then: + // // check that the message was received + def received = records.poll(10, TimeUnit.SECONDS) + received.value() == greeting + received.key() == null + int nTraces = isDataStreamsEnabled() ? 3 : 2 + int produceTraceIdx = nTraces - 1 + TEST_WRITER.waitForTraces(nTraces) + def traces = (Arrays.asList(TEST_WRITER.toArray()) as List>) + Collections.sort(traces, new SortKafkaTraces()) + assertTraces(nTraces, new SortKafkaTraces()) { + if (hasQueueSpan()) { + trace(2) { + consumerSpan(it, consumerProperties, span(1)) + queueSpan(it, trace(produceTraceIdx)[2]) + } + } else { + trace(1) { + consumerSpan(it, consumerProperties, trace(produceTraceIdx)[2]) + } + } + if (isDataStreamsEnabled()) { + trace(1, { pollSpan(it) }) + } + trace(3) { + basicSpan(it, "parent") + basicSpan(it, "producer callback", span(0)) + producerSpan(it, senderProps, span(0), false) + } + } + def headers = received.headers() + headers.iterator().hasNext() + new String(headers.headers("x-datadog-trace-id").iterator().next().value()) == "${traces[produceTraceIdx][2].traceId}" + new String(headers.headers("x-datadog-parent-id").iterator().next().value()) == "${traces[produceTraceIdx][2].spanId}" + if (isDataStreamsEnabled()) { + StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 } + verifyAll(first) { + edgeTags == ["direction:out", "kafka_cluster_id:$clusterId", "topic:$SHARED_TOPIC".toString(), "type:kafka"] + edgeTags.size() == 4 + } + StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash } + verifyAll(second) { + edgeTags == [ + "direction:in", + "group:sender", + "kafka_cluster_id:$clusterId", + "topic:$SHARED_TOPIC".toString(), + "type:kafka" + ] + edgeTags.size() == 5 + } + List produce = [ + "kafka_cluster_id:$clusterId", + "partition:"+received.partition(), + "topic:"+SHARED_TOPIC, + "type:kafka_produce" + ] + List commit = [ + "consumer_group:sender", + "kafka_cluster_id:$clusterId", + "partition:"+received.partition(), + "topic:$SHARED_TOPIC", + "type:kafka_commit" + ] + verifyAll(TEST_DATA_STREAMS_WRITER.backlogs) { + contains(new AbstractMap.SimpleEntry, Long>(commit, 1).toString()) + contains(new AbstractMap.SimpleEntry, Long>(produce, 0).toString()) + } + } + + cleanup: + producer.close() + container?.stop() + kafkaContainer.stop() + } + + def "test producing message too large"() { + setup: + // set a low max request size, so that we can crash it + KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")).withEmbeddedZookeeper().withEnv("KAFKA_CREATE_TOPICS", SHARED_TOPIC) + kafkaContainer.start() + + def senderProps = KafkaTestUtils.producerProps(kafkaContainer.getBootstrapServers()) + senderProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10) + KafkaProducer producer = new KafkaProducer<>(senderProps, new StringSerializer(), new StringSerializer()) + + + when: + String greeting = "Hello Spring Kafka" + Future future = producer.send(new ProducerRecord(SHARED_TOPIC, greeting)) { meta, ex -> + } + future.get() + then: + thrown ExecutionException + cleanup: + producer.close() + } + + + def "test spring kafka template produce and consume"() { + setup: + KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")).withEmbeddedZookeeper().withEnv("KAFKA_CREATE_TOPICS", SHARED_TOPIC) + kafkaContainer.start() + + def senderProps = KafkaTestUtils.producerProps(kafkaContainer.getBootstrapServers()) + if (isDataStreamsEnabled()) { + senderProps.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 1000) + } + def producerFactory = new DefaultKafkaProducerFactory(senderProps) + def kafkaTemplate = new KafkaTemplate(producerFactory) + String clusterId = null + if (isDataStreamsEnabled()) { + clusterId = waitForKafkaMetadataUpdate(kafkaTemplate) + } + + // set up the Kafka consumer properties + def consumerProperties = KafkaTestUtils.consumerProps( kafkaContainer.getBootstrapServers(),"sender", "false") + + // create a Kafka consumer factory + def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) + + // set the topic that needs to be consumed + def containerProperties = new ContainerProperties(SHARED_TOPIC) + + // create a Kafka MessageListenerContainer + def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) + + // create a thread safe queue to store the received message + def records = new LinkedBlockingQueue>() + + // setup a Kafka message listener + container.setupMessageListener(new MessageListener() { + @Override + void onMessage(ConsumerRecord record) { + TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces + records.add(record) + } + }) + + // start the container and underlying message listener + container.start() + + // wait until the container has the required number of assigned partitions + ContainerTestUtils.waitForAssignment(container, container.assignedPartitions.size()) + + when: + String greeting = "Hello Spring Kafka Sender!" + runUnderTrace("parent") { + kafkaTemplate.send(SHARED_TOPIC, greeting).whenComplete { meta, ex -> + assert activeScope().isAsyncPropagating() + if (ex == null) { + runUnderTrace("producer callback") {} + } else { + runUnderTrace("producer exception: " + ex) {} + } + } + blockUntilChildSpansFinished(2) + } + if (isDataStreamsEnabled()) { + TEST_DATA_STREAMS_WRITER.waitForGroups(2) + // wait for produce offset 0, commit offset 0 on partition 0 and 1, and commit offset 1 on 1 partition. + //TODO + TEST_DATA_STREAMS_WRITER.waitForBacklogs(2) + } + + then: + // check that the message was received + def received = records.poll(5, TimeUnit.SECONDS) + received.value() == greeting + received.key() == null + + assertTraces(2, SORT_TRACES_BY_ID) { + trace(3) { + basicSpan(it, "parent") + basicSpan(it, "producer callback", span(0)) + producerSpan(it, senderProps, span(0), false) + } + if (hasQueueSpan()) { + trace(2) { + consumerSpan(it, consumerProperties, trace(1)[1]) + queueSpan(it, trace(0)[2]) + } + } else { + trace(1) { + consumerSpan(it, consumerProperties, trace(0)[2]) + } + } + } + + def headers = received.headers() + headers.iterator().hasNext() + new String(headers.headers("x-datadog-trace-id").iterator().next().value()) == "${TEST_WRITER[0][2].traceId}" + new String(headers.headers("x-datadog-parent-id").iterator().next().value()) == "${TEST_WRITER[0][2].spanId}" + + if (isDataStreamsEnabled()) { + StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 } + verifyAll(first) { + edgeTags == [ + "direction:out", + "kafka_cluster_id:$clusterId".toString(), + "topic:$SHARED_TOPIC".toString(), + "type:kafka" + ] + edgeTags.size() == 4 + } + + StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash } + verifyAll(second) { + edgeTags == [ + "direction:in", + "group:sender", + "kafka_cluster_id:$clusterId".toString(), + "topic:$SHARED_TOPIC".toString(), + "type:kafka" + ] + edgeTags.size() == 5 + } + List produce = [ + "kafka_cluster_id:$clusterId".toString(), + "partition:"+received.partition(), + "topic:"+SHARED_TOPIC, + "type:kafka_produce" + ] + List commit = [ + "consumer_group:sender", + "kafka_cluster_id:$clusterId".toString(), + "partition:"+received.partition(), + "topic:"+SHARED_TOPIC, + "type:kafka_commit" + ] + verifyAll(TEST_DATA_STREAMS_WRITER.backlogs) { + contains(new AbstractMap.SimpleEntry, Long>(commit, 1).toString()) + contains(new AbstractMap.SimpleEntry, Long>(produce, 0).toString()) + } + } + + cleanup: + producerFactory.stop() + container?.stop() + kafkaContainer.stop() + } + + def "test pass through tombstone"() { + setup: + KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")).withEmbeddedZookeeper().withEnv("KAFKA_CREATE_TOPICS", SHARED_TOPIC) + kafkaContainer.start() + + def senderProps = KafkaTestUtils.producerProps(kafkaContainer.getBootstrapServers()) + def producerFactory = new DefaultKafkaProducerFactory(senderProps) + def kafkaTemplate = new KafkaTemplate(producerFactory) + + // set up the Kafka consumer properties + def consumerProperties = KafkaTestUtils.consumerProps( kafkaContainer.getBootstrapServers(),"sender", "false") + + // create a Kafka consumer factory + def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) + + // set the topic that needs to be consumed + def containerProperties = new ContainerProperties(SHARED_TOPIC) + + // create a Kafka MessageListenerContainer + def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) + + // create a thread safe queue to store the received message + def records = new LinkedBlockingQueue>() + + // setup a Kafka message listener + container.setupMessageListener(new MessageListener() { + @Override + void onMessage(ConsumerRecord record) { + TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces + records.add(record) + } + }) + + // start the container and underlying message listener + container.start() + + // wait until the container has the required number of assigned partitions + ContainerTestUtils.waitForAssignment(container, container.assignedPartitions.size()) + + when: + kafkaTemplate.send(SHARED_TOPIC, null) + + then: + // check that the message was received + def received = records.poll(5, TimeUnit.SECONDS) + received.value() == null + received.key() == null + + assertTraces(2, SORT_TRACES_BY_ID) { + trace(1) { + producerSpan(it, senderProps, null, false, true) + } + if (hasQueueSpan()) { + trace(2) { + consumerSpan(it, consumerProperties, trace(1)[1], 0..0, true) + queueSpan(it, trace(0)[0]) + } + } else { + trace(1) { + consumerSpan(it, consumerProperties, trace(0)[0], 0..0, true) + } + } + } + + cleanup: + producerFactory.stop() + container?.stop() + kafkaContainer.stop() + + } + + + def "test records(TopicPartition) kafka consume"() { + setup: + KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")).withEmbeddedZookeeper().withEnv("KAFKA_CREATE_TOPICS", SHARED_TOPIC) + kafkaContainer.start() + + // set up the Kafka consumer properties + def kafkaPartition = 0 + def consumerProperties = KafkaTestUtils.consumerProps( kafkaContainer.getBootstrapServers(),"sender", "false") + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + def consumer = new KafkaConsumer(consumerProperties) + + def senderProps = KafkaTestUtils.producerProps(kafkaContainer.getBootstrapServers()) + def producer = new KafkaProducer(senderProps) + + consumer.assign(Arrays.asList(new TopicPartition(SHARED_TOPIC, kafkaPartition))) + + when: + def greeting = "Hello from MockConsumer!" + producer.send(new ProducerRecord(SHARED_TOPIC, kafkaPartition, null, greeting)) + + then: + TEST_WRITER.waitForTraces(1) + def pollResult = KafkaTestUtils.getRecords(consumer) + + def recs = pollResult.records(new TopicPartition(SHARED_TOPIC, kafkaPartition)).iterator() + + def first = null + if (recs.hasNext()) { + first = recs.next() + } + + then: + recs.hasNext() == false + first.value() == greeting + first.key() == null + + assertTraces(2, SORT_TRACES_BY_ID) { + trace(1) { + producerSpan(it, senderProps) + } + if (hasQueueSpan()) { + trace(2) { + consumerSpan(it, consumerProperties, trace(1)[1]) + queueSpan(it, trace(0)[0]) + } + } else { + trace(1) { + consumerSpan(it, consumerProperties, trace(0)[0]) + } + } + } + + cleanup: + consumer.close() + producer.close() + kafkaContainer.stop() + + + } + + def "test records(TopicPartition).subList kafka consume"() { + setup: + + KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")).withEmbeddedZookeeper().withEnv("KAFKA_CREATE_TOPICS", SHARED_TOPIC) + kafkaContainer.start() + + def senderProps = KafkaTestUtils.producerProps(kafkaContainer.getBootstrapServers()) + def consumerProperties = KafkaTestUtils.consumerProps( kafkaContainer.getBootstrapServers(),"sender", "false") + + // set up the Kafka consumer properties + def kafkaPartition = 0 + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + def consumer = new KafkaConsumer(consumerProperties) + + def producer = new KafkaProducer(senderProps) + + consumer.assign(Arrays.asList(new TopicPartition(SHARED_TOPIC, kafkaPartition))) + + when: + def greeting = "Hello from MockConsumer!" + producer.send(new ProducerRecord(SHARED_TOPIC, kafkaPartition, null, greeting)) + + then: + TEST_WRITER.waitForTraces(1) + def pollResult = KafkaTestUtils.getRecords(consumer) + + def records = pollResult.records(new TopicPartition(SHARED_TOPIC, kafkaPartition)) + def recs = records.subList(0, records.size()).iterator() + + def first = null + if (recs.hasNext()) { + first = recs.next() + } + + then: + recs.hasNext() == false + first.value() == greeting + first.key() == null + + assertTraces(2, SORT_TRACES_BY_ID) { + trace(1) { + producerSpan(it, senderProps) + } + if (hasQueueSpan()) { + trace(2) { + consumerSpan(it, consumerProperties, trace(1)[1]) + queueSpan(it, trace(0)[0]) + } + } else { + trace(1) { + consumerSpan(it, consumerProperties, trace(0)[0]) + } + } + } + + cleanup: + consumer.close() + producer.close() + kafkaContainer.stop() + + } + def "test records(TopicPartition).forEach kafka consume"() { + setup: + KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")).withEmbeddedZookeeper().withEnv("KAFKA_CREATE_TOPICS", SHARED_TOPIC) + kafkaContainer.start() + + def senderProps = KafkaTestUtils.producerProps(kafkaContainer.getBootstrapServers()) + def consumerProperties = KafkaTestUtils.consumerProps( kafkaContainer.getBootstrapServers(),"sender", "false") + + // set up the Kafka consumer properties + def kafkaPartition = 0 + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + def consumer = new KafkaConsumer(consumerProperties) + + def producer = new KafkaProducer(senderProps) + + consumer.assign(Arrays.asList(new TopicPartition(SHARED_TOPIC, kafkaPartition))) + + when: + def greeting = "Hello from MockConsumer!" + producer.send(new ProducerRecord(SHARED_TOPIC, kafkaPartition, null, greeting)) + + then: + TEST_WRITER.waitForTraces(1) + def pollResult = KafkaTestUtils.getRecords(consumer) + + def records = pollResult.records(new TopicPartition(SHARED_TOPIC, kafkaPartition)) + + def last = null + records.forEach { + last = it + assert activeSpan() != null + } + + then: + records.size() == 1 + last.value() == greeting + last.key() == null + + assertTraces(2, SORT_TRACES_BY_ID) { + trace(1) { + producerSpan(it, senderProps) + } + if (hasQueueSpan()) { + trace(2) { + consumerSpan(it, consumerProperties, trace(1)[1]) + queueSpan(it, trace(0)[0]) + } + } else { + trace(1) { + consumerSpan(it, consumerProperties, trace(0)[0]) + } + } + } + + cleanup: + consumer.close() + producer.close() + kafkaContainer.stop() + + } + + def "test iteration backwards over ConsumerRecords"() { + setup: + KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")).withEmbeddedZookeeper().withEnv("KAFKA_CREATE_TOPICS", SHARED_TOPIC) + kafkaContainer.start() + + def senderProps = KafkaTestUtils.producerProps(kafkaContainer.getBootstrapServers()) + def consumerProperties = KafkaTestUtils.consumerProps( kafkaContainer.getBootstrapServers(),"sender", "false") + + def kafkaPartition = 0 + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + def consumer = new KafkaConsumer(consumerProperties) + + def producer = new KafkaProducer(senderProps) + + consumer.assign(Arrays.asList(new TopicPartition(SHARED_TOPIC, kafkaPartition))) + + when: + def greetings = ["msg 1", "msg 2", "msg 3"] + greetings.each { + producer.send(new ProducerRecord(SHARED_TOPIC, kafkaPartition, null, it)) + } + + then: + TEST_WRITER.waitForTraces(3) + def pollRecords = KafkaTestUtils.getRecords(consumer) + + def listIter = + pollRecords.records(new TopicPartition(SHARED_TOPIC, kafkaPartition)).listIterator() + + then: + def receivedSet = greetings.toSet() + while (listIter.hasNext()) { + listIter.next() + } + while (listIter.hasPrevious()) { + def record = listIter.previous() + receivedSet.remove(record.value()) + assert record.key() == null + } + receivedSet.isEmpty() + + assertTraces(9, SORT_TRACES_BY_ID) { + + // producing traces + trace(1) { + producerSpan(it, senderProps) + } + trace(1) { + producerSpan(it, senderProps) + } + trace(1) { + producerSpan(it, senderProps) + } + + // iterating to the end of ListIterator: + if (hasQueueSpan()) { + trace(2) { + consumerSpan(it, consumerProperties, trace(3)[1], 0..0) + queueSpan(it, trace(0)[0]) + } + trace(2) { + consumerSpan(it, consumerProperties, trace(4)[1], 1..1) + queueSpan(it, trace(1)[0]) + } + trace(2) { + consumerSpan(it, consumerProperties, trace(5)[1], 2..2) + queueSpan(it, trace(2)[0]) + } + } else { + trace(1) { + consumerSpan(it, consumerProperties, trace(0)[0], 0..0) + } + trace(1) { + consumerSpan(it, consumerProperties, trace(1)[0], 1..1) + } + trace(1) { + consumerSpan(it, consumerProperties, trace(2)[0], 2..2) + } + } + + // backwards iteration over ListIterator to the beginning + if (hasQueueSpan()) { + trace(2) { + consumerSpan(it, consumerProperties, trace(6)[1], 2..2) + queueSpan(it, trace(2)[0]) + } + trace(2) { + consumerSpan(it, consumerProperties, trace(7)[1], 1..1) + queueSpan(it, trace(1)[0]) + } + trace(2) { + consumerSpan(it, consumerProperties, trace(8)[1], 0..0) + queueSpan(it, trace(0)[0]) + } + } else { + trace(1) { + consumerSpan(it, consumerProperties, trace(2)[0], 2..2) + } + trace(1) { + consumerSpan(it, consumerProperties, trace(1)[0], 1..1) + } + trace(1) { + consumerSpan(it, consumerProperties, trace(0)[0], 0..0) + } + } + } + + cleanup: + consumer.close() + producer.close() + kafkaContainer.stop() + + } + + def "test kafka client header propagation manual config"() { + setup: + KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")).withEmbeddedZookeeper().withEnv("KAFKA_CREATE_TOPICS", SHARED_TOPIC) + kafkaContainer.start() + + def senderProps = KafkaTestUtils.producerProps(kafkaContainer.getBootstrapServers()) + def consumerProperties = KafkaTestUtils.consumerProps( kafkaContainer.getBootstrapServers(),"sender", "false") + + def producerFactory = new DefaultKafkaProducerFactory(senderProps) + def kafkaTemplate = new KafkaTemplate(producerFactory) + + + // create a Kafka consumer factory + def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) + + // set the topic that needs to be consumed + def containerProperties = new ContainerProperties(SHARED_TOPIC) + + // create a Kafka MessageListenerContainer + def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) + + // create a thread safe queue to store the received message + def records = new LinkedBlockingQueue>() + + // setup a Kafka message listener + container.setupMessageListener(new MessageListener() { + @Override + void onMessage(ConsumerRecord record) { + TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces + records.add(record) + if (isDataStreamsEnabled()) { + // even if header propagation is disabled, we want data streams to work. + TEST_DATA_STREAMS_WRITER.waitForGroups(2) + } + } + }) + + // start the container and underlying message listener + container.start() + + // wait until the container has the required number of assigned partitions + ContainerTestUtils.waitForAssignment(container, container.assignedPartitions.size()) + + when: + String message = "Testing without headers" + injectSysConfig("kafka.client.propagation.enabled", value) + kafkaTemplate.send(SHARED_TOPIC, message) + + then: + // check that the message was received + def received = records.poll(5, TimeUnit.SECONDS) + + received.headers().iterator().hasNext() == expected + + cleanup: + producerFactory.stop() + container?.stop() + kafkaContainer.stop() + + where: + value | expected + "false" | false + "true" | true + } + + def producerSpan( + TraceAssert trace, + Map config, + DDSpan parentSpan = null, + boolean partitioned = true, + boolean tombstone = false, + String schema = null + ) { + trace.span { + serviceName service() + operationName operationForProducer() + resourceName "Produce Topic $SHARED_TOPIC" + spanType "queue" + errored false + measured true + if (parentSpan) { + childOf parentSpan + } else { + parent() + } + tags { + "$Tags.COMPONENT" "java-kafka" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER + "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) + if (partitioned) { + "$InstrumentationTags.PARTITION" { it >= 0 } + } + if (tombstone) { + "$InstrumentationTags.TOMBSTONE" true + } + if ({isDataStreamsEnabled()}) { + "$DDTags.PATHWAY_HASH" { String } + if (schema != null) { + "$DDTags.SCHEMA_DEFINITION" schema + "$DDTags.SCHEMA_WEIGHT" 1 + "$DDTags.SCHEMA_TYPE" "avro" + "$DDTags.SCHEMA_OPERATION" "serialization" + "$DDTags.SCHEMA_ID" "10810872322569724838" + } + } + peerServiceFrom(InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS) + defaultTags() + } + } + } + + def queueSpan( + TraceAssert trace, + DDSpan parentSpan = null + ) { + trace.span { + serviceName splitByDestination() ? "$SHARED_TOPIC" : serviceForTimeInQueue() + operationName "kafka.deliver" + resourceName "$SHARED_TOPIC" + spanType "queue" + errored false + measured true + if (parentSpan) { + childOf parentSpan + } else { + parent() + } + tags { + "$Tags.COMPONENT" "java-kafka" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_BROKER + defaultTags(true) + } + } + } + + def consumerSpan( + TraceAssert trace, + Map config, + DDSpan parentSpan = null, + Range offset = 0..0, + boolean tombstone = false, + boolean distributedRootSpan = !hasQueueSpan() + ) { + trace.span { + serviceName service() + operationName operationForConsumer() + resourceName "Consume Topic $SHARED_TOPIC" + spanType "queue" + errored false + measured true + if (parentSpan) { + childOf parentSpan + } else { + parent() + } + tags { + "$Tags.COMPONENT" "java-kafka" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER + "$InstrumentationTags.PARTITION" { it >= 0 } + "$InstrumentationTags.OFFSET" { offset.containsWithinBounds(it as int) } + "$InstrumentationTags.CONSUMER_GROUP" "sender" + "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) + "$InstrumentationTags.RECORD_QUEUE_TIME_MS" { it >= 0 } + "$InstrumentationTags.RECORD_END_TO_END_DURATION_MS" { it >= 0 } + if (tombstone) { + "$InstrumentationTags.TOMBSTONE" true + } + if ({isDataStreamsEnabled()}) { + "$DDTags.PATHWAY_HASH" { String } + } + defaultTags(distributedRootSpan) + } + } + } + + def pollSpan( + TraceAssert trace, + int recordCount = 1, + DDSpan parentSpan = null, + Range offset = 0..0, + boolean tombstone = false, + boolean distributedRootSpan = !hasQueueSpan() + ) { + trace.span { + serviceName Config.get().getServiceName() + operationName "kafka.poll" + resourceName "kafka.poll" + errored false + measured false + tags { + "$InstrumentationTags.KAFKA_RECORDS_COUNT" recordCount + defaultTags(true) + } + } + } + def waitForKafkaMetadataUpdate(KafkaTemplate kafkaTemplate) { + kafkaTemplate.flush() + Producer wrappedProducer = kafkaTemplate.getTheProducer() + assert(wrappedProducer instanceof DefaultKafkaProducerFactory.CloseSafeProducer) + Producer producer = wrappedProducer.delegate + assert(producer instanceof KafkaProducer) + String clusterId = producer.metadata.fetch().clusterResource().clusterId() + while (clusterId == null || clusterId.isEmpty()) { + Thread.sleep(1500) + clusterId = producer.metadata.fetch().clusterResource().clusterId() + } + return clusterId + } + +} + +abstract class KafkaClientForkedTest extends KafkaClientTestBase { + @Override + void configurePreAgent() { + super.configurePreAgent() + injectSysConfig("dd.kafka.legacy.tracing.enabled", "false") + injectSysConfig("dd.service", "KafkaClientTest") + } + + @Override + boolean hasQueueSpan() { + return true + } + + @Override + boolean splitByDestination() { + return false + } +} + +class KafkaClientV0ForkedTest extends KafkaClientForkedTest { + @Override + String service() { + return "KafkaClientTest" + } +} + +class KafkaClientV1ForkedTest extends KafkaClientForkedTest { + @Override + int version() { + 1 + } + + @Override + String service() { + return "KafkaClientTest" + } + + @Override + String operationForProducer() { + "kafka.send" + } + + @Override + String operationForConsumer() { + return "kafka.process" + } + + @Override + String serviceForTimeInQueue() { + "kafka-queue" + } + + @Override + boolean hasQueueSpan() { + false + } +} + +class KafkaClientSplitByDestinationForkedTest extends KafkaClientTestBase { + @Override + void configurePreAgent() { + super.configurePreAgent() + injectSysConfig("dd.service", "KafkaClientTest") + injectSysConfig("dd.kafka.legacy.tracing.enabled", "false") + injectSysConfig("dd.message.broker.split-by-destination", "true") + } + + @Override + String service() { + return "KafkaClientTest" + } + + @Override + boolean hasQueueSpan() { + return true + } + + @Override + boolean splitByDestination() { + return true + } +} + +abstract class KafkaClientLegacyTracingForkedTest extends KafkaClientTestBase { + @Override + void configurePreAgent() { + super.configurePreAgent() + injectSysConfig("dd.service", "KafkaClientLegacyTest") + injectSysConfig("dd.kafka.legacy.tracing.enabled", "true") + } + + @Override + String service() { + return "kafka" + } + + @Override + boolean hasQueueSpan() { + return false + } + + @Override + boolean splitByDestination() { + return false + } +} + +class KafkaClientLegacyTracingV0ForkedTest extends KafkaClientLegacyTracingForkedTest{ + + +} + +class KafkaClientLegacyTracingV1ForkedTest extends KafkaClientLegacyTracingForkedTest{ + + @Override + int version() { + 1 + } + + @Override + String operationForProducer() { + "kafka.send" + } + + @Override + String operationForConsumer() { + return "kafka.process" + } + + @Override + String service() { + return Config.get().getServiceName() + } +} + +class KafkaClientDataStreamsDisabledForkedTest extends KafkaClientTestBase { + @Override + void configurePreAgent() { + super.configurePreAgent() + injectSysConfig("dd.service", "KafkaClientDataStreamsDisabledForkedTest") + injectSysConfig("dd.kafka.legacy.tracing.enabled", "true") + } + + @Override + String service() { + return "kafka" + } + + @Override + boolean hasQueueSpan() { + return false + } + + @Override + boolean splitByDestination() { + return false + } + + @Override + boolean isDataStreamsEnabled() { + return false + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/TextMapExtractAdapterTest.groovy b/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/TextMapExtractAdapterTest.groovy new file mode 100644 index 00000000000..a94e291a97f --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/TextMapExtractAdapterTest.groovy @@ -0,0 +1,35 @@ +import com.google.common.io.BaseEncoding +import datadog.trace.agent.test.AgentTestRunner +import datadog.trace.bootstrap.instrumentation.api.AgentPropagation +import datadog.trace.instrumentation.kafka_clients38.TextMapExtractAdapter +import org.apache.kafka.common.header.Headers +import org.apache.kafka.common.header.internals.RecordHeader +import org.apache.kafka.common.header.internals.RecordHeaders + +import java.nio.charset.StandardCharsets + +class TextMapExtractAdapterTest extends AgentTestRunner { + + def "check can decode base64 mangled headers"() { + given: + def base64 = BaseEncoding.base64().encode("foo".getBytes(StandardCharsets.UTF_8)) + def expectedValue = base64Decode ? "foo" : base64 + Headers headers = new RecordHeaders(new RecordHeader("key", base64.getBytes(StandardCharsets.UTF_8))) + TextMapExtractAdapter adapter = new TextMapExtractAdapter(base64Decode) + when: + String extracted = null + adapter.forEachKey(headers, new AgentPropagation.KeyClassifier() { + @Override + boolean accept(String key, String value) { + extracted = value + return false + } + }) + + then: + extracted == expectedValue + + where: + base64Decode << [true, false] + } +} diff --git a/settings.gradle b/settings.gradle index d332a3df27d..371730e65af 100644 --- a/settings.gradle +++ b/settings.gradle @@ -324,6 +324,7 @@ include ':dd-java-agent:instrumentation:junit-5.3:cucumber-junit-5' include ':dd-java-agent:instrumentation:junit-5.3:spock-junit-5' include ':dd-java-agent:instrumentation:kafka-common' include ':dd-java-agent:instrumentation:kafka-clients-0.11' +include 'dd-java-agent:instrumentation:kafka-clients-3.8' include ':dd-java-agent:instrumentation:kafka-streams-0.11' include ':dd-java-agent:instrumentation:kafka-streams-1.0' include ':dd-java-agent:instrumentation:karate' @@ -486,3 +487,4 @@ include ':dd-java-agent:benchmark' include ':dd-java-agent:benchmark-integration' include ':dd-java-agent:benchmark-integration:jetty-perftest' include ':dd-java-agent:benchmark-integration:play-perftest' +