Android消息机制Native层简单分析
我们先来回顾下消息机制中Java层MessageQueue的往队列里加消息和从队列里面取消息的调用
//android.os.MessageQueue#enqueueMessage
boolean enqueueMessage(Message msg, long when) {
......
synchronized (this) {
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
//如果消息链表为空,或者插入的Message比消息链表第一个消息要执行的更早,直接插入到头部
if (p == null || when == 0 || when < p.when) {
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
//否则在链表中找到合适位置插入
//通常情况下不需要唤醒事件队列,除非链表的头是一个同步屏障,并且该条消息是第一条异步消息
needWake = mBlocked && p.target == null && msg.isAsynchronous();
//具体实现如下,这个画张图来说明
//链表引入一个prev变量,该变量指向p也message(如果是for循环的内部第一次执行),然后把p进行向next移动,和需要插入的Message进行比较when
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p;
prev.next = msg;
}
//如果插入的是异步消息,并且消息链表第一条消息是同步屏障消息。
//或者消息链表中只有刚插入的这一个Message,并且mBlocked为true即,正在阻塞状态,收到一个消息后也进入唤醒
唤醒谁?MessageQueue.next中被阻塞的nativePollOnce
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
//android.os.MessageQueue#next
Message next() {
//native层NativeMessageQueue的指针
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
......
for (;;) {
//阻塞操作,当等待nextPollTimeoutMillis时长,或者消息队列被唤醒
//nativePollOnce用于“等待”, 直到下一条消息可用为止. 如果在此调用期间花费的时间很长, 表明对应线程没有实际工作要做,或者Native层的message有耗时的操作在执行
nativePollOnce(ptr, nextPollTimeoutMillis);
}
可以看到在MessageQueue#next会调用课程阻塞的native方法nativePollOnce,在MessageQueue#enqueueMessage 中如果需要唤醒会调用native方法nativeWake。 问题是怎么阻塞的,怎么唤醒的,为什么要这样设计,直接在Java层完成处理不可以吗?
带着这样的困惑,开始Native层消息机制的分析学习。
MessageQueue Init流程
调用Native方法初始化,返回值为native层的NativeMessageQueue指针地址
//android.os.MessageQueue#MessageQueue
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
//调用Native方法初始化,返回值为native层的NativeMessageQueue指针地址
mPtr = nativeInit();
}
android_os_MessageQueue_nativeInit
//java native方法 nativeInit 的jni方法,返回类型long,即 mptr
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
//NativeMessageQueue是一个内部类
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
//返回为NativeMessageQueue指针地址
......
return reinterpret_cast<jlong>(nativeMessageQueue);
}
NativeMessageQueue构造方法
会进行Native层的Looper创建。Java层创建Looper然后再创建MessageQueue,而在Native层则刚刚相反,先创建NativeMessageQueue然后再创建Looper。
// core/jni/android_os_MessageQueue.cpp
//NativeMessageQueue构造方法
//Java层创建Looper然后再创建MessageQueue,
//而在Native层则刚刚相反,先创建NativeMessageQueue然后再创建Looper
//MessageQueue是在Java层与Native层有着紧密的联系,
//但是Native层的Looper与Java层的Looper没有任何关系
NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
//Looper::getForThread()获取当前线程的Looper,相当于Java层的Looper.myLooper()
mLooper = Looper::getForThread();
if (mLooper == NULL) {
//如果为空, new一个native层的Looper
mLooper = new Looper(false);
//相当于java层的ThreadLocal.set() ?
Looper::setForThread(mLooper);
}
}
Natvie层的Looper的构造
MessageQueue是在Java层与Native层有着紧密的联系,但是Native层的Looper与Java层的Looper没有任何关系
// libutils/Looper.cpp
Looper::Looper(bool allowNonCallbacks)
: mAllowNonCallbacks(allowNonCallbacks),
mSendingMessage(false),
mPolling(false),
mEpollRebuildRequired(false),
mNextRequestSeq(0),
mResponseIndex(0),
mNextMessageUptime(LLONG_MAX) {
//eventfd事件句柄,负责线程通信,替换了之前版本的pipe 构造唤醒事件fd
mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
LOG_ALWAYS_FATAL_IF(mWakeEventFd.get() < 0, "Could not make wake event fd: %s", strerror(errno));
AutoMutex _l(mLock);
//进行epoll句柄的创建和初始化
rebuildEpollLocked();
}
epoll句柄的创建、添加唤醒事件句柄到epoll
//libutils/Looper.cpp
void Looper::rebuildEpollLocked() {
......
//epoll_create1创建一个epoll句柄实例,并注册wake管道
mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
......
//epoll事件结构体
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeEventFd.get();
int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &eventItem);
for (size_t i = 0; i < mRequests.size(); i++) {
const Request& request = mRequests.valueAt(i);
struct epoll_event eventItem;
request.initEventItem(&eventItem);
//将唤醒事件句柄(request.fd),添加到epolled句柄(mEpollFd.get()),为epoll添加一个唤醒机制
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, request.fd, &eventItem);
......
}
}
Native层的init流程主要内容如下:
NativeQueueMessage和Looper的初始化。
构建了epoll句柄,向epoll中添加epoll事件注册
消息读取流程
nativePollOnce
//core/jni/android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
//将Java层传过来的mPtr 转换为 nativeMessageQueue
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
//调用nativeMessageQueue的pollOnce
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
NativeMessageQueue :: pollOnce
//core/jni/android_os_MessageQueue.cpp
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
......
//又调用了Natvie的Looper的pollOnce
mLooper->pollOnce(timeoutMillis);
......
}
Native层Looper::pollOnce
//libutils/Looper.cpp
/**
* timeoutMillis:超时时长
* outFd:发生事件的文件描述符
* outEvents:当前outFd上发生的事件,包含以下4类事件
EVENT_INPUT:可读
EVENT_OUTPUT:可写
EVENT_ERROR:错误
EVENT_HANGUP:中断
*outData:上下文数据
**/
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
......
//关键实现在pollInner中
result = pollInner(timeoutMillis);
}
}
Looper::pollInner 先会调用epoll_wait进入阻塞专题,唤醒的场景 向epoll中添加的epollevent等待事件发生或者超时触发nativeWake()方法,会向eventfd写入字符,进行唤醒。
然后进行要处理的事件收集,然后在做处理。Natvie的消息的处理顺序如下
- 处理Native的Message,调用Native的Handler来处理该Message
- 处理Resposne数组,POLL_CALLBACK类型的事件
//libutils/Looper.cpp
int Looper::pollInner(int timeoutMillis) {
......
// Poll.
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
// We are about to idle.
mPolling = true;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
//等待事件发生或者超时触发nativeWake()方法,会向eventfd写入字符
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// No longer idling.
mPolling = false;
// Acquire lock.
mLock.lock();
// Rebuild epoll set if needed.
if (mEpollRebuildRequired) {
mEpollRebuildRequired = false;
rebuildEpollLocked();
goto Done;
}
// Check for poll error.
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
an unexpected error: %s", strerror(errno));
result = POLL_ERROR;
goto Done;
}
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd.get()) {
//已经唤醒,如果是EPOLLIN类型事件,读取并清空eventfd中的数据
if (epollEvents & EPOLLIN) {
awoken();
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
pushResponse(events, mRequests.valueAt(requestIndex));
}
}
}
//上面是收集,Done中是处理的部分,很多设计都是采用这种设计,逻辑分离 Done: ;
先处理Native的Message,调用Native的Handler来处理该Message
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
......
if (messageEnvelope.uptime <= now) {
{
......
handler->handleMessage(message);
}
} else {
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
再处理Resposne数组,POLL_CALLBACK类型的事件,比如一些
// Invoke all response callbacks.
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
int callbackResult = response.request.callback->handleEvent(fd, events, data);
}
}
return result;
}
消息获取流程主要如下
- 依此调用NativeMessageQueue和looper中的pollonce,最终调用到Looper的pollInner
- pollInner会调用epoll_wait进入阻塞,唤醒的条件是epoll中添加的事件响应了或者发生了超时等。_
- 先执行Native层的Message,再执行Native层的Resposne数组,POLL_CALLBACK类型的事件(比如一些按键事件等)_
- 返回到Java层后再调用Java层MessageQueue中的读取Message。
消息发送流程
nativeWake
//core/jni/android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
//调用NativeMessageQueue的wake
nativeMessageQueue->wake();
}
NativeMessageQueue::wake
//core/jni/android_os_MessageQueue.cpp
void NativeMessageQueue::wake() {
//调用Native层Looper的wake
mLooper->wake();
}
Looper::wake
libutils/Looper.cpp
void Looper::wake() {
//write方法 向mWakeEventfd中写一些内容,会把epoll_wait唤醒,线程就不阻塞了继续发送
//Native层的消息,然后处理之前的addFd事件,然后处理Java层的消息
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
......
}
消息发送流程如下
- Java层的enqueueMessage向java层的MessageQueue添加消息
- 如果需要唤醒调用native方法nativeWake
- 依次调用NativeMessageQueue和Looper的wake方法
- 最终调用writ方法向eventfd中写入一些内容进行epoll_wait的唤醒。
native层播放音频三种方式
下面三种方式前两种区别不大,缺点是必须要等到系统加载差不多了才可以使用。Android系统启动时间很长,想要早点播放的话可以选择第三种,tinyalsa播放。
AudioTrack
final String TEST_NAME = "testPlaybackHeadPositionAfterFlush";
final int TEST_SR = 22050;
final int TEST_CONF = AudioFormat.CHANNEL_OUT_STEREO;
final int TEST_FORMAT = AudioFormat.ENCODING_PCM_16BIT;
final int TEST_MODE = AudioTrack.MODE_STREAM;
final int TEST_STREAM_TYPE = AudioManager.STREAM_MUSIC;
//-------- initialization --------------
int minBuffSize = AudioTrack.getMinBufferSize(TEST_SR, TEST_CONF, TEST_FORMAT);
AudioTrack track = new AudioTrack(TEST_STREAM_TYPE, TEST_SR, TEST_CONF, TEST_FORMAT, minBuffSize, TEST_MODE);
byte data[] = new byte[minBuffSize/2];
//-------- test --------------
track.write(data, 0, data.length);
track.write(data, 0, data.length);
track.play();
Thread.sleep(100);
track.stop();
track.flush();
//-------- tear down --------------
track.release();
MediaPlayer
tinyalsa
struct pcm_config config;
struct pcm *pcm;
char *buffer;
unsigned int size, read_sz;
int num_read;
memset(&config, 0, sizeof(config));
config.channels = channels;
config.rate = rate;
config.period_size = period_size;
config.period_count = period_count;
config.format = PCM_FORMAT_S16_LE;
config.start_threshold = 0;
config.stop_threshold = 0;
config.silence_threshold = 0;
pcm = pcm_open(card, device, PCM_OUT, &config);
if (!pcm || !pcm_is_ready(pcm)) {
fprintf(stderr, "Unable to open PCM device %u (%s)\n", device, pcm_get_error(pcm));
return;
}
size = pcm_frames_to_bytes(pcm, pcm_get_buffer_size(pcm));
buffer = malloc(size);
do {
read_sz = size < data_sz ? size : data_sz;
num_read = fread(buffer, 1, read_sz, file);
if (num_read > 0) {
if (pcm_write(pcm, buffer, num_read)) {
fprintf(stderr, "Error playing sample\n");
break;
}
data_sz -= num_read;
} while (!close && num_read > 0 && data_sz > 0);
free(buffer);
pcm_close(pcm);
文末
React Native的iOS和Android版本目前是由官方在维护,Windows版本是微软做了迁移,淘宝有把React Native移植到Web上,另外还有工程师已经把它移植到了OS X上了。所以在将来如果你会React Native,就会很容易成为一个全端工程师。
React Native用的是ReactJS语法。其他的跨平台方案一般是先编译成中间码再在虚拟机中运行,例如Java和.Net,React Native却不太一样,因为跨平台是非常难的,平台视图会有很大差异。React Native的思路是让我们学习这种开发模式后可以用这套模式去开发任何平台的应用,即所谓的「learn once ,write anywhere」。但是开发的代码会稍微有些不同,例如在Web端会写一些HTML的Tag,在React Native中则会写一些抽象出来的原生组件的Tag。