Skip to content

Commit

Permalink
Nodejs cx wrapper (#37)
Browse files Browse the repository at this point in the history
Introduce cx-wrapper wrapping handler function without using require/import hooks
  • Loading branch information
RafalSumislawski authored Jul 4, 2024
1 parent 3fc0b23 commit f2838bd
Show file tree
Hide file tree
Showing 18 changed files with 3,231 additions and 345 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ __pycache__/*
build.toml

*.zip
tsconfig.tsbuildinfo
35 changes: 30 additions & 5 deletions ci-scripts/build_nodejs_layer.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ fi

IITM_PATH=$(realpath $IITM_PATH)

CWD=$(pwd)

echo OPENTELEMETRY_JS_CONTRIB_PATH=$OPENTELEMETRY_JS_CONTRIB_PATH
echo OPENTELEMETRY_JS_PATH=$OPENTELEMETRY_JS_PATH
echo IITM_PATH=$IITM_PATH
echo CWD=$CWD

pushd $OPENTELEMETRY_JS_CONTRIB_PATH > /dev/null
# Generate version files in opentelemetry-js-contrib
npx [email protected] run version:update # Newer versions have trouble with our lerna.json which contains `useWorkspaces`
Expand Down Expand Up @@ -90,8 +97,8 @@ rm -f import-in-the-middle-*.tgz
npm install && npm pack
popd > /dev/null

# Install forked opentelemetry-js/opentelemetry-js-contrib libraries
pushd ./nodejs/packages/layer
# Install forked libraries in cx-wrapper
pushd ./nodejs/packages/cx-wrapper
npm install \
${OPENTELEMETRY_JS_CONTRIB_PATH}/plugins/node/opentelemetry-instrumentation-aws-lambda/opentelemetry-instrumentation-aws-lambda-*.tgz \
${OPENTELEMETRY_JS_CONTRIB_PATH}/plugins/node/opentelemetry-instrumentation-mongodb/opentelemetry-instrumentation-mongodb-*.tgz \
Expand All @@ -101,10 +108,28 @@ npm install \
${IITM_PATH}/import-in-the-middle-*.tgz
popd > /dev/null

# Install copyfiles and bestzip # used by `npm run compile`
npm install -g copyfiles bestzip
# Build cx-wrapper
pushd ./nodejs/packages/cx-wrapper
rm -f cx-wrapper-*.tgz
npm install && npm pack
popd > /dev/null

# Install libraries in layer
pushd ./nodejs/packages/layer
npm install \
${OPENTELEMETRY_JS_CONTRIB_PATH}/plugins/node/opentelemetry-instrumentation-aws-lambda/opentelemetry-instrumentation-aws-lambda-*.tgz \
${OPENTELEMETRY_JS_CONTRIB_PATH}/plugins/node/opentelemetry-instrumentation-mongodb/opentelemetry-instrumentation-mongodb-*.tgz \
${OPENTELEMETRY_JS_CONTRIB_PATH}/plugins/node/opentelemetry-instrumentation-aws-sdk/opentelemetry-instrumentation-aws-sdk-*.tgz \
${OPENTELEMETRY_JS_PATH}/experimental/packages/opentelemetry-instrumentation/opentelemetry-instrumentation-*.tgz \
${OPENTELEMETRY_JS_PATH}/packages/opentelemetry-sdk-trace-base/opentelemetry-sdk-trace-base-*.tgz \
${IITM_PATH}/import-in-the-middle-*.tgz \
${CWD}/nodejs/packages/cx-wrapper/cx-wrapper-*.tgz
popd > /dev/null

# Install copyfiles and bestzip # used by `npm run clean/compile`
npm install -g copyfiles bestzip rimraf

# Build layer
pushd ./nodejs/packages/layer
npm install && npm run compile
npm run clean && npm install
popd > /dev/null
11 changes: 11 additions & 0 deletions nodejs/packages/cx-wrapper/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
*.map
common.d.ts
common.js
index.d.ts
index.js
instrumentation-init.d.ts
instrumentation-init.js
lambda-instrumentation-init.d.ts
lambda-instrumentation-init.js
provider-init.d.ts
provider-init.js
21 changes: 21 additions & 0 deletions nodejs/packages/cx-wrapper/common.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { getEnv } from '@opentelemetry/core';

export const logLevel = getEnv().OTEL_LOG_LEVEL;

export const OtelAttributes = {
RPC_REQUEST_PAYLOAD: 'rpc.request.payload',
RPC_RESPONSE_PAYLOAD: 'rpc.response.payload',
DB_RESPONSE: 'db.response',
};

export const parseIntEnvvar = (envName: string): number | undefined => {
const envVar = process.env?.[envName];
if (envVar === undefined) return undefined;
const numericEnvvar = parseInt(envVar);
if (isNaN(numericEnvvar)) return undefined;
return numericEnvvar;
};

const DEFAULT_OTEL_PAYLOAD_SIZE_LIMIT = 50 * 1024;
export const OTEL_PAYLOAD_SIZE_LIMIT: number =
parseIntEnvvar('OTEL_PAYLOAD_SIZE_LIMIT') ?? DEFAULT_OTEL_PAYLOAD_SIZE_LIMIT;
60 changes: 60 additions & 0 deletions nodejs/packages/cx-wrapper/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { diag, DiagConsoleLogger } from '@opentelemetry/api';
import { getEnv } from '@opentelemetry/core';

// configure lambda logging (before we load libraries that might log)
diag.setLogger(new DiagConsoleLogger(), getEnv().OTEL_LOG_LEVEL);

import { Callback, Context } from 'aws-lambda';
import { Handler } from "aws-lambda/handler.js";
import { load } from './loader.js';
import { initializeInstrumentations } from './instrumentation-init.js';
import { initializeProvider } from './provider-init.js';
import { makeLambdaInstrumentation } from './lambda-instrumentation-init.js';

const instrumentations = initializeInstrumentations();
initializeProvider(instrumentations);
const lambdaInstrumentation = makeLambdaInstrumentation();

if (process.env.CX_ORIGINAL_HANDLER === undefined)
throw Error('CX_ORIGINAL_HANDLER is missing');

// We want user code to get initialized during lambda init phase
diag.debug(`Initialization: Loading original handler ${process.env.CX_ORIGINAL_HANDLER}`);
try {
(async () => {
await load(
process.env.LAMBDA_TASK_ROOT,
process.env.CX_ORIGINAL_HANDLER
);
})();
} catch (e) {}

export const handler = (event: any, context: Context, callback: Callback) => {
diag.debug(`Loading original handler ${process.env.CX_ORIGINAL_HANDLER}`);
load(
process.env.LAMBDA_TASK_ROOT,
process.env.CX_ORIGINAL_HANDLER
).then(
(originalHandler) => {
diag.debug(`Instrumenting handler`);
const patchedHandler = lambdaInstrumentation.getPatchHandler(originalHandler) as any as Handler;
diag.debug(`Running CX handler and redirecting to ${process.env.CX_ORIGINAL_HANDLER}`)
const maybePromise = patchedHandler(event, context, callback);
if (typeof maybePromise?.then === 'function') {
maybePromise.then(
value => {
callback(null, value)
},
(err: Error | string) => {
callback(err, null)
}
);
}
},
(err: Error | string) => {
callback(err, null)
}
);
}

diag.debug('OpenTelemetry instrumentation is ready');
114 changes: 114 additions & 0 deletions nodejs/packages/cx-wrapper/instrumentation-init.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import { diag, Span } from '@opentelemetry/api';
import { Instrumentation, registerInstrumentations } from '@opentelemetry/instrumentation';
import { AwsInstrumentation } from '@opentelemetry/instrumentation-aws-sdk';
import { PgResponseHookInformation } from '@opentelemetry/instrumentation-pg';
import { OTEL_PAYLOAD_SIZE_LIMIT, OtelAttributes } from './common';

declare global {
function configureInstrumentations(): Instrumentation[]
}

export function initializeInstrumentations(): any[] {
diag.debug('Initializing OpenTelemetry instrumentations');

const instrumentations = [
new AwsInstrumentation({
suppressInternalInstrumentation: true,
preRequestHook: (span: Span, { request }) => {
const data = JSON.stringify(request.commandInput);
if (data !== undefined) {
span.setAttribute(
OtelAttributes.RPC_REQUEST_PAYLOAD,
data.substring(0, OTEL_PAYLOAD_SIZE_LIMIT)
);
}
},
responseHook: (span, { response }) => {
const data =
'data' in response && typeof response.data === 'object'
? JSON.stringify(response.data)
: response?.data?.toString();
if (data !== undefined) {
span.setAttribute(
OtelAttributes.RPC_RESPONSE_PAYLOAD,
data.substring(0, OTEL_PAYLOAD_SIZE_LIMIT)
);
}
},
}),
...(typeof configureInstrumentations === 'function' ? configureInstrumentations: defaultConfigureInstrumentations)()
];

// Register instrumentations synchronously to ensure code is patched even before provider is ready.
registerInstrumentations({
instrumentations,
});

return instrumentations;
}

function defaultConfigureInstrumentations() {
// Use require statements for instrumentation to avoid having to have transitive dependencies on all the typescript
// definitions.
const { DnsInstrumentation } = require('@opentelemetry/instrumentation-dns');
const { ExpressInstrumentation } = require('@opentelemetry/instrumentation-express');
const { GraphQLInstrumentation } = require('@opentelemetry/instrumentation-graphql');
const { GrpcInstrumentation } = require('@opentelemetry/instrumentation-grpc');
const { HapiInstrumentation } = require('@opentelemetry/instrumentation-hapi');
const { HttpInstrumentation } = require('@opentelemetry/instrumentation-http');
const { IORedisInstrumentation } = require('@opentelemetry/instrumentation-ioredis');
const { KoaInstrumentation } = require('@opentelemetry/instrumentation-koa');
const { MongoDBInstrumentation } = require('@opentelemetry/instrumentation-mongodb');
const { MySQLInstrumentation } = require('@opentelemetry/instrumentation-mysql');
const { NetInstrumentation } = require('@opentelemetry/instrumentation-net');
const { PgInstrumentation } = require('@opentelemetry/instrumentation-pg');
const { RedisInstrumentation } = require('@opentelemetry/instrumentation-redis');
return [ new DnsInstrumentation(),
new ExpressInstrumentation(),
new GraphQLInstrumentation(),
new GrpcInstrumentation(),
new HapiInstrumentation(),
new HttpInstrumentation(),
new IORedisInstrumentation(),
new KoaInstrumentation(),
new MongoDBInstrumentation({
enhancedDatabaseReporting: process.env.MONGO_ENHANCED_REPORTING === 'true'
}),
new MySQLInstrumentation(),
new NetInstrumentation(),
new PgInstrumentation({
responseHook: (span: Span, responseInfo: PgResponseHookInformation) => {
try {
if (responseInfo?.data?.rows) {
const data = JSON.stringify(responseInfo?.data?.rows);
span.setAttribute(
OtelAttributes.DB_RESPONSE,
data.substring(0, OTEL_PAYLOAD_SIZE_LIMIT)
);
}
} catch (e) {
return;
}
},
}),
new RedisInstrumentation({
responseHook: (
span: Span,
cmdName: string,
cmdArgs: string[],
response: unknown
) => {
const data =
response && typeof response === 'object'
? JSON.stringify(response)
: response?.toString();
if (data !== undefined) {
span.setAttribute(
OtelAttributes.DB_RESPONSE,
data.substring(0, OTEL_PAYLOAD_SIZE_LIMIT)
);
}
},
}),
]
}
93 changes: 93 additions & 0 deletions nodejs/packages/cx-wrapper/lambda-instrumentation-init.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import {
context as otelContext,
defaultTextMapGetter,
Context as OtelContext,
propagation,
trace,
diag
} from '@opentelemetry/api';
import { registerInstrumentations } from '@opentelemetry/instrumentation';
import { AwsLambdaInstrumentation, AwsLambdaInstrumentationConfig } from '@opentelemetry/instrumentation-aws-lambda';
import { OTEL_PAYLOAD_SIZE_LIMIT, OtelAttributes } from './common.js';

declare global {
function configureLambdaInstrumentation(config: AwsLambdaInstrumentationConfig): AwsLambdaInstrumentationConfig
}

export function makeLambdaInstrumentation(): AwsLambdaInstrumentation {
diag.debug('Preparing handler function instrumentation');

const lambdaAutoInstrumentConfig: AwsLambdaInstrumentationConfig = {
requestHook: (span, { event }) => {
const data =
event && typeof event === 'object'
? JSON.stringify(event)
: event?.toString();
if (data !== undefined) {
span.setAttribute(
OtelAttributes.RPC_REQUEST_PAYLOAD,
data.substring(0, OTEL_PAYLOAD_SIZE_LIMIT)
);
}
},
disableAwsContextPropagation: true,
eventContextExtractor: (event, context) => {
// try to extract propagation from http headers first
const httpHeaders = event?.headers || {};
const extractedHttpContext: OtelContext = propagation.extract(
otelContext.active(),
httpHeaders,
defaultTextMapGetter
);
if (trace.getSpan(extractedHttpContext)?.spanContext()) {
return extractedHttpContext;
}

// extract from client context
if (context.clientContext?.Custom) {
try {
const extractedClientContextOtelContext: OtelContext =
propagation.extract(
otelContext.active(),
context.clientContext.Custom,
defaultTextMapGetter
);
if (trace.getSpan(extractedClientContextOtelContext)?.spanContext()) {
return extractedClientContextOtelContext;
}
} catch (e) {
diag.debug(
'error extracting context from lambda client context payload',
e
);
}
} else if ((context.clientContext as any)?.custom) {
try {
const extractedClientContextOtelContext: OtelContext =
propagation.extract(
otelContext.active(),
(context.clientContext as any).custom,
defaultTextMapGetter
);
if (trace.getSpan(extractedClientContextOtelContext)?.spanContext()) {
return extractedClientContextOtelContext;
}
} catch (e) {
diag.debug(
'error extracting context from lambda client context payload',
e
);
}
}
return otelContext.active();
},
payloadSizeLimit: OTEL_PAYLOAD_SIZE_LIMIT,
};

// TODO consider not treating it as an instrumentation
const instrumentation = new AwsLambdaInstrumentation(typeof configureLambdaInstrumentation === 'function' ? configureLambdaInstrumentation(lambdaAutoInstrumentConfig) : lambdaAutoInstrumentConfig)

registerInstrumentations({instrumentations: [instrumentation]})

return instrumentation
}
3 changes: 3 additions & 0 deletions nodejs/packages/cx-wrapper/loader.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import {Handler} from "aws-lambda/handler.js";

export function load(taskRoot?: string, originalHandler?: string): Promise<Handler>;
3 changes: 3 additions & 0 deletions nodejs/packages/cx-wrapper/loader.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
const { load } = require('UserFunction.js'); // https://github.com/aws/aws-lambda-nodejs-runtime-interface-client/blob/main/src/UserFunction.js

module.exports.load = load
Loading

0 comments on commit f2838bd

Please sign in to comment.