Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add repeat interval on EventPipeline to prevent high frequency requests on continuing failing #250

Merged
merged 3 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Sources/Amplitude/Types.swift
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@ public protocol ResponseHandler {
func handleTooManyRequestsResponse(data: [String: Any])
func handleTimeoutResponse(data: [String: Any])
func handleFailedResponse(data: [String: Any])

// Added on v1.11.2.
// A replacement for handle(result: Result<Int, Error>) -> Void
// Return true if some attempts to recover are implemented
func handle(result: Result<Int, Error>) -> Bool
}

extension ResponseHandler {
Expand All @@ -170,3 +175,11 @@ extension ResponseHandler {
return indices
}
}

// Provide compatibility for new `handle` function added on v1.11.2.
extension ResponseHandler {
public func handle(result: Result<Int, any Error>) -> Bool {
let _: Void = handle(result: result)
return false
}
}
45 changes: 37 additions & 8 deletions Sources/Amplitude/Utilities/EventPipeline.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ public class EventPipeline {
let storage: Storage?
let logger: (any Logger)?
let configuration: Configuration
var maxRetryInterval: TimeInterval = 60
var maxRetryCount: Int = 6
sojingle marked this conversation as resolved.
Show resolved Hide resolved

@Atomic internal var eventCount: Int = 0
internal var flushTimer: QueueTimer?
Expand Down Expand Up @@ -66,7 +68,7 @@ public class EventPipeline {
}
}

