Skip to content

Commit

Permalink
update: query
Browse files Browse the repository at this point in the history
  • Loading branch information
jasonnnli committed Aug 23, 2022
1 parent 38cb389 commit 9651b4c
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 1 deletion.
64 changes: 63 additions & 1 deletion source-code-reading/2_query.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
* [NewPipeline](#newpipeline)
* [NewPipe](#newpipe)
* [ProcessorPtr/Processor](#processorptrprocessor)
* [Transformer](#transformer)
* [AggregatorTransform](#aggregatortransform)
* [InputPort/OutputPort/SharedStatus](#inputportoutputportsharedstatus)
* [UpdateTrigger/UpdateList/UpdateListMutable/DirectedEdge](#updatetriggerupdatelistupdatelistmutabledirectededge)
* [5 执行Pipeline](#5-执行pipeline)
Expand Down Expand Up @@ -815,6 +817,8 @@ pub struct PipelineBuilder {

### 4 Pipeline视图

![image.png](./assets/1661263886543-image.png)

##### NewPipeline

```rust
Expand Down Expand Up @@ -845,6 +849,10 @@ pub enum NewPipe {

##### ProcessorPtr/Processor

Processor是处理器的抽象,实现此trait的struct包含:

![image.png](./assets/1661265911577-image.png)

```rust
// file: query/src/pipelines/new/processors/processor.rs
pub struct ProcessorPtr {
Expand All @@ -871,11 +879,63 @@ pub trait Processor: Send {
}
```

###### Transformer

封装Transform、InputPort和OutputPort。

```rust
// file: query/src/pipelines/new/processors/transforms/transform.rs
pub struct Transformer<T: Transform + 'static> {
transform: T,
input: Arc<InputPort>,
output: Arc<OutputPort>,

input_data: Option<DataBlock>,
output_data: Option<DataBlock>,
}

pub trait Transform: Send {
const NAME: &'static str;
const SKIP_EMPTY_DATA_BLOCK: bool = false;

fn transform(&mut self, data: DataBlock) -> Result<DataBlock>;
}
```

实现Transform等struct包含:

![image.png](./assets/1661265322060-image.png)

###### AggregatorTransform

聚合算子,封装Aggregator。

```rust
// query/src/pipelines/new/processors/transforms/transform_aggregator.rs
enum AggregatorTransform<TAggregator: Aggregator> {
ConsumeData(ConsumeState<TAggregator>),
Generate(GenerateState<TAggregator>),
Finished,
}

pub trait Aggregator: Sized + Send {
const NAME: &'static str;

fn consume(&mut self, data: DataBlock) -> Result<()>;
fn generate(&mut self) -> Result<Option<DataBlock>>;
}
```

实现Aggregator的struct包含:

![image.png](./assets/1661265830054-image.png)


##### InputPort/OutputPort/SharedStatus

```rust
// file: query/src/pipelines/new/processors/port.rs
// 数据流: OutputPort -> SharedStatus -> InputPort
// 数据流: OutputPort -> SharedStatus -> InputPort,其中SharedStatus是InputPort和OutputPort共享的
pub struct InputPort {
shared: UnSafeCellWrap<Arc<SharedStatus>>,
update_trigger: UnSafeCellWrap<*mut UpdateTrigger>,
Expand All @@ -895,6 +955,8 @@ pub struct SharedData(pub Result<DataBlock>);

##### UpdateTrigger/UpdateList/UpdateListMutable/DirectedEdge

触发器,用于更新InputPort和OutputPort中的数据。

```rust
// file: query/src/pipelines/new/processors/port_trigger.rs
pub struct UpdateTrigger {
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit 9651b4c

Please sign in to comment.