【老婆對老公說:老公你說我奇怪不奇怪啊,我既然可以用眼睛吃飯也。老公:說的新鮮,怎樣用眼睛怎樣吃飯啊。老婆說:我看你1眼就飽了……】
項目中如何利用Redis隊列+定時器(quartz)+websocket實現實時刷新的跑馬燈功能???
這幾天,公司有個業務,具體內容以下:
在儀表盤banner區域轉動播放提示信息。也就是實現1個實時播放消息的跑馬燈功能。播放的是1個任務內容(數據庫有1張表pm_task)。
跑馬燈消息提示內容總共有4種:
轉動播放時,每一個提示信息之間應由字符間隔。播放速度到時根據具體代碼運行情況進行分析,播放速度應不超過1般瀏覽速度。當有提示信息生成時,末位補進提示信息隊列。
特殊情況:
1. 當信息同時生成時,同級任務信息按時間進行排序。
2. 消息插播優先級:P1>P2>P3>P4。
3. 插播為即時插播,未播放完的消息不移除播放隊列。
做業務功能實現的時候,流程基本都是這樣的:熟習業務 —>>>分析業務—>>>拆分業務—>>> 尋覓拆分任務的技術解決方案 —>>>編碼實現 —>>>愉快的頑耍
在業務沒想清楚之前,千萬不要動手寫代碼。
網上搜索前端跑馬燈功能實現,1堆,可以看看文章最后參考文章那1節。具體實現就是HTML的1個便簽< marquee>
<marquee behavior="alternate">我來回轉動</marquee>
消息推送,公司既有的框架就是Websocket,所以可以在用戶進入頁面的時候,定閱相干通道,用戶退出頁面的時候,取消相干的通道。在需要推送消息時候,實現消息推送既可。
后端產生的消息,事實上有2種存儲方法:
1.我利用數據庫,建立1張表,產生的每條消息都保存到表(xxx_marquee_msg)里面。當前端跑馬燈需要數據的時候,從數據庫讀取1條優先級高的數據,返回給前端。與此同時,我把該條數據刪除,實現1個類似隊列這樣的1個功能。
2.我利用Redis的阻塞隊列功能,將數據寄存到redis隊列中。前端需要的時候,我再從隊列中獲得數據。
兩種方法的比較:
利用數據庫方法實現,簡單,業務邏輯好控制,缺點是:你得實現表的增刪改查操作,需要些很對的代碼,從控制層,業務層,DAO層,1層1層的寫,1堆代碼,麻煩。
相比之下,如果用Redis的阻塞隊列來實現,我不需要寫增刪改查操操作,只需要get和push消息到隊列中便可,同時由于在緩存中,效力高,缺點是:業務邏輯不好控制,比如我要實現隊列的排序,優先級,相對來講都比較麻煩。
就這樣糾結啊,糾結啊,我覺得選擇第2中方式,出于不想寫代碼的緣由,加上第2種方式逼格高,效力高等等。
仔細看下需求,你會發現,需求中要求消息是排優先級的,這點就有點頭疼了,不過好在,我們的消息只有4中優先級,所以具體解決方案以下:
我定義4個隊列(queue),分別寄存 P1 P2 P3 P4 4種基本的消息,取數據的時候,我先從P1隊列開始取,獲得不到時,順次從P2 P3 P4去消息。
可以參考這篇文章用redis實現支持優先級的消息隊列
由于跑馬燈的功能要實現實時刷新,也就是當有新的消息產生的時候,要實時刷新跑馬燈的內容,我選擇的方案是:在后端開啟1個定時器,實時的去Redis緩存隊列獲得相干的信息,推送給前端。
我在pcsMainTaskService這個業務類實現1個定時器,定時器的方法是pcsMarqueeRefresh:
<bean name="pcsMarqueeRefreshParseJob" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean"
p:targetObject-ref="pcsMainTaskService" p:targetMethod="pcsMarqueeRefresh"
p:concurrent="false"/>
<bean id="pcsMarqueeRefreshTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail" ref="pcsMarqueeRefreshParseJob"/>
<property name="cronExpression" value="0/10 * * * * ?"/>
</bean>
<util:list id="schedulerTriggers">
<ref bean="pcsMarqueeRefreshTrigger"></ref>
</util:list>
/**
* 描寫:跑馬燈刷新(定時器)
*/
public void pcsMarqueeRefresh() throws Exception{
// 推送內容
String pushContent = null;
if(StringUtils.isEmpty(pushContent)){
pushContent = RedisUtils.getFromQueue(MarqueeRefreshUtils.REDIS_MARQUEE_P1_KEY);
}
if(StringUtils.isEmpty(pushContent)){
pushContent = RedisUtils.getFromQueue(MarqueeRefreshUtils.REDIS_MARQUEE_P2_KEY);
}
if(StringUtils.isEmpty(pushContent)){
pushContent = RedisUtils.getFromQueue(MarqueeRefreshUtils.REDIS_MARQUEE_P3_KEY);
}
if(StringUtils.isEmpty(pushContent)){
pushContent = RedisUtils.getFromQueue(MarqueeRefreshUtils.REDIS_MARQUEE_P4_KEY);
}
//推送消息
if(StringUtils.isNotEmpty(pushContent)){
//查詢系統所有的用戶
List<String> userIds = sysUserService.find(new ArrayList<>()).stream().map(SysUser::getId).collect(Collectors.toList());
//websocket推送消息
redisPubSubService.publish(new RedisMessage(pushContent, userIds, MarqueeRefreshUtils.MARQUEE_CHANNEL,false));
}
}
消息推送代碼比較簡單,獲得系統用戶,往通道(MarqueeRefreshUtils.MARQUEE_CHANNEL)推送消息。
//查詢系統所有的用戶
List<String> userIds = sysUserService.find(new ArrayList<>()).stream().map(SysUser::getId).collect(Collectors.toList());
//websocket推送消息
redisPubSubService.publish(new RedisMessage(pushContent, userIds, MarqueeRefreshUtils.MARQUEE_CHANNEL,false));
package com.evada.de.projcommand.utils;
import com.evada.de.common.enums.projcommond.TaskDeliverStatus;
import com.evada.de.common.enums.projcommond.TaskTypeEnum;
import com.evada.de.common.util.RedisUtils;
import com.evada.de.projcommand.model.PcsTask;
/**
* 描寫:跑馬燈消息刷新
* Created by huangwy on 2017/1/9.
*/
public class MarqueeRefreshUtils {
// 隊列總共分為4個級別,分別為 P1 P2 P3 P4
public static final String REDIS_MARQUEE_P1_KEY = "inno.pcs.marquee.refresh.p1";
public static final String REDIS_MARQUEE_P2_KEY = "inno.pcs.marquee.refresh.p2";
public static final String REDIS_MARQUEE_P3_KEY = "inno.pcs.marquee.refresh.p3";
public static final String REDIS_MARQUEE_P4_KEY = "inno.pcs.marquee.refresh.p4";
// 定閱頻道
public static final String MARQUEE_CHANNEL = "inno.pcs.marquee.refresh";
//消息前綴
public static final String PRE_P1_MESSAGE = "關鍵決策任務通過:";
public static final String PRE_P2_MESSAGE = "關鍵驗證任務通過:";
public static final String PRE_P3_MESSAGE = "任務已下發:";
public static final String PRE_P4_MESSAGE = "任務已通過:";
public static void pushToQueue(PcsTask pcsTask){
if(!(pcsTask.getWorkitemStatus().equals("3")
|| pcsTask.getDeliverStatus().equals(TaskDeliverStatus.TASK_DELIVER.toString()))){
return;
}
StringBuffer content = new StringBuffer();
//關鍵決策任務
if(TaskTypeEnum.KEY_DECISION_TASK.toString().equals(pcsTask.getType()) && pcsTask.getWorkitemStatus().equals("3")){
content.append(PRE_P1_MESSAGE).append(pcsTask.getCode()).append(" ").append(pcsTask.getName());
RedisUtils.putToQueue(REDIS_MARQUEE_P1_KEY,content.toString());
}
//關鍵驗證任務
if(TaskTypeEnum.KEY_VALIDATION_TASK.toString().equals(pcsTask.getType()) && pcsTask.getWorkitemStatus().equals("3")){
content.append(PRE_P2_MESSAGE).append(pcsTask.getCode()).append(" ").append(pcsTask.getName());
RedisUtils.putToQueue(REDIS_MARQUEE_P2_KEY,content.toString());
}
//任務已下發
if(TaskTypeEnum.KEY_VALIDATION_TASK.toString().equals(pcsTask.getType())
&& pcsTask.getDeliverStatus().equals(TaskDeliverStatus.TASK_DELIVER.toString())){
content.append(PRE_P3_MESSAGE).append(pcsTask.getCode()).append(" ").append(pcsTask.getName());
RedisUtils.putToQueue(REDIS_MARQUEE_P3_KEY,content.toString());
}
//1般任務
if(TaskTypeEnum.GENERAL_TASK.toString().equals(pcsTask.getType()) && pcsTask.getWorkitemStatus().equals("3")){
content.append(PRE_P4_MESSAGE).append(pcsTask.getCode()).append(" ").append(pcsTask.getName());
RedisUtils.putToQueue(REDIS_MARQUEE_P4_KEY,content.toString());
}
}
}
該工具類主要是實現隊列數據的存和取,相對來講比較簡單:
package com.evada.de.common.util;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
@Component
public class RedisUtils {
private static RedisTemplate tmp;
@Autowired
RedisUtils(RedisTemplate redisTemplate) {
tmp = redisTemplate;
}
/**
* set value to queue
* @param key
* @param value
* @return
*/
public static Long putToQueue(final String key, final String value) {
Long l = (Long) tmp.execute(new RedisCallback<Object>() {
public Object doInRedis(RedisConnection connection)throws DataAccessException {
return connection.lPush(key.getBytes(), value.getBytes());
}
});
return l;
}
/**
* get value from queue
* @param key
* @return
*/
public static String getFromQueue(final String key) {
byte[] b = (byte[]) tmp.execute(new RedisCallback<Object>() {
public Object doInRedis(RedisConnection connection)throws DataAccessException {
return connection.lPop(key.getBytes());
}
});
if(b != null){
return new String(b);
}
return null;
}
}
好了,寫到這里基本就實現了,很簡單有木有~~~
來自 古斯塔夫·勒龐《烏合之眾》
【1個失意年輕人尋覓成功,哲人給1顆花生說:“用力捏它。”年輕人用力1捏,花生殼碎了,剩下仁。哲人又叫他搓,結果搓掉紅色的皮,只留下白白的果實。哲人再叫他捏,不論他如何用力,卻捏不碎花生仁。哲人說:“雖然屢受磨難,失去了很多,但要有1顆不屈的心。”】
【1】標簽 HTML跑馬燈
【2】Spring+Websocket實現消息的推送
【3】Spring3.0與Quartz的整合實現定時任務調度
【4】利用Redis 實現消息隊列
【5】用redis實現支持優先級的消息隊列
【6】java redis使用之利用jedis實現redis消息隊列
如果有帶給你1絲絲小快樂,就讓快樂繼續傳遞下去,歡迎點贊、頂、歡迎留下寶貴的意見、多謝支持!
上一篇 vector基礎使用
下一篇 GPS定位基本原理淺析