InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。
InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。
接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。
上一章介绍了一个SQL是怎样从字符串转换到物理执行计划的,详情见:
https://my.oschina.net/u/3374539/blog/5035628
这一章主要记录一下物理计划是怎样执行的。
在上一篇文章的末尾,我们展示了物理计划之中存储的数据,这些数据代表了当前整个数据库中,能够与用户输入的查询表相关联的所有数据。
对于一般数据库来讲,在物理计划中更应该是指向索引相关的信息,举例来说:select * from table1 ,在物理计划里,应该是要拿到table1的表描述、存储数据的文件路径、文件大小、等等,而不是拿到真实数据。在文章最末尾中,有一段省略的数据,为什么会出现数据呢?其实这是数据库设计的缓存,缓存的数据本来就没有落到磁盘上,所以直接在物理计划中也会持有RBChunk和MBChunk的数据引用。
对于一个过滤而言,会在物理计划中产生对应的信息,展示如下:
select * from myMeasurement where fieldKey like 'value1'; input: FilterExec { predicate: BinaryExpr { left: Column { name: "fieldKey" }, op: Like, right: Literal { value: Utf8("value1") } }
接下来看物理计划的执行代码:
pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> { match plan.output_partitioning().partition_count() { 0 => Ok(vec![]), //单一块的时候直接取出数据 1 => { let it = plan.execute(0).await?; common::collect(it).await } //多个数据块的时候就需要进行合并数据 _ => { let plan = MergeExec::new(plan.clone()); assert_eq!(1, plan.output_partitioning().partition_count()); //这里分为了两步execute 和 collect common::collect(plan.execute(0).await?).await } } }
接下来看plan.execute方法:
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> { 。。。省略 tokio::spawn(async move { //这里的input就代表了上面展示的filter的input或者是数据的input let mut stream = match input.execute(part_i).await { Err(e) => { let arrow_error = ArrowError::ExternalError(Box::new(e)); sender.send(Err(arrow_error)).await.ok(); return; } Ok(stream) => stream, }; //计划执行完成之后返回一个stream,这里就是一直next获取完 while let Some(item) = stream.next().await { sender.send(item).await.ok(); } }); 。。。省略 }
上面的input代表了以下这么多东西:
上面展示的为datafusion框架里的Plan,也就是通用sql都需要实现的功能,下面是iox项目中实现的Plan是完成数据获取的。
Plan之间的关系是嵌套的,想象一下上一章的大图,比如coalesceBatchesExec里可能还会包含filter,主要就是描述整个sql语句中都出现了什么。所有出现的plan就会对数据进行一次全面的过滤。
姑且不看过滤的细节,只看获取数据的部分(ExecutionPlan for IOxReadFilterNode)。
async fn execute( &self, partition: usize, ) -> datafusion::error::Result<SendableRecordBatchStream> { //因为在前面物理计划中得到了所有列,这里拿出列的名字 let fields = self.schema.fields(); let selection_cols = fields.iter().map(|f| f.name() as &str).collect::<Vec<_>>(); //多个分区的时候可以根据分区号拿出chunk信息 let ChunkInfo { chunk, chunk_table_schema, } = &self.chunk_and_infos[partition]; //过滤出来列名字对应的arrow的filed,这里就存在不对应的问题,假如用户输入了ABC,但是chunk_table_schema中并不存在,这里就会是一个空 let selection_cols = restrict_selection(selection_cols, &chunk_table_schema); let selection = Selection::Some(&selection_cols); //使用predicate过滤一次,但是我调试的时候一直是空的,也就是查询出所有数据。 let stream = chunk .read_filter(&self.table_name, &self.predicate, selection) .map_err(|e| { DataFusionError::Execution(format!( "Error creating scan for table {} chunk {}: {}", self.table_name, chunk.id(), e )) })?; //这里使用SchemaAdapterStream的结构来填充空值列 let adapter = SchemaAdapterStream::try_new(stream, Arc::clone(&self.schema)) .map_err(|e| DataFusionError::Internal(e.to_string()))?; Ok(Box::pin(adapter)) }
这个SchemaAdapterStream在代码中给了一个特别形象的描述:
/// /// ┌────────────────┐ ┌─────────────────────────┐ /// │ ┌─────┐┌─────┐ │ │ ┌─────┐┌──────┐┌─────┐ │ /// │ │ A ││ C │ │ │ │ A ││ B ││ C │ │ /// │ │ - ││ - │ │ │ │ - ││ - ││ - │ │ /// ┌──────────────┐ │ │ 1 ││ 10 │ │ ┌──────────────┐ │ │ 1 ││ NULL ││ 10 │ │ /// │ Input │ │ │ 2 ││ 20 │ │ │ Adapter │ │ │ 2 ││ NULL ││ 20 │ │ /// │ Stream ├────▶ │ │ 3 ││ 30 │ │────▶│ Stream ├───▶│ │ 3 ││ NULL ││ 30 │ │ /// └──────────────┘ │ │ 4 ││ 40 │ │ └──────────────┘ │ │ 4 ││ NULL ││ 40 │ │ /// │ └─────┘└─────┘ │ │ └─────┘└──────┘└─────┘ │ /// │ │ │ │ /// │ Record Batch │ │ Record Batch │ /// └────────────────┘ └─────────────────────────┘ ///
接下来看如何实现数据查找的:
fn read_filter( &self, table_name: &str, predicate: &Predicate, selection: Selection<'_>, ) -> Result<SendableRecordBatchStream, Self::Error> { //chunk存在变体,这里就是先判断是什么chunk,有三种MB,RB,ParquetFile match self { //还是在写入阶段的buffer,暂时不支持查询条件 Self::MutableBuffer { chunk, .. } => { if !predicate.is_empty() { return InternalPredicateNotSupported { predicate: predicate.clone(), } .fail(); } let batch = chunk .read_filter(table_name, selection) .context(MutableBufferChunk)?; Ok(Box::pin(MemoryStream::new(vec![batch]))) } //不可写阶段的buffer,对数据进行过滤 Self::ReadBuffer { chunk, .. } => { let rb_predicate = to_read_buffer_predicate(&predicate).context(PredicateConversion)?; //读取数据并过滤 let read_results = chunk .read_filter(table_name, rb_predicate, selection) .context(ReadBufferChunkError { chunk_id: chunk.id(), })?; //读取schema信息并过滤 let schema = chunk .read_filter_table_schema(table_name, selection) .context(ReadBufferChunkError { chunk_id: chunk.id(), })?; //ReadFilterResultsStream是对不同的chunk类型实现的读取接口 Ok(Box::pin(ReadFilterResultsStream::new( read_results, schema.into(), ))) } //Parquet同理 Self::ParquetFile { chunk, .. } => chunk .read_filter(table_name, predicate, selection) .context(ParquetFileChunkError { chunk_id: chunk.id(), }), } }
数据到了这里就会按照你选择的表名、列名,将数据全部查询出来了。在代码中的predicate,一直是空的,暂时不确定是如何填充的,后面再看。
数据从这里全部查询出来之后,会返回给datafusion框架,继续按照开头写到的过滤器进行过滤,就是遍历一遍数据判断大于、小于或者like等等。
好了查询就先写到这里。
祝玩儿的开心!!
欢迎关注微信公众号:
或添加微信好友: liutaohua001
- 怎样有效祛眼袋的方法(三个去眼袋方法)
- 个人创业适合做什么(4个推荐供参考)
- 在家能做什么小生意(12个零门槛副业推荐)