The producer sends data to a specific partition of a topic. Code:
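int partition = 0; // target partition number
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
int i = 0;
while (i < 10) {
    // The explicit partition argument decides where the record goes; the key here is an empty string.
    kafkaProducer.send(new ProducerRecord<String, String>("topic001", partition, "", "hello" + i));
    i++;
}
kafkaProducer.close(); // wait for buffered records to be sent, then release resources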
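The producer snippet above assumes a `properties` object that is not shown. A minimal sketch of what it might contain follows; the class name `ProducerProps` and the broker address passed in are hypothetical, while the configuration keys and the `StringSerializer` class name are the standard Kafka client ones.

```java
import java.util.Properties;

// Hypothetical helper: builds the Properties that the producer snippet assumes.
final class ProducerProps {
    static Properties build(String bootstrapServers) {
        Properties properties = new Properties();
        // Address of one or more brokers, e.g. "localhost:9092" (assumed value).
        properties.setProperty("bootstrap.servers", bootstrapServers);
        // Keys and values are Strings, matching KafkaProducer<String, String>.
        properties.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }
}
```

With this in place, the producer could be created as `new KafkaProducer<String, String>(ProducerProps.build("localhost:9092"))`.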
The consumer reads data from a specific partition of a topic. Code:
int partition = 0; // partition number to read from
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
ArrayList<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
topicPartitions.add(new TopicPartition("topic001", partition));
// assign() binds the consumer to the listed partitions manually (no subscribe(), no group rebalancing).
kafkaConsumer.assign(topicPartitions);
while (true) {
    // Block for up to 2 seconds waiting for records.
    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2));
    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
        System.out.println(consumerRecord);
    }
}
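The consumer snippet likewise assumes a `properties` object defined elsewhere. A minimal sketch under the same assumptions: the class name `ConsumerProps`, the broker address, and the group id are hypothetical; the configuration keys and the `StringDeserializer` class name are the standard Kafka client ones. A `group.id` is included here so that offset commits have a group to belong to, although manual assignment with `assign()` does not use group rebalancing.

```java
import java.util.Properties;

// Hypothetical helper: builds the Properties that the consumer snippet assumes.
final class ConsumerProps {
    static Properties build(String bootstrapServers, String groupId) {
        Properties properties = new Properties();
        // Address of one or more brokers, e.g. "localhost:9092" (assumed value).
        properties.setProperty("bootstrap.servers", bootstrapServers);
        // Group id used when committing offsets (assumed value supplied by the caller).
        properties.setProperty("group.id", groupId);
        // Keys and values are Strings, matching KafkaConsumer<String, String>.
        properties.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }
}
```

The consumer could then be created as `new KafkaConsumer<String, String>(ConsumerProps.build("localhost:9092", "demo-group"))`.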