Skip to content

Commit

Permalink
Merge branch 'main' into update-dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
Fabio1988 authored Nov 28, 2023
2 parents f49cbea + f720522 commit dbd5d8d
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 63 deletions.
2 changes: 2 additions & 0 deletions packages/client/src/app/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ export const makeServer = ({ environment = 'test' } = {}) => {
traceMessages: faker.datatype.boolean(),
version: '15',
workerId: faker.vehicle.vrm() + `${i}`,
deviceId: faker.vehicle.vrm(),
userAgent: '',
};
},
}),
Expand Down
6 changes: 6 additions & 0 deletions packages/connections/src/lib/mitmWorkerConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ export type MitmWorkerDTO = Omit<DTO<MitmWorkerConnection>, 'ws' | 'log' | 'hear

export class MitmWorkerConnection extends EventEmitter {
workerId?: string;
deviceId?: string;
userAgent?: string;
init: boolean;
noMessagesReceived: number;
dateLastMessageReceived: number;
Expand Down Expand Up @@ -65,6 +67,8 @@ export class MitmWorkerConnection extends EventEmitter {
welcome = WelcomeMessage.decode(new Uint8Array(message));

this.workerId = welcome.workerId;
this.deviceId = welcome.deviceId;
this.userAgent = welcome.useragent;
this.version = welcome.versionCode.toString();
this.origin = welcome.origin;
} catch (e) {
Expand Down Expand Up @@ -134,6 +138,8 @@ export class MitmWorkerConnection extends EventEmitter {
traceMessages: this.traceMessages,
version: this.version,
workerId: this.workerId,
deviceId: this.deviceId,
userAgent: this.userAgent,
};
}
}
11 changes: 6 additions & 5 deletions packages/connections/src/lib/utils/mitmProto.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -615,8 +615,8 @@ export namespace RotomProtos {
/** Properties of a LoginResponse. */
interface ILoginResponse {

/** LoginResponse deviceId */
deviceId?: (string|null);
/** LoginResponse workerId */
workerId?: (string|null);

/** LoginResponse status */
status?: (RotomProtos.AuthStatus|null);
Expand All @@ -634,8 +634,8 @@ export namespace RotomProtos {
*/
constructor(properties?: RotomProtos.MitmResponse.ILoginResponse);

/** LoginResponse deviceId. */
public deviceId: string;
/** LoginResponse workerId. */
public workerId: string;

/** LoginResponse status. */
public status: RotomProtos.AuthStatus;
Expand Down Expand Up @@ -1099,6 +1099,7 @@ export namespace RotomProtos {
RPC_STATUS_ACCESS_DENIED = 15,
RPC_STATUS_ACCESS_SUSPENDED = 16,
RPC_STATUS_DEVICE_INCOMPATIBLE = 17,
RPC_STATUS_ACCESS_RATE_LIMITED = 18
RPC_STATUS_ACCESS_RATE_LIMITED = 18,
RPC_STATUS_MITM_DISALLOWED_REQUEST = 99
}
}
37 changes: 22 additions & 15 deletions packages/connections/src/lib/utils/mitmProto.js
Original file line number Diff line number Diff line change
Expand Up @@ -1618,7 +1618,7 @@ export const RotomProtos = $root.RotomProtos = (() => {
* Properties of a LoginResponse.
* @memberof RotomProtos.MitmResponse
* @interface ILoginResponse
* @property {string|null} [deviceId] LoginResponse deviceId
* @property {string|null} [workerId] LoginResponse workerId
* @property {RotomProtos.AuthStatus|null} [status] LoginResponse status
* @property {boolean|null} [supportsCompression] LoginResponse supportsCompression
*/
Expand All @@ -1639,12 +1639,12 @@ export const RotomProtos = $root.RotomProtos = (() => {
}

/**
* LoginResponse deviceId.
* @member {string} deviceId
* LoginResponse workerId.
* @member {string} workerId
* @memberof RotomProtos.MitmResponse.LoginResponse
* @instance
*/
LoginResponse.prototype.deviceId = "";
LoginResponse.prototype.workerId = "";

/**
* LoginResponse status.
Expand Down Expand Up @@ -1686,8 +1686,8 @@ export const RotomProtos = $root.RotomProtos = (() => {
LoginResponse.encode = function encode(message, writer) {
if (!writer)
writer = $Writer.create();
if (message.deviceId != null && Object.hasOwnProperty.call(message, "deviceId"))
writer.uint32(/* id 1, wireType 2 =*/10).string(message.deviceId);
if (message.workerId != null && Object.hasOwnProperty.call(message, "workerId"))
writer.uint32(/* id 1, wireType 2 =*/10).string(message.workerId);
if (message.status != null && Object.hasOwnProperty.call(message, "status"))
writer.uint32(/* id 2, wireType 0 =*/16).int32(message.status);
if (message.supportsCompression != null && Object.hasOwnProperty.call(message, "supportsCompression"))
Expand Down Expand Up @@ -1727,7 +1727,7 @@ export const RotomProtos = $root.RotomProtos = (() => {
let tag = reader.uint32();
switch (tag >>> 3) {
case 1: {
message.deviceId = reader.string();
message.workerId = reader.string();
break;
}
case 2: {
Expand Down Expand Up @@ -1773,9 +1773,9 @@ export const RotomProtos = $root.RotomProtos = (() => {
LoginResponse.verify = function verify(message) {
if (typeof message !== "object" || message === null)
return "object expected";
if (message.deviceId != null && message.hasOwnProperty("deviceId"))
if (!$util.isString(message.deviceId))
return "deviceId: string expected";
if (message.workerId != null && message.hasOwnProperty("workerId"))
if (!$util.isString(message.workerId))
return "workerId: string expected";
if (message.status != null && message.hasOwnProperty("status"))
switch (message.status) {
default:
Expand Down Expand Up @@ -1812,8 +1812,8 @@ export const RotomProtos = $root.RotomProtos = (() => {
if (object instanceof $root.RotomProtos.MitmResponse.LoginResponse)
return object;
let message = new $root.RotomProtos.MitmResponse.LoginResponse();
if (object.deviceId != null)
message.deviceId = String(object.deviceId);
if (object.workerId != null)
message.workerId = String(object.workerId);
switch (object.status) {
default:
if (typeof object.status === "number") {
Expand Down Expand Up @@ -1889,12 +1889,12 @@ export const RotomProtos = $root.RotomProtos = (() => {
options = {};
let object = {};
if (options.defaults) {
object.deviceId = "";
object.workerId = "";
object.status = options.enums === String ? "AUTH_STATUS_UNSET" : 0;
object.supportsCompression = false;
}
if (message.deviceId != null && message.hasOwnProperty("deviceId"))
object.deviceId = message.deviceId;
if (message.workerId != null && message.hasOwnProperty("workerId"))
object.workerId = message.workerId;
if (message.status != null && message.hasOwnProperty("status"))
object.status = options.enums === String ? $root.RotomProtos.AuthStatus[message.status] === undefined ? message.status : $root.RotomProtos.AuthStatus[message.status] : message.status;
if (message.supportsCompression != null && message.hasOwnProperty("supportsCompression"))
Expand Down Expand Up @@ -2103,6 +2103,7 @@ export const RotomProtos = $root.RotomProtos = (() => {
case 16:
case 17:
case 18:
case 99:
break;
}
if (message.response != null && message.hasOwnProperty("response")) {
Expand Down Expand Up @@ -2208,6 +2209,10 @@ export const RotomProtos = $root.RotomProtos = (() => {
case 18:
message.rpcStatus = 18;
break;
case "RPC_STATUS_MITM_DISALLOWED_REQUEST":
case 99:
message.rpcStatus = 99;
break;
}
if (object.response) {
if (!Array.isArray(object.response))
Expand Down Expand Up @@ -2915,6 +2920,7 @@ export const RotomProtos = $root.RotomProtos = (() => {
* @property {number} RPC_STATUS_ACCESS_SUSPENDED=16 RPC_STATUS_ACCESS_SUSPENDED value
* @property {number} RPC_STATUS_DEVICE_INCOMPATIBLE=17 RPC_STATUS_DEVICE_INCOMPATIBLE value
* @property {number} RPC_STATUS_ACCESS_RATE_LIMITED=18 RPC_STATUS_ACCESS_RATE_LIMITED value
* @property {number} RPC_STATUS_MITM_DISALLOWED_REQUEST=99 RPC_STATUS_MITM_DISALLOWED_REQUEST value
*/
RotomProtos.RpcStatus = (function() {
const valuesById = {}, values = Object.create(valuesById);
Expand All @@ -2936,6 +2942,7 @@ export const RotomProtos = $root.RotomProtos = (() => {
values[valuesById[16] = "RPC_STATUS_ACCESS_SUSPENDED"] = 16;
values[valuesById[17] = "RPC_STATUS_DEVICE_INCOMPATIBLE"] = 17;
values[valuesById[18] = "RPC_STATUS_ACCESS_RATE_LIMITED"] = 18;
values[valuesById[99] = "RPC_STATUS_MITM_DISALLOWED_REQUEST"] = 99;
return values;
})();

Expand Down
3 changes: 2 additions & 1 deletion packages/server/mitm.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ message MitmResponse {
}

message LoginResponse {
string device_id = 1;
string worker_id = 1;
AuthStatus status = 2;
bool supports_compression = 3;
}
Expand Down Expand Up @@ -121,4 +121,5 @@ enum RpcStatus {
RPC_STATUS_ACCESS_SUSPENDED = 16;
RPC_STATUS_DEVICE_INCOMPATIBLE = 17;
RPC_STATUS_ACCESS_RATE_LIMITED = 18;
RPC_STATUS_MITM_DISALLOWED_REQUEST = 99;
}
78 changes: 36 additions & 42 deletions packages/server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,34 +182,21 @@ wssMitm.on('connection', (ws, req) => {

const wssScanner = new WebSocketServer({ port: config.controllerListener.port });

// function onSocketError(err: Error) {
// log.info(err);
// }
//
// wssScanner.on('upgrade', (request, socket, head) => {
// socket.on('error', onSocketError);
//
// // This function is not defined on purpose. Implement it with your own logic.
// authenticate(request, (err : Error, client) => {
// if (err || !client) {
// socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
// socket.destroy();
// return;
// }
//
// socket.removeListener('error', onSocketError);
//
// wssScanner.handleUpgrade(request, socket, head, function done(ws) {
// wssScanner.emit('connection', ws, request, client);
// });
// });
// });

function identifyControlChannelFromDevice(deviceId: string): string | null {
// Find a currently connected control connection that starts with the same characters
function identifyControlChannelFromWorkerId(workerId: string): string | null {
// Try to look up connected worker id and see if it presented us with a device id
const connection = currentConnections[workerId];

if (connection) {
const deviceId = connection.mitm?.deviceId;
if (deviceId) {
return deviceId;
}
}

// Fallback: Find a currently connected control connection that starts with the same characters
// as the given device id
for (const key of Object.keys(controlConnections)) {
if (deviceId.substring(0, key.length) === key) return key;
if (workerId.substring(0, key.length) === key) return key;
}
return null;
}
Expand All @@ -230,30 +217,37 @@ wssScanner.on('connection', (ws, req) => {
return;
}

let nextSpareDeviceId = unallocatedConnections.shift() as string;
let nextSpareWorkerId = unallocatedConnections.shift() as string;
let eligibleDeviceFound = false;
const firstSpareDeviceId = nextSpareDeviceId;
const firstSpareWorkerId = nextSpareWorkerId;
do {
const mainDeviceId = identifyControlChannelFromDevice(nextSpareDeviceId);
const mainDeviceId = identifyControlChannelFromWorkerId(nextSpareWorkerId);
log.info(`SCANNER: Found ${mainDeviceId} connects to workerId ${nextSpareWorkerId}`);
if (mainDeviceId == null) {
log.info(`SCANNER: Warning - found ${nextSpareDeviceId} in pool with no record of main device`);
unallocatedConnections.push(nextSpareDeviceId);
nextSpareDeviceId = unallocatedConnections.shift() as string;
log.info(`SCANNER: Warning - found ${nextSpareWorkerId} in pool with no record of main device`);
unallocatedConnections.push(nextSpareWorkerId);
nextSpareWorkerId = unallocatedConnections.shift() as string;
} else {
const mainDeviceInfo = deviceInformation[mainDeviceId];
if (mainDeviceInfo.lastScannerConnection + config.monitor.deviceCooldown > Date.now() / 1000) {
// device was allocated to someone else too recently, find another
unallocatedConnections.push(nextSpareDeviceId);
nextSpareDeviceId = unallocatedConnections.shift() as string;
if (!mainDeviceInfo) {
log.info(`SCANNER: Warning - found ${nextSpareWorkerId} in pool with no record of main device ${mainDeviceId}`);
unallocatedConnections.push(nextSpareWorkerId);
nextSpareWorkerId = unallocatedConnections.shift() as string;
} else {
eligibleDeviceFound = true;
if (mainDeviceInfo.lastScannerConnection + config.monitor.deviceCooldown > Date.now() / 1000) {
// device was allocated to someone else too recently, find another
unallocatedConnections.push(nextSpareWorkerId);
nextSpareWorkerId = unallocatedConnections.shift() as string;
} else {
eligibleDeviceFound = true;
}
}
}
} while (!eligibleDeviceFound && nextSpareDeviceId != firstSpareDeviceId);
} while (!eligibleDeviceFound && nextSpareWorkerId != firstSpareWorkerId);

if (!eligibleDeviceFound) {
// no devices found, return the original one back to pool
unallocatedConnections.push(nextSpareDeviceId);
unallocatedConnections.push(nextSpareWorkerId);
log.info(
`SCANNER: New connection from ${req.socket.remoteAddress} - no MITMs available outside cooldown, rejecting`,
);
Expand All @@ -263,12 +257,12 @@ wssScanner.on('connection', (ws, req) => {
}

// Set last connection time on device
const mainDeviceId = identifyControlChannelFromDevice(nextSpareDeviceId) as string;
const mainDeviceId = identifyControlChannelFromWorkerId(nextSpareWorkerId) as string;
deviceInformation[mainDeviceId].lastScannerConnection = Date.now() / 1000;

log.info(`SCANNER: New connection from ${req.socket.remoteAddress} - will allocate ${nextSpareDeviceId}`);
log.info(`SCANNER: New connection from ${req.socket.remoteAddress} - will allocate ${nextSpareWorkerId}`);

const currentConnection = currentConnections[nextSpareDeviceId];
const currentConnection = currentConnections[nextSpareWorkerId];
const scannerConnection = new ScannerConnection(log, ws, currentConnection.mitm);
currentConnection.scanner = scannerConnection;

Expand Down

0 comments on commit dbd5d8d

Please sign in to comment.