Android源码分析--MediaServer源码分析(二)

在上一篇博客中Android源码分析–MediaServer源码分析(一),我们知道了ProcessState和defaultServiceManager,在分析源码的过程中,我们被Android的Binder通信机制中的各种复杂的类关系搞的眼花缭乱,接下来我们就以MediaPlayerService为例来分析一下Binder的通信机制。首先来回顾一下:

  • BpBinder和BBinder都是Android中Binder通信的代表类,其中BpBinder是客户端用来与Server交互的代理类,p代表的就是proxy,而BBinder则是交互的目的端;
  • BpBinder和BBinder是相互对应的,Binder系统会通过handle来标识对应的BBinder。

Android的通信机制基本上可以看做是Client、Server和ServiceManager三者之间的交互:

  • Server首先要注册一些Service到ServiceManager;
  • 如果某个Client要使用Service,则首先到ServiceManager中获得该Service的相关信息;
  • Client得到Service信息,然后和该Service所在的Server进程建立通信之后使用Service。

注册MediaPlayerService

我们知道若要使用一个Service的话首先要进行注册,所有首先让我们来看看MediaService是如何注册的,文件位置:frameworks\base\media\libmediaplayerservice\MediaPlayerService.cpp

void MediaPlayerService::instantiate() {
    defaultServiceManager()->addService(
            String16("media.player"), new MediaPlayerService());
}

defaultServiceManager返回的其实是一个BpServiceManager对象,在这里我看到了注册的函数addService:

    virtual status_t addService(const String16& name, const sp<IBinder>& service)
    {
        Parcel data, reply;
    //将Descriptor作为令牌
        data.writeInterfaceToken(IServiceManager::getInterfaceDescriptor());
        data.writeString16(name);
        data.writeStrongBinder(service);
        status_t err = remote()->transact(ADD_SERVICE_TRANSACTION, data, &reply);
        return err == NO_ERROR ? reply.readExceptionCode() : err;
}

Parcel类可以看作一个数据块,在该类的内部主要封装了一些数据操作的方法。在写入数据之后remote()->transact方法,remote()方法返回的值是mRemote,在上一篇中我们知道它是一个BpBinder对象,之后我们的通信工作就交由了BpBinder的transact方法:

status_t BpBinder::transact(
    uint32_t code, const Parcel& data, Parcel* reply, uint32_t flags)
{
    // Binder一旦死去将永远不能复活.
    if (mAlive) {
        status_t status = IPCThreadState::self()->transact(
            mHandle, code, data, reply, flags);
        if (status == DEAD_OBJECT) mAlive = 0;
        return status;
    }

    return DEAD_OBJECT;
}

BpBinder完全将工作交给了IPCThreadState,接下来的任务也全部转移到了IPCThreadState类中。

IPCThreadState类分析

IPCThreadState也是以单例模式设计的。由于每个进程只维护了一个ProcessState实例,同时ProcessState只启动一个 Pool thread,也就是说每一个进程只会启动一个Pool thread,因此每个进程则只需要一个IPCThreadState即可。首先让我们来看看这个self方法,文件位置:frameworks\base\libs\binder\IPCThreadState.cpp

IPCThreadState* IPCThreadState::self()
{
    if (gHaveTLS) {
restart:
        const pthread_key_t k = gTLS;
        IPCThreadState* st = (IPCThreadState*)pthread_getspecific(k);
        if (st) return st;
        return new IPCThreadState;
    }

    if (gShutdown) return NULL;

    pthread_mutex_lock(&gTLSMutex);
    if (!gHaveTLS) {
        if (pthread_key_create(&gTLS, threadDestructor) != 0) {
            pthread_mutex_unlock(&gTLSMutex);
            return NULL;
        }
        gHaveTLS = true;
    }
    pthread_mutex_unlock(&gTLSMutex);
    goto restart;
}

TLS是Thread Local Storage的缩写,表示线程本地存储,类似于Java中的ThreadLocal机制)pthread_getspecific和)pthread_setspecific方法分别提供了设置变量和获取变量的方法。

在这里我们看到了get方法,有get方法就一定有对应的set方法,set的设置就在构造函数中:

IPCThreadState::IPCThreadState()
    : mProcess(ProcessState::self()),
      mMyThreadId(androidGetTid()),
      mStrictModePolicy(0),
      mLastTransactionBinderFlags(0)
{
    pthread_setspecific(gTLS, this);
    clearCaller();
    mIn.setDataCapacity(256);
    mOut.setDataCapacity(256);
}

可以看到在构造函数中IPCThreadState将自身和线程本地存储关联起来了,在每一个IPCThreadState中都存在一个mIn和mOut,mIn用来存储接收Binder的数据,mOut用来存储发送到Binder的数据。

接下来看看transact方法:

