首页 笔记 图片 查字 
所属分类:Kafka
浏览:62
内容:

指定时间消费数据:需要把时间戳转换成对应的offset值。
代码如下:
Set<TopicPartition> assignment=kafkaCousumer.assignment();
// 获取分区分配方案
while(assignment.size()==0){
   kafkaConsumer.poll(Duration.ofSeconds(2));
   assignment = kafkaConsumer.assignment();
}

// 用来把时间戳转换成对应的offset值
Map<TopicPartition, Long> topicPartitionHashMap = new HashMap<TopicPartition, Long>();

// 为每个分区设定需要消费的时间值
for(TopicPartition topicPartition:assignment){
   topicPartitionHashMap.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 60 * 60 * 1000); // 指定时间为一天(24小时)前
}

Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestamp = kafkaConsumer.offsetForTimes(topicPartitionHashMap);

// 为每个分区设定需要消费的offset值:500
for(TopicPartition topicPartition:assignment){
   OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestamp.get(topicPartition);
   kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset()); // 把时间转换成offset
}