Skip to content

Commit

Permalink
ingest/ledgerbackend: fix to ensure that the ledger buffer properly q…
Browse files Browse the repository at this point in the history
…ueues the last batch of ledgers (LCM) within the specified range (#5563)

* ingest/ledgerbackend: fix to ensure that the ledger buffer properly queues the last batch of ledgers (LCM) within the specified range, preventing get_ledger from blocking indefinitely.

* Update Changelog
  • Loading branch information
urvisavla authored Jan 6, 2025
1 parent 5468720 commit 2999e29
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 23 deletions.
3 changes: 3 additions & 0 deletions ingest/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file. This projec

## Pending

### Bug Fixes
* Update the boundary check in `BufferedStorageBackend` to queue ledgers up to the end boundary, resolving a skipped final batch when the `from` ledger doesn't align with a file boundary [5563](https://github.com/stellar/go/pull/5563).

### New Features
* Create new package `ingest/cdp` for new components which will assist towards writing data transformation pipelines as part of [Composable Data Platform](https://stellar.org/blog/developers/composable-data-platform).
* Add new functional producer, `cdp.ApplyLedgerMetadata`. A new function which enables a private instance of `BufferedStorageBackend` to perform the role of a producer operator in streaming pipeline designs. It will emit pre-computed `LedgerCloseMeta` from a chosen `DataStore`. The stream can use `ApplyLedgerMetadata` as the origin of `LedgerCloseMeta`, providing a callback function which acts as the next operator in the stream, receiving the `LedgerCloseMeta`. [5462](https://github.com/stellar/go/pull/5462).
Expand Down
40 changes: 19 additions & 21 deletions ingest/ledgerbackend/buffered_storage_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ func createBufferedStorageBackendForTesting() BufferedStorageBackend {
func createMockdataStore(t *testing.T, start, end, partitionSize, count uint32) *datastore.MockDataStore {
mockDataStore := new(datastore.MockDataStore)
partition := count*partitionSize - 1

schema := datastore.DataStoreSchema{
LedgersPerFile: count,
FilesPerPartition: partitionSize,
}

start = schema.GetSequenceNumberStartBoundary(start)
end = schema.GetSequenceNumberEndBoundary(end)
for i := start; i <= end; i = i + count {
var objectName string
var readCloser io.ReadCloser
Expand All @@ -78,10 +86,7 @@ func createMockdataStore(t *testing.T, start, end, partitionSize, count uint32)
}
mockDataStore.On("GetFile", mock.Anything, objectName).Return(readCloser, nil).Times(1)
}
mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{
LedgersPerFile: count,
FilesPerPartition: partitionSize,
})
mockDataStore.On("GetSchema").Return(schema)

t.Cleanup(func() {
mockDataStore.AssertExpectations(t)
Expand Down Expand Up @@ -248,31 +253,24 @@ func TestBSBGetLedger_SingleLedgerPerFile(t *testing.T) {
}

func TestCloudStorageGetLedger_MultipleLedgerPerFile(t *testing.T) {
startLedger := uint32(2)
endLedger := uint32(5)
startLedger := uint32(6)
endLedger := uint32(17)
lcmArray := createLCMForTesting(startLedger, endLedger)
bsb := createBufferedStorageBackendForTesting()
ctx := context.Background()
ledgerRange := BoundedRange(startLedger, endLedger)

mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, 2)
mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, 4)
bsb.dataStore = mockDataStore
mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{
LedgersPerFile: uint32(2),
FilesPerPartition: partitionSize,
})

assert.NoError(t, bsb.PrepareRange(ctx, ledgerRange))
assert.Eventually(t, func() bool { return len(bsb.ledgerBuffer.ledgerQueue) == 2 }, time.Second*5, time.Millisecond*50)
assert.Eventually(t, func() bool { return len(bsb.ledgerBuffer.ledgerQueue) == 4 }, time.Second*5, time.Millisecond*50)

lcm, err := bsb.GetLedger(ctx, uint32(2))
assert.NoError(t, err)
assert.Equal(t, lcmArray[0], lcm)
lcm, err = bsb.GetLedger(ctx, uint32(3))
assert.NoError(t, err)
assert.Equal(t, lcmArray[1], lcm)
lcm, err = bsb.GetLedger(ctx, uint32(4))
assert.NoError(t, err)
assert.Equal(t, lcmArray[2], lcm)
for i := 0; i <= int(endLedger-startLedger); i++ {
lcm, err := bsb.GetLedger(ctx, startLedger+uint32(i))
assert.NoError(t, err)
assert.Equal(t, lcmArray[i], lcm)
}
}

func TestBSBGetLedger_ErrorPreceedingLedger(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions ingest/ledgerbackend/ledger_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ func (bsb *BufferedStorageBackend) newLedgerBuffer(ledgerRange Range) (*ledgerBu
}

func (lb *ledgerBuffer) pushTaskQueue() {
// In bounded mode, don't queue past the end ledger
if lb.nextTaskLedger > lb.ledgerRange.to && lb.ledgerRange.bounded {
// In bounded mode, don't queue past the end boundary ledger for the specified range.
if lb.ledgerRange.bounded && lb.nextTaskLedger > lb.dataStore.GetSchema().GetSequenceNumberEndBoundary(lb.ledgerRange.to) {
return
}
lb.taskQueue <- lb.nextTaskLedger
Expand Down

0 comments on commit 2999e29

Please sign in to comment.