Skip to content

Commit

Permalink
save latest seen tx and continue from there the next run
Browse files Browse the repository at this point in the history
  • Loading branch information
iturricf committed Apr 1, 2024
1 parent 5f303e6 commit 72b9d71
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 26 deletions.
53 changes: 31 additions & 22 deletions frontend/claim_sdk/eventSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,22 @@ export class TokenDispenserEventSubscriber {
connection: anchor.web3.Connection
programId: anchor.web3.PublicKey
timeWindowSecs: number
lastSignatureSeen: string | undefined
chunkSize: number

constructor(
endpoint: string,
programId: anchor.web3.PublicKey,
timeWindowSecs: number,
lastSignatureSeen: string | undefined,
chunkSize: number,
confirmOpts?: anchor.web3.ConfirmOptions
) {
const coder = new BorshCoder(tokenDispenser as Idl)
this.programId = programId
this.eventParser = new anchor.EventParser(this.programId, coder)
this.timeWindowSecs = timeWindowSecs
this.lastSignatureSeen = lastSignatureSeen
this.chunkSize = chunkSize
confirmOpts = confirmOpts ?? anchor.AnchorProvider.defaultOptions()
if (
Expand All @@ -54,31 +57,37 @@ export class TokenDispenserEventSubscriber {
let signatures: Array<ConfirmedSignatureInfo> = []
let currentBatch = await this.connection.getSignaturesForAddress(
this.programId,
{},
{
until: this.lastSignatureSeen,
},
this.connection.commitment as anchor.web3.Finality
)
let batchWithinWindow = true
while (currentBatch.length > 0 && batchWithinWindow) {
const currentBatchLastSig =
currentBatch[currentBatch.length - 1]?.signature
const currentBatchLastSigBlockTime = await this.getTransactionBlockTime(
currentBatchLastSig
)
if (
currentBatchLastSigBlockTime &&
currentBatchLastSigBlockTime < currentTimeSec - this.timeWindowSecs
) {
batchWithinWindow = false
}
if (this.lastSignatureSeen !== undefined) {
signatures = signatures.concat(currentBatch)
currentBatch = await this.connection.getSignaturesForAddress(
this.programId,
{
before: currentBatchLastSig,
// Note: ignoring lastSignature and will assume datadog can handle de-duplication
},
this.connection.commitment as anchor.web3.Finality
)
} else {
let batchWithinWindow = true
while (currentBatch.length > 0 && batchWithinWindow) {
const currentBatchLastSig =
currentBatch[currentBatch.length - 1]?.signature
const currentBatchLastSigBlockTime = await this.getTransactionBlockTime(
currentBatchLastSig
)
if (
currentBatchLastSigBlockTime &&
currentBatchLastSigBlockTime < currentTimeSec - this.timeWindowSecs
) {
batchWithinWindow = false
}
signatures = signatures.concat(currentBatch)
currentBatch = await this.connection.getSignaturesForAddress(
this.programId,
{
before: currentBatchLastSig,
// Note: ignoring lastSignature and will assume datadog can handle de-duplication
},
this.connection.commitment as anchor.web3.Finality
)
}
}

const validTxnSigs = []
Expand Down
51 changes: 47 additions & 4 deletions frontend/scripts/influxdb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import * as anchor from '@coral-xyz/anchor'
import { envOrErr } from '../claim_sdk'
import { BN } from '@coral-xyz/anchor'
import { inspect } from 'util'
import { InfluxDB, Point } from '@influxdata/influxdb-client'
import { InfluxDB, Point, QueryApi } from '@influxdata/influxdb-client'

const ENDPOINT = envOrErr('ENDPOINT')
const PROGRAM_ID = envOrErr('PROGRAM_ID')
Expand Down Expand Up @@ -46,22 +46,41 @@ async function main() {
console.log('Time Window Secs:', TIME_WINDOW_SECS)
console.log('Chunk Size:', CHUNK_SIZE)

const influxDB = new InfluxDB({ url: INFLUX_URL, token: INFLUX_TOKEN })
const writeApi = influxDB.getWriteApi(INFLUX_ORG, INFLUX_BUCKET)
const readApi = influxDB.getQueryApi(INFLUX_ORG)

let latestTxBlockTime = 0
let latestSignature = await getLatestTxSignature(INFLUX_BUCKET, CLUSTER, readApi)

console.log("LATEST SIGNATURE AT START:", latestSignature);
const tokenDispenserEventSubscriber = new TokenDispenserEventSubscriber(
ENDPOINT,
new anchor.web3.PublicKey(PROGRAM_ID),
TIME_WINDOW_SECS,
latestSignature,
CHUNK_SIZE,
{
commitment: 'confirmed',
}
)

const influxDB = new InfluxDB({ url: INFLUX_URL, token: INFLUX_TOKEN })
const writeApi = influxDB.getWriteApi(INFLUX_ORG, INFLUX_BUCKET)

const { txnEvents, failedTxnInfos } =
await tokenDispenserEventSubscriber.parseTransactionLogs()

for (const txnEvent of txnEvents) {
if (txnEvent.blockTime > latestTxBlockTime) {
latestTxBlockTime = txnEvent.blockTime
latestSignature = txnEvent.signature
}
}
for (const failedTxnInfo of failedTxnInfos) {
if (failedTxnInfo.blockTime > latestTxBlockTime) {
latestTxBlockTime = failedTxnInfo.blockTime
latestSignature = failedTxnInfo.signature
}
}

console.log('Events', txnEvents)
console.log('Failed Txn Infos', failedTxnInfos)

Expand Down Expand Up @@ -124,6 +143,13 @@ async function main() {
writeApi.writePoint(failedTxnEventPoint)
})

console.log('Latest Signature at the end:', latestSignature);
const latestTxPoint = new Point('latest_txn_seen')
.tag('network', CLUSTER)
.stringField('signature', latestSignature)

writeApi.writePoint(latestTxPoint)

writeApi
.close()
.then(() => {
Expand All @@ -135,6 +161,23 @@ async function main() {
})
}

async function getLatestTxSignature(bucket: string, network: string, readApi: QueryApi): Promise<string | undefined> {
const query = `from(bucket: "${bucket}")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "latest_txn_seen")
|> filter(fn: (r) => r.network == "${network}")
|> last()
|> limit(n:1)`

let signature = undefined;
for await (const {values, tableMeta} of readApi.iterateRows(query)) {
const o = tableMeta.toObject(values)
signature = o._value.length > 0 ? o._value : undefined
}

return signature;
}

function createTxnEventPoints(formattedTxnEvents: FormattedTxnEventInfo[]) {
return formattedTxnEvents.map((formattedEvent) => {
const { signature, claimant } = formattedEvent
Expand Down

0 comments on commit 72b9d71

Please sign in to comment.