

Kafka Cluster Installation

Source: 程序員人生   Published: 2014-10-17

Installing the Kafka Cluster

Suppose our cluster needs four brokers: two on hadoop-master and two on machine-0, listening on ports 9092 and 9093, together forming the Kafka cluster.




2.1 Configuration Files

Configure the broker configuration file for every Kafka broker; the value of broker.id must be a unique integer for each broker. The core properties are:

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=11

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=hadoop-master

# zookeeper cluster
zookeeper.connect=machine-1:2222,machine-2:2222,machine-0:2222


Since Kafka depends on a Zookeeper cluster, the Zookeeper cluster must be started first. Its setup is not covered in detail here.
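For reference, here is a minimal sketch of bringing up the ZooKeeper ensemble this article assumes: three nodes (machine-0, machine-1, machine-2) serving clients on port 2222. The installation path /opt/zookeeper is only an example, not taken from the article.

# Run on each of machine-0, machine-1 and machine-2 (the path is an assumed example).
cd /opt/zookeeper

# conf/zoo.cfg must use the same client port (2222) that the Kafka configs point at
# and must list all three ensemble members, e.g.:
#   clientPort=2222
#   server.1=machine-0:2888:3888
#   server.2=machine-1:2888:3888
#   server.3=machine-2:2888:3888

# Start the ZooKeeper server and check that it is serving requests.
bin/zkServer.sh start
bin/zkServer.sh status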

2.2 Broker Property Configuration

Set up the broker configuration files on machine-0 and hadoop-master in the same way; apart from broker.id (which must be unique for every broker) and host.name, the properties on the two machines are the same.

On hadoop-master, Kafka's server.properties configuration is:

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=11

############################# Socket Server Settings #############################

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=hadoop-master

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/opt/kafka/logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=60000

# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=machine-1:2222,machine-2:2222,machine-0:2222
#server.1=machine-0:2888:3888
#server.2=machine-1:2888:3888
#server.3=machine-2:2888:3888

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000


The second broker on hadoop-master uses server-1.properties, which is identical to server.properties except for the three per-broker settings:

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=12

# The port the socket server listens on
port=9093

# A comma separated list of directories under which to store log files
log.dirs=/opt/kafka/logs-1
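If you would rather not maintain two nearly identical files by hand, the second broker's file can be derived from the first with a few substitutions. This is only a sketch, not part of the original article; adjust the values to your layout (for example, the brokers on machine-0 need their own unique broker.id values and host.name=machine-0).

# Run from the Kafka installation directory: derive server-1.properties from
# server.properties, changing only the three per-broker settings.
sed -e 's/^broker.id=11/broker.id=12/' \
    -e 's/^port=9092/port=9093/' \
    -e 's|^log.dirs=.*|log.dirs=/opt/kafka/logs-1|' \
    config/server.properties > config/server-1.properties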

2.3 Startup Scripts

On both machines, start the brokers with their corresponding configuration files:

bin/kafka-server-start.sh config/server.properties
bin/kafka-server-start.sh config/server-1.properties
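kafka-server-start.sh runs in the foreground, so in practice each broker is usually started in its own terminal or in the background. A sketch, assuming the Kafka installation directory as the working directory; the log file names are just examples:

# Start both brokers on this machine in the background, capturing their output.
nohup bin/kafka-server-start.sh config/server.properties   > broker-11.log 2>&1 &
nohup bin/kafka-server-start.sh config/server-1.properties > broker-12.log 2>&1 &

# Optionally confirm that the brokers registered themselves in ZooKeeper.
# zkCli.sh comes from the ZooKeeper installation (path assumed), not from Kafka.
/opt/zookeeper/bin/zkCli.sh -server machine-1:2222 ls /brokers/ids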


Create a topic:

## Create topic cluster_topic
bin/kafka-topics.sh --create --zookeeper machine-1:2222,machine-0:2222,machine-2:2222 --replication-factor 3 --partitions 3 --topic cluster_topic
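Once the topic exists, kafka-topics.sh --describe shows how its 3 partitions and their replicas were assigned across the brokers, which is a quick way to confirm that all four brokers joined the cluster:

bin/kafka-topics.sh --describe --zookeeper machine-1:2222,machine-0:2222,machine-2:2222 --topic cluster_topic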


Next we run the Producer and Consumer shell scripts; the interaction between the sending and receiving ends demonstrates the publish/subscribe flow. First, start a Producer to publish messages:

bin/kafka-console-producer.sh --broker-list hadoop-master:9092,hadoop-master:9093,machine-0:9092,machine-0:9093 --topic cluster_topic


The messages you send will be distributed across the four brokers specified above. Next, start a Consumer to receive them:

bin/kafka-console-consumer.sh --zookeeper machine-1:2222,machine-0:2222,machine-2:2222 --topic cluster_topic --from-beginning
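The console tools also work non-interactively, which makes a scripted smoke test possible. A sketch; --max-messages makes the consumer exit after reading the expected number of messages:

# Publish three test messages by piping them into the console producer.
printf 'msg-1\nmsg-2\nmsg-3\n' | bin/kafka-console-producer.sh \
    --broker-list hadoop-master:9092,hadoop-master:9093,machine-0:9092,machine-0:9093 \
    --topic cluster_topic

# Read them back from the beginning and exit after three messages.
bin/kafka-console-consumer.sh \
    --zookeeper machine-1:2222,machine-0:2222,machine-2:2222 \
    --topic cluster_topic --from-beginning --max-messages 3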


The command above consumes messages from the topic cluster_topic, which means every message published to cluster_topic will be received by this Consumer. Type a few messages at the Producer prompt and the Consumer will print the corresponding messages.

Now suppose machine-0 goes down. If you keep sending messages with the Producer, you will find that they are still received. This shows that the cluster tolerates broker failures.
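One way to reproduce this failover test without pulling the plug is to stop the brokers on machine-0 and watch the topic metadata: leadership moves to the brokers on hadoop-master, and machine-0's replicas drop out of the ISR. A sketch (kafka-server-stop.sh stops every Kafka broker running on the machine it is invoked on):

# On machine-0: stop the local brokers.
bin/kafka-server-stop.sh

# On a surviving machine: the Leader column now only shows brokers on hadoop-master,
# and the brokers from machine-0 disappear from the Isr column.
bin/kafka-topics.sh --describe --zookeeper machine-1:2222,machine-2:2222 --topic cluster_topic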

Below are a few useful commands that help monitor the Kafka cluster.

List the topics:

bin/kafka-topics.sh --list --zookeeper machine-1:2222,machine-0:2222,machine-2:2222


To find the broker process started from a given configuration file, grep the process list. For example, the following looks up the process running with server.properties:

ps -ef | grep server.properties

root 21765 21436 0 09:48 pts/0 00:00:00 grep server.properties
root 23156 1 0 Aug 27 ? 00:06:11 /usr/java/latest/bin/java -Xmx1G -Xms1G -server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/root/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/root/kafka/bin/../logs -Dlog4j.configuration=file:/root/kafka/bin/../config/log4j.properties -cp :/root/kafka/bin/../core/build/dependant-libs-2.8.0/*.jar:/root/kafka/bin/../perf/build/libs//kafka-perf_2.8.0*.jar:/root/kafka/bin/../clients/build/libs//kafka-clients*.jar:/root/kafka/bin/../examples/build/libs//kafka-examples*.jar:/root/kafka/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/root/kafka/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/root/kafka/bin/../libs/jopt-simple-3.2.jar:/root/kafka/bin/../libs/kafka_2.10-0.8.1.1.jar:/root/kafka/bin/../libs/kafka_2.10-0.8.1.1-javadoc.jar:/root/kafka/bin/../libs/kafka_2.10-0.8.1.1-scaladoc.jar:/root/kafka/bin/../libs/kafka_2.10-0.8.1.1-sources.jar:/root/kafka/bin/../libs/log4j-1.2.15.jar:/root/kafka/bin/../libs/metrics-core-2.2.0.jar:/root/kafka/bin/../libs/scala-library-2.10.1.jar:/root/kafka/bin/../libs/slf4j-api-1.7.2.jar:/root/kafka/bin/../libs/snappy-java-1.0.5.jar:/root/kafka/bin/../libs/zkclient-0.3.jar:/root/kafka/bin/../libs/zookeeper-3.3.4.jar:/root/kafka/bin/../core/build/libs/kafka_2.8.0*.jar kafka.Kafka /root/kafka/bin/../config/server.properties
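Note that the first line of the output above is the grep command itself. Two common ways to suppress it:

# The character class keeps the grep process from matching its own pattern.
ps -ef | grep '[s]erver.properties'

# pgrep -f matches against the full command line and prints only the PIDs.
pgrep -f server.properties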





