Skip to content

Commit

Permalink
add queuing system for bitcoin broadcasts (#227)
Browse files Browse the repository at this point in the history
* added indexes and removed aggregate functions

added indexes to btc_blocks_can materialized view, no longer using aggregate functions

* added sql

* add queuing system for bitcoin broadcasts

add a db table btc_transaction_broadcast_request that will store broadcast requests.  when someone wants to broadcast a tx, insert into this table (ignoring duplicates).  have a few workers that read from this table and attempt to broadcast transactions to electrumx.

when a worker "checks out" a broadcast request, it updates the last_broadcast_attempt_at column

when a worker confirms a broadcast, it updates the broadcast_at column

wait 10 minutes before processing same tx again if not confirmed

* fix comment

* add index

* ignoring requests older than 3 hours, added test

* using created_at to determine age, 2 hours

* re-added test

* squashme

* squashme

* fix pool pass

* constant notify

* more optimization:

* ordering

* delete on certain error

* set last error

* all errors

* stagger

* cache l2keystones

* cache on new

* no stagger

* more connections

* bss bfg timeout

* ref, limit

* async refresh and insert

* bugfix

* fix query

* larger backoff

* fix tests

* don't use input ctx

* longer timeouts in ci

* pr feedback

* smaller backoff

* added test wait
  • Loading branch information
ClaytonNorthey92 authored Aug 27, 2024
1 parent 7cd520d commit 92648fc
Show file tree
Hide file tree
Showing 7 changed files with 722 additions and 86 deletions.
5 changes: 5 additions & 0 deletions database/bfgd/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ type Database interface {
AccessPublicKeyInsert(ctx context.Context, publicKey *AccessPublicKey) error
AccessPublicKeyExists(ctx context.Context, publicKey *AccessPublicKey) (bool, error)
AccessPublicKeyDelete(ctx context.Context, publicKey *AccessPublicKey) error

BtcTransactionBroadcastRequestInsert(ctx context.Context, serializedTx []byte, txId string) error
BtcTransactionBroadcastRequestGetNext(ctx context.Context, onlyNew bool) ([]byte, error)
BtcTransactionBroadcastRequestConfirmBroadcast(ctx context.Context, txId string) error
BtcTransactionBroadcastRequestSetLastError(ctx context.Context, txId string, lastErr string) error
}

// NotificationName identifies a database notification type.
Expand Down
323 changes: 323 additions & 0 deletions database/bfgd/database_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1815,6 +1815,329 @@ func TestBtcHeightsNoChildren(t *testing.T) {
}
}

type BtcTransactionBroadcastRequest struct {
TxId string
SerializedTx []byte
}

func TestBtcTransactionBroadcastRequestInsert(t *testing.T) {
ctx, cancel := defaultTestContext()
defer cancel()

db, sdb, cleanup := createTestDB(ctx, t)
defer func() {
db.Close()
sdb.Close()
cleanup()
}()

serializedTx := []byte("blahblahblah")
txId := "myid"

err := db.BtcTransactionBroadcastRequestInsert(ctx, serializedTx, txId)
if err != nil {
t.Fatal(err)
}

rows, err := sdb.QueryContext(ctx, "SELECT tx_id, serialized_tx FROM btc_transaction_broadcast_request")
if err != nil {
t.Fatal(err)
}

result := BtcTransactionBroadcastRequest{}
count := 0

for rows.Next() {
err = rows.Scan(&result.TxId, &result.SerializedTx)
if err != nil {
t.Fatal(err)
}
count++
}

if count != 1 {
t.Fatalf("unexpected number of rows %d", count)
}

diff := deep.Equal(result, BtcTransactionBroadcastRequest{
TxId: txId,
SerializedTx: serializedTx,
})

if len(diff) > 0 {
t.Fatalf("unexpected diff %s", diff)
}
}

