Skip to content

Commit

Permalink
Strict table expectation on export API
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones committed Aug 22, 2024
1 parent 513118f commit 2d66440
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,22 @@ case class Table private (ref: GTableReference, filter: Option[Table.Filter]) ex
}

object Table {

/**
* @param selectedFields
* names of the fields in the table that should be read. If empty, all fields will be read. If
* the specified field is a nested field, all the sub-fields in the field will be selected.
* Fields will always appear in the generated class in the same order as they appear in the
* table, regardless of the order specified in selectedFields.
* @param rowRestriction
* SQL text filtering statement, similar ti a WHERE clause in a query. Currently, we support
* combinations of predicates that are a comparison between a column and a constant value in SQL
* statement. Aggregates are not supported. For example:
*
* {{{
* "a > DATE '2014-09-27' AND (b > 5 AND c LIKE 'date')"
* }}}
*/
final case class Filter(
selectedFields: List[String],
rowRestriction: Option[String]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,9 @@ import org.apache.beam.sdk.io.Compression
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment
import org.apache.beam.sdk.transforms.errorhandling.{BadRecord, ErrorHandler}
import org.slf4j.{Logger, LoggerFactory}

object ScioContextOps {
@transient private lazy val logger: Logger = LoggerFactory.getLogger(this.getClass)
}

/** Enhanced version of [[ScioContext]] with BigQuery methods. */
final class ScioContextOps(private val self: ScioContext) extends AnyVal {
import ScioContextOps._

/**
* Get an SCollection for a BigQuery SELECT query. Both
Expand Down Expand Up @@ -85,13 +79,10 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal {
configOverride: BigQueryIO.ReadParam.ConfigOverride[TableRow] =
BigQueryIO.ReadParam.DefaultConfigOverride
): SCollection[TableRow] = {
if (table.filter.nonEmpty) {
logger.warn(
"Using filtered table with standard API. " +
"selectedFields and rowRestriction are ignored. " +
"Use bigQueryStorage instead"
)
}
require(
table.filter.isEmpty,
"Cannot use filtered table with standard API. Use bigQueryStorage instead"
)
val params = BigQueryIO.TableReadParam(
BigQueryIO.Format.Default(),
Method.DEFAULT,
Expand All @@ -107,13 +98,10 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal {
configOverride: BigQueryIO.ReadParam.ConfigOverride[T] =
BigQueryIO.ReadParam.DefaultConfigOverride
): SCollection[T] = {
if (table.filter.nonEmpty) {
logger.warn(
"Using filtered table with standard API. " +
"selectedFields and rowRestriction are ignored. " +
"Use bigQueryStorage instead"
)
}
require(
table.filter.isEmpty,
"Cannot use filtered table with standard API. Use bigQueryStorageFormat instead"
)
val params = BigQueryIO.TableReadParam(
format,
Method.DEFAULT,
Expand All @@ -123,23 +111,6 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal {
self.read(BigQueryIO[T](table))(params)
}

/**
* Get an SCollection for a BigQuery table using the storage API.
*
* @param selectedFields
* names of the fields in the table that should be read. If empty, all fields will be read. If
* the specified field is a nested field, all the sub-fields in the field will be selected.
* Fields will always appear in the generated class in the same order as they appear in the
* table, regardless of the order specified in selectedFields.
* @param rowRestriction
* SQL text filtering statement, similar ti a WHERE clause in a query. Currently, we support
* combinations of predicates that are a comparison between a column and a constant value in SQL
* statement. Aggregates are not supported. For example:
*
* {{{
* "a > DATE '2014-09-27' AND (b > 5 AND c LIKE 'date')"
* }}}
*/
def bigQueryStorage(
table: Table,
errorHandler: ErrorHandler[BadRecord, _] = BigQueryIO.ReadParam.DefaultErrorHandler,
Expand Down Expand Up @@ -234,13 +205,10 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal {
configOverride
)
case t: Table =>
if (t.filter.nonEmpty) {
logger.warn(
"Using filtered table with standard API. " +
"selectedFields and rowRestriction are ignored. " +
"Use typedBigQueryStorage instead"
)
}
require(
t.filter.isEmpty,
"Cannot use filtered table with standard API. Use typedBigQuery instead"
)
BigQueryIO.TableReadParam[T](
format,
Method.DEFAULT,
Expand Down

0 comments on commit 2d66440

Please sign in to comment.