Skip to content

Commit

Permalink
Network disconnection support & repo shutdown (#359)
Browse files Browse the repository at this point in the history
  • Loading branch information
pvh authored Jul 26, 2024
1 parent f96b835 commit 2c450d6
Show file tree
Hide file tree
Showing 25 changed files with 1,367 additions and 1,511 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
"eslint-plugin-react": "^7.34.1",
"eslint-plugin-react-hooks": "^4.6.0",
"globals": "^15.6.0",
"lerna": "^6.6.2",
"lerna": "^8.1.6",
"npm-run-all": "^4.1.5",
"npm-watch": "^0.11.0",
"portfinder": "^1.0.32",
Expand Down
51 changes: 47 additions & 4 deletions packages/automerge-repo-network-broadcastchannel/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,32 @@ export type BroadcastChannelNetworkAdapterOptions = {

export class BroadcastChannelNetworkAdapter extends NetworkAdapter {
#broadcastChannel: BroadcastChannel
#disconnected = false

#options: BroadcastChannelNetworkAdapterOptions

#ready = false
#readyResolver?: () => void
#readyPromise: Promise<void> = new Promise<void>(resolve => {
this.#readyResolver = resolve
})
#connectedPeers: PeerId[] = []

isReady() {
return this.#ready
}

whenReady() {
return this.#readyPromise
}

#forceReady() {
if (!this.#ready) {
this.#ready = true
this.#readyResolver?.()
}
}

