MongoDB Tailable Cursors 深入剖析
下面是测试的第一步,作者的步骤是先让数据接收者对测试的目标colleciton进行读取,然后再用python脚本写入10w条数据到对应的capped collection里。写入的数据都带有写入时的时间戳信息,读取端在读到数据后对比时间戳即可计算数据延迟。
下面是写数据的python脚本:
#!/usr/bin/python
import time
from pymongo import Connectionconn = Connection()
db = conn.queues
coll = db.messagesstart = time.time()
count = 100000
for i in range(0, count):
coll.insert({ "time": time.time()})
#This generates messages at a rate of about 3,900 msg/s
time.sleep(0.0001)end = time.time()
print("total: ", count)
print("msg/s: ", count/(end - start))
下面是读数据的C++脚本,根据官方文档上的脚本改写
#include
#include
using namespace mongo;
int main(int argc, char* argv[])
{
DBClientConnection conn;
conn.connect("localhost");// minKey is smaller than any other possible value
BSONElement lastId = minKey.firstElement();// { $natural : 1 } means in forward capped collection insertion order
Query query = Query().sort("$natural");cout << "loc,val" << std::endl;
uint32_t i = 0;
struct timeval tv;
while( true ) {
auto_ptr c = conn.query("queues.messages", query, 0, 0, 0,
QueryOption_CursorTailable);
while( true ) {
if( !c->more() ) {
if( c->isDead() )
break;
sleepsecs(1);
continue; // we will try more() again
}const BSONObj& o = c->next();
lastId = o["_id"];
const double time = o["time"].Double();gettimeofday(&tv, NULL);
const double curr = tv.tv_sec + tv.tv_usec / 1000000.0;
cout << i++ << ", " << curr - time << endl;
}// prepare to requery from where we left off
query = QUERY( "_id" << GT << lastId ).sort("$natural");
}return 0;
}
测试结果如下,下面是由这10w个 document 的延迟画出的线状图,可以看到图呈锯齿形。也就是说,如果你使用官方的例子,那么其读取性能是呈锯齿形的。
然后作者猜测可能是由于程序里在每次数据读尽后进行了一秒的sleep导致的(数据读尽的原因应该是写的速度赶不上读的速度,导致每过一段时间就会出现一次数据读尽的情况)。于是作者在已经填充了10w条数据的情况下再跑了一次上面的C++程序。这次数据都已经写好了,不会出现读尽的情况,于是得到下面的图:
此图基本验证了作者的猜想,为了进一步验证其猜想,他去掉了程序里的sleep 1秒代码,再次进行了第一次测试(先开着读进程,再进行写入)。效果如下:
可以看出,效果非常好,但是很明显,如果我们不再sleep了,那么在数据读尽时实际上程序是在跑死循环,CPU负载可想而知。
其实MongoDB是提供了一个方法来解决这种问题的,那就是QueryOption_AwaitData选项,在官方的程序里是加了此选项的,此选项的解释如下:
Use with TailableCursor. If we are at the end of the data, block for a while rather than returning no data. After a timeout period, we do return as normal.
在使用TailableCursor时,此参数会在数据读尽时先阻塞一小段时间后再读取一次并进行返回。
于是按官方文档中例子的做法,作者加上了QueryOption_AwaitData选项,当然我们还需要去掉程序里的sleep。而同时,在写脚本还没有开始写时,我们的collection里是没有数据里,在一个capped collection里没有数据时,实际上这个cursor是会被关闭的,这时候会再次导致死循环,所以这个sleep又需要移到isDead判断里,具体进行两处修改后的代码片断如下:
while( true ) {
if( !c->more() ) {
if( c->isDead() )
{
// 这个sleep是为了在collection里没有数据时防止死循环而写的
sleepsecs(1);
break;
}
continue;
}
执行脚本后的延迟曲线如下:
好吧,除了刚开始有一段锯齿外,基本上都很理想。那刚开始的一段锯齿是从何而来呢。其实就是因为我们之前collection里没数据导致了在第一次读取前sleep了一秒,堆积了一些数据,而后来一直是在query方法里通过QueryOption_AwaitData参数block一小段时间,所以后续的延迟时间都一直比较小。
为了解决这个没有数据时情况,作者做了一个小技巧,预先在collection里写入一条特殊数据,使这个cursor不会被服务端杀掉。代码段如下:
while( true )
{
auto_ptr c = conn.query("queues.messages", query, 0, 0, 0,
QueryOption_CursorTailable | QueryOption_AwaitData);
while( true ) {
if( !c->more() ) {
if( c->isDead() )
{
// this sleep is important for collections that start out with no data
sleepsecs(1);
break;
}
continue;
}const BSONObj& o = c->next();
lastId = o["_id"];
const double time = o["time"].Double();// 处理这条特殊数据: {time: 0.0}
if (time == 0)
continue;gettimeofday(&tv, NULL);
const double curr = tv.tv_sec + tv.tv_usec / 1000000.0;
cout << i++ << ", " << curr - time << endl;
}// prepare to requery from where we left off
query = QUERY( "_id" << GT << lastId ).sort("$natural");
}
如上面红色标示部分,对于这条特殊数据不进行处理。这样改动后得到了一个比较完美的最终版性能图:
好吧,解释就到这里,如果您打算使用或者正在使用Tailable Cursors特性做消息队列,不知道是否注意到了这些细节。如果还没有,恭喜你能看到这篇文章。:)
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。