多多色-多人伦交性欧美在线观看-多人伦精品一区二区三区视频-多色视频-免费黄色视屏网站-免费黄色在线

國內最全IT社區平臺 聯系我們 | 收藏本站
阿里云優惠2
您當前位置:首頁 > php開源 > php教程 > 探討erlang消息選擇性接收和改進

探討erlang消息選擇性接收和改進

來源:程序員人生   發布時間:2015-03-16 11:02:20 閱讀次數:3644次
從 rabbitMQ 代碼中找到 gen_server2 , 對gen_server進行了1些優化。看到先輩寫的博文也提到這個,引發了我的思考。見 gen_server2 - OTP gen_server優化版 。

gen_server2 引發的思考

正如 litaocheng 所說的:
gen_server 和 gen_server2 最大的不同是:
gen_server2 收到任何1條消息放到外部的隊列中,當VM內部消息隊列為空后,才進行消息處理,繼續循環
gen_server 收到任何1條消息后,立即進行處理,處理完成后繼續循環

其次,還有1個很重要的不同點:
gen_server2 使用的外部隊列是帶優先級排序的,功能模塊本身可以定制消息優先級,乃至直接拋棄消息。(導出 prioritise_call/prioritise_cast/prioritise_info 幾個函數實現定制,返回的數值越大優先級越高,返回drop就拋棄消息)
最高優先級是 infinity, 在處理 {system, _From, _Req} 和 {'EXIT', Parent, _R} 使用了這個優先級。

但在博文也援用了Joe' Bog 在merle 所做的測試,看了merle 的代碼,沒有用到 prioritise_XXX 函數,說明沒有明顯利用到 gen_server2 優先級控制的好處,那為何能取得不錯的效果?(見下圖)

討論 gen_server2 的測試

通過瀏覽 Joe 在Github寫的 merle 代碼,很快發現問題:
可以看出,merle 在 handle_call 時都會調用 send_generic_cmd 、send_get_cmd 等類似函數。這些函數實現上都會阻塞進程直到接收到某些特定消息。
下面以 send_generic_cmd 為例做說明:
send_generic_cmd(Socket, Cmd) ->
    gen_tcp:send(Socket, <<Cmd/binary, " ">>),
    Reply = recv_simple_reply(),
    Reply.


recv_simple_reply() ->
   receive
      {tcp,_,Data} ->
           string:tokens(binary_to_list(Data), " ");
      {error, closed} ->
           connection_closed
      after ?TIMEOUT -> timeout
   end.
另外,gen_tcp:send 在實現上也 receive 等待某個特定信息,見 prim_inet:send(Socket, Packet, Opts)
send(S, Data, OptList) when is_port(S), is_list(OptList) ->
    try erlang:port_command(S, Data, OptList) of
    false -> % Port busy and nosuspend option passed
       {error,busy};
    true ->
       receive
          {inet_reply,S,Status} ->
             Status
      end
   catch
      error:_Error ->
         {error,einval}
   end.
也許可能有讀者不明白,這里說的等待某個特定消息是指選擇性接收。具體例子以下:
選擇性接收:
   receive
         {ok, Result} ->
               Result
   end.

非選擇性接收:
     receive
         Info ->
                Info
     end.
選擇性接收只針對某類信件,會1直阻塞住直到找到該信件為止,或超時。非選擇性接收的結果是每條消息都會被消費掉,方式為先進先出,不存在掃描信箱的問題。

前面提到,merle 在 handle_call 時都會 receive 住,等待某個特定消息。這個的代價就是每次receive住,erlang VM都要掃描進程全部信箱隊列。特別像 Joe 在做此類測試時,消息處理速度遠遠低于消息投遞速度,換句話說,gen_server進程信箱前面所有大部份的信件都是作者自己發的 gen_server:call 要求消息,然后每次 receive 住都要匹配這些消息。
比如, Joe 測試的是 merle:getkey 操作,那末信箱大部份消息就是 gen_server:call 投遞的 getkey 消息,而 handle_call 在處理時就要掃描完前面的getkey消息,才能得到想要的 {tcp,_,Data} 消息。進程信箱消息隊列以下所示:
getkey
getkey
getkey
...
getkey
{tcp,_,Data}
...
換成 gen_server2 的方式,gen_server2 會清空消息隊列,那末進程信箱消息隊列以下所示:
getkey
getkey
{tcp,_,Data}
...
前面還有2個getkey表示 gen_server2清空后在 handle_call 處理進程中 gen_server:call 又投遞了新的 getkey 消息,數據量對照 gen_server來講可以說是極少了,所以,消息匹配的次數就少了很多,這就會出現 Joe 測試的結果。

