Skip to main content

Command Palette

Search for a command to run...

mongo-rocks data format

Updated
6 min read

mongo-rocks 存储数据的底层格式

1. mongo-rocks

mongo-rocks是基于mongo对存储引擎的规定的接口,用rocksdb实现的一个存储插件的;类似于mysql本身底层的存储引擎可以是插件化的,mongo本身也支持的;mongo-rocks就是基于rocksdb为mongodb提供存储的功能的;当然目前版本的mongo的默认的存储引擎是wt引擎,官方支持的;mongo-rocks这个库目前也在不断的更新用来支持新版本的mongo版本的;当然由于官方版本的版本迭代速度实在太快了,目前最新的mongo-rocks的能支持的最新的稳定版本是4.2.5,如果是第一次用mongo的话,推荐还是使用官方的wt引擎吧,别折腾自己;而自己分析这个东西的原因主要是因为我司的mongo版本算是走了弯路:1. 版本是3.4(mongo默认是放弃维护的) 2. 底层存储引擎是rocksdb;所以还是需要了解这个东西是如何组织在底层rocksdb;

2. Mongodb存储接口

在具体了解底层rocksdb的组织结构之前,先对一个存储引擎需要提供哪些接口进行分析,这部分的代码在mongodb的工程的percona-server-mongodb/src/mongo/db/storage/kv/ 目录下面的,这部分定义了如果是用kv存储来支持mongo需要实现的接口有哪些;

  1. KVEngine (kv_engine.h): kv存储引擎定义接口的地方
// 1. 指定ns和ident来获得对应表的元信息;    
virtual std::unique_ptr<RecordStore> getRecordStore(OperationContext* opCtx,
                                                        StringData ns,
                                                        StringData ident,
                                                        const CollectionOptions& options) = 0;
// 2. 指定ns和索引信息返回一个可以进行索引的数据访问接口
virtual SortedDataInterface* getSortedDataInterface(OperationContext* opCtx,
                                                        StringData ident,
                                                        const IndexDescriptor* desc) = 0;
// 3. 创建一张表
virtual Status createRecordStore(OperationContext* opCtx,
                                     StringData ns,
                                     StringData ident,
                                     const CollectionOptions& options) = 0;
// 4. 为ns创建索引
virtual Status createSortedDataInterface(OperationContext* opCtx,
                                             StringData ident,
                                             const IndexDescriptor* desc) = 0;
// 5. 获得ns的大小
virtual int64_t getIdentSize(OperationContext* opCtx, StringData ident) = 0;

// 6. 删除一张表
virtual Status dropIdent(OperationContext* opCtx, StringData ident) = 0;

// 其他还有,但是并不是很重要
  1. RecordStore: 对应的是某一个表的读写操作;

    上面KVEngine没有定义如何写入或者查询数据,这些定义的接口都在这个类中RecordStore; 主要关心的就是所谓的CRUD + 一些表的元信息的获得;

// 获得collection的数据大小,
virtual long long dataSize(OperationContext* txn) const = 0;
// 获得collection有多少条记录
virtual long long numRecords(OperationContext* txn) const = 0;
// 是否是限定容量的表,类似于oplog
virtual bool isCapped() const = 0;
// 设置如果容量达到上限之后如何处理的回调函数
virtual void setCappedCallback(CappedCallback*) {
        invariant(false);
}
// 数据在存储引擎中的真实大小,大部分时候都不大准;
virtual int64_t storageSize(OperationContext* txn,
                                BSONObjBuilder* extraInfo = NULL,
                                int infoLevel = 0) const = 0;

//Query
virtual bool findRecord(OperationContext* txn, const RecordId& loc, RecordData* out) const {}
//Delete
virtual void deleteRecord(OperationContext* txn, const RecordId& dl) = 0;
// Insert
virtual StatusWith<RecordId> insertRecord(OperationContext* txn,
                                              const char* data,
                                              int len,
                                              bool enforceQuota) = 0;
