Skip to content

Commit

Permalink
fix: Send a max of one upload at a time (#221)
Browse files Browse the repository at this point in the history
  • Loading branch information
crleona authored Aug 29, 2024
1 parent 67ae99e commit 63e76d9
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 37 deletions.
87 changes: 50 additions & 37 deletions Sources/Amplitude/Utilities/EventPipeline.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@ public class EventPipeline {
internal var flushTimer: QueueTimer?
private let uploadsQueue = DispatchQueue(label: "uploadsQueue.amplitude.com")

internal struct UploadTaskInfo {
let events: String
let task: URLSessionDataTask
}
private var uploads = [URL: UploadTaskInfo]()
private var flushCompletions: [() -> Void] = []
private var currentUpload: URLSessionTask?

init(amplitude: Amplitude) {
storage = amplitude.storage
Expand Down Expand Up @@ -60,33 +57,58 @@ public class EventPipeline {
eventCount = 0
guard let storage = self.storage else { return }
storage.rollover()
guard let eventFiles: [URL] = storage.read(key: StorageKey.EVENTS) else { return }
for eventFile in eventFiles {
uploadsQueue.sync {
guard uploads[eventFile] == nil else {
logger?.log(message: "Existing upload in progress, skipping...")
return
}
guard let eventsString = storage.getEventsString(eventBlock: eventFile),
!eventsString.isEmpty else {

uploadsQueue.async { [self] in
if let completion {
flushCompletions.append(completion)
}
self.sendNextEventFile()
}
}

private func sendNextEventFile() {
guard currentUpload == nil else {
logger?.log(message: "Existing upload in progress, skipping...")
return
}

guard let storage = storage,
let eventFiles: [URL] = storage.read(key: StorageKey.EVENTS),
let nextEventFile = eventFiles.first else {
flushCompletions.forEach { $0() }
flushCompletions.removeAll()
logger?.debug(message: "No event files to upload")
return
}

guard configuration.offline != true else {
logger?.debug(message: "Skipping flush while offline.")
return
}

guard let eventsString = storage.getEventsString(eventBlock: nextEventFile),
!eventsString.isEmpty else {
logger?.log(message: "Could not read events file: \(nextEventFile)")
return
}

currentUpload = httpClient.upload(events: eventsString) { [self] result in
let responseHandler = storage.getResponseHandler(
configuration: self.configuration,
eventPipeline: self,
eventBlock: nextEventFile,
eventsString: eventsString
)
responseHandler.handle(result: result)
// Don't send the next event file if we're being deallocated
self.uploadsQueue.async { [weak self] in
guard let self = self else {
return
}
let uploadTask = httpClient.upload(events: eventsString) { [self] result in
let responseHandler = storage.getResponseHandler(
configuration: self.configuration,
eventPipeline: self,
eventBlock: eventFile,
eventsString: eventsString
)
responseHandler.handle(result: result)
self.completeUpload(for: eventFile)
}
if let uploadTask {
uploads[eventFile] = UploadTaskInfo(events: eventsString, task: uploadTask)
}
self.currentUpload = nil
self.sendNextEventFile()
}
}
completion?()
}

func start() {
Expand All @@ -106,12 +128,3 @@ public class EventPipeline {
return count != 0 ? count : 1
}
}

extension EventPipeline {

func completeUpload(for eventBlock: URL) {
uploadsQueue.sync {
uploads[eventBlock] = nil
}
}
}
29 changes: 29 additions & 0 deletions Tests/AmplitudeTests/Utilities/EventPipelineTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,35 @@ final class EventPipelineTests: XCTestCase {
XCTAssertEqual(uploadedEvents![0].eventType, "testEvent")
}

func testOneUploadAtATime() {
let testEvent = BaseEvent(userId: "unit-test", deviceId: "unit-test-machine", eventType: "testEvent")
try? pipeline.storage?.write(key: StorageKey.EVENTS, value: testEvent)
pipeline?.storage?.rollover()

let testEvent2 = BaseEvent(userId: "unit-test", deviceId: "unit-test-machine", eventType: "testEvent2")
try? pipeline.storage?.write(key: StorageKey.EVENTS, value: testEvent)
pipeline.storage?.rollover()

let httpResponseExpectation1 = expectation(description: "httpresponse1")
let httpResponseExpectation2 = expectation(description: "httpresponse2")
httpClient.uploadExpectations = [httpResponseExpectation1, httpResponseExpectation2]

httpResponseExpectation2.isInverted = true

let flushExpectation = expectation(description: "flush")
pipeline.flush {
flushExpectation.fulfill()
}

wait(for: [httpResponseExpectation1], timeout: 1)

httpResponseExpectation2.isInverted = false

wait(for: [httpResponseExpectation2, flushExpectation], timeout: 1)

XCTAssertEqual(httpClient.uploadCount, 2)
}

func testInvalidEventUpload() {
let invalidResponseData = "{\"events_with_invalid_fields\": {\"user_id\": [0]}}".data(using: .utf8)!

Expand Down

0 comments on commit 63e76d9

Please sign in to comment.