
[Bug] Caused by: java.io.IOException: java.nio.channels.ClosedChannelException #4958

Open
skdfeitian opened this issue Jan 20, 2025 · 8 comments
Labels
bug Something isn't working

Comments

@skdfeitian

Search before asking

  • I searched in the issues and found nothing similar.

Paimon version

paimon version 0.8.2

Compute Engine

flink version 1.17.1

Minimal reproduce step

java.lang.Exception: Could not perform checkpoint 18073 for operator Source: bg_action_source[1] -> Calc[2] -> Map -> Writer : dwd_new_user_detail_realtime (1/20)#5.
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1184)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$13(StreamTask.java:1131)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: java.nio.channels.ClosedChannelException
at org.apache.paimon.flink.sink.StoreSinkWriteImpl.prepareCommit(StoreSinkWriteImpl.java:220)
at org.apache.paimon.flink.sink.TableWriteOperator.prepareCommit(TableWriteOperator.java:121)
at org.apache.paimon.flink.sink.RowDataStoreWriteOperator.prepareCommit(RowDataStoreWriteOperator.java:189)
at org.apache.paimon.flink.sink.PrepareCommitOperator.emitCommittables(PrepareCommitOperator.java:100)
at org.apache.paimon.flink.sink.PrepareCommitOperator.prepareSnapshotPreBarrier(PrepareCommitOperator.java:80)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:321)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1299)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1287)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1172)
... 14 more
Caused by: java.nio.channels.ClosedChannelException
at org.apache.hadoop.hdfs.ExceptionLastSeen.throwException4Close(ExceptionLastSeen.java:73)
at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:158)
at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:106)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:62)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.paimon.fs.hadoop.HadoopFileIO$HadoopPositionOutputStream.write(HadoopFileIO.java:300)
at org.apache.paimon.format.parquet.writer.PositionOutputStreamAdapter.write(PositionOutputStreamAdapter.java:54)
at java.io.OutputStream.write(OutputStream.java:75)
at org.apache.paimon.shade.org.apache.parquet.bytes.ConcatenatingByteArrayCollector.writeAllTo(ConcatenatingByteArrayCollector.java:46)
at org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetFileWriter.writeColumnChunk(ParquetFileWriter.java:903)
at org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetFileWriter.writeColumnChunk(ParquetFileWriter.java:848)
at org.apache.paimon.shade.org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writeToFileWriter(ColumnChunkPageWriteStore.java:310)
at org.apache.paimon.shade.org.apache.parquet.hadoop.ColumnChunkPageWriteStore.flushToFileWriter(ColumnChunkPageWriteStore.java:458)
at org.apache.paimon.shade.org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:186)
at org.apache.paimon.shade.org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:124)
at org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:319)
at org.apache.paimon.format.parquet.writer.ParquetBulkWriter.finish(ParquetBulkWriter.java:57)
at org.apache.paimon.io.SingleFileWriter.close(SingleFileWriter.java:144)
at org.apache.paimon.io.RowDataFileWriter.close(RowDataFileWriter.java:95)
at org.apache.paimon.io.RollingFileWriter.closeCurrentWriter(RollingFileWriter.java:107)
at org.apache.paimon.io.RollingFileWriter.close(RollingFileWriter.java:144)
at org.apache.paimon.append.AppendOnlyWriter$DirectSinkWriter.flush(AppendOnlyWriter.java:365)
at org.apache.paimon.append.AppendOnlyWriter.flush(AppendOnlyWriter.java:195)
at org.apache.paimon.append.AppendOnlyWriter.prepareCommit(AppendOnlyWriter.java:183)
at org.apache.paimon.operation.AbstractFileStoreWrite.prepareCommit(AbstractFileStoreWrite.java:198)
at org.apache.paimon.table.sink.TableWriteImpl.prepareCommit(TableWriteImpl.java:207)
at org.apache.paimon.flink.sink.StoreSinkWriteImpl.prepareCommit(StoreSinkWriteImpl.java:215)
... 24 more

What doesn't meet your expectations?

#3678

I referred to this link and set fs.hdfs.impl.disable.cache=true, but now I occasionally encounter the error above. How should I resolve it?
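For reference, one possible place to apply the property is the Flink client configuration. This is only a sketch under the assumption that the deployment forwards keys prefixed with flink.hadoop. to the Hadoop Configuration (the property can equally be set directly in core-site.xml / hdfs-site.xml on the client side):

```yaml
# flink-conf.yaml — assumption: Flink forwards "flink.hadoop.*" keys to the
# Hadoop Configuration used by the job. Disables the shared FileSystem
# instance cache so that one task closing its FileSystem does not
# invalidate the stream of another.
flink.hadoop.fs.hdfs.impl.disable.cache: true
```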

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@skdfeitian skdfeitian added the bug Something isn't working label Jan 20, 2025
@yangjf2019
Contributor

Hi @skdfeitian, thanks for your issue! Could you provide more information about this problem, such as code, environment, and so on?

@skdfeitian
Author

skdfeitian commented Jan 20, 2025

To add more information: After adding the parameter fs.hdfs.impl.disable.cache, the error Caused by: java.io.IOException: Filesystem closed seems to have disappeared. It appears that this parameter is very effective.

However, the occasional java.nio.channels.ClosedChannelException error has not been resolved. I have nearly 200 Flink-related Paimon tasks, and 4 different types of tasks report this error. Some occur only after running for dozens of days, and the error is not very frequent. It feels like there might be an issue with the HDFS configuration, but we also have many other real-time Flink tasks writing to HDFS, and we haven't encountered similar issues there.

For example:
(1) Data lake ingestion tasks syncing MySQL CDC data to Paimon via KafkaSyncDatabaseAction.
(2) Flink SQL tasks calculating PV and UV metrics and writing them to Paimon tables.
(3) Tasks that use tableEnv.executeSql to run INSERT INTO statements, collecting Kafka data into Paimon tables.
(4) Compaction tasks executed through paimon-flink-action.jar.

For example:

java.io.IOException: Could not perform checkpoint 14784 for operator Writer : ods_analyser_realtime (1/20)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1256)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Encountered an error while do compaction
at org.apache.paimon.flink.sink.AppendOnlyTableCompactionWorkerOperator.prepareCommit(AppendOnlyTableCompactionWorkerOperator.java:101)
at org.apache.paimon.flink.sink.PrepareCommitOperator.emitCommittables(PrepareCommitOperator.java:100)
at org.apache.paimon.flink.sink.PrepareCommitOperator.prepareSnapshotPreBarrier(PrepareCommitOperator.java:80)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:321)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1299)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1287)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1244)
... 22 more
Caused by: java.util.concurrent.ExecutionException: java.nio.channels.ClosedChannelException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.paimon.flink.sink.AppendOnlyTableCompactionWorkerOperator.prepareCommit(AppendOnlyTableCompactionWorkerOperator.java:93)
... 30 more
Caused by: java.nio.channels.ClosedChannelException
at org.apache.hadoop.hdfs.ExceptionLastSeen.throwException4Close(ExceptionLastSeen.java:73)
at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:158)
at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:106)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:62)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.paimon.fs.hadoop.HadoopFileIO$HadoopPositionOutputStream.write(HadoopFileIO.java:300)
at org.apache.paimon.format.parquet.writer.PositionOutputStreamAdapter.write(PositionOutputStreamAdapter.java:54)
at java.io.OutputStream.write(OutputStream.java:75)
at org.apache.paimon.shade.org.apache.parquet.bytes.ConcatenatingByteArrayCollector.writeAllTo(ConcatenatingByteArrayCollector.java:46)
at org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetFileWriter.writeColumnChunk(ParquetFileWriter.java:903)
at org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetFileWriter.writeColumnChunk(ParquetFileWriter.java:848)
at org.apache.paimon.shade.org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writeToFileWriter(ColumnChunkPageWriteStore.java:310)
at org.apache.paimon.shade.org.apache.parquet.hadoop.ColumnChunkPageWriteStore.flushToFileWriter(ColumnChunkPageWriteStore.java:458)
at org.apache.paimon.shade.org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:186)
at org.apache.paimon.shade.org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:124)
at org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:319)
at org.apache.paimon.format.parquet.writer.ParquetBulkWriter.finish(ParquetBulkWriter.java:57)
at org.apache.paimon.io.SingleFileWriter.close(SingleFileWriter.java:144)
at org.apache.paimon.io.RowDataFileWriter.close(RowDataFileWriter.java:95)
at org.apache.paimon.io.RollingFileWriter.closeCurrentWriter(RollingFileWriter.java:107)
at org.apache.paimon.io.RollingFileWriter.close(RollingFileWriter.java:144)
at org.apache.paimon.operation.AppendOnlyFileStoreWrite.lambda$compactRewriter$0(AppendOnlyFileStoreWrite.java:184)
at org.apache.paimon.append.AppendOnlyCompactionTask.doCompact(AppendOnlyCompactionTask.java:64)
at org.apache.paimon.flink.sink.AppendOnlyTableCompactionWorkerOperator.lambda$processElement$1(AppendOnlyTableCompactionWorkerOperator.java:108)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more

Another example:

org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=5, backoffTimeMS=60000)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:249)
at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:242)
at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:748)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:725)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:80)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:479)
at sun.reflect.GeneratedMethodAccessor75.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.io.IOException: Could not perform checkpoint 8 for operator CDC MultiplexWriter (7/10)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1256)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: java.io.IOException: can not write PageHeader(type:DICTIONARY_PAGE, uncompressed_page_size:9104, compressed_page_size:5104, crc:495053246, dictionary_page_header:DictionaryPageHeader(num_values:1138, encoding:PLAIN_DICTIONARY))
at org.apache.paimon.flink.sink.StoreSinkWriteImpl.prepareCommit(StoreSinkWriteImpl.java:220)
at org.apache.paimon.flink.sink.cdc.CdcRecordStoreMultiWriteOperator.prepareCommit(CdcRecordStoreMultiWriteOperator.java:229)
at org.apache.paimon.flink.sink.PrepareCommitOperator.emitCommittables(PrepareCommitOperator.java:100)
at org.apache.paimon.flink.sink.PrepareCommitOperator.prepareSnapshotPreBarrier(PrepareCommitOperator.java:80)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:321)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1299)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1287)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1244)
... 22 more
Caused by: java.io.IOException: can not write PageHeader(type:DICTIONARY_PAGE, uncompressed_page_size:9104, compressed_page_size:5104, crc:495053246, dictionary_page_header:DictionaryPageHeader(num_values:1138, encoding:PLAIN_DICTIONARY))
at org.apache.paimon.shade.org.apache.parquet.format.Util.write(Util.java:376)
at org.apache.paimon.shade.org.apache.parquet.format.Util.writePageHeader(Util.java:124)
at org.apache.paimon.shade.org.apache.parquet.format.converter.ParquetMetadataConverter.writeDictionaryPageHeader(ParquetMetadataConverter.java:1975)
at org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetFileWriter.writeDictionaryPage(ParquetFileWriter.java:502)
at org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetFileWriter.writeColumnChunk(ParquetFileWriter.java:880)
at org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetFileWriter.writeColumnChunk(ParquetFileWriter.java:848)
at org.apache.paimon.shade.org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writeToFileWriter(ColumnChunkPageWriteStore.java:310)
at org.apache.paimon.shade.org.apache.parquet.hadoop.ColumnChunkPageWriteStore.flushToFileWriter(ColumnChunkPageWriteStore.java:458)
at org.apache.paimon.shade.org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:186)
at org.apache.paimon.shade.org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:124)
at org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:319)
at org.apache.paimon.format.parquet.writer.ParquetBulkWriter.finish(ParquetBulkWriter.java:57)
at org.apache.paimon.io.SingleFileWriter.close(SingleFileWriter.java:144)
at org.apache.paimon.io.RollingFileWriter.closeCurrentWriter(RollingFileWriter.java:107)
at org.apache.paimon.io.RollingFileWriter.close(RollingFileWriter.java:144)
at org.apache.paimon.mergetree.MergeTreeWriter.flushWriteBuffer(MergeTreeWriter.java:225)
at org.apache.paimon.mergetree.MergeTreeWriter.prepareCommit(MergeTreeWriter.java:248)
at org.apache.paimon.operation.AbstractFileStoreWrite.prepareCommit(AbstractFileStoreWrite.java:198)
at org.apache.paimon.table.sink.TableWriteImpl.prepareCommit(TableWriteImpl.java:207)
at org.apache.paimon.flink.sink.StoreSinkWriteImpl.prepareCommit(StoreSinkWriteImpl.java:215)
... 31 more
Caused by: org.apache.paimon.shade.parquet.org.apache.thrift.transport.TTransportException: java.nio.channels.ClosedChannelException
at org.apache.paimon.shade.parquet.org.apache.thrift.transport.TIOStreamTransport.write(TIOStreamTransport.java:199)
at org.apache.paimon.shade.parquet.org.apache.thrift.protocol.TCompactProtocol.writeByteDirect(TCompactProtocol.java:482)
at org.apache.paimon.shade.parquet.org.apache.thrift.protocol.TCompactProtocol.writeByteDirect(TCompactProtocol.java:489)
at org.apache.paimon.shade.parquet.org.apache.thrift.protocol.TCompactProtocol.writeFieldBeginInternal(TCompactProtocol.java:263)
at org.apache.paimon.shade.parquet.org.apache.thrift.protocol.TCompactProtocol.writeFieldBegin(TCompactProtocol.java:245)
at org.apache.paimon.shade.org.apache.parquet.format.InterningProtocol.writeFieldBegin(InterningProtocol.java:71)
at org.apache.paimon.shade.org.apache.parquet.format.PageHeader$PageHeaderStandardScheme.write(PageHeader.java:1127)
at org.apache.paimon.shade.org.apache.parquet.format.PageHeader$PageHeaderStandardScheme.write(PageHeader.java:1025)
at org.apache.paimon.shade.org.apache.parquet.format.PageHeader.write(PageHeader.java:906)
at org.apache.paimon.shade.org.apache.parquet.format.Util.write(Util.java:373)
... 50 more
Caused by: java.nio.channels.ClosedChannelException
at org.apache.hadoop.hdfs.ExceptionLastSeen.throwException4Close(ExceptionLastSeen.java:73)
at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:158)
at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:106)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:62)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.paimon.fs.hadoop.HadoopFileIO$HadoopPositionOutputStream.write(HadoopFileIO.java:300)
at org.apache.paimon.format.parquet.writer.PositionOutputStreamAdapter.write(PositionOutputStreamAdapter.java:54)
at org.apache.paimon.shade.parquet.org.apache.thrift.transport.TIOStreamTransport.write(TIOStreamTransport.java:197)
... 59 more

@yangjf2019
Contributor

yangjf2019 commented Jan 21, 2025

Hi @xuzifu666, could you take some time to help look at this? Thanks! Related issue.

@xuzifu666
Member

xuzifu666 commented Jan 21, 2025

This may be due to an HDFS configuration exception when reading the snapshot file. You could consider setting fs.hdfs.impl.disable.cache=true.

@yangjf2019
Contributor

This may be due to an HDFS configuration exception when reading the snapshot file. You could consider setting fs.hdfs.impl.disable.cache=true.

@skdfeitian Please try it again.

@skdfeitian
Author

Hi @xuzifu666,
When I added the parameter fs.hdfs.impl.disable.cache=true to the job, the error java.io.IOException: Filesystem closed disappeared.
However, I still occasionally encounter this error: Caused by: java.io.IOException: java.nio.channels.ClosedChannelException. Are these two errors caused by the same issue?

@xuzifu666
Member


It is not related to Paimon; you could talk to the HDFS engineers in your company to investigate further. @skdfeitian

@skdfeitian
Author

@xuzifu666 @yangjf2019 Thanks!
