From f9c02ac512ae7d663f7044dd317bed8b95412f11 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Wed, 21 Aug 2024 16:58:31 +0200 Subject: [PATCH] Strict table expectation on export API --- .../spotify/scio/bigquery/BigQueryTypes.scala | 16 +++++++ .../bigquery/syntax/ScioContextSyntax.scala | 45 ++++--------------- 2 files changed, 24 insertions(+), 37 deletions(-) diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala index 919768598a..4c49a08585 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala @@ -130,6 +130,22 @@ case class Table private (ref: GTableReference, filter: Option[Table.Filter]) ex } object Table { + + /** + * @param selectedFields + * names of the fields in the table that should be read. If empty, all fields will be read. If + * the specified field is a nested field, all the sub-fields in the field will be selected. + * Fields will always appear in the generated class in the same order as they appear in the + * table, regardless of the order specified in selectedFields. + * @param rowRestriction + * SQL text filtering statement, similar ti a WHERE clause in a query. Currently, we support + * combinations of predicates that are a comparison between a column and a constant value in SQL + * statement. Aggregates are not supported. For example: + * + * {{{ + * "a > DATE '2014-09-27' AND (b > 5 AND c LIKE 'date')" + * }}} + */ final case class Filter( selectedFields: List[String], rowRestriction: Option[String] diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/ScioContextSyntax.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/ScioContextSyntax.scala index 0626cc542d..c0162c773b 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/ScioContextSyntax.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/ScioContextSyntax.scala @@ -30,15 +30,9 @@ import org.apache.beam.sdk.io.Compression import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method import org.apache.beam.sdk.io.fs.EmptyMatchTreatment import org.apache.beam.sdk.transforms.errorhandling.{BadRecord, ErrorHandler} -import org.slf4j.{Logger, LoggerFactory} - -object ScioContextOps { - @transient private lazy val logger: Logger = LoggerFactory.getLogger(this.getClass) -} /** Enhanced version of [[ScioContext]] with BigQuery methods. */ final class ScioContextOps(private val self: ScioContext) extends AnyVal { - import ScioContextOps._ /** * Get an SCollection for a BigQuery SELECT query. Both @@ -85,13 +79,10 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { configOverride: BigQueryIO.ReadParam.ConfigOverride[TableRow] = BigQueryIO.ReadParam.DefaultConfigOverride ): SCollection[TableRow] = { - if (table.filter.nonEmpty) { - logger.warn( - "Using filtered table with standard API. " + - "selectedFields and rowRestriction are ignored. " + - "Use bigQueryStorage instead" - ) - } + require( + table.filter.isEmpty, + "Cannot use filtered table with standard API. Use bigQueryStorage instead" + ) val params = BigQueryIO.TableReadParam( BigQueryIO.Format.Default(), Method.DEFAULT, @@ -107,13 +98,10 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { configOverride: BigQueryIO.ReadParam.ConfigOverride[T] = BigQueryIO.ReadParam.DefaultConfigOverride ): SCollection[T] = { - if (table.filter.nonEmpty) { - logger.warn( - "Using filtered table with standard API. " + - "selectedFields and rowRestriction are ignored. " + - "Use bigQueryStorage instead" - ) - } + require( + table.filter.isEmpty, + "Cannot use filtered table with standard API. Use bigQueryStorage instead" + ) val params = BigQueryIO.TableReadParam( format, Method.DEFAULT, @@ -123,23 +111,6 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal { self.read(BigQueryIO[T](table))(params) } - /** - * Get an SCollection for a BigQuery table using the storage API. - * - * @param selectedFields - * names of the fields in the table that should be read. If empty, all fields will be read. If - * the specified field is a nested field, all the sub-fields in the field will be selected. - * Fields will always appear in the generated class in the same order as they appear in the - * table, regardless of the order specified in selectedFields. - * @param rowRestriction - * SQL text filtering statement, similar ti a WHERE clause in a query. Currently, we support - * combinations of predicates that are a comparison between a column and a constant value in SQL - * statement. Aggregates are not supported. For example: - * - * {{{ - * "a > DATE '2014-09-27' AND (b > 5 AND c LIKE 'date')" - * }}} - */ def bigQueryStorage( table: Table, errorHandler: ErrorHandler[BadRecord, _] = BigQueryIO.ReadParam.DefaultErrorHandler,