Fix influx schema #90

Closed · wants to merge 9 commits

19 changes: 18 additions & 1 deletion scripts/.env.sample
@@ -8,8 +8,25 @@ export LOW_BALANCE_THRESHOLD=10000
export INFLUX_URL=""
export INFLUX_TOKEN=""
export INFLUX_ORG=""
export INFLU_BUCKET=""
export INFLUX_BUCKET=""

# Datadog Event Subscriber Configs
export TIME_WINDOW_SECS=600
export CHUNK_SIZE=50

# Tag converter
export MEASUREMENT="txn_event"
export NEW_MEASUREMENT_VERSION="v1"
export START_HOURS_AGO="72"
export END_HOURS_AGO="0"
export HOUR_WINDOW="1"
export TAGS_TO_CONVERT="address"

export NEW_MEASUREMENT_VERSION=v1

#export MEASUREMENT="failed_txn_event"
#export TAGS_TO_KEEP="network"
#export FIELDS="signature,errorDetails"
export MEASUREMENT="txn_event"
export TAGS_TO_KEEP="ecosystem,network,eventCategory"
export FIELDS="signature,amount,eventDetails,address,claimant"
2 changes: 1 addition & 1 deletion scripts/claim_sdk/eventSubscriber.ts
@@ -275,7 +275,7 @@ function formatClaimInfo(
address: claimInfo.identity.injective.address,
amount: claimInfo.amount.toString(),
}
} else if (claimInfo.identity.algorand?.pubkey) {
} else if (claimInfo.identity.algorand) {
return {
ecosystem: 'algorand',
address: base32encode(claimInfo.identity.algorand.pubkey),
110 changes: 110 additions & 0 deletions scripts/influxdb.fixer.ts
@@ -0,0 +1,110 @@
/**
 * To run:
 *   Copy .env.sample to .env and update the parameters accordingly, then:
 *   source .env
 *   ts-node ./scripts/influxdb.fixer.ts
 */
import { envOrErr } from './claim_sdk'
import { InfluxDB, QueryApi } from '@influxdata/influxdb-client'

const INFLUX_URL = envOrErr('INFLUX_URL')
const INFLUX_TOKEN = envOrErr('INFLUX_TOKEN')
const INFLUX_ORG = envOrErr('INFLUX_ORG')
const INFLUX_BUCKET = envOrErr('INFLUX_BUCKET')

const MEASUREMENT = process.env.MEASUREMENT ?? 'txn_event'
const NEW_MEASUREMENT_VERSION = process.env.NEW_MEASUREMENT_VERSION ?? 'v1'
const TAGS_TO_KEEP =
process.env.TAGS_TO_KEEP ?? 'ecosystem,network,eventCategory'
const FIELDS =
process.env.FIELDS ?? 'signature,amount,eventDetails,address,claimant'

const START = process.env.START ?? '2024-04-09T00:00:00.000Z'
const END = process.env.END ?? '2024-04-09T23:59:59.000Z'
const HOUR_WINDOW = parseInt(process.env.HOUR_WINDOW ?? '1')

async function main() {
console.log(
`Connecting to influx <${INFLUX_URL}/${INFLUX_ORG}/${INFLUX_BUCKET}>`
)

console.log(
`Migrating ${MEASUREMENT} to ${MEASUREMENT}_${NEW_MEASUREMENT_VERSION}`
)

const influxDB = new InfluxDB({ url: INFLUX_URL, token: INFLUX_TOKEN })
const readApi = influxDB.getQueryApi(INFLUX_ORG)
const tagsToKeep = TAGS_TO_KEEP.split(',')
const fields = FIELDS.split(',')

let startTime = new Date(START)
const endTime = new Date(END)
let currentEndDateTime = new Date(startTime)

try {
while (startTime < endTime) {
currentEndDateTime.setHours(startTime.getHours() + HOUR_WINDOW)

await mapAndWrite(
INFLUX_BUCKET,
MEASUREMENT,
NEW_MEASUREMENT_VERSION,
tagsToKeep,
fields,
startTime,
currentEndDateTime,
readApi
)
startTime.setHours(startTime.getHours() + HOUR_WINDOW)
await new Promise((resolve) => setTimeout(resolve, 1500))
}
} catch (error) {
console.error(`Error: ${error}`)
}

console.log(
`Done -> start: ${startTime} - end: ${currentEndDateTime}`
)
}

async function mapAndWrite(
bucket: string,
measurement: string,
newMeasurementVersion: string,
tagsToKeep: string[],
fields: string[],
start: Date,
end: Date,
readApi: QueryApi
): Promise<void> {
const stream = `from(bucket: "${bucket}")
|> range(start: ${start.toISOString()}, stop: ${end.toISOString()})
|> filter(fn: (r) => r._measurement == "${measurement}")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")`

const mapper = `${stream}
|> set(key: "_measurement", value: "${measurement}_${newMeasurementVersion}")
|> to(
bucket: "${bucket}",
timeColumn: "_time",
tagColumns: [${tagsToKeep.map((tag) => `"${tag}"`).join(',')}],
fieldFn: (r) => ({ ${fields
.map((field) => `${field}: r.${field}`)
.join(',')} })
)`

console.log(` Executing the following query: ${mapper}`)
await readApi.queryRaw(mapper)
console.log(`Done ${start} - ${end}`)
}

;(async () => {
try {
await main()
} catch (e) {
console.error(`error from influxdb.ts: ${e}`)
process.exit(1)
}
})()
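
For reference, with the default settings above (MEASUREMENT=txn_event, NEW_MEASUREMENT_VERSION=v1, TAGS_TO_KEEP=ecosystem,network,eventCategory, FIELDS=signature,amount,eventDetails,address,claimant), each hour window composes a Flux query of roughly the following shape. This is a sketch, not output captured from the script; the bucket name and time range are placeholders:

    from(bucket: "example-bucket")
      |> range(start: 2024-04-09T00:00:00.000Z, stop: 2024-04-09T01:00:00.000Z)
      |> filter(fn: (r) => r._measurement == "txn_event")
      |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
      |> set(key: "_measurement", value: "txn_event_v1")
      |> to(
        bucket: "example-bucket",
        timeColumn: "_time",
        tagColumns: ["ecosystem","network","eventCategory"],
        fieldFn: (r) => ({ signature: r.signature, amount: r.amount, eventDetails: r.eventDetails, address: r.address, claimant: r.claimant })
      )

Reading and rewriting in small hour windows, with a short pause between them, keeps each query bounded rather than migrating the whole measurement in one request.
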
94 changes: 39 additions & 55 deletions scripts/influxdb.ts
@@ -15,7 +15,6 @@ import {
import * as anchor from '@coral-xyz/anchor'
import { envOrErr } from './claim_sdk'
import { BN } from '@coral-xyz/anchor'
import { inspect } from 'util'
import { InfluxDB, Point, QueryApi } from '@influxdata/influxdb-client'

const ENDPOINT = envOrErr('ENDPOINT')
@@ -27,7 +26,11 @@ const INFLUX_ORG = envOrErr('INFLUX_ORG')
const INFLUX_BUCKET = envOrErr('INFLUX_BUCKET')
const TIME_WINDOW_SECS = Number.parseInt(envOrErr('TIME_WINDOW_SECS'), 10)
const CHUNK_SIZE = Number.parseInt(envOrErr('CHUNK_SIZE'), 10)
const LOW_BALANCE_THRESHOLD = envOrErr('LOW_BALANCE_THRESHOLD')
const LAST_TXN_MEASUREMENT =
process.env.LAST_TXN_MEASUREMENT ?? 'latest_txn_seen'
const TXN_MEASUREMENT = process.env.TXN_MEASUREMENT ?? 'txn_event'
const FAILED_TXN_MEASUREMENT =
process.env.FAILED_TXN_MEASUREMENT ?? 'failed_txn_event'
// based off airdrop allocation commit 16d0c19f3951427f04cc015d38805f356fcb88b1
const MAX_AMOUNT_PER_ECOSYSTEM = new Map<string, BN>([
['discord', new BN('87000000000')],
@@ -118,15 +121,6 @@ async function main() {
console.log('No double claims detected')
}

const lowBalanceEventPoint = createLowBalanceEventPoint(formattedTxnEvents)

if (lowBalanceEventPoint) {
console.log('Detected low balance event')
writeApi.writePoint(lowBalanceEventPoint)
} else {
console.log('No low balance detected')
}

const txnEventPoints = createTxnEventPoints(formattedTxnEvents)
txnEventPoints.forEach((txnEventPoint) => {
writeApi.writePoint(txnEventPoint)
@@ -148,7 +142,7 @@ async function main() {
)

console.log('Last signature processed:', latestSignature)
const latestTxPoint = new Point('latest_txn_seen')
const latestTxPoint = new Point(LAST_TXN_MEASUREMENT)
.tag('network', CLUSTER)
.stringField('signature', latestSignature)

@@ -172,8 +166,8 @@ async function getLatestTxSignature(
): Promise<string | undefined> {
const query = `from(bucket: "${bucket}")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "latest_txn_seen")
|> filter(fn: (r) => r.network == "${network}")
|> filter(fn: (r) => r._measurement == "${LAST_TXN_MEASUREMENT}")
|> filter(fn: (r) => r.network == "${network}" and r._field == "signature")
|> sort(columns: ["_time"], desc: true)
|> first()
|> limit(n:1)`
@@ -188,26 +182,37 @@
}

function createTxnEventPoints(formattedTxnEvents: FormattedTxnEventInfo[]) {
const timestamps: Record<string, boolean> = {}

return formattedTxnEvents.map((formattedEvent) => {
let timestamp = formattedEvent.blockTime * 1000
if (timestamps[timestamp]) {
// see https://docs.influxdata.com/influxdb/v2/write-data/best-practices/duplicate-points/
while (timestamps[timestamp]) {
timestamp = timestamp + 1
}
}
timestamps[timestamp] = true

const { signature, claimant } = formattedEvent
const { ecosystem, address, amount } = formattedEvent.claimInfo!
let eventCategory = 'normal'
let amountValue = parseInt(amount, 10)

if (MAX_AMOUNT_PER_ECOSYSTEM.get(ecosystem)!.lt(new BN(amount))) {
const maxAmount = MAX_AMOUNT_PER_ECOSYSTEM.get(ecosystem)
if (amount && maxAmount && maxAmount.lt(new BN(amount))) {
eventCategory = 'max_transfer_exceeded'
}

const point = new Point('txn_event')
.tag('claimant', claimant!)
const point = new Point(TXN_MEASUREMENT)
.tag('ecosystem', ecosystem)
.tag('address', address)
.tag('network', CLUSTER)
.tag('eventCategory', eventCategory)
.stringField('claimant', claimant)
.stringField('address', address)
.stringField('signature', signature)
.intField('amount', amountValue)
.stringField('eventDetails', JSON.stringify(formattedEvent))
.timestamp(new Date(formattedEvent.blockTime * 1000))
.timestamp(new Date(timestamp))

return point
})
@@ -246,9 +251,8 @@ function createDoubleClaimPoint(formattedTxnEvents: FormattedTxnEventInfo[]) {

const point = new Point('double_claim_event')
.tag('ecosystem', ecosystem)
.tag('address', address)
.tag('network', CLUSTER)
.tag('service', 'token-dispenser-event-subscriber')
.stringField('address', address)
.stringField('details', JSON.stringify(txnEventInfos))
.timestamp(new Date(blockTime * 1000))
doubleClaimPoints.push(point)
@@ -260,46 +264,26 @@
}

function createFailedTxnEventPoints(failedTxns: TxnInfo[]) {
const timestamps: Record<string, boolean> = {}

return failedTxns.map((errorLog) => {
const point = new Point('failed_txn_event')
.tag('signature', errorLog.signature)
let timestamp = errorLog.blockTime * 1000
if (timestamps[timestamp]) {
// see https://docs.influxdata.com/influxdb/v2/write-data/best-practices/duplicate-points/
while (timestamps[timestamp]) {
timestamp = timestamp + 1
}
}
timestamps[timestamp] = true
const point = new Point(FAILED_TXN_MEASUREMENT)
.tag('network', CLUSTER)
.tag('service', 'token-dispenser-event-subscriber')
.stringField('signature', errorLog.signature)
.stringField('errorDetails', JSON.stringify(errorLog))
.timestamp(new Date(errorLog.blockTime * 1000))
.timestamp(new Date(timestamp))
return point
})
}

function createLowBalanceEventPoint(
formattedTxnEvents: FormattedTxnEventInfo[]
) {
if (formattedTxnEvents.length === 0) {
return undefined
}

const mostRecentEvent = formattedTxnEvents.sort((a, b) => b.slot - a.slot)[0]

if (
mostRecentEvent.remainingBalance &&
new BN(mostRecentEvent.remainingBalance).lt(new BN(LOW_BALANCE_THRESHOLD))
) {
const point = new Point('low_balance_event')
.tag('signature', mostRecentEvent.signature)
.tag('network', CLUSTER)
.tag('service', 'token-dispenser-event-subscriber')
.intField(
'remainingBalance',
parseInt(mostRecentEvent.remainingBalance, 10)
)
.stringField('eventDetails', JSON.stringify(mostRecentEvent))
.timestamp(new Date(mostRecentEvent.blockTime * 1000).toISOString())
return point
}

return undefined
}

;(async () => {
try {
await main()
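
Both createTxnEventPoints and createFailedTxnEventPoints now bump colliding block-time timestamps by one millisecond, since InfluxDB overwrites points that share a measurement, tag set, and timestamp (see the linked duplicate-points docs). A minimal sketch of that shared pattern, using a hypothetical uniqueTimestamp helper that is not part of this PR:

    // Sketch only: not code from this PR.
    // Returns a unique millisecond timestamp for a point, bumping by 1 ms
    // while the candidate value has already been used in this batch.
    function uniqueTimestamp(
      blockTimeSecs: number,
      seen: Record<number, boolean>
    ): Date {
      let timestamp = blockTimeSecs * 1000
      while (seen[timestamp]) {
        timestamp += 1
      }
      seen[timestamp] = true
      // e.g. .timestamp(uniqueTimestamp(formattedEvent.blockTime, timestamps))
      return new Date(timestamp)
    }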