diff --git a/pom.xml b/pom.xml index 4eda159..92d88ee 100644 --- a/pom.xml +++ b/pom.xml @@ -29,6 +29,7 @@ 21 2.0.3 + 18.1.0 antaressimulatorteam AntaresSimulatorTeam_antares-datamanager-back https://sonarcloud.io @@ -54,10 +55,6 @@ org.springframework.boot spring-boot-starter-validation - - org.springframework.boot - spring-boot-starter-integration - org.springframework.boot spring-boot-starter-web @@ -94,6 +91,12 @@ org.apache.poi poi-ooxml 5.2.3 + + + org.apache.commons + commons-compress + + @@ -162,6 +165,23 @@ junit-jupiter test + + + org.apache.arrow + arrow-vector + ${arrow.version} + + + org.apache.arrow + arrow-memory-netty + ${arrow.version} + + + org.apache.arrow + arrow-compression + ${arrow.version} + + @@ -176,6 +196,7 @@ lombok + --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED @@ -219,6 +240,9 @@ org.apache.maven.plugins maven-surefire-plugin 3.0.0-M9 + + --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED + diff --git a/src/main/java/com/rte_france/antares/datamanager_back/util/ArrowReader.java b/src/main/java/com/rte_france/antares/datamanager_back/util/ArrowReader.java new file mode 100644 index 0000000..03b3418 --- /dev/null +++ b/src/main/java/com/rte_france/antares/datamanager_back/util/ArrowReader.java @@ -0,0 +1,91 @@ +package com.rte_france.antares.datamanager_back.util; + +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.types.pojo.Field; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + +public class ArrowReader { + private static final int ROW_COUNT = 8760; + + public static Matrix readMatrixFromArrow(Path filePath) throws IOException { + Objects.requireNonNull(filePath); + + try (var channel = Files.newByteChannel(filePath); + var allocator = new RootAllocator(); + var reader = new ArrowFileReader(channel, allocator)) { + + reader.loadNextBatch(); + var root = reader.getVectorSchemaRoot(); + var fields = root.getSchema().getFields(); + + var columns = new ArrayList(); + fillMatrixColumns(fields, root, columns); + + return new Matrix(columns); + } + } + + private static void fillMatrixColumns(List fields, VectorSchemaRoot root, ArrayList columns) { + for (var field : fields) { + var vector = root.getVector(field.getName()); + var values = new double[vector.getValueCount()]; + for (var i = 0; i < vector.getValueCount(); i++) { + switch (vector) { + case Float8Vector f -> values[i] = f.get(i); + default -> throw new IllegalStateException(); + } + } + columns.add(new MatrixColumn(field.getName(), values)); + } + } + + public static Matrix readMatrixFromTxt(Path filePath) throws IOException { + Objects.requireNonNull(filePath); + + try (var lines = Files.lines(filePath)) { + var iterator = lines.iterator(); + if (!iterator.hasNext()) { + throw new IllegalArgumentException("File is empty"); + } + + var firstLine = iterator.next(); + var columnCount = firstLine.split("\\s+").length; + var data = new double[columnCount][ROW_COUNT]; + + fillDataList(firstLine, iterator, data); + + var columns = new ArrayList(data.length); + for (int j = 0; j < data.length; j++) { + columns.add(new MatrixColumn("Column" + j, data[j])); + } + + return new Matrix(columns); + } + } + + private static void fillDataList(String firstLine, Iterator iterator, double[][] data) { + var rowIndex = 0; + while (iterator.hasNext()) { + String[] values; + if (rowIndex == 0) { + values = firstLine.split("\\s+"); + } else { + values = iterator.next().split("\\s+"); + } + for (var j = 0; j < values.length; j++) { + data[j][rowIndex] = Double.parseDouble(values[j]); + } + rowIndex++; + } + } +} \ No newline at end of file diff --git a/src/main/java/com/rte_france/antares/datamanager_back/util/ArrowWriter.java b/src/main/java/com/rte_france/antares/datamanager_back/util/ArrowWriter.java new file mode 100644 index 0000000..c5d414b --- /dev/null +++ b/src/main/java/com/rte_france/antares/datamanager_back/util/ArrowWriter.java @@ -0,0 +1,104 @@ +package com.rte_france.antares.datamanager_back.util; + +import org.apache.arrow.compression.CommonsCompressionFactory; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.arrow.vector.ipc.ArrowFileWriter; +import org.apache.arrow.vector.ipc.message.IpcOption; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.channels.Channels; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class ArrowWriter { + private static final Logger LOGGER = LoggerFactory.getLogger(ArrowWriter.class); + + private static Field doubleField(String name) { + return new Field(name, FieldType.notNullable(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), null); + } + + private static Schema createSchema(Matrix matrix) { + var fields = matrix.getColumns().stream() + .map(column -> doubleField(column.name())) + .collect(Collectors.toList()); + return new Schema(fields); + } + + private static void populateDoubleVector(VectorSchemaRoot table, MatrixColumn column) { + var vector = table.getVector(column.name()); + var values = column.values(); + var size = values.length; + switch (vector) { + case Float8Vector f8Vector -> { + f8Vector.allocateNew(size); + table.setRowCount(size); + IntStream.range(0, size).forEach(i -> f8Vector.set(i, values[i])); + } + default -> throw new IllegalStateException(); + } + } + + public void write(Matrix matrix, OutputStream out) throws IOException { + Objects.requireNonNull(matrix); + Objects.requireNonNull(out); + + var schema = createSchema(matrix); + try (var allocator = new RootAllocator(); + var table = VectorSchemaRoot.create(schema, allocator)) { + matrix.getColumns().forEach(c -> populateDoubleVector(table, c)); + + var compressionFactory = new CommonsCompressionFactory(); + try (var ch = Channels.newChannel(out); + var writer = new ArrowFileWriter(table, null, ch, null, IpcOption.DEFAULT, compressionFactory, CompressionUtil.CodecType.ZSTD)) { + writer.start(); + writer.writeBatch(); + writer.end(); + } + } + } + + public String getDefaultFileExtension() { + return "arrow"; + } + + public static void main(String[] args) { + var writer = new ArrowWriter(); + try { + var matrix = ArrowReader.readMatrixFromTxt(Path.of("src/main/resources/INPUT/load/load_fr_2030-2031.txt")); + + var startSerialization = System.nanoTime(); + var arrowFilePath = Path.of("src/main/resources/test-matrix.arrow"); + try (var out = Files.newOutputStream(arrowFilePath)) { + writer.write(matrix, out); + } + var endSerialization = System.nanoTime(); + var serializationTime = (endSerialization - startSerialization) / 1_000_000_000.0; + var fileSize = Files.size(arrowFilePath); + + var startDeserialization = System.nanoTime(); + var deserializedMatrix = ArrowReader.readMatrixFromArrow(arrowFilePath); + var endDeserialization = System.nanoTime(); + var deserializationTime = (endDeserialization - startDeserialization) / 1_000_000_000.0; + + LOGGER.info("Serialization time (s): {}", serializationTime); + LOGGER.info("Deserialization time (s): {}", deserializationTime); + LOGGER.info(".arrow file size (bytes): {}", fileSize); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/rte_france/antares/datamanager_back/util/ColumnType.java b/src/main/java/com/rte_france/antares/datamanager_back/util/ColumnType.java new file mode 100644 index 0000000..980a35e --- /dev/null +++ b/src/main/java/com/rte_france/antares/datamanager_back/util/ColumnType.java @@ -0,0 +1,12 @@ +/** + * Copyright (c) 2023, RTE (http://www.rte-france.com) + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ +package com.rte_france.antares.datamanager_back.util; + +public enum ColumnType { + INT, + FLOAT +} diff --git a/src/main/java/com/rte_france/antares/datamanager_back/util/Matrix.java b/src/main/java/com/rte_france/antares/datamanager_back/util/Matrix.java new file mode 100644 index 0000000..5f29dd2 --- /dev/null +++ b/src/main/java/com/rte_france/antares/datamanager_back/util/Matrix.java @@ -0,0 +1,25 @@ +/** + * Copyright (c) 2023, RTE (http://www.rte-france.com) + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ +package com.rte_france.antares.datamanager_back.util; + +import lombok.Value; + +import java.util.List; + +@Value +public class Matrix { + + List columns; + + public int getRowCount() { + if (columns.isEmpty()) { + return 0; + } + return columns.getFirst().getSize(); + } + +} diff --git a/src/main/java/com/rte_france/antares/datamanager_back/util/MatrixColumn.java b/src/main/java/com/rte_france/antares/datamanager_back/util/MatrixColumn.java new file mode 100644 index 0000000..469afa6 --- /dev/null +++ b/src/main/java/com/rte_france/antares/datamanager_back/util/MatrixColumn.java @@ -0,0 +1,33 @@ +/** + * Copyright (c) 2025, RTE (http://www.rte-france.com) + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ +package com.rte_france.antares.datamanager_back.util; + +import java.util.Arrays; +import java.util.Objects; + +public record MatrixColumn(String name, double[] values) { + public int getSize() { + return values.length; + } + + public MatrixColumn { + Objects.requireNonNull(name); + Objects.requireNonNull(values); + } + + public MatrixColumn renamed(String newName) { + return new MatrixColumn(newName, values); + } + + @Override + public String toString() { + return "MatrixColumn{" + + "name='" + name + '\'' + + ", values=" + Arrays.toString(values) + + '}'; + } +}