这一片文章是关于安装kafka的,但是是基于前面几篇文章的基础之上的,请参考前面的安装hadoop,安装zookeeper。
准备
kafka的运行需要zookeeper,kafka自身是带有zookeeper的,但是是一个单节点,可以参考kafka的文档,了解如何使用。
这里使用的是上一篇文章中安装好的zookeeper。
下载kafka,这里下载的是kafka_2.11-0.8.2.1.tgz
解压文件包,将目录拷贝到/user/目录下,并修改权限: 1
chown -R hadoop:hadoop kafka_2.11-0.8.2.1
配置kafka
修改${KAFKA_HOME}/config/server.properties
: 1
2
3broker.id=1
log.dirs=/usr/kafka_2.11-0.8.2.1/logs
zookeeper.connect=master.hadoop:2181,slave1.hadoop:2181,slave2.hadoop:2181
对于具体的配置内容,请参考kafka broker configs。
针对上面的配置,需要创建日志目录:/usr/kafka_2.11-0.8.2.1/logs
拷贝到其他节点
将配置好的kafka拷贝到其他节点的相应的目录下。
需要注意的是拷贝到其他节点以后,需要修改${KAFKA_HOME}/config/server.properties
中的broker.id
, 需要保证broker.id
是一个唯一的整数。
启动和验证
启动和停止kafka: 1
2bin/kafka-server-start.sh config/server.properties &
bin/kafka-server-stop.sh
- 连通性测试
在一台服务器上运行consumer: 1
bin/kafka-console-consumer.sh --zookeeper master.hadoop:2181 --topic test --from-beginning
在另外一台服务器上运行producer: 1
bin/kafka-console-producer.sh --broker-list slave1.hadoop:9092 --topic test
这时在producer中,输入任何消息,在consumer端就可以看到。下面的图片:
producer的情况:
consumer的情况:
kafka Utility
- Create topic
1
bin/kafka-topics.sh --create --zookeeper ${zk_host}:${zk_port}[${zk_chroot}] --replication-factor ${replication_factor} --partitions ${partition_count} --topic ${topic_name}
Note: for zookeeper connect string \({zk_host}:\){zk_port}[${zk_chroot}], ${zk_chroot} is optional
- Describe topic
Describe topic all topics
1
bin/kafka-topics.sh --describe --zookeeper ${zk_host}:${zk_port}[${zk_chroot}]
Describe specified topic
1
bin/kafka-topics.sh --describe --zookeeper ${zk_host}:${zk_port}[${zk_chroot}] --topic ${topic_name}
- Publish messages. You can use this utility to test you topic.
1
bin/kafka-console-producer.sh --broker-list ${kafka_server}:{kafka_port} --topic ${topic_name}
Note: if you local kafka server is started, you can use localhost:9092 as broker-list.
Consume messages
1
bin/kafka-console-consumer.sh --zookeeper ${zk_host}:${zk_port}[${zk_chroot}] [--from-beginning] --topic ${topic_name}
Modifying topics Currently, support increase partitions well:
1
bin/kafka-topics.sh --zookeeper ${zk_host}:${zk_port}[${zk_chroot}] --alter --topic ${topic_name} --partitions ${partition_count}
Kafka does not currently support reducing the number of partitions for a topic or changing the replication factor.
- Delete topic
1
bin/kafka-topics.sh --zookeeper ${zk_host}:${zk_port}[${zk_chroot}] --delete --topic ${topic_name}
Topic deletion option is disabled by default. To enable it set the server config: 1
delete.topic.enable=true
kafka常用命令
启动kafka 1
JMX_PORT=9996 nohup bin/kafka-server-start.sh config/server.properties >> /dev/null &
重新选举kafka 1
bin/kafka-preferred-replica-election.sh --zookeeper ${zk_host}:2181/ROOT-PATH
创建一个topic 1
./kafka-topics.sh --create --zookeeper ${zk_host}:2181/ROOT-PATH --replication-factor 3 --partition 3 --topic mytopic
增加partition数量 1
2
3./kafka-topics.sh --alter --topic your-topic-name --partitions new-values(contain old value) --zookeeper your zookeeper-address/kafka-path
./kafka-topics.sh --alter --topic nelo2-normal-logs --partitions 4 --zookeeper ${zk_host}:2181/ROOT-PATH
删除一个topic 1
./kafka-topics.sh --delete --zookeeper ${zk_host}:2181/ROOT-PATH --topic test4
查看topic 1
./kafka-topics.sh --list --zookeeper ${zk_host}:2181/ROOT-PATH
查看详细信息 1
./kafka-topics.sh --describe --zookeeper ${zk_host}:2181/ROOT-PATH
创建一个生产者 1
./kafka-console-producer.sh --broker-list ${zk_host}:2181/ROOT-PATH --topic mytopic
创建一个消费者 1
./kafka-console-consumer.sh --zookeeper ${zk_host}:2181/ROOT-PATH --from-beginning --topic mytopic
杀掉一个broker 1
pkill -9 -f server-1.properties
注:需要杀掉哪个broker就需要登录到哪台机器上执行以上命令。
查看指定消费者的消费状况 1
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect 10.98.213.108:10013/NHN/NELO2/kafka --group river