Skip to content

Commit

Permalink
UBERF-9093: Fix connection establish (#7623)
Browse files Browse the repository at this point in the history
  • Loading branch information
haiodo authored Jan 9, 2025
1 parent 65e037c commit be10f68
Showing 1 changed file with 53 additions and 62 deletions.
115 changes: 53 additions & 62 deletions packages/core/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,20 @@ export async function createClient (
}
}
}
let initialized = false
const conn = await ctx.with('connect', {}, () => connect(txHandler))

const { mode, current, addition } = await ctx.with('load-model', {}, (ctx) => loadModel(ctx, conn, txPersistence))
switch (mode) {
case 'same':
case 'upgrade':
await ctx.with('build-model', {}, (ctx) => buildModel(ctx, current, modelFilter, hierarchy, model))
break
case 'addition':
await ctx.with('build-model', {}, (ctx) =>
buildModel(ctx, current.concat(addition), modelFilter, hierarchy, model)
)
}

txBuffer = txBuffer.filter((tx) => tx.space !== core.space.Model)

client = new ClientImpl(conn)
Expand All @@ -268,70 +279,50 @@ export async function createClient (
const oldOnConnect:
| ((event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>)
| undefined = conn.onConnect
conn.onConnect = async (event, _lastTx, data) => {
console.log('Client: onConnect', event)
if (event === ClientConnectEvent.Maintenance) {
lastTx = _lastTx
await oldOnConnect?.(ClientConnectEvent.Maintenance, _lastTx, data)
return
}
// Find all new transactions and apply
const { mode, current, addition } = await ctx.with('load-model', {}, (ctx) => loadModel(ctx, conn, txPersistence))

switch (mode) {
case 'upgrade':
// We have upgrade procedure and need rebuild all stuff.
hierarchy = new Hierarchy()
model = new ModelDb(hierarchy)
;(client as ClientImpl).setModel(hierarchy, model)

await ctx.with('build-model', {}, (ctx) => buildModel(ctx, current, modelFilter, hierarchy, model))
await oldOnConnect?.(ClientConnectEvent.Upgraded, _lastTx, data)
// No need to fetch more stuff since upgrade was happened.
break
case 'addition':
await ctx.with('build-model', {}, (ctx) =>
buildModel(ctx, current.concat(addition), modelFilter, hierarchy, model)
)
break
}

await new Promise<void>((resolve) => {
conn.onConnect = async (event, _lastTx, data) => {
console.log('Client: onConnect', event)
if (event === ClientConnectEvent.Maintenance) {
lastTx = _lastTx
await oldOnConnect?.(ClientConnectEvent.Maintenance, _lastTx, data)
return
}
// Find all new transactions and apply
const { mode, current, addition } = await ctx.with('load-model', {}, (ctx) => loadModel(ctx, conn, txPersistence))
if (!initialized) {
switch (mode) {
case 'same':
case 'upgrade':
await ctx.with('build-model', {}, (ctx) => buildModel(ctx, current, modelFilter, hierarchy, model))
break
case 'addition':
await ctx.with('build-model', {}, (ctx) =>
buildModel(ctx, current.concat(addition), modelFilter, hierarchy, model)
)
}
initialized = true
} else {
switch (mode) {
case 'upgrade':
// We have upgrade procedure and need rebuild all stuff.
hierarchy = new Hierarchy()
model = new ModelDb(hierarchy)
;(client as ClientImpl).setModel(hierarchy, model)

await ctx.with('build-model', {}, (ctx) => buildModel(ctx, current, modelFilter, hierarchy, model))
await oldOnConnect?.(ClientConnectEvent.Upgraded, _lastTx, data)
// No need to fetch more stuff since upgrade was happened.
break
case 'addition':
await ctx.with('build-model', {}, (ctx) =>
buildModel(ctx, current.concat(addition), modelFilter, hierarchy, model)
)
break
}
}
resolve()

if (lastTx === undefined) {
// No need to do anything here since we connected.
await oldOnConnect?.(event, _lastTx, data)
lastTx = _lastTx
resolve()
return
}

if (lastTx === _lastTx) {
// Same lastTx, no need to refresh
await oldOnConnect?.(ClientConnectEvent.Reconnected, _lastTx, data)
resolve()
return
}
if (lastTx === undefined) {
// No need to do anything here since we connected.
await oldOnConnect?.(event, _lastTx, data)
lastTx = _lastTx
// We need to trigger full refresh on queries, etc.
await oldOnConnect?.(ClientConnectEvent.Refresh, lastTx, data)
resolve()
return
}
})

if (lastTx === _lastTx) {
// Same lastTx, no need to refresh
await oldOnConnect?.(ClientConnectEvent.Reconnected, _lastTx, data)
return
}
lastTx = _lastTx
// We need to trigger full refresh on queries, etc.
await oldOnConnect?.(ClientConnectEvent.Refresh, lastTx, data)
}

return client
}
Expand Down

0 comments on commit be10f68

Please sign in to comment.