Rocksdb 离线生成sst文件并在线加载

rocksdb简介

RocksDB是Facebook的一个实验项目,目的是希望能开发一套能在服务器压力下,真正发挥高速存储硬件(特别是Flash存储)性能的高效数据库系统。这是一个C++库,允许存储任意长度二进制kv数据。支持原子读写操作。

RocksDB依靠大量灵活的配置,使之能针对不同的生产环境进行调优,包括直接使用内存,使用Flash,使用硬盘或者HDFS。支持使用不同的压缩算法,并且有一套完整的工具供生产和调试使用。

离线生成sst的意义

我们有亿级别的kv数据, 原来是存储在mongodb中,存储满了后,扩容较难,并且每天增量的大数据量写入会影响现网性能,我们考虑每天增量的数据可以离线写好生成一个数据文件,线上的kv系统能直接load这个文件。

可以根据预估的数据量,提前计算好需要多少shard,大数据平台可以根据id将数据hash分片,离线生成好分片数据,查询时,根据查询id,计算在那个分片,然后路由服务路由到对应分片去查询。

这样的好处:

  • 数据文件可以有版本,在多套环境时,只要加载的数据文件一致,数据就一致
  • 扩容方便,当服务器资源不够时,直接增加服务器,加载新的分片并将新启动的服务注册到配置中心即可
  • 数据写入都是离线写入好的,不会影响线上的读取

当然,对于需要实时写入的数据,会稍微麻烦点,我们可以考虑Plain+rt方案,提供一个在线实时写入的小库,这样查询时两个一起查询即可,小库可以定期刷入大库。

rocksdb 可以离线生成好sst文件,将sst文件拷贝到现网,导入SST文件即可,并且新的sst里会覆盖老的同key数据,正好符合我们的需求。

java 生成sst文件

需要先引入maven依赖

<dependency>  
    <groupId>org.rocksdb</groupId>  
    <artifactId>rocksdbjni</artifactId>  
    <version>8.8.1</version>  
</dependency>

然后通过SstFileWriter 创建sst文件写入即可,需要注意的是,写入时,Keys must be added in strict ascending order. key需要严格遵循字节序。

我们写一个程序从mongodb读取数据,并写入sst。可以先读取出id,然后按字符串排序。

// SST 写入,需要按照id排序 Keys must be added in strict ascending order.
Query query = new Query();  
query.fields().include("_id");  
CloseableIterator<Document> dataList = mongoTemplate.stream(query, Document.class, collectionName);

List<String> docIds = new ArrayList<>();  
log.info("start load ids");  
while (dataList.hasNext()) {  
    Document doc = dataList.next();  
    docIds.add(doc.get("_id").toString());  
}  
log.info("load ids finish, start sort id");  
docIds = docIds.stream().sorted().collect(Collectors.toList());

然后根据id从mongo读取数据,写入sst



Path sstFile = Paths.get(outputFile);  
EnvOptions env = new EnvOptions();  
Options options = new Options();  
options.setCompressionType(CompressionType.ZSTD_COMPRESSION);  
//        options.setComparator(new MyComparator(new ComparatorOptions()));  
SstFileWriter sst = new SstFileWriter(env, options);  
sst.open(sstFile.toString());  
int count = 0;  
int batchSize = 100;  
int batchCount = docIds.size() / batchSize + 1;  
log.info("batch count is {}", batchCount);  
for (int i = 0; i < batchCount; i++) {  
	if (i * batchCount >= docIds.size()) {  
		break;  
	}  
	List<Long> batchIds = docIds.subList(i * batchCount, Math.min(docIds.size() - 1, (i + 1) * batchCount))  
			.stream().map(Long::parseLong).collect(Collectors.toList());  
	List<Document> batchDocs = mongoTemplate.find(Query.query(Criteria.where("_id").in(batchIds)),  
			Document.class, collectionName);  
	Map<String, Document> docId2Doc = batchDocs.stream().collect(Collectors.toMap(doc -> doc.get("_id").toString(), doc -> doc));  

	for (Long id : batchIds) {  
		String docId = id.toString();  
		Document doc = docId2Doc.get(docId);  
		byte[] value = doc.toJson().getBytes(StandardCharsets.UTF_8);  
		sst.put(docId.getBytes(StandardCharsets.UTF_8), value);  
	}  

	count += batchIds.size();  
	if (count % 10000 == 0) {  
		log.info("already load {} items", count);  
	}  
}  
// 注意一定要finish
sst.finish();  
sst.close();

