Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

websocket: fix another sync loop #206

Merged
merged 1 commit into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@ export class NodeWSServerAdapter extends NetworkAdapter {
)
switch (type) {
case "join":
const existingSocket = this.sockets[senderId]
if (existingSocket) {
if (existingSocket.readyState === WebSocket.OPEN) {
existingSocket.close()
}
this.emit("peer-disconnected", {peerId: senderId})
}

// Let the rest of the system know that we have a new connection.
this.emit("peer-candidate", { peerId: senderId })
this.sockets[senderId] = socket
Expand Down
196 changes: 195 additions & 1 deletion packages/automerge-repo-network-websocket/test/Websocket.test.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import { PeerId, Repo } from "@automerge/automerge-repo"
import { next as A } from "@automerge/automerge"
import { AutomergeUrl, DocumentId, PeerId, Repo, SyncMessage, parseAutomergeUrl } from "@automerge/automerge-repo"
import assert from "assert"
import * as CBOR from "cbor-x"
import { once } from "events"
import http from "http"
import { describe, it } from "vitest"
import WebSocket, { AddressInfo } from "ws"
import { runAdapterTests } from "../../automerge-repo/src/helpers/tests/network-adapter-tests.js"
import { DummyStorageAdapter } from "../../automerge-repo/test/helpers/DummyStorageAdapter.js"
import { BrowserWebSocketClientAdapter } from "../src/BrowserWebSocketClientAdapter.js"
import { NodeWSServerAdapter } from "../src/NodeWSServerAdapter.js"
import {headsAreSame} from "@automerge/automerge-repo/src/helpers/headsAreSame.js"

describe("Websocket adapters", () => {
const setup = async (clientCount = 1) => {
Expand Down Expand Up @@ -152,6 +155,18 @@ describe("Websocket adapters", () => {
return message
}

async function recvOrTimeout(socket: WebSocket): Promise<Buffer | null> {
return new Promise((resolve) => {
const timer = setTimeout(() => {
resolve(null)
}, 1000)
socket.once("message", (msg) => {
clearTimeout(timer)
resolve(msg as Buffer)
})
})
}

it("should send the negotiated protocol version in its hello message", async () => {
const response = await serverResponse({
type: "join",
Expand Down Expand Up @@ -192,6 +207,185 @@ describe("Websocket adapters", () => {
selectedProtocolVersion: "1",
})
})

/**
* Create a new document, initialized with the given contents and return a
* storage containign that document as well as the URL and a fork of the
* document
*
* @param contents - The contents to initialize the document with
*/
async function initDocAndStorage<T extends Record<string, unknown>>(contents: T): Promise<{
storage: DummyStorageAdapter,
url: AutomergeUrl,
doc: A.Doc<T>,
documentId: DocumentId
}> {
const storage = new DummyStorageAdapter()
const silentRepo = new Repo({storage, network: []})
const doc = A.from<T>(contents)
const handle = silentRepo.create()
handle.update(() => A.clone(doc))
const { documentId } = parseAutomergeUrl(handle.url)
await pause(150)
return {
url: handle.url,
doc,
documentId,
storage,
}
}

function assertIsPeerMessage(msg: Buffer | null) {
if (msg == null) {
throw new Error("expected a peer message, got null")
}
let decoded = CBOR.decode(msg)
if (decoded.type !== "peer") {
throw new Error(`expected a peer message, got type: ${decoded.type}`)
}
}

function assertIsSyncMessage(forDocument: DocumentId, msg: Buffer | null): SyncMessage {
if (msg == null) {
throw new Error("expected a peer message, got null")
}
let decoded = CBOR.decode(msg)
if (decoded.type !== "sync") {
throw new Error(`expected a peer message, got type: ${decoded.type}`)
}
if (decoded.documentId !== forDocument) {
throw new Error(`expected a sync message for ${forDocument}, not for ${decoded.documentId}`)
}
return decoded
}

it("should disconnect existing peers on reconnect before announcing them", async () => {
// This test exercises a sync loop which is exposed in the following
// sequence of events:
//
// 1. A document exists on both the server and the client with divergent
// heads (both sides have changes the other does not have)
// 2. The client sends a sync message to the server
// 3. The server responds, but for some reason the server response is
// dropped
// 4. The client reconnects due to not receiving a response or a ping
// 5. The peers exchange sync messages, but the server thinks it has
// already sent its changes to the client, so it doesn't sent them.
// 6. The client notices that it doesn't have the servers changes so it
// asks for them
// 7. The server responds with an empty sync message because it thinks it
// has already sent the changes
//
// 6 and 7 continue in an infinite loop. The root cause is the servers
// failure to clear the sync state associated with the given peer when
// it receives a new connection from the same peer ID.
const { socket, serverUrl } = await setup(0)

// Create a doc, populate a DummyStorageAdapter with that doc
const {storage, url, doc, documentId} = await initDocAndStorage({foo: "bar"})

// Create a copy of the document to represent the client state
let clientDoc = A.clone<{foo: string}>(doc)
clientDoc = A.change(clientDoc, d => d.foo = "qux")

// Now create a websocket sync server with the original document in it's storage
const adapter = new NodeWSServerAdapter(socket)
const repo = new Repo({ network: [adapter], storage, peerId: "server" as PeerId })

// make a change to the handle on the sync server
const handle = repo.find<{foo: string}>(url)
await handle.whenReady()
handle.change(d => d.foo = "baz")

// Okay, so now there is a document on both the client and the server
// which has concurrent changes on each peer.

// Simulate the initial websocket connection
let clientSocket = new WebSocket(serverUrl)
await once(clientSocket, "open")

// Run through the client/server hello
clientSocket.send(CBOR.encode({
type: "join",
senderId: "client",
supportedProtocolVersions: ["1"],
}))

let response = await recvOrTimeout(clientSocket)
assertIsPeerMessage(response)

// Okay now we start syncing

let clientState = A.initSyncState()
let [newSyncState, message] = A.generateSyncMessage(clientDoc, clientState)
clientState = newSyncState

// Send the initial sync state
clientSocket.send(CBOR.encode({
type: "request",
documentId,
targetId: "server",
senderId: "client",
data: message
}))

response = await recvOrTimeout(clientSocket)
assertIsSyncMessage(documentId, response)

// Now, assume either the network or the server is going slow, so the
// server thinks it has sent the response above, but for whatever reason
// it never gets to the client. In that case the reconnect timer in the
// BrowserWebSocketClientAdapter will fire and we'll create a new
// websocket and connect it. To simulate this we drop the above response
// on the floor and start connecting again.

clientSocket = new WebSocket(serverUrl)
await once(clientSocket, "open")

// and we also make a change to the client doc
clientDoc = A.change(clientDoc, d => d.foo = "quoxen")

// Run through the whole client/server hello dance again
clientSocket.send(CBOR.encode({
type: "join",
senderId: "client",
supportedProtocolVersions: ["1"],
}))

response = await recvOrTimeout(clientSocket)
assertIsPeerMessage(response)

// Now, we start syncing. If we're not buggy, this loop should terminate.
while(true) {
;[clientState, message] = A.generateSyncMessage(clientDoc, clientState)
if (message) {
clientSocket.send(CBOR.encode({
type: "sync",
documentId,
targetId: "server",
senderId: "client",
data: message
}))
}
const response = await recvOrTimeout(clientSocket)
if (response) {
const decoded = assertIsSyncMessage(documentId, response)
;[clientDoc, clientState] = A.receiveSyncMessage(clientDoc, clientState, decoded.data)
}
if (response == null && message == null) {
break
}
// Make sure shit has time to happen
await pause(50)
}

let localHeads = A.getHeads(clientDoc)
let remoteHeads = A.getHeads(handle.docSync())
if (!headsAreSame(localHeads, remoteHeads)) {
throw new Error("heads not equal")
}
})
})
})

Expand Down