constructor(options?: BroadcastChannelNetworkAdapterOptions) {
super()
this.#options = { channelName: "broadcast", ...(options ?? {}) }
Expand All @@ -39,6 +62,7 @@ export class BroadcastChannelNetworkAdapter extends NetworkAdapter {
connect(peerId: PeerId, peerMetadata?: PeerMetadata) {
this.peerId = peerId
this.peerMetadata = peerMetadata
this.#disconnected = false

this.#broadcastChannel.addEventListener(
"message",
Expand All @@ -48,6 +72,10 @@ export class BroadcastChannelNetworkAdapter extends NetworkAdapter {
return
}

if (this.#disconnected) {
return
}

const { senderId, type } = message

switch (type) {
Expand All @@ -69,6 +97,12 @@ export class BroadcastChannelNetworkAdapter extends NetworkAdapter {
this.#announceConnection(senderId, peerMetadata)
}
break
case "leave":
this.#connectedPeers = this.#connectedPeers.filter(
p => p !== senderId
)
this.emit("peer-disconnected", { peerId: senderId })
break
default:
if (!("data" in message)) {
this.emit("message", message)
Expand All @@ -89,15 +123,18 @@ export class BroadcastChannelNetworkAdapter extends NetworkAdapter {
type: "arrive",
peerMetadata,
})

this.emit("ready", { network: this })
}

#announceConnection(peerId: PeerId, peerMetadata: PeerMetadata) {
this.#forceReady()
this.#connectedPeers.push(peerId)
this.emit("peer-candidate", { peerId, peerMetadata })
}

send(message: Message) {
if (this.#disconnected) {
return false
}
if ("data" in message) {
this.#broadcastChannel.postMessage({
...message,
Expand All @@ -114,8 +151,14 @@ export class BroadcastChannelNetworkAdapter extends NetworkAdapter {
}

disconnect() {
// TODO:
throw new Error("Unimplemented: leave on BroadcastChannelNetworkAdapter")
this.#broadcastChannel.postMessage({
senderId: this.peerId,
type: "leave",
})
for (const peerId of this.#connectedPeers) {
this.emit("peer-disconnected", { peerId })
}
this.#disconnected = true
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export interface PortRefEvents {

export interface MessagePortRef extends EventEmitter<PortRefEvents> {
start(): void
stop(): void
postMessage(message: any, transferable?: Transferable[]): void
isAlive(): boolean
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,33 @@ export class StrongMessagePortRef
extends EventEmitter<PortRefEvents>
implements MessagePortRef
{
isDisconnected: boolean = false

constructor(private port: MessagePort) {
port.addEventListener("message", event => {
this.emit("message", event)
if (!this.isDisconnected) {
this.emit("message", event)
}
})

super()
}

postMessage(message: any, transfer: Transferable[]): void {
this.port.postMessage(message, transfer)
if (!this.isDisconnected) {
this.port.postMessage(message, transfer)
}
}

start(): void {
this.isDisconnected = false
this.port.start()
}

stop() {
this.isDisconnected = true
}

isAlive(): boolean {
/* c8 ignore next */
return true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ export class WeakMessagePortRef
this.weakRef = new WeakRef<MessagePort>(port)

port.addEventListener("message", event => {
this.emit("message", event)
if (!this.isDisconnected) {
this.emit("message", event)
}
})
}

Expand All @@ -27,6 +29,10 @@ export class WeakMessagePortRef
return
}

if (this.isDisconnected) {
return
}

try {
port.postMessage(message, transfer)
} catch (err) {
Expand All @@ -42,13 +48,19 @@ export class WeakMessagePortRef
return
}

this.isDisconnected = false

try {
port.start()
} catch (err) {
this.disconnnect()
}
}

stop() {
this.isDisconnected = true
}

private disconnnect() {
if (!this.isDisconnected) {
this.emit("close")
Expand Down
50 changes: 39 additions & 11 deletions packages/automerge-repo-network-messagechannel/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,28 @@ export class MessageChannelNetworkAdapter extends NetworkAdapter {
channels = {}
/** @hidden */
messagePortRef: MessagePortRef
#startupComplete = false

#ready = false
#readyResolver?: () => void
#readyPromise: Promise<void> = new Promise<void>(resolve => {
this.#readyResolver = resolve
})
#remotePeerId?: PeerId

isReady() {
return this.#ready
}

whenReady() {
return this.#readyPromise
}

#forceReady() {
if (!this.#ready) {
this.#ready = true
this.#readyResolver?.()
}
}

constructor(
messagePort: MessagePort,
Expand All @@ -42,6 +63,7 @@ export class MessageChannelNetworkAdapter extends NetworkAdapter {
log("messageport connecting")
this.peerId = peerId
this.peerMetadata = peerMetadata

this.messagePortRef.start()
this.messagePortRef.addListener(
"message",
Expand Down Expand Up @@ -76,6 +98,11 @@ export class MessageChannelNetworkAdapter extends NetworkAdapter {
this.announceConnection(senderId, peerMetadata)
}
break
case "leave":
if (this.#remotePeerId === senderId) {
this.emit("peer-disconnected", { peerId: senderId })
}
break
default:
if (!("data" in message)) {
this.emit("message", message)
Expand Down Expand Up @@ -104,10 +131,7 @@ export class MessageChannelNetworkAdapter extends NetworkAdapter {
// must be something weird going on on the other end to cause us to receive
// no response
setTimeout(() => {
if (!this.#startupComplete) {
this.#startupComplete = true
this.emit("ready", { network: this })
}
this.#forceReady()
}, 100)
}

Expand All @@ -131,16 +155,20 @@ export class MessageChannelNetworkAdapter extends NetworkAdapter {
}

announceConnection(peerId: PeerId, peerMetadata: PeerMetadata) {
if (!this.#startupComplete) {
this.#startupComplete = true
this.emit("ready", { network: this })
}
this.#remotePeerId = peerId
this.#forceReady()
this.emit("peer-candidate", { peerId, peerMetadata })
}

disconnect() {
// TODO
throw new Error("Unimplemented: leave on MessagePortNetworkAdapter")
if (this.#remotePeerId && this.peerId) {
this.messagePortRef.postMessage({
type: "leave",
senderId: this.peerId,
})
this.emit("peer-disconnected", { peerId: this.#remotePeerId })
}
this.messagePortRef.stop()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,27 @@ abstract class WebSocketNetworkAdapter extends NetworkAdapter {
}

export class BrowserWebSocketClientAdapter extends WebSocketNetworkAdapter {
#isReady = false
#ready = false
#readyResolver?: () => void
#readyPromise: Promise<void> = new Promise<void>(resolve => {
this.#readyResolver = resolve
})

isReady() {
return this.#ready
}

whenReady() {
return this.#readyPromise
}

#forceReady() {
if (!this.#ready) {
this.#ready = true
this.#readyResolver?.()
}
}

#retryIntervalId?: TimeoutId
#log = debug("automerge-repo:websocket:browser")

Expand Down Expand Up @@ -71,7 +91,7 @@ export class BrowserWebSocketClientAdapter extends WebSocketNetworkAdapter {
// Mark this adapter as ready if we haven't received an ack in 1 second.
// We might hear back from the other end at some point but we shouldn't
// hold up marking things as unavailable for any longer
setTimeout(() => this.#ready(), 1000)
setTimeout(() => this.#forceReady(), 1000)
this.join()
}

Expand Down Expand Up @@ -121,12 +141,6 @@ export class BrowserWebSocketClientAdapter extends WebSocketNetworkAdapter {
this.#log("Connection failed, retrying...")
}

#ready() {
if (this.#isReady) return
this.#isReady = true
this.emit("ready", { network: this })
}

join() {
assert(this.peerId)
assert(this.socket)
Expand All @@ -140,14 +154,28 @@ export class BrowserWebSocketClientAdapter extends WebSocketNetworkAdapter {
disconnect() {
assert(this.peerId)
assert(this.socket)
this.send({ type: "leave", senderId: this.peerId })
const socket = this.socket
if (socket) {
socket.removeEventListener("open", this.onOpen)
socket.removeEventListener("close", this.onClose)
socket.removeEventListener("message", this.onMessage)
socket.removeEventListener("error", this.onError)
socket.close()
}
clearInterval(this.#retryIntervalId)
if (this.remotePeerId)
this.emit("peer-disconnected", { peerId: this.remotePeerId })
this.socket = undefined
}

send(message: FromClientMessage) {
if ("data" in message && message.data?.byteLength === 0)
throw new Error("Tried to send a zero-length message")
assert(this.peerId)
assert(this.socket)
if (!this.socket) {
this.#log("Tried to send on a disconnected socket.")
return
}
if (this.socket.readyState !== WebSocket.OPEN)
throw new Error(`Websocket not ready (${this.socket.readyState})`)

Expand All @@ -157,7 +185,7 @@ export class BrowserWebSocketClientAdapter extends WebSocketNetworkAdapter {

peerCandidate(remotePeerId: PeerId, peerMetadata: PeerMetadata) {
assert(this.socket)
this.#ready()
this.#forceReady()
this.remotePeerId = remotePeerId
this.emit("peer-candidate", {
peerId: remotePeerId,
Expand Down
Loading

0 comments on commit 2c450d6

Please sign in to comment.