Skip to content

Commit

Permalink
use to
Browse files Browse the repository at this point in the history
  • Loading branch information
mat1asm committed Apr 5, 2024
1 parent 4bc7c27 commit 6987d01
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 12 deletions.
10 changes: 10 additions & 0 deletions scripts/.env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,13 @@ export START_HOURS_AGO="72"
export END_HOURS_AGO="0"
export HOUR_WINDOW="1"
export TAGS_TO_CONVERT="address"

export NEW_MEASUREMENT_VERSION=v1

#export MEASUREMENT="failed_txn_event"
#export TAGS_TO_KEEP="network"
#export FIELDS="signature,errorDetails"
export MEASUREMENT="txn_event"
export TAGS_TO_KEEP="ecosystem,network,eventCategory"
export FIELDS="signature,amount,eventDetails,address,claimant"

34 changes: 22 additions & 12 deletions scripts/influxdb.fixer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ const INFLUX_BUCKET = envOrErr('INFLUX_BUCKET')

const MEASUREMENT = process.env.MEASUREMENT ?? 'txn_event'
const NEW_MEASUREMENT_VERSION = process.env.NEW_MEASUREMENT_VERSION ?? 'v1'
const TAGS_TO_KEEP =
process.env.TAGS_TO_KEEP ?? 'ecosystem,network,eventCategory'
const FIELDS =
process.env.FIELDS ?? 'signature,amount,eventDetails,address,claimant'

const START_HOURS_AGO = process.env.START_HOURS_AGO ?? '72' // 3 days
const END_HOURS_AGO = process.env.END_HOURS_AGO ?? '0' // now
const HOUR_WINDOW = process.env.HOUR_WINDOW ?? '1'

const TAGS_TO_CONVERT = process.env.TAGS_TO_CONVERT ?? 'address'

async function main() {
console.log(
`Connecting to influx <${INFLUX_URL}/${INFLUX_ORG}/${INFLUX_BUCKET}>`
Expand All @@ -34,7 +36,8 @@ async function main() {

const influxDB = new InfluxDB({ url: INFLUX_URL, token: INFLUX_TOKEN })
const readApi = influxDB.getQueryApi(INFLUX_ORG)
const tagsToConvert = TAGS_TO_CONVERT.split(',')
const tagsToKeep = TAGS_TO_KEEP.split(',')
const fields = FIELDS.split(',')

let startHoursAgo = Number.parseInt(START_HOURS_AGO)
const endHoursAgo = Number.parseInt(END_HOURS_AGO)
Expand All @@ -49,7 +52,8 @@ async function main() {
INFLUX_BUCKET,
MEASUREMENT,
NEW_MEASUREMENT_VERSION,
tagsToConvert,
tagsToKeep,
fields,
currentStartHoursAgo,
currentEndHoursago,
readApi
Expand All @@ -69,22 +73,28 @@ async function mapAndWrite(
bucket: string,
measurement: string,
newMeasurementVersion: string,
tagsToConvert: string[],
tagsToKeep: string[],
fields: string[],
start: string,
end: string,
readApi: QueryApi
): Promise<void> {
const stream = `from(bucket: "${bucket}")
|> range(start: ${start}, stop: ${end})
|> filter(fn: (r) => r._measurement == "${measurement}")`
|> filter(fn: (r) => r._measurement == "${measurement}")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")`

const mapper = `${stream}
|> map(fn: (r) => ({ r with _measurement: "${measurement}_${newMeasurementVersion}", ${tagsToConvert
.map((tag) => `${tag}: r.${tag}`)
.join(', ')} }))
|> drop(columns: [${tagsToConvert.map((tag) => `"${tag}"`).join(',')}])
|> to(bucket: "${bucket}")
`
|> set(key: "_measurement", value: "${measurement}_${newMeasurementVersion}")
|> to(
bucket: "${bucket}",
timeColumn: "_time",
tagColumns: [${tagsToKeep.map((tag) => `"${tag}"`).join(',')}],
fieldFn: (r) => ({ ${fields
.map((field) => `${field}: r.${field}`)
.join(',')} })
)`

console.log(` Executing the following query: ${mapper}`)
const response = await readApi.queryRaw(mapper)
console.log(response)
Expand Down

0 comments on commit 6987d01

Please sign in to comment.