// Update
virtual Status updateRecord(OperationContext* txn,
                                const RecordId& oldLocation,
                                const char* data,
                                int len,
                                bool enforceQuota,
                                UpdateNotifier* notifier) = 0;


// 当然还是其他的接口,比如批量insert或者mongo的一些东西,但是这次重点还是以上的接口
  1. mongodb是如何初始化整个存储引擎并且如何调用上面的这些接口
// 1. 核心类,这些成员并非所有,但是是最重要的
class KVStorageEngine final : public StorageEngine {
    // This must be the first member so it is destroyed last.
  // 真实的存储引擎
    std::unique_ptr<KVEngine> _engine;
  // 存储元信息的表;上面介绍过RecordStore是表的read/write操作
    std::unique_ptr<RecordStore> _catalogRecordStore;
  // mongo的db/coll的元信息的操作,比如创建表、删除、修改、list等等
    std::unique_ptr<KVCatalog> _catalog;
  // database的管理;
    typedef std::map<std::string, KVDatabaseCatalogEntry*> DBMap;
    DBMap _dbs;
}
//用来存放元信息的表名
const std::string catalogInfo = "_mdb_catalog";
//下面分析KVStorageEngine初始化过程
KVStorageEngine::KVStorageEngine(KVEngine* engine, const KVStorageEngineOptions& options)
    : _options(options), _engine(engine), _supportsDocLocking(_engine->supportsDocLocking()) {

    //底层存储中是否包含了存储mongodb表信息的表
    bool catalogExists = engine->hasIdent(&opCtx, catalogInfo);
    if (options.forRepair && catalogExists) {
        log() << "Repairing catalog metadata";
        // TODO should also validate all BSON in the catalog.
        engine->repairIdent(&opCtx, catalogInfo);
    }

    if (!catalogExists) {
        WriteUnitOfWork uow(&opCtx);

        Status status =
            _engine->createRecordStore(&opCtx, catalogInfo, catalogInfo, CollectionOptions());
        // BadValue is usually caused by invalid configuration string.
        // We still fassert() but without a stack trace.
        if (status.code() == ErrorCodes::BadValue) {
            fassertFailedNoTrace(28562);
        }
        fassert(28520, status);
        uow.commit();
    }
    // 2. 从底层引擎中获得对应的元信息表的对象
    _catalogRecordStore =
        _engine->getRecordStore(&opCtx, catalogInfo, catalogInfo, CollectionOptions());
    // _catalog本身是基于_catalogRecordStore来提供对表的一些访问、list、删除等等的操作
    _catalog.reset(new KVCatalog(
        _catalogRecordStore.get(), _options.directoryPerDB, _options.directoryForIndexes));
    _catalog->init(&opCtx);

    std::vector<std::string> collections;
    _catalog->getAllCollections(&collections);

    for (size_t i = 0; i < collections.size(); i++) {
        std::string coll = collections[i];
        NamespaceString nss(coll);
        string dbName = nss.db().toString();

        // No rollback since this is only for committed dbs.
        KVDatabaseCatalogEntry*& db = _dbs[dbName];
        if (!db) {
            db = new KVDatabaseCatalogEntry(dbName, this);
        }

        db->initCollection(&opCtx, coll, options.forRepair);
    }

     //清理无用的表
    {
        // get all idents
        std::set<std::string> allIdents;
        {
            std::vector<std::string> v = _engine->getAllIdents(&opCtx);
            allIdents.insert(v.begin(), v.end());
            allIdents.erase(catalogInfo);
        }

        // remove ones still in use
        {
            vector<string> idents = _catalog->getAllIdents(&opCtx);
            for (size_t i = 0; i < idents.size(); i++) {
                allIdents.erase(idents[i]);
            }
        }

        for (std::set<std::string>::const_iterator it = allIdents.begin(); it != allIdents.end();
             ++it) {
            const std::string& toRemove = *it;
            if (!_catalog->isUserDataIdent(toRemove))
                continue;
            log() << "dropping unused ident: " << toRemove;
            WriteUnitOfWork wuow(&opCtx);
            _engine->dropIdent(&opCtx, toRemove);
            wuow.commit();
        }
    }
}
  1. mongodb创建表的一个具体的过程,中间会涉及到一些参数
