From 45d1d4550306af96f84561ba36310f18e25431fe Mon Sep 17 00:00:00 2001 From: Zicklag Date: Mon, 30 Dec 2024 17:28:49 -0600 Subject: [PATCH] feat: first pass horribly broken 2-way p2p chat. --- src/client/initMatrix.ts | 6 +- src/matrix-shim/data.ts | 1 - src/matrix-shim/index.ts | 104 ++++++++++++++++++++++++++++- src/matrix-shim/peerjsFrontend.ts | 83 ++++++++++++++++------- src/matrix-shim/peerjsTransport.ts | 67 ++++++++++++++----- src/sw.ts | 6 +- 6 files changed, 216 insertions(+), 51 deletions(-) diff --git a/src/client/initMatrix.ts b/src/client/initMatrix.ts index 52e8317e4..5fe2c94cc 100644 --- a/src/client/initMatrix.ts +++ b/src/client/initMatrix.ts @@ -6,9 +6,9 @@ import { cryptoCallbacks } from './state/secretStorageKeys'; global.Olm = Olm; -if (import.meta.env.PROD) { - logger.disableAll(); -} +// if (import.meta.env.PROD) { + logger.disableAll() +// } type Session = { baseUrl: string; diff --git a/src/matrix-shim/data.ts b/src/matrix-shim/data.ts index c5009deaa..ae7157869 100644 --- a/src/matrix-shim/data.ts +++ b/src/matrix-shim/data.ts @@ -1,7 +1,6 @@ /* eslint-disable no-restricted-syntax */ /* eslint-disable max-classes-per-file */ /* eslint-disable @typescript-eslint/no-explicit-any */ -import _ from 'lodash'; import { IRoomEvent, IStateEvent } from 'matrix-js-sdk'; type Message = { diff --git a/src/matrix-shim/index.ts b/src/matrix-shim/index.ts index b1f1c66e6..35b6910c1 100644 --- a/src/matrix-shim/index.ts +++ b/src/matrix-shim/index.ts @@ -108,7 +108,7 @@ export class MatrixShim { agent?: Agent; - connectionManager: PeerjsConnectionManager = new PeerjsConnectionManager(); + connectionManager: PeerjsConnectionManager; webrtcPeerConns: { [did: string]: DataConnection } = {}; @@ -241,10 +241,40 @@ export class MatrixShim { this.kvdb = kvdb; this.keypair = keypair; - this.connectionManager.openHandlers.push(() => { - this.setPeerIdRecordInPds(); + // Whenever a peer is opened, set that PeerId as our current peer ID in our AtProto PDS. + this.connectionManager = new PeerjsConnectionManager({ + peerOpenHandlers: [ + () => { + this.setPeerIdRecordInPds(); + this.connectToPeers(); + }, + ], + peerConnectHandlers: [ + (transport) => { + (async () => { + for await (const message of transport) { + for (const roomId of this.data.roomIds()) { + const room = this.data.rooms[roomId]; + if (room.direct) { + if (this.oauthSession) { + this.data.roomSendMessage( + roomId, + room.members[0].id, + crypto.randomUUID(), + new TextDecoder().decode(message) + ); + this.changes.notify(); + } + } + } + } + })(); + }, + ], }); + this.connectToPeers(); + const router = AutoRouter(); router.get('/_matrix/custom/authchecktest', () => { @@ -478,6 +508,7 @@ export class MatrixShim { data.is_direct ); this.changes.notify(); + this.connectToPeers(); return { room_id: roomId }; }); @@ -500,6 +531,11 @@ export class MatrixShim { params.txid, content.body || '[unknown body]' ); + + for (const transport of this.connectionManager.transports) { + transport.send(new TextEncoder().encode(content.body)); + } + this.changes.notify(); return { @@ -583,6 +619,68 @@ export class MatrixShim { this.agent = new Agent(this.oauthSession); this.userHandle = (await resolveDid(this.oauthSession.did)) || this.oauthSession.did; this.kvdb.set('did', this.oauthSession.did); + this.connectToPeers(); + } + + async getPeerIdForDid(did: string): Promise { + if (this.agent && this.oauthSession) { + const record = await this.agent.com.atproto.repo.getRecord( + { + collection: 'peer.pigeon.muni.town', + repo: did, + rkey: 'self', + }, + { headers: { 'atproto-proxy': `${did}#atproto_pds` } } + ); + + return (record.data.value as any)?.id; + } + + return undefined; + } + + /** Connect to all peers */ + async connectToPeers() { + console.info('Connecting to peers'); + const membersList: Set = new Set(); + for (const roomId of this.data.roomIds()) { + const room = this.data.rooms[roomId]; + membersList.add(room.owner.id); + room.members.forEach((x) => membersList.add(x.id)); + } + if (this.oauthSession) membersList.delete(this.oauthSession.did); + + console.log('Peer dids:', membersList); + + const ids = await Promise.all( + [...membersList.values()].map(async (x) => [x, await this.getPeerIdForDid(x)]) + ); + console.log('Peer ids:', ids); + + for (const [did, peerId] of ids) { + if (peerId) { + console.info('Connecting to peer', peerId); + const transport = await this.connectionManager.connect(peerId); + (async () => { + for await (const message of transport) { + for (const roomId of this.data.roomIds()) { + const room = this.data.rooms[roomId]; + if (room.direct && room.members.some((x) => x.id === did)) { + if (this.oauthSession) { + this.data.roomSendMessage( + roomId, + did!, + crypto.randomUUID(), + new TextDecoder().decode(message) + ); + this.changes.notify(); + } + } + } + } + })(); + } + } } /** diff --git a/src/matrix-shim/peerjsFrontend.ts b/src/matrix-shim/peerjsFrontend.ts index 2edb8563f..d9e4809e1 100644 --- a/src/matrix-shim/peerjsFrontend.ts +++ b/src/matrix-shim/peerjsFrontend.ts @@ -30,15 +30,27 @@ export class PeerjsFrontendManager { constructor() { this.peer = new Peer(); this.peer.on('open', (id) => { - console.trace('Peer opened:', id); + console.info('Peer opened:', id); const message: PeerjsFrontendMessage = { type: 'peerOpened', peerId: id }; this.sender.postMessage(message); }); + this.peer.on('disconnected', () => { + console.error('Peer disconnected.'); + }); + this.peer.on('error', (error) => { + console.error('Peer error', error); + }); + // When we receive a connection from outside this.peer.on('connection', (conn) => { + this.addConnectionDataCloseHandlers(conn); + + console.info('Incomming peer connection', conn); + // Add the connection to the list this.connections[conn.connectionId] = conn; + // And send a connected event to the service worker const m: PeerjsFrontendMessage = { type: 'incomingConnected', @@ -51,6 +63,7 @@ export class PeerjsFrontendManager { // When the peer closes this.peer.on('close', () => { + console.info('Peer closed'); // Tell the service worker const m: PeerjsFrontendMessage = { type: 'peerClosed', @@ -60,7 +73,7 @@ export class PeerjsFrontendManager { }); // When we receive a message from our service worker - this.sender.addEventListener('message', (event) => { + this.receiver.addEventListener('message', (event) => { const message: PeerjsBackendMessage = event.data; // If we should connect to another peer @@ -71,10 +84,15 @@ export class PeerjsFrontendManager { // good. // Create the connection + console.info('Connecting to peer:', message.remotePeerId); const conn = this.peer.connect(message.remotePeerId, { reliable: true }); + console.info('Conencted to peer:', conn.peer, conn); + + this.connections[conn.connectionId] = conn; // When the connection opens conn.on('open', () => { + console.info('Connection opened', conn.connectionId); // Tell the service worker the connection has opened. const m: PeerjsFrontendMessage = { type: 'connOpened', @@ -85,34 +103,15 @@ export class PeerjsFrontendManager { this.sender.postMessage(m); }); - // When the connection has data - conn.on('data', (data) => { - // Tell the service worker the connection has opened. - const m: PeerjsFrontendMessage = { - type: 'connData', - peerId: this.peer.id, - connectionId: conn.connectionId, - data, - }; - this.sender.postMessage(m); - }); - - // When the connection closees - conn.on('close', () => { - // Tell the service worker the connection has opened. - const m: PeerjsFrontendMessage = { - type: 'connClosed', - peerId: this.peer.id, - connectionId: conn.connectionId, - }; - this.sender.postMessage(m); - }); + this.addConnectionDataCloseHandlers(conn); // If the service worker wants us to send data - } else if (message.type === 'sendData' && message.peerId === this.peer.id) { + } else if (message.type === 'sendData') { + console.log('wants to send', message); // Get the connection and send it const conn = this.connections[message.connectionId]; if (conn) { + console.info('Sending data to ', conn.peer, message.data); conn.send(message.data); } } else if (message.type === 'getPeerId') { @@ -125,4 +124,38 @@ export class PeerjsFrontendManager { } }); } + + addConnectionDataCloseHandlers(conn: DataConnection) { + conn.on('error', (error) => { + console.error('Peer connection error', error); + }); + conn.on('iceStateChanged', (state) => { + console.log('Connection state change', state); + }); + + // When the connection has data + conn.on('data', (data) => { + console.info('Connection data', conn.connectionId, data); + // Tell the service worker the connection has opened. + const m: PeerjsFrontendMessage = { + type: 'connData', + peerId: this.peer.id, + connectionId: conn.connectionId, + data, + }; + this.sender.postMessage(m); + }); + + // When the connection closees + conn.on('close', () => { + console.info('Connection closed', conn.connectionId); + // Tell the service worker the connection has opened. + const m: PeerjsFrontendMessage = { + type: 'connClosed', + peerId: this.peer.id, + connectionId: conn.connectionId, + }; + this.sender.postMessage(m); + }); + } } diff --git a/src/matrix-shim/peerjsTransport.ts b/src/matrix-shim/peerjsTransport.ts index 4e2b0dd72..0fd3ceccb 100644 --- a/src/matrix-shim/peerjsTransport.ts +++ b/src/matrix-shim/peerjsTransport.ts @@ -95,6 +95,7 @@ export class PeerjsTransport implements Transport { if (data.type === 'connClosed' && data.connectionId === t.connId) { t.isClosed = true; } else if (data.type === 'connData' && data.connectionId === t.connId) { + console.info('Transport Recv', data.data); t.received.push(data.data as Uint8Array); } }); @@ -119,6 +120,7 @@ export class PeerjsTransport implements Transport { } async send(bytes: Uint8Array): Promise { + console.info('Transport Send', bytes); const m: PeerjsBackendMessage = { type: 'sendData', connectionId: this.connId, @@ -151,6 +153,8 @@ export class PeerjsTransport implements Transport { type ConnectHandler = (ev: PeerjsTransport) => unknown; +type OpenCloseHandler = (peerId: string) => unknown; + export class PeerjsConnectionManager { sender = new BroadcastChannel('matrix-shim-peerjs-backend'); @@ -160,46 +164,77 @@ export class PeerjsConnectionManager { transports: PeerjsTransport[] = []; - openHandlers: ((peerId: string) => unknown)[] = []; + peerOpenHandlers: OpenCloseHandler[] = []; + + peerCloseHandlers: OpenCloseHandler[] = []; - closeHandlers: ((peerId: string) => unknown)[] = []; + peerConnectHandlers: ConnectHandler[] = []; - connectHandlers: ConnectHandler[] = []; + constructor(initialHandlers: { + peerOpenHandlers?: OpenCloseHandler[]; + peerCloseHandlers?: OpenCloseHandler[]; + peerConnectHandlers?: ConnectHandler[]; + }) { + if (initialHandlers.peerCloseHandlers) + this.peerCloseHandlers = initialHandlers.peerCloseHandlers; + if (initialHandlers.peerOpenHandlers) this.peerOpenHandlers = initialHandlers.peerOpenHandlers; + if (initialHandlers.peerConnectHandlers) + this.peerConnectHandlers = initialHandlers.peerConnectHandlers; - constructor() { // Make sure the client sends us the peerOpened event. this.sender.postMessage({ type: 'getPeerId' } as PeerjsBackendMessage); this.receiver.addEventListener('message', (event) => { // eslint-disable-next-line prefer-destructuring - const data: PeerjsFrontendMessage = event.data; + const message: PeerjsFrontendMessage = event.data; - if (data.type === 'incomingConnected') { - PeerjsTransport.accept(data).then((transport) => { + // Add incoming connections to transport list + if (message.type === 'incomingConnected') { + PeerjsTransport.accept(message).then((transport) => { + console.info('Accepted peer connection from:', transport.remotePeerId); this.transports.push(transport); - for (const handler of this.connectHandlers) { + for (const handler of this.peerConnectHandlers) { handler(transport); } }); - } else if (data.type === 'peerOpened') { - if (data.peerId !== this.peerId) { - this.peerId = data.peerId; - for (const handler of this.openHandlers) { - handler(data.peerId); + + // Set current peer ID + } else if (message.type === 'peerOpened') { + console.info('Peer opened:', message.peerId); + if (message.peerId !== this.peerId) { + this.peerId = message.peerId; + for (const handler of this.peerOpenHandlers) { + handler(message.peerId); } } - } else if (data.type === 'peerClosed') { - for (const handler of this.closeHandlers) { - handler(data.peerId); + + // Prune transports from closed peer + } else if (message.type === 'peerClosed') { + for (const handler of this.peerCloseHandlers) { + handler(message.peerId); } this.peerId = undefined; + this.transports = this.transports.filter( + (transport) => transport.peerId === message.peerId + ); + + // Prune closed connections + } else if (message.type === 'connClosed') { + this.transports = this.transports.filter( + (transport) => transport.connId !== message.connectionId + ); } }); } async connect(remotePeerId: string): Promise { + const existingTransport = this.transports.find((x) => x.remotePeerId === remotePeerId); + if (existingTransport) return existingTransport; + + console.info('Trying to connect to remote peer:', remotePeerId); const transport = await PeerjsTransport.connect(remotePeerId); + console.info('Connection to peer opened:', remotePeerId); this.transports.push(transport); return transport; } diff --git a/src/sw.ts b/src/sw.ts index 7c1d4b5d7..05b9561e9 100644 --- a/src/sw.ts +++ b/src/sw.ts @@ -49,9 +49,9 @@ globalThis.localStorage = { // Immediately activate new service workers. self.addEventListener('install', async () => { - console.trace('Service worker installed, trying to skip waiting...'); + console.info('Service worker installed, trying to skip waiting...'); await self.skipWaiting(); - console.trace('Service worker done waiting'); + console.info('Service worker done waiting'); // TODO: we may still end up waiting to update if we are currently in the middle of // responding to a request in the old service worker. We need to add an abort controller @@ -65,7 +65,7 @@ self.addEventListener('activate', async () => { // zicklag: I'm not sure what this `waitUntil` was for, but I'm removing it for now. // event.waitUntil(self.clients.claim()); - console.trace('Service worker activated'); + console.info('Service worker activated'); matrixShim = await MatrixShim.init();