Skip to content
Open
5 changes: 5 additions & 0 deletions .changeset/dataset-pipeline.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"braintrust": patch
---

Add dataset pipeline helpers for deriving datasets from traced spans.
101 changes: 101 additions & 0 deletions js/src/dataset-pipeline.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import type { ObjectReferenceType as ObjectReference } from "./generated_types";
import type {
BaseMetadata,
Dataset,
DefaultMetadataType,
FullInitDatasetOptions,
} from "./logger";
import type { Trace } from "./trace";

export type DatasetPipelineScope = "span" | "trace";

export type DatasetPipelineSource = {
projectId?: string;
projectName?: string;
orgName?: string;
filter?: string;
scope?: DatasetPipelineScope;
};

type DatasetPipelineInitDatasetOptions = FullInitDatasetOptions<false>;

export type DatasetPipelineOrigin = ObjectReference;

export type DatasetPipelineTarget = {
projectId?: DatasetPipelineInitDatasetOptions["projectId"];
projectName?: DatasetPipelineInitDatasetOptions["project"];
orgName?: DatasetPipelineInitDatasetOptions["orgName"];
datasetName: NonNullable<DatasetPipelineInitDatasetOptions["dataset"]>;
description?: DatasetPipelineInitDatasetOptions["description"];
metadata?: DatasetPipelineInitDatasetOptions["metadata"];
};

export type DatasetPipelineRow = Parameters<Dataset["insert"]>[0];

export type DatasetPipelineTransformArgs<
Input = unknown,
Output = unknown,
Expected = unknown,
Metadata extends BaseMetadata = DefaultMetadataType,
> = {
input?: Input;
output?: Output;
expected?: Expected;
metadata?: Metadata extends void ? Record<string, unknown> : Metadata;
trace?: Trace;
};

export type DatasetPipelineTransformResult =
| DatasetPipelineRow
| DatasetPipelineRow[]
| null
| undefined;

export type DatasetPipelineTransform = (
args: DatasetPipelineTransformArgs,
) => DatasetPipelineTransformResult | Promise<DatasetPipelineTransformResult>;

export type DatasetPipelineDefinition = {
name?: string;
source: DatasetPipelineSource;
transform: DatasetPipelineTransform;
target: DatasetPipelineTarget;
};

const DATASET_PIPELINE_MARKER = "__braintrustDatasetPipeline";

declare global {
// eslint-disable-next-line no-var
var __braintrust_dataset_pipelines: DatasetPipelineDefinition[] | undefined;
}

function registry(): DatasetPipelineDefinition[] {
if (!globalThis.__braintrust_dataset_pipelines) {
globalThis.__braintrust_dataset_pipelines = [];
}
return globalThis.__braintrust_dataset_pipelines;
}

export function getRegisteredDatasetPipelines(): DatasetPipelineDefinition[] {
return [...registry()];
}

export function isDatasetPipelineDefinition(
value: unknown,
): value is DatasetPipelineDefinition {
return (
typeof value === "object" &&
value !== null &&
DATASET_PIPELINE_MARKER in value
);
}

export function DatasetPipeline(
definition: DatasetPipelineDefinition,
): DatasetPipelineDefinition {
const registered = Object.assign(definition, {
[DATASET_PIPELINE_MARKER]: true,
});
registry().push(registered);
return registered;
}
20 changes: 19 additions & 1 deletion js/src/exports.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,24 @@ export {
defaultErrorScoreHandler,
} from "./framework";

export type {
DatasetPipelineDefinition,
DatasetPipelineOrigin,
DatasetPipelineRow,
DatasetPipelineScope,
DatasetPipelineSource,
DatasetPipelineTarget,
DatasetPipelineTransform,
DatasetPipelineTransformArgs,
DatasetPipelineTransformResult,
} from "./dataset-pipeline";

export {
DatasetPipeline,
getRegisteredDatasetPipelines,
isDatasetPipelineDefinition,
} from "./dataset-pipeline";

