From cd566027c0ba41b9d4cb55e668a18dcc70ee11a3 Mon Sep 17 00:00:00 2001 From: Nityananda Gohain Date: Mon, 2 Dec 2024 16:25:27 +0530 Subject: [PATCH] Skip attributes if the value is nan (#465) We had an incident where a user sent a NaN value for a key and it was stored in the numbers map. When we serialized the data, json.Marshal returned an error because it cannot marshal NaN values. In this change I skip attributes that contain NaN values (the validity check also rejects +/-Inf, which json.Marshal cannot encode either). The other option was to just remove them during the marshal process, but I went with this approach so that the stored data stays the same as what gets marshalled. --------- Co-authored-by: Srikanth Chekuri Co-authored-by: Vibhu Pandey --- .../clickhouse_exporter.go | 23 ++++++++---- .../clickhouse_exporter_v3.go | 36 ++++++++++++------- exporter/clickhousetracesexporter/writer.go | 26 ++++++-------- exporter/clickhousetracesexporter/writerV3.go | 30 ++++++---------- utils/utils.go | 6 ++++ 5 files changed, 66 insertions(+), 55 deletions(-) diff --git a/exporter/clickhousetracesexporter/clickhouse_exporter.go b/exporter/clickhousetracesexporter/clickhouse_exporter.go index d180ff10..73a42dca 100644 --- a/exporter/clickhousetracesexporter/clickhouse_exporter.go +++ b/exporter/clickhousetracesexporter/clickhouse_exporter.go @@ -300,9 +300,14 @@ func newStructuredSpan(otelSpan ptrace.Span, ServiceName string, resource pcommo IsColumn: false, } if v.Type() == pcommon.ValueTypeDouble { - numberTagMap[k] = v.Double() - spanAttribute.NumberValue = v.Double() - spanAttribute.DataType = "float64" + if utils.IsValidFloat(v.Double()) { + numberTagMap[k] = v.Double() + spanAttribute.NumberValue = v.Double() + spanAttribute.DataType = "float64" + } else { + zap.S().Warn("NaN value in tag map, skipping key: ", zap.String("key", k)) + return true + } } else if v.Type() == pcommon.ValueTypeInt { numberTagMap[k] = float64(v.Int()) spanAttribute.NumberValue = float64(v.Int()) @@ -329,9 +334,15 @@ func newStructuredSpan(otelSpan ptrace.Span, ServiceName string, 
resource pcommo } resourceAttrs[k] = v.AsString() if v.Type() == pcommon.ValueTypeDouble { - numberTagMap[k] = v.Double() - spanAttribute.NumberValue = v.Double() - spanAttribute.DataType = "float64" + if utils.IsValidFloat(v.Double()) { + numberTagMap[k] = v.Double() + spanAttribute.NumberValue = v.Double() + spanAttribute.DataType = "float64" + } else { + zap.S().Warn("NaN value in tag map, skipping key: ", zap.String("key", k)) + return true + } + } else if v.Type() == pcommon.ValueTypeInt { numberTagMap[k] = float64(v.Int()) spanAttribute.NumberValue = float64(v.Int()) diff --git a/exporter/clickhousetracesexporter/clickhouse_exporter_v3.go b/exporter/clickhousetracesexporter/clickhouse_exporter_v3.go index e61faafa..00c6e621 100644 --- a/exporter/clickhousetracesexporter/clickhouse_exporter_v3.go +++ b/exporter/clickhousetracesexporter/clickhouse_exporter_v3.go @@ -116,9 +116,14 @@ func (attrMap *attributesData) add(key string, value pcommon.Value) { } if value.Type() == pcommon.ValueTypeDouble { - attrMap.NumberMap[key] = value.Double() - spanAttribute.NumberValue = value.Double() - spanAttribute.DataType = "float64" + if utils.IsValidFloat(value.Double()) { + attrMap.NumberMap[key] = value.Double() + spanAttribute.NumberValue = value.Double() + spanAttribute.DataType = "float64" + } else { + zap.S().Warn("NaN value in tag map, skipping key: ", zap.String("key", key)) + return + } } else if value.Type() == pcommon.ValueTypeInt { attrMap.NumberMap[key] = float64(value.Int()) spanAttribute.NumberValue = float64(value.Int()) @@ -141,9 +146,14 @@ func (attrMap *attributesData) add(key string, value pcommon.Value) { tSpanAttribute.StringValue = tempVal tSpanAttribute.DataType = "string" case float64: - attrMap.NumberMap[tempKey] = tempVal - tSpanAttribute.NumberValue = tempVal - tSpanAttribute.DataType = "float64" + if utils.IsValidFloat(tempVal) { + attrMap.NumberMap[tempKey] = tempVal + tSpanAttribute.NumberValue = tempVal + tSpanAttribute.DataType = "float64" + 
} else { + zap.S().Warn("NaN value in tag map, skipping key: ", zap.String("key", tempKey)) + continue + } case bool: attrMap.BoolMap[tempKey] = tempVal tSpanAttribute.DataType = "bool" @@ -330,12 +340,14 @@ func (s *storage) pushTraceDataV3(ctx context.Context, td ptrace.Traces) error { structuredSpan, err := newStructuredSpanV3(uint64(lBucketStart), fp, span, serviceName, rs.Resource(), s.config) if err != nil { - zap.S().Error("Error in creating newStructuredSpanV3: ", err) - return err + return fmt.Errorf("failed to create newStructuredSpanV3: %w", err) } batchOfSpans = append(batchOfSpans, structuredSpan) - serializedStructuredSpan, _ := json.Marshal(structuredSpan) + serializedStructuredSpan, err := json.Marshal(structuredSpan) + if err != nil { + return fmt.Errorf("failed to marshal structured span: %w", err) + } size += len(serializedStructuredSpan) count += 1 } @@ -348,15 +360,13 @@ func (s *storage) pushTraceDataV3(ctx context.Context, td ptrace.Traces) error { err := s.Writer.WriteBatchOfSpansV3(ctx, batchOfSpans, metrics) if err != nil { - zap.S().Error("Error in writing spans to clickhouse: ", err) - return err + return fmt.Errorf("error in writing spans to clickhouse: %w", err) } // write the resources err = s.Writer.WriteResourcesV3(ctx, resourcesSeen) if err != nil { - zap.S().Error("Error in writing resources to clickhouse: ", err) - return err + return fmt.Errorf("error in writing resources to clickhouse: %w", err) } return nil diff --git a/exporter/clickhousetracesexporter/writer.go b/exporter/clickhousetracesexporter/writer.go index 3ed2e06e..2bda1389 100644 --- a/exporter/clickhousetracesexporter/writer.go +++ b/exporter/clickhousetracesexporter/writer.go @@ -112,8 +112,7 @@ func (w *SpanWriter) writeIndexBatch(ctx context.Context, batchSpans []*Span) er }() statement, err = w.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", w.traceDatabase, w.indexTable), driver.WithReleaseConnection()) if err != nil { - w.logger.Error("Could not prepare 
batch for index table: ", zap.Error(err)) - return err + return fmt.Errorf("could not prepare batch for index table: %w", err) } for _, span := range batchSpans { @@ -155,8 +154,7 @@ func (w *SpanWriter) writeIndexBatch(ctx context.Context, batchSpans []*Span) er span.SpanKind, ) if err != nil { - w.logger.Error("Could not append span to batch: ", zap.Object("span", span), zap.Error(err)) - return err + return fmt.Errorf("could not append span to batch: %w", err) } } @@ -188,8 +186,7 @@ func (w *SpanWriter) writeModelBatch(ctx context.Context, batchSpans []*Span) er statement, err = w.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", w.traceDatabase, w.spansTable), driver.WithReleaseConnection()) if err != nil { - w.logger.Error("Could not prepare batch for model table: ", zap.Error(err)) - return err + return fmt.Errorf("could not prepare batch for model table: %w", err) } metrics := map[string]usage.Metric{} @@ -199,18 +196,17 @@ func (w *SpanWriter) writeModelBatch(ctx context.Context, batchSpans []*Span) er usageMap.TagMap = map[string]string{} serialized, err = json.Marshal(span.TraceModel) if err != nil { - return err + return fmt.Errorf("could not marshal trace model: %w", err) } - serializedUsage, err := json.Marshal(usageMap) + serializedUsage, err := json.Marshal(usageMap) if err != nil { - return err + return fmt.Errorf("could not marshal usage map: %w", err) } err = statement.Append(time.Unix(0, int64(span.StartTimeUnixNano)), span.TraceId, string(serialized)) if err != nil { - w.logger.Error("Could not append span to batch: ", zap.Object("span", span), zap.Error(err)) - return err + return fmt.Errorf("could not append span to batch: %w", err) } if !w.useNewSchema { @@ -227,7 +223,7 @@ func (w *SpanWriter) writeModelBatch(ctx context.Context, batchSpans []*Span) er ) stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds()))) if err != nil { - return err + return fmt.Errorf("could not send batch to model table: %w", err) } if 
!w.useNewSchema { @@ -244,16 +240,14 @@ func (w *SpanWriter) WriteBatchOfSpans(ctx context.Context, batch []*Span) error // inserts to the singoz_spans table if w.spansTable != "" { if err := w.writeModelBatch(ctx, batch); err != nil { - w.logger.Error("Could not write a batch of spans to model table: ", zap.Error(err)) - return err + return fmt.Errorf("could not write a batch of spans to model table: %w", err) } } // inserts to the signoz_index_v2 table if w.indexTable != "" { if err := w.writeIndexBatch(ctx, batch); err != nil { - w.logger.Error("Could not write a batch of spans to index table: ", zap.Error(err)) - return err + return fmt.Errorf("could not write a batch of spans to index table: %w", err) } } diff --git a/exporter/clickhousetracesexporter/writerV3.go b/exporter/clickhousetracesexporter/writerV3.go index 8d48e5ea..e1aefc8f 100644 --- a/exporter/clickhousetracesexporter/writerV3.go +++ b/exporter/clickhousetracesexporter/writerV3.go @@ -27,8 +27,7 @@ func (w *SpanWriter) writeIndexBatchV3(ctx context.Context, batchSpans []*SpanV3 }() statement, err = w.db.PrepareBatch(ctx, fmt.Sprintf(insertTraceSQLTemplateV2, w.traceDatabase, w.indexTableV3), driver.WithReleaseConnection()) if err != nil { - w.logger.Error("Could not prepare batch for index table: ", zap.Error(err)) - return err + return fmt.Errorf("could not prepare batch for index table: %w", err) } for _, span := range batchSpans { err = statement.Append( @@ -67,8 +66,7 @@ func (w *SpanWriter) writeIndexBatchV3(ctx context.Context, batchSpans []*SpanV3 span.IsRemote, ) if err != nil { - w.logger.Error("Could not append span to batch: ", zap.Any("span", span), zap.Error(err)) - return err + return fmt.Errorf("could not append span to batch: %w", err) } } @@ -95,8 +93,7 @@ func (w *SpanWriter) writeErrorBatchV3(ctx context.Context, batchSpans []*SpanV3 }() statement, err = w.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", w.traceDatabase, w.errorTable), driver.WithReleaseConnection()) if err 
!= nil { - w.logger.Error("Could not prepare batch for error table: ", zap.Error(err)) - return err + return fmt.Errorf("could not prepare batch for error table: %w", err) } for _, span := range batchSpans { @@ -118,8 +115,7 @@ func (w *SpanWriter) writeErrorBatchV3(ctx context.Context, batchSpans []*SpanV3 span.ResourcesString, ) if err != nil { - w.logger.Error("Could not append span to batch: ", zap.Any("span", span), zap.Error(err)) - return err + return fmt.Errorf("could not append span to batch: %w", err) } } } @@ -151,13 +147,11 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3) }() tagStatement, err = w.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", w.traceDatabase, w.attributeTable), driver.WithReleaseConnection()) if err != nil { - w.logger.Error("Could not prepare batch for span attributes table due to error: ", zap.Error(err)) - return err + return fmt.Errorf("could not prepare batch for span attributes table due to error: %w", err) } tagKeyStatement, err = w.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", w.traceDatabase, w.attributeKeyTable), driver.WithReleaseConnection()) if err != nil { - w.logger.Error("Could not prepare batch for span attributes key table due to error: ", zap.Error(err)) - return err + return fmt.Errorf("could not prepare batch for span attributes key table due to error: %w", err) } // create map of span attributes of key, tagType, dataType and isColumn to avoid duplicates in batch mapOfSpanAttributeKeys := make(map[string]struct{}) @@ -192,8 +186,7 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3) spanAttribute.IsColumn, ) if err != nil { - w.logger.Error("Could not append span to tagKey Statement to batch due to error: ", zap.Error(err), zap.Any("span", span)) - return err + return fmt.Errorf("could not append span to tagKey Statement to batch due to error: %w", err) } } // add mapOfSpanAttributeKey to map @@ -231,8 +224,7 @@ func (w *SpanWriter) 
writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3) ) } if err != nil { - w.logger.Error("Could not append span to tag Statement batch due to error: ", zap.Error(err), zap.Any("span", span)) - return err + return fmt.Errorf("could not append span to tag Statement batch due to error: %w", err) } } } @@ -247,8 +239,7 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3) writeLatencyMillis.M(int64(time.Since(tagStart).Milliseconds())), ) if err != nil { - w.logger.Error("Could not write to span attributes table due to error: ", zap.Error(err)) - return err + return fmt.Errorf("could not write to span attributes table due to error: %w", err) } tagKeyStart := time.Now() @@ -261,8 +252,7 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3) writeLatencyMillis.M(int64(time.Since(tagKeyStart).Milliseconds())), ) if err != nil { - w.logger.Error("Could not write to span attributes key table due to error: ", zap.Error(err)) - return err + return fmt.Errorf("could not write to span attributes key table due to error: %w", err) } return err diff --git a/utils/utils.go b/utils/utils.go index e4026ff9..8cf107f8 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -2,6 +2,7 @@ package utils import ( "encoding/hex" + "math" "go.opentelemetry.io/collector/pdata/pcommon" ) @@ -19,3 +20,8 @@ func SpanIDToHexOrEmptyString(spanID pcommon.SpanID) string { } return "" } + +func IsValidFloat(value float64) bool { + // Check for NaN, +/-Inf + return !math.IsNaN(value) && !math.IsInf(value, 0) +}