Skip to content

Commit

Permalink
feat: create recon feed and consume events (#3161)
Browse files Browse the repository at this point in the history
  • Loading branch information
stephhuynh18 authored Feb 15, 2024
1 parent 242c833 commit 0317921
Show file tree
Hide file tree
Showing 9 changed files with 238 additions and 18 deletions.
12 changes: 11 additions & 1 deletion packages/cli/src/ipfs-connection-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,21 @@ export class IpfsConnectionFactory {
ipfsEndpoint?: string
): Promise<IpfsApi> {
if (mode == IpfsMode.REMOTE) {
return ipfsClient.create({
const ipfsApi = ipfsClient.create({
url: ipfsEndpoint,
timeout: IPFS_GET_TIMEOUT,
agent: this.ipfsHttpAgent(ipfsEndpoint),
})

// TODO: WS1-1483 We utilize the `ipfs.config.get` method to retrieve the api address to use for recon calls. This prevents us from making unneccessary config changes until we are able to retrieve config data from the recon/rust-ceramic node.
ipfsApi.config.get = async (key: string): Promise<string | object> => {
if (key === 'Addresses.API') {
return ipfsEndpoint
}
return ''
}

return ipfsApi
} else {
return this.createGoIPFS()
}
Expand Down
9 changes: 7 additions & 2 deletions packages/core/src/ceramic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ import type { AnchorService } from './anchor/anchor-service.js'
import { AnchorRequestCarBuilder } from './anchor/anchor-request-car-builder.js'
import { makeStreamLoaderAndUpdater } from './initialization/stream-loading.js'
import { Feed, type PublicFeed } from './feed.js'
import { ReconApi } from './recon.js'
import { IReconApi, ReconApi } from './recon.js'

const DEFAULT_CACHE_LIMIT = 500 // number of streams stored in the cache
const DEFAULT_QPS_LIMIT = 10 // Max number of pubsub query messages that can be published per second without rate limiting
Expand Down Expand Up @@ -142,6 +142,7 @@ export interface CeramicModules {
anchorRequestCarBuilder: AnchorRequestCarBuilder
feed: Feed
signer: CeramicSigner
reconApi: IReconApi
}

/**
Expand Down Expand Up @@ -292,7 +293,8 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader {
this.nodeStatus.bind(this),
pinApi,
this.providersCache,
this.loadStream.bind(this)
this.loadStream.bind(this),
modules.reconApi
)
}

Expand Down Expand Up @@ -384,6 +386,8 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader {
{
enabled: Boolean(process.env.CERAMIC_RECON_MODE),
url: ipfs.config.get('Addresses.API').then((url) => url.toString()),
// TODO: WS1-1487 not an official ceramic config option
feedEnabled: config.reconFeedEnabled ?? true,
},
logger
)
Expand Down Expand Up @@ -441,6 +445,7 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader {
anchorRequestCarBuilder,
feed,
signer,
reconApi,
}

return [modules, params]
Expand Down
14 changes: 9 additions & 5 deletions packages/core/src/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ export class Dispatcher {

async init() {
if (process.env.CERAMIC_RECON_MODE) {
await this.recon.init()
return
}
this.messageBus.subscribe(this.handleMessage.bind(this))
Expand Down Expand Up @@ -424,13 +423,18 @@ export class Dispatcher {
* @param cid - Commit CID
* @param streamId - StreamID of the stream the commit belongs to, used for logging.
*/
async retrieveCommit(cid: CID | string, streamId: StreamID): Promise<any> {
async retrieveCommit(cid: CID | string, streamId?: StreamID): Promise<any> {
try {
return await this._getFromIpfs(cid)
} catch (e) {
this._logger.err(
`Error while loading commit CID ${cid.toString()} from IPFS for stream ${streamId.toString()}: ${e}`
)
if (streamId) {
this._logger.err(
`Error while loading commit CID ${cid.toString()} from IPFS for stream ${streamId.toString()}: ${e}`
)
} else {
this._logger.err(`Error while loading commit CID ${cid.toString()} from IPFS: ${e}`)
}

throw e
}
}
Expand Down
8 changes: 7 additions & 1 deletion packages/core/src/local-admin-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { convertCidToEthHash } from '@ceramicnetwork/anchor-utils'
import type { LocalIndexApi, ISyncApi } from '@ceramicnetwork/indexing'
import { ProvidersCache } from './providers-cache.js'
import { Provider } from '@ethersproject/providers'
import { IReconApi } from './recon.js'

type NodeStatusFn = () => Promise<NodeStatusResponse>
type LoadStreamFn<T> = (streamId: StreamID | CommitID | string, opts?: LoadOpts) => Promise<T>
Expand All @@ -31,7 +32,8 @@ export class LocalAdminApi implements AdminApi {
private readonly nodeStatusFn: NodeStatusFn, // TODO(CDB-2293): circular dependency back into Ceramic
private readonly pinApi: PinApi,
private readonly providersCache: ProvidersCache,
private readonly loadStream: LoadStreamFn<Model> // TODO(CDB-2293): circular dependency back into Ceramic
private readonly loadStream: LoadStreamFn<Model>, // TODO(CDB-2293): circular dependency back into Ceramic
private readonly recon: IReconApi
) {}

async nodeStatus(): Promise<NodeStatusResponse> {
Expand Down Expand Up @@ -92,6 +94,10 @@ export class LocalAdminApi implements AdminApi {
async startIndexingModelData(modelData: Array<ModelData>): Promise<void> {
await this.indexApi.indexModels(modelData)

if (this.recon.enabled) {
await Promise.all(modelData.map(({ streamID }) => this.recon.registerInterest(streamID)))
}

if (!this.syncApi.enabled) {
return
}
Expand Down
146 changes: 141 additions & 5 deletions packages/core/src/recon.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,27 @@
import { type CAR } from 'cartonne'
import {
Subject,
defer,
concatMap,
expand,
of,
retry,
timer,
Observable,
Subscription,
filter,
tap,
takeUntil,
Subscriber,
TeardownLogic,
} from 'rxjs'

import { DiagnosticsLogger, FetchRequest, fetchJson, AbortOptions } from '@ceramicnetwork/common'
import { EventID, StreamID } from '@ceramicnetwork/streamid'
import { Model } from '@ceramicnetwork/stream-model'

const DEFAULT_POLL_INTERVAL = 1_000 // 1 seconds

/**
* Configuration for the Recon API
*/
Expand All @@ -12,6 +30,8 @@ export type ReconApiConfig = {
enabled: boolean
// URL of the Recon API or a promise that resolves to the URL
url: string | Promise<string>
// Whether the event feed is enabled
feedEnabled: boolean
}

/**
Expand All @@ -22,41 +42,81 @@ export interface ReconEvent {
data: CAR
}

/**
* Recon Event Feed Response
*/
export interface ReconEventFeedResponse {
events: Array<ReconEvent>
cursor: number
}

/**
* Recon API Interface
*/
export interface IReconApi {
init(): Promise<void>
export interface IReconApi extends Observable<ReconEventFeedResponse> {
init(initialCursor?: number): Promise<void>
registerInterest(model: StreamID): Promise<void>
put(event: ReconEvent, opts?: AbortOptions): Promise<void>
enabled: boolean
stop(): void
}

export class ReconApi implements IReconApi {
export class ReconApi extends Observable<ReconEventFeedResponse> implements IReconApi {
readonly #config: ReconApiConfig
readonly #logger: DiagnosticsLogger
readonly #sendRequest: FetchRequest
#url: string
#initialized = false

readonly #pollInterval: number
#eventsSubscription: Subscription
private readonly feed$: Subject<ReconEventFeedResponse> = new Subject<ReconEventFeedResponse>()
readonly #stopSignal: Subject<void> = new Subject<void>()

constructor(
config: ReconApiConfig,
logger: DiagnosticsLogger,
sendRequest: FetchRequest = fetchJson
sendRequest: FetchRequest = fetchJson,
pollInterval: number = DEFAULT_POLL_INTERVAL
) {
super((subscriber: Subscriber<ReconEventFeedResponse>): TeardownLogic => {
return this.feed$.subscribe(subscriber)
})

this.#config = config
this.#logger = logger
this.#sendRequest = sendRequest
this.#pollInterval = pollInterval
}

async init(): Promise<void> {
/**
* Initialization tasks. Registers interest in the model and starts polling the feed for new events to emit to subscribers.
* @param initialCursor
* @returns
*/
async init(initialCursor = 0): Promise<void> {
if (this.#initialized) {
return
}

this.#initialized = true

if (!this.enabled) {
return
}

this.#url = await this.#config.url
await this.registerInterest(Model.MODEL)

if (this.#config.feedEnabled) {
this.#eventsSubscription = this.createSubscription(initialCursor).subscribe(this.feed$)
}
}

/**
* Registers interest in a model
* @param model stream id of the model to register interest in
*/
async registerInterest(model: StreamID): Promise<void> {
if (!this.enabled) {
throw new Error(`Recon: disabled, not registering interest in model ${model.toString()}`)
Expand All @@ -75,6 +135,12 @@ export class ReconApi implements IReconApi {
}
}

/**
* Put an event to the Recon API
* @param event The event to put
* @param opts Abort options
* @returns
*/
async put(event: ReconEvent, opts: AbortOptions): Promise<void> {
if (!this.enabled) {
this.#logger.imp(`Recon: disabled, not putting event ${event.id}`)
Expand All @@ -97,7 +163,77 @@ export class ReconApi implements IReconApi {
}
}

/**
* Whether the Recon API is enabled
*/
get enabled(): boolean {
return this.#config.enabled
}

/**
* Stops the Recon API. This stops the polling and sends a complete signal to subscribers.
*/
stop(): void {
this.#stopSignal.next()
this.#stopSignal.complete()
if (this.#eventsSubscription) {
this.#eventsSubscription.unsubscribe()
}
this.feed$.complete()
}

/**
* Polls the Recon API for new events using the feed endpoint. This is a turned into an observable that emits the events.
* @param initialCursor The cursor to start polling from
* @returns An observable that emits the events and cursor so it can be stored and used to resume polling during restart
*/
private createSubscription(initialCursor: number): Observable<ReconEventFeedResponse> {
// start event
return of({ events: [], cursor: initialCursor, first: true }).pipe(
// projects the starting event to an Observable that emits the next events. Then it recursively projects each event to an Observable that emits the next event
expand((prev) => {
// creates an observable that emits the next event after a ceratin delay (pollInterval) unless this is the first event
return timer(prev.first ? 0 : this.#pollInterval).pipe(
// concat map is used to ensure that the next event is only emitted after the previous event has been processed
concatMap(() =>
// defer allows lazy creation of the observable
defer(async () => {
const response = await this.#sendRequest(
this.#url + `/ceramic/feed/events?resumeAt=${prev.cursor}`,
{
method: 'GET',
}
)
return {
events: response.events.map(({ id, data }) => {
return {
id: EventID.fromString(id),
data: undefined,
}
}),
cursor: Math.max(parseInt(response.resumeToken, 10), prev.cursor),
first: false,
}
}).pipe(
// if the request fails retry after a certain delay (pollInterval)
retry({
delay: (err) => {
this.#logger.warn(
`Recon: event feed failed, due to connection error ${err}; attempting to retry in ${
this.#pollInterval
}ms`
)
return timer(this.#pollInterval)
},
})
)
)
)
}),
// filter out events with no data
filter(({ events }) => events.length > 0),
// stop the polling when the stop signal is emitted
takeUntil(this.#stopSignal)
)
}
}
Loading

0 comments on commit 0317921

Please sign in to comment.