Skip to content

Commit

Permalink
Work around open-telemetry#615, and add support for building with the…
Browse files Browse the repository at this point in the history
… Static Linux SDK
  • Loading branch information
semicoleon committed Jan 16, 2025
1 parent 0db61a3 commit e23b49c
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 76 deletions.
21 changes: 3 additions & 18 deletions [email protected]
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ let package = Package(
.package(url: "https://github.com/apple/swift-protobuf.git", from: "1.20.2"),
.package(url: "https://github.com/apple/swift-log.git", from: "1.4.4"),
.package(url: "https://github.com/apple/swift-metrics.git", from: "2.1.1"),
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.2.0")
],
targets: [
.target(name: "OpenTelemetryApi",
dependencies: []),
.target(name: "OpenTelemetrySdk",
dependencies: ["OpenTelemetryApi"].withAtomicsIfNeeded()),
dependencies: ["OpenTelemetryApi",
.product(name: "Atomics", package: "swift-atomics", condition: .when(platforms: [.linux]))]),
.target(name: "OpenTelemetryConcurrency",
dependencies: ["OpenTelemetryApi"]),
.target(name: "OpenTelemetryTestUtils",
Expand Down Expand Up @@ -133,25 +135,8 @@ let package = Package(
]
).addPlatformSpecific()

extension [Target.Dependency] {
func withAtomicsIfNeeded() -> [Target.Dependency] {
#if canImport(Darwin)
return self
#else
var dependencies = self
dependencies.append(.product(name: "Atomics", package: "swift-atomics"))
return dependencies
#endif
}
}

