storm DRPC例子
來源:程序員人生 發布時間:2015-02-26 21:08:23 閱讀次數:3344次
1,DRPC原理
客戶端給DRPC服務器發送要履行的方法的名字,和這個方法的參數。實現了這個函數的topology使用DRPCSpout從DRPC服務器接收函 數調用流。每一個函數調用被DRPC服務器標記了1個唯1的id。 這個topology然后計算結果,在topology的最后1個叫做ReturnResults的bolt會連接到DRPC服務器,并且把這個調用的結
果發送給DRPC服務器(通過那個唯1的id標識)。DRPC服務器用那個唯1id來跟等待的客戶端匹配上,喚醒這個客戶端并且把結果發送給它。
2,例子
package storm.starter;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.drpc.DRPCSpout;
import backtype.storm.drpc.ReturnResults;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class ManualDRPC {
public static class ExclamationBolt extends BaseBasicBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("result", "return-info"));
}
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String arg = tuple.getString(0);
Object retInfo = tuple.getValue(1);
collector.emit(new Values(arg + "!!!", retInfo));
}
}
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
LocalDRPC drpc = new LocalDRPC();
DRPCSpout spout = new DRPCSpout("exclamation", drpc);
builder.setSpout("drpc", spout);
builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");
builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");
LocalCluster cluster = new LocalCluster();
Config conf = new Config();
cluster.submitTopology("exclaim", conf, builder.createTopology());
System.out.println(drpc.execute("exclamation", "aaa"));
System.out.println(drpc.execute("exclamation", "bbb"));
}
}
備注:DRPCSpout的名字與drpc.execute指定運行的名字1致
生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