PS : 上面我们遵循了字节序,如果不想遵循,可以自定义Comparator

class MyComparator extends AbstractComparator {  
  
    protected MyComparator(ComparatorOptions comparatorOptions) {  
        super(comparatorOptions);  
    }  
  
    @Override  
    public String name() {  
        return "MyComparator";  
    }  
  
    @Override  
    public int compare(ByteBuffer byteBuffer, ByteBuffer byteBuffer1) {  
        // always true  
        return 1;  
    }  
}


Options options = new Options();  
options.setCompressionType(CompressionType.ZSTD_COMPRESSION);  
// 使用自定义
options.setComparator(new MyComparator(new ComparatorOptions()));

java 加载sst

可以通过ingestExternalFile 加载sst文件

private static void read(String[] args) {  
  
    String dbFile = args[1];  
    String sstFile = args[2];  
  
  
    // a static method that loads the RocksDB C++ library.  
    RocksDB.loadLibrary();  
  
    // the Options class contains a set of configurable DB options  
    // that determines the behaviour of the database.    try (final Options options = new Options().setCreateIfMissing(true)) {  
  
        options.setCompressionType(CompressionType.ZSTD_COMPRESSION);  
  
        // a factory method that returns a RocksDB instance  
        try (final RocksDB db = RocksDB.open(options, dbFile)) {  
            db.ingestExternalFile(Collections.singletonList(sstFile),new IngestExternalFileOptions());  
  
            while(true) {  
                System.out.print("请输入查询key:");  
                String key =  System.console().readLine();  
                byte[] result = db.get(key.getBytes(StandardCharsets.UTF_8));  
                if (result != null) {  
                    System.out.println(new String(result));  
                }  
            }  
        }  
  
  
    } catch (RocksDBException e) {  
        // do some error handling  
        e.printStackTrace();  
    }  
}

golang 加载java生成好的sst

我们已经有一个golang开发的分布式框架,因此可以java在大数据平台生成好sst文件,传输到现网供go服务load。

golang使用rocksdb,可以使用 "github.com/linxGnu/grocksdb",需要先编译相关依赖,可以用仓库中的makefile,make安装rocksdb等依赖。

然后编译读取程序:

package main

import (
	"fmt"
	"os"
	"time"
	
	"github.com/linxGnu/grocksdb"
)

func main() {

	arg := os.Args
	if len(arg) < 3 {
		fmt.Println("Please provide a directory to store the database")
		return
	}

	dir := arg[1]
	fmt.Printf("db dir is %s \n", dir)
	sstFile := arg[2]
	fmt.Printf("sst file is %s \n", sstFile)
	opts := grocksdb.NewDefaultOptions()
	// test the ratelimiter
	rateLimiter := grocksdb.NewRateLimiter(1024, 100*1000, 10)
	opts.SetRateLimiter(rateLimiter)
	opts.SetCreateIfMissing(true)
	// 默认是Snappy。我们相信LZ4总是比Snappy好的。我们之所以把Snappy作为默认的压缩方法,是为了与之前的用户保持兼容。LZ4/Snappy是轻量压缩,所以在CPU使用率和存储空间之间能取得一个较好的平衡
	// 如果你有大量空闲CPU并且希望同时减少空间和写放大,把options.compression设置为重量级的压缩方法。我们推荐ZSTD,如果没有就用Zlib
	opts.SetCompression(grocksdb.ZSTDCompression)
	db, _ := grocksdb.OpenDb(opts, dir)

	// db.EnableManualCompaction()
	// db.DisableManualCompaction()

	defer db.Close()

	// 合并数据
	ingestOpts := grocksdb.NewDefaultIngestExternalFileOptions()

	err := db.IngestExternalFile([]string{sstFile}, ingestOpts)
	if err != nil {
		fmt.Println("Error ingesting external file:", err)
		return
	}

	fmt.Println("finish IngestExternalFile", time.Now())
	readOpts := grocksdb.NewDefaultReadOptions()

	for {
		var input string
		fmt.Println("请输入一个查询key:")
		fmt.Scanln(&input)
		if input == "exit" {
			return
		}
		v1, err := db.Get(readOpts, []byte(input))
		if err != nil {
			fmt.Println("read key error", err)
			continue
		}
		fmt.Println("value = ", string(v1.Data()))

	}
}

