MongoDB Source Code Analysis: The mongod Entry Point
To understand how MongoDB works, we first need a rough picture of its main execution flow, so let us start from the main function. The main function of the mongod project lives in db.cpp; for cross-platform support, separate entry points are provided for Windows and Linux:
#if defined(_WIN32)
int wmain(int argc, wchar_t* argvW[], wchar_t* envpW[]) {
    WindowsCommandLine wcl(argc, argvW, envpW);
    int exitCode = mongoDbMain(argc, wcl.argv(), wcl.envp());
    ::_exit(exitCode);
}
#else
int main(int argc, char* argv[], char** envp) {
    int exitCode = mongoDbMain(argc, argv, envp);
    ::_exit(exitCode);
}
#endif
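In the Windows entry point, wmain receives wide-character (UTF-16) arguments, and WindowsCommandLine converts them before handing them on. As a rough illustration of that kind of conversion (a hypothetical helper, not MongoDB's actual implementation), the Win32 WideCharToMultiByte API can turn each UTF-16 argument into UTF-8:

#include <windows.h>
#include <string>
#include <vector>

// Hypothetical sketch: convert one NUL-terminated UTF-16 string to UTF-8.
std::string toUtf8(const wchar_t* wide) {
    // First call computes the required buffer size (includes the terminating NUL).
    int len = WideCharToMultiByte(CP_UTF8, 0, wide, -1, NULL, 0, NULL, NULL);
    std::vector<char> buf(len);
    WideCharToMultiByte(CP_UTF8, 0, wide, -1, buf.data(), len, NULL, NULL);
    return std::string(buf.data()); // drops the trailing NUL
}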
After this encoding conversion, both entry points call the same mongoDbMain function to enter the main flow. Let us analyze mongoDbMain in detail:
static int mongoDbMain(int argc, char* argv[], char **envp) {
    // Static observer: when mongoDbMain ends, this function-local static is
    // destroyed first; other code checks StaticObserver's member
    // _destroyingStatics to detect that mongoDbMain has finished.
    static StaticObserver staticObserver;
#if defined(_WIN32)
    mongo::reportEventToSystem = &mongo::reportEventToSystemImpl;
#endif
    getcurns = ourgetns;
    setupSignalHandlers();
    dbExecCommand = argv[0];
    srand(curTimeMicros());
    {
        // Check whether the CPU is little-endian; MongoDB does not yet
        // support big-endian servers.
        unsigned x = 0x12345678;
        unsigned char& b = (unsigned char&) x;
        if ( b != 0x78 ) {
            out() << "big endian cpus not yet supported" << endl;
            return 33;
        }
    }
    if( argc == 1 )
        cout << dbExecCommand << " --help for help and startup options" << endl;
    // Parse the command line; this also reads the configuration file and
    // assigns the options to variables. The design here is quite clever and
    // deserves a separate article.
    Status status = mongo::runGlobalInitializers(argc, argv, envp);
    if (!status.isOK()) {
        severe() << "Failed global initialization: " << status;
        ::_exit(EXIT_FAILURE);
    }
    startupConfigActions(std::vector<std::string>(argv, argv + argc));
    cmdline_utils::censorArgvArray(argc, argv);
    // Initialize the server's global state, including authentication settings.
    if (!initializeServerGlobalState())
        ::_exit(EXIT_FAILURE);
    // Start a thread that handles system signals.
    startSignalProcessingThread();
    // Periodically flush data files to disk.
    dataFileSync.go();
#if defined(_WIN32)
    if (ntservice::shouldStartService()) {
        ntservice::startService();
        // exits directly and so never reaches here either.
    }
#endif
    // Run startup self-tests.
    StartupTest::runTests();
    // Initialize all modules and start listening for client requests; the
    // server is now running.
    initAndListen(serverGlobalParams.port);
    dbexit(EXIT_CLEAN);
    return 0;
}
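The StaticObserver trick at the top is a small but useful pattern: a function-local static object whose destructor flips a flag, so that static objects destroyed later can tell that teardown has begun. A minimal sketch of the idea (class and member names are illustrative, not MongoDB's exact implementation):

// Minimal sketch of the static-observer pattern (illustrative names).
class StaticObserverSketch {
public:
    static bool _destroyingStatics; // true once static destruction has begun
    ~StaticObserverSketch() { _destroyingStatics = true; }
};
bool StaticObserverSketch::_destroyingStatics = false;

// A global object constructed before main() is destroyed *after* a local
// static constructed inside main(), so its destructor can check the flag.
struct LateCleanup {
    ~LateCleanup() {
        if (StaticObserverSketch::_destroyingStatics)
            return; // static teardown has begun; skip cleanup that
                    // depends on other statics still being alive
    }
};
LateCleanup lateCleanup;

int main() {
    static StaticObserverSketch staticObserver; // destroyed when main() ends
    // ... program body ...
    return 0;
}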
Back in mongoDbMain, dataFileSync.go() kicks off background file syncing; dataFileSync is a DataFileSync object:
class DataFileSync : public BackgroundJob , public ServerStatusSection {
public:
    ...
    void run() {
        ...
    }
};

go() is inherited from the base class BackgroundJob:

void BackgroundJob::go() {
    scoped_lock l( _status->mutex );
    massert( 17234, mongoutils::str::stream()
             << "backgroundJob already running: " << name(),
             _status->state != Running );
    // If the job is already 'done', for instance because it was cancelled or already
    // finished, ignore additional requests to run the job.
    if (_status->state == NotStarted) {
        // BackgroundJob::jobBody invokes the subclass's run(), so the file
        // sync executes in a background thread.
        boost::thread t( boost::bind( &BackgroundJob::jobBody , this ) );
        _status->state = Running;
    }
}
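The pattern here is worth noting: go() spawns a thread that runs a private jobBody(), and jobBody() dispatches to the subclass's virtual run(). A self-contained sketch of the same pattern, simplified and using std::thread instead of boost::thread (the real BackgroundJob also tracks state under a mutex):

#include <thread>
#include <iostream>

// Simplified background-job base class: go() launches jobBody() on a new
// thread, and jobBody() calls the subclass's virtual run().
class BackgroundJobSketch {
public:
    virtual ~BackgroundJobSketch() {}
    void go() {
        // The job object must outlive the detached thread.
        std::thread t(&BackgroundJobSketch::jobBody, this);
        t.detach(); // the job runs independently of the caller
    }
protected:
    virtual void run() = 0; // subclass supplies the actual work
private:
    void jobBody() { run(); }
};

class FileSyncJob : public BackgroundJobSketch {
protected:
    void run() override { std::cout << "syncing data files...\n"; }
};

// Usage: FileSyncJob job; job.go();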
The Client::initThread(...) function is used in many places later. The comment in the source reads: "each thread which does db operations has a Client object in TLS. call this when your thread starts."
Client& Client::initThread(const char *desc, AbstractMessagingPort *mp) {
    verify( currentClient.get() == 0 );
    string fullDesc = desc;
    if ( str::equals( "conn" , desc ) && mp != NULL )
        fullDesc = str::stream() << desc << mp->connectionId();
    // Set the thread name; it is stored in the thread-local variable _threadName.
    setThreadName( fullDesc.c_str() );
    // Create the client obj, attach to thread
    Client *c = new Client( fullDesc, mp );
    currentClient.reset(c);
    mongo::lastError.initThread();
    c->setAuthorizationSession(new AuthorizationSession(new AuthzSessionExternalStateMongod(
            getGlobalAuthorizationManager())));
    return *c;
}
In the code above, setThreadName(), currentClient.reset(c), and mongo::lastError.initThread() puzzled me at first: _threadName, currentClient, and lastError are all global variables, so wouldn't setting them in each thread overwrite the values set by other threads? After studying the source more carefully, I found that they all use thread-local storage. MongoDB implements this directly with boost::thread_specific_ptr: each thread news its own object and hands it to the global thread_specific_ptr to manage, and when the thread exits the object is released automatically.
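A minimal sketch of that mechanism (illustrative, not MongoDB's code): each thread that calls reset() gets its own instance, and get() returns the calling thread's copy:

#include <boost/thread/tss.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <string>

// One global pointer, but each thread sees only the object it stored itself.
boost::thread_specific_ptr<std::string> threadName;

void worker(const std::string& name) {
    threadName.reset(new std::string(name)); // per-thread instance
    std::cout << "my name: " << *threadName.get() << "\n";
    // the string is deleted automatically when this thread exits
}

int main() {
    boost::thread a(worker, std::string("conn1"));
    boost::thread b(worker, std::string("conn2"));
    a.join();
    b.join();
    return 0;
}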
Next, let us analyze the initAndListen function on its own:
void initAndListen(int listenPort) {
    try {
        _initAndListen(listenPort);
    }
    ...
}
Here is an analysis of the main code of _initAndListen:
void _initAndListen(int listenPort ) {
    // Set the thread name and create this thread's Client object for
    // database operations.
    Client::initThread("initandlisten");
    // Check whether this is a 32-bit system.
    bool is32bit = sizeof(int*) == 4;
    ....
    // Read storage engine metadata file (introduced in 2.8) if present.
    // Do not start server if storage engine in metadata is not 'mmapv1'.
    StorageEngineMetadata::validate(storageGlobalParams.dbpath, "mmapv1");
    // TODO check non-journal subdirs if using directory-per-db
    checkReadAhead(storageGlobalParams.dbpath);
    // File lock: guarantees that only one mongod instance uses the current
    // dbpath. If another instance is already running in this dbpath,
    // acquiring the lock fails.
    acquirePathLock(mongodGlobalParams.repair);
    // Remove all temporary files.
    boost::filesystem::remove_all(storageGlobalParams.dbpath + "/_tmp/");
    // Start the background file allocator.
    FileAllocator::get()->start();
    ...
    MONGO_ASSERT_ON_EXCEPTION_WITH_MSG( clearTmpFiles(), "clear tmp files" );
    // Start the journal thread and create the durability directory and files.
    dur::startup();
    ...
    // Set up the V8 script engine.
    if (mongodGlobalParams.scriptingEnabled) {
        ScriptEngine::setup();
        globalScriptEngine->setCheckInterruptCallback( jsInterruptCallback );
        globalScriptEngine->setGetCurrentOpIdCallback( jsGetCurrentOpIdCallback );
    }
    ...
    // Start the snapshot thread, which takes periodic snapshots (every 4
    // seconds by default).
    if (serverGlobalParams.isHttpInterfaceEnabled)
        snapshotThread.go();
    d.clientCursorMonitor.go();
    // Threads that report memory usage and remove timed-out client cursors:
    PeriodicTask::startRunningPeriodicTasks();
    // The call above starts the periodic-task thread: it creates a
    // PeriodicTaskRunner that checks for the shutdown signal and otherwise
    // waits, 60 seconds by default.
    if (missingRepl) {
        // a warning was logged earlier
    }
    else {
        startTTLBackgroundJob();
    }
#ifndef _WIN32
    mongo::signalForkSuccess();
#endif
    if(getGlobalAuthorizationManager()->isAuthEnabled()) {
        // open admin db in case we need to use it later. TODO this is not the right way to
        // resolve this.
        Client::WriteContext c("admin", storageGlobalParams.dbpath);
    }
    authindex::configureSystemIndexes("admin");
    getDeleter()->startWorkers();
    // Starts a background thread that rebuilds all incomplete indices.
    indexRebuilder.go();
    // Start listening.
    listen(listenPort);
    // listen() will return when exit code closes its socket.
    exitCleanly(EXIT_NET_ERROR);
}
As _initAndListen shows, it does not itself start accepting client requests; it only performs database preparation work, and at the very end calls listen(listenPort) to listen on the port. Let us continue with the listen function:
void listen(int port) {
    //testTheDb();
    MessageServer::Options options;
    options.port = port;
    options.ipList = serverGlobalParams.bind_ip;
    MessageServer * server = createServer( options , new MyMessageHandler() );
    server->setAsTimeTracker();
    // we must setupSockets prior to logStartup() to avoid getting too high
    // a file descriptor for our calls to select()
    // Initialize the sockets, then call listen() on them.
    server->setupSockets();
    logStartup();
    // Start the replication background threads.
    startReplication();
    // If --httpinterface was given, start a web thread that serves HTTP requests.
    if (serverGlobalParams.isHttpInterfaceEnabled)
        boost::thread web( boost::bind(&webServerThread, new RestAdminAccess() /* takes ownership */));
    server->run();
}
MessageServer is the base class of the message service; createServer returns an object of its subclass PortMessageServer, which overrides the virtual function run(). Handling of client connections is thus deferred to PortMessageServer::run().
class MessageServer {
public:
    struct Options {
        int port;      // port to bind to
        string ipList; // addresses to bind to
        Options() : port(0), ipList("") {}
    };
    virtual ~MessageServer() {}
    virtual void run() = 0;
    virtual void setAsTimeTracker() = 0;
    virtual void setupSockets() = 0;
};

class Listener : boost::noncopyable {
public:
    void initAndListen(); // never returns unless error (start a thread)
    /* spawn a thread, etc., then return */
    virtual void accepted(boost::shared_ptr<Socket> psocket, long long connectionId );
    virtual void acceptedMP(MessagingPort *mp);
    ...
};

class PortMessageServer : public MessageServer , public Listener {
    ...
};
Let us continue with PortMessageServer::run():
void run() {
    initAndListen();
}
initAndListen has separate Windows and Linux implementations in MongoDB; we only analyze the Linux one here. The principle is the same in both: select() is used to wait for client connections. In the code below I have removed some parts and kept only the core handling logic.
#if !defined(_WIN32)
void Listener::initAndListen() {
    if (!_setupSocketsSuccessful) {
        return;
    }
    SOCKET maxfd = 0; // needed for select()
    for (unsigned i = 0; i < _socks.size(); i++) {
        if (_socks[i] > maxfd)
            maxfd = _socks[i];
    }
    ...
    struct timeval maxSelectTime;
    while ( ! inShutdown() ) {
        fd_set fds[1];
        FD_ZERO(fds);
        for (vector<SOCKET>::iterator it=_socks.begin(), end=_socks.end(); it != end; ++it) {
            FD_SET(*it, fds);
        }
        maxSelectTime.tv_sec = 0;
        maxSelectTime.tv_usec = 10000;
        const int ret = select(maxfd+1, fds, NULL, NULL, &maxSelectTime);
        ...
        for (vector<SOCKET>::iterator it=_socks.begin(), end=_socks.end(); it != end; ++it) {
            if (! (FD_ISSET(*it, fds)))
                continue;
            SockAddr from;
            int s = accept(*it, from.raw(), &from.addressSize);
            ...
            long long myConnectionNumber = globalConnectionNumber.addAndFetch(1);
            ...
            boost::shared_ptr<Socket> pnewSock( new Socket(s, from) );
#ifdef MONGO_SSL
            if (_ssl) {
                pnewSock->secureAccepted(_ssl);
            }
#endif
            accepted( pnewSock , myConnectionNumber );
        }
    }
}
#else
// ... Windows implementation omitted ...
#endif
This is a standard server-side select() loop: whenever a client socket connects, accept() succeeds and accepted() is called to handle the new socket.
void Listener::accepted(boost::shared_ptr<Socket> psocket, long long connectionId ) {
    MessagingPort* port = new MessagingPort(psocket);
    port->setConnectionId( connectionId );
    acceptedMP( port );
}
accepted() in turn calls the acceptedMP member function of the PortMessageServer object (PortMessageServer is a subclass of Listener). Here we meet a new class, MessagingPort, which encapsulates sending and receiving Messages; we may analyze it in detail later if the need arises.
virtual void acceptedMP(MessagingPort * p) {
    ...
    pthread_attr_t attrs;
    pthread_attr_init(&attrs);
    pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
    static const size_t STACK_SIZE = 1024*1024; // if we change this we need to update the warning
    struct rlimit limits;
    pthread_t thread;
    HandleIncomingMsgParam* himParam = new HandleIncomingMsgParam(p, _handler);
    int failed = pthread_create(&thread, &attrs, &handleIncomingMsg, himParam);
    pthread_attr_destroy(&attrs);
}
When a connection is accepted, acceptedMP creates a detached background thread that runs handleIncomingMsg to service the client.
static void* handleIncomingMsg(void* arg) {
    TicketHolderReleaser connTicketReleaser( &Listener::globalTicketHolder );
    scoped_ptr<HandleIncomingMsgParam> himArg(static_cast<HandleIncomingMsgParam*>(arg));
    MessagingPort* inPort = himArg->inPort;
    MessageHandler* handler = himArg->handler;
    ...
    inPort->psock->setLogLevel(logger::LogSeverity::Debug(1));
    scoped_ptr<MessagingPort> p( inPort );
    string otherSide;
    Message m;
    try {
        LastError * le = new LastError();
        lastError.reset( le ); // lastError now has ownership
        handler->connected( p.get() );
        while ( ! inShutdown() ) {
            m.reset();
            p->psock->clearCounters();
            if ( ! p->recv(m) ) {
                ...
            }
            handler->process( m , p.get() , le );
            networkCounter.hit( p->psock->getBytesIn() , p->psock->getBytesOut() );
        }
    }
    ...
    handler->disconnected( p.get() );
    return NULL;
}
As the code above shows, after accepting a client connection the server creates a background thread whose while loop keeps receiving Messages and handing each one to the handler's process(). Inside handler->connected() the thread simply calls Client::initThread("conn", p) again, which we already analyzed earlier.
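For context, every Message that p->recv() pulls off the socket begins with the standard MongoDB wire-protocol header: four little-endian int32 fields. A minimal sketch of describing and parsing it (illustrative, not the actual MessagingPort implementation):

#include <cstdint>
#include <cstring>

// Standard MongoDB wire-protocol message header (16 bytes, little-endian).
struct MsgHeader {
    int32_t messageLength; // total message size in bytes, including this header
    int32_t requestID;     // identifier chosen by the sender
    int32_t responseTo;    // requestID this message replies to (0 for requests)
    int32_t opCode;        // message type, e.g. 2004 = OP_QUERY
};

// Parse a header out of a raw receive buffer (assumes a little-endian host,
// which the endianness check in mongoDbMain guarantees).
MsgHeader parseHeader(const char* buf) {
    MsgHeader h;
    std::memcpy(&h, buf, sizeof(h));
    return h;
}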
One last point: the handler object is simply the new MyMessageHandler() passed as the second argument to createServer:
class MessageHandler {
public:
    virtual ~MessageHandler() {}
    /**
     * called once when a socket is connected
     */
    virtual void connected( AbstractMessagingPort* p ) = 0;
    /**
     * called every time a message comes in
     * handler is responsible for responding to client
     */
    virtual void process( Message& m , AbstractMessagingPort* p , LastError * err ) = 0;
    /**
     * called once when a socket is disconnected
     */
    virtual void disconnected( AbstractMessagingPort* p ) = 0;
};

class MyMessageHandler : public MessageHandler {
    ...
};
At this point the overall outline of mongod's message handling is clear; the remaining parts will be analyzed in subsequent articles.