指定时间消费数据:需要把时间戳转换成对应的offset值。
代码如下:
Set<TopicPartition> assignment=kafkaCousumer.assignment();
// 获取分区分配方案
while(assignment.size()==0){
kafkaConsumer.poll(Duration.ofSeconds(2));
assignment = kafkaConsumer.assignment();
}
// 用来把时间戳转换成对应的offset值
Map<TopicPartition, Long> topicPartitionHashMap = new HashMap<TopicPartition, Long>();
// 为每个分区设定需要消费的时间值
for(TopicPartition topicPartition:assignment){
topicPartitionHashMap.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 60 * 60 * 1000); // 指定时间为一天(24小时)前
}
Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestamp = kafkaConsumer.offsetForTimes(topicPartitionHashMap);
// 为每个分区设定需要消费的offset值:500
for(TopicPartition topicPartition:assignment){
OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestamp.get(topicPartition);
kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset()); // 把时间转换成offset
}