From 9f012021931bc8b1401ed8b2936d87ca7aa94766 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Mon, 9 Oct 2023 10:49:23 +0100 Subject: [PATCH] Add client request and response objects (#1665) --- Sources/GRPCCore/Call/ClientRequest.swift | 116 +++++ Sources/GRPCCore/Call/ClientResponse.swift | 436 ++++++++++++++++++ .../GRPCCore/Stream/AsyncSequenceOfOne.swift | 58 +++ .../Call/ClientRequestTests.swift | 31 ++ .../Call/ClientResponseTests.swift | 171 +++++++ .../Stream/AsyncSequenceOfOne.swift | 39 ++ .../AsyncSequence+Utilities.swift | 36 ++ .../RPCAsyncSequence+Utilities.swift | 32 ++ .../Test Utilities/RPCWriter+Utilities.swift | 49 ++ .../Test Utilities/XCTest+Utilities.swift | 28 ++ 10 files changed, 996 insertions(+) create mode 100644 Sources/GRPCCore/Call/ClientRequest.swift create mode 100644 Sources/GRPCCore/Call/ClientResponse.swift create mode 100644 Sources/GRPCCore/Stream/AsyncSequenceOfOne.swift create mode 100644 Tests/GRPCCoreTests/Call/ClientRequestTests.swift create mode 100644 Tests/GRPCCoreTests/Call/ClientResponseTests.swift create mode 100644 Tests/GRPCCoreTests/Stream/AsyncSequenceOfOne.swift create mode 100644 Tests/GRPCCoreTests/Test Utilities/AsyncSequence+Utilities.swift create mode 100644 Tests/GRPCCoreTests/Test Utilities/RPCAsyncSequence+Utilities.swift create mode 100644 Tests/GRPCCoreTests/Test Utilities/RPCWriter+Utilities.swift diff --git a/Sources/GRPCCore/Call/ClientRequest.swift b/Sources/GRPCCore/Call/ClientRequest.swift new file mode 100644 index 000000000..bc03d5a8f --- /dev/null +++ b/Sources/GRPCCore/Call/ClientRequest.swift @@ -0,0 +1,116 @@ +/* + * Copyright 2023, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// A namespace for request message types used by clients. +public enum ClientRequest {} + +extension ClientRequest { + /// A request created by the client for a single message. + /// + /// This is used for unary and server-streaming RPCs. + /// + /// See ``ClientRequest/Stream`` for streaming requests and ``ServerRequest/Single`` for the + /// servers representation of a single-message request. + /// + /// ## Creating ``Single`` requests + /// + /// ```swift + /// let request = ClientRequest.Single(message: "Hello, gRPC!") + /// print(request.metadata) // prints '[:]' + /// print(request.message) // prints 'Hello, gRPC!' + /// ``` + public struct Single: Sendable { + /// Caller-specified metadata to send to the server at the start of the RPC. + /// + /// Both gRPC Swift and its transport layer may insert additional metadata. Keys prefixed with + /// "grpc-" are prohibited and may result in undefined behaviour. Transports may also insert + /// their own metadata, you should avoid using key names which may clash with transport specific + /// metadata. Note that transports may also impose limits in the amount of metadata which may + /// be sent. + public var metadata: Metadata + + /// The message to send to the server. + public var message: Message + + /// Create a new single client request. + /// + /// - Parameters: + /// - message: The message to send to the server. + /// - metadata: Metadata to send to the server at the start of the request. Defaults to empty. + public init( + message: Message, + metadata: Metadata = [:] + ) { + self.metadata = metadata + self.message = message + } + } +} + +extension ClientRequest { + /// A request created by the client for a stream of messages. + /// + /// This is used for client-streaming and bidirectional-streaming RPCs. + /// + /// See ``ClientRequest/Single`` for single-message requests and ``ServerRequest/Stream`` for the + /// servers representation of a streaming-message request. + @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) + public struct Stream: Sendable { + /// Caller-specified metadata sent to the server at the start of the RPC. + /// + /// Both gRPC Swift and its transport layer may insert additional metadata. Keys prefixed with + /// "grpc-" are prohibited and may result in undefined behaviour. Transports may also insert + /// their own metadata, you should avoid using key names which may clash with transport specific + /// metadata. Note that transports may also impose limits in the amount of metadata which may + /// be sent. + public var metadata: Metadata + + /// A closure which, when called, writes messages in the writer. + /// + /// The producer will only be consumed once by gRPC and therefore isn't required to be + /// idempotent. If the producer throws an error then the RPC will be cancelled. Once the + /// producer returns the request stream is closed. + public var producer: @Sendable (RPCWriter) async throws -> Void + + /// Create a new streaming client request. + /// + /// - Parameters: + /// - messageType: The type of message contained in this request, defaults to `Message.self`. + /// - metadata: Metadata to send to the server at the start of the request. Defaults to empty. + /// - producer: A closure which writes messages to send to the server. The closure is called + /// at most once and may not be called. + public init( + of messageType: Message.Type = Message.self, + metadata: Metadata = [:], + producer: @escaping @Sendable (RPCWriter) async throws -> Void + ) { + self.metadata = metadata + self.producer = producer + } + } +} + +// MARK: - Conversion + +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +extension ClientRequest.Stream { + @_spi(Testing) + public init(single request: ClientRequest.Single) { + self.init(metadata: request.metadata) { + try await $0.write(request.message) + } + } +} diff --git a/Sources/GRPCCore/Call/ClientResponse.swift b/Sources/GRPCCore/Call/ClientResponse.swift new file mode 100644 index 000000000..f384ac80a --- /dev/null +++ b/Sources/GRPCCore/Call/ClientResponse.swift @@ -0,0 +1,436 @@ +/* + * Copyright 2023, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// A namespace for response message types used by clients. +public enum ClientResponse {} + +extension ClientResponse { + /// A response for a single message received by a client. + /// + /// Single responses are used for unary and client-streaming RPCs. For streaming responses + /// see ``ClientResponse/Stream``. + /// + /// A single response captures every part of the response stream and distinguishes successful + /// and unsuccessful responses via the ``accepted`` property. The value for the `success` case + /// contains the initial metadata, response message, and the trailing metadata and implicitly + /// has an ``Status/Code-swift.struct/ok`` status code. + /// + /// The `failure` case indicates that the server chose not to process the RPC, or the processing + /// of the RPC failed, or the client failed to execute the request. The failure case contains + /// an ``RPCError`` describing why the RPC failed, including an error code, error message and any + /// metadata sent by the server. + /// + /// ### Using ``Single`` responses + /// + /// Each response has a ``accepted`` property which contains all RPC information. You can create + /// one by calling ``init(accepted:)`` or one of the two convenience initializers: + /// - ``init(message:metadata:trailingMetadata:)`` to create a successful response, or + /// - ``init(of:error:)`` to create a failed response. + /// + /// You can interrogate a response by inspecting the ``accepted`` property directly or by using + /// its convenience properties: + /// - ``metadata`` extracts the initial metadata, + /// - ``message`` extracts the message, or throws if the response failed, and + /// - ``trailingMetadata`` extracts the trailing metadata. + /// + /// The following example demonstrates how you can use the API: + /// + /// ```swift + /// // Create a successful response + /// let response = ClientResponse.Single( + /// message: "Hello, World!", + /// metadata: ["hello": "initial metadata"], + /// trailingMetadata: ["goodbye": "trailing metadata"] + /// ) + /// + /// // The explicit API: + /// switch response { + /// case .success(let contents): + /// print("Received response with message '\(contents.message)'") + /// case .failure(let error): + /// print("RPC failed with code '\(error.code)'") + /// } + /// + /// // The convenience API: + /// do { + /// print("Received response with message '\(try response.message)'") + /// } catch let error as RPCError { + /// print("RPC failed with code '\(error.code)'") + /// } + /// ``` + public struct Single: Sendable { + /// The contents of an accepted response with a single message. + public struct Contents: Sendable { + /// Metadata received from the server at the beginning of the response. + /// + /// The metadata may contain transport-specific information in addition to any application + /// level metadata provided by the service. + public var metadata: Metadata + + /// The response message received from the server. + public var message: Message + + /// Metadata received from the server at the end of the response. + /// + /// The metadata may contain transport-specific information in addition to any application + /// level metadata provided by the service. + public var trailingMetadata: Metadata + + /// Creates a `Contents`. + /// + /// - Parameters: + /// - metadata: Metadata received from the server at the beginning of the response. + /// - message: The response message received from the server. + /// - trailingMetadata: Metadata received from the server at the end of the response. + public init( + metadata: Metadata, + message: Message, + trailingMetadata: Metadata + ) { + self.metadata = metadata + self.message = message + self.trailingMetadata = trailingMetadata + } + } + + /// Whether the RPC was accepted or rejected. + /// + /// The `success` case indicates the RPC completed successfully with an + /// ``Status/Code-swift.struct/ok`` status code. The `failure` case indicates that the RPC was + /// rejected by the server and wasn't processed or couldn't be processed successfully. + public var accepted: Result + + /// Creates a new response. + /// + /// - Parameter accepted: The result of the RPC. + public init(accepted: Result) { + self.accepted = accepted + } + } +} + +extension ClientResponse { + /// A response for a stream of messages received by a client. + /// + /// Stream responses are used for server-streaming and bidirectional-streaming RPCs. For single + /// responses see ``ClientResponse/Single``. + /// + /// A stream response captures every part of the response stream over time and distinguishes + /// accepted and rejected requests via the ``accepted`` property. An "accepted" request is one + /// where the the server responds with initial metadata and attempts to process the request. A + /// "rejected" request is one where the server responds with a status as the first and only + /// response part and doesn't process the request body. + /// + /// The value for the `success` case contains the initial metadata and a ``RPCAsyncSequence`` of + /// message parts (messages followed by a single status). If the sequence completes without + /// throwing then the response implicitly has an ``Status/Code-swift.struct/ok`` status code. + /// However, the response sequence may also throw an ``RPCError`` if the server fails to complete + /// processing the request. + /// + /// The `failure` case indicates that the server chose not to process the RPC or the client failed + /// to execute the request. The failure case contains an ``RPCError`` describing why the RPC + /// failed, including an error code, error message and any metadata sent by the server. + /// + /// ### Using ``Stream`` responses + /// + /// Each response has a ``accepted`` property which contains RPC information. You can create + /// one by calling ``init(accepted:)`` or one of the two convenience initializers: + /// - ``init(of:metadata:bodyParts:)`` to create an accepted response, or + /// - ``init(of:error:)`` to create a failed response. + /// + /// You can interrogate a response by inspecting the ``accepted`` property directly or by using + /// its convenience properties: + /// - ``metadata`` extracts the initial metadata, + /// - ``messages`` extracts the sequence of response message, or throws if the response failed. + /// + /// The following example demonstrates how you can use the API: + /// + /// ```swift + /// // Create a failed response + /// let response = ClientResponse.Stream( + /// of: String.self, + /// error: RPCError(code: .notFound, message: "The requested resource couldn't be located") + /// ) + /// + /// // The explicit API: + /// switch response { + /// case .success(let contents): + /// for try await part in contents.bodyParts { + /// switch part { + /// case .message(let message): + /// print("Received message '\(message)'") + /// case .trailingMetadata(let metadata): + /// print("Received trailing metadata '\(metadata)'") + /// } + /// } + /// case .failure(let error): + /// print("RPC failed with code '\(error.code)'") + /// } + /// + /// // The convenience API: + /// do { + /// for try await message in response.messages { + /// print("Received message '\(message)'") + /// } + /// } catch let error as RPCError { + /// print("RPC failed with code '\(error.code)'") + /// } + /// ``` + @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) + public struct Stream: Sendable { + public struct Contents: Sendable { + /// Metadata received from the server at the beginning of the response. + /// + /// The metadata may contain transport-specific information in addition to any application + /// level metadata provided by the service. + public var metadata: Metadata + + /// A sequence of stream parts received from the server ending with metadata if the RPC + /// succeeded. + /// + /// If the RPC fails then the sequence will throw an ``RPCError``. + /// + /// The sequence may only be iterated once. + public var bodyParts: RPCAsyncSequence + + /// Parts received from the server. + public enum BodyPart: Sendable { + /// A response message. + case message(Message) + /// Metadata. Must be the final value of the sequence unless the stream throws an error. + case trailingMetadata(Metadata) + } + + /// Creates a ``Contents``. + /// + /// - Parameters: + /// - metadata: Metadata received from the server at the beginning of the response. + /// - bodyParts: An `AsyncSequence` of parts received from the server. + public init( + metadata: Metadata, + bodyParts: RPCAsyncSequence + ) { + self.metadata = metadata + self.bodyParts = bodyParts + } + } + + /// Whether the RPC was accepted or rejected. + /// + /// The `success` case indicates the RPC was accepted by the server for + /// processing, however, the RPC may still fail by throwing an error from its + /// `messages` sequence. The `failure` case indicates that the RPC was + /// rejected by the server. + public var accepted: Result + + /// Creates a new response. + /// + /// - Parameter accepted: The result of the RPC. + public init(accepted: Result) { + self.accepted = accepted + } + } +} + +// MARK: - Convenience API + +extension ClientResponse.Single { + /// Creates a new accepted response. + /// + /// - Parameters: + /// - metadata: Metadata received from the server at the beginning of the response. + /// - message: The response message received from the server. + /// - trailingMetadata: Metadata received from the server at the end of the response. + public init(message: Message, metadata: Metadata = [:], trailingMetadata: Metadata = [:]) { + let contents = Contents( + metadata: metadata, + message: message, + trailingMetadata: trailingMetadata + ) + self.accepted = .success(contents) + } + + /// Creates a new failed response. + /// + /// - Parameters: + /// - messageType: The type of message. + /// - error: An error describing why the RPC failed. + public init(of messageType: Message.Type = Message.self, error: RPCError) { + self.accepted = .failure(error) + } + + /// Returns metadata received from the server at the start of the response. + /// + /// For rejected RPCs (in other words, where ``accepted`` is `failure`) the metadata is empty. + public var metadata: Metadata { + switch self.accepted { + case let .success(contents): + return contents.metadata + case .failure: + return [:] + } + } + + /// Returns the message received from the server. + /// + /// - Throws: ``RPCError`` if the request failed. + public var message: Message { + get throws { + try self.accepted.map { $0.message }.get() + } + } + + /// Returns metadata received from the server at the end of the response. + /// + /// Unlike ``metadata``, for rejected RPCs the metadata returned may contain values. + public var trailingMetadata: Metadata { + switch self.accepted { + case let .success(contents): + return contents.trailingMetadata + case let .failure(error): + return error.metadata + } + } +} + +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +extension ClientResponse.Stream { + /// Creates a new accepted response. + /// + /// - Parameters: + /// - messageType: The type of message. + /// - metadata: Metadata received from the server at the beginning of the response. + /// - bodyParts: An ``RPCAsyncSequence`` of response parts received from the server. + public init( + of messageType: Message.Type = Message.self, + metadata: Metadata, + bodyParts: RPCAsyncSequence + ) { + let contents = Contents(metadata: metadata, bodyParts: bodyParts) + self.accepted = .success(contents) + } + + /// Creates a new failed response. + /// + /// - Parameters: + /// - messageType: The type of message. + /// - error: An error describing why the RPC failed. + public init(of messageType: Message.Type = Message.self, error: RPCError) { + self.accepted = .failure(error) + } + + /// Returns metadata received from the server at the start of the response. + /// + /// For rejected RPCs (in other words, where ``accepted`` is `failure`) the metadata is empty. + public var metadata: Metadata { + switch self.accepted { + case let .success(contents): + return contents.metadata + case .failure: + return [:] + } + } + + /// Returns metadata received from the server at the end of the response. + /// + /// Unlike ``metadata``, for rejected RPCs the metadata returned may contain values. + public var messages: RPCAsyncSequence { + switch self.accepted { + case let .success(contents): + let filtered = contents.bodyParts.compactMap { + switch $0 { + case let .message(message): + return message + case .trailingMetadata: + return nil + } + } + + return RPCAsyncSequence(wrapping: filtered) + + case let .failure(error): + return RPCAsyncSequence.throwing(error) + } + } +} + +// MARK: - Conversion + +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +extension ClientResponse.Single { + @_spi(Testing) + public init(stream response: ClientResponse.Stream) async { + switch response.accepted { + case .success(let contents): + do { + let metadata = contents.metadata + var iterator = contents.bodyParts.makeAsyncIterator() + + // Happy path: message, trailing metadata, nil. + let part1 = try await iterator.next() + let part2 = try await iterator.next() + let part3 = try await iterator.next() + + switch (part1, part2, part3) { + case (.some(.message(let message)), .some(.trailingMetadata(let trailingMetadata)), .none): + let contents = Contents( + metadata: metadata, + message: message, + trailingMetadata: trailingMetadata + ) + self.accepted = .success(contents) + + case (.some(.message), .some(.message), _): + let error = RPCError( + code: .unimplemented, + message: """ + Multiple messages received, but only one is expected. The server may have \ + incorrectly implemented the RPC or the client and server may have a different \ + opinion on whether this RPC streams responses. + """ + ) + self.accepted = .failure(error) + + case (.some(.trailingMetadata), .none, .none): + let error = RPCError( + code: .unimplemented, + message: "No messages received, exactly one was expected." + ) + self.accepted = .failure(error) + + case (_, _, _): + let error = RPCError( + code: .internalError, + message: """ + The stream from the client transport is invalid. This is likely to be an incorrectly \ + implemented transport. Received parts: \([part1, part2, part3])." + """ + ) + self.accepted = .failure(error) + } + } catch let error as RPCError { + // Known error type. + self.accepted = .failure(error) + } catch { + // Unexpected, but should be handled nonetheless. + self.accepted = .failure(RPCError(code: .unknown, message: String(describing: error))) + } + + case .failure(let error): + self.accepted = .failure(error) + } + } +} diff --git a/Sources/GRPCCore/Stream/AsyncSequenceOfOne.swift b/Sources/GRPCCore/Stream/AsyncSequenceOfOne.swift new file mode 100644 index 000000000..b0b794e9a --- /dev/null +++ b/Sources/GRPCCore/Stream/AsyncSequenceOfOne.swift @@ -0,0 +1,58 @@ +/* + * Copyright 2023, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +extension RPCAsyncSequence { + /// Returns an ``RPCAsyncSequence`` containing just the given element. + @_spi(Testing) + public static func one(_ element: Element) -> Self { + return Self(wrapping: AsyncSequenceOfOne(result: .success(element))) + } + + /// Returns an ``RPCAsyncSequence`` throwing the given error. + @_spi(Testing) + public static func throwing(_ error: E) -> Self { + return Self(wrapping: AsyncSequenceOfOne(result: .failure(error))) + } +} + +/// An `AsyncSequence` of a single value. +private struct AsyncSequenceOfOne: AsyncSequence { + private let result: Result + + init(result: Result) { + self.result = result + } + + func makeAsyncIterator() -> AsyncIterator { + AsyncIterator(result: self.result) + } + + struct AsyncIterator: AsyncIteratorProtocol { + private var result: Result? + + fileprivate init(result: Result) { + self.result = result + } + + mutating func next() async throws -> Element? { + guard let result = self.result else { return nil } + + self.result = nil + return try result.get() + } + } +} diff --git a/Tests/GRPCCoreTests/Call/ClientRequestTests.swift b/Tests/GRPCCoreTests/Call/ClientRequestTests.swift new file mode 100644 index 000000000..a45f11621 --- /dev/null +++ b/Tests/GRPCCoreTests/Call/ClientRequestTests.swift @@ -0,0 +1,31 @@ +/* + * Copyright 2023, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@_spi(Testing) import GRPCCore +import XCTest + +final class ClientRequestTests: XCTestCase { + func testSingleToStreamConversion() async throws { + let (messages, continuation) = AsyncStream.makeStream(of: String.self) + let single = ClientRequest.Single(message: "foo", metadata: ["bar": "baz"]) + let stream = ClientRequest.Stream(single: single) + + XCTAssertEqual(stream.metadata, ["bar": "baz"]) + try await stream.producer(.gathering(into: continuation)) + continuation.finish() + let collected = try await messages.collect() + XCTAssertEqual(collected, ["foo"]) + } +} diff --git a/Tests/GRPCCoreTests/Call/ClientResponseTests.swift b/Tests/GRPCCoreTests/Call/ClientResponseTests.swift new file mode 100644 index 000000000..5444075fe --- /dev/null +++ b/Tests/GRPCCoreTests/Call/ClientResponseTests.swift @@ -0,0 +1,171 @@ +/* + * Copyright 2023, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@_spi(Testing) import GRPCCore +import XCTest + +final class ClientResponseTests: XCTestCase { + func testAcceptedSingleResponseConvenienceMethods() { + let response = ClientResponse.Single( + message: "message", + metadata: ["foo": "bar"], + trailingMetadata: ["bar": "baz"] + ) + + XCTAssertEqual(response.metadata, ["foo": "bar"]) + XCTAssertEqual(try response.message, "message") + XCTAssertEqual(response.trailingMetadata, ["bar": "baz"]) + } + + func testRejectedSingleResponseConvenienceMethods() { + let error = RPCError(code: .aborted, message: "error message", metadata: ["bar": "baz"]) + let response = ClientResponse.Single(of: String.self, error: error) + + XCTAssertEqual(response.metadata, [:]) + XCTAssertThrowsRPCError(try response.message) { + XCTAssertEqual($0, error) + } + XCTAssertEqual(response.trailingMetadata, ["bar": "baz"]) + } + + func testAcceptedStreamResponseConvenienceMethods() async throws { + let response = ClientResponse.Stream( + of: String.self, + metadata: ["foo": "bar"], + bodyParts: RPCAsyncSequence( + wrapping: AsyncStream { + $0.yield(.message("foo")) + $0.yield(.message("bar")) + $0.yield(.message("baz")) + $0.yield(.trailingMetadata(["baz": "baz"])) + $0.finish() + } + ) + ) + + XCTAssertEqual(response.metadata, ["foo": "bar"]) + let messages = try await response.messages.collect() + XCTAssertEqual(messages, ["foo", "bar", "baz"]) + } + + func testRejectedStreamResponseConvenienceMethods() async throws { + let error = RPCError(code: .aborted, message: "error message", metadata: ["bar": "baz"]) + let response = ClientResponse.Stream(of: String.self, error: error) + + XCTAssertEqual(response.metadata, [:]) + await XCTAssertThrowsRPCErrorAsync { + try await response.messages.collect() + } errorHandler: { + XCTAssertEqual($0, error) + } + } + + func testStreamToSingleConversionForValidStream() async throws { + let stream = ClientResponse.Stream( + of: String.self, + metadata: ["foo": "bar"], + bodyParts: .elements(.message("foo"), .trailingMetadata(["bar": "baz"])) + ) + + let single = await ClientResponse.Single(stream: stream) + XCTAssertEqual(single.metadata, ["foo": "bar"]) + XCTAssertEqual(try single.message, "foo") + XCTAssertEqual(single.trailingMetadata, ["bar": "baz"]) + } + + func testStreamToSingleConversionForFailedStream() async throws { + let error = RPCError(code: .aborted, message: "aborted", metadata: ["bar": "baz"]) + let stream = ClientResponse.Stream(of: String.self, error: error) + + let single = await ClientResponse.Single(stream: stream) + XCTAssertEqual(single.metadata, [:]) + XCTAssertThrowsRPCError(try single.message) { + XCTAssertEqual($0, error) + } + XCTAssertEqual(single.trailingMetadata, ["bar": "baz"]) + } + + func testStreamToSingleConversionForInvalidSingleStream() async throws { + let bodies: [[ClientResponse.Stream.Contents.BodyPart]] = [ + [.message("1"), .message("2")], // Too many messages. + [.trailingMetadata([:])], // Too few messages + ] + + for body in bodies { + let stream = ClientResponse.Stream( + of: String.self, + metadata: ["foo": "bar"], + bodyParts: .elements(body) + ) + + let single = await ClientResponse.Single(stream: stream) + XCTAssertEqual(single.metadata, [:]) + XCTAssertThrowsRPCError(try single.message) { error in + XCTAssertEqual(error.code, .unimplemented) + } + XCTAssertEqual(single.trailingMetadata, [:]) + } + } + + func testStreamToSingleConversionForInvalidStream() async throws { + let bodies: [[ClientResponse.Stream.Contents.BodyPart]] = [ + [], // Empty stream + [.trailingMetadata([:]), .trailingMetadata([:])], // Multiple metadatas + [.trailingMetadata([:]), .message("")], // Metadata then message + ] + + for body in bodies { + let stream = ClientResponse.Stream( + of: String.self, + metadata: ["foo": "bar"], + bodyParts: .elements(body) + ) + + let single = await ClientResponse.Single(stream: stream) + XCTAssertEqual(single.metadata, [:]) + XCTAssertThrowsRPCError(try single.message) { error in + XCTAssertEqual(error.code, .internalError) + } + XCTAssertEqual(single.trailingMetadata, [:]) + } + } + + func testStreamToSingleConversionForStreamThrowingRPCError() async throws { + let error = RPCError(code: .dataLoss, message: "oops") + let stream = ClientResponse.Stream( + of: String.self, + metadata: [:], + bodyParts: .throwing(error) + ) + + let single = await ClientResponse.Single(stream: stream) + XCTAssertThrowsRPCError(try single.message) { + XCTAssertEqual($0, error) + } + } + + func testStreamToSingleConversionForStreamThrowingUnknownError() async throws { + let stream = ClientResponse.Stream( + of: String.self, + metadata: [:], + bodyParts: .throwing(CancellationError()) + ) + + let single = await ClientResponse.Single(stream: stream) + XCTAssertThrowsRPCError(try single.message) { error in + XCTAssertEqual(error.code, .unknown) + } + } +} diff --git a/Tests/GRPCCoreTests/Stream/AsyncSequenceOfOne.swift b/Tests/GRPCCoreTests/Stream/AsyncSequenceOfOne.swift new file mode 100644 index 000000000..00f9d4410 --- /dev/null +++ b/Tests/GRPCCoreTests/Stream/AsyncSequenceOfOne.swift @@ -0,0 +1,39 @@ +/* + * Copyright 2023, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@_spi(Testing) import GRPCCore +import XCTest + +internal final class AsyncSequenceOfOneTests: XCTestCase { + func testSuccessPath() async throws { + let sequence = RPCAsyncSequence.one("foo") + let contents = try await sequence.collect() + XCTAssertEqual(contents, ["foo"]) + } + + func testFailurePath() async throws { + let sequence = RPCAsyncSequence.throwing(RPCError(code: .cancelled, message: "foo")) + + do { + let _ = try await sequence.collect() + XCTFail("Expected an error to be thrown") + } catch let error as RPCError { + XCTAssertEqual(error.code, .cancelled) + XCTAssertEqual(error.message, "foo") + } catch { + XCTFail("Expected error of type RPCError to be thrown") + } + } +} diff --git a/Tests/GRPCCoreTests/Test Utilities/AsyncSequence+Utilities.swift b/Tests/GRPCCoreTests/Test Utilities/AsyncSequence+Utilities.swift new file mode 100644 index 000000000..2acd24bb6 --- /dev/null +++ b/Tests/GRPCCoreTests/Test Utilities/AsyncSequence+Utilities.swift @@ -0,0 +1,36 @@ +/* + * Copyright 2023, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +extension AsyncSequence { + func collect() async throws -> [Element] { + return try await self.reduce(into: []) { $0.append($1) } + } +} + +#if swift(<5.9) +extension AsyncStream { + static func makeStream( + of elementType: Element.Type = Element.self, + bufferingPolicy limit: AsyncStream.Continuation.BufferingPolicy = .unbounded + ) -> (stream: AsyncStream, continuation: AsyncStream.Continuation) { + var continuation: AsyncStream.Continuation! + let stream = AsyncStream(Element.self, bufferingPolicy: limit) { + continuation = $0 + } + return (stream, continuation) + } +} +#endif diff --git a/Tests/GRPCCoreTests/Test Utilities/RPCAsyncSequence+Utilities.swift b/Tests/GRPCCoreTests/Test Utilities/RPCAsyncSequence+Utilities.swift new file mode 100644 index 000000000..26bef603a --- /dev/null +++ b/Tests/GRPCCoreTests/Test Utilities/RPCAsyncSequence+Utilities.swift @@ -0,0 +1,32 @@ +/* + * Copyright 2023, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import GRPCCore + +extension RPCAsyncSequence { + static func elements(_ elements: Element...) -> Self { + return .elements(elements) + } + + static func elements(_ elements: [Element]) -> Self { + let stream = AsyncStream { + for element in elements { + $0.yield(element) + } + $0.finish() + } + return RPCAsyncSequence(wrapping: stream) + } +} diff --git a/Tests/GRPCCoreTests/Test Utilities/RPCWriter+Utilities.swift b/Tests/GRPCCoreTests/Test Utilities/RPCWriter+Utilities.swift new file mode 100644 index 000000000..e334dceb2 --- /dev/null +++ b/Tests/GRPCCoreTests/Test Utilities/RPCWriter+Utilities.swift @@ -0,0 +1,49 @@ +/* + * Copyright 2023, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import GRPCCore +import XCTest + +extension RPCWriter { + /// Returns a writer which calls `XCTFail(_:)` on every write. + static func failTestOnWrite(elementType: Element.Type = Element.self) -> Self { + return RPCWriter(wrapping: FailOnWrite()) + } + + /// Returns a writer which gathers writes into an `AsyncStream`. + static func gathering(into continuation: AsyncStream.Continuation) -> Self { + return RPCWriter(wrapping: AsyncStreamGatheringWriter(continuation: continuation)) + } +} + +private struct FailOnWrite: RPCWriterProtocol { + func write(contentsOf elements: some Sequence) async throws { + XCTFail("Unexpected write") + } +} + +private struct AsyncStreamGatheringWriter: RPCWriterProtocol { + let continuation: AsyncStream.Continuation + + init(continuation: AsyncStream.Continuation) { + self.continuation = continuation + } + + func write(contentsOf elements: some Sequence) async throws { + for element in elements { + self.continuation.yield(element) + } + } +} diff --git a/Tests/GRPCCoreTests/Test Utilities/XCTest+Utilities.swift b/Tests/GRPCCoreTests/Test Utilities/XCTest+Utilities.swift index 275396d4f..aa51a4b98 100644 --- a/Tests/GRPCCoreTests/Test Utilities/XCTest+Utilities.swift +++ b/Tests/GRPCCoreTests/Test Utilities/XCTest+Utilities.swift @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +import GRPCCore import XCTest func XCTAssertDescription( @@ -23,3 +24,30 @@ func XCTAssertDescription( ) { XCTAssertEqual(String(describing: subject), expected, file: file, line: line) } + +func XCTAssertThrowsRPCError( + _ expression: @autoclosure () throws -> T, + _ errorHandler: (RPCError) -> Void +) { + XCTAssertThrowsError(try expression()) { error in + guard let error = error as? RPCError else { + return XCTFail("Error had unexpected type '\(type(of: error))'") + } + + errorHandler(error) + } +} + +func XCTAssertThrowsRPCErrorAsync( + _ expression: () async throws -> T, + errorHandler: (RPCError) -> Void +) async { + do { + _ = try await expression() + XCTFail("Expression didn't throw") + } catch let error as RPCError { + errorHandler(error) + } catch { + XCTFail("Error had unexpected type '\(type(of: error))'") + } +}