diff --git a/source-code-reading/2_query.md b/source-code-reading/2_query.md index a95fb7f..4f6f7ab 100644 --- a/source-code-reading/2_query.md +++ b/source-code-reading/2_query.md @@ -31,6 +31,8 @@ * [NewPipeline](#newpipeline) * [NewPipe](#newpipe) * [ProcessorPtr/Processor](#processorptrprocessor) + * [Transformer](#transformer) + * [AggregatorTransform](#aggregatortransform) * [InputPort/OutputPort/SharedStatus](#inputportoutputportsharedstatus) * [UpdateTrigger/UpdateList/UpdateListMutable/DirectedEdge](#updatetriggerupdatelistupdatelistmutabledirectededge) * [5 执行Pipeline](#5-执行pipeline) @@ -815,6 +817,8 @@ pub struct PipelineBuilder { ### 4 Pipeline视图 +![image.png](./assets/1661263886543-image.png) + ##### NewPipeline ```rust @@ -845,6 +849,10 @@ pub enum NewPipe { ##### ProcessorPtr/Processor +Processor是处理器的抽象,实现此trait的struct包含: + +![image.png](./assets/1661265911577-image.png) + ```rust // file: query/src/pipelines/new/processors/processor.rs pub struct ProcessorPtr { @@ -871,11 +879,63 @@ pub trait Processor: Send { } ``` +###### Transformer + +封装Transform、InputPort和OutputPort。 + +```rust +// file: query/src/pipelines/new/processors/transforms/transform.rs +pub struct Transformer { + transform: T, + input: Arc, + output: Arc, + + input_data: Option, + output_data: Option, +} + +pub trait Transform: Send { + const NAME: &'static str; + const SKIP_EMPTY_DATA_BLOCK: bool = false; + + fn transform(&mut self, data: DataBlock) -> Result; +} +``` + +实现Transform等struct包含: + +![image.png](./assets/1661265322060-image.png) + +###### AggregatorTransform + +聚合算子,封装Aggregator。 + +```rust +// query/src/pipelines/new/processors/transforms/transform_aggregator.rs +enum AggregatorTransform { + ConsumeData(ConsumeState), + Generate(GenerateState), + Finished, +} + +pub trait Aggregator: Sized + Send { + const NAME: &'static str; + + fn consume(&mut self, data: DataBlock) -> Result<()>; + fn generate(&mut self) -> Result>; +} +``` + +实现Aggregator的struct包含: + +![image.png](./assets/1661265830054-image.png) + + ##### InputPort/OutputPort/SharedStatus ```rust // file: query/src/pipelines/new/processors/port.rs -// 数据流: OutputPort -> SharedStatus -> InputPort +// 数据流: OutputPort -> SharedStatus -> InputPort,其中SharedStatus是InputPort和OutputPort共享的 pub struct InputPort { shared: UnSafeCellWrap>, update_trigger: UnSafeCellWrap<*mut UpdateTrigger>, @@ -895,6 +955,8 @@ pub struct SharedData(pub Result); ##### UpdateTrigger/UpdateList/UpdateListMutable/DirectedEdge +触发器,用于更新InputPort和OutputPort中的数据。 + ```rust // file: query/src/pipelines/new/processors/port_trigger.rs pub struct UpdateTrigger { diff --git a/source-code-reading/assets/1661263886543-image.png b/source-code-reading/assets/1661263886543-image.png new file mode 100644 index 0000000..7dc1996 Binary files /dev/null and b/source-code-reading/assets/1661263886543-image.png differ diff --git a/source-code-reading/assets/1661265316769-image.png b/source-code-reading/assets/1661265316769-image.png new file mode 100644 index 0000000..7cf06ca Binary files /dev/null and b/source-code-reading/assets/1661265316769-image.png differ diff --git a/source-code-reading/assets/1661265322060-image.png b/source-code-reading/assets/1661265322060-image.png new file mode 100644 index 0000000..7cf06ca Binary files /dev/null and b/source-code-reading/assets/1661265322060-image.png differ diff --git a/source-code-reading/assets/1661265830054-image.png b/source-code-reading/assets/1661265830054-image.png new file mode 100644 index 0000000..be78308 Binary files /dev/null and b/source-code-reading/assets/1661265830054-image.png differ diff --git a/source-code-reading/assets/1661265911577-image.png b/source-code-reading/assets/1661265911577-image.png new file mode 100644 index 0000000..30c1ef8 Binary files /dev/null and b/source-code-reading/assets/1661265911577-image.png differ