编译时,指定下rocksdb库的位置。

CGO_CFLAGS="-I/data/grocksdb-1.8.10/dist/linux_amd64/include" CGO_LDFLAGS="-L/data/grocksdb-1.8.10/dist/linux_amd64/lib -lrocksdb -lstdc++ -lm -lz -lsnappy -llz4 -lzstd"   go build -o load_sst_demo ./load_sst_demo.go

原文地址:https://www.cnblogs.com/xiaoqi/p/17949153/rocksdb-sst

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


摘要: 原创出处 https://www.bysocket.com 「公众号:泥瓦匠BYSocket 」欢迎关注和转载,保留摘要,谢谢! 目录 连接 连接池产生原因 连接池实现原理 小结 TEMPERANCE:Eat not to dullness;drink not to elevation.节制
摘要: 原创出处 https://www.bysocket.com 「公众号:泥瓦匠BYSocket 」欢迎关注和转载,保留摘要,谢谢! 一个优秀的工程师和一个普通的工程师的区别,不是满天飞的架构图,他的功底体现在所写的每一行代码上。-- 毕玄 1. 命名风格 【书摘】类名用 UpperCamelC
今天犯了个错:“接口变动,伤筋动骨,除非你确定只有你一个人在用”。哪怕只是throw了一个新的Exception。哈哈,这是我犯的错误。一、接口和抽象类类,即一个对象。先抽象类,就是抽象出类的基础部分,即抽象基类(抽象类)。官方定义让人费解,但是记忆方法是也不错的 —包含抽象方法的类叫做抽象类。接口
Writer :BYSocket(泥沙砖瓦浆木匠)微 博:BYSocket豆 瓣:BYSocketFaceBook:BYSocketTwitter :BYSocket一、引子文件,作为常见的数据源。关于操作文件的字节流就是 —FileInputStream&amp;FileOutputStream。
作者:泥沙砖瓦浆木匠网站:http://blog.csdn.net/jeffli1993个人签名:打算起手不凡写出鸿篇巨作的人,往往坚持不了完成第一章节。交流QQ群:【编程之美 365234583】http://qm.qq.com/cgi-bin/qm/qr?k=FhFAoaWwjP29_Aonqz
本文目录 线程与多线程 线程的运行与创建 线程的状态 1 线程与多线程 线程是什么? 线程(Thread)是一个对象(Object)。用来干什么?Java 线程(也称 JVM 线程)是 Java 进程内允许多个同时进行的任务。该进程内并发的任务成为线程(Thread),一个进程里至少一个线程。 Ja
Writer :BYSocket(泥沙砖瓦浆木匠)微 博:BYSocket豆 瓣:BYSocketFaceBook:BYSocketTwitter :BYSocket在面向对象编程中,编程人员应该在意“资源”。比如?1String hello = &quot;hello&quot;; 在代码中,我们
摘要: 原创出处 https://www.bysocket.com 「公众号:泥瓦匠BYSocket 」欢迎关注和转载,保留摘要,谢谢! 这是泥瓦匠的第103篇原创 《程序兵法:Java String 源码的排序算法(一)》 文章工程:* JDK 1.8* 工程名:algorithm-core-le
摘要: 原创出处 https://www.bysocket.com 「公众号:泥瓦匠BYSocket 」欢迎关注和转载,保留摘要,谢谢! 目录 一、父子类变量名相同会咋样? 有个小故事,今天群里面有个人问下面如图输出什么? 我回答:60。但这是错的,答案结果是 40 。我知错能改,然后说了下父子类变
作者:泥瓦匠 出处:https://www.bysocket.com/2021-10-26/mac-create-files-from-the-root-directory.html Mac 操作系统挺适合开发者进行写代码,最近碰到了一个问题,问题是如何在 macOS 根目录创建文件夹。不同的 ma
作者:李强强上一篇,泥瓦匠基础地讲了下Java I/O : Bit Operation 位运算。这一讲,泥瓦匠带你走进Java中的进制详解。一、引子在Java世界里,99%的工作都是处理这高层。那么二进制,字节码这些会在哪里用到呢?自问自答:在跨平台的时候,就凸显神功了。比如说文件读写,数据通信,还
1 线程中断 1.1 什么是线程中断? 线程中断是线程的标志位属性。而不是真正终止线程,和线程的状态无关。线程中断过程表示一个运行中的线程,通过其他线程调用了该线程的 方法,使得该线程中断标志位属性改变。 深入思考下,线程中断不是去中断了线程,恰恰是用来通知该线程应该被中断了。具体是一个标志位属性,
Writer:BYSocket(泥沙砖瓦浆木匠)微博:BYSocket豆瓣:BYSocketReprint it anywhere u want需求 项目在设计表的时候,要处理并发多的一些数据,类似订单号不能重复,要保持唯一。原本以为来个时间戳,精确到毫秒应该不错了。后来觉得是错了,测试环境下很多一
纯技术交流群 每日推荐 - 技术干货推送 跟着泥瓦匠,一起问答交流 扫一扫,我邀请你入群 纯技术交流群 每日推荐 - 技术干货推送 跟着泥瓦匠,一起问答交流 扫一扫,我邀请你入群 加微信:bysocket01
Writer:BYSocket(泥沙砖瓦浆木匠)微博:BYSocket豆瓣:BYSocketReprint it anywhere u want.文章Points:1、介绍RESTful架构风格2、Spring配置CXF3、三层初设计,实现WebService接口层4、撰写HTTPClient 客户
Writer :BYSocket(泥沙砖瓦浆木匠)什么是回调?今天傻傻地截了张图问了下,然后被陈大牛回答道“就一个回调…”。此时千万个草泥马飞奔而过(逃哈哈,看着源码,享受着这种回调在代码上的作用,真是美哉。不妨总结总结。一、什么是回调回调,回调。要先有调用,才有调用者和被调用者之间的回调。所以在百
Writer :BYSocket(泥沙砖瓦浆木匠)一、什么大小端?大小端在计算机业界,Endian表示数据在存储器中的存放顺序。百度百科如下叙述之:大端模式,是指数据的高字节保存在内存的低地址中,而数据的低字节保存在内存的高地址中,这样的存储模式有点儿类似于把数据当作字符串顺序处理:地址由小向大增加
What is a programming language? Before introducing compilation and decompilation, let&#39;s briefly introduce the Programming Language. Programming la
Writer :BYSocket(泥沙砖瓦浆木匠)微 博:BYSocket豆 瓣:BYSocketFaceBook:BYSocketTwitter :BYSocket泥瓦匠喜欢Java,文章总是扯扯Java。 I/O 基础,就是二进制,也就是Bit。一、Bit与二进制什么是Bit(位)呢?位是CPU
Writer:BYSocket(泥沙砖瓦浆木匠)微博:BYSocket豆瓣:BYSocket一、前言 泥瓦匠最近被项目搞的天昏地暗。发现有些要给自己一些目标,关于技术的目标:专注很重要。专注Java 基础 + H5(学习) 其他操作系统,算法,数据结构当成课外书博览。有时候,就是那样你越是专注方面越