Kafka 创建消费者
用Java创建Kafka消费者
在上一节中,我们学习了用Java创建一个生产者。在本节中,我们将学习在 Java 中实现一个 Kafka 消费者。
创建消费者需要采取以下步骤:
- 创建记录器
- 创建消费者属性。
- 创建消费者。
- 为消费者订阅特定主题。
- 轮询一些新数据
我们来讨论一下在java中学习consumer实现的每一步。
创建Logger
实现logger是为了在程序中写入日志消息执行。用户需要创建一个 Logger 对象,该对象需要导入"org.slf4j 类"。下面的快照显示了 Logger 的实现:
创建消费者属性
与生产者属性类似,Apache Kafka 也提供了各种不同的属性来创建消费者。要了解每个消费者属性,请访问Apache Kafa>文档>配置>消费者配置的官方网站。这里,我们会列出一个消费者需要的属性,比如:
key.deserializer: 是key的Deserializer类,用于实现'org.apache.kafka.common.serialization.Deserializer' 接口。
value.deserializer: 实现 ' org.apache.kafka.common.serialization.Desrializer'接口。
bootstrap.servers: 是一个主机/端口对列表,用于建立与 Kafka 集群的初始连接。它不包含客户端所需的全套服务器。仅需要引导所需的服务器。
group.id: 这是一个唯一字符串,用于标识消费者组的消费者。当消费者通过订阅主题使用基于 Kafka 的偏移管理策略或组管理功能时需要此属性。
auto.offset.reset: 当没有时需要此属性初始偏移量存在或者当前偏移量在服务器上不再存在。有以下值用于重置偏移值:
earliest: 此偏移变量自动将值重置为其最早的偏移量。
最新: 这个偏移变量将偏移值重置为其最新的偏移。
无: 如果没有找到前一组的前一个偏移,它会抛出异常消费者。
其他任何事情: 它会向消费者抛出异常。
注意: 在我们的代码中,我们使用了 'earliest' 变量将值重置为最早的值。
这些是实现消费者所需的一些基本属性。让我们使用 IntelliJ IDEA 来实现。
Step1) 定义一个新的java类为'consumer1.java'。
Step2) > 描述类中的消费者属性,如下图所示:
在快照中,描述了所有必要的属性。
创建消费者
创建KafkaConsumer的对象,用于创建消费者,如下图:
订阅消费者
要从主题中读取消息,我们需要将消费者连接到指定的主题。消费者可以通过各种订阅 API 进行订阅。在这里,我们使用 Arrays.asList() 是因为用户可能想要订阅一个或多个主题。因此,Arrays.asList() 允许为消费者订阅多个主题。
下面的代码展示了消费者订阅的实现:
用户需要直接指定主题名称或通过字符串变量来读取消息。可以有多个主题也用逗号分隔。
轮询新数据
消费者通过轮询方式从Kafka读取数据。
poll 方法返回从当前分区的偏移量。指定等待数据的持续时间,否则返回一个空的 ConsumerRecord 给消费者。此外,记录器将获取记录键、分区、记录偏移量及其值。
下面给出了创建 Java 消费者的完整代码:
package com.firstgroupapp.aktutorial; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.Properties; public class consumer1 { public static void main(String[] args) { Logger logger= LoggerFactory.getLogger(consumer1.class.getName()); String bootstrapServers="127.0.0.1:9092"; String grp_id="third_app"; String topic="my_first"; //Creating consumer properties Properties properties=new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers); properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,grp_id); properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); //creating consumer KafkaConsumer<String,String> consumer= new KafkaConsumer<String,String>(properties); //Subscribing consumer.subscribe(Arrays.asList(topic)); //polling while(true){ ConsumerRecords<String,String> records=consumer.poll(Duration.ofMillis(100)); for(ConsumerRecord<String,String> record: records){ logger.info("Key: "+ record.key() + ", Value:" +record.value()); logger.info("Partition:" + record.partition()+",Offset:"+record.offset()); } } } }
这样,消费者可以按照每个步骤顺序读取消息。
消费者实现的输出可以在下面的快照中看到:
键值为空。这是因为我们之前没有指定任何键。由于是'earliest',所以从头开始的所有消息都会显示出来。
在Consumer Group中读取数据
用户一共可以有多个consumer读取数据.这可以通过消费者组来完成。在消费者组中,一个或多个消费者将能够从 Kafka 读取数据。如果用户想从头开始阅读消息,请重置 group_id 或更改 group_id。这将重置用户的应用程序并从一开始就显示消息。
下一章:Kafka 实时示例
到目前为止,我们学习了如何从 Apache Kafka 读取和写入数据。在本节中,我们将学习将真正的数据源放到 Kafka 中。在这里,我们将讨论一个实时应用程序,即 Twitter。 用户将了解如何创建 Twitte ...