Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add some constructor for chunk capacity #180

Open
wants to merge 16 commits into
base: develop
Choose a base branch
from
18 changes: 18 additions & 0 deletions java/tsfile/src/main/java/org/apache/tsfile/utils/PublicBAOS.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,24 @@ private static int hugeCapacity(int minCapacity) {
return (minCapacity > MAX_ARRAY_SIZE) ? Integer.MAX_VALUE : MAX_ARRAY_SIZE;
}

/**
 * Reserves at least the specified capacity for the internal byte array. If the current
 * capacity is less than the argument, a new byte array is allocated and the bytes written
 * so far are copied over. The method does nothing if the specified capacity is not greater
 * than the current capacity.
 *
 * @param capacity the minimum capacity, in bytes, to reserve
 * @throws OutOfMemoryError if the requested capacity exceeds {@code MAX_ARRAY_SIZE}
 */
public void reserve(int capacity) {
  if (capacity > buf.length) {
    if (capacity > MAX_ARRAY_SIZE) {
      throw new OutOfMemoryError("Requested capacity " + capacity + " exceeds MAX_ARRAY_SIZE");
    }
    // Use a distinct local name: the original shadowed the 'buf' field.
    byte[] enlarged = new byte[capacity];
    System.arraycopy(buf, 0, enlarged, 0, count);
    buf = enlarged;
  }
}

