From 5cddbec37f907ee21c23ce867035b07d39adc260 Mon Sep 17 00:00:00 2001 From: Ashish Aggarwal Date: Fri, 30 Nov 2018 18:19:46 +0530 Subject: [PATCH 1/4] Adding haystack opencensus integration --- .travis.yml | 10 + commons/pom.xml | 77 ++++++ .../dispatchers/clients/BaseGrpcClient.java | 232 +++++++++++++++++ .../dispatchers/clients/BaseHttpClient.java | 115 +++++++++ .../client/dispatchers/clients/Client.java | 6 +- .../dispatchers/clients/ClientException.java | 0 .../clients/GRPCAgentProtoClient.java | 74 ++++++ .../clients/HttpCollectorProtoClient.java | 42 ++++ .../www/haystack/client/metrics/Counter.java | 0 .../www/haystack/client/metrics/Gauge.java | 0 .../www/haystack/client/metrics/Metrics.java | 0 .../client/metrics/MetricsRegistry.java | 0 .../client/metrics/NoopMetricsRegistry.java | 0 .../www/haystack/client/metrics/Tag.java | 0 .../www/haystack/client/metrics/Timer.java | 0 core/pom.xml | 6 + .../dispatchers/clients/GRPCAgentClient.java | 215 ++-------------- .../clients/HttpCollectorClient.java | 91 +------ .../dispatchers/clients/InMemoryClient.java | 2 +- .../dispatchers/clients/LoggerClient.java | 2 +- .../dispatchers/clients/NoopClient.java | 2 +- .../clients/GRPCAgentClientTest.java | 1 - .../configuration/AgentClientFactory.java | 14 +- .../configuration/ClientFactory.java | 1 - .../configuration/LoggerClientFactory.java | 5 +- .../configuration/NoopClientFactory.java | 1 - integrations/opencensus/docker-compose.yml | 31 +++ integrations/opencensus/pom.xml | 224 +++++++++++++++++ .../trace/HaystackExporterHandler.java | 234 ++++++++++++++++++ .../exporter/trace/HaystackTraceExporter.java | 98 ++++++++ .../trace/config/DispatcherConfig.java | 40 +++ .../config/GrpcAgentDispatcherConfig.java | 54 ++++ .../trace/config/HttpDispatcherConfig.java | 54 ++++ .../trace/HaystackExporterHandlerSpec.scala | 129 ++++++++++ .../HaystackExporterIntegrationSpec.scala | 100 ++++++++ .../trace/HaystackTraceExporterSpec.scala | 57 +++++ integrations/pom.xml | 7 +- pom.xml | 16 ++ 38 files changed, 1634 insertions(+), 306 deletions(-) create mode 100644 commons/pom.xml create mode 100644 commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/BaseGrpcClient.java create mode 100644 commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/BaseHttpClient.java rename {core => commons}/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/Client.java (89%) rename {core => commons}/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/ClientException.java (100%) create mode 100644 commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/GRPCAgentProtoClient.java create mode 100644 commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/HttpCollectorProtoClient.java rename {core => commons}/src/main/java/com/expedia/www/haystack/client/metrics/Counter.java (100%) rename {core => commons}/src/main/java/com/expedia/www/haystack/client/metrics/Gauge.java (100%) rename {core => commons}/src/main/java/com/expedia/www/haystack/client/metrics/Metrics.java (100%) rename {core => commons}/src/main/java/com/expedia/www/haystack/client/metrics/MetricsRegistry.java (100%) rename {core => commons}/src/main/java/com/expedia/www/haystack/client/metrics/NoopMetricsRegistry.java (100%) rename {core => commons}/src/main/java/com/expedia/www/haystack/client/metrics/Tag.java (100%) rename {core => commons}/src/main/java/com/expedia/www/haystack/client/metrics/Timer.java (100%) create mode 100644 integrations/opencensus/docker-compose.yml create mode 100644 integrations/opencensus/pom.xml create mode 100644 integrations/opencensus/src/main/java/com/www/expedia/opencensus/exporter/trace/HaystackExporterHandler.java create mode 100644 integrations/opencensus/src/main/java/com/www/expedia/opencensus/exporter/trace/HaystackTraceExporter.java create mode 100644 integrations/opencensus/src/main/java/com/www/expedia/opencensus/exporter/trace/config/DispatcherConfig.java create mode 100644 integrations/opencensus/src/main/java/com/www/expedia/opencensus/exporter/trace/config/GrpcAgentDispatcherConfig.java create mode 100644 integrations/opencensus/src/main/java/com/www/expedia/opencensus/exporter/trace/config/HttpDispatcherConfig.java create mode 100644 integrations/opencensus/src/test/scala/com/www/expedia/opencensus/exporter/trace/HaystackExporterHandlerSpec.scala create mode 100644 integrations/opencensus/src/test/scala/com/www/expedia/opencensus/exporter/trace/HaystackExporterIntegrationSpec.scala create mode 100644 integrations/opencensus/src/test/scala/com/www/expedia/opencensus/exporter/trace/HaystackTraceExporterSpec.scala diff --git a/.travis.yml b/.travis.yml index 4f1fd75..53362bc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,10 @@ +sudo: required + dist: trusty +services: + - docker + language: java jdk: @@ -9,6 +14,11 @@ cache: directories: - $HOME/.m2 +addons: + hosts: + - haystack-agent + - kafkasvc + install: - ./mvnw --batch-mode install -B -V diff --git a/commons/pom.xml b/commons/pom.xml new file mode 100644 index 0000000..3f13694 --- /dev/null +++ b/commons/pom.xml @@ -0,0 +1,77 @@ + + + 4.0.0 + + + com.expedia.www + haystack-client-java-parent + 0.2.1-SNAPSHOT + + + haystack-client-commons + jar + haystack-client-commons + + + + com.expedia.www + haystack-commons + + + + io.grpc + grpc-netty-shaded + + + + io.grpc + grpc-protobuf + + + + io.grpc + grpc-stub + + + + org.apache.commons + commons-lang3 + + + + org.slf4j + slf4j-api + + + + org.apache.httpcomponents + httpclient + + + + + junit + junit + test + + + + org.mockito + mockito-core + test + + + + io.grpc + grpc-testing + test + + + + org.awaitility + awaitility + test + + + + diff --git a/commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/BaseGrpcClient.java b/commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/BaseGrpcClient.java new file mode 100644 index 0000000..3b1884c --- /dev/null +++ b/commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/BaseGrpcClient.java @@ -0,0 +1,232 @@ +/* + * Copyright 2018 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.expedia.www.haystack.client.dispatchers.clients; + +import com.expedia.open.tracing.agent.api.DispatchResult; +import com.expedia.open.tracing.agent.api.SpanAgentGrpc; +import com.expedia.www.haystack.client.metrics.*; +import io.grpc.ManagedChannel; +import io.grpc.netty.shaded.io.grpc.netty.NegotiationType; +import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; +import io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +abstract public class BaseGrpcClient { + private static final Logger LOGGER = LoggerFactory.getLogger(BaseGrpcClient.class); + + protected final ManagedChannel channel; + protected final SpanAgentGrpc.SpanAgentStub stub; + protected final long shutdownTimeoutMS; + protected final StreamObserver observer; + + protected final Timer sendTimer; + protected final Counter sendExceptionCounter; + protected final Timer closeTimer; + protected final Counter closeTimeoutCounter; + protected final Counter closeInterruptedCounter; + protected final Counter closeExceptionCounter; + protected final Counter flushCounter; + + public BaseGrpcClient(Metrics metrics, + ManagedChannel channel, + SpanAgentGrpc.SpanAgentStub stub, StreamObserver observer, + long shutdownTimeoutMS) { + this.channel = channel; + this.stub = stub; + this.shutdownTimeoutMS = shutdownTimeoutMS; + this.observer = observer; + + this.sendTimer = Timer.builder("send").register(metrics); + this.sendExceptionCounter = Counter.builder("send").tag(new Tag("state", "exception")).register(metrics); + this.closeTimer = Timer.builder("close").register(metrics); + this.closeTimeoutCounter = Counter.builder("close").tag(new Tag("state", "timeout")).register(metrics); + this.closeInterruptedCounter = Counter.builder("close").tag(new Tag("state", "interrupted")).register(metrics); + this.closeExceptionCounter = Counter.builder("close").tag(new Tag("state", "exception")).register(metrics); + this.flushCounter = Counter.builder("flush").register(metrics); + } + + public void close() { + try (Timer.Sample timer = closeTimer.start()) { + channel.shutdown(); + try { + if (!channel.awaitTermination(shutdownTimeoutMS, TimeUnit.SECONDS)) { + channel.shutdownNow(); + closeTimeoutCounter.increment(); + LOGGER.warn("Channel failed to terminate, forcibly closing it."); + if (!channel.awaitTermination(shutdownTimeoutMS, TimeUnit.SECONDS)) { + closeTimeoutCounter.increment(); + LOGGER.error("Channel failed to terminate."); + } + } + } catch (InterruptedException e) { + closeInterruptedCounter.increment(); + LOGGER.error("Unable to close the channel.", e); + } + } catch (Exception e) { + closeExceptionCounter.increment(); + LOGGER.error("Unexpected exception caught on client shutdown.", e); + throw e; + } + } + + public void flush() { + flushCounter.increment(); + } + + public static class GRPCAgentClientStreamObserver implements StreamObserver { + private Counter onCompletedCounter; + private Counter onErrorCounter; + private Counter ratelimitCounter; + private Counter unknownCounter; + private Counter badresultCounter; + + public GRPCAgentClientStreamObserver(Metrics metrics) { + this.onCompletedCounter = Counter.builder("observer").tag(new Tag("state", "completed")).register(metrics); + this.onErrorCounter = Counter.builder("observer").tag(new Tag("state", "error")).register(metrics); + this.ratelimitCounter = Counter.builder("observer").tag(new Tag("state", "ratelimited")).register(metrics); + this.unknownCounter = Counter.builder("observer").tag(new Tag("state", "unknown")).register(metrics); + this.badresultCounter = Counter.builder("observer").tag(new Tag("state", "badresult")).register(metrics); + } + + @Override + public void onCompleted() { + onCompletedCounter.increment(); + LOGGER.debug("Dispatching span completed"); + } + + @Override + public void onError(Throwable t) { + onErrorCounter.increment(); + LOGGER.error("Dispatching span failed with error: {}", t); + } + + @Override + public void onNext(DispatchResult value) { + switch (value.getCode()) { + case SUCCESS: + // do nothing + break; + case RATE_LIMIT_ERROR: + ratelimitCounter.increment(); + LOGGER.error("Rate limit error received from agent"); + break; + case UNKNOWN_ERROR: + unknownCounter.increment(); + LOGGER.error("Unknown error received from agent"); + break; + default: + badresultCounter.increment(); + LOGGER.error("Unknown result received from agent: {}", value.getCode()); + } + } + } + + public static class Builder { + protected StreamObserver observer; + + protected Metrics metrics; + + // Options to build a channel + protected String host; + protected int port; + protected long keepAliveTimeMS = TimeUnit.SECONDS.toMillis(30); + protected long keepAliveTimeoutMS = TimeUnit.SECONDS.toMillis(30); + protected boolean keepAliveWithoutCalls = true; + protected NegotiationType negotiationType = NegotiationType.PLAINTEXT; + + // either build a channel or provide one + protected ManagedChannel channel; + + protected long shutdownTimeoutMS = TimeUnit.SECONDS.toMillis(30); + + private Builder(MetricsRegistry registry) { + this(new Metrics(registry, Client.class.getName(), Arrays.asList(new Tag("type", "grpc")))); + } + + private Builder(Metrics metrics) { + this.observer = new GRPCAgentClientStreamObserver(metrics); + this.metrics = metrics; + + } + + public Builder(MetricsRegistry metrics, ManagedChannel channel) { + this(metrics); + this.channel = channel; + } + + public Builder(Metrics metrics, ManagedChannel channel) { + this(metrics); + this.channel = channel; + } + + public Builder(MetricsRegistry metrics, String host, int port) { + this(metrics); + this.host = host; + this.port = port; + } + + public Builder(Metrics metrics, String host, int port) { + this(metrics); + this.host = host; + this.port = port; + } + + public Builder withObserver(StreamObserver observer) { + this.observer = observer; + return this; + } + + public Builder withKeepAliveTimeMS(long keepAliveTimeMS) { + this.keepAliveTimeMS = keepAliveTimeMS; + return this; + } + + public Builder withKeepAliveTimeoutMS(long keepAliveTimeoutMS) { + this.keepAliveTimeoutMS = keepAliveTimeoutMS; + return this; + } + + public Builder withKeepAliveWithoutCalls(boolean keepAliveWithoutCalls) { + this.keepAliveWithoutCalls = keepAliveWithoutCalls; + return this; + } + + public Builder withNegotiationType(NegotiationType negotiationType) { + this.negotiationType = negotiationType; + return this; + } + + public Builder withShutdownTimeoutMS(long shutdownTimeoutMS) { + this.shutdownTimeoutMS = shutdownTimeoutMS; + return this; + } + + protected ManagedChannel buildManagedChannel() { + return NettyChannelBuilder.forAddress(host, port) + .keepAliveTime(keepAliveTimeMS, TimeUnit.MILLISECONDS) + .keepAliveTimeout(keepAliveTimeoutMS, TimeUnit.MILLISECONDS) + .keepAliveWithoutCalls(keepAliveWithoutCalls) + .negotiationType(negotiationType) + .build(); + } + } +} diff --git a/commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/BaseHttpClient.java b/commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/BaseHttpClient.java new file mode 100644 index 0000000..af07db8 --- /dev/null +++ b/commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/BaseHttpClient.java @@ -0,0 +1,115 @@ +/* + * Copyright 2018 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.expedia.www.haystack.client.dispatchers.clients; + +import org.apache.commons.lang3.Validate; +import org.apache.http.Header; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.message.BasicHeader; + +import java.io.Closeable; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +abstract class BaseHttpClient { + private final String endpoint; + private final BasicHeader[] headers; + private final CloseableHttpClient httpClient; + + public BaseHttpClient(final String endpoint, + final Map headers) { + this(endpoint, headers, HttpClients.createDefault()); + } + + public BaseHttpClient(final String endpoint) { + this(endpoint, new HashMap<>()); + } + + public BaseHttpClient(final String endpoint, + final Map headers, + final CloseableHttpClient httpClient) { + Validate.notEmpty(endpoint, "Haystack collector endpoint can't be empty"); + + this.endpoint = endpoint; + this.httpClient = httpClient; + this.headers = headers.entrySet().stream() + .map((entry) -> new BasicHeader(entry.getKey(), entry.getValue())) + .collect(Collectors.toList()) + .toArray(new BasicHeader[0]); + } + + public void close() throws ClientException { + closeQuietly(httpClient); + } + + public void flush() throws ClientException { + /* no batching, no flushing */ + } + + public boolean send(final byte[] spanBytes) throws ClientException { + final HttpPost post = new HttpPost(endpoint); + if (headers != null && headers.length > 0) { + post.setHeaders(headers); + } + final ByteArrayEntity entity = new ByteArrayEntity(spanBytes); + entity.setContentType("application/octet-stream"); + post.setEntity(entity); + CloseableHttpResponse response = null; + try { + response = httpClient.execute(post); + final int statusCode = getStatusCode(response); + if (is2xx(statusCode)) { + return true; + } + throw new ClientException(String.format("Failed sending span to http collector endpoint=%s, http status code=%d", endpoint, statusCode)); + } catch (IOException e) { + throw new ClientException(String.format("Failed sending span to http collector endpoint=%s", endpoint), e); + } finally { + closeQuietly(response); + } + } + + private int getStatusCode(final CloseableHttpResponse response) { + return response.getStatusLine() == null ? 0 : response.getStatusLine().getStatusCode(); + } + + private boolean is2xx(final int statusCode) { + return statusCode >= 200 && statusCode < 300; + } + + public Header[] getHeaders() { + return headers; + } + public String getEndpoint() { + return endpoint; + } + + private void closeQuietly(final Closeable obj) { + try { + obj.close(); + } catch (Exception ex) { + /* be quiet */ + } + } +} diff --git a/core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/Client.java b/commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/Client.java similarity index 89% rename from core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/Client.java rename to commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/Client.java index eb91180..aa1d757 100644 --- a/core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/Client.java +++ b/commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/Client.java @@ -19,12 +19,10 @@ import java.io.Closeable; import java.io.Flushable; -import com.expedia.www.haystack.client.Span; - /** * A Client is how a RemoteDispatcher sends it's finished spans to a remote endpoint */ -public interface Client extends Closeable, Flushable { +public interface Client extends Closeable, Flushable { @Override @@ -41,5 +39,5 @@ public interface Client extends Closeable, Flushable { * false if it was unsuccessful * @throws ClientException throws a ClientException if an exception occured */ - boolean send(Span span) throws ClientException; + boolean send(R span) throws ClientException; } diff --git a/core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/ClientException.java b/commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/ClientException.java similarity index 100% rename from core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/ClientException.java rename to commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/ClientException.java diff --git a/commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/GRPCAgentProtoClient.java b/commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/GRPCAgentProtoClient.java new file mode 100644 index 0000000..b1d5ec5 --- /dev/null +++ b/commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/GRPCAgentProtoClient.java @@ -0,0 +1,74 @@ +/* + * Copyright 2018 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.expedia.www.haystack.client.dispatchers.clients; + +import com.expedia.open.tracing.Span; +import com.expedia.open.tracing.agent.api.DispatchResult; +import com.expedia.open.tracing.agent.api.SpanAgentGrpc; +import com.expedia.open.tracing.agent.api.SpanAgentGrpc.SpanAgentStub; +import com.expedia.www.haystack.client.metrics.Metrics; +import com.expedia.www.haystack.client.metrics.MetricsRegistry; +import com.expedia.www.haystack.client.metrics.Timer.Sample; +import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; + +public class GRPCAgentProtoClient extends BaseGrpcClient implements Client { + public GRPCAgentProtoClient(Metrics metrics, + ManagedChannel channel, + SpanAgentStub stub, + StreamObserver observer, + long shutdownTimeoutMS) { + super(metrics, channel, stub, observer, shutdownTimeoutMS); + } + + @Override + public boolean send(Span span) throws ClientException { + try (Sample timer = sendTimer.start()) { + stub.dispatch(span, observer); + } catch (Exception e) { + sendExceptionCounter.increment(); + throw new ClientException(e.getMessage(), e); + } + // always true + return true; + } + + public static final class Builder extends BaseGrpcClient.Builder { + public Builder(MetricsRegistry metrics, ManagedChannel channel) { + super(metrics, channel); + } + + public Builder(Metrics metrics, String host, int port) { + super(metrics, host, port); + } + + public Builder(MetricsRegistry metrics, String host, int port) { + super(metrics, host, port); + } + + public GRPCAgentProtoClient build() { + ManagedChannel managedChannel = this.channel; + + if (managedChannel == null) { + managedChannel = buildManagedChannel(); + } + + SpanAgentStub stub = SpanAgentGrpc.newStub(managedChannel); + return new GRPCAgentProtoClient(metrics, managedChannel, stub, observer, shutdownTimeoutMS); + } + } +} diff --git a/commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/HttpCollectorProtoClient.java b/commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/HttpCollectorProtoClient.java new file mode 100644 index 0000000..5e7ed94 --- /dev/null +++ b/commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/HttpCollectorProtoClient.java @@ -0,0 +1,42 @@ +/* + * Copyright 2018 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.expedia.www.haystack.client.dispatchers.clients; + +import com.expedia.open.tracing.Span; +import org.apache.http.impl.client.CloseableHttpClient; + +import java.util.Map; + +public class HttpCollectorProtoClient extends BaseHttpClient implements Client{ + public HttpCollectorProtoClient(String endpoint, Map headers) { + super(endpoint, headers); + } + + public HttpCollectorProtoClient(String endpoint) { + super(endpoint); + } + + public HttpCollectorProtoClient(String endpoint, Map headers, CloseableHttpClient httpClient) { + super(endpoint, headers, httpClient); + } + + @Override + public boolean send(Span span) throws ClientException { + return super.send(span.toByteArray()); + } +} diff --git a/core/src/main/java/com/expedia/www/haystack/client/metrics/Counter.java b/commons/src/main/java/com/expedia/www/haystack/client/metrics/Counter.java similarity index 100% rename from core/src/main/java/com/expedia/www/haystack/client/metrics/Counter.java rename to commons/src/main/java/com/expedia/www/haystack/client/metrics/Counter.java diff --git a/core/src/main/java/com/expedia/www/haystack/client/metrics/Gauge.java b/commons/src/main/java/com/expedia/www/haystack/client/metrics/Gauge.java similarity index 100% rename from core/src/main/java/com/expedia/www/haystack/client/metrics/Gauge.java rename to commons/src/main/java/com/expedia/www/haystack/client/metrics/Gauge.java diff --git a/core/src/main/java/com/expedia/www/haystack/client/metrics/Metrics.java b/commons/src/main/java/com/expedia/www/haystack/client/metrics/Metrics.java similarity index 100% rename from core/src/main/java/com/expedia/www/haystack/client/metrics/Metrics.java rename to commons/src/main/java/com/expedia/www/haystack/client/metrics/Metrics.java diff --git a/core/src/main/java/com/expedia/www/haystack/client/metrics/MetricsRegistry.java b/commons/src/main/java/com/expedia/www/haystack/client/metrics/MetricsRegistry.java similarity index 100% rename from core/src/main/java/com/expedia/www/haystack/client/metrics/MetricsRegistry.java rename to commons/src/main/java/com/expedia/www/haystack/client/metrics/MetricsRegistry.java diff --git a/core/src/main/java/com/expedia/www/haystack/client/metrics/NoopMetricsRegistry.java b/commons/src/main/java/com/expedia/www/haystack/client/metrics/NoopMetricsRegistry.java similarity index 100% rename from core/src/main/java/com/expedia/www/haystack/client/metrics/NoopMetricsRegistry.java rename to commons/src/main/java/com/expedia/www/haystack/client/metrics/NoopMetricsRegistry.java diff --git a/core/src/main/java/com/expedia/www/haystack/client/metrics/Tag.java b/commons/src/main/java/com/expedia/www/haystack/client/metrics/Tag.java similarity index 100% rename from core/src/main/java/com/expedia/www/haystack/client/metrics/Tag.java rename to commons/src/main/java/com/expedia/www/haystack/client/metrics/Tag.java diff --git a/core/src/main/java/com/expedia/www/haystack/client/metrics/Timer.java b/commons/src/main/java/com/expedia/www/haystack/client/metrics/Timer.java similarity index 100% rename from core/src/main/java/com/expedia/www/haystack/client/metrics/Timer.java rename to commons/src/main/java/com/expedia/www/haystack/client/metrics/Timer.java diff --git a/core/pom.xml b/core/pom.xml index 072f353..920dbeb 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -14,6 +14,12 @@ base client library that provides opentracing bindings for the project + + com.expedia.www + haystack-client-commons + ${project.version} + + com.expedia.www haystack-commons diff --git a/core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/GRPCAgentClient.java b/core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/GRPCAgentClient.java index 7e8e702..123d024 100644 --- a/core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/GRPCAgentClient.java +++ b/core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/GRPCAgentClient.java @@ -16,112 +16,26 @@ */ package com.expedia.www.haystack.client.dispatchers.clients; -import java.util.Arrays; -import java.util.concurrent.TimeUnit; - - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.expedia.open.tracing.agent.api.DispatchResult; import com.expedia.open.tracing.agent.api.SpanAgentGrpc; import com.expedia.open.tracing.agent.api.SpanAgentGrpc.SpanAgentStub; import com.expedia.www.haystack.client.Span; import com.expedia.www.haystack.client.dispatchers.formats.Format; import com.expedia.www.haystack.client.dispatchers.formats.ProtoBufFormat; -import com.expedia.www.haystack.client.metrics.Counter; import com.expedia.www.haystack.client.metrics.Metrics; import com.expedia.www.haystack.client.metrics.MetricsRegistry; -import com.expedia.www.haystack.client.metrics.Tag; -import com.expedia.www.haystack.client.metrics.Timer; import com.expedia.www.haystack.client.metrics.Timer.Sample; - import io.grpc.ManagedChannel; -import io.grpc.netty.shaded.io.grpc.netty.NegotiationType; -import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; import io.grpc.stub.StreamObserver; -public class GRPCAgentClient implements Client { - private static final Logger LOGGER = LoggerFactory.getLogger(GRPCAgentClient.class); - +public class GRPCAgentClient extends BaseGrpcClient implements com.expedia.www.haystack.client.dispatchers.clients.Client { private final Format format; - private final ManagedChannel channel; - private final SpanAgentStub stub; - private final long shutdownTimeoutMS; - private final StreamObserver observer; - - private final Timer sendTimer; - private final Counter sendExceptionCounter; - private final Timer closeTimer; - private final Counter closeTimeoutCounter; - private final Counter closeInterruptedCounter; - private final Counter closeExceptionCounter; - private final Counter flushCounter; public GRPCAgentClient(Metrics metrics, Format format, ManagedChannel channel, SpanAgentStub stub, StreamObserver observer, long shutdownTimeoutMS) { + super(metrics, channel, stub, observer, shutdownTimeoutMS); this.format = format; - this.channel = channel; - this.stub = stub; - this.shutdownTimeoutMS = shutdownTimeoutMS; - this.observer = observer; - - this.sendTimer = Timer.builder("send").register(metrics); - this.sendExceptionCounter = Counter.builder("send").tag(new Tag("state", "exception")).register(metrics); - this.closeTimer = Timer.builder("close").register(metrics); - this.closeTimeoutCounter = Counter.builder("close").tag(new Tag("state", "timeout")).register(metrics); - this.closeInterruptedCounter = Counter.builder("close").tag(new Tag("state", "interrupted")).register(metrics); - this.closeExceptionCounter = Counter.builder("close").tag(new Tag("state", "exception")).register(metrics); - this.flushCounter = Counter.builder("flush").register(metrics); - } - public static class GRPCAgentClientStreamObserver implements StreamObserver { - private Counter onCompletedCounter; - private Counter onErrorCounter; - private Counter ratelimitCounter; - private Counter unknownCounter; - private Counter badresultCounter; - - public GRPCAgentClientStreamObserver(Metrics metrics) { - this.onCompletedCounter = Counter.builder("observer").tag(new Tag("state", "completed")).register(metrics); - this.onErrorCounter = Counter.builder("observer").tag(new Tag("state", "error")).register(metrics); - this.ratelimitCounter = Counter.builder("observer").tag(new Tag("state", "ratelimited")).register(metrics); - this.unknownCounter = Counter.builder("observer").tag(new Tag("state", "unknown")).register(metrics); - this.badresultCounter = Counter.builder("observer").tag(new Tag("state", "badresult")).register(metrics); - } - - @Override - public void onCompleted() { - onCompletedCounter.increment(); - LOGGER.debug("Dispatching span completed"); - } - - @Override - public void onError(Throwable t) { - onErrorCounter.increment(); - LOGGER.error("Dispatching span failed with error: {}", t); - } - - @Override - public void onNext(DispatchResult value) { - switch (value.getCode()) { - case SUCCESS: - // do nothing - break; - case RATE_LIMIT_ERROR: - ratelimitCounter.increment(); - LOGGER.error("Rate limit error received from agent"); - break; - case UNKNOWN_ERROR: - unknownCounter.increment(); - LOGGER.error("Unknown error received from agent"); - break; - default: - badresultCounter.increment(); - LOGGER.error("Unknown result received from agent: {}", value.getCode()); - } - } - } @Override public boolean send(Span span) throws ClientException { @@ -135,139 +49,40 @@ public boolean send(Span span) throws ClientException { return true; } - @Override - public void close() { - try (Sample timer = closeTimer.start()) { - channel.shutdown(); - try { - if (!channel.awaitTermination(shutdownTimeoutMS, TimeUnit.SECONDS)) { - channel.shutdownNow(); - closeTimeoutCounter.increment(); - LOGGER.warn("Channel failed to terminate, forcibly closing it."); - if (!channel.awaitTermination(shutdownTimeoutMS, TimeUnit.SECONDS)) { - closeTimeoutCounter.increment(); - LOGGER.error("Channel failed to terminate."); - } - } - } catch (InterruptedException e) { - closeInterruptedCounter.increment(); - LOGGER.error("Unable to close the channel.", e); - } - } catch (Exception e) { - closeExceptionCounter.increment(); - LOGGER.error("Unexpected exception caught on client shutdown.", e); - throw e; - } - } - - @Override - public void flush() { - flushCounter.increment(); - } - - public static final class Builder { + public static final class Builder extends BaseGrpcClient.Builder { private Format format; - private StreamObserver observer; - - private Metrics metrics; - - // Options to build a channel - private String host; - private int port; - private long keepAliveTimeMS = TimeUnit.SECONDS.toMillis(30); - private long keepAliveTimeoutMS = TimeUnit.SECONDS.toMillis(30); - private boolean keepAliveWithoutCalls = true; - private NegotiationType negotiationType = NegotiationType.PLAINTEXT; - - // either build a channel or provide one - private ManagedChannel channel; - - private long shutdownTimeoutMS = TimeUnit.SECONDS.toMillis(30); - - private Builder(MetricsRegistry registry) { - this(new Metrics(registry, Client.class.getName(), Arrays.asList(new Tag("type", "grpc")))); - } - - private Builder(Metrics metrics) { - this.format = new ProtoBufFormat(); - this.observer = new GRPCAgentClientStreamObserver(metrics); - this.metrics = metrics; - - } - public Builder(MetricsRegistry metrics, ManagedChannel channel) { - this(metrics); - this.channel = channel; + super(metrics, channel); } - public Builder(Metrics metrics, ManagedChannel channel) { - this(metrics); - this.channel = channel; + public Builder(Metrics metrics, String host, int port) { + super(metrics, host, port); } public Builder(MetricsRegistry metrics, String host, int port) { - this(metrics); - this.host = host; - this.port = port; - } - - public Builder(Metrics metrics, String host, int port) { - this(metrics); - this.host = host; - this.port = port; + super(metrics, host, port); } - public Builder withFormat(Format format) { + public GRPCAgentClient.Builder withFormat(Format format) { this.format = format; return this; } - public Builder withObserver(StreamObserver observer) { - this.observer = observer; - return this; - } - - public Builder withKeepAliveTimeMS(long keepAliveTimeMS) { - this.keepAliveTimeMS = keepAliveTimeMS; - return this; - } - - public Builder withKeepAliveTimeoutMS(long keepAliveTimeoutMS) { - this.keepAliveTimeoutMS = keepAliveTimeoutMS; - return this; - } - - public Builder withKeepAliveWithoutCalls(boolean keepAliveWithoutCalls) { - this.keepAliveWithoutCalls = keepAliveWithoutCalls; - return this; - } - - public Builder withNegotiationType(NegotiationType negotiationType) { - this.negotiationType = negotiationType; - return this; - } - - public Builder withShutdownTimeoutMS(long shutdownTimeoutMS) { - this.shutdownTimeoutMS = shutdownTimeoutMS; - return this; - } - public GRPCAgentClient build() { - - ManagedChannel managedChannel = channel; + ManagedChannel managedChannel = this.channel; if (managedChannel == null) { - managedChannel = NettyChannelBuilder.forAddress(host, port) - .keepAliveTime(keepAliveTimeMS, TimeUnit.MILLISECONDS) - .keepAliveTimeout(keepAliveTimeoutMS, TimeUnit.MILLISECONDS) - .keepAliveWithoutCalls(keepAliveWithoutCalls) - .negotiationType(negotiationType) - .build(); + managedChannel = buildManagedChannel(); } SpanAgentStub stub = SpanAgentGrpc.newStub(managedChannel); + Format format = this.format; + + if(format == null) { + format = new ProtoBufFormat(); + } return new GRPCAgentClient(metrics, format, managedChannel, stub, observer, shutdownTimeoutMS); } } diff --git a/core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/HttpCollectorClient.java b/core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/HttpCollectorClient.java index f8dc067..cd9b2c1 100644 --- a/core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/HttpCollectorClient.java +++ b/core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/HttpCollectorClient.java @@ -19,106 +19,33 @@ import com.expedia.www.haystack.client.Span; import com.expedia.www.haystack.client.dispatchers.formats.Format; import com.expedia.www.haystack.client.dispatchers.formats.ProtoBufFormat; -import org.apache.commons.lang3.Validate; -import org.apache.http.Header; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.ByteArrayEntity; import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.message.BasicHeader; -import java.io.Closeable; -import java.io.IOException; -import java.util.HashMap; +import java.util.Collections; import java.util.Map; -import java.util.stream.Collectors; -public class HttpCollectorClient implements Client { +public class HttpCollectorClient extends BaseHttpClient implements Client { private final Format format; - private final String endpoint; - private final BasicHeader[] headers; - private final CloseableHttpClient httpClient; - public HttpCollectorClient(final String endpoint, - final Map headers) { - this(endpoint, headers, HttpClients.createDefault()); + public HttpCollectorClient(String endpoint, Map headers) { + super(endpoint, headers); + this.format = new ProtoBufFormat(); } - public HttpCollectorClient(final String endpoint) { - this(endpoint, new HashMap<>()); + public HttpCollectorClient(String endpoint) { + this(endpoint, Collections.emptyMap()); } public HttpCollectorClient(final String endpoint, final Map headers, final CloseableHttpClient httpClient) { - Validate.notEmpty(endpoint, "Haystack collector endpoint can't be empty"); - + super(endpoint, headers, httpClient); this.format = new ProtoBufFormat(); - this.endpoint = endpoint; - this.httpClient = httpClient; - this.headers = headers.entrySet().stream() - .map((entry) -> new BasicHeader(entry.getKey(), entry.getValue())) - .collect(Collectors.toList()) - .toArray(new BasicHeader[0]); - } - - @Override - public void close() throws ClientException { - closeQuietly(httpClient); - } - - @Override - public void flush() throws ClientException { - /* no batching, no flushing */ } @Override public boolean send(Span span) throws ClientException { final byte[] spanBytes = format.format(span).toByteArray(); - - final HttpPost post = new HttpPost(endpoint); - if (headers != null && headers.length > 0) { - post.setHeaders(headers); - } - final ByteArrayEntity entity = new ByteArrayEntity(spanBytes); - entity.setContentType("application/octet-stream"); - post.setEntity(entity); - CloseableHttpResponse response = null; - try { - response = httpClient.execute(post); - final int statusCode = getStatusCode(response); - if (is2xx(statusCode)) { - return true; - } - throw new ClientException(String.format("Failed sending span to http collector endpoint=%s, http status code=%d", endpoint, statusCode)); - } catch (IOException e) { - throw new ClientException(String.format("Failed sending span to http collector endpoint=%s", endpoint), e); - } finally { - closeQuietly(response); - } - } - - private int getStatusCode(final CloseableHttpResponse response) { - return response.getStatusLine() == null ? 0 : response.getStatusLine().getStatusCode(); - } - - private boolean is2xx(final int statusCode) { - return statusCode >= 200 && statusCode < 300; - } - - public Header[] getHeaders() { - return headers; - } - public String getEndpoint() { - return endpoint; - } - - private void closeQuietly(final Closeable obj) { - try { - obj.close(); - } catch (Exception ex) { - /* be quiet */ - } + return super.send(spanBytes); } } \ No newline at end of file diff --git a/core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/InMemoryClient.java b/core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/InMemoryClient.java index 8ac1528..b58313f 100644 --- a/core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/InMemoryClient.java +++ b/core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/InMemoryClient.java @@ -35,7 +35,7 @@ import com.expedia.www.haystack.client.metrics.Timer; import com.expedia.www.haystack.client.metrics.Timer.Sample; -public class InMemoryClient implements Client { +public class InMemoryClient implements Client { private static final Logger LOGGER = LoggerFactory.getLogger(InMemoryClient.class); private Semaphore limiter; diff --git a/core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/LoggerClient.java b/core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/LoggerClient.java index d659df4..67f1408 100644 --- a/core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/LoggerClient.java +++ b/core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/LoggerClient.java @@ -30,7 +30,7 @@ import com.expedia.www.haystack.client.metrics.Timer; import com.expedia.www.haystack.client.metrics.Timer.Sample; -public class LoggerClient implements Client { +public class LoggerClient implements Client { private final Format format; /** Event logger for data {@link LoggerClient#send sent} to the client instance */ diff --git a/core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/NoopClient.java b/core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/NoopClient.java index 8e3202e..f0b9ce4 100644 --- a/core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/NoopClient.java +++ b/core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/NoopClient.java @@ -18,7 +18,7 @@ import com.expedia.www.haystack.client.Span; -public class NoopClient implements Client { +public class NoopClient implements Client { public NoopClient() { } diff --git a/core/src/test/java/com/expedia/www/haystack/client/dispatchers/clients/GRPCAgentClientTest.java b/core/src/test/java/com/expedia/www/haystack/client/dispatchers/clients/GRPCAgentClientTest.java index 78c6625..24d6864 100644 --- a/core/src/test/java/com/expedia/www/haystack/client/dispatchers/clients/GRPCAgentClientTest.java +++ b/core/src/test/java/com/expedia/www/haystack/client/dispatchers/clients/GRPCAgentClientTest.java @@ -74,5 +74,4 @@ public void testDispatch() throws Exception { verify(serviceImpl, times(1)).dispatch(spanCapture.capture(), Matchers.>any()); } - } diff --git a/integrations/dropwizard/src/main/java/com/expedia/haystack/dropwizard/configuration/AgentClientFactory.java b/integrations/dropwizard/src/main/java/com/expedia/haystack/dropwizard/configuration/AgentClientFactory.java index f92e6a4..35ff050 100644 --- a/integrations/dropwizard/src/main/java/com/expedia/haystack/dropwizard/configuration/AgentClientFactory.java +++ b/integrations/dropwizard/src/main/java/com/expedia/haystack/dropwizard/configuration/AgentClientFactory.java @@ -16,13 +16,6 @@ */ package com.expedia.haystack.dropwizard.configuration; -import javax.annotation.Nullable; -import javax.validation.constraints.Max; -import javax.validation.constraints.Min; -import javax.validation.constraints.NotNull; - -import org.hibernate.validator.constraints.NotEmpty; - import com.expedia.open.tracing.Span; import com.expedia.www.haystack.client.dispatchers.clients.Client; import com.expedia.www.haystack.client.dispatchers.clients.GRPCAgentClient; @@ -30,8 +23,13 @@ import com.expedia.www.haystack.client.metrics.MetricsRegistry; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; - import io.dropwizard.setup.Environment; +import org.hibernate.validator.constraints.NotEmpty; + +import javax.annotation.Nullable; +import javax.validation.constraints.Max; +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; /** * A factory for configuring and building {@link GRPCAgentClient} instances. diff --git a/integrations/dropwizard/src/main/java/com/expedia/haystack/dropwizard/configuration/ClientFactory.java b/integrations/dropwizard/src/main/java/com/expedia/haystack/dropwizard/configuration/ClientFactory.java index d3e3eb4..911959c 100644 --- a/integrations/dropwizard/src/main/java/com/expedia/haystack/dropwizard/configuration/ClientFactory.java +++ b/integrations/dropwizard/src/main/java/com/expedia/haystack/dropwizard/configuration/ClientFactory.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeInfo.As; import com.fasterxml.jackson.annotation.JsonTypeInfo.Id; - import io.dropwizard.jackson.Discoverable; import io.dropwizard.setup.Environment; diff --git a/integrations/dropwizard/src/main/java/com/expedia/haystack/dropwizard/configuration/LoggerClientFactory.java b/integrations/dropwizard/src/main/java/com/expedia/haystack/dropwizard/configuration/LoggerClientFactory.java index c280af5..bf99c70 100644 --- a/integrations/dropwizard/src/main/java/com/expedia/haystack/dropwizard/configuration/LoggerClientFactory.java +++ b/integrations/dropwizard/src/main/java/com/expedia/haystack/dropwizard/configuration/LoggerClientFactory.java @@ -16,16 +16,15 @@ */ package com.expedia.haystack.dropwizard.configuration; -import javax.annotation.Nullable; - import com.expedia.www.haystack.client.dispatchers.clients.Client; import com.expedia.www.haystack.client.dispatchers.clients.LoggerClient; import com.expedia.www.haystack.client.metrics.MetricsRegistry; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; - import io.dropwizard.setup.Environment; +import javax.annotation.Nullable; + /** * A factory for configuring and building {@link LoggerClient} instances. * diff --git a/integrations/dropwizard/src/main/java/com/expedia/haystack/dropwizard/configuration/NoopClientFactory.java b/integrations/dropwizard/src/main/java/com/expedia/haystack/dropwizard/configuration/NoopClientFactory.java index accb69c..26aed3c 100644 --- a/integrations/dropwizard/src/main/java/com/expedia/haystack/dropwizard/configuration/NoopClientFactory.java +++ b/integrations/dropwizard/src/main/java/com/expedia/haystack/dropwizard/configuration/NoopClientFactory.java @@ -20,7 +20,6 @@ import com.expedia.www.haystack.client.dispatchers.clients.NoopClient; import com.expedia.www.haystack.client.metrics.MetricsRegistry; import com.fasterxml.jackson.annotation.JsonTypeName; - import io.dropwizard.setup.Environment; /** diff --git a/integrations/opencensus/docker-compose.yml b/integrations/opencensus/docker-compose.yml new file mode 100644 index 0000000..3d1549f --- /dev/null +++ b/integrations/opencensus/docker-compose.yml @@ -0,0 +1,31 @@ +version: '3' +services: + haystack_agent: + image: expediadotcom/haystack-agent:0.1 + depends_on: + - zookeeper + - kafkasvc + environment: + haystack_env_agents_spans_port: 35000 + haystack_env_agents_spans_dispatchers_kafka_bootstrap_servers: "kafkasvc:9092" + ports: + - "35000:35000" + entrypoint: + - /bin/sh + - -c + - 'sleep 10 && java -jar /app/bin/haystack-agent.jar --config-provider file --file-path /app/bin/dev.conf' + zookeeper: + image: wurstmeister/zookeeper + ports: + - "2181:2181" + kafkasvc: + image: wurstmeister/kafka:2.11-1.1.1 + depends_on: + - zookeeper + environment: + KAFKA_ADVERTISED_HOST_NAME: "kafkasvc" + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + volumes: + - /var/run/docker.sock:/var/run/docker.sock + ports: + - "9092:9092" diff --git a/integrations/opencensus/pom.xml b/integrations/opencensus/pom.xml new file mode 100644 index 0000000..cfd2ec2 --- /dev/null +++ b/integrations/opencensus/pom.xml @@ -0,0 +1,224 @@ + + + 4.0.0 + + + com.expedia.www + haystack-client-java-integrations + 0.2.1-SNAPSHOT + + + opencensus-exporter-trace-haystack + jar + + opencensus-exporter-trace-haystack + Opencensus Trace Exporter for Haystack + + + 0.17.0 + + 2 + 12 + 2 + ${scala.major.version}.${scala.minor.version} + ${scala.major.minor.version}.${scala.maintenance.version} + 0.11.0.0 + 1.6.0 + 3.0.3 + 3.4 + + 3.3.2 + 3.6.1 + + + + + + com.expedia.www + haystack-client-commons + ${project.version} + + + io.opencensus + opencensus-api + ${opencensus.version} + provided + + + io.opencensus + opencensus-impl + ${opencensus.version} + provided + + + + + + + com.expedia.www + haystack-client-commons + + + + io.opencensus + opencensus-api + + + + + org.scala-lang + scala-library + ${scala-library.version} + test + + + + org.scalatest + scalatest_${scala.major.minor.version} + ${scalatest.version} + test + + + + org.easymock + easymock + ${easymock.version} + test + + + + org.pegdown + pegdown + ${pegdown.version} + test + + + + io.opencensus + opencensus-impl + test + + + + org.apache.kafka + kafka_${scala.major.minor.version} + ${kafka-version} + + + org.slf4j + slf4j-log4j12 + + + test + + + org.apache.kafka + kafka-clients + ${kafka-version} + test + + + + + + + ${basedir}/src/main/resources + true + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + org.scalatest + scalatest-maven-plugin + + + test + + test + + + (?<!Integration)(Test|Spec) + + + + integration-test + integration-test + + test + + + (?<=Integration)(Test|Spec) + + + + + + + org.codehaus.mojo + exec-maven-plugin + + + pre-test + + exec + + pre-integration-test + + docker-compose + + -f + docker-compose.yml + up + -d + + + + + post-test + + exec + + post-integration-test + + docker-compose + + -f + docker-compose.yml + stop + + + + + + + + net.alchim31.maven + scala-maven-plugin + ${scala-maven-plugin.version} + + + scala-compile-first + process-resources + + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + + diff --git a/integrations/opencensus/src/main/java/com/www/expedia/opencensus/exporter/trace/HaystackExporterHandler.java b/integrations/opencensus/src/main/java/com/www/expedia/opencensus/exporter/trace/HaystackExporterHandler.java new file mode 100644 index 0000000..5c38ad9 --- /dev/null +++ b/integrations/opencensus/src/main/java/com/www/expedia/opencensus/exporter/trace/HaystackExporterHandler.java @@ -0,0 +1,234 @@ +/* + * Copyright 2018 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.www.expedia.opencensus.exporter.trace; + +import com.expedia.open.tracing.Log; +import com.expedia.open.tracing.Tag; +import com.expedia.www.haystack.client.dispatchers.clients.Client; +import com.expedia.www.haystack.client.metrics.Metrics; +import com.google.common.primitives.Longs; +import com.google.errorprone.annotations.MustBeClosed; +import io.opencensus.common.Function; +import io.opencensus.common.Scope; +import io.opencensus.common.Timestamp; +import io.opencensus.trace.*; +import io.opencensus.trace.export.SpanData; +import io.opencensus.trace.export.SpanExporter; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; +import java.util.*; +import java.util.logging.Level; +import java.util.logging.Logger; + +import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; + +@ThreadSafe +final class HaystackExporterHandler extends SpanExporter.Handler { + private static final String EXPORT_SPAN_NAME = "ExportHaystackTraces"; + private static final Tag SPAN_KIND_SERVER_TAG = Tag.newBuilder().setKey("span.kind").setType(Tag.TagType.STRING).setVStr("server").build(); + private static final Tag SPAN_KIND_CLIENT_TAG = Tag.newBuilder().setKey("span.kind").setType(Tag.TagType.STRING).setVStr("client").build(); + private static final String DESCRIPTION = "message"; + private static final String MESSAGE_EVENT_ID = "id"; + + private static final String RECEIVED_MESSAGE_EVENT_NAME = "received message id"; + private static final String SENT_MESSAGE_EVENT_NAME = "sent message id"; + private static final String MESSAGE_EVENT_COMPRESSED_SIZE = "compressed_size"; + private static final String MESSAGE_EVENT_UNCOMPRESSED_SIZE = "uncompressed_size"; + + private static final Logger logger = Logger.getLogger(HaystackExporterHandler.class.getName()); + private static final Tracer tracer = Tracing.getTracer(); + + private static final Function defaultAttributeConverter = o -> Tag.newBuilder().setVStr(o.toString()).setType(Tag.TagType.STRING); + private static final Function stringAttributeConverter = s -> Tag.newBuilder().setVStr(s).setType(Tag.TagType.STRING); + private static final Function boolAttributeConverter = s -> Tag.newBuilder().setVBool(s).setType(Tag.TagType.BOOL); + private static final Function longAttributeConverter = s -> Tag.newBuilder().setVLong(s).setType(Tag.TagType.LONG); + private static final Function doubleAttributeConverter = s -> Tag.newBuilder().setVDouble(s).setType(Tag.TagType.DOUBLE); + + private final Client sender; + private final String serviceName; + + HaystackExporterHandler(final Client sender, + final String serviceName, + final Metrics metrics) { + checkNotNull(sender, "haystack span sender must NOT be null."); + checkNotNull(serviceName, "service name must NOT be null"); + this.sender = sender; + this.serviceName = serviceName; + } + + @Override + public void export(Collection spanDataList) { + try (final Scope exportScope = newExportScope()) { + doExport(spanDataList); + } catch (Exception e) { + tracer + .getCurrentSpan() // exportScope above. + .setStatus(Status.UNKNOWN.withDescription(getMessageOrDefault(e))); + logger.log(Level.WARNING, "Failed to export traces to Haystack: " + e); + } + } + + @MustBeClosed + private static Scope newExportScope() { + return tracer.spanBuilder(EXPORT_SPAN_NAME).startScopedSpan(); + } + + private void doExport(final Collection spanDataList) { + for (final SpanData spanData : spanDataList) { + final com.expedia.open.tracing.Span haystackSpan = spanDataToHaystackProtoSpan(spanData); + this.sender.send(haystackSpan); + } + } + + private com.expedia.open.tracing.Span spanDataToHaystackProtoSpan(final SpanData spanData) { + final long startTimeInMicros = timestampToMicros(spanData.getStartTimestamp()); + final long endTimeInMicros = timestampToMicros(spanData.getEndTimestamp()); + + final SpanContext context = spanData.getContext(); + + final String parentSpanId; + if (spanData.getParentSpanId() != null) { + parentSpanId = byteArrayToLongString(spanData.getParentSpanId().getBytes()); + } else { + parentSpanId = ""; + } + + final List tags = buildTagsFromAttributes(spanData.getAttributes().getAttributeMap()); + + // add span.kind tag + addSpanKindToTag(spanData.getKind()).ifPresent(tags::add); + + final List logs = timedEventsToLogs(spanData.getAnnotations().getEvents(), spanData.getMessageEvents().getEvents()); + + return com.expedia.open.tracing.Span.newBuilder() + .setTraceId(byteArrayToLongString(context.getTraceId().getBytes())) + .setSpanId(byteArrayToLongString(context.getSpanId().getBytes())) + .setParentSpanId(parentSpanId) + .setServiceName(serviceName) + .setOperationName(spanData.getName()) + .setStartTime(startTimeInMicros) + .setDuration(endTimeInMicros - startTimeInMicros) + .addAllTags(tags) + .addAllLogs(logs) + .build(); + } + + private List timedEventsToLogs(final List> annotations, + final List> messageEvents) { + final List logs = new ArrayList<>(); + + for (final SpanData.TimedEvent event : annotations) { + final long timestampsInMicros = timestampToMicros(event.getTimestamp()); + final List tags = buildTagsFromAttributes(event.getEvent().getAttributes()); + tags.add(Tag.newBuilder().setKey(DESCRIPTION).setVStr(event.getEvent().getDescription()).setType(Tag.TagType.STRING).build()); + final Log log = Log.newBuilder().setTimestamp(timestampsInMicros).addAllFields(tags).build(); + logs.add(log); + } + + for (final SpanData.TimedEvent event : messageEvents) { + final long timestampsInMicros = timestampToMicros(event.getTimestamp()); + final List tags = new ArrayList<>(4); + tags.add(addTagWithLongValue(MESSAGE_EVENT_ID, event.getEvent().getMessageId())); + tags.add(addTagWithLongValue(MESSAGE_EVENT_COMPRESSED_SIZE, event.getEvent().getCompressedMessageSize())); + tags.add(addTagWithLongValue(MESSAGE_EVENT_UNCOMPRESSED_SIZE, event.getEvent().getUncompressedMessageSize())); + if (event.getEvent().getType() == MessageEvent.Type.RECEIVED) { + tags.add(addTagWithLongValue(RECEIVED_MESSAGE_EVENT_NAME, event.getEvent().getMessageId())); + } else { + tags.add(addTagWithLongValue(SENT_MESSAGE_EVENT_NAME, event.getEvent().getMessageId())); + } + final Log log = Log.newBuilder().setTimestamp(timestampsInMicros).addAllFields(tags).build(); + logs.add(log); + } + + return logs; + } + + private Tag addTagWithLongValue(final String messageEventId, + final long value) { + return Tag.newBuilder().setKey(messageEventId).setVLong(value).setType(Tag.TagType.LONG).build(); + } + + private List buildTagsFromAttributes(final Map attributes) { + final List tags = new ArrayList<>(attributes.size() + 2); + attributes.forEach((attrKey, attrVal) -> { + final Tag.Builder tagBuilder = attrVal.match( + stringAttributeConverter, + boolAttributeConverter, + longAttributeConverter, + doubleAttributeConverter, + defaultAttributeConverter); + tagBuilder.setKey(attrKey); + tags.add(tagBuilder.build()); + }); + return tags; + } + + private String byteArrayToLongString(final byte[] idBuffer) { + if (idBuffer.length == 16) { + final Long lowLong = Longs.fromBytes( + idBuffer[8], + idBuffer[9], + idBuffer[10], + idBuffer[11], + idBuffer[12], + idBuffer[13], + idBuffer[14], + idBuffer[15]); + + final Long highLong = Longs.fromBytes( + idBuffer[0], + idBuffer[1], + idBuffer[2], + idBuffer[3], + idBuffer[4], + idBuffer[5], + idBuffer[6], + idBuffer[7]); + return new UUID(highLong, lowLong).toString(); + } else { + return String.valueOf(Longs.fromByteArray(idBuffer)); + } + } + + private static long timestampToMicros(final @Nullable Timestamp timestamp) { + return (timestamp == null) + ? 0L + : SECONDS.toMicros(timestamp.getSeconds()) + NANOSECONDS.toMicros(timestamp.getNanos()); + } + + private static Optional addSpanKindToTag(@Nullable final Span.Kind kind) { + if (kind == null) { + return Optional.empty(); + } + switch (kind) { + case CLIENT: + return Optional.of(SPAN_KIND_CLIENT_TAG); + case SERVER: + return Optional.of(SPAN_KIND_SERVER_TAG); + default: + return Optional.empty(); + } + } + + private static String getMessageOrDefault(final Exception e) { + return e.getMessage() == null ? e.getClass().getSimpleName() : e.getMessage(); + } +} diff --git a/integrations/opencensus/src/main/java/com/www/expedia/opencensus/exporter/trace/HaystackTraceExporter.java b/integrations/opencensus/src/main/java/com/www/expedia/opencensus/exporter/trace/HaystackTraceExporter.java new file mode 100644 index 0000000..0908314 --- /dev/null +++ b/integrations/opencensus/src/main/java/com/www/expedia/opencensus/exporter/trace/HaystackTraceExporter.java @@ -0,0 +1,98 @@ +/* + * Copyright 2018 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.www.expedia.opencensus.exporter.trace; + +import com.expedia.open.tracing.Span; +import com.expedia.www.haystack.client.dispatchers.clients.Client; +import com.expedia.www.haystack.client.dispatchers.clients.GRPCAgentProtoClient; +import com.expedia.www.haystack.client.dispatchers.clients.HttpCollectorProtoClient; +import com.expedia.www.haystack.client.metrics.Metrics; +import com.expedia.www.haystack.client.metrics.NoopMetricsRegistry; +import com.www.expedia.opencensus.exporter.trace.config.DispatcherConfig; +import com.www.expedia.opencensus.exporter.trace.config.GrpcAgentDispatcherConfig; +import com.www.expedia.opencensus.exporter.trace.config.HttpDispatcherConfig; +import io.opencensus.trace.Tracing; +import io.opencensus.trace.export.SpanExporter; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import static com.google.common.base.Preconditions.checkState; + +public class HaystackTraceExporter { + private static final String REGISTER_NAME = HaystackTraceExporter.class.getName(); + private static final Object monitor = new Object(); + + @GuardedBy("monitor") + @Nullable + static SpanExporter.Handler handler = null; + + + private HaystackTraceExporter() { + } + + public static void createAndRegister(final DispatcherConfig dispatcherConfig, + final String serviceName) { + createAndRegister(dispatcherConfig, serviceName, new Metrics(new NoopMetricsRegistry())); + } + + public static void createAndRegister(final DispatcherConfig dispatcherConfig, + final String serviceName, + final Metrics metrics) { + synchronized (monitor) { + checkState(handler == null, "haystack exporter is already registered."); + final Client dispatcher = buildRemoteClient(dispatcherConfig, metrics); + final SpanExporter.Handler newHandler = new HaystackExporterHandler(dispatcher, serviceName, metrics); + HaystackTraceExporter.handler = newHandler; + register(Tracing.getExportComponent().getSpanExporter(), newHandler); + } + } + + private static Client buildRemoteClient(final DispatcherConfig dispatcherConfig, final Metrics metrics) { + switch (dispatcherConfig.getType()) { + case GRPC: + final GrpcAgentDispatcherConfig grpcConfig = (GrpcAgentDispatcherConfig) dispatcherConfig; + return new GRPCAgentProtoClient.Builder(metrics, grpcConfig.getHost(), grpcConfig.getPort()).build(); + case HTTP: + final HttpDispatcherConfig httpConfig = (HttpDispatcherConfig) dispatcherConfig; + return new HttpCollectorProtoClient(httpConfig.getHost(), httpConfig.getHttpHeaders()); + default: + throw new RuntimeException("Fail to recognize the dispatcher config for haystack"); + } + } + + /** + * Registers the {@link HaystackTraceExporter}. + * + * @param spanExporter the instance of the {@code SpanExporter} where this service is registered. + */ + private static void register(final SpanExporter spanExporter, final SpanExporter.Handler handler) { + spanExporter.registerHandler(REGISTER_NAME, handler); + } + + /** + * Unregisters the {@link HaystackTraceExporter}. + */ + public static void unregister() { + synchronized (monitor) { + checkState(handler != null, "haystack exporter is not registered."); + Tracing.getExportComponent().getSpanExporter().unregisterHandler(REGISTER_NAME); + handler = null; + } + } +} \ No newline at end of file diff --git a/integrations/opencensus/src/main/java/com/www/expedia/opencensus/exporter/trace/config/DispatcherConfig.java b/integrations/opencensus/src/main/java/com/www/expedia/opencensus/exporter/trace/config/DispatcherConfig.java new file mode 100644 index 0000000..1f4e90c --- /dev/null +++ b/integrations/opencensus/src/main/java/com/www/expedia/opencensus/exporter/trace/config/DispatcherConfig.java @@ -0,0 +1,40 @@ +/* + * Copyright 2018 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.www.expedia.opencensus.exporter.trace.config; + +import org.apache.commons.lang3.Validate; + +public abstract class DispatcherConfig { + public enum DispatchType { + GRPC, + HTTP + } + + final private long shutdownTimeoutInMillis; + + public DispatcherConfig(long shutdownTimeoutInMillis) { + Validate.isTrue(shutdownTimeoutInMillis > 0, "shutdown timeout greater than zero"); + this.shutdownTimeoutInMillis = shutdownTimeoutInMillis; + } + + public long getShutdownTimeoutInMillis() { + return shutdownTimeoutInMillis; + } + + public abstract DispatchType getType(); +} diff --git a/integrations/opencensus/src/main/java/com/www/expedia/opencensus/exporter/trace/config/GrpcAgentDispatcherConfig.java b/integrations/opencensus/src/main/java/com/www/expedia/opencensus/exporter/trace/config/GrpcAgentDispatcherConfig.java new file mode 100644 index 0000000..db0310d --- /dev/null +++ b/integrations/opencensus/src/main/java/com/www/expedia/opencensus/exporter/trace/config/GrpcAgentDispatcherConfig.java @@ -0,0 +1,54 @@ +/* + * Copyright 2018 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.www.expedia.opencensus.exporter.trace.config; + +import org.apache.commons.lang3.Validate; + +public class GrpcAgentDispatcherConfig extends DispatcherConfig { + + private final String host; + private int port; + + public GrpcAgentDispatcherConfig(final String host, + final int port) { + this(host, port, 5000); + } + + public GrpcAgentDispatcherConfig(final String host, + final int port, + final long shutdownTimeoutInMillis) { + super(shutdownTimeoutInMillis); + + Validate.notEmpty(host); + this.host = host; + this.port = port; + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + @Override + public DispatchType getType() { + return DispatchType.GRPC; + } +} diff --git a/integrations/opencensus/src/main/java/com/www/expedia/opencensus/exporter/trace/config/HttpDispatcherConfig.java b/integrations/opencensus/src/main/java/com/www/expedia/opencensus/exporter/trace/config/HttpDispatcherConfig.java new file mode 100644 index 0000000..87dd758 --- /dev/null +++ b/integrations/opencensus/src/main/java/com/www/expedia/opencensus/exporter/trace/config/HttpDispatcherConfig.java @@ -0,0 +1,54 @@ +/* + * Copyright 2018 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.www.expedia.opencensus.exporter.trace.config; + +import org.apache.commons.lang3.Validate; + +import java.util.Collections; +import java.util.Map; + +public class HttpDispatcherConfig extends DispatcherConfig { + private final String host; + private final Map httpHeaders; + + public HttpDispatcherConfig(final String host) { + this(host, Collections.emptyMap(), 5000); + } + public HttpDispatcherConfig(final String host, + final Map httpHeaders, + final long shutdownTimeoutInMillis) { + super(shutdownTimeoutInMillis); + Validate.notEmpty(host, "haystack http host can't be empty"); + + this.host = host; + this.httpHeaders = httpHeaders == null ? Collections.emptyMap() : httpHeaders; + } + + public String getHost() { + return host; + } + + public Map getHttpHeaders() { + return httpHeaders; + } + + @Override + public DispatchType getType() { + return DispatchType.HTTP; + } +} diff --git a/integrations/opencensus/src/test/scala/com/www/expedia/opencensus/exporter/trace/HaystackExporterHandlerSpec.scala b/integrations/opencensus/src/test/scala/com/www/expedia/opencensus/exporter/trace/HaystackExporterHandlerSpec.scala new file mode 100644 index 0000000..608b008 --- /dev/null +++ b/integrations/opencensus/src/test/scala/com/www/expedia/opencensus/exporter/trace/HaystackExporterHandlerSpec.scala @@ -0,0 +1,129 @@ +/* + * Copyright 2018 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.www.expedia.opencensus.exporter.trace + + +import java.util.Collections + +import com.expedia.www.haystack.client.dispatchers.clients.Client +import com.expedia.www.haystack.client.metrics.{Metrics, NoopMetricsRegistry} +import com.google.common.collect.{ImmutableMap, Lists} +import io.opencensus.common.Timestamp +import io.opencensus.trace.export.SpanData +import io.opencensus.trace.{MessageEvent, Tracestate, _} +import org.easymock.EasyMock +import org.scalatest._ +import org.scalatest.easymock.EasyMockSugar +import scala.collection.JavaConverters._ + +class HaystackExporterHandlerSpec extends FunSpec with GivenWhenThen with Matchers with EasyMockSugar { + + private val FF = 0xFF.toByte + describe("Haystack Exporter Handler") { + it("should convert and dispatch opencensus modeled spans to haystack remote client") { + val client = mock[Client[com.expedia.open.tracing.Span]] + val capturedSpan = EasyMock.newCapture[com.expedia.open.tracing.Span]() + + expecting { + client.send(EasyMock.capture(capturedSpan)).andReturn(true) + } + + whenExecuting(client) { + val handler = new HaystackExporterHandler(client, "dummy-service", new Metrics(new NoopMetricsRegistry)) + handler.export(Collections.singleton(spanData(System.currentTimeMillis() - 10l*1000, System.currentTimeMillis()))) + } + + val haystackSpan = capturedSpan.getValue + haystackSpan.getServiceName shouldEqual "dummy-service" + haystackSpan.getOperationName shouldEqual "myop" + haystackSpan.getParentSpanId shouldEqual "9223372036854775807" + haystackSpan.getSpanId shouldEqual "256" + haystackSpan.getTraceId shouldEqual "ff000000-0000-0000-0000-000000000001" + haystackSpan.getTagsCount shouldBe 4 + haystackSpan.getTagsList.asScala.find(_.getKey == "BOOL").get.getVBool shouldBe false + haystackSpan.getTagsList.asScala.find(_.getKey == "STRING").get.getVStr shouldEqual "hello world!" + haystackSpan.getTagsList.asScala.find(_.getKey == "LONG").get.getVLong shouldBe 9223372036854775807l + haystackSpan.getTagsList.asScala.find(_.getKey == "span.kind").get.getVStr shouldEqual "server" + + haystackSpan.getLogsCount shouldBe 2 + haystackSpan.getDuration shouldBe 10l * 1000 * 1000l + haystackSpan.getLogs(0).getTimestamp shouldBe 1234567890000000l + + val logTags = haystackSpan.getLogs(0).getFieldsList.asScala + logTags.find(_.getKey == "bool").get.getVBool shouldBe true + logTags.find(_.getKey == "string").get.getVStr shouldEqual "hello asia!" + logTags.find(_.getKey == "message").get.getVStr shouldEqual "annotation #1" + haystackSpan.getLogs(1).getTimestamp shouldBe 1234567890000000l + } + + it("should swallow and only log exception if remote client is failing!") { + val client = mock[Client[com.expedia.open.tracing.Span]] + val capturedSpan = EasyMock.newCapture[com.expedia.open.tracing.Span]() + + expecting { + client.send(EasyMock.capture(capturedSpan)).andThrow(new RuntimeException("fail to connect to remote box!")).times(1) + } + + whenExecuting(client) { + val handler = new HaystackExporterHandler(client, "dummy-service", new Metrics(new NoopMetricsRegistry)) + handler.export(Collections.singleton(spanData(System.currentTimeMillis() - 10l*1000, System.currentTimeMillis()))) + } + } + } + + private def spanData(startTime: Long, endTime: Long): SpanData = { + import io.opencensus.trace.Span.Kind + import io.opencensus.trace.export.SpanData + SpanData.create( + sampleSpanContext, + SpanId.fromBytes(Array[Byte](0x7F.toByte, FF, FF, FF, FF, FF, FF, FF)), + true, + "myop", + Kind.SERVER, + Timestamp.fromMillis(startTime), + SpanData.Attributes.create(sampleAttributes, 0), + SpanData.TimedEvents.create(Collections.singletonList(sampleAnnotation), 0), + SpanData.TimedEvents.create(Collections.singletonList(sampleMessageEvent), 0), + SpanData.Links.create(sampleLinks, 0), 0, Status.OK, Timestamp.fromMillis(endTime)) + } + + + private def sampleSpanContext = + SpanContext.create( + TraceId.fromBytes(Array[Byte](FF, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1)), + SpanId.fromBytes(Array[Byte](0, 0, 0, 0, 0, 0, 1, 0)), + TraceOptions.builder.setIsSampled(true).build, + Tracestate.builder.build) + + private def sampleAttributes = ImmutableMap.of( + "BOOL", AttributeValue.booleanAttributeValue(false), + "LONG", AttributeValue.longAttributeValue(Long.MaxValue), + "STRING", AttributeValue.stringAttributeValue("hello world!")) + + private def sampleAnnotation = SpanData.TimedEvent.create(Timestamp.create(1234567890L, 100), Annotation.fromDescriptionAndAttributes("annotation #1", ImmutableMap.of("bool", AttributeValue.booleanAttributeValue(true), "long", AttributeValue.longAttributeValue(12345l), "string", AttributeValue.stringAttributeValue("hello asia!")))) + + private def sampleMessageEvent = SpanData.TimedEvent.create(Timestamp.create(1234567890L, 500), MessageEvent.builder(MessageEvent.Type.SENT, 4L).setCompressedMessageSize(50).setUncompressedMessageSize(100).build) + + private def sampleLinks = + Lists.newArrayList( + Link.fromSpanContext( + SpanContext.create(TraceId.fromBytes(Array[Byte](FF, FF, FF, FF, FF, FF, FF, FF, FF, FF, FF, FF, FF, FF, FF, 0)), + SpanId.fromBytes(Array[Byte](0, 0, 0, 0, 0, 0, 2, 0)), + TraceOptions.builder.setIsSampled(false).build, Tracestate.builder.build), + Link.Type.CHILD_LINKED_SPAN, ImmutableMap.of("Bool", AttributeValue.booleanAttributeValue(true), "Long", AttributeValue.longAttributeValue(299792458L), "String", AttributeValue.stringAttributeValue("hello-asia")))) +} \ No newline at end of file diff --git a/integrations/opencensus/src/test/scala/com/www/expedia/opencensus/exporter/trace/HaystackExporterIntegrationSpec.scala b/integrations/opencensus/src/test/scala/com/www/expedia/opencensus/exporter/trace/HaystackExporterIntegrationSpec.scala new file mode 100644 index 0000000..f09bfe4 --- /dev/null +++ b/integrations/opencensus/src/test/scala/com/www/expedia/opencensus/exporter/trace/HaystackExporterIntegrationSpec.scala @@ -0,0 +1,100 @@ +/* + * Copyright 2018 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.www.expedia.opencensus.exporter.trace + + +import java.util +import java.util.{Collections, Random} + +import com.www.expedia.opencensus.exporter.trace.config.GrpcAgentDispatcherConfig +import io.opencensus.trace.samplers.Samplers +import io.opencensus.trace.{AttributeValue, Status, Tracer, Tracing} +import org.apache.kafka.clients.consumer.ConsumerConfig._ +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer} +import org.scalatest.{BeforeAndAfterAll, FunSpec, GivenWhenThen, Matchers} + +import scala.collection.JavaConverters._ + +class HaystackExporterIntegrationSpec extends FunSpec with GivenWhenThen with Matchers with BeforeAndAfterAll { + private val OPERATION_NAME = "/search" + private val SERVICE_NAME = "my-service" + private var consumer: KafkaConsumer[String, Array[Byte]] = _ + + override def beforeAll(): Unit = { + Thread.sleep(20000) + val config: java.util.Map[String, Object] = new util.HashMap() + config.put(BOOTSTRAP_SERVERS_CONFIG, "kafkasvc:9092") + config.put(GROUP_ID_CONFIG, "integration_test") + config.put(AUTO_OFFSET_RESET_CONFIG, "earliest") + consumer = new KafkaConsumer(config, new StringDeserializer(), new ByteArrayDeserializer()) + consumer.subscribe(Collections.singleton("proto-spans")) + } + + override def afterAll(): Unit = { + HaystackTraceExporter.unregister() + } + + private def generateTrace(tracer: Tracer, startTimeInMillis: Long) = { + val spanBuilder = tracer.spanBuilder(OPERATION_NAME).setRecordEvents(true).setSampler(Samplers.alwaysSample()) + val spanDurationInMillis = new Random().nextInt(10) + 1 + + val scopedSpan = spanBuilder.startScopedSpan + try { + tracer.getCurrentSpan.addAnnotation("start searching") + Thread.sleep(spanDurationInMillis) + tracer.getCurrentSpan.putAttribute("foo", AttributeValue.stringAttributeValue("bar")) + tracer.getCurrentSpan.putAttribute("items", AttributeValue.longAttributeValue(10l)) + tracer.getCurrentSpan.addAnnotation("done searching") + } catch { + case _: Exception => + tracer.getCurrentSpan.addAnnotation("Exception thrown when processing video.") + tracer.getCurrentSpan.setStatus(Status.UNKNOWN) + } finally { + scopedSpan.close() + } + } + + describe("Integration Test with haystack and opencensus") { + it ("should dispatch the spans to haystack-agent") { + val startTimeMillis = System.currentTimeMillis() + HaystackTraceExporter.createAndRegister(new GrpcAgentDispatcherConfig("haystack-agent", 35000), SERVICE_NAME) + + val tracer = Tracing.getTracer + + generateTrace(tracer, startTimeMillis) + Thread.sleep(2000) + generateTrace(tracer, startTimeMillis) + + // wait for few sec to let the span reach kafka + Thread.sleep(10000) + + val records = consumer.poll(2000) + records.count > 1 shouldBe true + val record = records.iterator().next() + val protoSpan = com.expedia.open.tracing.Span.parseFrom(record.value()) + protoSpan.getTraceId shouldEqual record.key() + protoSpan.getStartTime should (be >= startTimeMillis * 1000 and be <= System.currentTimeMillis() * 1000) // micros + protoSpan.getServiceName shouldEqual SERVICE_NAME + protoSpan.getOperationName shouldEqual OPERATION_NAME + protoSpan.getTagsCount shouldBe 2 + protoSpan.getTagsList.asScala.find(_.getKey == "foo").get.getVStr shouldEqual "bar" + protoSpan.getTagsList.asScala.find(_.getKey == "items").get.getVLong shouldBe 10 + } + } +} diff --git a/integrations/opencensus/src/test/scala/com/www/expedia/opencensus/exporter/trace/HaystackTraceExporterSpec.scala b/integrations/opencensus/src/test/scala/com/www/expedia/opencensus/exporter/trace/HaystackTraceExporterSpec.scala new file mode 100644 index 0000000..f6ca7f4 --- /dev/null +++ b/integrations/opencensus/src/test/scala/com/www/expedia/opencensus/exporter/trace/HaystackTraceExporterSpec.scala @@ -0,0 +1,57 @@ +/* + * Copyright 2018 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.www.expedia.opencensus.exporter.trace + +import com.www.expedia.opencensus.exporter.trace.config.{GrpcAgentDispatcherConfig, HttpDispatcherConfig} +import org.scalatest.{FunSpec, GivenWhenThen, Matchers} + +class HaystackTraceExporterSpec extends FunSpec with GivenWhenThen with Matchers { + describe("Haystack trace exporter") { + it ("should create and register with grpc client and throw illegal state exception if recreated") { + HaystackTraceExporter.createAndRegister(new GrpcAgentDispatcherConfig("haystack-agent", 35000), "my-service") + HaystackTraceExporter.handler should not be null + + intercept[IllegalStateException] { + HaystackTraceExporter.createAndRegister(new GrpcAgentDispatcherConfig("haystack-agent", 35000), "my-service") + } + + HaystackTraceExporter.unregister() + HaystackTraceExporter.handler shouldBe null + + intercept[IllegalStateException] { + HaystackTraceExporter.unregister() + } + } + + it ("should create and register with http client and throw illegal state exception if recreated") { + HaystackTraceExporter.createAndRegister(new HttpDispatcherConfig("http://haystack-http"), "my-service") + HaystackTraceExporter.handler should not be null + + intercept[IllegalStateException] { + HaystackTraceExporter.createAndRegister(new HttpDispatcherConfig("haystack-agent"), "my-service") + } + + HaystackTraceExporter.unregister() + HaystackTraceExporter.handler shouldBe null + + intercept[IllegalStateException] { + HaystackTraceExporter.unregister() + } + } + } +} \ No newline at end of file diff --git a/integrations/pom.xml b/integrations/pom.xml index ddf269c..55c9e28 100644 --- a/integrations/pom.xml +++ b/integrations/pom.xml @@ -38,8 +38,9 @@ - dropwizard - dropwizard-metrics - micrometer + dropwizard + dropwizard-metrics + micrometer + opencensus diff --git a/pom.xml b/pom.xml index 5791575..177a51b 100644 --- a/pom.xml +++ b/pom.xml @@ -34,6 +34,7 @@ + commons core integrations examples @@ -65,6 +66,8 @@ 2.10.4 3.0.1 1.6.8 + 1.6.0 + 1.0 @@ -316,6 +319,19 @@ + + + org.codehaus.mojo + exec-maven-plugin + ${exec-maven-plugin.version} + + + + org.scalatest + scalatest-maven-plugin + ${maven-scalatest-plugin.version} + + From a334516bfccab461c422bbe6337ef0da2c5cbb56 Mon Sep 17 00:00:00 2001 From: Ashish Aggarwal Date: Fri, 30 Nov 2018 19:50:11 +0530 Subject: [PATCH 2/4] refactoring for backward compatiblity --- .../client/dispatchers/clients/BaseGrpcClient.java | 6 ++++-- .../dispatchers/clients/GRPCAgentProtoClient.java | 2 +- .../java/com/expedia/www/haystack/client/Tracer.java | 11 ++++++++--- .../client/dispatchers/clients/GRPCAgentClient.java | 2 +- .../trace/HaystackExporterIntegrationSpec.scala | 9 +++------ 5 files changed, 17 insertions(+), 13 deletions(-) diff --git a/commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/BaseGrpcClient.java b/commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/BaseGrpcClient.java index 3b1884c..1fcee4e 100644 --- a/commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/BaseGrpcClient.java +++ b/commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/BaseGrpcClient.java @@ -30,7 +30,7 @@ import java.util.Arrays; import java.util.concurrent.TimeUnit; -abstract public class BaseGrpcClient { +abstract public class BaseGrpcClient implements Client { private static final Logger LOGGER = LoggerFactory.getLogger(BaseGrpcClient.class); protected final ManagedChannel channel; @@ -140,7 +140,7 @@ public void onNext(DispatchResult value) { } } - public static class Builder { + public static abstract class Builder { protected StreamObserver observer; protected Metrics metrics; @@ -228,5 +228,7 @@ protected ManagedChannel buildManagedChannel() { .negotiationType(negotiationType) .build(); } + + public abstract BaseGrpcClient build(); } } diff --git a/commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/GRPCAgentProtoClient.java b/commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/GRPCAgentProtoClient.java index b1d5ec5..3dea2e3 100644 --- a/commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/GRPCAgentProtoClient.java +++ b/commons/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/GRPCAgentProtoClient.java @@ -26,7 +26,7 @@ import io.grpc.ManagedChannel; import io.grpc.stub.StreamObserver; -public class GRPCAgentProtoClient extends BaseGrpcClient implements Client { +public class GRPCAgentProtoClient extends BaseGrpcClient { public GRPCAgentProtoClient(Metrics metrics, ManagedChannel channel, SpanAgentStub stub, diff --git a/core/src/main/java/com/expedia/www/haystack/client/Tracer.java b/core/src/main/java/com/expedia/www/haystack/client/Tracer.java index 2166b34..a1cec34 100644 --- a/core/src/main/java/com/expedia/www/haystack/client/Tracer.java +++ b/core/src/main/java/com/expedia/www/haystack/client/Tracer.java @@ -62,6 +62,11 @@ public class Tracer implements io.opentracing.Tracer { private final Timer extractTimer; private final Counter extractFailureCounter; + public Tracer(String serviceName, ScopeManager scopeManager, Clock clock, + Dispatcher dispatcher, PropagationRegistry registry, Metrics metrics) { + this(serviceName, scopeManager, clock, dispatcher, registry, metrics, false); + + } public Tracer(String serviceName, ScopeManager scopeManager, Clock clock, Dispatcher dispatcher, PropagationRegistry registry, Metrics metrics, boolean dualSpanMode) { this.serviceName = serviceName; @@ -266,15 +271,15 @@ boolean isServerSpan() { return Tags.SPAN_KIND_SERVER.equals(tags.get(Tags.SPAN_KIND.getKey())); } - SpanContext createNewContext() { + protected SpanContext createNewContext() { return createContext(UUID.randomUUID(), UUID.randomUUID(), null, Collections.emptyMap()); } - SpanContext createContext(UUID traceId, UUID spanId, UUID parentId, Map baggage) { + protected SpanContext createContext(UUID traceId, UUID spanId, UUID parentId, Map baggage) { return new SpanContext(traceId, spanId, parentId, baggage, false); } - SpanContext createDependentContext() { + protected SpanContext createDependentContext() { Reference parent = references.get(0); for (Reference reference : references) { if (References.CHILD_OF.equals(reference.getReferenceType())) { diff --git a/core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/GRPCAgentClient.java b/core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/GRPCAgentClient.java index 123d024..926fc5b 100644 --- a/core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/GRPCAgentClient.java +++ b/core/src/main/java/com/expedia/www/haystack/client/dispatchers/clients/GRPCAgentClient.java @@ -28,7 +28,7 @@ import io.grpc.ManagedChannel; import io.grpc.stub.StreamObserver; -public class GRPCAgentClient extends BaseGrpcClient implements com.expedia.www.haystack.client.dispatchers.clients.Client { +public class GRPCAgentClient extends BaseGrpcClient { private final Format format; public GRPCAgentClient(Metrics metrics, Format format, ManagedChannel channel, SpanAgentStub stub, StreamObserver observer, long shutdownTimeoutMS) { diff --git a/integrations/opencensus/src/test/scala/com/www/expedia/opencensus/exporter/trace/HaystackExporterIntegrationSpec.scala b/integrations/opencensus/src/test/scala/com/www/expedia/opencensus/exporter/trace/HaystackExporterIntegrationSpec.scala index f09bfe4..9f4a509 100644 --- a/integrations/opencensus/src/test/scala/com/www/expedia/opencensus/exporter/trace/HaystackExporterIntegrationSpec.scala +++ b/integrations/opencensus/src/test/scala/com/www/expedia/opencensus/exporter/trace/HaystackExporterIntegrationSpec.scala @@ -50,7 +50,7 @@ class HaystackExporterIntegrationSpec extends FunSpec with GivenWhenThen with Ma HaystackTraceExporter.unregister() } - private def generateTrace(tracer: Tracer, startTimeInMillis: Long) = { + private def generateTrace(tracer: Tracer) = { val spanBuilder = tracer.spanBuilder(OPERATION_NAME).setRecordEvents(true).setSampler(Samplers.alwaysSample()) val spanDurationInMillis = new Random().nextInt(10) + 1 @@ -72,14 +72,12 @@ class HaystackExporterIntegrationSpec extends FunSpec with GivenWhenThen with Ma describe("Integration Test with haystack and opencensus") { it ("should dispatch the spans to haystack-agent") { - val startTimeMillis = System.currentTimeMillis() HaystackTraceExporter.createAndRegister(new GrpcAgentDispatcherConfig("haystack-agent", 35000), SERVICE_NAME) - val tracer = Tracing.getTracer - generateTrace(tracer, startTimeMillis) + generateTrace(tracer) Thread.sleep(2000) - generateTrace(tracer, startTimeMillis) + generateTrace(tracer) // wait for few sec to let the span reach kafka Thread.sleep(10000) @@ -89,7 +87,6 @@ class HaystackExporterIntegrationSpec extends FunSpec with GivenWhenThen with Ma val record = records.iterator().next() val protoSpan = com.expedia.open.tracing.Span.parseFrom(record.value()) protoSpan.getTraceId shouldEqual record.key() - protoSpan.getStartTime should (be >= startTimeMillis * 1000 and be <= System.currentTimeMillis() * 1000) // micros protoSpan.getServiceName shouldEqual SERVICE_NAME protoSpan.getOperationName shouldEqual OPERATION_NAME protoSpan.getTagsCount shouldBe 2 From 92d7f5f69a8078f939b598280fc12d35677fb343 Mon Sep 17 00:00:00 2001 From: Ashish Aggarwal Date: Fri, 30 Nov 2018 20:29:31 +0530 Subject: [PATCH 3/4] close the client during deregistration --- .../exporter/trace/HaystackTraceExporter.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/integrations/opencensus/src/main/java/com/www/expedia/opencensus/exporter/trace/HaystackTraceExporter.java b/integrations/opencensus/src/main/java/com/www/expedia/opencensus/exporter/trace/HaystackTraceExporter.java index 0908314..04e2a57 100644 --- a/integrations/opencensus/src/main/java/com/www/expedia/opencensus/exporter/trace/HaystackTraceExporter.java +++ b/integrations/opencensus/src/main/java/com/www/expedia/opencensus/exporter/trace/HaystackTraceExporter.java @@ -42,6 +42,8 @@ public class HaystackTraceExporter { @Nullable static SpanExporter.Handler handler = null; + @Nullable + private static Client client = null; private HaystackTraceExporter() { } @@ -56,8 +58,8 @@ public static void createAndRegister(final DispatcherConfig dispatcherConfig, final Metrics metrics) { synchronized (monitor) { checkState(handler == null, "haystack exporter is already registered."); - final Client dispatcher = buildRemoteClient(dispatcherConfig, metrics); - final SpanExporter.Handler newHandler = new HaystackExporterHandler(dispatcher, serviceName, metrics); + client = buildRemoteClient(dispatcherConfig, metrics); + final SpanExporter.Handler newHandler = new HaystackExporterHandler(client, serviceName, metrics); HaystackTraceExporter.handler = newHandler; register(Tracing.getExportComponent().getSpanExporter(), newHandler); } @@ -92,6 +94,10 @@ public static void unregister() { synchronized (monitor) { checkState(handler != null, "haystack exporter is not registered."); Tracing.getExportComponent().getSpanExporter().unregisterHandler(REGISTER_NAME); + if (client != null) { + client.close(); + client = null; + } handler = null; } } From 457ed8e9c0935e11694a8676bc6d5158bfb316ec Mon Sep 17 00:00:00 2001 From: Ashish Aggarwal Date: Fri, 30 Nov 2018 20:36:10 +0530 Subject: [PATCH 4/4] adding README --- integrations/opencensus/README.md | 52 +++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 integrations/opencensus/README.md diff --git a/integrations/opencensus/README.md b/integrations/opencensus/README.md new file mode 100644 index 0000000..f814569 --- /dev/null +++ b/integrations/opencensus/README.md @@ -0,0 +1,52 @@ +# Opencensus Trace Exporter for Haystack + +The OpenCensus Haystack Trace Exporter is a trace exporter that exports data to haystack. + +To know about haystack checkout [here](https://expediadotcom.github.io/haystack/) + +## Quickstart + +#### Add the dependencies to your project +For Maven add to your `pom.xml`: +```xml + + + io.opencensus + opencensus-api + 0.17.0 + + + io.opencensus + opencensus-exporter-trace-haystack + [0.2.1,) + + + io.opencensus + opencensus-impl + 0.17.0 + runtime + + +``` + +For Gradle add to your dependencies: +```groovy +compile 'io.opencensus:opencensus-api:0.17.0' +compile 'io.opencensus:opencensus-exporter-trace-haystack:0.2.1' +runtime 'io.opencensus:opencensus-impl:0.17.0' +``` + +#### Register the exporter + +This will export traces to the haystack: + +```java +public class MainClass { + public static void main(String[] args) throws Exception { + com.www.expedia.opencensus.exporter.trace.HaystackTraceExporter.createAndRegister(new GrpcAgentDispatcherConfig("haystack-agent", 35000), "my-service"); + // ... + } +} +``` + +You can look into the integration test [here](src/test/scala/com/www/expedia/opencensus/exporter/trace/HaystackExporterIntegrationSpec.scala) \ No newline at end of file