Skip to content

Commit

Permalink
Fix memory leak with batch
Browse files Browse the repository at this point in the history
  • Loading branch information
oa-s committed Jan 27, 2022
1 parent cce12b1 commit f22b7cc
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 162 deletions.
242 changes: 145 additions & 97 deletions web3swift/Promises/Classes/Promise+Batching.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,133 +13,181 @@ public class JSONRPCrequestDispatcher {
public var MAX_WAIT_TIME: TimeInterval = 0.1
public var policy: DispatchPolicy
public var queue: DispatchQueue

private var provider: Web3Provider
private var lockQueue: DispatchQueue
private var batches: [Batch] = [Batch]()
init(provider: Web3Provider, queue: DispatchQueue, policy: DispatchPolicy) {
private lazy var batches: [Batch] = []

public init(provider: Web3Provider, queue: DispatchQueue, policy: DispatchPolicy) {
self.provider = provider
self.queue = queue
self.policy = policy
self.lockQueue = DispatchQueue(label: "batchingQueue", qos: .userInitiated)

createBatch()
}

private func getBatch() throws -> Batch {
guard case .Batch(let batchLength) = policy else {
throw Web3Error.inputError("Trying to batch a request when policy is not to batch")
}

let currentBatch = batches.last ?? createBatch()

if currentBatch.requests.count % batchLength == 0 || currentBatch.triggered {
let newBatch = Batch(capacity: Int(batchLength), queue: queue)
newBatch.delegate = self

batches.append(newBatch)

return newBatch
}

return currentBatch
}

public enum DispatchPolicy {
case Batch(Int)
case NoBatching
}

func addToQueue(request: JSONRPCrequest) -> Promise<JSONRPCresponse> {
switch policy {
case .NoBatching:
self.batches.append(Batch(provider: self.provider, capacity: 32, queue: self.queue, lockQueue: self.lockQueue))
case .Batch(let count):
self.batches.append(Batch(provider: self.provider, capacity: count, queue: self.queue, lockQueue: self.lockQueue))
return provider.sendAsync(request, queue: queue)
case .Batch:
let (promise, seal) = Promise<JSONRPCresponse>.pending()
lockQueue.async { [weak self] in
guard let strongSelf = self else { return }

do {
let batch = try strongSelf.getBatch()
let internalPromise = try batch.add(request, maxWaitTime: strongSelf.MAX_WAIT_TIME)
internalPromise.done(on: strongSelf.queue) { resp in
seal.fulfill(resp)
}.catch(on: strongSelf.queue) { err in
seal.reject(err)
}
} catch {
seal.reject(error)
}
}

return promise
}
}
internal final class Batch {

internal final class Batch: NSObject {
var capacity: Int
var promisesDict: [UInt64: (promise: Promise<JSONRPCresponse>, resolver: Resolver<JSONRPCresponse>)] = [UInt64: (promise: Promise<JSONRPCresponse>, resolver: Resolver<JSONRPCresponse>)]()
var requests: [JSONRPCrequest] = [JSONRPCrequest]()
var pendingTrigger: Guarantee<Void>?
var provider: Web3Provider
var queue: DispatchQueue
var lockQueue : DispatchQueue
var triggered : Bool = false
func add(_ request: JSONRPCrequest, maxWaitTime: TimeInterval) throws -> Promise<JSONRPCresponse> {
if self.triggered {
var promises: [UInt64: (promise: Promise<JSONRPCresponse>, resolver: Resolver<JSONRPCresponse>)] = [:]
var requests: [JSONRPCrequest] = []

private var pendingTrigger: Guarantee<Void>?
private let queue: DispatchQueue
private (set) var triggered: Bool = false
weak var delegate: BatchDelegate?

fileprivate func add(_ request: JSONRPCrequest, maxWaitTime: TimeInterval) throws -> Promise<JSONRPCresponse> {
guard !triggered else {
throw Web3Error.nodeError("Batch is already in flight")
}

let requestID = request.id
let promiseToReturn = Promise<JSONRPCresponse>.pending()
self.lockQueue.async {
if self.promisesDict[requestID] != nil {
promiseToReturn.resolver.reject(Web3Error.processingError("Request ID collision"))
}
self.promisesDict[requestID] = promiseToReturn
self.requests.append(request)
if self.pendingTrigger == nil {
self.pendingTrigger = after(seconds: maxWaitTime).done(on: self.queue) {
self.trigger()
var shouldAddPromise = true

if promises[requestID] != nil {
shouldAddPromise = false
promiseToReturn.resolver.reject(Web3Error.processingError("Request ID collision"))
}

if shouldAddPromise {
promises[requestID] = promiseToReturn
}

requests.append(request)

if pendingTrigger == nil {
pendingTrigger = after(seconds: maxWaitTime).done(on: queue) { [weak self] in
guard let strongSelf = self else {
return
}
strongSelf.trigger()
}
if self.requests.count == self.capacity {
self.trigger()
}
}

if requests.count == capacity {
trigger()
}

return promiseToReturn.promise
}

func trigger() {
self.lockQueue.async {
if self.triggered {
return
}
self.triggered = true
let requestsBatch = JSONRPCrequestBatch(requests: self.requests)
_ = self.provider.sendAsync(requestsBatch, queue: self.queue).done(on: self.queue){batch in
for response in batch.responses {
if self.promisesDict[UInt64(response.id)] == nil {
for k in self.promisesDict.keys {
self.promisesDict[k]?.resolver.reject(Web3Error.nodeError("Unknown request id"))
}
return
}
}
for response in batch.responses {
let promise = self.promisesDict[UInt64(response.id)]!
promise.resolver.fulfill(response)
}
}.catch(on:self.queue) {err in
for k in self.promisesDict.keys {
self.promisesDict[k]?.resolver.reject(err)
}
}
guard !triggered else {
return
}
triggered = true

delegate?.didTrigger(id: self)
}

init (provider: Web3Provider, capacity: Int, queue: DispatchQueue, lockQueue: DispatchQueue) {
self.provider = provider

init(capacity: Int, queue: DispatchQueue) {
self.capacity = capacity
self.queue = queue
self.lockQueue = lockQueue
}
}

func getBatch() throws -> Batch {
guard case .Batch(let batchLength) = self.policy else {
throw Web3Error.inputError("Trying to batch a request when policy is not to batch")
}
let currentBatch = self.batches.last!
if currentBatch.requests.count % batchLength == 0 || currentBatch.triggered {
let newBatch = Batch(provider: self.provider, capacity: Int(batchLength), queue: self.queue, lockQueue: self.lockQueue)
self.batches.append(newBatch)
return newBatch
}
return currentBatch
}

public enum DispatchPolicy {
case Batch(Int)
case NoBatching
}

func addToQueue(request: JSONRPCrequest) -> Promise<JSONRPCresponse> {
switch self.policy {
case .NoBatching:
return self.provider.sendAsync(request, queue: self.queue)
case .Batch(_):
let promise = Promise<JSONRPCresponse> {
seal in
self.lockQueue.async {
do {
let batch = try self.getBatch()
let internalPromise = try batch.add(request, maxWaitTime: self.MAX_WAIT_TIME)
internalPromise.done(on: self.queue) {resp in
seal.fulfill(resp)
}.catch(on: self.queue){err in
seal.reject(err)
}
} catch {
seal.reject(error)

}

extension JSONRPCrequestDispatcher: BatchDelegate {

func didTrigger(id batch: Batch) {
let requestsBatch = JSONRPCrequestBatch(requests: batch.requests)

provider.sendAsync(requestsBatch, queue: queue).done(on: queue, { [weak self] batchResponse in
for response in batchResponse.responses {
if batch.promises[UInt64(response.id)] == nil {
for k in batch.promises.keys {
batch.promises[k]?.resolver.reject(Web3Error.nodeError("Unknown request id"))
}
return
}
}
return promise

for response in batchResponse.responses {
let promise = batch.promises[UInt64(response.id)]!
promise.resolver.fulfill(response)
}
self?.batches.removeAll(where: { $0 == batch })
}).catch(on: queue, { [weak self] err in
for k in batch.promises.keys {
batch.promises[k]?.resolver.reject(err)
}
self?.batches.removeAll(where: { $0 == batch })
})
}

@discardableResult func createBatch() -> Batch {
switch policy {
case .NoBatching:
let batch = Batch(capacity: 32, queue: queue)
batch.delegate = self

batches.append(batch)

return batch
case .Batch(let count):
let batch = Batch(capacity: count, queue: queue)
batch.delegate = self

batches.append(batch)

return batch
}
}
}

protocol BatchDelegate: class {
func didTrigger(id batch: JSONRPCrequestDispatcher.Batch)
}
22 changes: 7 additions & 15 deletions web3swift/Web3/Classes/Web3+Instance.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class web3: Web3OptionsInheritable {
self.queue = OperationQueue.init()
self.queue.maxConcurrentOperationCount = 32
self.queue.underlyingQueue = DispatchQueue.global(qos: .userInteractive)

} else {
self.queue = queue!
}
Expand All @@ -50,59 +50,51 @@ public class web3: Web3OptionsInheritable {
self.requestDispatcher = requestDispatcher!
}
}


public func addKeystoreManager(_ manager: KeystoreManager?) {
self.provider.attachedKeystoreManager = manager
}

public class Eth: Web3OptionsInheritable {
var provider: Web3Provider
var web3: web3
public var options: Web3Options {
return self.web3.options
}

public init(provider prov: Web3Provider, web3 web3instance: web3) {
provider = prov
web3 = web3instance
}
}

public class Personal:Web3OptionsInheritable {
var provider: Web3Provider
var web3: web3
public var options: Web3Options {
return self.web3.options
}

public init(provider prov: Web3Provider, web3 web3instance: web3) {
provider = prov
web3 = web3instance
}
}

public class Web3Wallet {
var provider: Web3Provider
var web3: web3

public init(provider prov: Web3Provider, web3 web3instance: web3) {
provider = prov
web3 = web3instance
}
}

public class BrowserFunctions: Web3OptionsInheritable {
var provider: Web3Provider
var web3: web3
public var options: Web3Options {
return self.web3.options
}

public init(provider prov: Web3Provider, web3 web3instance: web3) {
provider = prov
web3 = web3instance
}
}

}
Loading

0 comments on commit f22b7cc

Please sign in to comment.