Skip to content

feat: workflow-based llama-reader #1824

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/readers/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"start:assemblyai": "node --import tsx ./src/assemblyai.ts",
"start:llamaparse-dir": "node --import tsx ./src/simple-directory-reader-with-llamaparse.ts",
"start:llamaparse-json": "node --import tsx ./src/llamaparse-json.ts",
"start:llamaparse-workflow": "node --import tsx ./src/llamaparse-workflow.ts",
"start:discord": "node --import tsx ./src/discord.ts",
"start:json": "node --import tsx ./src/json.ts",
"start:obsidian": "node --import tsx ./src/obsidian.ts"
Expand Down
26 changes: 26 additions & 0 deletions examples/readers/src/llamaparse-workflow.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { uploadFile } from "@llamaindex/cloud/reader-workflow";

Check failure on line 1 in examples/readers/src/llamaparse-workflow.ts

View workflow job for this annotation

GitHub Actions / typecheck

Module '"@llamaindex/cloud/reader-workflow"' has no exported member 'uploadFile'.

Check failure on line 1 in examples/readers/src/llamaparse-workflow.ts

View workflow job for this annotation

GitHub Actions / typecheck

Module '"@llamaindex/cloud/reader-workflow"' has no exported member 'uploadFile'.

async function main() {
const job = await uploadFile({
file: "../data/basic.pdf",
});
console.log("job id", job.jobId);

job.signal.addEventListener("abort", () => {
console.error("ERROR:", job.signal.reason);
});

const markdown = await job.markdown();
console.log("--markdown--");
console.log(markdown);

const text = await job.text();
console.log("--text--");
console.log(text);

const json = await job.json();
console.log("--json--");
console.log(json);
}

main();
17 changes: 17 additions & 0 deletions packages/cloud/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@
"default": "./reader/dist/index.js"
},
"default": "./reader/dist/index.js"
},
"./reader-workflow": {
"require": {
"types": "./reader/dist/workflow.d.cts",
"default": "./reader/dist/workflow.cjs"
},
"import": {
"types": "./reader/dist/workflow.d.ts",
"default": "./reader/dist/workflow.js"
},
"default": "./reader/dist/workflow.js"
}
},
"repository": {
Expand All @@ -59,10 +70,16 @@
"@hey-api/openapi-ts": "^0.61.0",
"@llamaindex/core": "workspace:*",
"@llamaindex/env": "workspace:*",
"@types/node": "^22.14.0",
"bunchee": "6.4.0"
},
"peerDependencies": {
"@llamaindex/core": "workspace:*",
"@llamaindex/env": "workspace:*"
},
"dependencies": {
"fluere": "^0.3.3",
"stable-hash": "^0.0.5",
"zod": "^3.24.2"
}
}
300 changes: 300 additions & 0 deletions packages/cloud/src/reader-workflow.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
import { createClient, createConfig } from "@hey-api/client-fetch";
import { fs, getEnv, path } from "@llamaindex/env";
import {
createWorkflow,
type WorkflowEvent,
workflowEvent,
type WorkflowEventData,
} from "fluere";
import { withStore } from "fluere/middleware/store";
import { withTraceEvents } from "fluere/middleware/trace-events";
import { collect } from "fluere/stream/consumer";
import { until } from "fluere/stream/until";
import { pRetryHandler } from "fluere/util/p-retry";
import { zodEvent } from "fluere/util/zod";
import hash from "stable-hash";
import { z } from "zod";
import {
type Body_upload_file_api_v1_parsing_upload_post,
getJobApiV1ParsingJobJobIdGet,
getJobJsonResultApiV1ParsingJobJobIdResultJsonGet,
getJobResultApiV1ParsingJobJobIdResultMarkdownGet,
getJobTextResultApiV1ParsingJobJobIdResultTextGet,
type StatusEnum,
uploadFileApiV1ParsingUploadPost,
} from "./api";
import { parseFormSchema } from "./schema";