@Override
public void write(int b) {
ensureCapacity(count + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,6 @@ private void checkIsTimeseriesExist(Tablet tablet, boolean isAligned)
throws WriteProcessException, IOException {
IChunkGroupWriter groupWriter =
tryToInitialGroupWriter(new PlainDeviceID(tablet.deviceId), isAligned);

Path devicePath = new Path(tablet.deviceId);
List<MeasurementSchema> schemas = tablet.getSchemas();
if (schema.containsDevice(devicePath)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,40 @@ public void tryToAddSeriesWriter(List<MeasurementSchema> measurementSchemas) thr
}
}

/**
 * Registers a ValueChunkWriter for the given measurement if one does not exist yet,
 * pre-sizing its buffers for the expected row count. Existing writers are left untouched.
 */
public void tryToAddSeriesWriter(MeasurementSchema measurementSchema, int rowCount)
    throws IOException {
  String measurementId = measurementSchema.getMeasurementId();
  if (valueChunkWriterMap.containsKey(measurementId)) {
    // A writer for this measurement already exists; nothing to do.
    return;
  }
  ValueChunkWriter writer =
      new ValueChunkWriter(
          measurementId,
          measurementSchema.getCompressor(),
          measurementSchema.getType(),
          measurementSchema.getEncodingType(),
          measurementSchema.getValueEncoder(),
          rowCount);
  valueChunkWriterMap.put(measurementId, writer);
  tryToAddEmptyPageAndData(writer);
}

/**
 * Registers a ValueChunkWriter for each given measurement that does not have one yet,
 * pre-sizing buffers for the expected row count.
 *
 * @param measurementSchemas measurements to register writers for
 * @param rowCount expected number of rows, used as a capacity hint
 * @throws IOException if adding empty pages for a newly created writer fails
 */
public void tryToAddSeriesWriter(List<MeasurementSchema> measurementSchemas, int rowCount)
    throws IOException {
  // Delegate to the single-schema overload so writer-creation logic lives in one place.
  for (MeasurementSchema schema : measurementSchemas) {
    tryToAddSeriesWriter(schema, rowCount);
  }
}

@Override
public int write(long time, List<DataPoint> data) throws WriteProcessException, IOException {
checkIsHistoryData(time);
Expand Down Expand Up @@ -170,6 +204,11 @@ public int write(Tablet tablet) throws WriteProcessException, IOException {
emptyValueChunkWriters.add(entry.getValue());
}
}

for (int columnIndex = 0; columnIndex < measurementSchemas.size(); ++columnIndex) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redundant?

ValueChunkWriter valueChunkWriter =
valueChunkWriterMap.get(measurementSchemas.get(columnIndex).getMeasurementId());
}
for (int row = 0; row < tablet.rowSize; row++) {
long time = tablet.timestamps[row];
checkIsHistoryData(time);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,36 @@ public AlignedChunkWriterImpl(VectorMeasurementSchema schema) {
this.remainingPointsNumber = timeChunkWriter.getRemainingPointNumberForCurrentPage();
}

/**
 * Creates an aligned chunk writer from a vector schema, pre-sizing the time and value
 * page buffers for the expected number of rows.
 *
 * @param schema vector schema describing the time column and all value sub-measurements
 * @param rowCount expected number of rows, used as a capacity hint for the chunk buffers
 */
public AlignedChunkWriterImpl(VectorMeasurementSchema schema, int rowCount) {
  List<String> valueMeasurementIdList = schema.getSubMeasurementsList();
  List<TSDataType> valueTSDataTypeList = schema.getSubMeasurementsTSDataTypeList();
  List<TSEncoding> valueTSEncodingList = schema.getSubMeasurementsTSEncodingList();
  List<Encoder> valueEncoderList = schema.getSubMeasurementsEncoderList();

  valueChunkWriterList = new ArrayList<>(valueMeasurementIdList.size());
  for (int i = 0; i < valueMeasurementIdList.size(); i++) {
    valueChunkWriterList.add(
        new ValueChunkWriter(
            valueMeasurementIdList.get(i),
            schema.getCompressor(),
            valueTSDataTypeList.get(i),
            valueTSEncodingList.get(i),
            valueEncoderList.get(i),
            rowCount));
  }
  timeChunkWriter =
      new TimeChunkWriter(
          schema.getMeasurementId(),
          schema.getCompressor(),
          schema.getTimeTSEncoding(),
          schema.getTimeEncoder(),
          rowCount);

  this.valueIndex = 0;
  this.remainingPointsNumber = timeChunkWriter.getRemainingPointNumberForCurrentPage();
}

/**
* This is used to rewrite file. The encoding and compression of the time column should be the
* same as the source file.
Expand Down Expand Up @@ -111,6 +141,32 @@ public AlignedChunkWriterImpl(
this.remainingPointsNumber = timeChunkWriter.getRemainingPointNumberForCurrentPage();
}

/**
 * Creates an aligned chunk writer with explicit time and value schemas, pre-sizing page
 * buffers for the expected number of rows. Used when the time column's encoding and
 * compression must match a given schema rather than the global configuration.
 */
public AlignedChunkWriterImpl(
    IMeasurementSchema timeSchema, List<IMeasurementSchema> valueSchemaList, int rowCount) {

  valueChunkWriterList = new ArrayList<>(valueSchemaList.size());
  for (IMeasurementSchema valueSchema : valueSchemaList) {
    ValueChunkWriter valueChunkWriter =
        new ValueChunkWriter(
            valueSchema.getMeasurementId(),
            valueSchema.getCompressor(),
            valueSchema.getType(),
            valueSchema.getEncodingType(),
            valueSchema.getValueEncoder(),
            rowCount);
    valueChunkWriterList.add(valueChunkWriter);
  }

  timeChunkWriter =
      new TimeChunkWriter(
          timeSchema.getMeasurementId(),
          timeSchema.getCompressor(),
          timeSchema.getEncodingType(),
          timeSchema.getTimeEncoder(),
          rowCount);

  this.valueIndex = 0;
  this.remainingPointsNumber = timeChunkWriter.getRemainingPointNumberForCurrentPage();
}

/**
* This is used to write 0-level file. The compression of the time column is 'LZ4' in the
* configuration by default. The encoding of the time column is 'TS_2DIFF' in the configuration by
Expand Down Expand Up @@ -146,6 +202,37 @@ public AlignedChunkWriterImpl(List<IMeasurementSchema> schemaList) {
this.remainingPointsNumber = timeChunkWriter.getRemainingPointNumberForCurrentPage();
}

/**
 * Creates an aligned chunk writer whose time column uses the globally configured encoding,
 * data type and compression, pre-sizing page buffers for the expected number of rows.
 *
 * @param schemaList schemas of the value columns
 * @param rowCount expected number of rows, used as a capacity hint for the chunk buffers
 */
public AlignedChunkWriterImpl(List<IMeasurementSchema> schemaList, int rowCount) {
  TSEncoding timeEncoding =
      TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
  TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
  CompressionType timeCompression = TSFileDescriptor.getInstance().getConfig().getCompressor();

  valueChunkWriterList = new ArrayList<>(schemaList.size());
  for (IMeasurementSchema schema : schemaList) {
    valueChunkWriterList.add(
        new ValueChunkWriter(
            schema.getMeasurementId(),
            schema.getCompressor(),
            schema.getType(),
            schema.getEncodingType(),
            schema.getValueEncoder(),
            rowCount));
  }

  // The time column has no measurement id of its own in this constructor.
  timeChunkWriter =
      new TimeChunkWriter(
          "",
          timeCompression,
          timeEncoding,
          TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType),
          rowCount);

  this.valueIndex = 0;
  this.remainingPointsNumber = timeChunkWriter.getRemainingPointNumberForCurrentPage();
}

public void write(long time, int value, boolean isNull) {
valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,42 @@ public ChunkWriterImpl(IMeasurementSchema schema, boolean isMerging) {
this.isMerging = isMerging;
}

/**
 * Creates a chunk writer whose chunk and page buffers are pre-sized for the expected
 * number of rows, avoiding repeated buffer growth while writing.
 *
 * @param schema the measurement schema this writer serializes
 * @param rowCount expected number of rows, used as a capacity hint (buffers still grow if
 *     exceeded)
 */
public ChunkWriterImpl(IMeasurementSchema schema, int rowCount) {
  this.measurementSchema = schema;
  this.compressor = ICompressor.getCompressor(schema.getCompressor());

  this.pageSizeThreshold = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
  this.maxNumberOfPointsInPage =
      TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
  // initial check of memory usage. So that we have enough data to make an initial prediction
  this.valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;

  // init statistics for this chunk and page
  this.statistics = Statistics.getStatsByType(measurementSchema.getType());

  // Capacity hint for the whole chunk: one value of this type per expected row,
  // rounded UP to a multiple of 32. The original used (x + 31) >> 5, which divides
  // by 32 instead of rounding; (x + 31) & ~31 is the intended rounding.
  int bufferSize = rowCount * schema.getType().getDataTypeSize();
  bufferSize = (bufferSize + 31) & ~31;
  int pageSize =
      Math.min(
          bufferSize,
          Math.min(
              (int) pageSizeThreshold,
              MINIMUM_RECORD_COUNT_FOR_CHECK * schema.getType().getDataTypeSize()));
  // let the page size be a multiple of 32
  pageSize = (pageSize + 31) & ~31;
  this.pageWriter = new PageWriter(measurementSchema, pageSize);

  this.pageWriter.setTimeEncoder(measurementSchema.getTimeEncoder());
  this.pageWriter.setValueEncoder(measurementSchema.getValueEncoder());
  this.pageBuffer = new PublicBAOS(bufferSize);
  // check if the measurement schema uses SDT
  checkSdtEncoding();
}

/**
 * Creates a chunk writer pre-sized for {@code rowCount} rows and marks whether it is used
 * during a merge.
 *
 * <p>NOTE(review): {@code isMerging} is assigned AFTER the delegated constructor has already
 * run {@code checkSdtEncoding()}, which reads {@code isMerging}; when merging, SDT may
 * therefore still be configured — confirm against the (schema, isMerging) constructor.
 */
public ChunkWriterImpl(IMeasurementSchema schema, boolean isMerging, int rowCount) {
  this(schema, rowCount);
  this.isMerging = isMerging;
}

private void checkSdtEncoding() {
if (measurementSchema.getProps() != null && !isMerging) {
if (measurementSchema.getProps().getOrDefault(LOSS, "").equals(SDT)) {
Expand Down Expand Up @@ -265,6 +301,7 @@ private void checkPageSizeAndMayOpenANewPage() {
long currentPageSize = pageWriter.estimateMaxMemSize();
if (currentPageSize > pageSizeThreshold) { // memory size exceeds threshold
// we will write the current page

logger.debug(
"enough size, write page {}, pageSizeThreshold:{}, currentPateSize:{}, valueCountInOnePage:{}",
measurementSchema.getMeasurementId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ public void tryToAddSeriesWriter(MeasurementSchema schema) {
}
}

/**
 * Registers a ChunkWriterImpl for the given measurement if one does not exist yet,
 * pre-sizing its buffers for the expected row count.
 *
 * @param schema measurement to register a writer for
 * @param rowCount expected number of rows, used as a capacity hint
 */
public void tryToAddSeriesWriter(MeasurementSchema schema, int rowCount) {
  // computeIfAbsent replaces the containsKey/put pair with a single map operation.
  chunkWriters.computeIfAbsent(
      schema.getMeasurementId(), id -> new ChunkWriterImpl(schema, rowCount));
}

@Override
public void tryToAddSeriesWriter(List<MeasurementSchema> schemas) {
for (IMeasurementSchema schema : schemas) {
Expand All @@ -75,6 +81,14 @@ public void tryToAddSeriesWriter(List<MeasurementSchema> schemas) {
}
}

/**
 * Registers a ChunkWriterImpl for each given measurement that does not have one yet,
 * pre-sizing buffers for the expected row count.
 *
 * @param schemas measurements to register writers for
 * @param rowCount expected number of rows, used as a capacity hint
 */
public void tryToAddSeriesWriter(List<MeasurementSchema> schemas, int rowCount) {
  // Delegate to the single-schema overload so the creation logic lives in one place.
  for (MeasurementSchema schema : schemas) {
    tryToAddSeriesWriter(schema, rowCount);
  }
}

@Override
public int write(long time, List<DataPoint> data) throws IOException, WriteProcessException {
int pointCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,39 @@ public TimeChunkWriter(
this.pageWriter = new TimePageWriter(timeEncoder, ICompressor.getCompressor(compressionType));
}

/**
 * Creates a time-column chunk writer whose chunk and page buffers are pre-sized for the
 * expected number of rows.
 *
 * @param measurementId id of the time column (may be empty)
 * @param compressionType compression applied to serialized pages
 * @param encodingType encoding of the time column
 * @param timeEncoder encoder instance for timestamps
 * @param rowCount expected number of rows, used as a capacity hint (buffers still grow if
 *     exceeded)
 */
public TimeChunkWriter(
    String measurementId,
    CompressionType compressionType,
    TSEncoding encodingType,
    Encoder timeEncoder,
    int rowCount) {
  this.measurementId = measurementId;
  this.encodingType = encodingType;
  this.compressionType = compressionType;

  this.pageSizeThreshold = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
  this.maxNumberOfPointsInPage =
      TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
  // initial check of memory usage. So that we have enough data to make an initial prediction
  this.valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;

  // init statistics for this chunk and page
  this.statistics = new TimeStatistics();

  // Capacity hint: one timestamp per expected row plus the maximum page-header size,
  // rounded UP to a multiple of 32. The original used (x + 31) >> 5, which divides by 32
  // instead of rounding; (x + 31) & ~31 is the intended rounding.
  int bufferCount =
      rowCount * TSDataType.TIMESTAMP.getDataTypeSize()
          + PageHeader.estimateMaxPageHeaderSizeWithoutStatistics();
  bufferCount = (bufferCount + 31) & ~31;
  this.pageBuffer = new PublicBAOS(bufferCount);
  int pageSize =
      Math.min(
          MINIMUM_RECORD_COUNT_FOR_CHECK * TSDataType.TIMESTAMP.getDataTypeSize(),
          Math.min(bufferCount, (int) pageSizeThreshold));
  this.pageWriter =
      new TimePageWriter(
          timeEncoder, ICompressor.getCompressor(compressionType), (pageSize + 31) & ~31);
}

public void write(long time) {
pageWriter.write(time);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,40 @@ public ValueChunkWriter(
new ValuePageWriter(valueEncoder, ICompressor.getCompressor(compressionType), dataType);
}

/**
 * Creates a value-column chunk writer whose chunk and page buffers are pre-sized for the
 * expected number of rows.
 *
 * @param measurementId id of the value column
 * @param compressionType compression applied to serialized pages
 * @param dataType data type of the values
 * @param encodingType encoding of the value column
 * @param valueEncoder encoder instance for values
 * @param rowCount expected number of rows, used as a capacity hint (buffers still grow if
 *     exceeded)
 */
public ValueChunkWriter(
    String measurementId,
    CompressionType compressionType,
    TSDataType dataType,
    TSEncoding encodingType,
    Encoder valueEncoder,
    int rowCount) {
  this.measurementId = measurementId;
  this.encodingType = encodingType;
  this.dataType = dataType;
  this.compressionType = compressionType;
  this.pageSizeThreshold = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
  this.maxNumberOfPointsInPage =
      TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
  this.valueCountInOnePageForNextCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;

  // init statistics for this chunk and page
  this.statistics = Statistics.getStatsByType(dataType);

  // Capacity hint: one value per expected row plus the maximum page-header size,
  // rounded UP to a multiple of 32. The original used (x + 31) >> 5, which divides
  // by 32 instead of rounding; (x + 31) & ~31 is the intended rounding.
  int bufferCount =
      rowCount * dataType.getDataTypeSize()
          + PageHeader.estimateMaxPageHeaderSizeWithoutStatistics();
  this.pageBuffer = new PublicBAOS((bufferCount + 31) & ~31);
  // Page capacity mirrors TimeChunkWriter/ChunkWriterImpl: the original multiplied
  // MINIMUM_RECORD_COUNT_FOR_CHECK by rowCount, which is inconsistent with the sibling
  // writers that multiply by the data-type size.
  int pageCapacity =
      Math.min(
          Math.min((int) pageSizeThreshold, bufferCount),
          MINIMUM_RECORD_COUNT_FOR_CHECK * dataType.getDataTypeSize()
              + PageHeader.estimateMaxPageHeaderSizeWithoutStatistics());
  pageCapacity = (pageCapacity + 31) & ~31;
  this.pageWriter =
      new ValuePageWriter(
          valueEncoder, ICompressor.getCompressor(compressionType), dataType, pageCapacity);
}

/**
 * Writes a long value with its timestamp to the current page.
 *
 * @param time timestamp of the point
 * @param value long value to write
 * @param isNull whether this point is null for this value column
 */
public void write(long time, long value, boolean isNull) {
  pageWriter.write(time, value, isNull);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,31 @@ public PageWriter() {
this(null, null);
}

/**
 * Creates a page writer with no encoders and the given initial buffer capacity.
 *
 * @param capacity initial capacity, in bytes, for both the time and the value output buffers
 */
public PageWriter(int capacity) {
  // Delegate to the sizing constructor; the original called this(null, null), allocating
  // two default-capacity buffers and immediately discarding them.
  this(null, null, capacity);
}

/**
 * Creates a page writer configured from the given measurement schema: its time and value
 * encoders, type-specific statistics, and its compressor. Buffers use default capacity.
 */
public PageWriter(IMeasurementSchema measurementSchema) {
  this(measurementSchema.getTimeEncoder(), measurementSchema.getValueEncoder());
  this.statistics = Statistics.getStatsByType(measurementSchema.getType());
  this.compressor = ICompressor.getCompressor(measurementSchema.getCompressor());
}

/**
 * Creates a page writer configured from the given measurement schema, with the time and
 * value buffers pre-sized to {@code pageSize} bytes.
 */
public PageWriter(IMeasurementSchema measurementSchema, int pageSize) {
  this(measurementSchema.getTimeEncoder(), measurementSchema.getValueEncoder(), pageSize);
  this.statistics = Statistics.getStatsByType(measurementSchema.getType());
  this.compressor = ICompressor.getCompressor(measurementSchema.getCompressor());
}

/**
 * Shared initializer: stores both encoders and pre-sizes the time and value output
 * buffers to {@code pageSize} bytes each.
 */
private PageWriter(Encoder timeEncoder, Encoder valueEncoder, int pageSize) {
  this.timeEncoder = timeEncoder;
  this.valueEncoder = valueEncoder;
  this.timeOut = new PublicBAOS(pageSize);
  this.valueOut = new PublicBAOS(pageSize);
}

private PageWriter(Encoder timeEncoder, Encoder valueEncoder) {
this.timeOut = new PublicBAOS();
this.valueOut = new PublicBAOS();
Expand Down Expand Up @@ -293,4 +312,12 @@ public long getPointNumber() {
/** Returns the statistics accumulated by this page writer. */
public Statistics<? extends Serializable> getStatistics() {
  return statistics;
}

/**
 * Returns the output buffer holding the encoded value column.
 *
 * <p>NOTE(review): despite the generic name, this returns {@code valueOut} (not a combined
 * page buffer) — confirm the name matches what callers expect.
 */
public PublicBAOS getPageBuffer() {
  return valueOut;
}

/** Returns the output buffer holding the encoded time column. */
public PublicBAOS getTimeOut() {
  return timeOut;
}
}
Loading