探討erlang消息選擇性接收和改進
來源:程序員人生 發布時間:2015-03-16 11:02:20 閱讀次數:3644次
從 rabbitMQ 代碼中找到 gen_server2 , 對gen_server進行了1些優化。看到先輩寫的博文也提到這個,引發了我的思考。見 gen_server2 - OTP gen_server優化版
。gen_server2 引發的思考
正如 litaocheng 所說的:
gen_server 和 gen_server2 最大的不同是:
gen_server2 收到任何1條消息放到外部的隊列中,當VM內部消息隊列為空后,才進行消息處理,繼續循環
gen_server 收到任何1條消息后,立即進行處理,處理完成后繼續循環
其次,還有1個很重要的不同點:
gen_server2 使用的外部隊列是帶優先級排序的,功能模塊本身可以定制消息優先級,乃至直接拋棄消息。(導出 prioritise_call/prioritise_cast/prioritise_info 幾個函數實現定制,返回的數值越大優先級越高,返回drop就拋棄消息)
最高優先級是 infinity, 在處理 {system, _From, _Req} 和 {'EXIT', Parent, _R} 使用了這個優先級。
但在博文也援用了Joe' Bog 在merle 所做的測試,看了merle 的代碼,沒有用到 prioritise_XXX 函數,說明沒有明顯利用到 gen_server2 優先級控制的好處,那為何能取得不錯的效果?(見下圖)
討論 gen_server2 的測試
通過瀏覽 Joe 在Github寫的 merle 代碼,很快發現問題:
可以看出,merle 在 handle_call 時都會調用 send_generic_cmd 、send_get_cmd 等類似函數。這些函數實現上都會阻塞進程直到接收到某些特定消息。
下面以 send_generic_cmd 為例做說明:send_generic_cmd(Socket, Cmd) -> gen_tcp:send(Socket, <<Cmd/binary, "
">>), Reply = recv_simple_reply(), Reply.
recv_simple_reply() -> receive {tcp,_,Data} -> string:tokens(binary_to_list(Data), "
"); {error, closed} -> connection_closed after ?TIMEOUT -> timeout end. |
另外,gen_tcp:send 在實現上也 receive 等待某個特定信息,見 prim_inet:send(Socket, Packet, Opts)
send(S, Data, OptList) when is_port(S), is_list(OptList) -> try erlang:port_command(S, Data, OptList) of false -> % Port busy and nosuspend option passed {error,busy}; true -> receive {inet_reply,S,Status} -> Status end catch error:_Error -> {error,einval} end. |
也許可能有讀者不明白,這里說的等待某個特定消息是指選擇性接收。具體例子以下:
選擇性接收: receive {ok, Result} -> Result end.
非選擇性接收: receive Info -> Info end. |
選擇性接收只針對某類信件,會1直阻塞住直到找到該信件為止,或超時。非選擇性接收的結果是每條消息都會被消費掉,方式為先進先出,不存在掃描信箱的問題。
前面提到,merle 在 handle_call 時都會 receive 住,等待某個特定消息。這個的代價就是每次receive住,erlang VM都要掃描進程全部信箱隊列。特別像 Joe 在做此類測試時,消息處理速度遠遠低于消息投遞速度,換句話說,gen_server進程信箱前面所有大部份的信件都是作者自己發的 gen_server:call 要求消息,然后每次 receive 住都要匹配這些消息。
比如, Joe 測試的是 merle:getkey 操作,那末信箱大部份消息就是 gen_server:call 投遞的 getkey 消息,而 handle_call 在處理時就要掃描完前面的getkey消息,才能得到想要的 {tcp,_,Data} 消息。進程信箱消息隊列以下所示:
getkey | getkey | getkey | ... | getkey | {tcp,_,Data} | ... |
換成 gen_server2 的方式,gen_server2 會清空消息隊列,那末進程信箱消息隊列以下所示:
getkey | getkey | {tcp,_,Data} | ... |
前面還有2個getkey表示 gen_server2清空后在 handle_call 處理進程中 gen_server:call 又投遞了新的 getkey 消息,數據量對照 gen_server來講可以說是極少了,所以,消息匹配的次數就少了很多,這就會出現 Joe 測試的結果。
討論erlang消息選擇性接收
在討論這個問題之前,先援用 learnyousomeerlang 對消息選擇性接收的介紹(原文),很生動具體。
When there is no way to match a given message, it is put in a save queue and the next message is tried. If the second message matches, the first message is put back on top of the mailbox to be retried later.
|
就是說,erlang消息匹配不上,就會把消息放到 Save Queue 的隊列中,當匹配到了后再把消息放回進程信箱。以上是形象化的說法,如果是這樣就必定存在消息入列出列開消,那VM究竟是不是這樣實現呢?
所以,對選擇性接收,這里取3個問題出來說:
1、上面提到的,消息 Save Queue 是不是存在入列出列開消
2、當選擇性接收時,新消息到來時會不會重復掃描信箱前面匹配不上的消息
3、假定第2點不存在重復掃描,那末如果消息已匹配到了,再匹配多1次這個消息,會不會重復掃描前面的消息
帶著上面的疑問,下面以1個簡單的例子做說明
-module(test). -compile(export_all). t() -> receive ok -> ok end. |
保存為test.erl,然后編譯,生成opcode
1> c(test). {ok,test} 2> erts_debug:df(test). ok |
在目錄下找到生成的 test.dis,t() 函數opcode以下:
04BE84B0: i_func_info_IaaI 0 test t 0 04BE84C4: i_loop_rec_fr f(04BE84EC) x(0) 04BE84CC: i_is_eq_exact_immed_frc f(04BE84E4) x(0) ok 04BE84D8: remove_message 04BE84DC: move_return_cr ok x(0) 04BE84E4: loop_rec_end_f test:t/0 04BE84EC: wait_locked_f test:t/0 |
逐行解釋這段代碼:
i_loop_rec_fr | receive接收信息,如果有信息放到 x(0) 寄存器,繼續下1條指令;沒有消息就跳到地址 04BE84EC,即 wait_locked_f |
i_is_eq_exact_immed_frc
| 匹配 x(0)寄存器的值和ok是不是相等,如果相等繼續下1條指令;否則跳到04BE84E4,即 loop_rec_end_f |
remove_message | 移除進程消息隊列中“當前”的信息(也就是上1行匹配到的信息) |
move_return_cr | 將 ok 送到 x(0)寄存器并返回結果 |
loop_rec_end_f | 將“當前消息”指針指向下1個位置,如果指向位置有消息,則跳到test:t/0第1段代碼地址繼續履行,即 04BE84C4;否則繼續履行下1條指令 04BE84EC,即 wait_locked_f |
wait_locked_f | 阻塞當前進程,等待下1次調度,再檢查是不是有新的消息到達 |
以上, i_is_eq_exact_immed_frc 和 move_return_cr 在 beam_hot.h實現,其他在 beam_emu.c 實現,都可以找相干代碼。
/*
* beam_emu.c process_main() 線程入口函數,實現VM調度
* 以下截取 i_loop_rec_fr 處理進程
* 作用是從信箱取出1條消息放到 x(0) 寄存器;沒消息則跳到 wait或 wait_timeout指令
*/
OpCase(i_loop_rec_fr):
{
BeamInstr *next;
ErlMessage* msgp;
loop_rec__:
PROCESS_MAIN_CHK_LOCKS(c_p);
// 取出“當前位置”的消息
msgp = PEEK_MESSAGE(c_p);
if (!msgp) { //如果消息不存在,嘗試從SMP下public queue獲得消息
#ifdef ERTS_SMP
erts_smp_proc_lock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
if (ERTS_PROC_PENDING_EXIT(c_p)) {
// 如果進程準備退出,則不處理消息了
erts_smp_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
SWAPOUT;
goto do_schedule; // 等待下1次調度
}
// SMP下把消息移到進程私有堆尾部(純指針操作)
ERTS_SMP_MSGQ_MV_INQ2PRIVQ(c_p);
// 再嘗試取出“當前位置”的消息
msgp = PEEK_MESSAGE(c_p);
if (msgp)
erts_smp_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
else
#endif
{
// 信箱沒消息則跳到 wait或 wait_timeout指令(實際上就是履行下1條履行)
SET_I((BeamInstr *) Arg(0));
Goto(*I);
}
}
// 解析散布式消息,把消息附加的數據復制到進程私有堆
ErtsMoveMsgAttachmentIntoProc(msgp, c_p, E, HTOP, FCALLS,
{
SWAPOUT;
reg[0] = r(0);
PROCESS_MAIN_CHK_LOCKS(c_p);
},
{
ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p);
PROCESS_MAIN_CHK_LOCKS(c_p);
r(0) = reg[0];
SWAPIN;
});
if (is_non_value(ERL_MESSAGE_TERM(msgp))) {
/*
* 如果消息破壞就移除(出現這類情況是散布式消息解碼出現毛?。? */
ASSERT(!msgp->data.attached);
UNLINK_MESSAGE(c_p, msgp); // 移除消息,側重將“當前”位置指向下1條消息
free_message(msgp); // 燒毀消息
goto loop_rec__; // 跳到上面繼續
}
PreFetch(1, next); // 標記下1條指令位置
r(0) = ERL_MESSAGE_TERM(msgp);
NextPF(1, next); // 履行下1條指令
}
來看下這兩個宏定義:
/* Get "current" message */
#define PEEK_MESSAGE(p) (*(p)->msg.save)
從字面上就知道這個宏是取"當前的"消息,取了 msg.save 的值
#define UNLINK_MESSAGE(p,msgp) do {
ErlMessage* __mp = (msgp)->next;
*(p)->msg.save = __mp;
(p)->msg.len--;
if (__mp == NULL)
(p)->msg.last = (p)->msg.save;
(p)->msg.mark = 0;
} while(0)
這個宏就是移除消息操作,消息隊列長度⑴,把 msg.save 指向了 msgp的下1條消息;如果 msgp->next 為 NULL,表示這是最后1條消息,就把 msg.last 等于了 msg.save
/*
* beam_emu.c process_main() 線程入口函數,實現VM調度
* 以下截取 remove_message 處理進程(已刪除沒必要要的代碼)
* 作用是將消息從信箱隊列中移除
*/
OpCase(remove_message): {
BeamInstr *next;
ErlMessage* msgp;
PROCESS_MAIN_CHK_LOCKS(c_p);
PreFetch(0, next);
msgp = PEEK_MESSAGE(c_p); // 取出當前的消息
if (ERTS_PROC_GET_SAVED_CALLS_BUF(c_p)) {
save_calls(c_p, &exp_receive);
}
if (ERL_MESSAGE_TOKEN(msgp) == NIL) {
SEQ_TRACE_TOKEN(c_p) = NIL;
} else if (ERL_MESSAGE_TOKEN(msgp) != am_undefined) {
// 追蹤調試內容,可以疏忽
Eterm msg;
SEQ_TRACE_TOKEN(c_p) = ERL_MESSAGE_TOKEN(msgp);
c_p->seq_trace_lastcnt = unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p));
if (c_p->seq_trace_clock < unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p))) {
c_p->seq_trace_clock = unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p));
}
msg = ERL_MESSAGE_TERM(msgp);
seq_trace_output(SEQ_TRACE_TOKEN(c_p), msg, SEQ_TRACE_RECEIVE,
c_p->common.id, c_p);
}
UNLINK_MESSAGE(c_p, msgp); // 移除消息,側重隊列長度⑴
JOIN_MESSAGE(c_p); // 重置“當前”位置,指向了隊列第1條消息
CANCEL_TIMER(c_p);
free_message(msgp); // 燒毀消息
ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p);
PROCESS_MAIN_CHK_LOCKS(c_p);
NextPF(0, next); // 履行下1條指令
}
所以,當消息匹配時,就會重新指向了信箱第1條消息,這樣,第3個問題就有了答案,會重新掃描信箱。
再來看看這個宏:
/* Reset message save point (after receive match) */
#define JOIN_MESSAGE(p)
(p)->msg.save = &(p)->msg.first
這個宏就是講 msg.save 指向了 msg.first ,就是第1個消息
下面看下消息不匹配的情況就是 loop_rec_end_f
/*
* beam_emu.c process_main() 線程入口函數,實現VM調度
* 以下截取 loop_rec_end_f 處理進程
* 作用是繼續取出最新的消息匹配
*/
/*
* Advance the save pointer to the next message (the current
* message didn't match), then jump to the loop_rec instruction.
*/
OpCase(loop_rec_end_f): {
SET_I((BeamInstr *) Arg(0));
SAVE_MESSAGE(c_p); // “當前”位置指向下1個位置
goto loop_rec__; // 繼續取出消息匹配
}
這個opcode實現了不斷取消息出來匹配的進程,直到失去調度機會,等待下1次調度。
也看下這個宏:
/* Save current message */
#define SAVE_MESSAGE(p)
(p)->msg.save = &(*(p)->msg.save)->next
這個宏就是將 msg.save 指向了下1個位置。
到這里第1個問題和第2個問題都有答案了,前面說到的 Save Queue 只是“形象化”的隊列,實際不存在,所以不存在消息入列出列的開消問題。然后第2個問題,消息選擇性接收,當消息匹配不上,有新消息到來時不會重復掃描信箱前面匹配不上的消息。
總結
針對erlang選擇性接收的問題,gen_server2給我們1個方向,通過外部隊列減少了消息的匹配,而且控制優先級來控制消息的處理。
這里也說說 gen_server2 的副作用:
gen_server2會帶來1種問題,erlang原來會利用進程信箱長度來抑制發送者進程(通過減少消息發送者進程的調度機會 Reduction,可以參考這篇文章《erlang send剖析及參數意義》)。但是,gen_server2 每次都會清空進程信箱的消息隊列,沒法利用到 VM 提供的抑制消息隊列過快暴漲的保護機制。
針對這個問題,gen_server2 通過 prioritise_XXX 函數向外部模塊暴露消息隊列長度,使調用者可以根據消息隊列長度控制是不是拋棄消息,以實現對消息的抑制。
實際上,gen_server 在我們的開發中就夠用了,很少需要去斟酌erlang選擇性接收的問題。rabbitMQ是針對消息隊列的處理,必定有不計其數的消息量,那才正好需要 gen_server2 的作用。如果也到了這類消息量,那就建議使用 gen_server2
參考:http://blog.csdn.net/mycwq/article/details/44049749
生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