-
Notifications
You must be signed in to change notification settings - Fork 129
/
Copy pathchainset.go
238 lines (202 loc) · 6.56 KB
/
chainset.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
package interchaintest
import (
"context"
"database/sql"
"fmt"
"os"
"sync"
"time"
"github.com/docker/docker/client"
"github.com/strangelove-ventures/interchaintest/v8/blockdb"
"github.com/strangelove-ventures/interchaintest/v8/chain/cosmos"
"github.com/strangelove-ventures/interchaintest/v8/ibc"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
// chainSet is an unordered collection of ibc.Chain,
// to group methods that apply actions against all chains in the set.
//
// The main purpose of the chainSet is to unify test setup when working with any number of chains.
type chainSet struct {
log *zap.Logger
chains map[ibc.Chain]struct{}
// The following fields are set during TrackBlocks, and used in Close.
trackerEg *errgroup.Group
db *sql.DB
collectors []*blockdb.Collector
}
func newChainSet(log *zap.Logger, chains []ibc.Chain) *chainSet {
cs := &chainSet{
log: log,
chains: make(map[ibc.Chain]struct{}, len(chains)),
}
for _, chain := range chains {
cs.chains[chain] = struct{}{}
}
return cs
}
// Initialize concurrently calls Initialize against each chain in the set.
// Each chain may run a docker pull command,
// so with a cold image cache, running concurrently may save some time.
func (cs *chainSet) Initialize(ctx context.Context, testName string, cli *client.Client, networkID string) error {
var eg errgroup.Group
for c := range cs.chains {
c := c
eg.Go(func() error {
if err := c.Initialize(ctx, testName, cli, networkID); err != nil {
return fmt.Errorf("failed to initialize chain %s: %w", c.Config().Name, err)
}
return nil
})
}
return eg.Wait()
}
// CreateCommonAccount creates a key with the given name on each chain in the set,
// and returns the bech32 representation of each account created.
// The typical use of CreateCommonAccount is to create a faucet account on each chain.
//
// The keys are created concurrently because creating keys on one chain
// should have no effect on any other chain.
func (cs *chainSet) CreateCommonAccount(ctx context.Context, keyName string) (faucetAddresses map[ibc.Chain]string, err error) {
var mu sync.Mutex
faucetAddresses = make(map[ibc.Chain]string, len(cs.chains))
eg, egCtx := errgroup.WithContext(ctx)
for c := range cs.chains {
c := c
eg.Go(func() error {
wallet, err := c.BuildWallet(egCtx, keyName, "")
if err != nil {
return err
}
mu.Lock()
faucetAddresses[c] = wallet.FormattedAddress()
mu.Unlock()
return nil
})
}
if err := eg.Wait(); err != nil {
return nil, fmt.Errorf("failed to create common account with name %s: %w", keyName, err)
}
return faucetAddresses, nil
}
// Start concurrently calls Start against each chain in the set.
func (cs *chainSet) Start(ctx context.Context, testName string, additionalGenesisWallets map[ibc.Chain][]ibc.WalletAmount) error {
eg, egCtx := errgroup.WithContext(ctx)
for c := range cs.chains {
c := c
if cosmosChain, ok := c.(*cosmos.CosmosChain); ok && cosmosChain.Provider != nil {
// wait for provider chains to be started up first
continue
}
eg.Go(func() error {
chainCfg := c.Config()
if cosmosChain, ok := c.(*cosmos.CosmosChain); ok {
if len(cosmosChain.Consumers) > 0 {
// this is a provider chain
if err := cosmosChain.StartProvider(testName, egCtx, additionalGenesisWallets[c]...); err != nil {
return fmt.Errorf("failed to start provider chain %s: %w", chainCfg.Name, err)
}
return nil
}
}
// standard chain startup
if err := c.Start(testName, egCtx, additionalGenesisWallets[c]...); err != nil {
return fmt.Errorf("failed to start chain %s: %w", chainCfg.Name, err)
}
return nil
})
}
if err := eg.Wait(); err != nil {
return err
}
eg, egCtx = errgroup.WithContext(ctx)
// Now startup any consumer chains
for c := range cs.chains {
c := c
if cosmosChain, ok := c.(*cosmos.CosmosChain); ok && cosmosChain.Provider != nil {
eg.Go(func() error {
// this is a consumer chain
if err := cosmosChain.StartConsumer(testName, egCtx, additionalGenesisWallets[c]...); err != nil {
return fmt.Errorf("failed to start consumer chain %s: %w", c.Config().Name, err)
}
return nil
})
}
}
return eg.Wait()
}
// TrackBlocks initializes database tables and polls for transactions to be saved in the database.
// This method is a nop if dbPath is blank.
// The gitSha is used to pin a git commit to a test invocation. Thus, when a user is looking at historical
// data they are able to determine which version of the code produced the results.
// Expected to be called after Start.
func (cs chainSet) TrackBlocks(ctx context.Context, testName, dbPath, gitSha string) error {
if len(dbPath) == 0 {
// nop
return nil
}
db, err := blockdb.ConnectDB(ctx, dbPath)
if err != nil {
return fmt.Errorf("connect to sqlite database %s: %w", dbPath, err)
}
cs.db = db
if len(gitSha) == 0 {
gitSha = "unknown"
}
if err := blockdb.Migrate(db, gitSha); err != nil {
return fmt.Errorf("migrate sqlite database %s; deleting file recommended: %w", dbPath, err)
}
testCase, err := blockdb.CreateTestCase(ctx, db, testName, gitSha)
if err != nil {
_ = db.Close()
return fmt.Errorf("create test case in sqlite database: %w", err)
}
// TODO (nix - 6/1/22) Need logger instead of fmt.Fprint
cs.trackerEg = new(errgroup.Group)
cs.collectors = make([]*blockdb.Collector, len(cs.chains))
i := 0
for c := range cs.chains {
c := c
id := c.Config().ChainID
finder, ok := c.(blockdb.TxFinder)
if !ok {
fmt.Fprintf(os.Stderr, `Chain %s is not configured to save blocks; must implement "FindTxs(ctx context.Context, height int64) ([][]byte, error)"`+"\n", id)
return nil
}
j := i // Avoid closure on loop variable.
cs.trackerEg.Go(func() error {
chaindb, err := testCase.AddChain(ctx, id, c.Config().Type)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to add chain %s to database: %v", id, err)
return nil
}
log := cs.log.With(zap.String("chain_id", id))
collector := blockdb.NewCollector(log, finder, chaindb, 100*time.Millisecond)
cs.collectors[j] = collector
collector.Collect(ctx)
return nil
})
i++
}
return nil
}
// Close frees any resources associated with the chainSet.
//
// Currently, it only frees resources from TrackBlocks.
// Close is safe to call even if TrackBlocks was not called.
func (cs *chainSet) Close() error {
for _, c := range cs.collectors {
if c != nil {
c.Stop()
}
}
var err error
if cs.trackerEg != nil {
multierr.AppendInto(&err, cs.trackerEg.Wait())
}
if cs.db != nil {
multierr.AppendInto(&err, cs.db.Close())
}
return err
}