This repository has been archived by the owner on Aug 23, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathworker.js
84 lines (77 loc) · 2.18 KB
/
worker.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
const { spawn } = require('child_process')
const PQueue = require('p-queue')
const delay = require('delay')
const queue = new PQueue({ concurrency: 6 })
async function queueProposeDeal (jobBus, dealRequestId, dealRequest) {
queue.add(() => run())
async function run () {
await delay(0)
jobBus.emit('started')
await delay(0)
console.log('Job started', dealRequestId)
const before = Date.now()
const { cid, ask, duration } = dealRequest
const prefix = `${dealRequestId}:`
console.log(
prefix,
`go-filecoin client propose-storage-deal ` +
`--allow-duplicates ${ask.miner} ${cid} ${ask.id} ${duration}`
)
try {
const [output, code] = await spawnAndWait(
'go-filecoin',
[
'client',
'propose-storage-deal',
'--allow-duplicates',
ask.miner,
cid,
`${ask.id}`,
`${duration}`
],
{ prefix }
)
const elapsed = Math.floor((Date.now() - before) / 1000)
console.log(`${dealRequestId}: Done in ${elapsed}s. Exit code: ${code}`)
let errorMsg
let dealId
output.split('\n').map(line => {
const matchError = line.match(/^Error: (.*)/)
if (matchError) {
errorMsg = matchError[1]
}
const matchDealId = line.match(/^DealID: *(.*)/)
if (matchDealId) {
dealId = matchDealId[1]
}
})
if (code === 0) {
jobBus.emit('success', { dealId })
} else {
jobBus.emit('fail', { errorMsg })
}
console.log('Job finished', dealRequestId)
} catch (e) {
console.error('Propose deal error', e)
jobBus.emit('fail')
}
}
}
function spawnAndWait (cmd, args, options = {}) {
const promise = new Promise((resolve, reject) => {
let output = ''
const child = spawn(cmd, args)
child.stdout.on('data', appendOutput)
child.stderr.on('data', appendOutput)
child.on('close', code => resolve([output, code]))
child.on('error', e => reject(e))
function appendOutput (data) {
process.stdout.write(`${options.prefix} ${data.toString()}`)
output += data
}
})
return promise
}
module.exports = {
queueProposeDeal
}