Serialize big array blocks (elastic#106373)
Similar to ArrayBlocks, this change serializes the underlying
structure of BigArrayBlocks.
dnhatn authored Mar 14, 2024
1 parent 925a9a3 commit b6f876f
Showing 13 changed files with 156 additions and 0 deletions.
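For orientation before the diff: every block on the wire is prefixed with a serialization-type byte, and this commit adds a fourth value (SERIALIZE_BLOCK_BIG_ARRAY = 3) that the sender only emits when the receiving node is on transport version ESQL_SERIALIZE_BIG_ARRAY or later; older receivers still get the value-by-value encoding. A minimal, self-contained sketch of that write-side gating, using illustrative stand-in names rather than the actual Elasticsearch classes:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch of version-gated block serialization; none of these names are the real API.
final class BlockWriteSketch {
    static final byte SERIALIZE_BLOCK_VALUES = 0;
    static final byte SERIALIZE_BLOCK_BIG_ARRAY = 3;
    static final int ESQL_SERIALIZE_BIG_ARRAY = 8_610_00_0; // first version that understands tag 3

    /** Stand-in for a long-valued big-array block: a backing array plus a null mask. */
    record LongBigArraySketch(long[] values, boolean[] isNull) {}

    static void writeBlock(DataOutputStream out, int remoteVersion, LongBigArraySketch block) throws IOException {
        if (remoteVersion >= ESQL_SERIALIZE_BIG_ARRAY) {
            // New path: tag the stream, then ship the underlying structure more or less as-is.
            out.writeByte(SERIALIZE_BLOCK_BIG_ARRAY);
            out.writeInt(block.values().length);
            for (boolean isNull : block.isNull()) {
                out.writeBoolean(isNull);       // null mask
            }
            for (long v : block.values()) {
                out.writeLong(v);               // backing array
            }
        } else {
            // Fallback for older nodes: position-by-position values they already understand.
            out.writeByte(SERIALIZE_BLOCK_VALUES);
            out.writeInt(block.values().length);
            for (int i = 0; i < block.values().length; i++) {
                out.writeBoolean(block.isNull()[i]);
                if (block.isNull()[i] == false) {
                    out.writeLong(block.values()[i]);
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        var bytes = new ByteArrayOutputStream();
        var block = new LongBigArraySketch(new long[] { 1, 2, 3 }, new boolean[] { false, true, false });
        writeBlock(new DataOutputStream(bytes), ESQL_SERIALIZE_BIG_ARRAY, block);
        System.out.println("wrote " + bytes.size() + " bytes using the big-array encoding");
    }
}
```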
5 changes: 5 additions & 0 deletions docs/changelog/106373.yaml
@@ -0,0 +1,5 @@
pr: 106373
summary: Serialize big array blocks
area: ES|QL
type: enhancement
issues: []
@@ -147,6 +147,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_EXTENDED_ENRICH_INPUT_TYPE = def(8_607_00_0);
public static final TransportVersion ESQL_SERIALIZE_BIG_VECTOR = def(8_608_00_0);
public static final TransportVersion AGGS_EXCLUDED_DELETED_DOCS = def(8_609_00_0);
public static final TransportVersion ESQL_SERIALIZE_BIG_ARRAY = def(8_610_00_0);

/*
* STOP! READ THIS FIRST! No, really,
@@ -8,9 +8,11 @@
package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.core.Releasables;

import java.io.IOException;
import java.util.BitSet;

/**
@@ -54,6 +56,29 @@ private BooleanBigArrayBlock(
: firstValueIndexes[getPositionCount()] == vector.getPositionCount();
}

static BooleanBigArrayBlock readArrayBlock(BlockFactory blockFactory, BlockStreamInput in) throws IOException {
final SubFields sub = new SubFields(blockFactory, in);
BooleanBigArrayVector vector = null;
boolean success = false;
try {
vector = BooleanBigArrayVector.readArrayVector(sub.vectorPositions(), in, blockFactory);
var block = new BooleanBigArrayBlock(vector, sub.positionCount, sub.firstValueIndexes, sub.nullsMask, sub.mvOrdering);
blockFactory.adjustBreaker(block.ramBytesUsed() - vector.ramBytesUsed() - sub.bytesReserved);
success = true;
return block;
} finally {
if (success == false) {
Releasables.close(vector);
blockFactory.adjustBreaker(-sub.bytesReserved);
}
}
}

void writeArrayBlock(StreamOutput out) throws IOException {
writeSubFields(out);
vector.writeArrayVector(vector.getPositionCount(), out);
}

@Override
public BooleanVector asVector() {
return null;
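One detail worth calling out in readArrayBlock above: SubFields reserves its bytes on the circuit breaker while it reads, the vector accounts for itself during readArrayVector, and the final adjustBreaker call trues the total up to the finished block's ramBytesUsed(); if anything fails before success is set, the finally block closes the vector and returns the SubFields reservation. A simplified, self-contained sketch of that accounting shape, with hypothetical stand-in types rather than the real BlockFactory and breaker:

```java
// Hypothetical sketch of the breaker accounting in readArrayBlock; not the real Elasticsearch types.
final class BreakerAccountingSketch {
    /** Stand-in circuit breaker: just tracks how many bytes are currently accounted for. */
    static final class Breaker {
        long accounted;
        void adjust(long delta) { accounted += delta; }
    }

    static final class Vector {
        final long ramBytesUsed = 64;   // pretend size of the freshly deserialized big-array vector
    }

    static final class Block {
        final Vector vector;
        final long ramBytesUsed;
        Block(Vector vector, long subFieldBytes) {
            this.vector = vector;
            this.ramBytesUsed = vector.ramBytesUsed + subFieldBytes + 16;   // block adds some overhead
        }
    }

    static Block readBlock(Breaker breaker, long subFieldBytesReserved) {
        // At this point the sub-fields (nulls mask, first-value indexes, ...) have already
        // reserved subFieldBytesReserved on the breaker.
        Vector vector = null;
        boolean success = false;
        try {
            vector = new Vector();
            breaker.adjust(vector.ramBytesUsed);                            // the vector read accounts for itself
            Block block = new Block(vector, subFieldBytesReserved);
            // True up so the breaker ends holding exactly block.ramBytesUsed.
            breaker.adjust(block.ramBytesUsed - vector.ramBytesUsed - subFieldBytesReserved);
            success = true;
            return block;
        } finally {
            if (success == false) {
                if (vector != null) {
                    breaker.adjust(-vector.ramBytesUsed);                   // "close" the vector
                }
                breaker.adjust(-subFieldBytesReserved);                     // return the sub-field reservation
            }
        }
    }

    public static void main(String[] args) {
        Breaker breaker = new Breaker();
        long subBytes = 32;
        breaker.adjust(subBytes);                                           // what SubFields reserved while reading
        Block block = readBlock(breaker, subBytes);
        System.out.println(breaker.accounted == block.ramBytesUsed);        // true
    }
}
```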
@@ -55,6 +55,7 @@ private static BooleanBlock readFrom(BlockStreamInput in) throws IOException {
case SERIALIZE_BLOCK_VALUES -> BooleanBlock.readValues(in);
case SERIALIZE_BLOCK_VECTOR -> BooleanVector.readFrom(in.blockFactory(), in).asBlock();
case SERIALIZE_BLOCK_ARRAY -> BooleanArrayBlock.readArrayBlock(in.blockFactory(), in);
case SERIALIZE_BLOCK_BIG_ARRAY -> BooleanBigArrayBlock.readArrayBlock(in.blockFactory(), in);
default -> {
assert false : "invalid block serialization type " + serializationType;
throw new IllegalStateException("invalid serialization type " + serializationType);
@@ -91,6 +92,9 @@ default void writeTo(StreamOutput out) throws IOException {
} else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_BLOCK) && this instanceof BooleanArrayBlock b) {
out.writeByte(SERIALIZE_BLOCK_ARRAY);
b.writeArrayBlock(out);
} else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_BIG_ARRAY) && this instanceof BooleanBigArrayBlock b) {
out.writeByte(SERIALIZE_BLOCK_BIG_ARRAY);
b.writeArrayBlock(out);
} else {
out.writeByte(SERIALIZE_BLOCK_VALUES);
BooleanBlock.writeValues(this, out);
@@ -8,9 +8,11 @@
package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.core.Releasables;

import java.io.IOException;
import java.util.BitSet;

/**
@@ -54,6 +56,29 @@ private DoubleBigArrayBlock(
: firstValueIndexes[getPositionCount()] == vector.getPositionCount();
}

static DoubleBigArrayBlock readArrayBlock(BlockFactory blockFactory, BlockStreamInput in) throws IOException {
final SubFields sub = new SubFields(blockFactory, in);
DoubleBigArrayVector vector = null;
boolean success = false;
try {
vector = DoubleBigArrayVector.readArrayVector(sub.vectorPositions(), in, blockFactory);
var block = new DoubleBigArrayBlock(vector, sub.positionCount, sub.firstValueIndexes, sub.nullsMask, sub.mvOrdering);
blockFactory.adjustBreaker(block.ramBytesUsed() - vector.ramBytesUsed() - sub.bytesReserved);
success = true;
return block;
} finally {
if (success == false) {
Releasables.close(vector);
blockFactory.adjustBreaker(-sub.bytesReserved);
}
}
}

void writeArrayBlock(StreamOutput out) throws IOException {
writeSubFields(out);
vector.writeArrayVector(vector.getPositionCount(), out);
}

@Override
public DoubleVector asVector() {
return null;
@@ -55,6 +55,7 @@ private static DoubleBlock readFrom(BlockStreamInput in) throws IOException {
case SERIALIZE_BLOCK_VALUES -> DoubleBlock.readValues(in);
case SERIALIZE_BLOCK_VECTOR -> DoubleVector.readFrom(in.blockFactory(), in).asBlock();
case SERIALIZE_BLOCK_ARRAY -> DoubleArrayBlock.readArrayBlock(in.blockFactory(), in);
case SERIALIZE_BLOCK_BIG_ARRAY -> DoubleBigArrayBlock.readArrayBlock(in.blockFactory(), in);
default -> {
assert false : "invalid block serialization type " + serializationType;
throw new IllegalStateException("invalid serialization type " + serializationType);
@@ -91,6 +92,9 @@ default void writeTo(StreamOutput out) throws IOException {
} else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_BLOCK) && this instanceof DoubleArrayBlock b) {
out.writeByte(SERIALIZE_BLOCK_ARRAY);
b.writeArrayBlock(out);
} else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_BIG_ARRAY) && this instanceof DoubleBigArrayBlock b) {
out.writeByte(SERIALIZE_BLOCK_BIG_ARRAY);
b.writeArrayBlock(out);
} else {
out.writeByte(SERIALIZE_BLOCK_VALUES);
DoubleBlock.writeValues(this, out);
@@ -8,9 +8,11 @@
package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.IntArray;
import org.elasticsearch.core.Releasables;

import java.io.IOException;
import java.util.BitSet;

/**
@@ -54,6 +56,29 @@ private IntBigArrayBlock(
: firstValueIndexes[getPositionCount()] == vector.getPositionCount();
}

static IntBigArrayBlock readArrayBlock(BlockFactory blockFactory, BlockStreamInput in) throws IOException {
final SubFields sub = new SubFields(blockFactory, in);
IntBigArrayVector vector = null;
boolean success = false;
try {
vector = IntBigArrayVector.readArrayVector(sub.vectorPositions(), in, blockFactory);
var block = new IntBigArrayBlock(vector, sub.positionCount, sub.firstValueIndexes, sub.nullsMask, sub.mvOrdering);
blockFactory.adjustBreaker(block.ramBytesUsed() - vector.ramBytesUsed() - sub.bytesReserved);
success = true;
return block;
} finally {
if (success == false) {
Releasables.close(vector);
blockFactory.adjustBreaker(-sub.bytesReserved);
}
}
}

void writeArrayBlock(StreamOutput out) throws IOException {
writeSubFields(out);
vector.writeArrayVector(vector.getPositionCount(), out);
}

@Override
public IntVector asVector() {
return null;
@@ -55,6 +55,7 @@ private static IntBlock readFrom(BlockStreamInput in) throws IOException {
case SERIALIZE_BLOCK_VALUES -> IntBlock.readValues(in);
case SERIALIZE_BLOCK_VECTOR -> IntVector.readFrom(in.blockFactory(), in).asBlock();
case SERIALIZE_BLOCK_ARRAY -> IntArrayBlock.readArrayBlock(in.blockFactory(), in);
case SERIALIZE_BLOCK_BIG_ARRAY -> IntBigArrayBlock.readArrayBlock(in.blockFactory(), in);
default -> {
assert false : "invalid block serialization type " + serializationType;
throw new IllegalStateException("invalid serialization type " + serializationType);
@@ -91,6 +92,9 @@ default void writeTo(StreamOutput out) throws IOException {
} else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_BLOCK) && this instanceof IntArrayBlock b) {
out.writeByte(SERIALIZE_BLOCK_ARRAY);
b.writeArrayBlock(out);
} else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_BIG_ARRAY) && this instanceof IntBigArrayBlock b) {
out.writeByte(SERIALIZE_BLOCK_BIG_ARRAY);
b.writeArrayBlock(out);
} else {
out.writeByte(SERIALIZE_BLOCK_VALUES);
IntBlock.writeValues(this, out);
@@ -8,9 +8,11 @@
package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.core.Releasables;

import java.io.IOException;
import java.util.BitSet;

/**
@@ -54,6 +56,29 @@ private LongBigArrayBlock(
: firstValueIndexes[getPositionCount()] == vector.getPositionCount();
}

static LongBigArrayBlock readArrayBlock(BlockFactory blockFactory, BlockStreamInput in) throws IOException {
final SubFields sub = new SubFields(blockFactory, in);
LongBigArrayVector vector = null;
boolean success = false;
try {
vector = LongBigArrayVector.readArrayVector(sub.vectorPositions(), in, blockFactory);
var block = new LongBigArrayBlock(vector, sub.positionCount, sub.firstValueIndexes, sub.nullsMask, sub.mvOrdering);
blockFactory.adjustBreaker(block.ramBytesUsed() - vector.ramBytesUsed() - sub.bytesReserved);
success = true;
return block;
} finally {
if (success == false) {
Releasables.close(vector);
blockFactory.adjustBreaker(-sub.bytesReserved);
}
}
}

void writeArrayBlock(StreamOutput out) throws IOException {
writeSubFields(out);
vector.writeArrayVector(vector.getPositionCount(), out);
}

@Override
public LongVector asVector() {
return null;
@@ -55,6 +55,7 @@ private static LongBlock readFrom(BlockStreamInput in) throws IOException {
case SERIALIZE_BLOCK_VALUES -> LongBlock.readValues(in);
case SERIALIZE_BLOCK_VECTOR -> LongVector.readFrom(in.blockFactory(), in).asBlock();
case SERIALIZE_BLOCK_ARRAY -> LongArrayBlock.readArrayBlock(in.blockFactory(), in);
case SERIALIZE_BLOCK_BIG_ARRAY -> LongBigArrayBlock.readArrayBlock(in.blockFactory(), in);
default -> {
assert false : "invalid block serialization type " + serializationType;
throw new IllegalStateException("invalid serialization type " + serializationType);
@@ -91,6 +92,9 @@ default void writeTo(StreamOutput out) throws IOException {
} else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_BLOCK) && this instanceof LongArrayBlock b) {
out.writeByte(SERIALIZE_BLOCK_ARRAY);
b.writeArrayBlock(out);
} else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_BIG_ARRAY) && this instanceof LongBigArrayBlock b) {
out.writeByte(SERIALIZE_BLOCK_BIG_ARRAY);
b.writeArrayBlock(out);
} else {
out.writeByte(SERIALIZE_BLOCK_VALUES);
LongBlock.writeValues(this, out);
@@ -246,4 +246,5 @@ static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
byte SERIALIZE_BLOCK_VALUES = 0;
byte SERIALIZE_BLOCK_VECTOR = 1;
byte SERIALIZE_BLOCK_ARRAY = 2;
byte SERIALIZE_BLOCK_BIG_ARRAY = 3;
}
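The discriminator byte written ahead of every block now has four values: 0 for per-position values, 1 for a vector, 2 for an array block, and 3 for the new big-array block. A reader that predates this change would land in the default branch on byte 3, which is why the write side only selects it once the remote node is on ESQL_SERIALIZE_BIG_ARRAY or later. A minimal sketch of the read-side dispatch, with illustrative names rather than the actual readFrom implementations:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

// Hypothetical sketch of dispatching on the serialization-type byte; not the real Block API.
final class BlockReadSketch {
    static final byte SERIALIZE_BLOCK_VALUES = 0;
    static final byte SERIALIZE_BLOCK_VECTOR = 1;
    static final byte SERIALIZE_BLOCK_ARRAY = 2;
    static final byte SERIALIZE_BLOCK_BIG_ARRAY = 3;

    static String readBlock(DataInputStream in) throws IOException {
        byte serializationType = in.readByte();
        return switch (serializationType) {
            case SERIALIZE_BLOCK_VALUES -> "decode position-by-position values";
            case SERIALIZE_BLOCK_VECTOR -> "decode a vector and wrap it as a block";
            case SERIALIZE_BLOCK_ARRAY -> "decode array-block sub-fields plus an array vector";
            case SERIALIZE_BLOCK_BIG_ARRAY -> "decode array-block sub-fields plus a big-array vector";
            default -> throw new IllegalStateException("invalid serialization type " + serializationType);
        };
    }

    public static void main(String[] args) throws IOException {
        byte[] framed = { SERIALIZE_BLOCK_BIG_ARRAY };
        System.out.println(readBlock(new DataInputStream(new ByteArrayInputStream(framed))));
    }
}
```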
@@ -8,9 +8,11 @@
package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.$Array$;
import org.elasticsearch.core.Releasables;

import java.io.IOException;
import java.util.BitSet;

/**
@@ -54,6 +56,29 @@ public final class $Type$BigArrayBlock extends AbstractArrayBlock implements $Ty
: firstValueIndexes[getPositionCount()] == vector.getPositionCount();
}

static $Type$BigArrayBlock readArrayBlock(BlockFactory blockFactory, BlockStreamInput in) throws IOException {
final SubFields sub = new SubFields(blockFactory, in);
$Type$BigArrayVector vector = null;
boolean success = false;
try {
vector = $Type$BigArrayVector.readArrayVector(sub.vectorPositions(), in, blockFactory);
var block = new $Type$BigArrayBlock(vector, sub.positionCount, sub.firstValueIndexes, sub.nullsMask, sub.mvOrdering);
blockFactory.adjustBreaker(block.ramBytesUsed() - vector.ramBytesUsed() - sub.bytesReserved);
success = true;
return block;
} finally {
if (success == false) {
Releasables.close(vector);
blockFactory.adjustBreaker(-sub.bytesReserved);
}
}
}

void writeArrayBlock(StreamOutput out) throws IOException {
writeSubFields(out);
vector.writeArrayVector(vector.getPositionCount(), out);
}

@Override
public $Type$Vector asVector() {
return null;
@@ -70,6 +70,9 @@ $endif$
case SERIALIZE_BLOCK_VALUES -> $Type$Block.readValues(in);
case SERIALIZE_BLOCK_VECTOR -> $Type$Vector.readFrom(in.blockFactory(), in).asBlock();
case SERIALIZE_BLOCK_ARRAY -> $Type$ArrayBlock.readArrayBlock(in.blockFactory(), in);
$if(BytesRef)$$else$
case SERIALIZE_BLOCK_BIG_ARRAY -> $Type$BigArrayBlock.readArrayBlock(in.blockFactory(), in);
$endif$
default -> {
assert false : "invalid block serialization type " + serializationType;
throw new IllegalStateException("invalid serialization type " + serializationType);
@@ -106,6 +109,11 @@ $endif$
} else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_BLOCK) && this instanceof $Type$ArrayBlock b) {
out.writeByte(SERIALIZE_BLOCK_ARRAY);
b.writeArrayBlock(out);
$if(BytesRef)$$else$
} else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_BIG_ARRAY) && this instanceof $Type$BigArrayBlock b) {
out.writeByte(SERIALIZE_BLOCK_BIG_ARRAY);
b.writeArrayBlock(out);
$endif$
} else {
out.writeByte(SERIALIZE_BLOCK_VALUES);
$Type$Block.writeValues(this, out);
