Skip to content

Commit

Permalink
Non-blocking lazy destinations (segmentio#945)
Browse files Browse the repository at this point in the history
This patch changes the architecture around destinations work. Previously AJS would use the waterfall like method when loading destinations (and other plugins) where it'd wait for their loading sequence to finish before it'd start processing events. With this patch the destinations load in "background" and AJS will start processing events the moment critical plugins are ready.

The events will get forwarded to each destination and it can send the event whenever it becomes ready (assuming it does). This way if 4 out of 6 destinations are ready in 0.2s after page load, and the other 2 take 0.8s, then those 4 won't be bottlenecked by the slower ones.
  • Loading branch information
zikaari authored Mar 4, 2024
1 parent 95fd2fd commit d212633
Show file tree
Hide file tree
Showing 12 changed files with 289 additions and 69 deletions.
7 changes: 7 additions & 0 deletions .changeset/spicy-pets-boil.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@segment/analytics-next': minor
'@segment/analytics-core': minor
'@segment/analytics-generic-utils': minor
---

Load destinations lazily and start sending events as each becomes available instead of waiting for all to load first
2 changes: 1 addition & 1 deletion packages/browser/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
"size-limit": [
{
"path": "dist/umd/index.js",
"limit": "29.5 KB"
"limit": "29.6 KB"
}
],
"dependencies": {
Expand Down
5 changes: 4 additions & 1 deletion packages/browser/qa/lib/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,10 @@ export async function run(params: ComparisonParams) {
await page.goto(url)

await page.waitForLoadState('networkidle')
await page.waitForFunction(`window.analytics.initialized === true`)
await page.waitForFunction(
`window.analytics.initialized === true`,
undefined
)

// This forces every timestamp to look exactly the same.
// Moving this prototype manipulation after networkidle fixed a race condition around Object.freeze that interfered with certain scripts.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import { sleep } from '@segment/analytics-core'
import { CorePlugin, PluginType, sleep } from '@segment/analytics-core'
import { getBufferedPageCtxFixture } from '../../test-helpers/fixtures'
import unfetch from 'unfetch'
import { AnalyticsBrowser } from '..'
import { Analytics } from '../../core/analytics'
import { createSuccess } from '../../test-helpers/factories'
import { createDeferred } from '@segment/analytics-generic-utils'
import { PluginFactory } from '../../plugins/remote-loader'

const nextTickP = () => new Promise((r) => setTimeout(r, 0))

jest.mock('unfetch')

Expand Down Expand Up @@ -48,3 +52,139 @@ describe('Lazy initialization', () => {
)
})
})

const createTestPluginFactory = (name: string, type: PluginType) => {
const lock = createDeferred<void>()
const load = createDeferred<void>()
const trackSpy = jest.fn()

const factory: PluginFactory = () => {
return {
name,
type,
version: '1.0.0',
load: jest
.fn()
.mockImplementation(() => lock.promise.then(() => load.resolve())),
isLoaded: () => lock.isSettled(),
track: trackSpy,
}
}

factory.pluginName = name

return {
loadingGuard: lock,
trackSpy,
factory,
loadPromise: load.promise,
}
}

