Skip to content

Commit

Permalink
feat: handle message add / edit / delete longpoll events
Browse files Browse the repository at this point in the history
  • Loading branch information
danyadev committed Sep 12, 2024
1 parent 2972822 commit d50f9aa
Show file tree
Hide file tree
Showing 13 changed files with 356 additions and 31 deletions.
146 changes: 146 additions & 0 deletions src/actions/handleEngineUpdates.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import * as IEngine from 'env/IEngine'
import * as Convo from 'model/Convo'
import * as History from 'model/History'
import * as Message from 'model/Message'
import * as Peer from 'model/Peer'
import { insertConvos, insertPeers } from 'actions'
import { fromApiMessage } from 'converters/MessageConverter'
import { useEnv } from 'hooks'
import { PEER_FIELDS } from 'misc/constants'

export async function handleEngineUpdates(fromPts: number, updates: IEngine.Update[]) {
const { api } = useEnv()

const hasUsefulUpdates = updates.some(([eventId]) => (
(eventId >= 10002 && eventId <= 10007) || eventId === 10018
))
if (!hasUsefulUpdates) {
return
}

const glphResponse = await api.fetch('messages.getLongPollHistory', {
pts: fromPts,
lp_version: IEngine.VERSION,
msgs_limit: 1100,
events_limit: 1100,
last_n: 1000,
extended: 1,
fields: PEER_FIELDS
})

if (glphResponse.from_pts > fromPts) {
throw new Error('[handleEngineUpdates] glph история обрезалась')
}

for (const update of updates) {
switch (update[0]) {
case 10002: {
const [, rawCmid, flags, rawPeerId] = update
const isDeleteMessageUpdate = flags & 128
const peerId = Peer.resolveId(rawPeerId)
const cmid = Message.resolveCmid(rawCmid)

const apiConvo = glphResponse.conversations.find((apiConvo) => (
apiConvo.peer.id === peerId
))
if (!apiConvo) {
console.warn('[handleEngineUpdates] no convo', glphResponse, update)
break
}

const apiMessage = glphResponse.messages.items.find((message) => (
message.peer_id === peerId && message.conversation_message_id === cmid
))
const message = apiMessage && fromApiMessage(apiMessage)
if (!message && !isDeleteMessageUpdate) {
console.warn('[handleEngineUpdates] no message', glphResponse, update)
break
}

const convo = Convo.get(peerId)
if (!convo) {
break
}

if (isDeleteMessageUpdate) {
History.removeNode(convo.history, cmid)
} else if (message) {
insertConvos([{ conversation: apiConvo }], {
merge: true,
addToList: false
})

Convo.insertMessages(convo, [message], {
up: true,
down: true,
aroundId: cmid
})
}
break
}

case 10003:
case 10004:
case 10005:
case 10018: {
const rawCmid = update[1]
const rawPeerId = update[0] === 10004 ? update[4] : update[3]
const peerId = Peer.resolveId(rawPeerId)
const cmid = Message.resolveCmid(rawCmid)
const isRestoreMessageUpdate =
update[0] === 10003 && ((update[2] & 64) > 0 || (update[2] & 128) > 0)

const apiConvo = glphResponse.conversations.find((apiConvo) => (
apiConvo.peer.id === peerId
))
if (!apiConvo) {
console.warn('[handleEngineUpdates] no convo', glphResponse, update)
break
}

const apiMessage = glphResponse.messages.items.find((message) => (
message.peer_id === peerId && message.conversation_message_id === cmid
))
const message = apiMessage && fromApiMessage(apiMessage)
if (!message) {
console.warn('[handleEngineUpdates] no message', glphResponse, update)
break
}

insertConvos([{ conversation: apiConvo }], {
merge: true,
addToList: update[0] === 10004 || isRestoreMessageUpdate
})
insertPeers({
profiles: glphResponse.profiles,
groups: glphResponse.groups
})

const convo = Convo.safeGet(peerId)

Convo.insertMessages(convo, [message], {
up: true,
down: true,
aroundId: cmid
})
break
}

case 10006:
case 10007: {
const [, rawPeerId] = update
const peerId = Peer.resolveId(rawPeerId)
const apiConvo = glphResponse.conversations.find((apiConvo) => (
apiConvo.peer.id === peerId
))
if (apiConvo) {
insertConvos([{ conversation: apiConvo }], {
merge: true,
addToList: false
})
}
break
}
}
}
}
1 change: 1 addition & 0 deletions src/actions/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export { handleEngineUpdates } from './handleEngineUpdates'
export { insertConvos } from './insertConvos'
export { insertPeers } from './insertPeers'
export { loadConvoHistory } from './loadConvoHistory'
Expand Down
20 changes: 15 additions & 5 deletions src/actions/insertConvos.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,30 @@ import { useConvosStore } from 'store/convos'
import { usePeersStore } from 'store/peers'
import { fromApiConvo } from 'converters/ConvoConverter'

