Skip to content

Commit

Permalink
Add tunnel time metric to opt-in server usage report.
Browse files Browse the repository at this point in the history
  • Loading branch information
sbruens committed May 21, 2024
1 parent 6204f4f commit add47d3
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 62 deletions.
8 changes: 6 additions & 2 deletions src/shadowbox/infrastructure/prometheus_scraper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ import * as path from 'path';

import * as logging from '../infrastructure/logging';

export interface QueryResultMetric {
[labelValue: string]: string;
}

export interface QueryResultData {
resultType: 'matrix' | 'vector' | 'scalar' | 'string';
result: Array<{
metric: {[labelValue: string]: string};
metric: QueryResultMetric;
value: [number, string];
}>;
}
Expand Down Expand Up @@ -101,7 +105,7 @@ async function spawnPrometheusSubprocess(
prometheusEndpoint: string
): Promise<child_process.ChildProcess> {
logging.info('======== Starting Prometheus ========');
logging.info(`${binaryFilename} ${processArgs.map(a => `"${a}"`).join(' ')}`);
logging.info(`${binaryFilename} ${processArgs.map((a) => `"${a}"`).join(' ')}`);
const runProcess = child_process.spawn(binaryFilename, processArgs);
runProcess.on('error', (error) => {
logging.error(`Error spawning Prometheus: ${error}`);
Expand Down
110 changes: 63 additions & 47 deletions src/shadowbox/server/shared_metrics.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ import {
UsageMetrics,
PrometheusUsageMetrics,
} from './shared_metrics';
import {FakePrometheusClient} from './mocks/mocks';
import {PrometheusClient} from '../infrastructure/prometheus_scraper';
import {PrometheusClient, QueryResultData} from '../infrastructure/prometheus_scraper';

describe('OutlineSharedMetricsPublisher', () => {
let clock: ManualClock;
Expand Down Expand Up @@ -93,11 +92,11 @@ describe('OutlineSharedMetricsPublisher', () => {
describe('for server usage', () => {
it('is sending correct reports', async () => {
usageMetrics.countryUsage = [
{country: 'AA', inboundBytes: 11},
{country: 'BB', inboundBytes: 11},
{country: 'CC', inboundBytes: 22},
{country: 'AA', inboundBytes: 33},
{country: 'DD', inboundBytes: 33},
{country: 'AA', inboundBytes: 11, tunnelTimeSec: 0},
{country: 'BB', inboundBytes: 11, tunnelTimeSec: 0},
{country: 'CC', inboundBytes: 22, tunnelTimeSec: 0},
{country: 'AA', inboundBytes: 33, tunnelTimeSec: 0},
{country: 'DD', inboundBytes: 33, tunnelTimeSec: 0},
];
clock.nowMs += 60 * 60 * 1000;

Expand All @@ -108,70 +107,70 @@ describe('OutlineSharedMetricsPublisher', () => {
startUtcMs: startTime,
endUtcMs: clock.nowMs,
userReports: [
{bytesTransferred: 11, countries: ['AA']},
{bytesTransferred: 11, countries: ['BB']},
{bytesTransferred: 22, countries: ['CC']},
{bytesTransferred: 33, countries: ['AA']},
{bytesTransferred: 33, countries: ['DD']},
{bytesTransferred: 11, tunnelTimeSec: 0, countries: ['AA']},
{bytesTransferred: 11, tunnelTimeSec: 0, countries: ['BB']},
{bytesTransferred: 22, tunnelTimeSec: 0, countries: ['CC']},
{bytesTransferred: 33, tunnelTimeSec: 0, countries: ['AA']},
{bytesTransferred: 33, tunnelTimeSec: 0, countries: ['DD']},
],
});
});

it('sends ASN data if present', async () => {
usageMetrics.countryUsage = [
{country: 'DD', asn: 999, inboundBytes: 44},
{country: 'EE', inboundBytes: 55},
{country: 'DD', asn: 999, inboundBytes: 44, tunnelTimeSec: 0},
{country: 'EE', inboundBytes: 55, tunnelTimeSec: 0},
];
clock.nowMs += 60 * 60 * 1000;

await clock.runCallbacks();

expect(metricsCollector.collectedServerUsageReport.userReports).toEqual([
{bytesTransferred: 44, countries: ['DD'], asn: 999},
{bytesTransferred: 55, countries: ['EE']},
{bytesTransferred: 44, tunnelTimeSec: 0, countries: ['DD'], asn: 999},
{bytesTransferred: 55, tunnelTimeSec: 0, countries: ['EE']},
]);
});

it('resets metrics to avoid double reporting', async () => {
usageMetrics.countryUsage = [
{country: 'AA', inboundBytes: 11},
{country: 'BB', inboundBytes: 11},
{country: 'AA', inboundBytes: 11, tunnelTimeSec: 0},
{country: 'BB', inboundBytes: 11, tunnelTimeSec: 0},
];
clock.nowMs += 60 * 60 * 1000;
startTime = clock.nowMs;
await clock.runCallbacks();
usageMetrics.countryUsage = [
...usageMetrics.countryUsage,
{country: 'CC', inboundBytes: 22},
{country: 'DD', inboundBytes: 22},
{country: 'CC', inboundBytes: 22, tunnelTimeSec: 0},
{country: 'DD', inboundBytes: 22, tunnelTimeSec: 0},
];
clock.nowMs += 60 * 60 * 1000;

await clock.runCallbacks();

expect(metricsCollector.collectedServerUsageReport.userReports).toEqual([
{bytesTransferred: 22, countries: ['CC']},
{bytesTransferred: 22, countries: ['DD']},
{bytesTransferred: 22, tunnelTimeSec: 0, countries: ['CC']},
{bytesTransferred: 22, tunnelTimeSec: 0, countries: ['DD']},
]);
});

it('ignores sanctioned countries', async () => {
usageMetrics.countryUsage = [
{country: 'AA', inboundBytes: 11},
{country: 'SY', inboundBytes: 11},
{country: 'CC', inboundBytes: 22},
{country: 'AA', inboundBytes: 33},
{country: 'DD', inboundBytes: 33},
{country: 'AA', inboundBytes: 11, tunnelTimeSec: 0},
{country: 'SY', inboundBytes: 11, tunnelTimeSec: 0},
{country: 'CC', inboundBytes: 22, tunnelTimeSec: 0},
{country: 'AA', inboundBytes: 33, tunnelTimeSec: 0},
{country: 'DD', inboundBytes: 33, tunnelTimeSec: 0},
];
clock.nowMs += 60 * 60 * 1000;

await clock.runCallbacks();

expect(metricsCollector.collectedServerUsageReport.userReports).toEqual([
{bytesTransferred: 11, countries: ['AA']},
{bytesTransferred: 22, countries: ['CC']},
{bytesTransferred: 33, countries: ['AA']},
{bytesTransferred: 33, countries: ['DD']},
{bytesTransferred: 11, tunnelTimeSec: 0, countries: ['AA']},
{bytesTransferred: 22, tunnelTimeSec: 0, countries: ['CC']},
{bytesTransferred: 33, tunnelTimeSec: 0, countries: ['AA']},
{bytesTransferred: 33, tunnelTimeSec: 0, countries: ['DD']},
]);
});
});
Expand Down Expand Up @@ -236,27 +235,44 @@ describe('PrometheusUsageMetrics', () => {
});

it('returns a list of location usage', async () => {
prometheusClient.query.and.returnValue(
Promise.resolve({
resultType: 'vector',
result: [
{
metric: {location: 'US', asn: '15169'},
value: [Date.now() / 1000, '123'],
},
{
metric: {location: 'NL'},
value: [Date.now() / 1000, '456'],
},
],
})
const mockDataBytesResponse: QueryResultData = {
resultType: 'vector',
result: [
{
metric: {location: 'US', asn: '15169'},
value: [Date.now() / 1000, '123'],
},
{
metric: {location: 'NL', asn: '1136'},
value: [Date.now() / 1000, '456'],
},
],
};
const mockTunnelTimeResponse: QueryResultData = {
resultType: 'vector',
result: [
{
metric: {location: 'US', asn: '15169'},
value: [Date.now() / 1000, '9999'],
},
{
metric: {location: 'FR'},
value: [Date.now() / 1000, '8888'],
},
],
};

prometheusClient.query.and.returnValues(
Promise.resolve(mockDataBytesResponse),
Promise.resolve(mockTunnelTimeResponse)
);

const observedUsage = await publisher.getLocationUsage();

expect(observedUsage).toEqual([
{country: 'US', inboundBytes: 123, asn: 15169},
{country: 'NL', inboundBytes: 456, asn: undefined},
{country: 'US', asn: 15169, inboundBytes: 123, tunnelTimeSec: 9999},
{country: 'NL', asn: 1136, inboundBytes: 456, tunnelTimeSec: 0},
{country: 'FR', asn: undefined, inboundBytes: 0, tunnelTimeSec: 8888},
]);
});

Expand Down
79 changes: 66 additions & 13 deletions src/shadowbox/server/shared_metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {Clock} from '../infrastructure/clock';
import * as follow_redirects from '../infrastructure/follow_redirects';
import {JsonConfig} from '../infrastructure/json_config';
import * as logging from '../infrastructure/logging';
import {PrometheusClient} from '../infrastructure/prometheus_scraper';
import {PrometheusClient, QueryResultMetric} from '../infrastructure/prometheus_scraper';
import * as version from './version';
import {AccessKeyConfigJson} from './server_access_key';

Expand All @@ -26,10 +26,21 @@ const MS_PER_HOUR = 60 * 60 * 1000;
const MS_PER_DAY = 24 * MS_PER_HOUR;
const SANCTIONED_COUNTRIES = new Set(['CU', 'KP', 'SY']);

const PROMETHEUS_COUNTRY_LABEL = 'location';
const PROMETHEUS_ASN_LABEL = 'asn';

type PrometheusQueryResult = {
[metricKey: string]: {
metric: QueryResultMetric;
value: number;
};
};

export interface LocationUsage {
country: string;
asn?: number;
inboundBytes: number;
tunnelTimeSec: number;
}

// JSON format for the published report.
Expand All @@ -47,6 +58,7 @@ export interface HourlyUserMetricsReportJson {
countries: string[];
asn?: number;
bytesTransferred: number;
tunnelTimeSec: number;
}

// JSON format for the feature metrics report.
Expand Down Expand Up @@ -82,18 +94,58 @@ export class PrometheusUsageMetrics implements UsageMetrics {

constructor(private prometheusClient: PrometheusClient) {}

private async query(metric: string, deltaSecs: number): Promise<PrometheusQueryResult> {
const query = `
sum(increase(${metric}[${deltaSecs}s]))
by (${PROMETHEUS_COUNTRY_LABEL}, ${PROMETHEUS_ASN_LABEL})
`;
const queryResponse = await this.prometheusClient.query(query);
const result: PrometheusQueryResult = {};
for (const entry of queryResponse.result) {
const serializedKey = JSON.stringify(entry.metric, Object.keys(entry.metric).sort());
result[serializedKey] = {
metric: entry.metric,
value: Math.round(parseFloat(entry.value[1])),
};
}
return result;
}

async getLocationUsage(): Promise<LocationUsage[]> {
const timeDeltaSecs = Math.round((Date.now() - this.resetTimeMs) / 1000);
// We measure the traffic to and from the target, since that's what we are protecting.
const result = await this.prometheusClient.query(
`sum(increase(shadowsocks_data_bytes_per_location{dir=~"p>t|p<t"}[${timeDeltaSecs}s])) by (location, asn)`
);
const usage = [] as LocationUsage[];
for (const entry of result.result) {
const country = entry.metric['location'] || '';
const asn = entry.metric['asn'] ? Number(entry.metric['asn']) : undefined;
const inboundBytes = Math.round(parseFloat(entry.value[1]));
usage.push({country, inboundBytes, asn});
const [dataBytesResult, tunnelTimeResult] = await Promise.all([
// We measure the traffic to and from the target, since that's what we are protecting.
this.query('shadowsocks_data_bytes_per_location{dir=~"p>t|p<t"}', timeDeltaSecs),
this.query('shadowsocks_tunnel_time_seconds_per_location', timeDeltaSecs),
]);

// We join the bytes and tunneltime metrics together by location (i.e. country and ASN).
const mergedResult: {
[metricKey: string]: {
metric: QueryResultMetric;
inboundBytes?: number;
tunnelTimeSec?: number;
};
} = {};
for (const [key, entry] of Object.entries(dataBytesResult)) {
mergedResult[key] = {...mergedResult[key], metric: entry.metric, inboundBytes: entry.value};
}
for (const [key, entry] of Object.entries(tunnelTimeResult)) {
mergedResult[key] = {...mergedResult[key], metric: entry.metric, tunnelTimeSec: entry.value};
}

const usage: LocationUsage[] = [];
for (const entry of Object.values(mergedResult)) {
const country = entry.metric[PROMETHEUS_COUNTRY_LABEL] || '';
const asn = entry.metric[PROMETHEUS_ASN_LABEL]
? Number(entry.metric[PROMETHEUS_ASN_LABEL])
: undefined;
usage.push({
country,
asn,
inboundBytes: entry.inboundBytes || 0,
tunnelTimeSec: entry.tunnelTimeSec || 0,
});
}
return usage;
}
Expand Down Expand Up @@ -205,7 +257,7 @@ export class OutlineSharedMetricsPublisher implements SharedMetricsPublisher {

const userReports: HourlyUserMetricsReportJson[] = [];
for (const locationUsage of locationUsageMetrics) {
if (locationUsage.inboundBytes === 0) {
if (locationUsage.inboundBytes === 0 && locationUsage.tunnelTimeSec === 0) {
continue;
}
if (isSanctionedCountry(locationUsage.country)) {
Expand All @@ -215,8 +267,9 @@ export class OutlineSharedMetricsPublisher implements SharedMetricsPublisher {
// It's used to differentiate the row from the legacy key usage rows.
const country = locationUsage.country || 'ZZ';
const report: HourlyUserMetricsReportJson = {
bytesTransferred: locationUsage.inboundBytes,
countries: [country],
bytesTransferred: locationUsage.inboundBytes,
tunnelTimeSec: locationUsage.tunnelTimeSec,
};
if (locationUsage.asn) {
report.asn = locationUsage.asn;
Expand Down

0 comments on commit add47d3

Please sign in to comment.