Skip to content

Commit

Permalink
Skip attributes if the value is nan (#465)
Browse files Browse the repository at this point in the history
We had an incident where the user sent a nan value for a key and it was
stored in the numbers map. when we serialzied the data the json.marshal
threw error as it cann't marshan nan values.

In this change I have skipped the attributes which contains nan values.

the other way was to just remove it during the marshal process, but I
went with this just to keep the things same even during marshal.

---------

Co-authored-by: Srikanth Chekuri <[email protected]>
Co-authored-by: Vibhu Pandey <[email protected]>
  • Loading branch information
3 people authored Dec 2, 2024
1 parent bbbd3ba commit cd56602
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 55 deletions.
23 changes: 17 additions & 6 deletions exporter/clickhousetracesexporter/clickhouse_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,14 @@ func newStructuredSpan(otelSpan ptrace.Span, ServiceName string, resource pcommo
IsColumn: false,
}
if v.Type() == pcommon.ValueTypeDouble {
numberTagMap[k] = v.Double()
spanAttribute.NumberValue = v.Double()
spanAttribute.DataType = "float64"
if utils.IsValidFloat(v.Double()) {
numberTagMap[k] = v.Double()
spanAttribute.NumberValue = v.Double()
spanAttribute.DataType = "float64"
} else {
zap.S().Warn("NaN value in tag map, skipping key: ", zap.String("key", k))
return true
}
} else if v.Type() == pcommon.ValueTypeInt {
numberTagMap[k] = float64(v.Int())
spanAttribute.NumberValue = float64(v.Int())
Expand All @@ -329,9 +334,15 @@ func newStructuredSpan(otelSpan ptrace.Span, ServiceName string, resource pcommo
}
resourceAttrs[k] = v.AsString()
if v.Type() == pcommon.ValueTypeDouble {
numberTagMap[k] = v.Double()
spanAttribute.NumberValue = v.Double()
spanAttribute.DataType = "float64"
if utils.IsValidFloat(v.Double()) {
numberTagMap[k] = v.Double()
spanAttribute.NumberValue = v.Double()
spanAttribute.DataType = "float64"
} else {
zap.S().Warn("NaN value in tag map, skipping key: ", zap.String("key", k))
return true
}

} else if v.Type() == pcommon.ValueTypeInt {
numberTagMap[k] = float64(v.Int())
spanAttribute.NumberValue = float64(v.Int())
Expand Down
36 changes: 23 additions & 13 deletions exporter/clickhousetracesexporter/clickhouse_exporter_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,14 @@ func (attrMap *attributesData) add(key string, value pcommon.Value) {
}

if value.Type() == pcommon.ValueTypeDouble {
attrMap.NumberMap[key] = value.Double()
spanAttribute.NumberValue = value.Double()
spanAttribute.DataType = "float64"
if utils.IsValidFloat(value.Double()) {
attrMap.NumberMap[key] = value.Double()
spanAttribute.NumberValue = value.Double()
spanAttribute.DataType = "float64"
} else {
zap.S().Warn("NaN value in tag map, skipping key: ", zap.String("key", key))
return
}
} else if value.Type() == pcommon.ValueTypeInt {
attrMap.NumberMap[key] = float64(value.Int())
spanAttribute.NumberValue = float64(value.Int())
Expand All @@ -141,9 +146,14 @@ func (attrMap *attributesData) add(key string, value pcommon.Value) {
tSpanAttribute.StringValue = tempVal
tSpanAttribute.DataType = "string"
case float64:
attrMap.NumberMap[tempKey] = tempVal
tSpanAttribute.NumberValue = tempVal
tSpanAttribute.DataType = "float64"
if utils.IsValidFloat(tempVal) {
attrMap.NumberMap[tempKey] = tempVal
tSpanAttribute.NumberValue = tempVal
tSpanAttribute.DataType = "float64"
} else {
zap.S().Warn("NaN value in tag map, skipping key: ", zap.String("key", tempKey))
continue
}
case bool:
attrMap.BoolMap[tempKey] = tempVal
tSpanAttribute.DataType = "bool"
Expand Down Expand Up @@ -330,12 +340,14 @@ func (s *storage) pushTraceDataV3(ctx context.Context, td ptrace.Traces) error {

structuredSpan, err := newStructuredSpanV3(uint64(lBucketStart), fp, span, serviceName, rs.Resource(), s.config)
if err != nil {
zap.S().Error("Error in creating newStructuredSpanV3: ", err)
return err
return fmt.Errorf("failed to create newStructuredSpanV3: %w", err)
}
batchOfSpans = append(batchOfSpans, structuredSpan)

serializedStructuredSpan, _ := json.Marshal(structuredSpan)
serializedStructuredSpan, err := json.Marshal(structuredSpan)
if err != nil {
return fmt.Errorf("failed to marshal structured span: %w", err)
}
size += len(serializedStructuredSpan)
count += 1
}
Expand All @@ -348,15 +360,13 @@ func (s *storage) pushTraceDataV3(ctx context.Context, td ptrace.Traces) error {

err := s.Writer.WriteBatchOfSpansV3(ctx, batchOfSpans, metrics)
if err != nil {
zap.S().Error("Error in writing spans to clickhouse: ", err)
return err
return fmt.Errorf("error in writing spans to clickhouse: %w", err)
}

// write the resources
err = s.Writer.WriteResourcesV3(ctx, resourcesSeen)
if err != nil {
zap.S().Error("Error in writing resources to clickhouse: ", err)
return err
return fmt.Errorf("error in writing resources to clickhouse: %w", err)
}

return nil
Expand Down
26 changes: 10 additions & 16 deletions exporter/clickhousetracesexporter/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ func (w *SpanWriter) writeIndexBatch(ctx context.Context, batchSpans []*Span) er
}()
statement, err = w.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", w.traceDatabase, w.indexTable), driver.WithReleaseConnection())
if err != nil {
w.logger.Error("Could not prepare batch for index table: ", zap.Error(err))
return err
return fmt.Errorf("could not prepare batch for index table: %w", err)
}

for _, span := range batchSpans {
Expand Down Expand Up @@ -155,8 +154,7 @@ func (w *SpanWriter) writeIndexBatch(ctx context.Context, batchSpans []*Span) er
span.SpanKind,
)
if err != nil {
w.logger.Error("Could not append span to batch: ", zap.Object("span", span), zap.Error(err))
return err
return fmt.Errorf("could not append span to batch: %w", err)
}
}

Expand Down Expand Up @@ -188,8 +186,7 @@ func (w *SpanWriter) writeModelBatch(ctx context.Context, batchSpans []*Span) er

statement, err = w.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", w.traceDatabase, w.spansTable), driver.WithReleaseConnection())
if err != nil {
w.logger.Error("Could not prepare batch for model table: ", zap.Error(err))
return err
return fmt.Errorf("could not prepare batch for model table: %w", err)
}

metrics := map[string]usage.Metric{}
Expand All @@ -199,18 +196,17 @@ func (w *SpanWriter) writeModelBatch(ctx context.Context, batchSpans []*Span) er
usageMap.TagMap = map[string]string{}
serialized, err = json.Marshal(span.TraceModel)
if err != nil {
return err
return fmt.Errorf("could not marshal trace model: %w", err)
}
serializedUsage, err := json.Marshal(usageMap)

serializedUsage, err := json.Marshal(usageMap)
if err != nil {
return err
return fmt.Errorf("could not marshal usage map: %w", err)
}

err = statement.Append(time.Unix(0, int64(span.StartTimeUnixNano)), span.TraceId, string(serialized))
if err != nil {
w.logger.Error("Could not append span to batch: ", zap.Object("span", span), zap.Error(err))
return err
return fmt.Errorf("could not append span to batch: %w", err)
}

if !w.useNewSchema {
Expand All @@ -227,7 +223,7 @@ func (w *SpanWriter) writeModelBatch(ctx context.Context, batchSpans []*Span) er
)
stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds())))
if err != nil {
return err
return fmt.Errorf("could not send batch to model table: %w", err)
}

if !w.useNewSchema {
Expand All @@ -244,16 +240,14 @@ func (w *SpanWriter) WriteBatchOfSpans(ctx context.Context, batch []*Span) error
// inserts to the singoz_spans table
if w.spansTable != "" {
if err := w.writeModelBatch(ctx, batch); err != nil {
w.logger.Error("Could not write a batch of spans to model table: ", zap.Error(err))
return err
return fmt.Errorf("could not write a batch of spans to model table: %w", err)
}
}

// inserts to the signoz_index_v2 table
if w.indexTable != "" {
if err := w.writeIndexBatch(ctx, batch); err != nil {
w.logger.Error("Could not write a batch of spans to index table: ", zap.Error(err))
return err
return fmt.Errorf("could not write a batch of spans to index table: %w", err)
}
}

Expand Down
30 changes: 10 additions & 20 deletions exporter/clickhousetracesexporter/writerV3.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ func (w *SpanWriter) writeIndexBatchV3(ctx context.Context, batchSpans []*SpanV3
}()
statement, err = w.db.PrepareBatch(ctx, fmt.Sprintf(insertTraceSQLTemplateV2, w.traceDatabase, w.indexTableV3), driver.WithReleaseConnection())
if err != nil {
w.logger.Error("Could not prepare batch for index table: ", zap.Error(err))
return err
return fmt.Errorf("could not prepare batch for index table: %w", err)
}
for _, span := range batchSpans {
err = statement.Append(
Expand Down Expand Up @@ -67,8 +66,7 @@ func (w *SpanWriter) writeIndexBatchV3(ctx context.Context, batchSpans []*SpanV3
span.IsRemote,
)
if err != nil {
w.logger.Error("Could not append span to batch: ", zap.Any("span", span), zap.Error(err))
return err
return fmt.Errorf("could not append span to batch: %w", err)
}
}

Expand All @@ -95,8 +93,7 @@ func (w *SpanWriter) writeErrorBatchV3(ctx context.Context, batchSpans []*SpanV3
}()
statement, err = w.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", w.traceDatabase, w.errorTable), driver.WithReleaseConnection())
if err != nil {
w.logger.Error("Could not prepare batch for error table: ", zap.Error(err))
return err
return fmt.Errorf("could not prepare batch for error table: %w", err)
}

for _, span := range batchSpans {
Expand All @@ -118,8 +115,7 @@ func (w *SpanWriter) writeErrorBatchV3(ctx context.Context, batchSpans []*SpanV3
span.ResourcesString,
)
if err != nil {
w.logger.Error("Could not append span to batch: ", zap.Any("span", span), zap.Error(err))
return err
return fmt.Errorf("could not append span to batch: %w", err)
}
}
}
Expand Down Expand Up @@ -151,13 +147,11 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3)
}()
tagStatement, err = w.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", w.traceDatabase, w.attributeTable), driver.WithReleaseConnection())
if err != nil {
w.logger.Error("Could not prepare batch for span attributes table due to error: ", zap.Error(err))
return err
return fmt.Errorf("could not prepare batch for span attributes table due to error: %w", err)
}
tagKeyStatement, err = w.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", w.traceDatabase, w.attributeKeyTable), driver.WithReleaseConnection())
if err != nil {
w.logger.Error("Could not prepare batch for span attributes key table due to error: ", zap.Error(err))
return err
return fmt.Errorf("could not prepare batch for span attributes key table due to error: %w", err)
}
// create map of span attributes of key, tagType, dataType and isColumn to avoid duplicates in batch
mapOfSpanAttributeKeys := make(map[string]struct{})
Expand Down Expand Up @@ -192,8 +186,7 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3)
spanAttribute.IsColumn,
)
if err != nil {
w.logger.Error("Could not append span to tagKey Statement to batch due to error: ", zap.Error(err), zap.Any("span", span))
return err
return fmt.Errorf("could not append span to tagKey Statement to batch due to error: %w", err)
}
}
// add mapOfSpanAttributeKey to map
Expand Down Expand Up @@ -231,8 +224,7 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3)
)
}
if err != nil {
w.logger.Error("Could not append span to tag Statement batch due to error: ", zap.Error(err), zap.Any("span", span))
return err
return fmt.Errorf("could not append span to tag Statement batch due to error: %w", err)
}
}
}
Expand All @@ -247,8 +239,7 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3)
writeLatencyMillis.M(int64(time.Since(tagStart).Milliseconds())),
)
if err != nil {
w.logger.Error("Could not write to span attributes table due to error: ", zap.Error(err))
return err
return fmt.Errorf("could not write to span attributes table due to error: %w", err)
}

tagKeyStart := time.Now()
Expand All @@ -261,8 +252,7 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3)
writeLatencyMillis.M(int64(time.Since(tagKeyStart).Milliseconds())),
)
if err != nil {
w.logger.Error("Could not write to span attributes key table due to error: ", zap.Error(err))
return err
return fmt.Errorf("could not write to span attributes key table due to error: %w", err)
}

return err
Expand Down
6 changes: 6 additions & 0 deletions utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package utils

import (
"encoding/hex"
"math"

"go.opentelemetry.io/collector/pdata/pcommon"
)
Expand All @@ -19,3 +20,8 @@ func SpanIDToHexOrEmptyString(spanID pcommon.SpanID) string {
}
return ""
}

func IsValidFloat(value float64) bool {
// Check for NaN, +/-Inf
return !math.IsNaN(value) && !math.IsInf(value, 0)
}

0 comments on commit cd56602

Please sign in to comment.