// 用来生成collection的ident,最终的样子应该是: collection-2-一个随机数
std::string KVCatalog::_newUniqueIdent(StringData ns, const char* kind) {
    // If this changes to not put _rand at the end, _hasEntryCollidingWithRand will need fixing.
    StringBuilder buf;
    if (_directoryPerDb) {
        buf << NamespaceString::escapeDbName(nsToDatabaseSubstring(ns)) << '/';
    }
    buf << kind;
    buf << (_directoryForIndexes ? '/' : '-');
    buf << _next.fetchAndAdd(1) << '-' << _rand;
    return buf.str();
}
// 创建一张表
Status KVCatalog::newCollection(OperationContext* opCtx,
                                StringData ns,
                                const CollectionOptions& options) {
    invariant(opCtx->lockState() == NULL ||
              opCtx->lockState()->isDbLockedForMode(nsToDatabaseSubstring(ns), MODE_X));
   //1. 获得这个ns的一个唯一的标识符
    const string ident = _newUniqueIdent(ns, "collection");

    stdx::lock_guard<stdx::mutex> lk(_identsLock);
    Entry& old = _idents[ns.toString()];
    if (!old.ident.empty()) {
        return Status(ErrorCodes::NamespaceExists, "collection already exists");
    }
      // 一个表信息存储到底层的信息;大概是这个样子,option中会包含更多的一些关于collection的元信息;
    /**
    {
    "ns":"xx",
    "ident":"uniq",
    "md":{
        "ns":"xxx",
        "options":{

        }
        }
        }
    */
    BSONObj obj;
    {
        BSONObjBuilder b;
        b.append("ns", ns);
        b.append("ident", ident);
        BSONCollectionCatalogEntry::MetaData md;
        md.ns = ns.toString();
        md.options = options;
        b.append("md", md.toBSON());
        obj = b.obj();
    }

    // 直接把表的信息存储到底层的元信息的表中;
    StatusWith<RecordId> res = _rs->insertRecord(opCtx, obj.objdata(), obj.objsize(), false);
    if (!res.isOK())
        return res.getStatus();

    old = Entry(ident, res.getValue());
    LOG(1) << "stored meta data for " << ns << " @ " << res.getValue();
    return Status::OK();
}

上面几个类的分析,基本上能知道mongo是如何使用这些接口,主要是也解释了一些参数的生成,等到分析底层的rocksdb的数据组织形式的时候就会比较好的理解;

3. Mongo-Rocks是如何组织数据

因为我们使用的版本实在是很老,对应到的mongo版本是3.4的,所以存储的格式和目前的版本差别很大的;尤其是新版本都是用的Column family的方式来存储不同的表, 等下次我也总结直接的区别;

具体的目录如下: percona-server-mongodb/src/mongo/db/storage/kv/rocks/src下面,当然也可以直接clone mongo-rocks的代码; 下面集中几个点来分析,分别为:

  1. collection存储
  2. data的存储
  3. index的存储
  4. capped的表是如何控制size

1. Collection在rocksdb中如何存储

