From 02f32ffa50daa0c954685f21d783eeeaa7a5e178 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Sumis=C5=82awski?= Date: Fri, 6 Sep 2024 14:34:27 +0200 Subject: [PATCH 1/2] Await early spans export + refactor handler wrapping --- .../src/instrumentation.ts | 409 ++++++++---------- 1 file changed, 173 insertions(+), 236 deletions(-) diff --git a/plugins/node/opentelemetry-instrumentation-aws-lambda/src/instrumentation.ts b/plugins/node/opentelemetry-instrumentation-aws-lambda/src/instrumentation.ts index c81257ac06..012915677f 100644 --- a/plugins/node/opentelemetry-instrumentation-aws-lambda/src/instrumentation.ts +++ b/plugins/node/opentelemetry-instrumentation-aws-lambda/src/instrumentation.ts @@ -37,10 +37,7 @@ import { AWSXRAY_TRACE_ID_HEADER, AWSXRayPropagator, } from '@opentelemetry/propagator-aws-xray'; -import { - SemanticAttributes, - SemanticResourceAttributes, -} from '@opentelemetry/semantic-conventions'; +import {SEMATTRS_FAAS_EXECUTION} from '@opentelemetry/semantic-conventions'; import { APIGatewayProxyEventHeaders, @@ -49,10 +46,9 @@ import { Handler, } from 'aws-lambda'; -import { AwsLambdaInstrumentationConfig, EventContextExtractor } from './types'; +import { AwsLambdaInstrumentationConfig } from './types'; import { VERSION } from './version'; import { env } from 'process'; -import { strict } from 'assert'; import { finalizeSpan, getEventTrigger, @@ -80,8 +76,17 @@ const TRACE_ID_ATTRIBUTE = 'cx.internal.trace.id'; const SPAN_ID_ATTRIBUTE = 'cx.internal.span.id'; const SPAN_ROLE_ATTRIBUTE = 'cx.internal.span.role'; +type InstrumentationContext = { + triggerOrigin: TriggerOrigin | undefined; + triggerSpan: Span | undefined; + invocationSpan: Span; + invocationParentContext: OtelContext; +} + export class AwsLambdaInstrumentation extends InstrumentationBase { - private triggerOrigin: TriggerOrigin | undefined; + public isProbablyCallbackBased: boolean = false; + // private triggerOrigin: TriggerOrigin | undefined; + private _traceForceFlusher?: () => Promise; private _metricForceFlusher?: () => Promise; @@ -109,8 +114,8 @@ export class AwsLambdaInstrumentation extends InstrumentationBase { } public getPatchHandler(original: Handler): Handler { - diag.debug('patch handler function'); - const plugin = this; + diag.debug('patching handler function'); + const self = this; return function patchedHandler( this: never, @@ -120,177 +125,108 @@ export class AwsLambdaInstrumentation extends InstrumentationBase { context: Context, callback: Callback ) { - const config = plugin._config; - const upstreamContext = AwsLambdaInstrumentation._determineParent( - event, - context, - config.disableAwsContextPropagation === true, - config.eventContextExtractor || - AwsLambdaInstrumentation._defaultEventContextExtractor - ); - const name = context.functionName; - - const { triggerSpan, triggerOrigin } = - AwsLambdaInstrumentation._getTriggerSpan(plugin, event, upstreamContext) ?? {}; - plugin.triggerOrigin = triggerOrigin; - - const inner = (invocationParentContext: OtelContext) => { - const invocationSpan = plugin.tracer.startSpan( - name, - { - kind: SpanKind.SERVER, - attributes: { - [SemanticAttributes.FAAS_EXECUTION]: context.awsRequestId, - [SemanticResourceAttributes.FAAS_ID]: context.invokedFunctionArn, - [SemanticResourceAttributes.CLOUD_ACCOUNT_ID]: - AwsLambdaInstrumentation._extractAccountId( - context.invokedFunctionArn - ), - [SPAN_ROLE_ATTRIBUTE]: 'invocation', - }, - }, - invocationParentContext - ); - - if (config.requestHook) { - safeExecuteInTheMiddle( - () => config.requestHook!(invocationSpan, { event, context }), - e => { - if (e) - diag.error('aws-lambda instrumentation: requestHook error', e); - }, - true - ); - } - - // Not awaiting here. This has it's pros: (no extra latency, less complexity). But also cons: no guarantee that it gets delivered before the invocation ends. - plugin._sendEarlySpans(upstreamContext, triggerSpan, invocationParentContext, invocationSpan); - - return otelContext.with( - trace.setSpan(invocationParentContext, invocationSpan), - () => { - // Lambda seems to pass a callback even if handler is of Promise form, so we wrap all the time before calling - // the handler and see if the result is a Promise or not. In such a case, the callback is usually ignored. If - // the handler happened to both call the callback and complete a returned Promise, whichever happens first will - // win and the latter will be ignored. - const wrappedCallback = plugin._wrapCallback( - config, - callback, - invocationSpan, - triggerSpan - ); - - const maybePromise = safeExecuteInTheMiddle( - () => original.apply(this, [event, context, wrappedCallback]), - error => { - if (error != null) { - // Exception thrown synchronously before resolving callback / promise. - // Callback may or may not have been called, we can't know for sure, but it doesn't matter, both will end the current span - plugin._applyResponseHook(invocationSpan, error); - plugin._endSpan(invocationSpan, error); - } - } - ) as Promise<{}> | undefined; - if (typeof maybePromise?.then === 'function') { - return maybePromise.then( - value => { - plugin._applyResponseHook(invocationSpan, null, value); - plugin._endSpan(invocationSpan, undefined); - return value; - }, - (err: Error | string) => { - plugin._applyResponseHook(invocationSpan, err); - plugin._endSpan(invocationSpan, err); - throw err; - } - ); - } - return maybePromise; - } - ); - }; - - let handlerReturn: Promise | undefined; - if (!triggerSpan) { - // No wrapper span - try { - handlerReturn = inner(upstreamContext); - } catch (e) { - // Catching a lambda that synchronously failed - - void plugin._flush(); - throw e; - } - } else { - const subCtx = trace.setSpan(upstreamContext, triggerSpan); - handlerReturn = otelContext.with(subCtx, () => { - return safeExecuteInTheMiddle( + self._before_execution(event, context).then( + (instrCtx) => { + otelContext.with( + trace.setSpan(instrCtx.invocationParentContext, instrCtx.invocationSpan), () => { - const innerResult = inner(subCtx); // This call never fails, because it either returns a promise, or was called with safeExecuteInTheMiddle - // The handler was an async, it returned a promise. - if (typeof innerResult?.then === 'function') { - return innerResult.then( - value => { - strict(triggerSpan); - - void plugin._endWrapperSpan(config, triggerSpan, value, undefined); + // Lambda seems to pass a callback even if handler is of Promise form, so we wrap all the time before calling + // the handler and see if the result is a Promise or not. In such a case, the callback is usually ignored. If + // the handler happened to both call the callback and complete a returned Promise, whichever happens first will + // win and the latter will be ignored. + const wrappedCallback = self._wrapCallback(callback, instrCtx); + + let maybePromise: any; + try { + maybePromise = original.apply(this, [event, context, wrappedCallback]) + } catch (err: any) { + // Catching synchronous failures + diag.debug('handler threw synchronously'); + void self._after_execution(instrCtx, err, undefined); + context.callbackWaitsForEmptyEventLoop = false; + callback(err, undefined); + return; + } - return value; + if (typeof maybePromise?.then === 'function') { + diag.debug('handler returned a promise'); + // Promise based async handler + maybePromise.then( + (value: any) => { + diag.debug('handler promise completed'); + self._after_execution(instrCtx, undefined, value); + context.callbackWaitsForEmptyEventLoop = false; + callback(undefined, value); }, - async error => { - strict(triggerSpan); - await plugin._endWrapperSpan(config, triggerSpan, undefined, error); - throw error; // We don't want the instrumentation to hide the error from AWS + (err: Error | string) => { + diag.debug('handler promise failed'); + self._after_execution(instrCtx, err, undefined); + context.callbackWaitsForEmptyEventLoop = false; + callback(err, undefined); } ); } else { - // The lambda was synchronous, or it as synchronously thrown an error - strict(triggerSpan); - - //if (hasLambdaSynchronouslyThrown) { - void plugin._endWrapperSpan( - config, - triggerSpan, - innerResult, - undefined - ); - // } - // Fallthrough: sync reply, but callback may be in use. No way to query the event loop ! - } - - return innerResult; - }, - error => { - if (error) { - strict(triggerSpan); - void plugin._endWrapperSpan(config, triggerSpan, undefined, error); - void plugin._flush(); + diag.debug('handler returned synchronously (callback based)'); } } ); - }); - } + }, + (err) => { + diag.error('_before_execution failed', err); + self._after_execution(undefined, err, undefined); + context.callbackWaitsForEmptyEventLoop = false; + callback(err, undefined); + } + ) + } + } - // Second case, lambda was asynchronous, in which case - if (typeof handlerReturn?.then === 'function') { - return handlerReturn.then( - async success => { - await plugin._flush(); - return success; - }, - async error => { - await plugin._flush(); - throw error; - } - ); + private async _before_execution( + event: any, + context: Context, + ): Promise { + + const upstreamContext = this._determineUpstreamContext(event, context); + + const { triggerSpan, triggerOrigin } = this._startTriggerSpan(event, upstreamContext) ?? {}; + + let invocationParentContext; + if (triggerSpan) { + invocationParentContext = trace.setSpan(upstreamContext, triggerSpan); + } else { + invocationParentContext = upstreamContext + } + + const invocationSpan = this._startInvocationSpan(event, context, invocationParentContext); + + diag.info(`upstream: ${trace.getSpan(upstreamContext)?.spanContext().spanId} trigger: ${triggerSpan?.spanContext().spanId} invocationSpan: ${invocationSpan.spanContext().spanId}`) + + await this._sendEarlySpans(upstreamContext, triggerSpan, invocationParentContext, invocationSpan); + + return {triggerOrigin, triggerSpan, invocationSpan, invocationParentContext} + } + + // never fails + private async _after_execution( + context: InstrumentationContext | undefined, + err: string | Error | null | undefined, + res: any, + ): Promise { + try { + const plugin = this; + if (context?.invocationSpan) { + plugin._applyResponseHook(context.invocationSpan, err, res); + plugin._endInvocationSpan(context.invocationSpan, err); + } + if (context?.triggerSpan) { + plugin._endTriggerSpan(context.triggerSpan, context.triggerOrigin, res, err); } + } catch (e) { + diag.error('Error in _after_execution', e); + } - // Third case, the lambda is purely synchronous, without event loop, nor callback() being called - // Pitfall, no flushing ! - // We can't know for sure if the event loop is empty or not, so we can't know if we should flush or not. - return handlerReturn; - }; + await this._flush() } // never fails @@ -350,12 +286,11 @@ export class AwsLambdaInstrumentation extends InstrumentationBase { return earlySpan; } - private static _getTriggerSpan( - plugin: AwsLambdaInstrumentation, + private _startTriggerSpan( event: unknown, parentContext: OtelContext ): { triggerOrigin: TriggerOrigin; triggerSpan: Span } | undefined { - if (plugin._config.detectTrigger === false) { + if (this._config.detectTrigger === false) { return undefined; } const trigger = getEventTrigger(event); @@ -368,16 +303,16 @@ export class AwsLambdaInstrumentation extends InstrumentationBase { } options.attributes[LambdaAttributes.TRIGGER_SERVICE] = origin; options.attributes[SPAN_ROLE_ATTRIBUTE] = 'trigger'; - const triggerSpan = plugin.tracer.startSpan(name, options, parentContext); + const triggerSpan = this.tracer.startSpan(name, options, parentContext); return { triggerOrigin: origin, triggerSpan }; } - private async _endWrapperSpan( - config: AwsLambdaInstrumentationConfig, + private _endTriggerSpan( span: Span, + triggerOrigin: TriggerOrigin | undefined, lambdaResponse: any, errorFromLambda: string | Error | null | undefined, - ) { + ): void { if (span.isRecording()) { if (errorFromLambda) { span.recordException(errorFromLambda); @@ -391,8 +326,8 @@ export class AwsLambdaInstrumentation extends InstrumentationBase { return; } - if (this.triggerOrigin) { - finalizeSpan(config, this.triggerOrigin, span, lambdaResponse); + if (triggerOrigin) { + finalizeSpan(this._config, triggerOrigin, span, lambdaResponse); } span.end(); } else { @@ -400,33 +335,63 @@ export class AwsLambdaInstrumentation extends InstrumentationBase { } } + private _startInvocationSpan(event: any, context: Context, invocationParentContext: OtelContext): Span { + const invocationSpan = this.tracer.startSpan( + context.functionName, + { + kind: SpanKind.SERVER, + attributes: { + [SEMATTRS_FAAS_EXECUTION]: context.awsRequestId, + [SPAN_ROLE_ATTRIBUTE]: 'invocation', + }, + }, + invocationParentContext + ); + + if (this._config.requestHook) { + try { + this._config.requestHook!(invocationSpan, { event, context }) + } catch (e) { + diag.error('aws-lambda instrumentation: requestHook error', e) + } + } + return invocationSpan; + } + + private _endInvocationSpan(span: Span, err: string | Error | null | undefined): void { + if (span.isRecording()) { + if (err) { + span.recordException(err); + } + + const errMessage = this._errorToString(err); + if (errMessage) { + span.setStatus({ + code: SpanStatusCode.ERROR, + message: errMessage, + }); + } + span.end(); + } else { + diag.debug('Ending span for the second time'); + } + } + private _wrapCallback( - config: AwsLambdaInstrumentationConfig, originalAWSLambdaCallback: Callback, - span: Span, - wrapperSpan?: Span + instrumentationContext: InstrumentationContext ): Callback { - const plugin = this; return (err, res) => { - diag.debug('executing wrapped lookup callback function'); - plugin._applyResponseHook(span, err, res); - - plugin._endSpan(span, err); - if (wrapperSpan) { - void plugin._endWrapperSpan(config, wrapperSpan, res, err); - } - - void this._flush().then(() => { - diag.debug('executing original lookup callback function'); - originalAWSLambdaCallback.apply(this, [err, res]); // End of the function - }).catch(e => - diag.error('AWS Lambda callback failed', e) - ); + diag.debug('executing wrapped callback function'); + this._after_execution(instrumentationContext, err, res).then(() => { + diag.debug('executing original callback function'); + originalAWSLambdaCallback.apply(this, [err, res]); // End of the function + }); }; } // never fails - private async _flush() { + private async _flush(): Promise { await Promise.all([ this._flush_trace(), this._flush_metric() @@ -434,7 +399,7 @@ export class AwsLambdaInstrumentation extends InstrumentationBase { } // never fails - private async _flush_trace() { + private async _flush_trace(): Promise { if (this._traceForceFlusher) { try { await this._traceForceFlusher(); @@ -449,7 +414,7 @@ export class AwsLambdaInstrumentation extends InstrumentationBase { } // never fails - private async _flush_metric() { + private async _flush_metric(): Promise { if (this._metricForceFlusher) { try { await this._metricForceFlusher(); @@ -463,26 +428,7 @@ export class AwsLambdaInstrumentation extends InstrumentationBase { } } - private _endSpan(span: Span, err: string | Error | null | undefined) { - if (span.isRecording()) { - if (err) { - span.recordException(err); - } - - const errMessage = this._errorToString(err); - if (errMessage) { - span.setStatus({ - code: SpanStatusCode.ERROR, - message: errMessage, - }); - } - span.end(); - } else { - diag.debug('Ending span for the second time'); - } - } - - private _errorToString(err: string | Error | null | undefined) { + private _errorToString(err: string | Error | null | undefined): string | undefined { let errMessage; if (typeof err === 'string') { errMessage = err; @@ -492,12 +438,12 @@ export class AwsLambdaInstrumentation extends InstrumentationBase { return errMessage; } - override setTracerProvider(tracerProvider: TracerProvider) { + override setTracerProvider(tracerProvider: TracerProvider): void { super.setTracerProvider(tracerProvider); this._traceForceFlusher = this._traceForceFlush(tracerProvider); } - private _traceForceFlush(tracerProvider: TracerProvider) { + private _traceForceFlush(tracerProvider: TracerProvider): (() => Promise) | undefined { if (!tracerProvider) return undefined; // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -514,12 +460,12 @@ export class AwsLambdaInstrumentation extends InstrumentationBase { return undefined; } - override setMeterProvider(meterProvider: MeterProvider) { + override setMeterProvider(meterProvider: MeterProvider): void { super.setMeterProvider(meterProvider); this._metricForceFlusher = this._metricForceFlush(meterProvider); } - private _metricForceFlush(meterProvider: MeterProvider) { + private _metricForceFlush(meterProvider: MeterProvider): (() => Promise) | undefined { if (!meterProvider) return undefined; // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -536,7 +482,7 @@ export class AwsLambdaInstrumentation extends InstrumentationBase { span: Span, err?: Error | string | null, res?: any - ) { + ): void { if (this._config?.responseHook) { safeExecuteInTheMiddle( () => this._config.responseHook!(span, { err, res }), @@ -549,32 +495,22 @@ export class AwsLambdaInstrumentation extends InstrumentationBase { } } - private static _extractAccountId(arn: string): string | undefined { - const parts = arn.split(':'); - if (parts.length >= 5) { - return parts[4]; - } - return undefined; - } - private static _defaultEventContextExtractor(event: any): OtelContext { // The default extractor tries to get sampled trace header from HTTP headers. const httpHeaders = event.headers || {}; - return propagation.extract(otelContext.active(), httpHeaders, headerGetter); + return propagation.extract(ROOT_CONTEXT, httpHeaders, headerGetter); } - private static _determineParent( + private _determineUpstreamContext( event: any, context: Context, - disableAwsContextPropagation: boolean, - eventContextExtractor: EventContextExtractor ): OtelContext { let parent: OtelContext | undefined = undefined; - if (!disableAwsContextPropagation) { + if (!this._config.disableAwsContextPropagation) { const lambdaTraceHeader = process.env[traceContextEnvironmentKey]; if (lambdaTraceHeader) { parent = awsPropagator.extract( - otelContext.active(), + ROOT_CONTEXT, { [AWSXRAY_TRACE_ID_HEADER]: lambdaTraceHeader }, headerGetter ); @@ -592,6 +528,7 @@ export class AwsLambdaInstrumentation extends InstrumentationBase { } } } + const eventContextExtractor = this._config.eventContextExtractor || AwsLambdaInstrumentation._defaultEventContextExtractor const extractedContext = safeExecuteInTheMiddle( () => eventContextExtractor(event, context), e => { From f23d5ab45b5e852ffc23691cb0795fc9928f5cb3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Sumis=C5=82awski?= Date: Fri, 6 Sep 2024 14:46:31 +0200 Subject: [PATCH 2/2] remove unused code --- .../src/instrumentation.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/plugins/node/opentelemetry-instrumentation-aws-lambda/src/instrumentation.ts b/plugins/node/opentelemetry-instrumentation-aws-lambda/src/instrumentation.ts index 012915677f..98eaa2e957 100644 --- a/plugins/node/opentelemetry-instrumentation-aws-lambda/src/instrumentation.ts +++ b/plugins/node/opentelemetry-instrumentation-aws-lambda/src/instrumentation.ts @@ -84,9 +84,6 @@ type InstrumentationContext = { } export class AwsLambdaInstrumentation extends InstrumentationBase { - public isProbablyCallbackBased: boolean = false; - // private triggerOrigin: TriggerOrigin | undefined; - private _traceForceFlusher?: () => Promise; private _metricForceFlusher?: () => Promise;