Skip to content

Commit

Permalink
nats: starting to add db service
Browse files Browse the repository at this point in the history
  • Loading branch information
williamstein committed Jan 26, 2025
1 parent 419067f commit aa22718
Show file tree
Hide file tree
Showing 10 changed files with 171 additions and 52 deletions.
24 changes: 23 additions & 1 deletion docs/nats/devlog.md
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,29 @@ Plan.
- [ ] Stream: Records everything with this subject `project.${project_id}.patches`
- [ ] It would be very nice if we can use the server assigned timestamps.... but probably not
- [ ] For transitioning and de\-archiving, there must be a way to do this, since they have a backup/restore process
-
## [ ] Goal: PostgreSQL Changefeed Synctable
This is critical to solve. This sucks now. This is key to eliminating "hub\-websocket". This might be very easy. Here's the plan:
- make a request/response listener that listens on hub.account.{account\_id} and hub.db.project.{project\_id} for a db query.
- if changes is false, just responds with the result of the query.
- if changes is true, get kv store k named `account-{account_id}` or `project-{project_id}` \(which might be used by project or compute server\).
- let id be the sha1 hash of the query \(and options\)
- k.id.update is less than X seconds ago, do nothing... it's already being updated by another server.
- do the query to the database \(with changes true\)
- write the results into k under k.id.data.key = value.
- keep watching for changes so long as k.id.interest is at most n\*X seconds ago.
- Also set k.id.update to now.
- return id
- another message to `hub.db.{account_id}` which contains a list of id's.
- When get this one, update k.id.interest to now for each of the id's.
With the above algorithm, it should be very easy to reimplement the client side of SyncTable. Moreover, there are many advantages:
- For a fixed account\_id or project\-id, there's no extra work at all for 1 versus 100 of them. I.e., this is great for opening a bunch of distinct browser windows.
- If you refresh your browser, everything stays stable \-\- nothing changes at all and you instantly have your data. Same if the network drops and resumes.
- When implementing our new synctable, we can immediately start with the possibly stale data from the last time it was active, then update it to the correct data. Thus even if everything but NATS is done/unavailable, the experience would be much better. It's like "local first", but somehow "network mesh first". With a leaf node it would literally be local first.
## [ ] Goal: Terminal and **compute server**
Expand Down
33 changes: 33 additions & 0 deletions src/packages/backend/nats/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { join } from "path";
import { secrets } from "@cocalc/backend/data";
import { readFile } from "node:fs/promises";
import getLogger from "@cocalc/backend/logger";
import { connect, credsAuthenticator } from "nats";

const logger = getLogger("backend:nats");

export async function getCreds(): Promise<string | undefined> {
const filename = join(secrets, "nats.creds");
try {
return (await readFile(filename)).toString().trim();
} catch {
logger.debug(
`getCreds -- please create ${filename}, which is missing. Nothing will work.`,
);
return undefined;
}
}

