多多色-多人伦交性欧美在线观看-多人伦精品一区二区三区视频-多色视频-免费黄色视屏网站-免费黄色在线

國內最全IT社區平臺 聯系我們 | 收藏本站
阿里云優惠2
您當前位置:首頁 > 互聯網 > Kafka編程實例

Kafka編程實例

來源:程序員人生   發布時間:2014-09-21 02:14:47 閱讀次數:3499次

 編程

    Producer是一個應用程序,它創建消息并發送它們到Kafka broker中。這些producer在本質上是不同。比如,前端應用程序,后端服務,代理服務,適配器對于潛在的系統,Hadoop對于的Producer。這些不同的Producer能夠使用不同的語言實現,比如java、C和Python。下面的這部圖表解釋了消息producer的Kafka API.


下面將詳細介紹如果編寫一個簡單的Producer和Consumer應用程序。

發送簡單消息給Kafka broker,Producer端編寫類ClusterProducer。

public classClusterProducer extends Thread { private static final Log log =LogFactory.getLog(ClusterProducer.class); public void sendData() { Random rnd = new Random(); Properties props =PropertiesParser.getProperties(PropertiesSettings.PRODUCER_FILE_NAME); if (props == null) { log.error("can't loadspecified file " + PropertiesSettings.PRODUCER_FILE_NAME); return; } //set the producer configurationproperties ProducerConfig config = newProducerConfig(props); Producer<String, String> producer= new Producer<String, String>(config); //Send the data int count = 1; KeyedMessage<String, String>data; while (count < 100) { String sign = "*"; String ip = "192.168.2."+ rnd.nextInt(255); StringBuffer sb = newStringBuffer(); for (int i = 0; i < count; i++){ sb.append(sign); } log.info("set data:" +sb); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } data = new KeyedMessage<String,String>(PropertiesSettings.TOPIC_NAME, ip, sb.toString()); producer.send(data); count++; } producer.close(); } public void run() { sendData(); } public static void main(String[] args) { new ClusterProducer().sendData(); } }


定于Consumer獲取端,獲取對應topic的數據:

public class Consumerextends Thread { private static final Log log =LogFactory.getLog(Consumer.class); private final ConsumerConnector consumer; private final String topic; public Consumer(String topic) { consumer =kafka.consumer.Consumer.createJavaConsumerConnector( createConsumerConfig()); this.topic = topic; } private static ConsumerConfigcreateConsumerConfig() { Properties props = new Properties(); props.put("zookeeper.connect", KafkaProperties.zkConnect); props.put("group.id",KafkaProperties.groupId); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); return new ConsumerConfig(props); } public void run() { Map<String, Integer>topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, newInteger(1)); Map<String,List<KafkaStream<byte[], byte[]>>> consumerMap =consumer.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]>stream = consumerMap.get(topic).get(0); ConsumerIterator<byte[], byte[]>it = stream.iterator(); while (it.hasNext()) { log.info("+message: " +new String(it.next().message())); } } public static void main(String[] args) { Consumer client = new Consumer("cluster_statistics_topic"); client.

     輔助類:

public interface PropertiesSettings { final static String CONSUMER_FILE_NAME = "consumer.properties"; final static String PRODUCER_FILE_NAME = "producer.properties"; final static String TOPIC_NAME = "cluster_statistics_topic"; final static String TOPIC_A = "cluster_statistics_topic_A"; }


package com.kafka.utils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.io.IOException; import java.io.InputStream; import java.util.Properties; /** * @author JohnLiu * @version 0.1.0 * @date 2014/8/27 */ public class PropertiesParser { private static final Log log = LogFactory.getLog(PropertiesParser.class); /* properties file type */ Properties props = null; /* constructor method*/ public PropertiesParser(Properties props) { this.props = props; } /** * Get the trimmed String value of the property with the given * <code>name</code>. If the value the empty String (after * trimming), then it returns null. */ public String getStringProperty(String name) { return getStringProperty(name, null); } /** * Get the trimmed String value of the property with the given * <code>name</code> or the given default value if the value is * null or empty after trimming. */ public String getStringProperty(String name, String def) { String val = props.getProperty(name, def); if (val == null) { return def; } val = val.trim(); return (val.length() == 0) ? def : val; } private Properties loadPropertiesFile() { Properties props = new Properties(); InputStream in; ClassLoader cl = getClass().getClassLoader(); if (cl == null) cl = findClassloader(); if (cl == null) try { throw new ProcessingException("Unable to find a class loader on the current thread or class."); } catch (ProcessingException e) { e.printStackTrace(); } in = cl.getResourceAsStream(PropertiesSettings.CONSUMER_FILE_NAME); try { props.load(in); } catch (IOException ioe) { log.error("can't load " + PropertiesSettings.CONSUMER_FILE_NAME, ioe); } return props; } private ClassLoader findClassloader() { // work-around set context loader for windows-service started jvms (QUARTZ-748) if (Thread.currentThread().getContextClassLoader() == null && getClass().getClassLoader() != null) { Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); } return Thread.currentThread().getContextClassLoader(); } public static Properties getProperties(final String fileName) { Properties props = new Properties(); InputStream in = Thread.currentThread().getContextClassLoader() .getResourceAsStream(fileName); try { props.load(in); } catch (IOException ioe) { log.error("can't load " + fileName, ioe); } return props; } }

      配置參數文件consumer.properties:

zookeeper.connect=bigdata09:2181,bigdata08:2181,bigdata07:2181 group.id=cluster_group zookeeper.session.timeout.ms=400 zookeeper.sync.time.ms=200 auto.commit.interval.ms=1000



      配置參數文件producer.properties:

metadata.broker.list=bigdata09:9092,bigdata08:9092,bigdata07:9092 serializer.class=kafka.serializer.StringEncoder #partitioner.class=com.kafka.producer.SimplePartitioner request.required.acks=1


     分別執行上面的代碼,可以發送或者得到對應topic信息。

     Enjoy yourself!(*^__^*) ……

生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關閉
程序員人生
主站蜘蛛池模板: www免费看 | 亚州都市春色校园小说另类 | www日本www| 国产精品2020观看久久 | 一区二区三区国产精品 | 视频www| 欧美巨大xxxx做受孕妇视频 | 亚洲在线中文 | www.自拍 | 亚洲精品成人一区二区aⅴ 亚洲精品成人在线 | 偷拍区小说区图片区另类呻吟 | 亚洲视频在线网站 | 手机在线日韩高清理论片 | nnnwww在线观看视频 | 手机看片成人 | 国产精品高清一区二区三区不卡 | 在线亚洲欧国产精品专区 | 欧美在线三级 | 欧美人与性禽xxxx | 视频一区二区三区欧美日韩 | 又大又硬又黄又刺激的免费视频 | 中文字幕亚洲图片 | 久久麻豆亚洲精品 | 色吊丝在线观看 | 最近中文字幕高清字幕 | 亚洲男人的天堂久久无 | 国产精品无码久久av | 武则天一级淫片免费 | 小说区图片区综合久久88 | 亚洲久久久久久久 | 香蕉成人啪国产精品视频综合网 | 久久精品免费 | 欧美成人毛片一级在线 | 久久精品国产线看观看亚洲 | a免费视频| 国产精品免费αv视频 | 69免费视频大片 | 奇奇影院理论片在线观看 | 国产精品麻豆高清在线观看 | 特级aav毛片日本免费视频 | 亚洲aaaa级特黄毛片 |