Kafka 发送数据

为了向Kafka Topic发送数据,需要一个producer。生产者的作用是向Kafka主题发送或写入数据/消息。

在本节中,我们将学习生产者如何向Kafka主题发送消息。

启动生产者有以下步骤:

第一步: 启动zookeeper和kafka服务器。

Step2: 在命令行中输入命令: 'kafka-console-producer'。这将帮助用户从标准输入读取数据并将其写入 Kafka 主题。

注意: 根据操作系统选择".bat"或".sh"。

发送数据到Kafka主题 Sending data to Kafka Topics

突出显示的文本表示'broker-list'并且需要"主题 ID"来生成消息。这是因为生产者必须知道要写入数据的主题的 id。

步骤 3: 了解所有要求后,尝试向生产者生产消息主题使用命令:

'kafka-console-producer-broker-list localhost:9092-topic <topic_name>'。

Sending data to Kafka Topics

注意: 这里,9092是Kafka服务器的端口号。

这里,'myfirst' 主题被选择写入消息。

一个'>'将出现在新行中。开始生成一些消息,如下图:

Sending data to Kafka Topics

第4步: 按'Ctrl+c'并按'Y'键存在。

因此,这样,生产者可以向Kafka主题生产/发送多条消息.

Producer with Keys

Kafka Producer 可以使用或不使用密钥将数据写入主题。如果生产者没有指定键,则数据将存储到 key=null 的任何分区,否则数据将仅存储到指定的分区。需要使用"parse.key"和"key.seperator"来指定主题的键。使用的命令是:

'kafka-console-producer--broker-list localhost:9092--topic <topic_name>--property parse.key=true--property key.separator=,
> key,value
> another key,another value'

这里,key是具体的partition,value是生产者要写给topic的消息。

当一个topic不存在时?

假设生产者想向一个尚不存在的新主题发送消息。在这种情况下,在生成消息后会出现警告,如下面的快照所示。这只是一个警告。

Sending data to Kafka Topics

为什么会出现这个警告?

出现警告是因为之前主题"演示"不存在。但是,一旦生产者写了一条消息,Kafka 就以某种方式创建了该主题。虽然没有为这个意外的主题举行领导人选举,但可以看到"LEADER_NOT_AVAILABLE"错误。但是,下一次,生产者可以继续写入更多消息,因为不会再次出现警告。

用户可以使用'-list'命令查看,如下图:

Sending data to Kafka Topics

可以在列表中看到主题'demo'。

描述新主题

因此,由生产者直接创建的主题获取默认分区数及其复制因子为1、

例如,

Sending data to Kafka Topics

使用时描述的主题"演示" '-describe' 命令将 'PartitionCount' 和 'ReplicationFactor' 的值设为 1(默认值)。因此,在向其生成消息之前创建主题始终是更好的选择。

更改默认值

按照以下步骤更改默认值对于新主题:

  • 使用 Notepad++ 或任何其他文本编辑器打开"server.properties"文件。
  • 将 num.partitions=1 的值编辑为新值。设为 3、因此,每当引入此类新主题时,PartitionCount 和 ReplicationFactor 的数量将为 3(无论用户设置什么)。
  • 保存文件。

但是,总是先创建主题。

下一章:Kafka 控制台 Console

Kafka 控制台 Console:在本节中,用户将了解消费者如何消费或读取来自 Kafka 主题的消息。那里消费者采取以下步骤来消费来自主题的消息:步骤 1: 最初启动 zookeeper 和 kafka 服务器。步骤 2: 在 ...