本文是從英文的官網摘了翻譯的,用作自己的整理和記錄。水平有限,歡迎指正。版本是: kafka_2.10-0.10.0.0
代理(Broker):Kafka is run as a cluster comprised of one or more servers each of which is called a broker.
生產者通過網路將消息發送到Kafka集群上,集群順次(輪番)服務消息到達消費者。 Kafka運行在1個集群中,集群中的每個服務器就叫代理。
1個主題是命名或分類發布的消息。每個主題,Kafka持有1個分區日志,看起來像下面圖片。
每個Partition都是有序的,固定長度的消息隊列1直不斷增加到–1個提交日志。消息在Partition內分配了順序的id叫偏移量,這個偏移量在分區中唯1標識每一個消息的。
Kafka保存所有(1段時間內的-可配置)已發布的消息-不管它們是不是已被消費。例如,如果日志保存被設置為兩天,那末在1個消息發布后,兩天內它是可用的,兩天后它將被拋棄到空閑空間
事實上,元數據保存在每一個消費進程中,是基于消費進程在日志中的位置,該位置稱為“偏移量”(In fact the only metadata retained on a per-consumer basis is the position of the consumer in the log, called the “offset”.)。這個偏移量被消費者控制:正常的消費者讀取消息時,線性增加偏移量,但事實上消費者可以以任何它順序的方式來控制。例如:1個消費者可以重置到之前的偏移量位置來重新處理。
這類組合的特點意味著Kafka的消費者是很便宜的-消費者進程可以隨時增加減少,對集群和其它消費者進程沒有任何影響。例如:你可使用命令行工具輸出任何主題的內容,而不改變任何現有的消費者所消耗的。
日志中的分區服務幾個目的。首先,日志的范圍大小可以調劑,遠不是只有1個在1個服務器上。每一個單獨的分區都必須安裝在主機上的服務器上,1個主題可以有許多分區,所以它可以處理任意數量的數據。第2,它們都是獨立相互平行的。
日志的分辨別布在Kafka集群中的服務器上,每一個服務器處理數據,并要求分區內容的副本。為了容錯,每一個分區的副本數量是可以通過服務器設置的。
每一個分區都有1個服務器它充當“leader”和0到更多的服務器,作為“followers”。leader處理所有的讀寫要求,而followers被動地復制的leader。如果leader失敗,其中1個“followers”將自動成為新的“leader”。
生產者將數據發布到他們所選擇的主題。生產者負責選擇那個消息分配到那個主題的哪一個partition。至于選擇哪一個分區可以簡單的循環方式到達負載均衡,也能夠者根據語義功能來分區。
每一個消費者把自己標示到1個消費組,當每一個消息發布到主題后,消息在投遞到每一個定閱消費組1個消費實例。消費者實例可以在不同的進程或不同的機器上。
如果所有的消費者實例都有相同的消費組,那末這就像1個傳統的隊列。
如果所有的消費者實例都有不同的消費組,那末這類作品就如發布定閱,所有的信息都被廣播給所有的消費者。
但是,更常見的是主題有1個小數量的消費組,每個為“邏輯定閱。每一個組都是由許多消費實例,為了可擴大性和容錯性。
Kafka有比傳統消息系統更強健的順序保證。
傳統的隊列在服務器上保存順序消息,如果多個消費者從隊列中消費,然后服務器將它們存儲的消息依照順序發送出去。但是,雖然服務器依照順序發送消息,但是消息傳遞異步發送給消費者,所以消息到達消費者時可能失序了。這類高效意味著在并行消費進程中,消息的順序丟失。消息傳遞系統常常圍繞這個工作,有1個“exclusive consumer“的概念,它只允許1個進程從1個隊列中消耗,但固然這意味著沒有并行性處理的可能性。
Kafka做得更好。通過對主題進行分區,Kafka是既能保證順序,又能負載均衡的消費。這是通過給主題進行分區,然后給消費組,使的每一個分區都被組內唯1消費進程消費。通過這樣做,我們確保消進程是唯1的讀取那個分區,并消費數據的順序。請注意,在1個消費組中,不能有比分區更多的消費進程。
Kafka只在1個分區中的消息提供了1個總的順序,而不是在1個主題中的不同分區之間的。但是,如果您需要1個完全有序的消息,這可以通過1個主題和1個分區來實現,明顯這將意味著每個消費組只有1個消費進程。
Kafka給出了以下保證:
重要的來了,上面看不懂的沒關系,看程序,最直接。
假設我們有1個主題叫foo,它有4個分區。我建立了兩個消費組GroupA and GroupB
其中GroupA有2個消費者,GroupB有4個消費者。
我們的生產者平均向4個分區寫入了內容。例:
package part;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class TestProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
//The "all" setting we have specified will result in blocking on the full commit of the record, the slowest but most durable setting.
//“所有”設置將致使記錄的完全提交阻塞,最慢的,但最持久的設置。
props.put("acks", "all");
//如果要求失敗,生產者也會自動重試,即便設置成0 the producer can automatically retry.
props.put("retries", 0);
//The producer maintains buffers of unsent records for each partition.
props.put("batch.size", 16384);
//默許立即發送,這里這是延時毫秒數
props.put("linger.ms", 1);
//生產者緩沖大小,當緩沖區耗盡后,額外的發送調用將被阻塞。時間超過max.block.ms將拋出TimeoutException
props.put("buffer.memory", 33554432);
//The key.serializer and value.serializer instruct how to turn the key and value objects the user provides with their ProducerRecord into bytes.
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//創建kafka的生產者類
Producer<String, String> producer = new KafkaProducer<String, String>(props);
//生產者的主要方法
// close();//Close this producer.
// close(long timeout, TimeUnit timeUnit); //This method waits up to timeout for the producer to complete the sending of all incomplete requests.
// flush() ;所有緩存記錄被立刻發送
for(int i = 0; i < 100; i++)
//這里平均寫入4個分區
producer.send(new ProducerRecord<String, String>("foo",i%4, Integer.toString(i), Integer.toString(i)));
producer.close();
}
}
消費者
package part;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class TestConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
System.out.println("this is the group part test 1");
//消費者的組id
props.put("group.id", "GroupA");//這里是GroupA或GroupB
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
//從poll(拉)的回話處理時長
props.put("session.timeout.ms", "30000");
//poll的數量限制
//props.put("max.poll.records", "100");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//定閱主題列表topic
consumer.subscribe(Arrays.asList("foo"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
// 正常這里應當使用線程池處理,不應當在這里處理
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()+"\n");
}
}
}
如果GroupA和GroupB都正常啟動,那末GroupB內4各消費平均消費生產者的消息數據(這里每一個25個消息),GroupA內2個消費者各處理50各消息,每一個消費者處理2各分區。如果GroupA內1個消費者掛斷,那末另外一個處理所有消息數據。如果GroupB掛掉1個,那末將有1個消費者出來處理掛掉沒有處理的消息數據。
以下命令可以修改某主題的分區大小。
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic foo --partitions 4
這里其實和Zookeeper機制由點類似,也是建立了1個leader和幾個follower。主要的作用還是為了可擴大性和容錯性。當集中任意1臺出問題,都可以保證系統的正確和穩定。即便是leader出現問題,它們也能夠通過投票的方式產生新leader. 這里只是簡單說明1下。
在它的官方例子中通過復制原本的配置文件,在本地建立了偽集群服務。
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
其中 broker.id 屬性是集群中唯1的和永久的節點名字,正常應當是1臺機子1個服務。其它兩個是由于偽集群的緣由必須修改。
讓后啟動這兩臺服務建立偽集群。摹擬了leader失效(被強行kill)后,它還可以正常工作。
啟動:
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
英文原地址:http://kafka.apache.org/documentation.html#quickstart
上一篇 樸素貝葉斯之MapReduce版
下一篇 PCI9054 DMA設置流程