diff --git a/receiver/signozkafkareceiver/kafka_receiver.go b/receiver/signozkafkareceiver/kafka_receiver.go index 5184e825..f93da4b9 100644 --- a/receiver/signozkafkareceiver/kafka_receiver.go +++ b/receiver/signozkafkareceiver/kafka_receiver.go @@ -497,6 +497,7 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe err = c.nextConsumer.ConsumeTraces(session.Context(), traces) c.obsrecv.EndTracesOp(ctx, c.unmarshaler.Encoding(), spanCount, err) if err != nil { + c.logger.Error("kafka receiver: failed to export traces", zap.Error(err), zap.Int32("partition", claim.Partition()), zap.String("topic", claim.Topic())) if c.messageMarking.After && c.messageMarking.OnError { session.MarkMessage(message, "") } @@ -572,6 +573,7 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS err = c.nextConsumer.ConsumeMetrics(session.Context(), metrics) c.obsrecv.EndMetricsOp(ctx, c.unmarshaler.Encoding(), dataPointCount, err) if err != nil { + c.logger.Error("kafka receiver: failed to export metrics", zap.Error(err), zap.Int32("partition", claim.Partition()), zap.String("topic", claim.Topic())) if c.messageMarking.After && c.messageMarking.OnError { session.MarkMessage(message, "") } @@ -652,6 +654,7 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess // TODO c.obsrecv.EndLogsOp(ctx, c.unmarshaler.Encoding(), logs.LogRecordCount(), err) if err != nil { + c.logger.Error("kafka receiver: failed to export logs", zap.Error(err), zap.Int32("partition", claim.Partition()), zap.String("topic", claim.Topic())) if c.messageMarking.After && c.messageMarking.OnError { session.MarkMessage(message, "") }