kafka-consumer-offset-monitor

Posted on 2020-08-31 | Categories: Apache-Kafka, Kafka Monitoring

This is the code used to monitor Kafka consumer offsets; it is kept here purely as a record.

```java
import kafka.common.OffsetAndMetadata;
import kafka.coordinator.BaseKey;
import kafka.coordinator.GroupMetadataManager;
import kafka.coordinator.GroupTopicPartition;
import kafka.coordinator.OffsetKey;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.log4j.Logger;

import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.*;

/**
 * Thread that persists consumer offset data.
 * It consumes the internal __consumer_offsets topic as the eebbk-monitor-group consumer group
 * to determine the current consumer offset of every consumer group / topic / partition,
 * and uses the same consumer to fetch each TopicPartition's maximum (LEO) and minimum (LSO) offsets.
 *
 * created by lxm at 2020-08-31
 */
public class KafkaOffset2Mysql extends Thread {

    private static final Logger LOG = Logger.getLogger(KafkaOffset2Mysql.class);

    private String bootstrapServers;
    private Connection connection;
    private String clusterId;

    public KafkaOffset2Mysql(String bootstrapServers, Connection connection) {
        this.bootstrapServers = bootstrapServers;
        this.connection = connection;
        // Derive a cluster id from the first bootstrap host name by dropping its last character
        // (this relies on the host naming convention used in this environment).
        String s = bootstrapServers.split(":")[0];
        int length = s.length();
        this.clusterId = s.substring(0, length - 1);
    }

    @Override
    public void run() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "eebbk-monitor-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.subscribe(Collections.singleton("__consumer_offsets"));

        Set<TopicPartition> topicPartitionSet = new HashSet<>();
        long beginTimeMillis = System.currentTimeMillis();
        HashMap<String, MonitorOffsetResult> resultHashMap = new HashMap<>();
        Map<TopicPartition, Long> topicPartitionLEOMap = null;
        Map<TopicPartition, Long> topicPartitionLSOMap = null;

        while (true) {
            ConsumerRecords<byte[], byte[]> consumerRecords = kafkaConsumer.poll(100);
            for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
                // Decode the __consumer_offsets record key; a null value is a tombstone and is skipped.
                BaseKey key = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key()));
                if (record.value() == null) {
                    System.out.println(key);
                    continue;
                }
                if (key instanceof OffsetKey) {
                    // An offset-commit record: remember the latest committed offset per group/topic/partition.
                    GroupTopicPartition groupTopicPartition = (GroupTopicPartition) key.key();
                    OffsetAndMetadata offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(record.value()));
                    String groupTopicPartitionStr = groupTopicPartition.group()
                            + "__" + groupTopicPartition.topicPartition().topic()
                            + "__" + groupTopicPartition.topicPartition().partition();
                    if (!resultHashMap.containsKey(groupTopicPartitionStr)) {
                        MonitorOffsetResult monitorOffsetResult = new MonitorOffsetResult();
                        monitorOffsetResult.setGroup(groupTopicPartition.group());
                        monitorOffsetResult.setTopicPartition(groupTopicPartition.topicPartition());
                        monitorOffsetResult.setPreviousCurrentOffset(offsetAndMetadata.offset());
                        monitorOffsetResult.setPreviousLogEndOffset(0L);
                        topicPartitionSet.add(groupTopicPartition.topicPartition());
                        resultHashMap.put(groupTopicPartitionStr, monitorOffsetResult);
                    }
                    resultHashMap.get(groupTopicPartitionStr).setCurrentOffset(offsetAndMetadata.offset());
                }
//                else if (key instanceof GroupMetadataKey) {
//                    GroupMetadata groupMetadata = GroupMetadataManager.readGroupMessageValue(((GroupMetadataKey) key).key(), ByteBuffer.wrap(record.value()));
//                    System.out.println("key:" + key + " value:" + groupMetadata.toString());
//                }
            }

            // Flush a snapshot to MySQL once a minute.
            if (System.currentTimeMillis() - beginTimeMillis >= 1000 * 60) {
                kafkaConsumer.commitAsync();
                // Round the timestamp down to the whole minute.
                String dateTime = LocalDateTime.now().toString().replace("T", " ").substring(0, 16) + ":00";
                LOG.info("Writing new offset records to MySQL: " + dateTime);
                try {
                    topicPartitionLEOMap = kafkaConsumer.endOffsets(topicPartitionSet);
                    topicPartitionLSOMap = kafkaConsumer.beginningOffsets(topicPartitionSet);
                } catch (Exception e) {
                    if (topicPartitionLEOMap == null || topicPartitionLSOMap == null) {
                        LOG.error("Failed to fetch LEO or LSO! " + e);
                        continue;
                    }
                    LOG.warn("Failed to fetch LEO or LSO, reusing the previous values!");
                }

                for (MonitorOffsetResult monitorOffsetResult : resultHashMap.values()) {
                    TopicPartition topicPartition = monitorOffsetResult.getTopicPartition();

                    // Skip a partition the first time it shows up with no committed offset yet.
                    if (!monitorOffsetResult.isCanInsert() && monitorOffsetResult.getCurrentOffset() == 0L) {
                        monitorOffsetResult.setCanInsert(true);
                        continue;
                    }
//                    if (monitorOffsetResult.getCurrentOffset() == 0L) {
//                        monitorOffsetResult.setPreviousCurrentOffset(monitorOffsetResult.getCurrentOffset());
//                    }
                    if (monitorOffsetResult.getPreviousCurrentOffset() == 0L) {
                        monitorOffsetResult.setPreviousCurrentOffset(monitorOffsetResult.getCurrentOffset());
                    }
                    if (monitorOffsetResult.getPreviousLogEndOffset() == 0L) {
                        monitorOffsetResult.setPreviousLogEndOffset(topicPartitionLEOMap.get(topicPartition));
                    }
                    if (topicPartitionLEOMap.get(topicPartition) != null) {
                        monitorOffsetResult.setLogEndOffset(topicPartitionLEOMap.get(topicPartition));
                    }
                    if (topicPartitionLSOMap.get(topicPartition) != null) {
                        monitorOffsetResult.setLogStartOffset(topicPartitionLSOMap.get(topicPartition));
                    }

                    // Columns: cluster, time, group, topic, partition, LSO, LEO, current offset,
                    // lag (LEO - current), consumed (current - LSO), previous current offset, previous LEO.
                    String sql = "insert into kafka_monitor_offsets values (?,?,?,?,?,?,?,?,?,?,?,?)";
                    try (PreparedStatement ps = connection.prepareStatement(sql)) {
                        ps.setString(1, clusterId);
                        ps.setString(2, dateTime);
                        ps.setString(3, monitorOffsetResult.getGroup());
                        ps.setString(4, monitorOffsetResult.getTopicPartition().topic());
                        ps.setInt(5, monitorOffsetResult.getTopicPartition().partition());
                        ps.setLong(6, monitorOffsetResult.getLogStartOffset());
                        ps.setLong(7, monitorOffsetResult.getLogEndOffset());
                        ps.setLong(8, monitorOffsetResult.getCurrentOffset());
                        ps.setLong(9, monitorOffsetResult.getLogEndOffset() - monitorOffsetResult.getCurrentOffset());
                        ps.setLong(10, monitorOffsetResult.getCurrentOffset() - monitorOffsetResult.getLogStartOffset());
                        ps.setLong(11, monitorOffsetResult.getPreviousCurrentOffset());
                        ps.setLong(12, monitorOffsetResult.getPreviousLogEndOffset());
                        ps.execute();
                    } catch (SQLException e) {
                        LOG.error("Failed to execute SQL:\n" + e);
                    }
                    // Reset the "previous" values from the current cycle for the next round.
                    monitorOffsetResult.setPreviousCurrentOffset(monitorOffsetResult.getCurrentOffset());
                    monitorOffsetResult.setPreviousLogEndOffset(topicPartitionLEOMap.get(topicPartition));
                }
                beginTimeMillis = System.currentTimeMillis();
            }
        }
    }
}
```
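The listing references a `MonitorOffsetResult` helper class that is not included in the post. Below is a minimal sketch of what it might look like, reconstructed purely from the getters and setters called above; the field names and types are assumptions, not the original class.

```java
import org.apache.kafka.common.TopicPartition;

/**
 * Hypothetical reconstruction of the MonitorOffsetResult POJO used by KafkaOffset2Mysql.
 * Only the accessors actually called in the listing above are included.
 */
public class MonitorOffsetResult {
    private String group;                  // consumer group id
    private TopicPartition topicPartition; // topic + partition this record belongs to
    private long logStartOffset;           // LSO reported by beginningOffsets()
    private long logEndOffset;             // LEO reported by endOffsets()
    private long currentOffset;            // latest committed consumer offset
    private long previousCurrentOffset;    // committed offset from the previous cycle
    private long previousLogEndOffset;     // LEO from the previous cycle
    private boolean canInsert;             // false until the first non-empty cycle has passed

    public String getGroup() { return group; }
    public void setGroup(String group) { this.group = group; }

    public TopicPartition getTopicPartition() { return topicPartition; }
    public void setTopicPartition(TopicPartition topicPartition) { this.topicPartition = topicPartition; }

    public long getLogStartOffset() { return logStartOffset; }
    public void setLogStartOffset(long logStartOffset) { this.logStartOffset = logStartOffset; }

    public long getLogEndOffset() { return logEndOffset; }
    public void setLogEndOffset(long logEndOffset) { this.logEndOffset = logEndOffset; }

    public long getCurrentOffset() { return currentOffset; }
    public void setCurrentOffset(long currentOffset) { this.currentOffset = currentOffset; }

    public long getPreviousCurrentOffset() { return previousCurrentOffset; }
    public void setPreviousCurrentOffset(long previousCurrentOffset) { this.previousCurrentOffset = previousCurrentOffset; }

    public long getPreviousLogEndOffset() { return previousLogEndOffset; }
    public void setPreviousLogEndOffset(long previousLogEndOffset) { this.previousLogEndOffset = previousLogEndOffset; }

    public boolean isCanInsert() { return canInsert; }
    public void setCanInsert(boolean canInsert) { this.canInsert = canInsert; }
}
```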
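The original post also does not show how the thread is launched. A possible caller is sketched below; the JDBC URL, credentials, database name, and bootstrap servers are placeholders, not values from the post.

```java
import java.sql.Connection;
import java.sql.DriverManager;

/** Hypothetical entry point: one KafkaOffset2Mysql thread per Kafka cluster. */
public class KafkaOffsetMonitorMain {
    public static void main(String[] args) throws Exception {
        // Placeholder MySQL connection; the real URL and credentials are not part of the original post.
        Connection connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/kafka_monitor?useSSL=false", "monitor", "monitor");

        // Placeholder bootstrap servers; the cluster id will be derived from the first host name.
        KafkaOffset2Mysql monitor = new KafkaOffset2Mysql("kafka01:9092,kafka02:9092", connection);
        monitor.start();
        monitor.join();
    }
}
```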