From 72fefeb5c6fe57d1d3d83ee030c0be7cd51d3259 Mon Sep 17 00:00:00 2001 From: RassK Date: Fri, 19 Apr 2024 14:44:18 +0300 Subject: [PATCH] Change AccountService from go to dotnet (auto) --- .../AccountingService.csproj | 29 +++++ src/accountingservice/AccountingService.sln | 25 ++++ src/accountingservice/Consumer.cs | 89 +++++++++++++ src/accountingservice/Dockerfile | 64 ++++----- src/accountingservice/Helpers.cs | 31 +++++ src/accountingservice/Log.cs | 13 ++ src/accountingservice/Program.cs | 21 +++ src/accountingservice/README.md | 18 +-- src/accountingservice/kafka/consumer.go | 77 ----------- .../kafka/trace_interceptor.go | 62 --------- src/accountingservice/main.go | 123 ------------------ src/accountingservice/tools.go | 17 --- 12 files changed, 243 insertions(+), 326 deletions(-) create mode 100644 src/accountingservice/AccountingService.csproj create mode 100644 src/accountingservice/AccountingService.sln create mode 100644 src/accountingservice/Consumer.cs create mode 100644 src/accountingservice/Helpers.cs create mode 100644 src/accountingservice/Log.cs create mode 100644 src/accountingservice/Program.cs delete mode 100644 src/accountingservice/kafka/consumer.go delete mode 100644 src/accountingservice/kafka/trace_interceptor.go delete mode 100644 src/accountingservice/main.go delete mode 100644 src/accountingservice/tools.go diff --git a/src/accountingservice/AccountingService.csproj b/src/accountingservice/AccountingService.csproj new file mode 100644 index 0000000000..c1f2bae683 --- /dev/null +++ b/src/accountingservice/AccountingService.csproj @@ -0,0 +1,29 @@ + + + + Exe + net8.0 + enable + enable + Linux + + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + + + + + diff --git a/src/accountingservice/AccountingService.sln b/src/accountingservice/AccountingService.sln new file mode 100644 index 0000000000..e7ee43df26 --- /dev/null +++ b/src/accountingservice/AccountingService.sln @@ -0,0 +1,25 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 17 +VisualStudioVersion = 17.9.34701.34 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AccountingService", "AccountingService.csproj", "{C66C35E2-DF04-4DCF-8F6A-87B6D6433FF6}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {C66C35E2-DF04-4DCF-8F6A-87B6D6433FF6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C66C35E2-DF04-4DCF-8F6A-87B6D6433FF6}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C66C35E2-DF04-4DCF-8F6A-87B6D6433FF6}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C66C35E2-DF04-4DCF-8F6A-87B6D6433FF6}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {6340CDDC-E917-4532-A056-5526E0A7BDDA} + EndGlobalSection +EndGlobal diff --git a/src/accountingservice/Consumer.cs b/src/accountingservice/Consumer.cs new file mode 100644 index 0000000000..5e6182e8aa --- /dev/null +++ b/src/accountingservice/Consumer.cs @@ -0,0 +1,89 @@ +using Confluent.Kafka; +using Confluent.Kafka.SyncOverAsync; +using Confluent.SchemaRegistry.Serdes; +using Microsoft.Extensions.Logging; +using Oteldemo; + +namespace AccountingService; + +internal class Consumer : IDisposable +{ + private const string TopicName = "orders"; + + private ILogger _logger; + private IConsumer _consumer; + private bool _isListening; + + public Consumer(ILogger logger) + { + _logger = logger; + + var servers = Environment.GetEnvironmentVariable("KAFKA_SERVICE_ADDR") + ?? throw new ArgumentNullException("KAFKA_SERVICE_ADDR"); + + _consumer = BuildConsumer(servers); + _consumer.Subscribe(TopicName); + + _logger.LogInformation($"Connecting to Kafka: {servers}"); + } + + public void StartListening() + { + _isListening = true; + + try + { + while (_isListening) + { + try + { + var consumeResult = _consumer.Consume(); + if (consumeResult.IsPartitionEOF) + { + continue; + } + + ProcessMessage(consumeResult.Message); + } + catch (ConsumeException e) + { + _logger.LogError(e, "Consume error: {0}", e.Error.Reason); + } + } + } + catch (OperationCanceledException) + { + _logger.LogInformation("Closing consumer"); + + _consumer.Close(); + } + } + + private void ProcessMessage(Message message) + { + Log.LogOrderReceivedMessage(_logger, message.Value); + } + + private IConsumer BuildConsumer(string servers) + { + var conf = new ConsumerConfig + { + GroupId = $"accountingservice", + BootstrapServers = servers, + // https://github.com/confluentinc/confluent-kafka-dotnet/tree/07de95ed647af80a0db39ce6a8891a630423b952#basic-consumer-example + AutoOffsetReset = AutoOffsetReset.Earliest, + CancellationDelayMaxMs = 10_000, + EnableAutoCommit = true + }; + + return new ConsumerBuilder(conf) + .SetValueDeserializer(new ProtobufDeserializer().AsSyncOverAsync()) + .Build(); + } + + public void Dispose() + { + _isListening = false; + _consumer?.Dispose(); + } +} diff --git a/src/accountingservice/Dockerfile b/src/accountingservice/Dockerfile index 8a1ab6556b..e12eb3b7ef 100644 --- a/src/accountingservice/Dockerfile +++ b/src/accountingservice/Dockerfile @@ -1,32 +1,32 @@ -# Copyright The OpenTelemetry Authors -# SPDX-License-Identifier: Apache-2.0 - - -FROM golang:1.22-alpine AS builder - -WORKDIR /usr/src/app - -RUN apk update \ - && apk add --no-cache make protobuf-dev - -RUN --mount=type=cache,target=/go/pkg/mod/ \ - --mount=type=bind,source=./src/accountingservice/go.sum,target=go.sum \ - --mount=type=bind,source=./src/accountingservice/go.mod,target=go.mod \ - --mount=type=bind,source=./src/accountingservice/tools.go,target=tools.go \ - go mod download \ - && go list -e -f '{{range .Imports}}{{.}} {{end}}' tools.go | CGO_ENABLED=0 xargs go install -mod=readonly - -RUN --mount=type=cache,target=/go/pkg/mod/ \ - --mount=type=cache,target=/root/.cache/go-build \ - --mount=type=bind,rw,source=./src/accountingservice,target=. \ - --mount=type=bind,rw,source=./pb,target=./pb \ - protoc -I ./pb ./pb/demo.proto --go_out=./ --go-grpc_out=./ \ - && go build -ldflags "-s -w" -o /go/bin/accountingservice/ ./ - -FROM alpine - -WORKDIR /usr/src/app/ - -COPY --from=builder /go/bin/accountingservice/ ./ - -ENTRYPOINT [ "./accountingservice" ] +FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base +USER app +WORKDIR /app + +FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build +ARG BUILD_CONFIGURATION=Release +WORKDIR /src +COPY ["AccountingService/AccountingService.csproj", "AccountingService/"] +RUN dotnet restore "./AccountingService/AccountingService.csproj" +COPY . . +WORKDIR "/src/AccountingService" +RUN dotnet build "./AccountingService.csproj" -c $BUILD_CONFIGURATION -o /app/build + +FROM build AS publish +ARG BUILD_CONFIGURATION=Release +RUN dotnet publish "./AccountingService.csproj" --use-current-runtime -c $BUILD_CONFIGURATION -o /app/publish /p:UseAppHost=false + +FROM base AS final +WORKDIR /app +COPY --from=publish /app/publish . + +USER root +RUN mkdir -p "/var/log/opentelemetry/dotnet" +RUN chown app "/var/log/opentelemetry/dotnet" +USER app + +ENV OTEL_TRACES_EXPORTER=otlp +ENV OTEL_METRICS_EXPORTER=none +ENV OTEL_LOGS_EXPORTER=otlp +ENV OTEL_LOG_LEVEL=debug + +ENTRYPOINT ["./instrument.sh", "dotnet", "AccountingService.dll"] \ No newline at end of file diff --git a/src/accountingservice/Helpers.cs b/src/accountingservice/Helpers.cs new file mode 100644 index 0000000000..18adc8e67e --- /dev/null +++ b/src/accountingservice/Helpers.cs @@ -0,0 +1,31 @@ +using System.Collections; + +namespace AccountingService +{ + internal static class Helpers + { + private static List RelevantPrefixes = ["DOTNET_", "CORECLR_", "OTEL_", "KAFKA_"]; + + public static IEnumerable FilterRelevant(this IDictionary envs) + { + foreach (DictionaryEntry env in envs) + { + foreach (var prefix in RelevantPrefixes) + { + if (env.Key.ToString()?.StartsWith(prefix, StringComparison.InvariantCultureIgnoreCase) ?? false) + { + yield return env; + } + } + } + } + + public static void OutputInOrder(this IEnumerable envs) + { + foreach (var env in envs.OrderBy(x => x.Key)) + { + Console.WriteLine(env); + } + } + } +} diff --git a/src/accountingservice/Log.cs b/src/accountingservice/Log.cs new file mode 100644 index 0000000000..923b5b730a --- /dev/null +++ b/src/accountingservice/Log.cs @@ -0,0 +1,13 @@ +using Microsoft.Extensions.Logging; +using Oteldemo; + +namespace AccountingService +{ + internal static partial class Log + { + [LoggerMessage( + Level = LogLevel.Information, + Message = "Order details: {@OrderResult}.")] + public static partial void LogOrderReceivedMessage(ILogger logger, OrderResult orderResult); + } +} diff --git a/src/accountingservice/Program.cs b/src/accountingservice/Program.cs new file mode 100644 index 0000000000..ce7e34db1b --- /dev/null +++ b/src/accountingservice/Program.cs @@ -0,0 +1,21 @@ +using AccountingService; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +Console.WriteLine("Accounting service started"); + +Environment.GetEnvironmentVariables() + .FilterRelevant() + .OutputInOrder(); + +var host = Host.CreateDefaultBuilder(args) + .ConfigureServices(services => + { + services.AddSingleton(); + }) + .Build(); + +var consumer = host.Services.GetRequiredService(); +consumer.StartListening(); + +host.Run(); \ No newline at end of file diff --git a/src/accountingservice/README.md b/src/accountingservice/README.md index 6cd79fb6ba..c7138676e6 100644 --- a/src/accountingservice/README.md +++ b/src/accountingservice/README.md @@ -7,7 +7,7 @@ This service consumes new orders from a Kafka topic. To build the service binary, run: ```sh -go build -o /go/bin/accountingservice/ +dotnet build ``` ## Docker Build @@ -18,22 +18,10 @@ From the root directory, run: docker compose build accountingservice ``` -## Regenerate protos - -> [!NOTE] -> [`protoc`](https://grpc.io/docs/protoc-installation/) is required. - -To regenerate gRPC code run: - -```sh -go generate -``` - ## Bump dependencies -To bump all dependencies run: +To bump all dependencies run in Package manager: ```sh -go get -u -t ./... -go mod tidy +Update-Package -ProjectName AccountingService ``` diff --git a/src/accountingservice/kafka/consumer.go b/src/accountingservice/kafka/consumer.go deleted file mode 100644 index 2b1ab2b490..0000000000 --- a/src/accountingservice/kafka/consumer.go +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 -package kafka - -import ( - "context" - pb "github.com/open-telemetry/opentelemetry-demo/src/accountingservice/genproto/oteldemo" - - "github.com/IBM/sarama" - "github.com/sirupsen/logrus" - "google.golang.org/protobuf/proto" -) - -var ( - Topic = "orders" - ProtocolVersion = sarama.V3_0_0_0 - GroupID = "accountingservice" -) - -func StartConsumerGroup(ctx context.Context, brokers []string, log *logrus.Logger) (sarama.ConsumerGroup, error) { - saramaConfig := sarama.NewConfig() - saramaConfig.Version = ProtocolVersion - // So we can know the partition and offset of messages. - saramaConfig.Producer.Return.Successes = true - saramaConfig.Consumer.Interceptors = []sarama.ConsumerInterceptor{NewOTelInterceptor(GroupID)} - - consumerGroup, err := sarama.NewConsumerGroup(brokers, GroupID, saramaConfig) - if err != nil { - return nil, err - } - - handler := groupHandler{ - log: log, - } - - err = consumerGroup.Consume(ctx, []string{Topic}, &handler) - if err != nil { - return nil, err - } - - return consumerGroup, nil -} - -type groupHandler struct { - log *logrus.Logger -} - -func (g *groupHandler) Setup(_ sarama.ConsumerGroupSession) error { - return nil -} - -func (g *groupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { - return nil -} - -func (g *groupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { - for { - select { - case message := <-claim.Messages(): - orderResult := pb.OrderResult{} - err := proto.Unmarshal(message.Value, &orderResult) - if err != nil { - return err - } - - g.log.WithFields(logrus.Fields{ - "orderId": orderResult.OrderId, - "messageTimestamp": message.Timestamp, - "messageTopic": message.Topic, - }).Info("Message claimed") - session.MarkMessage(message, "") - - case <-session.Context().Done(): - return nil - } - } -} diff --git a/src/accountingservice/kafka/trace_interceptor.go b/src/accountingservice/kafka/trace_interceptor.go deleted file mode 100644 index 91d26539a0..0000000000 --- a/src/accountingservice/kafka/trace_interceptor.go +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 -package kafka - -import ( - "context" - "fmt" - - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/propagation" - semconv "go.opentelemetry.io/otel/semconv/v1.24.0" - "go.opentelemetry.io/otel/trace" - - "github.com/IBM/sarama" -) - -type OTelInterceptor struct { - tracer trace.Tracer - fixedAttrs []attribute.KeyValue -} - -// NewOTelInterceptor processes span for intercepted messages and add some -// headers with the span data. -func NewOTelInterceptor(groupID string) *OTelInterceptor { - oi := OTelInterceptor{} - oi.tracer = otel.Tracer("accountingservice") - - oi.fixedAttrs = []attribute.KeyValue{ - semconv.MessagingSystemKafka, - semconv.MessagingOperationReceive, - semconv.MessagingKafkaConsumerGroup(groupID), - semconv.NetworkTransportTCP, - } - return &oi -} - -func (oi *OTelInterceptor) OnConsume(msg *sarama.ConsumerMessage) { - headers := propagation.MapCarrier{} - - for _, recordHeader := range msg.Headers { - headers[string(recordHeader.Key)] = string(recordHeader.Value) - } - - propagator := otel.GetTextMapPropagator() - ctx := propagator.Extract(context.Background(), headers) - - _, span := oi.tracer.Start( - ctx, - fmt.Sprintf("%s receive", msg.Topic), - trace.WithSpanKind(trace.SpanKindConsumer), - trace.WithAttributes(oi.fixedAttrs...), - trace.WithAttributes( - semconv.MessagingDestinationName(msg.Topic), - semconv.MessagingKafkaMessageOffset(int(msg.Offset)), - semconv.MessagingMessageBodySize(len(msg.Value)), - semconv.MessagingOperationReceive, - semconv.MessagingKafkaDestinationPartition(int(msg.Partition)), - ), - ) - defer span.End() -} diff --git a/src/accountingservice/main.go b/src/accountingservice/main.go deleted file mode 100644 index e2eb7a3dd0..0000000000 --- a/src/accountingservice/main.go +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 -package main - -//go:generate go install google.golang.org/protobuf/cmd/protoc-gen-go -//go:generate go install google.golang.org/grpc/cmd/protoc-gen-go-grpc -//go:generate protoc --go_out=./ --go-grpc_out=./ --proto_path=../../pb ../../pb/demo.proto - -import ( - "context" - "fmt" - "os" - "os/signal" - "strings" - "sync" - "syscall" - "time" - - "github.com/IBM/sarama" - "github.com/sirupsen/logrus" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" - "go.opentelemetry.io/otel/propagation" - sdkresource "go.opentelemetry.io/otel/sdk/resource" - sdktrace "go.opentelemetry.io/otel/sdk/trace" - - "github.com/open-telemetry/opentelemetry-demo/src/accountingservice/kafka" -) - -var log *logrus.Logger -var resource *sdkresource.Resource -var initResourcesOnce sync.Once - -func init() { - log = logrus.New() - log.Level = logrus.DebugLevel - log.Formatter = &logrus.JSONFormatter{ - FieldMap: logrus.FieldMap{ - logrus.FieldKeyTime: "timestamp", - logrus.FieldKeyLevel: "severity", - logrus.FieldKeyMsg: "message", - }, - TimestampFormat: time.RFC3339Nano, - } - log.Out = os.Stdout -} - -func initResource() *sdkresource.Resource { - initResourcesOnce.Do(func() { - extraResources, _ := sdkresource.New( - context.Background(), - sdkresource.WithOS(), - sdkresource.WithProcess(), - sdkresource.WithContainer(), - sdkresource.WithHost(), - ) - resource, _ = sdkresource.Merge( - sdkresource.Default(), - extraResources, - ) - }) - return resource -} - -func initTracerProvider() (*sdktrace.TracerProvider, error) { - ctx := context.Background() - - exporter, err := otlptracegrpc.New(ctx) - if err != nil { - return nil, err - } - tp := sdktrace.NewTracerProvider( - sdktrace.WithBatcher(exporter), - sdktrace.WithResource(initResource()), - ) - otel.SetTracerProvider(tp) - otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})) - return tp, nil -} - -func main() { - tp, err := initTracerProvider() - if err != nil { - log.Fatal(err) - } - defer func() { - if err := tp.Shutdown(context.Background()); err != nil { - log.Printf("Error shutting down tracer provider: %v", err) - } - log.Println("Shutdown trace provider") - }() - - var brokers string - mustMapEnv(&brokers, "KAFKA_SERVICE_ADDR") - - brokerList := strings.Split(brokers, ",") - log.Printf("Kafka brokers: %s", strings.Join(brokerList, ", ")) - - ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM, syscall.SIGKILL) - defer cancel() - var consumerGroup sarama.ConsumerGroup - if consumerGroup, err = kafka.StartConsumerGroup(ctx, brokerList, log); err != nil { - log.Fatal(err) - } - defer func() { - if err := consumerGroup.Close(); err != nil { - log.Printf("Error closing consumer group: %v", err) - } - log.Println("Closed consumer group") - }() - - <-ctx.Done() - - log.Println("Accounting service exited") -} - -func mustMapEnv(target *string, envKey string) { - v := os.Getenv(envKey) - if v == "" { - panic(fmt.Sprintf("environment variable %q not set", envKey)) - } - *target = v -} diff --git a/src/accountingservice/tools.go b/src/accountingservice/tools.go deleted file mode 100644 index 3557ee5358..0000000000 --- a/src/accountingservice/tools.go +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -//go:build tools -// +build tools - -package tools - -// This file follows the recommendation at -// https://go.dev/wiki/Modules#how-can-i-track-tool-dependencies-for-a-module -// on how to pin tooling dependencies to a go.mod file. -// This ensures that all systems use the same version of tools in addition to regular dependencies. - -import ( - _ "google.golang.org/grpc/cmd/protoc-gen-go-grpc" - _ "google.golang.org/protobuf/cmd/protoc-gen-go" -)