status_t IPCThreadState::transact(int32_t handle,
                                  uint32_t code, const Parcel& data,
                                  Parcel* reply, uint32_t flags)
{
    status_t err = data.errorCheck();

    flags |= TF_ACCEPT_FDS;
    ......
    if (err == NO_ERROR) {
        ......
        err = writeTransactionData(BC_TRANSACTION, flags, handle, code, data, NULL);
    }     
    ......
        if (reply) {
            err = waitForResponse(reply);
        } else {
            Parcel fakeReply;
            err = waitForResponse(&fakeReply);
        }
        ......
    } else {
        err = waitForResponse(NULL, NULL);
    }

    return err;
}

在这里省略了一些打印输出信息的语句,在这里我们可以清楚的看到通信的流程:先写入数据,然后等待接收数据。

如何进行通信

在前面我们知道了通信的流程,首先写入数据,之后等待回应,那么我们的通信就是这样不断的读写数据吗?接下来我们来一下看一下这两个读写的函数:
1.写入数据:

status_t IPCThreadState::writeTransactionData(int32_t cmd, uint32_t binderFlags,
    int32_t handle, uint32_t code, const Parcel& data, status_t* statusBuffer)
{
    binder_transaction_data tr;

    tr.target.handle = handle;
    tr.code = code;
    tr.flags = binderFlags;
    tr.cookie = 0;
    tr.sender_pid = 0;
    tr.sender_euid = 0;

    const status_t err = data.errorCheck();
    if (err == NO_ERROR) {
        tr.data_size = data.ipcDataSize();
        tr.data.ptr.buffer = data.ipcData();
        tr.offsets_size = data.ipcObjectsCount()*sizeof(size_t);
        tr.data.ptr.offsets = data.ipcObjects();
    } else if (statusBuffer) {
        tr.flags |= TF_STATUS_CODE;
        *statusBuffer = err;
        tr.data_size = sizeof(status_t);
        tr.data.ptr.buffer = statusBuffer;
        tr.offsets_size = 0;
        tr.data.ptr.offsets = NULL;
    } else {
        return (mLastError = err);
    }

    mOut.writeInt32(cmd);
    mOut.write(&tr, sizeof(tr));

    return NO_ERROR;
}

在写入数据之前,我们首先对BpBinder进行了标识,在这里我们看到了之前所说的handle值,这样印证了之前一篇博客中所说的BBinder和BpBinder是由handle值来相互标识的。

2.等待回应:

status_t IPCThreadState::waitForResponse(Parcel *reply, status_t *acquireResult)
{
    int32_t cmd;
    int32_t err;

while (1) {
    //talkWithDriver重要函数之一,负责从Binder中读写数据
        if ((err=talkWithDriver()) < NO_ERROR) break;
        err = mIn.errorCheck();
        if (err < NO_ERROR) break;
        if (mIn.dataAvail() == 0) continue;

        cmd = mIn.readInt32();

        switch (cmd) {
        case BR_TRANSACTION_COMPLETE://发送完一个数据包后,可以收到该消息做为成功发送的反馈。
            if (!reply && !acquireResult) goto finish;
            break;

        case BR_DEAD_REPLY://交互过程中如果发现对方进程或线程已经死亡
            err = DEAD_OBJECT;
            goto finish;

        case BR_FAILED_REPLY://发送失败
            err = FAILED_TRANSACTION;
            goto finish;

        case BR_ACQUIRE_RESULT://获得结果
            {
                const int32_t result = mIn.readInt32();
                if (!acquireResult) continue;
                *acquireResult = result ? NO_ERROR : INVALID_OPERATION;
            }
            goto finish;

        case BR_REPLY://当前接收的数据是回复
            {
                binder_transaction_data tr;
                err = mIn.read(&tr, sizeof(tr));
                ALOG_ASSERT(err == NO_ERROR, "Not enough command data for brREPLY");
                if (err != NO_ERROR) goto finish;

                if (reply) {
                    if ((tr.flags & TF_STATUS_CODE) == 0) {
                        reply->ipcSetDataReference(
                            reinterpret_cast<const uint8_t*>(tr.data.ptr.buffer),
                            tr.data_size,
                            reinterpret_cast<const size_t*>(tr.data.ptr.offsets),
                            tr.offsets_size/sizeof(size_t),
                            freeBuffer, this);
                    } else {
                        err = *static_cast<const status_t*>(tr.data.ptr.buffer);
                        freeBuffer(NULL,
                            reinterpret_cast<const uint8_t*>(tr.data.ptr.buffer),
                            tr.data_size,
                            reinterpret_cast<const size_t*>(tr.data.ptr.offsets),
                            tr.offsets_size/sizeof(size_t), this);
                    }
                } else {
                    freeBuffer(NULL,
                        reinterpret_cast<const uint8_t*>(tr.data.ptr.buffer),
                        tr.data_size,
                        reinterpret_cast<const size_t*>(tr.data.ptr.offsets),
                        tr.offsets_size/sizeof(size_t), this);
                    continue;
                }
            }
            goto finish;

        default:
        //第二个重要函数,处理命令
            err = executeCommand(cmd);
            if (err != NO_ERROR) goto finish;
            break;
        }
    }

finish:
    if (err != NO_ERROR) {
        if (acquireResult) *acquireResult = err;
        if (reply) reply->setError(err);
        mLastError = err;
    }

    return err;
}

