多多色-多人伦交性欧美在线观看-多人伦精品一区二区三区视频-多色视频-免费黄色视屏网站-免费黄色在线

國內最全IT社區平臺 聯系我們 | 收藏本站
阿里云優惠2
您當前位置:首頁 > php開源 > php教程 > 初識Apache Kafka+JAVA程序實例

初識Apache Kafka+JAVA程序實例

來源:程序員人生   發布時間:2016-07-11 13:24:46 閱讀次數:2887次

  本文是從英文的官網摘了翻譯的,用作自己的整理和記錄。水平有限,歡迎指正。版本是: kafka_2.10-0.10.0.0
  

1、基礎概念

  • 主題:Kafka maintains feeds of messages in categories called topics.
      
  • 生產者:We’ll call processes that publish messages to a Kafka topic producers.
         
  • 消費者:We’ll call processes that subscribe to topics and process the feed of published messages consumers.
         
  • 代理(Broker):Kafka is run as a cluster comprised of one or more servers each of which is called a broker.

     生產者通過網路將消息發送到Kafka集群上,集群順次(輪番)服務消息到達消費者。 Kafka運行在1個集群中,集群中的每個服務器就叫代理。
    這里寫圖片描述

  • Partition:Partition 是物理上的概念,每一個 Topic 包括1個或多個 Partition。

主題和日志

  1個主題是命名或分類發布的消息。每個主題,Kafka持有1個分區日志,看起來像下面圖片。 
  這里寫圖片描述
  每個Partition都是有序的,固定長度的消息隊列1直不斷增加到–1個提交日志。消息在Partition內分配了順序的id叫偏移量,這個偏移量在分區中唯1標識每一個消息的。
  Kafka保存所有(1段時間內的-可配置)已發布的消息-不管它們是不是已被消費。例如,如果日志保存被設置為兩天,那末在1個消息發布后,兩天內它是可用的,兩天后它將被拋棄到空閑空間

事實上,元數據保存在每一個消費進程中,是基于消費進程在日志中的位置,該位置稱為“偏移量”(In fact the only metadata retained on a per-consumer basis is the position of the consumer in the log, called the “offset”.)。這個偏移量被消費者控制:正常的消費者讀取消息時,線性增加偏移量,但事實上消費者可以以任何它順序的方式來控制。例如:1個消費者可以重置到之前的偏移量位置來重新處理。
  這類組合的特點意味著Kafka的消費者是很便宜的-消費者進程可以隨時增加減少,對集群和其它消費者進程沒有任何影響。例如:你可使用命令行工具輸出任何主題的內容,而不改變任何現有的消費者所消耗的。
  日志中的分區服務幾個目的。首先,日志的范圍大小可以調劑,遠不是只有1個在1個服務器上。每一個單獨的分區都必須安裝在主機上的服務器上,1個主題可以有許多分區,所以它可以處理任意數量的數據。第2,它們都是獨立相互平行的。 

Distribution(散布)

日志的分辨別布在Kafka集群中的服務器上,每一個服務器處理數據,并要求分區內容的副本。為了容錯,每一個分區的副本數量是可以通過服務器設置的。
 
每一個分區都有1個服務器它充當“leader”和0到更多的服務器,作為“followers”。leader處理所有的讀寫要求,而followers被動地復制的leader。如果leader失敗,其中1個“followers”將自動成為新的“leader”。
 

Producers

生產者將數據發布到他們所選擇的主題。生產者負責選擇那個消息分配到那個主題的哪一個partition。至于選擇哪一個分區可以簡單的循環方式到達負載均衡,也能夠者根據語義功能來分區。

