多多色-多人伦交性欧美在线观看-多人伦精品一区二区三区视频-多色视频-免费黄色视屏网站-免费黄色在线

國內最全IT社區平臺 聯系我們 | 收藏本站
阿里云優惠2
您當前位置:首頁 > php框架 > 框架設計 > Flink流計算編程--Kafka+Flink整合demo

Flink流計算編程--Kafka+Flink整合demo

來源:程序員人生   發布時間:2016-06-30 08:24:47 閱讀次數:10838次

1、簡介

1.1、Kafka Consumer提供了2種API:high level與low level(SimpleConsumer)。
(1)high level consumer的API較為簡單,不需要關心offset、partition、broker等信息,kafka會自動讀取zookeeper中該consumer group的last offset。
(2)low level consumer也叫SimpleConsumer,這個接口非常復雜,需要自己寫代碼去實現對offset、partition、broker和broker的切換,能不用就不用,那什么時候必須用?

1、Read a message multiple times
2、Consume only a subset of the partitions in a topic in a process
3、Manage transactions to make sure a message is processed once and only once

這里寫圖片描述

2、Flink的開發準備

Flink提供了high level的API來消費kafka的數據:flink-connector-kafka-0.8_2.10。注意,這里的0.8代表的是kafka的版本,你可以通過maven來導入kafka的依賴,具體以下:
這里寫圖片描述

例如你的kafka安裝版本是“kafka_2.10-0.8.2.1”,即此版本是由scala2.10編寫,kafka的本身版本是0.8.2.1.那此時你需要添加以下的內容到maven的pom.xml文件中:

<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.8_2.10</artifactId> <version>${flink.version}</version> </dependency>

注意:
$${flink.version}是個變量,自己調劑下代碼,例如可以直接寫1.0.0。我的項目里采取的是添加了properties來控制${flink.version}:

<properties> <project.build.sourceEncoding>UTF⑻</project.build.sourceEncoding> <flink.version>1.0.0</flink.version> </properties>

3、集群環境準備

這里主要是介紹下Flink集群與kafka集群的搭建。
基礎的軟件安裝包括JDK、scala、hadoop、zookeeper、kafka和flink就不介紹了,直接看下flink的集群配置和kafka的集群配置。
zookeeper–3.4.6
hadoop–2.6.0
kafka–2.10-0.8.2.1
flink–1.0.3

3.1、Flink集群配置(standalone且沒有用zookeeper的HA)

3.1.1、環境變量
添加FLINK_HOME和path的內容:

export FLINK_HOME=/usr/local/flink/flink-1.0.3 export PATH=.:${JAVA_HOME}/bin:${SCALA_HOME}/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${ZOOKEEPER_HOME}/bin:${KAFKA_HOME}/bin:${FLINK_HOME}/bin:$PATH export CLASS_PATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib

3.1.2、修改conf/flink-conf.yaml
這里寫圖片描述

這幾近是最簡單的配置方式了,主要注意要修改jobmanager.rpc.address為集群中jobManager的IP或hostname。檢查點和HA的參數都沒有配置。

3.1.3、slaves文件
這里寫圖片描述

這個文件中寄存的信息是taskmanager的hostname。

3.1.4、復制flink目錄和.bashrc文件到集群中其他的機器,并使bashrc生效

root@master:/usr/local/flink# scp -r flink⑴.0.3/ root@worker1:/usr/local/flink/ root@master:/usr/local/flink# scp -r flink⑴.0.3/ root@worker2:/usr/local/flink/ root@master:/usr/local/flink# scp ~/.bashrc root@worker1:~/.bashrc root@master:/usr/local/flink# scp ~/.bashrc root@worker2:~/.bashrc root@worker1:~# source ~/.bashrc root@worker2:~# source ~/.bashrc

3.2、kafka集群配置

3.2.1、環境變量
省略

3.2.2、配置config/zookeeper.properties
由于kafka集群依賴于zookeeper集群,所以kafka提供了通過kafka去啟動zookeeper集群的功能,固然也能夠手動去啟動zookeeper的集群而不通過kafka去啟動zookeeper的集群。
這里寫圖片描述
注意這里的dataDir最好不要指定/tmp目錄下,由于機器重啟會刪除此目錄下的文件。且指定的新路徑必須存在。