describe('Lazy destination loading', () => {
beforeEach(() => {
jest.mock('unfetch')
jest.mocked(unfetch).mockImplementation(() =>
createSuccess({
integrations: {},
remotePlugins: [
{
name: 'braze',
libraryName: 'braze',
},
{
name: 'google',
libraryName: 'google',
},
],
})
)
})

afterAll(() => jest.resetAllMocks())

it('loads destinations in the background', async () => {
const testEnrichmentHarness = createTestPluginFactory(
'enrichIt',
'enrichment'
)
const dest1Harness = createTestPluginFactory('braze', 'destination')
const dest2Harness = createTestPluginFactory('google', 'destination')

const analytics = new AnalyticsBrowser()

const testEnrichmentPlugin = testEnrichmentHarness.factory(
null
) as CorePlugin

analytics.register(testEnrichmentPlugin).catch(() => {})

await analytics.load({
writeKey: 'abc',
plugins: [dest1Harness.factory, dest2Harness.factory],
})

// we won't hold enrichment plugin from loading since they are not lazy loaded
testEnrichmentHarness.loadingGuard.resolve()
// and we'll also let one destination load so we can assert some behaviours
dest1Harness.loadingGuard.resolve()

await testEnrichmentHarness.loadPromise
await dest1Harness.loadPromise

analytics.track('test event 1').catch(() => {})

// even though there's one destination that still hasn't loaded, the next assertions
// prove that the event pipeline is flowing regardless

await nextTickP()
expect(testEnrichmentHarness.trackSpy).toHaveBeenCalledTimes(1)

await nextTickP()
expect(dest1Harness.trackSpy).toHaveBeenCalledTimes(1)

// now we'll send another event

analytics.track('test event 2').catch(() => {})

// even though there's one destination that still hasn't loaded, the next assertions
// prove that the event pipeline is flowing regardless

await nextTickP()
expect(testEnrichmentHarness.trackSpy).toHaveBeenCalledTimes(2)

await nextTickP()
expect(dest1Harness.trackSpy).toHaveBeenCalledTimes(2)

// this whole time the other destination was not engaged with at all
expect(dest2Harness.trackSpy).not.toHaveBeenCalled()

// now "after some time" the other destination will load
dest2Harness.loadingGuard.resolve()
await dest2Harness.loadPromise

// and now that it is "online" - the previous events that it missed will be handed over
await nextTickP()
expect(dest2Harness.trackSpy).toHaveBeenCalledTimes(2)
})

it('emits initialize regardless of whether all destinations have loaded', async () => {
const dest1Harness = createTestPluginFactory('braze', 'destination')
const dest2Harness = createTestPluginFactory('google', 'destination')

const analytics = new AnalyticsBrowser()

let initializeEmitted = false

analytics.on('initialize', () => {
initializeEmitted = true
})

await analytics.load({
writeKey: 'abc',
plugins: [dest1Harness.factory, dest2Harness.factory],
})

expect(initializeEmitted).toBe(true)
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ jest.mock('../../core/analytics', () => ({
addSourceMiddleware,
register,
emit: jest.fn(),
ready: () => Promise.resolve(),
on,
queue: new EventQueue(new PersistedPriorityQueue(1, 'event-queue') as any),
options,
Expand Down
23 changes: 13 additions & 10 deletions packages/browser/src/browser/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ async function registerPlugins(
writeKey: string,
legacySettings: LegacySettings,
analytics: Analytics,
opts: InitOptions,
options: InitOptions,
pluginLikes: (Plugin | PluginFactory)[] = [],
legacyIntegrationSources: ClassicIntegrationSource[]
Expand Down Expand Up @@ -222,7 +221,7 @@ async function registerPlugins(
writeKey,
legacySettings,
analytics.integrations,
opts,
options,
tsubMiddleware,
legacyIntegrationSources
)
Expand All @@ -237,11 +236,11 @@ async function registerPlugins(
})
}

const schemaFilter = opts.plan?.track
const schemaFilter = options.plan?.track
? await import(
/* webpackChunkName: "schemaFilter" */ '../plugins/schema-filter'
).then((mod) => {
return mod.schemaFilter(opts.plan?.track, legacySettings)
return mod.schemaFilter(options.plan?.track, legacySettings)
})
: undefined

Expand All @@ -250,7 +249,7 @@ async function registerPlugins(
legacySettings,
analytics.integrations,
mergedSettings,
options.obfuscate,
options,
tsubMiddleware,
pluginSources
).catch(() => [])
Expand All @@ -268,8 +267,9 @@ async function registerPlugins(
}

const shouldIgnoreSegmentio =
(opts.integrations?.All === false && !opts.integrations['Segment.io']) ||
(opts.integrations && opts.integrations['Segment.io'] === false)
(options.integrations?.All === false &&
!options.integrations['Segment.io']) ||
(options.integrations && options.integrations['Segment.io'] === false)

if (!shouldIgnoreSegmentio) {
toRegister.push(
Expand Down Expand Up @@ -345,8 +345,12 @@ async function loadAnalytics(
const retryQueue: boolean =
legacySettings.integrations['Segment.io']?.retryQueue ?? true

const opts: InitOptions = { retryQueue, ...options }
const analytics = new Analytics(settings, opts)
options = {
retryQueue,
...options,
}

const analytics = new Analytics(settings, options)

attachInspector(analytics)

Expand All @@ -362,7 +366,6 @@ async function loadAnalytics(
settings.writeKey,
legacySettings,
analytics,
opts,
options,
plugins,
classicIntegrations
Expand Down
35 changes: 19 additions & 16 deletions packages/browser/src/plugins/ajs-destination/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
isInstallableIntegration,
} from './utils'
import { recordIntegrationMetric } from '../../core/stats/metric-helpers'
import { createDeferred } from '@segment/analytics-generic-utils'

export type ClassType<T> = new (...args: unknown[]) => T

Expand Down Expand Up @@ -72,10 +73,10 @@ export class LegacyDestination implements DestinationPlugin {
type: Plugin['type'] = 'destination'
middleware: DestinationMiddlewareFunction[] = []

private _ready = false
private _initialized = false
private _ready: boolean | undefined
private _initialized: boolean | undefined
private onReady: Promise<unknown> | undefined
private onInitialize: Promise<unknown> | undefined
private initializePromise = createDeferred<boolean>()
private disableAutoISOConversion: boolean

integrationSource?: ClassicIntegrationSource
Expand Down Expand Up @@ -104,6 +105,11 @@ export class LegacyDestination implements DestinationPlugin {
delete this.settings['type']
}

this.initializePromise.promise.then(
(isInitialized) => (this._initialized = isInitialized),
() => {}
)

this.options = options
this.buffer = options.disableClientPersistence
? new PriorityQueue(4, [])
Expand All @@ -113,11 +119,13 @@ export class LegacyDestination implements DestinationPlugin {
}

isLoaded(): boolean {
return this._ready
return !!this._ready
}

ready(): Promise<unknown> {
return this.onReady ?? Promise.resolve()
return this.initializePromise.promise.then(
() => this.onReady ?? Promise.resolve()
)
}

async load(ctx: Context, analyticsInstance: Analytics): Promise<void> {
Expand Down Expand Up @@ -149,13 +157,8 @@ export class LegacyDestination implements DestinationPlugin {
this.integration!.once('ready', onReadyFn)
})

this.onInitialize = new Promise((resolve) => {
const onInit = (): void => {
this._initialized = true
resolve(true)
}

this.integration!.on('initialize', onInit)
this.integration!.on('initialize', () => {
this.initializePromise.resolve(true)
})

try {
Expand All @@ -172,6 +175,7 @@ export class LegacyDestination implements DestinationPlugin {
type: 'classic',
didError: true,
})
this.initializePromise.resolve(false)
throw error
}
}
Expand Down Expand Up @@ -264,7 +268,7 @@ export class LegacyDestination implements DestinationPlugin {

try {
if (this.integration) {
await this.integration.invoke.call(this.integration, eventType, event)
await this.integration!.invoke.call(this.integration, eventType, event)
}
} catch (err) {
recordIntegrationMetric(ctx, {
Expand All @@ -288,9 +292,8 @@ export class LegacyDestination implements DestinationPlugin {
this.integration.initialize()
}

return this.onInitialize!.then(() => {
return this.send(ctx, Page as ClassType<Page>, 'page')
})
await this.initializePromise.promise
return this.send(ctx, Page as ClassType<Page>, 'page')
}

async identify(ctx: Context): Promise<Context> {
Expand Down
Loading

0 comments on commit d212633

Please sign in to comment.