func TestBtcTransactionBroadcastRequestGetNext(t *testing.T) {
ctx, cancel := defaultTestContext()
defer cancel()

db, sdb, cleanup := createTestDB(ctx, t)
defer func() {
db.Close()
sdb.Close()
cleanup()
}()

serializedTx := []byte("blahblahblah")
txId := "myid"

err := db.BtcTransactionBroadcastRequestInsert(ctx, serializedTx, txId)
if err != nil {
t.Fatal(err)
}

savedSerializedTx, err := db.BtcTransactionBroadcastRequestGetNext(ctx, true)
if err != nil {
t.Fatal(err)
}

if !slices.Equal(serializedTx, savedSerializedTx) {
t.Fatalf("slices to do match: %v != %v", serializedTx, savedSerializedTx)
}
}

func TestBtcTransactionBroadcastRequestGetNextMultiple(t *testing.T) {
ctx, cancel := defaultTestContext()
defer cancel()

db, sdb, cleanup := createTestDB(ctx, t)
defer func() {
db.Close()
sdb.Close()
cleanup()
}()

serializedTx := []byte("blahblahblah")
txId := "myid"

serializedTx2 := []byte("blahblahblah2")
txId2 := "myid2"

err := db.BtcTransactionBroadcastRequestInsert(ctx, serializedTx, txId)
if err != nil {
t.Fatal(err)
}

err = db.BtcTransactionBroadcastRequestInsert(ctx, serializedTx2, txId2)
if err != nil {
t.Fatal(err)
}

savedSerializedTx, err := db.BtcTransactionBroadcastRequestGetNext(ctx, true)
if err != nil {
t.Fatal(err)
}

if !slices.Equal(serializedTx, savedSerializedTx) {
t.Fatalf("slices to do match: %v != %v", serializedTx, savedSerializedTx)
}

savedSerializedTx2, err := db.BtcTransactionBroadcastRequestGetNext(ctx, true)
if err != nil {
t.Fatal(err)
}

if !slices.Equal(serializedTx2, savedSerializedTx2) {
t.Fatalf("slices to do match: %v != %v", serializedTx2, savedSerializedTx2)
}

savedSerializedTx3, err := db.BtcTransactionBroadcastRequestGetNext(ctx, true)
if err != nil {
t.Fatal(err)
}

if savedSerializedTx3 != nil {
t.Fatal("expected nil value")
}
}

func TestBtcTransactionBroadcastRequestGetNextBefore10Minutes(t *testing.T) {
ctx, cancel := defaultTestContext()
defer cancel()

db, sdb, cleanup := createTestDB(ctx, t)
defer func() {
db.Close()
sdb.Close()
cleanup()
}()

serializedTx := []byte("blahblahblah")
txId := "myid"

err := db.BtcTransactionBroadcastRequestInsert(ctx, serializedTx, txId)
if err != nil {
t.Fatal(err)
}

savedSerializedTx, err := db.BtcTransactionBroadcastRequestGetNext(ctx, true)
if err != nil {
t.Fatal(err)
}

if !slices.Equal(serializedTx, savedSerializedTx) {
t.Fatalf("slices to do match: %v != %v", serializedTx, savedSerializedTx)
}

// we should have set the fields on the last get, should not be able to
// get and process twice
savedSerializedTx, err = db.BtcTransactionBroadcastRequestGetNext(ctx, true)
if err != nil {
t.Fatal(err)
}

if savedSerializedTx != nil {
t.Fatal("expected a nil response")
}
}

