InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。
InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。
接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。
上篇介绍到:InfluxDB-IOx的命令行及配置,详情见:https://my.oschina.net/u/3374539/blog/5017858
这章记录一下Run命令的执行过程。
//根据用户在命令行配置的num_threads参数 //来选择创建一个多线程的模型,还是current_thread的模型 //后面有时间深入研究tokio的时候再来分析有什么异同 let tokio_runtime = get_runtime(config.num_threads)?; //block_on会让线程一直等待方法里的future执行完成 //这是让闭包中的方法占有了io driver 和 timer context tokio_runtime.block_on(async move { let host = config.host; match config.command { // 省略其它command ... Command::Run(config) => { //具体去子类型里执行,然后await一个结果 if let Err(e) = commands::run::command(logging_level, *config).await { eprintln!("Server command failed: {}", e); std::process::exit(ReturnCode::Failure as _) } } } });
在influxdb_ioxd::main
方法中,忽略一些不太需要重点关注的,分别是初始化log
的管理、PanicsTracing
、CancellationToken
等。
//初始化对象存储 let object_store = ObjectStore::try_from(&config)?; //可以看到,目前已经支持了 //1.内存(在container环境运行时候使用) //2.Google //3.S3 //4.Azure //5.File 本地文件,方便开发者调试运行在云上时候的文件变化 fn try_from(config: &Config) -> Result<Self, Self::Error> { match config.object_store { Some(ObjStoreOpt::Memory) | None => { //创建一个btreemap用来缓存或者搜索 Ok(Self::new_in_memory(object_store::memory::InMemory::new())) } Some(ObjStoreOpt::Google) => { // 省略 } Some(ObjStoreOpt::S3) => { // 省略 } Some(ObjStoreOpt::Azure) => { // 省略 } Some(ObjStoreOpt::File) => match config.database_directory.as_ref() { Some(db_dir) => { //去递归创建这个配置路径中的文件夹 //context也是使用的snafu来处理错误的 fs::create_dir_all(db_dir) .context(CreatingDatabaseDirectory { path: db_dir })?; //都创建完成,并且没出错误,把路径保存起来 Ok(Self::new_file(object_store::disk::File::new(&db_dir))) } // 如果database_directory这个参数没有配置的时候 //使用snafu这个crate来返回一个错误 None => MissingObjectStoreConfig { object_store: ObjStoreOpt::File, missing: "data-dir", } .fail(), }, } }
关于错误处理的代码:
#[snafu(display("Unable to create database directory {:?}: {}", path, source))] CreatingDatabaseDirectory { path: PathBuf, source: std::io::Error, }, #[snafu(display( "Specified {} for the object store, required configuration missing for {}", object_store, missing ))] MissingObjectStoreConfig { object_store: ObjStoreOpt, missing: String, },
我们来测试一下错误的场景,来看看是否符合代码的预期。
// 不传入路径 cargo run run --object-store file Finished dev [unoptimized + debuginfo] target(s) in 0.42s Running `./influxdb_iox run --object-store file` Apr 15 13:38:34.352 INFO influxdb_iox::influxdb_ioxd: Using File for object storage Server command failed: Run: Specified File for the object store, required configuration missing for data-dir //传入一个创建不了的路径 cargo run run --object-store file --data-dir /root/1/1 Finished dev [unoptimized + debuginfo] target(s) in 0.47s Running `./influxdb_iox run --object-store file --data-dir /root/1/1` Apr 15 13:45:26.664 INFO influxdb_iox::influxdb_ioxd: Using File for object storage Server command failed: Run: Unable to create database directory "/root/1/1": Read-only file system (os error 30)
可以看到是符合预期的,bingo
//创建一个空的结构体 let connection_manager = ConnectionManager {}; //创建AppServer结构体用来保存基本的信息 //server_config里就是保存的对象存储的信息及线程配置 //如果num_worker_threads没有填写,默认就使用cpu数量 let app_server = Arc::new(AppServer::new(connection_manager, server_config)); //不设置这个writer_id能启动,但是不能做任何操作 if let Some(id) = config.writer_id { //compare and set 一个非0的数值,错误就打印一个指定的panic app_server.set_id(id).expect("writer id already set"); //校验所有的配置 if let Err(e) = app_server.load_database_configs().await { error!( "unable to load database configurations from object storage: {}", e ) } } else { warn!("server ID not set. ID must be set via the INFLUXDB_IOX_ID config or API before writing or querying data."); }
接下来进入load_database_configs
方法看看,
let list_result = self .store //把write_id和配置的文件路径组合一下,作为一个目录 //遍历文件夹中的所有东西,用一个BTreeSet存所有子文件夹 //用Vec存下所有的文件信息,包括路径、修改时间、大小等 .list_with_delimiter(&self.root_path()?) .await .context(StoreError)?; //拿到配置的server的write_id let server_id = self.require_id()?; let handles: Vec<_> = list_result //配置的文件夹下的所有文件夹 .common_prefixes .into_iter() //全部进行map转换 .map(|mut path| { let store = Arc::clone(&self.store); let config = Arc::clone(&self.config); let exec = Arc::clone(&self.exec); //先找database的相关信息文件,名字叫rules.pb path.set_file_name(DB_RULES_FILE_NAME); //感觉是需要io来读取文件内容,所以开一个异步 tokio::task::spawn(async move { let mut res = get_store_bytes(&path, &store).await; //省略错误处理。。 let res = res.unwrap().freeze(); //解析文件内容,根据文件名可以看出是个pb文件。 match DatabaseRules::decode(res) { Err(e) => { //省略错误。。 } //根据解析出来的文件内容,在内存中恢复回来db的相关信息 Ok(rules) => match config.create_db(rules) { Err(e) => error!("error adding database to config: {}", e), //提交一个后台任务,用来不断的检测chunks的状态 //比如达到了某个大小,然后写入到存储等 Ok(handle) => handle.commit(server_id, store, exec), }, } }) }) .collect(); //等待所有任务完成 futures::future::join_all(handles).await;
这里就启动完成了一个基本的服务,创建了存储路径、初始化数据库的基本配置、启动了一个用来刷盘、整理chunk的后台任务。
接下来就是启动连接相关的了。
//从启动命令行中读取grpc的地址 let grpc_bind_addr = config.grpc_bind_address; //绑定这个地址 let socket = tokio::net::TcpListener::bind(grpc_bind_addr) .await .context(StartListeningGrpc { grpc_bind_addr })?; //真正的协议启动 let grpc_server = rpc::serve(socket, Arc::clone(&app_server), frontend_shutdown.clone()).fuse(); //同样的启动http相关的服务,使用的hyper库 let bind_addr = config.http_bind_address; let addr = AddrIncoming::bind(&bind_addr).context(StartListeningHttp { bind_addr })?; let http_server = http::serve(addr, Arc::clone(&app_server), frontend_shutdown.clone()).fuse(); //省略后面的停止流程。。。
然后看grpc的启动的服务
//启动起来健康检查的服务 let stream = TcpListenerStream::new(socket); let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); //标识相对应的服务已经是可以提供服务的状态了 let services = [ generated_types::STORAGE_SERVICE, generated_types::IOX_TESTING_SERVICE, generated_types::ARROW_SERVICE, ]; for service in &services { health_reporter .set_service_status(service, tonic_health::ServingStatus::Serving) .await; } //增加一堆使用grpc的服务,并启动起来 tonic::transport::Server::builder() .add_service(health_service) .add_service(testing::make_server()) .add_service(storage::make_server(Arc::clone(&server))) .add_service(flight::make_server(Arc::clone(&server))) .add_service(write::make_server(Arc::clone(&server))) .add_service(management::make_server(Arc::clone(&server))) .add_service(operations::make_server(server)) .serve_with_incoming_shutdown(stream, shutdown.cancelled()) .await
然后是http相关的启动
pub async fn serve<M>( addr: AddrIncoming, server: Arc<AppServer<M>>, shutdown: CancellationToken, ) -> Result<(), hyper::Error> where M: ConnectionManager + Send + Sync + Debug + 'static, { //初始化路由相关的信息 let router = router(server); let service = RouterService::new(router).unwrap(); //启动服务 hyper::Server::builder(addr) .serve(service) .with_graceful_shutdown(shutdown.cancelled()) .await }
顺便看一下都提供了哪些地址可以被访问的:
Router::builder() .data(server) //写了一个拦截,打印请求参数和返回结果 .middleware(Middleware::pre(|req| async move { debug!(request = ?req, "Processing request"); Ok(req) })) .middleware(Middleware::post(|res| async move { debug!(response = ?res, "Successfully processed request"); Ok(res) })) // this endpoint is for API backward compatibility with InfluxDB 2.x .post("/api/v2/write", write::<M>) .get("/health", health) .get("/metrics", handle_metrics) .get("/iox/api/v1/databases/:name/query", query::<M>) .get("/iox/api/v1/databases/:name/wal/meta", get_wal_meta::<M>) .get("/api/v1/partitions", list_partitions::<M>) .post("/api/v1/snapshot", snapshot_partition::<M>) //错误的时候调用的处理拦截 .err_handler_with_info(error_handler) .build() .unwrap()
做一个/health
的测试:
curl localhost:8080/health OK%
可以看到成功返回了值。
到这里基本启动就完成了,后面再用到的时候会继续对启动里的细节做研究,比如Panics
,Log
等等吧,欢迎持续关注。
祝玩儿的开心
欢迎关注微信公众号:
或添加微信好友: liutaohua001
关键词:站长资讯中心,站长资讯,站长新闻