private func sendNextEventFile() {
private func sendNextEventFile(failures: Int = 0) {
autoreleasepool {
guard currentUpload == nil else {
logger?.log(message: "Existing upload in progress, skipping...")
Expand Down Expand Up @@ -100,14 +102,41 @@ public class EventPipeline {
eventBlock: nextEventFile,
eventsString: eventsString
)
responseHandler.handle(result: result)
// Don't send the next event file if we're being deallocated
self.uploadsQueue.async { [weak self] in
guard let self = self else {
return
let handled: Bool = responseHandler.handle(result: result)
var failures = failures

switch result {
case .success:
failures = 0
case .failure:
if !handled {
failures += 1
}
}

if failures > self.maxRetryCount {
self.uploadsQueue.async {
self.currentUpload = nil
}
self.configuration.offline = true
self.logger?.log(message: "Request failed more than \(self.maxRetryCount) times, marking offline")
} else {
// Don't send the next event file if we're being deallocated
let nextFileBlock: () -> Void = { [weak self] in
guard let self = self else {
return
}
self.currentUpload = nil
self.sendNextEventFile(failures: failures)
}

if failures == 0 || handled {
self.uploadsQueue.async(execute: nextFileBlock)
} else {
let sendingInterval = min(self.maxRetryInterval, pow(2, Double(failures - 1)))
self.uploadsQueue.asyncAfter(deadline: .now() + sendingInterval, execute: nextFileBlock)
self.logger?.debug(message: "Request failed \(failures) times, send next event file in \(sendingInterval) seconds")
}
self.currentUpload = nil
self.sendNextEventFile()
}
}
}
Expand Down
76 changes: 56 additions & 20 deletions Sources/Amplitude/Utilities/PersistentStorageResponseHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,22 @@ class PersistentStorageResponseHandler: ResponseHandler {
self.eventsString = eventsString
}

func handleSuccessResponse(code: Int) {
func handleSuccessResponse(code: Int) -> Bool {
guard let events = BaseEvent.fromArrayString(jsonString: eventsString) else {
storage.remove(eventBlock: eventBlock)
removeEventCallbackByEventsString(eventsString: eventsString)
return
return true
}
triggerEventsCallback(events: events, code: code, message: "Successfully send event")
storage.remove(eventBlock: eventBlock)
return true
}

func handleBadRequestResponse(data: [String: Any]) {
func handleBadRequestResponse(data: [String: Any]) -> Bool {
guard let events = BaseEvent.fromArrayString(jsonString: eventsString) else {
storage.remove(eventBlock: eventBlock)
removeEventCallbackByEventsString(eventsString: eventsString)
return
return true
}

let error = data["error"] as? String ?? ""
Expand All @@ -55,7 +56,7 @@ class PersistentStorageResponseHandler: ResponseHandler {
message: error
)
storage.remove(eventBlock: eventBlock)
return
return true
}

var dropIndexes = Set<Int>()
Expand Down Expand Up @@ -90,13 +91,14 @@ class PersistentStorageResponseHandler: ResponseHandler {
}

storage.remove(eventBlock: eventBlock)
return true
}

func handlePayloadTooLargeResponse(data: [String: Any]) {
func handlePayloadTooLargeResponse(data: [String: Any]) -> Bool {
guard let events = BaseEvent.fromArrayString(jsonString: eventsString) else {
storage.remove(eventBlock: eventBlock)
removeEventCallbackByEventsString(eventsString: eventsString)
return
return true
}
if events.count == 1 {
let error = data["error"] as? String ?? ""
Expand All @@ -106,28 +108,32 @@ class PersistentStorageResponseHandler: ResponseHandler {
message: error
)
storage.remove(eventBlock: eventBlock)
return
return true
}
storage.splitBlock(eventBlock: eventBlock, events: events)
return true
}

func handleTooManyRequestsResponse(data: [String: Any]) {
func handleTooManyRequestsResponse(data: [String: Any]) -> Bool {
// wait for next time to pick it up
return false
}

func handleTimeoutResponse(data: [String: Any]) {
func handleTimeoutResponse(data: [String: Any]) -> Bool {
// Wait for next time to pick it up
return false
}

func handleFailedResponse(data: [String: Any]) {
func handleFailedResponse(data: [String: Any]) -> Bool {
// wait for next time to try again
return false
}

func handle(result: Result<Int, Error>) {
func handle(result: Result<Int, Error>) -> Bool {
crleona marked this conversation as resolved.
Show resolved Hide resolved
switch result {
case .success(let code):
// We don't care about the data when success
handleSuccessResponse(code: code)
return handleSuccessResponse(code: code)
case .failure(let error):
switch error {
case HttpClient.Exception.httpError(let code, let data):
Expand All @@ -137,20 +143,20 @@ class PersistentStorageResponseHandler: ResponseHandler {
}
switch code {
case HttpClient.HttpStatus.BAD_REQUEST.rawValue:
handleBadRequestResponse(data: json)
return handleBadRequestResponse(data: json)
case HttpClient.HttpStatus.PAYLOAD_TOO_LARGE.rawValue:
handlePayloadTooLargeResponse(data: json)
return handlePayloadTooLargeResponse(data: json)
case HttpClient.HttpStatus.TIMEOUT.rawValue:
handleTimeoutResponse(data: json)
return handleTimeoutResponse(data: json)
case HttpClient.HttpStatus.TOO_MANY_REQUESTS.rawValue:
handleTooManyRequestsResponse(data: json)
return handleTooManyRequestsResponse(data: json)
case HttpClient.HttpStatus.FAILED.rawValue:
handleFailedResponse(data: json)
return handleFailedResponse(data: json)
default:
handleFailedResponse(data: json)
return handleFailedResponse(data: json)
}
default:
break
return false
}
}
}
Expand Down Expand Up @@ -184,3 +190,33 @@ extension PersistentStorageResponseHandler {
}
}
}

extension PersistentStorageResponseHandler {
func handle(result: Result<Int, any Error>) {
let _: Bool = handle(result: result)
}

func handleSuccessResponse(code: Int) {
let _: Bool = handleSuccessResponse(code: code)
}

func handleBadRequestResponse(data: [String: Any]) {
let _: Bool = handleBadRequestResponse(data: data)
}

func handlePayloadTooLargeResponse(data: [String: Any]) {
let _: Bool = handlePayloadTooLargeResponse(data: data)
}

func handleTooManyRequestsResponse(data: [String: Any]) {
let _: Bool = handleTooManyRequestsResponse(data: data)
}

func handleTimeoutResponse(data: [String: Any]) {
let _: Bool = handleTimeoutResponse(data: data)
}

func handleFailedResponse(data: [String: Any]) {
let _: Bool = handleFailedResponse(data: data)
}
}
17 changes: 13 additions & 4 deletions Tests/AmplitudeTests/Supports/TestUtilities.swift
Original file line number Diff line number Diff line change
Expand Up @@ -186,17 +186,26 @@ class FakeResponseHandler: ResponseHandler {
self.eventsString = eventsString
}

func handle(result: Result<Int, Error>) {
func handle(result: Result<Int, Error>) -> Bool {
switch result {
case .success(let code):
handleSuccessResponse(code: code)
return handleSuccessResponse(code: code)
default:
break
return false
}
}

func handleSuccessResponse(code: Int) {
func handleSuccessResponse(code: Int) -> Bool {
storage.remove(eventBlock: eventBlock)
return true
}

func handle(result: Result<Int, any Error>) {
let _: Bool = handle(result: result)
}

func handleSuccessResponse(code: Int) {
let _: Bool = handleSuccessResponse(code: code)
}

func handleBadRequestResponse(data: [String: Any]) {
Expand Down
77 changes: 77 additions & 0 deletions Tests/AmplitudeTests/Utilities/EventPipelineTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -183,4 +183,81 @@ final class EventPipelineTests: XCTestCase {
XCTAssertEqual(uploadedEvents1?.count, 1)
XCTAssertEqual(uploadedEvents1?[0].eventType, "testEvent-1")
}

// test continues to fail until the event is uploaded
func testContinuousFailure() {
pipeline.configuration.offline = false
pipeline.maxRetryCount = 2

let testEvent = BaseEvent(userId: "unit-test", deviceId: "unit-test-machine", eventType: "testEvent")
try? pipeline.storage?.write(key: StorageKey.EVENTS, value: testEvent)

let uploadExpectations = (0..<4).map { i in expectation(description: "httpresponse-\(i)") }
httpClient.uploadExpectations = uploadExpectations

httpClient.uploadResults = [
.failure(NSError(domain: "unknown", code: 0, userInfo: nil)), // instant failure
.failure(NSError(domain: "unknown", code: 0, userInfo: nil)), // +1s failure
.failure(NSError(domain: "unknown", code: 0, userInfo: nil)), // +2s failure, go offline
.success(200)
]

pipeline.flush()
wait(for: [uploadExpectations[0], uploadExpectations[1]], timeout: 2)

XCTAssertEqual(httpClient.uploadCount, 2)
XCTAssertEqual(pipeline.configuration.offline, false)

wait(for: [uploadExpectations[2]], timeout: 3)

XCTAssertEqual(httpClient.uploadCount, 3)
XCTAssertEqual(pipeline.configuration.offline, true)

pipeline.configuration.offline = false
let flushExpectation = expectation(description: "flush")
pipeline.flush {
flushExpectation.fulfill()
}
wait(for: [uploadExpectations[3], flushExpectation], timeout: 1)

XCTAssertEqual(httpClient.uploadCount, 4)
}

func testContinuesHandledFailure() {
pipeline.configuration.offline = false
pipeline.maxRetryCount = 1

let testEvent1 = BaseEvent(userId: "unit-test", deviceId: "unit-test-machine", eventType: "testEvent")
try? pipeline.storage?.write(key: StorageKey.EVENTS, value: testEvent1)
pipeline.storage?.rollover()

let testEvent2 = BaseEvent(userId: "unit-test", deviceId: "unit-test-machine", eventType: "testEvent")
try? pipeline.storage?.write(key: StorageKey.EVENTS, value: testEvent2)
pipeline.storage?.rollover()

let testEvent3 = BaseEvent(userId: "unit-test", deviceId: "unit-test-machine", eventType: "testEvent")
try? pipeline.storage?.write(key: StorageKey.EVENTS, value: testEvent3)
pipeline.storage?.rollover()

let uploadExpectations = (0..<3).map { i in expectation(description: "httpresponse-\(i)") }
httpClient.uploadExpectations = uploadExpectations

httpClient.uploadResults = [
.failure(HttpClient.Exception.httpError(code: HttpClient.HttpStatus.BAD_REQUEST.rawValue, data: nil)),
.failure(HttpClient.Exception.httpError(code: HttpClient.HttpStatus.PAYLOAD_TOO_LARGE.rawValue, data: nil)),
.success(200),
]

let flushExpectation = expectation(description: "flush")
pipeline.flush {
flushExpectation.fulfill()
}
wait(for: [uploadExpectations[0], uploadExpectations[1]], timeout: 1)
XCTAssertEqual(httpClient.uploadCount, 2)
XCTAssertEqual(pipeline.configuration.offline, false)

wait(for: [uploadExpectations[2], flushExpectation], timeout: 1)
XCTAssertEqual(httpClient.uploadCount, 3)
XCTAssertEqual(pipeline.configuration.offline, false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ final class PersistentStorageResponseHandlerTests: XCTestCase {
eventsString: eventsString
)

handler.handleSuccessResponse(code: 200)
let _: Bool = handler.handleSuccessResponse(code: 200)
XCTAssertEqual(
fakePersistentStorage.haveBeenCalledWith[0],
"remove(eventBlock: \(eventBlock.absoluteURL))"
Expand Down Expand Up @@ -150,7 +150,7 @@ final class PersistentStorageResponseHandlerTests: XCTestCase {
eventsString: eventsString
)

handler.handleBadRequestResponse(data: ["error": "Invalid API key: \(configuration.apiKey)"])
let _: Bool = handler.handleBadRequestResponse(data: ["error": "Invalid API key: \(configuration.apiKey)"])
XCTAssertEqual(
fakePersistentStorage.haveBeenCalledWith[0],
"remove(eventBlock: \(eventBlock.absoluteURL))"
Expand Down
Loading