Consumers

  每一個消費者把自己標示到1個消費組,當每一個消息發布到主題后,消息在投遞到每一個定閱消費組1個消費實例。消費者實例可以在不同的進程或不同的機器上。
  如果所有的消費者實例都有相同的消費組,那末這就像1個傳統的隊列。
  如果所有的消費者實例都有不同的消費組,那末這類作品就如發布定閱,所有的信息都被廣播給所有的消費者。
  但是,更常見的是主題有1個小數量的消費組,每個為“邏輯定閱。每一個組都是由許多消費實例,為了可擴大性和容錯性。
  Kafka有比傳統消息系統更強健的順序保證。
  傳統的隊列在服務器上保存順序消息,如果多個消費者從隊列中消費,然后服務器將它們存儲的消息依照順序發送出去。但是,雖然服務器依照順序發送消息,但是消息傳遞異步發送給消費者,所以消息到達消費者時可能失序了。這類高效意味著在并行消費進程中,消息的順序丟失。消息傳遞系統常常圍繞這個工作,有1個“exclusive consumer“的概念,它只允許1個進程從1個隊列中消耗,但固然這意味著沒有并行性處理的可能性。
  Kafka做得更好。通過對主題進行分區,Kafka是既能保證順序,又能負載均衡的消費。這是通過給主題進行分區,然后給消費組,使的每一個分區都被組內唯1消費進程消費。通過這樣做,我們確保消進程是唯1的讀取那個分區,并消費數據的順序。請注意,在1個消費組中,不能有比分區更多的消費進程。

Kafka只在1個分區中的消息提供了1個總的順序,而不是在1個主題中的不同分區之間的。但是,如果您需要1個完全有序的消息,這可以通過1個主題和1個分區來實現,明顯這將意味著每個消費組只有1個消費進程。
 

Guarantees(保證)

Kafka給出了以下保證:

  • 生產者發送到1個特定主題的分區的消息,將被添加,并且發送是順序的。
  • 各消費實例看到消息是順序,并且存儲在日志里。
  • 1個主題由N各復制備份,我們將容忍N⑴服務器故障而不丟失任何信息提交到日志。
     

2、程序實例

重要的來了,上面看不懂的沒關系,看程序,最直接。
假設我們有1個主題叫foo,它有4個分區。我建立了兩個消費組GroupA and GroupB
 這里寫圖片描述
其中GroupA有2個消費者,GroupB有4個消費者。
我們的生產者平均向4個分區寫入了內容。例:

package part; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; public class TestProducer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); //The "all" setting we have specified will result in blocking on the full commit of the record, the slowest but most durable setting. //“所有”設置將致使記錄的完全提交阻塞,最慢的,但最持久的設置。 props.put("acks", "all"); //如果要求失敗,生產者也會自動重試,即便設置成0 the producer can automatically retry. props.put("retries", 0); //The producer maintains buffers of unsent records for each partition. props.put("batch.size", 16384); //默許立即發送,這里這是延時毫秒數 props.put("linger.ms", 1); //生產者緩沖大小,當緩沖區耗盡后,額外的發送調用將被阻塞。時間超過max.block.ms將拋出TimeoutException props.put("buffer.memory", 33554432); //The key.serializer and value.serializer instruct how to turn the key and value objects the user provides with their ProducerRecord into bytes. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //創建kafka的生產者類 Producer<String, String> producer = new KafkaProducer<String, String>(props); //生產者的主要方法 // close();//Close this producer. // close(long timeout, TimeUnit timeUnit); //This method waits up to timeout for the producer to complete the sending of all incomplete requests. // flush() ;所有緩存記錄被立刻發送 for(int i = 0; i < 100; i++)       //這里平均寫入4個分區 producer.send(new ProducerRecord<String, String>("foo",i%4, Integer.toString(i), Integer.toString(i))); producer.close(); } }

消費者

package part; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class TestConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); System.out.println("this is the group part test 1"); //消費者的組id props.put("group.id", "GroupA");//這里是GroupA或GroupB props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); //從poll(拉)的回話處理時長 props.put("session.timeout.ms", "30000"); //poll的數量限制      //props.put("max.poll.records", "100"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); //定閱主題列表topic consumer.subscribe(Arrays.asList("foo")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) // 正常這里應當使用線程池處理,不應當在這里處理 System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()+"\n"); } } }