let nc: Awaited<ReturnType<typeof connect>> | null = null;
export async function getConnection() {
logger.debug("connecting to nats");

if (nc == null) {
const creds = await getCreds();
nc = await connect({
authenticator: credsAuthenticator(new TextEncoder().encode(creds)),
});
logger.debug(`connected to ${nc.getServer()}`);
}
return nc;
}
14 changes: 4 additions & 10 deletions src/packages/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@
"exports": {
"./*": "./dist/*.js",
"./database": "./dist/database/index.js",
"./nats": "./dist/nats/index.js",
"./server-settings": "./dist/server-settings/index.js",
"./auth/*": "./dist/auth/*.js",
"./auth/tokens/*": "./dist/auth/tokens/*.js"
},
"keywords": [
"utilities",
"cocalc"
],
"keywords": ["utilities", "cocalc"],
"scripts": {
"preinstall": "npx only-allow pnpm",
"clean": "rm -rf dist node_modules",
Expand All @@ -21,12 +19,7 @@
"test": "pnpm exec jest --detectOpenHandles",
"prepublishOnly": "pnpm test"
},
"files": [
"dist/**",
"bin/**",
"README.md",
"package.json"
],
"files": ["dist/**", "bin/**", "README.md", "package.json"],
"author": "SageMath, Inc.",
"license": "SEE LICENSE.md",
"dependencies": {
Expand All @@ -40,6 +33,7 @@
"fs-extra": "^11.2.0",
"lodash": "^4.17.21",
"lru-cache": "^7.18.3",
"nats": "^2.29.1",
"password-hash": "^1.2.2",
"prom-client": "^13.0.0",
"rimraf": "^5.0.5",
Expand Down
73 changes: 73 additions & 0 deletions src/packages/database/nats/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
echo "require('@cocalc/database/nats').init()" | COCALC_MODE='single-user' DEBUG_CONSOLE=yes DEBUG=cocalc:* node
*/

import getLogger from "@cocalc/backend/logger";
import { JSONCodec } from "nats";
import { isValidUUID } from "@cocalc/util/misc";
import userQuery from "@cocalc/database/user-query";
import { getConnection } from "@cocalc/backend/nats";

const logger = getLogger("database:nats");

const jc = JSONCodec();

export async function init() {
const subject = "hub.*.*.db";
logger.debug(`init -- subject='${subject}', options=`, {
queue: "0",
});
const nc = await getConnection();
const sub = nc.subscribe(subject, { queue: "0" });
for await (const mesg of sub) {
handleRequest(mesg);
}
}

async function handleRequest(mesg) {
console.log({ subject: mesg.subject });
let resp;
try {
const segments = mesg.subject.split(".");
const uuid = segments[2];
if (!isValidUUID(uuid)) {
throw Error(`invalid uuid '${uuid}'`);
}
const type = segments[1]; // 'project' or 'account'
let account_id, project_id;
if (type == "project") {
project_id = uuid;
account_id = undefined;
} else if (type == "account") {
project_id = undefined;
account_id = uuid;
} else {
throw Error("must be project or account");
}
const { name, args } = jc.decode(mesg.data) ?? ({} as any);
if (!name) {
throw Error("api endpoint name must be given in message");
}
logger.debug("handling hub db request:", {
account_id,
project_id,
name,
args,
});
resp = await getResponse({ name, args, account_id, project_id });
} catch (err) {
resp = { error: `${err}` };
}
mesg.respond(jc.encode(resp));
}

async function getResponse({ name, args, account_id, project_id }) {
if (name == "userQuery") {
return await userQuery({ ...args[0], account_id, project_id });
} else {
throw Error(`name='${name}' not implemented`);
}
}
2 changes: 2 additions & 0 deletions src/packages/database/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"exports": {
".": "./dist/index.js",
"./accounts/*": "./dist/accounts/*.js",
"./nats": "./dist/nats/index.js",
"./pool": "./dist/pool/index.js",
"./pool/*": "./dist/pool/*.js",
"./postgres/*": "./dist/postgres/*.js",
Expand All @@ -31,6 +32,7 @@
"json-stable-stringify": "^1.0.1",
"lodash": "^4.17.21",
"lru-cache": "^7.18.3",
"nats": "^2.29.1",
"node-fetch": "2.6.7",
"pg": "^8.7.1",
"random-key": "^0.3.2",
Expand Down
13 changes: 7 additions & 6 deletions src/packages/frontend/client/nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@ export class NatsClient {
private sc = nats.StringCodec();
private jc = nats.JSONCodec();
private nc?: Awaited<ReturnType<typeof nats.connect>>;
// obviously just for learning:
public nats = nats;
public jetstream = jetstream;
public hub : HubApi;
public hub: HubApi;

constructor(client: WebappClient) {
this.client = client;
this.hub = initHubApi(this.callHubApi);
this.hub = initHubApi(this.callHub);
}

getConnection = reuseInFlight(async () => {
Expand All @@ -47,15 +46,17 @@ export class NatsClient {
return this.nc;
});

private callHubApi = async ({
private callHub = async ({
service = "api",
name,
args,
args = [],
}: {
service?: string;
name: string;
args: any[];
}) => {
const c = await this.getConnection();
const subject = `hub.account.api.${this.client.account_id}`;
const subject = `hub.account.${this.client.account_id}.${service}`;
const resp = await c.request(
subject,
this.jc.encode({
Expand Down
7 changes: 7 additions & 0 deletions src/packages/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 13 additions & 15 deletions src/packages/server/nats/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ const logger = getLogger("server:nats:api");
const jc = JSONCodec();

export async function initAPI(nc) {
logger.debug("initAPI -- subject='hub.account.api', options=", {
const subject = "hub.*.*.api";
logger.debug(`initAPI -- subject='${subject}', options=`, {
queue: "0",
});
const sub = nc.subscribe("hub.account.api.>", { queue: "0" });
const sub = nc.subscribe(subject, { queue: "0" });
for await (const mesg of sub) {
handleApiRequest(mesg);
}
Expand All @@ -48,12 +49,15 @@ async function handleApiRequest(mesg) {
let resp;
try {
const segments = mesg.subject.split(".");
const account_id = segments[3];
const type = segments[1];
if (type != "account") {
throw Error("only type='account' is supported now");
}
const account_id = segments[2];
if (!isValidUUID(account_id)) {
throw Error(`invalid account_id '${account_id}'`);
}
const request = jc.decode(mesg.data) ?? {};
// TODO: obviously user-provided account_id is no good! This is a POC.
const { name, args } = request as any;
logger.debug("handling hub.api request:", { account_id, name, args });
resp = await getResponse({ name, args, account_id });
Expand All @@ -66,24 +70,18 @@ async function handleApiRequest(mesg) {
import userQuery from "@cocalc/database/user-query";
import getCustomize from "@cocalc/database/settings/customize";

function getAccountId(args) {
return (args as any).account_id;
}

const hubApi: HubApi = {
getCustomize,
userQuery: async (...args) =>
await userQuery({
...args[0],
account_id: getAccountId(args),
}),
userQuery,
};

async function getResponse({ name, args, account_id }) {
const f = hubApi[name];
if (f == null) {
throw Error(`unknown function '${name}'`);
}
args.account_id = account_id;
return await f(args);
if (name == "userQuery" && args[0] != null) {
args[0].account_id = account_id;
}
return await f(...args);
}
8 changes: 7 additions & 1 deletion src/packages/server/nats/auth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ Points that took me a while to figure out:
DOCS:
- https://nats-io.github.io/nsc/
USAGE:
a = require('@cocalc/server/nats/auth')
await a.configureNatsUser({account_id:'275f1db7-bf37-4b44-b9aa-d64694269c9f'})
await a.configureNatsUser({project_id:'81e0c408-ac65-4114-bad5-5f4b6539bd0e'})
*/

import { executeCode } from "@cocalc/backend/execute-code";
Expand Down Expand Up @@ -110,7 +116,7 @@ export async function configureNatsUser(cocalcUser: CoCalcUser) {
throw Error("must be a valid uuid");
}
const userType = getCoCalcUserType(cocalcUser);
const goalPub = new Set(["_INBOX.>", `hub.${userType}.api.${userId}`]);
const goalPub = new Set(["_INBOX.>", `hub.${userType}.${userId}.>`]);
const goalSub = new Set(["_INBOX.>"]);

if (userType == "account") {
Expand Down
21 changes: 2 additions & 19 deletions src/packages/server/nats/index.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,12 @@
import { connect, credsAuthenticator } from "nats";
import getLogger from "@cocalc/backend/logger";
import { initAPI } from "./api";
import { getConnection } from "@cocalc/backend/nats";

const logger = getLogger("server:nats");

const creds = `-----BEGIN NATS USER JWT-----
eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJVTDJaWEdFWFFKTzVRNjdLU1hKNDdERFpKSFE3QUFWMjdHWUtBN1ZJVjVaT01DQU1SN1hBIiwiaWF0IjoxNzM3NTY3OTQwLCJpc3MiOiJBRDRHNlI2MkJERFFVU0NKVkxaTkE3RVM3UjNBNkRXWExZVVdHWlY3NEVKMlM2VkJDN0RRVk0zSSIsIm5hbWUiOiJhZG1pbiIsInN1YiI6IlVBV1hZVUpYSEFXQzNPSFFURE1SQVBSWVpNNFQ0RkZDRk1TTVFLNDVCWU1SS0ZSRE5RTjQ0Vk1SIiwibmF0cyI6eyJwdWIiOnsiYWxsb3ciOlsiXHUwMDNlIl19LCJzdWIiOnsiYWxsb3ciOlsiXHUwMDNlIl19LCJzdWJzIjotMSwiZGF0YSI6LTEsInBheWxvYWQiOi0xLCJ0eXBlIjoidXNlciIsInZlcnNpb24iOjJ9fQ.Pv9-T3P7cO1VSFiNocGA0vCGvwQ-UaX3b7OzwMIHdn5hGs4kUv4eLE-Er_6dxrZiPu6PJjBYB7eD2hyb-gxSCQ
------END NATS USER JWT------
************************* IMPORTANT *************************
NKEY Seed printed below can be used to sign and prove identity.
NKEYs are sensitive and should be treated as secrets.
-----BEGIN USER NKEY SEED-----
SUAMW6S2OXSKL2ETX5GJE3NDLWGXZFZ4JAP5WHBCK43RMFDPJCCJLPWC5Y
------END USER NKEY SEED------
*************************************************************
`;

export default async function initNatsServer() {
logger.debug("initializing nats cocalc hub server");
const nc = await connect({
authenticator: credsAuthenticator(new TextEncoder().encode(creds)),
});
const nc = await getConnection();
logger.debug(`connected to ${nc.getServer()}`);
initAPI(nc);
}

0 comments on commit aa22718

Please sign in to comment.