leveldb源码分析--WriteBatch
从【leveldb源码分析--插入删除流程】和WriteBatch其名我们就很轻易的知道,这个是leveldb内部的一个批量写的结构,在leveldb为了提高插入和删除的效率,在其插入过程中都采用了批量集合相邻的多个具有相同同步设置的写请求以批量的方式进行写入。
其成员变量仅包含了一个 std::string 类型的 rep_变量,其Put和Delete(其实也是插入删除操作,而非删除Put进去的数据,或者你可以将其理解为Put Delete operation的过度简写)都将相应的操作Encode后存入其中。我们来看看其encode的格式
WriteBatch::rep_ := [ sequence: fixed64 | count: fixed32 | data: record[count ]
record := [ kTypeValue | key(varint32 | data) | value(varint32 | data) ]
我们首先看一下WriteBatch内部相关的一些结构和成员
class Handler { public: virtual ~Handler(); virtual void Put(const Slice& key, const Slice& value) = 0; virtual void Delete(const Slice& key) = 0; }; Status Iterate(Handler* handler) const; friend class WriteBatchInternal;
WriteBatchInternal主要是对WriteBatch的内部解码编码的封装,简化WriteBatch的结构。而Handler接口我们可以看到MemTableInserter成为了其一个实现,这个handler是在WriteBatch的内部解码遍历过程中逐个调用Handler对应的Put、Delete到MemTable中。所以再回头分析DBImpl::BuildBatchGroup方法。
// REQUIRES: First writer must have a non-NULL batch WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) { Writer* first = writers_.front(); WriteBatch* result = first->batch;
size_t size = WriteBatchInternal::ByteSize(first->batch); /* 这里主要是设置每次批量写入的最大的数据量,防止一次插入过多数据导致等待写完成的时间过长 因为从Write的逻辑分析中我们知道只有位于队列首的写线程会去批量组装然后执行真正的插入,
其他的线程都是在等待这个批量写的完成。 */
size_t max_size = 1 << 20; if (size <= (128<<10)) { max_size = size + (128<<10); } *last_writer = first; std::deque<Writer*>::iterator iter = writers_.begin(); ++iter; // 跳过 "first" for (; iter != writers_.end(); ++iter) { Writer* w = *iter; if (w->sync && !first->sync) { // 只组合相邻的sync设置相同的操作到一批进行处理. break; } if (w->batch != NULL) { size += WriteBatchInternal::ByteSize(w->batch); if (size > max_size) { // 如果总大小超过设置的现在大小,不再继续组装过程,跳出循环执行已组装的请求 break; } // 第一次进入循环,把tmp_batch_赋给result并把第一个放入其中,后继的都是append到tmp_batch_中
最后return出去进行操作的也是这个tmp_batch_ if (result == first->batch) { result = tmp_batch_; assert(WriteBatchInternal::Count(result) == 0); WriteBatchInternal::Append(result, first->batch); } WriteBatchInternal::Append(result, w->batch); } *last_writer = w; } return result; }
接下来对该batch进行操作的是WriteBatchInternal::InsertInto(updates, mem_)这个调用,这就是前面提到的采用了一个Iterator的方式解码遍历操作batch中的数据
Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) { MemTableInserter inserter; inserter.sequence_ = WriteBatchInternal::Sequence(b); inserter.mem_ = memtable; return b->Iterate(&inserter); }
Status WriteBatch::Iterate(Handler* handler) const {
while (!input.empty()) { switch (tag) { case kTypeValue: handler->Put(key, value); break; case kTypeDeletion: handler->Delete(key);
break; } } return Status::OK(); }
这里我们再回顾一下Write有
while (!w.done && &w != writers_.front()) {
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。