討論erlang消息選擇性接收

在討論這個問題之前,先援用 learnyousomeerlang 對消息選擇性接收的介紹(原文),很生動具體。
When there is no way to match a given message, it is put in a save queue and the next message is tried. If the second message matches, the first message is put back on top of the mailbox to be retried later.

就是說,erlang消息匹配不上,就會把消息放到 Save Queue 的隊列中,當匹配到了后再把消息放回進程信箱。以上是形象化的說法,如果是這樣就必定存在消息入列出列開消,那VM究竟是不是這樣實現呢?

所以,對選擇性接收,這里取3個問題出來說:
1、上面提到的,消息 Save Queue 是不是存在入列出列開消
2、當選擇性接收時,新消息到來時會不會重復掃描信箱前面匹配不上的消息
3、假定第2點不存在重復掃描,那末如果消息已匹配到了,再匹配多1次這個消息,會不會重復掃描前面的消息

帶著上面的疑問,下面以1個簡單的例子做說明
-module(test).
-compile(export_all).
t() ->
   receive
      ok ->
          ok
   end.
保存為test.erl,然后編譯,生成opcode
1> c(test).
{ok,test}
2> erts_debug:df(test).
ok
在目錄下找到生成的 test.dis,t() 函數opcode以下:
04BE84B0: i_func_info_IaaI 0 test t 0 
04BE84C4: i_loop_rec_fr f(04BE84EC) x(0) 
04BE84CC: i_is_eq_exact_immed_frc f(04BE84E4) x(0) ok 
04BE84D8: remove_message 
04BE84DC: move_return_cr ok x(0) 
04BE84E4: loop_rec_end_f test:t/0 
04BE84EC: wait_locked_f test:t/0 
逐行解釋這段代碼:
i_loop_rec_fr
receive接收信息,如果有信息放到 x(0) 寄存器,繼續下1條指令;沒有消息就跳到地址 04BE84EC,即 wait_locked_f 
i_is_eq_exact_immed_frc 
匹配 x(0)寄存器的值和ok是不是相等,如果相等繼續下1條指令;否則跳到04BE84E4,即 loop_rec_end_f 
remove_message 
移除進程消息隊列中“當前”的信息(也就是上1行匹配到的信息)
move_return_cr 
將 ok 送到 x(0)寄存器并返回結果
loop_rec_end_f 
將“當前消息”指針指向下1個位置,如果指向位置有消息,則跳到test:t/0第1段代碼地址繼續履行,即 04BE84C4;否則繼續履行下1條指令 04BE84EC,即 wait_locked_f 
wait_locked_f  
阻塞當前進程,等待下1次調度,再檢查是不是有新的消息到達
以上, i_is_eq_exact_immed_frc 和 move_return_cr 在 beam_hot.h實現,其他在 beam_emu.c 實現,都可以找相干代碼。

