mongo-rocks data format
mongo-rocks 存储数据的底层格式
1. mongo-rocks
mongo-rocks是基于mongo对存储引擎的规定的接口,用rocksdb实现的一个存储插件的;类似于mysql本身底层的存储引擎可以是插件化的,mongo本身也支持的;mongo-rocks就是基于rocksdb为mongodb提供存储的功能的;当然目前版本的mongo的默认的存储引擎是wt引擎,官方支持的;mongo-rocks这个库目前也在不断的更新用来支持新版本的mongo版本的;当然由于官方版本的版本迭代速度实在太快了,目前最新的mongo-rocks的能支持的最新的稳定版本是4.2.5,如果是第一次用mongo的话,推荐还是使用官方的wt引擎吧,别折腾自己;而自己分析这个东西的原因主要是因为我司的mongo版本算是走了弯路:1. 版本是3.4(mongo默认是放弃维护的) 2. 底层存储引擎是rocksdb;所以还是需要了解这个东西是如何组织在底层rocksdb;
2. Mongodb存储接口
在具体了解底层rocksdb的组织结构之前,先对一个存储引擎需要提供哪些接口进行分析,这部分的代码在mongodb的工程的percona-server-mongodb/src/mongo/db/storage/kv/ 目录下面的,这部分定义了如果是用kv存储来支持mongo需要实现的接口有哪些;
KVEngine (kv_engine.h): kv存储引擎定义接口的地方
// 1. 指定ns和ident来获得对应表的元信息;
virtual std::unique_ptr<RecordStore> getRecordStore(OperationContext* opCtx,
StringData ns,
StringData ident,
const CollectionOptions& options) = 0;
// 2. 指定ns和索引信息返回一个可以进行索引的数据访问接口
virtual SortedDataInterface* getSortedDataInterface(OperationContext* opCtx,
StringData ident,
const IndexDescriptor* desc) = 0;
// 3. 创建一张表
virtual Status createRecordStore(OperationContext* opCtx,
StringData ns,
StringData ident,
const CollectionOptions& options) = 0;
// 4. 为ns创建索引
virtual Status createSortedDataInterface(OperationContext* opCtx,
StringData ident,
const IndexDescriptor* desc) = 0;
// 5. 获得ns的大小
virtual int64_t getIdentSize(OperationContext* opCtx, StringData ident) = 0;
// 6. 删除一张表
virtual Status dropIdent(OperationContext* opCtx, StringData ident) = 0;
// 其他还有,但是并不是很重要
RecordStore: 对应的是某一个表的读写操作;上面
KVEngine没有定义如何写入或者查询数据,这些定义的接口都在这个类中RecordStore; 主要关心的就是所谓的CRUD+ 一些表的元信息的获得;
// 获得collection的数据大小,
virtual long long dataSize(OperationContext* txn) const = 0;
// 获得collection有多少条记录
virtual long long numRecords(OperationContext* txn) const = 0;
// 是否是限定容量的表,类似于oplog
virtual bool isCapped() const = 0;
// 设置如果容量达到上限之后如何处理的回调函数
virtual void setCappedCallback(CappedCallback*) {
invariant(false);
}
// 数据在存储引擎中的真实大小,大部分时候都不大准;
virtual int64_t storageSize(OperationContext* txn,
BSONObjBuilder* extraInfo = NULL,
int infoLevel = 0) const = 0;
//Query
virtual bool findRecord(OperationContext* txn, const RecordId& loc, RecordData* out) const {}
//Delete
virtual void deleteRecord(OperationContext* txn, const RecordId& dl) = 0;
// Insert
virtual StatusWith<RecordId> insertRecord(OperationContext* txn,
const char* data,
int len,
bool enforceQuota) = 0;
// Update
virtual Status updateRecord(OperationContext* txn,
const RecordId& oldLocation,
const char* data,
int len,
bool enforceQuota,
UpdateNotifier* notifier) = 0;
// 当然还是其他的接口,比如批量insert或者mongo的一些东西,但是这次重点还是以上的接口
mongodb是如何初始化整个存储引擎并且如何调用上面的这些接口
// 1. 核心类,这些成员并非所有,但是是最重要的
class KVStorageEngine final : public StorageEngine {
// This must be the first member so it is destroyed last.
// 真实的存储引擎
std::unique_ptr<KVEngine> _engine;
// 存储元信息的表;上面介绍过RecordStore是表的read/write操作
std::unique_ptr<RecordStore> _catalogRecordStore;
// mongo的db/coll的元信息的操作,比如创建表、删除、修改、list等等
std::unique_ptr<KVCatalog> _catalog;
// database的管理;
typedef std::map<std::string, KVDatabaseCatalogEntry*> DBMap;
DBMap _dbs;
}
//用来存放元信息的表名
const std::string catalogInfo = "_mdb_catalog";
//下面分析KVStorageEngine初始化过程
KVStorageEngine::KVStorageEngine(KVEngine* engine, const KVStorageEngineOptions& options)
: _options(options), _engine(engine), _supportsDocLocking(_engine->supportsDocLocking()) {
//底层存储中是否包含了存储mongodb表信息的表
bool catalogExists = engine->hasIdent(&opCtx, catalogInfo);
if (options.forRepair && catalogExists) {
log() << "Repairing catalog metadata";
// TODO should also validate all BSON in the catalog.
engine->repairIdent(&opCtx, catalogInfo);
}
if (!catalogExists) {
WriteUnitOfWork uow(&opCtx);
Status status =
_engine->createRecordStore(&opCtx, catalogInfo, catalogInfo, CollectionOptions());
// BadValue is usually caused by invalid configuration string.
// We still fassert() but without a stack trace.
if (status.code() == ErrorCodes::BadValue) {
fassertFailedNoTrace(28562);
}
fassert(28520, status);
uow.commit();
}
// 2. 从底层引擎中获得对应的元信息表的对象
_catalogRecordStore =
_engine->getRecordStore(&opCtx, catalogInfo, catalogInfo, CollectionOptions());
// _catalog本身是基于_catalogRecordStore来提供对表的一些访问、list、删除等等的操作
_catalog.reset(new KVCatalog(
_catalogRecordStore.get(), _options.directoryPerDB, _options.directoryForIndexes));
_catalog->init(&opCtx);
std::vector<std::string> collections;
_catalog->getAllCollections(&collections);
for (size_t i = 0; i < collections.size(); i++) {
std::string coll = collections[i];
NamespaceString nss(coll);
string dbName = nss.db().toString();
// No rollback since this is only for committed dbs.
KVDatabaseCatalogEntry*& db = _dbs[dbName];
if (!db) {
db = new KVDatabaseCatalogEntry(dbName, this);
}
db->initCollection(&opCtx, coll, options.forRepair);
}
//清理无用的表
{
// get all idents
std::set<std::string> allIdents;
{
std::vector<std::string> v = _engine->getAllIdents(&opCtx);
allIdents.insert(v.begin(), v.end());
allIdents.erase(catalogInfo);
}
// remove ones still in use
{
vector<string> idents = _catalog->getAllIdents(&opCtx);
for (size_t i = 0; i < idents.size(); i++) {
allIdents.erase(idents[i]);
}
}
for (std::set<std::string>::const_iterator it = allIdents.begin(); it != allIdents.end();
++it) {
const std::string& toRemove = *it;
if (!_catalog->isUserDataIdent(toRemove))
continue;
log() << "dropping unused ident: " << toRemove;
WriteUnitOfWork wuow(&opCtx);
_engine->dropIdent(&opCtx, toRemove);
wuow.commit();
}
}
}
mongodb创建表的一个具体的过程,中间会涉及到一些参数
// 用来生成collection的ident,最终的样子应该是: collection-2-一个随机数
std::string KVCatalog::_newUniqueIdent(StringData ns, const char* kind) {
// If this changes to not put _rand at the end, _hasEntryCollidingWithRand will need fixing.
StringBuilder buf;
if (_directoryPerDb) {
buf << NamespaceString::escapeDbName(nsToDatabaseSubstring(ns)) << '/';
}
buf << kind;
buf << (_directoryForIndexes ? '/' : '-');
buf << _next.fetchAndAdd(1) << '-' << _rand;
return buf.str();
}
// 创建一张表
Status KVCatalog::newCollection(OperationContext* opCtx,
StringData ns,
const CollectionOptions& options) {
invariant(opCtx->lockState() == NULL ||
opCtx->lockState()->isDbLockedForMode(nsToDatabaseSubstring(ns), MODE_X));
//1. 获得这个ns的一个唯一的标识符
const string ident = _newUniqueIdent(ns, "collection");
stdx::lock_guard<stdx::mutex> lk(_identsLock);
Entry& old = _idents[ns.toString()];
if (!old.ident.empty()) {
return Status(ErrorCodes::NamespaceExists, "collection already exists");
}
// 一个表信息存储到底层的信息;大概是这个样子,option中会包含更多的一些关于collection的元信息;
/**
{
"ns":"xx",
"ident":"uniq",
"md":{
"ns":"xxx",
"options":{
}
}
}
*/
BSONObj obj;
{
BSONObjBuilder b;
b.append("ns", ns);
b.append("ident", ident);
BSONCollectionCatalogEntry::MetaData md;
md.ns = ns.toString();
md.options = options;
b.append("md", md.toBSON());
obj = b.obj();
}
// 直接把表的信息存储到底层的元信息的表中;
StatusWith<RecordId> res = _rs->insertRecord(opCtx, obj.objdata(), obj.objsize(), false);
if (!res.isOK())
return res.getStatus();
old = Entry(ident, res.getValue());
LOG(1) << "stored meta data for " << ns << " @ " << res.getValue();
return Status::OK();
}
上面几个类的分析,基本上能知道mongo是如何使用这些接口,主要是也解释了一些参数的生成,等到分析底层的rocksdb的数据组织形式的时候就会比较好的理解;
3. Mongo-Rocks是如何组织数据
因为我们使用的版本实在是很老,对应到的mongo版本是3.4的,所以存储的格式和目前的版本差别很大的;尤其是新版本都是用的Column family的方式来存储不同的表, 等下次我也总结直接的区别;
具体的目录如下: percona-server-mongodb/src/mongo/db/storage/kv/rocks/src下面,当然也可以直接clone mongo-rocks的代码; 下面集中几个点来分析,分别为:
- collection存储
- data的存储
- index的存储
- capped的表是如何控制size
1. Collection在rocksdb中如何存储
const std::string RocksEngine::kMetadataPrefix("\0\0\0\0metadata-", 12);
// 创建一个表
Status RocksEngine::createRecordStore(OperationContext* opCtx, StringData ns, StringData ident,
const CollectionOptions& options) {
BSONObjBuilder configBuilder;
//构建一个rocksdb的key,结构为:KmetadataPrefix + ident,value就是{"prefix":<uint32>}, prefix应该是rockdb自己统计的目前最大的prefix; 并且会把这个prefix编码也存储到rocksdb中,可能是为了加速找到最大的prefix, 这里我不管oplog的特殊性,所以下面的逻辑可以忽略
auto s = _createIdent(ident, &configBuilder);
if (s.isOK() && NamespaceString::oplog(ns)) {
_oplogIdent = ident.toString();
// oplog needs two prefixes, so we also reserve the next one
uint64_t oplogTrackerPrefix = 0;
{
stdx::lock_guard<stdx::mutex> lk(_identMapMutex);
oplogTrackerPrefix = ++_maxPrefix;
}
// we also need to write out the new prefix to the database. this is just an
// optimization,
std::string encodedPrefix(encodePrefix(oplogTrackerPrefix));
s = rocksToMongoStatus(
_db->Put(rocksdb::WriteOptions(), encodedPrefix, rocksdb::Slice()));
}
return s;
}
//读取一个表的信息
std::unique_ptr<RecordStore> RocksEngine::getRecordStore(OperationContext* opCtx, StringData ns,
StringData ident, const CollectionOptions& options) {
if (NamespaceString::oplog(ns)) {
_oplogIdent = ident.toString();
}
auto config = _getIdentConfig(ident);
// 从config配置里面获得对应的prefix,上面有介绍过,每一个collection在创建的时候会对应一个prefix,是一个递增的uint32
std::string prefix = _extractPrefix(config);
// 构建RocksRecordStore,这个类是RecordStore的一个实现类,用rocksdb来存储,本质也是用来操作crud的
std::unique_ptr<RocksRecordStore> recordStore =
options.capped
? stdx::make_unique<RocksRecordStore>(
ns, ident, _db.get(), _counterManager.get(), _durabilityManager.get(),
_compactionScheduler.get(), prefix,
true, options.cappedSize ? options.cappedSize : 4096, // default size
options.cappedMaxDocs ? options.cappedMaxDocs : -1)
: stdx::make_unique<RocksRecordStore>(ns, ident, _db.get(), _counterManager.get(),
_durabilityManager.get(), _compactionScheduler.get(),
prefix);
{
stdx::lock_guard<stdx::mutex> lk(_identObjectMapMutex);
_identCollectionMap[ident] = recordStore.get();
}
return std::move(recordStore);
}
存储Collection的元信息的总结:
- 每一个
collection对应会存在一个prefix,后面会解释有什么用 - 元信息的key的格式为:
kMetadataPrefix+ident, value:存储上面的prefix - 将
prefixencode之后也存储在rocksdb,目前还不是很清楚是做什么
2. collection中的数据如何存储
主要是看RocksRecordStore,因为它操作了collection的crud的操作;这里面我们将看到真实mongodb的存储结构,尤其是recordId的这个生成,为什么比较重要,原因在于不管是mongo的_id还是其他的索引,理论上来说最终都会转化成recordId,逻辑上接近在物理上可能相差很远,这是之前我遇到的一个问题;
rocksdb::DB* _db; // rocksdb
std::string _prefix; // 每一个collectuin都存在一个prefix,uniq
const bool _isCapped; //是否是容量限制的collection
const int64_t _cappedMaxSize; //max size
const int64_t _cappedMaxSizeSlack; // when to start applying backpressure
const int64_t _cappedMaxDocs; // max doc num
CappedCallback* _cappedCallback; // callback
int _cappedDeleteCheckCount; // 有多少的数据要被删
const bool _isOplog; //是否是oplog
// invariant: there is no live records earlier than _cappedOldestKeyHint. There might be
// some records that are dead after _cappedOldestKeyHint.
// SeekToFirst() on an capped collection is an expensive operation because bunch of keys at
// the start are deleted. To reduce the overhead, we remember the next key to delete and
// seek directly to it. This will not work correctly if somebody inserted a key before this
// _cappedOldestKeyHint. However, we prevent this from happening by using
// _cappedVisibilityManager and checking isCappedHidden() during deletions
RecordId _cappedOldestKeyHint; //capped中最早的一条数据的recordId
std::string _ident; //collection's ident
AtomicUInt64 _nextIdNum; //recordId的生成依赖
std::atomic<long long> _dataSize; //data size
std::atomic<long long> _numRecords; //num record
const std::string _dataSizeKey; // 存储datasize的rocksdb的key
const std::string _numRecordsKey; // 存储record num的record的key
// 如何生成recordId,是一个原子变量的递增
RecordId RocksRecordStore::_nextId() {
invariant(!_isOplog);
return RecordId(_nextIdNum.fetchAndAdd(1));
}
// 查看如何初始化的_nextIdNum,主要思考如何持久化的; 下面这段代码来源于RocksRecordStore在初始化的时候使用,本质上是通过查找表中最后一条记录获得对应的Id,然后+1; TODO:是否会存在已经被使用过的recordId再次被重用呢?理论上应该是会的,但是recordId本质上不对外,所以即使重用了,也是说明之前的数据是被删除的,理论上问题不大的;
bool emptyCollection = !iter->Valid();
if (!emptyCollection) {
// if it's not empty, find next RecordId
iter->SeekToLast();
dassert(iter->Valid());
rocksdb::Slice lastSlice = iter->key();
RecordId lastId = _makeRecordId(lastSlice);
if (_isOplog || _isCapped) {
_cappedVisibilityManager->updateHighestSeen(lastId);
}
_nextIdNum.store(lastId.repr() + 1);
} else {
// Need to start at 1 so we are always higher than RecordId::min()
_nextIdNum.store(1);
}
// 如何写入一条数据,这边主要关心key的构建, 这是在insertRecord中最终往rocksdb中使用的;
ru->writeBatch()->Put(_makePrefixedKey(_prefix, loc), rocksdb::Slice(data, len));
//一个key的构成: encord(prefix) + encord(recordId), 其实理论上在rocksdb中,所有表的数据是聚集在一起的;
std::string RocksRecordStore::_makePrefixedKey(const std::string& prefix, const RecordId& loc) {
int64_t storage;
auto encodedLoc = _makeKey(loc, &storage);
std::string key(prefix);
key.append(encodedLoc.data(), encodedLoc.size());
return key;
}
//recordId统一编码
rocksdb::Slice RocksRecordStore::_makeKey(const RecordId& loc, int64_t* storage) {
*storage = endian::nativeToBig(loc.repr());
return rocksdb::Slice(reinterpret_cast<const char*>(storage), sizeof(*storage));
}
//查找一个记录,比较简单的,就是构建key,直接在rocksdb中去找即可
RecordData RocksRecordStore::_getDataFor(rocksdb::DB* db, const std::string& prefix,
OperationContext* txn, const RecordId& loc) {
RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
std::string valueStorage;
auto status = ru->Get(_makePrefixedKey(prefix, loc), &valueStorage);
if (status.IsNotFound()) {
return RecordData(nullptr, 0);
}
invariantRocksOK(status);
SharedBuffer data = SharedBuffer::allocate(valueStorage.size());
memcpy(data.get(), valueStorage.data(), valueStorage.size());
return RecordData(data, valueStorage.size());
}
//update + delete和正常的rocksdb操作都类似;
总结存储数据的:
data key的组成:encode(prefix) + encode(recordId), prefix是每一个collection的uniq id, Len:32byteprefix: 每次mongo重启的时候会去当前数据库找到最大的前缀,后面的新生成的prefix就递增;data size的前缀:std::string("\0\0\0\0", 4) + "datasize-" + id.toString(),id = collection的identrecord num的前缀:std::string("\0\0\0\0", 4) + "numrecords-" + id.toString(), id = collection的identprefix: 每一个mongodb的meta信息的一个前缀,为了区别各个元数据ident:每一个mongodb的meta信息的名字,比如collection、索引之类的,名字上还是有点区别