Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OTLP/HTTP collector #13

Merged
merged 14 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
# zipkin-otel
Shared libraries that provide Zipkin integration with the OpenTelemetry. Requires JRE 11 or later.

# Usage
## Usage
These components integrate traced applications and servers with OpenTelemetry protocols
via interfaces defined by [Zipkin](https://github.com/openzipkin/zipkin).

## Collectors
### Collectors
The component in a zipkin server that receives trace data is called a
collector. A collector decodes spans reported by applications and
persists them to a configured collector component.
Expand Down
6 changes: 2 additions & 4 deletions collector-http/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# zipkin-collector-otel-http
# collector-http

This component implements
the [OTLP/HTTP protocol](https://opentelemetry.io/docs/specs/otlp/#otlphttp)
with [Armeria](https://armeria.dev/).
This component implements the [OTLP/HTTP protocol](https://opentelemetry.io/docs/specs/otlp/#otlphttp) with [Armeria](https://armeria.dev/).
50 changes: 46 additions & 4 deletions collector-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
Expand All @@ -17,8 +17,8 @@
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>zipkin-collector-otel-http</artifactId>
<name>Zipkin Collector: OpenTelemetry HTTP</name>
<artifactId>collector-http</artifactId>
<name>Zipkin Collector: OTLP HTTP</name>

<properties>
<main.basedir>${project.basedir}/..</main.basedir>
Expand All @@ -37,6 +37,29 @@
<version>${armeria.version}</version>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.proto</groupId>
<artifactId>opentelemetry-proto</artifactId>
<version>${opentelemetry-proto.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.semconv</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>${opentelemetry-semconv.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
<!-- We use provided scope to avoid pinning a protobuf version -->
<scope>provided</scope>
</dependency>

<dependency>
<groupId>${zipkin.groupId}</groupId>
<artifactId>zipkin-tests</artifactId>
Expand All @@ -50,5 +73,24 @@
<version>${armeria.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<version>${opentelemetry.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-otlp</artifactId>
<version>${opentelemetry.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,29 @@
*/
package zipkin2.collector.otel.http;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.google.protobuf.UnsafeByteOperations;
import com.linecorp.armeria.common.AggregationOptions;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.ResponseHeaders;
import com.linecorp.armeria.common.encoding.StreamDecoderFactory;
import com.linecorp.armeria.server.AbstractHttpService;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.ServerConfigurator;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.encoding.DecodingService;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import zipkin2.Callback;
import zipkin2.Span;
import zipkin2.collector.Collector;
import zipkin2.collector.CollectorComponent;
import zipkin2.collector.CollectorMetrics;
Expand All @@ -18,32 +35,40 @@

public final class OpenTelemetryHttpCollector extends CollectorComponent
implements ServerConfigurator {

public static Builder newBuilder() {
return new Builder();
}

public static final class Builder extends CollectorComponent.Builder {

Collector.Builder delegate = Collector.newBuilder(OpenTelemetryHttpCollector.class);

CollectorMetrics metrics = CollectorMetrics.NOOP_METRICS;

@Override public Builder storage(StorageComponent storageComponent) {
@Override
public Builder storage(StorageComponent storageComponent) {
delegate.storage(storageComponent);
return this;
}

@Override public Builder metrics(CollectorMetrics metrics) {
if (metrics == null) throw new NullPointerException("metrics == null");
@Override
public Builder metrics(CollectorMetrics metrics) {
if (metrics == null) {
throw new NullPointerException("metrics == null");
}
delegate.metrics(this.metrics = metrics.forTransport("otel/http"));
return this;
}

@Override public Builder sampler(CollectorSampler sampler) {
@Override
public Builder sampler(CollectorSampler sampler) {
delegate.sampler(sampler);
return this;
}

@Override public OpenTelemetryHttpCollector build() {
@Override
public OpenTelemetryHttpCollector build() {
return new OpenTelemetryHttpCollector(this);
}

Expand All @@ -52,38 +77,87 @@ public static final class Builder extends CollectorComponent.Builder {
}

final Collector collector;

final CollectorMetrics metrics;

OpenTelemetryHttpCollector(Builder builder) {
collector = builder.delegate.build();
metrics = builder.metrics;
}

@Override public OpenTelemetryHttpCollector start() {
@Override
public OpenTelemetryHttpCollector start() {
return this;
}

@Override public String toString() {
@Override
public String toString() {
return "OpenTelemetryHttpCollector{}";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want to name this similar to the sender?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate this?

}

/**
* Reconfigures the service per https://opentelemetry.io/docs/specs/otlp/#otlphttp-request
*/
@Override public void reconfigure(ServerBuilder sb) {
@Override
public void reconfigure(ServerBuilder sb) {
sb.decorator(DecodingService.newDecorator(StreamDecoderFactory.gzip()));
sb.service("/v1/traces", new HttpService(this));
}

static final class HttpService extends AbstractHttpService {
private static final Logger LOG = Logger.getLogger(HttpService.class.getName());

final OpenTelemetryHttpCollector collector;

HttpService(OpenTelemetryHttpCollector collector) {
this.collector = collector;
}

@Override protected HttpResponse doPost(ServiceRequestContext ctx, HttpRequest req)
throws Exception {
throw new RuntimeException("Implement me!");
@Override
protected HttpResponse doPost(ServiceRequestContext ctx, HttpRequest req) {
CompletableCallback result = new CompletableCallback();
req.aggregate(AggregationOptions.usePooledObjects(ctx.alloc(), ctx.eventLoop()
)).handle((msg, t) -> {
if (t != null) {
result.onError(t);
return null;
}
try (HttpData content = msg.content()) {
if (content.isEmpty()) {
result.onSuccess(null);
return null;
}

try {
ExportTraceServiceRequest request = ExportTraceServiceRequest.parseFrom(UnsafeByteOperations.unsafeWrap(content.byteBuf().nioBuffer()).newCodedInput());
List<Span> spans = SpanTranslator.translate(request);
collector.collector.accept(spans, result);
}
catch (IOException e) {
LOG.log(Level.WARNING, "Unable to parse the request:", e);
result.onError(new UncheckedIOException(e));
Copy link

@kojilin kojilin Sep 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious if not rethrow, just use result.onError(e); is better?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree. fixed in b162311

}
return null;
}
});
return HttpResponse.of(result);
}
}

static final class CompletableCallback extends CompletableFuture<HttpResponse>
implements Callback<Void> {

static final ResponseHeaders ACCEPTED_RESPONSE = ResponseHeaders.of(HttpStatus.ACCEPTED);

@Override
public void onSuccess(Void value) {
complete(HttpResponse.of(ACCEPTED_RESPONSE));
}

@Override
public void onError(Throwable t) {
completeExceptionally(t);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright The OpenZipkin Authors
* SPDX-License-Identifier: Apache-2.0
*/
package zipkin2.collector.otel.http;

import java.util.List;

import com.google.protobuf.TextFormat;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.KeyValue;

import static java.util.stream.Collectors.joining;

final class ProtoUtils {
static String kvListToJson(List<KeyValue> attributes) {
return attributes.stream()
.map(entry -> "\"" + entry.getKey() + "\":" + valueToJson(entry.getValue()))
.collect(joining(",", "{", "}"));
}

static String valueToString(AnyValue value) {
if (value.hasStringValue()) {
return value.getStringValue();
}
return valueToJson(value);
}

static String valueToJson(AnyValue value) {
if (value.hasStringValue()) {
return "\"" + value.getStringValue() + "\"";
}
if (value.hasArrayValue()) {
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/common/README.md#attribute
return value.getArrayValue().getValuesList().stream()
.map(ProtoUtils::valueToJson)
.collect(joining(",", "[", "]"));
}
if (value.hasKvlistValue()) {
return kvListToJson(value.getKvlistValue().getValuesList());
}
if (value.hasBoolValue()) {
return String.valueOf(value.getBoolValue());
}
if (value.hasDoubleValue()) {
return String.valueOf(value.getDoubleValue());
}
if (value.hasIntValue()) {
return String.valueOf(value.getIntValue());
}
if (value.hasBytesValue()) {
// TODO
return TextFormat.escapeBytes(value.getBytesValue());
}
return value.toString();
}
}
Loading
Loading