Skip to content

Commit

Permalink
feat: Dispatch operations to internal queue
Browse files Browse the repository at this point in the history
  • Loading branch information
crleona committed May 3, 2024
1 parent 0cd014a commit 9fdf2b3
Show file tree
Hide file tree
Showing 14 changed files with 180 additions and 65 deletions.
5 changes: 5 additions & 0 deletions Amplitude-Swift.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
3E281B8C2B967F19009D913B /* Diagonostics.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3E281B8B2B967F19009D913B /* Diagonostics.swift */; };
3E281B8E2B96833D009D913B /* DiagnosticsTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3E281B8D2B96833D009D913B /* DiagnosticsTests.swift */; };
3E281B912B9BCC14009D913B /* DispatchQueueHolder.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3E281B902B9BCC14009D913B /* DispatchQueueHolder.swift */; };
4E05BB942BE41AEB009DE475 /* Amplitude+Extensions.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4E05BB932BE41AEB009DE475 /* Amplitude+Extensions.swift */; };
4E2B646B2BA127460010E6F8 /* UIKitScreenViewsPluginTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4E2B646A2BA127460010E6F8 /* UIKitScreenViewsPluginTests.swift */; };
4E3871622BB34DBC002890AB /* PrivacyInfo.xcprivacy in Resources */ = {isa = PBXBuildFile; fileRef = B6DF481F2B5B45BE00B3E6AA /* PrivacyInfo.xcprivacy */; };
8EDEC02B99EE2092B567A61D /* ObjCIngestionMetadata.swift in Sources */ = {isa = PBXBuildFile; fileRef = 8EDEC500EBDA8B813056E2DB /* ObjCIngestionMetadata.swift */; };
Expand Down Expand Up @@ -150,6 +151,7 @@
3E281B8B2B967F19009D913B /* Diagonostics.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Diagonostics.swift; sourceTree = "<group>"; };
3E281B8D2B96833D009D913B /* DiagnosticsTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DiagnosticsTests.swift; sourceTree = "<group>"; };
3E281B902B9BCC14009D913B /* DispatchQueueHolder.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DispatchQueueHolder.swift; sourceTree = "<group>"; };
4E05BB932BE41AEB009DE475 /* Amplitude+Extensions.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Amplitude+Extensions.swift"; sourceTree = "<group>"; };
4E2B646A2BA127460010E6F8 /* UIKitScreenViewsPluginTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = UIKitScreenViewsPluginTests.swift; sourceTree = "<group>"; };
8EDEC0630C3B587334275D9B /* AmplitudeSessionTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = AmplitudeSessionTests.swift; sourceTree = "<group>"; };
8EDEC1160D95DC3F0E48DDF7 /* ObjCPlugin.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ObjCPlugin.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -466,6 +468,7 @@
8EDECBC5925DC68913C7CB89 /* Migration */,
BA1EC0F32A9F2FC700C2D547 /* DefaultTrackingOptionsTests.swift */,
BA1EC0F52A9F63FD00C2D547 /* AmplitudeIOSTests.swift */,
4E05BB932BE41AEB009DE475 /* Amplitude+Extensions.swift */,
);
name = Tests;
path = Tests/AmplitudeTests;
Expand Down Expand Up @@ -662,6 +665,7 @@
/* Begin PBXShellScriptBuildPhase section */
3E281B8F2B98EC92009D913B /* ShellScript */ = {
isa = PBXShellScriptBuildPhase;
alwaysOutOfDate = 1;
buildActionMask = 2147483647;
files = (
);
Expand Down Expand Up @@ -713,6 +717,7 @@
OBJ_156 /* HttpClientTests.swift in Sources */,
D01043612B6C5A8500F8173C /* SandboxHelperTests.swift in Sources */,
OBJ_157 /* PersistentStorageResponseHandlerTests.swift in Sources */,
4E05BB942BE41AEB009DE475 /* Amplitude+Extensions.swift in Sources */,
OBJ_158 /* UrlExtensionTests.swift in Sources */,
8EDEC4EE0DE1C89889F451B5 /* QueueTimeTests.swift in Sources */,
BA1EC0F62A9F63FD00C2D547 /* AmplitudeIOSTests.swift in Sources */,
Expand Down
60 changes: 38 additions & 22 deletions Sources/Amplitude/Amplitude.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class Amplitude {
return self.configuration.loggerProvider
}()

let trackingQueue = DispatchQueue(label: "com.amplitude.analytics", target: .global(qos: .utility))

public init(
configuration: Configuration
) {
Expand Down Expand Up @@ -84,9 +86,7 @@ public class Amplitude {
}

@discardableResult
public func track(eventType: String, eventProperties: [String: Any]? = nil, options: EventOptions? = nil)
-> Amplitude
{
public func track(eventType: String, eventProperties: [String: Any]? = nil, options: EventOptions? = nil) -> Amplitude {
let event = BaseEvent(eventType: eventType)
event.eventProperties = eventProperties
if let eventOptions = options {
Expand Down Expand Up @@ -114,10 +114,10 @@ public class Amplitude {
if let eventOptions = options {
event.mergeEventOptions(eventOptions: eventOptions)
if eventOptions.userId != nil {
_ = setUserId(userId: eventOptions.userId)
setUserId(userId: eventOptions.userId)
}
if eventOptions.deviceId != nil {
_ = setDeviceId(deviceId: eventOptions.deviceId)
setDeviceId(deviceId: eventOptions.deviceId)
}
}
process(event: event)
Expand Down Expand Up @@ -247,9 +247,11 @@ public class Amplitude {

@discardableResult
public func flush() -> Amplitude {
timeline.apply { plugin in
if let _plugin = plugin as? EventPlugin {
_plugin.flush()
trackingQueue.async {
self.timeline.apply { plugin in
if let _plugin = plugin as? EventPlugin {
_plugin.flush()
}
}
}
return self
Expand Down Expand Up @@ -283,10 +285,17 @@ public class Amplitude {

@discardableResult
public func setSessionId(timestamp: Int64) -> Amplitude {
let sessionEvents = sessions.assignEventId(
events: timestamp >= 0 ? sessions.startNewSession(timestamp: timestamp) : sessions.endCurrentSession()
)
sessionEvents.forEach { e in timeline.processEvent(event: e) }
trackingQueue.async { [self] in
let sessionEvents: [BaseEvent]
if timestamp >= 0 {
sessionEvents = self.sessions.startNewSession(timestamp: timestamp)
} else {
sessionEvents = self.sessions.endCurrentSession()
}
self.sessions.assignEventId(events: sessionEvents).forEach { e in
self.timeline.processEvent(event: e)
}
}
return self
}

Expand All @@ -299,8 +308,8 @@ public class Amplitude {

@discardableResult
public func reset() -> Amplitude {
_ = setUserId(userId: nil)
_ = setDeviceId(deviceId: nil)
setUserId(userId: nil)
setDeviceId(deviceId: nil)
contextPlugin.initializeDeviceId()
return self
}
Expand All @@ -314,26 +323,33 @@ public class Amplitude {
logger?.log(message: "Skip event based on opt out configuration")
return
}
let events = sessions.processEvent(event: event, inForeground: inForeground)
events.forEach { e in timeline.processEvent(event: e) }
let inForeground = inForeground
trackingQueue.async { [self] in
let events = self.sessions.processEvent(event: event, inForeground: inForeground)
events.forEach { e in self.timeline.processEvent(event: e) }
}
}

func onEnterForeground(timestamp: Int64) {
inForeground = true
let dummySessionStartEvent = BaseEvent(
timestamp: timestamp,
eventType: Constants.AMP_SESSION_START_EVENT
)
let events = sessions.processEvent(event: dummySessionStartEvent, inForeground: false)
// Set inForeground to true only after we have successfully started a new session if needed.
inForeground = true
events.forEach { e in timeline.processEvent(event: e) }
trackingQueue.async { [self] in
// set inForeground to false to represent state before event was fired
let events = self.sessions.processEvent(event: dummySessionStartEvent, inForeground: false)
events.forEach { e in self.timeline.processEvent(event: e) }
}
}

func onExitForeground(timestamp: Int64) {
inForeground = false
sessions.lastEventTime = timestamp
trackingQueue.async { [self] in
self.sessions.lastEventTime = timestamp
}
if configuration.flushEventsOnClose == true {
_ = self.flush()
flush()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public struct NetworkPath {
// Protocol for creating network paths
protocol PathCreationProtocol {
var networkPathPublisher: AnyPublisher<NetworkPath, Never>? { get }
func start()
func start(queue: DispatchQueue)
}

// Implementation of PathCreationProtocol using NWPathMonitor
Expand All @@ -30,12 +30,12 @@ final class PathCreation: PathCreationProtocol {
private let subject = PassthroughSubject<NWPath, Never>()
private let monitor = NWPathMonitor()

func start() {
func start(queue: DispatchQueue) {
monitor.pathUpdateHandler = subject.send
networkPathPublisher = subject
.map { NetworkPath(status: $0.status) }
.eraseToAnyPublisher()
monitor.start(queue: .main)
monitor.start(queue: queue)
}
}

Expand All @@ -53,7 +53,7 @@ open class NetworkConnectivityCheckerPlugin: BeforePlugin {
super.setup(amplitude: amplitude)
amplitude.logger?.debug(message: "Installing NetworkConnectivityCheckerPlugin, offline feature should be supported.")

pathCreation.start()
pathCreation.start(queue: amplitude.trackingQueue)
pathUpdateCancellable = pathCreation.networkPathPublisher?
.sink(receiveValue: { [weak self] networkPath in
let isOffline = !(networkPath.status == .satisfied)
Expand Down
4 changes: 2 additions & 2 deletions Sources/Amplitude/Sessions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ public class Sessions {
private let amplitude: Amplitude

private var _sessionId: Int64 = -1
var sessionId: Int64 {
private(set) var sessionId: Int64 {
get { _sessionId }
set {
_sessionId = newValue
Expand All @@ -17,7 +17,7 @@ public class Sessions {
}

private var _lastEventId: Int64 = 0
var lastEventId: Int64 {
private(set) var lastEventId: Int64 {
get { _lastEventId }
set {
_lastEventId = newValue
Expand Down
4 changes: 2 additions & 2 deletions Sources/Amplitude/State.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
import Foundation

class State {
var userId: String? {
@Atomic var userId: String? {
didSet {
for plugin in plugins {
plugin.onUserIdChanged(userId)
}
}
}

var deviceId: String? {
@Atomic var deviceId: String? {
didSet {
for plugin in plugins {
plugin.onDeviceIdChanged(deviceId)
Expand Down
6 changes: 4 additions & 2 deletions Sources/Amplitude/Utilities/EventPipeline.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ public class EventPipeline {

init(amplitude: Amplitude) {
self.amplitude = amplitude
self.httpClient = HttpClient(configuration: amplitude.configuration, diagnostics: amplitude.configuration.diagonostics)
self.flushTimer = QueueTimer(interval: getFlushInterval()) { [weak self] in
self.httpClient = HttpClient(configuration: amplitude.configuration,
diagnostics: amplitude.configuration.diagonostics,
callbackQueue: amplitude.trackingQueue)
self.flushTimer = QueueTimer(interval: getFlushInterval(), queue: amplitude.trackingQueue) { [weak self] in
self?.flush()
}
}
Expand Down
28 changes: 16 additions & 12 deletions Sources/Amplitude/Utilities/HttpClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,20 @@ import Foundation

class HttpClient {
let configuration: Configuration
internal let session: URLSession
let session: URLSession
let diagnostics: Diagnostics
let callbackQueue: DispatchQueue

private lazy var dateFormatter: ISO8601DateFormatter = {
let formatter = ISO8601DateFormatter()
formatter.formatOptions.insert(.withFractionalSeconds)
return formatter
}()

init(configuration: Configuration, diagnostics: Diagnostics) {
init(configuration: Configuration, diagnostics: Diagnostics, callbackQueue: DispatchQueue? = nil) {
self.configuration = configuration
self.diagnostics = diagnostics
self.callbackQueue = callbackQueue ?? .global()

let sessionConfiguration = URLSessionConfiguration.default
sessionConfiguration.httpMaximumConnectionsPerHost = 2
Expand All @@ -35,18 +37,20 @@ class HttpClient {
let request = try getRequest()
let requestData = getRequestData(events: events)

sessionTask = session.uploadTask(with: request, from: requestData) { data, response, error in
if error != nil {
completion(.failure(error!))
} else if let httpResponse = response as? HTTPURLResponse {
switch httpResponse.statusCode {
case 1..<300:
completion(.success(httpResponse.statusCode))
default:
completion(.failure(Exception.httpError(code: httpResponse.statusCode, data: data)))
sessionTask = session.uploadTask(with: request, from: requestData) { [callbackQueue] data, response, error in
callbackQueue.async {
if error != nil {
completion(.failure(error!))
} else if let httpResponse = response as? HTTPURLResponse {
switch httpResponse.statusCode {
case 1..<300:
completion(.success(httpResponse.statusCode))
default:
completion(.failure(Exception.httpError(code: httpResponse.statusCode, data: data)))
}
}
backgroundTaskCompletion?()
}
backgroundTaskCompletion?()
}
sessionTask!.resume()
} catch {
Expand Down
23 changes: 23 additions & 0 deletions Tests/AmplitudeTests/Amplitude+Extensions.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
//
// Amplitude+Extensions.swift
// Amplitude-SwiftTests
//
// Created by Chris Leonavicius on 5/2/24.
//

@testable import AmplitudeSwift
import XCTest

extension Amplitude {

func waitForTrackingQueue() {
let waitForQueueExpectation = XCTestExpectation(description: "Wait for trackingQueue")
// Because trackingQueue is serial, this acts as a barrier in which any previous operations will
// have guaranteed to complete after this has run.
trackingQueue.async {
waitForQueueExpectation.fulfill()
}

XCTWaiter().wait(for: [waitForQueueExpectation])
}
}
16 changes: 11 additions & 5 deletions Tests/AmplitudeTests/AmplitudeIOSTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ final class AmplitudeIOSTests: XCTestCase {
identifyStorageProvider: interceptStorageMem,
defaultTracking: DefaultTrackingOptions(sessions: false, appLifecycles: true)
)
_ = Amplitude(configuration: configuration)
let amplitude = Amplitude(configuration: configuration)
NotificationCenter.default.post(name: UIApplication.didFinishLaunchingNotification, object: nil)

amplitude.waitForTrackingQueue()

let info = Bundle.main.infoDictionary
let currentBuild = info?["CFBundleVersion"] ?? ""
let currentVersion = info?["CFBundleShortVersionString"] ?? ""
Expand Down Expand Up @@ -58,8 +60,9 @@ final class AmplitudeIOSTests: XCTestCase {
try storageMem.write(key: StorageKey.LAST_EVENT_TIME, value: 123 as Int64)
try storageMem.write(key: StorageKey.APP_BUILD, value: "abc")
try storageMem.write(key: StorageKey.APP_VERSION, value: "xyz")
_ = Amplitude(configuration: configuration)
let amplitude = Amplitude(configuration: configuration)
NotificationCenter.default.post(name: UIApplication.didFinishLaunchingNotification, object: nil)
amplitude.waitForTrackingQueue()

let info = Bundle.main.infoDictionary
let currentBuild = info?["CFBundleVersion"] ?? ""
Expand Down Expand Up @@ -97,8 +100,9 @@ final class AmplitudeIOSTests: XCTestCase {
try storageMem.write(key: StorageKey.LAST_EVENT_TIME, value: 123 as Int64)
try storageMem.write(key: StorageKey.APP_BUILD, value: currentBuild)
try storageMem.write(key: StorageKey.APP_VERSION, value: currentVersion)
_ = Amplitude(configuration: configuration)
let amplitude = Amplitude(configuration: configuration)
NotificationCenter.default.post(name: UIApplication.didFinishLaunchingNotification, object: nil)
amplitude.waitForTrackingQueue()

let events = storageMem.events()
XCTAssertEqual(events.count, 1)
Expand All @@ -122,8 +126,9 @@ final class AmplitudeIOSTests: XCTestCase {
let currentBuild = info?["CFBundleVersion"] ?? ""
let currentVersion = info?["CFBundleShortVersionString"] ?? ""

_ = Amplitude(configuration: configuration)
let amplitude = Amplitude(configuration: configuration)
NotificationCenter.default.post(name: UIApplication.willEnterForegroundNotification, object: nil)
amplitude.waitForTrackingQueue()

let events = storageMem.events()
XCTAssertEqual(events.count, 1)
Expand All @@ -143,8 +148,9 @@ final class AmplitudeIOSTests: XCTestCase {
defaultTracking: DefaultTrackingOptions(sessions: false, appLifecycles: true)
)

_ = Amplitude(configuration: configuration)
let amplitude = Amplitude(configuration: configuration)
NotificationCenter.default.post(name: UIApplication.didEnterBackgroundNotification, object: nil)
amplitude.waitForTrackingQueue()

let events = storageMem.events()
XCTAssertEqual(events.count, 1)
Expand Down
Loading

0 comments on commit 9fdf2b3

Please sign in to comment.