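Kafka Cluster Installation
Source: 程序員人生 | Published: 2014-10-17 05:31:44
Installing the Kafka Cluster
Suppose the cluster needs four brokers, two on hadoop-master and two on machine-0, forming the Kafka cluster shown in the figure below.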

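2.1 Configuration file
Configure the broker configuration file (server.properties) for every Kafka broker. The value of broker.id must be an integer that is unique for each broker. The core properties are: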
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=11
# The port the socket server listens on
port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=hadoop-master
# ZooKeeper ensemble
zookeeper.connect=machine-1:2222,machine-2:2222,machine-0:2222
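The rule that every broker needs its own broker.id can be made concrete with a small generator. This is a minimal sketch under stated assumptions:

- The article only shows ids 11 and 12 (on hadoop-master), so the ids 13 and 14 for machine-0 are hypothetical.
- The output directory and the `server-<id>.properties` file names are also hypothetical.
- Only the core properties listed above are written.

```shell
#!/usr/bin/env bash
# Sketch: write one minimal properties file per broker and check that
# broker.id is unique. Ids 13/14 for machine-0 and file names are hypothetical.
set -euo pipefail

OUT_DIR="${OUT_DIR:-$(mktemp -d)}"
ZK_CONNECT="machine-1:2222,machine-2:2222,machine-0:2222"

# Each entry: broker.id host port log-dir
BROKERS=(
  "11 hadoop-master 9092 /opt/kafka/logs"
  "12 hadoop-master 9093 /opt/kafka/logs-1"
  "13 machine-0 9092 /opt/kafka/logs"
  "14 machine-0 9093 /opt/kafka/logs-1"
)

for entry in "${BROKERS[@]}"; do
  read -r id host port dir <<< "$entry"
  cat > "$OUT_DIR/server-$id.properties" <<EOF
broker.id=$id
port=$port
host.name=$host
log.dirs=$dir
zookeeper.connect=$ZK_CONNECT
EOF
done

# Fail loudly if any broker.id line appears twice across the files.
dupes=$(cat "$OUT_DIR"/*.properties | grep '^broker.id=' | sort | uniq -d)
if [ -n "$dupes" ]; then
  echo "duplicate broker ids: $dupes" >&2
  exit 1
fi
echo "wrote $(ls "$OUT_DIR" | wc -l) config files to $OUT_DIR"
```

Each generated file would then be copied to the machine named in its host.name.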
Because Kafka depends on a ZooKeeper ensemble, the ZooKeeper cluster must be started first. Setting it up is not covered here.
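Because the brokers cannot start without ZooKeeper, it can help to confirm that every ZooKeeper server is answering before launching Kafka. This is a minimal sketch. It assumes `nc` is installed and ZooKeeper's four-letter `ruok` command is enabled (a healthy server replies `imok`). The helper names are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: split a zookeeper.connect string and probe each server with "ruok".
# Assumes nc is available and four-letter commands are enabled on ZooKeeper.
set -euo pipefail

ZK_CONNECT="machine-1:2222,machine-2:2222,machine-0:2222"

# Print one host:port pair per line, dropping any optional /chroot suffix.
zk_servers() {
  local connect="${1%%/*}"
  tr ',' '\n' <<< "$connect"
}

# Return 0 if the server at host:port answers "imok" to "ruok".
zk_is_ok() {
  local host="${1%%:*}" port="${1##*:}"
  [ "$(echo ruok | nc -w 2 "$host" "$port" 2>/dev/null)" = "imok" ]
}

# Report every server; returns non-zero if any server did not answer.
zk_check_all() {
  local server status=0
  while read -r server; do
    if zk_is_ok "$server"; then
      echo "$server ok"
    else
      echo "$server NOT responding" >&2
      status=1
    fi
  done < <(zk_servers "$1")
  return "$status"
}
```

A typical use is `zk_check_all "$ZK_CONNECT" && bin/kafka-server-start.sh config/server.properties`, so a broker is only started once the whole ensemble responds.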
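2.2 Broker property configuration
Prepare the broker configuration files on machine-0 and hadoop-master. Each machine runs two brokers, and the properties are the same on both machines, with two exceptions:

- On machine-0, host.name must be machine-0.
- broker.id must be unique across the whole cluster, so machine-0's brokers need ids other than the 11 and 12 used on hadoop-master.

server.properties on hadoop-master: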
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=11
############################# Socket Server Settings #############################
# The port the socket server listens on
port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=hadoop-master
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>
# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>
# The number of threads handling network requests
num.network.threads=3
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/opt/kafka/logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=60000
# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=machine-1:2222,machine-2:2222,machine-0:2222
#server.1=machine-0:2888:3888
#server.2=machine-1:2888:3888
#server.3=machine-2:2888:3888
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
server-1.properties on hadoop-master:
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=12
############################# Socket Server Settings #############################
# The port the socket server listens on
port=9093
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=hadoop-master
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>
# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>
# The number of threads handling network requests
num.network.threads=3
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/opt/kafka/logs-1
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=60000
# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=machine-1:2222,machine-2:2222,machine-0:2222
#server.1=machine-0:2888:3888
#server.2=machine-1:2888:3888
#server.3=machine-2:2888:3888
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
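The two files above differ only in broker.id, port and log.dirs, which is what lets two brokers share hadoop-master. Below is a minimal sketch of a check that two config files meant for the same host do not collide on those keys. The `prop` and `check_same_host` helpers are hypothetical, and they only understand simple `key=value` lines.

```shell
#!/usr/bin/env bash
# Sketch: verify that two broker configs for the same host do not share
# broker.id, port or log.dirs. Helpers are hypothetical, key=value lines only.
set -euo pipefail

# Print the value of a simple key=value property (last occurrence wins).
prop() {
  grep -E "^$2=" "$1" | tail -n 1 | cut -d= -f2-
}

# Usage: check_same_host config/server.properties config/server-1.properties
# A key missing from both files counts as a clash, since both brokers
# would then fall back to the same default value.
check_same_host() {
  local a="$1" b="$2" key clash=0
  for key in broker.id port log.dirs; do
    if [ "$(prop "$a" "$key")" = "$(prop "$b" "$key")" ]; then
      echo "clash on $key=$(prop "$a" "$key")" >&2
      clash=1
    fi
  done
  return "$clash"
}
```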
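2.3 Startup scripts
On each of the two machines, start both brokers with their configuration files. Each command runs in the foreground, so use a separate terminal for each: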
bin/kafka-server-start.sh config/server.properties
bin/kafka-server-start.sh config/server-1.properties
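The two start commands above run in the foreground. To keep the brokers alive after the terminal closes, they are often wrapped in `nohup` with their output sent to a file. This is a minimal sketch with two assumptions:

- Kafka is unpacked at a hypothetical `KAFKA_HOME`, which defaults to the current directory.
- Each broker's console output goes to a hypothetical `<config-name>.out` file.

```shell
#!/usr/bin/env bash
# Sketch: start one broker per config file in the background.
# KAFKA_HOME and the *.out log location are assumptions, not from the article.
set -euo pipefail

KAFKA_HOME="${KAFKA_HOME:-.}"

# Usage: start_broker server-1.properties
start_broker() {
  local cfg="$1"
  local out="$KAFKA_HOME/$(basename "$cfg" .properties).out"
  nohup "$KAFKA_HOME/bin/kafka-server-start.sh" "$KAFKA_HOME/config/$cfg" \
    > "$out" 2>&1 &
  echo "started $cfg (pid $!), output in $out"
}

# On each machine:
#   start_broker server.properties
#   start_broker server-1.properties
```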
Create a topic:
## Create the topic cluster_topic
bin/kafka-topics.sh --create --zookeeper machine-1:2222,machine-0:2222,machine-2:2222 \
  --replication-factor 3 --partitions 3 --topic cluster_topic
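`--replication-factor 3` only succeeds if at least three brokers are registered when the topic is created. With the four brokers above, that still holds when one of them is down. Below is a minimal sketch of a pre-check wrapped around the same create command. It assumes you already know the number of live brokers, and `check_replication` and `create_topic` are hypothetical helpers.

```shell
#!/usr/bin/env bash
# Sketch: refuse to create a topic whose replication factor exceeds the
# number of live brokers. check_replication/create_topic are hypothetical.
set -euo pipefail

ZK_CONNECT="machine-1:2222,machine-0:2222,machine-2:2222"

# Usage: check_replication <replication-factor> <live-broker-count>
check_replication() {
  if [ "$1" -gt "$2" ]; then
    echo "replication factor $1 > $2 live brokers" >&2
    return 1
  fi
}

# Usage: create_topic <topic> <partitions> <replication-factor> <live-brokers>
create_topic() {
  check_replication "$3" "$4" || return 1
  bin/kafka-topics.sh --create --zookeeper "$ZK_CONNECT" \
    --replication-factor "$3" --partitions "$2" --topic "$1"
}

# Example: create_topic cluster_topic 3 3 4
```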
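Next, run the Producer and Consumer shell scripts. The interaction between the sending and receiving sides demonstrates publish/subscribe. Start a Producer to publish messages: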
bin/kafka-console-producer.sh --broker-list \
  hadoop-master:9092,hadoop-master:9093,machine-0:9092,machine-0:9093 --topic cluster_topic
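The producer uses these four brokers to discover the cluster. Each message is written to the leader of one of cluster_topic's partitions and replicated to that partition's other replicas. Next, start a Consumer to receive the messages: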
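The `--broker-list` value is just comma-separated host:port pairs, so it can be derived from the broker layout instead of being typed by hand. This is a small sketch. The `BROKERS` array is an assumption that mirrors the four brokers in the producer command, and `broker_list` is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Sketch: build the --broker-list value from an array of host:port pairs.
set -euo pipefail

BROKERS=(hadoop-master:9092 hadoop-master:9093 machine-0:9092 machine-0:9093)

# Join the arguments with commas ("$*" uses the first character of IFS).
broker_list() {
  local IFS=,
  echo "$*"
}

BROKER_LIST=$(broker_list "${BROKERS[@]}")
echo "$BROKER_LIST"
# bin/kafka-console-producer.sh --broker-list "$BROKER_LIST" --topic cluster_topic
```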
bin/kafka-console-consumer.sh --zookeeper machine-1:2222,machine-0:2222,machine-2:2222 \
  --topic cluster_topic --from-beginning
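This command subscribes to the topic cluster_topic, so every message sent to cluster_topic is received by this Consumer. With --from-beginning, it also starts from the earliest message still in the log. Send the messages shown in the figure below, and the Consumer prints the corresponding messages.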
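Now suppose machine-0 goes down. If you keep sending messages with this Producer, you will find that they are still received, which shows that the cluster tolerates the failure. This works for two reasons:

- Each partition's 3 replicas sit on 3 different brokers, and only 2 of the 4 brokers run on machine-0, so every partition keeps a replica on hadoop-master.
- The ZooKeeper ensemble still has a majority (2 of 3 servers).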
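Losing machine-0's two brokers cannot remove every replica of a partition. One way to confirm this is to enumerate every possible set of 3 distinct brokers out of 4 and check that each set keeps a live member. This is a sketch that assumes machine-0's brokers have ids 13 and 14, which are hypothetical; the article only shows 11 and 12 on hadoop-master.

```shell
#!/usr/bin/env bash
# Sketch: enumerate every possible 3-broker replica set and check that each
# keeps at least one replica when machine-0 is down.
set -euo pipefail

ALL=(11 12 13 14)   # 11,12 on hadoop-master; 13,14 on machine-0 (assumed ids)
DOWN=" 13 14 "      # brokers lost together with machine-0

survivors=0 total=0
for ((i = 0; i < 4; i++)); do
  for ((j = i + 1; j < 4; j++)); do
    for ((k = j + 1; k < 4; k++)); do
      total=$((total + 1))
      for b in "${ALL[i]}" "${ALL[j]}" "${ALL[k]}"; do
        if [[ "$DOWN" != *" $b "* ]]; then
          survivors=$((survivors + 1))
          break   # one live replica is enough for this set
        fi
      done
    done
  done
done
echo "$survivors of $total replica sets keep a live replica"

# ZooKeeper: 3 servers, 1 lost with machine-0; a quorum needs a strict majority.
zk_alive=2 zk_total=3
if (( zk_alive * 2 > zk_total )); then echo "ZooKeeper quorum kept"; fi
```

All four 3-broker sets contain 11 or 12, so every partition keeps a replica on hadoop-master.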
Next, here are a few useful scripts that help us monitor Kafka.
List topics
bin/kafka-topics.sh --list --zookeeper machine-1:2222,machine-0:2222,machine-2:2222
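Find the process running a particular configuration file. For example, the following looks up the broker process started with server-1.properties. Note that `ps -ef` lists all processes, whereas plain `ps` only shows those of the current terminal. The sample output below was captured with server.properties, so it shows that broker instead, and its first line is the grep command itself.
ps -ef | grep server-1.properties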
root 21765 21436 0 09:48 pts/0 00:00:00 grep server.properties
root 23156 1 0 Aug27 ? 00:06:11 /usr/java/latest/bin/java -Xmx1G -Xms1G -server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/root/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/root/kafka/bin/../logs -Dlog4j.configuration=file:/root/kafka/bin/../config/log4j.properties -cp :/root/kafka/bin/../core/build/dependant-libs-2.8.0/*.jar:/root/kafka/bin/../perf/build/libs//kafka-perf_2.8.0*.jar:/root/kafka/bin/../clients/build/libs//kafka-clients*.jar:/root/kafka/bin/../examples/build/libs//kafka-examples*.jar:/root/kafka/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/root/kafka/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/root/kafka/bin/../libs/jopt-simple-3.2.jar:/root/kafka/bin/../libs/kafka_2.10-0.8.1.1.jar:/root/kafka/bin/../libs/kafka_2.10-0.8.1.1-javadoc.jar:/root/kafka/bin/../libs/kafka_2.10-0.8.1.1-scaladoc.jar:/root/kafka/bin/../libs/kafka_2.10-0.8.1.1-sources.jar:/root/kafka/bin/../libs/log4j-1.2.15.jar:/root/kafka/bin/../libs/metrics-core-2.2.0.jar:/root/kafka/bin/../libs/scala-library-2.10.1.jar:/root/kafka/bin/../libs/slf4j-api-1.7.2.jar:/root/kafka/bin/../libs/snappy-java-1.0.5.jar:/root/kafka/bin/../libs/zkclient-0.3.jar:/root/kafka/bin/../libs/zookeeper-3.3.4.jar:/root/kafka/bin/../core/build/libs/kafka_2.8.0*.jar kafka.Kafka /root/kafka/bin/../config/server.properties
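A common shell idiom keeps the grep command from listing itself: wrap one character of the pattern in brackets. The regex still matches `server-1.properties`, but grep's own command line, which contains `[s]erver-1.properties`, no longer matches. Below is a minimal sketch of a hypothetical `broker_pid` helper that reads `ps -ef`-style lines on stdin and prints the PID column.

```shell
#!/usr/bin/env bash
# Sketch: print the PID(s) of the broker started with a given config file.
# Reads `ps -ef`-style lines on stdin; broker_pid is a hypothetical helper.
# Exit status is non-zero when no matching process is found.
set -euo pipefail

broker_pid() {
  local name="$1"
  # "[s]erver-1.properties" matches "server-1.properties" but not itself.
  grep "[${name:0:1}]${name:1}" | awk '{print $2}'
}

# Usage on a live machine:
#   ps -ef | broker_pid server-1.properties
```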