Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
"dependencies": {
"@electric-sql/pglite": "^0.3.15",
"@guiiai/logg": "catalog:",
"@moeru/eventa": "catalog:",
"@node-rs/jieba": "^2.0.1",
"@tg-search/bot": "workspace:*",
"@tg-search/common": "workspace:*",
Expand Down
104 changes: 75 additions & 29 deletions apps/server/src/account.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,37 @@
import type { Eventa } from '@moeru/eventa'
import type { Config } from '@tg-search/common'
import type { CoreContext, CoreEmitter, FromCoreEvent } from '@tg-search/core'
import type { CoreContext } from '@tg-search/core'
import type { Peer } from 'crossws'

import { useLogger } from '@guiiai/logg'
import { attachBotToContext, getBotRegistry } from '@tg-search/bot'
import { CoreEventType, createCoreInstance } from '@tg-search/core'
import { coreMessageBatchesProcessedTotal, coreMessagesProcessedTotal, coreMetrics, withSpan } from '@tg-search/observability'

import {
// Notification events from core (broadcast to all peers)
accountReadyEvent,
authCodeNeededEvent,
authConnectedEvent,
authDisconnectedEvent,
authErrorEvent,
authPasswordNeededEvent,
botStatusEvent,
coreErrorEvent,
createCoreInstance,
dialogAvatarDataEvent,
entityAvatarDataEvent,
entityMeDataEvent,
gramMessageReceivedEvent,
messageDataEvent,
messageFetchProgressEvent,
messageProcessEvent,
onEvent,
sessionUpdateEvent,
syncStatusEvent,
takeoutMetricsEvent,
takeoutTaskProgressEvent,
} from '@tg-search/core'
import { coreMessageBatchesProcessedTotal, coreMessagesProcessedTotal, coreMetrics } from '@tg-search/observability'

import { sendWsEvent } from './events'
import { getDB } from './storage/drizzle'
import { getMediaStorage } from './storage/media'

Expand All @@ -20,8 +45,6 @@ import { getMediaStorage } from './storage/media'
* Risks:
* - Long-lived accounts increase memory usage; monitor active account count.
*/
export type CoreEventListener = (...args: unknown[]) => void

export interface AccountState {
ctx: CoreContext

Expand All @@ -30,11 +53,6 @@ export interface AccountState {
*/
accountReady: boolean

/**
* Core event listeners (registered once, shared by all WebSocket connections)
*/
coreEventListeners: Map<keyof FromCoreEvent, CoreEventListener>

/**
* Active WebSocket peers for this account
*/
Expand All @@ -51,24 +69,52 @@ export const accountStates = new Map<string, AccountState>()
// Ephemeral per-peer bookkeeping.
export const peerToAccountId = new Map<string, string>()

// We need to track peer objects for broadcasting
// NOTICE: peerObjects is populated in app.ts (WebSocket open/close handlers).
// The sonarjs/no-empty-collection rule cannot track cross-file mutations.

export const peerObjects = new Map<string, Peer>()

function bindTracingMetaToSpan(emitter: CoreEmitter) {
// Ensure tracingId from incoming meta is bound into active span for all core handlers
const originalOn = emitter.on.bind(emitter)
emitter.on = ((event, listener) => {
return originalOn(event, (...args: Parameters<typeof listener>) => {
return withSpan(String(event), () => listener(...args))
})
}) as CoreEmitter['on']
/**
* All notification events from core that should be broadcast to WebSocket peers.
* Each entry maps a typed Eventa event to its wire protocol event name.
*/
const notificationEvents: Array<{ event: Eventa<any>, wireName: string }> = [
{ event: coreErrorEvent, wireName: 'core:error' },
{ event: authCodeNeededEvent, wireName: 'auth:code:needed' },
{ event: authPasswordNeededEvent, wireName: 'auth:password:needed' },
{ event: authConnectedEvent, wireName: 'auth:connected' },
{ event: authDisconnectedEvent, wireName: 'auth:disconnected' },
{ event: authErrorEvent, wireName: 'auth:error' },
{ event: sessionUpdateEvent, wireName: 'session:update' },
{ event: accountReadyEvent, wireName: 'account:ready' },
{ event: messageDataEvent, wireName: 'message:data' },
{ event: messageFetchProgressEvent, wireName: 'message:fetch:progress' },
{ event: dialogAvatarDataEvent, wireName: 'dialog:avatar:data' },
Comment on lines +90 to +92
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Forward message:processed in server notification bridge

The new server-side static notificationEvents list omits message:processed, even though this commit still defines it as a core notification in the shared dispatch map. In WebSocket/server mode, clients that register for message:processed will no longer receive batch completion events, creating a behavior regression and inconsistency versus core-bridge mode where this notification is still wired.

Useful? React with 👍 / 👎.

{ event: entityMeDataEvent, wireName: 'entity:me:data' },
{ event: entityAvatarDataEvent, wireName: 'entity:avatar:data' },
{ event: takeoutTaskProgressEvent, wireName: 'takeout:task:progress' },
{ event: takeoutMetricsEvent, wireName: 'takeout:metrics' },
{ event: gramMessageReceivedEvent, wireName: 'gram:message:received' },
{ event: botStatusEvent, wireName: 'bot:status' },
{ event: syncStatusEvent, wireName: 'sync:status' },
]

const originalOnce = emitter.once.bind(emitter)
emitter.once = ((event, listener) => {
return originalOnce(event, (...args: Parameters<typeof listener>) => {
return withSpan(String(event), () => listener(...args))
/**
* Register all notification event listeners on the core context.
* Each notification is broadcast to all active WebSocket peers for this account.
*/
function registerNotificationListeners(account: AccountState) {
for (const { event, wireName } of notificationEvents) {
onEvent(account.ctx.ctx, event, (body) => {
account.activePeers.forEach((peerId) => {
// eslint-disable-next-line sonarjs/no-empty-collection -- peerObjects is populated in app.ts
const targetPeer = peerObjects.get(peerId)
if (targetPeer) {
sendWsEvent(targetPeer, wireName, body)
}
})
})
}) as CoreEmitter['once']
}
}

export function getOrCreateAccount(accountId: string, config: Config): AccountState {
Expand All @@ -79,19 +125,19 @@ export function getOrCreateAccount(accountId: string, config: Config): AccountSt

const ctx = createCoreInstance(getDB, config, getMediaStorage(), logger, coreMetrics)

bindTracingMetaToSpan(ctx.emitter)

const account: AccountState = {
ctx,
accountReady: false,
coreEventListeners: new Map(),
activePeers: new Set(),
createdAt: Date.now(),
lastActive: Date.now(),
}

// Register all notification listeners upfront (replaces server:event:register protocol)
registerNotificationListeners(account)

// Instrument core message processing for this account
ctx.emitter.on(CoreEventType.MessageProcess, ({ messages, isTakeout }) => {
onEvent(ctx.ctx, messageProcessEvent, ({ messages, isTakeout }) => {
const source = isTakeout ? 'takeout' : 'realtime'
coreMessageBatchesProcessedTotal.add(1, { source })
coreMessagesProcessedTotal.add(messages.length, { source })
Expand Down
49 changes: 13 additions & 36 deletions apps/server/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,55 +11,32 @@

import type { Logger } from '@guiiai/logg'
import type { Config } from '@tg-search/common'
import type { ExtractData, FromCoreEvent, ToCoreEvent } from '@tg-search/core'
import type { H3 } from 'h3'

import type { AccountState, CoreEventListener } from './account'
import type { WsEventToClientData, WsMessageToServer } from './events'
import type { AccountState } from './account'
import type { WsMessageToServer } from './events'

import { useLogger } from '@guiiai/logg'
import { CoreEventType, destroyCoreInstance } from '@tg-search/core'
import { accountReadyEvent, authLoginEvent, authLogoutEvent, destroyCoreInstance } from '@tg-search/core'
import { coreEventsInTotal, wsConnectionsActive } from '@tg-search/observability'
import { defineWebSocketHandler, HTTPError } from 'h3'
import { v4 as uuidv4 } from 'uuid'

import { accountStates, getOrCreateAccount, peerObjects, peerToAccountId } from './account'
import { dispatchClientEvent, isCoreEvent } from './event-dispatch'
import { sendWsEvent } from './events'

const WS_MODE_LABEL = 'server' as const

export function registerCoreEventListeners(logger: Logger, account: AccountState, accountId: string, eventName: keyof FromCoreEvent) {
if (eventName.startsWith('server:')) {
return
}

if (!account.coreEventListeners.has(eventName)) {
const listener: CoreEventListener = (...args: unknown[]) => {
const data = args[0] as WsEventToClientData<typeof eventName>
account.activePeers.forEach((peerId) => {
const targetPeer = peerObjects.get(peerId)
if (targetPeer) {
sendWsEvent(targetPeer, eventName, data)
}
})
}

account.ctx.emitter.on(eventName, listener)
account.coreEventListeners.set(eventName, listener)

logger.withFields({ eventName, accountId }).debug('Registered shared core event listener')
}
}

export async function updateAccountState(logger: Logger, account: AccountState, accountId: string, eventName: keyof ToCoreEvent) {
export async function updateAccountState(logger: Logger, account: AccountState, accountId: string, eventName: string) {
// Update account state based on events
switch (eventName) {
case CoreEventType.AuthLogin:
account.ctx.emitter.once(CoreEventType.AccountReady, () => {
case authLoginEvent.id:
account.ctx.ctx.once(accountReadyEvent, () => {
account.accountReady = true
})
break
case CoreEventType.AuthLogout:
case authLogoutEvent.id:
account.accountReady = false
logger.withFields({ accountId }).log('User logged out, destroying account')
await destroyCoreInstance(account.ctx)
Expand Down Expand Up @@ -122,23 +99,23 @@ export function setupWsRoutes(app: H3, config: Config) {
const event = message.json<WsMessageToServer>()

try {
// Skip the old server:event:register protocol — notifications are registered upfront
if (event.type === 'server:event:register') {
registerCoreEventListeners(logger, account, accountId, event.data.event as keyof FromCoreEvent)
return
}

const tracingId = event.meta?.tracingId || uuidv4()

logger.withFields({ type: event.type, accountId, tracingId }).verbose('Message received')

if (!event.type.startsWith('server:')) {
if (isCoreEvent(event.type)) {
coreEventsInTotal.add(1, { event_name: event.type })
}

// Emit to core context (meta.tracingId is re-bound via emitter on/once wrappers)
account.ctx.emitter.emit(event.type, { ...event.data, meta: { tracingId } } as ExtractData<keyof ToCoreEvent>)
// Dispatch to core via Eventa events
await dispatchClientEvent(account.ctx, event.type, event.data, peer)

updateAccountState(logger, account, accountId, event.type as keyof ToCoreEvent)
updateAccountState(logger, account, accountId, event.type)
}
catch (error) {
logger.withError(error).error('Handle websocket message failed')
Expand Down
45 changes: 45 additions & 0 deletions apps/server/src/event-dispatch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Server-side event dispatch for bridging JSON-over-WebSocket messages to Eventa events.
*
* Uses shared event maps from @tg-search/core and adds server-specific
* dispatch logic (invoking RPCs and sending responses back to the calling peer).
*/
import type { CoreContext } from '@tg-search/core'
import type { Peer } from 'crossws'

import { defineInvoke } from '@moeru/eventa'
import { fireAndForgetEvents, isCoreEvent, rpcEvents } from '@tg-search/core'

import { sendWsEvent } from './events'

export { isCoreEvent }

/**
* Dispatch an incoming client event to the core Eventa context.
*
* - Fire-and-forget events are emitted directly.
* - RPC invokes are awaited and their response is sent back to the calling peer.
*/
export async function dispatchClientEvent(
ctx: CoreContext,
eventName: string,
data: any,
peer: Peer,
): Promise<void> {
// Try RPC invoke first
const rpc = rpcEvents.get(eventName)
if (rpc) {
const invoke = defineInvoke(ctx.ctx, rpc.invoke)
const result = await invoke(data)
sendWsEvent(peer, rpc.responseEvent, result)
return
}

// Try fire-and-forget
const event = fireAndForgetEvents.get(eventName)
if (event) {
ctx.ctx.emit(event, data)
}

// Unknown event — skip silently (may be server-only)
}
Loading