多多色-多人伦交性欧美在线观看-多人伦精品一区二区三区视频-多色视频-免费黄色视屏网站-免费黄色在线

國內最全IT社區平臺 聯系我們 | 收藏本站
阿里云優惠2
您當前位置:首頁 > php開源 > 綜合技術 > java后端IM消息推送服務開發——協議

java后端IM消息推送服務開發——協議

來源:程序員人生   發布時間:2016-07-04 16:58:46 閱讀次數:3510次

最近在1家saas企業使用Mqtt開發IM消息推送服務,把開發中的1些問題記錄下來,項目仍在商用中,完全的消息服務包括4個模塊---協議protocol,信令Signal,規則Rule,狀態Status,這個主題主要是協議protocol部份。

主要技術觸及到MongoDB,webservice,httpclient,Mqtt等

protocol分為4個模塊類來實現,固然這是為了以后的擴大性比較好

首先看1下我們的主類,主要是mqtt基礎方法的1個框架

public class MqttProtocol { private static Logger logger = Logger.getLogger(MqttProtocol.class); public static final String HOST = "tcp://xx.xx.xx.xx:1883"; private static final String CLIENTID = "yyyy"; private MqttClient client; private MqttConnectOptions options = new MqttConnectOptions(); //private String userName = "admin"; //private String passWord = "public"; public MqttMessage message; private PushCallback callback; /** * 用于初始化mqttclient客戶端,設置回調函數,同時連接mqtt服務器 * @throws MqttException */ public MqttProtocol() throws MqttException { //MemoryPersistence設置clientid的保存情勢,默許為之內存保存 client = new MqttClient(HOST, CLIENTID, new MemoryPersistence()); callback = new PushCallback(); client.setCallback(callback); options = new MqttConnectOptions(); options.setCleanSession(false); options.setKeepAliveInterval(60); connect(); } /** * 連接mqtt消息服務器,同時設置了斷開重連的功能,主要是為了高可用性斟酌,在斷網服務器崩潰時候我們的程序依然不會終止 */ private void connect() { SimpleDateFormat sdf= new SimpleDateFormat(Constant.DATE_FORMAT_MDYHMS); System.out.println(sdf.format(System.currentTimeMillis())); boolean tryConnecting = true; while (tryConnecting) { try { client.connect(options); } catch (Exception e1) { System.out.println("Connection attempt failed with '"+e1.getCause()+ "'. Retrying."); } if (client.isConnected()) { System.out.println("Connected."); tryConnecting = false; } else { pause(); } } } private void pause() { try { Thread.sleep(1000); } catch (InterruptedException e) { // Error handling goes here... } } /** * * @param topic * @param qos * @throws MqttPersistenceException * @throws MqttException * 定閱相干主題 */ public void subscribe(String topic , int qos) throws MqttPersistenceException, MqttException { client.subscribe(topic, qos); } /** * * @throws MqttPersistenceException * @throws MqttException * 斷開連接服務器 */ public void disconnect() throws MqttPersistenceException, MqttException { client.disconnect(); } /** * * @author binshi *實現mqttcallback接口,主要用于接收消息后的處理方法 */ private class PushCallback implements MqttCallback { /** * 斷開后 系統會自動調用這個函數,同時在這個函數里進行重連操作 */ public void connectionLost(Throwable cause) { // 連接丟失后,1般在這里面進行重連 System.out.println("連接斷開,可以做重連"); connect(); try { subscribe(Constant.TOPIC_MQTT_PROTOCOL, 2); } catch (MqttPersistenceException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 消息成功傳送后,系統會自動調用此函數,表明成功向topic發送消息 */ @Override public void deliveryComplete(IMqttDeliveryToken arg0) { // TODO Auto-generated method stub System.out.println("deliveryComplete---------" + arg0.isComplete()); } /** * 連接mongo數據庫,返回關于具體collection的Mongocollection * @param collectionname * @return */ public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println(topic); SimpleDateFormat sdf= new SimpleDateFormat(Constant.DATE_FORMAT_MDYHMS); System.out.println(sdf.format(System.currentTimeMillis())); System.out.println("接收消息主題 : " + topic); System.out.println("接收消息Qos : " + message.getQos()); System.out.println("接收消息內容 : " + new String(message.getPayload())); //1 抽取事件信令消息 String messagejudge=new String(message.getPayload()); System.out.println("疏忽所有robot消息和offline離線消息"); JSONObject jo=new JSONObject(); try { jo=JSONObject.fromObject(messagejudge); } catch (Exception e) { e.printStackTrace(); } String from=jo.getString("from"); System.out.println("取得from"+from); System.out.println("肯定消息是不是包括offline,如果包括獲得offline,為1就不處理"); String offline=null; if(messagejudge.contains("offline")) { offline=jo.getString("offline"); } if((offline==null)&&(!from.contains("robot"))) { System.out.println("處理非系統消息和非離線消息"); String type=jo.getString("type"); System.out.println("取得type"+type); if(type.equals("shakehand")) { System.out.println("處理shakehand消息"); String admin="doyounkowwhy"; if(jo.toString().contains("admin")) { admin=jo.getString("admin"); } System.out.println("獲得admin 如果為1定義為客服,否則為普通用戶 admin為"+admin); if(admin.equals("1")) { System.out.println("處理客服握手消息"); System.out.println("發送握手成功消息"); MqttTopic retopic=client.getTopic(topic); MsgOperation.sendSysMsgToClient(from,"0", "1005", "握手成功", null,retopic); System.out.println("向客戶端發送離線未接收的消息"); String convid=jo.getString("convid"); String database="dolina"; String collection="messages"; MongoDBDao.getMongoDBDaoInstance().sendOfflineMsgToClient(from, convid,retopic,database,collection); } else { System.out.println("處理普通用戶的握手消息"); String appid=jo.getString("appid"); String pageid=jo.getString("pageid"); String convid=jo.getString("convid"); MqttTopic retopic=client.getTopic(topic); MsgOperation.sendShakeHandInfo(from,convid,appid,pageid,retopic); } } else if(type.equals("text")||type.equals("image")) { System.out.println("處理圖片和文字消息"); String tmpindex=jo.getString("tmpindex"); String convid=jo.getString("convid"); MqttTopic retopic=client.getTopic(topic); MsgOperation.getTextMsg( tmpindex, from, convid, retopic); System.out.println("保存圖片文字消息"); String database="dolina"; String collection="messages"; MongoDBDao.getMongoDBDaoInstance().saveTextMsg(database,collection,jo); } else if(type.equals("ack")) { System.out.println("處理ack消息"); String tmpindex=jo.getString("tmpindex"); String convid=jo.getString("convid"); String database="dolina"; String collection="messages"; MongoDBDao.getMongoDBDaoInstance().getAck(tmpindex,convid,from,database,collection); } } } } /** * * @param args * @throws MqttException * 全部工程從這里開始履行,生成可履行jar包,這個設置為主類。 */ public static void main(String[] args) throws MqttException { MqttProtocol signal = new MqttProtocol(); signal.message = new MqttMessage(); /** server.message.setQos(2); server.message.setRetained(false); server.message.setPayload("給客戶端124推送的信息".getBytes()); server.subscribe("/engyne/1/7/169573fcbc96a816281192222", 2); */ signal.subscribe(Constant.TOPIC_MQTT_PROTOCOL, 2); System.out.println(signal.message.isRetained() + "------ratained狀態"); } }
接下來使我們的遠程連接模塊,主要是通過給定的url調用遠程接口

public class RemoteOperation { private static Logger logger = Logger.getLogger(MqttProtocol.class); public static JSONObject remoteCall(String url) throws HttpException, IOException { HttpClient httpClient = new HttpClient(); GetMethod method =null ; method=new GetMethod(url); int retcode = httpClient.executeMethod(method); if (retcode != HttpStatus.SC_OK) {// 發送不成功 logger.info("遠程調用出錯"); return null; } else { String body = method.getResponseBodyAsString(); logger.info(body+"遠程調用php成功"); JSONObject jsonObject=new JSONObject(); try { jsonObject=JSONObject.fromObject(body); } catch (Exception e) { e.printStackTrace(); } if (method != null) { method.releaseConnection(); } return jsonObject; } } }

下面是Mongo數據庫的相干操作的1個封裝,設計為單例模式,相當于每次都使用同1個client打開連接,類似于連接池的概念,固然業務邏輯部份可以更換

public class MongoDBDao { private static Logger logger = Logger.getLogger(MongoDBDao.class); /** * MongoClient的實例代表數據庫連接池,是線程安全的,可以被多線程同享,客戶端在多線程條件下僅保持1個實例便可 * Mongo是非線程安全的,目前mongodb API中已建議用MongoClient替換Mongo */ private MongoClient mongoClient = null; /** * * 私有的構造函數 * 作者:shibin */ private MongoDBDao(){ if(mongoClient == null){ String url = Constant.MONGO_MQTT_URL; String user = Constant.MONGO_MQTT_USER; String password = Constant.MONGO_MQTT_PASSWORD; String database = Constant.MONGO_MQTT_DATABASE; int port = 27017; ServerAddress serverAddress = new ServerAddress(url, port); List<ServerAddress> serverAddresses = new ArrayList<ServerAddress>(); serverAddresses.add(serverAddress); MongoCredential credential = MongoCredential.createCredential(user, database, password.toCharArray()); List<MongoCredential> credentials = new ArrayList<MongoCredential>(); credentials.add(credential); mongoClient = new MongoClient(serverAddresses, credentials); System.out.println(mongoClient); System.out.println("初始化client完成"); } } /********單例模式聲明開始,采取餓漢式方式生成,保證線程安全********************/ //類初始化時,自行實例化,餓漢式單例模式 private static final MongoDBDao mongoDBDao = new MongoDBDao(); /** * * 方法名:getMongoDBDaoImplInstance * 作者:shibin * * 描寫:單例的靜態工廠方法 * @return */ public static MongoDBDao getMongoDBDaoInstance(){ return mongoDBDao; } public void sendOfflineMsgToClient(String from, String convid,MqttTopic retopic,String database,String collection) throws MqttPersistenceException, MqttException { System.out.println("取得message的連接"); MongoDatabase mongoDatabase = mongoClient.getDatabase(database); MongoCollection mongoCollection = mongoDatabase.getCollection(collection); System.out.println("獲得convid所對應的msg列表"); BasicDBObject query = new BasicDBObject(); query.put("_id", convid); FindIterable<Document> iterable=null; iterable = mongoCollection.find(query); if(iterable.first()!=null) { System.out.println(iterable.first()); String res= iterable.first().toJson(); JSONObject jo=new JSONObject(); try { jo=JSONObject.fromObject(res); } catch (Exception e) { e.printStackTrace(); } JSONArray jsonArray=jo.getJSONArray("msg"); for(int i=0;i<jsonArray.length();i++) { String read=jsonArray.getJSONObject(i).getString("read"); System.out.println("取得msg對應的第"+i+"條記錄的read信息"+read); System.out.println("判斷read是不是包括from的信息,如果不包括且這條消息不是他自己發的就給她發送這條消息"); if(!read.contains(from)&&!jsonArray.getJSONObject(i).getString("from").equals(from)) { System.out.println("取得這條消息的原型,然后加上offline=1并發送消息"); JSONObject msg=jsonArray.getJSONObject(i); msg.put("offline", "1"); retopic.publish(msg.toString().getBytes(), 0, false); } else { System.out.println("no offline message for "+from); } } } } public void saveTextMsg(String database,String collection,JSONObject jo) { MongoDatabase mongoDatabase = mongoClient.getDatabase(database); MongoCollection mongoCollection = mongoDatabase.getCollection(collection); BasicDBObject query = new BasicDBObject(); String convid=jo.getString("convid"); query.put("_id", convid); FindIterable iterable; iterable = mongoCollection.find(query); System.out.println("更新message之前的值"+iterable.first()); Bson filter = Filters.eq("_id", convid); Document content = new Document(); String type=jo.getString("type"); if(type.equals("text")) { String contentMsg=jo.getJSONObject("content").getString("content"); content.put("content", contentMsg); } else { String url=jo.getJSONObject("content").getString("url"); content.put("url", url); } String admin=jo.getJSONObject("extra").getString("admin"); String headimgurl=jo.getJSONObject("extra").getString("headimgurl"); String nickname=jo.getJSONObject("extra").getString("nickname"); String from=jo.getString("from"); String tmpindex=jo.getString("tmpindex"); Document extra = new Document(); extra.put("nickname", nickname); Document doc = new Document(); doc.put("from",from ); ArrayList<String> read=new ArrayList<String>(); doc.put("read", read); Document tdoc = new Document(); tdoc.put("msg", doc); UpdateOptions updateOptions=new UpdateOptions(); updateOptions.upsert(true); mongoCollection.updateOne(filter, new Document("$addToSet", tdoc), updateOptions); iterable = mongoCollection.find(query); System.out.println("更新message以后的值"+iterable.first()); } public void getAck(String tmpindex,String convid,String from,String database,String collection) { System.out.println("接收到ack消息后更新message中的read字段"); MongoDatabase mongoDatabase = mongoClient.getDatabase(database); MongoCollection mongoCollection = mongoDatabase.getCollection(collection); BasicDBObject query = new BasicDBObject(); query.put("_id", convid); query.put("msg.tmpindex", tmpindex); BasicDBObject query1 = new BasicDBObject(); query1.put("_id", convid); FindIterable iterable; FindIterable iterable2; iterable = mongoCollection.find(query1); iterable2 = mongoCollection.find(query); System.out.println("更新message滿足id過濾條件之前的值"+iterable.first()); System.out.println("更新message滿足id和tmpindex過濾條件之前的值"+iterable2.first()); if(iterable2.first()!=null) { Document doc = new Document(); doc.put("msg.$.read", from); UpdateOptions updateOptions=new UpdateOptions(); updateOptions.upsert(true); mongoCollection.updateOne(query, new Document("$addToSet", doc), updateOptions); } iterable = mongoCollection.find(query1); System.out.println("更新messages以后的值"+iterable.first()); } }

剩下的關于業務邏輯方面的就不多說了,主要是關于mqtt高可用性斷開重連的功能和mongo相干的操作

生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關閉
程序員人生
主站蜘蛛池模板: 成人免费观看视频久爱网 | ww亚洲ww亚在线观看 | 国产日韩片 | 一级坐爱 | 日韩精品欧美亚洲高清有无 | 久久网站免费 | 久久亚洲精中文字幕冲田杏梨 | 免费一级毛片在播放视频 | 亚洲天堂中文字幕在线观看 | 亚洲成人bt | 国产成人精品免费视频大 | 日本草久| 性猛交xxxxx按摩中国 | 一区二区三区中文国产亚洲 | 日本一区二区免费在线观看 | 秋霞午夜 | 91福利视频免费观看 | 欧美视频一区在线 | 亚洲动漫第一页 | 国产高清在线精品一区在线 | 日韩中文一区 | 网站在线免费观看 | 欧美三级韩国三级日本一级 | 性做久久久久免费看 | 福利在线一区 | 亚洲国产精品一区 | 亚洲精品视频在线观看免费 | 99久久精品国产高清一区二区 | 成年人免费看视频 | 国内一区二区三区精品视频 | 在线免费午夜视频 | 亚洲欧美成人 | 性做久久久久久久免费看 | 亚洲视频免费在线看 | 亚洲成综合人影院在院播放 | 伊人色综合97| 手机看片亚洲 | 免费一级欧美片在线观免看 | 国产精品系列在线一区 | 欧美一区二区影院 | 国产专区自拍 |