|
| 1 | +import { DifferenceStreamWriter, UnaryOperator } from '../graph.js' |
| 2 | +import { StreamBuilder } from '../d2.js' |
| 3 | +import { MultiSet } from '../multiset.js' |
| 4 | +import { TopKState, handleMoveIn, handleMoveOut } from './topKState.js' |
| 5 | +import { TopKArray, createKeyedComparator } from './topKArray.js' |
| 6 | +import type { IndexedValue, TopK } from './topKArray.js' |
| 7 | +import type { DifferenceStreamReader } from '../graph.js' |
| 8 | +import type { IStreamBuilder, PipedOperator } from '../types.js' |
| 9 | + |
/**
 * Configuration for the grouped fractional-index topK operator.
 *
 * `K` is the key type of the incoming keyed stream and `T` its value type.
 */
export interface GroupedTopKWithFractionalIndexOptions<K, T> {
  /**
   * Derives the group an element belongs to from its key and value.
   * Elements that yield the same group key are sorted and limited together.
   */
  groupKeyFn: (key: K, value: T) => unknown
  /**
   * Number of leading elements to skip in each group.
   * The operator falls back to `0` when this is omitted.
   */
  offset?: number
  /**
   * Maximum number of elements kept per group.
   * The operator falls back to `Infinity` when this is omitted.
   */
  limit?: number
  /**
   * If provided, the operator invokes this during construction and hands over
   * a getter that reports the summed size of all group states.
   */
  setSizeCallback?: (getSize: () => number) => void
  /**
   * If provided, the operator invokes this during construction and hands over
   * a function that moves the offset/limit window of every group.
   */
  setWindowFn?: (
    windowFn: (options: { offset?: number; limit?: number }) => void,
  ) => void
}
| 23 | + |
/**
 * Grouped variant of the fractional-index topK operator.
 *
 * Consumes a keyed stream of `[K, T]` and emits `[K, IndexedValue<T>]`.
 * Each incoming element is routed to a group through `groupKeyFn`; every group
 * owns its own `TopKState`, so ordering and the offset/limit window are applied
 * per group instead of across the whole stream.
 */
export class GroupedTopKWithFractionalIndexOperator<
  K extends string | number,
  T,
