Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 116 additions & 7 deletions sdks/typescript/src/apache_beam/worker/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,102 @@ export interface StateProvider {
}

// TODO: (Advanced) Cross-bundle caching.
/**
* Wrapper for cached values that tracks their weight (memory size).
*
* Pairs a cached state lookup result with the weight that was added to the
* cache's running total for it, so the same amount can be subtracted again
* when the entry is replaced or evicted.
*/
interface WeightedCacheEntry<T> {
// The cached lookup result: either an already-available value or a promise
// that is still pending.
entry: MaybePromise<T>;
// Weight charged for this entry: the estimated size of the value, or a small
// fixed placeholder while the entry is still a pending promise.
weight: number;
}

/**
 * Estimates the memory size of a value in bytes.
 * This is a simplified estimation - actual memory usage may vary.
 *
 * The value graph is walked iteratively with an explicit work list, so
 * deeply nested values cannot overflow the call stack. Every object is
 * measured at most once: a second reference to an object that was already
 * visited (including a circular reference) is charged as a single 8-byte
 * reference instead of being traversed again.
 *
 * Sizing rules:
 *  - null / undefined / number: 8 bytes; boolean: 4 bytes.
 *  - string: 40 bytes of overhead plus 2 bytes per UTF-16 code unit.
 *  - typed arrays, DataView and ArrayBuffer: 40 bytes plus their byteLength
 *    (Node.js Buffers are Uint8Array subclasses and are covered by this rule).
 *  - arrays, Sets and Maps: 40 bytes plus the size of every element
 *    (for Maps, every key and every value).
 *  - other objects: 40 bytes plus, for each own enumerable string key, the
 *    size of the key string and of the property value.
 *  - anything else (functions, symbols, bigints): a flat 64 bytes.
 *
 * @param value The value to measure; any JavaScript value is accepted.
 * @returns The estimated size in bytes.
 */
function estimateSize(value: any): number {
  const seen = new WeakSet<object>();
  // Explicit stack of values still to be measured (depth-first order; the
  // order does not matter because the sizes are simply summed).
  const pending: unknown[] = [value];
  let total = 0;

  while (pending.length > 0) {
    const current = pending.pop();

    if (current === null || current === undefined) {
      total += 8;
      continue;
    }
    if (typeof current === "boolean") {
      total += 4;
      continue;
    }
    if (typeof current === "number") {
      total += 8;
      continue;
    }
    if (typeof current === "string") {
      // Each character is 2 bytes in JavaScript (UTF-16) + overhead
      total += 40 + current.length * 2;
      continue;
    }
    if (typeof current !== "object") {
      // Default for unknown types
      total += 64;
      continue;
    }

    const obj = current as object;
    if (seen.has(obj)) {
      // Shared or circular reference: the object itself was already counted,
      // so only charge for the reference slot.
      total += 8;
      continue;
    }
    seen.add(obj);
    total += 40; // Per-object overhead

    if (ArrayBuffer.isView(obj)) {
      // Typed arrays / DataView / Buffer: count the viewed bytes, not each
      // index as if it were an object property.
      total += obj.byteLength;
    } else if (obj instanceof ArrayBuffer) {
      total += obj.byteLength;
    } else if (Array.isArray(obj)) {
      for (const item of obj) {
        pending.push(item);
      }
    } else if (obj instanceof Map) {
      // Object.keys() is empty for a Map, so its contents must be walked
      // explicitly or they would not be counted at all.
      for (const [mapKey, mapValue] of obj) {
        pending.push(mapKey, mapValue);
      }
    } else if (obj instanceof Set) {
      for (const member of obj) {
        pending.push(member);
      }
    } else {
      for (const key of Object.keys(obj)) {
        total += 40 + key.length * 2;
        pending.push((obj as Record<string, unknown>)[key]);
      }
    }
  }

  return total;
}

// Default cache size: 100MB (100 * 1024 * 1024), expressed in the same
// estimated-byte units that estimateSize returns. Used as the default for the
// maxCacheWeight constructor parameter of CachingStateProvider.
const DEFAULT_MAX_CACHE_WEIGHT = 100 * 1024 * 1024;

export class CachingStateProvider implements StateProvider {
underlying: StateProvider;
cache: Map<string, MaybePromise<any>> = new Map();
cache: Map<string, WeightedCacheEntry<any>> = new Map();
maxCacheWeight: number;
currentWeight: number = 0;

constructor(underlying: StateProvider) {
constructor(
underlying: StateProvider,
maxCacheWeight: number = DEFAULT_MAX_CACHE_WEIGHT,
) {
this.underlying = underlying;
this.maxCacheWeight = maxCacheWeight;
}

/**
 * Drops entries from the front of the cache until the tracked total weight
 * is no longer above maxCacheWeight, or until the cache is empty.
 * A Map iterates in insertion order, so its first entry is the one that was
 * inserted (or re-inserted) the longest time ago.
 */
private evictIfNeeded() {
  while (this.currentWeight > this.maxCacheWeight) {
    const oldest = this.cache.entries().next();
    if (oldest.done) {
      // Nothing left to evict.
      break;
    }
    const [oldestKey, oldestEntry] = oldest.value;
    // Give back the weight that was charged for the evicted entry.
    this.currentWeight -= oldestEntry.weight;
    this.cache.delete(oldestKey);
  }
}

/**
 * Marks a cache entry as most recently used.
 * The entry is removed and inserted again, which places it at the end of the
 * Map's iteration order. Does nothing if the key is not cached.
 */
private touchCacheEntry(cacheKey: string) {
  const existing = this.cache.get(cacheKey);
  if (existing === undefined) {
    return;
  }
  this.cache.delete(cacheKey);
  this.cache.set(cacheKey, existing);
}

getState<T>(stateKey: fnApi.StateKey, decode: (data: Uint8Array) => T) {
Expand All @@ -62,21 +152,40 @@ export class CachingStateProvider implements StateProvider {
"base64",
);
if (this.cache.has(cacheKey)) {
return this.cache.get(cacheKey)!;
// Cache hit: move to end (most recently used)
this.touchCacheEntry(cacheKey);
return this.cache.get(cacheKey)!.entry;
}
// Cache miss: fetch from underlying provider
let result = this.underlying.getState(stateKey, decode);
const this_ = this;
if (result.type === "promise") {
result = {
type: "promise",
promise: result.promise.then((value) => {
this_.cache.set(cacheKey, { type: "value", value });
// When promise resolves, update cache with resolved value
// Get the current entry to update its weight
const currentEntry = this.cache.get(cacheKey);
if (currentEntry !== undefined) {
// Remove old weight from total
this.currentWeight -= currentEntry.weight;
}
const resolvedWeight = estimateSize(value);
this.cache.set(cacheKey, {
entry: { type: "value", value },
weight: resolvedWeight,
});
this.currentWeight += resolvedWeight;
this.evictIfNeeded();
return value;
}),
};
}
// TODO: (Perf) Cache eviction.
this.cache.set(cacheKey, result);
// Estimate weight for the new entry
const weight = result.type === "value" ? estimateSize(result.value) : 64; // Promise placeholder weight
// Evict if needed before adding new entry
this.currentWeight += weight;
this.evictIfNeeded();
this.cache.set(cacheKey, { entry: result, weight });
return result;
}
}
Expand Down
Loading