Skip to content

Commit

Permalink
savepoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
igalklebanov committed Mar 25, 2024
1 parent 56d5c9a commit b348674
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 116 deletions.
6 changes: 2 additions & 4 deletions src/config.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import type {Knex} from 'knex'
import type {Dialect} from 'kysely'
import type {ColdDialect} from './cold-dialect/cold-dialect.js'

export interface KyselyKnexDialectConfig {
knex: Knex
kyselySubDialect: KyselySubDialect
kyselySubDialect: ColdDialect
}

export type KyselySubDialect = Omit<Dialect, 'createDriver'>
49 changes: 21 additions & 28 deletions src/connection.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import type {Knex} from 'knex'
import type {CompiledQuery, DatabaseConnection, QueryResult, TransactionSettings} from 'kysely'
import {KnexWriteResult} from './types'
import {ResultsParser} from './cold-dialect/results-parser.js'

export class KyselyKnexConnection implements DatabaseConnection {
#connection: unknown
readonly #knex: Knex
readonly #resultParser: ResultsParser
#transaction: Knex.Transaction | undefined

constructor(knex: Knex, connection: unknown) {
constructor(knex: Knex, connection: unknown, resultParser: ResultsParser) {
this.#connection = connection
this.#knex = knex
this.#resultParser = resultParser
}

async beginTransaction(settings: TransactionSettings): Promise<void> {
Expand All @@ -26,25 +28,13 @@ export class KyselyKnexConnection implements DatabaseConnection {
}

async executeQuery<R>(compiledQuery: CompiledQuery<unknown>): Promise<QueryResult<R>> {
const results = await this.#knex.raw(compiledQuery.sql, compiledQuery.parameters).connection(this.#connection)
const results = await this.#getRawQueryForConnection(compiledQuery)

console.log('results', results)

if (Array.isArray(results)) {
return {
rows: results,
}
}

return {
insertId: BigInt((results as KnexWriteResult).lastInsertRowid),
numAffectedRows: BigInt((results as KnexWriteResult).changes),
rows: [],
}
return this.#resultParser.parseResults(results)
}

release(): void {
;(this.#knex.client as Knex.Client).releaseConnection(this.#connection)
async release(): Promise<void> {
await (this.#knex.client as Knex.Client).releaseConnection(this.#connection)
this.#connection = undefined
}

Expand All @@ -57,16 +47,19 @@ export class KyselyKnexConnection implements DatabaseConnection {
compiledQuery: CompiledQuery<unknown>,
chunkSize?: number | undefined,
): AsyncIterableIterator<QueryResult<R>> {
return this.#knex
.raw(compiledQuery.sql, compiledQuery.parameters)
.stream({
// TODO: chunk size???
})
.map((data) => {
console.log('data', data)

throw new Error('Method not implemented.')
})
return this.#getRawQueryForConnection(compiledQuery)
.stream({highWaterMark: chunkSize})
.map((results) => this.#resultParser.parseResults([results]))
.iterator()
}

#getRawQueryForConnection(compiledQuery: CompiledQuery<unknown>): Knex.Raw {
return this.#knex.raw(this.#getNormalizedSQL(compiledQuery), compiledQuery.parameters).connection(this.#connection)
}

#getNormalizedSQL(compiledQuery: CompiledQuery): string {
const {sql} = compiledQuery

return compiledQuery.parameters.length ? sql.replace(/(\W)([\$@]\d+)(\W|$)/g, '$1?$3') : sql
}
}
2 changes: 0 additions & 2 deletions src/dialect.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import type {DatabaseIntrospector, Dialect, DialectAdapter, Driver, Kysely, QueryCompiler} from 'kysely'
import type {KyselyKnexDialectConfig} from './config.js'
import {KyselyKnexDriver} from './driver.js'
import {assertSupportedDialect} from './supported-dialects.js'

export class KyselyKnexDialect implements Dialect {
readonly #config: KyselyKnexDialectConfig

constructor(config: KyselyKnexDialectConfig) {
assertSupportedDialect(config.knex.client.config.client)
this.#config = config
}

Expand Down
7 changes: 5 additions & 2 deletions src/driver.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
import {Knex} from 'knex'
import type {Knex} from 'knex'
import type {DatabaseConnection, Driver, TransactionSettings} from 'kysely'
import type {ResultsParser} from './cold-dialect/results-parser.js'
import type {KyselyKnexDialectConfig} from './config.js'
import {KyselyKnexConnection} from './connection.js'

export class KyselyKnexDriver implements Driver {
readonly #config: KyselyKnexDialectConfig
readonly #resultsParser: ResultsParser

constructor(config: KyselyKnexDialectConfig) {
this.#config = config
this.#resultsParser = config.kyselySubDialect.createResultsParser!()
}

async acquireConnection(): Promise<DatabaseConnection> {
const connection = await (this.#config.knex.client as Knex.Client).acquireConnection()

return new KyselyKnexConnection(this.#config.knex, connection)
return new KyselyKnexConnection(this.#config.knex, connection, this.#resultsParser)
}

async beginTransaction(connection: KyselyKnexConnection, settings: TransactionSettings): Promise<void> {
Expand Down
13 changes: 0 additions & 13 deletions src/supported-dialects.ts

This file was deleted.

84 changes: 74 additions & 10 deletions tests/nodejs/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import {InsertResult, UpdateResult} from 'kysely'
import {DeleteResult, InsertResult, UpdateResult} from 'kysely'
import {expectType} from 'tsd'
import {SUPPORTED_DIALECTS} from '../../src/supported-dialects'
import {
DEFAULT_DATA_SET,
PerDialect,
SUPPORTED_DIALECTS,
dropDatabase,
expect,
initTest,
seedDatabase,
type PerDialect,
type TestContext,
} from './test-setup'

for (const dialect of SUPPORTED_DIALECTS.slice(0, 1)) {
for (const dialect of SUPPORTED_DIALECTS) {
describe(`KyselyKnexDialect: ${dialect}`, () => {
let ctx: TestContext

Expand All @@ -28,7 +28,7 @@ for (const dialect of SUPPORTED_DIALECTS.slice(0, 1)) {
await ctx.kysely.destroy()
})

it.only('should be able to perform select queries', async () => {
it('should be able to perform select queries', async () => {
const knexPeople = await ctx.knex.from('person').select('*')

const kyselyPeople = await ctx.kysely.selectFrom('person').selectAll().execute()
Expand All @@ -37,15 +37,25 @@ for (const dialect of SUPPORTED_DIALECTS.slice(0, 1)) {
expectType<typeof knexPeople>(kyselyPeople)
})

it.only('should be able to perform insert queries', async () => {
it('should be able to perform insert queries', async () => {
const result = await ctx.kysely.insertInto('person').values({gender: 'female'}).executeTakeFirstOrThrow()

expect(result).to.deep.equal(
(
{
'better-sqlite3': {insertId: BigInt(DEFAULT_DATA_SET.length + 1), numInsertedOrUpdatedRows: BigInt(1)},
mssql: {insertId: undefined, numInsertedOrUpdatedRows: BigInt(1)},
mysql: {insertId: BigInt(DEFAULT_DATA_SET.length + 1), numInsertedOrUpdatedRows: BigInt(1)},
'better-sqlite3': {
insertId: BigInt(DEFAULT_DATA_SET.length + 1),
numInsertedOrUpdatedRows: BigInt(1),
},
mssql: {insertId: undefined, numInsertedOrUpdatedRows: undefined},
mysql: {
insertId: BigInt(DEFAULT_DATA_SET.length + 1),
numInsertedOrUpdatedRows: BigInt(1),
},
mysql2: {
insertId: BigInt(DEFAULT_DATA_SET.length + 1),
numInsertedOrUpdatedRows: BigInt(1),
},
pg: {insertId: undefined, numInsertedOrUpdatedRows: BigInt(1)},
sqlite3: {insertId: undefined, numInsertedOrUpdatedRows: undefined},
} satisfies PerDialect<{[K in keyof InsertResult]: InsertResult[K]}>
Expand Down Expand Up @@ -76,13 +86,67 @@ for (const dialect of SUPPORTED_DIALECTS.slice(0, 1)) {
(
{
'better-sqlite3': new UpdateResult(BigInt(1), undefined),
mssql: new UpdateResult(BigInt(1), undefined),
mssql: new UpdateResult(BigInt(0), undefined),
mysql: new UpdateResult(BigInt(1), BigInt(1)),
mysql2: new UpdateResult(BigInt(1), BigInt(1)),
pg: new UpdateResult(BigInt(1), undefined),
sqlite3: new UpdateResult(BigInt(0), undefined),
} satisfies PerDialect<{[K in keyof UpdateResult]: UpdateResult[K]}>
)[dialect],
)
})

if (dialect === 'better-sqlite3' || dialect === 'pg' || dialect === 'sqlite3') {
it('should be able to perform update queries with returning', async () => {
const result = await ctx.kysely
.updateTable('person')
.set({marital_status: 'widowed'})
.where('id', '=', 1)
.returning(['gender'])
.executeTakeFirstOrThrow()

expect(result).to.deep.equal({gender: DEFAULT_DATA_SET[0].gender})
})
}

it('should be able to perform delete queries', async () => {
const result = await ctx.kysely.deleteFrom('person').where('id', '=', 1).executeTakeFirstOrThrow()

expect(result).to.deep.equal(
(
{
'better-sqlite3': {numDeletedRows: BigInt(1)},
mssql: {numDeletedRows: BigInt(0)},
mysql: {numDeletedRows: BigInt(1)},
mysql2: {numDeletedRows: BigInt(1)},
pg: {numDeletedRows: BigInt(1)},
sqlite3: {numDeletedRows: BigInt(0)},
} satisfies PerDialect<{[K in keyof DeleteResult]: DeleteResult[K]}>
)[dialect],
)
})

if (dialect === 'better-sqlite3' || dialect === 'pg' || dialect === 'sqlite3') {
it('should be able to perform delete queries with returning', async () => {
const result = await ctx.kysely
.deleteFrom('person')
.where('id', '=', 1)
.returning('gender')
.executeTakeFirstOrThrow()

expect(result).to.deep.equal({gender: DEFAULT_DATA_SET[0].gender})
})
}

it.skip('should be able to stream results', async () => {
const stream = ctx.kysely.selectFrom('person').selectAll().stream(1)

// let called = 0
for await (const chunk of stream) {
// expect(chunk).to.be.an('array').with.lengthOf(1)
// called++
}
// expect(called).to.equal(DEFAULT_DATA_SET.length)
})
})
}
Loading

0 comments on commit b348674

Please sign in to comment.