> extends UnaryOperator<[K, T], [K, IndexedValue<T>]> {
  /** Per-group topK state, keyed by whatever `groupKeyFn` returns. */
  #groupStates: Map<unknown, TopKState<K, T>> = new Map()
  /** Maps an element's key and value to its group key. */
  #groupKeyFn: (key: K, value: T) => unknown
  /** Comparator over `[key, value]` pairs, built from the value comparator. */
  #comparator: (a: [K, T], b: [K, T]) => number
  /** Current window offset, shared by all groups. */
  #offset: number
  /** Current window limit, shared by all groups. */
  #limit: number

  /**
   * @param id - Operator id forwarded to the base operator
   * @param inputA - Reader for the incoming keyed stream
   * @param output - Writer that receives the indexed results
   * @param comparator - Ordering of values inside a group
   * @param options - Grouping function, initial window and optional callbacks
   */
  constructor(
    id: number,
    inputA: DifferenceStreamReader<[K, T]>,
    output: DifferenceStreamWriter<[K, IndexedValue<T>]>,
    comparator: (a: T, b: T) => number,
    options: GroupedTopKWithFractionalIndexOptions<K, T>,
  ) {
    super(id, inputA, output)
    this.#groupKeyFn = options.groupKeyFn
    this.#limit = options.limit ?? Infinity
    this.#offset = options.offset ?? 0
    this.#comparator = createKeyedComparator(comparator)
    // Hand the caller hooks for reading the total size and moving the window.
    options.setSizeCallback?.(() => this.#getTotalSize())
    options.setWindowFn?.((window) => this.#moveTopK(window))
  }

  /**
   * Builds the TopK data structure backing a single group.
   * Subclasses may override this to plug in another implementation
   * (e.g., a B+ tree).
   */
  protected createTopK(
    offset: number,
    limit: number,
    comparator: (a: [K, T], b: [K, T]) => number,
  ): TopK<[K, T]> {
    return new TopKArray(offset, limit, comparator)
  }

  /** Sums the `size` of every group state currently held. */
  #getTotalSize(): number {
    let total = 0
    this.#groupStates.forEach((groupState) => {
      total += groupState.size
    })
    return total
  }

  /**
   * Returns the state registered for `groupKey`, creating and registering one
   * with the current offset/limit when the group has not been seen yet.
   */
  #getOrCreateGroupState(groupKey: unknown): TopKState<K, T> {
    const existing = this.#groupStates.get(groupKey)
    if (existing) {
      return existing
    }

    const created = new TopKState(
      this.createTopK(this.#offset, this.#limit, this.#comparator),
    )
    this.#groupStates.set(groupKey, created)
    return created
  }

  /**
   * Moves the topK window of every group to the given offset and limit.
   * A field left `undefined` keeps its current value. The collected move-ins
   * and move-outs are sent to the output when at least one group reports changes.
   */
  #moveTopK(window: { offset?: number; limit?: number }): void {
    if (window.offset !== undefined) {
      this.#offset = window.offset
    }
    if (window.limit !== undefined) {
      this.#limit = window.limit
    }

    const emitted: Array<[[K, IndexedValue<T>], number]> = []
    let anyGroupChanged = false

    for (const groupState of this.#groupStates.values()) {
      // The stored (already merged) window is passed, not the raw arguments.
      const diff = groupState.move({
        offset: this.#offset,
        limit: this.#limit,
      })

      diff.moveIns.forEach((entering) => handleMoveIn(entering, emitted))
      diff.moveOuts.forEach((leaving) => handleMoveOut(leaving, emitted))

      if (diff.changes) {
        anyGroupChanged = true
      }
    }

    if (anyGroupChanged) {
      this.output.sendData(new MultiSet(emitted))
    }
  }

  /**
   * Drains all pending input messages, feeds every element to its group and
   * sends the accumulated changes as a single MultiSet when there are any.
   */
  run(): void {
    const emitted: Array<[[K, IndexedValue<T>], number]> = []

    for (const message of this.inputMessages()) {
      for (const [[key, value], multiplicity] of message.getInner()) {
        this.#processElement(key, value, multiplicity, emitted)
      }
    }

    if (emitted.length > 0) {
      this.output.sendData(new MultiSet(emitted))
    }
  }

  /**
   * Applies one element to the state of its group and appends the resulting
   * move-in / move-out to `result`.
   */
  #processElement(
    key: K,
    value: T,
    multiplicity: number,
    result: Array<[[K, IndexedValue<T>], number]>,
  ): void {
    const groupKey = this.#groupKeyFn(key, value)
    const groupState = this.#getOrCreateGroupState(groupKey)

    const delta = groupState.processElement(key, value, multiplicity)
    handleMoveIn(delta.moveIn, result)
    handleMoveOut(delta.moveOut, result)

    // Drop groups that became empty so the map does not grow without bound.
    if (groupState.isEmpty) {
      this.#groupStates.delete(groupKey)
    }
  }
}
| 157 | + |
/**
 * Orders elements per group with the given comparator and keeps only the
 * offset/limit window of each group.
 *
 * Elements are assigned to groups by `options.groupKeyFn`, and the limit and
 * offset apply to every group independently. Results carry a fractional index
 * that is lexicographically sortable, so that when elements move only the
 * indices of the moved elements need to be updated.
 *
 * @param comparator - A function that compares two elements for ordering
 * @param options - Configuration including groupKeyFn, limit, and offset
 * @returns A piped operator that orders elements per group and limits results per group
 */
export function groupedTopKWithFractionalIndex<K extends string | number, T>(
  comparator: (a: T, b: T) => number,
  options: GroupedTopKWithFractionalIndexOptions<K, T>,
): PipedOperator<[K, T], [K, IndexedValue<T>]> {
  const attach = (
    stream: IStreamBuilder<[K, T]>,
  ): IStreamBuilder<[K, IndexedValue<T>]> => {
    const { graph } = stream

    // Output stream that lives in the same graph as the input stream.
    const writer = new DifferenceStreamWriter<[K, IndexedValue<T>]>()
    const output = new StreamBuilder<[K, IndexedValue<T>]>(graph, writer)

    // Wire a new operator between the input stream and the output writer.
    graph.addOperator(
      new GroupedTopKWithFractionalIndexOperator<K, T>(
        graph.getNextOperatorId(),
        stream.connectReader(),
        output.writer,
        comparator,
        options,
      ),
    )

    return output
  }

  return attach
}
0 commit comments