/**
 * Extracts the payload type carried by a fluere workflow event.
 * Accepts either an event instance (`WorkflowEventData<U>`) or the event
 * factory itself (`WorkflowEvent<U>`); resolves to `never` otherwise.
 */
type InferWorkflowEventData<T> =
  T extends WorkflowEventData<infer U>
    ? U
    : T extends WorkflowEvent<infer U>
      ? U
      : never;

// Input event for the parse workflow: the shared parse form plus one input
// source — an in-memory/filesystem `file`, an `input_s3_path`, or an
// `input_url`. Every source field is `.optional()` in its variant, so the
// union documents the three intended shapes rather than enforcing that
// exactly one source is present.
const startEvent = zodEvent(
  z.union([
    parseFormSchema.merge(
      z.object({
        file: z
          .string()
          .or(z.instanceof(File))
          .or(z.instanceof(Blob))
          .or(z.instanceof(Uint8Array))
          .optional()
          .describe("input"),
      }),
    ),
    parseFormSchema.merge(
      z.object({
        input_s3_path: z.string().optional(),
      }),
    ),
    parseFormSchema.merge(
      z.object({
        input_url: z.string().optional(),
      }),
    ),
  ]),
);

// Job-lifecycle and result-request events. Each carries the parse job id.
const checkStatusEvent = workflowEvent<string>();
const checkStatusSuccessEvent = workflowEvent<string>();
const requestMarkdownEvent = workflowEvent<string>();
const requestTextEvent = workflowEvent<string>();
const requestJsonEvent = workflowEvent<string>();

// Result events emitted by the corresponding request handlers below.
const markdownResultEvent = workflowEvent<string>();
const textResultEvent = workflowEvent<string>();
const jsonResultEvent = workflowEvent<unknown>();

/** Connection options for the LlamaParse workflow. */
export type LlamaParseWorkflowParams = {
  /** LlamaCloud region; defaults to "us" (see `URLS`). */
  region?: "us" | "eu" | "us-staging";
  /** API key; falls back to the LLAMA_CLOUD_API_KEY environment variable. */
  apiKey?: string;
};

// Base API URL for each supported LlamaCloud region.
const URLS = {
  us: "https://api.cloud.llamaindex.ai",
  eu: "https://api.cloud.eu.llamaindex.ai",
  "us-staging": "https://api.staging.llamaindex.ai",
} as const;

// The parse workflow. `withStore` attaches per-context state: a job-status
// cache and an HTTP client configured for the chosen region/API key.
// NOTE(review): the store factory presumably runs when a context is created
// (see `upload`), so a missing API key fails fast there — confirm against
// fluere's `withStore` semantics.
const llamaParseWorkflow = withStore((params: LlamaParseWorkflowParams) => {
  const apiKey = params.apiKey ?? getEnv("LLAMA_CLOUD_API_KEY");
  const region = params.region ?? "us";
  if (!apiKey) {
    throw new Error("LLAMA_CLOUD_API_KEY is not set");
  }
  return {
    // job id -> last observed status; lets the poll handler short-circuit
    // once a job is known to have finished.
    cache: {} as Record<string, StatusEnum>,
    client: createClient(
      createConfig({
        baseUrl: URLS[region],
        headers: {
          Authorization: `Bearer ${apiKey}`,
        },
      }),
    ),
  };
}, withTraceEvents(createWorkflow()));