3.2.3、配置config/server.properties
這個文件是啟動kafka集群需要指定的配置文件,注意2點:

# The id of the broker. This must be set to a unique integer for each broker. broker.id=0 ############################# Socket Server Settings ############################# # The port the socket server listens on #port=9092 listeners=PLAINTEXT://:9092

broker.id在kafka集群的每臺機器上都不1樣,我這里3臺集群分別是0、1、2.

############################# Zookeeper ############################# # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=master:2181,worker1:2181,worker2:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=6000

zookeeper.connect要配置kafka集群所依賴的zookeeper集群的信息,hostname:port。

3.2.4、復制kafka路徑及環境變量到其他kafka集群的機器,并修改server.properties中的broker_id.
復制進程省略

3.3、啟動kafka集群+Flink集群

3.3.1、首先啟動zookeeper集群(3臺zookeeper機器都要啟動):

root@master:/usr/local/zookeeper/zookeeper-3.4.6/bin# zkServer.sh start root@worker1:/usr/local/zookeeper/zookeeper-3.4.6/bin# zkServer.sh start root@worker2:/usr/local/zookeeper/zookeeper-3.4.6/bin# zkServer.sh start

驗證zookeeper集群:
進程是不是啟動;zookeeper集群中是不是可以正常顯示leader和follower。

root@master:/usr/local/zookeeper/zookeeper-3.4.6/bin# jps 3295 QuorumPeerMain root@master:/usr/local/zookeeper/zookeeper-3.4.6/bin# zkServer.sh status JMX enabled by default Using config: /usr/local/zookeeper/zookeeper-3.4.6/bin/../conf/zoo.cfg Mode: follower root@master:/usr/local/zookeeper/zookeeper-3.4.6/bin# root@worker1:/usr/local/zookeeper/zookeeper-3.4.6/bin# zkServer.sh status JMX enabled by default Using config: /usr/local/zookeeper/zookeeper-3.4.6/bin/../conf/zoo.cfg Mode: follower root@worker1:/usr/local/zookeeper/zookeeper-3.4.6/bin# root@worker2:/usr/local/zookeeper/zookeeper-3.4.6/bin# zkServer.sh status JMX enabled by default Using config: /usr/local/zookeeper/zookeeper-3.4.6/bin/../conf/zoo.cfg Mode: leader root@worker2:/usr/local/zookeeper/zookeeper-3.4.6/bin#

3.3.2、啟動kafka集群(3臺都要啟動)

root@master:/usr/local/kafka/kafka_2.10-0.8.2.1/bin# kafka-server-start.sh ../config/server.properties & root@worker1:/usr/local/kafka/kafka_2.10-0.8.2.1/bin# kafka-server-start.sh ../config/server.properties & root@worker2:/usr/local/kafka/kafka_2.10-0.8.2.1/bin# kafka-server-start.sh ../config/server.properties &

驗證:
進程;日志

3512 Kafka

3.3.3、啟動hdfs(master上啟動便可)

root@master:/usr/local/hadoop/hadoop-2.6.0/sbin# start-dfs.sh

驗證:進程及webUI

root@master:/usr/local/hadoop/hadoop-2.6.0/sbin# jps 3798 NameNode 4007 SecondaryNameNode root@worker1:/usr/local/hadoop/hadoop-2.6.0/sbin# jps 3843 DataNode root@worker2:/usr/local/hadoop/hadoop-2.6.0/sbin# jps 3802 DataNode

webUI:50070,默許可配置
這里寫圖片描述

3.3.4、啟動Flink集群(master便可)

root@master:/usr/local/flink/flink-1.0.3/bin# start-cluster.sh

驗證:進程及WebUI

root@master:/usr/local/flink/flink-1.0.3/bin# jps 4411 JobManager root@worker1:/usr/local/flink/flink-1.0.3/bin# jps 4151 TaskManager root@worker2:/usr/local/flink/flink-1.0.3/bin# jps 4110 TaskManager

WebUI:8081(默許,可配置)
這里寫圖片描述

4、編寫Flink程序,實現consume kafka的數據(demo)

4.1、代碼
這里就是簡單的實現接收kafka的數據,要指定zookeeper和kafka的集群配置,并指定topic的名字。
最后將consume的數據直接打印出來。

