Skip to content

Commit

Permalink
update to flag poller loads new cohort, cohort updater polls updates
Browse files Browse the repository at this point in the history
  • Loading branch information
zhukaihan committed Jul 3, 2024
1 parent 8072381 commit b5f441f
Show file tree
Hide file tree
Showing 14 changed files with 678 additions and 177 deletions.
18 changes: 11 additions & 7 deletions packages/node/src/local/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import { InMemoryFlagConfigCache } from './cache';
import { CohortFetcher } from './cohort/fetcher';
import { CohortPoller } from './cohort/poller';
import { InMemoryCohortStorage } from './cohort/storage';
import { CohortUpdater } from './cohort/updater';
import { FlagConfigFetcher } from './fetcher';
import { FlagConfigPoller } from './poller';
import { FlagConfigStreamer } from './streamer';
Expand All @@ -55,6 +56,7 @@ export class LocalEvaluationClient {
private readonly updater: FlagConfigUpdater;
private readonly assignmentService: AssignmentService;
private readonly evaluation: EvaluationEngine;
private readonly cohortUpdater: CohortUpdater;

/**
* Directly access the client's flag config cache.
Expand Down Expand Up @@ -86,29 +88,30 @@ export class LocalEvaluationClient {
this.logger = new ConsoleLogger(this.config.debug);

this.cohortStorage = new InMemoryCohortStorage();
let cohortUpdater = undefined;
let cohortFetcher = undefined;
if (this.config.cohortConfig) {
const cohortFetcher = new CohortFetcher(
cohortFetcher = new CohortFetcher(
this.config.cohortConfig.apiKey,
this.config.cohortConfig.secretKey,
httpClient,
this.config.cohortConfig?.cohortServerUrl,
this.config.cohortConfig?.maxCohortSize,
this.config.debug,
);
const cohortPoller = new CohortPoller(
new CohortPoller(
cohortFetcher,
this.cohortStorage,
this.config.cohortConfig?.maxCohortSize,
60000, // this.config.cohortConfig?.cohortPollingIntervalMillis,
this.config.debug,
);
cohortUpdater = cohortPoller;
}

const flagsPoller = new FlagConfigPoller(
fetcher,
this.cache,
this.cohortStorage,
cohortFetcher,
this.config.flagConfigPollingIntervalMillis,
cohortUpdater,
this.config.debug,
);
this.updater = this.config.streamUpdates
Expand All @@ -123,7 +126,8 @@ export class LocalEvaluationClient {
STREAM_RETRY_DELAY_MILLIS +
Math.floor(Math.random() * STREAM_RETRY_JITTER_MAX_MILLIS),
this.config.streamServerUrl,
cohortUpdater,
this.cohortStorage,
cohortFetcher,
this.config.debug,
)
: flagsPoller;
Expand Down
2 changes: 2 additions & 0 deletions packages/node/src/local/cohort/cohort-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export type GetCohortOptions = {
lastModified?: number;
timeoutMillis?: number;
};

export interface CohortApi {
/**
* Calls /sdk/v1/cohort/<cohortId> with query params maxCohortSize and lastModified if specified.
Expand All @@ -21,6 +22,7 @@ export interface CohortApi {
*/
getCohort(options?: GetCohortOptions): Promise<Cohort>;
}