// Upload handler: normalizes the start form into the generated REST upload
// body, submits the job, caches its initial status, and emits
// `checkStatusEvent` to begin polling.
llamaParseWorkflow.handle([startEvent], async ({ data: form }) => {
  const store = llamaParseWorkflow.getStore();
  const finalForm = { ...form } as Body_upload_file_api_v1_parsing_upload_post;
  if ("file" in form) {
    // support loads from the file system
    const file = form?.file;
    const isFilePath = typeof file === "string";
    // A string is treated as a path and read from disk; File/Blob/Uint8Array
    // inputs pass through unchanged.
    const data = isFilePath ? await fs.readFile(file) : file;
    const filename: string | undefined = isFilePath
      ? path.basename(file)
      : undefined;
    // Wrap the bytes in a File when the runtime provides the File global AND
    // a filename is known; otherwise fall back to an anonymous Blob.
    // Absent data becomes null.
    finalForm.file = data
      ? globalThis.File && filename
        ? new File([data], filename)
        : new Blob([data])
      : null;
  }
  const {
    data: { id, status },
  } = await uploadFileApiV1ParsingUploadPost({
    throwOnError: true,
    // `satisfies` validates the spread against an all-optional view of the
    // generated body type; the trailing cast then silences the
    // optional-vs-required mismatch for the request call.
    body: {
      ...finalForm,
    } satisfies {
      [Key in keyof Body_upload_file_api_v1_parsing_upload_post]:
        | Body_upload_file_api_v1_parsing_upload_post[Key]
        | undefined;
    } as Body_upload_file_api_v1_parsing_upload_post,
    client: store.client,
  });
  store.cache[id] = status;
  return checkStatusEvent.with(id);
});

// Polls the parse job until it reaches SUCCESS, retrying (up to 100 times)
// via pRetryHandler on any non-success status. Consults the in-store status
// cache first so a job already known to have finished is not re-fetched.
llamaParseWorkflow.handle(
  [checkStatusEvent],
  pRetryHandler(
    async ({ data: jobId }) => {
      const store = llamaParseWorkflow.getStore();
      // Fast path: a previous poll already saw this job succeed.
      if (store.cache[jobId] === "SUCCESS") {
        return checkStatusSuccessEvent.with(jobId);
      }
      const response = await getJobApiV1ParsingJobJobIdGet({
        throwOnError: true,
        path: { job_id: jobId },
        client: store.client,
      });
      const status = response.data.status;
      store.cache[jobId] = status;
      if (status !== "SUCCESS") {
        // Throwing tells pRetryHandler to schedule another attempt.
        throw new Error(`LLamaParse status: ${status}`);
      }
      return checkStatusSuccessEvent.with(jobId);
    },
    {
      retries: 100,
    },
  ),
);

// Fetches the markdown result for a finished parse job.
llamaParseWorkflow.handle([requestMarkdownEvent], async ({ data: jobId }) => {
  const { client } = llamaParseWorkflow.getStore();
  const response = await getJobResultApiV1ParsingJobJobIdResultMarkdownGet({
    throwOnError: true,
    path: { job_id: jobId },
    client,
  });
  return markdownResultEvent.with(response.data.markdown);
});

// Fetches the plain-text result for a finished parse job.
llamaParseWorkflow.handle([requestTextEvent], async ({ data: jobId }) => {
  const { client } = llamaParseWorkflow.getStore();
  const response = await getJobTextResultApiV1ParsingJobJobIdResultTextGet({
    throwOnError: true,
    path: { job_id: jobId },
    client,
  });
  return textResultEvent.with(response.data.text);
});

// Fetches the per-page JSON result for a finished parse job.
llamaParseWorkflow.handle([requestJsonEvent], async ({ data: jobId }) => {
  const { client } = llamaParseWorkflow.getStore();
  const response = await getJobJsonResultApiV1ParsingJobJobIdResultJsonGet({
    throwOnError: true,
    path: { job_id: jobId },
    client,
  });
  return jsonResultEvent.with(response.data.pages);
});

// One workflow context per { apiKey, region } hash (see `upload`), so all
// jobs sharing credentials reuse a single client and event stream.
const cacheMap = new Map<
  string,
  ReturnType<typeof llamaParseWorkflow.createContext>
>();

/**
 * Handle for a parse job that has completed successfully.
 * The result methods fetch from the LlamaCloud API on every call.
 */
