diff --git a/packages/client/src/app/server.ts b/packages/client/src/app/server.ts index 67f14a1..cdb89cd 100644 --- a/packages/client/src/app/server.ts +++ b/packages/client/src/app/server.ts @@ -74,6 +74,8 @@ export const makeServer = ({ environment = 'test' } = {}) => { traceMessages: faker.datatype.boolean(), version: '15', workerId: faker.vehicle.vrm() + `${i}`, + deviceId: faker.vehicle.vrm(), + userAgent: '', }; }, }), diff --git a/packages/connections/src/lib/mitmWorkerConnection.ts b/packages/connections/src/lib/mitmWorkerConnection.ts index a380e4c..72c7a9f 100644 --- a/packages/connections/src/lib/mitmWorkerConnection.ts +++ b/packages/connections/src/lib/mitmWorkerConnection.ts @@ -12,6 +12,8 @@ export type MitmWorkerDTO = Omit, 'ws' | 'log' | 'hear export class MitmWorkerConnection extends EventEmitter { workerId?: string; + deviceId?: string; + userAgent?: string; init: boolean; noMessagesReceived: number; dateLastMessageReceived: number; @@ -65,6 +67,8 @@ export class MitmWorkerConnection extends EventEmitter { welcome = WelcomeMessage.decode(new Uint8Array(message)); this.workerId = welcome.workerId; + this.deviceId = welcome.deviceId; + this.userAgent = welcome.useragent; this.version = welcome.versionCode.toString(); this.origin = welcome.origin; } catch (e) { @@ -134,6 +138,8 @@ export class MitmWorkerConnection extends EventEmitter { traceMessages: this.traceMessages, version: this.version, workerId: this.workerId, + deviceId: this.deviceId, + userAgent: this.userAgent, }; } } diff --git a/packages/connections/src/lib/utils/mitmProto.d.ts b/packages/connections/src/lib/utils/mitmProto.d.ts index d9a28d8..41f6aed 100644 --- a/packages/connections/src/lib/utils/mitmProto.d.ts +++ b/packages/connections/src/lib/utils/mitmProto.d.ts @@ -615,8 +615,8 @@ export namespace RotomProtos { /** Properties of a LoginResponse. */ interface ILoginResponse { - /** LoginResponse deviceId */ - deviceId?: (string|null); + /** LoginResponse workerId */ + workerId?: (string|null); /** LoginResponse status */ status?: (RotomProtos.AuthStatus|null); @@ -634,8 +634,8 @@ export namespace RotomProtos { */ constructor(properties?: RotomProtos.MitmResponse.ILoginResponse); - /** LoginResponse deviceId. */ - public deviceId: string; + /** LoginResponse workerId. */ + public workerId: string; /** LoginResponse status. */ public status: RotomProtos.AuthStatus; @@ -1099,6 +1099,7 @@ export namespace RotomProtos { RPC_STATUS_ACCESS_DENIED = 15, RPC_STATUS_ACCESS_SUSPENDED = 16, RPC_STATUS_DEVICE_INCOMPATIBLE = 17, - RPC_STATUS_ACCESS_RATE_LIMITED = 18 + RPC_STATUS_ACCESS_RATE_LIMITED = 18, + RPC_STATUS_MITM_DISALLOWED_REQUEST = 99 } } diff --git a/packages/connections/src/lib/utils/mitmProto.js b/packages/connections/src/lib/utils/mitmProto.js index 3bd50d1..fda42db 100644 --- a/packages/connections/src/lib/utils/mitmProto.js +++ b/packages/connections/src/lib/utils/mitmProto.js @@ -1618,7 +1618,7 @@ export const RotomProtos = $root.RotomProtos = (() => { * Properties of a LoginResponse. * @memberof RotomProtos.MitmResponse * @interface ILoginResponse - * @property {string|null} [deviceId] LoginResponse deviceId + * @property {string|null} [workerId] LoginResponse workerId * @property {RotomProtos.AuthStatus|null} [status] LoginResponse status * @property {boolean|null} [supportsCompression] LoginResponse supportsCompression */ @@ -1639,12 +1639,12 @@ export const RotomProtos = $root.RotomProtos = (() => { } /** - * LoginResponse deviceId. - * @member {string} deviceId + * LoginResponse workerId. + * @member {string} workerId * @memberof RotomProtos.MitmResponse.LoginResponse * @instance */ - LoginResponse.prototype.deviceId = ""; + LoginResponse.prototype.workerId = ""; /** * LoginResponse status. @@ -1686,8 +1686,8 @@ export const RotomProtos = $root.RotomProtos = (() => { LoginResponse.encode = function encode(message, writer) { if (!writer) writer = $Writer.create(); - if (message.deviceId != null && Object.hasOwnProperty.call(message, "deviceId")) - writer.uint32(/* id 1, wireType 2 =*/10).string(message.deviceId); + if (message.workerId != null && Object.hasOwnProperty.call(message, "workerId")) + writer.uint32(/* id 1, wireType 2 =*/10).string(message.workerId); if (message.status != null && Object.hasOwnProperty.call(message, "status")) writer.uint32(/* id 2, wireType 0 =*/16).int32(message.status); if (message.supportsCompression != null && Object.hasOwnProperty.call(message, "supportsCompression")) @@ -1727,7 +1727,7 @@ export const RotomProtos = $root.RotomProtos = (() => { let tag = reader.uint32(); switch (tag >>> 3) { case 1: { - message.deviceId = reader.string(); + message.workerId = reader.string(); break; } case 2: { @@ -1773,9 +1773,9 @@ export const RotomProtos = $root.RotomProtos = (() => { LoginResponse.verify = function verify(message) { if (typeof message !== "object" || message === null) return "object expected"; - if (message.deviceId != null && message.hasOwnProperty("deviceId")) - if (!$util.isString(message.deviceId)) - return "deviceId: string expected"; + if (message.workerId != null && message.hasOwnProperty("workerId")) + if (!$util.isString(message.workerId)) + return "workerId: string expected"; if (message.status != null && message.hasOwnProperty("status")) switch (message.status) { default: @@ -1812,8 +1812,8 @@ export const RotomProtos = $root.RotomProtos = (() => { if (object instanceof $root.RotomProtos.MitmResponse.LoginResponse) return object; let message = new $root.RotomProtos.MitmResponse.LoginResponse(); - if (object.deviceId != null) - message.deviceId = String(object.deviceId); + if (object.workerId != null) + message.workerId = String(object.workerId); switch (object.status) { default: if (typeof object.status === "number") { @@ -1889,12 +1889,12 @@ export const RotomProtos = $root.RotomProtos = (() => { options = {}; let object = {}; if (options.defaults) { - object.deviceId = ""; + object.workerId = ""; object.status = options.enums === String ? "AUTH_STATUS_UNSET" : 0; object.supportsCompression = false; } - if (message.deviceId != null && message.hasOwnProperty("deviceId")) - object.deviceId = message.deviceId; + if (message.workerId != null && message.hasOwnProperty("workerId")) + object.workerId = message.workerId; if (message.status != null && message.hasOwnProperty("status")) object.status = options.enums === String ? $root.RotomProtos.AuthStatus[message.status] === undefined ? message.status : $root.RotomProtos.AuthStatus[message.status] : message.status; if (message.supportsCompression != null && message.hasOwnProperty("supportsCompression")) @@ -2103,6 +2103,7 @@ export const RotomProtos = $root.RotomProtos = (() => { case 16: case 17: case 18: + case 99: break; } if (message.response != null && message.hasOwnProperty("response")) { @@ -2208,6 +2209,10 @@ export const RotomProtos = $root.RotomProtos = (() => { case 18: message.rpcStatus = 18; break; + case "RPC_STATUS_MITM_DISALLOWED_REQUEST": + case 99: + message.rpcStatus = 99; + break; } if (object.response) { if (!Array.isArray(object.response)) @@ -2915,6 +2920,7 @@ export const RotomProtos = $root.RotomProtos = (() => { * @property {number} RPC_STATUS_ACCESS_SUSPENDED=16 RPC_STATUS_ACCESS_SUSPENDED value * @property {number} RPC_STATUS_DEVICE_INCOMPATIBLE=17 RPC_STATUS_DEVICE_INCOMPATIBLE value * @property {number} RPC_STATUS_ACCESS_RATE_LIMITED=18 RPC_STATUS_ACCESS_RATE_LIMITED value + * @property {number} RPC_STATUS_MITM_DISALLOWED_REQUEST=99 RPC_STATUS_MITM_DISALLOWED_REQUEST value */ RotomProtos.RpcStatus = (function() { const valuesById = {}, values = Object.create(valuesById); @@ -2936,6 +2942,7 @@ export const RotomProtos = $root.RotomProtos = (() => { values[valuesById[16] = "RPC_STATUS_ACCESS_SUSPENDED"] = 16; values[valuesById[17] = "RPC_STATUS_DEVICE_INCOMPATIBLE"] = 17; values[valuesById[18] = "RPC_STATUS_ACCESS_RATE_LIMITED"] = 18; + values[valuesById[99] = "RPC_STATUS_MITM_DISALLOWED_REQUEST"] = 99; return values; })(); diff --git a/packages/server/mitm.proto b/packages/server/mitm.proto index 9198a2f..480d013 100644 --- a/packages/server/mitm.proto +++ b/packages/server/mitm.proto @@ -54,7 +54,7 @@ message MitmResponse { } message LoginResponse { - string device_id = 1; + string worker_id = 1; AuthStatus status = 2; bool supports_compression = 3; } @@ -121,4 +121,5 @@ enum RpcStatus { RPC_STATUS_ACCESS_SUSPENDED = 16; RPC_STATUS_DEVICE_INCOMPATIBLE = 17; RPC_STATUS_ACCESS_RATE_LIMITED = 18; + RPC_STATUS_MITM_DISALLOWED_REQUEST = 99; } diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 228a098..1e8a64c 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -182,34 +182,21 @@ wssMitm.on('connection', (ws, req) => { const wssScanner = new WebSocketServer({ port: config.controllerListener.port }); -// function onSocketError(err: Error) { -// log.info(err); -// } -// -// wssScanner.on('upgrade', (request, socket, head) => { -// socket.on('error', onSocketError); -// -// // This function is not defined on purpose. Implement it with your own logic. -// authenticate(request, (err : Error, client) => { -// if (err || !client) { -// socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n'); -// socket.destroy(); -// return; -// } -// -// socket.removeListener('error', onSocketError); -// -// wssScanner.handleUpgrade(request, socket, head, function done(ws) { -// wssScanner.emit('connection', ws, request, client); -// }); -// }); -// }); - -function identifyControlChannelFromDevice(deviceId: string): string | null { - // Find a currently connected control connection that starts with the same characters +function identifyControlChannelFromWorkerId(workerId: string): string | null { + // Try to look up connected worker id and see if it presented us with a device id + const connection = currentConnections[workerId]; + + if (connection) { + const deviceId = connection.mitm?.deviceId; + if (deviceId) { + return deviceId; + } + } + + // Fallback: Find a currently connected control connection that starts with the same characters // as the given device id for (const key of Object.keys(controlConnections)) { - if (deviceId.substring(0, key.length) === key) return key; + if (workerId.substring(0, key.length) === key) return key; } return null; } @@ -230,30 +217,37 @@ wssScanner.on('connection', (ws, req) => { return; } - let nextSpareDeviceId = unallocatedConnections.shift() as string; + let nextSpareWorkerId = unallocatedConnections.shift() as string; let eligibleDeviceFound = false; - const firstSpareDeviceId = nextSpareDeviceId; + const firstSpareWorkerId = nextSpareWorkerId; do { - const mainDeviceId = identifyControlChannelFromDevice(nextSpareDeviceId); + const mainDeviceId = identifyControlChannelFromWorkerId(nextSpareWorkerId); + log.info(`SCANNER: Found ${mainDeviceId} connects to workerId ${nextSpareWorkerId}`); if (mainDeviceId == null) { - log.info(`SCANNER: Warning - found ${nextSpareDeviceId} in pool with no record of main device`); - unallocatedConnections.push(nextSpareDeviceId); - nextSpareDeviceId = unallocatedConnections.shift() as string; + log.info(`SCANNER: Warning - found ${nextSpareWorkerId} in pool with no record of main device`); + unallocatedConnections.push(nextSpareWorkerId); + nextSpareWorkerId = unallocatedConnections.shift() as string; } else { const mainDeviceInfo = deviceInformation[mainDeviceId]; - if (mainDeviceInfo.lastScannerConnection + config.monitor.deviceCooldown > Date.now() / 1000) { - // device was allocated to someone else too recently, find another - unallocatedConnections.push(nextSpareDeviceId); - nextSpareDeviceId = unallocatedConnections.shift() as string; + if (!mainDeviceInfo) { + log.info(`SCANNER: Warning - found ${nextSpareWorkerId} in pool with no record of main device ${mainDeviceId}`); + unallocatedConnections.push(nextSpareWorkerId); + nextSpareWorkerId = unallocatedConnections.shift() as string; } else { - eligibleDeviceFound = true; + if (mainDeviceInfo.lastScannerConnection + config.monitor.deviceCooldown > Date.now() / 1000) { + // device was allocated to someone else too recently, find another + unallocatedConnections.push(nextSpareWorkerId); + nextSpareWorkerId = unallocatedConnections.shift() as string; + } else { + eligibleDeviceFound = true; + } } } - } while (!eligibleDeviceFound && nextSpareDeviceId != firstSpareDeviceId); + } while (!eligibleDeviceFound && nextSpareWorkerId != firstSpareWorkerId); if (!eligibleDeviceFound) { // no devices found, return the original one back to pool - unallocatedConnections.push(nextSpareDeviceId); + unallocatedConnections.push(nextSpareWorkerId); log.info( `SCANNER: New connection from ${req.socket.remoteAddress} - no MITMs available outside cooldown, rejecting`, ); @@ -263,12 +257,12 @@ wssScanner.on('connection', (ws, req) => { } // Set last connection time on device - const mainDeviceId = identifyControlChannelFromDevice(nextSpareDeviceId) as string; + const mainDeviceId = identifyControlChannelFromWorkerId(nextSpareWorkerId) as string; deviceInformation[mainDeviceId].lastScannerConnection = Date.now() / 1000; - log.info(`SCANNER: New connection from ${req.socket.remoteAddress} - will allocate ${nextSpareDeviceId}`); + log.info(`SCANNER: New connection from ${req.socket.remoteAddress} - will allocate ${nextSpareWorkerId}`); - const currentConnection = currentConnections[nextSpareDeviceId]; + const currentConnection = currentConnections[nextSpareWorkerId]; const scannerConnection = new ScannerConnection(log, ws, currentConnection.mitm); currentConnection.scanner = scannerConnection;