const std::string RocksEngine::kMetadataPrefix("\0\0\0\0metadata-", 12);    
//  创建一个表
    Status RocksEngine::createRecordStore(OperationContext* opCtx, StringData ns, StringData ident,
                                          const CollectionOptions& options) {
        BSONObjBuilder configBuilder;
        //构建一个rocksdb的key,结构为:KmetadataPrefix + ident,value就是{"prefix":<uint32>}, prefix应该是rockdb自己统计的目前最大的prefix; 并且会把这个prefix编码也存储到rocksdb中,可能是为了加速找到最大的prefix, 这里我不管oplog的特殊性,所以下面的逻辑可以忽略
        auto s = _createIdent(ident, &configBuilder);
        if (s.isOK() && NamespaceString::oplog(ns)) {
            _oplogIdent = ident.toString();
            // oplog needs two prefixes, so we also reserve the next one
            uint64_t oplogTrackerPrefix = 0;
            {
                stdx::lock_guard<stdx::mutex> lk(_identMapMutex);
                oplogTrackerPrefix = ++_maxPrefix;
            }
            // we also need to write out the new prefix to the database. this is just an
            // optimization, 
            std::string encodedPrefix(encodePrefix(oplogTrackerPrefix));
            s = rocksToMongoStatus(
                _db->Put(rocksdb::WriteOptions(), encodedPrefix, rocksdb::Slice()));
        }
        return s;
    }
//读取一个表的信息
std::unique_ptr<RecordStore> RocksEngine::getRecordStore(OperationContext* opCtx, StringData ns,
                                             StringData ident, const CollectionOptions& options) {
        if (NamespaceString::oplog(ns)) {
            _oplogIdent = ident.toString();
        }

        auto config = _getIdentConfig(ident);
          // 从config配置里面获得对应的prefix,上面有介绍过,每一个collection在创建的时候会对应一个prefix,是一个递增的uint32
        std::string prefix = _extractPrefix(config);

          // 构建RocksRecordStore,这个类是RecordStore的一个实现类,用rocksdb来存储,本质也是用来操作crud的
        std::unique_ptr<RocksRecordStore> recordStore =
            options.capped
                ? stdx::make_unique<RocksRecordStore>(
                      ns, ident, _db.get(), _counterManager.get(), _durabilityManager.get(),
                      _compactionScheduler.get(), prefix,
                      true, options.cappedSize ? options.cappedSize : 4096,  // default size
                      options.cappedMaxDocs ? options.cappedMaxDocs : -1)
                : stdx::make_unique<RocksRecordStore>(ns, ident, _db.get(), _counterManager.get(),
                                                      _durabilityManager.get(), _compactionScheduler.get(),
                                                      prefix);

        {
            stdx::lock_guard<stdx::mutex> lk(_identObjectMapMutex);
            _identCollectionMap[ident] = recordStore.get();
        }
        return std::move(recordStore);
    }

存储Collection的元信息的总结:

  1. 每一个collection对应会存在一个prefix,后面会解释有什么用
  2. 元信息的key的格式为:kMetadataPrefix + ident, value:存储上面的prefix
  3. prefix encode之后也存储在rocksdb,目前还不是很清楚是做什么

2. collection中的数据如何存储

主要是看RocksRecordStore,因为它操作了collection的crud的操作;这里面我们将看到真实mongodb的存储结构,尤其是recordId的这个生成,为什么比较重要,原因在于不管是mongo的_id还是其他的索引,理论上来说最终都会转化成recordId,逻辑上接近在物理上可能相差很远,这是之前我遇到的一个问题;

        rocksdb::DB* _db;                      // rocksdb
        std::string _prefix; // 每一个collectuin都存在一个prefix,uniq
        const bool _isCapped; //是否是容量限制的collection
        const int64_t _cappedMaxSize; //max size
        const int64_t _cappedMaxSizeSlack;  // when to start applying backpressure
        const int64_t _cappedMaxDocs; // max doc num
        CappedCallback* _cappedCallback; // callback
        int _cappedDeleteCheckCount;      // 有多少的数据要被删
        const bool _isOplog; //是否是oplog

        // invariant: there is no live records earlier than _cappedOldestKeyHint. There might be
        // some records that are dead after _cappedOldestKeyHint.
        // SeekToFirst() on an capped collection is an expensive operation because bunch of keys at
        // the start are deleted. To reduce the overhead, we remember the next key to delete and
        // seek directly to it. This will not work correctly if somebody inserted a key before this
        // _cappedOldestKeyHint. However, we prevent this from happening by using
        // _cappedVisibilityManager and checking isCappedHidden() during deletions
        RecordId _cappedOldestKeyHint; //capped中最早的一条数据的recordId
        std::string _ident; //collection's ident
        AtomicUInt64 _nextIdNum; //recordId的生成依赖
        std::atomic<long long> _dataSize; //data size
        std::atomic<long long> _numRecords; //num record

        const std::string _dataSizeKey; // 存储datasize的rocksdb的key
        const std::string _numRecordsKey; // 存储record num的record的key

