From 92648fc3c446e6f1725950457adb52d1090d2cd5 Mon Sep 17 00:00:00 2001 From: ClaytonNorthey92 Date: Tue, 27 Aug 2024 13:11:45 -0400 Subject: [PATCH] add queuing system for bitcoin broadcasts (#227) * added indexes and removed aggregate functions added indexes to btc_blocks_can materialized view, no longer using aggregate functions * added sql * add queuing system for bitcoin broadcasts add a db table btc_transaction_broadcast_request that will store broadcast requests. when someone wants to broadcast a tx, insert into this table (ignoring duplicates). have a few workers that read from this table and attempt to broadcast transactions to electrumx. when a worker "checks out" a broadcast request, it updates the last_broadcast_attempt_at column when a worker confirms a broadcast, it updates the broadcast_at column wait 10 minutes before processing same tx again if not confirmed * fix comment * add index * ignoring requests older than 3 hours, added test * using created_at to determine age, 2 hours * re-added test * squashme * squashme * fix pool pass * constant notify * more optimization: * ordering * delete on certain error * set last error * all errors * stagger * cache l2keystones * cache on new * no stagger * more connections * bss bfg timeout * ref, limit * async refresh and insert * bugfix * fix query * larger backoff * fix tests * don't use input ctx * longer timeouts in ci * pr feedback * smaller backoff * added test wait --- database/bfgd/database.go | 5 + database/bfgd/database_ext_test.go | 323 +++++++++++++++++++++++++++++ database/bfgd/postgres/postgres.go | 115 +++++++++- database/bfgd/scripts/0009.sql | 27 +++ e2e/e2e_ext_test.go | 98 ++++++--- service/bfg/bfg.go | 238 ++++++++++++++++----- service/bss/bss.go | 2 +- 7 files changed, 722 insertions(+), 86 deletions(-) create mode 100644 database/bfgd/scripts/0009.sql diff --git a/database/bfgd/database.go b/database/bfgd/database.go index d3a6dbfe4..c805b51b7 100644 --- a/database/bfgd/database.go +++ b/database/bfgd/database.go @@ -41,6 +41,11 @@ type Database interface { AccessPublicKeyInsert(ctx context.Context, publicKey *AccessPublicKey) error AccessPublicKeyExists(ctx context.Context, publicKey *AccessPublicKey) (bool, error) AccessPublicKeyDelete(ctx context.Context, publicKey *AccessPublicKey) error + + BtcTransactionBroadcastRequestInsert(ctx context.Context, serializedTx []byte, txId string) error + BtcTransactionBroadcastRequestGetNext(ctx context.Context, onlyNew bool) ([]byte, error) + BtcTransactionBroadcastRequestConfirmBroadcast(ctx context.Context, txId string) error + BtcTransactionBroadcastRequestSetLastError(ctx context.Context, txId string, lastErr string) error } // NotificationName identifies a database notification type. diff --git a/database/bfgd/database_ext_test.go b/database/bfgd/database_ext_test.go index 037ee3295..86bf47f58 100644 --- a/database/bfgd/database_ext_test.go +++ b/database/bfgd/database_ext_test.go @@ -1815,6 +1815,329 @@ func TestBtcHeightsNoChildren(t *testing.T) { } } +type BtcTransactionBroadcastRequest struct { + TxId string + SerializedTx []byte +} + +func TestBtcTransactionBroadcastRequestInsert(t *testing.T) { + ctx, cancel := defaultTestContext() + defer cancel() + + db, sdb, cleanup := createTestDB(ctx, t) + defer func() { + db.Close() + sdb.Close() + cleanup() + }() + + serializedTx := []byte("blahblahblah") + txId := "myid" + + err := db.BtcTransactionBroadcastRequestInsert(ctx, serializedTx, txId) + if err != nil { + t.Fatal(err) + } + + rows, err := sdb.QueryContext(ctx, "SELECT tx_id, serialized_tx FROM btc_transaction_broadcast_request") + if err != nil { + t.Fatal(err) + } + + result := BtcTransactionBroadcastRequest{} + count := 0 + + for rows.Next() { + err = rows.Scan(&result.TxId, &result.SerializedTx) + if err != nil { + t.Fatal(err) + } + count++ + } + + if count != 1 { + t.Fatalf("unexpected number of rows %d", count) + } + + diff := deep.Equal(result, BtcTransactionBroadcastRequest{ + TxId: txId, + SerializedTx: serializedTx, + }) + + if len(diff) > 0 { + t.Fatalf("unexpected diff %s", diff) + } +} + +func TestBtcTransactionBroadcastRequestGetNext(t *testing.T) { + ctx, cancel := defaultTestContext() + defer cancel() + + db, sdb, cleanup := createTestDB(ctx, t) + defer func() { + db.Close() + sdb.Close() + cleanup() + }() + + serializedTx := []byte("blahblahblah") + txId := "myid" + + err := db.BtcTransactionBroadcastRequestInsert(ctx, serializedTx, txId) + if err != nil { + t.Fatal(err) + } + + savedSerializedTx, err := db.BtcTransactionBroadcastRequestGetNext(ctx, true) + if err != nil { + t.Fatal(err) + } + + if !slices.Equal(serializedTx, savedSerializedTx) { + t.Fatalf("slices to do match: %v != %v", serializedTx, savedSerializedTx) + } +} + +func TestBtcTransactionBroadcastRequestGetNextMultiple(t *testing.T) { + ctx, cancel := defaultTestContext() + defer cancel() + + db, sdb, cleanup := createTestDB(ctx, t) + defer func() { + db.Close() + sdb.Close() + cleanup() + }() + + serializedTx := []byte("blahblahblah") + txId := "myid" + + serializedTx2 := []byte("blahblahblah2") + txId2 := "myid2" + + err := db.BtcTransactionBroadcastRequestInsert(ctx, serializedTx, txId) + if err != nil { + t.Fatal(err) + } + + err = db.BtcTransactionBroadcastRequestInsert(ctx, serializedTx2, txId2) + if err != nil { + t.Fatal(err) + } + + savedSerializedTx, err := db.BtcTransactionBroadcastRequestGetNext(ctx, true) + if err != nil { + t.Fatal(err) + } + + if !slices.Equal(serializedTx, savedSerializedTx) { + t.Fatalf("slices to do match: %v != %v", serializedTx, savedSerializedTx) + } + + savedSerializedTx2, err := db.BtcTransactionBroadcastRequestGetNext(ctx, true) + if err != nil { + t.Fatal(err) + } + + if !slices.Equal(serializedTx2, savedSerializedTx2) { + t.Fatalf("slices to do match: %v != %v", serializedTx2, savedSerializedTx2) + } + + savedSerializedTx3, err := db.BtcTransactionBroadcastRequestGetNext(ctx, true) + if err != nil { + t.Fatal(err) + } + + if savedSerializedTx3 != nil { + t.Fatal("expected nil value") + } +} + +func TestBtcTransactionBroadcastRequestGetNextBefore10Minutes(t *testing.T) { + ctx, cancel := defaultTestContext() + defer cancel() + + db, sdb, cleanup := createTestDB(ctx, t) + defer func() { + db.Close() + sdb.Close() + cleanup() + }() + + serializedTx := []byte("blahblahblah") + txId := "myid" + + err := db.BtcTransactionBroadcastRequestInsert(ctx, serializedTx, txId) + if err != nil { + t.Fatal(err) + } + + savedSerializedTx, err := db.BtcTransactionBroadcastRequestGetNext(ctx, true) + if err != nil { + t.Fatal(err) + } + + if !slices.Equal(serializedTx, savedSerializedTx) { + t.Fatalf("slices to do match: %v != %v", serializedTx, savedSerializedTx) + } + + // we should have set the fields on the last get, should not be able to + // get and process twice + savedSerializedTx, err = db.BtcTransactionBroadcastRequestGetNext(ctx, true) + if err != nil { + t.Fatal(err) + } + + if savedSerializedTx != nil { + t.Fatal("expected a nil response") + } +} + +func TestBtcTransactionBroadcastRequestGetNextRetry(t *testing.T) { + ctx, cancel := defaultTestContext() + defer cancel() + + db, sdb, cleanup := createTestDB(ctx, t) + defer func() { + db.Close() + sdb.Close() + cleanup() + }() + + serializedTx := []byte("blahblahblah") + txId := "myid" + + err := db.BtcTransactionBroadcastRequestInsert(ctx, serializedTx, txId) + if err != nil { + t.Fatal(err) + } + + savedSerializedTx, err := db.BtcTransactionBroadcastRequestGetNext(ctx, true) + if err != nil { + t.Fatal(err) + } + + if !slices.Equal(serializedTx, savedSerializedTx) { + t.Fatalf("slices to do match: %v != %v", serializedTx, savedSerializedTx) + } + + _, err = sdb.ExecContext(ctx, "UPDATE btc_transaction_broadcast_request SET next_broadcast_attempt_at = NOW()") + if err != nil { + t.Fatal(err) + } + + savedSerializedTx, err = db.BtcTransactionBroadcastRequestGetNext(ctx, false) + if err != nil { + t.Fatal(err) + } + + if !slices.Equal(serializedTx, savedSerializedTx) { + t.Fatalf("slices to do match: %v != %v", serializedTx, savedSerializedTx) + } +} + +func TestBtcTransactionBroadcastRequestGetNextAfter2Hours(t *testing.T) { + ctx, cancel := defaultTestContext() + defer cancel() + + db, sdb, cleanup := createTestDB(ctx, t) + defer func() { + db.Close() + sdb.Close() + cleanup() + }() + + serializedTx := []byte("blahblahblah") + txId := "myid" + + err := db.BtcTransactionBroadcastRequestInsert(ctx, serializedTx, txId) + if err != nil { + t.Fatal(err) + } + + _, err = sdb.ExecContext(ctx, "UPDATE btc_transaction_broadcast_request SET created_at = NOW() - INTERVAL '31 minutes'") + if err != nil { + t.Fatal(err) + } + + savedSerializedTx, err := db.BtcTransactionBroadcastRequestGetNext(ctx, true) + if err != nil { + t.Fatal(err) + } + + if savedSerializedTx != nil { + t.Fatal("expected nil value") + } +} + +func TestBtcTransactionBroadcastRequestGetNextAlreadyBroadcast(t *testing.T) { + ctx, cancel := defaultTestContext() + defer cancel() + + db, sdb, cleanup := createTestDB(ctx, t) + defer func() { + db.Close() + sdb.Close() + cleanup() + }() + + serializedTx := []byte("blahblahblah") + txId := "myid" + + err := db.BtcTransactionBroadcastRequestInsert(ctx, serializedTx, txId) + if err != nil { + t.Fatal(err) + } + + _, err = sdb.ExecContext(ctx, "UPDATE btc_transaction_broadcast_request SET broadcast_at = NOW()") + if err != nil { + t.Fatal(err) + } + + savedSerializedTx, err := db.BtcTransactionBroadcastRequestGetNext(ctx, true) + if err != nil { + t.Fatal(err) + } + + if savedSerializedTx != nil { + t.Fatal("expected nil response") + } +} + +func TestBtcTransactionBroadcastRequestConfirmBroadcast(t *testing.T) { + ctx, cancel := defaultTestContext() + defer cancel() + + db, sdb, cleanup := createTestDB(ctx, t) + defer func() { + db.Close() + sdb.Close() + cleanup() + }() + + serializedTx := []byte("blahblahblah") + txId := "myid" + + err := db.BtcTransactionBroadcastRequestInsert(ctx, serializedTx, txId) + if err != nil { + t.Fatal(err) + } + + err = db.BtcTransactionBroadcastRequestConfirmBroadcast(ctx, txId) + if err != nil { + t.Fatal(err) + } + + savedSerializedTx, err := db.BtcTransactionBroadcastRequestGetNext(ctx, true) + if err != nil { + t.Fatal(err) + } + + if savedSerializedTx != nil { + t.Fatal("expected nil response") + } +} + func createBtcBlock(ctx context.Context, t *testing.T, db bfgd.Database, count int, chain bool, height int, lastHash []byte, l2BlockNumber uint32) bfgd.BtcBlock { header := make([]byte, 80) hash := make([]byte, 32) diff --git a/database/bfgd/postgres/postgres.go b/database/bfgd/postgres/postgres.go index 8b8c576c0..60996a6a1 100644 --- a/database/bfgd/postgres/postgres.go +++ b/database/bfgd/postgres/postgres.go @@ -20,14 +20,14 @@ import ( ) const ( - bfgdVersion = 8 + bfgdVersion = 9 logLevel = "INFO" verbose = false ) const effectiveHeightSql = ` - COALESCE((SELECT MIN(height) + COALESCE((SELECT height FROM ( @@ -38,6 +38,7 @@ const effectiveHeightSql = ` = pop_basis.l2_keystone_abrev_hash WHERE ll.l2_block_number >= l2_keystones.l2_block_number + ORDER BY height ASC LIMIT 1 )), 0) ` @@ -808,7 +809,7 @@ func (p *pgdb) L2BTCFinalityByL2KeystoneAbrevHash(ctx context.Context, l2Keyston l2_keystones.ep_hash, l2_keystones.version, %s, - COALESCE((SELECT MAX(height) FROM btc_blocks_can),0) + COALESCE((SELECT height FROM btc_blocks_can ORDER BY height DESC LIMIT 1),0) FROM l2_keystones LEFT JOIN pop_basis ON l2_keystones.l2_keystone_abrev_hash @@ -1041,3 +1042,111 @@ func (p *pgdb) refreshBTCBlocksCanonical(ctx context.Context) error { return nil } + +func (p *pgdb) BtcTransactionBroadcastRequestInsert(ctx context.Context, serializedTx []byte, txId string) error { + log.Tracef("BtcTransactionBroadcastRequestInsert") + defer log.Tracef("BtcTransactionBroadcastRequestInsert exit") + + const insertSql = ` + INSERT INTO btc_transaction_broadcast_request + (tx_id, serialized_tx) + VALUES ($1, $2) + ` + _, err := p.db.ExecContext(ctx, insertSql, txId, serializedTx) + if err != nil { + var pgErr *pq.Error + if errors.As(err, &pgErr) && pgErr.Code.Class().Name() == "integrity_constraint_violation" { + return database.DuplicateError(fmt.Sprintf("duplicate entry: %s", pgErr)) + } + return fmt.Errorf("failed to insert btc_transaction_broadcast_request: %w", err) + } + + return nil +} + +// BtcTransactionBroadcastRequestGetNext +func (p *pgdb) BtcTransactionBroadcastRequestGetNext(ctx context.Context, onlyNew bool) ([]byte, error) { + log.Tracef("BtcTransactionBroadcastRequestGetNext") + defer log.Tracef("BtcTransactionBroadcastRequestGetNext exit") + + onlyNewClause := " next_broadcast_attempt_at IS NOT NULL AND next_broadcast_attempt_at <= NOW() " + if onlyNew { + onlyNewClause = " next_broadcast_attempt_at IS NULL " + } + + orderClause := " ORDER BY last_broadcast_attempt_at ASC " + if onlyNew { + orderClause = " ORDER BY created_at ASC " + } + + querySql := fmt.Sprintf(` + UPDATE btc_transaction_broadcast_request + SET last_broadcast_attempt_at = NOW(), + + next_broadcast_attempt_at = NOW() + INTERVAL '1 minute' + RANDOM() * INTERVAL '240 seconds' + + WHERE tx_id = ( + SELECT tx_id FROM btc_transaction_broadcast_request + WHERE + %s + AND broadcast_at IS NULL + AND created_at > NOW() - INTERVAL '30 minutes' + %s + LIMIT 1 + ) + RETURNING serialized_tx + `, onlyNewClause, orderClause) + + rows, err := p.db.QueryContext(ctx, querySql) + if err != nil { + return nil, fmt.Errorf("could not get next btc_transaction_broadcast_request: %v", err) + } + + defer rows.Close() + + for rows.Next() { + var serializedTx []byte + if err := rows.Scan(&serializedTx); err != nil { + return nil, err + } + + return serializedTx, nil + } + + return nil, nil +} + +// BtcTransactionBroadcastRequestConfirmBroadcast sets a broadcast request to +// "broadcasted" so it doesn't get attempted again +func (p *pgdb) BtcTransactionBroadcastRequestConfirmBroadcast(ctx context.Context, txId string) error { + log.Tracef("BtcTransactionBroadcastRequestConfirmBroadcast") + defer log.Tracef("BtcTransactionBroadcastRequestConfirmBroadcast exit") + + const querySql = ` + UPDATE btc_transaction_broadcast_request + SET broadcast_at = NOW() + WHERE tx_id = $1 + ` + _, err := p.db.ExecContext(ctx, querySql, txId) + if err != nil { + return fmt.Errorf("could not confirm broadcast: %v", err) + } + + return nil +} + +func (p *pgdb) BtcTransactionBroadcastRequestSetLastError(ctx context.Context, txId string, lastErr string) error { + log.Tracef("BtcTransactionBroadcastRequestSetLastError") + defer log.Tracef("BtcTransactionBroadcastRequestSetLastError exit") + + const querySql = ` + UPDATE btc_transaction_broadcast_request + SET last_error = $2 WHERE tx_id = $1 + ` + _, err := p.db.ExecContext(ctx, querySql, txId, lastErr) + if err != nil { + return fmt.Errorf("could not confirm broadcast: %v", err) + } + + return nil +} diff --git a/database/bfgd/scripts/0009.sql b/database/bfgd/scripts/0009.sql new file mode 100644 index 000000000..b1d0c026d --- /dev/null +++ b/database/bfgd/scripts/0009.sql @@ -0,0 +1,27 @@ +-- Copyright (c) 2024 Hemi Labs, Inc. +-- Use of this source code is governed by the MIT License, +-- which can be found in the LICENSE file. + +BEGIN; + +UPDATE version SET version = 9; + +CREATE INDEX btc_blocks_can_hash_idx ON btc_blocks_can (hash); +CREATE INDEX btc_blocks_can_height_idx ON btc_blocks_can (height); + +CREATE TABLE btc_transaction_broadcast_request ( + tx_id TEXT PRIMARY KEY NOT NULL, + serialized_tx BYTEA NOT NULL, + broadcast_at TIMESTAMP, + last_broadcast_attempt_at TIMESTAMP, + next_broadcast_attempt_at TIMESTAMP, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + last_error TEXT +); + +CREATE INDEX ON btc_transaction_broadcast_request (last_broadcast_attempt_at) WHERE broadcast_at IS NULL; +CREATE INDEX btc_transaction_broadcast_request_created_at_new ON btc_transaction_broadcast_request (created_at) WHERE broadcast_at IS NULL AND next_broadcast_attempt_at IS NULL; +CREATE INDEX btc_transaction_broadcast_request_created_at_retry ON btc_transaction_broadcast_request (created_at) WHERE broadcast_at IS NULL; + + +COMMIT; diff --git a/e2e/e2e_ext_test.go b/e2e/e2e_ext_test.go index 7afe186ce..354aa9132 100644 --- a/e2e/e2e_ext_test.go +++ b/e2e/e2e_ext_test.go @@ -34,6 +34,7 @@ import ( btcchaincfg "github.com/btcsuite/btcd/chaincfg" btcchainhash "github.com/btcsuite/btcd/chaincfg/chainhash" btctxscript "github.com/btcsuite/btcd/txscript" + "github.com/btcsuite/btcd/wire" btcwire "github.com/btcsuite/btcd/wire" "github.com/coder/websocket" "github.com/coder/websocket/wsjson" @@ -357,8 +358,6 @@ func reverseAndEncodeEncodedHash(encodedHash string) string { panic(err) } - slices.Reverse(rev) - return hex.EncodeToString(rev) } @@ -400,6 +399,11 @@ func createMockElectrumxServer(ctx context.Context, t *testing.T, l2Keystone *he } func handleMockElectrumxConnection(ctx context.Context, t *testing.T, conn net.Conn, btx []byte) { + mb := wire.MsgTx{} + if err := mb.Deserialize(bytes.NewBuffer(btx)); err != nil { + panic(fmt.Sprintf("failed to deserialize tx: %v", err)) + } + t.Helper() defer conn.Close() @@ -432,7 +436,7 @@ func handleMockElectrumxConnection(ctx context.Context, t *testing.T, conn net.C if req.Method == "blockchain.transaction.broadcast" { res.ID = req.ID res.Error = nil - res.Result = json.RawMessage([]byte(fmt.Sprintf("\"%s\"", mockTxHash))) + res.Result = json.RawMessage([]byte(fmt.Sprintf("\"%s\"", mb.TxID()))) } if req.Method == "blockchain.headers.subscribe" { @@ -479,7 +483,7 @@ func handleMockElectrumxConnection(ctx context.Context, t *testing.T, conn net.C t.Logf("checking height %d, pos %d", params.Height, params.TXPos) if params.TXPos == mockTxPos && params.Height == mockTxheight { - result.TXHash = reverseAndEncodeEncodedHash(mockTxHash) + result.TXHash = reverseAndEncodeEncodedHash(mb.TxID()) result.Merkle = mockMerkleHashes } @@ -511,7 +515,7 @@ func handleMockElectrumxConnection(ctx context.Context, t *testing.T, conn net.C panic(err) } - if params.TXHash == reverseAndEncodeEncodedHash(mockTxHash) { + if params.TXHash == reverseAndEncodeEncodedHash(mb.TxID()) { j, err := json.Marshal(hex.EncodeToString(btx)) if err != nil { panic(err) @@ -746,6 +750,8 @@ func TestNewL2Keystone(t *testing.T) { l2KeystoneAbrevHash := hemi.L2KeystoneAbbreviate(l2KeystoneRequest.L2Keystone).Hash() + time.Sleep(2 * time.Second) + // 3 l2KeystoneSavedDB, err := db.L2KeystoneByAbrevHash(ctx, [32]byte(l2KeystoneAbrevHash)) if err != nil { @@ -814,6 +820,8 @@ func TestL2Keystone(t *testing.T) { t.Fatal(err) } + time.Sleep(2 * time.Second) + l2KeystonesRequest := bfgapi.L2KeystonesRequest{ NumL2Keystones: 5, } @@ -1028,6 +1036,7 @@ func TestBFGPublicErrorCases(t *testing.T) { }, }, electrumx: false, + skip: true, }, { name: "bitcoin broadcast database error", @@ -1470,6 +1479,11 @@ func TestBitcoinBroadcast(t *testing.T) { Transaction: btx, } + mb := wire.MsgTx{} + if err := mb.Deserialize(bytes.NewBuffer(btx)); err != nil { + t.Fatalf("failed to deserialize tx: %v", err) + } + // 2 c, _, err := websocket.Dial(ctx, bfgPublicWsUrl, nil) if err != nil { @@ -1502,7 +1516,7 @@ func TestBitcoinBroadcast(t *testing.T) { } // async now, in a rush, sleep should work - time.Sleep(1 * time.Second) + time.Sleep(5 * time.Second) command, _, _, err := bfgapi.Read(ctx, bws.conn) if err != nil { @@ -1516,32 +1530,36 @@ func TestBitcoinBroadcast(t *testing.T) { publicKey := privateKey.PubKey() publicKeyUncompressed := publicKey.SerializeUncompressed() + t.Logf("querying for keystone %s", hex.EncodeToString(hemi.L2KeystoneAbbreviate(l2Keystone).Hash())) + // 3 popBases, err := db.PopBasisByL2KeystoneAbrevHash(ctx, [32]byte(hemi.L2KeystoneAbbreviate(l2Keystone).Hash()), false) if err != nil { t.Fatal(err) } - btcTxId, err := btcchainhash.NewHashFromStr(mockTxHash) - if err != nil { - t.Fatal(err) + btcTxId := mb.TxHash() + + t.Logf("test hash is %s", hex.EncodeToString(btcTxId[:])) + + if len(popBases) != 1 { + t.Fatalf("unexpected length %d", len(popBases)) } - diff := deep.Equal(popBases, []bfgd.PopBasis{ - { - L2KeystoneAbrevHash: hemi.L2KeystoneAbbreviate(l2Keystone).Hash(), - PopMinerPublicKey: publicKeyUncompressed, - BtcRawTx: btx, - BtcTxId: btcTxId[:], - BtcMerklePath: nil, - BtcHeaderHash: nil, - PopTxId: nil, - BtcTxIndex: nil, - }, - }) + if !slices.Equal(popBases[0].L2KeystoneAbrevHash, hemi.L2KeystoneAbbreviate(l2Keystone).Hash()) { + t.Fatalf("%v != %v", popBases[0].L2KeystoneAbrevHash, hemi.L2KeystoneAbbreviate(l2Keystone).Hash()) + } - if len(diff) > 0 { - t.Fatalf("unexpected diff: %s", diff) + if !slices.Equal(popBases[0].PopMinerPublicKey, publicKeyUncompressed) { + t.Fatalf("%v != %v", popBases[0].PopMinerPublicKey, publicKeyUncompressed) + } + + if !slices.Equal(popBases[0].BtcRawTx, btx) { + t.Fatalf("%v != %v", popBases[0].BtcRawTx, btx) + } + + if !slices.Equal(popBases[0].BtcTxId, btcTxId[:]) { + t.Fatalf("%v != %v", popBases[0].BtcTxId, btcTxId[:]) } } @@ -1642,11 +1660,13 @@ func TestBitcoinBroadcastDuplicate(t *testing.T) { t.Fatal(err) } - btcTxId, err := btcchainhash.NewHashFromStr(mockTxHash) - if err != nil { - t.Fatal(err) + mb := wire.MsgTx{} + if err := mb.Deserialize(bytes.NewBuffer(btx)); err != nil { + t.Fatalf("failed to deserialize tx: %v", err) } + btcTxId := mb.TxHash() + diff := deep.Equal(popBases, []bfgd.PopBasis{ { L2KeystoneAbrevHash: hemi.L2KeystoneAbbreviate(l2Keystone).Hash(), @@ -1736,8 +1756,10 @@ func TestProcessBitcoinBlockNewBtcBlock(t *testing.T) { EPHash: fillOutBytes("ephash", 32), } + btx := createBtcTx(t, 800, &l2Keystone, minerPrivateKeyBytes) + // 1 - electrumxAddr, cleanupE := createMockElectrumxServer(ctx, t, &l2Keystone, nil) + electrumxAddr, cleanupE := createMockElectrumxServer(ctx, t, &l2Keystone, btx) defer cleanupE() err := EnsureCanConnectTCP(t, electrumxAddr, mockElectrumxConnectTimeout) if err != nil { @@ -1855,11 +1877,13 @@ loop: t.Fatal(err) } - btcTxId, err := btcchainhash.NewHashFromStr(mockTxHash) - if err != nil { - t.Fatal(err) + mb := wire.MsgTx{} + if err := mb.Deserialize(bytes.NewBuffer(btx)); err != nil { + t.Fatalf("failed to deserialize tx: %v", err) } + btcTxId := mb.TxHash() + btcHeader, err := hex.DecodeString(strings.Replace(mockEncodedBlockHeader, "\"", "", 2)) if err != nil { t.Fatal(err) @@ -1875,7 +1899,6 @@ loop: // 4 btcTxIdSlice := btcTxId[:] - slices.Reverse(btcTxIdSlice) popTxIdFull := []byte{} popTxIdFull = append(popTxIdFull, btcTxIdSlice...) @@ -1934,6 +1957,10 @@ func TestBitcoinBroadcastThenUpdate(t *testing.T) { // 1 btx := createBtcTx(t, 199, &l2Keystone, minerPrivateKeyBytes) + mb := wire.MsgTx{} + if err := mb.Deserialize(bytes.NewBuffer(btx)); err != nil { + t.Fatalf("failed to deserialize tx: %v", err) + } // 2 electrumxAddr, cleanupE := createMockElectrumxServer(ctx, t, &l2Keystone, btx) @@ -1991,7 +2018,7 @@ func TestBitcoinBroadcastThenUpdate(t *testing.T) { publicKey := privateKey.PubKey() publicKeyUncompressed := publicKey.SerializeUncompressed() - btcTxId, err := btcchainhash.NewHashFromStr(mockTxHash) + btcTxId, err := btcchainhash.NewHashFromStr(mb.TxID()) if err != nil { t.Fatal(err) } @@ -2028,7 +2055,6 @@ loop: btcHeaderHash := btcchainhash.DoubleHashB(btcHeader) btcTxIdSlice := btcTxId[:] - slices.Reverse(btcTxIdSlice) popTxIdFull := []byte{} popTxIdFull = append(popTxIdFull, btcTxIdSlice...) @@ -2398,6 +2424,8 @@ func TestGetFinalitiesByL2KeystoneBSS(t *testing.T) { conn: protocol.NewWSConn(c), } + time.Sleep(5 * time.Second) + // first and second btcBlocks recentFinalities, err := db.L2BTCFinalityMostRecent(ctx, 100) if err != nil { @@ -2491,6 +2519,8 @@ func TestGetFinalitiesByL2KeystoneBSSLowerServerHeight(t *testing.T) { conn: protocol.NewWSConn(c), } + time.Sleep(5 * time.Second) + // first and second btcBlocks recentFinalities, err := db.L2BTCFinalityMostRecent(ctx, 100) if err != nil { @@ -2582,6 +2612,8 @@ func TestGetMostRecentL2BtcFinalitiesBFG(t *testing.T) { conn: protocol.NewWSConn(c), } + time.Sleep(5 * time.Second) + finalityRequest := bfgapi.BTCFinalityByRecentKeystonesRequest{ NumRecentKeystones: 100, } diff --git a/service/bfg/bfg.go b/service/bfg/bfg.go index 0765ccbd8..e96d29928 100644 --- a/service/bfg/bfg.go +++ b/service/bfg/bfg.go @@ -134,6 +134,8 @@ type Server struct { canonicalChainHeight uint64 checkForInvalidBlocks chan struct{} + + l2keystonesCache []hemi.L2Keystone } // metrics stores prometheus metrics. @@ -322,6 +324,102 @@ func (s *Server) handleBitcoinBalance(ctx context.Context, bbr *bfgapi.BitcoinBa }, nil } +func (s *Server) handleOneBroadcastRequest(ctx context.Context, highPriority bool) { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + serializedTx, err := s.db.BtcTransactionBroadcastRequestGetNext(ctx, highPriority) + if err != nil { + log.Errorf("error getting next broadcast request: %v", err) + return + } + + // if there are no new serialized txs, backoff a bit + if serializedTx == nil { + select { + case <-time.After(1 * time.Second): + return + case <-ctx.Done(): + return + } + } + + rr := bytes.NewReader(serializedTx) + mb := wire.MsgTx{} + if err := mb.Deserialize(rr); err != nil { + log.Errorf("failed to deserialize tx: %v", err) + return + } + + var tl2 *pop.TransactionL2 + for _, v := range mb.TxOut { + tl2, err = pop.ParseTransactionL2FromOpReturn(v.PkScript) + if err == nil { + break // Found the pop transaction. + } + } + + if tl2 == nil { + log.Errorf("could not find pop tx") + return + } + + publicKeyUncompressed, err := pop.ParsePublicKeyFromSignatureScript(mb.TxIn[0].SignatureScript) + if err != nil { + log.Errorf("could not parse public key from signature script: %v", err) + return + } + + hash := mb.TxHash() + + if err := s.db.PopBasisInsertPopMFields(ctx, &bfgd.PopBasis{ + BtcTxId: hash[:], + BtcRawTx: database.ByteArray(serializedTx), + PopMinerPublicKey: publicKeyUncompressed, + L2KeystoneAbrevHash: tl2.L2Keystone.Hash(), + }); err != nil { + log.Infof("inserting pop basis: %s", err) + } + + _, err = s.btcClient.Broadcast(ctx, serializedTx) + if err != nil { + log.Errorf("broadcast tx: %s", err) + err = s.db.BtcTransactionBroadcastRequestSetLastError(ctx, mb.TxID(), err.Error()) + if err != nil { + log.Errorf("could not delete %v", err) + } + return + } + + s.metrics.popBroadcasts.Inc() + + log.Tracef("hash is %s", hex.EncodeToString(hash[:])) + + err = s.db.BtcTransactionBroadcastRequestConfirmBroadcast(ctx, mb.TxID()) + if err != nil { + log.Errorf("could not confirm broadcast: %v", err) + return + } + + log.Infof("successfully broadcast tx %s, for l2 keystone %s", mb.TxID(), hex.EncodeToString(tl2.L2Keystone.Hash())) +} + +func (s *Server) bitcoinBroadcastWorker(ctx context.Context, highPriority bool) { + log.Tracef("bitcoinBroadcastWorker") + defer log.Tracef("bitcoinBroadcastWorker exit") + + defer s.wg.Done() + + for { + select { + case <-ctx.Done(): + return + default: + s.handleOneBroadcastRequest(ctx, highPriority) + } + } +} + func (s *Server) handleBitcoinBroadcast(ctx context.Context, bbr *bfgapi.BitcoinBroadcastRequest) (any, error) { log.Tracef("handleBitcoinBroadcast") defer log.Tracef("handleBitcoinBroadcast exit") @@ -351,44 +449,23 @@ func (s *Server) handleBitcoinBroadcast(ctx context.Context, bbr *bfgapi.Bitcoin }, nil } - publicKeyUncompressed, err := pop.ParsePublicKeyFromSignatureScript(mb.TxIn[0].SignatureScript) + _, err = pop.ParsePublicKeyFromSignatureScript(mb.TxIn[0].SignatureScript) if err != nil { return &bfgapi.BitcoinBroadcastResponse{ Error: protocol.RequestErrorf("could not parse signature script: %v", err), }, nil } - txHash, err := s.btcClient.Broadcast(ctx, bbr.Transaction) - if err != nil { - // This may not alwyas be an internal error. - e := protocol.NewInternalErrorf("broadcast tx: %w", err) + err = s.db.BtcTransactionBroadcastRequestInsert(ctx, bbr.Transaction, mb.TxID()) + if err != nil && !errors.Is(err, database.ErrDuplicate) { + e := protocol.NewInternalErrorf("insert broadcast request : %w", err) return &bfgapi.BitcoinBroadcastResponse{ Error: e.ProtocolError(), }, e } - s.metrics.popBroadcasts.Inc() - - go func() { - // retry up to 2 times, allowing only 5 second per try - // if we fail here it is ok for now - for range 2 { - insertCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if err := s.db.PopBasisInsertPopMFields(insertCtx, &bfgd.PopBasis{ - BtcTxId: txHash, - BtcRawTx: database.ByteArray(bbr.Transaction), - PopMinerPublicKey: publicKeyUncompressed, - L2KeystoneAbrevHash: tl2.L2Keystone.Hash(), - }); err != nil { - log.Errorf("error occurred inserting pop basis: %s", err) - } else { - return - } - } - }() - - return &bfgapi.BitcoinBroadcastResponse{TXID: txHash}, nil + hash := mb.TxHash() + return &bfgapi.BitcoinBroadcastResponse{TXID: hash[:]}, nil } func (s *Server) handleBitcoinInfo(ctx context.Context, bir *bfgapi.BitcoinInfoRequest) (any, error) { @@ -1203,16 +1280,30 @@ func (s *Server) handleBtcFinalityByKeystonesRequest(ctx context.Context, bfkr * }, nil } -func (s *Server) handleL2KeystonesRequest(ctx context.Context, l2kr *bfgapi.L2KeystonesRequest) (any, error) { - log.Tracef("handleL2KeystonesRequest") - defer log.Tracef("handleL2KeystonesRequest exit") +func (s *Server) getL2KeystonesCache() []hemi.L2Keystone { + log.Tracef("getL2KeystonesCache") + defer log.Tracef("getL2KeystonesCache exit") + + s.mtx.Lock() + defer s.mtx.Unlock() - results, err := s.db.L2KeystonesMostRecentN(ctx, uint32(l2kr.NumL2Keystones)) + results := make([]hemi.L2Keystone, len(s.l2keystonesCache)) + copy(results, s.l2keystonesCache) + + return results +} + +func (s *Server) refreshL2KeystoneCache(ctx context.Context) { + log.Tracef("refreshL2KeystoneCache") + defer log.Tracef("refreshL2KeystoneCache exit") + + s.mtx.Lock() + defer s.mtx.Unlock() + + results, err := s.db.L2KeystonesMostRecentN(ctx, 100) if err != nil { - e := protocol.NewInternalErrorf("error getting l2 keystones: %w", err) - return &bfgapi.L2KeystonesResponse{ - Error: e.ProtocolError(), - }, e + log.Errorf("error getting keystones %v", err) + return } l2Keystones := make([]hemi.L2Keystone, 0, len(results)) @@ -1228,8 +1319,24 @@ func (s *Server) handleL2KeystonesRequest(ctx context.Context, l2kr *bfgapi.L2Ke }) } + s.l2keystonesCache = l2Keystones +} + +func (s *Server) handleL2KeystonesRequest(ctx context.Context, l2kr *bfgapi.L2KeystonesRequest) (any, error) { + log.Tracef("handleL2KeystonesRequest") + defer log.Tracef("handleL2KeystonesRequest exit") + + results := []hemi.L2Keystone{} + for i, v := range s.getL2KeystonesCache() { + if uint64(i) < l2kr.NumL2Keystones { + results = append(results, v) + } else { + break + } + } + return &bfgapi.L2KeystonesResponse{ - L2Keystones: l2Keystones, + L2Keystones: results, }, nil } @@ -1312,22 +1419,21 @@ func (s *Server) handleNewL2Keystones(ctx context.Context, nlkr *bfgapi.NewL2Key log.Tracef("handleNewL2Keystones") defer log.Tracef("handleNewL2Keystones exit") - ks := hemiL2KeystonesToDb(nlkr.L2Keystones) - err := s.db.L2KeystonesInsert(ctx, ks) response := bfgapi.NewL2KeystonesResponse{} - if err != nil { - if errors.Is(err, database.ErrDuplicate) { - response.Error = protocol.Errorf("l2 keystone already exists") - return response, nil - } - if errors.Is(err, database.ErrValidation) { - log.Errorf("error inserting l2 keystone: %s", err) - response.Error = protocol.Errorf("invalid l2 keystone") - return response, nil + + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + ks := hemiL2KeystonesToDb(nlkr.L2Keystones) + err := s.db.L2KeystonesInsert(ctx, ks) + if err != nil { + log.Errorf("error saving keystone %v", err) + return } - return nil, err - } + s.refreshL2KeystoneCache(ctx) + }() return response, nil } @@ -1492,6 +1598,40 @@ func (s *Server) Run(pctx context.Context) error { return err } + s.wg.Add(1) + go func() { + defer s.wg.Done() + for { + select { + case <-time.After(1 * time.Minute): + log.Infof("sending notifications of l2 keystones") + go s.handleL2KeystonesNotification() + case <-ctx.Done(): + return + } + } + }() + + for _, p := range []bool{true, false} { + for range 4 { + s.wg.Add(1) + go s.bitcoinBroadcastWorker(ctx, p) + } + } + + s.wg.Add(1) + go func() { + defer s.wg.Done() + for { + select { + case <-ctx.Done(): + return + case <-time.After(1 * time.Second): + s.refreshL2KeystoneCache(ctx) + } + } + }() + // Setup websockets and HTTP routes privateMux := s.server publicMux := s.publicServer diff --git a/service/bss/bss.go b/service/bss/bss.go index 5ede086cb..c0566faf6 100644 --- a/service/bss/bss.go +++ b/service/bss/bss.go @@ -149,7 +149,7 @@ func NewServer(cfg *Config) (*Server, error) { Help: "The total number of succesful RPC commands", }), requestTimeout: defaultRequestTimeout, - bfgCallTimeout: defaultRequestTimeout / 2, + bfgCallTimeout: 20 * time.Second, holdoffTimeout: 6 * time.Second, requestLimit: requestLimit, sessions: make(map[string]*bssWs),