kafka-consumer-offset-monitor

This is the code for monitoring Kafka consumer offsets; it is kept here for reference.
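A minimal sketch of how the monitoring thread below might be started. It assumes a MySQL JDBC driver on the classpath and an already-created kafka_monitor_offsets table whose 12 columns match the INSERT statement in the code (cluster id, snapshot time, group, topic, partition, log start offset, log end offset, committed offset, lag, consumed offsets, previous committed offset, previous log end offset). The wrapper class name, JDBC URL, credentials, and bootstrap servers are placeholders, not part of the original code.

import java.sql.Connection;
import java.sql.DriverManager;

public class KafkaOffsetMonitorMain {
    public static void main(String[] args) throws Exception {
        // Placeholder JDBC URL and credentials -- point these at the real MySQL instance.
        Connection connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/kafka_monitor?useSSL=false",
                "monitor", "monitor-password");

        // One monitoring thread per Kafka cluster; the first bootstrap host also
        // determines the cluster id that is written to MySQL.
        KafkaOffset2Mysql worker = new KafkaOffset2Mysql("kafka01:9092,kafka02:9092", connection);
        worker.start();
        worker.join();
    }
}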
import kafka.common.OffsetAndMetadata;
import kafka.coordinator.BaseKey;
import kafka.coordinator.GroupMetadataManager;
import kafka.coordinator.GroupTopicPartition;
import kafka.coordinator.OffsetKey;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.log4j.Logger;

import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.*;

/**
 * Thread that persists consumer offset data.
 * Consumes the __consumer_offsets topic via the eebbk-monitor-group consumer group to obtain the
 * current committed offset of every consumer group / topic / partition in the cluster.
 * Uses the same consumer to fetch the largest (LEO) and smallest (LSO) offsets of each TopicPartition.
 *
 * created by lxm at 2020-08-31
 */
public class KafkaOffset2Mysql extends Thread {

    private static final Logger LOG = Logger.getLogger(KafkaOffset2Mysql.class);

    private String bootstrapServers;
    private Connection connection;
    private String clusterId;

    public KafkaOffset2Mysql(String bootstrapServers, Connection connection) {
        this.bootstrapServers = bootstrapServers;
        this.connection = connection;
        // Cluster id = hostname of the first bootstrap server with its last character (node number) stripped.
        String s = bootstrapServers.split(":")[0];
        int length = s.length();
        this.clusterId = s.substring(0, length - 1);
    }

    @Override
    public void run() {

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "eebbk-monitor-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(props);

        kafkaConsumer.subscribe(Collections.singleton("__consumer_offsets"));

        Set<TopicPartition> topicPartitionSet = new HashSet<>();
        long beginTimeMillis = System.currentTimeMillis();
        HashMap<String, MonitorOffsetResult> resultHashMap = new HashMap<>();
        Map<TopicPartition, Long> topicPartitionLEOMap = null;
        Map<TopicPartition, Long> topicPartitionLSOMap = null;

        while (true) {
            ConsumerRecords<byte[], byte[]> consumerRecords = kafkaConsumer.poll(100);
            for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
                BaseKey key = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key()));
                if (record.value() == null) {
                    // A null value is a tombstone: the committed offset was deleted/expired.
                    System.out.println(key);
                    continue;
                }
                if (key instanceof OffsetKey) {
                    // An OffsetKey record carries the committed offset of one group/topic/partition.
                    GroupTopicPartition groupTopicPartition = (GroupTopicPartition) key.key();
                    OffsetAndMetadata offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(record.value()));
                    String groupTopicPartitionStr = groupTopicPartition.group() + "__" + groupTopicPartition.topicPartition().topic() + "__" + groupTopicPartition.topicPartition().partition();
                    if (!resultHashMap.containsKey(groupTopicPartitionStr)) {
                        MonitorOffsetResult monitorOffsetResult = new MonitorOffsetResult();
                        monitorOffsetResult.setGroup(groupTopicPartition.group());
                        monitorOffsetResult.setTopicPartition(groupTopicPartition.topicPartition());
                        monitorOffsetResult.setPreviousCurrentOffset(offsetAndMetadata.offset());
                        monitorOffsetResult.setPreviousLogEndOffset(0L);
                        topicPartitionSet.add(groupTopicPartition.topicPartition());
                        resultHashMap.put(groupTopicPartitionStr, monitorOffsetResult);
                    }
                    resultHashMap.get(groupTopicPartitionStr).setCurrentOffset(offsetAndMetadata.offset());
                }
                // else if (key instanceof GroupMetadataKey){
                //
                //     GroupMetadata groupMetadata = GroupMetadataManager.readGroupMessageValue(((GroupMetadataKey)key).key(), ByteBuffer.wrap(record.value()));
                //
                //     System.out.println("key:"+key+" value:"+groupMetadata.toString());
                //
                // }
            }

            // Flush a snapshot to MySQL once per minute.
            if (System.currentTimeMillis() - beginTimeMillis >= 1000 * 60) {

                kafkaConsumer.commitAsync();

                // Truncate the timestamp to the whole minute, e.g. 2020-08-31 12:34:00
                String dateTime = LocalDateTime.now().toString().replace("T", " ").substring(0, 16) + ":00";
                LOG.info("Writing new offset snapshot to MySQL: " + dateTime);

                try {
                    topicPartitionLEOMap = kafkaConsumer.endOffsets(topicPartitionSet);
                    topicPartitionLSOMap = kafkaConsumer.beginningOffsets(topicPartitionSet);
                } catch (Exception e) {
                    if (topicPartitionLEOMap == null || topicPartitionLSOMap == null) {
                        LOG.error("Failed to fetch LEO or LSO! " + e);
                        continue;
                    }
                    LOG.warn("Failed to fetch LEO or LSO, reusing the previous values!");
                }

                for (MonitorOffsetResult monitorOffsetResult : resultHashMap.values()) {

                    TopicPartition topicPartition = monitorOffsetResult.getTopicPartition();

                    // The first time a result is seen with a committed offset of 0, mark it and skip the insert.
                    if (!monitorOffsetResult.isCanInsert() && monitorOffsetResult.getCurrentOffset() == 0L) {
                        monitorOffsetResult.setCanInsert(true);
                        continue;
                    }

                    // if (monitorOffsetResult.getCurrentOffset() == 0L){
                    //     monitorOffsetResult.setPreviousCurrentOffset(monitorOffsetResult.getCurrentOffset());
                    // }

                    if (monitorOffsetResult.getPreviousCurrentOffset() == 0L) {
                        monitorOffsetResult.setPreviousCurrentOffset(monitorOffsetResult.getCurrentOffset());
                    }
                    if (monitorOffsetResult.getPreviousLogEndOffset() == 0L) {
                        monitorOffsetResult.setPreviousLogEndOffset(topicPartitionLEOMap.get(topicPartition));
                    }
                    if (topicPartitionLEOMap.get(topicPartition) != null) {
                        monitorOffsetResult.setLogEndOffset(topicPartitionLEOMap.get(topicPartition));
                    }
                    if (topicPartitionLSOMap.get(topicPartition) != null) {
                        monitorOffsetResult.setLogStartOffset(topicPartitionLSOMap.get(topicPartition));
                    }
                    String sql = "insert into kafka_monitor_offsets values (?,?,?,?,?,?,?,?,?,?,?,?)";
                    try (PreparedStatement ps = connection.prepareStatement(sql)) {
                        ps.setString(1, clusterId);
                        ps.setString(2, dateTime);
                        ps.setString(3, monitorOffsetResult.getGroup());
                        ps.setString(4, monitorOffsetResult.getTopicPartition().topic());
                        ps.setInt(5, monitorOffsetResult.getTopicPartition().partition());
                        ps.setLong(6, monitorOffsetResult.getLogStartOffset());
                        ps.setLong(7, monitorOffsetResult.getLogEndOffset());
                        ps.setLong(8, monitorOffsetResult.getCurrentOffset());
                        // Lag = LEO - committed offset
                        ps.setLong(9, monitorOffsetResult.getLogEndOffset() - monitorOffsetResult.getCurrentOffset());
                        // Consumed = committed offset - LSO
                        ps.setLong(10, monitorOffsetResult.getCurrentOffset() - monitorOffsetResult.getLogStartOffset());
                        ps.setLong(11, monitorOffsetResult.getPreviousCurrentOffset());
                        ps.setLong(12, monitorOffsetResult.getPreviousLogEndOffset());
                        ps.execute();
                    } catch (SQLException e) {
                        LOG.error("Failed to execute SQL:\n" + e);
                    }

                    // Reset the previous values from the current ones for the next snapshot.
                    monitorOffsetResult.setPreviousCurrentOffset();
                    monitorOffsetResult.setPreviousLogEndOffset(topicPartitionLEOMap.get(topicPartition));
                }
                beginTimeMillis = System.currentTimeMillis();
            }
        }
    }
}
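The MonitorOffsetResult class referenced above is not part of this listing. The sketch below is a plain data holder reconstructed from the calls the thread makes, including the no-argument setPreviousCurrentOffset(), which rolls the committed offset into the previous one after each snapshot; the field names and semantics are therefore assumptions, not the original class.

import org.apache.kafka.common.TopicPartition;

/** Assumed data holder for one consumer group / topic / partition, reconstructed from usage above. */
public class MonitorOffsetResult {

    private String group;
    private TopicPartition topicPartition;

    private long logStartOffset;         // LSO: smallest available offset of the partition
    private long logEndOffset;           // LEO: next offset to be written to the partition
    private long currentOffset;          // latest committed consumer offset
    private long previousCurrentOffset;  // committed offset at the previous snapshot
    private long previousLogEndOffset;   // LEO at the previous snapshot
    private boolean canInsert;           // false until the first zero-offset snapshot has been skipped

    public String getGroup() { return group; }
    public void setGroup(String group) { this.group = group; }

    public TopicPartition getTopicPartition() { return topicPartition; }
    public void setTopicPartition(TopicPartition topicPartition) { this.topicPartition = topicPartition; }

    public long getLogStartOffset() { return logStartOffset; }
    public void setLogStartOffset(long logStartOffset) { this.logStartOffset = logStartOffset; }

    public long getLogEndOffset() { return logEndOffset; }
    public void setLogEndOffset(long logEndOffset) { this.logEndOffset = logEndOffset; }

    public long getCurrentOffset() { return currentOffset; }
    public void setCurrentOffset(long currentOffset) { this.currentOffset = currentOffset; }

    public long getPreviousCurrentOffset() { return previousCurrentOffset; }
    public void setPreviousCurrentOffset(long previousCurrentOffset) { this.previousCurrentOffset = previousCurrentOffset; }

    /** Roll the current committed offset into the previous one after a snapshot has been written. */
    public void setPreviousCurrentOffset() { this.previousCurrentOffset = this.currentOffset; }

    public long getPreviousLogEndOffset() { return previousLogEndOffset; }
    public void setPreviousLogEndOffset(long previousLogEndOffset) { this.previousLogEndOffset = previousLogEndOffset; }

    public boolean isCanInsert() { return canInsert; }
    public void setCanInsert(boolean canInsert) { this.canInsert = canInsert; }
}

A Lombok-annotated class would serve equally well; the accessors are spelled out here only to show exactly which methods the monitoring thread relies on.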