func TestBtcTransactionBroadcastRequestGetNextRetry(t *testing.T) {
ctx, cancel := defaultTestContext()
defer cancel()

db, sdb, cleanup := createTestDB(ctx, t)
defer func() {
db.Close()
sdb.Close()
cleanup()
}()

serializedTx := []byte("blahblahblah")
txId := "myid"

err := db.BtcTransactionBroadcastRequestInsert(ctx, serializedTx, txId)
if err != nil {
t.Fatal(err)
}

savedSerializedTx, err := db.BtcTransactionBroadcastRequestGetNext(ctx, true)
if err != nil {
t.Fatal(err)
}

if !slices.Equal(serializedTx, savedSerializedTx) {
t.Fatalf("slices to do match: %v != %v", serializedTx, savedSerializedTx)
}

_, err = sdb.ExecContext(ctx, "UPDATE btc_transaction_broadcast_request SET next_broadcast_attempt_at = NOW()")
if err != nil {
t.Fatal(err)
}

savedSerializedTx, err = db.BtcTransactionBroadcastRequestGetNext(ctx, false)
if err != nil {
t.Fatal(err)
}

if !slices.Equal(serializedTx, savedSerializedTx) {
t.Fatalf("slices to do match: %v != %v", serializedTx, savedSerializedTx)
}
}

func TestBtcTransactionBroadcastRequestGetNextAfter2Hours(t *testing.T) {
ctx, cancel := defaultTestContext()
defer cancel()

db, sdb, cleanup := createTestDB(ctx, t)
defer func() {
db.Close()
sdb.Close()
cleanup()
}()

serializedTx := []byte("blahblahblah")
txId := "myid"

err := db.BtcTransactionBroadcastRequestInsert(ctx, serializedTx, txId)
if err != nil {
t.Fatal(err)
}

_, err = sdb.ExecContext(ctx, "UPDATE btc_transaction_broadcast_request SET created_at = NOW() - INTERVAL '31 minutes'")
if err != nil {
t.Fatal(err)
}

savedSerializedTx, err := db.BtcTransactionBroadcastRequestGetNext(ctx, true)
if err != nil {
t.Fatal(err)
}

if savedSerializedTx != nil {
t.Fatal("expected nil value")
}
}

func TestBtcTransactionBroadcastRequestGetNextAlreadyBroadcast(t *testing.T) {
ctx, cancel := defaultTestContext()
defer cancel()

db, sdb, cleanup := createTestDB(ctx, t)
defer func() {
db.Close()
sdb.Close()
cleanup()
}()

serializedTx := []byte("blahblahblah")
txId := "myid"

err := db.BtcTransactionBroadcastRequestInsert(ctx, serializedTx, txId)
if err != nil {
t.Fatal(err)
}

_, err = sdb.ExecContext(ctx, "UPDATE btc_transaction_broadcast_request SET broadcast_at = NOW()")
if err != nil {
t.Fatal(err)
}

savedSerializedTx, err := db.BtcTransactionBroadcastRequestGetNext(ctx, true)
if err != nil {
t.Fatal(err)
}

if savedSerializedTx != nil {
t.Fatal("expected nil response")
}
}

func TestBtcTransactionBroadcastRequestConfirmBroadcast(t *testing.T) {
ctx, cancel := defaultTestContext()
defer cancel()

db, sdb, cleanup := createTestDB(ctx, t)
defer func() {
db.Close()
sdb.Close()
cleanup()
}()

serializedTx := []byte("blahblahblah")
txId := "myid"

err := db.BtcTransactionBroadcastRequestInsert(ctx, serializedTx, txId)
if err != nil {
t.Fatal(err)
}

err = db.BtcTransactionBroadcastRequestConfirmBroadcast(ctx, txId)
if err != nil {
t.Fatal(err)
}

savedSerializedTx, err := db.BtcTransactionBroadcastRequestGetNext(ctx, true)
if err != nil {
t.Fatal(err)
}

if savedSerializedTx != nil {
t.Fatal("expected nil response")
}
}

func createBtcBlock(ctx context.Context, t *testing.T, db bfgd.Database, count int, chain bool, height int, lastHash []byte, l2BlockNumber uint32) bfgd.BtcBlock {
header := make([]byte, 80)
hash := make([]byte, 32)
Expand Down
Loading

0 comments on commit 92648fc

Please sign in to comment.