export type {
CodeOpts,
CreateProjectOpts,
Expand Down Expand Up @@ -250,7 +268,7 @@ export {
} from "./prompt-schemas";

export type { Trace, SpanData, GetThreadOptions } from "./trace";
export { SpanFetcher, CachedSpanFetcher } from "./trace";
export { LocalTrace, SpanFetcher, CachedSpanFetcher } from "./trace";

export type {
ParentExperimentIds,
Expand Down
43 changes: 43 additions & 0 deletions js/src/logger-json-attachment.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,49 @@ describe("JSONAttachment", () => {
expect(attachment.reference.filename).toBe("custom.json");
});

it("should defer to the dataset pipeline hook when installed", () => {
const globalWithHook = globalThis as typeof globalThis & {
__BT_DATASET_PIPELINE_DEFER_JSON_ATTACHMENT__?: (
data: unknown,
options?: { filename?: string; pretty?: boolean },
) => object;
};
const previous =
globalWithHook.__BT_DATASET_PIPELINE_DEFER_JSON_ATTACHMENT__;
try {
globalWithHook.__BT_DATASET_PIPELINE_DEFER_JSON_ATTACHMENT__ = (
data,
options,
) => {
const reference = {
type: "braintrust_deferred_attachment",
kind: "json",
filename: options?.filename,
content_type: "application/json",
pretty: options?.pretty,
data,
};
return { reference };
};

const attachment = new JSONAttachment(
{ test: "data" },
{ filename: "trace.json", pretty: true },
);

expect(attachment.reference).toEqual({
type: "braintrust_deferred_attachment",
kind: "json",
filename: "trace.json",
content_type: "application/json",
pretty: true,
data: { test: "data" },
});
} finally {
globalWithHook.__BT_DATASET_PIPELINE_DEFER_JSON_ATTACHMENT__ = previous;
}
});

it("should pretty print when requested", async () => {
const testData = { a: 1, b: 2 };
const attachment = new JSONAttachment(testData, { pretty: true });
Expand Down
42 changes: 42 additions & 0 deletions js/src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ import {
type RepoInfoType as RepoInfo,
type PromptBlockDataType as PromptBlockData,
type ResponseFormatJsonSchemaType as ResponseFormatJsonSchema,
type ObjectReferenceType,
} from "./generated_types";

const BRAINTRUST_ATTACHMENT =
Expand Down Expand Up @@ -1392,6 +1393,20 @@ export abstract class BaseAttachment {
abstract debugInfo(): Record<string, unknown>;
}

type DatasetPipelineDeferredJSONAttachmentHook = (
data: unknown,
options?: { filename?: string; pretty?: boolean },
) => object;

declare global {
// Set by the bt dataset pipeline runner so JSONAttachment can be represented
// as a destination-uploaded marker during transform.
// eslint-disable-next-line no-var
var __BT_DATASET_PIPELINE_DEFER_JSON_ATTACHMENT__:
| DatasetPipelineDeferredJSONAttachmentHook
| undefined;
}

/**
* Represents an attachment to be uploaded and the associated metadata.
* `Attachment` objects can be inserted anywhere in an event, allowing you to
Expand Down Expand Up @@ -1857,6 +1872,20 @@ export class JSONAttachment extends Attachment {
},
) {
const { filename = "data.json", pretty = false, state } = options ?? {};
const deferredJsonAttachment =
globalThis.__BT_DATASET_PIPELINE_DEFER_JSON_ATTACHMENT__;
if (deferredJsonAttachment) {
super({
data: new Blob([]),
filename,
contentType: "application/json",
state,
});
return deferredJsonAttachment(data, {
filename,
pretty,
}) as unknown as JSONAttachment;
}

// Serialize the JSON data
const jsonString = pretty
Expand Down Expand Up @@ -6910,9 +6939,15 @@ export class SpanImpl implements Span {
const cachedSpan: CachedSpan = {
input: partialRecord.input,
output: partialRecord.output,
expected: partialRecord.expected,
error: partialRecord.error,
scores: partialRecord.scores,
metrics: partialRecord.metrics,
metadata: partialRecord.metadata,
tags: partialRecord.tags,
span_id: this._spanId,
span_parents: this._spanParents,
is_root: this._spanId === this._rootSpanId,
span_attributes: partialRecord.span_attributes,
};
this._state.spanCache.queueWrite(
Expand Down Expand Up @@ -7377,6 +7412,7 @@ export class Dataset<
metadata,
tags,
output,
origin,
isMerge,
}: {
id: string;
Expand All @@ -7385,6 +7421,7 @@ export class Dataset<
metadata?: Record<string, unknown>;
tags?: string[];
output?: unknown;
origin?: ObjectReferenceType;
isMerge?: boolean;
}): LazyValue<BackgroundLogEvent> {
return new LazyValue(async () => {
Expand All @@ -7399,6 +7436,7 @@ export class Dataset<
dataset_id,
created: !isMerge ? new Date().toISOString() : undefined, //if we're merging/updating an event we will not add this ts
metadata,
origin,
...(!!isMerge
? {
[IS_MERGE_FIELD]: true,
Expand All @@ -7422,6 +7460,7 @@ export class Dataset<
* about anything else that's relevant, that you can use to help find and analyze examples later. For example, you could log the
* `prompt`, example's `id`, or anything else that would be useful to slice/dice later. The values in `metadata` can be any
* JSON-serializable type, but its keys must be strings.
* @param event.origin (Optional) a reference to the source object this dataset record was derived from.
* @param event.id (Optional) a unique identifier for the event. If you don't provide one, Braintrust will generate one for you.
* @param event.output: (Deprecated) The output of your application. Use `expected` instead.
* @returns The `id` of the logged record.
Expand All @@ -7433,13 +7472,15 @@ export class Dataset<
tags,
id,
output,
origin,
}: {
readonly input?: unknown;
readonly expected?: unknown;
readonly tags?: string[];
readonly metadata?: Record<string, unknown>;
readonly id?: string;
readonly output?: unknown;
readonly origin?: ObjectReferenceType;
}): string {
this.validateEvent({ metadata, expected, output, tags });

Expand All @@ -7452,6 +7493,7 @@ export class Dataset<
metadata,
tags,
output,
origin,
isMerge: false,
}),
);
Expand Down
6 changes: 6 additions & 0 deletions js/src/span-cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,15 @@ function canUseSpanCache(): boolean {
export interface CachedSpan {
input?: unknown;
output?: unknown;
expected?: unknown;
error?: unknown;
scores?: Record<string, unknown>;
metrics?: Record<string, unknown>;
metadata?: Record<string, unknown>;
tags?: string[];
span_id: string;
span_parents?: string[];
is_root?: boolean | null;
span_attributes?: {
name?: string;
type?: string;
Expand Down
Loading
Loading