/* * beam_emu.c process_main() 線程入口函數,實現VM調度 * 以下截取 i_loop_rec_fr 處理進程 * 作用是從信箱取出1條消息放到 x(0) 寄存器;沒消息則跳到 wait或 wait_timeout指令 */ OpCase(i_loop_rec_fr): { BeamInstr *next; ErlMessage* msgp; loop_rec__: PROCESS_MAIN_CHK_LOCKS(c_p); // 取出“當前位置”的消息 msgp = PEEK_MESSAGE(c_p); if (!msgp) { //如果消息不存在,嘗試從SMP下public queue獲得消息 #ifdef ERTS_SMP erts_smp_proc_lock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE); if (ERTS_PROC_PENDING_EXIT(c_p)) { // 如果進程準備退出,則不處理消息了 erts_smp_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE); SWAPOUT; goto do_schedule; // 等待下1次調度 } // SMP下把消息移到進程私有堆尾部(純指針操作) ERTS_SMP_MSGQ_MV_INQ2PRIVQ(c_p); // 再嘗試取出“當前位置”的消息 msgp = PEEK_MESSAGE(c_p); if (msgp) erts_smp_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE); else #endif { // 信箱沒消息則跳到 wait或 wait_timeout指令(實際上就是履行下1條履行) SET_I((BeamInstr *) Arg(0)); Goto(*I); } } // 解析散布式消息,把消息附加的數據復制到進程私有堆 ErtsMoveMsgAttachmentIntoProc(msgp, c_p, E, HTOP, FCALLS, { SWAPOUT; reg[0] = r(0); PROCESS_MAIN_CHK_LOCKS(c_p); }, { ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p); PROCESS_MAIN_CHK_LOCKS(c_p); r(0) = reg[0]; SWAPIN; }); if (is_non_value(ERL_MESSAGE_TERM(msgp))) { /* * 如果消息破壞就移除(出現這類情況是散布式消息解碼出現毛?。? */ ASSERT(!msgp->data.attached); UNLINK_MESSAGE(c_p, msgp); // 移除消息,側重將“當前”位置指向下1條消息 free_message(msgp); // 燒毀消息 goto loop_rec__; // 跳到上面繼續 } PreFetch(1, next); // 標記下1條指令位置 r(0) = ERL_MESSAGE_TERM(msgp); NextPF(1, next); // 履行下1條指令 }
來看下這兩個宏定義:
/* Get "current" message */ #define PEEK_MESSAGE(p) (*(p)->msg.save)
從字面上就知道這個宏是取"當前的"消息,取了 msg.save 的值
#define UNLINK_MESSAGE(p,msgp) do { ErlMessage* __mp = (msgp)->next; *(p)->msg.save = __mp; (p)->msg.len--; if (__mp == NULL) (p)->msg.last = (p)->msg.save; (p)->msg.mark = 0; } while(0)
這個宏就是移除消息操作,消息隊列長度⑴,把 msg.save 指向了 msgp的下1條消息;如果 msgp->next 為 NULL,表示這是最后1條消息,就把 msg.last 等于了 msg.save
/* * beam_emu.c process_main() 線程入口函數,實現VM調度 * 以下截取 remove_message 處理進程(已刪除沒必要要的代碼) * 作用是將消息從信箱隊列中移除 */ OpCase(remove_message): { BeamInstr *next; ErlMessage* msgp; PROCESS_MAIN_CHK_LOCKS(c_p); PreFetch(0, next); msgp = PEEK_MESSAGE(c_p); // 取出當前的消息 if (ERTS_PROC_GET_SAVED_CALLS_BUF(c_p)) { save_calls(c_p, &exp_receive); } if (ERL_MESSAGE_TOKEN(msgp) == NIL) { SEQ_TRACE_TOKEN(c_p) = NIL; } else if (ERL_MESSAGE_TOKEN(msgp) != am_undefined) { // 追蹤調試內容,可以疏忽 Eterm msg; SEQ_TRACE_TOKEN(c_p) = ERL_MESSAGE_TOKEN(msgp); c_p->seq_trace_lastcnt = unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p)); if (c_p->seq_trace_clock < unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p))) { c_p->seq_trace_clock = unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p)); } msg = ERL_MESSAGE_TERM(msgp); seq_trace_output(SEQ_TRACE_TOKEN(c_p), msg, SEQ_TRACE_RECEIVE, c_p->common.id, c_p); } UNLINK_MESSAGE(c_p, msgp); // 移除消息,側重隊列長度⑴ JOIN_MESSAGE(c_p); // 重置“當前”位置,指向了隊列第1條消息 CANCEL_TIMER(c_p); free_message(msgp); // 燒毀消息 ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p); PROCESS_MAIN_CHK_LOCKS(c_p); NextPF(0, next); // 履行下1條指令 }
所以,當消息匹配時,就會重新指向了信箱第1條消息,這樣,第3個問題就有了答案,會重新掃描信箱。
再來看看這個宏:
/* Reset message save point (after receive match) */ #define JOIN_MESSAGE(p) (p)->msg.save = &(p)->msg.first
這個宏就是講 msg.save 指向了 msg.first ,就是第1個消息
下面看下消息不匹配的情況就是 loop_rec_end_f 
/* * beam_emu.c process_main() 線程入口函數,實現VM調度 * 以下截取 loop_rec_end_f 處理進程 * 作用是繼續取出最新的消息匹配 */ /* * Advance the save pointer to the next message (the current * message didn't match), then jump to the loop_rec instruction. */ OpCase(loop_rec_end_f): { SET_I((BeamInstr *) Arg(0)); SAVE_MESSAGE(c_p); // “當前”位置指向下1個位置 goto loop_rec__; // 繼續取出消息匹配 }
這個opcode實現了不斷取消息出來匹配的進程,直到失去調度機會,等待下1次調度。
也看下這個宏:
/* Save current message */ #define SAVE_MESSAGE(p) (p)->msg.save = &(*(p)->msg.save)->next
這個宏就是將 msg.save 指向了下1個位置。
到這里第1個問題和第2個問題都有答案了,前面說到的 Save Queue 只是“形象化”的隊列,實際不存在,所以不存在消息入列出列的開消問題。然后第2個問題,消息選擇性接收,當消息匹配不上,有新消息到來時不會重復掃描信箱前面匹配不上的消息。

總結

針對erlang選擇性接收的問題,gen_server2給我們1個方向,通過外部隊列減少了消息的匹配,而且控制優先級來控制消息的處理。
這里也說說 gen_server2 的副作用:
gen_server2會帶來1種問題,erlang原來會利用進程信箱長度來抑制發送者進程(通過減少消息發送者進程的調度機會 Reduction,可以參考這篇文章《erlang send剖析及參數意義》)。但是,gen_server2 每次都會清空進程信箱的消息隊列,沒法利用到 VM 提供的抑制消息隊列過快暴漲的保護機制。
針對這個問題,gen_server2 通過 prioritise_XXX 函數向外部模塊暴露消息隊列長度,使調用者可以根據消息隊列長度控制是不是拋棄消息,以實現對消息的抑制。

實際上,gen_server 在我們的開發中就夠用了,很少需要去斟酌erlang選擇性接收的問題。rabbitMQ是針對消息隊列的處理,必定有不計其數的消息量,那才正好需要 gen_server2 的作用。如果也到了這類消息量,那就建議使用 gen_server2

參考:http://blog.csdn.net/mycwq/article/details/44049749

生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關閉
程序員人生
主站蜘蛛池模板: 日韩欧美亚洲一区 | 亚洲综合校园春色 | 亚洲福利院 | 亚洲天堂第一 | 久久久久国产精品美女毛片 | 性xxxx奶大欧美高清 | 欧美一级黄色片免费看 | 国产精品中文字幕在线观看 | 亚洲激情另类 | 最近中文在线国语 | 成人国产在线不卡视频 | 精品一区二区久久久久久久网站 | 黄色免费在线网站 | 一二三四视频观看中文在线看 | 性欧美高清 | 91精品一区国产高清在线 | 欧美 日韩 国产在线 | 请看一下欧美一级毛片 | 欧美黑人巨大xxxx猛交 | 国产在线观看成人免费视频 | 久久久久久久综合日本亚洲 | 中文字幕精品一区二区三区视频 | 波多野结衣 在线资源观看 波多野结衣 一区二区 | 久久午夜羞羞影院免费观看 | 欧美日韩看片 | 综合免费视频 | 午夜啪啪免费视频 | 亚洲第一天堂 | 国产亚洲网站 | 手机看片日韩欧美 | jjzz韩国| 99成人精品 | 日韩有码在线视频 | 免费观看老外特级毛片 | 久久久久成人精品一区二区 | 日韩欧美视频在线 | 国产成人高清亚洲一区91 | 最新日本一级中文字幕 | 亚洲性生活网站 | 视频啪啪 | 久久久日本精品一区二区三区 |