Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): add tunnel time metric to opt-in server usage report #1551

Merged
merged 11 commits into from
Oct 18, 2024
2 changes: 1 addition & 1 deletion src/shadowbox/server/mocks/mocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export class FakePrometheusClient extends PrometheusClient {
const bytesTransferred = this.bytesTransferredById[accessKeyId] || 0;
queryResultData.result.push({
metric: {access_key: accessKeyId},
value: [bytesTransferred, `${bytesTransferred}`],
value: [Date.now() / 1000, `${bytesTransferred}`],
});
}
return queryResultData;
Expand Down
91 changes: 60 additions & 31 deletions src/shadowbox/server/shared_metrics.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {AccessKeyConfigJson} from './server_access_key';

import {ServerConfigJson} from './server_config';
import {
CountryUsage,
LocationUsage,
DailyFeatureMetricsReportJson,
HourlyServerMetricsReportJson,
MetricsCollectorClient,
Expand Down Expand Up @@ -78,12 +78,12 @@ describe('OutlineSharedMetricsPublisher', () => {
);

publisher.startSharing();
usageMetrics.countryUsage = [
{country: 'AA', inboundBytes: 11},
{country: 'BB', inboundBytes: 11},
{country: 'CC', inboundBytes: 22},
{country: 'AA', inboundBytes: 33},
{country: 'DD', inboundBytes: 33},
usageMetrics.locationUsage = [
{country: 'AA', inboundBytes: 11, tunnelTimeSec: 99},
{country: 'BB', inboundBytes: 11, tunnelTimeSec: 88},
{country: 'CC', inboundBytes: 22, tunnelTimeSec: 77},
{country: 'AA', inboundBytes: 33, tunnelTimeSec: 66},
{country: 'DD', inboundBytes: 33, tunnelTimeSec: 55},
];

clock.nowMs += 60 * 60 * 1000;
Expand All @@ -93,18 +93,18 @@ describe('OutlineSharedMetricsPublisher', () => {
startUtcMs: startTime,
endUtcMs: clock.nowMs,
userReports: [
{bytesTransferred: 11, countries: ['AA']},
{bytesTransferred: 11, countries: ['BB']},
{bytesTransferred: 22, countries: ['CC']},
{bytesTransferred: 33, countries: ['AA']},
{bytesTransferred: 33, countries: ['DD']},
{bytesTransferred: 11, countries: ['AA'], tunnelTimeSec: 99},
{bytesTransferred: 11, countries: ['BB'], tunnelTimeSec: 88},
{bytesTransferred: 22, countries: ['CC'], tunnelTimeSec: 77},
{bytesTransferred: 33, countries: ['AA'], tunnelTimeSec: 66},
{bytesTransferred: 33, countries: ['DD'], tunnelTimeSec: 55},
],
});

startTime = clock.nowMs;
usageMetrics.countryUsage = [
{country: 'EE', inboundBytes: 44},
{country: 'FF', inboundBytes: 55},
usageMetrics.locationUsage = [
{country: 'EE', inboundBytes: 44, tunnelTimeSec: 11},
{country: 'FF', inboundBytes: 55, tunnelTimeSec: 22},
];

clock.nowMs += 60 * 60 * 1000;
Expand All @@ -114,13 +114,42 @@ describe('OutlineSharedMetricsPublisher', () => {
startUtcMs: startTime,
endUtcMs: clock.nowMs,
userReports: [
{bytesTransferred: 44, countries: ['EE']},
{bytesTransferred: 55, countries: ['FF']},
{bytesTransferred: 44, countries: ['EE'], tunnelTimeSec: 11},
{bytesTransferred: 55, countries: ['FF'], tunnelTimeSec: 22},
],
});

publisher.stopSharing();
});

it('reports ASN metrics correctly', async () => {
const clock = new ManualClock();
const serverConfig = new InMemoryConfig<ServerConfigJson>({serverId: 'server-id'});
const usageMetrics = new ManualUsageMetrics();
const metricsCollector = new FakeMetricsCollector();
const publisher = new OutlineSharedMetricsPublisher(
clock,
serverConfig,
null,
usageMetrics,
metricsCollector
);
publisher.startSharing();

usageMetrics.locationUsage = [
{country: 'DD', inboundBytes: 44, tunnelTimeSec: 11, asn: 999},
{country: 'EE', inboundBytes: 55, tunnelTimeSec: 22},
];
clock.nowMs += 60 * 60 * 1000;
await clock.runCallbacks();

expect(metricsCollector.collectedServerUsageReport.userReports).toEqual([
{bytesTransferred: 44, tunnelTimeSec: 11, countries: ['DD'], asn: 999},
{bytesTransferred: 55, tunnelTimeSec: 22, countries: ['EE']},
]);
publisher.stopSharing();
});

it('ignores sanctioned countries', async () => {
const clock = new ManualClock();
const startTime = clock.nowMs;
Expand All @@ -136,12 +165,12 @@ describe('OutlineSharedMetricsPublisher', () => {
);

publisher.startSharing();
usageMetrics.countryUsage = [
{country: 'AA', inboundBytes: 11},
{country: 'SY', inboundBytes: 11},
{country: 'CC', inboundBytes: 22},
{country: 'AA', inboundBytes: 33},
{country: 'DD', inboundBytes: 33},
usageMetrics.locationUsage = [
{country: 'AA', tunnelTimeSec: 99, inboundBytes: 11},
{country: 'SY', tunnelTimeSec: 88, inboundBytes: 11},
{country: 'CC', tunnelTimeSec: 77, inboundBytes: 22},
{country: 'AA', tunnelTimeSec: 66, inboundBytes: 33},
{country: 'DD', tunnelTimeSec: 55, inboundBytes: 33},
];

clock.nowMs += 60 * 60 * 1000;
Expand All @@ -151,10 +180,10 @@ describe('OutlineSharedMetricsPublisher', () => {
startUtcMs: startTime,
endUtcMs: clock.nowMs,
userReports: [
{bytesTransferred: 11, countries: ['AA']},
{bytesTransferred: 22, countries: ['CC']},
{bytesTransferred: 33, countries: ['AA']},
{bytesTransferred: 33, countries: ['DD']},
{bytesTransferred: 11, tunnelTimeSec: 99, countries: ['AA']},
{bytesTransferred: 22, tunnelTimeSec: 77, countries: ['CC']},
{bytesTransferred: 33, tunnelTimeSec: 66, countries: ['AA']},
{bytesTransferred: 33, tunnelTimeSec: 55, countries: ['DD']},
],
});
publisher.stopSharing();
Expand Down Expand Up @@ -257,13 +286,13 @@ class FakeMetricsCollector implements MetricsCollectorClient {
}

class ManualUsageMetrics implements UsageMetrics {
countryUsage = [] as CountryUsage[];
locationUsage = [] as LocationUsage[];

getCountryUsage(): Promise<CountryUsage[]> {
return Promise.resolve(this.countryUsage);
getLocationUsage(): Promise<LocationUsage[]> {
return Promise.resolve(this.locationUsage);
}

reset() {
this.countryUsage = [] as CountryUsage[];
this.locationUsage = [] as LocationUsage[];
}
}
88 changes: 64 additions & 24 deletions src/shadowbox/server/shared_metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ const MS_PER_HOUR = 60 * 60 * 1000;
const MS_PER_DAY = 24 * MS_PER_HOUR;
const SANCTIONED_COUNTRIES = new Set(['CU', 'KP', 'SY']);

export interface CountryUsage {
export interface LocationUsage {
country: string;
asn?: number;
inboundBytes: number;
tunnelTimeSec: number;
}

// JSON format for the published report.
Expand All @@ -44,7 +46,9 @@ export interface HourlyServerMetricsReportJson {
// Field renames will break backwards-compatibility.
export interface HourlyUserMetricsReportJson {
countries: string[];
asn?: number;
bytesTransferred: number;
tunnelTimeSec: number;
}

// JSON format for the feature metrics report.
Expand All @@ -70,7 +74,7 @@ export interface SharedMetricsPublisher {
}

export interface UsageMetrics {
getCountryUsage(): Promise<CountryUsage[]>;
getLocationUsage(): Promise<LocationUsage[]>;
reset();
}

Expand All @@ -80,19 +84,50 @@ export class PrometheusUsageMetrics implements UsageMetrics {

constructor(private prometheusClient: PrometheusClient) {}

async getCountryUsage(): Promise<CountryUsage[]> {
async getLocationUsage(): Promise<LocationUsage[]> {
const timeDeltaSecs = Math.round((Date.now() - this.resetTimeMs) / 1000);
// We measure the traffic to and from the target, since that's what we are protecting.
const result = await this.prometheusClient.query(
`sum(increase(shadowsocks_data_bytes_per_location{dir=~"p>t|p<t"}[${timeDeltaSecs}s])) by (location)`
);
const usage = [] as CountryUsage[];
for (const entry of result.result) {
const country = entry.metric['location'] || '';
const inboundBytes = Math.round(parseFloat(entry.value[1]));
usage.push({country, inboundBytes});
// Return both data bytes and tunnel time information with a single
// Prometheus query, by using a custom "metric_type" label.
const queryResponse = await this.prometheusClient.query(`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is an ok solution, though I don't consider good practice to mix data of different types. Here we are creating a single time series with different units and meaning. And later split them again. I'd rather keep them as separate time series, as it's a lot clearer.
I also wonder if this approach affects performance somehow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fortuna Ok let me try something else. How about this?

label_replace(
sum(increase(shadowsocks_data_bytes_per_location{dir=~"p>t|p<t"}[${timeDeltaSecs}s]))
by (location, asn),
"metric_type", "inbound_bytes", "", ""
) or
label_replace(
sum(increase(shadowsocks_tunnel_time_seconds_per_location[${timeDeltaSecs}s]))
by (location, asn),
"metric_type", "tunnel_time", "", ""
)
`);

const usage = new Map<string, LocationUsage>();
for (const result of queryResponse.result) {
const country = result.metric['location'] || '';
const asn = result.metric['asn'] ? Number(result.metric['asn']) : undefined;

// Get or create an entry for the country+ASN combination.
const key = `${country}-${asn}`;
let entry: LocationUsage;
if (usage.has(key)) {
entry = usage.get(key);
} else {
entry = {
country,
asn,
inboundBytes: 0,
tunnelTimeSec: 0,
};
}

if (result.metric['metric_type'] === 'inbound_bytes') {
entry.inboundBytes = Math.round(parseFloat(result.value[1]));
} else if (result.metric['metric_type'] === 'tunnel_time') {
entry.tunnelTimeSec = Math.round(parseFloat(result.value[1]));
}
usage.set(key, entry);
}
return usage;
return Array.from(usage.values());
}

reset() {
Expand All @@ -105,7 +140,7 @@ export interface MetricsCollectorClient {
collectFeatureMetrics(reportJson: DailyFeatureMetricsReportJson): Promise<void>;
}

export class RestMetricsCollectorClient {
export class RestMetricsCollectorClient implements MetricsCollectorClient {
constructor(private serviceUrl: string) {}

collectServerUsageMetrics(reportJson: HourlyServerMetricsReportJson): Promise<void> {
Expand Down Expand Up @@ -163,7 +198,7 @@ export class OutlineSharedMetricsPublisher implements SharedMetricsPublisher {
return;
}
try {
await this.reportServerUsageMetrics(await usageMetrics.getCountryUsage());
await this.reportServerUsageMetrics(await usageMetrics.getLocationUsage());
usageMetrics.reset();
} catch (err) {
logging.error(`Failed to report server usage metrics: ${err}`);
Expand Down Expand Up @@ -197,24 +232,29 @@ export class OutlineSharedMetricsPublisher implements SharedMetricsPublisher {
return this.serverConfig.data().metricsEnabled || false;
}

private async reportServerUsageMetrics(countryUsageMetrics: CountryUsage[]): Promise<void> {
private async reportServerUsageMetrics(locationUsageMetrics: LocationUsage[]): Promise<void> {
const reportEndTimestampMs = this.clock.now();

const userReports = [] as HourlyUserMetricsReportJson[];
for (const countryUsage of countryUsageMetrics) {
if (countryUsage.inboundBytes === 0) {
const userReports: HourlyUserMetricsReportJson[] = [];
for (const locationUsage of locationUsageMetrics) {
if (locationUsage.inboundBytes === 0 && locationUsage.tunnelTimeSec === 0) {
continue;
}
if (isSanctionedCountry(countryUsage.country)) {
if (isSanctionedCountry(locationUsage.country)) {
continue;
}
// Make sure to always set a country, which is required by the metrics server validation.
// It's used to differentiate the row from the legacy key usage rows.
const country = countryUsage.country || 'ZZ';
userReports.push({
bytesTransferred: countryUsage.inboundBytes,
const country = locationUsage.country || 'ZZ';
const report: HourlyUserMetricsReportJson = {
countries: [country],
});
bytesTransferred: locationUsage.inboundBytes,
tunnelTimeSec: locationUsage.tunnelTimeSec,
};
if (locationUsage.asn) {
report.asn = locationUsage.asn;
}
userReports.push(report);
}
const report = {
serverId: this.serverConfig.data().serverId,
Expand Down
Loading