Skip to content

Commit

Permalink
Merge branch 'main' into gb-client-rpc-executor-hedging
Browse files Browse the repository at this point in the history
  • Loading branch information
glbrntt authored Nov 22, 2023
2 parents 48c0528 + d5a05a2 commit cbc47cc
Show file tree
Hide file tree
Showing 10 changed files with 1,029 additions and 0 deletions.
1 change: 1 addition & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ extension Target {
name: "GRPCCore",
dependencies: [
.dequeModule,
.atomics
],
path: "Sources/GRPCCore"
)
Expand Down
299 changes: 299 additions & 0 deletions Sources/GRPCCore/Call/Server/Internal/ServerRPCExecutor.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,299 @@
/*
* Copyright 2023, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
@usableFromInline
struct ServerRPCExecutor {
/// Executes an RPC using the provided handler.
///
/// - Parameters:
/// - stream: The accepted stream to execute the RPC on.
/// - deserializer: A deserializer for messages received from the client.
/// - serializer: A serializer for messages to send to the client.
/// - interceptors: Server interceptors to apply to this RPC.
/// - handler: A handler which turns the request into a response.
@inlinable
static func execute<Input, Output>(
stream: RPCStream<RPCAsyncSequence<RPCRequestPart>, RPCWriter<RPCResponsePart>.Closable>,
deserializer: some MessageDeserializer<Input>,
serializer: some MessageSerializer<Output>,
interceptors: [any ServerInterceptor],
handler: @Sendable @escaping (
_ request: ServerRequest.Stream<Input>
) async throws -> ServerResponse.Stream<Output>
) async {
// Wait for the first request part from the transport.
let firstPart = await Self._waitForFirstRequestPart(inbound: stream.inbound)

switch firstPart {
case .process(let metadata, let inbound):
await Self._execute(
method: stream.descriptor,
metadata: metadata,
inbound: inbound,
outbound: stream.outbound,
deserializer: deserializer,
serializer: serializer,
interceptors: interceptors,
handler: handler
)

case .reject(let error):
// Stream can't be handled; write an error status and close.
let status = Status(code: Status.Code(error.code), message: error.message)
try? await stream.outbound.write(.status(status, error.metadata))
stream.outbound.finish()
}
}

@inlinable
static func _execute<Input, Output>(
method: MethodDescriptor,
metadata: Metadata,
inbound: UnsafeTransfer<RPCAsyncSequence<RPCRequestPart>.AsyncIterator>,
outbound: RPCWriter<RPCResponsePart>.Closable,
deserializer: some MessageDeserializer<Input>,
serializer: some MessageSerializer<Output>,
interceptors: [any ServerInterceptor],
handler: @escaping @Sendable (
_ request: ServerRequest.Stream<Input>
) async throws -> ServerResponse.Stream<Output>
) async {
await withTaskGroup(of: ServerExecutorTask.self) { group in
if let timeout = metadata.timeout {
group.addTask {
let result = await Result {
try await Task.sleep(until: .now.advanced(by: timeout), clock: .continuous)
}
return .timedOut(result)
}
}

group.addTask {
await Self._processRPC(
method: method,
metadata: metadata,
inbound: inbound,
outbound: outbound,
deserializer: deserializer,
serializer: serializer,
interceptors: interceptors,
handler: handler
)
return .executed
}

while let next = await group.next() {
switch next {
case .timedOut(.success):
// Timeout expired; cancel the work.
group.cancelAll()

case .timedOut(.failure):
// Timeout failed (because it was cancelled). Wait for more tasks to finish.
()

case .executed:
// The work finished. Cancel any remaining tasks.
group.cancelAll()
}
}
}
}

@inlinable
static func _processRPC<Input, Output>(
method: MethodDescriptor,
metadata: Metadata,
inbound: UnsafeTransfer<RPCAsyncSequence<RPCRequestPart>.AsyncIterator>,
outbound: RPCWriter<RPCResponsePart>.Closable,
deserializer: some MessageDeserializer<Input>,
serializer: some MessageSerializer<Output>,
interceptors: [any ServerInterceptor],
handler: @escaping @Sendable (
ServerRequest.Stream<Input>
) async throws -> ServerResponse.Stream<Output>
) async {
let messages = AsyncIteratorSequence(inbound.wrappedValue).map { part throws -> Input in
switch part {
case .message(let bytes):
return try deserializer.deserialize(bytes)
case .metadata:
throw RPCError(
code: .internalError,
message: """
Server received an extra set of metadata. Only one set of metadata may be received \
at the start of the RPC. This is likely to be caused by a misbehaving client.
"""
)
}
}

let response = await Result {
// Run the request through the interceptors, finally passing it to the handler.
return try await Self._intercept(
request: ServerRequest.Stream(
metadata: metadata,
messages: RPCAsyncSequence(wrapping: messages)
),
context: ServerInterceptorContext(descriptor: method),
interceptors: interceptors
) { request, _ in
try await handler(request)
}
}.castError(to: RPCError.self) { error in
RPCError(code: .unknown, message: "Service method threw an unknown error.", cause: error)
}.flatMap { response in
response.accepted
}

let status: Status
let metadata: Metadata

switch response {
case .success(let contents):
let result = await Result {
// Write the metadata and run the producer.
try await outbound.write(.metadata(contents.metadata))
return try await contents.producer(
.serializingToRPCResponsePart(into: outbound, with: serializer)
)
}.castError(to: RPCError.self) { error in
RPCError(code: .unknown, message: "", cause: error)
}

switch result {
case .success(let trailingMetadata):
status = .ok
metadata = trailingMetadata
case .failure(let error):
status = Status(code: Status.Code(error.code), message: error.message)
metadata = error.metadata
}

case .failure(let error):
status = Status(code: Status.Code(error.code), message: error.message)
metadata = error.metadata
}

try? await outbound.write(.status(status, metadata))
outbound.finish()
}

@inlinable
static func _waitForFirstRequestPart(
inbound: RPCAsyncSequence<RPCRequestPart>
) async -> OnFirstRequestPart {
var iterator = inbound.makeAsyncIterator()
let part = await Result { try await iterator.next() }
let onFirstRequestPart: OnFirstRequestPart

switch part {
case .success(.metadata(let metadata)):
// The only valid first part.
onFirstRequestPart = .process(metadata, UnsafeTransfer(iterator))

case .success(.none):
// Empty stream; reject.
let error = RPCError(code: .internalError, message: "Empty inbound server stream.")
onFirstRequestPart = .reject(error)

case .success(.message):
let error = RPCError(
code: .internalError,
message: """
Invalid inbound server stream; received message bytes at start of stream. This is \
likely to be a transport specific bug.
"""
)
onFirstRequestPart = .reject(error)

case .failure(let error):
let error = RPCError(
code: .unknown,
message: "Inbound server stream threw error when reading metadata.",
cause: error
)
onFirstRequestPart = .reject(error)
}

return onFirstRequestPart
}

@usableFromInline
enum OnFirstRequestPart {
case process(Metadata, UnsafeTransfer<RPCAsyncSequence<RPCRequestPart>.AsyncIterator>)
case reject(RPCError)
}

@usableFromInline
enum ServerExecutorTask {
case timedOut(Result<Void, Error>)
case executed
}
}

@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
extension ServerRPCExecutor {
@inlinable
static func _intercept<Input, Output>(
request: ServerRequest.Stream<Input>,
context: ServerInterceptorContext,
interceptors: [any ServerInterceptor],
finally: @escaping @Sendable (
_ request: ServerRequest.Stream<Input>,
_ context: ServerInterceptorContext
) async throws -> ServerResponse.Stream<Output>
) async throws -> ServerResponse.Stream<Output> {
return try await self._intercept(
request: request,
context: context,
iterator: interceptors.makeIterator(),
finally: finally
)
}

@inlinable
static func _intercept<Input, Output>(
request: ServerRequest.Stream<Input>,
context: ServerInterceptorContext,
iterator: Array<any ServerInterceptor>.Iterator,
finally: @escaping @Sendable (
_ request: ServerRequest.Stream<Input>,
_ context: ServerInterceptorContext
) async throws -> ServerResponse.Stream<Output>
) async throws -> ServerResponse.Stream<Output> {
var iterator = iterator

switch iterator.next() {
case .some(let interceptor):
let iter = iterator
do {
return try await interceptor.intercept(request: request, context: context) {
try await self._intercept(request: $0, context: $1, iterator: iter, finally: finally)
}
} catch let error as RPCError {
return ServerResponse.Stream(error: error)
} catch let other {
let error = RPCError(code: .unknown, message: "", cause: other)
return ServerResponse.Stream(error: error)
}

case .none:
return try await finally(request, context)
}
}
}
13 changes: 13 additions & 0 deletions Sources/GRPCCore/Internal/Metadata+GRPC.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,24 @@ extension Metadata {
RetryPushback(milliseconds: $0)
}
}

@inlinable
var timeout: Duration? {
// Temporary hack to support tests; only supports nanoseconds.
guard let value = self.firstString(forKey: .timeout) else { return nil }
guard value.utf8.last == UTF8.CodeUnit(ascii: "n") else { return nil }
var index = value.utf8.endIndex
value.utf8.formIndex(before: &index)
guard let digits = String(value.utf8[..<index]) else { return nil }
guard let nanoseconds = Int64(digits) else { return nil }
return .nanoseconds(nanoseconds)
}
}

extension Metadata {
@usableFromInline
enum GRPCKey: String, Sendable, Hashable {
case timeout = "grpc-timeout"
case retryPushbackMs = "grpc-retry-pushback-ms"
case previousRPCAttempts = "grpc-previous-rpc-attempts"
}
Expand Down
6 changes: 6 additions & 0 deletions Sources/GRPCCore/Status.swift
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ public struct Status: @unchecked Sendable, Hashable {
self.storage = Storage(code: code, message: message)
}
}

/// A status with code ``Code-swift.struct/ok`` and an empty message.
@inlinable
internal static var ok: Self {
Status(code: .ok, message: "")
}
}

extension Status: CustomStringConvertible {
Expand Down
Loading

0 comments on commit cbc47cc

Please sign in to comment.