可以看到我们首先从Binder中读写数据,之后执行命令并返回结果状态码。

两个重要函数

IPCThreadState有两个重要的函数,talkWithDriver函数负责从Binder读写数据,executeCommand函数负责解析并执行mIn中的数据。
1.talkWithDriver:

status_t IPCThreadState::talkWithDriver(bool doReceive)
{
//用来和Binder交换数据的结构
    binder_write_read bwr;

    // 判断数据是否为空
    const bool needRead = mIn.dataPosition() >= mIn.dataSize();

    // 如果caller已经请求读取下一行数据,或缓冲区中有数据没读完的时候我们不会进行任何写操作
    const size_t outAvail = (!doReceive || needRead) ? mOut.dataSize() : 0;
    //填充请求命令
    bwr.write_size = outAvail;
    bwr.write_buffer = (long unsigned int)mOut.data();

    if (doReceive && needRead) {
        bwr.read_size = mIn.dataCapacity();
        bwr.read_buffer = (long unsigned int)mIn.data();//mIn用来存储接收的Binder数据
    } else {
        bwr.read_size = 0;
        bwr.read_buffer = 0;
    }
......
    // 如果不需要任何操作了立即返回.
    if ((bwr.write_size == 0) && (bwr.read_size == 0)) return NO_ERROR;

    bwr.write_consumed = 0;
    bwr.read_consumed = 0;
    status_t err;
    do {
#if defined(HAVE_ANDROID_OS)
//ioctl方式进行通信
        if (ioctl(mProcess->mDriverFD, BINDER_WRITE_READ, &bwr) >= 0)
            err = NO_ERROR;
        else
            err = -errno;
#else
        err = INVALID_OPERATION;
#endif
    } while (err == -EINTR);

    if (err >= NO_ERROR) {
        if (bwr.write_consumed > 0) {
            if (bwr.write_consumed < (ssize_t)mOut.dataSize())
                mOut.remove(0, bwr.write_consumed);
            else
                mOut.setDataSize(0);
        }
        if (bwr.read_consumed > 0) {
            mIn.setDataSize(bwr.read_consumed);
            mIn.setDataPosition(0);
        }
        return NO_ERROR;
    } 
    return err;
}

可以看到我们的Binder通信并不是采用的读写方式,而是ioctl方式,什么是ioctl方式呢?

ioctl是设备驱动程序中对设备的I/O通道进行管理的函数。所谓对I/O通道进行管理,就是对设备的一些特性进行控制,例如串口的传输波特率、马达的 转速等等。它的调用函数如下:int ioctl(int fd, ind cmd, …);其中fd就是用户程序打开设备时使用open函数返回的文件标示符,cmd就是用户程序对设备的控制命令。

2.executeCommand函数:

