Skip to content

Commit

Permalink
Add OTLP/HTTP collector (#13)
Browse files Browse the repository at this point in the history
This pull request adds an OTLP/HTTP implementation of the Zipkin
Collector.

It mostly follows the following document, but does not implement
fallback for remoteEndpoint name and deprecated attribute names have
been changed to new ones.

https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk_exporters/zipkin.md

Also, the mapping of resource attributes is TBD in this document, and is
not implemented in this PR, but I would like to open a separate issue to
discuss the mapping of resource attributes.

---------

Co-authored-by: minux <[email protected]>
  • Loading branch information
making and minwoox committed Sep 16, 2024
1 parent 98af0bc commit 96a3550
Show file tree
Hide file tree
Showing 11 changed files with 1,513 additions and 27 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
# zipkin-otel
Shared libraries that provide Zipkin integration with the OpenTelemetry. Requires JRE 11 or later.

# Usage
## Usage
These components integrate traced applications and servers with OpenTelemetry protocols
via interfaces defined by [Zipkin](https://github.com/openzipkin/zipkin).

## Collectors
### Collectors
The component in a zipkin server that receives trace data is called a
collector. A collector decodes spans reported by applications and
persists them to a configured collector component.
Expand Down
6 changes: 2 additions & 4 deletions collector-http/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# zipkin-collector-otel-http
# collector-http

This component implements
the [OTLP/HTTP protocol](https://opentelemetry.io/docs/specs/otlp/#otlphttp)
with [Armeria](https://armeria.dev/).
This component implements the [OTLP/HTTP protocol](https://opentelemetry.io/docs/specs/otlp/#otlphttp) with [Armeria](https://armeria.dev/).
50 changes: 46 additions & 4 deletions collector-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
Expand All @@ -17,8 +17,8 @@
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>zipkin-collector-otel-http</artifactId>
<name>Zipkin Collector: OpenTelemetry HTTP</name>
<artifactId>collector-http</artifactId>
<name>Zipkin Collector: OTLP HTTP</name>

<properties>
<main.basedir>${project.basedir}/..</main.basedir>
Expand All @@ -37,6 +37,29 @@
<version>${armeria.version}</version>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.proto</groupId>
<artifactId>opentelemetry-proto</artifactId>
<version>${opentelemetry-proto.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.semconv</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>${opentelemetry-semconv.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
<!-- We use provided scope to avoid pinning a protobuf version -->
<scope>provided</scope>
</dependency>

<dependency>
<groupId>${zipkin.groupId}</groupId>
<artifactId>zipkin-tests</artifactId>
Expand All @@ -50,5 +73,24 @@
<version>${armeria.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<version>${opentelemetry.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-otlp</artifactId>
<version>${opentelemetry.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,29 @@
*/
package zipkin2.collector.otel.http;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.google.protobuf.UnsafeByteOperations;
import com.linecorp.armeria.common.AggregationOptions;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.ResponseHeaders;
import com.linecorp.armeria.common.encoding.StreamDecoderFactory;
import com.linecorp.armeria.server.AbstractHttpService;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.ServerConfigurator;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.encoding.DecodingService;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.trace.v1.ScopeSpans;
import zipkin2.Callback;
import zipkin2.Span;
import zipkin2.collector.Collector;
import zipkin2.collector.CollectorComponent;
import zipkin2.collector.CollectorMetrics;
Expand All @@ -18,32 +35,40 @@

public final class OpenTelemetryHttpCollector extends CollectorComponent
implements ServerConfigurator {

public static Builder newBuilder() {
return new Builder();
}

public static final class Builder extends CollectorComponent.Builder {

Collector.Builder delegate = Collector.newBuilder(OpenTelemetryHttpCollector.class);

CollectorMetrics metrics = CollectorMetrics.NOOP_METRICS;

@Override public Builder storage(StorageComponent storageComponent) {
@Override
public Builder storage(StorageComponent storageComponent) {
delegate.storage(storageComponent);
return this;
}

@Override public Builder metrics(CollectorMetrics metrics) {
if (metrics == null) throw new NullPointerException("metrics == null");
@Override
public Builder metrics(CollectorMetrics metrics) {
if (metrics == null) {
throw new NullPointerException("metrics == null");
}
delegate.metrics(this.metrics = metrics.forTransport("otel/http"));
return this;
}

@Override public Builder sampler(CollectorSampler sampler) {
@Override
public Builder sampler(CollectorSampler sampler) {
delegate.sampler(sampler);
return this;
}

@Override public OpenTelemetryHttpCollector build() {
@Override
public OpenTelemetryHttpCollector build() {
return new OpenTelemetryHttpCollector(this);
}

Expand All @@ -52,38 +77,101 @@ public static final class Builder extends CollectorComponent.Builder {
}

final Collector collector;

final CollectorMetrics metrics;

OpenTelemetryHttpCollector(Builder builder) {
collector = builder.delegate.build();
metrics = builder.metrics;
}

@Override public OpenTelemetryHttpCollector start() {
@Override
public OpenTelemetryHttpCollector start() {
return this;
}

@Override public String toString() {
@Override
public String toString() {
return "OpenTelemetryHttpCollector{}";
}

/**
* Reconfigures the service per https://opentelemetry.io/docs/specs/otlp/#otlphttp-request
*/
@Override public void reconfigure(ServerBuilder sb) {
@Override
public void reconfigure(ServerBuilder sb) {
sb.decorator(DecodingService.newDecorator(StreamDecoderFactory.gzip()));
sb.service("/v1/traces", new HttpService(this));
}

static final class HttpService extends AbstractHttpService {
private static final Logger LOG = Logger.getLogger(HttpService.class.getName());

final OpenTelemetryHttpCollector collector;

HttpService(OpenTelemetryHttpCollector collector) {
this.collector = collector;
}

@Override protected HttpResponse doPost(ServiceRequestContext ctx, HttpRequest req)
throws Exception {
throw new RuntimeException("Implement me!");
@Override
protected HttpResponse doPost(ServiceRequestContext ctx, HttpRequest req) {
CompletableCallback result = new CompletableCallback();
req.aggregate(AggregationOptions.usePooledObjects(ctx.alloc(), ctx.eventLoop()
)).handle((msg, t) -> {
if (t != null) {
collector.metrics.incrementMessagesDropped();
result.onError(t);
return null;
}
try (HttpData content = msg.content()) {
if (content.isEmpty()) {
result.onSuccess(null);
return null;
}
collector.metrics.incrementBytes(content.length());
try {
ExportTraceServiceRequest request = ExportTraceServiceRequest.parseFrom(UnsafeByteOperations.unsafeWrap(content.byteBuf().nioBuffer()).newCodedInput());
collector.metrics.incrementMessages();
try {
List<Span> spans = SpanTranslator.translate(request);
collector.collector.accept(spans, result);
}
catch (RuntimeException e) {
// If the span is invalid, an exception such as IllegalArgumentException will be thrown.
int spanSize = request.getResourceSpansList().stream()
.flatMap(rs -> rs.getScopeSpansList().stream())
.mapToInt(ScopeSpans::getSpansCount).sum();
collector.metrics.incrementSpansDropped(spanSize);
LOG.log(Level.WARNING, "Unable to translate the spans:", e);
result.onError(e);
}
}
catch (IOException e) {
collector.metrics.incrementMessagesDropped();
LOG.log(Level.WARNING, "Unable to parse the request:", e);
result.onError(e);
}
return null;
}
});
return HttpResponse.of(result);
}
}

static final class CompletableCallback extends CompletableFuture<HttpResponse>
implements Callback<Void> {

static final ResponseHeaders ACCEPTED_RESPONSE = ResponseHeaders.of(HttpStatus.ACCEPTED);

@Override
public void onSuccess(Void value) {
complete(HttpResponse.of(ACCEPTED_RESPONSE));
}

@Override
public void onError(Throwable t) {
completeExceptionally(t);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright The OpenZipkin Authors
* SPDX-License-Identifier: Apache-2.0
*/
package zipkin2.collector.otel.http;

import java.util.List;

import com.google.protobuf.TextFormat;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.KeyValue;

import static java.util.stream.Collectors.joining;

final class ProtoUtils {
static String kvListToJson(List<KeyValue> attributes) {
return attributes.stream()
.map(entry -> "\"" + entry.getKey() + "\":" + valueToJson(entry.getValue()))
.collect(joining(",", "{", "}"));
}

static String valueToString(AnyValue value) {
if (value.hasStringValue()) {
return value.getStringValue();
}
if (value.hasArrayValue()) {
// While https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/common/README.md#attribute says
// that an array should be encoded as a json,
// the Otel Zipkin Exporter doesn't implement like that https://github.com/open-telemetry/opentelemetry-java/blob/main/exporters/zipkin/src/test/java/io/opentelemetry/exporter/zipkin/OtelToZipkinSpanTransformerTest.java#L382-L385
// Also Brave doesn't use the json encoding.
// So follow the comma separator here.
return value.getArrayValue().getValuesList().stream()
.map(ProtoUtils::valueToString)
.collect(joining(","));
}
return valueToJson(value);
}

static String valueToJson(AnyValue value) {
if (value.hasStringValue()) {
return "\"" + value.getStringValue() + "\"";
}
if (value.hasArrayValue()) {
return value.getArrayValue().getValuesList().stream()
.map(ProtoUtils::valueToJson)
.collect(joining(",", "[", "]"));
}
if (value.hasKvlistValue()) {
return kvListToJson(value.getKvlistValue().getValuesList());
}
if (value.hasBoolValue()) {
return String.valueOf(value.getBoolValue());
}
if (value.hasDoubleValue()) {
return String.valueOf(value.getDoubleValue());
}
if (value.hasIntValue()) {
return String.valueOf(value.getIntValue());
}
if (value.hasBytesValue()) {
// TODO
return TextFormat.escapeBytes(value.getBytesValue());
}
return value.toString();
}
}
Loading

0 comments on commit 96a3550

Please sign in to comment.