kafka入門:簡介、使用場景、設計原理、主要配置及集群搭建
來源:程序員人生 發布時間:2016-06-04 14:49:03 閱讀次數:3878次
問題導讀:
1.zookeeper在kafka的作用是甚么?
2.kafka中幾近不允許對消息進行“隨機讀寫”的緣由是甚么?
3.kafka集群consumer和producer狀態信息是如何保存的?
4.partitions設計的目的的根本緣由是甚么?
1、入門
1、簡介
Kafka is a distributed,partitioned,replicated commit logservice。它提供了類似于JMS的特性,但是在設計實現上完全不同,另外它其實不是JMS規范的實現。kafka對消息保存時根據Topic進行歸類,發送消息者成為Producer,消息接受者成為Consumer,另外kafka集群有多個kafka實例組成,每一個實例(server)成為broker。不管是kafka集群,還是producer和consumer都依賴于zookeeper來保證系統可用性集群保存1些meta信息。
2、Topics/logs
1個Topic可以認為是1類消息,每一個topic將被分成多個partition(區),每一個partition在存儲層面是append log文件。任何發布到此partition的消息都會被直接追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量),offset為1個long型數字,它是唯1標記1條消息。它唯1的標記1條消息。kafka并沒有提供其他額外的索引機制來存儲offset,由于在kafka中幾近不允許對消息進行“隨機讀寫”。
kafka和JMS(Java Message Service)實現(activeMQ)不同的是:即便消息被消費,消息依然不會被立即刪除.日志文件將會根據broker中的配置要求,保存1定的時間以后刪除;比如log文件保存2天,那末兩天后,文件會被清除,不管其中的消息是不是被消費.kafka通過這類簡單的手段,來釋放磁盤空間,和減少消息消費以后對文件內容改動的磁盤IO開支.
對consumer而言,它需要保存消費消息的offset,對offset的保存和使用,有consumer來控制;當consumer正常消費消息時,offset將會"線性"的向先驅動,即消息將順次順序被消費.事實上consumer可使用任意順序消費消息,它只需要將offset重置為任意值..(offset將會保存在zookeeper中,參見下文)
kafka集群幾近不需要保護任何consumer和producer狀態信息,這些信息有zookeeper保存;因此producer和consumer的客戶端實現非常輕量級,它們可以隨便離開,而不會對集群造成額外的影響.
partitions的設計目的有多個.最根本緣由是kafka基于文件存儲.通過分區,可以將日志內容分散到多個server上,來避免文件尺寸到達單機磁盤的上限,每一個partiton都會被當前server(kafka實例)保存;可以將1個topic切分多任意多個partitions,來消息保存/消費的效力.另外越多的partitions意味著可以容納更多的consumer,有效提升并發消費的能力.(具體原理參見下文).
3、Distribution
1個Topic的多個partitions,被散布在kafka集群中的多個server上;每一個server(kafka實例)負責partitions中消息的讀寫操作;另外kafka還可以配置partitions需要備份的個數(replicas),每一個partition將會被備份到多臺機器上,以提高可用性.
基于replicated方案,那末就意味著需要對多個備份進行調度;每一個partition都有1個server為"leader";leader負責所有的讀寫操作,如果leader失效,那末將會有其他follower來接收(成為新的leader);follower只是單調的和leader跟進,同步消息便可..因而可知作為leader的server承載了全部的要求壓力,因此從集群的整體斟酌,有多少個partitions就意味著有多少個"leader",kafka會將"leader"均衡的分散在每一個實例上,來確保整體的性能穩定.
Producers
Producer將消息發布到指定的Topic中,同時Producer也能決定將此消息歸屬于哪一個partition;比如基于"round-robin"方式或通過其他的1些算法等.
Consumers
本質上kafka只支持Topic.每一個consumer屬于1個consumer group;反過來講,每一個group中可以有多個consumer.發送到Topic的消息,只會被定閱此Topic的每一個group中的1個consumer消費.
如果所有的consumer都具有相同的group,這類情況和queue模式很像;消息將會在consumers之間負載均衡.
如果所有的consumer都具有不同的group,那這就是"發布-定閱";消息將會廣播給所有的消費者.
在kafka中,1個partition中的消息只會被group中的1個consumer消費;每一個group中consumer消息消費相互獨立;我們可以認為1個group是1個"定閱"者,1個Topic中的每一個partions,只會被1個"定閱者"中的1個consumer消費,不過1個consumer可以消費多個partitions中的消息.kafka只能保證1個partition中的消息被某個consumer消費時,消息是順序的.事實上,從Topic角度來講,消息仍不是有序的.
kafka的設計原理決定,對1個topic,同1個group中不能有多于partitions個數的consumer同時消費,否則將意味著某些consumer將沒法得到消息.
Guarantees
1) 發送到partitions中的消息將會依照它接收的順序追加到日志中
2) 對消費者而言,它們消費消息的順序和日志中消息順序1致.
3) 如果Topic的"replicationfactor"為N,那末允許N⑴個kafka實例失效.
2、使用處景
1、Messaging
對1些常規的消息系統,kafka是個不錯的選擇;partitons/replication和容錯,可使kafka具有良好的擴大性和性能優勢.不過到目前為止,我們應當很清楚認識到,kafka并沒有提供JMS中的"事務性""消息傳輸擔保(消息確認機制)""消息分組"等企業級特性;kafka只能使用作為"常規"的消息系統,在1定程度上,還沒有確保消息的發送與接收絕對可靠(比如,消息重發,消息發送丟失等)
2、Websit activity tracking
kafka可以作為"網站活性跟蹤"的最好工具;可以將網頁/用戶操作等信息發送到kafka中.并實時監控,或離線統計分析等
3、Log Aggregation
kafka的特性決定它非常合適作為"日志搜集中心";application可以將操作日志"批量""異步"的發送到kafka集群中,而不是保存在本地或DB中;kafka可以批量提交消息/緊縮消息等,這對producer端而言,幾近感覺不到性能的開支.此時consumer端可使hadoop等其他系統化的存儲和分析系統.
3、設計原理
kafka的設計初衷是希望作為1個統1的信息搜集平臺,能夠實時的搜集反饋信息,并需要能夠支持較大的數據量,且具有良好的容錯能力.
1、持久性
kafka使用文件存儲消息,這就直接決定kafka在性能上嚴重依賴文件系統的本身特性.且不管任何OS下,對文件系統本身的優化幾近沒有可能.文件緩存/直接內存映照等是經常使用的手段.由于kafka是對日志文件進行append操作,因此磁盤檢索的開支是較小的;同時為了減少磁盤寫入的次數,broker會將消息暫時buffer起來,當消息的個數(或尺寸)到達1定閥值時,再flush到磁盤,這樣減少了磁盤IO調用的次數.
2、性能
需要斟酌的影響性能點很多,除磁盤IO以外,我們還需要斟酌網絡IO,這直接關系到kafka的吞吐量問題.kafka并沒有提供太多高超的技能;對producer端,可以將消息buffer起來,當消息的條數到達1定閥值時,批量發送給broker;對consumer端也是1樣,批量fetch多條消息.不過消息量的大小可以通過配置文件來指定.對kafka broker端,似乎有個sendfile系統調用可以潛伏的提升網絡IO的性能:將文件的數據映照到系統內存中,socket直接讀取相應的內存區域便可,而無需進程再次copy和交換.
其實對producer/consumer/broker3者而言,CPU的開支應當都不大,因此啟用消息緊縮機制是1個良好的策略;緊縮需要消耗少許的CPU資源,不過對kafka而言,網絡IO更應當需要斟酌.可以將任何在網絡上傳輸的消息都經過緊縮.kafka支持gzip/snappy等多種緊縮方式.
3、生產者
負載均衡: producer將會和Topic下所有partition leader保持socket連接;消息由producer直接通過socket發送到broker,中間不會經過任何"路由層".事實上,消息被路由到哪一個partition上,有producer客戶端決定.比如可以采取"random""key-hash""輪詢"等,如果1個topic中有多個partitions,那末在producer端實現"消息均衡分發"是必要的.
其中partition leader的位置(host:port)注冊在zookeeper中,producer作為zookeeper client,已注冊了watch用來監聽partition leader的變更事件.
異步發送:將多條消息暫且在客戶端buffer起來,并將他們批量的發送到broker,小數據IO太多,會拖慢整體的網絡延遲,批量延遲發送事實上提升了網絡效力。不過這也有1定的隱患,比如說當producer失效時,那些還沒有發送的消息將會丟失。
4、消費者
consumer端向broker發送"fetch"要求,并告知其獲得消息的offset;爾后consumer將會取得1定條數的消息;consumer端也能夠重置offset來重新消費消息.
在JMS實現中,Topic模型基于push方式,即broker將消息推送給consumer端.不過在kafka中,采取了pull方式,即consumer在和broker建立連接以后,主動去pull(或說fetch)消息;這中模式有些優點,首先consumer端可以根據自己的消費能力適時的去fetch消息并處理,且可以控制消息消費的進度(offset);另外,消費者可以良好的控制消息消費的數量,batch fetch.
其他JMS實現,消息消費的位置是有prodiver保存,以便避免重復發送消息或將沒有消費成功的消息重發等,同時還要控制消息的狀態.這就要求JMS broker需要太多額外的工作.在kafka中,partition中的消息只有1個consumer在消費,且不存在消息狀態的控制,也沒有復雜的消息確認機制,可見kafka broker端是相當輕量級的.當消息被consumer接收以后,consumer可以在本地保存最后消息的offset,并間歇性的向zookeeper注冊offset.因而可知,consumer客戶端也很輕量級.
5、消息傳送機制
對JMS實現,消息傳輸擔保非常直接:有且只有1次(exactly once).在kafka中稍有不同:
1) at most once: 最多1次,這個和JMS中"非持久化"消息類似.發送1次,不管成敗,將不會重發.
2) at least once: 消息最少發送1次,如果消息未能接受成功,可能會重發,直到接收獲功.
3) exactly once: 消息只會發送1次.
at most once: 消費者fetch消息,然后保存offset,然后處理消息;當client保存offset以后,但是在消息處理進程中出現了異常,致使部份消息未能繼續處理.那末爾后"未處理"的消息將不能被fetch到,這就是"at most once".
at least once: 消費者fetch消息,然后處理消息,然后保存offset.如果消息處理成功以后,但是在保存offset階段zookeeper異常致使保存操作未能履行成功,這就致使接下來再次fetch時可能取得上次已處理過的消息,這就是"at least once",緣由offset沒有及時的提交給zookeeper,zookeeper恢復正常還是之前offset狀態.
exactly once: kafka中并沒有嚴格的去實現(基于2階段提交,事務),我們認為這類策略在kafka中是沒有必要的.
通常情況下"at-least-once"是我們搜選.(相比at most once而言,重復接收數據總比丟失數據要好).
6、復制備份
kafka將每一個partition數據復制到多個server上,任何1個partition有1個leader和多個follower(可以沒有);備份的個數可以通過broker配置文件來設定.leader處理所有的read-write要求,follower需要和leader保持同步.Follower和consumer1樣,消費消息并保存在本地日志中;leader負責跟蹤所有的follower狀態,如果follower"落后"太多或失效,leader將會把它從replicas同步列表中刪除.當所有的follower都將1條消息保存成功,此消息才被認為是"committed",那末此時consumer才能消費它.即便只有1個replicas實例存活,依然可以保證消息的正常發送和接收,只要zookeeper集群存活便可.(不同于其他散布式存儲,比如hbase需要"多數派"存活才行)
當leader失效時,需在followers當選取出新的leader,可能此時follower落后于leader,因此需要選擇1個"up-to-date"的follower.選擇follower時需要統籌1個問題,就是新leaderserver上所已承載的partition leader的個數,如果1個server上有過量的partition leader,意味著此server將承受著更多的IO壓力.在選舉新leader,需要斟酌到"負載均衡".
7.日志
如果1個topic的名稱為"my_topic",它有2個partitions,那末日志將會保存在my_topic_0和my_topic_1兩個目錄中;日志文件中保存了1序列"log entries"(日志條目),每一個log entry格式為"4個字節的數字N表示消息的長度" + "N個字節的消息內容";每一個日志都有1個offset來唯1的標記1條消息,offset的值為8個字節的數字,表示此消息在此partition中所處的起始位置..每一個partition在物理存儲層面,有多個log file組成(稱為segment).segmentfile的命名為"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.
其中每一個partiton中所持有的segments列表信息會存儲在zookeeper中.
當segment文件尺寸到達1定閥值時(可以通過配置文件設定,默許1G),將會創建1個新的文件;當buffer中消息的條數到達閥值時將會觸發日志信息flush到日志文件中,同時如果"距離最近1次flush的時間差"到達閥值時,也會觸發flush到日志文件.如果broker失效,極有可能會丟失那些還沒有flush到文件的消息.由于server意外實現,依然會致使log文件格式的破壞(文件尾部),那末就要求當server啟東是需要檢測最后1個segment的文件結構是不是合法并進行必要的修復.
獲得消息時,需要指定offset和最大chunk尺寸,offset用來表示消息的起始位置,chunk size用來表示最大獲得消息的總長度(間接的表示消息的條數).根據offset,可以找到此消息所在segment文件,然后根據segment的最小offset取差值,得到它在file中的相對位置,直接讀取輸出便可.
日志文件的刪除策略非常簡單:啟動1個后臺線程定期掃描log file列表,把保存時間超過閥值的文件直接刪除(根據文件的創建時間).為了不刪除文件時依然有read操作(consumer消費),采取copy-on-write方式.
8、分配
kafka使用zookeeper來存儲1些meta信息,并使用了zookeeper watch機制來發現meta信息的變更并作出相應的動作(比如consumer失效,觸發負載均衡等)
1) Broker node registry: 當1個kafkabroker啟動后,首先會向zookeeper注冊自己的節點信息(臨時znode),同時當broker和zookeeper斷開連接時,此znode也會被刪除.
格式: /broker/ids/[0...N] -->host:port;其中[0..N]表示broker id,每一個broker的配置文件中都需要指定1個數字類型的id(全局不可重復),znode的值為此broker的host:port信息.
2) Broker Topic Registry: 當1個broker啟動時,會向zookeeper注冊自己持有的topic和partitions信息,依然是1個臨時znode.
格式: /broker/topics/[topic]/[0...N] 其中[0..N]表示partition索引號.
3) Consumer and Consumer group: 每一個consumer客戶端被創建時,會向zookeeper注冊自己的信息;此作用主要是為了"負載均衡".
1個group中的多個consumer可以交錯的消費1個topic的所有partitions;簡而言之,保證此topic的所有partitions都能被此group所消費,且消費時為了性能斟酌,讓partition相對均衡的分散到每一個consumer上.
4) Consumer id Registry: 每一個consumer都有1個唯1的ID(host:uuid,可以通過配置文件指定,也能夠由系統生成),此id用來標記消費者信息.
格式:/consumers/[group_id]/ids/[consumer_id]
依然是1個臨時的znode,此節點的值為{"topic_name":#streams...},即表示此consumer目前所消費的topic + partitions列表.
5) Consumer offset Tracking: 用來跟蹤每一個consumer目前所消費的partition中最大的offset.
格式:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]-->offset_value
此znode為持久節點,可以看出offset跟group_id有關,以表明當group中1個消費者失效,其他consumer可以繼續消費.
6) Partition Owner registry: 用來標記partition被哪一個consumer消費.臨時znode
格式:/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]-->consumer_node_id當consumer啟動時,所觸發的操作:
A) 首先進行"Consumer id Registry";
B) 然后在"Consumer id Registry"節點下注冊1個watch用來監聽當前group中其他consumer的"leave"和"join";只要此znode path下節點列表變更,都會觸發此group下consumer的負載均衡.(比如1個consumer失效,那末其他consumer接收partitions).
C) 在"Broker id registry"節點下,注冊1個watch用來監聽broker的存活情況;如果broker列表變更,將會觸發所有的groups下的consumer重新balance.
1) Producer端使用zookeeper用來"發現"broker列表,和和Topic下每一個partition leader建立socket連接并發送消息.
2) Broker端使用zookeeper用來注冊broker信息,已監測partitionleader存活性.
3) Consumer端使用zookeeper用來注冊consumer信息,其中包括consumer消費的partition列表等,同時也用來發現broker列表,并和partition leader建立socket連接,并獲得消息.
4、主要配置
1、Broker配置
2.Consumer主要配置
3.Producer主要配置
以上是關于kafka1些基礎說明,在其中我們知道如果要kafka正常運行,必須配置zookeeper,否則不管是kafka集群還是客戶真個生存者和消費者都沒法正常的工作的,以下是對zookeeper進行1些簡單的介紹:
5、zookeeper集群
zookeeper是1個為散布式利用提供1致性服務的軟件,它是開源的Hadoop項目的1個子項目,并根據google發表的1篇論文來實現的。zookeeper為散布式系統提供了高笑且易于使用的協同服務,它可以為散布式利用提供相當多的服務,諸如統1命名服務,配置管理,狀態同步和組服務等。zookeeper接口簡單,我們沒必要過量地糾結在散布式系統編程難于處理的同步和1致性問題上,你可使用zookeeper提供的現成(off-the-shelf)服務來實現來實現散布式系統額配置管理,組管理,Leader選舉等功能。
zookeeper集群的安裝,準備3臺
服務器server1:192.168.0.1,server2:192.168.0.2,
server3:192.168.0.3.
1)下載zookeeper
到http://zookeeper.apache.org/releases.html去下載最新版本Zookeeper⑶.4.5的安裝包zookeeper⑶.4.5.tar.gz.將文件保存server1的~目錄下
2)安裝zookeeper
a)解壓
tar -zxvf zookeeper⑶.4.5.tar.gz
解壓完成后在目錄~下會發現多出1個目錄zookeeper⑶.4.5,重新命令為zookeeper
b)配置
將conf/zoo_sample.cfg拷貝1份命名為zoo.cfg,也放在conf目錄下。然后依照以下值修改其中的配置:
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/home/wwb/zookeeper /data
dataLogDir=/home/wwb/zookeeper/logs
# the port at which the clients will connect
clientPort=2181
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#http://zookeeper.apache.org/doc/ ... html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.1=192.168.0.1:3888:4888
server.2=192.168.0.2:3888:4888
server.3=192.168.0.3:3888:4888
tickTime:這個時間是作為 Zookeeper
服務器之間或客戶端與
服務器之間保持心跳的時間間隔,也就是每一個 tickTime 時間就會發送1個心跳。
dataDir:顧名思義就是 Zookeeper 保存數據的目錄,默許情況下,Zookeeper 將寫數據的日志文件也保存在這個目錄里。
clientPort:這個端口就是客戶端連接 Zookeeper
服務器的端口,Zookeeper 會監聽這個端口,接受客戶真個訪問要求。
initLimit:這個配置項是用來配置 Zookeeper 接受客戶端(這里所說的客戶端不是用戶連接 Zookeeper
服務器的客戶端,而是 Zookeeper
服務器集群中連接到 Leader 的 Follower
服務器)初始化連接時最長能忍耐多少個心跳時間間隔數。當已超過 5個心跳的時間(也就是 tickTime)長度后 Zookeeper
服務器還沒有收到客戶真個返回信息,那末表明這個客戶端連接失敗。總的時間長度就是 5*2000=10 秒
syncLimit:這個配置項標識 Leader 與Follower 之間發送消息,要求和應對時間長度,最長不能超過量少個 tickTime 的時間長度,總的時間長度就是2*2000=4 秒
server.A=B:C:D:其中 A 是1個數字,表示這個是第幾號
服務器;B 是這個
服務器的 ip 地址;C 表示的是這個
服務器與集群中的 Leader
服務器交換信息的端口;D 表示的是萬1集群中的 Leader
服務器掛了,需要1個端口來重新進行選舉,選出1個新的 Leader,而這個端口就是用來履行選舉時
服務器相互通訊的端口。如果是偽集群的配置方式,由于 B 都是1樣,所以不同的 Zookeeper 實例通訊端口號不能1樣,所以要給它們分配不同的端口號
注意:dataDir,dataLogDir中的wwb是當前登錄用戶名,data,logs目錄開始是不存在,需要使用mkdir命令創建相應的目錄。并且在該目錄下創建文件myid,serve1,server2,server3該文件內容分別為1,2,3。
針對
服務器server2,server3可以將server1復制到相應的目錄,不過需要注意dataDir,dataLogDir目錄,并且文件myid內容分別為2,3
3)順次啟動server1,server2,server3的zookeeper.
/home/wwb/zookeeper/bin/zkServer.sh start,出現類似以下內容
JMX enabled by default
Using config: /home/wwb/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
4) 測試zookeeper是不是正常工作,在server1上履行以下命令
/home/wwb/zookeeper/bin/zkCli.sh -server192.168.0.2:2181,出現類似以下內容
JLine support is enabled
2013⑴1⑵7 19:59:40,560 - INFO [main-SendThread(localhost.localdomain:2181):ClientCnxn$SendThread@736]- Session establishmentcomplete on server localhost.localdomain/127.0.0.1:2181, sessionid = 0x1429cdb49220000, negotiatedtimeout = 30000
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[zk: 127.0.0.1:2181(CONNECTED) 0] [root@localhostzookeeper2]#
即代表集群構建成功了,如果出現毛病那應當是第3部時沒有啟動好集群,
運行,先利用
ps aux | grep zookeeper查看是不是有相應的進程的,沒有話,說明集群啟動出現問題,可以在每一個
服務器上使用
./home/wwb/zookeeper/bin/zkServer.sh stop。再順次使用./home/wwb/zookeeper/binzkServer.sh start,這時候在履行41般是沒有問題,如果還是有問題,那末先stop再到bin的上級目錄履行./bin/zkServer.shstart試試。
注意:zookeeper集群時,zookeeper要求半數以上的機器可用,zookeeper才能提供服務。
生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