-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathobrc.mjs
124 lines (107 loc) · 3.72 KB
/
obrc.mjs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import { open } from "node:fs/promises"
import { createInterface } from "node:readline"
import { isMainThread, Worker, workerData, parentPort, threadId } from "node:worker_threads"
// Tunables. Both may be overridden through the environment (OBRC_THREADS,
// OBRC_FILE). Worker threads start with a copy of the parent's process.env by
// default, so the main thread and every worker resolve the same values.
const DEFAULT_NUM_THREADS = 31
const requestedThreads = Number.parseInt(process.env.OBRC_THREADS ?? "", 10)
// Number of byte ranges (and therefore worker threads) the input is split into.
const NUM_THREADS = Number.isInteger(requestedThreads) && requestedThreads > 0 ? requestedThreads : DEFAULT_NUM_THREADS
// Input file: one `<station>;<temperature>` record per line. An empty
// OBRC_FILE is treated as unset, since "" is never a usable path.
const FILE_NAME = process.env.OBRC_FILE || "./measurements-100000.txt"
if (isMainThread) {
// MAIN
/**
 * Spawns one worker thread over the byte range [start, end] of the input file
 * and folds the per-station aggregates it posts back into `globalMeasurements`.
 *
 * @param {object} options
 * @param {Record<string, {Min: number, Max: number, Sum: number, Count: number}>} options.globalMeasurements
 *   Accumulator shared by all workers; mutated in place.
 * @param {number} options.start - First byte offset of the range (inclusive).
 * @param {number} options.end - Last byte offset of the range (inclusive, as for `createReadStream`).
 * @param {string} [options.fileName=FILE_NAME] - File the worker should read.
 * @returns {Promise<true>} Resolves once the worker exits with code 0.
 *   Rejects with an Error if the worker throws or exits with a non-zero code.
 */
function runWorker({ globalMeasurements, start, end, fileName = FILE_NAME }) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(import.meta.filename, { workerData: { fileName, start, end } })

    worker.on("message", (localMeasurements) => {
      for (const [station, { Min, Max, Sum, Count }] of Object.entries(localMeasurements)) {
        // Own-property check: a plain-object lookup would "find" inherited keys
        // such as `toString` for stations with those names.
        if (!Object.hasOwn(globalMeasurements, station)) {
          globalMeasurements[station] = { Min, Max, Sum, Count }
          continue
        }
        const globalMeasurement = globalMeasurements[station]
        globalMeasurement.Count += Count
        globalMeasurement.Sum += Sum
        globalMeasurement.Min = Math.min(Min, globalMeasurement.Min)
        globalMeasurement.Max = Math.max(Max, globalMeasurement.Max)
      }
    })

    // Without an 'error' listener, an uncaught exception inside the worker is
    // re-emitted on the main thread as an unhandled EventEmitter error.
    worker.on("error", (err) => {
      reject(new Error(`Worker for bytes ${start}-${end} failed`, { cause: err }))
    })

    // 'exit' also fires after 'error'; a second reject() is a harmless no-op.
    worker.on("exit", (code) => {
      if (code === 0) {
        resolve(true)
      } else {
        reject(new Error(`Worker for bytes ${start}-${end} exited with code ${code}`))
      }
    })
  })
}
// Split the input into roughly NUM_THREADS byte ranges that each end on a line
// boundary, run one worker per range, then print the merged per-station stats.
const globalMeasurements = {}
const workers = []
const fileHandle = await open(FILE_NAME, "r")
try {
  const { size: fileSizeInBytes } = await fileHandle.stat()
  const chunkSize = Math.round(fileSizeInBytes / NUM_THREADS)
  // Window read after each nominal boundary to locate a newline. Reused across
  // iterations, so only the first `bytesRead` bytes are ever searched.
  const probe = new Uint8Array(4 * 1024)
  let lastPos = 0
  for (let i = 0; i < NUM_THREADS; i++) {
    const position = chunkSize * (i + 1)
    const { bytesRead } = await fileHandle.read(probe, 0, probe.byteLength, position)
    if (bytesRead === 0) {
      // Nominal boundary is at or past EOF.
      continue
    }
    const newlineOffset = probe.subarray(0, bytesRead).lastIndexOf(10)
    if (newlineOffset === -1) {
      if (position + bytesRead >= fileSizeInBytes) {
        // Window reached EOF inside an unterminated final line; the tail
        // range below picks it up.
        continue
      }
      throw new Error("There is no newline in the buffer")
    }
    const newlinePos = position + newlineOffset
    if (newlinePos <= lastPos) {
      // Small files (chunkSize 0 or below the probe size) can map several
      // boundaries onto an already-consumed newline; emitting that range would
      // give start > end, which createReadStream rejects.
      continue
    }
    // `end` is inclusive, so the range stops just before the newline.
    workers.push(runWorker({ globalMeasurements, start: lastPos, end: newlinePos - 1 }))
    lastPos = newlinePos + 1
  }
  // Whatever follows the last boundary goes to one final worker.
  if (lastPos < fileSizeInBytes) {
    workers.push(runWorker({ globalMeasurements, start: lastPos, end: fileSizeInBytes }))
  }
} finally {
  await fileHandle.close()
}
await Promise.all(workers)

const formattedStations = Object.keys(globalMeasurements)
  .sort()
  .map((station) => {
    const { Min, Max, Sum, Count } = globalMeasurements[station]
    return `${station}=${Min.toFixed(1)}/${(Sum / Count).toFixed(1)}/${Max.toFixed(1)}`
  })
process.stdout.write(`{${formattedStations.join(", ")}}`)
} else {
// WORKER
// Aggregate min/max/sum/count per station over this worker's byte range and
// post the result to the parent thread as a plain object keyed by station.
// A Map avoids collisions between station names and Object.prototype keys.
const localMeasurements = new Map()
// `fileName` is optional, so this still works if the spawner only sends the range.
const { fileName = FILE_NAME, start, end } = workerData
const fileHandle = await open(fileName, "r")
// Per the Node docs, the stream's default autoClose closes `fileHandle` on
// 'end' or 'error'.
const readLineInterface = createInterface({
  input: fileHandle.createReadStream({ start, end }),
  crlfDelay: Infinity,
})
for await (const line of readLineInterface) {
  const separatorIdx = line.indexOf(";")
  if (separatorIdx === -1) {
    // Blank or malformed line: skip it rather than record a NaN temperature
    // under a bogus station name.
    continue
  }
  const stationStr = line.slice(0, separatorIdx)
  // parseFloat stops at the first non-numeric character, matching the old
  // split(";")[1] behaviour.
  const tempFloat = parseFloat(line.slice(separatorIdx + 1))
  const measurement = localMeasurements.get(stationStr)
  if (measurement === undefined) {
    localMeasurements.set(stationStr, {
      Min: tempFloat,
      Max: tempFloat,
      Sum: tempFloat,
      Count: 1,
    })
  } else {
    measurement.Count++
    measurement.Sum += tempFloat
    measurement.Min = Math.min(tempFloat, measurement.Min)
    measurement.Max = Math.max(tempFloat, measurement.Max)
  }
}
parentPort.postMessage(Object.fromEntries(localMeasurements))
}