Storm-Kafka模塊常用接口分析及消費kafka數據例子
來源:程序員人生 發布時間:2015-02-10 08:39:44 閱讀次數:9036次
使用storm-kafka模塊讀取kafka中的數據,依照以下兩步進行構建(我使用的版本是0.9.3)
1. 使用BrokerHosts接口來配置kafka broker host與partition的mapping信息;
2. 使用KafkaConfig來配置1些與kafka本身相干的選項,如fetchSizeBytes、socketTimeoutMs
下面分別介紹這兩塊的實現:
對配置1,目前支持兩種實現方式:zk配置、靜態ip端口方式
第1種方式:Zk讀取(比較常見)
ZkHosts支持兩種創建方式,
public ZkHosts(String brokerZkStr, String brokerZkPath)
//使用默許brokerZkPath:"/brokers"
public ZkHosts(String brokerZkStr)
通過這類方式訪問的時候,經過60s會刷新1下host->partition的mapping
第2步:構建KafkaConfig對象
目條件供兩種構造函數,
public KafkaConfig(BrokerHosts hosts, String topic)
//clientId如果不想每次隨機生成的話,就自己設置1個
public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
代碼參考:
//這個地方其實就是kafka配置文件里邊的zookeeper.connect這個參數,可以去那里拿過來。
String brokerZkStr = "10.100.90.201:2181/kafka_online_sample";
String brokerZkPath = "/brokers";
ZkHosts zkHosts = new ZkHosts(brokerZkStr, brokerZkPath);
String topic = "mars-wap";
//以下:將offset匯報到哪一個zk集群,相應配置
// String offsetZkServers = "10.199.203.169";
String offsetZkServers = "10.100.90.201";
String offsetZkPort = "2181";
List<String> zkServersList = new ArrayList<String>();
zkServersList.add(offsetZkServers);
//匯報offset信息的root路徑
String offsetZkRoot = "/stormExample";
//存儲該spout id的消費offset信息,比方以topoName來命名
String offsetZkId = "storm-example";
SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topic, offsetZkRoot, offsetZkId);
kafkaConfig.zkRoot = offsetZkRoot;
kafkaConfig.zkPort = Integer.parseInt(offsetZkPort);
kafkaConfig.zkServers = zkServersList;
kafkaConfig.id = offsetZkId;
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout spout = new KafkaSpout(kafkaConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", spout, 1);
builder.setBolt("bolt", new Bolt(), 1).shuffleGrouping("spout");
Config config = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", config, builder.createTopology());
// cluster submit.
// try {
// StormSubmitter.submitTopology("storm-kafka-example",config,builder.createTopology());
// } catch (AlreadyAliveException e) {
// e.printStackTrace();
// } catch (InvalidTopologyException e) {
// e.printStackTrace();
// }
第2種方式:靜態ip端口方式
String kafkaHost = "10.100.90.201";
Broker brokerForPartition0 = new Broker(kafkaHost);//localhost:9092
Broker brokerForPartition1 = new Broker(kafkaHost, 9092);//localhost:9092 but we specified the port explicitly
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, brokerForPartition0);//mapping form partition 0 to brokerForPartition0
partitionInfo.addPartition(1, brokerForPartition1);//mapping form partition 1 to brokerForPartition1
StaticHosts hosts = new StaticHosts(partitionInfo);
String topic="mars-wap";
String offsetZkRoot ="/stormExample";
String offsetZkId="staticHost";
String offsetZkServers = "10.100.90.201";
String offsetZkPort = "2181";
List<String> zkServersList = new ArrayList<String>();
zkServersList.add(offsetZkServers);
SpoutConfig kafkaConfig = new SpoutConfig(hosts,topic,offsetZkRoot,offsetZkId);
kafkaConfig.zkPort = Integer.parseInt(offsetZkPort);
kafkaConfig.zkServers = zkServersList;
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout spout = new KafkaSpout(kafkaConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", spout, 1);
builder.setBolt("bolt", new Bolt(), 1).shuffleGrouping("spout");
Config config = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", config, builder.createTopology());
完全的使用例子,見github源碼
https://github.com/tonylee0329/storm-example/blob/master/src/main/java/org/tony/storm_kafka/common/
參考:
https://github.com/apache/storm/blob/v0.9.3/external/storm-kafka/README.md
本文為原創,轉載請標明出處!From Tony_老7
生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