export function insertConvos(conversations: MessagesConversationWithMessage[]) {
export function insertConvos(
conversations: MessagesConversationWithMessage[],
{ merge = false, addToList = true } = {}
) {
const { convoList, convos } = useConvosStore()
const { peers } = usePeersStore()

for (const apiConvo of conversations) {
const { convo, peer } = fromApiConvo(apiConvo.conversation, apiConvo.last_message)

if (convo) {
convoList.peerIds.push(convo.id)
// Мы держим конву в актуальном состоянии через лонгполл.
// Перезапись конвы в кеше сбросила бы список сообщений
if (!convos.get(convo.id)) {
const localConvo = convos.get(convo.id)

if (localConvo && merge) {
convo.history = localConvo.history
}

if (!localConvo || merge) {
convos.set(convo.id, convo)
}

if (addToList) {
convoList.peerIds.push(convo.id)
}
}

if (peer) {
Expand Down
5 changes: 2 additions & 3 deletions src/actions/loadInitialData.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ENGINE_VERSION } from 'env/Engine'
import * as IEngine from 'env/IEngine'
import { useConvosStore } from 'store/convos'
import { insertConvos, insertPeers } from 'actions'
import { useEnv } from 'hooks'
Expand All @@ -16,7 +16,7 @@ export async function loadInitialData(onError: () => void) {
try {
const [longpollParams, conversations] = await api.fetchParallel([
api.buildMethod('messages.getLongPollServer', {
lp_version: ENGINE_VERSION,
lp_version: IEngine.VERSION,
need_pts: 1
}),
api.buildMethod('messages.getConversations', {
Expand All @@ -34,7 +34,6 @@ export async function loadInitialData(onError: () => void) {

connection.status = 'connected'
engine.start(longpollParams)
engine.stop()

convoList.loading = false
convoList.hasMore = conversations.items.length === CONVOS_PER_PAGE
Expand Down
15 changes: 8 additions & 7 deletions src/env/Engine.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import * as IEngine from 'env/IEngine'
import { MessagesLongpollCredentials } from 'model/api-types/objects/MessagesLongpollCredentials'
import { handleEngineUpdates } from 'actions'
import { toUrlParams } from 'misc/utils'

export const ENGINE_VERSION = 21

const ENGINE_MAX_CONNECTION_DURATION_SEC = 20
const ENGINE_FETCH_TIMEOUT = (ENGINE_MAX_CONNECTION_DURATION_SEC + 5) * 1000
const ENGINE_MODE =
Expand All @@ -16,15 +15,19 @@ const ENGINE_MODE =

export class Engine {
active = false
version = ENGINE_VERSION
version = IEngine.VERSION
credentials: MessagesLongpollCredentials | null = null

async start(credentials: MessagesLongpollCredentials) {
if (this.active) {
throw new Error('[engine] Движок уже запущен')
}

this.active = true
this.credentials = credentials

while (this.active) {
const { server, key, ts } = this.credentials
const { server, key, ts, pts } = this.credentials
const timeoutSignal = AbortSignal.timeout(ENGINE_FETCH_TIMEOUT)

const result = await fetch(`https://${server}?act=a_check`, {
Expand Down Expand Up @@ -52,9 +55,7 @@ export class Engine {
this.credentials.ts = result.ts
this.credentials.pts = result.pts

// for (const update of result.updates) {
// console.log(update)
// }
await handleEngineUpdates(pts, result.updates)
}
}

Expand Down
126 changes: 125 additions & 1 deletion src/env/IEngine.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
import { MessagesKeyboard } from 'model/api-types/objects/MessagesKeyboard'
import { MessagesMessageAction } from 'model/api-types/objects/MessagesMessage'

export const VERSION = 21

export type Response =
| { ts: number, pts: number, updates: Update[] }
// ts слишком маленький (отстал на 256+ событий) или слишком большой
Expand All @@ -7,4 +12,123 @@ export type Response =
// Указана неверная версия движка
| { failed: 4, min_version: number, max_version: number }

type Update = unknown
export type Update =
| Update10002
| Update10003
| Update10004
| Update10005
| Update10006
| Update10007
| Update10018

// Изменение флагов сообщения
type Update10002 = [
type: 10002,
cmid: number,
flags: number,
peerId: number
]

// Сброс флагов сообщения
type Update10003 = [
type: 10003,
cmid: number,
flags: number,
peerId: number
// В случае восстановления сообщения есть дополнительные поля
]

// Новое сообщение
type Update10004 = [
type: 10004,
cmid: number,
flags: number,
minorId: number,
peerId: number,
timestamp: number,
text: string,
additional: {
from?: number
marked_users?: Array<
| [1, number[]]
| [1, 'online', number[]]
| [1, 'all']
| [2, 'all']
>
source_act?: MessagesMessageAction['type']
source_mid?: string
source_text?: string
source_old_text?: string
source_style?: string
source_chat_local_id?: string
source_message?: string
pinned_at?: string
expire_ttl?: string
ttl?: string
is_expired?: string
is_silent?: string
keyboard?: MessagesKeyboard
has_template?: string
payload?: string
},
attachments: {
attachments?: string
attach1_kind?: string
attach1_type?: string
attach1?: string
attach2_type?: string
attach2?: string
reply?: string
fwd?: string
geo?: string
},
randomId: number,
messageId: number,
updateTimestamp: 0
]

// Редактирование сообщения
type Update10005 = [
type: 10005,
cmid: Update10004[1],
flags: Update10004[2],
peerId: Update10004[4],
timestamp: Update10004[5],
text: Update10004[6],
additional: Update10004[7],
attachments: Update10004[8],
randomId: Update10004[9],
messageId: Update10004[10],
updateTimestampInSeconds: number
]

// Прочтение входящих сообщений
type Update10006 = [
type: 10006,
peerId: number,
lastCmid: number,
inUnreadCount: number
]

// Прочтение исходящих сообщений
type Update10007 = [
type: 10007,
peerId: number,
lastCmid: number,
outUnreadCount: number
]

// Обновление сообщения
type Update10018 = [
type: 10018,
cmid: Update10004[1],
flags: Update10004[2],
peerId: Update10004[4],
timestamp: Update10004[5],
text: Update10004[6],
additional: Update10004[7],
attachments: Update10004[8],
randomId: Update10004[9],
messageId: Update10004[10],
updateTimestampInSeconds: number
]
Loading

0 comments on commit d50f9aa

Please sign in to comment.