From f72ef07e008b03eb42363ed77296d9351bb34d81 Mon Sep 17 00:00:00 2001 From: Luke Gao Date: Mon, 16 Dec 2024 19:26:22 +0800 Subject: [PATCH] [receiver/azureeventhub] support parsing more time format (#36762) #### Description Support parsing more time format by configuration instead of only iso8601. #### Link to tracking issue Fixes #36650 #### Testing - Added unit test for time parsing function - Validated locally to send data with time format in the issue and received successfully #### Documentation Add some new configurations, including `time_format` and `time_offset`. ```yaml receivers azureeventhub: connection: "xxxxx" format: "azure" # optional time_format: # All supported time format. Default is empty string array, which means using the current iso8601 parser. The format is based on https://pkg.go.dev/time#Layout. If no time-zone info, will use UTC time. logs: ["01/02/2006 15:04:05","2006-01-02 15:04:05","2006-01-02T15:04:05Z07:00"] metrics: [""] traces: [""] # optional time_offset: # The offset hours to parsed time. Mainly for cases when there's no time-zone info in time string. default is 0. logs: -8 metrics: +8 traces: -8 ``` --------- Co-authored-by: Antoine Toulme --- .chloggen/36650.yaml | 27 ++++++++++++++ pkg/translator/azure/resourcelogs_to_logs.go | 32 ++++++++++------ .../azure/resourcelogs_to_logs_test.go | 19 +++++++++- pkg/translator/azure/resources_to_traces.go | 7 ++-- .../azurelogs/resourcelogs_to_logs.go | 33 +++++++++++------ .../azurelogs/resourcelogs_to_logs_test.go | 17 +++++++++ receiver/azureeventhubreceiver/README.md | 11 ++++++ .../azureresourcelogs_unmarshaler.go | 12 +++--- .../azureresourcemetrics_unmarshaler.go | 37 ++++++++++++------- .../azureresourcetraces_unmarshaler.go | 7 ++-- receiver/azureeventhubreceiver/config.go | 7 ++++ receiver/azureeventhubreceiver/factory.go | 6 +-- 12 files changed, 163 insertions(+), 52 deletions(-) create mode 100644 .chloggen/36650.yaml diff --git a/.chloggen/36650.yaml b/.chloggen/36650.yaml new file mode 100644 index 000000000000..7fdb5eab0c99 --- /dev/null +++ b/.chloggen/36650.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: azureeventhubreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: support providing one or more time formats for timestamp parsing + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36650] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/pkg/translator/azure/resourcelogs_to_logs.go b/pkg/translator/azure/resourcelogs_to_logs.go index 57fd4de3025d..5b608570612a 100644 --- a/pkg/translator/azure/resourcelogs_to_logs.go +++ b/pkg/translator/azure/resourcelogs_to_logs.go @@ -8,6 +8,7 @@ import ( "encoding/json" "errors" "strconv" + "time" jsoniter "github.com/json-iterator/go" "github.com/relvacode/iso8601" @@ -71,8 +72,9 @@ type azureLogRecord struct { var _ plog.Unmarshaler = (*ResourceLogsUnmarshaler)(nil) type ResourceLogsUnmarshaler struct { - Version string - Logger *zap.Logger + Version string + Logger *zap.Logger + TimeFormats []string } func (r ResourceLogsUnmarshaler) UnmarshalLogs(buf []byte) (plog.Logs, error) { @@ -105,7 +107,7 @@ func (r ResourceLogsUnmarshaler) UnmarshalLogs(buf []byte) (plog.Logs, error) { for i := 0; i < len(logs); i++ { log := logs[i] - nanos, err := getTimestamp(log) + nanos, err := getTimestamp(log, r.TimeFormats...) if err != nil { r.Logger.Warn("Unable to convert timestamp from log", zap.String("timestamp", log.Time)) continue @@ -129,11 +131,11 @@ func (r ResourceLogsUnmarshaler) UnmarshalLogs(buf []byte) (plog.Logs, error) { return l, nil } -func getTimestamp(record azureLogRecord) (pcommon.Timestamp, error) { +func getTimestamp(record azureLogRecord, formats ...string) (pcommon.Timestamp, error) { if record.Time != "" { - return asTimestamp(record.Time) + return asTimestamp(record.Time, formats...) } else if record.Timestamp != "" { - return asTimestamp(record.Timestamp) + return asTimestamp(record.Timestamp, formats...) } return 0, errMissingTimestamp @@ -142,13 +144,21 @@ func getTimestamp(record azureLogRecord) (pcommon.Timestamp, error) { // asTimestamp will parse an ISO8601 string into an OpenTelemetry // nanosecond timestamp. If the string cannot be parsed, it will // return zero and the error. -func asTimestamp(s string) (pcommon.Timestamp, error) { - t, err := iso8601.ParseString(s) - if err != nil { - return 0, err +func asTimestamp(s string, formats ...string) (pcommon.Timestamp, error) { + var err error + var t time.Time + // Try parsing with provided formats first + for _, format := range formats { + if t, err = time.Parse(format, s); err == nil { + return pcommon.Timestamp(t.UnixNano()), nil + } } - return pcommon.Timestamp(t.UnixNano()), nil + // Fallback to ISO 8601 parsing if no format matches + if t, err = iso8601.ParseString(s); err == nil { + return pcommon.Timestamp(t.UnixNano()), nil + } + return 0, err } // asSeverity converts the Azure log level to equivalent diff --git a/pkg/translator/azure/resourcelogs_to_logs_test.go b/pkg/translator/azure/resourcelogs_to_logs_test.go index 6a02dd187793..83a9540bce2a 100644 --- a/pkg/translator/azure/resourcelogs_to_logs_test.go +++ b/pkg/translator/azure/resourcelogs_to_logs_test.go @@ -217,8 +217,25 @@ func TestAsTimestamp(t *testing.T) { assert.NoError(t, err) assert.Less(t, pcommon.Timestamp(0), nanos) + timestamp = "11/20/2024 13:57:18" + nanos, err = asTimestamp(timestamp, "01/02/2006 15:04:05") + assert.NoError(t, err) + assert.Less(t, pcommon.Timestamp(0), nanos) + + // time_format set, but fallback to iso8601 and succeeded to parse + timestamp = "2022-11-11T04:48:27.6767145Z" + nanos, err = asTimestamp(timestamp, "01/02/2006 15:04:05") + assert.NoError(t, err) + assert.Less(t, pcommon.Timestamp(0), nanos) + + // time_format set, but all failed to parse + timestamp = "11/20/2024 13:57:18" + nanos, err = asTimestamp(timestamp, "2006-01-02 15:04:05") + assert.Error(t, err) + assert.Equal(t, pcommon.Timestamp(0), nanos) + timestamp = "invalid-time" - nanos, err = asTimestamp(timestamp) + nanos, err = asTimestamp(timestamp, nil...) assert.Error(t, err) assert.Equal(t, pcommon.Timestamp(0), nanos) } diff --git a/pkg/translator/azure/resources_to_traces.go b/pkg/translator/azure/resources_to_traces.go index 2e9f214f389d..2f92d013248c 100644 --- a/pkg/translator/azure/resources_to_traces.go +++ b/pkg/translator/azure/resources_to_traces.go @@ -62,8 +62,9 @@ type azureTracesRecord struct { var _ ptrace.Unmarshaler = (*TracesUnmarshaler)(nil) type TracesUnmarshaler struct { - Version string - Logger *zap.Logger + Version string + Logger *zap.Logger + TimeFormats []string } func (r TracesUnmarshaler) UnmarshalTraces(buf []byte) (ptrace.Traces, error) { @@ -95,7 +96,7 @@ func (r TracesUnmarshaler) UnmarshalTraces(buf []byte) (ptrace.Traces, error) { resource.Attributes().PutStr("service.name", azureTrace.AppRoleName) - nanos, err := asTimestamp(azureTrace.Time) + nanos, err := asTimestamp(azureTrace.Time, r.TimeFormats...) if err != nil { r.Logger.Warn("Invalid Timestamp", zap.String("time", azureTrace.Time)) continue diff --git a/pkg/translator/azurelogs/resourcelogs_to_logs.go b/pkg/translator/azurelogs/resourcelogs_to_logs.go index 15c0ef59e531..437df4096163 100644 --- a/pkg/translator/azurelogs/resourcelogs_to_logs.go +++ b/pkg/translator/azurelogs/resourcelogs_to_logs.go @@ -8,6 +8,7 @@ import ( "encoding/json" "errors" "strconv" + "time" jsoniter "github.com/json-iterator/go" "github.com/relvacode/iso8601" @@ -44,7 +45,6 @@ const ( var errMissingTimestamp = errors.New("missing timestamp") -// azureRecords represents an array of Azure log records // as exported via an Azure Event Hub type azureRecords struct { Records []azureLogRecord `json:"records"` @@ -76,8 +76,9 @@ type azureLogRecord struct { var _ plog.Unmarshaler = (*ResourceLogsUnmarshaler)(nil) type ResourceLogsUnmarshaler struct { - Version string - Logger *zap.Logger + Version string + Logger *zap.Logger + TimeFormats []string } func (r ResourceLogsUnmarshaler) UnmarshalLogs(buf []byte) (plog.Logs, error) { @@ -109,7 +110,7 @@ func (r ResourceLogsUnmarshaler) UnmarshalLogs(buf []byte) (plog.Logs, error) { for i := 0; i < len(logs); i++ { log := logs[i] - nanos, err := getTimestamp(log) + nanos, err := getTimestamp(log, r.TimeFormats...) if err != nil { r.Logger.Warn("Unable to convert timestamp from log", zap.String("timestamp", log.Time)) continue @@ -137,11 +138,11 @@ func (r ResourceLogsUnmarshaler) UnmarshalLogs(buf []byte) (plog.Logs, error) { return l, nil } -func getTimestamp(record azureLogRecord) (pcommon.Timestamp, error) { +func getTimestamp(record azureLogRecord, formats ...string) (pcommon.Timestamp, error) { if record.Time != "" { - return asTimestamp(record.Time) + return asTimestamp(record.Time, formats...) } else if record.Timestamp != "" { - return asTimestamp(record.Timestamp) + return asTimestamp(record.Timestamp, formats...) } return 0, errMissingTimestamp @@ -150,13 +151,21 @@ func getTimestamp(record azureLogRecord) (pcommon.Timestamp, error) { // asTimestamp will parse an ISO8601 string into an OpenTelemetry // nanosecond timestamp. If the string cannot be parsed, it will // return zero and the error. -func asTimestamp(s string) (pcommon.Timestamp, error) { - t, err := iso8601.ParseString(s) - if err != nil { - return 0, err +func asTimestamp(s string, formats ...string) (pcommon.Timestamp, error) { + var err error + var t time.Time + // Try parsing with provided formats first + for _, format := range formats { + if t, err = time.Parse(format, s); err == nil { + return pcommon.Timestamp(t.UnixNano()), nil + } } - return pcommon.Timestamp(t.UnixNano()), nil + // Fallback to ISO 8601 parsing if no format matches + if t, err = iso8601.ParseString(s); err == nil { + return pcommon.Timestamp(t.UnixNano()), nil + } + return 0, err } // asSeverity converts the Azure log level to equivalent diff --git a/pkg/translator/azurelogs/resourcelogs_to_logs_test.go b/pkg/translator/azurelogs/resourcelogs_to_logs_test.go index 4f4f17dbc0a2..9e0fbcf05f28 100644 --- a/pkg/translator/azurelogs/resourcelogs_to_logs_test.go +++ b/pkg/translator/azurelogs/resourcelogs_to_logs_test.go @@ -243,6 +243,23 @@ func TestAsTimestamp(t *testing.T) { assert.NoError(t, err) assert.Less(t, pcommon.Timestamp(0), nanos) + timestamp = "11/20/2024 13:57:18" + nanos, err = asTimestamp(timestamp, "01/02/2006 15:04:05") + assert.NoError(t, err) + assert.Less(t, pcommon.Timestamp(0), nanos) + + // time_format set, but fallback to iso8601 and succeeded to parse + timestamp = "2022-11-11T04:48:27.6767145Z" + nanos, err = asTimestamp(timestamp, "01/02/2006 15:04:05") + assert.NoError(t, err) + assert.Less(t, pcommon.Timestamp(0), nanos) + + // time_format set, but all failed to parse + timestamp = "11/20/2024 13:57:18" + nanos, err = asTimestamp(timestamp, "2006-01-02 15:04:05") + assert.Error(t, err) + assert.Equal(t, pcommon.Timestamp(0), nanos) + timestamp = "invalid-time" nanos, err = asTimestamp(timestamp) assert.Error(t, err) diff --git a/receiver/azureeventhubreceiver/README.md b/receiver/azureeventhubreceiver/README.md index 0d4873d5b6f7..bad2b947a5d8 100644 --- a/receiver/azureeventhubreceiver/README.md +++ b/receiver/azureeventhubreceiver/README.md @@ -50,6 +50,12 @@ attribute names are copied without any changes. Default: `false` (semantic conventions are not applied) +### time_formats (optional) + +All supported time format for logs, metrics and traces. Default is `nil` (unset), which means using the current iso8601 parser. The format is based on https://pkg.go.dev/time#Layout. If no time-zone info, will use UTC time. If all failed, it will use iso8601 format to parse. + +Default: `nil` + ### Example Configuration ```yaml @@ -60,6 +66,11 @@ receivers: group: bar offset: "1234-5566" format: "azure" + # optional + time_formats: + # All supported time format. Default is empty string array, which means using the current iso8601 parser. The format is based on https://pkg.go.dev/time#Layout. If no time-zone info, will use UTC time. + logs: ["01/02/2006 15:04:05","2006-01-02 15:04:05","2006-01-02T15:04:05Z07:00"] + metrics: ["01/02/2006 15:04:05"] ``` This component can persist its state using the [storage extension]. diff --git a/receiver/azureeventhubreceiver/azureresourcelogs_unmarshaler.go b/receiver/azureeventhubreceiver/azureresourcelogs_unmarshaler.go index 3c3c6a04387d..d7845de6201e 100644 --- a/receiver/azureeventhubreceiver/azureresourcelogs_unmarshaler.go +++ b/receiver/azureeventhubreceiver/azureresourcelogs_unmarshaler.go @@ -21,19 +21,21 @@ type AzureResourceLogsEventUnmarshaler struct { unmarshaler logsUnmarshaler } -func newAzureResourceLogsUnmarshaler(buildInfo component.BuildInfo, logger *zap.Logger, applySemanticConventions bool) eventLogsUnmarshaler { +func newAzureResourceLogsUnmarshaler(buildInfo component.BuildInfo, logger *zap.Logger, applySemanticConventions bool, timeFormat []string) eventLogsUnmarshaler { if applySemanticConventions { return AzureResourceLogsEventUnmarshaler{ unmarshaler: &azurelogs.ResourceLogsUnmarshaler{ - Version: buildInfo.Version, - Logger: logger, + Version: buildInfo.Version, + Logger: logger, + TimeFormats: timeFormat, }, } } return AzureResourceLogsEventUnmarshaler{ unmarshaler: &azure.ResourceLogsUnmarshaler{ - Version: buildInfo.Version, - Logger: logger, + Version: buildInfo.Version, + Logger: logger, + TimeFormats: timeFormat, }, } } diff --git a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go index efef7e72a60a..cce62e907fbf 100644 --- a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go +++ b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go @@ -21,13 +21,12 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureeventhubreceiver/internal/metadata" ) -const ( - azureResourceID = "azure.resource.id" -) +const azureResourceID = "azure.resource.id" type azureResourceMetricsUnmarshaler struct { - buildInfo component.BuildInfo - logger *zap.Logger + buildInfo component.BuildInfo + logger *zap.Logger + TimeFormat []string } // azureMetricRecords represents an array of Azure metric records @@ -50,10 +49,11 @@ type azureMetricRecord struct { Average float64 `json:"average"` } -func newAzureResourceMetricsUnmarshaler(buildInfo component.BuildInfo, logger *zap.Logger) eventMetricsUnmarshaler { +func newAzureResourceMetricsUnmarshaler(buildInfo component.BuildInfo, logger *zap.Logger, timeFormat []string) eventMetricsUnmarshaler { return azureResourceMetricsUnmarshaler{ - buildInfo: buildInfo, - logger: logger, + buildInfo: buildInfo, + logger: logger, + TimeFormat: timeFormat, } } @@ -90,7 +90,7 @@ func (r azureResourceMetricsUnmarshaler) UnmarshalMetrics(event *eventhub.Event) resourceID = azureMetric.ResourceID } - nanos, err := asTimestamp(azureMetric.Time) + nanos, err := asTimestamp(azureMetric.Time, r.TimeFormat) if err != nil { r.logger.Warn("Invalid Timestamp", zap.String("time", azureMetric.Time)) continue @@ -152,10 +152,19 @@ func (r azureResourceMetricsUnmarshaler) UnmarshalMetrics(event *eventhub.Event) // asTimestamp will parse an ISO8601 string into an OpenTelemetry // nanosecond timestamp. If the string cannot be parsed, it will // return zero and the error. -func asTimestamp(s string) (pcommon.Timestamp, error) { - t, err := iso8601.ParseString(s) - if err != nil { - return 0, err +func asTimestamp(s string, formats []string) (pcommon.Timestamp, error) { + var err error + var t time.Time + // Try parsing with provided formats first + for _, format := range formats { + if t, err = time.Parse(format, s); err == nil { + return pcommon.Timestamp(t.UnixNano()), nil + } + } + + // Fallback to ISO 8601 parsing if no format matches + if t, err = iso8601.ParseString(s); err == nil { + return pcommon.Timestamp(t.UnixNano()), nil } - return pcommon.Timestamp(t.UnixNano()), nil + return 0, err } diff --git a/receiver/azureeventhubreceiver/azureresourcetraces_unmarshaler.go b/receiver/azureeventhubreceiver/azureresourcetraces_unmarshaler.go index 40fed32a62ff..6ff4213d8874 100755 --- a/receiver/azureeventhubreceiver/azureresourcetraces_unmarshaler.go +++ b/receiver/azureeventhubreceiver/azureresourcetraces_unmarshaler.go @@ -16,11 +16,12 @@ type azureTracesEventUnmarshaler struct { unmarshaler *azure.TracesUnmarshaler } -func newAzureTracesUnmarshaler(buildInfo component.BuildInfo, logger *zap.Logger) eventTracesUnmarshaler { +func newAzureTracesUnmarshaler(buildInfo component.BuildInfo, logger *zap.Logger, timeFormat []string) eventTracesUnmarshaler { return azureTracesEventUnmarshaler{ unmarshaler: &azure.TracesUnmarshaler{ - Version: buildInfo.Version, - Logger: logger, + Version: buildInfo.Version, + Logger: logger, + TimeFormats: timeFormat, }, } } diff --git a/receiver/azureeventhubreceiver/config.go b/receiver/azureeventhubreceiver/config.go index e5e01c98cf87..b7db1541a38d 100644 --- a/receiver/azureeventhubreceiver/config.go +++ b/receiver/azureeventhubreceiver/config.go @@ -32,6 +32,13 @@ type Config struct { Format string `mapstructure:"format"` ConsumerGroup string `mapstructure:"group"` ApplySemanticConventions bool `mapstructure:"apply_semantic_conventions"` + TimeFormats TimeFormat `mapstructure:"time_formats"` +} + +type TimeFormat struct { + Logs []string `mapstructure:"logs"` + Metrics []string `mapstructure:"metrics"` + Traces []string `mapstructure:"traces"` } func isValidFormat(format string) bool { diff --git a/receiver/azureeventhubreceiver/factory.go b/receiver/azureeventhubreceiver/factory.go index 2286cdb952e1..cc0ee1098f1f 100644 --- a/receiver/azureeventhubreceiver/factory.go +++ b/receiver/azureeventhubreceiver/factory.go @@ -110,21 +110,21 @@ func (f *eventhubReceiverFactory) getReceiver( if logFormat(receiverConfig.Format) == rawLogFormat { logsUnmarshaler = newRawLogsUnmarshaler(settings.Logger) } else { - logsUnmarshaler = newAzureResourceLogsUnmarshaler(settings.BuildInfo, settings.Logger, receiverConfig.ApplySemanticConventions) + logsUnmarshaler = newAzureResourceLogsUnmarshaler(settings.BuildInfo, settings.Logger, receiverConfig.ApplySemanticConventions, receiverConfig.TimeFormats.Logs) } case pipeline.SignalMetrics: if logFormat(receiverConfig.Format) == rawLogFormat { metricsUnmarshaler = nil err = errors.New("raw format not supported for Metrics") } else { - metricsUnmarshaler = newAzureResourceMetricsUnmarshaler(settings.BuildInfo, settings.Logger) + metricsUnmarshaler = newAzureResourceMetricsUnmarshaler(settings.BuildInfo, settings.Logger, receiverConfig.TimeFormats.Metrics) } case pipeline.SignalTraces: if logFormat(receiverConfig.Format) == rawLogFormat { tracesUnmarshaler = nil err = errors.New("raw format not supported for Traces") } else { - tracesUnmarshaler = newAzureTracesUnmarshaler(settings.BuildInfo, settings.Logger) + tracesUnmarshaler = newAzureTracesUnmarshaler(settings.BuildInfo, settings.Logger, receiverConfig.TimeFormats.Traces) } }