MongoDB Tailable Cursors 深入剖析

MongoDB Tailable Cursors 是一个类似于UNIX tail -f 命令的功能,它使用在MongoDB的capped collection上。利用这一特性可以轻松的实现消息队列系统(简介看这里)。而官网上对于 Tailable Cursors 的讲解却非常简略,下面推荐的是一篇通过多种方式的性能测试,对 Tailable Cursors 进行深入剖析的文章。10gen 的CEO Dwight Merriman 在看完此文后也表示官方的文档需要完善了。

下面是测试的第一步,作者的步骤是先让数据接收者对测试的目标colleciton进行读取,然后再用python脚本写入10w条数据到对应的capped collection里。写入的数据都带有写入时的时间戳信息,读取端在读到数据后对比时间戳即可计算数据延迟。

下面是写数据的python脚本:

#!/usr/bin/python

import time
from pymongo import Connection

conn = Connection()
db = conn.queues
coll = db.messages

start = time.time()

count = 100000
for i in range(0, count):
    coll.insert({ "time": time.time()})
    #This generates messages at a rate of about 3,900 msg/s
    time.sleep(0.0001)

end = time.time()
print("total: ", count)
print("msg/s: ", count/(end - start))

下面是读数据的C++脚本,根据官方文档上的脚本改写

#include

#include

using namespace mongo;                                                                                                 

int main(int argc, char* argv[])
{
    DBClientConnection conn;
    conn.connect("localhost");                                                                                         

    // minKey is smaller than any other possible value
    BSONElement lastId = minKey.firstElement();                                                                        

    // { $natural : 1 } means in forward capped collection insertion order
    Query query = Query().sort("$natural");                                                                            

    cout << "loc,val" << std::endl;
    uint32_t i = 0;
    struct timeval tv;
     while( true ) {
         auto_ptr c = conn.query("queues.messages", query, 0, 0, 0,
                QueryOption_CursorTailable);
         while( true ) {
             if( !c->more() ) {
                if( c->isDead() )
                    break;
                sleepsecs(1);
                continue; // we will try more() again
            }

            const BSONObj& o = c->next();
            lastId = o["_id"];
            const double time = o["time"].Double();

            gettimeofday(&tv, NULL);
            const double curr = tv.tv_sec + tv.tv_usec / 1000000.0;
            cout << i++ << ", " << curr - time << endl;
        }

        // prepare to requery from where we left off
        query = QUERY( "_id" << GT << lastId ).sort("$natural");
    }

    return 0;
}

测试结果如下,下面是由这10w个 document 的延迟画出的线状图,可以看到图呈锯齿形。也就是说,如果你使用官方的例子,那么其读取性能是呈锯齿形的。

然后作者猜测可能是由于程序里在每次数据读尽后进行了一秒的sleep导致的(数据读尽的原因应该是写的速度赶不上读的速度,导致每过一段时间就会出现一次数据读尽的情况)。于是作者在已经填充了10w条数据的情况下再跑了一次上面的C++程序。这次数据都已经写好了,不会出现读尽的情况,于是得到下面的图:

此图基本验证了作者的猜想,为了进一步验证其猜想,他去掉了程序里的sleep 1秒代码,再次进行了第一次测试(先开着读进程,再进行写入)。效果如下:

可以看出,效果非常好,但是很明显,如果我们不再sleep了,那么在数据读尽时实际上程序是在跑死循环,CPU负载可想而知。

其实MongoDB是提供了一个方法来解决这种问题的,那就是QueryOption_AwaitData选项,在官方的程序里是加了此选项的,此选项的解释如下:

Use with TailableCursor. If we are at the end of the data, block for a while rather than returning no data. After a timeout period, we do return as normal.

在使用TailableCursor时,此参数会在数据读尽时先阻塞一小段时间后再读取一次并进行返回。

于是按官方文档中例子的做法,作者加上了QueryOption_AwaitData选项,当然我们还需要去掉程序里的sleep。而同时,在写脚本还没有开始写时,我们的collection里是没有数据里,在一个capped collection里没有数据时,实际上这个cursor是会被关闭的,这时候会再次导致死循环,所以这个sleep又需要移到isDead判断里,具体进行两处修改后的代码片断如下:

while( true ) {
         if( !c->more() ) {
             if( c->isDead() )
             {
                 // 这个sleep是为了在collection里没有数据时防止死循环而写的
                 sleepsecs(1);
                 break;
             }
             continue;
         }

执行脚本后的延迟曲线如下:

好吧,除了刚开始有一段锯齿外,基本上都很理想。那刚开始的一段锯齿是从何而来呢。其实就是因为我们之前collection里没数据导致了在第一次读取前sleep了一秒,堆积了一些数据,而后来一直是在query方法里通过QueryOption_AwaitData参数block一小段时间,所以后续的延迟时间都一直比较小。

为了解决这个没有数据时情况,作者做了一个小技巧,预先在collection里写入一条特殊数据,使这个cursor不会被服务端杀掉。代码段如下:

while( true )
{
     auto_ptr c = conn.query("queues.messages", query, 0, 0, 0,
             QueryOption_CursorTailable | QueryOption_AwaitData);
     while( true ) {
         if( !c->more() ) {
             if( c->isDead() )
             {
                 // this sleep is important for collections that start out with no data
                 sleepsecs(1);
                 break;
             }
             continue;
         }

         const BSONObj& o = c->next();
         lastId = o["_id"];
         const double time = o["time"].Double();

         // 处理这条特殊数据: {time: 0.0}
         if (time == 0)
             continue;

         gettimeofday(&tv, NULL);
         const double curr = tv.tv_sec + tv.tv_usec / 1000000.0;
         cout << i++ << ", " << curr - time << endl;
     }

     // prepare to requery from where we left off
     query = QUERY( "_id" << GT << lastId ).sort("$natural");
}

如上面红色标示部分,对于这条特殊数据不进行处理。这样改动后得到了一个比较完美的最终版性能图:

好吧,解释就到这里,如果您打算使用或者正在使用Tailable Cursors特性做消息队列,不知道是否注意到了这些细节。如果还没有,恭喜你能看到这篇文章。:)

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。