status_t IPCThreadState::executeCommand(int32_t cmd)
{
    BBinder* obj;
    RefBase::weakref_type* refs;
    status_t result = NO_ERROR;

    switch (cmd) {
    case BR_ERROR:
        result = mIn.readInt32();
        break;

    case BR_OK:
        break;

    case BR_ACQUIRE:
        refs = (RefBase::weakref_type*)mIn.readInt32();
        obj = (BBinder*)mIn.readInt32();
        obj->incStrong(mProcess.get()); 
        mOut.writeInt32(BC_ACQUIRE_DONE);
        mOut.writeInt32((int32_t)refs);
        mOut.writeInt32((int32_t)obj);
        break;

    case BR_RELEASE:
        refs = (RefBase::weakref_type*)mIn.readInt32();
        obj = (BBinder*)mIn.readInt32();
        mPendingStrongDerefs.push(obj);
        break;

    case BR_INCREFS:
        refs = (RefBase::weakref_type*)mIn.readInt32();
        obj = (BBinder*)mIn.readInt32();
        refs->incWeak(mProcess.get());
        mOut.writeInt32(BC_INCREFS_DONE);
        mOut.writeInt32((int32_t)refs);
        mOut.writeInt32((int32_t)obj);
        break;

    case BR_DECREFS:
        refs = (RefBase::weakref_type*)mIn.readInt32();
        obj = (BBinder*)mIn.readInt32();
        mPendingWeakDerefs.push(refs);
        break;

    case BR_ATTEMPT_ACQUIRE:
        refs = (RefBase::weakref_type*)mIn.readInt32();
        obj = (BBinder*)mIn.readInt32();

        {
            const bool success = refs->attemptIncStrong(mProcess.get());
            mOut.writeInt32(BC_ACQUIRE_RESULT);
            mOut.writeInt32((int32_t)success);
        }
        break;

    case BR_TRANSACTION://当前接收的数据是请求
        {
            binder_transaction_data tr;
            result = mIn.read(&tr, sizeof(tr));
            if (result != NO_ERROR) break;

            Parcel buffer;
            buffer.ipcSetDataReference(
                reinterpret_cast<const uint8_t*>(tr.data.ptr.buffer),
                tr.data_size,
                reinterpret_cast<const size_t*>(tr.data.ptr.offsets),
                tr.offsets_size/sizeof(size_t), freeBuffer, this);

            const pid_t origPid = mCallingPid;
            const uid_t origUid = mCallingUid;

            mCallingPid = tr.sender_pid;
            mCallingUid = tr.sender_euid;

            int curPrio = getpriority(PRIO_PROCESS, mMyThreadId);
            if (gDisableBackgroundScheduling) {
                if (curPrio > ANDROID_PRIORITY_NORMAL) {
                //我们已经从caller实现了一个减小的优先级,但我们不希望它以这
                //种状态运行在进程中。驱动已经设置了优先级,所以在开始事务
            //前首先将他设置为默认                   
                    setpriority(PRIO_PROCESS, mMyThreadId, ANDROID_PRIORITY_NORMAL);
                }
            } else {
                if (curPrio >= ANDROID_PRIORITY_BACKGROUND) {
                    androidSetThreadSchedulingGroup(mMyThreadId,                                                   ANDROID_TGROUP_BG_NONINTERACT);
                }
            }

            Parcel reply;
            if (tr.target.ptr) {
        //在这里我们看到了BBinder,还记得吗,BBinder和BpBinder相对应,
        //是交互的目的端
        sp<BBinder> b((BBinder*)tr.cookie);
        //在transact函数中调用了onTransact方法向reply写入数据
                const status_t error = b->transact(tr.code, buffer, &reply, tr.flags);
                if (error < NO_ERROR) reply.setError(error);
            } else {
                const status_t error = the_context_object->transact(tr.code, buffer, &reply, tr.flags);
                if (error < NO_ERROR) reply.setError(error);
            }

            if ((tr.flags & TF_ONE_WAY) == 0) {
                sendReply(reply, 0);
            } 

            mCallingPid = origPid;
            mCallingUid = origUid;    
        }
        break;

    case BR_DEAD_BINDER:
        {
            BpBinder *proxy = (BpBinder*)mIn.readInt32();
            proxy->sendObituary();
            mOut.writeInt32(BC_DEAD_BINDER_DONE);
            mOut.writeInt32((int32_t)proxy);
        } break;

    case BR_CLEAR_DEATH_NOTIFICATION_DONE:
        {
            BpBinder *proxy = (BpBinder*)mIn.readInt32();
            proxy->getWeakRefs()->decWeak(proxy);
        } break;

    case BR_FINISHED:
        result = TIMED_OUT;
        break;

    case BR_NOOP:
        break;

case BR_SPAWN_LOOPER:
    //用于接收方线程池管理。当驱动发现接收方所有线程都处于忙碌状态且线
    //程池里的线程总数没有超过BINDER_SET_MAX_THREADS设置的最大线程数    
    //时,向接收方发送该命令要求创建更多线程以备接收数据。
        mProcess->spawnPooledThread(false);
        break;

    default:
        printf("*** BAD COMMAND %d received from Binder driver\n", cmd);
        result = UNKNOWN_ERROR;
        break;
    }

    if (result != NO_ERROR) {
        mLastError = result;
    }

    return result;
}

在这里我们主要分析一下接收数据请求的情况:在这里我们看到了BBinder对象,并调用了它的transact函数,其实在内部调用了onTransact函数,而这个函数在BBinder中只是定义了两种code之下的操作,大部分的情况是由它的子类完成。还记得上一篇中那个类的关系图吗:BBinder的直接子类是BnInterface,而这是一个接口类,所以所有的任务都落在了BnInterface的子类BnServiceManager中。

总结

看了这个类IPCThreadState,真的有点头晕眼花,究其原因是:该类将代码一部分给客户端使用,一部分给服务端使用,所以有点杂乱。总结一下今天学到的IPCThreadState的工作:

  1. 首先,通过线程本地存储机制IPCThreadState将自身和线程本地存储关联起来;
  2. 通过writeTransactionData将发生的数据存储在mOut中;
  3. waitForResponse调用了talkWithDriver从Binder设备中读写数据,在这里采用的是ioctl方式;
  4. 调用executeCommand函数解析并执行mIn中的数据。

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。