Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add repeat interval on EventPipeline to prevent high frequency requests on continuing failing #250

Merged
merged 3 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,32 +102,39 @@ class TestStorage: Storage {
) -> ResponseHandler {
class TestResponseHandler: ResponseHandler {

func handle(result: Result<Int, Error>) {
func handle(result: Result<Int, Error>) -> Bool {
// no-op
return false
}

func handleSuccessResponse(code: Int) {
func handleSuccessResponse(code: Int) -> Bool {
// no-op
return false
}

func handleBadRequestResponse(data: [String: Any]) {
func handleBadRequestResponse(data: [String: Any]) -> Bool {
// no-op
return false
}

func handlePayloadTooLargeResponse(data: [String: Any]) {
func handlePayloadTooLargeResponse(data: [String: Any]) -> Bool {
// no-op
return false
}

func handleTooManyRequestsResponse(data: [String: Any]) {
func handleTooManyRequestsResponse(data: [String: Any]) -> Bool {
// no-op
return false
}

func handleTimeoutResponse(data: [String: Any]) {
func handleTimeoutResponse(data: [String: Any]) -> Bool {
// no-op
return false
}

func handleFailedResponse(data: [String: Any]) {
func handleFailedResponse(data: [String: Any]) -> Bool {
// no-op
return false
}
}
return TestResponseHandler()
Expand Down
15 changes: 8 additions & 7 deletions Sources/Amplitude/Types.swift
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,14 @@ extension Plugin {
}

public protocol ResponseHandler {
func handle(result: Result<Int, Error>)
func handleSuccessResponse(code: Int)
func handleBadRequestResponse(data: [String: Any])
func handlePayloadTooLargeResponse(data: [String: Any])
func handleTooManyRequestsResponse(data: [String: Any])
func handleTimeoutResponse(data: [String: Any])
func handleFailedResponse(data: [String: Any])
// return true if some attempts to recover are implemented
func handle(result: Result<Int, Error>) -> Bool
func handleSuccessResponse(code: Int) -> Bool
func handleBadRequestResponse(data: [String: Any]) -> Bool
func handlePayloadTooLargeResponse(data: [String: Any]) -> Bool
func handleTooManyRequestsResponse(data: [String: Any]) -> Bool
func handleTimeoutResponse(data: [String: Any]) -> Bool
func handleFailedResponse(data: [String: Any]) -> Bool
}
crleona marked this conversation as resolved.
Show resolved Hide resolved

extension ResponseHandler {
Expand Down
43 changes: 36 additions & 7 deletions Sources/Amplitude/Utilities/EventPipeline.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@ public class EventPipeline {
let storage: Storage?
let logger: (any Logger)?
let configuration: Configuration
var maxRetryInterval: TimeInterval = 60
var maxRetryCount: Int = 6
sojingle marked this conversation as resolved.
Show resolved Hide resolved

@Atomic internal var eventCount: Int = 0
internal var flushTimer: QueueTimer?
private let uploadsQueue = DispatchQueue(label: "uploadsQueue.amplitude.com")

private var flushCompletions: [() -> Void] = []
private var currentUpload: URLSessionTask?
private(set) var continuousFailure: Int = 0
crleona marked this conversation as resolved.
Show resolved Hide resolved

init(amplitude: Amplitude) {
storage = amplitude.storage
Expand Down Expand Up @@ -100,14 +103,40 @@ public class EventPipeline {
eventBlock: nextEventFile,
eventsString: eventsString
)
responseHandler.handle(result: result)
// Don't send the next event file if we're being deallocated
self.uploadsQueue.async { [weak self] in
guard let self = self else {
return
let handled = responseHandler.handle(result: result)

switch result {
case .success:
self.continuousFailure = 0
case .failure:
if !handled {
self.continuousFailure += 1
}
}

if self.continuousFailure > self.maxRetryCount {
self.uploadsQueue.async {
self.currentUpload = nil
}
self.configuration.offline = true
self.logger?.log(message: "Request failed more than \(self.maxRetryCount) times, marking offline")
} else {
// Don't send the next event file if we're being deallocated
let nextFileBlock: () -> Void = { [weak self] in
guard let self = self else {
return
}
self.currentUpload = nil
self.sendNextEventFile()
}

if self.continuousFailure == 0 || handled {
self.uploadsQueue.async(execute: nextFileBlock)
} else {
let sendingInterval = min(self.maxRetryInterval, pow(2, Double(self.continuousFailure - 1)))
self.uploadsQueue.asyncAfter(deadline: .now() + sendingInterval, execute: nextFileBlock)
self.logger?.debug(message: "Request failed \(self.continuousFailure) times, send next event file in \(sendingInterval) seconds")
}
self.currentUpload = nil
self.sendNextEventFile()
}
}
}
Expand Down
46 changes: 26 additions & 20 deletions Sources/Amplitude/Utilities/PersistentStorageResponseHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,22 @@ class PersistentStorageResponseHandler: ResponseHandler {
self.eventsString = eventsString
}

func handleSuccessResponse(code: Int) {
func handleSuccessResponse(code: Int) -> Bool {
guard let events = BaseEvent.fromArrayString(jsonString: eventsString) else {
storage.remove(eventBlock: eventBlock)
removeEventCallbackByEventsString(eventsString: eventsString)
return
return true
}
triggerEventsCallback(events: events, code: code, message: "Successfully send event")
storage.remove(eventBlock: eventBlock)
return true
}

func handleBadRequestResponse(data: [String: Any]) {
func handleBadRequestResponse(data: [String: Any]) -> Bool {
guard let events = BaseEvent.fromArrayString(jsonString: eventsString) else {
storage.remove(eventBlock: eventBlock)
removeEventCallbackByEventsString(eventsString: eventsString)
return
return true
}

let error = data["error"] as? String ?? ""
Expand All @@ -55,7 +56,7 @@ class PersistentStorageResponseHandler: ResponseHandler {
message: error
)
storage.remove(eventBlock: eventBlock)
return
return true
}

var dropIndexes = Set<Int>()
Expand Down Expand Up @@ -90,13 +91,14 @@ class PersistentStorageResponseHandler: ResponseHandler {
}

storage.remove(eventBlock: eventBlock)
return true
}

func handlePayloadTooLargeResponse(data: [String: Any]) {
func handlePayloadTooLargeResponse(data: [String: Any]) -> Bool {
guard let events = BaseEvent.fromArrayString(jsonString: eventsString) else {
storage.remove(eventBlock: eventBlock)
removeEventCallbackByEventsString(eventsString: eventsString)
return
return true
}
if events.count == 1 {
let error = data["error"] as? String ?? ""
Expand All @@ -106,28 +108,32 @@ class PersistentStorageResponseHandler: ResponseHandler {
message: error
)
storage.remove(eventBlock: eventBlock)
return
return true
}
storage.splitBlock(eventBlock: eventBlock, events: events)
return true
}

func handleTooManyRequestsResponse(data: [String: Any]) {
func handleTooManyRequestsResponse(data: [String: Any]) -> Bool {
// wait for next time to pick it up
return false
}

func handleTimeoutResponse(data: [String: Any]) {
func handleTimeoutResponse(data: [String: Any]) -> Bool {
// Wait for next time to pick it up
return false
}

func handleFailedResponse(data: [String: Any]) {
func handleFailedResponse(data: [String: Any]) -> Bool {
// wait for next time to try again
return false
}

func handle(result: Result<Int, Error>) {
func handle(result: Result<Int, Error>) -> Bool {
crleona marked this conversation as resolved.
Show resolved Hide resolved
switch result {
case .success(let code):
// We don't care about the data when success
handleSuccessResponse(code: code)
return handleSuccessResponse(code: code)
case .failure(let error):
switch error {
case HttpClient.Exception.httpError(let code, let data):
Expand All @@ -137,20 +143,20 @@ class PersistentStorageResponseHandler: ResponseHandler {
}
switch code {
case HttpClient.HttpStatus.BAD_REQUEST.rawValue:
handleBadRequestResponse(data: json)
return handleBadRequestResponse(data: json)
case HttpClient.HttpStatus.PAYLOAD_TOO_LARGE.rawValue:
handlePayloadTooLargeResponse(data: json)
return handlePayloadTooLargeResponse(data: json)
case HttpClient.HttpStatus.TIMEOUT.rawValue:
handleTimeoutResponse(data: json)
return handleTimeoutResponse(data: json)
case HttpClient.HttpStatus.TOO_MANY_REQUESTS.rawValue:
handleTooManyRequestsResponse(data: json)
return handleTooManyRequestsResponse(data: json)
case HttpClient.HttpStatus.FAILED.rawValue:
handleFailedResponse(data: json)
return handleFailedResponse(data: json)
default:
handleFailedResponse(data: json)
return handleFailedResponse(data: json)
}
default:
break
return false
}
}
}
Expand Down
24 changes: 15 additions & 9 deletions Tests/AmplitudeTests/Supports/TestUtilities.swift
Original file line number Diff line number Diff line change
Expand Up @@ -186,32 +186,38 @@ class FakeResponseHandler: ResponseHandler {
self.eventsString = eventsString
}

func handle(result: Result<Int, Error>) {
func handle(result: Result<Int, Error>) -> Bool {
switch result {
case .success(let code):
handleSuccessResponse(code: code)
return handleSuccessResponse(code: code)
default:
break
return false
}
}

func handleSuccessResponse(code: Int) {
func handleSuccessResponse(code: Int) -> Bool {
storage.remove(eventBlock: eventBlock)
return true
}

func handleBadRequestResponse(data: [String: Any]) {
func handleBadRequestResponse(data: [String: Any]) -> Bool {
return true
}

func handlePayloadTooLargeResponse(data: [String: Any]) {
func handlePayloadTooLargeResponse(data: [String: Any]) -> Bool {
return true
}

func handleTooManyRequestsResponse(data: [String: Any]) {
func handleTooManyRequestsResponse(data: [String: Any]) -> Bool {
return false
}

func handleTimeoutResponse(data: [String: Any]) {
func handleTimeoutResponse(data: [String: Any]) -> Bool {
return false
}

func handleFailedResponse(data: [String: Any]) {
func handleFailedResponse(data: [String: Any]) -> Bool {
return false
}
}

Expand Down
80 changes: 80 additions & 0 deletions Tests/AmplitudeTests/Utilities/EventPipelineTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -183,4 +183,84 @@ final class EventPipelineTests: XCTestCase {
XCTAssertEqual(uploadedEvents1?.count, 1)
XCTAssertEqual(uploadedEvents1?[0].eventType, "testEvent-1")
}

// test continues to fail until the event is uploaded
func testContinuousFailure() {
pipeline.configuration.offline = false
pipeline.maxRetryCount = 2

let testEvent = BaseEvent(userId: "unit-test", deviceId: "unit-test-machine", eventType: "testEvent")
try? pipeline.storage?.write(key: StorageKey.EVENTS, value: testEvent)

let uploadExpectations = (0..<4).map { i in expectation(description: "httpresponse-\(i)") }
httpClient.uploadExpectations = uploadExpectations

httpClient.uploadResults = [
.failure(NSError(domain: "unknown", code: 0, userInfo: nil)), // instant failure
.failure(NSError(domain: "unknown", code: 0, userInfo: nil)), // +1s failure
.failure(NSError(domain: "unknown", code: 0, userInfo: nil)), // +2s failure, go offline
.success(200)
]

pipeline.flush()
wait(for: [uploadExpectations[0], uploadExpectations[1]], timeout: 2)

XCTAssertEqual(httpClient.uploadCount, 2)
XCTAssertEqual(pipeline.continuousFailure, 2)
XCTAssertEqual(pipeline.configuration.offline, false)

wait(for: [uploadExpectations[2]], timeout: 3)

XCTAssertEqual(httpClient.uploadCount, 3)
XCTAssertEqual(pipeline.continuousFailure, 3)
XCTAssertEqual(pipeline.configuration.offline, true)

pipeline.configuration.offline = false
let flushExpectation = expectation(description: "flush")
pipeline.flush {
flushExpectation.fulfill()
}
wait(for: [uploadExpectations[3], flushExpectation], timeout: 1)

XCTAssertEqual(httpClient.uploadCount, 4)
XCTAssertEqual(pipeline.continuousFailure, 0)
}

func testContinuesHandledFailure() {
pipeline.configuration.offline = false
pipeline.maxRetryCount = 1

let testEvent1 = BaseEvent(userId: "unit-test", deviceId: "unit-test-machine", eventType: "testEvent")
try? pipeline.storage?.write(key: StorageKey.EVENTS, value: testEvent1)
pipeline.storage?.rollover()

let testEvent2 = BaseEvent(userId: "unit-test", deviceId: "unit-test-machine", eventType: "testEvent")
try? pipeline.storage?.write(key: StorageKey.EVENTS, value: testEvent2)
pipeline.storage?.rollover()

let testEvent3 = BaseEvent(userId: "unit-test", deviceId: "unit-test-machine", eventType: "testEvent")
try? pipeline.storage?.write(key: StorageKey.EVENTS, value: testEvent3)
pipeline.storage?.rollover()

let uploadExpectations = (0..<3).map { i in expectation(description: "httpresponse-\(i)") }
httpClient.uploadExpectations = uploadExpectations

httpClient.uploadResults = [
.failure(HttpClient.Exception.httpError(code: HttpClient.HttpStatus.BAD_REQUEST.rawValue, data: nil)),
.failure(HttpClient.Exception.httpError(code: HttpClient.HttpStatus.PAYLOAD_TOO_LARGE.rawValue, data: nil)),
.success(200),
]

let flushExpectation = expectation(description: "flush")
pipeline.flush {
flushExpectation.fulfill()
}
wait(for: [uploadExpectations[0], uploadExpectations[1]], timeout: 1)
XCTAssertEqual(httpClient.uploadCount, 2)
XCTAssertEqual(pipeline.continuousFailure, 0)

wait(for: [uploadExpectations[2], flushExpectation], timeout: 1)
XCTAssertEqual(httpClient.uploadCount, 3)
XCTAssertEqual(pipeline.continuousFailure, 0)
}
}
Loading