import java.util.Properties import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08 import org.apache.flink.streaming.util.serialization.SimpleStringSchema import org.apache.flink.streaming.api.scala._ /** * 用Flink消費kafka */ object ReadingFromKafka { private val ZOOKEEPER_HOST = "master:2181,worker1:2181,worker2:2181" private val KAFKA_BROKER = "master:9092,worker1:9092,worker2:9092" private val TRANSACTION_GROUP = "transaction" def main(args : Array[String]){ val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.enableCheckpointing(1000) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // configure Kafka consumer val kafkaProps = new Properties() kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST) kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKER) kafkaProps.setProperty("group.id", TRANSACTION_GROUP) //topicd的名字是new,schema默許使用SimpleStringSchema()便可 val transaction = env .addSource( new FlinkKafkaConsumer08[String]("new", new SimpleStringSchema(), kafkaProps) ) transaction.print() env.execute() } }

4.2、打包:

mvn clean package

這里寫圖片描述
看到成功標志,否則會提示error的地方。

4.3、發布到集群

root@master:/usr/local/flink/flink-1.0.3/bin# flink run -c wikiedits.ReadingFromKafka /root/Documents/wiki-edits-0.1.jar

驗證:進程及WebUI

root@master:/usr/local/flink/flink-1.0.3/bin# jps 6080 CliFrontend

這里寫圖片描述

5、kafka produce數據,驗證flink是不是正常消費

5.1、通過kafka console produce數據
之前已在kafka中創建了名字為new的topic,因此直接produce new的數據:

root@master:/usr/local/kafka/kafka_2.10-0.8.2.1/bin# kafka-console-producer.sh --broker-list master:9092,worker1:9092,worker2:9092 --topic new

生產數據:
這里寫圖片描述

5.2、查看flink的標準輸出中,是不是已消費了這部份數據:

root@worker2:/usr/local/flink/flink-1.0.3/log# ls -l | grep out -rw-r--r-- 1 root root 254 629 09:37 flink-root-taskmanager-0-worker2.out root@worker2:/usr/local/flink/flink-1.0.3/log#

我們在worker2的log中發現已有了數據,下面看看內容:
這里寫圖片描述

OK,沒問題,flink正常消費了數據。

6、總結

kafka作為1個消息系統,本身具有高吞吐、低延時、持久化、散布式等特點,其topic可以指定replication和partitions,使得可靠性和性能都可以很好的保證。
Kafka+Flink的架構,可使flink只需關注計算本身。

參考
http://www.tuicool.com/articles/fI7J3m
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html
http://kafka.apache.org/082/documentation.html
http://dataartisans.github.io/flink-training/exercises/toFromKafka.html
http://data-artisans.com/kafka-flink-a-practical-how-to/

生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關閉
程序員人生
主站蜘蛛池模板: 久久有精品 | 亚洲免费专区 | 日韩一级片视频 | 亚洲欧美另类日韩 | 国产三级在线播放 | 亚洲邪恶| 日本中文字幕在线看 | 欧美人善交 | 一区二区三区四区在线观看视频 | 一区二区三区四区视频 | 欧美激情五月 | 欧美最猛黑人xxxxwww | 最近最新中文字幕免费高清1 | 亚洲色中文字幕在线播放 | 国产亚洲精品福利在线 | 亚洲精品欧美精品国产精品 | 久久久久久综合对白国产 | 日本护士xxxx黑人巨大 | 日韩一级一片 | 在线 | 一区二区三区四区 | 欧美一区二区手机在线观看视频 | 欧美超清性videosfree | 色综合天天综合网国产成人网 | 国内视频一区二区三区 | 91成人福利 | 日韩精品欧美激情国产一区 | 日本色性 | 国精品日韩欧美一区二区三区 | 日韩免费看片 | 国产精品国产三级国产a | 伊人蕉久 | jizz日本在线播放 | 最近中文免费字幕1 | 火辣福利网站 | 亚洲欧美日韩国产综合 | 国产一国产一有一级毛片 | 天堂中文在线乱码 | 精品视频一区二区三区四区 | 亚洲欧美视频一区二区三区 | 国产精品一区伦免视频播放 | 最近最新高清中文字幕6页 最近最新免费中文字幕8 |