• Error: KafkaStorageException: Too many open files


    Problem description: A Flink cluster produces to and consumes from Kafka as part of a big-data pipeline; when processing fails, the job recovers through its restart strategy and checkpointing. After the project went live, more and more devices came online, the number of dynamically created Kafka topics kept growing, and Flink began to fail with:

    java.io.IOException: Could not perform checkpoint 87 for operator Sink: Unnamed (34/90)#99.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1274)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 87 for operator Sink: Unnamed (34/90)#99. Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:265)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:170)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:233)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:206)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:186)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:605)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:315)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1329)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1315)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1258)
    ... 22 more
    Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 27 record(s) for topic-call_XAucjhIN-0:120000 ms has passed since batch creation
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1429)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1117)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:1014)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:102)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:345)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:1122)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:87)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:219)
    ... 33 more
    Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 27 record(s) for topic-call_XAucjhIN-0:120000 ms has passed since batch creation

    One broker in the Kafka cluster had gone down, with the following error in its log:

    [2022-08-01 14:55:22,453] ERROR Error while writing to checkpoint file /home/kafka-logs/fan_sink_29-1/leader-epoch-checkpoint (kafka.server.LogDirFailureChannel)
    java.io.FileNotFoundException: /home/kafka-logs/topic_min/leader-epoch-checkpoint.tmp (Too many open files)
    at java.io.FileOutputStream.open0(Native Method)
    at java.io.FileOutputStream.open(FileOutputStream.java:270)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
    at kafka.server.checkpoints.CheckpointFile.liftedTree1$1(CheckpointFile.scala:94)
    at kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:92)
    at kafka.server.checkpoints.LeaderEpochCheckpointFile.write(LeaderEpochCheckpointFile.scala:70)
    at kafka.server.epoch.LeaderEpochFileCache.flush(LeaderEpochFileCache.scala:292)
    at kafka.server.epoch.LeaderEpochFileCache.assign(LeaderEpochFileCache.scala:61)
    at kafka.log.Log.$anonfun$maybeAssignEpochStartOffset$1(Log.scala:1368)
    at kafka.log.Log.$anonfun$maybeAssignEpochStartOffset$1$adapted(Log.scala:1367)
    at scala.Option.foreach(Option.scala:437)
    at kafka.log.Log.maybeAssignEpochStartOffset(Log.scala:1367)
    at kafka.cluster.Partition.$anonfun$makeLeader$1(Partition.scala:592)
    at kafka.cluster.Partition.makeLeader(Partition.scala:547)
    at kafka.server.ReplicaManager.$anonfun$makeLeaders$5(ReplicaManager.scala:1568)
    at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
    at scala.collection.mutable.HashMap$Node.foreachEntry(HashMap.scala:633)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:499)
    at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:1566)
    at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1411)
    at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:258)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:171)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
    at java.lang.Thread.run(Thread.java:748)
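    Before raising any limits, it is worth confirming that the broker process really is near its descriptor ceiling. A quick check on any Linux host (a sketch: `$$`, the current shell, is used only so the commands run standalone; on a broker host substitute the Kafka PID, e.g. from `pgrep -f kafka.Kafka`):

```shell
# Compare a process's open file descriptors against its limit via /proc.
# PID=$$ (the current shell) is a stand-in for the Kafka broker's PID.
PID=$$
echo "open fds: $(ls /proc/$PID/fd | wc -l)"
grep "Max open files" /proc/$PID/limits
```

    If the open-fd count is close to the "Max open files" value, the limit changes below are the right fix.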

    The fix is as follows:

    # Raise the OS open-file limit
    [root@kafka101 ~]# vi /etc/security/limits.conf

    root soft nofile 65536
    root hard nofile 65536
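    Note that limits.conf is applied by PAM at login, so the new values only take effect in fresh sessions, and they do not cover services launched directly by systemd, which is why kafka.service is edited as well. To verify in a new login shell:

```shell
# Soft and hard open-file limits of the current session; after re-login
# these should reflect the values written to /etc/security/limits.conf.
ulimit -Sn
ulimit -Hn
```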

    # Find files or directories matching "kafka" (to locate kafka.service)

    [root@kafka103 ~]# cd /

    [root@kafka103 ~]# find / -name '*kafka*'

    /etc/systemd/system/kafka.service

    [root@kafka103 ~]# cd /etc/systemd/system/
    # Edit the unit file to raise the service's maximum open files

    [root@kafka103 system]# vi kafka.service

    # Raise the maximum number of open files
    LimitNOFILE=65535

    [root@kafka103 ~]# systemctl daemon-reload 
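    As an alternative to editing the unit file in place, a systemd drop-in keeps the override separate from the vendor file and survives package upgrades; a sketch, assuming the unit is named kafka.service:

```ini
# /etc/systemd/system/kafka.service.d/limits.conf  (drop-in override)
[Service]
LimitNOFILE=65535
```

    followed by the same `systemctl daemon-reload` and service restart.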

    # Restart Kafka

    [root@kafka103 ~]# systemctl stop kafka

    [root@kafka103 ~]# systemctl start kafka

    # Check the Kafka process

    [root@kafka103 system]# ps -ef | grep kafka
    Here the Kafka PID is 19694.

    [root@kafka103 system]# cat /proc/19694/limits
    Limit                     Soft Limit           Hard Limit           Units
    Max cpu time              unlimited            unlimited            seconds
    Max file size             unlimited            unlimited            bytes
    Max data size             unlimited            unlimited            bytes
    Max stack size            8388608              unlimited            bytes
    Max core file size        0                    unlimited            bytes
    Max resident set          unlimited            unlimited            bytes
    Max processes             2062355              2062355              processes
    Max open files            65535                65535                files
    Max locked memory         65536                65536                bytes
    Max address space         unlimited            unlimited            bytes
    Max file locks            unlimited            unlimited            locks
    Max pending signals       2062355              2062355              signals
    Max msgqueue size         819200               819200               bytes
    Max nice priority         0                    0
    Max realtime priority     0                    0
    Max realtime timeout      unlimited            unlimited

    Max open files has now changed to 65535.

    This resolves the "Too many open files" problem.
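    Since topics keep being created dynamically, descriptor usage will climb again over time, so it is worth watching it. A minimal sketch (LOG_DIR is an assumption; /home/kafka-logs matches the paths in the broker error above):

```shell
# Count files under the Kafka log directory as a rough proxy for the
# broker's descriptor demand (each partition holds log, index, timeindex
# and checkpoint files). Prints 0 if the directory does not exist here.
LOG_DIR=${LOG_DIR:-/home/kafka-logs}
find "$LOG_DIR" -type f 2>/dev/null | wc -l
```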

  • Original post: https://blog.csdn.net/learnworm/article/details/126106548