From 6726b37778def20e9f118066a46685e374bf0c2b Mon Sep 17 00:00:00 2001 From: Thomas Watson Date: Tue, 7 Jan 2025 13:55:10 +0100 Subject: [PATCH] [DI] Batch outgoing http requests (#5007) --- integration-tests/debugger/basic.spec.js | 282 ++++++++++-------- .../debugger/snapshot-pruning.spec.js | 2 +- integration-tests/debugger/snapshot.spec.js | 10 +- integration-tests/debugger/utils.js | 8 +- .../src/debugger/devtools_client/config.js | 3 +- .../src/debugger/devtools_client/index.js | 5 +- .../debugger/devtools_client/json-buffer.js | 36 +++ .../src/debugger/devtools_client/send.js | 38 ++- .../src/debugger/devtools_client/status.js | 11 +- .../devtools_client/json-buffer.spec.js | 45 +++ .../debugger/devtools_client/send.spec.js | 111 +++++++ .../debugger/devtools_client/status.spec.js | 97 ++++-- .../test/debugger/devtools_client/utils.js | 20 +- 13 files changed, 483 insertions(+), 185 deletions(-) create mode 100644 packages/dd-trace/src/debugger/devtools_client/json-buffer.js create mode 100644 packages/dd-trace/test/debugger/devtools_client/json-buffer.spec.js create mode 100644 packages/dd-trace/test/debugger/devtools_client/send.spec.js diff --git a/integration-tests/debugger/basic.spec.js b/integration-tests/debugger/basic.spec.js index aa6a1881d33..f51278bc2ee 100644 --- a/integration-tests/debugger/basic.spec.js +++ b/integration-tests/debugger/basic.spec.js @@ -47,20 +47,22 @@ describe('Dynamic Instrumentation', function () { }) t.agent.on('debugger-diagnostics', ({ payload }) => { - const expected = expectedPayloads.shift() - assertObjectContains(payload, expected) - assertUUID(payload.debugger.diagnostics.runtimeId) - - if (payload.debugger.diagnostics.status === 'INSTALLED') { - t.axios.get(t.breakpoint.url) - .then((response) => { - assert.strictEqual(response.status, 200) - assert.deepStrictEqual(response.data, { hello: 'bar' }) - }) - .catch(done) - } else { - endIfDone() - } + payload.forEach((event) => { + const expected = expectedPayloads.shift() + assertObjectContains(event, expected) + assertUUID(event.debugger.diagnostics.runtimeId) + + if (event.debugger.diagnostics.status === 'INSTALLED') { + t.axios.get(t.breakpoint.url) + .then((response) => { + assert.strictEqual(response.status, 200) + assert.deepStrictEqual(response.data, { hello: 'bar' }) + }) + .catch(done) + } else { + endIfDone() + } + }) }) t.agent.addRemoteConfig(t.rcConfig) @@ -108,11 +110,13 @@ describe('Dynamic Instrumentation', function () { }) t.agent.on('debugger-diagnostics', ({ payload }) => { - const expected = expectedPayloads.shift() - assertObjectContains(payload, expected) - assertUUID(payload.debugger.diagnostics.runtimeId) - if (payload.debugger.diagnostics.status === 'INSTALLED') triggers.shift()() - endIfDone() + payload.forEach((event) => { + const expected = expectedPayloads.shift() + assertObjectContains(event, expected) + assertUUID(event.debugger.diagnostics.runtimeId) + if (event.debugger.diagnostics.status === 'INSTALLED') triggers.shift()() + endIfDone() + }) }) t.agent.addRemoteConfig(t.rcConfig) @@ -147,18 +151,20 @@ describe('Dynamic Instrumentation', function () { }) t.agent.on('debugger-diagnostics', ({ payload }) => { - const expected = expectedPayloads.shift() - assertObjectContains(payload, expected) - assertUUID(payload.debugger.diagnostics.runtimeId) - - if (payload.debugger.diagnostics.status === 'INSTALLED') { - t.agent.removeRemoteConfig(t.rcConfig.id) - // Wait a little to see if we get any follow-up `debugger-diagnostics` messages - setTimeout(() => { - payloadsProcessed = true - endIfDone() - }, pollInterval * 2 * 1000) // wait twice as long as the RC poll interval - } + payload.forEach((event) => { + const expected = expectedPayloads.shift() + assertObjectContains(event, expected) + assertUUID(event.debugger.diagnostics.runtimeId) + + if (event.debugger.diagnostics.status === 'INSTALLED') { + t.agent.removeRemoteConfig(t.rcConfig.id) + // Wait a little to see if we get any follow-up `debugger-diagnostics` messages + setTimeout(() => { + payloadsProcessed = true + endIfDone() + }, pollInterval * 2 * 1000) // wait twice as long as the RC poll interval + } + }) }) t.agent.addRemoteConfig(t.rcConfig) @@ -206,19 +212,21 @@ describe('Dynamic Instrumentation', function () { }] t.agent.on('debugger-diagnostics', ({ payload }) => { - const expected = expectedPayloads.shift() - assertObjectContains(payload, expected) - const { diagnostics } = payload.debugger - assertUUID(diagnostics.runtimeId) - - if (diagnostics.status === 'ERROR') { - assert.property(diagnostics, 'exception') - assert.hasAllKeys(diagnostics.exception, ['message', 'stacktrace']) - assert.typeOf(diagnostics.exception.message, 'string') - assert.typeOf(diagnostics.exception.stacktrace, 'string') - } + payload.forEach((event) => { + const expected = expectedPayloads.shift() + assertObjectContains(event, expected) + const { diagnostics } = event.debugger + assertUUID(diagnostics.runtimeId) + + if (diagnostics.status === 'ERROR') { + assert.property(diagnostics, 'exception') + assert.hasAllKeys(diagnostics.exception, ['message', 'stacktrace']) + assert.typeOf(diagnostics.exception.message, 'string') + assert.typeOf(diagnostics.exception.stacktrace, 'string') + } - endIfDone() + endIfDone() + }) }) t.agent.addRemoteConfig({ @@ -255,10 +263,12 @@ describe('Dynamic Instrumentation', function () { ] t.agent.on('debugger-diagnostics', ({ payload }) => { - if (payload.debugger.diagnostics.status === 'INSTALLED') triggers.shift()().catch(done) + payload.forEach((event) => { + if (event.debugger.diagnostics.status === 'INSTALLED') triggers.shift()().catch(done) + }) }) - t.agent.on('debugger-input', ({ payload }) => { + t.agent.on('debugger-input', ({ payload: [payload] }) => { assert.strictEqual(payload.message, expectedMessages.shift()) if (expectedMessages.length === 0) done() }) @@ -268,17 +278,19 @@ describe('Dynamic Instrumentation', function () { it('should not trigger if probe is deleted', function (done) { t.agent.on('debugger-diagnostics', ({ payload }) => { - if (payload.debugger.diagnostics.status === 'INSTALLED') { - t.agent.once('remote-confg-responded', async () => { - await t.axios.get(t.breakpoint.url) - // We want to wait enough time to see if the client triggers on the breakpoint so that the test can fail - // if it does, but not so long that the test times out. - // TODO: Is there some signal we can use instead of a timer? - setTimeout(done, pollInterval * 2 * 1000) // wait twice as long as the RC poll interval - }) + payload.forEach((event) => { + if (event.debugger.diagnostics.status === 'INSTALLED') { + t.agent.once('remote-confg-responded', async () => { + await t.axios.get(t.breakpoint.url) + // We want to wait enough time to see if the client triggers on the breakpoint so that the test can fail + // if it does, but not so long that the test times out. + // TODO: Is there some signal we can use instead of a timer? + setTimeout(done, pollInterval * 2 * 1000) // wait twice as long as the RC poll interval + }) - t.agent.removeRemoteConfig(t.rcConfig.id) - } + t.agent.removeRemoteConfig(t.rcConfig.id) + } + }) }) t.agent.on('debugger-input', () => { @@ -291,8 +303,7 @@ describe('Dynamic Instrumentation', function () { describe('sampling', function () { it('should respect sampling rate for single probe', function (done) { - let start, timer - let payloadsReceived = 0 + let prev, timer const rcConfig = t.generateRemoteConfig({ sampling: { snapshotsPerSecond: 1 } }) function triggerBreakpointContinuously () { @@ -301,27 +312,26 @@ describe('Dynamic Instrumentation', function () { } t.agent.on('debugger-diagnostics', ({ payload }) => { - if (payload.debugger.diagnostics.status === 'INSTALLED') triggerBreakpointContinuously() + payload.forEach((event) => { + if (event.debugger.diagnostics.status === 'INSTALLED') triggerBreakpointContinuously() + }) }) - t.agent.on('debugger-input', () => { - payloadsReceived++ - if (payloadsReceived === 1) { - start = Date.now() - } else if (payloadsReceived === 2) { - const duration = Date.now() - start - clearTimeout(timer) - - // Allow for a variance of -5/+50ms (time will tell if this is enough) - assert.isAbove(duration, 995) - assert.isBelow(duration, 1050) - - // Wait at least a full sampling period, to see if we get any more payloads - timer = setTimeout(done, 1250) - } else { - clearTimeout(timer) - done(new Error('Too many payloads received!')) - } + t.agent.on('debugger-input', ({ payload }) => { + payload.forEach(({ 'debugger.snapshot': { timestamp } }) => { + if (prev !== undefined) { + const duration = timestamp - prev + clearTimeout(timer) + + // Allow for a variance of +50ms (time will tell if this is enough) + assert.isAtLeast(duration, 1000) + assert.isBelow(duration, 1050) + + // Wait at least a full sampling period, to see if we get any more payloads + timer = setTimeout(done, 1250) + } + prev = timestamp + }) }) t.agent.addRemoteConfig(rcConfig) @@ -332,14 +342,12 @@ describe('Dynamic Instrumentation', function () { const rcConfig2 = t.breakpoints[1].generateRemoteConfig({ sampling: { snapshotsPerSecond: 1 } }) const state = { [rcConfig1.config.id]: { - payloadsReceived: 0, tiggerBreakpointContinuously () { t.axios.get(t.breakpoints[0].url).catch(done) this.timer = setTimeout(this.tiggerBreakpointContinuously.bind(this), 10) } }, [rcConfig2.config.id]: { - payloadsReceived: 0, tiggerBreakpointContinuously () { t.axios.get(t.breakpoints[1].url).catch(done) this.timer = setTimeout(this.tiggerBreakpointContinuously.bind(this), 10) @@ -348,29 +356,29 @@ describe('Dynamic Instrumentation', function () { } t.agent.on('debugger-diagnostics', ({ payload }) => { - const { probeId, status } = payload.debugger.diagnostics - if (status === 'INSTALLED') state[probeId].tiggerBreakpointContinuously() + payload.forEach((event) => { + const { probeId, status } = event.debugger.diagnostics + if (status === 'INSTALLED') state[probeId].tiggerBreakpointContinuously() + }) }) t.agent.on('debugger-input', ({ payload }) => { - const _state = state[payload['debugger.snapshot'].probe.id] - _state.payloadsReceived++ - if (_state.payloadsReceived === 1) { - _state.start = Date.now() - } else if (_state.payloadsReceived === 2) { - const duration = Date.now() - _state.start - clearTimeout(_state.timer) - - // Allow for a variance of -5/+50ms (time will tell if this is enough) - assert.isAbove(duration, 995) - assert.isBelow(duration, 1050) - - // Wait at least a full sampling period, to see if we get any more payloads - _state.timer = setTimeout(doneWhenCalledTwice, 1250) - } else { - clearTimeout(_state.timer) - done(new Error('Too many payloads received!')) - } + payload.forEach((result) => { + const _state = state[result['debugger.snapshot'].probe.id] + const { timestamp } = result['debugger.snapshot'] + if (_state.prev !== undefined) { + const duration = timestamp - _state.prev + clearTimeout(_state.timer) + + // Allow for a variance of +50ms (time will tell if this is enough) + assert.isAtLeast(duration, 1000) + assert.isBelow(duration, 1050) + + // Wait at least a full sampling period, to see if we get any more payloads + _state.timer = setTimeout(doneWhenCalledTwice, 1250) + } + _state.prev = timestamp + }) }) t.agent.addRemoteConfig(rcConfig1) @@ -387,39 +395,42 @@ describe('Dynamic Instrumentation', function () { it('should remove the last breakpoint completely before trying to add a new one', function (done) { const rcConfig2 = t.generateRemoteConfig() - t.agent.on('debugger-diagnostics', ({ payload: { debugger: { diagnostics: { status, probeId } } } }) => { - if (status !== 'INSTALLED') return - - if (probeId === t.rcConfig.config.id) { - // First INSTALLED payload: Try to trigger the race condition. - t.agent.removeRemoteConfig(t.rcConfig.id) - t.agent.addRemoteConfig(rcConfig2) - } else { - // Second INSTALLED payload: Perform an HTTP request to see if we successfully handled the race condition. - let finished = false - - // If the race condition occurred, the debugger will have been detached from the main thread and the new - // probe will never trigger. If that's the case, the following timer will fire: - const timer = setTimeout(() => { - done(new Error('Race condition occurred!')) - }, 1000) - - // If we successfully handled the race condition, the probe will trigger, we'll get a probe result and the - // following event listener will be called: - t.agent.once('debugger-input', () => { - clearTimeout(timer) - finished = true - done() - }) + t.agent.on('debugger-diagnostics', ({ payload }) => { + payload.forEach((event) => { + const { probeId, status } = event.debugger.diagnostics + if (status !== 'INSTALLED') return + + if (probeId === t.rcConfig.config.id) { + // First INSTALLED payload: Try to trigger the race condition. + t.agent.removeRemoteConfig(t.rcConfig.id) + t.agent.addRemoteConfig(rcConfig2) + } else { + // Second INSTALLED payload: Perform an HTTP request to see if we successfully handled the race condition. + let finished = false + + // If the race condition occurred, the debugger will have been detached from the main thread and the new + // probe will never trigger. If that's the case, the following timer will fire: + const timer = setTimeout(() => { + done(new Error('Race condition occurred!')) + }, 2000) + + // If we successfully handled the race condition, the probe will trigger, we'll get a probe result and the + // following event listener will be called: + t.agent.once('debugger-input', () => { + clearTimeout(timer) + finished = true + done() + }) - // Perform HTTP request to try and trigger the probe - t.axios.get(t.breakpoint.url).catch((err) => { - // If the request hasn't fully completed by the time the tests ends and the target app is destroyed, Axios - // will complain with a "socket hang up" error. Hence this sanity check before calling `done(err)`. If we - // later add more tests below this one, this shouuldn't be an issue. - if (!finished) done(err) - }) - } + // Perform HTTP request to try and trigger the probe + t.axios.get(t.breakpoint.url).catch((err) => { + // If the request hasn't fully completed by the time the tests ends and the target app is destroyed, + // Axios will complain with a "socket hang up" error. Hence this sanity check before calling + // `done(err)`. If we later add more tests below this one, this shouuldn't be an issue. + if (!finished) done(err) + }) + } + }) }) t.agent.addRemoteConfig(t.rcConfig) @@ -467,7 +478,9 @@ function testBasicInputWithDD (t, done) { t.triggerBreakpoint() t.agent.on('message', ({ payload }) => { - const span = payload.find((arr) => arr[0].name === 'fastify.request')[0] + const span = payload.find((arr) => arr[0].name === 'fastify.request')?.[0] + if (!span) return + traceId = span.trace_id.toString() spanId = span.span_id.toString() @@ -477,6 +490,7 @@ function testBasicInputWithDD (t, done) { t.agent.on('debugger-input', ({ payload }) => { assertBasicInputPayload(t, payload) + payload = payload[0] assert.isObject(payload.dd) assert.hasAllKeys(payload.dd, ['trace_id', 'span_id']) assert.typeOf(payload.dd.trace_id, 'string') @@ -503,7 +517,7 @@ function testBasicInputWithoutDD (t, done) { t.agent.on('debugger-input', ({ payload }) => { assertBasicInputPayload(t, payload) - assert.doesNotHaveAnyKeys(payload, ['dd']) + assert.doesNotHaveAnyKeys(payload[0], ['dd']) done() }) @@ -511,6 +525,10 @@ function testBasicInputWithoutDD (t, done) { } function assertBasicInputPayload (t, payload) { + assert.isArray(payload) + assert.lengthOf(payload, 1) + payload = payload[0] + const expected = { ddsource: 'dd_debugger', hostname: os.hostname(), diff --git a/integration-tests/debugger/snapshot-pruning.spec.js b/integration-tests/debugger/snapshot-pruning.spec.js index c1ba218dd1c..b94d6afcce3 100644 --- a/integration-tests/debugger/snapshot-pruning.spec.js +++ b/integration-tests/debugger/snapshot-pruning.spec.js @@ -11,7 +11,7 @@ describe('Dynamic Instrumentation', function () { beforeEach(t.triggerBreakpoint) it('should prune snapshot if payload is too large', function (done) { - t.agent.on('debugger-input', ({ payload }) => { + t.agent.on('debugger-input', ({ payload: [payload] }) => { assert.isBelow(Buffer.byteLength(JSON.stringify(payload)), 1024 * 1024) // 1MB assert.deepEqual(payload['debugger.snapshot'].captures, { lines: { diff --git a/integration-tests/debugger/snapshot.spec.js b/integration-tests/debugger/snapshot.spec.js index e2f9d9eb047..68b42c97d35 100644 --- a/integration-tests/debugger/snapshot.spec.js +++ b/integration-tests/debugger/snapshot.spec.js @@ -11,7 +11,7 @@ describe('Dynamic Instrumentation', function () { beforeEach(t.triggerBreakpoint) it('should capture a snapshot', function (done) { - t.agent.on('debugger-input', ({ payload: { 'debugger.snapshot': { captures } } }) => { + t.agent.on('debugger-input', ({ payload: [{ 'debugger.snapshot': { captures } }] }) => { assert.deepEqual(Object.keys(captures), ['lines']) assert.deepEqual(Object.keys(captures.lines), [String(t.breakpoint.line)]) @@ -114,7 +114,7 @@ describe('Dynamic Instrumentation', function () { }) it('should respect maxReferenceDepth', function (done) { - t.agent.on('debugger-input', ({ payload: { 'debugger.snapshot': { captures } } }) => { + t.agent.on('debugger-input', ({ payload: [{ 'debugger.snapshot': { captures } }] }) => { const { locals } = captures.lines[t.breakpoint.line] delete locals.request delete locals.fastify @@ -150,7 +150,7 @@ describe('Dynamic Instrumentation', function () { }) it('should respect maxLength', function (done) { - t.agent.on('debugger-input', ({ payload: { 'debugger.snapshot': { captures } } }) => { + t.agent.on('debugger-input', ({ payload: [{ 'debugger.snapshot': { captures } }] }) => { const { locals } = captures.lines[t.breakpoint.line] assert.deepEqual(locals.lstr, { @@ -167,7 +167,7 @@ describe('Dynamic Instrumentation', function () { }) it('should respect maxCollectionSize', function (done) { - t.agent.on('debugger-input', ({ payload: { 'debugger.snapshot': { captures } } }) => { + t.agent.on('debugger-input', ({ payload: [{ 'debugger.snapshot': { captures } }] }) => { const { locals } = captures.lines[t.breakpoint.line] assert.deepEqual(locals.arr, { @@ -205,7 +205,7 @@ describe('Dynamic Instrumentation', function () { } } - t.agent.on('debugger-input', ({ payload: { 'debugger.snapshot': { captures } } }) => { + t.agent.on('debugger-input', ({ payload: [{ 'debugger.snapshot': { captures } }] }) => { const { locals } = captures.lines[t.breakpoint.line] assert.deepEqual(Object.keys(locals), [ diff --git a/integration-tests/debugger/utils.js b/integration-tests/debugger/utils.js index 4f215723816..9f5175d84fc 100644 --- a/integration-tests/debugger/utils.js +++ b/integration-tests/debugger/utils.js @@ -50,9 +50,11 @@ function setup ({ env, testApp } = {}) { function triggerBreakpoint (url) { // Trigger the breakpoint once probe is successfully installed t.agent.on('debugger-diagnostics', ({ payload }) => { - if (payload.debugger.diagnostics.status === 'INSTALLED') { - t.axios.get(url) - } + payload.forEach((event) => { + if (event.debugger.diagnostics.status === 'INSTALLED') { + t.axios.get(url) + } + }) }) } diff --git a/packages/dd-trace/src/debugger/devtools_client/config.js b/packages/dd-trace/src/debugger/devtools_client/config.js index 7783bc84d75..950d8938872 100644 --- a/packages/dd-trace/src/debugger/devtools_client/config.js +++ b/packages/dd-trace/src/debugger/devtools_client/config.js @@ -9,7 +9,8 @@ const config = module.exports = { service: parentConfig.service, commitSHA: parentConfig.commitSHA, repositoryUrl: parentConfig.repositoryUrl, - parentThreadId + parentThreadId, + maxTotalPayloadSize: 5 * 1024 * 1024 // 5MB } updateUrl(parentConfig) diff --git a/packages/dd-trace/src/debugger/devtools_client/index.js b/packages/dd-trace/src/debugger/devtools_client/index.js index df158b7d2da..89c96db18c6 100644 --- a/packages/dd-trace/src/debugger/devtools_client/index.js +++ b/packages/dd-trace/src/debugger/devtools_client/index.js @@ -129,9 +129,8 @@ session.on('Debugger.paused', async ({ params }) => { } // TODO: Process template (DEBUG-2628) - send(probe.template, logger, dd, snapshot, (err) => { - if (err) log.error('Debugger error', err) - else ackEmitting(probe) + send(probe.template, logger, dd, snapshot, () => { + ackEmitting(probe) }) } }) diff --git a/packages/dd-trace/src/debugger/devtools_client/json-buffer.js b/packages/dd-trace/src/debugger/devtools_client/json-buffer.js new file mode 100644 index 00000000000..5010aafac3d --- /dev/null +++ b/packages/dd-trace/src/debugger/devtools_client/json-buffer.js @@ -0,0 +1,36 @@ +'use strict' + +class JSONBuffer { + constructor ({ size, timeout, onFlush }) { + this._maxSize = size + this._timeout = timeout + this._onFlush = onFlush + this._reset() + } + + _reset () { + clearTimeout(this._timer) + this._timer = null + this._partialJson = null + } + + _flush () { + const json = `${this._partialJson}]` + this._reset() + this._onFlush(json) + } + + write (str, size = Buffer.byteLength(str)) { + if (this._timer === null) { + this._partialJson = `[${str}` + this._timer = setTimeout(() => this._flush(), this._timeout) + } else if (Buffer.byteLength(this._partialJson) + size + 2 > this._maxSize) { + this._flush() + this.write(str, size) + } else { + this._partialJson += `,${str}` + } + } +} + +module.exports = JSONBuffer diff --git a/packages/dd-trace/src/debugger/devtools_client/send.js b/packages/dd-trace/src/debugger/devtools_client/send.js index 375afd7d47a..12d9b8cad84 100644 --- a/packages/dd-trace/src/debugger/devtools_client/send.js +++ b/packages/dd-trace/src/debugger/devtools_client/send.js @@ -4,13 +4,15 @@ const { hostname: getHostname } = require('os') const { stringify } = require('querystring') const config = require('./config') +const JSONBuffer = require('./json-buffer') const request = require('../../exporters/common/request') const { GIT_COMMIT_SHA, GIT_REPOSITORY_URL } = require('../../plugins/util/tags') +const log = require('../../log') const { version } = require('../../../../../package.json') module.exports = send -const MAX_PAYLOAD_SIZE = 1024 * 1024 // 1MB +const MAX_LOG_PAYLOAD_SIZE = 1024 * 1024 // 1MB const ddsource = 'dd_debugger' const hostname = getHostname() @@ -27,14 +29,10 @@ const ddtags = [ const path = `/debugger/v1/input?${stringify({ ddtags })}` -function send (message, logger, dd, snapshot, cb) { - const opts = { - method: 'POST', - url: config.url, - path, - headers: { 'Content-Type': 'application/json; charset=utf-8' } - } +let callbacks = [] +const jsonBuffer = new JSONBuffer({ size: config.maxTotalPayloadSize, timeout: 1000, onFlush }) +function send (message, logger, dd, snapshot, cb) { const payload = { ddsource, hostname, @@ -46,8 +44,9 @@ function send (message, logger, dd, snapshot, cb) { } let json = JSON.stringify(payload) + let size = Buffer.byteLength(json) - if (Buffer.byteLength(json) > MAX_PAYLOAD_SIZE) { + if (size > MAX_LOG_PAYLOAD_SIZE) { // TODO: This is a very crude way to handle large payloads. Proper pruning will be implemented later (DEBUG-2624) const line = Object.values(payload['debugger.snapshot'].captures.lines)[0] line.locals = { @@ -55,7 +54,26 @@ function send (message, logger, dd, snapshot, cb) { size: Object.keys(line.locals).length } json = JSON.stringify(payload) + size = Buffer.byteLength(json) + } + + jsonBuffer.write(json, size) + callbacks.push(cb) +} + +function onFlush (payload) { + const opts = { + method: 'POST', + url: config.url, + path, + headers: { 'Content-Type': 'application/json; charset=utf-8' } } - request(json, opts, cb) + const _callbacks = callbacks + callbacks = [] + + request(payload, opts, (err) => { + if (err) log.error('Could not send debugger payload', err) + else _callbacks.forEach(cb => cb()) + }) } diff --git a/packages/dd-trace/src/debugger/devtools_client/status.js b/packages/dd-trace/src/debugger/devtools_client/status.js index b228d7e50b7..7a7db799e53 100644 --- a/packages/dd-trace/src/debugger/devtools_client/status.js +++ b/packages/dd-trace/src/debugger/devtools_client/status.js @@ -2,6 +2,7 @@ const LRUCache = require('lru-cache') const config = require('./config') +const JSONBuffer = require('./json-buffer') const request = require('../../exporters/common/request') const FormData = require('../../exporters/common/form-data') const log = require('../../log') @@ -25,6 +26,8 @@ const cache = new LRUCache({ ttlAutopurge: true }) +const jsonBuffer = new JSONBuffer({ size: config.maxTotalPayloadSize, timeout: 1000, onFlush }) + const STATUSES = { RECEIVED: 'RECEIVED', INSTALLED: 'INSTALLED', @@ -71,11 +74,15 @@ function ackError (err, { id: probeId, version }) { } function send (payload) { + jsonBuffer.write(JSON.stringify(payload)) +} + +function onFlush (payload) { const form = new FormData() form.append( 'event', - JSON.stringify(payload), + payload, { filename: 'event.json', contentType: 'application/json; charset=utf-8' } ) @@ -87,7 +94,7 @@ function send (payload) { } request(form, options, (err) => { - if (err) log.error('[debugger:devtools_client] Error sending debugger payload', err) + if (err) log.error('[debugger:devtools_client] Error sending probe payload', err) }) } diff --git a/packages/dd-trace/test/debugger/devtools_client/json-buffer.spec.js b/packages/dd-trace/test/debugger/devtools_client/json-buffer.spec.js new file mode 100644 index 00000000000..34312f808dd --- /dev/null +++ b/packages/dd-trace/test/debugger/devtools_client/json-buffer.spec.js @@ -0,0 +1,45 @@ +'use strict' + +require('../../setup/mocha') + +const JSONBuffer = require('../../../src/debugger/devtools_client/json-buffer') + +const MAX_SAFE_SIGNED_INTEGER = 2 ** 31 - 1 + +describe('JSONBuffer', () => { + it('should call onFlush with the expected payload when the timeout is reached', function (done) { + const onFlush = (json) => { + const diff = Date.now() - start + expect(json).to.equal('[{"message":1},{"message":2},{"message":3}]') + expect(diff).to.be.within(95, 110) + done() + } + + const jsonBuffer = new JSONBuffer({ size: Infinity, timeout: 100, onFlush }) + + const start = Date.now() + jsonBuffer.write(JSON.stringify({ message: 1 })) + jsonBuffer.write(JSON.stringify({ message: 2 })) + jsonBuffer.write(JSON.stringify({ message: 3 })) + }) + + it('should call onFlush with the expected payload when the size is reached', function (done) { + const expectedPayloads = [ + '[{"message":1},{"message":2}]', + '[{"message":3},{"message":4}]' + ] + + const onFlush = (json) => { + expect(json).to.equal(expectedPayloads.shift()) + if (expectedPayloads.length === 0) done() + } + + const jsonBuffer = new JSONBuffer({ size: 30, timeout: MAX_SAFE_SIGNED_INTEGER, onFlush }) + + jsonBuffer.write(JSON.stringify({ message: 1 })) // size: 15 + jsonBuffer.write(JSON.stringify({ message: 2 })) // size: 29 + jsonBuffer.write(JSON.stringify({ message: 3 })) // size: 15 (flushed, and re-added) + jsonBuffer.write(JSON.stringify({ message: 4 })) // size: 29 + jsonBuffer.write(JSON.stringify({ message: 5 })) // size: 15 (flushed, and re-added) + }) +}) diff --git a/packages/dd-trace/test/debugger/devtools_client/send.spec.js b/packages/dd-trace/test/debugger/devtools_client/send.spec.js new file mode 100644 index 00000000000..ea4551d8ff6 --- /dev/null +++ b/packages/dd-trace/test/debugger/devtools_client/send.spec.js @@ -0,0 +1,111 @@ +'use strict' + +require('../../setup/mocha') + +const { hostname: getHostname } = require('os') +const { expectWithin, getRequestOptions } = require('./utils') +const JSONBuffer = require('../../../src/debugger/devtools_client/json-buffer') +const { version } = require('../../../../../package.json') + +process.env.DD_ENV = 'my-env' +process.env.DD_VERSION = 'my-version' +const service = 'my-service' +const commitSHA = 'my-commit-sha' +const repositoryUrl = 'my-repository-url' +const url = 'my-url' +const ddsource = 'dd_debugger' +const hostname = getHostname() +const message = { message: true } +const logger = { logger: true } +const dd = { dd: true } +const snapshot = { snapshot: true } + +describe('input message http requests', function () { + let send, request, jsonBuffer + + beforeEach(function () { + request = sinon.spy() + request['@noCallThru'] = true + + class JSONBufferSpy extends JSONBuffer { + constructor (...args) { + super(...args) + jsonBuffer = this + sinon.spy(this, 'write') + } + } + + send = proxyquire('../src/debugger/devtools_client/send', { + './config': { service, commitSHA, repositoryUrl, url, '@noCallThru': true }, + './json-buffer': JSONBufferSpy, + '../../exporters/common/request': request + }) + }) + + it('should buffer instead of calling request directly', function () { + const callback = sinon.spy() + + send(message, logger, dd, snapshot, callback) + expect(request).to.not.have.been.called + expect(jsonBuffer.write).to.have.been.calledOnceWith( + JSON.stringify(getPayload()) + ) + expect(callback).to.not.have.been.called + }) + + it('should call request with the expected payload once the buffer is flushed', function (done) { + const callback1 = sinon.spy() + const callback2 = sinon.spy() + const callback3 = sinon.spy() + + send({ message: 1 }, logger, dd, snapshot, callback1) + send({ message: 2 }, logger, dd, snapshot, callback2) + send({ message: 3 }, logger, dd, snapshot, callback3) + expect(request).to.not.have.been.called + + expectWithin(1200, () => { + expect(request).to.have.been.calledOnceWith(JSON.stringify([ + getPayload({ message: 1 }), + getPayload({ message: 2 }), + getPayload({ message: 3 }) + ])) + + const opts = getRequestOptions(request) + expect(opts).to.have.property('method', 'POST') + expect(opts).to.have.property( + 'path', + '/debugger/v1/input?ddtags=' + + `env%3A${process.env.DD_ENV}%2C` + + `version%3A${process.env.DD_VERSION}%2C` + + `debugger_version%3A${version}%2C` + + `host_name%3A${hostname}%2C` + + `git.commit.sha%3A${commitSHA}%2C` + + `git.repository_url%3A${repositoryUrl}` + ) + + expect(callback1).to.not.have.been.calledOnce + expect(callback2).to.not.have.been.calledOnce + expect(callback3).to.not.have.been.calledOnce + + request.firstCall.callback() + + expect(callback1).to.have.been.calledOnce + expect(callback2).to.have.been.calledOnce + expect(callback3).to.have.been.calledOnce + + done() + }) + }) +}) + +function getPayload (_message = message) { + return { + ddsource, + hostname, + service, + message: _message, + logger, + dd, + 'debugger.snapshot': snapshot + } +} diff --git a/packages/dd-trace/test/debugger/devtools_client/status.spec.js b/packages/dd-trace/test/debugger/devtools_client/status.spec.js index 365d86d6e96..88edde917e3 100644 --- a/packages/dd-trace/test/debugger/devtools_client/status.spec.js +++ b/packages/dd-trace/test/debugger/devtools_client/status.spec.js @@ -2,12 +2,15 @@ require('../../setup/mocha') +const { expectWithin, getRequestOptions } = require('./utils') +const JSONBuffer = require('../../../src/debugger/devtools_client/json-buffer') + const ddsource = 'dd_debugger' const service = 'my-service' const runtimeId = 'my-runtime-id' -describe('diagnostic message http request caching', function () { - let statusproxy, request +describe('diagnostic message http requests', function () { + let statusproxy, request, jsonBuffer const acks = [ ['ackReceived', 'RECEIVED'], @@ -20,8 +23,17 @@ describe('diagnostic message http request caching', function () { request = sinon.spy() request['@noCallThru'] = true + class JSONBufferSpy extends JSONBuffer { + constructor (...args) { + super(...args) + jsonBuffer = this + sinon.spy(this, 'write') + } + } + statusproxy = proxyquire('../src/debugger/devtools_client/status', { './config': { service, runtimeId, '@noCallThru': true }, + './json-buffer': JSONBufferSpy, '../../exporters/common/request': request }) }) @@ -45,54 +57,85 @@ describe('diagnostic message http request caching', function () { } }) - it('should only call once if no change', function () { + it('should buffer instead of calling request directly', function () { + ackFn({ id: 'foo', version: 0 }) + expect(request).to.not.have.been.called + expect(jsonBuffer.write).to.have.been.calledOnceWith( + JSON.stringify(formatAsDiagnosticsEvent({ probeId: 'foo', version: 0, status, exception })) + ) + }) + + it('should only add to buffer once if no change', function () { ackFn({ id: 'foo', version: 0 }) - expect(request).to.have.been.calledOnce - assertRequestData(request, { probeId: 'foo', version: 0, status, exception }) + expect(jsonBuffer.write).to.have.been.calledOnceWith( + JSON.stringify(formatAsDiagnosticsEvent({ probeId: 'foo', version: 0, status, exception })) + ) ackFn({ id: 'foo', version: 0 }) - expect(request).to.have.been.calledOnce + expect(jsonBuffer.write).to.have.been.calledOnce }) - it('should call again if version changes', function () { + it('should add to buffer again if version changes', function () { ackFn({ id: 'foo', version: 0 }) - expect(request).to.have.been.calledOnce - assertRequestData(request, { probeId: 'foo', version: 0, status, exception }) + expect(jsonBuffer.write).to.have.been.calledOnceWith( + JSON.stringify(formatAsDiagnosticsEvent({ probeId: 'foo', version: 0, status, exception })) + ) ackFn({ id: 'foo', version: 1 }) - expect(request).to.have.been.calledTwice - assertRequestData(request, { probeId: 'foo', version: 1, status, exception }) + expect(jsonBuffer.write).to.have.been.calledTwice + expect(jsonBuffer.write.lastCall).to.have.been.calledWith( + JSON.stringify(formatAsDiagnosticsEvent({ probeId: 'foo', version: 1, status, exception })) + ) }) - it('should call again if probeId changes', function () { + it('should add to buffer again if probeId changes', function () { ackFn({ id: 'foo', version: 0 }) - expect(request).to.have.been.calledOnce - assertRequestData(request, { probeId: 'foo', version: 0, status, exception }) + expect(jsonBuffer.write).to.have.been.calledOnceWith( + JSON.stringify(formatAsDiagnosticsEvent({ probeId: 'foo', version: 0, status, exception })) + ) ackFn({ id: 'bar', version: 0 }) - expect(request).to.have.been.calledTwice - assertRequestData(request, { probeId: 'bar', version: 0, status, exception }) + expect(jsonBuffer.write).to.have.been.calledTwice + expect(jsonBuffer.write.lastCall).to.have.been.calledWith( + JSON.stringify(formatAsDiagnosticsEvent({ probeId: 'bar', version: 0, status, exception })) + ) + }) + + it('should call request with the expected payload once the buffer is flushed', function (done) { + ackFn({ id: 'foo', version: 0 }) + ackFn({ id: 'foo', version: 1 }) + ackFn({ id: 'bar', version: 0 }) + expect(request).to.not.have.been.called + + expectWithin(1200, () => { + expect(request).to.have.been.calledOnce + + const payload = getFormPayload(request) + + expect(payload).to.deep.equal([ + formatAsDiagnosticsEvent({ probeId: 'foo', version: 0, status, exception }), + formatAsDiagnosticsEvent({ probeId: 'foo', version: 1, status, exception }), + formatAsDiagnosticsEvent({ probeId: 'bar', version: 0, status, exception }) + ]) + + const opts = getRequestOptions(request) + expect(opts).to.have.property('method', 'POST') + expect(opts).to.have.property('path', '/debugger/v1/diagnostics') + + done() + }) }) }) } }) -function assertRequestData (request, { probeId, version, status, exception }) { - const payload = getFormPayload(request) +function formatAsDiagnosticsEvent ({ probeId, version, status, exception }) { const diagnostics = { probeId, runtimeId, probeVersion: version, status } // Error requests will also contain an `exception` property if (exception) diagnostics.exception = exception - expect(payload).to.deep.equal({ ddsource, service, debugger: { diagnostics } }) - - const opts = getRequestOptions(request) - expect(opts).to.have.property('method', 'POST') - expect(opts).to.have.property('path', '/debugger/v1/diagnostics') -} - -function getRequestOptions (request) { - return request.lastCall.args[1] + return { ddsource, service, debugger: { diagnostics } } } function getFormPayload (request) { diff --git a/packages/dd-trace/test/debugger/devtools_client/utils.js b/packages/dd-trace/test/debugger/devtools_client/utils.js index e15d567a7c1..2da3216cea1 100644 --- a/packages/dd-trace/test/debugger/devtools_client/utils.js +++ b/packages/dd-trace/test/debugger/devtools_client/utils.js @@ -3,7 +3,21 @@ const { randomUUID } = require('node:crypto') module.exports = { - generateProbeConfig + expectWithin, + generateProbeConfig, + getRequestOptions +} + +function expectWithin (timeout, fn, start = Date.now(), backoff = 1) { + try { + fn() + } catch (e) { + if (Date.now() - start > timeout) { + throw e + } else { + setTimeout(expectWithin, backoff, timeout, fn, start, backoff < 128 ? backoff * 2 : backoff) + } + } } function generateProbeConfig (breakpoint, overrides = {}) { @@ -23,3 +37,7 @@ function generateProbeConfig (breakpoint, overrides = {}) { ...overrides } } + +function getRequestOptions (request) { + return request.lastCall.args[1] +}