Skip to content

Commit

Permalink
Restore sync-api to match main
Browse files Browse the repository at this point in the history
Signed-off-by: Frank Hinek <[email protected]>
  • Loading branch information
frankhinek committed May 20, 2023
1 parent 170bf8a commit 55743fd
Showing 1 changed file with 39 additions and 68 deletions.
107 changes: 39 additions & 68 deletions packages/web5-user-agent/src/sync-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,6 @@ import { utils as didUtils } from '@tbd54566975/dids';
import { DataStream, EventsGet, MessagesGet, Encoder, RecordsRead } from '@tbd54566975/dwn-sdk-js';


import type { DwnRpc } from '@tbd54566975/web5-agent';
import type { BatchOperation } from 'level';
import type { DwnServiceEndpoint, DidResolver } from '@tbd54566975/dids';
import type {
Dwn,
EventsGetReply,
MessagesGetReply,
SignatureInput,
RecordsWriteMessage,
RecordsReadReply,
PrivateJwk as DwnPrivateKeyJwk,
Event,
} from '@tbd54566975/dwn-sdk-js';

import type { ProfileManager } from './profile-manager.js';

import { Level } from 'level';
import { utils as didUtils } from '@tbd54566975/dids';
import { DataStream, EventsGet, MessagesGet, Encoder, RecordsRead } from '@tbd54566975/dwn-sdk-js';


import { SyncManager } from './sync-manager.js';
import { DwnRpcClient } from './dwn-rpc-client.js';
import { webReadableToIsomorphicNodeReadable } from './utils.js';
Expand All @@ -64,30 +43,6 @@ type DwnMessage = {
data?: Blob;
}

type DbBatchOperation = BatchOperation<Level, string, string>;
import { DwnRpcClient } from './dwn-rpc-client.js';
import { webReadableToIsomorphicNodeReadable } from './utils.js';

export type SyncApiOptions = {
dwn: Dwn;
didResolver: DidResolver;
profileManager: ProfileManager;
storeLocation?: string;
};

type Direction = 'push' | 'pull';

type SyncState = {
did: string;
dwnUrl: string;
watermark: string | undefined;
}

type DwnMessage = {
message: any;
data?: Blob;
}

type DbBatchOperation = BatchOperation<Level, string, string>;

export class SyncApi implements SyncManager {
Expand Down Expand Up @@ -194,25 +149,35 @@ export class SyncApi implements SyncManager {

const pushQueue = this.#getPushQueue();
const pushJobs = await pushQueue.iterator().all();
const errored: Set<string> = new Set();

const delOps: DbBatchOperation[] = [];

for (let job of pushJobs) {
const [key, watermark] = job;
const [did, dwnUrl, messageCid] = key.split('~');

if (errored.has(dwnUrl)) {
continue;
}

const dwnMessage = await this.#getDwnMessage(did, messageCid);
const reply = await this.#dwnRpcClient.sendDwnRequest({
dwnUrl,
targetDid : did,
data : dwnMessage.data,
message : dwnMessage.message
});

if (reply.status.code === 202) {
delOps.push({ type: 'del', key: key });
await this.setWatermark(did, dwnUrl, 'push', watermark);
await this.#addMessage(did, messageCid);
try {
const reply = await this.#dwnRpcClient.sendDwnRequest({
dwnUrl,
targetDid : did,
data : dwnMessage.data,
message : dwnMessage.message
});

if (reply.status.code === 202) {
delOps.push({ type: 'del', key: key });
await this.setWatermark(did, dwnUrl, 'push', watermark);
await this.#addMessage(did, messageCid);
}
} catch(e) {
errored.add(dwnUrl);
}
}

Expand Down Expand Up @@ -281,11 +246,16 @@ export class SyncApi implements SyncManager {
const pullQueue = this.#getPullQueue();
const pullJobs = await pullQueue.iterator().all();
const delOps: DbBatchOperation[] = [];
const errored: Set<string> = new Set();

for (let job of pullJobs) {
const [key, watermark] = job;
const [did, dwnUrl, messageCid] = key.split('~');

if (errored.has(dwnUrl)) {
continue;
}

const messageExists = await this.#messageExists(did, messageCid);
if (messageExists) {
await this.setWatermark(did, dwnUrl, 'pull', watermark);
Expand All @@ -300,11 +270,18 @@ export class SyncApi implements SyncManager {
authorizationSignatureInput : signatureInput
});

const reply = await this.#dwnRpcClient.sendDwnRequest({
dwnUrl,
targetDid : did,
message : messagesGet
}) as MessagesGetReply;
let reply: MessagesGetReply;

try {
reply = await this.#dwnRpcClient.sendDwnRequest({
dwnUrl,
targetDid : did,
message : messagesGet
}) as MessagesGetReply;
} catch(e) {
errored.add(dwnUrl);
continue;
}

for (let entry of reply.messages) {
// TODO: check entry.error
Expand All @@ -331,7 +308,8 @@ export class SyncApi implements SyncManager {
}) as RecordsReadReply;

if (reply.status.code >= 400) {
// TODO: tombstone BS
// TODO: handle reply
const pruneReply = await this.#dwn.synchronizePrunedInitialRecordsWrite(did, message);
} else {
dataStream = webReadableToIsomorphicNodeReadable(recordsReadReply.record.data as any);
}
Expand Down Expand Up @@ -478,21 +456,14 @@ export class SyncApi implements SyncManager {
return this.#db.sublevel('watermarks');
}

#getPushQueue() {
return this.#db.sublevel('pushQueue');
#getPushQueue() {
return this.#db.sublevel('pushQueue');
}

#getPullQueue() {
return this.#db.sublevel('pullQueue');
#getPullQueue() {
return this.#db.sublevel('pullQueue');
}

// TODO: export BaseMessage from dwn-sdk.
#getDwnMessageType(message: any) {
return `${message.descriptor.interface}${message.descriptor.method}`;
// TODO: export BaseMessage from dwn-sdk.
#getDwnMessageType(message: any) {
return `${message.descriptor.interface}${message.descriptor.method}`;
Expand Down

0 comments on commit 55743fd

Please sign in to comment.