多多色-多人伦交性欧美在线观看-多人伦精品一区二区三区视频-多色视频-免费黄色视屏网站-免费黄色在线

國內最全IT社區平臺 聯系我們 | 收藏本站
阿里云優惠2
您當前位置:首頁 > 服務器 > Storm-Kafka模塊常用接口分析及消費kafka數據例子

Storm-Kafka模塊常用接口分析及消費kafka數據例子

來源:程序員人生   發布時間:2015-02-10 08:39:44 閱讀次數:9036次
使用storm-kafka模塊讀取kafka中的數據,依照以下兩步進行構建(我使用的版本是0.9.3)
1. 使用BrokerHosts接口來配置kafka broker host與partition的mapping信息;
2. 使用KafkaConfig來配置1些與kafka本身相干的選項,如fetchSizeBytes、socketTimeoutMs
下面分別介紹這兩塊的實現:

對配置1,目前支持兩種實現方式:zk配置、靜態ip端口方式

第1種方式:Zk讀取(比較常見)
ZkHosts支持兩種創建方式, public ZkHosts(String brokerZkStr, String brokerZkPath) //使用默許brokerZkPath:"/brokers" public ZkHosts(String brokerZkStr)

通過這類方式訪問的時候,經過60s會刷新1下host->partition的mapping
   
第2步:構建KafkaConfig對象
目條件供兩種構造函數,
public KafkaConfig(BrokerHosts hosts, String topic) //clientId如果不想每次隨機生成的話,就自己設置1個 public KafkaConfig(BrokerHosts hosts, String topic, String clientId)

代碼參考:
//這個地方其實就是kafka配置文件里邊的zookeeper.connect這個參數,可以去那里拿過來。 String brokerZkStr = "10.100.90.201:2181/kafka_online_sample"; String brokerZkPath = "/brokers"; ZkHosts zkHosts = new ZkHosts(brokerZkStr, brokerZkPath); String topic = "mars-wap"; //以下:將offset匯報到哪一個zk集群,相應配置 // String offsetZkServers = "10.199.203.169"; String offsetZkServers = "10.100.90.201"; String offsetZkPort = "2181"; List<String> zkServersList = new ArrayList<String>(); zkServersList.add(offsetZkServers); //匯報offset信息的root路徑 String offsetZkRoot = "/stormExample"; //存儲該spout id的消費offset信息,比方以topoName來命名 String offsetZkId = "storm-example"; SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topic, offsetZkRoot, offsetZkId); kafkaConfig.zkRoot = offsetZkRoot; kafkaConfig.zkPort = Integer.parseInt(offsetZkPort); kafkaConfig.zkServers = zkServersList; kafkaConfig.id = offsetZkId; kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); KafkaSpout spout = new KafkaSpout(kafkaConfig); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", spout, 1); builder.setBolt("bolt", new Bolt(), 1).shuffleGrouping("spout"); Config config = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", config, builder.createTopology()); // cluster submit. // try { // StormSubmitter.submitTopology("storm-kafka-example",config,builder.createTopology()); // } catch (AlreadyAliveException e) { // e.printStackTrace(); // } catch (InvalidTopologyException e) { // e.printStackTrace(); // }

第2種方式:靜態ip端口方式
String kafkaHost = "10.100.90.201"; Broker brokerForPartition0 = new Broker(kafkaHost);//localhost:9092 Broker brokerForPartition1 = new Broker(kafkaHost, 9092);//localhost:9092 but we specified the port explicitly GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation(); partitionInfo.addPartition(0, brokerForPartition0);//mapping form partition 0 to brokerForPartition0 partitionInfo.addPartition(1, brokerForPartition1);//mapping form partition 1 to brokerForPartition1 StaticHosts hosts = new StaticHosts(partitionInfo); String topic="mars-wap"; String offsetZkRoot ="/stormExample"; String offsetZkId="staticHost"; String offsetZkServers = "10.100.90.201"; String offsetZkPort = "2181"; List<String> zkServersList = new ArrayList<String>(); zkServersList.add(offsetZkServers); SpoutConfig kafkaConfig = new SpoutConfig(hosts,topic,offsetZkRoot,offsetZkId); kafkaConfig.zkPort = Integer.parseInt(offsetZkPort); kafkaConfig.zkServers = zkServersList; kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); KafkaSpout spout = new KafkaSpout(kafkaConfig); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", spout, 1); builder.setBolt("bolt", new Bolt(), 1).shuffleGrouping("spout"); Config config = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", config, builder.createTopology());



完全的使用例子,見github源碼
https://github.com/tonylee0329/storm-example/blob/master/src/main/java/org/tony/storm_kafka/common/

參考:
https://github.com/apache/storm/blob/v0.9.3/external/storm-kafka/README.md

本文為原創,轉載請標明出處!From Tony_老7
生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關閉
程序員人生
主站蜘蛛池模板: 日本大片免费播放网站 | 日本高清无吗免费播放 | 亚洲伊人影院 | 97午夜理伦影院在线观看 | 欧美成人性视频播放 | 国产亚洲精品自在线观看 | 亚洲福利视频在线 | 精品国产免费人成网站 | 免费看黄色的网址 | 中文字幕在线视频网站 | 亚洲欧美高清 | 动漫美女口工h福利动画网站 | 久久99精品国产99久久6男男 | 亚洲第一精品夜夜躁人人爽 | 欧美整片在线观看 | 免费在线观看黄色网址 | 午夜视频观看 | 亚洲天堂第一页 | 亚洲成人播放 | 一级特黄色大片 | 新天堂网| www.99爱| 精品国产免费福利片 | 日毛片 | 一区二区三区视频在线观看 | 亚洲成人av | 国产免费一区二区三区 | 成人国产精品久久久免费 | 国产欧美日韩一区 | 欧美三级大片在线观看 | av在线影院 | 国产亚洲精品久久久久91网站 | 武则天全黄肉体毛片免费看 | 欧美黑人xxxx性高清版 | 免费ab| 国产最新一区二区三区天堂 | 欧美日韩国产一区二区三区欧 | 色爱区综合 | 欧美亚洲日本一区 | 噜噜噜噜私人影院老湿在线观看 | 最近中文字幕mv免费高清视频免费 |