Kafka troubleshooting notes

Log:

Exception in thread "Thread-0" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1090)
at com.eebbk.da.kafka2HDFS.handle.KafkaToParquetAutoOffset.run(KafkaToParquetAutoOffset.java:287)
at java.lang.Thread.run(Thread.java:748)

From the log: processing the records returned by poll() took longer than max.poll.interval.ms, which triggered a consumer group rebalance, so the manual offset commit could no longer succeed.

Root cause: the parameter that governs how long each batch of polled records takes to process is max.poll.records. It was set too high, so processing overran max.poll.interval.ms. (Defaults: max.poll.records = 500, max.poll.interval.ms = 300000 ms.)

Fixes:

  • Lower the per-poll throughput by reducing max.poll.records; it was set to 5000 in this environment, now lowered to 2000.
  • Raise the timeout by increasing max.poll.interval.ms to 600000.
  • When a manual commit fails, skip that commit and keep processing, letting the next commit pick up the offsets.
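The tuned values above can be sketched as consumer configuration. This is a minimal sketch: the broker address and group id are hypothetical placeholders, and the commit-skip pattern is shown only in comments because it needs a live broker to run.

```java
import java.util.Properties;

public class ConsumerTuning {
    // Builds consumer properties with the tuned values from the notes above.
    static Properties buildConsumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        props.put("group.id", "kafka2hdfs");              // hypothetical group id
        props.put("enable.auto.commit", "false");         // offsets are committed manually
        // Lower per-poll throughput: 5000 -> 2000 records per poll() call.
        props.put("max.poll.records", "2000");
        // Allow more processing time between polls: 300000 -> 600000 ms.
        props.put("max.poll.interval.ms", "600000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConsumerProps();
        System.out.println("max.poll.records = " + props.getProperty("max.poll.records"));
        // In the poll loop, the third fix looks roughly like:
        //   try {
        //       consumer.commitSync();
        //   } catch (org.apache.kafka.clients.consumer.CommitFailedException e) {
        //       // skip this commit; the next successful commitSync() covers these offsets
        //   }
    }
}
```

Skipping a failed commit is safe here only because the next successful commit advances past the same offsets; it does mean those records may be redelivered after a rebalance, so downstream processing should tolerate duplicates.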

The Kafka broker would not stay up: sometimes it logged an error, sometimes the JVM crashed outright.

Startup log:

java.io.IOException: Map failed
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:888)
at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:111)
at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:101)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
at kafka.log.AbstractIndex.resize(AbstractIndex.scala:101)
at kafka.log.LogSegment.truncateTo(LogSegment.scala:292)
at kafka.log.Log.truncateTo(Log.scala:893)
at kafka.log.LogManager$$anonfun$truncateTo$2.apply(LogManager.scala:301)
at kafka.log.LogManager$$anonfun$truncateTo$2.apply(LogManager.scala:293)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at kafka.log.LogManager.truncateTo(LogManager.scala:293)
at kafka.server.ReplicaManager.makeFollowers(ReplicaManager.scala:854)
at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:700)
at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:148)
at kafka.server.KafkaApis.handle(KafkaApis.scala:84)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:62)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Map failed
at sun.nio.ch.FileChannelImpl.map0(Native Method)
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:885)
In other cases the JVM crashed outright.

JVM crash log:

#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (malloc) failed to allocate 312 bytes for AllocateHeap
# Possible reasons:
# The system is out of physical RAM or swap space
# In 32 bit mode, the process size limit was hit
# Possible solutions:
# Reduce memory load on the system
# Increase physical memory or swap space
# Check if swap backing store is full
# Use 64 bit Java on a 64 bit OS
# Decrease Java heap size (-Xmx/-Xms)
# Decrease number of Java threads
# Decrease Java thread stack sizes (-Xss)
# Set larger code cache with -XX:ReservedCodeCacheSize=
# This output file may be truncated or incomplete.
#
# Out of Memory Error (allocation.inline.hpp:61), pid=56865, tid=0x00007fb2aa587700
#
# JRE version: Java(TM) SE Runtime Environment (8.0_131-b11) (build 1.8.0_131-b11)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode linux-amd64 compressed oops)
# Core dump written. Default location: /data1/kafka/kafka_2.11-0.10.2.1/core or core.56865
#

--------------- T H R E A D ---------------

Current thread (0x00007fcaa1e3f000): JavaThread "ExpirationReaper-5" [_thread_in_vm, id=57248, stack(0x00007fb2aa487000,0x00007fb2aa588000)]

Stack: [0x00007fb2aa487000,0x00007fb2aa588000], sp=0x00007fb2aa586900, free space=1022k
Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)
V [libjvm.so+0xac826a]
V [libjvm.so+0x4fd4cb]
V [libjvm.so+0x2e23c8]
V [libjvm.so+0x2e2444]
V [libjvm.so+0x70f40a]
V [libjvm.so+0xa76910]
V [libjvm.so+0x927568]
C [libpthread.so.0+0x7aa1]


--------------- P R O C E S S ---------------
...
...
power management:



Memory: 4k page, physical 65860516k(289912k free), swap 20479996k(19336528k free)

vm_info: Java HotSpot(TM) 64-Bit Server VM (25.131-b11) for linux-amd64 JRE (1.8.0_131-b11), built on Mar 15 2017 01:23:40 by "java_re" with gcc 4.3.0 20080428 (Red Hat 4.3.0-8)

time: Wed Jun 10 08:48:32 2020
elapsed time: 322 seconds (0d 0h 5m 22s)

A search turned up https://blog.csdn.net/b644ROfP20z37485O35M/article/details/81571710

Problem resolution

From the analysis above, there are two ways to address this:

  • Raise the system limit /proc/sys/vm/max_map_count: write vm.max_map_count=200000 into /etc/sysctl.conf, then run sysctl -p.
  • Do Kafka's index file mappings really need to be held open the whole time? Could their number be limited?
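The first fix can be applied as follows. This is a config fragment to be run as root; 200000 is the value chosen in these notes, not a universal recommendation.

```shell
# Persist the higher limit across reboots
echo "vm.max_map_count=200000" >> /etc/sysctl.conf
# Apply it now without rebooting
sysctl -p
# Verify the new limit is in effect
cat /proc/sys/vm/max_map_count
```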
Problem summary

The above is the path my investigation took, which may be a bit winding, so here is a short recap of how the whole problem unfolds:

Kafka memory-maps a large number of index files; each mapping is sizable, and the mappings are held without ever being released. As the number of index files grows, the process gradually approaches a physical memory ceiling: say 1 GB of physical memory remains when it tries to map an index file larger than 1 GB, so the mapping fails. The failed mapping then triggers a System GC, and because PermSize and MaxPermSize were set to different values, the Perm generation is expanded after the Full GC. That expansion calls mmap again, and mmap checks whether the process has hit the maximum number of vmas, i.e. the 65530 in /proc/sys/vm/max_map_count; once that limit is exceeded, the JVM simply crashes.
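The mapping step can be illustrated with a small standalone sketch. It mimics how Kafka's AbstractIndex maps an index file via FileChannel.map; every live mapping counts as one virtual memory area (vma) against /proc/sys/vm/max_map_count on Linux. The 10 MB size is an illustrative choice, roughly the scale of a Kafka index file.

```java
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class MmapDemo {
    // Maps a file into memory the way Kafka's AbstractIndex does.
    // Each call to FileChannel.map creates one vma; when the process has
    // too many, the native map0 call fails with ENOMEM and the JVM throws
    // "OutOfMemoryError: Map failed", which Kafka wraps in an IOException,
    // exactly as in the startup log above.
    static MappedByteBuffer mapIndexFile(File f, int size) throws Exception {
        try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) {
            raf.setLength(size);
            // The mapping stays valid even after the channel is closed,
            // which is why mapped index files keep consuming vmas.
            return raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, size);
        }
    }

    public static void main(String[] args) throws Exception {
        File f = File.createTempFile("index", ".tmp");
        f.deleteOnExit();
        MappedByteBuffer buf = mapIndexFile(f, 10 * 1024 * 1024);
        System.out.println("mapped " + buf.capacity() + " bytes");
    }
}
```

A broker with thousands of log segments holds one such mapping per index file, which is how the default 65530-vma limit gets exhausted.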

This is only one scenario I could reconstruct from this crash file; there are surely others, since any code path that triggers an mmap can be the crash site. For example, another crash file I was given later showed a crash during thread stack creation, again caused by mmap and completely unrelated to heap OOM. So the root cause is that Kafka maps too many index files, producing so many vmas that the system limit is exceeded, which leads to the crash.