Skip to content

Commit

Permalink
fix bad access on batches
Browse files Browse the repository at this point in the history
  • Loading branch information
oa-s committed Apr 15, 2022
1 parent 45ae1f3 commit 5e2e47a
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 77 deletions.
150 changes: 74 additions & 76 deletions web3swift/Promises/Classes/Promise+Batching.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ public class JSONRPCrequestDispatcher {

private var provider: Web3Provider
private var lockQueue: DispatchQueue
private lazy var batches: Store<Batch> = .init(queue: lockQueue)
private var batches: Store<Batch>

public init(provider: Web3Provider, queue: DispatchQueue, policy: DispatchPolicy) {
self.provider = provider
self.queue = queue
self.policy = policy
self.lockQueue = DispatchQueue(label: "batchingQueue", qos: .userInitiated)
self.batches = .init(queue: lockQueue)

createBatch()
}
Expand Down Expand Up @@ -56,69 +57,57 @@ public class JSONRPCrequestDispatcher {
case .NoBatching:
return provider.sendAsync(request, queue: queue)
case .Batch:
let (promise, seal) = Promise<JSONRPCresponse>.pending()

do {
let batch = try getBatch()
let internalPromise = try batch.add(request, maxWaitTime: MAX_WAIT_TIME)
internalPromise.done(on: queue) { resp in
seal.fulfill(resp)
}.catch(on: queue) { err in
seal.reject(err)
}
return try batch.add(request, maxWaitTime: MAX_WAIT_TIME)
} catch {
seal.reject(error)
let returnPromise = Promise<JSONRPCresponse>.pending()
queue.async {
returnPromise.resolver.reject(error)
}
return returnPromise.promise
}

return promise
}
}