如果GroupA和GroupB都正常啟動,那末GroupB內4各消費平均消費生產者的消息數據(這里每一個25個消息),GroupA內2個消費者各處理50各消息,每一個消費者處理2各分區。如果GroupA內1個消費者掛斷,那末另外一個處理所有消息數據。如果GroupB掛掉1個,那末將有1個消費者出來處理掛掉沒有處理的消息數據。
  以下命令可以修改某主題的分區大小。

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic foo --partitions 4

3、multi-broker cluster

這里其實和Zookeeper機制由點類似,也是建立了1個leader和幾個follower。主要的作用還是為了可擴大性和容錯性。當集中任意1臺出問題,都可以保證系統的正確和穩定。即便是leader出現問題,它們也能夠通過投票的方式產生新leader. 這里只是簡單說明1下。

在它的官方例子中通過復制原本的配置文件,在本地建立了偽集群服務。

> cp config/server.properties config/server-1.properties > cp config/server.properties config/server-2.properties config/server-1.properties: broker.id=1 listeners=PLAINTEXT://:9093 log.dir=/tmp/kafka-logs-1 config/server-2.properties: broker.id=2 listeners=PLAINTEXT://:9094 log.dir=/tmp/kafka-logs-2

其中 broker.id 屬性是集群中唯1的和永久的節點名字,正常應當是1臺機子1個服務。其它兩個是由于偽集群的緣由必須修改。
讓后啟動這兩臺服務建立偽集群。摹擬了leader失效(被強行kill)后,它還可以正常工作。
啟動:

> bin/kafka-server-start.sh config/server-1.properties & ... > bin/kafka-server-start.sh config/server-2.properties &

4、典型利用場景

  1. 監控:主機通過Kafka發送與系統和利用程序健康相干的指標,然后這些信息會被搜集和處理從而創建監控儀表盤并發送正告。除此以外,LinkedIn還利用Apache Samza實現了1個能夠實時處理事件的富調用圖分析系統。
  2. 傳統的消息: 利用程度使用Kafka作為傳統的消息系統實現標準的隊列和消息的發布—定閱,例如搜索和內容提要(Content Feed)。
  3. 分析: 為了更好地理解用戶行動,改良用戶體驗,LinkedIn會將用戶查看了哪一個頁面、點擊了哪些內容等信息發送到每一個數據中心的Kafka集群上,并通過Hadoop進行分析、生成平常報告。
  4. 作為散布式利用程序或平臺的構件(日志):大數據倉庫解決方案Pinot等產品將Kafka作為核心構件(散布式日志),散布式數據庫Espresso將其作為內部副本并改變傳播層。

英文原地址:http://kafka.apache.org/documentation.html#quickstart

生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關閉
程序員人生
主站蜘蛛池模板: 在线观看网址 | 日韩精品网站 | 手机福利在线观看 | 欧美日韩一区二区视频免费看 | 久久黄色毛片 | 性欧美激情videos | 一个色亚洲| 日日麻批视频 | 免费观看福利视频 | 国内精品久久久久久久999下 | 欧美洲久久日韩欧美 | 一本久道久久综合婷婷五 | a免费国产一级特黄aa大 | 麻豆影视视频高清在线观看 | 伊人操| 日韩在线手机看片免费看 | 久久99精品久久久久久三级 | 久草在线播放视频 | 久久精品天堂 | 成人精品第一区二区三区 | 福利视频一区二区 | 校园春色欧美色图 | 香蕉狠狠再啪线视频 | 另类小说校园春色 | 龙口护士门91午夜国产在线 | 成人97| yellow中文字幕官网是 | 综合自拍亚洲综合图区美腿丝袜 | 国产xxxxxx久色视频在 | 亚洲精品区一区二区三区四 | 日本特黄特色aa大片免费 | 欧美成人免费全部色播 | 欧美一级性视频 | 日本久久综合 | avwww在线| 手机看片福利日韩欧美看片 | 高清在线一区二区三区亚洲综合 | 日本高清免费中文字幕不卡 | 亚洲日本一区二区 | 久爱精品视频在线视频 | 日本专区|