diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java index d62703a608c0..04bef0b577c3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java @@ -151,7 +151,14 @@ public PlanNode clone() { @Override public String toString() { - return "InsertRowNode{" + "time=" + time + ", values=" + Arrays.toString(values) + '}'; + return "InsertRowNode{" + + "insertTarget=" + + targetPath + + ", time=" + + time + + ", values=" + + Arrays.toString(values) + + '}'; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java index a9695475a184..c8b840cc6bba 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java @@ -1300,4 +1300,21 @@ public void updateLastCache(final String databaseName) { isAligned, measurementSchemas); } + + @Override + public String toString() { + return "InsertTabletNode{" + + "targetPath=" + + targetPath + + ", measurements=" + + Arrays.toString(measurements) + + ", rowCount=" + + rowCount + + ", timeRange=[," + + times[0] + + ", " + + times[times.length - 1] + + "]" + + '}'; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java index bb5969dde9d4..88cfb8b1564e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java @@ -198,4 +198,9 @@ public WALFlushListener getWalFlushListener() { public abstract boolean isSignal(); public abstract long getMemorySize(); + + @Override + public String toString() { + return "WALEntry{" + "type=" + type + ", memTableId=" + memTableId + ", value=" + value + '}'; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java index e3f544a88944..aee0d9449733 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java @@ -216,47 +216,58 @@ private void loadNextSegmentV1() throws IOException { } private void loadNextSegmentV2() throws IOException { + long position = channel.position(); SegmentInfo segmentInfo = getNextSegmentInfo(); - if (segmentInfo.compressionType != CompressionType.UNCOMPRESSED) { - // A compressed segment - if (Objects.isNull(dataBuffer) - || dataBuffer.capacity() < segmentInfo.uncompressedSize - || dataBuffer.capacity() > segmentInfo.uncompressedSize * 2) { - MmapUtil.clean(dataBuffer); - dataBuffer = ByteBuffer.allocateDirect(segmentInfo.uncompressedSize); - } - dataBuffer.clear(); + try { + if (segmentInfo.compressionType != CompressionType.UNCOMPRESSED) { + // A compressed segment + if (Objects.isNull(dataBuffer) + || dataBuffer.capacity() < segmentInfo.uncompressedSize + || dataBuffer.capacity() > segmentInfo.uncompressedSize * 2) { + MmapUtil.clean(dataBuffer); + dataBuffer = ByteBuffer.allocateDirect(segmentInfo.uncompressedSize); + } + dataBuffer.clear(); - if (Objects.isNull(compressedBuffer) - || compressedBuffer.capacity() < segmentInfo.dataInDiskSize - || compressedBuffer.capacity() > segmentInfo.dataInDiskSize * 2) { - MmapUtil.clean(compressedBuffer); - compressedBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize); - } - compressedBuffer.clear(); - // limit the buffer to prevent it from reading too much byte than expected - compressedBuffer.limit(segmentInfo.dataInDiskSize); - if (readWALBufferFromChannel(compressedBuffer) != segmentInfo.dataInDiskSize) { - throw new IOException("Unexpected end of file"); - } - compressedBuffer.flip(); - IUnCompressor unCompressor = IUnCompressor.getUnCompressor(segmentInfo.compressionType); - uncompressWALBuffer(compressedBuffer, dataBuffer, unCompressor); - } else { - // An uncompressed segment - if (Objects.isNull(dataBuffer) - || dataBuffer.capacity() < segmentInfo.dataInDiskSize - || dataBuffer.capacity() > segmentInfo.dataInDiskSize * 2) { - MmapUtil.clean(dataBuffer); - dataBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize); - } - dataBuffer.clear(); - // limit the buffer to prevent it from reading too much byte than expected - dataBuffer.limit(segmentInfo.dataInDiskSize); + if (Objects.isNull(compressedBuffer) + || compressedBuffer.capacity() < segmentInfo.dataInDiskSize + || compressedBuffer.capacity() > segmentInfo.dataInDiskSize * 2) { + MmapUtil.clean(compressedBuffer); + compressedBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize); + } + compressedBuffer.clear(); + // limit the buffer to prevent it from reading too much byte than expected + compressedBuffer.limit(segmentInfo.dataInDiskSize); + if (readWALBufferFromChannel(compressedBuffer) != segmentInfo.dataInDiskSize) { + throw new IOException("Unexpected end of file"); + } + compressedBuffer.flip(); + IUnCompressor unCompressor = IUnCompressor.getUnCompressor(segmentInfo.compressionType); + uncompressWALBuffer(compressedBuffer, dataBuffer, unCompressor); + } else { + // An uncompressed segment + if (Objects.isNull(dataBuffer) + || dataBuffer.capacity() < segmentInfo.dataInDiskSize + || dataBuffer.capacity() > segmentInfo.dataInDiskSize * 2) { + MmapUtil.clean(dataBuffer); + dataBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize); + } + dataBuffer.clear(); + // limit the buffer to prevent it from reading too much byte than expected + dataBuffer.limit(segmentInfo.dataInDiskSize); - if (readWALBufferFromChannel(dataBuffer) != segmentInfo.dataInDiskSize) { - throw new IOException("Unexpected end of file"); + if (readWALBufferFromChannel(dataBuffer) != segmentInfo.dataInDiskSize) { + throw new IOException("Unexpected end of file"); + } } + } catch (Exception e) { + logger.error( + "Unexpected error when loading a wal segment {} in {}@{}", + segmentInfo, + logFile, + position, + e); + throw new IOException(e); } dataBuffer.flip(); } @@ -389,5 +400,17 @@ int headerSize() { ? Byte.BYTES + Integer.BYTES : Byte.BYTES + Integer.BYTES * 2; } + + @Override + public String toString() { + return "SegmentInfo{" + + "compressionType=" + + compressionType + + ", dataInDiskSize=" + + dataInDiskSize + + ", uncompressedSize=" + + uncompressedSize + + '}'; + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java index 1370745cea24..a5622d29c490 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java @@ -25,6 +25,8 @@ import org.apache.iotdb.db.storageengine.dataregion.wal.node.WALNode; import org.apache.tsfile.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; @@ -38,6 +40,9 @@ * and give some methods to read the content from the disk. */ public class WALEntryPosition { + + private static final Logger LOGGER = LoggerFactory.getLogger(WALEntryPosition.class); + private volatile String identifier = ""; private volatile long walFileVersionId = -1; private volatile long position; @@ -107,6 +112,14 @@ ByteBuffer read() throws IOException { ByteBuffer buffer = ByteBuffer.allocate(size); is.read(buffer); return buffer; + } catch (Exception e) { + LOGGER.error( + "Unexpected error when reading a wal entry from {}@{} with size {}", + walFile, + position, + size, + e); + throw new IOException(e); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALPrintTool.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALPrintTool.java new file mode 100644 index 000000000000..99f27fbd6ac0 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALPrintTool.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.wal.utils; + +import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry; +import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALReader; + +import java.io.File; +import java.io.IOException; +import java.util.Stack; + +public class WALPrintTool { + + public void print(File file) throws IOException { + Stack stack = new Stack<>(); + if (file.exists()) { + stack.push(file); + } else { + System.out.println("The initial file does not exist"); + } + + while (!stack.isEmpty()) { + File f = stack.pop(); + if (f.isDirectory()) { + File[] files = f.listFiles(); + if (files != null) { + for (File child : files) { + stack.push(child); + } + } + } else if (f.getName().endsWith(".wal")) { + doPrint(f); + } + } + } + + private void doPrint(File file) throws IOException { + System.out.printf("-----------------%s---------------%n", file.getAbsoluteFile()); + try (WALReader reader = new WALReader(file)) { + long walCurrentReadOffset = reader.getWALCurrentReadOffset(); + while (reader.hasNext()) { + WALEntry entry = reader.next(); + System.out.printf("%d\t%s%n", walCurrentReadOffset, entry.toString()); + walCurrentReadOffset = reader.getWALCurrentReadOffset(); + } + } + } + + public static void main(String[] args) throws IOException { + if (args.length == 0) { + System.out.println("Usage: WALPrintTool "); + return; + } + + WALPrintTool tool = new WALPrintTool(); + tool.print(new File(args[0])); + } +} diff --git a/scripts/tools/wal/print-wal.sh b/scripts/tools/wal/print-wal.sh new file mode 100644 index 000000000000..1aa396d4fd55 --- /dev/null +++ b/scripts/tools/wal/print-wal.sh @@ -0,0 +1,52 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +echo --------------------- +echo Starting Printing the WAL +echo --------------------- + +source "$(dirname "$0")/../../conf/iotdb-common.sh" +#get_iotdb_include and checkAllVariables is in iotdb-common.sh +VARS=$(get_iotdb_include "$*") +checkAllVariables +export IOTDB_HOME="${IOTDB_HOME}/.." +eval set -- "$VARS" + + +if [ -n "$JAVA_HOME" ]; then + for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do + if [ -x "$java" ]; then + JAVA="$java" + break + fi + done +else + JAVA=java +fi + +CLASSPATH="" +for f in ${IOTDB_HOME}/lib/*.jar; do + CLASSPATH=${CLASSPATH}":"$f +done + +MAIN_CLASS=org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALPrintTool + +"$JAVA" -cp "$CLASSPATH" "$MAIN_CLASS" "$@" +exit $? \ No newline at end of file diff --git a/scripts/tools/windows/wal/print-wal.bat b/scripts/tools/windows/wal/print-wal.bat new file mode 100644 index 000000000000..a13c42827da1 --- /dev/null +++ b/scripts/tools/windows/wal/print-wal.bat @@ -0,0 +1,67 @@ +@REM +@REM Licensed to the Apache Software Foundation (ASF) under one +@REM or more contributor license agreements. See the NOTICE file +@REM distributed with this work for additional information +@REM regarding copyright ownership. The ASF licenses this file +@REM to you under the Apache License, Version 2.0 (the +@REM "License"); you may not use this file except in compliance +@REM with the License. You may obtain a copy of the License at +@REM +@REM http://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, +@REM software distributed under the License is distributed on an +@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +@REM KIND, either express or implied. See the License for the +@REM specific language governing permissions and limitations +@REM under the License. +@REM + + +@echo off +echo ```````````````````````` +echo Starting Printing the WAL +echo ```````````````````````` + +if "%OS%" == "Windows_NT" setlocal + +pushd %~dp0..\..\.. +if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD% +popd + +if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALPrintTool +if NOT DEFINED JAVA_HOME goto :err + +@REM ----------------------------------------------------------------------------- +@REM JVM Opts +set JAVA_OPTS=-ea^ + -Dfile.encoding=UTF-8 + +@REM ----------------------------------------------------------------------------- +@REM ***** CLASSPATH library setting ***** +@REM Ensure that any user defined CLASSPATH variables are not used on startup +set CLASSPATH="%IOTDB_HOME%\lib\*" + +goto okClasspath + +:append +set CLASSPATH=%CLASSPATH%;%1 +goto :eof + +@REM ----------------------------------------------------------------------------- +:okClasspath + +"%JAVA_HOME%\bin\java" %JAVA_OPTS% -cp "%CLASSPATH%" %MAIN_CLASS% %* + +goto finally + + +:err +echo JAVA_HOME environment variable must be set! +pause + + +@REM ----------------------------------------------------------------------------- +:finally + +ENDLOCAL \ No newline at end of file