基于 Android android-14.0.0_r28 的源代码,来分析 Handler 消息处理机制在 Native 层的实现逻辑
源码路径
/ |
---|
| framework/base/core/java/andorid/os/MessageQueue.java
framework/base/core/jni/android_os_MessageQueue.cpp
framework/base/core/java/andorid/os/Looper.java
system/core/libutils/Looper.cpp
system/core/include/utils/Looper.h
system/core/libutils/RefBase.cpp
framework/base/native/android/looper.cpp
framework/native/include/android/looper.h
|
在 Android 消息机制 -- Handler 的 Java 实现 中分析了 Handler 消息处理机制在 Java 层的实现机制,其中 MessageQueue 类作为连接 Java 层和 Native 层的纽带
涉及到多个 native 方法. 实际上在 Native 层还有一整套由 Java 层驱动的消息机制实现,用于处理 native 的消息。
背景知识: epoll 机制
system/core/libutils/include/utils/Looper.h |
---|
| struct Message {
Message() : what(0) { }
Message(int w) : what(w) { }
/* The message type. (interpretation is left up to the handler) */
int what;
};
|
system/core/libutils/include/utils/Looper.h |
---|
| struct MessageEnvelope {
MessageEnvelope() : uptime(0) { }
MessageEnvelope(nsecs_t u, sp<MessageHandler> h, const Message& m)
: uptime(u), handler(std::move(h)), message(m) {}
nsecs_t uptime;
sp<MessageHandler> handler;
Message message;
};
|
system/core/libutils/include/utils/Looper.h |
---|
| struct Request {
int fd;
int ident;
int events;
sp<LooperCallback> callback;
void* data;
uint32_t getEpollEvents() const;
};
struct Response {
SequenceNumber seq;
int events;
Request request;
};
|
system/core/libutils/include/utils/Looper.h |
---|
| class MessageHandler : public virtual RefBase {
protected:
virtual ~MessageHandler();
public:
/**
* Handles a message.
*/
virtual void handleMessage(const Message& message) = 0;
};
class WeakMessageHandler : public MessageHandler {
protected:
virtual ~WeakMessageHandler();
public:
WeakMessageHandler(const wp<MessageHandler>& handler);
virtual void handleMessage(const Message& message);
private:
wp<MessageHandler> mHandler;
};
|
system/core/libutils/include/utils/Looper.h |
---|
| class LooperCallback : public virtual RefBase {
protected:
virtual ~LooperCallback();
public:
/**
* Handles a poll event for the given file descriptor.
* It is given the file descriptor it is associated with,
* a bitmask of the poll events that were triggered (typically EVENT_INPUT),
* and the data pointer that was originally supplied.
*
* Implementations should return 1 to continue receiving callbacks, or 0
* to have this file descriptor and callback unregistered from the looper.
*/
virtual int handleEvent(int fd, int events, void* data) = 0;
};
class SimpleLooperCallback : public LooperCallback {
protected:
virtual ~SimpleLooperCallback();
public:
SimpleLooperCallback(Looper_callbackFunc callback);
virtual int handleEvent(int fd, int events, void* data);
private:
Looper_callbackFunc mCallback;
};
|
映射关系
| private native static long nativeInit();
private native static void nativeDestroy(long ptr);
@UnsupportedAppUsage
private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/
private native static void nativeWake(long ptr);
private native static boolean nativeIsPolling(long ptr);
private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events);
|
在 Java 层创建 MessageQueue 对象时调用,用于创建 Native 层的 MessageQueue 以及 Looper. 具体调用链如下:
frameworks/base/core/jni/android_os_MessageQueue.cpp |
---|
| NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
// 这里的 Looper 是 Native 的实现, 和 Java 层的对象无关
// setForThread getForThread 等同于 Java 层的 ThreadLocal相关操作
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false);
Looper::setForThread(mLooper);
}
}
|
system/core/libutils/Looper.cpp |
---|
| Looper::Looper(bool allowNonCallbacks)
: mAllowNonCallbacks(allowNonCallbacks),
mSendingMessage(false),
mPolling(false),
mEpollRebuildRequired(false),
mNextRequestSeq(WAKE_EVENT_FD_SEQ + 1),
mResponseIndex(0),
mNextMessageUptime(LLONG_MAX) {
// Native 层的 Looper 利用 epoll 机制实现运转
// mWakeEventFd 就是用来接受写入事件的进而实现唤醒的载体(非阻塞/不可继承)
mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
LOG_ALWAYS_FATAL_IF(mWakeEventFd.get() < 0, "Could not make wake event fd: %s", strerror(errno));
AutoMutex _l(mLock);
rebuildEpollLocked();
}
void Looper::rebuildEpollLocked() {
// 清理老的文件描述符
// Close old epoll instance if we have one.
if (mEpollFd >= 0) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endif
mEpollFd.reset();
}
// 调用 epoll_create1 创建 mEpollFd
// Allocate the new epoll instance and register the WakeEventFd.
mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));
// 并且监听 mWakeEventFd 的可读(EPOLLIN)事件
epoll_event wakeEvent = createEpollEvent(EPOLLIN, WAKE_EVENT_FD_SEQ);
int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &wakeEvent);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
strerror(errno));
// 监听所有的 mRequests 队列
for (const auto& [seq, request] : mRequests) {
epoll_event eventItem = createEpollEvent(request.getEpollEvents(), seq);
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, request.fd, &eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
request.fd, strerror(errno));
}
}
}
|
由此 Native 层的 MessageQueue 和 Looper 对象就创建好了,并且完成了 epoll 的创建和监听
Native 层的 Looper 并不能独立运转,需要依赖 Java 层驱动. nativePollOnce
就是在 Java Looper.loop 过程中用来驱动 Native 层的 Looper 的桥梁. 具体调用链如下:
frameworks/base/core/jni/android_os_MessageQueue.cpp |
---|
| void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
// 直接调用 Looper->pollOnce 有一场就抛出
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;
if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}
|
system/core/libutils/Looper.cpp |
---|
| int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
/**
result 返回值
1. POLL_WAKE(-1) 表示 epoll_wait() 是被 eventfd 唤醒的
2. POLL_CALLBACK(-2) 表示消息或请求是通过回调处理的
3. POLL_TIMEOUT(-3) 表示epoll_wait()超时
4. POLL_ERROR(-4) 表示epoll_wait()出错
*/
int result = 0;
for (;;) {
// 优先处理 mResponses 找到 request.ident >= 0 (非 callback 处理)的消息
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
if (outFd != nullptr) *outFd = fd;
if (outEvents != nullptr) *outEvents = events;
if (outData != nullptr) *outData = data;
return ident;
}
}
// 检查&处理返回值,这里的 result 一定是负数
if (result != 0) {
if (outFd != nullptr) *outFd = 0;
if (outEvents != nullptr) *outEvents = 0;
if (outData != nullptr) *outData = nullptr;
return result;
}
// pollInner 返回 epoll 等待并处理事件的结果
result = pollInner(timeoutMillis);
}
}
int Looper::pollInner(int timeoutMillis) {
// 根据当前时间和下一条消息的执行时间,调整超时时间
// Adjust the timeout based on when the next message is due.
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
}
// Poll.
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
// We are about to idle.
mPolling = true;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
// epoll_wait 等待事件发生, 说 nativePollOnce 阻塞, 实际是在等事件发生
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// epoll_wait 一旦返回,就说明有事可干了 也就不是空闲状态
// No longer idling.
mPolling = false;
// Acquire lock.
mLock.lock();
// 如果有必要, 重新创建 epoll
// addFd 和 removeSequenceNumberLocked 会触发 Looper::scheduleEpollRebuildLocked() 修改 mEpollRebuildRequired
// Rebuild epoll set if needed.
if (mEpollRebuildRequired) {
mEpollRebuildRequired = false;
rebuildEpollLocked();
goto Done;
}
// Check for poll error.
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
result = POLL_ERROR;
goto Done;
}
// Check for poll timeout.
if (eventCount == 0) {
result = POLL_TIMEOUT;
goto Done;
}
// 虽说不提倡 goto,该用还是用
// 前面几个 goto Done 会忽略当前发生的事件
// Handle all events.
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif
for (int i = 0; i < eventCount; i++) {
const SequenceNumber seq = eventItems[i].data.u64;
uint32_t epollEvents = eventItems[i].events;
// WAKE_EVENT_FD_SEQ 是唤醒 mWakeEventFd 携带的数据,表示这次是被唤醒的
// awoken() 会清理唤醒时候写入的数据
if (seq == WAKE_EVENT_FD_SEQ) {
if (epollEvents & EPOLLIN) {
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
// 不是 mWakeEventFd 就看看是哪个 Request 有事情,构造出 Response
const auto& request_it = mRequests.find(seq);
if (request_it != mRequests.end()) {
const auto& request = request_it->second;
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
mResponses.push({.seq = seq, .events = events, .request = request});
} else {
ALOGW("Ignoring unexpected epoll events 0x%x for sequence number %" PRIu64
" that is no longer registered.",
epollEvents, seq);
}
}
}
Done: ;
// 这里才是真正在处理 MessageEnvelope
// 类似 Java 层的逻辑,从前往后遍历,找到可处理的消息来处理
// 队列中剩下的都是不可处理的消息,第一个消息决定下次唤醒的时间
// Invoke pending message callbacks.
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
// Remove the envelope from the list.
// We keep a strong reference to the handler until the call to handleMessage
// finishes. Then we drop it so that the handler can be deleted *before*
// we reacquire our lock.
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
this, handler.get(), message.what);
#endif
handler->handleMessage(message);
} // release handler
mLock.lock();
mSendingMessage = false;
result = POLL_CALLBACK;
} else {
// The last message left at the head of the queue determines the next wakeup time.
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
// Release lock.
mLock.unlock();
// 处理掉需要通过 callback 执行的 request 产生的 response
// Invoke all response callbacks.
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
this, response.request.callback.get(), fd, events, data);
#endif
// Invoke the callback. Note that the file descriptor may be closed by
// the callback (and potentially even reused) before the function returns so
// we need to be a little careful when removing the file descriptor afterwards.
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
AutoMutex _l(mLock);
removeSequenceNumberLocked(response.seq);
}
// Clear the callback reference in the response structure promptly because we
// will not clear the response vector itself until the next poll.
response.request.callback.clear();
result = POLL_CALLBACK;
}
}
return result;
}
|
Java 层的 MessageQueue 在 1. 退出; 2. 删除了队列头部的 Barrier; 3. 有新消息来,且满足一定条件; 会通过 nativeWake 唤醒 Native 层 Looper. 具体调用链如下:
system/core/libutils/Looper.cpp |
---|
| void Looper::wake() {
uint64_t inc = 1;
// 通过不断向 mWakeEventFd 写入 1, 触发 epoll_wait 返回
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",
mWakeEventFd.get(), nWrite, strerror(errno));
}
}
}
|
Java 层队列退出或者销毁的时候通过 dispose()
调用:
system/core/libutils/RefBase.cpp |
---|
| void RefBase::decStrong(const void* id) const
{
weakref_impl* const refs = mRefs;
refs->removeStrongRef(id); // 移除强引用
const int32_t c = refs->mStrong.fetch_sub(1, std::memory_order_release);
LOG_ALWAYS_FATAL_IF(BAD_STRONG(c), "decStrong() called on %p too many times",
refs);
if (c == 1) {
std::atomic_thread_fence(std::memory_order_acquire);
refs->mBase->onLastStrongRef(id);
int32_t flags = refs->mFlags.load(std::memory_order_relaxed);
if ((flags&OBJECT_LIFETIME_MASK) == OBJECT_LIFETIME_STRONG) {
delete this;
// The destructor does not delete refs in this case.
}
}
refs->decWeak(id); // 移除弱引用
}
|
Native 层的 RenderThread surfaceflinger的vsync 等通过 addFd 方式设置 Request,在由 Looper 机制驱动回调处理
system/core/libutils/Looper.cpp |
---|
| int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {
sp<SimpleLooperCallback> looperCallback;
if (callback) {
looperCallback = sp<SimpleLooperCallback>::make(callback);
}
return addFd(fd, ident, events, looperCallback, data);
}
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
events, callback.get(), data);
#endif
if (!callback.get()) {
if (! mAllowNonCallbacks) {
ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
return -1;
}
if (ident < 0) {
ALOGE("Invalid attempt to set NULL callback with ident < 0.");
return -1;
}
} else {
ident = POLL_CALLBACK;
}
{ // acquire lock
AutoMutex _l(mLock);
// There is a sequence number reserved for the WakeEventFd.
if (mNextRequestSeq == WAKE_EVENT_FD_SEQ) mNextRequestSeq++;
const SequenceNumber seq = mNextRequestSeq++;
Request request;
request.fd = fd;
request.ident = ident;
request.events = events;
request.callback = callback;
request.data = data;
epoll_event eventItem = createEpollEvent(request.getEpollEvents(), seq);
auto seq_it = mSequenceNumberByFd.find(fd);
if (seq_it == mSequenceNumberByFd.end()) {
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));
return -1;
}
mRequests.emplace(seq, request);
mSequenceNumberByFd.emplace(fd, seq);
} else {
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_MOD, fd, &eventItem);
if (epollResult < 0) {
if (errno == ENOENT) {
epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
if (epollResult < 0) {
ALOGE("Error modifying or adding epoll events for fd %d: %s",
fd, strerror(errno));
return -1;
}
scheduleEpollRebuildLocked();
} else {
ALOGE("Error modifying epoll events for fd %d: %s", fd, strerror(errno));
return -1;
}
}
const SequenceNumber oldSeq = seq_it->second;
mRequests.erase(oldSeq);
mRequests.emplace(seq, request);
seq_it->second = seq;
}
} // release lock
return 1;
}
|
MessageQueue::postMessage
以及 Looper::sendMessage
和 Looper::sendMessageDelayed
最终都是调用 Looper::sendMessageAtTime
来完成消息插入.
system/core/libutils/Looper.cpp |
---|
| void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
const Message& message) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ sendMessageAtTime - uptime=%" PRId64 ", handler=%p, what=%d",
this, uptime, handler.get(), message.what);
#endif
size_t i = 0;
{ // acquire lock
AutoMutex _l(mLock);
// 与 Java 层逻辑类似, 将 message 插入基于 uptime 排序的队列
size_t messageCount = mMessageEnvelopes.size();
while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
i += 1;
}
MessageEnvelope messageEnvelope(uptime, handler, message);
mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
// Optimization: If the Looper is currently sending a message, then we can skip
// the call to wake() because the next thing the Looper will do after processing
// messages is to decide when the next wakeup time should be. In fact, it does
// not even matter whether this code is running on the Looper thread.
//如果当前正在发送消息,那么不再需要调用 wake(),直接返回。
if (mSendingMessage) {
return;
}
} // release lock
// message 插入队头, 主动唤醒一次
// Wake the poll loop only when we enqueue a new message at the head.
if (i == 0) {
wake();
}
}
|
同样与 Java 层逻辑类似, 将满足条件的 message 从队列中移除
system/core/libutils/Looper.cpp |
---|
| void Looper::removeMessages(const sp<MessageHandler>& handler) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ removeMessages - handler=%p", this, handler.get());
#endif
{ // acquire lock
AutoMutex _l(mLock);
for (size_t i = mMessageEnvelopes.size(); i != 0; ) {
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i);
if (messageEnvelope.handler == handler) {
mMessageEnvelopes.removeAt(i);
}
}
} // release lock
}
void Looper::removeMessages(const sp<MessageHandler>& handler, int what) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ removeMessages - handler=%p, what=%d", this, handler.get(), what);
#endif
{ // acquire lock
AutoMutex _l(mLock);
for (size_t i = mMessageEnvelopes.size(); i != 0; ) {
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i);
if (messageEnvelope.handler == handler
&& messageEnvelope.message.what == what) {
mMessageEnvelopes.removeAt(i);
}
}
} // release lock
}
|