[feature](connector) support datasource v2 pushdown (#250)
gnehil authored Jan 3, 2025
1 parent b81bf16 commit c0b1bc2
Showing 24 changed files with 704 additions and 54 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/run-itcase-12.yml
@@ -42,6 +42,10 @@ jobs:
         run: |
           cd spark-doris-connector && mvn clean test -Pspark-2-it,spark-2.4_2.11 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="adamlee489/doris:1.2.7.1_x86"
-      - name: Run ITCases for spark 3
+      - name: Run ITCases for spark 3.1
         run: |
           cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.1 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="adamlee489/doris:1.2.7.1_x86"
+      - name: Run ITCases for spark 3.3
+        run: |
+          cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.3 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="adamlee489/doris:1.2.7.1_x86"
6 changes: 5 additions & 1 deletion .github/workflows/run-itcase-20.yml
@@ -42,7 +42,11 @@ jobs:
         run: |
           cd spark-doris-connector && mvn clean test -Pspark-2-it,spark-2.4_2.11 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="adamlee489/doris:2.0.3"
-      - name: Run ITCases for spark 3
+      - name: Run ITCases for spark 3.1
         run: |
           cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.1 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="adamlee489/doris:2.0.3"
+      - name: Run ITCases for spark 3.3
+        run: |
+          cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.3 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="adamlee489/doris:2.0.3"
2 changes: 1 addition & 1 deletion spark-doris-connector/spark-doris-connector-it/pom.xml
@@ -97,7 +97,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.doris</groupId>
-            <artifactId>spark-doris-connector-spark-3.1</artifactId>
+            <artifactId>spark-doris-connector-spark-${spark.major.version}</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
@@ -141,5 +141,31 @@ class DorisReaderITCase extends DorisTestBase {
     } else false
   }

+  @Test
+  @throws[Exception]
+  def testSQLSourceWithCondition(): Unit = {
+    initializeTable(TABLE_READ_TBL)
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    session.sql(
+      s"""
+         |CREATE TEMPORARY VIEW test_source
+         |USING doris
+         |OPTIONS(
+         | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL}",
+         | "fenodes"="${DorisTestBase.getFenodes}",
+         | "user"="${DorisTestBase.USERNAME}",
+         | "password"="${DorisTestBase.PASSWORD}"
+         |)
+         |""".stripMargin)
+
+    val result = session.sql(
+      """
+        |select name,age from test_source where age = 18
+        |""".stripMargin).collect().toList.toString()
+    session.stop()
+
+    assert("List([doris,18])".equals(result))
+  }
+
 }
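Note that this ITCase asserts on the query result, which would pass with or without pushdown. A hedged spot-check that the `age = 18` predicate actually reaches the Doris scan (not part of the commit; it would have to run before `session.stop()`, and the exact plan text varies by Spark version):

```scala
// Hedged verification sketch: a DataSource V2 scan node typically lists
// its pushed predicates in the physical plan, so `age = 18` should show
// up on the scan rather than as a separate Spark Filter node.
session.sql("select name, age from test_source where age = 18").explain(true)
```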
@@ -0,0 +1,53 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.spark.read

import org.apache.doris.spark.client.entity.{Backend, DorisReaderPartition}
import org.apache.doris.spark.client.read.ReaderPartitionGenerator
import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan}
import org.apache.spark.sql.types.StructType

import scala.language.implicitConversions

abstract class AbstractDorisScan(config: DorisConfig, schema: StructType) extends Scan with Batch with Logging {

private val scanMode = ScanMode.valueOf(config.getValue(DorisOptions.READ_MODE).toUpperCase)

override def readSchema(): StructType = schema

override def toBatch: Batch = this

override def planInputPartitions(): Array[InputPartition] = {
ReaderPartitionGenerator.generatePartitions(config, schema.names, compiledFilters()).map(toInputPartition)
}


override def createReaderFactory(): PartitionReaderFactory = {
new DorisPartitionReaderFactory(readSchema(), scanMode, config)
}

private def toInputPartition(rp: DorisReaderPartition): DorisInputPartition =
DorisInputPartition(rp.getDatabase, rp.getTable, rp.getBackend, rp.getTablets.map(_.toLong), rp.getOpaquedQueryPlan, rp.getReadColumns, rp.getFilters)

protected def compiledFilters(): Array[String]

}

case class DorisInputPartition(database: String, table: String, backend: Backend, tablets: Array[Long], opaquedQueryPlan: String, readCols: Array[String], predicates: Array[String]) extends InputPartition
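The new AbstractDorisScan is a small template: `planInputPartitions()` obtains compiled predicate strings from the abstract `compiledFilters()` hook, so each Spark-version module contributes only predicate compilation while partition planning and reader-factory creation stay shared. A minimal, purely illustrative subclass (not in the commit) that opts out of pushdown entirely:

```scala
import org.apache.doris.spark.config.DorisConfig
import org.apache.spark.sql.types.StructType

// Illustrative only: a scan that compiles zero predicates, inheriting
// planInputPartitions() and createReaderFactory() from AbstractDorisScan.
class NoPushdownDorisScan(config: DorisConfig, schema: StructType)
  extends AbstractDorisScan(config, schema) {
  override protected def compiledFilters(): Array[String] = Array.empty
}
```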
@@ -17,39 +17,17 @@

 package org.apache.doris.spark.read

-import org.apache.doris.spark.client.entity.{Backend, DorisReaderPartition}
-import org.apache.doris.spark.client.read.ReaderPartitionGenerator
 import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
 import org.apache.doris.spark.util.DorisDialects
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan}
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType

-import scala.language.implicitConversions
-
-class DorisScan(config: DorisConfig, schema: StructType, filters: Array[Filter]) extends Scan with Batch with Logging {
-
-  private val scanMode = ScanMode.valueOf(config.getValue(DorisOptions.READ_MODE).toUpperCase)
-
-  override def readSchema(): StructType = schema
-
-  override def toBatch: Batch = this
-
-  override def planInputPartitions(): Array[InputPartition] = {
+class DorisScan(config: DorisConfig, schema: StructType, filters: Array[Filter]) extends AbstractDorisScan(config, schema) with Logging {
+  override protected def compiledFilters(): Array[String] = {
     val inValueLengthLimit = config.getValue(DorisOptions.DORIS_FILTER_QUERY_IN_MAX_COUNT)
-    val compiledFilters = filters.map(DorisDialects.compileFilter(_, inValueLengthLimit)).filter(_.isDefined).map(_.get)
-    ReaderPartitionGenerator.generatePartitions(config, schema.names, compiledFilters).map(toInputPartition)
-  }
-
-
-  override def createReaderFactory(): PartitionReaderFactory = {
-    new DorisPartitionReaderFactory(readSchema(), scanMode, config)
+    filters.map(DorisDialects.compileFilter(_, inValueLengthLimit)).filter(_.isDefined).map(_.get)
   }
-
-  private def toInputPartition(rp: DorisReaderPartition): DorisInputPartition =
-    DorisInputPartition(rp.getDatabase, rp.getTable, rp.getBackend, rp.getTablets.map(_.toLong), rp.getOpaquedQueryPlan, rp.getReadColumns, rp.getFilters)
-
 }
-
-case class DorisInputPartition(database: String, table: String, backend: Backend, tablets: Array[Long], opaquedQueryPlan: String, readCols: Array[String], predicates: Array[String]) extends InputPartition
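Because `DorisDialects.compileFilter` returns an `Option[String]`, unsupported filters silently drop out and supported ones become Doris-side filter strings. A sketch of the shape of that mapping, with the rendered text assumed rather than taken from `DorisDialects`:

```scala
import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan}

// Filters as Spark supplies them for `where age = 18 and age > 10`.
val filters: Array[Filter] = Array(EqualTo("age", 18), GreaterThan("age", 10))

// compiledFilters() would then yield something like
//   Array("age = 18", "age > 10")   // rendering assumed, not verified
// and ReaderPartitionGenerator attaches these strings to each partition
// request, so Doris filters rows before they ever reach Spark.
```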
@@ -24,24 +24,9 @@ import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType

 protected[spark] abstract class DorisScanBuilderBase(config: DorisConfig, schema: StructType) extends ScanBuilder
-  with SupportsPushDownFilters
   with SupportsPushDownRequiredColumns {

-  private var readSchema: StructType = schema
-
-  private var pushDownPredicates: Array[Filter] = Array[Filter]()
-
-  private val inValueLengthLimit = config.getValue(DorisOptions.DORIS_FILTER_QUERY_IN_MAX_COUNT)
-
-  override def build(): Scan = new DorisScan(config, readSchema, pushDownPredicates)
-
-  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
-    val (pushed, unsupported) = filters.partition(DorisDialects.compileFilter(_, inValueLengthLimit).isDefined)
-    this.pushDownPredicates = pushed
-    unsupported
-  }
-
-  override def pushedFilters(): Array[Filter] = pushDownPredicates
+  protected var readSchema: StructType = schema

   override def pruneColumns(requiredSchema: StructType): Unit = {
     readSchema = StructType(requiredSchema.fields.filter(schema.contains(_)))
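With pushdown moved out, the shared builder base keeps only column pruning; this split is what lets the Spark 3.3 module implement `SupportsPushDownV2Filters` while 3.1 and 3.2 keep the V1 `SupportsPushDownFilters`. A small standalone illustration of what `pruneColumns` retains, using a hypothetical schema and only standard Spark types:

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical schemas illustrating the pruneColumns body above.
val full = StructType(Seq(
  StructField("name", StringType),
  StructField("age", IntegerType),
  StructField("city", StringType)))
val required = StructType(Seq(StructField("age", IntegerType)))

// Keep only requested fields that exist in the original schema; this
// pruned schema is what build() later hands to the scan.
val pruned = StructType(required.fields.filter(full.contains(_)))
// pruned.fieldNames => Array("age")
```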
@@ -17,7 +17,26 @@

 package org.apache.doris.spark.read

-import org.apache.doris.spark.config.DorisConfig
+import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
+import org.apache.doris.spark.util.DorisDialects
+import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters}
+import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType

-class DorisScanBuilder(config: DorisConfig, schema: StructType) extends DorisScanBuilderBase(config, schema) {}
+class DorisScanBuilder(config: DorisConfig, schema: StructType) extends DorisScanBuilderBase(config, schema) with SupportsPushDownFilters {
+
+  private var pushDownPredicates: Array[Filter] = Array[Filter]()
+
+  private val inValueLengthLimit = config.getValue(DorisOptions.DORIS_FILTER_QUERY_IN_MAX_COUNT)
+
+  override def build(): Scan = new DorisScan(config, readSchema, pushDownPredicates)
+
+  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    val (pushed, unsupported) = filters.partition(DorisDialects.compileFilter(_, inValueLengthLimit).isDefined)
+    this.pushDownPredicates = pushed
+    unsupported
+  }
+
+  override def pushedFilters(): Array[Filter] = pushDownPredicates
+
+}
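`pushFilters` implements the V1 pushdown handshake: Spark offers every candidate filter, the builder keeps what it can compile and returns the rest, and Spark re-evaluates the returned filters on its side after the scan. A hedged sketch of that exchange (hypothetical instances; which filters compile is an assumption):

```scala
import org.apache.doris.spark.config.DorisConfig
import org.apache.spark.sql.sources.{EqualTo, Filter, StringContains}
import org.apache.spark.sql.types.StructType

// Hypothetical setup; in practice config and schema come from the table.
val config: DorisConfig = ???
val schema: StructType = ???
val builder = new DorisScanBuilder(config, schema)

// Suppose EqualTo compiles to a Doris filter but StringContains does not
// (an assumption): Spark offers both, the builder keeps the first and
// returns the second for Spark to evaluate itself.
val offered: Array[Filter] = Array(EqualTo("age", 18), StringContains("name", "do"))
val leftover = builder.pushFilters(offered)
// builder.pushedFilters() now reports the pushed subset (shown in EXPLAIN).
```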
@@ -17,7 +17,27 @@

 package org.apache.doris.spark.read

-import org.apache.doris.spark.config.DorisConfig
+import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
+import org.apache.doris.spark.util.DorisDialects
+import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters}
+import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType

-class DorisScanBuilder(config: DorisConfig, schema: StructType) extends DorisScanBuilderBase(config, schema) {}
+class DorisScanBuilder(config: DorisConfig, schema: StructType) extends DorisScanBuilderBase(config, schema)
+  with SupportsPushDownFilters {
+
+  private var pushDownPredicates: Array[Filter] = Array[Filter]()
+
+  private val inValueLengthLimit = config.getValue(DorisOptions.DORIS_FILTER_QUERY_IN_MAX_COUNT)
+
+  override def build(): Scan = new DorisScan(config, readSchema, pushDownPredicates)
+
+  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    val (pushed, unsupported) = filters.partition(DorisDialects.compileFilter(_, inValueLengthLimit).isDefined)
+    this.pushDownPredicates = pushed
+    unsupported
+  }
+
+  override def pushedFilters(): Array[Filter] = pushDownPredicates
+
+}
5 changes: 5 additions & 0 deletions spark-doris-connector/spark-doris-connector-spark-3.3/pom.xml
@@ -48,6 +48,11 @@
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_${scala.major.version}</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>

 </project>
@@ -17,7 +17,29 @@

 package org.apache.doris.spark.read

-import org.apache.doris.spark.config.DorisConfig
+import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
+import org.apache.doris.spark.read.expression.V2ExpressionBuilder
+import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownV2Filters}
 import org.apache.spark.sql.types.StructType

-class DorisScanBuilder(config: DorisConfig, schema: StructType) extends DorisScanBuilderBase(config, schema) {}
+class DorisScanBuilder(config: DorisConfig, schema: StructType) extends DorisScanBuilderBase(config, schema)
+  with SupportsPushDownV2Filters {
+
+  private var pushDownPredicates: Array[Predicate] = Array[Predicate]()
+
+  private val expressionBuilder = new V2ExpressionBuilder(config.getValue(DorisOptions.DORIS_FILTER_QUERY_IN_MAX_COUNT))
+
+  override def build(): Scan = new DorisScanV2(config, schema, pushDownPredicates)
+
+  override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
+    val (pushed, unsupported) = predicates.partition(predicate => {
+      Option(expressionBuilder.build(predicate)).isDefined
+    })
+    this.pushDownPredicates = pushed
+    unsupported
+  }
+
+  override def pushedPredicates(): Array[Predicate] = pushDownPredicates
+
+}
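Spark 3.3 replaces the V1 `Filter`-based pushdown with `SupportsPushDownV2Filters`, which exchanges `org.apache.spark.sql.connector.expressions.filter.Predicate` values instead. A sketch of roughly what Spark hands to `pushPredicates` for `where age = 18`, using Spark's public connector expression API (the exact factory methods used here are assumptions):

```scala
import org.apache.spark.sql.connector.expressions.{Expression, Expressions, LiteralValue}
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.types.IntegerType

// Roughly the V2 form of `age = 18` offered to pushPredicates
// (Expressions.column is assumed as the column-reference factory).
val ageEquals18 = new Predicate("=",
  Array[Expression](Expressions.column("age"), LiteralValue(18, IntegerType)))

// The builder keeps it only if V2ExpressionBuilder.build(ageEquals18)
// returns a non-null filter string; otherwise it is handed back to Spark.
```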
@@ -0,0 +1,32 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.spark.read

import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
import org.apache.doris.spark.read.expression.V2ExpressionBuilder
import org.apache.spark.internal.Logging
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.types.StructType

class DorisScanV2(config: DorisConfig, schema: StructType, filters: Array[Predicate]) extends AbstractDorisScan(config, schema) with Logging {
override protected def compiledFilters(): Array[String] = {
val inValueLengthLimit = config.getValue(DorisOptions.DORIS_FILTER_QUERY_IN_MAX_COUNT)
val v2ExpressionBuilder = new V2ExpressionBuilder(inValueLengthLimit)
filters.map(e => Option[String](v2ExpressionBuilder.build(e))).filter(_.isDefined).map(_.get)
}
}
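Taken together, the read path on Spark 3.3+ is: `pushPredicates` keeps what `V2ExpressionBuilder` can compile, `build()` captures the survivors in a `DorisScanV2`, and `AbstractDorisScan.planInputPartitions()` bakes the compiled strings into each `DorisInputPartition`. A minimal usage sketch that exercises the whole path (placeholder connection values; option names as in the ITCase above):

```scala
import org.apache.spark.sql.SparkSession

// Usage sketch only; fenodes, credentials, and table are placeholders.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val df = spark.read.format("doris")
  .option("table.identifier", "test.example_tbl") // hypothetical table
  .option("fenodes", "127.0.0.1:8030")            // placeholder FE address
  .option("user", "root")
  .option("password", "")
  .load()
  .filter("age = 18")
  .select("name", "age")

// On Spark 3.3+ the filter reaches the builder as a V2 Predicate; if
// V2ExpressionBuilder compiles it, Doris evaluates it and the physical
// plan reports it as pushed to the scan.
df.explain(true)
```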