export type ParseJob = {
  /** Id of the underlying LlamaParse job. */
  get jobId(): string;
  /** Aborts when the workflow context fails; `reason` carries the error. */
  get signal(): AbortSignal;
  /** The workflow context this job runs in. */
  get context(): ReturnType<typeof llamaParseWorkflow.createContext>;
  /** The original start-event payload (form plus input source). */
  get form(): InferWorkflowEventData<typeof startEvent>;

  /** Markdown rendering of the parsed document. */
  markdown(): Promise<string>;
  /** Plain-text rendering of the parsed document. */
  text(): Promise<string>;
  /** Per-page JSON result (`pages` from the JSON endpoint). */
  //eslint-disable-next-line @typescript-eslint/no-explicit-any
  json(): Promise<any[]>;
};

/**
 * Uploads a document to LlamaParse and resolves once the parse job has
 * reached SUCCESS, returning a {@link ParseJob} handle for fetching results.
 *
 * Workflow contexts are cached per `{ apiKey, region }` pair, so repeated
 * uploads with the same credentials share one client and event stream.
 *
 * @param params parse form + input source, plus optional region/apiKey.
 * @throws if the API key is missing (context creation) or the upload fails.
 */
export const upload = async (
  params: InferWorkflowEventData<typeof startEvent> & LlamaParseWorkflowParams,
): Promise<ParseJob> => {
  //#region cache
  const key = hash({ apiKey: params.apiKey, region: params.region });
  if (!cacheMap.has(key)) {
    const context = llamaParseWorkflow.createContext(params);
    cacheMap.set(key, context);
  }
  //#endregion

  //#region upload event
  const context = cacheMap.get(key)!;
  const { stream, sendEvent, createFilter } = context;
  const ev = startEvent.with(params);
  sendEvent(ev);
  // Drain the stream until this upload's job is confirmed SUCCESS; the
  // terminating event carries the job id.
  const uploadThread = await collect(
    until(
      stream,
      createFilter(ev, (ev) => checkStatusSuccessEvent.include(ev)),
    ),
  );
  //#endregion
  const jobId: string = uploadThread.at(-1)!.data;
  const job = {
    get signal() {
      // lazy load — only touch the context's signal when a caller asks
      return context.signal;
    },
    get jobId() {
      return jobId;
    },
    get form() {
      return ev.data;
    },
    get context() {
      return context;
    },
    /** Fetches the markdown rendering of the parsed document. */
    async markdown(): Promise<string> {
      const requestEv = requestMarkdownEvent.with(jobId);
      sendEvent(requestEv);
      const markdownThread = await collect(
        until(
          stream,
          createFilter(requestEv, (ev) => markdownResultEvent.include(ev)),
        ),
      );
      return markdownThread.at(-1)!.data;
    },
    /** Fetches the plain-text rendering of the parsed document. */
    async text(): Promise<string> {
      const requestEv = requestTextEvent.with(jobId);
      sendEvent(requestEv);
      const textThread = await collect(
        until(
          stream,
          createFilter(requestEv, (ev) => textResultEvent.include(ev)),
        ),
      );
      return textThread.at(-1)!.data;
    },
    /** Fetches the per-page JSON result. */
    //eslint-disable-next-line @typescript-eslint/no-explicit-any
    async json(): Promise<any[]> {
      const requestEv = requestJsonEvent.with(jobId);
      sendEvent(requestEv);
      const jsonThread = await collect(
        until(
          stream,
          createFilter(requestEv, (ev) => jsonResultEvent.include(ev)),
        ),
      );
      return jsonThread.at(-1)!.data;
    },
    /**
     * TODO: download the images referenced by the JSON result.
     * Currently this only collects the image descriptors and discards them.
     */
    async images(): Promise<void> {
      const pages = await job.json();
      // `void` marks the intentional no-op until download is implemented.
      void pages.flatMap(({ images }) => images);
    },
  };
  return job;
};
Loading
Loading