extension Package {
func addPlatformSpecific() -> Self {
#if !canImport(Darwin)
dependencies.append(
.package(url: "https://github.com/apple/swift-atomics.git", .upToNextMajor(from: "1.2.0"))
)
#endif
#if canImport(ObjectiveC)
dependencies.append(
.package(url: "https://github.com/undefinedlabs/opentracing-objc", from: "0.5.2")
Expand Down
6 changes: 5 additions & 1 deletion Sources/Exporters/OpenTelemetryProtocolHttp/Lock.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@

#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS)
import Darwin
#else
#elseif canImport(Glibc)
import Glibc
#elseif canImport(Musl)
import Musl
#else
#error("Unsupported platform")
#endif

/// A threading lock based on `libpthread` instead of `libdispatch`.
Expand Down
6 changes: 5 additions & 1 deletion Sources/Importers/OpenTracingShim/Locks.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@

#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS)
import Darwin
#elseif canImport(Glibc)
import Glibc
#elseif canImport(Musl)
import Musl
#else
import Glibc
#error("Unsupported platform")
#endif

/// A threading lock based on `libpthread` instead of `libdispatch`.
Expand Down
6 changes: 5 additions & 1 deletion Sources/Importers/SwiftMetricsShim/Locks.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@

#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS)
import Darwin
#else
#elseif canImport(Glibc)
import Glibc
#elseif canImport(Musl)
import Musl
#else
#error("Unsupported platform")
#endif

/// A threading lock based on `libpthread` instead of `libdispatch`.
Expand Down
6 changes: 5 additions & 1 deletion Sources/OpenTelemetrySdk/Internal/Locks.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@

#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS)
import Darwin
#else
#elseif canImport(Glibc)
import Glibc
#elseif canImport(Musl)
import Musl
#else
#error("Unsupported platform")
#endif

/// A threading lock based on `libpthread` instead of `libdispatch`.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,44 +1,44 @@
//
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
//
//

import Foundation
import OpenTelemetryApi

public class BatchLogRecordProcessor : LogRecordProcessor {



fileprivate var worker : BatchWorker

public init(logRecordExporter: LogRecordExporter, scheduleDelay: TimeInterval = 5, exportTimeout: TimeInterval = 30, maxQueueSize: Int = 2048, maxExportBatchSize: Int = 512, willExportCallback: ((inout [ReadableLogRecord])->Void)? = nil) {
worker = BatchWorker(logRecordExporter: logRecordExporter, scheduleDelay: scheduleDelay, exportTimeout: exportTimeout, maxQueueSize: maxQueueSize, maxExportBatchSize: maxExportBatchSize, willExportCallback: willExportCallback)

worker.start()
}

public func onEmit(logRecord: ReadableLogRecord) {
worker.emit(logRecord: logRecord)
}

public func forceFlush(explicitTimeout: TimeInterval?) -> ExportResult {
forceFlush(timeout: explicitTimeout)
return .success
}

public func forceFlush(timeout: TimeInterval? = nil) {
worker.forceFlush(explicitTimeout: timeout)
}


public func shutdown(explicitTimeout: TimeInterval? = nil) -> ExportResult {
worker.cancel()
worker.shutdown(explicitTimeout: explicitTimeout)
return .success
}
}

private class BatchWorker : Thread {
private class BatchWorker {
var thread: Thread!
let logRecordExporter : LogRecordExporter
let scheduleDelay : TimeInterval
let maxQueueSize : Int
Expand All @@ -49,14 +49,14 @@ private class BatchWorker : Thread {
private let cond = NSCondition()
var logRecordList = [ReadableLogRecord]()
var queue : OperationQueue

init(logRecordExporter: LogRecordExporter,
scheduleDelay: TimeInterval,
exportTimeout: TimeInterval,
maxQueueSize: Int,
maxExportBatchSize: Int,
willExportCallback: ((inout [ReadableLogRecord])->Void)?) {

self.logRecordExporter = logRecordExporter
self.scheduleDelay = scheduleDelay
self.exportTimeout = exportTimeout
Expand All @@ -67,58 +67,69 @@ private class BatchWorker : Thread {
queue = OperationQueue()
queue.name = "BatchWorker Queue"
queue.maxConcurrentOperationCount = 1
self.thread = Thread(block: { [weak self] in
self?.main()
})
}

func start() {
self.thread.start()
}

func cancel() {
self.thread.cancel()
}

func emit(logRecord: ReadableLogRecord) {
cond.lock()
defer { cond.unlock()}
defer { cond.unlock() }
if logRecordList.count == maxQueueSize {
// TODO: record a counter for dropped logs
return
}

// TODO: record a gauge for referenced logs
logRecordList.append(logRecord)
if logRecordList.count >= halfMaxQueueSize {
cond.broadcast()
}
}
override func main() {

func main() {
repeat {
autoreleasepool {
var logRecordsCopy : [ReadableLogRecord]
cond.lock()
if logRecordList.count < maxExportBatchSize {
repeat {
cond.wait(until: Date().addingTimeInterval(scheduleDelay))
} while logRecordList.isEmpty && !self.isCancelled
} while logRecordList.isEmpty && !thread.isCancelled
}
logRecordsCopy = logRecordList
logRecordList.removeAll()
cond.unlock()
self.exportBatch(logRecordList: logRecordsCopy, explicitTimeout: exportTimeout)
}
} while !self.isCancelled
} while !thread.isCancelled
}

public func forceFlush(explicitTimeout: TimeInterval? = nil) {
var logRecordsCopy: [ReadableLogRecord]
cond.lock()
logRecordsCopy = logRecordList
logRecordList.removeAll()
cond.unlock()

exportBatch(logRecordList: logRecordsCopy, explicitTimeout: explicitTimeout)
}


public func shutdown(explicitTimeout: TimeInterval?) {
let timeout = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, exportTimeout)
forceFlush(explicitTimeout: timeout)
_ = logRecordExporter.shutdown(explicitTimeout: timeout)
}

private func exportBatch(logRecordList: [ReadableLogRecord], explicitTimeout: TimeInterval? = nil) {
let exportOperation = BlockOperation { [weak self] in
self?.exportAction(logRecordList : logRecordList, explicitTimeout: explicitTimeout)
Expand All @@ -132,7 +143,7 @@ private class BatchWorker : Thread {
queue.waitUntilAllOperationsAreFinished()
timeoutTimer.cancel()
}

private func exportAction(logRecordList: [ReadableLogRecord], explicitTimeout: TimeInterval? = nil) {
stride(from: 0, to: logRecordList.endIndex, by: maxExportBatchSize).forEach {
var logRecordToExport = logRecordList[$0 ..< min($0 + maxExportBatchSize, logRecordList.count)].map {$0}
Expand Down
Loading

0 comments on commit e23b49c

Please sign in to comment.