From e23b49c42c6c1e0a7a0271244757f8a3cb592ceb Mon Sep 17 00:00:00 2001 From: Cole Kurkowski Date: Thu, 16 Jan 2025 12:32:44 -0700 Subject: [PATCH] Work around open-telemetry/opentelemetry-swift#615, and add support for building with the Static Linux SDK --- Package@swift-5.9.swift | 21 +----- .../OpenTelemetryProtocolHttp/Lock.swift | 6 +- Sources/Importers/OpenTracingShim/Locks.swift | 6 +- .../Importers/SwiftMetricsShim/Locks.swift | 6 +- Sources/OpenTelemetrySdk/Internal/Locks.swift | 6 +- .../Processors/BatchLogRecordProcessor.swift | 63 ++++++++++------- .../SpanProcessors/BatchSpanProcessor.swift | 69 +++++++++++-------- 7 files changed, 101 insertions(+), 76 deletions(-) diff --git a/Package@swift-5.9.swift b/Package@swift-5.9.swift index 0080aadd..a45c30dc 100644 --- a/Package@swift-5.9.swift +++ b/Package@swift-5.9.swift @@ -33,12 +33,14 @@ let package = Package( .package(url: "https://github.com/apple/swift-protobuf.git", from: "1.20.2"), .package(url: "https://github.com/apple/swift-log.git", from: "1.4.4"), .package(url: "https://github.com/apple/swift-metrics.git", from: "2.1.1"), + .package(url: "https://github.com/apple/swift-atomics.git", from: "1.2.0") ], targets: [ .target(name: "OpenTelemetryApi", dependencies: []), .target(name: "OpenTelemetrySdk", - dependencies: ["OpenTelemetryApi"].withAtomicsIfNeeded()), + dependencies: ["OpenTelemetryApi", + .product(name: "Atomics", package: "swift-atomics", condition: .when(platforms: [.linux]))]), .target(name: "OpenTelemetryConcurrency", dependencies: ["OpenTelemetryApi"]), .target(name: "OpenTelemetryTestUtils", @@ -133,25 +135,8 @@ let package = Package( ] ).addPlatformSpecific() -extension [Target.Dependency] { - func withAtomicsIfNeeded() -> [Target.Dependency] { - #if canImport(Darwin) - return self - #else - var dependencies = self - dependencies.append(.product(name: "Atomics", package: "swift-atomics")) - return dependencies - #endif - } -} - extension Package { func addPlatformSpecific() -> Self { - #if !canImport(Darwin) - dependencies.append( - .package(url: "https://github.com/apple/swift-atomics.git", .upToNextMajor(from: "1.2.0")) - ) - #endif #if canImport(ObjectiveC) dependencies.append( .package(url: "https://github.com/undefinedlabs/opentracing-objc", from: "0.5.2") diff --git a/Sources/Exporters/OpenTelemetryProtocolHttp/Lock.swift b/Sources/Exporters/OpenTelemetryProtocolHttp/Lock.swift index 5a51c654..dc230462 100644 --- a/Sources/Exporters/OpenTelemetryProtocolHttp/Lock.swift +++ b/Sources/Exporters/OpenTelemetryProtocolHttp/Lock.swift @@ -33,8 +33,12 @@ #if os(macOS) || os(iOS) || os(tvOS) || os(watchOS) import Darwin -#else +#elseif canImport(Glibc) import Glibc +#elseif canImport(Musl) +import Musl +#else +#error("Unsupported platform") #endif /// A threading lock based on `libpthread` instead of `libdispatch`. diff --git a/Sources/Importers/OpenTracingShim/Locks.swift b/Sources/Importers/OpenTracingShim/Locks.swift index 092c8548..3120e3df 100644 --- a/Sources/Importers/OpenTracingShim/Locks.swift +++ b/Sources/Importers/OpenTracingShim/Locks.swift @@ -33,8 +33,12 @@ #if os(macOS) || os(iOS) || os(tvOS) || os(watchOS) import Darwin +#elseif canImport(Glibc) +import Glibc +#elseif canImport(Musl) +import Musl #else - import Glibc +#error("Unsupported platform") #endif /// A threading lock based on `libpthread` instead of `libdispatch`. diff --git a/Sources/Importers/SwiftMetricsShim/Locks.swift b/Sources/Importers/SwiftMetricsShim/Locks.swift index 5a51c654..dc230462 100644 --- a/Sources/Importers/SwiftMetricsShim/Locks.swift +++ b/Sources/Importers/SwiftMetricsShim/Locks.swift @@ -33,8 +33,12 @@ #if os(macOS) || os(iOS) || os(tvOS) || os(watchOS) import Darwin -#else +#elseif canImport(Glibc) import Glibc +#elseif canImport(Musl) +import Musl +#else +#error("Unsupported platform") #endif /// A threading lock based on `libpthread` instead of `libdispatch`. diff --git a/Sources/OpenTelemetrySdk/Internal/Locks.swift b/Sources/OpenTelemetrySdk/Internal/Locks.swift index 5a51c654..dc230462 100644 --- a/Sources/OpenTelemetrySdk/Internal/Locks.swift +++ b/Sources/OpenTelemetrySdk/Internal/Locks.swift @@ -33,8 +33,12 @@ #if os(macOS) || os(iOS) || os(tvOS) || os(watchOS) import Darwin -#else +#elseif canImport(Glibc) import Glibc +#elseif canImport(Musl) +import Musl +#else +#error("Unsupported platform") #endif /// A threading lock based on `libpthread` instead of `libdispatch`. diff --git a/Sources/OpenTelemetrySdk/Logs/Processors/BatchLogRecordProcessor.swift b/Sources/OpenTelemetrySdk/Logs/Processors/BatchLogRecordProcessor.swift index 05cc718d..19ee57d4 100644 --- a/Sources/OpenTelemetrySdk/Logs/Processors/BatchLogRecordProcessor.swift +++ b/Sources/OpenTelemetrySdk/Logs/Processors/BatchLogRecordProcessor.swift @@ -1,36 +1,35 @@ // // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -// +// import Foundation import OpenTelemetryApi public class BatchLogRecordProcessor : LogRecordProcessor { - - + fileprivate var worker : BatchWorker - + public init(logRecordExporter: LogRecordExporter, scheduleDelay: TimeInterval = 5, exportTimeout: TimeInterval = 30, maxQueueSize: Int = 2048, maxExportBatchSize: Int = 512, willExportCallback: ((inout [ReadableLogRecord])->Void)? = nil) { worker = BatchWorker(logRecordExporter: logRecordExporter, scheduleDelay: scheduleDelay, exportTimeout: exportTimeout, maxQueueSize: maxQueueSize, maxExportBatchSize: maxExportBatchSize, willExportCallback: willExportCallback) - + worker.start() } - + public func onEmit(logRecord: ReadableLogRecord) { worker.emit(logRecord: logRecord) } - + public func forceFlush(explicitTimeout: TimeInterval?) -> ExportResult { forceFlush(timeout: explicitTimeout) return .success } - + public func forceFlush(timeout: TimeInterval? = nil) { worker.forceFlush(explicitTimeout: timeout) } - - + + public func shutdown(explicitTimeout: TimeInterval? = nil) -> ExportResult { worker.cancel() worker.shutdown(explicitTimeout: explicitTimeout) @@ -38,7 +37,8 @@ public class BatchLogRecordProcessor : LogRecordProcessor { } } -private class BatchWorker : Thread { +private class BatchWorker { + var thread: Thread! let logRecordExporter : LogRecordExporter let scheduleDelay : TimeInterval let maxQueueSize : Int @@ -49,14 +49,14 @@ private class BatchWorker : Thread { private let cond = NSCondition() var logRecordList = [ReadableLogRecord]() var queue : OperationQueue - + init(logRecordExporter: LogRecordExporter, scheduleDelay: TimeInterval, exportTimeout: TimeInterval, maxQueueSize: Int, maxExportBatchSize: Int, willExportCallback: ((inout [ReadableLogRecord])->Void)?) { - + self.logRecordExporter = logRecordExporter self.scheduleDelay = scheduleDelay self.exportTimeout = exportTimeout @@ -67,24 +67,35 @@ private class BatchWorker : Thread { queue = OperationQueue() queue.name = "BatchWorker Queue" queue.maxConcurrentOperationCount = 1 + self.thread = Thread(block: { [weak self] in + self?.main() + }) + } + + func start() { + self.thread.start() + } + + func cancel() { + self.thread.cancel() } - + func emit(logRecord: ReadableLogRecord) { cond.lock() - defer { cond.unlock()} + defer { cond.unlock() } if logRecordList.count == maxQueueSize { // TODO: record a counter for dropped logs return } - + // TODO: record a gauge for referenced logs logRecordList.append(logRecord) if logRecordList.count >= halfMaxQueueSize { cond.broadcast() } } - - override func main() { + + func main() { repeat { autoreleasepool { var logRecordsCopy : [ReadableLogRecord] @@ -92,33 +103,33 @@ private class BatchWorker : Thread { if logRecordList.count < maxExportBatchSize { repeat { cond.wait(until: Date().addingTimeInterval(scheduleDelay)) - } while logRecordList.isEmpty && !self.isCancelled + } while logRecordList.isEmpty && !thread.isCancelled } logRecordsCopy = logRecordList logRecordList.removeAll() cond.unlock() self.exportBatch(logRecordList: logRecordsCopy, explicitTimeout: exportTimeout) } - } while !self.isCancelled + } while !thread.isCancelled } - + public func forceFlush(explicitTimeout: TimeInterval? = nil) { var logRecordsCopy: [ReadableLogRecord] cond.lock() logRecordsCopy = logRecordList logRecordList.removeAll() cond.unlock() - + exportBatch(logRecordList: logRecordsCopy, explicitTimeout: explicitTimeout) } - - + + public func shutdown(explicitTimeout: TimeInterval?) { let timeout = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, exportTimeout) forceFlush(explicitTimeout: timeout) _ = logRecordExporter.shutdown(explicitTimeout: timeout) } - + private func exportBatch(logRecordList: [ReadableLogRecord], explicitTimeout: TimeInterval? = nil) { let exportOperation = BlockOperation { [weak self] in self?.exportAction(logRecordList : logRecordList, explicitTimeout: explicitTimeout) @@ -132,7 +143,7 @@ private class BatchWorker : Thread { queue.waitUntilAllOperationsAreFinished() timeoutTimer.cancel() } - + private func exportAction(logRecordList: [ReadableLogRecord], explicitTimeout: TimeInterval? = nil) { stride(from: 0, to: logRecordList.endIndex, by: maxExportBatchSize).forEach { var logRecordToExport = logRecordList[$0 ..< min($0 + maxExportBatchSize, logRecordList.count)].map {$0} diff --git a/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift b/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift index 7c79d7b3..49a8fde5 100644 --- a/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift +++ b/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift @@ -18,13 +18,13 @@ public struct BatchSpanProcessor: SpanProcessor { fileprivate static let SPAN_PROCESSOR_TYPE_LABEL: String = "processorType" fileprivate static let SPAN_PROCESSOR_DROPPED_LABEL: String = "dropped" fileprivate static let SPAN_PROCESSOR_TYPE_VALUE: String = BatchSpanProcessor.name - + fileprivate var worker: BatchWorker - + public static var name: String { String(describing: Self.self) } - + public init( spanExporter: SpanExporter, meterProvider: StableMeterProvider? = nil, @@ -45,24 +45,24 @@ public struct BatchSpanProcessor: SpanProcessor { ) worker.start() } - + public let isStartRequired = false public let isEndRequired = true - + public func onStart(parentContext: SpanContext?, span: ReadableSpan) {} - + public func onEnd(span: ReadableSpan) { if !span.context.traceFlags.sampled { return } worker.addSpan(span: span) } - + public func shutdown(explicitTimeout: TimeInterval? = nil) { worker.cancel() worker.shutdown() } - + public func forceFlush(timeout: TimeInterval? = nil) { worker.forceFlush(explicitTimeout: timeout) } @@ -71,7 +71,8 @@ public struct BatchSpanProcessor: SpanProcessor { /// BatchWorker is a thread that batches multiple spans and calls the registered SpanExporter to export /// the data. /// The list of batched data is protected by a NSCondition which ensures full concurrency. -private class BatchWorker: Thread { +private class BatchWorker { + var thread: Thread! let spanExporter: SpanExporter let meterProvider: StableMeterProvider? let scheduleDelay: TimeInterval @@ -83,11 +84,11 @@ private class BatchWorker: Thread { private let cond = NSCondition() var spanList = [ReadableSpan]() var queue: OperationQueue - + private var queueSizeGauge: ObservableLongGauge? private var spanGaugeObserver: ObservableLongGauge? private var processedSpansCounter: LongCounter? - + init( spanExporter: SpanExporter, meterProvider: StableMeterProvider? = nil, @@ -108,9 +109,9 @@ private class BatchWorker: Thread { queue = OperationQueue() queue.name = "BatchWorker Queue" queue.maxConcurrentOperationCount = 1 - + if let meter = meterProvider?.meterBuilder(name: "io.opentelemetry.sdk.trace").build() { - + var longGaugeSdk = meter.gaugeBuilder(name: "queueSize").ofLongs() as? LongGaugeBuilderSdk longGaugeSdk = longGaugeSdk?.setDescription("The number of items queued") longGaugeSdk = longGaugeSdk?.setUnit("1") @@ -122,12 +123,12 @@ private class BatchWorker: Thread { ] ) } - + var longCounterSdk = meter.counterBuilder(name: "processedSpans") as? LongCounterMeterBuilderSdk longCounterSdk = longCounterSdk?.setUnit("1") longCounterSdk = longCounterSdk?.setDescription("The number of spans processed by the BatchSpanProcessor. [dropped=true if they were dropped due to high throughput]") processedSpansCounter = longCounterSdk?.build() - + // Subscribe to new gauge observer self.spanGaugeObserver = meter.gaugeBuilder(name: "spanSize") .ofLongs() @@ -140,18 +141,30 @@ private class BatchWorker: Thread { ) } } + + self.thread = Thread(block: { [weak self] in + self?.main() + }) } - + deinit { // Cleanup all gauge observer self.queueSizeGauge?.close() self.spanGaugeObserver?.close() } - + + func start() { + self.thread.start() + } + + func cancel() { + self.thread.cancel() + } + func addSpan(span: ReadableSpan) { cond.lock() defer { cond.unlock() } - + if spanList.count == maxQueueSize { processedSpansCounter?.add(value: 1, attribute: [ BatchSpanProcessor.SPAN_PROCESSOR_TYPE_LABEL: .string(BatchSpanProcessor.SPAN_PROCESSOR_TYPE_VALUE), @@ -160,15 +173,15 @@ private class BatchWorker: Thread { return } spanList.append(span) - + // Notify the worker thread that at half of the queue is available. It will take // time anyway for the thread to wake up. if spanList.count >= halfMaxQueueSize { cond.broadcast() } } - - override func main() { + + func main() { repeat { autoreleasepool { var spansCopy: [ReadableSpan] @@ -176,21 +189,21 @@ private class BatchWorker: Thread { if spanList.count < maxExportBatchSize { repeat { cond.wait(until: Date().addingTimeInterval(scheduleDelay)) - } while spanList.isEmpty && !self.isCancelled + } while spanList.isEmpty && !thread.isCancelled } spansCopy = spanList spanList.removeAll() cond.unlock() self.exportBatch(spanList: spansCopy, explicitTimeout: self.exportTimeout) } - } while !self.isCancelled + } while !thread.isCancelled } - + func shutdown() { forceFlush(explicitTimeout: self.exportTimeout) spanExporter.shutdown() } - + public func forceFlush(explicitTimeout: TimeInterval? = nil) { var spansCopy: [ReadableSpan] cond.lock() @@ -200,7 +213,7 @@ private class BatchWorker: Thread { // Execute the batch export outside the synchronized to not block all producers. exportBatch(spanList: spansCopy, explicitTimeout: explicitTimeout) } - + private func exportBatch(spanList: [ReadableSpan], explicitTimeout: TimeInterval? = nil) { let maxTimeOut = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, exportTimeout) let exportOperation = BlockOperation { [weak self] in @@ -210,14 +223,14 @@ private class BatchWorker: Thread { timeoutTimer.setEventHandler { exportOperation.cancel() } - + timeoutTimer.schedule(deadline: .now() + .milliseconds(Int(maxTimeOut.toMilliseconds)), leeway: .milliseconds(1)) timeoutTimer.activate() queue.addOperation(exportOperation) queue.waitUntilAllOperationsAreFinished() timeoutTimer.cancel() } - + private func exportAction(spanList: [ReadableSpan], explicitTimeout: TimeInterval? = nil) { stride(from: 0, to: spanList.endIndex, by: maxExportBatchSize).forEach { var spansToExport = spanList[$0 ..< min($0 + maxExportBatchSize, spanList.count)].map { $0.toSpanData() }