-
Notifications
You must be signed in to change notification settings - Fork 30
/
Copy pathmain.go
161 lines (141 loc) · 4.1 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package main
import (
"fmt"
"os"
"time"
"github.com/flashbots/mempool-dumpster/common"
"github.com/urfave/cli/v2"
"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/reader"
"go.uber.org/zap"
)
// Package-level build info, environment-driven settings, the shared logger and
// the CLI flag definitions used by the "analyze" command.
var (
version = "dev" // is set during build process
// debug is true only when the DEBUG environment variable is exactly "1";
// main passes it to common.GetLogger.
debug = os.Getenv("DEBUG") == "1"
// max is obtained via common.GetEnvInt("MAX", 0) (presumably the MAX
// environment variable with 0 as fallback). analyzeV2 stops reading parquet
// rows once that many rows have been processed; a value of 0 never matches
// the stop condition, so all rows are read.
// NOTE(review): this identifier shadows the Go 1.21+ builtin max inside this package.
max = common.GetEnvInt("MAX", 0)
// Helpers
// log is the shared logger; it is assigned in main before the CLI app runs.
log *zap.SugaredLogger
// CLI flags
cliFlags = []cli.Flag{
// Parquet input files; analyzeV2 validates all of them but opens only the first.
&cli.StringSliceFlag{
Name: "input-parquet",
Usage: "input parquet files",
},
// Optional sourcelog input files, loaded through common.LoadSourcelogFiles.
&cli.StringSliceFlag{
Name: "input-sourcelog",
Usage: "input sourcelog files",
},
// Optional output filename; when empty, analyzeV2 only prints to stdout.
&cli.StringFlag{
Name: "out",
Usage: "output filename",
},
// The tx-blacklist / tx-whitelist flags below are currently disabled
// (their lookups in analyzeV2 are commented out as well).
// &cli.StringSliceFlag{
// Name: "tx-blacklist",
// Usage: "metadata CSV/ZIP input files with transactions to ignore in analysis",
// },
// &cli.StringSliceFlag{
// Name: "tx-whitelist",
// Usage: "metadata CSV/ZIP input files to only use transactions in there for analysis",
// },
// Sources to compare; when given, they replace common.DefaultSourceComparisons.
&cli.StringSliceFlag{
Name: "cmp",
Usage: "compare these sources",
},
}
)
// main initialises the shared logger, assembles the urfave/cli application
// around analyzeV2 and runs it with the process arguments. An error returned
// by the application is reported through log.Fatal.
func main() {
	log = common.GetLogger(debug, false)
	// Flush buffered log entries on normal return; a Sync failure is deliberately ignored.
	defer func() {
		_ = log.Sync()
	}()

	var app cli.App
	app.Name = "analyze"
	app.Usage = "Analyze transaction and sourcelog files"
	app.Flags = cliFlags
	app.Action = analyzeV2

	err := app.Run(os.Args)
	if err != nil {
		log.Fatal(err)
	}
}
// analyzeV2 is the CLI action of the "analyze" command.
//
// It reads the flag values from cCtx, loads transaction summaries from the
// first --input-parquet file into a map keyed by transaction hash, optionally
// loads the --input-sourcelog files, runs the analyzer and prints its report
// to stdout. If --out is set, the report is also written to that file.
//
// Invalid input (no parquet file, unreadable parquet file, ...) terminates the
// process through the fatal logger. A failure to write the output file is
// returned as an error so the caller can exit with a non-zero status.
func analyzeV2(cCtx *cli.Context) error {
	outFile := cCtx.String("out")
	// ignoreTxsFiles := cCtx.StringSlice("tx-blacklist")
	// whitelistTxsFiles := cCtx.StringSlice("tx-whitelist")
	parquetInputFiles := cCtx.StringSlice("input-parquet")
	inputSourceLogFiles := cCtx.StringSlice("input-sourcelog")
	cmpSources := cCtx.StringSlice("cmp")

	// Explicit --cmp values replace the default source comparisons.
	sourceComps := common.DefaultSourceComparisons
	if len(cmpSources) > 0 {
		sourceComps = common.NewSourceComps(cmpSources)
	}

	if len(parquetInputFiles) == 0 {
		log.Fatal("no input-parquet files specified")
	}

	log.Infow("Analyzer V2", "version", version)
	// log.Infow("Comparing:", "sources", sourceComps)

	// Ensure the output file doesn't exist yet
	common.MustNotExist(log, outFile)
	log.Infof("Output file: %s", outFile)

	// Check input files
	for _, fn := range parquetInputFiles {
		common.MustBeParquetFile(log, fn)
	}
	// for _, fn := range append(ignoreTxsFiles, whitelistTxsFiles...) {
	// 	common.MustBeCSVFile(log, fn)
	// }

	// Only the first parquet file is read below; say so instead of silently
	// dropping the remaining inputs.
	if len(parquetInputFiles) > 1 {
		log.Warnw("Only the first input-parquet file is loaded, the others are ignored",
			"loaded", parquetInputFiles[0],
			"ignored", parquetInputFiles[1:],
		)
	}

	// Load parquet input file
	timeStart := time.Now()
	log.Infow("Loading parquet input files...", "memUsed", common.GetMemUsageHuman())
	fr, err := local.NewLocalFileReader(parquetInputFiles[0])
	if err != nil {
		log.Fatalw("Can't open file", "error", err)
	}
	pr, err := reader.NewParquetReader(fr, new(common.TxSummaryEntry), 4)
	if err != nil {
		log.Fatalw("Can't create parquet reader", "error", err)
	}

	num := int(pr.GetNumRows())
	entries := make(map[string]*common.TxSummaryEntry)
	rowsLoaded := 0 // rows actually stored; used for the MAX limit and the final log line
	for i := 0; i < num; i++ {
		if i%20_000 == 0 {
			log.Infow(common.Printer.Sprintf("- Loaded %10d / %d rows", i, num), "memUsed", common.GetMemUsageHuman())
		}

		// A fresh one-element slice per row, so every map value points at its own entry.
		stus := make([]common.TxSummaryEntry, 1)
		if err = pr.Read(&stus); err != nil {
			// The row content cannot be trusted after a failed read: skip it.
			log.Errorw("Read error", "error", err)
			continue
		}
		if len(stus) == 0 {
			// Guard against the reader leaving the slice empty; stus[0] would panic.
			continue
		}

		entries[stus[0].Hash] = &stus[0]
		rowsLoaded++
		// max == 0 never matches here, i.e. no limit.
		if rowsLoaded == max {
			break
		}
	}
	pr.ReadStop()
	if err = fr.Close(); err != nil {
		log.Warnw("Can't close parquet file", "error", err)
	}
	log.Infow(common.Printer.Sprintf("- Loaded %10d / %d rows", rowsLoaded, num), "memUsed", common.GetMemUsageHuman(), "timeTaken", time.Since(timeStart).String())

	// Load sourcelog input files (optional)
	var sourcelog map[string]map[string]int64 // [hash][source] = timestampMs
	if len(inputSourceLogFiles) > 0 {
		log.Info("Loading sourcelog files...")
		// The second return value is not needed here.
		sourcelog, _ = common.LoadSourcelogFiles(log, inputSourceLogFiles)
		log.Infow("Processed input sourcelog files",
			"txTotal", common.Printer.Sprintf("%d", len(sourcelog)),
			"memUsed", common.GetMemUsageHuman(),
		)
	}

	log.Info("Analyzing...")
	analyzer := common.NewAnalyzer2(common.Analyzer2Opts{ //nolint:exhaustruct
		Transactions: entries,
		Sourelog:     sourcelog,
		SourceComps:  sourceComps,
	})

	// Always print the report to stdout, preceded by an empty line.
	s := analyzer.Sprint()
	fmt.Println("")
	fmt.Println(s)

	if outFile != "" {
		// Propagate the failure so the process does not exit successfully
		// without having produced the requested output file.
		if err = analyzer.WriteToFile(outFile); err != nil {
			return fmt.Errorf("writing analysis to %q: %w", outFile, err)
		}
	}

	return nil
}