From 8af9acf3ccd666111f03040467d9891fc0c3b76a Mon Sep 17 00:00:00 2001 From: Jin Xu Date: Tue, 17 Dec 2024 13:25:12 -0800 Subject: [PATCH] fix: add repeat interval on EventPipeline to prevent high frequency requests on continuing failing (#250) * fix: add repeat interval on EventPipeline to prevent high-frequency requests on continuing failing --- Sources/Amplitude/Types.swift | 13 ++++ .../Amplitude/Utilities/EventPipeline.swift | 45 +++++++++-- .../PersistentStorageResponseHandler.swift | 76 +++++++++++++----- .../Supports/TestUtilities.swift | 17 +++- .../Utilities/EventPipelineTests.swift | 77 +++++++++++++++++++ ...ersistentStorageResponseHandlerTests.swift | 4 +- 6 files changed, 198 insertions(+), 34 deletions(-) diff --git a/Sources/Amplitude/Types.swift b/Sources/Amplitude/Types.swift index 06a228b9..4bb8ab14 100644 --- a/Sources/Amplitude/Types.swift +++ b/Sources/Amplitude/Types.swift @@ -157,6 +157,11 @@ public protocol ResponseHandler { func handleTooManyRequestsResponse(data: [String: Any]) func handleTimeoutResponse(data: [String: Any]) func handleFailedResponse(data: [String: Any]) + + // Added on v1.11.2. + // A replacement for handle(result: Result) -> Void + // Return true if some attempts to recover are implemented + func handle(result: Result) -> Bool } extension ResponseHandler { @@ -170,3 +175,11 @@ extension ResponseHandler { return indices } } + +// Provide compatibility for new `handle` function added on v1.11.2. +extension ResponseHandler { + public func handle(result: Result) -> Bool { + let _: Void = handle(result: result) + return false + } +} diff --git a/Sources/Amplitude/Utilities/EventPipeline.swift b/Sources/Amplitude/Utilities/EventPipeline.swift index 611931a8..ccd509ea 100644 --- a/Sources/Amplitude/Utilities/EventPipeline.swift +++ b/Sources/Amplitude/Utilities/EventPipeline.swift @@ -12,6 +12,8 @@ public class EventPipeline { let storage: Storage? let logger: (any Logger)? let configuration: Configuration + var maxRetryInterval: TimeInterval = 60 + var maxRetryCount: Int = 6 @Atomic internal var eventCount: Int = 0 internal var flushTimer: QueueTimer? @@ -66,7 +68,7 @@ public class EventPipeline { } } - private func sendNextEventFile() { + private func sendNextEventFile(failures: Int = 0) { autoreleasepool { guard currentUpload == nil else { logger?.log(message: "Existing upload in progress, skipping...") @@ -100,14 +102,41 @@ public class EventPipeline { eventBlock: nextEventFile, eventsString: eventsString ) - responseHandler.handle(result: result) - // Don't send the next event file if we're being deallocated - self.uploadsQueue.async { [weak self] in - guard let self = self else { - return + let handled: Bool = responseHandler.handle(result: result) + var failures = failures + + switch result { + case .success: + failures = 0 + case .failure: + if !handled { + failures += 1 + } + } + + if failures > self.maxRetryCount { + self.uploadsQueue.async { + self.currentUpload = nil + } + self.configuration.offline = true + self.logger?.log(message: "Request failed more than \(self.maxRetryCount) times, marking offline") + } else { + // Don't send the next event file if we're being deallocated + let nextFileBlock: () -> Void = { [weak self] in + guard let self = self else { + return + } + self.currentUpload = nil + self.sendNextEventFile(failures: failures) + } + + if failures == 0 || handled { + self.uploadsQueue.async(execute: nextFileBlock) + } else { + let sendingInterval = min(self.maxRetryInterval, pow(2, Double(failures - 1))) + self.uploadsQueue.asyncAfter(deadline: .now() + sendingInterval, execute: nextFileBlock) + self.logger?.debug(message: "Request failed \(failures) times, send next event file in \(sendingInterval) seconds") } - self.currentUpload = nil - self.sendNextEventFile() } } } diff --git a/Sources/Amplitude/Utilities/PersistentStorageResponseHandler.swift b/Sources/Amplitude/Utilities/PersistentStorageResponseHandler.swift index 2d4ea0ca..dcdcef3d 100644 --- a/Sources/Amplitude/Utilities/PersistentStorageResponseHandler.swift +++ b/Sources/Amplitude/Utilities/PersistentStorageResponseHandler.swift @@ -28,21 +28,22 @@ class PersistentStorageResponseHandler: ResponseHandler { self.eventsString = eventsString } - func handleSuccessResponse(code: Int) { + func handleSuccessResponse(code: Int) -> Bool { guard let events = BaseEvent.fromArrayString(jsonString: eventsString) else { storage.remove(eventBlock: eventBlock) removeEventCallbackByEventsString(eventsString: eventsString) - return + return true } triggerEventsCallback(events: events, code: code, message: "Successfully send event") storage.remove(eventBlock: eventBlock) + return true } - func handleBadRequestResponse(data: [String: Any]) { + func handleBadRequestResponse(data: [String: Any]) -> Bool { guard let events = BaseEvent.fromArrayString(jsonString: eventsString) else { storage.remove(eventBlock: eventBlock) removeEventCallbackByEventsString(eventsString: eventsString) - return + return true } let error = data["error"] as? String ?? "" @@ -55,7 +56,7 @@ class PersistentStorageResponseHandler: ResponseHandler { message: error ) storage.remove(eventBlock: eventBlock) - return + return true } var dropIndexes = Set() @@ -90,13 +91,14 @@ class PersistentStorageResponseHandler: ResponseHandler { } storage.remove(eventBlock: eventBlock) + return true } - func handlePayloadTooLargeResponse(data: [String: Any]) { + func handlePayloadTooLargeResponse(data: [String: Any]) -> Bool { guard let events = BaseEvent.fromArrayString(jsonString: eventsString) else { storage.remove(eventBlock: eventBlock) removeEventCallbackByEventsString(eventsString: eventsString) - return + return true } if events.count == 1 { let error = data["error"] as? String ?? "" @@ -106,28 +108,32 @@ class PersistentStorageResponseHandler: ResponseHandler { message: error ) storage.remove(eventBlock: eventBlock) - return + return true } storage.splitBlock(eventBlock: eventBlock, events: events) + return true } - func handleTooManyRequestsResponse(data: [String: Any]) { + func handleTooManyRequestsResponse(data: [String: Any]) -> Bool { // wait for next time to pick it up + return false } - func handleTimeoutResponse(data: [String: Any]) { + func handleTimeoutResponse(data: [String: Any]) -> Bool { // Wait for next time to pick it up + return false } - func handleFailedResponse(data: [String: Any]) { + func handleFailedResponse(data: [String: Any]) -> Bool { // wait for next time to try again + return false } - func handle(result: Result) { + func handle(result: Result) -> Bool { switch result { case .success(let code): // We don't care about the data when success - handleSuccessResponse(code: code) + return handleSuccessResponse(code: code) case .failure(let error): switch error { case HttpClient.Exception.httpError(let code, let data): @@ -137,20 +143,20 @@ class PersistentStorageResponseHandler: ResponseHandler { } switch code { case HttpClient.HttpStatus.BAD_REQUEST.rawValue: - handleBadRequestResponse(data: json) + return handleBadRequestResponse(data: json) case HttpClient.HttpStatus.PAYLOAD_TOO_LARGE.rawValue: - handlePayloadTooLargeResponse(data: json) + return handlePayloadTooLargeResponse(data: json) case HttpClient.HttpStatus.TIMEOUT.rawValue: - handleTimeoutResponse(data: json) + return handleTimeoutResponse(data: json) case HttpClient.HttpStatus.TOO_MANY_REQUESTS.rawValue: - handleTooManyRequestsResponse(data: json) + return handleTooManyRequestsResponse(data: json) case HttpClient.HttpStatus.FAILED.rawValue: - handleFailedResponse(data: json) + return handleFailedResponse(data: json) default: - handleFailedResponse(data: json) + return handleFailedResponse(data: json) } default: - break + return false } } } @@ -184,3 +190,33 @@ extension PersistentStorageResponseHandler { } } } + +extension PersistentStorageResponseHandler { + func handle(result: Result) { + let _: Bool = handle(result: result) + } + + func handleSuccessResponse(code: Int) { + let _: Bool = handleSuccessResponse(code: code) + } + + func handleBadRequestResponse(data: [String: Any]) { + let _: Bool = handleBadRequestResponse(data: data) + } + + func handlePayloadTooLargeResponse(data: [String: Any]) { + let _: Bool = handlePayloadTooLargeResponse(data: data) + } + + func handleTooManyRequestsResponse(data: [String: Any]) { + let _: Bool = handleTooManyRequestsResponse(data: data) + } + + func handleTimeoutResponse(data: [String: Any]) { + let _: Bool = handleTimeoutResponse(data: data) + } + + func handleFailedResponse(data: [String: Any]) { + let _: Bool = handleFailedResponse(data: data) + } +} diff --git a/Tests/AmplitudeTests/Supports/TestUtilities.swift b/Tests/AmplitudeTests/Supports/TestUtilities.swift index 08b4c918..abb5c1ae 100644 --- a/Tests/AmplitudeTests/Supports/TestUtilities.swift +++ b/Tests/AmplitudeTests/Supports/TestUtilities.swift @@ -186,17 +186,26 @@ class FakeResponseHandler: ResponseHandler { self.eventsString = eventsString } - func handle(result: Result) { + func handle(result: Result) -> Bool { switch result { case .success(let code): - handleSuccessResponse(code: code) + return handleSuccessResponse(code: code) default: - break + return false } } - func handleSuccessResponse(code: Int) { + func handleSuccessResponse(code: Int) -> Bool { storage.remove(eventBlock: eventBlock) + return true + } + + func handle(result: Result) { + let _: Bool = handle(result: result) + } + + func handleSuccessResponse(code: Int) { + let _: Bool = handleSuccessResponse(code: code) } func handleBadRequestResponse(data: [String: Any]) { diff --git a/Tests/AmplitudeTests/Utilities/EventPipelineTests.swift b/Tests/AmplitudeTests/Utilities/EventPipelineTests.swift index b480d1a0..99bd19cc 100644 --- a/Tests/AmplitudeTests/Utilities/EventPipelineTests.swift +++ b/Tests/AmplitudeTests/Utilities/EventPipelineTests.swift @@ -183,4 +183,81 @@ final class EventPipelineTests: XCTestCase { XCTAssertEqual(uploadedEvents1?.count, 1) XCTAssertEqual(uploadedEvents1?[0].eventType, "testEvent-1") } + + // test continues to fail until the event is uploaded + func testContinuousFailure() { + pipeline.configuration.offline = false + pipeline.maxRetryCount = 2 + + let testEvent = BaseEvent(userId: "unit-test", deviceId: "unit-test-machine", eventType: "testEvent") + try? pipeline.storage?.write(key: StorageKey.EVENTS, value: testEvent) + + let uploadExpectations = (0..<4).map { i in expectation(description: "httpresponse-\(i)") } + httpClient.uploadExpectations = uploadExpectations + + httpClient.uploadResults = [ + .failure(NSError(domain: "unknown", code: 0, userInfo: nil)), // instant failure + .failure(NSError(domain: "unknown", code: 0, userInfo: nil)), // +1s failure + .failure(NSError(domain: "unknown", code: 0, userInfo: nil)), // +2s failure, go offline + .success(200) + ] + + pipeline.flush() + wait(for: [uploadExpectations[0], uploadExpectations[1]], timeout: 2) + + XCTAssertEqual(httpClient.uploadCount, 2) + XCTAssertEqual(pipeline.configuration.offline, false) + + wait(for: [uploadExpectations[2]], timeout: 3) + + XCTAssertEqual(httpClient.uploadCount, 3) + XCTAssertEqual(pipeline.configuration.offline, true) + + pipeline.configuration.offline = false + let flushExpectation = expectation(description: "flush") + pipeline.flush { + flushExpectation.fulfill() + } + wait(for: [uploadExpectations[3], flushExpectation], timeout: 1) + + XCTAssertEqual(httpClient.uploadCount, 4) + } + + func testContinuesHandledFailure() { + pipeline.configuration.offline = false + pipeline.maxRetryCount = 1 + + let testEvent1 = BaseEvent(userId: "unit-test", deviceId: "unit-test-machine", eventType: "testEvent") + try? pipeline.storage?.write(key: StorageKey.EVENTS, value: testEvent1) + pipeline.storage?.rollover() + + let testEvent2 = BaseEvent(userId: "unit-test", deviceId: "unit-test-machine", eventType: "testEvent") + try? pipeline.storage?.write(key: StorageKey.EVENTS, value: testEvent2) + pipeline.storage?.rollover() + + let testEvent3 = BaseEvent(userId: "unit-test", deviceId: "unit-test-machine", eventType: "testEvent") + try? pipeline.storage?.write(key: StorageKey.EVENTS, value: testEvent3) + pipeline.storage?.rollover() + + let uploadExpectations = (0..<3).map { i in expectation(description: "httpresponse-\(i)") } + httpClient.uploadExpectations = uploadExpectations + + httpClient.uploadResults = [ + .failure(HttpClient.Exception.httpError(code: HttpClient.HttpStatus.BAD_REQUEST.rawValue, data: nil)), + .failure(HttpClient.Exception.httpError(code: HttpClient.HttpStatus.PAYLOAD_TOO_LARGE.rawValue, data: nil)), + .success(200), + ] + + let flushExpectation = expectation(description: "flush") + pipeline.flush { + flushExpectation.fulfill() + } + wait(for: [uploadExpectations[0], uploadExpectations[1]], timeout: 1) + XCTAssertEqual(httpClient.uploadCount, 2) + XCTAssertEqual(pipeline.configuration.offline, false) + + wait(for: [uploadExpectations[2], flushExpectation], timeout: 1) + XCTAssertEqual(httpClient.uploadCount, 3) + XCTAssertEqual(pipeline.configuration.offline, false) + } } diff --git a/Tests/AmplitudeTests/Utilities/PersistentStorageResponseHandlerTests.swift b/Tests/AmplitudeTests/Utilities/PersistentStorageResponseHandlerTests.swift index f4a2693a..c54e11fe 100644 --- a/Tests/AmplitudeTests/Utilities/PersistentStorageResponseHandlerTests.swift +++ b/Tests/AmplitudeTests/Utilities/PersistentStorageResponseHandlerTests.swift @@ -119,7 +119,7 @@ final class PersistentStorageResponseHandlerTests: XCTestCase { eventsString: eventsString ) - handler.handleSuccessResponse(code: 200) + let _: Bool = handler.handleSuccessResponse(code: 200) XCTAssertEqual( fakePersistentStorage.haveBeenCalledWith[0], "remove(eventBlock: \(eventBlock.absoluteURL))" @@ -150,7 +150,7 @@ final class PersistentStorageResponseHandlerTests: XCTestCase { eventsString: eventsString ) - handler.handleBadRequestResponse(data: ["error": "Invalid API key: \(configuration.apiKey)"]) + let _: Bool = handler.handleBadRequestResponse(data: ["error": "Invalid API key: \(configuration.apiKey)"]) XCTAssertEqual( fakePersistentStorage.haveBeenCalledWith[0], "remove(eventBlock: \(eventBlock.absoluteURL))"