Skip to content

Commit d9cee20

Browse files
authored
Merge pull request #284 from ProverCoderAI/issue-282
Add federation exchange status observability
2 parents ca1b4c4 + 59f9cbb commit d9cee20

5 files changed

Lines changed: 259 additions & 1 deletion

File tree

packages/api/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ Optional env:
9696
- `GET /federation/followers`
9797
- `GET /federation/following`
9898
- `GET /federation/liked`
99+
- `GET /federation/exchange/status` (connection summary and recent exchange events)
99100
- `POST /federation/exchange/subscriptions` (discover remote actor, persist metadata, send signed `Follow`)
100101
- `GET /federation/exchange/subscriptions`
101102
- `POST /federation/exchange/poll` (manual remote outbox poll)
@@ -131,10 +132,13 @@ Exchange targets must be explicit. Use `https://exchange.lefine.pro`, an actor U
131132
}'
132133

133134
./ctl request POST /federation/exchange/poll '{}'
135+
./ctl request GET /federation/exchange/status
134136
./ctl request GET /federation/exchange/subscriptions
135137
./ctl request GET /federation/issues
136138
```
137139

140+
`GET /federation/exchange/status` is the live observability endpoint for a Lefine connection. It reports subscription counts, accepted/pending/rejected state, `lastInboxAt`, `lastPollAt`, persisted issue count, processed outbox items, and recent events such as `follow.sent`, `inbox.follow.accept`, `inbox.issue.received`, and `poll.completed`.
141+
138142
When a polled `Create(Ticket)` has no GitHub URL in the Ticket payload, `projectRepoUrl` or `DOCKER_GIT_EXCHANGE_PROJECT_REPO_URL` is required for the automatic docker-git project/agent run.
139143

140144
1. Read actor profile (contains `inbox/outbox/followers/following/liked`):

packages/api/src/api/contracts.ts

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -662,6 +662,57 @@ export type ExchangePollResult = {
662662
readonly failedItems: number
663663
}
664664

665+
export type FederationExchangeEventKind =
666+
| "follow.sent"
667+
| "inbox.follow.accept"
668+
| "inbox.follow.reject"
669+
| "inbox.issue.received"
670+
| "poll.completed"
671+
672+
export type FederationExchangeEvent = {
673+
readonly id: string
674+
readonly kind: FederationExchangeEventKind
675+
readonly occurredAt: string
676+
readonly subscriptionId?: string | undefined
677+
readonly target?: string | undefined
678+
readonly queue?: string | undefined
679+
readonly status?: FollowStatus | undefined
680+
readonly issueId?: string | undefined
681+
readonly remoteActor?: string | undefined
682+
readonly totalItems?: number | undefined
683+
readonly newItems?: number | undefined
684+
readonly processedItems?: number | undefined
685+
readonly failedItems?: number | undefined
686+
}
687+
688+
export type FederationExchangeStatusSubscription = {
689+
readonly id: string
690+
readonly target: string
691+
readonly queue?: string | undefined
692+
readonly status: FollowStatus
693+
readonly remoteActor?: string | undefined
694+
readonly remoteInbox?: string | undefined
695+
readonly remoteOutbox?: string | undefined
696+
readonly createdAt: string
697+
readonly updatedAt: string
698+
}
699+
700+
export type FederationExchangeStatus = {
701+
readonly publicActor: string
702+
readonly summary: {
703+
readonly subscriptions: number
704+
readonly accepted: number
705+
readonly pending: number
706+
readonly rejected: number
707+
readonly issues: number
708+
readonly processedOutboxItems: number
709+
readonly lastInboxAt?: string | undefined
710+
readonly lastPollAt?: string | undefined
711+
}
712+
readonly subscriptions: ReadonlyArray<FederationExchangeStatusSubscription>
713+
readonly recentEvents: ReadonlyArray<FederationExchangeEvent>
714+
}
715+
665716
export type ApiEventType =
666717
| "snapshot"
667718
| "project.created"

packages/api/src/http.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ import {
7070
listFollowSubscriptions,
7171
makeFederationActorDocument,
7272
makeFederationContext,
73+
makeFederationExchangeStatus,
7374
makeFederationFollowersCollection,
7475
makeFederationFollowingCollection,
7576
makeFederationLikedCollection,
@@ -824,6 +825,14 @@ export const makeRouter = () => {
824825
return yield* _(activityJsonResponse(makeFederationLikedCollection(context), 200))
825826
}).pipe(Effect.catchAll(errorResponse))
826827
),
828+
HttpRouter.get(
829+
"/federation/exchange/status",
830+
Effect.gen(function*(_) {
831+
const request = yield* _(HttpServerRequest.HttpServerRequest)
832+
const context = yield* _(resolveFederationContext(request))
833+
return yield* _(jsonResponse(makeFederationExchangeStatus(context), 200))
834+
}).pipe(Effect.catchAll(errorResponse))
835+
),
827836
HttpRouter.post(
828837
"/federation/exchange/subscriptions",
829838
Effect.gen(function*(_) {

packages/api/src/services/federation.ts

Lines changed: 135 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import type {
2222
CreateProjectRequest,
2323
ExchangePollRequest,
2424
ExchangeSubscribeRequest,
25+
FederationExchangeEvent,
26+
FederationExchangeStatus,
2527
FederationInboxResult,
2628
FederationIssueRecord,
2729
FollowStatus,
@@ -47,6 +49,7 @@ type StoredFederationState = {
4749
readonly issues: ReadonlyArray<FederationIssueRecord>
4850
readonly follows: ReadonlyArray<FollowSubscription>
4951
readonly processedOutboxItems: ReadonlyArray<string>
52+
readonly exchangeEvents?: ReadonlyArray<FederationExchangeEvent> | undefined
5053
readonly localActorKeys?: LocalActorKeys | undefined
5154
}
5255

@@ -97,12 +100,14 @@ const jsonLdContentType = "application/ld+json; profile=\"https://www.w3.org/ns/
97100
const activityAcceptHeader = `${jsonLdContentType}, ${activityJsonContentType}, application/json`
98101
const defaultExchangeQueue = "code"
99102
const stateVersion = 1 as const
103+
const exchangeEventLimit = 100
100104

101105
const issueStore: Map<string, FederationIssueRecord> = new Map()
102106
const followStore: Map<string, FollowSubscription> = new Map()
103107
const followByActivityId: Map<string, string> = new Map()
104108
const followByActorObject: Map<string, string> = new Map()
105109
const processedOutboxItems: Set<string> = new Set()
110+
let exchangeEvents: ReadonlyArray<FederationExchangeEvent> = []
106111
let localActorKeys: LocalActorKeys | null = null
107112
let stateLoaded = false
108113

@@ -120,6 +125,33 @@ const asNonEmptyString = (value: unknown): string | null =>
120125
const readOptionalString = (record: JsonRecord, key: string): string | undefined =>
121126
asNonEmptyString(record[key]) ?? undefined
122127

128+
type FederationExchangeEventDraft = Omit<FederationExchangeEvent, "id" | "occurredAt"> & {
129+
readonly occurredAt?: string | undefined
130+
}
131+
132+
const exchangeSubscriptionTarget = (subscription: FollowSubscription): string =>
133+
subscription.subscriptionName ?? subscription.remoteActor ?? subscription.object
134+
135+
const findExchangeSubscriptionByActor = (actor: string | undefined): FollowSubscription | undefined =>
136+
actor === undefined
137+
? undefined
138+
: [...followStore.values()].find((subscription) =>
139+
subscription.remoteOutbox !== undefined &&
140+
(subscription.remoteActor === actor || subscription.object === actor)
141+
)
142+
143+
const recordExchangeEvent = (event: FederationExchangeEventDraft): FederationExchangeEvent => {
144+
const { occurredAt, ...details } = event
145+
const stored: FederationExchangeEvent = {
146+
id: randomUUID(),
147+
occurredAt: occurredAt ?? nowIso(),
148+
...details
149+
}
150+
exchangeEvents = [...exchangeEvents, stored].slice(-exchangeEventLimit)
151+
persistFederationStateBestEffort()
152+
return stored
153+
}
154+
123155
const readRequiredString = (
124156
record: JsonRecord,
125157
key: string,
@@ -302,6 +334,7 @@ const serializeState = (): StoredFederationState => ({
302334
issues: [...issueStore.values()],
303335
follows: [...followStore.values()],
304336
processedOutboxItems: [...processedOutboxItems],
337+
exchangeEvents,
305338
...(localActorKeys === null ? {} : { localActorKeys })
306339
})
307340

@@ -377,6 +410,7 @@ const hydrateState = (state: StoredFederationState): void => {
377410
followByActivityId.clear()
378411
followByActorObject.clear()
379412
processedOutboxItems.clear()
413+
exchangeEvents = []
380414

381415
for (const issue of state.issues ?? []) {
382416
issueStore.set(issue.issueId, issue)
@@ -387,6 +421,7 @@ const hydrateState = (state: StoredFederationState): void => {
387421
for (const item of state.processedOutboxItems ?? []) {
388422
processedOutboxItems.add(item)
389423
}
424+
exchangeEvents = [...(state.exchangeEvents ?? [])].slice(-exchangeEventLimit)
390425
localActorKeys = state.localActorKeys ?? null
391426
}
392427

@@ -740,6 +775,22 @@ const ingestCreateTicket = (
740775
return issue
741776
})
742777

778+
const recordIssueReceivedEvent = (
779+
issue: FederationIssueRecord,
780+
options: IngestOptions
781+
): void => {
782+
const remoteActor = issue.actor ?? issue.ticket.attributedTo
783+
const subscription = options.subscription ?? findExchangeSubscriptionByActor(remoteActor)
784+
recordExchangeEvent({
785+
kind: "inbox.issue.received",
786+
subscriptionId: subscription?.id,
787+
target: subscription === undefined ? undefined : exchangeSubscriptionTarget(subscription),
788+
queue: subscription?.queue,
789+
issueId: issue.issueId,
790+
remoteActor: subscription?.remoteActor ?? remoteActor
791+
})
792+
}
793+
743794
// CHANGE: support ForgeFed issue inputs and ActivityPub inbox transitions in API mode.
744795
// WHY: issue #233 requires ForgeFed/ActivityPub subscription and task intake.
745796
// QUOTE(ТЗ): "Осталось forgefed допподержать" + "Законнектишь к exchange"
@@ -768,23 +819,34 @@ export const ingestFederationInbox = (
768819

769820
if (hasType(record, "Offer")) {
770821
const issue = yield* _(ingestOfferTicket(record))
822+
recordIssueReceivedEvent(issue, options)
771823
return { kind: "issue.offer", issue }
772824
}
773825

774826
if (hasType(record, "Create")) {
775827
const issue = yield* _(ingestCreateTicket(record, options))
828+
recordIssueReceivedEvent(issue, options)
776829
return { kind: "issue.create", issue }
777830
}
778831

779832
if (hasType(record, "Ticket")) {
780833
const issue = yield* _(ingestDirectTicket(record))
834+
recordIssueReceivedEvent(issue, options)
781835
return { kind: "issue.ticket", issue }
782836
}
783837

784838
if (hasType(record, "Accept") || hasType(record, "Reject")) {
785839
const subscription = yield* _(resolveFollowFromInbox(record))
786840
const status: FollowStatus = hasType(record, "Accept") ? "accepted" : "rejected"
787841
const updated = updateFollowStatus(subscription, status)
842+
recordExchangeEvent({
843+
kind: status === "accepted" ? "inbox.follow.accept" : "inbox.follow.reject",
844+
subscriptionId: updated.id,
845+
target: exchangeSubscriptionTarget(updated),
846+
queue: updated.queue,
847+
status: updated.status,
848+
remoteActor: updated.remoteActor
849+
})
788850
return status === "accepted"
789851
? { kind: "follow.accept", subscription: updated }
790852
: { kind: "follow.reject", subscription: updated }
@@ -1242,6 +1304,14 @@ export const ensureExchangeSubscription = (
12421304
if (inbox !== undefined) {
12431305
yield* _(sendJsonLd(context, inbox, activity).pipe(Effect.ignore))
12441306
}
1307+
recordExchangeEvent({
1308+
kind: "follow.sent",
1309+
subscriptionId: subscription.id,
1310+
target: exchangeSubscriptionTarget(subscription),
1311+
queue: subscription.queue,
1312+
status: subscription.status,
1313+
remoteActor: subscription.remoteActor
1314+
})
12451315

12461316
return { subscription, activity }
12471317
})
@@ -1255,12 +1325,65 @@ export const listFollowSubscriptions = (): ReadonlyArray<FollowSubscription> =>
12551325
export const listExchangeSubscriptions = (): ReadonlyArray<FollowSubscription> =>
12561326
listFollowSubscriptions().filter((subscription) => subscription.remoteOutbox !== undefined)
12571327

1328+
const latestIso = (values: ReadonlyArray<string | undefined>): string | undefined =>
1329+
values
1330+
.filter((value): value is string => value !== undefined && value.length > 0)
1331+
.sort()
1332+
.at(-1)
1333+
1334+
export const makeFederationExchangeStatus = (
1335+
context: FederationContext
1336+
): FederationExchangeStatus => {
1337+
const subscriptions = listExchangeSubscriptions()
1338+
const recentEvents = [...exchangeEvents].sort((left, right) => right.occurredAt.localeCompare(left.occurredAt))
1339+
const accepted = subscriptions.filter((subscription) => subscription.status === "accepted").length
1340+
const pending = subscriptions.filter((subscription) => subscription.status === "pending").length
1341+
const rejected = subscriptions.filter((subscription) => subscription.status === "rejected").length
1342+
const inboxEventTimes = exchangeEvents
1343+
.filter((event) => event.kind === "inbox.follow.accept" || event.kind === "inbox.follow.reject" || event.kind === "inbox.issue.received")
1344+
.map((event) => event.occurredAt)
1345+
const acceptedTransitionTimes = subscriptions
1346+
.filter((subscription) => subscription.status === "accepted" || subscription.status === "rejected")
1347+
.map((subscription) => subscription.updatedAt)
1348+
1349+
return {
1350+
publicActor: context.actorId,
1351+
summary: {
1352+
subscriptions: subscriptions.length,
1353+
accepted,
1354+
pending,
1355+
rejected,
1356+
issues: issueStore.size,
1357+
processedOutboxItems: processedOutboxItems.size,
1358+
lastInboxAt: latestIso([...inboxEventTimes, ...acceptedTransitionTimes]),
1359+
lastPollAt: latestIso(
1360+
exchangeEvents
1361+
.filter((event) => event.kind === "poll.completed")
1362+
.map((event) => event.occurredAt)
1363+
)
1364+
},
1365+
subscriptions: subscriptions.map((subscription) => ({
1366+
id: subscription.id,
1367+
target: exchangeSubscriptionTarget(subscription),
1368+
queue: subscription.queue,
1369+
status: subscription.status,
1370+
remoteActor: subscription.remoteActor,
1371+
remoteInbox: subscription.remoteInbox,
1372+
remoteOutbox: subscription.remoteOutbox,
1373+
createdAt: subscription.createdAt,
1374+
updatedAt: subscription.updatedAt
1375+
})),
1376+
recentEvents
1377+
}
1378+
}
1379+
12581380
export const clearFederationState = (): void => {
12591381
issueStore.clear()
12601382
followStore.clear()
12611383
followByActivityId.clear()
12621384
followByActorObject.clear()
12631385
processedOutboxItems.clear()
1386+
exchangeEvents = []
12641387
localActorKeys = null
12651388
stateLoaded = true
12661389
}
@@ -1377,8 +1500,19 @@ export const pollExchangeOutboxes = (
13771500
}
13781501
}
13791502

1503+
const polledAt = nowIso()
1504+
recordExchangeEvent({
1505+
kind: "poll.completed",
1506+
occurredAt: polledAt,
1507+
target: request.target,
1508+
totalItems,
1509+
newItems,
1510+
processedItems,
1511+
failedItems
1512+
})
1513+
13801514
return {
1381-
polledAt: nowIso(),
1515+
polledAt,
13821516
subscriptions: subscriptions.length,
13831517
totalItems,
13841518
newItems,

0 commit comments

Comments
 (0)