Android中的消息處理機制大量依賴于Handler。每一個Handler都有對應的Looper,用于不斷地從對應的MessageQueue中取出消息處理。
1直以來,覺得MessageQueue應當是Java層的抽象,但是事實上MessageQueue的主要部份在Native層中。
自己對MessageQueue在Native層的工作不太熟習,借此機會分析1下。
1、MessageQueue的創建
當需要使用Looper時,我們會調用Looper的prepare函數:
public static void prepare() {
prepare(true);
}
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
//sThreadLocal為線程本地存儲區;每一個線程唯一1個Looper
sThreadLocal.set(new Looper(quitAllowed));
}
private Looper(boolean quitAllowed) {
//創建出MessageQueue
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
1 NativeMessageQueue
我們看看MessageQueue的構造函數:
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
//mPtr的類型為long?
mPtr = nativeInit();
}
MessageQueue的構造函數中就調用了native函數,我們看看android_os_MessageQueue.cpp中的實現:
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
//MessageQueue的Native層實體
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
............
//這里應當類似與將指針轉化成long類型,放在Java層保存;估計Java層使用時,會在native層將long變成指針,就能夠操作隊列了
return reinterpret_cast<jlong>(nativeMessageQueue);
}
我們跟進NativeMessageQueue的構造函數:
NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
//創建1個Native層的Looper,也是線程唯1的
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false);
Looper::setForThread(mLooper);
}
}
從代碼來看,Native層和Java層均有Looper對象,應當都是操作MessageQueue的。MessageQueue在Java層和Native層有各自的存儲結構,分別存儲Java層和Native層的消息。
2 Native層的looper
我們看看Native層looper的構造函數:
Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
//此處創建了個fd
mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
.......
rebuildEpollLocked();
}
在native層中,MessageQueue中的Looper初始化時,還調用了rebuildEpollLocked函數,我們跟進1下:
void Looper::rebuildEpollLocked() {
// Close old epoll instance if we have one.
if (mEpollFd >= 0) {
close(mEpollFd);
}
// Allocate the new epoll instance and register the wake pipe.
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
............
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeEventFd;
//在mEpollFd上監聽mWakeEventFd上是不是有數據到來
int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
...........
for (size_t i = 0; i < mRequests.size(); i++) {
const Request& request = mRequests.valueAt(i);
struct epoll_event eventItem;
request.initEventItem(&eventItem);
//監聽request對應fd上數據的到來
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
............
}
}
從native層的looper來看,我們知道Native層依賴于epoll來驅動事件處理。此處我們先保存1下大致的映像,后文詳細分析。
2、使用MessageQueue
1 寫入消息
Android中既可以在Java層向MessageQueue寫入消息,也能夠在Native層向MessageQueue寫入消息。我們分別看1下對應的操作流程。
1.1 Java層寫入消息
Java層向MessageQueue寫入消息,依賴于enqueueMessage函數:
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}
synchronized (this) {
if (mQuitting) {
.....
return false;
}
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
//在頭部插入數據,如果之前MessageQueue是阻塞的,那末現在需要喚醒
needWake = mBlocked;
} else {
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
//不是第1個異步消息時,needWake置為false
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
上述代碼比較簡單,主要就是將新加入的Message按履行時間插入到原本的隊列中,然后根據情況調用nativeAwake函數。
我們跟進1下nativeAwake:
void NativeMessageQueue::wake() {
mLooper->wake();
}
void Looper::wake() {
uint64_t inc = 1;
//就是向mWakeEventFd寫入數據
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
.............
}
在native層的looper初始化時,我們提到過native層的looper將利用epoll來驅動事件,其中構造出的epoll句柄就監聽了mWakeEventFd。
實際上從MessageQueue中取出數據時,若沒有數據到來,就會利用epoll進行等待;因此當Java層寫入消息時,將會將喚醒處于等待狀態的MessageQueue。
在后文介紹從MessageQueue中提取消息時,將再次分析這個問題。
1.2 Native層寫入消息
Native層寫入消息,依賴于Native層looper的sendMessage函數:
void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
sendMessageAtTime(now, handler, message);
}
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
const Message& message) {
size_t i = 0;
{
AutoMutex _l(mLock);
//一樣需要按時間插入
size_t messageCount = mMessageEnvelopes.size();
while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
i += 1;
}
//將message包裝成1個MessageEnvelope對象
MessageEnvelope messageEnvelope(uptime, handler, message);
mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
// Optimization: If the Looper is currently sending a message, then we can skip
// the call to wake() because the next thing the Looper will do after processing
// messages is to decide when the next wakeup time should be. In fact, it does
// not even matter whether this code is running on the Looper thread.
if (mSendingMessage) {
return;
}
}
// Wake the poll loop only when we enqueue a new message at the head.
if (i == 0) {
//若插入在隊列頭部,一樣利用wake函數觸發epoll喚醒
wake();
}
}
以上就是向MessageQueue中加入消息的主要流程,接下來我們看看從MessageQueue中取出消息的流程。
2、提取消息
當Java層的Looper對象調用loop函數時,就開始使用MessageQueue提取消息了:
public static void loop() {
final Looper me = myLooper();
.......
for (;;) {
Message msg = queue.next(); // might block
.......
try {
//調用Message的處理函數進行處理
msg.target.dispatchMessage(msg);
}........
}
}
此處我們看看MessageQueue的next函數:
Message next() {
//mPtr保存了NativeMessageQueue的指針
final long ptr = mPtr;
.......
int pendingIdleHandlerCount = -1; // ⑴ only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
//會調用Native函數,終究調用IPCThread的talkWithDriver,將數據寫入Binder驅動或讀取1次數據
//不知道在此處進行這個操作的理由?
Binder.flushPendingCommands();
}
//處理native層的數據,此處會利用epoll進行blocked
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
//下面其實就是找出下1個異步處理類型的消息;異步處理類型的消息,才含有對應的履行函數
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// Got a message.
mBlocked = false;
//完成next記錄的存儲
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
// No more messages.
nextPollTimeoutMillis = -1;
}
// Process the quit message now that all pending messages have been handled.
if (mQuitting) {
dispose();
return null;
}
//MessageQueue中引入了IdleHandler接口,即當MessageQueue沒有數據處理時,調用IdleHandler進行1些工作
//pendingIdleHandlerCount表示待處理的IdleHandler,初始為⑴
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
//mIdleHandlers的size默許為0,調用接口addIdleHandler才能增加
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue;
}
//將待處理的IdleHandler加入到PendingIdleHandlers中
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
//調用ArrayList.toArray(T[])節省每次分配的開消;畢竟對Message.Next這樣調用頻率較高的函數,能省1點就是1點
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
//履行實現類的queueIdle函數,返回值決定是不是繼續保存
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
pendingIdleHandlerCount = 0;
nextPollTimeoutMillis = 0;
}
}
全部提取消息的進程,大致上如上圖所示。
可以看到在Java層,Looper除要取出MessageQueue的消息外,還會在隊列空閑期履行IdleHandler定義的函數。
2.1 nativePollOnce
現在唯1的疑點是nativePollOnce是如何處理Native層數據的,我們看看對應的native函數:
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
//果然Java層調用native層MessageQueue時,將long類型的ptr變成指針
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
//最后還是進入到Native層looper的pollOnce函數
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;
if (mExceptionObj) {
.........
}
}
看看native層looper的pollOnce函數:
//timeoutMillis為超時等待時間。值為⑴時,表示無窮等待直到有事件到來;值為0時,表示無需等待
//outFd此時為null,含義是:存儲產生事件的文件句柄
//outEvents此時為null,含義是:存儲outFd上產生了哪些事件,包括可讀、可寫、毛病和中斷
//outData此時為null,含義是:存儲上下文數據,其實調用時傳入的參數
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
//處理response,目前我們先不關注response的內含
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
if (outFd != NULL) *outFd = fd;
if (outEvents != NULL) *outEvents = events;
if (outData != NULL) *outData = data;
return ident;
}
}
//根據pollInner的結果,進行操作
if (result != 0) {
if (outFd != NULL) *outFd = 0;
if (outEvents != NULL) *outEvents = 0;
if (outData != NULL) *outData = NULL;
return result;
}
//主力還是靠pollInner
result = pollInner(timeoutMillis);
}
}
跟進1下pollInner函數:
int Looper::pollInner(int timeoutMillis) {
// Adjust the timeout based on when the next message is due.
//timeoutMillis是Java層事件等待事件
//native層保持了native message的等待時間
//此處其實就是選擇最小的等待時間
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
}
int result = POLL_WAKE;
//pollInner初始就清空response
mResponses.clear();
mResponseIndex = 0;
// We are about to idle.
mPolling = true;
//利用epoll等待mEpollFd監控的句柄上事件到達
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// No longer idling.
mPolling = false;
// Acquire lock.
mLock.lock();
//重新調用rebuildEpollLocked時,將使得epoll句柄能夠監聽新加入request對應的fd
if (mEpollRebuildRequired) {
mEpollRebuildRequired = false;
rebuildEpollLocked();
goto Done;
}
// Check for poll error.
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
......
result = POLL_ERROR;
goto Done;
}
// Check for poll timeout.
if (eventCount == 0) {
result = POLL_TIMEOUT;
goto Done;
}
for (int i = 0; i < eventCount; i++) {
if (fd == mWakeEventFd) {
if (epollEvents & EPOLLIN) {
//前面已分析過,當java層或native層有數據寫入隊列時,將寫mWakeEventFd,以觸發epoll喚醒
//awoken將讀取并清空mWakeEventFd上的數據
awoken();
} else {
.........
}
} else {
//epoll一樣監聽的request對應的fd
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
//存儲這個fd對應的response
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
..........
}
}
}
Done:
// Invoke pending message callbacks.
mNextMessageUptime = LLONG_MAX;
//處理Native層的Message
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
// Remove the envelope from the list.
// We keep a strong reference to the handler until the call to handleMessage
// finishes. Then we drop it so that the handler can be deleted *before*
// we reacquire our lock.
{
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
//處理Native Message
handler->handleMessage(message);
}
mLock.lock();
mSendingMessage = false;
result = POLL_CALLBACK;
} else {
// The last message left at the head of the queue determines the next wakeup time.
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
// Release lock.
mLock.unlock();
//處理帶回調函數的response
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
//調用response的callback
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
removeFd(fd, response.request.seq);
}
response.request.callback.clear();
result = POLL_CALLBACK;
}
}
return result;
}
說實話native層的代碼寫的很亂,該函數的功能比較多。
如上圖所示,在nativePollOnce中利用epoll監聽是不是有數據到來,然后處理native message、native response。
最后,我們看看如何在native層中加入request。
3 添加監控要求
native層增加request依賴于looper的接口addFd:
//fd表示需要監聽的句柄
//ident的含義還沒有弄明白
//events表示需要監聽的事件,例如EVENT_INPUT、EVENT_OUTPUT、EVENT_ERROR和EVENT_HANGUP中的1個或多個
//callback為事件產生后的回調函數
//data為回調函數對應的參數
int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {
return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data);
}
結合上文native層輪詢隊列的操作,我們大致可以知道:addFd的目的,就是讓native層的looper監控新加入的fd上是不是有指定事件產生。
如果產生了指定的事件,就利用回調函數及參數構造對應的response。
native層的looper處理response時,就能夠履行對應的回調函數了。
看看實際的代碼:
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
........
{
AutoMutex _l(mLock);
//利用參數構造1個request
Request request;
request.fd = fd;
request.ident = ident;
request.events = events;
request.seq = mNextRequestSeq++;
request.callback = callback;
request.data = data;
if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number ⑴
struct epoll_event eventItem;
request.initEventItem(&eventItem);
//判斷之前是不是已利用該fd構造過Request
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex < 0) {
//mEpollFd新增1個需監聽fd
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
.......
mRequests.add(fd, request);
} else {
//mEpollFd修改舊的fd對應的監聽事件
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
if (epollResult < 0) {
if (errno == ENOENT) {
// Tolerate ENOENT because it means that an older file descriptor was
// closed before its callback was unregistered and meanwhile a new
// file descriptor with the same number has been created and is now
// being registered for the first time.
epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
.......
}
//產生毛病重新加入時,安排EpollRebuildLocked,將讓epollFd重新添加1次待監聽的fd
scheduleEpollRebuildLocked();
}
mRequests.replaceValueAt(requestIndex, request);
}
}
}
對加入監控要求的處理,在上文介紹pollInner函數時已做分析,此處不再贅述。
3、總結
1、流程總結
MessageQueue的全部流程包括了Java部份和Native部份,從圖中可以看出Native層的比重還是很大的。我們結合上圖回想1下全部MessageQueue對應的處理流程:
1、Java層創建Looper對象時,將會創建Java層的MessageQueue;Java層的MessageQueue初始化時,將利用Native函數創建出Native層的MessageQueue。
2、Native層的MessageQueue初始化后,將創建對應的Native Looper對象。Native對象初始化時,將創建對應epollFd和WakeEventFd。其中,epollFd將作為epoll的監聽句柄,初始時epollFd僅監聽WakeEventFd。
3、圖中紅色線條為Looper從MessageQueue中取消息時,處理邏輯的流向。
3.1、當Java層的Looper開始循環時,首先需要通過JNI函數調用Native Looper進行pollOnce的操作。
3.2、Native Looper開始運行后,需要等待epollFd被喚醒。當epollFd等待超時或監聽的句柄有事件到來,Native Looper就能夠開始處理事件了。
3.3、在Native層,Native Looper將先處理Native MessageQueue中的消息,再調用Response對應的回調函數。
3.4、本次循環中,Native層事件處理終了后,才開始處理Java層中MessageQueue的消息。若MessageQueue中沒有消息需要處理,并且MessageQueue中存在IdleHandler時,將調用IdleHandler定義的處理函數。
圖中藍色部份為對應的函數調用:
在Java層:
利用MessageQueue的addIdleHandler,可以為MessageQueue增加IdleHandler;
利用MessageQueue的enqueueMessage,可以向MessageQueue增加消息;必要時將利用Native函數向Native層的WakeEventFd寫入消息,以喚醒epollFd。
在Native層:
利用looper:sendMessage,可以為Native MessageQueue增加消息;一樣,要時將向Native層的WakeEventFd寫入消息,以喚醒epollFd;
利用looper:addFd,可以向Native Looper注冊監聽要求,監聽要求包括需監聽的fd、監聽的事件及對應的回調函數等,監聽要求對應的fd將被成為epollFd監聽的對象。當被監聽的fd產生對應的事件后,將會喚醒epollFd,此時將生成對應response加入的response List中,等待處理。1旦response被處理,就會調用對應的回調函數。
2、注意事項
MessageQueue在Java層和Native層有各自的存儲結構,可以分別增加消息。從處理邏輯來看,會優先處理native層的Message,然后處理Native層生成的response,最后才是處理Java層的Message。