Install kafka

这一片文章是关于安装kafka的,但是是基于前面几篇文章的基础之上的,请参考前面的安装hadoop,安装zookeeper。

准备

kafka的运行需要zookeeper,kafka自身是带有zookeeper的,但是是一个单节点,可以参考kafka的文档,了解如何使用。

这里使用的是上一篇文章中安装好的zookeeper。

下载kafka,这里下载的是kafka_2.11-0.8.2.1.tgz

解压文件包,将目录拷贝到/user/目录下,并修改权限:

1
chown -R hadoop:hadoop kafka_2.11-0.8.2.1

配置kafka

修改${KAFKA_HOME}/config/server.properties:

1
2
3
broker.id=1
log.dirs=/usr/kafka_2.11-0.8.2.1/logs
zookeeper.connect=master.hadoop:2181,slave1.hadoop:2181,slave2.hadoop:2181

对于具体的配置内容,请参考kafka broker configs

针对上面的配置,需要创建日志目录:/usr/kafka_2.11-0.8.2.1/logs

拷贝到其他节点

将配置好的kafka拷贝到其他节点的相应的目录下。

需要注意的是拷贝到其他节点以后,需要修改${KAFKA_HOME}/config/server.properties中的broker.id, 需要保证broker.id是一个唯一的整数。

启动和验证

启动和停止kafka:

1
2
bin/kafka-server-start.sh config/server.properties &
bin/kafka-server-stop.sh

  • 连通性测试

在一台服务器上运行consumer:

1
bin/kafka-console-consumer.sh --zookeeper master.hadoop:2181 --topic test --from-beginning

在另外一台服务器上运行producer:

1
bin/kafka-console-producer.sh --broker-list slave1.hadoop:9092 --topic test

这时在producer中,输入任何消息,在consumer端就可以看到。下面的图片:

producer的情况:

consumer的情况:

kafka Utility

  1. Create topic
    1
    bin/kafka-topics.sh --create --zookeeper ${zk_host}:${zk_port}[${zk_chroot}] --replication-factor ${replication_factor} --partitions ${partition_count} --topic ${topic_name}

Note: for zookeeper connect string \({zk_host}:\){zk_port}[${zk_chroot}], ${zk_chroot} is optional

  1. Describe topic
  • Describe topic all topics

    1
    bin/kafka-topics.sh --describe --zookeeper ${zk_host}:${zk_port}[${zk_chroot}]

  • Describe specified topic

    1
    bin/kafka-topics.sh --describe --zookeeper ${zk_host}:${zk_port}[${zk_chroot}] --topic ${topic_name}

  1. Publish messages. You can use this utility to test you topic.
    1
    bin/kafka-console-producer.sh --broker-list ${kafka_server}:{kafka_port} --topic ${topic_name}

Note: if you local kafka server is started, you can use localhost:9092 as broker-list.

  1. Consume messages

    1
    bin/kafka-console-consumer.sh --zookeeper ${zk_host}:${zk_port}[${zk_chroot}] [--from-beginning] --topic ${topic_name}

  2. Modifying topics Currently, support increase partitions well:

    1
    bin/kafka-topics.sh --zookeeper ${zk_host}:${zk_port}[${zk_chroot}] --alter --topic ${topic_name} --partitions ${partition_count}

Kafka does not currently support reducing the number of partitions for a topic or changing the replication factor.

  1. Delete topic
    1
    bin/kafka-topics.sh --zookeeper ${zk_host}:${zk_port}[${zk_chroot}] --delete --topic ${topic_name}

Topic deletion option is disabled by default. To enable it set the server config:

1
delete.topic.enable=true

kafka常用命令

启动kafka

1
JMX_PORT=9996 nohup bin/kafka-server-start.sh config/server.properties >> /dev/null &

重新选举kafka

1
bin/kafka-preferred-replica-election.sh --zookeeper ${zk_host}:2181/ROOT-PATH

创建一个topic

1
./kafka-topics.sh --create --zookeeper ${zk_host}:2181/ROOT-PATH --replication-factor 3 --partition 3  --topic mytopic

增加partition数量

1
2
3
./kafka-topics.sh --alter --topic your-topic-name --partitions new-values(contain old value) --zookeeper your zookeeper-address/kafka-path

./kafka-topics.sh --alter --topic nelo2-normal-logs --partitions 4 --zookeeper ${zk_host}:2181/ROOT-PATH

删除一个topic

1
./kafka-topics.sh --delete --zookeeper ${zk_host}:2181/ROOT-PATH --topic test4

查看topic

1
./kafka-topics.sh --list --zookeeper ${zk_host}:2181/ROOT-PATH

查看详细信息

1
./kafka-topics.sh --describe --zookeeper ${zk_host}:2181/ROOT-PATH

创建一个生产者

1
./kafka-console-producer.sh --broker-list ${zk_host}:2181/ROOT-PATH --topic mytopic

创建一个消费者

1
./kafka-console-consumer.sh --zookeeper ${zk_host}:2181/ROOT-PATH --from-beginning --topic mytopic

杀掉一个broker

1
pkill -9 -f server-1.properties

注:需要杀掉哪个broker就需要登录到哪台机器上执行以上命令。

查看指定消费者的消费状况

1
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect 10.98.213.108:10013/NHN/NELO2/kafka --group river

参考

http://kafka.apache.org/documentation.html#quickstart