export class SdkCohortApi implements CohortApi {
private readonly cohortApiKey;
private readonly serverUrl;
Expand Down
109 changes: 100 additions & 9 deletions packages/node/src/local/cohort/fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,135 @@ import { WrapperClient } from 'src/transport/http';
import { Cohort } from 'src/types/cohort';
import { CohortConfigDefaults } from 'src/types/config';
import { HttpClient } from 'src/types/transport';
import { BackoffPolicy, doWithBackoffFailLoudly } from 'src/util/backoff';
import { Mutex, Executor } from 'src/util/mutex';

import { version as PACKAGE_VERSION } from '../../../gen/version';

import { SdkCohortApi } from './cohort-api';

const COHORT_CONFIG_TIMEOUT = 20000;

const BACKOFF_POLICY: BackoffPolicy = {
attempts: 3,
min: 1000,
max: 1000,
scalar: 1,
};

export class CohortFetcher {
readonly cohortApi: SdkCohortApi;
readonly maxCohortSize: number;
readonly debug: boolean;

private readonly inProgressCohorts: Record<
string,
Promise<Cohort | undefined>
> = {};
private readonly mutex: Mutex = new Mutex();
private readonly executor: Executor = new Executor(4);

constructor(
apiKey: string,
secretKey: string,
httpClient: HttpClient,
serverUrl = CohortConfigDefaults.cohortServerUrl,
maxCohortSize = CohortConfigDefaults.maxCohortSize,
debug = false,
) {
this.cohortApi = new SdkCohortApi(
Buffer.from(apiKey + ':' + secretKey).toString('base64'),
serverUrl,
new WrapperClient(httpClient),
);
this.maxCohortSize = maxCohortSize;
this.debug = debug;
}

async fetch(
cohortId: string,
maxCohortSize: number,
lastModified?: number,
): Promise<Cohort | undefined> {
return this.cohortApi.getCohort({
libraryName: 'experiment-node-server',
libraryVersion: PACKAGE_VERSION,
cohortId: cohortId,
maxCohortSize: maxCohortSize,
lastModified: lastModified,
timeoutMillis: COHORT_CONFIG_TIMEOUT,
});
// This block may have async and awaits. No guarantee that executions are not interleaved.
// TODO: Add download concurrency limit.
const unlock = await this.mutex.lock();

if (!this.inProgressCohorts[cohortId]) {
this.inProgressCohorts[cohortId] = this.executor.run(async () => {
console.log('Start downloading', cohortId);
const cohort = await doWithBackoffFailLoudly<Cohort>(
async () =>
this.cohortApi.getCohort({
libraryName: 'experiment-node-server',
libraryVersion: PACKAGE_VERSION,
cohortId: cohortId,
maxCohortSize: this.maxCohortSize,
lastModified: lastModified,
timeoutMillis: COHORT_CONFIG_TIMEOUT,
}),
BACKOFF_POLICY,
)
.then(async (cohort) => {
const unlock = await this.mutex.lock();
delete this.inProgressCohorts[cohortId];
unlock();
return cohort;
})
.catch(async (err) => {
const unlock = await this.mutex.lock();
delete this.inProgressCohorts[cohortId];
unlock();
throw err;
});
console.log('Stop downloading', cohortId, cohort['cohortId']);
return cohort;
});
}

unlock();
return this.inProgressCohorts[cohortId];
}

// queueMutex = new Mutex();
// queue = [];
// running = 0;

// private startNextTask() {
// const unlock = this.queueMutex.lock();
// if (this.running >= 10) {
// unlock();
// return;
// }

// const nextTask = this.queue[0];
// delete this.queue[0];

// this.running++;
// new Promise((resolve, reject) => {
// nextTask()
// .then((v) => {
// const unlock = this.queueMutex.lock();
// this.running--;
// unlock();
// this.startNextTask();
// return v;
// })
// .catch((err) => {
// const unlock = this.queueMutex.lock();
// this.running--;
// unlock();
// this.startNextTask();
// throw err;
// });
// });

// unlock();
// }

// private queueTask<T>(task: () => Promise<T>): Promise<T> {
// const unlock = this.queueMutex.lock();
// this.queue.push(task);
// unlock();
// this.startNextTask();
// }
}
102 changes: 61 additions & 41 deletions packages/node/src/local/cohort/poller.ts
Original file line number Diff line number Diff line change
@@ -1,82 +1,102 @@
import { Cohort, CohortStorage } from 'src/types/cohort';
import { CohortConfigDefaults } from 'src/types/config';
import { BackoffPolicy, doWithBackoffFailLoudly } from 'src/util/backoff';
import { CohortStorage } from 'src/types/cohort';

import { ConsoleLogger } from '../../util/logger';
import { Logger } from '../../util/logger';

import { CohortFetcher } from './fetcher';
import { CohortUpdater } from './updater';

const BACKOFF_POLICY: BackoffPolicy = {
attempts: 3,
min: 1000,
max: 1000,
scalar: 1,
};

export class CohortPoller implements CohortUpdater {
private readonly logger: Logger;

public readonly fetcher: CohortFetcher;
public readonly storage: CohortStorage;
private readonly maxCohortSize: number;

private poller: NodeJS.Timeout;
private pollingIntervalMillis: number;

constructor(
fetcher: CohortFetcher,
storage: CohortStorage,
maxCohortSize = CohortConfigDefaults.maxCohortSize,
pollingIntervalMillis = 60,
debug = false,
) {
this.fetcher = fetcher;
this.storage = storage;
this.maxCohortSize = maxCohortSize;
this.pollingIntervalMillis = pollingIntervalMillis;
this.logger = new ConsoleLogger(debug);
}
/**
* You must call this function to begin polling for flag config updates.
* The promise returned by this function is resolved when the initial call
* to fetch the flag configuration completes.
*
* Calling this function while the poller is already running does nothing.
*/
public async start(
onChange?: (storage: CohortStorage) => Promise<void>,
): Promise<void> {
if (!this.poller) {
this.logger.debug('[Experiment] cohort poller - start');
this.poller = setInterval(async () => {
try {
await this.update(onChange);
} catch (e) {
this.logger.debug('[Experiment] flag config update failed', e);
}
}, this.pollingIntervalMillis);
}
}

/**
* Stop polling for flag configurations.
*
* Calling this function while the poller is not running will do nothing.
*/
public stop(): void {
if (this.poller) {
this.logger.debug('[Experiment] cohort poller - stop');
clearTimeout(this.poller);
this.poller = undefined;
}
}

public async update(
cohortIds: Set<string>,
onChange?: (storage: CohortStorage) => Promise<void>,
): Promise<void> {
let changed = false;
const updatedCohorts: Record<string, Cohort> = {};
for (const cohortId of cohortIds) {
const promises = [];

for (const cohortId of this.storage.getAllCohortIds()) {
this.logger.debug(`[Experiment] updating cohort ${cohortId}`);

// Get existing cohort and lastModified.
const existingCohort = this.storage.getCohort(cohortId);
let lastModified = undefined;
if (existingCohort) {
lastModified = existingCohort.lastModified;
updatedCohorts[cohortId] = existingCohort;
}

// Download.
let cohort = undefined;
try {
cohort = await doWithBackoffFailLoudly<Cohort>(async () => {
return await this.fetcher.fetch(
cohortId,
this.maxCohortSize,
lastModified,
);
}, BACKOFF_POLICY);
} catch (e) {
this.logger.error('[Experiment] cohort poll failed', e);
throw e;
}

// Set.
if (cohort) {
updatedCohorts[cohortId] = cohort;
changed = true;
}
}
if (changed) {
this.storage.replaceAll(updatedCohorts);
this.logger.debug('[Experiment] cohort updated');
promises.push(
this.fetcher
.fetch(cohortId, lastModified)
.then((cohort) => {
// Set.
if (cohort) {
this.storage.put(cohort);
changed = true;
}
})
.catch((err) => {
this.logger.error('[Experiment] cohort poll failed', err);
}),
);
}

await Promise.all(promises);

this.logger.debug(`[Experiment] cohort polled, changed: ${changed}`);

if (onChange && changed) {
await onChange(this.storage);
}
Expand Down
18 changes: 16 additions & 2 deletions packages/node/src/local/cohort/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ import { Cohort, CohortStorage, USER_GROUP_TYPE } from 'src/types/cohort';
export class InMemoryCohortStorage implements CohortStorage {
store: Record<string, Cohort> = {};

getAllCohortIds(): Set<string> {
return new Set<string>(Object.keys(this.store));
}

getCohort(cohortId: string): Cohort | undefined {
return cohortId in this.store ? this.store[cohortId] : undefined;
}
Expand All @@ -28,8 +32,18 @@ export class InMemoryCohortStorage implements CohortStorage {
return validCohortIds;
}

replaceAll(cohorts: Record<string, Cohort>): void {
put(cohort: Cohort): void {
this.store[cohort.cohortId] = cohort;
}

putAll(cohorts: Record<string, Cohort>): void {
// Assignments are atomic.
this.store = { ...cohorts };
this.store = { ...this.store, ...cohorts };
}

removeAll(cohortIds: Set<string>): void {
cohortIds.forEach((id) => {
delete this.store[id];
});
}
}
Loading

0 comments on commit b5f441f

Please sign in to comment.