Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OTLP/HTTP collector #13

Merged
merged 14 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,11 @@ static final class HttpService extends AbstractHttpService {

final OpenTelemetryHttpCollector collector;

final SpanTranslator spanTranslator;

HttpService(OpenTelemetryHttpCollector collector) {
this.collector = collector;
this.spanTranslator = new SpanTranslator(collector.metrics);
}

@Override
Expand All @@ -118,6 +121,7 @@ protected HttpResponse doPost(ServiceRequestContext ctx, HttpRequest req) {
req.aggregate(AggregationOptions.usePooledObjects(ctx.alloc(), ctx.eventLoop()
)).handle((msg, t) -> {
if (t != null) {
collector.metrics.incrementMessagesDropped();
result.onError(t);
return null;
}
Expand All @@ -126,13 +130,15 @@ protected HttpResponse doPost(ServiceRequestContext ctx, HttpRequest req) {
result.onSuccess(null);
return null;
}

collector.metrics.incrementBytes(content.length());
try {
ExportTraceServiceRequest request = ExportTraceServiceRequest.parseFrom(UnsafeByteOperations.unsafeWrap(content.byteBuf().nioBuffer()).newCodedInput());
List<Span> spans = SpanTranslator.translate(request);
collector.metrics.incrementMessages();
List<Span> spans = spanTranslator.translate(request);
collector.collector.accept(spans, result);
}
catch (IOException e) {
collector.metrics.incrementMessagesDropped();
LOG.log(Level.WARNING, "Unable to parse the request:", e);
result.onError(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.opentelemetry.semconv.OtelAttributes;
import io.opentelemetry.semconv.ServiceAttributes;
import zipkin2.Endpoint;
import zipkin2.collector.CollectorMetrics;

import static java.util.concurrent.TimeUnit.NANOSECONDS;

Expand All @@ -38,6 +39,8 @@
*/
final class SpanTranslator {

final CollectorMetrics metrics;

static final AttributeKey<String> PEER_SERVICE = AttributeKey.stringKey("peer.service");

static final String OTEL_DROPPED_ATTRIBUTES_COUNT = "otel.dropped_attributes_count";
Expand All @@ -46,14 +49,24 @@ final class SpanTranslator {

static final String ERROR_TAG = "error";

static List<zipkin2.Span> translate(ExportTraceServiceRequest otelSpans) {
SpanTranslator(CollectorMetrics metrics) {
this.metrics = metrics;
}

List<zipkin2.Span> translate(ExportTraceServiceRequest otelSpans) {
List<zipkin2.Span> spans = new ArrayList<>();
List<ResourceSpans> spansList = otelSpans.getResourceSpansList();
for (ResourceSpans resourceSpans : spansList) {
for (ScopeSpans scopeSpans : resourceSpans.getScopeSpansList()) {
InstrumentationScope scope = scopeSpans.getScope();
for (io.opentelemetry.proto.trace.v1.Span span : scopeSpans.getSpansList()) {
spans.add(generateSpan(span, scope, resourceSpans.getResource()));
try {
spans.add(generateSpan(span, scope, resourceSpans.getResource()));
}
catch (RuntimeException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

personally i would capture at call site calling translator if you can, as this is more like any other issue about malformed inclusing bad trace ID but not limited to that

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is caught outside of the translator call, the valid spans in the trace data may also be lost. Is that expected?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so, as consider if it were json and one span was in incorrect format, you would lose the message. I think bugs should be fixed basically

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

say I'm wrong, we get an issue asking for a different behavior also. meanwhile there isn't code at possibly not the right abstraction ;)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The IOException that occurs when converting a protobuf payload to a Java object occurs before calling translate and is handled. This runtime exception occurs when converting an otel span to a zipkin span, and may be an illegal argument or null pointer exception. If a span with an invalid value is inserted among multiple valid spans, is it okay to drop all of them? This is a rare case, so I don't really care.

Copy link
Contributor

@codefromthecrypt codefromthecrypt Sep 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably one thing that I didn't highlight is that when in doubt we can refer to the actual collector.java which handles exception and error the same way https://github.com/openzipkin/zipkin/blob/master/zipkin-collector/core/src/main/java/zipkin2/collector/Collector.java#L171-L180

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

basically what I mean is that error handling and stats handling were all done in collector.java. it is true that we cannot always re-use that type, but the behavior expectations we can borrow from it anyway

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I looked at the implementation of Collector and left incrementing up to Collector as much as possible. I thought I would accept valid spans as much as possible. However, if it's not a good design for Translator to depend on CollectorMetrics, I'll count Dropped outside of Translator.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider my opinion non binding basically i feel this code is a little cleaner not injecting collectormetrics to it. i will approve either way. your call

Copy link
Contributor Author

@making making Sep 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

basically i feel this code is a little cleaner not injecting collectormetrics to it.

Agreed.

I considered passing a callback to decouple, but that would be too much for some rare cases, so I simply caught the exception outside the translator.
5195c30

// If the span is invalid, an exception such as IllegalArgumentException will be thrown.
metrics.incrementSpansDropped(1);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.google.protobuf.ByteString;
import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.ServerBuilder;
import io.opentelemetry.api.common.AttributeKey;
Expand All @@ -27,6 +28,9 @@
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import io.opentelemetry.proto.trace.v1.ScopeSpans;
import io.opentelemetry.proto.trace.v1.TracesData;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
Expand Down Expand Up @@ -154,6 +158,11 @@ void testServerKind() throws Exception {
assertThat(span.remoteEndpoint()).isNull();
assertThat(span.annotations()).isEmpty();
}
assertThat(metrics.spans()).isEqualTo(size);
assertThat(metrics.spansDropped()).isZero();
assertThat(metrics.messages()).isEqualTo(1);
assertThat(metrics.messagesDropped()).isZero();
// TODO calculate received bytes
}

@Test
Expand Down Expand Up @@ -212,6 +221,11 @@ void testServerKindWithEvents() throws Exception {
assertThat(span.annotations().get(2).value()).isEqualTo("\"event-3\":{}");
assertThat(span.annotations().get(2).timestamp()).isEqualTo(toMillis(eventTime3.plusMillis(size)));
}
assertThat(metrics.spans()).isEqualTo(size);
assertThat(metrics.spansDropped()).isZero();
assertThat(metrics.messages()).isEqualTo(1);
assertThat(metrics.messagesDropped()).isZero();
// TODO calculate received bytes
}

@Test
Expand Down Expand Up @@ -259,6 +273,11 @@ void testServerKindWithError() throws Exception {
assertThat(span.remoteEndpoint()).isNull();
assertThat(span.annotations()).isEmpty();
}
assertThat(metrics.spans()).isEqualTo(size);
assertThat(metrics.spansDropped()).isZero();
assertThat(metrics.messages()).isEqualTo(1);
assertThat(metrics.messagesDropped()).isZero();
// TODO calculate received bytes
}

@Test
Expand Down Expand Up @@ -320,6 +339,104 @@ void testClientKind() throws Exception {
assertThat(span.remoteEndpoint().port()).isEqualTo(8080);
assertThat(span.annotations()).isEmpty();
}
assertThat(metrics.spans()).isEqualTo(size);
assertThat(metrics.spansDropped()).isZero();
assertThat(metrics.messages()).isEqualTo(1);
assertThat(metrics.messagesDropped()).isZero();
// TODO calculate received bytes
}

@Test
void minimalSpan() throws Exception {
TracesData tracesData = TracesData.newBuilder()
.addResourceSpans(ResourceSpans.newBuilder()
.addScopeSpans(ScopeSpans.newBuilder()
.addSpans(io.opentelemetry.proto.trace.v1.Span.newBuilder()
.setSpanId(ByteString.fromHex("0000000000000001"))
.setTraceId(ByteString.fromHex("00000000000000000000000000000001")))))
.build();

URL url = URI.create("http://localhost:" + port + "/v1/traces").toURL();
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("POST");
connection.setDoOutput(true);
connection.setRequestProperty("Content-Type", "application/x-protobuf");
try (OutputStream os = connection.getOutputStream()) {
os.write(tracesData.toByteArray());
os.flush();
}
connection.disconnect();
int responseCode = connection.getResponseCode();
assertThat(responseCode).isEqualTo(HttpURLConnection.HTTP_ACCEPTED);
Awaitility.waitAtMost(Duration.ofMillis(200))
.untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(1));
assertThat(metrics.spans()).isEqualTo(1);
assertThat(metrics.spansDropped()).isZero();
assertThat(metrics.messages()).isEqualTo(1);
assertThat(metrics.messagesDropped()).isZero();
assertThat(metrics.bytes()).isEqualTo(tracesData.getSerializedSize());
}

@Test
void invalidSpanId() throws Exception {
TracesData tracesData = TracesData.newBuilder()
.addResourceSpans(ResourceSpans.newBuilder()
.addScopeSpans(ScopeSpans.newBuilder()
.addSpans(io.opentelemetry.proto.trace.v1.Span.newBuilder()
.setSpanId(ByteString.fromHex("0000000000000000"))
.setTraceId(ByteString.fromHex("00000000000000000000000000000001")))))
.build();

URL url = URI.create("http://localhost:" + port + "/v1/traces").toURL();
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("POST");
connection.setDoOutput(true);
connection.setRequestProperty("Content-Type", "application/x-protobuf");
try (OutputStream os = connection.getOutputStream()) {
os.write(tracesData.toByteArray());
os.flush();
}
connection.disconnect();
int responseCode = connection.getResponseCode();
assertThat(responseCode).isEqualTo(HttpURLConnection.HTTP_ACCEPTED);
Awaitility.waitAtMost(Duration.ofMillis(200))
.untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(0));
assertThat(metrics.spans()).isZero();
assertThat(metrics.spansDropped()).isEqualTo(1);
assertThat(metrics.messages()).isEqualTo(1);
assertThat(metrics.messagesDropped()).isZero();
assertThat(metrics.bytes()).isEqualTo(tracesData.getSerializedSize());
}

@Test
void invalidTraceId() throws Exception {
TracesData tracesData = TracesData.newBuilder()
.addResourceSpans(ResourceSpans.newBuilder()
.addScopeSpans(ScopeSpans.newBuilder()
.addSpans(io.opentelemetry.proto.trace.v1.Span.newBuilder()
.setSpanId(ByteString.fromHex("0000000000000001"))
.setTraceId(ByteString.fromHex("00000000000000000000000000000000")))))
.build();

URL url = URI.create("http://localhost:" + port + "/v1/traces").toURL();
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("POST");
connection.setDoOutput(true);
connection.setRequestProperty("Content-Type", "application/x-protobuf");
try (OutputStream os = connection.getOutputStream()) {
os.write(tracesData.toByteArray());
os.flush();
}
connection.disconnect();
int responseCode = connection.getResponseCode();
assertThat(responseCode).isEqualTo(HttpURLConnection.HTTP_ACCEPTED);
Awaitility.waitAtMost(Duration.ofMillis(200))
.untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(0));
assertThat(metrics.spans()).isZero();
assertThat(metrics.spansDropped()).isEqualTo(1);
assertThat(metrics.messages()).isEqualTo(1);
assertThat(metrics.messagesDropped()).isZero();
assertThat(metrics.bytes()).isEqualTo(tracesData.getSerializedSize());
}

@Test
Expand All @@ -338,6 +455,11 @@ void emptyRequest() throws Exception {
assertThat(responseCode).isEqualTo(HttpURLConnection.HTTP_ACCEPTED);
Awaitility.waitAtMost(Duration.ofSeconds(5))
.untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(0));
assertThat(metrics.spans()).isZero();
assertThat(metrics.spansDropped()).isZero();
assertThat(metrics.messages()).isZero();
assertThat(metrics.messagesDropped()).isZero();
assertThat(metrics.bytes()).isZero();
}

@Test
Expand All @@ -356,6 +478,11 @@ void brokenRequest() throws Exception {
assertThat(responseCode).isEqualTo(HttpURLConnection.HTTP_INTERNAL_ERROR);
Awaitility.waitAtMost(Duration.ofMillis(200))
.untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(0));
assertThat(metrics.spans()).isZero();
assertThat(metrics.spansDropped()).isZero();
assertThat(metrics.messages()).isZero();
assertThat(metrics.messagesDropped()).isEqualTo(1);
assertThat(metrics.bytes()).isEqualTo(1);
}

static long toMillis(Instant instant) {
Expand Down
Loading
Loading