Skip to content

Commit

Permalink
'runServer' implementation for Performance Worker Service (#1840)
Browse files Browse the repository at this point in the history
Motivation:

The run server RPC starts a server and reports stats back to the client. We can’t yet start a real server, because we don’t yet have the http/2 transport, but we can stub it out and do the rest of the RPC which reports stats back to the client.

Modifications:

- implemented the methods that start the server and collects the stats
- added the initial stats property used in computing the requested server stats

Result:

We will be able to start the server from a performance worker.
  • Loading branch information
stefanadranca authored Apr 3, 2024
1 parent e1ef29d commit f84657d
Showing 1 changed file with 115 additions and 8 deletions.
123 changes: 115 additions & 8 deletions Sources/performance-worker/WorkerService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,17 @@ final class WorkerService: Grpc_Testing_WorkerService.ServiceProtocol, Sendable

enum Role {
case client(GRPCClient)
case server(GRPCServer)
case server(ServerState)
}

struct ServerState {
var server: GRPCServer
var stats: ServerStats

init(server: GRPCServer, stats: ServerStats) {
self.server = server
self.stats = stats
}
}

init() {}
Expand All @@ -41,12 +51,41 @@ final class WorkerService: Grpc_Testing_WorkerService.ServiceProtocol, Sendable
self.role = role
}

init(server: GRPCServer) {
self.role = .server(server)
var server: GRPCServer? {
switch self.role {
case let .server(serverState):
return serverState.server
case .client, .none:
return nil
}
}

mutating func serverStats(replaceWith newStats: ServerStats? = nil) -> ServerStats? {
switch self.role {
case var .server(serverState):
let stats = serverState.stats
if let newStats = newStats {
serverState.stats = newStats
self.role = .server(serverState)
}
return stats
case .client, .none:
return nil
}
}

init(client: GRPCClient) {
self.role = .client(client)
mutating func setupServer(server: GRPCServer, stats: ServerStats) throws {
let serverState = State.ServerState(server: server, stats: stats)
switch self.role {
case .server(_):
throw RPCError(code: .alreadyExists, message: "A server has already been set up.")

case .client(_):
throw RPCError(code: .failedPrecondition, message: "This worker has a client setup.")

case .none:
self.role = .server(serverState)
}
}
}

Expand All @@ -63,8 +102,8 @@ final class WorkerService: Grpc_Testing_WorkerService.ServiceProtocol, Sendable
switch role {
case .client(let client):
client.close()
case .server(let server):
server.stopListening()
case .server(let serverState):
serverState.server.stopListening()
}
}

Expand All @@ -87,7 +126,34 @@ final class WorkerService: Grpc_Testing_WorkerService.ServiceProtocol, Sendable
) async throws
-> GRPCCore.ServerResponse.Stream<Grpc_Testing_WorkerService.Method.RunServer.Output>
{
throw RPCError(code: .unimplemented, message: "This RPC has not been implemented yet.")
return ServerResponse.Stream { writer in
try await withThrowingTaskGroup(of: Void.self) { group in
for try await message in request.messages {
switch message.argtype {
case let .some(.setup(serverConfig)):
let server = try await self.setupServer(serverConfig)
group.addTask { try await server.run() }

case let .some(.mark(mark)):
let response = try await self.makeServerStatsResponse(reset: mark.reset)
try await writer.write(response)

case .none:
()
}
}

try await group.next()
}

let server = self.state.withLockedValue { state in
defer { state.role = nil }
return state.server
}

server?.stopListening()
return [:]
}
}

func runClient(
Expand All @@ -98,3 +164,44 @@ final class WorkerService: Grpc_Testing_WorkerService.ServiceProtocol, Sendable
throw RPCError(code: .unimplemented, message: "This RPC has not been implemented yet.")
}
}

@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
extension WorkerService {
private func setupServer(_ config: Grpc_Testing_ServerConfig) async throws -> GRPCServer {
let server = GRPCServer(transports: [], services: [BenchmarkService()])
let stats = try await ServerStats()

try self.state.withLockedValue { state in
try state.setupServer(server: server, stats: stats)
}

return server
}

private func makeServerStatsResponse(
reset: Bool
) async throws -> Grpc_Testing_WorkerService.Method.RunServer.Output {
let currentStats = try await ServerStats()
let initialStats = self.state.withLockedValue { state in
return state.serverStats(replaceWith: reset ? currentStats : nil)
}

guard let initialStats = initialStats else {
throw RPCError(
code: .notFound,
message: "There are no initial server stats. A server must be setup before calling 'mark'."
)
}

let differences = currentStats.difference(to: initialStats)
return Grpc_Testing_WorkerService.Method.RunServer.Output.with {
$0.stats = Grpc_Testing_ServerStats.with {
$0.idleCpuTime = differences.idleCPUTime
$0.timeElapsed = differences.time
$0.timeSystem = differences.systemTime
$0.timeUser = differences.userTime
$0.totalCpuTime = differences.totalCPUTime
}
}
}
}

0 comments on commit f84657d

Please sign in to comment.