From 0cae27fb324f194cca7f8d25ba85dc0791e12893 Mon Sep 17 00:00:00 2001 From: Dario Date: Mon, 6 Mar 2023 15:04:05 -0300 Subject: [PATCH] feat: added txPending and txpool converters, insert, update and delete --- src/converters/txPending.converters.js | 34 ++++++++++++ src/repositories/txPending.repository.js | 50 ++++++++--------- src/repositories/txPool.repository.js | 69 ++++++++++++++++-------- src/repositories/utils.js | 19 +++++++ src/services/classes/TxPool.js | 10 ++-- 5 files changed, 127 insertions(+), 55 deletions(-) create mode 100644 src/converters/txPending.converters.js diff --git a/src/converters/txPending.converters.js b/src/converters/txPending.converters.js new file mode 100644 index 00000000..b5580c1b --- /dev/null +++ b/src/converters/txPending.converters.js @@ -0,0 +1,34 @@ +function rawTxPendingToEntity (data) { + return { + hash: data.hash, + blockHash: data.blockHash, + from: data.from, + to: data.to, + blockNumber: data.blockNumber, + transactionIndex: data.transactionIndex, + nonce: data.nonce, + gas: data.gas, + gasPrice: data.gasPrice, + value: data.value, + input: data.input, + status: data.status + } +} + +function rawTxInPoolToEntity (data) { + return { + ...rawTxPendingToEntity(data), + poolId: data.poolId + } +} + +function rawTxPoolToEntity (data) { + return { + blockNumber: data.blockNumber, + pending: data.pending, + queued: data.queued, + timestamp: String(data.timestamp) + } +} + +export {rawTxPendingToEntity, rawTxPoolToEntity, rawTxInPoolToEntity} diff --git a/src/repositories/txPending.repository.js b/src/repositories/txPending.repository.js index f22ce1f6..872c2cc2 100644 --- a/src/repositories/txPending.repository.js +++ b/src/repositories/txPending.repository.js @@ -1,39 +1,35 @@ import { prismaClient } from '../lib/Setup' +import {rawTxPendingToEntity, rawTxInPoolToEntity} from '../converters/txPending.converters' +import { mongoQueryToPrisma } from './utils' export const txPendingRepository = { - findOne (query = {}, project = {}, collection) { - return collection.findOne(query, project) - }, - find (query = {}, project = {}, collection, sort = {}, limit = 0, isArray = true) { - if (isArray) { - return collection - .find(query, project) - .sort(sort) - .limit(limit) - .toArray() - } else { - return collection - .find(query, project) - .sort(sort) - .limit(limit) - } - }, - countDocuments (query = {}, collection) { - return collection.countDocuments(query) + async findOne (query = {}, project = {}, collection) { + const txPending = await prismaClient.transaction_pending.findFirst({where: mongoQueryToPrisma(query)}) + + return txPending }, - aggregate (aggregate, collection) { - return collection.aggregate(aggregate).toArray() + async find (query = {}, project = {}, collection, sort = {}, limit = 0, isArray = true) { + const txsPending = await prismaClient.transaction_pending.findMany({where: mongoQueryToPrisma(query)}) + + return txsPending }, async deleteOne (query, collection) { await prismaClient.transaction_pending.deleteMany({where: query}) - const mongoRes = collection.deleteOne(query) + const mongoRes = await collection.deleteOne(query) return mongoRes }, - updateOne (filter, update, options = {}, collection) { - return collection.updateOne(filter, update, options) - }, - insertOne (data, collection) { - return collection.insertOne(data) + async updateOne (filter, update, options = {}, collection) { + const {$set: data, poolId} = update + const newTxPending = rawTxPendingToEntity(data) + + await prismaClient.$transaction([ + prismaClient.transaction_pending.upsert({where: filter, update: newTxPending, create: newTxPending}), + prismaClient.transaction_in_pool.create({data: rawTxInPoolToEntity({...data, poolId})}) + ]) + + delete update.poolId + const mongoRes = await collection.updateOne(filter, update, options) + return mongoRes } } diff --git a/src/repositories/txPool.repository.js b/src/repositories/txPool.repository.js index 736dea75..a621b669 100644 --- a/src/repositories/txPool.repository.js +++ b/src/repositories/txPool.repository.js @@ -1,28 +1,51 @@ +import { prismaClient } from '../lib/Setup' +import {rawTxPoolToEntity} from '../converters/txPending.converters' +import {mongoQueryToPrisma} from './utils' + export const txPoolRepository = { - findOne (query = {}, project = {}, collection) { - return collection.findOne(query, project) + async findOne (query = {}, project = {}, collection) { + const txPool = await prismaClient.txpool.findFirst({ + where: mongoQueryToPrisma(query), + orderBy: {id: 'desc'}, + include: {transaction_in_pool: true} + }) + + txPool.txs = [...txPool.transaction_in_pool] + delete txPool.transaction_in_pool + + txPool._id = txPool.id + delete txPool.id + + txPool.timestamp = Number(txPool.timestamp) + + return txPool }, - find (query = {}, project = {}, collection, sort = {}, limit = 0, isArray = true) { - if (isArray) { - return collection - .find(query, project) - .sort(sort) - .limit(limit) - .toArray() - } else { - return collection - .find(query, project) - .sort(sort) - .limit(limit) - } + async find (query = {}, project = {}, collection, sort = {}, limit = 0, isArray = true) { + let txPools = await prismaClient.txpool.findMany({ + where: query, + orderBy: {timestamp: 'desc'}, + include: {transaction_in_pool: true}, + take: limit + }) + + txPools = txPools.map(txPool => { + txPool.txs = [...txPool.transaction_in_pool] + delete txPool.transaction_in_pool + + txPool._id = txPool.id + delete txPool.id + + txPool.timestamp = Number(txPool.timestamp) + + return txPool + }) + + return txPools }, - countDocuments (query = {}, collection) { - return collection.countDocuments(query) - }, - aggregate (aggregate, collection) { - return collection.aggregate(aggregate).toArray() - }, - insertOne (data, collection) { - return collection.insertOne(data) + async insertOne (data, collection) { + const txpool = await prismaClient.txpool.create({data: rawTxPoolToEntity(data)}) + + const mongoRes = await collection.insertOne(data) + return {...mongoRes, id: txpool.id} } } diff --git a/src/repositories/utils.js b/src/repositories/utils.js index 44d629a8..dd6adc50 100644 --- a/src/repositories/utils.js +++ b/src/repositories/utils.js @@ -5,3 +5,22 @@ export function mongoSortToPrisma (num) { return 'desc' } } +// TODO: finish the mapping of the remaining mongo operators +export function mongoQueryToPrisma (query) { + const mongoOperatorToPrisma = { + $or: 'OR', + $and: 'AND' + } + const newQuery = {} + + for (const key in query) { + const value = query[key] + if ((value && Object.keys(value).length > 0 && !Array.isArray(value))) { + newQuery[mongoOperatorToPrisma[key] || key] = mongoQueryToPrisma(value) + } else { + newQuery[mongoOperatorToPrisma[key] || key] = value + } + } + + return newQuery +} diff --git a/src/services/classes/TxPool.js b/src/services/classes/TxPool.js index 0b4b980f..71bf4501 100644 --- a/src/services/classes/TxPool.js +++ b/src/services/classes/TxPool.js @@ -2,7 +2,6 @@ import { BlocksBase } from '../../lib/BlocksBase' import { isHexString, base64toHex } from '../../lib/utils' import { txPoolRepository } from '../../repositories/txPool.repository' import { txPendingRepository } from '../../repositories/txPending.repository' - export class TxPool extends BlocksBase { constructor (db, options) { super(db, options) @@ -117,18 +116,19 @@ export class TxPool extends BlocksBase { async savePoolToDb (pool) { try { this.log.debug(`Saving txPool to db`) - await txPoolRepository.insertOne(pool, this.TxPool) - await this.savePendingTxs(pool.txs) + const txpool = await txPoolRepository.insertOne(pool, this.TxPool) + await this.savePendingTxs(pool.txs, txpool.id) } catch (err) { this.log.error(`Error saving txPool: ${err}`) return Promise.reject(err) } } - async savePendingTxs (txs) { + async savePendingTxs (txs, poolId) { try { txs = txs || [] - await Promise.all(txs.map(tx => txPendingRepository.updateOne({ hash: tx.hash }, { $set: tx }, { upsert: true }, this.PendingTxs))) + const savedTxs = await Promise.all(txs.map(tx => txPendingRepository.updateOne({ hash: tx.hash }, { $set: tx, poolId }, { upsert: true }, this.PendingTxs))) + return savedTxs } catch (err) { this.log.error(`Error saving pending transactions: ${err}`) return Promise.reject(err)