Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OTLP support via HTTP and GRPC #2

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions collector-grpc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
<artifactId>zipkin-collector</artifactId>
<version>${zipkin.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zipkin-encoder-otel</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${armeria.groupId}</groupId>
Expand All @@ -52,18 +57,26 @@
<version>${zipkin.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>${armeria.groupId}</groupId>
<artifactId>armeria-grpc</artifactId>
<version>${armeria.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>${armeria.groupId}</groupId>
<artifactId>armeria-junit5</artifactId>
<version>${armeria.version}</version>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-otlp</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,31 @@
*/
package zipkin2.collector.otel.grpc;

import com.google.protobuf.InvalidProtocolBufferException;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.ServerConfigurator;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.grpc.protocol.AbstractUnsafeUnaryGrpcService;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import zipkin2.Callback;
import zipkin2.codec.SpanBytesEncoder;
import zipkin2.collector.Collector;
import zipkin2.collector.CollectorComponent;
import zipkin2.collector.CollectorMetrics;
import zipkin2.collector.CollectorSampler;
import zipkin2.internal.ReadBuffer;
import zipkin2.storage.StorageComponent;
import zipkin2.translation.zipkin.SpanTranslator;

public final class OpenTelemetryGrpcCollector extends CollectorComponent
implements ServerConfigurator {

public static Builder newBuilder() {
return new Builder();
}
Expand All @@ -36,23 +47,29 @@ public static final class Builder extends CollectorComponent.Builder {
Collector.Builder delegate = Collector.newBuilder(OpenTelemetryGrpcCollector.class);
CollectorMetrics metrics = CollectorMetrics.NOOP_METRICS;

@Override public Builder storage(StorageComponent storageComponent) {
@Override
public Builder storage(StorageComponent storageComponent) {
delegate.storage(storageComponent);
return this;
}

@Override public Builder metrics(CollectorMetrics metrics) {
if (metrics == null) throw new NullPointerException("metrics == null");
@Override
public Builder metrics(CollectorMetrics metrics) {
if (metrics == null) {
throw new NullPointerException("metrics == null");
}
delegate.metrics(this.metrics = metrics.forTransport("otel/grpc"));
return this;
}

@Override public Builder sampler(CollectorSampler sampler) {
@Override
public Builder sampler(CollectorSampler sampler) {
delegate.sampler(sampler);
return this;
}

@Override public OpenTelemetryGrpcCollector build() {
@Override
public OpenTelemetryGrpcCollector build() {
return new OpenTelemetryGrpcCollector(this);
}

Expand All @@ -68,30 +85,74 @@ public static final class Builder extends CollectorComponent.Builder {
metrics = builder.metrics;
}

@Override public OpenTelemetryGrpcCollector start() {
@Override
public OpenTelemetryGrpcCollector start() {
return this;
}
@Override public String toString() {

@Override
public String toString() {
return "OpenTelemetryGrpcCollector{}";
}

/**
* Reconfigures the service per https://github.com/open-telemetry/opentelemetry-proto/blob/v1.0.0/opentelemetry/proto/collector/trace/v1/trace_service.proto
* Reconfigures the service per
* https://github.com/open-telemetry/opentelemetry-proto/blob/v1.0.0/opentelemetry/proto/collector/trace/v1/trace_service.proto
*/
@Override public void reconfigure(ServerBuilder sb) {
sb.service("/opentelemetry.proto.collector.trace.v1.TraceService/Export", new HttpService(this));
@Override
public void reconfigure(ServerBuilder sb) {
sb.service("/opentelemetry.proto.collector.trace.v1.TraceService/Export",
new HttpService(this));
}

static final class HttpService extends AbstractUnsafeUnaryGrpcService {
final OpenTelemetryGrpcCollector collector;

final Collector collector;
final CollectorMetrics metrics;

HttpService(OpenTelemetryGrpcCollector collector) {
this.collector = collector;
this.collector = collector.collector;
this.metrics = collector.metrics;
}

@Override
protected CompletionStage<ByteBuf> handleMessage(ServiceRequestContext ctx, ByteBuf bytes) {
metrics.incrementMessages();
metrics.incrementBytes(bytes.readableBytes());

if (!bytes.isReadable()) {
return CompletableFuture.completedFuture(bytes); // lenient on empty messages
}

try (ReadBuffer readBuffer = ReadBuffer.wrapUnsafe(bytes.nioBuffer())) {
CompletableFutureCallback result = new CompletableFutureCallback();
try {
ExportTraceServiceRequest request = ExportTraceServiceRequest.parseFrom(
ByteBufUtil.getBytes(bytes));
List<zipkin2.Span> spans = SpanTranslator.translate(request);
byte[] encoded = SpanBytesEncoder.PROTO3.encodeList(spans);
collector.acceptSpans(encoded, result);
return result;
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
} finally {
bytes.release();
}
}
}

static final class CompletableFutureCallback extends CompletableFuture<ByteBuf>
implements Callback<Void> {

@Override
public void onSuccess(Void value) {
complete(Unpooled.EMPTY_BUFFER);
}

@Override
protected CompletionStage<ByteBuf> handleMessage(ServiceRequestContext ctx, ByteBuf message) {
throw new RuntimeException("Implement me!");
public void onError(Throwable t) {
completeExceptionally(t);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,27 @@
*/
package zipkin2.collector.otel.grpc;

import static io.opentelemetry.sdk.trace.samplers.Sampler.alwaysOn;
import static org.assertj.core.api.Assertions.assertThat;

import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.ServerBuilder;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import zipkin2.collector.CollectorComponent;
import zipkin2.collector.CollectorSampler;
Expand All @@ -24,11 +42,29 @@

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class ITOpenTelemetryGrpcCollector {

InMemoryStorage store;
InMemoryCollectorMetrics metrics;
CollectorComponent collector;

@BeforeEach public void setup() {
SpanExporter spanExporter = OtlpGrpcSpanExporter.builder().build();

SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder()
.setSampler(alwaysOn())
.addSpanProcessor(BatchSpanProcessor.builder(spanExporter).build())
.build();

OpenTelemetrySdk openTelemetrySdk = OpenTelemetrySdk.builder()
.setTracerProvider(sdkTracerProvider)
.build();

Tracer tracer = openTelemetrySdk.getTracerProvider()
.get("zipkin2.collector.otel.grpc");

Server server;

@BeforeEach
public void setup() {
store = InMemoryStorage.newBuilder().build();
metrics = new InMemoryCollectorMetrics();

Expand All @@ -38,13 +74,42 @@ class ITOpenTelemetryGrpcCollector {
.storage(store)
.build()
.start();
ServerBuilder serverBuilder = Server.builder().http(4317);
((OpenTelemetryGrpcCollector) collector).reconfigure(serverBuilder);
metrics = metrics.forTransport("otel/grpc");
server = serverBuilder.build();
server.start().join();
}

@AfterEach void teardown() throws IOException {
@AfterEach
void teardown() throws IOException {
store.close();
collector.close();
server.stop().join();
}

// TODO: integration test
@Test
void otelGrpcExporterWorksWithZipkinOtelCollector() throws InterruptedException {
List<String> traceIds = new ArrayList<>();
final int size = 5;
for (int i = 0; i < size; i++) {
// Given
Span span = tracer.spanBuilder("foo " + i)
.setAttribute("foo tag", "foo value")
.setSpanKind(SpanKind.CONSUMER)
.startSpan();
String traceId = span.getSpanContext().getTraceId();
System.out.println("Trace Id <" + traceId + ">");
Thread.sleep(50);
span.addEvent("boom!");
Thread.sleep(50);

// When
span.end();
traceIds.add(traceId);
}

Awaitility.await().untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(5));

}
}
22 changes: 18 additions & 4 deletions collector-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
<artifactId>zipkin-collector</artifactId>
<version>${zipkin.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zipkin-encoder-otel</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${armeria.groupId}</groupId>
Expand All @@ -52,11 +57,20 @@
<version>${zipkin.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>${armeria.groupId}</groupId>
<artifactId>armeria-junit5</artifactId>
<version>${armeria.version}</version>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-otlp</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Loading
Loading