diff --git a/ingest/CHANGELOG.md b/ingest/CHANGELOG.md
index 3d3835a799..69f2e5869e 100644
--- a/ingest/CHANGELOG.md
+++ b/ingest/CHANGELOG.md
@@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file. This projec
 
 ## Pending
 
+### Bug Fixes
+* Update the boundary check in `BufferedStorageBackend` to queue ledgers up to the end boundary, resolving a skipped final batch when the `from` ledger doesn't align with a file boundary [5563](https://github.com/stellar/go/pull/5563).
+
 ### New Features
 * Create new package `ingest/cdp` for new components which will assist towards writing data transformation pipelines as part of [Composable Data Platform](https://stellar.org/blog/developers/composable-data-platform).
 * Add new functional producer, `cdp.ApplyLedgerMetadata`. A new function which enables a private instance of `BufferedStorageBackend` to perfrom the role of a producer operator in streaming pipeline designs. It will emit pre-computed `LedgerCloseMeta` from a chosen `DataStore`. The stream can use `ApplyLedgerMetadata` as the origin of `LedgerCloseMeta`, providing a callback function which acts as the next operator in the stream, receiving the `LedgerCloseMeta`. [5462](https://github.com/stellar/go/pull/5462).
diff --git a/ingest/ledgerbackend/buffered_storage_backend_test.go b/ingest/ledgerbackend/buffered_storage_backend_test.go
index fe3ca0266c..2ddb0efb8d 100644
--- a/ingest/ledgerbackend/buffered_storage_backend_test.go
+++ b/ingest/ledgerbackend/buffered_storage_backend_test.go
@@ -65,6 +65,14 @@ func createBufferedStorageBackendForTesting() BufferedStorageBackend {
 func createMockdataStore(t *testing.T, start, end, partitionSize, count uint32) *datastore.MockDataStore {
 	mockDataStore := new(datastore.MockDataStore)
 	partition := count*partitionSize - 1
+
+	schema := datastore.DataStoreSchema{
+		LedgersPerFile:    count,
+		FilesPerPartition: partitionSize,
+	}
+
+	start = schema.GetSequenceNumberStartBoundary(start)
+	end = schema.GetSequenceNumberEndBoundary(end)
 	for i := start; i <= end; i = i + count {
 		var objectName string
 		var readCloser io.ReadCloser
@@ -78,10 +86,7 @@ func createMockdataStore(t *testing.T, start, end, partitionSize, count uint32)
 		}
 		mockDataStore.On("GetFile", mock.Anything, objectName).Return(readCloser, nil).Times(1)
 	}
-	mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{
-		LedgersPerFile:    count,
-		FilesPerPartition: partitionSize,
-	})
+	mockDataStore.On("GetSchema").Return(schema)
 
 	t.Cleanup(func() {
 		mockDataStore.AssertExpectations(t)
@@ -248,31 +253,24 @@ func TestBSBGetLedger_SingleLedgerPerFile(t *testing.T) {
 }
 
 func TestCloudStorageGetLedger_MultipleLedgerPerFile(t *testing.T) {
-	startLedger := uint32(2)
-	endLedger := uint32(5)
+	startLedger := uint32(6)
+	endLedger := uint32(17)
 	lcmArray := createLCMForTesting(startLedger, endLedger)
 	bsb := createBufferedStorageBackendForTesting()
 	ctx := context.Background()
 	ledgerRange := BoundedRange(startLedger, endLedger)
 
-	mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, 2)
+	mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, 4)
 	bsb.dataStore = mockDataStore
-	mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{
-		LedgersPerFile:    uint32(2),
-		FilesPerPartition: partitionSize,
-	})
+
 	assert.NoError(t, bsb.PrepareRange(ctx, ledgerRange))
-	assert.Eventually(t, func() bool { return len(bsb.ledgerBuffer.ledgerQueue) == 2 }, time.Second*5, time.Millisecond*50)
+	assert.Eventually(t, func() bool { return len(bsb.ledgerBuffer.ledgerQueue) == 4 }, time.Second*5, time.Millisecond*50)
 
-	lcm, err := bsb.GetLedger(ctx, uint32(2))
-	assert.NoError(t, err)
-	assert.Equal(t, lcmArray[0], lcm)
-	lcm, err = bsb.GetLedger(ctx, uint32(3))
-	assert.NoError(t, err)
-	assert.Equal(t, lcmArray[1], lcm)
-	lcm, err = bsb.GetLedger(ctx, uint32(4))
-	assert.NoError(t, err)
-	assert.Equal(t, lcmArray[2], lcm)
+	for i := 0; i <= int(endLedger-startLedger); i++ {
+		lcm, err := bsb.GetLedger(ctx, startLedger+uint32(i))
+		assert.NoError(t, err)
+		assert.Equal(t, lcmArray[i], lcm)
+	}
 }
 
 func TestBSBGetLedger_ErrorPreceedingLedger(t *testing.T) {
diff --git a/ingest/ledgerbackend/ledger_buffer.go b/ingest/ledgerbackend/ledger_buffer.go
index 7ee9dda083..0b2742e94d 100644
--- a/ingest/ledgerbackend/ledger_buffer.go
+++ b/ingest/ledgerbackend/ledger_buffer.go
@@ -96,8 +96,8 @@ func (bsb *BufferedStorageBackend) newLedgerBuffer(ledgerRange Range) (*ledgerBu
 }
 
 func (lb *ledgerBuffer) pushTaskQueue() {
-	// In bounded mode, don't queue past the end ledger
-	if lb.nextTaskLedger > lb.ledgerRange.to && lb.ledgerRange.bounded {
+	// In bounded mode, don't queue past the end boundary ledger for the specified range.
+	if lb.ledgerRange.bounded && lb.nextTaskLedger > lb.dataStore.GetSchema().GetSequenceNumberEndBoundary(lb.ledgerRange.to) {
 		return
 	}
 	lb.taskQueue <- lb.nextTaskLedger
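
For context, here is a minimal Go sketch of the boundary arithmetic behind the pushTaskQueue change above. The schema type, its startBoundary/endBoundary methods, and the task stepping are simplified assumptions standing in for datastore.DataStoreSchema and the buffer's task loop, not the library's actual implementation; they only illustrate why comparing nextTaskLedger against the end boundary of `to` queues the final file that the old `> to` comparison could drop when `from` is not file-aligned.

package main

import "fmt"

// Simplified stand-in for datastore.DataStoreSchema: files are assumed to
// group ledgersPerFile consecutive ledgers starting at multiples of that size.
type schema struct{ ledgersPerFile uint32 }

// Assumed: first ledger sequence of the file containing seq.
func (s schema) startBoundary(seq uint32) uint32 {
	return (seq / s.ledgersPerFile) * s.ledgersPerFile
}

// Assumed: last ledger sequence of the file containing seq.
func (s schema) endBoundary(seq uint32) uint32 {
	return s.startBoundary(seq) + s.ledgersPerFile - 1
}

func main() {
	s := schema{ledgersPerFile: 4}
	from, to := uint32(6), uint32(17) // the range the updated test exercises

	// Assume download tasks advance by ledgersPerFile from an unaligned
	// `from`: 6, 10, 14, 18. The file for task 18 spans ledgers 16-19 and
	// still holds the in-range ledgers 16 and 17.
	for task := from; task <= s.endBoundary(to); task += s.ledgersPerFile {
		oldCheckQueues := task <= to // the old `> to` cutoff drops task 18
		fmt.Printf("task %2d covers file %2d-%2d, old check queues it: %v\n",
			task, s.startBoundary(task), s.endBoundary(task), oldCheckQueues)
	}
}

With ledgersPerFile = 4 and the range 6-17, the loop emits tasks 6, 10, 14 and 18; only the new end-boundary cutoff (19) keeps task 18, whose file carries the final in-range ledgers 16 and 17.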