internal final class Batch: NSObject {
var capacity: Int
lazy var promises: DictionaryStore<UInt64, (promise: Promise<JSONRPCresponse>, resolver: Resolver<JSONRPCresponse>)> = .init(queue: lockQueue)
lazy var requests: Store<JSONRPCrequest> = .init(queue: lockQueue)

private var pendingTrigger: Guarantee<Void>?
private let queue: DispatchQueue
private let lockQueue: DispatchQueue
private (set) var triggered: Bool = false

var capacity: Int
var promises: DictionaryStore<UInt64, (promise: Promise<JSONRPCresponse>, seal: Resolver<JSONRPCresponse>)>
var requests: Store<JSONRPCrequest>

weak var delegate: BatchDelegate?

fileprivate func add(_ request: JSONRPCrequest, maxWaitTime: TimeInterval) throws -> Promise<JSONRPCresponse> {
guard !triggered else {
throw Web3Error.nodeError("Batch is already in flight")
}

let requestID = request.id
let promiseToReturn = Promise<JSONRPCresponse>.pending()
var shouldAddPromise = true

if promises[requestID] != nil {
shouldAddPromise = false
promiseToReturn.resolver.reject(Web3Error.processingError("Request ID collision"))
}
let (promise, seal) = Promise<JSONRPCresponse>.pending()

if shouldAddPromise {
promises[requestID] = promiseToReturn
if let value = promises[request.id] {
value.seal.reject(Web3Error.processingError("Request ID collision"))
} else {
promises[request.id] = (promise, seal)
}

requests.append(request)

if pendingTrigger == nil {
pendingTrigger = after(seconds: maxWaitTime).done(on: queue) { [weak self] in
guard let strongSelf = self else {
return
pendingTrigger = after(seconds: maxWaitTime)
.done(on: queue) { [weak self] in
self?.trigger()
}
strongSelf.trigger()
}
}

if requests.count() == capacity {
trigger()
}

return promiseToReturn.promise
return promise
}

func trigger() {
Expand All @@ -131,7 +120,8 @@ public class JSONRPCrequestDispatcher {
init(capacity: Int, queue: DispatchQueue, lockQueue: DispatchQueue) {
self.capacity = capacity
self.queue = queue
self.lockQueue = lockQueue
self.promises = .init(queue: lockQueue)
self.requests = .init(queue: lockQueue)
}
}

Expand All @@ -142,26 +132,33 @@ extension JSONRPCrequestDispatcher: BatchDelegate {
func didTrigger(id batch: Batch) {
let requestsBatch = JSONRPCrequestBatch(requests: batch.requests.allValues())

provider.sendAsync(requestsBatch, queue: queue).done(on: queue, { batchResponse in
for response in batchResponse.responses {
if batch.promises[UInt64(response.id)] == nil {
for k in batch.promises.keys() {
batch.promises[k]?.resolver.reject(Web3Error.nodeError("Unknown request id"))
provider
.sendAsync(requestsBatch, queue: queue)
.done(on: queue, { [weak batches] batchResponse in
for response in batchResponse.responses {
let id = UInt64(response.id)
guard let value = batch.promises[id] else {
guard let keys = batch.promises.keys() else { return }
for key in keys {
guard let value = batch.promises[key] else { continue }
value.seal.reject(Web3Error.nodeError("Unknown request id"))
}
return
}
return
value.seal.fulfill(response)
}
}

for response in batchResponse.responses {
batch.promises[UInt64(response.id)]?.resolver.fulfill(response)
}
}).catch(on: queue, { err in
for k in batch.promises.keys() {
batch.promises[k]?.resolver.reject(err)
}
}).finally { [weak self] in
self?.batches.removeAll(where: { $0 == batch })
}
batches?.removeAll(batch)
}).catch(on: queue, { [weak batches] err in
guard let keys = batch.promises.keys() else { return }

for key in keys {
guard let value = batch.promises[key] else { continue }
value.seal.reject(err)
}

batches?.removeAll(batch)
})
}

@discardableResult func createBatch() -> Batch {
Expand All @@ -184,7 +181,7 @@ extension JSONRPCrequestDispatcher: BatchDelegate {
}
}

protocol BatchDelegate: class {
protocol BatchDelegate: AnyObject {
func didTrigger(id batch: JSONRPCrequestDispatcher.Batch)
}

Expand All @@ -200,28 +197,27 @@ class DictionaryStore<K: Hashable, V> {
get {
var element: V?
dispatchPrecondition(condition: .notOnQueue(queue))
queue.sync { [unowned self] in
element = self.values[key]
queue.sync { [weak self] in
element = self?.values[key]
}
return element
}
set {
dispatchPrecondition(condition: .notOnQueue(queue))
queue.sync { [unowned self] in
self.values[key] = newValue
queue.sync { [weak self] in
self?.values[key] = newValue
}
}
}

func keys() -> Dictionary<K, V>.Keys {
func keys() -> Dictionary<K, V>.Keys? {
var keys: Dictionary<K, V>.Keys?
dispatchPrecondition(condition: .notOnQueue(queue))
queue.sync { [unowned self] in
keys = self.values.keys
queue.sync { [weak self] in
keys = self?.values.keys
}
return keys!
return keys
}

}

class Store<T> {
Expand All @@ -234,43 +230,45 @@ class Store<T> {

func append(_ element: T) {
dispatchPrecondition(condition: .notOnQueue(queue))
queue.sync { [unowned self] in
self.values.append(element)
queue.sync { [weak self] in
self?.values.append(element)
}
}

func last() -> T? {
var element: T?
dispatchPrecondition(condition: .notOnQueue(queue))
queue.sync { [unowned self] in
element = self.values.last
queue.sync { [weak self] in
element = self?.values.last
}

return element
}

func removeAll(where closure: (T) -> Bool) {
dispatchPrecondition(condition: .notOnQueue(queue))
queue.sync { [unowned self] in
self.values.removeAll(where: closure)
}
}

func count() -> Int {
var count: Int = 0
dispatchPrecondition(condition: .notOnQueue(queue))
queue.sync { [unowned self] in
count = self.values.count
queue.sync { [weak self] in
count = self?.values.count ?? 0
}
return count
}

func allValues() -> [T] {
var values: [T] = []
dispatchPrecondition(condition: .notOnQueue(queue))
queue.sync { [unowned self] in
values = self.values
queue.sync { [weak self] in
values = self?.values ?? []
}
return values
}
}

extension Store where T: Equatable & AnyObject {
func removeAll(_ elem: T) {
dispatchPrecondition(condition: .notOnQueue(queue))
queue.sync { [weak self] in
self?.values.removeAll(where: { $0 === elem })
}
}
}
2 changes: 1 addition & 1 deletion web3swift/Web3/Classes/Web3+Instance.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class web3: Web3OptionsInheritable {
return self.provider.send(request: request)
}
public func dispatch(_ request: JSONRPCrequest) -> Promise<JSONRPCresponse> {
return self.requestDispatcher.addToQueue(request: request)
return requestDispatcher.addToQueue(request: request)
}

public init(provider prov: Web3Provider, queue: OperationQueue? = nil, dispatcher: OperationDispatcher? = nil, requestDispatcher: JSONRPCrequestDispatcher? = nil) {
Expand Down

0 comments on commit 5e2e47a

Please sign in to comment.