diff --git a/packages/node/src/local/cohort/fetcher.ts b/packages/node/src/local/cohort/fetcher.ts index f4958d0..a9d78d2 100644 --- a/packages/node/src/local/cohort/fetcher.ts +++ b/packages/node/src/local/cohort/fetcher.ts @@ -49,15 +49,20 @@ export class CohortFetcher { this.logger = new ConsoleLogger(debug); } + static getKey(cohortId: string, lastModified?: number): string { + return `${cohortId}_${lastModified ? lastModified : ''}`; + } + async fetch( cohortId: string, lastModified?: number, ): Promise { // This block may have async and awaits. No guarantee that executions are not interleaved. const unlock = await this.mutex.lock(); + const key = CohortFetcher.getKey(cohortId, lastModified); - if (!this.inProgressCohorts[cohortId]) { - this.inProgressCohorts[cohortId] = this.executor.run(async () => { + if (!this.inProgressCohorts[key]) { + this.inProgressCohorts[key] = this.executor.run(async () => { this.logger.debug('Start downloading', cohortId); const cohort = await doWithBackoffFailLoudly( async () => @@ -73,22 +78,23 @@ export class CohortFetcher { ) .then(async (cohort) => { const unlock = await this.mutex.lock(); - delete this.inProgressCohorts[cohortId]; + delete this.inProgressCohorts[key]; unlock(); return cohort; }) .catch(async (err) => { const unlock = await this.mutex.lock(); - delete this.inProgressCohorts[cohortId]; + delete this.inProgressCohorts[key]; unlock(); throw err; }); - this.logger.debug('Stop downloading', cohortId, cohort['cohortId']); + this.logger.debug('Stop downloading', cohortId); return cohort; }); } - const cohortPromise = this.inProgressCohorts[cohortId]; + const cohortPromise: Promise = + this.inProgressCohorts[key]; unlock(); return cohortPromise; } diff --git a/packages/node/test/local/benchmark.test.ts b/packages/node/test/local/benchmark.test.ts index 78dfcb5..a2bc881 100644 --- a/packages/node/test/local/benchmark.test.ts +++ b/packages/node/test/local/benchmark.test.ts @@ -1,3 +1,6 @@ +import path from 'path'; + +import * as dotenv from 'dotenv'; import { Experiment } from 'src/factory'; import { ExperimentUser } from 'src/types/user'; @@ -5,7 +8,23 @@ import { measure } from './util/performance'; const apiKey = 'server-Ed2doNl5YOblB5lRavQ9toj02arvHpMj'; -const client = Experiment.initializeLocal(apiKey, { debug: false }); +dotenv.config({ path: path.join(__dirname, '../../', '.env') }); + +if (!process.env['API_KEY'] && !process.env['SECRET_KEY']) { + throw new Error( + 'No env vars found. If running on local, have you created .env file correct environment variables? Checkout README.md', + ); +} + +const cohortConfig = { + apiKey: process.env['API_KEY'], + secretKey: process.env['SECRET_KEY'], +}; + +const client = Experiment.initializeLocal(apiKey, { + debug: false, + cohortConfig: cohortConfig, +}); beforeAll(async () => { await client.start(); diff --git a/packages/node/test/local/cohort/cohortFetcher.test.ts b/packages/node/test/local/cohort/cohortFetcher.test.ts index 14460cf..695fbe1 100644 --- a/packages/node/test/local/cohort/cohortFetcher.test.ts +++ b/packages/node/test/local/cohort/cohortFetcher.test.ts @@ -1,140 +1,214 @@ -import { CohortFetcher } from 'src/local/cohort/fetcher'; +import { SdkCohortApi } from 'src/local/cohort/cohort-api'; +import { COHORT_CONFIG_TIMEOUT, CohortFetcher } from 'src/local/cohort/fetcher'; +import { CohortConfigDefaults } from 'src/types/config'; import { version as PACKAGE_VERSION } from '../../../gen/version'; -import { MockHttpClient } from '../util/mockHttpClient'; - -const C_A = { - cohortId: 'c_a', - groupType: 'a', - groupTypeId: 0, - lastComputed: 0, - lastModified: 0, - size: 2, - memberIds: new Set(['membera1', 'membera2']), // memberIds needs to convert to array before json stringify. -}; -const cohortId = '1'; -const apiKey = 'apple'; -const secretKey = 'banana'; -const serverUrl = 'https://example.com/cohortapi'; -const encodedKey = `Basic ${Buffer.from(`${apiKey}:${secretKey}`).toString( - 'base64', -)}`; -const expectedHeaders = { - Authorization: encodedKey, - 'X-Amp-Exp-Library': `experiment-node-server/${PACKAGE_VERSION}`, +const COHORTS = { + c1: { + cohortId: 'c1', + groupType: 'a', + groupTypeId: 0, + lastComputed: 0, + lastModified: 1, + size: 2, + memberIds: new Set(['membera1', 'membera2']), + }, + c2: { + cohortId: 'c2', + groupType: 'a', + groupTypeId: 0, + lastComputed: 0, + lastModified: 10, + size: 3, + memberIds: new Set(['membera1', 'membera2', 'membera3']), + }, + c3: { + cohortId: 'c3', + groupType: 'a', + groupTypeId: 0, + lastComputed: 0, + lastModified: 10, + size: 3, + memberIds: new Set(['membera1', 'membera2', 'membera3']), + }, }; -test('cohort fetcher success', async () => { - const maxCohortSize = 10; - const httpClient = new MockHttpClient(async (params) => { - expect(params.requestUrl).toBe( - `${serverUrl}/sdk/v1/cohort/${cohortId}?maxCohortSize=${maxCohortSize}`, - ); - expect(params.headers).toStrictEqual(expectedHeaders); - return { - status: 200, - body: JSON.stringify({ ...C_A, memberIds: Array.from(C_A.memberIds) }), - }; +afterEach(() => { + jest.clearAllMocks(); +}); + +test('cohort fetch success', async () => { + const cohortApiGetCohortSpy = jest.spyOn(SdkCohortApi.prototype, 'getCohort'); + cohortApiGetCohortSpy.mockImplementation( + async (options) => COHORTS[options.cohortId], + ); + + const cohortFetcher = new CohortFetcher('', '', null); + + const c1 = await cohortFetcher.fetch('c1'); + expect(c1).toBe(COHORTS['c1']); + + expect(cohortApiGetCohortSpy).toHaveBeenCalledWith({ + cohortId: 'c1', + lastModified: undefined, + libraryName: 'experiment-node-server', + libraryVersion: PACKAGE_VERSION, + maxCohortSize: CohortConfigDefaults.maxCohortSize, + timeoutMillis: COHORT_CONFIG_TIMEOUT, }); - const fetcher = new CohortFetcher( - apiKey, - secretKey, - httpClient, - serverUrl, - false, +}); + +test('cohort fetch success using maxCohortSize and lastModified', async () => { + const cohortApiGetCohortSpy = jest.spyOn(SdkCohortApi.prototype, 'getCohort'); + cohortApiGetCohortSpy.mockImplementation( + async (options) => COHORTS[options.cohortId], ); - const cohort = await fetcher.fetch(cohortId, maxCohortSize); - expect(cohort).toStrictEqual(C_A); + + const cohortFetcher = new CohortFetcher('', '', null, 'someurl', 10); + + const c1 = await cohortFetcher.fetch('c1', 10); + expect(c1).toBe(COHORTS['c1']); + + expect(cohortApiGetCohortSpy).toHaveBeenCalledWith({ + cohortId: 'c1', + lastModified: 10, + libraryName: 'experiment-node-server', + libraryVersion: PACKAGE_VERSION, + maxCohortSize: 10, + timeoutMillis: COHORT_CONFIG_TIMEOUT, + }); }); -test('cohort fetcher 413', async () => { - const maxCohortSize = 1; - const httpClient = new MockHttpClient(async (params) => { - expect(params.requestUrl).toBe( - `${serverUrl}/sdk/v1/cohort/${cohortId}?maxCohortSize=${maxCohortSize}`, - ); - expect(params.headers).toStrictEqual(expectedHeaders); - return { status: 413, body: '' }; +test('cohort fetch unchanged returns undefined', async () => { + const cohortApiGetCohortSpy = jest.spyOn(SdkCohortApi.prototype, 'getCohort'); + cohortApiGetCohortSpy.mockImplementation(async () => { + return undefined; + }); + + const cohortFetcher = new CohortFetcher('', '', null, 'someurl', 10); + + // Make 3 requests at the same time. + const c1 = await cohortFetcher.fetch('c1', 20); + + expect(cohortApiGetCohortSpy).toBeCalledTimes(1); + expect(c1).toBeUndefined(); + expect(cohortApiGetCohortSpy).toHaveBeenCalledWith({ + cohortId: 'c1', + lastModified: 20, + libraryName: 'experiment-node-server', + libraryVersion: PACKAGE_VERSION, + maxCohortSize: 10, + timeoutMillis: COHORT_CONFIG_TIMEOUT, }); - const fetcher = new CohortFetcher( - apiKey, - secretKey, - httpClient, - serverUrl, - false, - ); - await expect(fetcher.fetch(cohortId, maxCohortSize)).rejects.toThrow(); }); -test('cohort fetcher no modification 204', async () => { - const maxCohortSize = 10; - const lastModified = 10; - const httpClient = new MockHttpClient(async (params) => { - expect(params.requestUrl).toBe( - `${serverUrl}/sdk/v1/cohort/${cohortId}?maxCohortSize=${maxCohortSize}&lastModified=${lastModified}`, - ); - expect(params.headers).toStrictEqual(expectedHeaders); - return { status: 204, body: '' }; +test('cohort fetch failed', async () => { + const cohortApiGetCohortSpy = jest.spyOn(SdkCohortApi.prototype, 'getCohort'); + cohortApiGetCohortSpy.mockImplementation(async () => { + throw Error(); + }); + + const cohortFetcher = new CohortFetcher('', '', null, 'someurl', 10); + + await expect(cohortFetcher.fetch('c1', 10)).rejects.toThrowError(); + + expect(cohortApiGetCohortSpy).toHaveBeenCalledWith({ + cohortId: 'c1', + lastModified: 10, + libraryName: 'experiment-node-server', + libraryVersion: PACKAGE_VERSION, + maxCohortSize: 10, + timeoutMillis: COHORT_CONFIG_TIMEOUT, }); - const fetcher = new CohortFetcher( - apiKey, - secretKey, - httpClient, - serverUrl, - false, - ); - expect( - await fetcher.fetch(cohortId, maxCohortSize, lastModified), - ).toBeUndefined(); }); -test('cohort fetcher no modification but still return cohort due to cache miss', async () => { - const maxCohortSize = 10; - const lastModified = 10; - const httpClient = new MockHttpClient(async (params) => { - expect(params.requestUrl).toBe( - `${serverUrl}/sdk/v1/cohort/${cohortId}?maxCohortSize=${maxCohortSize}&lastModified=${lastModified}`, - ); - expect(params.headers).toStrictEqual(expectedHeaders); - return { - status: 200, - body: JSON.stringify({ ...C_A, memberIds: Array.from(C_A.memberIds) }), - }; +test('cohort fetch twice on same cohortId uses same promise and make only one request', async () => { + const cohortApiGetCohortSpy = jest.spyOn(SdkCohortApi.prototype, 'getCohort'); + cohortApiGetCohortSpy.mockImplementation(async (options) => { + // Await 2s to allow second fetch call being made. + await new Promise((resolve) => setTimeout(resolve, 2000)); + // Always return a new object. + return { ...COHORTS[options.cohortId] }; + }); + + const cohortFetcher = new CohortFetcher('', '', null, 'someurl', 10); + + // Make 2 requests at the same time. + const promise1 = cohortFetcher.fetch('c1', 10); + const promise2 = cohortFetcher.fetch('c1', 10); + + // Cannot do following assertion because the promise returned by an async func may not be exactly the promise being returned. + // https://stackoverflow.com/questions/61354565/does-async-tag-wrap-a-javascript-function-with-a-promise + // expect(promise1 === promise2).toBeTruthy(); + + const c1 = await promise1; + const c1_2 = await promise2; + + // Only made one request. + expect(cohortApiGetCohortSpy).toBeCalledTimes(1); + // The references of objects returned by both are the same. + expect(c1 === c1_2).toBeTruthy(); + // A new object is returned. + expect(c1 !== COHORTS['c1']).toBeTruthy(); + // Contents are the same. + expect(c1).toStrictEqual(COHORTS['c1']); + // Check args. + expect(cohortApiGetCohortSpy).toHaveBeenCalledWith({ + cohortId: 'c1', + lastModified: 10, + libraryName: 'experiment-node-server', + libraryVersion: PACKAGE_VERSION, + maxCohortSize: 10, + timeoutMillis: COHORT_CONFIG_TIMEOUT, }); - const fetcher = new CohortFetcher( - apiKey, - secretKey, - httpClient, - serverUrl, - false, - ); - expect( - await fetcher.fetch(cohortId, maxCohortSize, lastModified), - ).toStrictEqual(C_A); }); -test('cohort fetcher other errors', async () => { - const maxCohortSize = 10; - const lastModified = 10; - const httpClient = new MockHttpClient(async (params) => { - expect(params.requestUrl).toBe( - `${serverUrl}/sdk/v1/cohort/${cohortId}?maxCohortSize=${maxCohortSize}&lastModified=${lastModified}`, - ); - expect(params.headers).toStrictEqual(expectedHeaders); - return { - status: 500, - body: JSON.stringify({ ...C_A, memberIds: Array.from(C_A.memberIds) }), - }; +test('cohort fetch twice on same cohortId different lastModified makes 2 requests', async () => { + const cohortApiGetCohortSpy = jest.spyOn(SdkCohortApi.prototype, 'getCohort'); + cohortApiGetCohortSpy.mockImplementation(async (options) => { + await new Promise((resolve) => setTimeout(resolve, 2000)); + return { ...COHORTS[options.cohortId] }; + }); + + const cohortFetcher = new CohortFetcher('', '', null, 'someurl', 10); + + // Make 3 requests at the same time. + const promise1 = cohortFetcher.fetch('c1', 20); + const promise2 = cohortFetcher.fetch('c1', 10); + const promise3 = cohortFetcher.fetch('c2', 10); + const c1 = await promise1; + const c1_2 = await promise2; + const c2 = await promise3; + + expect(cohortApiGetCohortSpy).toBeCalledTimes(3); + expect(c1 !== c1_2).toBeTruthy(); + expect(c1 !== c2).toBeTruthy(); + expect(c1).toStrictEqual(COHORTS['c1']); + expect(c1_2).toStrictEqual(COHORTS['c1']); + expect(c2).toStrictEqual(COHORTS['c2']); + expect(cohortApiGetCohortSpy).toHaveBeenCalledWith({ + cohortId: 'c1', + lastModified: 20, + libraryName: 'experiment-node-server', + libraryVersion: PACKAGE_VERSION, + maxCohortSize: 10, + timeoutMillis: COHORT_CONFIG_TIMEOUT, + }); + expect(cohortApiGetCohortSpy).toHaveBeenCalledWith({ + cohortId: 'c1', + lastModified: 10, + libraryName: 'experiment-node-server', + libraryVersion: PACKAGE_VERSION, + maxCohortSize: 10, + timeoutMillis: COHORT_CONFIG_TIMEOUT, + }); + expect(cohortApiGetCohortSpy).toHaveBeenCalledWith({ + cohortId: 'c2', + lastModified: 10, + libraryName: 'experiment-node-server', + libraryVersion: PACKAGE_VERSION, + maxCohortSize: 10, + timeoutMillis: COHORT_CONFIG_TIMEOUT, }); - const fetcher = new CohortFetcher( - apiKey, - secretKey, - httpClient, - serverUrl, - false, - ); - await expect( - fetcher.fetch(cohortId, maxCohortSize, lastModified), - ).rejects.toThrow(); }); diff --git a/packages/node/test/local/cohort/cohortPoller.test.ts b/packages/node/test/local/cohort/cohortPoller.test.ts index 7e6c370..5f4fcfb 100644 --- a/packages/node/test/local/cohort/cohortPoller.test.ts +++ b/packages/node/test/local/cohort/cohortPoller.test.ts @@ -1,9 +1,10 @@ +import { SdkCohortApi } from 'src/local/cohort/cohort-api'; import { CohortFetcher } from 'src/local/cohort/fetcher'; import { CohortPoller } from 'src/local/cohort/poller'; import { InMemoryCohortStorage } from 'src/local/cohort/storage'; -import { CohortConfigDefaults } from 'src/types/config'; +import { CohortStorage } from 'src/types/cohort'; -const COHORTS = { +const OLD_COHORTS = { c1: { cohortId: 'c1', groupType: 'a', @@ -33,269 +34,288 @@ const COHORTS = { }, }; +const NEW_COHORTS = { + c1: { + cohortId: 'c1', + groupType: 'a', + groupTypeId: 0, + lastComputed: 1, + lastModified: 2, + size: 2, + memberIds: new Set(['membera1', 'membera2']), + }, + c2: { + cohortId: 'c2', + groupType: 'a', + groupTypeId: 0, + lastComputed: 0, + lastModified: 20, + size: 3, + memberIds: new Set(['membera1', 'membera2', 'membera3']), + }, + c3: { + cohortId: 'c3', + groupType: 'a', + groupTypeId: 0, + lastComputed: 0, + lastModified: 20, + size: 3, + memberIds: new Set(['membera1', 'membera2', 'membera3']), + }, +}; + +const sleep = async (ms) => new Promise((resolve) => setTimeout(resolve, ms)); + +const POLL_MILLIS = 500; +let storage: CohortStorage; +let fetcher: CohortFetcher; +let poller: CohortPoller; +let storageGetAllCohortIdsSpy: jest.SpyInstance; +let storageGetCohortSpy: jest.SpyInstance; +let storagePutSpy: jest.SpyInstance; + +beforeEach(() => { + storage = new InMemoryCohortStorage(); + fetcher = new CohortFetcher('', '', null); + poller = new CohortPoller(fetcher, storage, POLL_MILLIS); + + storageGetAllCohortIdsSpy = jest.spyOn(storage, 'getAllCohortIds'); + storageGetAllCohortIdsSpy.mockImplementation( + () => new Set(['c1', 'c2']), + ); + storageGetCohortSpy = jest.spyOn(storage, 'getCohort'); + storageGetCohortSpy.mockImplementation( + (cohortId: string) => OLD_COHORTS[cohortId], + ); + storagePutSpy = jest.spyOn(storage, 'put'); +}); + afterEach(() => { + poller.stop(); jest.clearAllMocks(); }); -test('', async () => { - const fetcher = new CohortFetcher('', '', null); +test('CohortPoller update success', async () => { const fetcherFetchSpy = jest.spyOn(fetcher, 'fetch'); fetcherFetchSpy.mockImplementation( - async (cohortId: string) => COHORTS[cohortId], + async (cohortId: string) => NEW_COHORTS[cohortId], ); - const storage = new InMemoryCohortStorage(); - const storageReplaceAllSpy = jest.spyOn(storage, 'replaceAll'); - const storageGetCohortSpy = jest.spyOn(storage, 'getCohort'); - - const cohortPoller = new CohortPoller(fetcher, storage); - - await cohortPoller.update(new Set(['c1', 'c2'])); - - expect(fetcherFetchSpy).toHaveBeenCalledWith( - 'c1', - CohortConfigDefaults.maxCohortSize, - undefined, - ); - expect(fetcherFetchSpy).toHaveBeenCalledWith( - 'c2', - CohortConfigDefaults.maxCohortSize, - undefined, - ); - expect(storageGetCohortSpy).toHaveBeenCalledWith('c1'); - expect(storageGetCohortSpy).toHaveBeenCalledWith('c2'); - expect(storageReplaceAllSpy).toHaveBeenCalledWith({ - c1: COHORTS['c1'], - c2: COHORTS['c2'], - }); + await poller.update(); + + for (const cohortId of storage.getAllCohortIds()) { + expect(storageGetCohortSpy).toHaveBeenCalledWith(cohortId); + expect(fetcherFetchSpy).toHaveBeenCalledWith( + cohortId, + OLD_COHORTS[cohortId].lastModified, + ); + } + expect(storagePutSpy).toHaveBeenCalledWith(NEW_COHORTS['c1']); + expect(storagePutSpy).toHaveBeenCalledWith(NEW_COHORTS['c2']); }); -test('cohort fetch all failed', async () => { - const fetcher = new CohortFetcher('', '', null); +test("CohortPoller update don't update unchanged cohort", async () => { const fetcherFetchSpy = jest.spyOn(fetcher, 'fetch'); - fetcherFetchSpy.mockImplementation(async () => { - throw Error(); + fetcherFetchSpy.mockImplementation(async (cohortId) => { + if (cohortId === 'c1') { + return NEW_COHORTS['c1']; + } + return undefined; }); - const storage = new InMemoryCohortStorage(); - const storageReplaceAllSpy = jest.spyOn(storage, 'replaceAll'); - const storageGetCohortSpy = jest.spyOn(storage, 'getCohort'); - expect(storageReplaceAllSpy).toHaveBeenCalledTimes(0); - - const cohortPoller = new CohortPoller(fetcher, storage); - - await expect( - cohortPoller.update(new Set(['c1', 'c2', 'c3'])), - ).rejects.toThrow(); - - expect(fetcherFetchSpy).toHaveBeenCalled(); - expect(storageGetCohortSpy).toHaveBeenCalled(); - expect(storageReplaceAllSpy).toHaveBeenCalledTimes(0); + await poller.update(); + + for (const cohortId of storage.getAllCohortIds()) { + expect(storageGetCohortSpy).toHaveBeenCalledWith(cohortId); + expect(fetcherFetchSpy).toHaveBeenCalledWith( + cohortId, + OLD_COHORTS[cohortId].lastModified, + ); + } + expect(storagePutSpy).toHaveBeenCalledWith(NEW_COHORTS['c1']); + expect(storagePutSpy).toHaveBeenCalledTimes(1); }); -test('cohort fetch partial failed', async () => { - const fetcher = new CohortFetcher('', '', null); +test("CohortPoller update error don't update cohort", async () => { const fetcherFetchSpy = jest.spyOn(fetcher, 'fetch'); - fetcherFetchSpy.mockImplementation(async (cohortId: string) => { - if (cohortId === 'c3') { - throw Error(); + fetcherFetchSpy.mockImplementation(async (cohortId) => { + if (cohortId === 'c1') { + return NEW_COHORTS['c1']; } - return COHORTS[cohortId]; + throw Error(); }); - const storage = new InMemoryCohortStorage(); - const storageReplaceAllSpy = jest.spyOn(storage, 'replaceAll'); - const storageGetCohortSpy = jest.spyOn(storage, 'getCohort'); - - const cohortPoller = new CohortPoller(fetcher, storage); - - await expect( - cohortPoller.update(new Set(['c1', 'c2', 'c3'])), - ).rejects.toThrow(); - - expect(fetcherFetchSpy).toHaveBeenCalled(); - expect(storageGetCohortSpy).toHaveBeenCalled(); - expect(storageReplaceAllSpy).toHaveBeenCalledTimes(0); + await poller.update(); + + for (const cohortId of storage.getAllCohortIds()) { + expect(storageGetCohortSpy).toHaveBeenCalledWith(cohortId); + expect(fetcherFetchSpy).toHaveBeenCalledWith( + cohortId, + OLD_COHORTS[cohortId].lastModified, + ); + } + expect(storagePutSpy).toHaveBeenCalledTimes(1); + expect(storagePutSpy).toHaveBeenCalledWith(NEW_COHORTS['c1']); }); -test('cohort fetch no change', async () => { - const fetcher = new CohortFetcher('', '', null); +test('CohortPoller update no lastModified still fetches cohort', async () => { const fetcherFetchSpy = jest.spyOn(fetcher, 'fetch'); - fetcherFetchSpy.mockImplementation(async () => undefined); - - const storage = new InMemoryCohortStorage(); - const storageReplaceAllSpy = jest.spyOn(storage, 'replaceAll'); - const storageGetCohortSpy = jest.spyOn(storage, 'getCohort'); + fetcherFetchSpy.mockImplementation(async (cohortId) => NEW_COHORTS[cohortId]); + storageGetCohortSpy.mockImplementation((cohortId: string) => { + const cohort = OLD_COHORTS[cohortId]; + if (cohortId === 'c2') { + delete cohort['lastModified']; + } + return cohort; + }); - const cohortPoller = new CohortPoller(fetcher, storage); + await poller.update(); - await cohortPoller.update(new Set(['c1', 'c2'])); + expect(storageGetCohortSpy).toHaveBeenCalledWith('c1'); expect(fetcherFetchSpy).toHaveBeenCalledWith( 'c1', - CohortConfigDefaults.maxCohortSize, - undefined, + OLD_COHORTS['c1'].lastModified, ); - expect(fetcherFetchSpy).toHaveBeenCalledWith( - 'c2', - CohortConfigDefaults.maxCohortSize, - undefined, - ); - expect(storageGetCohortSpy).toHaveBeenCalledWith('c1'); expect(storageGetCohortSpy).toHaveBeenCalledWith('c2'); - expect(storageReplaceAllSpy).toHaveBeenCalledTimes(0); + expect(fetcherFetchSpy).toHaveBeenCalledWith('c2', undefined); + expect(storagePutSpy).toHaveBeenCalledTimes(2); + expect(storagePutSpy).toHaveBeenCalledWith(NEW_COHORTS['c1']); + expect(storagePutSpy).toHaveBeenCalledWith(NEW_COHORTS['c2']); }); -test('cohort fetch partial changed', async () => { - const fetcher = new CohortFetcher('', '', null); +test('CohortPoller polls every defined ms', async () => { const fetcherFetchSpy = jest.spyOn(fetcher, 'fetch'); - fetcherFetchSpy.mockImplementation(async (cohortId: string) => { - if (cohortId === 'c1') { - return undefined; - } - return COHORTS[cohortId]; + fetcherFetchSpy.mockImplementation(async (cohortId) => { + return NEW_COHORTS[cohortId]; }); - const storage = new InMemoryCohortStorage(); - const storageReplaceAllSpy = jest.spyOn(storage, 'replaceAll'); - const storageGetCohortSpy = jest.spyOn(storage, 'getCohort'); - - const cohortPoller = new CohortPoller(fetcher, storage); - - await cohortPoller.update(new Set(['c1', 'c2'])); - expect(fetcherFetchSpy).toHaveBeenCalledWith( - 'c1', - CohortConfigDefaults.maxCohortSize, - undefined, - ); - expect(fetcherFetchSpy).toHaveBeenCalledWith( - 'c2', - CohortConfigDefaults.maxCohortSize, - undefined, - ); - expect(storageGetCohortSpy).toHaveBeenCalledWith('c1'); - expect(storageGetCohortSpy).toHaveBeenCalledWith('c2'); - expect(storageReplaceAllSpy).toHaveBeenCalledTimes(1); + const pollerUpdateSpy = jest.spyOn(poller, 'update'); + + await poller.start(); + + await sleep(100); + expect(pollerUpdateSpy).toHaveBeenCalledTimes(0); + expect(storageGetCohortSpy).toHaveBeenCalledTimes(0); + expect(fetcherFetchSpy).toHaveBeenCalledTimes(0); + expect(storagePutSpy).toHaveBeenCalledTimes(0); + + await sleep(POLL_MILLIS); + expect(pollerUpdateSpy).toHaveBeenCalledTimes(1); + expect(storageGetCohortSpy).toHaveBeenCalledTimes(2); + expect(fetcherFetchSpy).toHaveBeenCalledTimes(2); + expect(storagePutSpy).toHaveBeenCalledTimes(2); + + await sleep(POLL_MILLIS); + expect(pollerUpdateSpy).toHaveBeenCalledTimes(2); + expect(storageGetCohortSpy).toHaveBeenCalledTimes(4); + expect(fetcherFetchSpy).toHaveBeenCalledTimes(4); + expect(storagePutSpy).toHaveBeenCalledTimes(4); + + await sleep(POLL_MILLIS); + expect(pollerUpdateSpy).toHaveBeenCalledTimes(3); + expect(storageGetCohortSpy).toHaveBeenCalledTimes(6); + expect(fetcherFetchSpy).toHaveBeenCalledTimes(6); + expect(storagePutSpy).toHaveBeenCalledTimes(6); + + for (const cohortId of storage.getAllCohortIds()) { + expect(storageGetCohortSpy).toHaveBeenCalledWith(cohortId); + expect(fetcherFetchSpy).toHaveBeenCalledWith( + cohortId, + OLD_COHORTS[cohortId].lastModified, + ); + } }); -test('cohort fetch using maxCohortSize', async () => { - const fetcher = new CohortFetcher('', '', null); - const fetcherFetchSpy = jest.spyOn(fetcher, 'fetch'); - fetcherFetchSpy.mockImplementation( - async (cohortId: string) => COHORTS[cohortId], - ); +test('CohortPoller polls takes long time but only makes necessary requests', async () => { + const cohortApiGetCohortSpy = jest.spyOn(SdkCohortApi.prototype, 'getCohort'); + cohortApiGetCohortSpy.mockImplementation(async (options) => { + await new Promise((resolve) => setTimeout(resolve, POLL_MILLIS * 2.25)); + return NEW_COHORTS[options.cohortId]; + }); - const storage = new InMemoryCohortStorage(); - const storageReplaceAllSpy = jest.spyOn(storage, 'replaceAll'); - const storageGetCohortSpy = jest.spyOn(storage, 'getCohort'); + const pollerUpdateSpy = jest.spyOn(poller, 'update'); - const cohortPoller = new CohortPoller(fetcher, storage, 100); + await poller.start(); - await cohortPoller.update(new Set(['c1', 'c2'])); - expect(fetcherFetchSpy).toHaveBeenCalledWith('c1', 100, undefined); - expect(fetcherFetchSpy).toHaveBeenCalledWith('c2', 100, undefined); - expect(storageGetCohortSpy).toHaveBeenCalledWith('c1'); - expect(storageGetCohortSpy).toHaveBeenCalledWith('c2'); - expect(storageReplaceAllSpy).toHaveBeenCalledTimes(1); -}); + await sleep(100); + expect(pollerUpdateSpy).toHaveBeenCalledTimes(0); + expect(storageGetCohortSpy).toHaveBeenCalledTimes(0); + expect(cohortApiGetCohortSpy).toHaveBeenCalledTimes(0); -test('cohort fetch using lastModified', async () => { - const fetcher = new CohortFetcher('', '', null); - const fetcherFetchSpy = jest.spyOn(fetcher, 'fetch'); - fetcherFetchSpy.mockImplementation( - async (cohortId: string, maxCohortSize: number, lastModified?) => { - if (lastModified === COHORTS[cohortId].lastModified) { - return undefined; - } - return COHORTS[cohortId]; - }, - ); + await sleep(POLL_MILLIS); + expect(pollerUpdateSpy).toHaveBeenCalledTimes(1); + expect(storageGetCohortSpy).toHaveBeenCalledTimes(2); + expect(cohortApiGetCohortSpy).toHaveBeenCalledTimes(2); - const storage = new InMemoryCohortStorage(); - const storageReplaceAllSpy = jest.spyOn(storage, 'replaceAll'); - const storageGetCohortSpy = jest.spyOn(storage, 'getCohort'); + await sleep(POLL_MILLIS); + expect(pollerUpdateSpy).toHaveBeenCalledTimes(2); + expect(storageGetCohortSpy).toHaveBeenCalledTimes(4); + expect(cohortApiGetCohortSpy).toHaveBeenCalledTimes(2); - const cohortPoller = new CohortPoller(fetcher, storage); + await sleep(POLL_MILLIS); + expect(pollerUpdateSpy).toHaveBeenCalledTimes(3); + expect(storageGetCohortSpy).toHaveBeenCalledTimes(6); + expect(cohortApiGetCohortSpy).toHaveBeenCalledTimes(2); - await cohortPoller.update(new Set(['c1', 'c2'])); - expect(fetcherFetchSpy).toHaveBeenCalledWith( - 'c1', - CohortConfigDefaults.maxCohortSize, - undefined, - ); - expect(fetcherFetchSpy).toHaveBeenCalledWith( - 'c2', - CohortConfigDefaults.maxCohortSize, - undefined, - ); - expect(storageGetCohortSpy).toHaveBeenCalledWith('c1'); - expect(storageGetCohortSpy).toHaveBeenCalledWith('c2'); - expect(storageReplaceAllSpy).toHaveBeenCalledTimes(1); - jest.clearAllMocks(); + for (const cohortId of storage.getAllCohortIds()) { + expect(storageGetCohortSpy).toHaveBeenCalledWith(cohortId); + } + expect(storagePutSpy).toHaveBeenCalledTimes(0); - await cohortPoller.update(new Set(['c1', 'c2'])); - expect(fetcherFetchSpy).toHaveBeenCalledWith( - 'c1', - CohortConfigDefaults.maxCohortSize, - COHORTS['c1'].lastModified, - ); - expect(fetcherFetchSpy).toHaveBeenCalledWith( - 'c2', - CohortConfigDefaults.maxCohortSize, - COHORTS['c2'].lastModified, - ); - expect(storageGetCohortSpy).toHaveBeenCalledWith('c1'); - expect(storageGetCohortSpy).toHaveBeenCalledWith('c2'); - expect(storageReplaceAllSpy).toHaveBeenCalledTimes(0); - jest.clearAllMocks(); + await sleep(POLL_MILLIS / 2); - await cohortPoller.update(new Set(['c1', 'c2', 'c3'])); - expect(fetcherFetchSpy).toHaveBeenCalledWith( - 'c1', - CohortConfigDefaults.maxCohortSize, - COHORTS['c1'].lastModified, - ); - expect(fetcherFetchSpy).toHaveBeenCalledWith( - 'c2', - CohortConfigDefaults.maxCohortSize, - COHORTS['c2'].lastModified, - ); - expect(fetcherFetchSpy).toHaveBeenCalledWith( - 'c3', - CohortConfigDefaults.maxCohortSize, - undefined, - ); - expect(storageGetCohortSpy).toHaveBeenCalledWith('c1'); - expect(storageGetCohortSpy).toHaveBeenCalledWith('c2'); - expect(storageGetCohortSpy).toHaveBeenCalledWith('c3'); - expect(storageReplaceAllSpy).toHaveBeenCalledTimes(1); - jest.clearAllMocks(); + expect(storagePutSpy).toHaveBeenCalledTimes(6); }); -test('cohort fetch fails 2 times success 3rd', async () => { - const fetcher = new CohortFetcher('', '', null); +test('CohortPoller polls every defined ms with failures', async () => { const fetcherFetchSpy = jest.spyOn(fetcher, 'fetch'); - let tries = 0; - fetcherFetchSpy.mockImplementation(async (cohortId: string) => { - if (++tries === 3) { - return COHORTS[cohortId]; - } - throw Error(); - }); - const storage = new InMemoryCohortStorage(); - const storageReplaceAllSpy = jest.spyOn(storage, 'replaceAll'); - const storageGetCohortSpy = jest.spyOn(storage, 'getCohort'); - expect(storageReplaceAllSpy).toHaveBeenCalledTimes(0); + const pollerUpdateSpy = jest.spyOn(poller, 'update'); - const cohortPoller = new CohortPoller(fetcher, storage); + await poller.start(); - const start = new Date().getTime(); - await cohortPoller.update(new Set(['c1'])); - expect(new Date().getTime() - start).toBeGreaterThanOrEqual(2000); + await sleep(100); + expect(pollerUpdateSpy).toHaveBeenCalledTimes(0); + expect(storageGetCohortSpy).toHaveBeenCalledTimes(0); + expect(fetcherFetchSpy).toHaveBeenCalledTimes(0); + expect(storagePutSpy).toHaveBeenCalledTimes(0); - expect(fetcherFetchSpy).toHaveBeenCalledTimes(3); - expect(storageGetCohortSpy).toHaveBeenCalledTimes(1); - expect(storageReplaceAllSpy).toHaveBeenCalledWith({ - c1: COHORTS['c1'], + // Error case. + fetcherFetchSpy.mockImplementation(async () => { + throw Error(); + }); + await sleep(POLL_MILLIS); + expect(pollerUpdateSpy).toHaveBeenCalledTimes(1); + expect(storageGetCohortSpy).toHaveBeenCalledTimes(2); + expect(fetcherFetchSpy).toHaveBeenCalledTimes(2); + expect(storagePutSpy).toHaveBeenCalledTimes(0); + + // No update. + fetcherFetchSpy.mockImplementation(async () => { + return undefined; + }); + await sleep(POLL_MILLIS); + expect(pollerUpdateSpy).toHaveBeenCalledTimes(2); + expect(storageGetCohortSpy).toHaveBeenCalledTimes(4); + expect(fetcherFetchSpy).toHaveBeenCalledTimes(4); + expect(storagePutSpy).toHaveBeenCalledTimes(0); + + // Success case. + fetcherFetchSpy.mockImplementation(async (cohortId) => { + return NEW_COHORTS[cohortId]; }); + await sleep(POLL_MILLIS); + expect(pollerUpdateSpy).toHaveBeenCalledTimes(3); + expect(storageGetCohortSpy).toHaveBeenCalledTimes(6); + expect(fetcherFetchSpy).toHaveBeenCalledTimes(6); + expect(storagePutSpy).toHaveBeenCalledTimes(2); + + for (const cohortId of storage.getAllCohortIds()) { + expect(storageGetCohortSpy).toHaveBeenCalledWith(cohortId); + expect(fetcherFetchSpy).toHaveBeenCalledWith( + cohortId, + OLD_COHORTS[cohortId].lastModified, + ); + expect(storagePutSpy).toHaveBeenCalledWith(NEW_COHORTS[cohortId]); + } }); diff --git a/packages/node/test/local/flagConfigPoller.test.ts b/packages/node/test/local/flagConfigPoller.test.ts index 2eb7ff7..8821c89 100644 --- a/packages/node/test/local/flagConfigPoller.test.ts +++ b/packages/node/test/local/flagConfigPoller.test.ts @@ -3,8 +3,8 @@ import { FlagConfigPoller, InMemoryFlagConfigCache, } from 'src/index'; +import { SdkCohortApi } from 'src/local/cohort/cohort-api'; import { CohortFetcher } from 'src/local/cohort/fetcher'; -import { CohortPoller } from 'src/local/cohort/poller'; import { InMemoryCohortStorage } from 'src/local/cohort/storage'; import { MockHttpClient } from './util/mockHttpClient'; @@ -153,6 +153,27 @@ const FLAG = [ return acc; }, {}); +const NEW_FLAGS = { + ...FLAG, + flag6: { + key: 'flag6', + segments: [ + { + conditions: [ + [ + { + op: 'set contains any', + selector: ['context', 'user', 'cohort_ids'], + values: ['anewcohortid'], + }, + ], + ], + }, + ], + variants: {}, + }, +}; + afterEach(() => { // Note that if a test failed, and the poller has not stopped, // the test will hang and this won't be called. @@ -168,29 +189,29 @@ test('flagConfig poller success', async () => { new MockHttpClient(async () => ({ status: 200, body: '' })), ), new InMemoryFlagConfigCache(), - 2000, - new CohortPoller( - new CohortFetcher( - 'apikey', - 'secretkey', - new MockHttpClient(async () => ({ status: 200, body: '' })), - ), - cohortStorage, + cohortStorage, + new CohortFetcher( + 'apikey', + 'secretkey', + new MockHttpClient(async () => ({ status: 200, body: '' })), ), + 2000, ); let flagPolled = 0; // Return FLAG for flag polls. jest .spyOn(FlagConfigFetcher.prototype, 'fetch') .mockImplementation(async () => { - return { ...FLAG, flagPolled: { key: flagPolled++ } }; + ++flagPolled; + if (flagPolled == 1) return { ...FLAG, flagPolled: { key: flagPolled } }; + return { ...NEW_FLAGS, flagPolled: { key: flagPolled } }; }); // Return cohort with their own cohortId. jest - .spyOn(CohortFetcher.prototype, 'fetch') - .mockImplementation(async (cohortId) => { + .spyOn(SdkCohortApi.prototype, 'getCohort') + .mockImplementation(async (options) => { return { - cohortId: cohortId, + cohortId: options.cohortId, groupType: '', groupTypeId: 0, lastComputed: 0, @@ -204,36 +225,39 @@ test('flagConfig poller success', async () => { expect(flagPolled).toBe(1); expect(await poller.cache.getAll()).toStrictEqual({ ...FLAG, - flagPolled: { key: 0 }, + flagPolled: { key: flagPolled }, }); expect(cohortStorage.getCohort('hahahaha1').cohortId).toBe('hahahaha1'); expect(cohortStorage.getCohort('hahahaha2').cohortId).toBe('hahahaha2'); expect(cohortStorage.getCohort('hahahaha3').cohortId).toBe('hahahaha3'); expect(cohortStorage.getCohort('hahahaha4').cohortId).toBe('hahahaha4'); expect(cohortStorage.getCohort('hahaorgname1').cohortId).toBe('hahaorgname1'); + expect(cohortStorage.getCohort('newcohortid')).toBeUndefined(); expect(cohortStorage.getCohort('hahahaha1').lastModified).toBe(1); expect(cohortStorage.getCohort('hahahaha2').lastModified).toBe(1); expect(cohortStorage.getCohort('hahahaha3').lastModified).toBe(1); expect(cohortStorage.getCohort('hahahaha4').lastModified).toBe(1); expect(cohortStorage.getCohort('hahaorgname1').lastModified).toBe(1); - // On update, flag and cohort should both be updated. + // On update, flag, existing cohort doesn't update. await new Promise((f) => setTimeout(f, 2000)); expect(flagPolled).toBe(2); expect(await poller.cache.getAll()).toStrictEqual({ - ...FLAG, - flagPolled: { key: 1 }, + ...NEW_FLAGS, + flagPolled: { key: flagPolled }, }); expect(cohortStorage.getCohort('hahahaha1').cohortId).toBe('hahahaha1'); expect(cohortStorage.getCohort('hahahaha2').cohortId).toBe('hahahaha2'); expect(cohortStorage.getCohort('hahahaha3').cohortId).toBe('hahahaha3'); expect(cohortStorage.getCohort('hahahaha4').cohortId).toBe('hahahaha4'); expect(cohortStorage.getCohort('hahaorgname1').cohortId).toBe('hahaorgname1'); - expect(cohortStorage.getCohort('hahahaha1').lastModified).toBe(2); - expect(cohortStorage.getCohort('hahahaha2').lastModified).toBe(2); - expect(cohortStorage.getCohort('hahahaha3').lastModified).toBe(2); - expect(cohortStorage.getCohort('hahahaha4').lastModified).toBe(2); - expect(cohortStorage.getCohort('hahaorgname1').lastModified).toBe(2); + expect(cohortStorage.getCohort('anewcohortid').cohortId).toBe('anewcohortid'); + expect(cohortStorage.getCohort('hahahaha1').lastModified).toBe(1); + expect(cohortStorage.getCohort('hahahaha2').lastModified).toBe(1); + expect(cohortStorage.getCohort('hahahaha3').lastModified).toBe(1); + expect(cohortStorage.getCohort('hahahaha4').lastModified).toBe(1); + expect(cohortStorage.getCohort('hahaorgname1').lastModified).toBe(1); + expect(cohortStorage.getCohort('anewcohortid').lastModified).toBe(2); poller.stop(); }); @@ -244,15 +268,13 @@ test('flagConfig poller initial error', async () => { new MockHttpClient(async () => ({ status: 200, body: '' })), ), new InMemoryFlagConfigCache(), - 10, - new CohortPoller( - new CohortFetcher( - 'apikey', - 'secretkey', - new MockHttpClient(async () => ({ status: 200, body: '' })), - ), - new InMemoryCohortStorage(), + new InMemoryCohortStorage(), + new CohortFetcher( + 'apikey', + 'secretkey', + new MockHttpClient(async () => ({ status: 200, body: '' })), ), + 10, ); // Fetch returns FLAG, but cohort fails. jest @@ -260,9 +282,11 @@ test('flagConfig poller initial error', async () => { .mockImplementation(async () => { return FLAG; }); - jest.spyOn(CohortPoller.prototype, 'update').mockImplementation(async () => { - throw new Error(); - }); + jest + .spyOn(SdkCohortApi.prototype, 'getCohort') + .mockImplementation(async () => { + throw new Error(); + }); // FLAG should be empty, as cohort failed. Poller should be stopped immediately and test exists cleanly. await poller.start(); expect(await poller.cache.getAll()).toStrictEqual({}); @@ -275,39 +299,50 @@ test('flagConfig poller initial success, polling error and use old flags', async new MockHttpClient(async () => ({ status: 200, body: '' })), ), new InMemoryFlagConfigCache(), - 2000, - new CohortPoller( - new CohortFetcher( - 'apikey', - 'secretkey', - new MockHttpClient(async () => ({ status: 200, body: '' })), - ), - new InMemoryCohortStorage(), + new InMemoryCohortStorage(), + new CohortFetcher( + 'apikey', + 'secretkey', + new MockHttpClient(async () => ({ status: 200, body: '' })), ), + 2000, ); // Only return the flag on first poll, return a different one on future polls where cohort would fail. - let cohortPolled = 0; + let flagPolled = 0; jest .spyOn(FlagConfigFetcher.prototype, 'fetch') .mockImplementation(async () => { - if (cohortPolled === 0) return FLAG; - return {}; + if (++flagPolled === 1) return FLAG; + return NEW_FLAGS; }); // Only success on first poll and fail on all later ones. - jest.spyOn(CohortPoller.prototype, 'update').mockImplementation(async () => { - if (cohortPolled++ === 0) return; - throw new Error(); - }); + jest + .spyOn(SdkCohortApi.prototype, 'getCohort') + .mockImplementation(async (options) => { + if (options.cohortId !== 'anewcohortid') { + return { + cohortId: options.cohortId, + groupType: '', + groupTypeId: 0, + lastComputed: 0, + lastModified: 10, + size: 0, + memberIds: new Set([]), + }; + } + throw new Error(); + }); // First poll should return FLAG. await poller.start(); expect(await poller.cache.getAll()).toStrictEqual(FLAG); - expect(cohortPolled).toBe(1); + expect(flagPolled).toBe(1); - // Second poll should fail. The different flag should not be updated. + // Second poll flags with new cohort should fail when new cohort download failed. + // The different flag should not be updated. await new Promise((f) => setTimeout(f, 2000)); - expect(cohortPolled).toBe(2); + expect(flagPolled).toBeGreaterThanOrEqual(2); expect(await poller.cache.getAll()).toStrictEqual(FLAG); poller.stop(); diff --git a/packages/node/test/local/flagConfigStreamer.test.ts b/packages/node/test/local/flagConfigStreamer.test.ts index 34a2eb7..c008615 100644 --- a/packages/node/test/local/flagConfigStreamer.test.ts +++ b/packages/node/test/local/flagConfigStreamer.test.ts @@ -1,6 +1,8 @@ import assert from 'assert'; import { FlagConfigPoller, InMemoryFlagConfigCache } from 'src/index'; +import { CohortFetcher } from 'src/local/cohort/fetcher'; +import { InMemoryCohortStorage } from 'src/local/cohort/storage'; import { FlagConfigFetcher } from 'src/local/fetcher'; import { FlagConfigStreamer } from 'src/local/streamer'; @@ -17,7 +19,16 @@ const getTestObjs = ({ serverUrl = 'http://localhostxxxx:00000000', debug = false, }) => { - const fetchObj = { fetchCalls: 0, fetcher: undefined }; + const fetchObj = { + fetchCalls: 0, + fetcher: undefined, + cohortStorage: new InMemoryCohortStorage(), + cohortFetcher: new CohortFetcher( + 'apikey', + 'secretkey', + new MockHttpClient(async () => ({ status: 200, body: '' })), + ), + }; let dataI = 0; const data = [ '[{"key": "fetcher-a", "variants": {}, "segments": []}]', @@ -40,8 +51,9 @@ const getTestObjs = ({ new FlagConfigPoller( fetchObj.fetcher, cache, + fetchObj.cohortStorage, + fetchObj.cohortFetcher, pollingIntervalMillis, - null, debug, ), cache, @@ -51,7 +63,8 @@ const getTestObjs = ({ streamFlagTryDelayMillis, streamFlagRetryDelayMillis, serverUrl, - null, + fetchObj.cohortStorage, + fetchObj.cohortFetcher, debug, ); return {