// 如何生成recordId,是一个原子变量的递增
    RecordId RocksRecordStore::_nextId() {
        invariant(!_isOplog);
        return RecordId(_nextIdNum.fetchAndAdd(1));
    }
// 查看如何初始化的_nextIdNum,主要思考如何持久化的; 下面这段代码来源于RocksRecordStore在初始化的时候使用,本质上是通过查找表中最后一条记录获得对应的Id,然后+1; TODO:是否会存在已经被使用过的recordId再次被重用呢?理论上应该是会的,但是recordId本质上不对外,所以即使重用了,也是说明之前的数据是被删除的,理论上问题不大的;
        bool emptyCollection = !iter->Valid();
        if (!emptyCollection) {
            // if it's not empty, find next RecordId
            iter->SeekToLast();
            dassert(iter->Valid());
            rocksdb::Slice lastSlice = iter->key();
            RecordId lastId = _makeRecordId(lastSlice);
            if (_isOplog || _isCapped) {
                _cappedVisibilityManager->updateHighestSeen(lastId);
            }
            _nextIdNum.store(lastId.repr() + 1);
        } else {
            // Need to start at 1 so we are always higher than RecordId::min()
            _nextIdNum.store(1);
        }

// 如何写入一条数据,这边主要关心key的构建, 这是在insertRecord中最终往rocksdb中使用的;
ru->writeBatch()->Put(_makePrefixedKey(_prefix, loc), rocksdb::Slice(data, len));
//一个key的构成: encord(prefix) + encord(recordId), 其实理论上在rocksdb中,所有表的数据是聚集在一起的;
std::string RocksRecordStore::_makePrefixedKey(const std::string& prefix, const RecordId& loc) {
        int64_t storage;
        auto encodedLoc = _makeKey(loc, &storage);
        std::string key(prefix);
        key.append(encodedLoc.data(), encodedLoc.size());
        return key;
}
//recordId统一编码
rocksdb::Slice RocksRecordStore::_makeKey(const RecordId& loc, int64_t* storage) {
        *storage = endian::nativeToBig(loc.repr());
        return rocksdb::Slice(reinterpret_cast<const char*>(storage), sizeof(*storage));
}

//查找一个记录,比较简单的,就是构建key,直接在rocksdb中去找即可
RecordData RocksRecordStore::_getDataFor(rocksdb::DB* db, const std::string& prefix,
                                             OperationContext* txn, const RecordId& loc) {
        RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);

        std::string valueStorage;
        auto status = ru->Get(_makePrefixedKey(prefix, loc), &valueStorage);
        if (status.IsNotFound()) {
            return RecordData(nullptr, 0);
        }
        invariantRocksOK(status);

        SharedBuffer data = SharedBuffer::allocate(valueStorage.size());
        memcpy(data.get(), valueStorage.data(), valueStorage.size());
        return RecordData(data, valueStorage.size());
}

//update + delete和正常的rocksdb操作都类似;

总结存储数据的:

  1. data key的组成: encode(prefix) + encode(recordId), prefix是每一个collection的uniq id, Len:32byte
  2. prefix: 每次mongo重启的时候会去当前数据库找到最大的前缀,后面的新生成的prefix就递增;
  3. data size的前缀: std::string("\0\0\0\0", 4) + "datasize-" + id.toString(),id = collection的ident
  4. record num的前缀: std::string("\0\0\0\0", 4) + "numrecords-" + id.toString(), id = collection的ident
  5. prefix: 每一个mongodb的meta信息的一个前缀,为了区别各个元数据
  6. ident:每一个mongodb的meta信息的名字,比如collection、索引之类的,名字上还是有点区别

