Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 48 additions & 74 deletions packages/d2mini/src/indexes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,32 @@ import { DefaultMap, hash } from './utils.js'
* exploit the key-value structure of the data to run efficiently.
*/
export class Index<K, V> {
#inner: DefaultMap<K, [V, number][]>
#changedKeys: Set<K>
#inner: DefaultMap<K, DefaultMap<string, [V, number]>>

constructor() {
this.#inner = new DefaultMap<K, [V, number][]>(() => [])
this.#changedKeys = new Set<K>()
this.#inner = new DefaultMap<K, DefaultMap<string, [V, number]>>(
() =>
new DefaultMap<string, [V, number]>(() => [undefined as any as V, 0]),
)
// #inner is a map of:
// {
// [key]: {
// [hash(value)]: [value, multiplicity]
// }
// }
}

toString(indent = false): string {
return `Index(${JSON.stringify(
[...this.#inner],
[...this.#inner].map(([k, valueMap]) => [k, [...valueMap]]),
undefined,
indent ? ' ' : undefined,
)})`
}

get(key: K): [V, number][] {
return this.#inner.get(key)
const valueMap = this.#inner.get(key)
return [...valueMap.values()]
}

entries() {
Expand All @@ -36,78 +44,44 @@ export class Index<K, V> {
}

has(key: K): boolean {
return this.#inner.has(key) && this.#inner.get(key).length > 0
return this.#inner.has(key)
}

get size(): number {
let count = 0
for (const [, values] of this.#inner.entries()) {
if (values.length > 0) {
count++
}
}
return count
return this.#inner.size
}

addValue(key: K, value: [V, number]): void {
const values = this.#inner.get(key)
values.push(value)
this.#changedKeys.add(key)
}

append(other: Index<K, V>): void {
for (const [key, otherValues] of other.entries()) {
const thisValues = this.#inner.get(key)
for (const value of otherValues) {
thisValues.push(value)
}
this.#changedKeys.add(key)
}
}

compact(keys: K[] = []): void {
// If no keys specified, use the changed keys
const keysToProcess = keys.length === 0 ? [...this.#changedKeys] : keys

for (const key of keysToProcess) {
if (!this.#inner.has(key)) continue

const values = this.#inner.get(key)
const consolidated = this.consolidateValues(values)

// Remove the key entirely and re-add only if there are non-zero values
this.#inner.delete(key)
if (consolidated.length > 0) {
this.#inner.get(key).push(...consolidated)
}
}

// Clear the changed keys after compaction
if (keys.length === 0) {
this.#changedKeys.clear()
} else {
// Only remove the keys that were explicitly compacted
for (const key of keys) {
this.#changedKeys.delete(key)
const [val, multiplicity] = value
const valueMap = this.#inner.get(key)
const valueHash = hash(val)
const [, existingMultiplicity] = valueMap.get(valueHash)
const newMultiplicity = existingMultiplicity + multiplicity
if (multiplicity !== 0) {
if (newMultiplicity === 0) {
valueMap.delete(valueHash)
} else {
valueMap.set(valueHash, [val, newMultiplicity])
}
}
}

private consolidateValues(values: [V, number][]): [V, number][] {
const consolidated = new Map<string, { value: V; multiplicity: number }>()

for (const [value, multiplicity] of values) {
const valueHash = hash(value)
if (consolidated.has(valueHash)) {
consolidated.get(valueHash)!.multiplicity += multiplicity
} else {
consolidated.set(valueHash, { value, multiplicity })
append(other: Index<K, V>): void {
for (const [key, otherValueMap] of other.entries()) {
const thisValueMap = this.#inner.get(key)
for (const [
valueHash,
[value, multiplicity],
] of otherValueMap.entries()) {
const [, existingMultiplicity] = thisValueMap.get(valueHash)
const newMultiplicity = existingMultiplicity + multiplicity
if (newMultiplicity === 0) {
thisValueMap.delete(valueHash)
} else {
thisValueMap.set(valueHash, [value, newMultiplicity])
}
}
}

return [...consolidated.values()]
.filter(({ multiplicity }) => multiplicity !== 0)
.map(({ value, multiplicity }) => [value, multiplicity])
}

join<V2>(other: Index<K, V2>): MultiSet<[K, [V, V2]]> {
Expand All @@ -116,23 +90,23 @@ export class Index<K, V> {
// We want to iterate over the smaller of the two indexes to reduce the
// number of operations we need to do.
if (this.size <= other.size) {
for (const [key, values1] of this.entries()) {
for (const [key, valueMap] of this.entries()) {
if (!other.has(key)) continue
const values2 = other.get(key)
for (const [val1, mul1] of values1) {
for (const [val2, mul2] of values2) {
const otherValues = other.get(key)
for (const [val1, mul1] of valueMap.values()) {
for (const [val2, mul2] of otherValues) {
if (mul1 !== 0 && mul2 !== 0) {
result.push([[key, [val1, val2]], mul1 * mul2])
}
}
}
}
} else {
for (const [key, values2] of other.entries()) {
for (const [key, otherValueMap] of other.entries()) {
if (!this.has(key)) continue
const values1 = this.get(key)
for (const [val2, mul2] of values2) {
for (const [val1, mul1] of values1) {
const values = this.get(key)
for (const [val2, mul2] of otherValueMap.values()) {
for (const [val1, mul1] of values) {
if (mul1 !== 0 && mul2 !== 0) {
result.push([[key, [val1, val2]], mul1 * mul2])
}
Expand Down
7 changes: 0 additions & 7 deletions packages/d2mini/src/operators/join.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,6 @@ export class JoinOperator<K, V1, V2> extends BinaryOperator<

// Append deltaB to indexB
this.#indexB.append(deltaB)

// Compact both indexes to consolidate values and remove zero-multiplicity entries
// Only compact changed keys for efficiency
deltaA.compact()
deltaB.compact()
this.#indexA.compact()
this.#indexB.compact()
}
}

Expand Down
103 changes: 103 additions & 0 deletions packages/d2mini/src/operators/reduce copy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import { IStreamBuilder, KeyValue } from '../types.js'
import {
DifferenceStreamReader,
DifferenceStreamWriter,
UnaryOperator,
} from '../graph.js'
import { StreamBuilder } from '../d2.js'
import { MultiSet } from '../multiset.js'
import { Index } from '../indexes.js'
import { hash } from '../utils.js'

/**
 * Version-free base operator for keyed reductions.
 *
 * Each incoming `[key, value]` change is folded into an input index. For every
 * key touched during a run, the reducer `f` is re-evaluated over that key's
 * current values, and only the difference from the output recorded for that
 * key so far is sent downstream.
 */
export class ReduceOperator<K, V1, V2> extends UnaryOperator<[K, V1], [K, V2]> {
  // Values received so far, grouped by key.
  readonly #index = new Index<K, V1>()
  // Output accumulated so far, grouped by key.
  readonly #indexOut = new Index<K, V2>()
  // Reducer applied to the `[value, multiplicity]` pairs of a single key.
  readonly #f: (values: [V1, number][]) => [V2, number][]

  /**
   * @param id - Operator identifier, forwarded to the base class.
   * @param inputA - Reader supplying the `[key, value]` changes to reduce.
   * @param output - Writer that receives the reduced changes.
   * @param f - Reducer mapping one key's `[value, multiplicity]` pairs to its
   *   output `[value, multiplicity]` pairs.
   */
  constructor(
    id: number,
    inputA: DifferenceStreamReader<[K, V1]>,
    output: DifferenceStreamWriter<[K, V2]>,
    f: (values: [V1, number][]) => [V2, number][],
  ) {
    super(id, inputA, output)
    this.#f = f
  }

  /**
   * Drains the pending input messages, re-reduces every key they touched and
   * emits the resulting output changes, if any, as a single multiset.
   */
  run(): void {
    // Fold all pending changes into the input index, noting which keys moved.
    const touchedKeys = new Set<K>()
    for (const message of this.inputMessages()) {
      for (const [[key, value], multiplicity] of message.getInner()) {
        this.#index.addValue(key, [value, multiplicity])
        touchedKeys.add(key)
      }
    }

    const changes: [[K, V2], number][] = []
    for (const key of touchedKeys) {
      const inputValues = this.#index.get(key)
      const previousOutput = this.#indexOut.get(key)
      const nextOutput = this.#f(inputValues)

      // Net multiplicity per distinct output value, keyed by the value's hash:
      // what the reducer yields now, minus what was recorded earlier.
      const pending = new Map<string, { value: V2; net: number }>()
      for (const [value, multiplicity] of nextOutput) {
        const valueKey = hash(value)
        const net = (pending.get(valueKey)?.net || 0) + multiplicity
        pending.set(valueKey, { value, net })
      }
      for (const [value, multiplicity] of previousOutput) {
        const valueKey = hash(value)
        const net = (pending.get(valueKey)?.net || 0) - multiplicity
        pending.set(valueKey, { value, net })
      }

      // Emit only values whose multiplicity actually changed, and record the
      // same change in the output index so the next run diffs against it.
      for (const { value, net } of pending.values()) {
        if (net === 0) continue
        changes.push([[key, value], net])
        this.#indexOut.addValue(key, [value, net])
      }
    }

    if (changes.length === 0) return
    this.output.sendData(new MultiSet(changes))
  }
}

/**
 * Builds a pipeline step that reduces a keyed stream, key by key
 * (version-free).
 *
 * @param f - Reducer that receives the `[value, multiplicity]` pairs of a
 *   single key and returns the `[result, multiplicity]` pairs for that key.
 * @returns A function that wires a `ReduceOperator` into the graph of the
 *   given stream and returns the builder for the reduced output stream.
 */
export function reduce<
  K extends T extends KeyValue<infer K, infer _V> ? K : never,
  V1 extends T extends KeyValue<K, infer V> ? V : never,
  R,
  T,
>(f: (values: [V1, number][]) => [R, number][]) {
  return (stream: IStreamBuilder<T>): IStreamBuilder<KeyValue<K, R>> => {
    const graph = stream.graph

    // Output side: a fresh writer wrapped in a builder on the same graph.
    const writer = new DifferenceStreamWriter<KeyValue<K, R>>()
    const output = new StreamBuilder<KeyValue<K, R>>(graph, writer)

    // Input side: allocate the operator id first, then attach a reader.
    const operatorId = graph.getNextOperatorId()
    const reader = stream.connectReader() as DifferenceStreamReader<
      KeyValue<K, V1>
    >

    const operator = new ReduceOperator<K, V1, R>(
      operatorId,
      reader,
      output.writer,
      f,
    )
    graph.addOperator(operator)
    graph.addStream(output.connectReader())
    return output
  }
}
5 changes: 0 additions & 5 deletions packages/d2mini/src/operators/reduce.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,6 @@ export class ReduceOperator<K, V1, V2> extends UnaryOperator<[K, V1], [K, V2]> {
if (result.length > 0) {
this.output.sendData(new MultiSet(result))
}

// Compact both indexes to consolidate values and remove zero-multiplicity entries
// Only compact changed keys for efficiency
this.#index.compact()
this.#indexOut.compact()
}
}

Expand Down
5 changes: 0 additions & 5 deletions packages/d2mini/src/operators/topKWithFractionalIndex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,11 +293,6 @@ export class TopKWithFractionalIndexOperator<K, V1> extends UnaryOperator<
if (result.length > 0) {
this.output.sendData(new MultiSet(result))
}

// Compact both indexes to consolidate values and remove zero-multiplicity entries
// Only compact changed keys for efficiency
this.#index.compact()
this.#indexOut.compact()
}
}

Expand Down
50 changes: 4 additions & 46 deletions packages/d2mini/tests/indexes.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,9 @@ describe('Index', () => {
index.addValue('key1', [10, 1])
index.addValue('key1', [10, -1])

// Before compaction, values are stored as-is
// Value is compacted on the fly
const result = index.get('key1')
expect(result).toEqual([
[10, 1],
[10, -1],
])

// After compaction, zero-multiplicity values are removed
index.compact(['key1'])
const compactedResult = index.get('key1')
expect(compactedResult).toEqual([])
expect(result).toEqual([])
})
})

Expand Down Expand Up @@ -70,14 +62,6 @@ describe('Index', () => {

index.append(other)

// Before compaction, values are stored separately
expect(index.get('key1')).toEqual([
[10, 2],
[10, 3],
])

// After compaction, multiplicities are combined
index.compact(['key1'])
expect(index.get('key1')).toEqual([[10, 5]])
})
})
Expand Down Expand Up @@ -164,43 +148,17 @@ describe('Index', () => {
// Append should also track changed keys
index.append(other)

// Before compaction, all values should be stored as-is
expect(index.get('key1')).toEqual([
[10, 1],
[10, -1],
])
expect(index.get('key2')).toEqual([[20, 1]])
expect(index.get('key3')).toEqual([[30, 1]])
expect(index.get('key4')).toEqual([
[40, 1],
[40, -1],
])
expect(index.get('key5')).toEqual([[50, 1]])

// Compact without arguments should only compact changed keys
index.compact()

// After compaction, values should be consolidated and zero-multiplicity entries removed
// Values should be consolidated and zero-multiplicity entries removed
expect(index.get('key1')).toEqual([]) // Cancelled out
expect(index.get('key2')).toEqual([[20, 1]])
expect(index.get('key3')).toEqual([[30, 1]])
expect(index.get('key4')).toEqual([]) // Cancelled out
expect(index.get('key5')).toEqual([[50, 1]])

// Add more values after compaction
// Add more values
index.addValue('key2', [25, 1])
index.addValue('key6', [60, 1])

// Only key2 and key6 should have new uncompacted values
expect(index.get('key2')).toEqual([
[20, 1],
[25, 1],
])
expect(index.get('key6')).toEqual([[60, 1]])

// Compact again - should only affect key2 and key6
index.compact()

expect(index.get('key2')).toEqual([
[20, 1],
[25, 1],
Expand Down