More from this blog

Ai时代的工具链

本周是black Friday,我订阅了几个AI服务,还是蛮贵的...不过这样基本上构成我目前整体的知识阅读的过程,随着Ai的不断发展,工具链的替换可能是很重要的一个过程的。我主要订购了以下几个工具: Memo: 这个工具的主要作用是将视频/audio转srt,并且带有ai翻译的工具;当然我觉得它做的非常好的是,它把整个链路做的非常好的,并且可以用本地的资源做audio->text;而且它自带了很多的ai功能,比如对字幕进行进一步的AI的处理,提问,summarize和思维导图等等;目前我主要...

Nov 30, 20251 min read

做了一个噩梦

今天凌晨4点多起来看了一眼丈母娘的发烧是否ok...就导致我有点睡不着的,刷了一会推特之后又开始睡觉了,于是就开始做了一个很可怕的梦。 噩梦 那天,我不知道是在哪里..我带着女儿和我弟出去玩的,貌似是一个风景山区。于是我就带着女儿和弟弟出去玩的;我们走啊走, 沿着一条路一直走..突然看到一个小道有一家饭店的,这个饭店是比较特殊,有很多海鲜的;我看上了一只大龙虾,我问多少钱的,他说大概就70rmb就可以的。。。我觉得很划算的,我心想:我买下来,到时候把老婆叫过来一起吃的,并且告诉她这个才70rmb...

Nov 24, 20251 min read

子女教育-2

下面我分享一个推特上的一个关于子女教育的推 哈哈哈哈,李诞这个视频我看过 我给你分享几个我和我女儿之间的小故事 第一个故事 我经常给小朋友说:你们现在上学的成绩不重要,你们现在数学考试都是语文脑筋急转弯,语文考试都是历史背诵,一点用都没有,你出了社会就知道,社会根本没有选择题,社会要有选择题就好了,最难的是你遇到困难,你连门都找不到。我第一次这样讲的时候是女儿小学4年级,那时候我女儿听的一愣一愣的,她不明白,但是觉得我的理论和学校的不一样,很狂妄,但是她很喜欢,哈哈哈哈。 她什么时候真正明...

Nov 13, 20251 min read

被诈骗-马来西亚

最近我在国内,我老婆在马来;最近在计划搬家的,找的那个房子不包含一些必要的家具,于是我老婆就必须要买点家具的,主要是沙发和餐桌..我们本来计划是说去ikea去买,但是我老婆觉得ikea的家具不便宜,并且款式一般的,最终问了中介找了一个二手平台找找看不错的家具。 我老婆挑了两个家具的,我看了一下价格也不算便宜的,但是我老婆喜欢的,于是我就说你觉得ok那就购买吧。我还顺便问了一下,这个家具能不能线下看一下货的,但是我老婆说这货在很远的地方的,大概是300公里的一个城市的。那我就说这个包邮吗,我老婆说...

Nov 13, 20251 min read

当下和最近想做的事情

1. Current 当下 最近依然还在中国,已经回来快一个月了. 最近一直在忙着带丈母娘看病和住院的。索性一切都还在可控范围内的,丈母娘由于糖尿病控制的很差导致本身的冠心病也复发. 这次去浙江省人民医院去做了造影检查和支架植入的手术的,不过这一切都比我预估的要顺利,我就怕她由于长时间没吃药和高血糖的持续的时间太长了,会带来严重的问题,不过好在没有发生最坏的事情的。 因为做了手术,所以这段时间我和我老婆的姐姐每人轮换的陪床,不过陪床真的好累的,因为睡得很不好的,特别的累。不过好在都结束了,而且丈...

Nov 9, 20251 min read

Keep Move - 永不止步

39 posts