From e893deed5070db2fc1e7272dcdb094d7c5bfbe18 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Thu, 16 Jan 2025 11:22:29 +0800 Subject: [PATCH] [Feature][Connector-V2] Support single file mode in file sink (#8518) --- docs/en/connector-v2/sink/CosFile.md | 63 +++---- docs/en/connector-v2/sink/FtpFile.md | 71 ++++---- docs/en/connector-v2/sink/HdfsFile.md | 5 +- docs/en/connector-v2/sink/LocalFile.md | 61 +++---- docs/en/connector-v2/sink/ObsFile.md | 49 +++--- docs/en/connector-v2/sink/OssFile.md | 65 ++++---- docs/en/connector-v2/sink/OssJindoFile.md | 63 +++---- docs/en/connector-v2/sink/S3File.md | 73 ++++---- docs/en/connector-v2/sink/SftpFile.md | 69 ++++---- docs/zh/connector-v2/sink/HdfsFile.md | 5 +- docs/zh/connector-v2/sink/LocalFile.md | 3 +- .../seatunnel/api/common/JobContext.java | 14 +- .../seatunnel/common/utils/FileUtils.java | 5 + .../connector-common/pom.xml | 14 ++ .../seatunnel/sink/SinkFlowTestUtils.java | 128 ++++++++++++++ .../file/config/BaseFileSinkConfig.java | 5 + .../seatunnel/file/config/BaseSinkConfig.java | 7 + .../seatunnel/file/sink/BaseFileSink.java | 11 ++ .../file/sink/BaseFileSinkWriter.java | 23 +++ .../file/sink/BaseMultipleTableFileSink.java | 18 +- .../sink/writer/AbstractWriteStrategy.java | 12 +- .../file/cos/sink/CosFileSinkFactory.java | 2 + .../file/ftp/sink/FtpFileSinkFactory.java | 2 + .../file/hdfs/sink/HdfsFileSinkFactory.java | 7 + .../oss/jindo/sink/OssFileSinkFactory.java | 2 + .../connector-file-local/pom.xml | 7 + .../file/local/sink/LocalFileSinkFactory.java | 2 + .../seatunnel/file/local/LocalFileTest.java | 156 ++++++++++++++++++ .../file/oss/sink/OssFileSinkFactory.java | 2 + .../file/s3/sink/S3FileSinkFactory.java | 2 + .../file/sftp/sink/SftpFileSinkFactory.java | 2 + .../starter/execution/RuntimeEnvironment.java | 16 +- .../execution/RuntimeEnvironmentTest.java | 49 ++++++ .../flink/execution/FlinkExecution.java | 2 + .../spark/execution/SparkExecution.java | 2 + .../MultipleTableJobConfigParserTest.java | 2 + .../batch_fakesource_to_file_complex.conf | 1 + .../parse/MultipleTableJobConfigParser.java | 9 +- 38 files changed, 754 insertions(+), 275 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileTest.java create mode 100644 seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/execution/RuntimeEnvironmentTest.java diff --git a/docs/en/connector-v2/sink/CosFile.md b/docs/en/connector-v2/sink/CosFile.md index 0ec8517fe31..db11cfb9af8 100644 --- a/docs/en/connector-v2/sink/CosFile.md +++ b/docs/en/connector-v2/sink/CosFile.md @@ -34,37 +34,38 @@ By default, we use 2PC commit to ensure `exactly-once` ## Options -| Name | Type | Required | Default | Description | -|---------------------------------------|---------|----------|--------------------------------------------|-------------------------------------------------------------------------------------------------------------------| -| path | string | yes | - | | -| tmp_path | string | no | /tmp/seatunnel | The result file will write to a tmp path first and then use `mv` to submit tmp dir to target dir. Need a COS dir. 
| -| bucket | string | yes | - | | -| secret_id | string | yes | - | | -| secret_key | string | yes | - | | -| region | string | yes | - | | -| custom_filename | boolean | no | false | Whether you need custom the filename | -| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | -| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | -| file_format_type | string | no | "csv" | | -| field_delimiter | string | no | '\001' | Only used when file_format is text | -| row_delimiter | string | no | "\n" | Only used when file_format is text | -| have_partition | boolean | no | false | Whether you need processing partitions. | -| partition_by | array | no | - | Only used then have_partition is true | -| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | -| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | -| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | -| is_enable_transaction | boolean | no | true | | -| batch_size | int | no | 1000000 | | -| compress_codec | string | no | none | | -| common-options | object | no | - | | -| max_rows_in_memory | int | no | - | Only used when file_format is excel. | -| sheet_name | string | no | Sheet${Random number} | Only used when file_format is excel. | -| xml_root_tag | string | no | RECORDS | Only used when file_format is xml. | -| xml_row_tag | string | no | RECORD | Only used when file_format is xml. | -| xml_use_attr_format | boolean | no | - | Only used when file_format is xml. | -| parquet_avro_write_timestamp_as_int96 | boolean | no | false | Only used when file_format is parquet. | -| parquet_avro_write_fixed_as_int96 | array | no | - | Only used when file_format is parquet. | -| encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. | +| Name | Type | Required | Default | Description | +|---------------------------------------|---------|----------|--------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| path | string | yes | - | | +| tmp_path | string | no | /tmp/seatunnel | The result file will write to a tmp path first and then use `mv` to submit tmp dir to target dir. Need a COS dir. | +| bucket | string | yes | - | | +| secret_id | string | yes | - | | +| secret_key | string | yes | - | | +| region | string | yes | - | | +| custom_filename | boolean | no | false | Whether you need custom the filename | +| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | +| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | +| file_format_type | string | no | "csv" | | +| field_delimiter | string | no | '\001' | Only used when file_format is text | +| row_delimiter | string | no | "\n" | Only used when file_format is text | +| have_partition | boolean | no | false | Whether you need processing partitions. 
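A minimal sketch of how the new `single_file_mode` option introduced by this patch (listed further down this table) is expected to be used with this sink; the bucket, region, and credential values are placeholders:

```hocon
sink {
  CosFile {
    path = "/seatunnel/sink"
    bucket = "cosn://seatunnel-test"      # placeholder bucket
    secret_id = "xxxxxxxxxxxxxxxxxxx"     # placeholder credential
    secret_key = "xxxxxxxxxxxxxxxxxxx"    # placeholder credential
    region = "ap-chengdu"
    file_format_type = "csv"
    # Each parallelism writes exactly one file; batch_size is ignored in
    # this mode and the output file name carries no file-block suffix.
    single_file_mode = true
  }
}
```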
| +| partition_by | array | no | - | Only used then have_partition is true | +| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | +| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | +| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | +| is_enable_transaction | boolean | no | true | | +| batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | +| common-options | object | no | - | | +| max_rows_in_memory | int | no | - | Only used when file_format is excel. | +| sheet_name | string | no | Sheet${Random number} | Only used when file_format is excel. | +| xml_root_tag | string | no | RECORDS | Only used when file_format is xml. | +| xml_row_tag | string | no | RECORD | Only used when file_format is xml. | +| xml_use_attr_format | boolean | no | - | Only used when file_format is xml. | +| single_file_mode | boolean | no | false | Each parallelism will only output one file. When this parameter is turned on, batch_size will not take effect. The output file name does not have a file block suffix. | +| parquet_avro_write_timestamp_as_int96 | boolean | no | false | Only used when file_format is parquet. | +| parquet_avro_write_fixed_as_int96 | array | no | - | Only used when file_format is parquet. | +| encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. | ### path [string] diff --git a/docs/en/connector-v2/sink/FtpFile.md b/docs/en/connector-v2/sink/FtpFile.md index caa15d1a6b9..175d374d9aa 100644 --- a/docs/en/connector-v2/sink/FtpFile.md +++ b/docs/en/connector-v2/sink/FtpFile.md @@ -32,41 +32,42 @@ By default, we use 2PC commit to ensure `exactly-once` ## Options -| Name | Type | Required | Default | Description | -|---------------------------------------|---------|----------|--------------------------------------------|-------------------------------------------------------------------------------------------------------------------| -| host | string | yes | - | | -| port | int | yes | - | | -| user | string | yes | - | | -| password | string | yes | - | | -| path | string | yes | - | | -| tmp_path | string | yes | /tmp/seatunnel | The result file will write to a tmp path first and then use `mv` to submit tmp dir to target dir. Need a FTP dir. | -| connection_mode | string | no | active_local | The target ftp connection mode | -| custom_filename | boolean | no | false | Whether you need custom the filename | -| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | -| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | -| file_format_type | string | no | "csv" | | -| field_delimiter | string | no | '\001' | Only used when file_format_type is text | -| row_delimiter | string | no | "\n" | Only used when file_format_type is text | -| have_partition | boolean | no | false | Whether you need processing partitions. 
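The partition options in these tables combine as follows. A hedged sketch, where the field name `age` and the connection values are illustrative: with `partition_by = ["age"]` and a `partition_dir_expression` of `${k0}=${v0}`, rows land under directories such as `age=20/`.

```hocon
sink {
  FtpFile {
    host = "ftp.example.com"    # illustrative connection values
    port = 21
    user = "seatunnel"
    password = "pass"
    path = "/data/output"
    file_format_type = "text"
    # Partition output directories by the "age" field, e.g. /data/output/age=20/
    have_partition = true
    partition_by = ["age"]
    partition_dir_expression = "${k0}=${v0}"
    is_partition_field_write_in_file = true
  }
}
```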
| -| partition_by | array | no | - | Only used then have_partition is true | -| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | -| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | -| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | -| is_enable_transaction | boolean | no | true | | -| batch_size | int | no | 1000000 | | -| compress_codec | string | no | none | | -| common-options | object | no | - | | -| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | -| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | -| xml_root_tag | string | no | RECORDS | Only used when file_format is xml. | -| xml_row_tag | string | no | RECORD | Only used when file_format is xml. | -| xml_use_attr_format | boolean | no | - | Only used when file_format is xml. | -| parquet_avro_write_timestamp_as_int96 | boolean | no | false | Only used when file_format is parquet. | -| parquet_avro_write_fixed_as_int96 | array | no | - | Only used when file_format is parquet. | -| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv.
false:don't write header,true:write header. | -| encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. | -| schema_save_mode | string | no | CREATE_SCHEMA_WHEN_NOT_EXIST | Existing dir processing method | -| data_save_mode | string | no | APPEND_DATA | Existing data processing method | +| Name | Type | Required | Default | Description | +|---------------------------------------|---------|----------|--------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| host | string | yes | - | | +| port | int | yes | - | | +| user | string | yes | - | | +| password | string | yes | - | | +| path | string | yes | - | | +| tmp_path | string | yes | /tmp/seatunnel | The result file will write to a tmp path first and then use `mv` to submit tmp dir to target dir. Need a FTP dir. | +| connection_mode | string | no | active_local | The target ftp connection mode | +| custom_filename | boolean | no | false | Whether you need custom the filename | +| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | +| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | +| file_format_type | string | no | "csv" | | +| field_delimiter | string | no | '\001' | Only used when file_format_type is text | +| row_delimiter | string | no | "\n" | Only used when file_format_type is text | +| have_partition | boolean | no | false | Whether you need processing partitions. | +| partition_by | array | no | - | Only used then have_partition is true | +| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | +| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | +| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | +| is_enable_transaction | boolean | no | true | | +| batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | +| common-options | object | no | - | | +| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | +| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | +| xml_root_tag | string | no | RECORDS | Only used when file_format is xml. | +| xml_row_tag | string | no | RECORD | Only used when file_format is xml. | +| xml_use_attr_format | boolean | no | - | Only used when file_format is xml. | +| single_file_mode | boolean | no | false | Each parallelism will only output one file. When this parameter is turned on, batch_size will not take effect. The output file name does not have a file block suffix. | +| parquet_avro_write_timestamp_as_int96 | boolean | no | false | Only used when file_format is parquet. | +| parquet_avro_write_fixed_as_int96 | array | no | - | Only used when file_format is parquet. | +| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv.
false:don't write header,true:write header. | +| encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. | +| schema_save_mode | string | no | CREATE_SCHEMA_WHEN_NOT_EXIST | Existing dir processing method | +| data_save_mode | string | no | APPEND_DATA | Existing data processing method | ### host [string] diff --git a/docs/en/connector-v2/sink/HdfsFile.md b/docs/en/connector-v2/sink/HdfsFile.md index 47a2c6f0240..095c32eabc1 100644 --- a/docs/en/connector-v2/sink/HdfsFile.md +++ b/docs/en/connector-v2/sink/HdfsFile.md @@ -38,7 +38,7 @@ Output data to hdfs file ## Sink Options -| Name | Type | Required | Default | Description | +| Name | Type | Required | Default | Description | |---------------------------------------|---------|----------|--------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | fs.defaultFS | string | yes | - | The hadoop cluster address that start with `hdfs://`, for example: `hdfs://hadoopcluster` | | path | string | yes | - | The target dir path is required. | @@ -68,9 +68,10 @@ Output data to hdfs file | xml_root_tag | string | no | RECORDS | Only used when file_format is xml, specifies the tag name of the root element within the XML file. | | xml_row_tag | string | no | RECORD | Only used when file_format is xml, specifies the tag name of the data rows within the XML file | | xml_use_attr_format | boolean | no | - | Only used when file_format is xml, specifies Whether to process data using the tag attribute format. | +| single_file_mode | boolean | no | false | Each parallelism will only output one file. When this parameter is turned on, batch_size will not take effect. The output file name does not have a file block suffix. | | parquet_avro_write_timestamp_as_int96 | boolean | no | false | Only used when file_format is parquet. | | parquet_avro_write_fixed_as_int96 | array | no | - | Only used when file_format is parquet. | -| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv.
false:don't write header,true:write header. | +| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv.
false:don't write header,true:write header. | | encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. | | remote_user | string | no | - | The remote user name of hdfs. | diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index 78c7c539e80..c48394f9175 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -33,36 +33,37 @@ By default, we use 2PC commit to ensure `exactly-once` ## Options -| Name | Type | Required | Default | Description | -|---------------------------------------|---------|----------|--------------------------------------------|---------------------------------------------------------------------------------------------------| -| path | string | yes | - | | -| tmp_path | string | no | /tmp/seatunnel | The result file will write to a tmp path first and then use `mv` to submit tmp dir to target dir. | -| custom_filename | boolean | no | false | Whether you need custom the filename | -| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | -| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | -| file_format_type | string | no | "csv" | | -| field_delimiter | string | no | '\001' | Only used when file_format_type is text | -| row_delimiter | string | no | "\n" | Only used when file_format_type is text | -| have_partition | boolean | no | false | Whether you need processing partitions. | -| partition_by | array | no | - | Only used then have_partition is true | -| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | -| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | -| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | -| is_enable_transaction | boolean | no | true | | -| batch_size | int | no | 1000000 | | -| compress_codec | string | no | none | | -| common-options | object | no | - | | -| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | -| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | -| xml_root_tag | string | no | RECORDS | Only used when file_format is xml. | -| xml_row_tag | string | no | RECORD | Only used when file_format is xml. | -| xml_use_attr_format | boolean | no | - | Only used when file_format is xml. | -| parquet_avro_write_timestamp_as_int96 | boolean | no | false | Only used when file_format is parquet. | -| parquet_avro_write_fixed_as_int96 | array | no | - | Only used when file_format is parquet. | -| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv.
false:don't write header,true:write header. | -| encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. | -| schema_save_mode | string | no | CREATE_SCHEMA_WHEN_NOT_EXIST | Existing dir processing method | -| data_save_mode | string | no | APPEND_DATA | Existing data processing method | +| Name | Type | Required | Default | Description | +|---------------------------------------|---------|----------|--------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| path | string | yes | - | | +| tmp_path | string | no | /tmp/seatunnel | The result file will write to a tmp path first and then use `mv` to submit tmp dir to target dir. | +| custom_filename | boolean | no | false | Whether you need custom the filename | +| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | +| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | +| file_format_type | string | no | "csv" | | +| field_delimiter | string | no | '\001' | Only used when file_format_type is text | +| row_delimiter | string | no | "\n" | Only used when file_format_type is text | +| have_partition | boolean | no | false | Whether you need processing partitions. | +| partition_by | array | no | - | Only used then have_partition is true | +| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | +| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | +| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | +| is_enable_transaction | boolean | no | true | | +| batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | +| common-options | object | no | - | | +| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | +| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | +| xml_root_tag | string | no | RECORDS | Only used when file_format is xml. | +| xml_row_tag | string | no | RECORD | Only used when file_format is xml. | +| xml_use_attr_format | boolean | no | - | Only used when file_format is xml. | +| single_file_mode | boolean | no | false | Each parallelism will only output one file. When this parameter is turned on, batch_size will not take effect. The output file name does not have a file block suffix. | +| parquet_avro_write_timestamp_as_int96 | boolean | no | false | Only used when file_format is parquet. | +| parquet_avro_write_fixed_as_int96 | array | no | - | Only used when file_format is parquet. | +| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv.
false:don't write header,true:write header. | +| encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. | +| schema_save_mode | string | no | CREATE_SCHEMA_WHEN_NOT_EXIST | Existing dir processing method | +| data_save_mode | string | no | APPEND_DATA | Existing data processing method | ### path [string] diff --git a/docs/en/connector-v2/sink/ObsFile.md b/docs/en/connector-v2/sink/ObsFile.md index ef515c0eb1a..560e7bfb35e 100644 --- a/docs/en/connector-v2/sink/ObsFile.md +++ b/docs/en/connector-v2/sink/ObsFile.md @@ -50,30 +50,31 @@ It only supports hadoop version **2.9.X+**. ## Options -| name | type | required | default | description | -|----------------------------------|---------|----------|--------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------| -| path | string | yes | - | The target dir path. | -| bucket | string | yes | - | The bucket address of obs file system, for example: `obs://obs-bucket-name`. | -| access_key | string | yes | - | The access key of obs file system. | -| access_secret | string | yes | - | The access secret of obs file system. | -| endpoint | string | yes | - | The endpoint of obs file system. | -| custom_filename | boolean | no | false | Whether you need custom the filename. | -| file_name_expression | string | no | "${transactionId}" | Describes the file expression which will be created into the `path`. Only used when custom_filename is true. [Tips](#file_name_expression) | -| filename_time_format | string | no | "yyyy.MM.dd" | Specify the time format of the `path`. Only used when custom_filename is true. [Tips](#filename_time_format) | -| file_format_type | string | no | "csv" | Supported file types. [Tips](#file_format_type) | -| field_delimiter | string | no | '\001' | The separator between columns in a row of data.Only used when file_format is text. | -| row_delimiter | string | no | "\n" | The separator between rows in a file. Only needed by `text` file format. | -| have_partition | boolean | no | false | Whether you need processing partitions. | -| partition_by | array | no | - | Partition data based on selected fields. Only used then have_partition is true. | -| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true.[Tips](#partition_dir_expression) | -| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true.[Tips](#is_partition_field_write_in_file) | -| sink_columns | array | no | | When this parameter is empty, all fields are sink columns.[Tips](#sink_columns) | -| is_enable_transaction | boolean | no | true | [Tips](#is_enable_transaction) | -| batch_size | int | no | 1000000 | [Tips](#batch_size) | -| compress_codec | string | no | none | [Tips](#compress_codec) | -| common-options | object | no | - | [Tips](#common_options) | -| max_rows_in_memory | int | no | - | When File Format is Excel,The maximum number of data items that can be cached in the memory.Only used when file_format is excel. | -| sheet_name | string | no | Sheet${Random number} | Writer the sheet of the workbook. Only used when file_format is excel. 
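For the `custom_filename` family of options described in these tables, a minimal sketch; the bucket, endpoint, and credentials are placeholders:

```hocon
sink {
  ObsFile {
    path = "/seatunnel/obs"
    bucket = "obs://obs-bucket-name"             # placeholder bucket
    access_key = "xxxxxxxxxxx"                   # placeholder credential
    access_secret = "xxxxxxxxxxx"                # placeholder credential
    endpoint = "obs.xxxxxx.myhuaweicloud.com"    # placeholder endpoint
    file_format_type = "csv"
    # Name output files from the transaction id plus the current time,
    # formatted with filename_time_format.
    custom_filename = true
    file_name_expression = "${transactionId}_${now}"
    filename_time_format = "yyyy.MM.dd"
  }
}
```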
| +| name | type | required | default | description | +|----------------------------------|---------|----------|--------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| path | string | yes | - | The target dir path. | +| bucket | string | yes | - | The bucket address of obs file system, for example: `obs://obs-bucket-name`. | +| access_key | string | yes | - | The access key of obs file system. | +| access_secret | string | yes | - | The access secret of obs file system. | +| endpoint | string | yes | - | The endpoint of obs file system. | +| custom_filename | boolean | no | false | Whether you need custom the filename. | +| file_name_expression | string | no | "${transactionId}" | Describes the file expression which will be created into the `path`. Only used when custom_filename is true. [Tips](#file_name_expression) | +| filename_time_format | string | no | "yyyy.MM.dd" | Specify the time format of the `path`. Only used when custom_filename is true. [Tips](#filename_time_format) | +| file_format_type | string | no | "csv" | Supported file types. [Tips](#file_format_type) | +| field_delimiter | string | no | '\001' | The separator between columns in a row of data.Only used when file_format is text. | +| row_delimiter | string | no | "\n" | The separator between rows in a file. Only needed by `text` file format. | +| have_partition | boolean | no | false | Whether you need processing partitions. | +| partition_by | array | no | - | Partition data based on selected fields. Only used then have_partition is true. | +| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true.[Tips](#partition_dir_expression) | +| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true.[Tips](#is_partition_field_write_in_file) | +| sink_columns | array | no | | When this parameter is empty, all fields are sink columns.[Tips](#sink_columns) | +| is_enable_transaction | boolean | no | true | [Tips](#is_enable_transaction) | +| batch_size | int | no | 1000000 | [Tips](#batch_size) | +| single_file_mode | boolean | no | false | Each parallelism will only output one file. When this parameter is turned on, batch_size will not take effect. The output file name does not have a file block suffix. | +| compress_codec | string | no | none | [Tips](#compress_codec) | +| common-options | object | no | - | [Tips](#common_options) | +| max_rows_in_memory | int | no | - | When File Format is Excel,The maximum number of data items that can be cached in the memory.Only used when file_format is excel. | +| sheet_name | string | no | Sheet${Random number} | Writer the sheet of the workbook. Only used when file_format is excel. | ### Tips diff --git a/docs/en/connector-v2/sink/OssFile.md b/docs/en/connector-v2/sink/OssFile.md index cc3cf512990..52da0e83f56 100644 --- a/docs/en/connector-v2/sink/OssFile.md +++ b/docs/en/connector-v2/sink/OssFile.md @@ -86,38 +86,39 @@ If write to `csv`, `text` file type, All column will be string. ## Options -| Name | Type | Required | Default | Description | -|---------------------------------------|---------|----------|--------------------------------------------|-------------------------------------------------------------------------------------------------------------------| -| path | string | yes | The oss path to write file in. 
| | -| tmp_path | string | no | /tmp/seatunnel | The result file will write to a tmp path first and then use `mv` to submit tmp dir to target dir. Need a OSS dir. | -| bucket | string | yes | - | | -| access_key | string | yes | - | | -| access_secret | string | yes | - | | -| endpoint | string | yes | - | | -| custom_filename | boolean | no | false | Whether you need custom the filename | -| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | -| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | -| file_format_type | string | no | "csv" | | -| field_delimiter | string | no | '\001' | Only used when file_format_type is text | -| row_delimiter | string | no | "\n" | Only used when file_format_type is text | -| have_partition | boolean | no | false | Whether you need processing partitions. | -| partition_by | array | no | - | Only used then have_partition is true | -| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | -| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | -| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | -| is_enable_transaction | boolean | no | true | | -| batch_size | int | no | 1000000 | | -| compress_codec | string | no | none | | -| common-options | object | no | - | | -| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | -| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | -| xml_root_tag | string | no | RECORDS | Only used when file_format is xml. | -| xml_row_tag | string | no | RECORD | Only used when file_format is xml. | -| xml_use_attr_format | boolean | no | - | Only used when file_format is xml. | -| parquet_avro_write_timestamp_as_int96 | boolean | no | false | Only used when file_format is parquet. | -| parquet_avro_write_fixed_as_int96 | array | no | - | Only used when file_format is parquet. | -| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv.
false:don't write header,true:write header. | -| encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. | +| Name | Type | Required | Default | Description | +|---------------------------------------|---------|----------|--------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| path | string | yes | The oss path to write file in. | | +| tmp_path | string | no | /tmp/seatunnel | The result file will write to a tmp path first and then use `mv` to submit tmp dir to target dir. Need a OSS dir. | +| bucket | string | yes | - | | +| access_key | string | yes | - | | +| access_secret | string | yes | - | | +| endpoint | string | yes | - | | +| custom_filename | boolean | no | false | Whether you need custom the filename | +| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | +| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | +| file_format_type | string | no | "csv" | | +| field_delimiter | string | no | '\001' | Only used when file_format_type is text | +| row_delimiter | string | no | "\n" | Only used when file_format_type is text | +| have_partition | boolean | no | false | Whether you need processing partitions. | +| partition_by | array | no | - | Only used then have_partition is true | +| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | +| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | +| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | +| is_enable_transaction | boolean | no | true | | +| batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | +| common-options | object | no | - | | +| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | +| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | +| xml_root_tag | string | no | RECORDS | Only used when file_format is xml. | +| xml_row_tag | string | no | RECORD | Only used when file_format is xml. | +| xml_use_attr_format | boolean | no | - | Only used when file_format is xml. | +| single_file_mode | boolean | no | false | Each parallelism will only output one file. When this parameter is turned on, batch_size will not take effect. The output file name does not have a file block suffix. | +| parquet_avro_write_timestamp_as_int96 | boolean | no | false | Only used when file_format is parquet. | +| parquet_avro_write_fixed_as_int96 | array | no | - | Only used when file_format is parquet. | +| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv.
false:don't write header,true:write header. | +| encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. | ### path [string] diff --git a/docs/en/connector-v2/sink/OssJindoFile.md b/docs/en/connector-v2/sink/OssJindoFile.md index fa8f3875d04..1a95e81a44f 100644 --- a/docs/en/connector-v2/sink/OssJindoFile.md +++ b/docs/en/connector-v2/sink/OssJindoFile.md @@ -38,37 +38,38 @@ By default, we use 2PC commit to ensure `exactly-once` ## Options -| Name | Type | Required | Default | Description | -|---------------------------------------|---------|----------|--------------------------------------------|-------------------------------------------------------------------------------------------------------------------| -| path | string | yes | - | | -| tmp_path | string | no | /tmp/seatunnel | The result file will write to a tmp path first and then use `mv` to submit tmp dir to target dir. Need a OSS dir. | -| bucket | string | yes | - | | -| access_key | string | yes | - | | -| access_secret | string | yes | - | | -| endpoint | string | yes | - | | -| custom_filename | boolean | no | false | Whether you need custom the filename | -| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | -| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | -| file_format_type | string | no | "csv" | | -| field_delimiter | string | no | '\001' | Only used when file_format_type is text | -| row_delimiter | string | no | "\n" | Only used when file_format_type is text | -| have_partition | boolean | no | false | Whether you need processing partitions. | -| partition_by | array | no | - | Only used then have_partition is true | -| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | -| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | -| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | -| is_enable_transaction | boolean | no | true | | -| batch_size | int | no | 1000000 | | -| compress_codec | string | no | none | | -| common-options | object | no | - | | -| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | -| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | -| xml_root_tag | string | no | RECORDS | Only used when file_format is xml. | -| xml_row_tag | string | no | RECORD | Only used when file_format is xml. | -| xml_use_attr_format | boolean | no | - | Only used when file_format is xml. | -| parquet_avro_write_timestamp_as_int96 | boolean | no | false | Only used when file_format is parquet. | -| parquet_avro_write_fixed_as_int96 | array | no | - | Only used when file_format is parquet. | -| encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. | +| Name | Type | Required | Default | Description | +|---------------------------------------|---------|----------|--------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| path | string | yes | - | | +| tmp_path | string | no | /tmp/seatunnel | The result file will write to a tmp path first and then use `mv` to submit tmp dir to target dir. Need a OSS dir. 
| +| bucket | string | yes | - | | +| access_key | string | yes | - | | +| access_secret | string | yes | - | | +| endpoint | string | yes | - | | +| custom_filename | boolean | no | false | Whether you need custom the filename | +| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | +| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | +| file_format_type | string | no | "csv" | | +| field_delimiter | string | no | '\001' | Only used when file_format_type is text | +| row_delimiter | string | no | "\n" | Only used when file_format_type is text | +| have_partition | boolean | no | false | Whether you need processing partitions. | +| partition_by | array | no | - | Only used then have_partition is true | +| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | +| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | +| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | +| is_enable_transaction | boolean | no | true | | +| batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | +| common-options | object | no | - | | +| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | +| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | +| xml_root_tag | string | no | RECORDS | Only used when file_format is xml. | +| xml_row_tag | string | no | RECORD | Only used when file_format is xml. | +| xml_use_attr_format | boolean | no | - | Only used when file_format is xml. | +| single_file_mode | boolean | no | false | Each parallelism will only output one file. When this parameter is turned on, batch_size will not take effect. The output file name does not have a file block suffix. | +| parquet_avro_write_timestamp_as_int96 | boolean | no | false | Only used when file_format is parquet. | +| parquet_avro_write_fixed_as_int96 | array | no | - | Only used when file_format is parquet. | +| encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. | ### path [string] diff --git a/docs/en/connector-v2/sink/S3File.md b/docs/en/connector-v2/sink/S3File.md index 8ed708b1f9d..b5fb34e0311 100644 --- a/docs/en/connector-v2/sink/S3File.md +++ b/docs/en/connector-v2/sink/S3File.md @@ -93,42 +93,43 @@ If write to `csv`, `text` file type, All column will be string. ## Sink Options -| name | type | required | default value | Description | -|---------------------------------------|---------|----------|-------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| path | string | yes | - | | -| tmp_path | string | no | /tmp/seatunnel | The result file will write to a tmp path first and then use `mv` to submit tmp dir to target dir. Need a S3 dir. | -| bucket | string | yes | - | | -| fs.s3a.endpoint | string | yes | - | | -| fs.s3a.aws.credentials.provider | string | yes | com.amazonaws.auth.InstanceProfileCredentialsProvider | The way to authenticate s3a. We only support `org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider` and `com.amazonaws.auth.InstanceProfileCredentialsProvider` now. 
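The two supported credential providers behave differently: with `SimpleAWSCredentialsProvider` the `access_key`/`access_secret` pair below is read, while `InstanceProfileCredentialsProvider` takes credentials from the EC2 instance profile. A sketch of the static-credential form, with placeholder endpoint and keys:

```hocon
sink {
  S3File {
    bucket = "s3a://seatunnel-test"
    fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"    # placeholder endpoint
    # Static credentials: access_key/access_secret are only read with this provider.
    fs.s3a.aws.credentials.provider = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
    access_key = "xxxxxxxxxxxxxxxxx"      # placeholder credential
    access_secret = "xxxxxxxxxxxxxxxxx"   # placeholder credential
    path = "/seatunnel/text"
    file_format_type = "text"
  }
}
```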
| -| access_key | string | no | - | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider | -| access_secret | string | no | - | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider | -| custom_filename | boolean | no | false | Whether you need custom the filename | -| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | -| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | -| file_format_type | string | no | "csv" | | -| field_delimiter | string | no | '\001' | Only used when file_format is text | -| row_delimiter | string | no | "\n" | Only used when file_format is text | -| have_partition | boolean | no | false | Whether you need processing partitions. | -| partition_by | array | no | - | Only used when have_partition is true | -| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used when have_partition is true | -| is_partition_field_write_in_file | boolean | no | false | Only used when have_partition is true | -| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | -| is_enable_transaction | boolean | no | true | | -| batch_size | int | no | 1000000 | | -| compress_codec | string | no | none | | -| common-options | object | no | - | | -| max_rows_in_memory | int | no | - | Only used when file_format is excel. | -| sheet_name | string | no | Sheet${Random number} | Only used when file_format is excel. | -| xml_root_tag | string | no | RECORDS | Only used when file_format is xml, specifies the tag name of the root element within the XML file. | -| xml_row_tag | string | no | RECORD | Only used when file_format is xml, specifies the tag name of the data rows within the XML file | -| xml_use_attr_format | boolean | no | - | Only used when file_format is xml, specifies Whether to process data using the tag attribute format. | -| parquet_avro_write_timestamp_as_int96 | boolean | no | false | Only used when file_format is parquet. | -| parquet_avro_write_fixed_as_int96 | array | no | - | Only used when file_format is parquet. | -| hadoop_s3_properties | map | no | | If you need to add a other option, you could add it here and refer to this [link](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html) | -| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | Before turning on the synchronous task, do different treatment of the target path | -| data_save_mode | Enum | no | APPEND_DATA | Before opening the synchronous task, the data file in the target path is differently processed | -| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv.
false:don't write header,true:write header. | -| encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. | +| name | type | required | default value | Description | +|---------------------------------------|---------|----------|-------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| path | string | yes | - | | +| tmp_path | string | no | /tmp/seatunnel | The result file will write to a tmp path first and then use `mv` to submit tmp dir to target dir. Need a S3 dir. | +| bucket | string | yes | - | | +| fs.s3a.endpoint | string | yes | - | | +| fs.s3a.aws.credentials.provider | string | yes | com.amazonaws.auth.InstanceProfileCredentialsProvider | The way to authenticate s3a. We only support `org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider` and `com.amazonaws.auth.InstanceProfileCredentialsProvider` now. | +| access_key | string | no | - | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider | +| access_secret | string | no | - | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider | +| custom_filename | boolean | no | false | Whether you need custom the filename | +| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | +| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | +| file_format_type | string | no | "csv" | | +| field_delimiter | string | no | '\001' | Only used when file_format is text | +| row_delimiter | string | no | "\n" | Only used when file_format is text | +| have_partition | boolean | no | false | Whether you need processing partitions. | +| partition_by | array | no | - | Only used when have_partition is true | +| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used when have_partition is true | +| is_partition_field_write_in_file | boolean | no | false | Only used when have_partition is true | +| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | +| is_enable_transaction | boolean | no | true | | +| batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | +| common-options | object | no | - | | +| max_rows_in_memory | int | no | - | Only used when file_format is excel. | +| sheet_name | string | no | Sheet${Random number} | Only used when file_format is excel. | +| xml_root_tag | string | no | RECORDS | Only used when file_format is xml, specifies the tag name of the root element within the XML file. | +| xml_row_tag | string | no | RECORD | Only used when file_format is xml, specifies the tag name of the data rows within the XML file | +| xml_use_attr_format | boolean | no | - | Only used when file_format is xml, specifies Whether to process data using the tag attribute format. | +| single_file_mode | boolean | no | false | Each parallelism will only output one file. When this parameter is turned on, batch_size will not take effect. The output file name does not have a file block suffix. | +| parquet_avro_write_timestamp_as_int96 | boolean | no | false | Only used when file_format is parquet. | +| parquet_avro_write_fixed_as_int96 | array | no | - | Only used when file_format is parquet. 
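The map-typed `hadoop_s3_properties` option in the next row passes raw hadoop-aws settings straight through to the S3A client, for example:

```hocon
hadoop_s3_properties {
  # Buffer uploads on local disk instead of in memory.
  "fs.s3a.buffer.dir" = "/data/st_test/s3a"
  "fs.s3a.fast.upload.buffer" = "disk"
}
```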
| +| hadoop_s3_properties | map | no | | If you need to add a other option, you could add it here and refer to this [link](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html) | +| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | Before turning on the synchronous task, do different treatment of the target path | +| data_save_mode | Enum | no | APPEND_DATA | Before opening the synchronous task, the data file in the target path is differently processed | +| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv.
false:don't write header,true:write header. | +| encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. | ### path [string] diff --git a/docs/en/connector-v2/sink/SftpFile.md b/docs/en/connector-v2/sink/SftpFile.md index 7d40f5fb4b1..dbc8438ae26 100644 --- a/docs/en/connector-v2/sink/SftpFile.md +++ b/docs/en/connector-v2/sink/SftpFile.md @@ -32,40 +32,41 @@ By default, we use 2PC commit to ensure `exactly-once` ## Options -| name | type | required | default value | remarks | -|---------------------------------------|---------|----------|--------------------------------------------|-------------------------------------------------------------------------------------------------------------------| -| host | string | yes | - | | -| port | int | yes | - | | -| user | string | yes | - | | -| password | string | yes | - | | -| path | string | yes | - | | -| tmp_path | string | yes | /tmp/seatunnel | The result file will write to a tmp path first and then use `mv` to submit tmp dir to target dir. Need a FTP dir. | -| custom_filename | boolean | no | false | Whether you need custom the filename | -| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | -| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | -| file_format_type | string | no | "csv" | | -| field_delimiter | string | no | '\001' | Only used when file_format_type is text | -| row_delimiter | string | no | "\n" | Only used when file_format_type is text | -| have_partition | boolean | no | false | Whether you need processing partitions. | -| partition_by | array | no | - | Only used then have_partition is true | -| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | -| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | -| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | -| is_enable_transaction | boolean | no | true | | -| batch_size | int | no | 1000000 | | -| compress_codec | string | no | none | | -| common-options | object | no | - | | -| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | -| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | -| xml_root_tag | string | no | RECORDS | Only used when file_format is xml. | -| xml_row_tag | string | no | RECORD | Only used when file_format is xml. | -| xml_use_attr_format | boolean | no | - | Only used when file_format is xml. | -| parquet_avro_write_timestamp_as_int96 | boolean | no | false | Only used when file_format is parquet. | -| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv.
false:don't write header,true:write header. | -| parquet_avro_write_fixed_as_int96 | array | no | - | Only used when file_format is parquet. | -| encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. | -| schema_save_mode | string | no | CREATE_SCHEMA_WHEN_NOT_EXIST | Existing dir processing method | -| data_save_mode | string | no | APPEND_DATA | Existing data processing method | +| name | type | required | default value | remarks | +|---------------------------------------|---------|----------|--------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| host | string | yes | - | | +| port | int | yes | - | | +| user | string | yes | - | | +| password | string | yes | - | | +| path | string | yes | - | | +| tmp_path | string | yes | /tmp/seatunnel | The result file will write to a tmp path first and then use `mv` to submit tmp dir to target dir. Need a FTP dir. | +| custom_filename | boolean | no | false | Whether you need custom the filename | +| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | +| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | +| file_format_type | string | no | "csv" | | +| field_delimiter | string | no | '\001' | Only used when file_format_type is text | +| row_delimiter | string | no | "\n" | Only used when file_format_type is text | +| have_partition | boolean | no | false | Whether you need processing partitions. | +| partition_by | array | no | - | Only used then have_partition is true | +| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | +| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | +| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | +| is_enable_transaction | boolean | no | true | | +| batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | +| common-options | object | no | - | | +| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | +| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | +| xml_root_tag | string | no | RECORDS | Only used when file_format is xml. | +| xml_row_tag | string | no | RECORD | Only used when file_format is xml. | +| xml_use_attr_format | boolean | no | - | Only used when file_format is xml. | +| single_file_mode | boolean | no | false | Each parallelism will only output one file. When this parameter is turned on, batch_size will not take effect. The output file name does not have a file block suffix. | +| parquet_avro_write_timestamp_as_int96 | boolean | no | false | Only used when file_format is parquet. | +| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv.
false:don't write header,true:write header. | +| parquet_avro_write_fixed_as_int96 | array | no | - | Only used when file_format is parquet. | +| encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. | +| schema_save_mode | string | no | CREATE_SCHEMA_WHEN_NOT_EXIST | Existing dir processing method | +| data_save_mode | string | no | APPEND_DATA | Existing data processing method | ### host [string] diff --git a/docs/zh/connector-v2/sink/HdfsFile.md b/docs/zh/connector-v2/sink/HdfsFile.md index 807a345108f..c0212ae4017 100644 --- a/docs/zh/connector-v2/sink/HdfsFile.md +++ b/docs/zh/connector-v2/sink/HdfsFile.md @@ -36,7 +36,7 @@ ## 接收器选项 -| 名称 | 类型 | 是否必须 | 默认值 | 描述 | +| 名称 | 类型 | 是否必须 | 默认值 | 描述 | |----------------------------------|---------|------|--------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | fs.defaultFS | string | 是 | - | 以 `hdfs://` 开头的 Hadoop 集群地址,例如:`hdfs://hadoopcluster` | | path | string | 是 | - | 目标目录路径是必需的。 | @@ -55,13 +55,14 @@ | sink_columns | array | 否 | | 当此参数为空时,所有字段都是接收器列。需要写入文件的列,默认值是从 `Transform` 或 `Source` 获取的所有列。字段的顺序确定了实际写入文件时的顺序。 | | is_enable_transaction | boolean | 否 | true | 如果 `is_enable_transaction` 为 true,则在将数据写入目标目录时,我们将确保数据不会丢失或重复。请注意,如果 `is_enable_transaction` 为 `true`,我们将在文件头部自动添加 `${transactionId}_`。目前仅支持 `true`。 | | batch_size | int | 否 | 1000000 | 文件中的最大行数。对于 SeaTunnel Engine,文件中的行数由 `batch_size` 和 `checkpoint.interval` 共同决定。如果 `checkpoint.interval` 的值足够大,则接收器写入器将在文件中写入行,直到文件中的行大于 `batch_size`。如果 `checkpoint.interval` 很小,则接收器写入器将在新检查点触发时创建一个新文件。 | +| single_file_mode | boolean | 否 | false | 每个并行度只会输出一个文件,当此参数开启时,batch_size就不会生效。输出的文件名没有文件块后缀。 | | compress_codec | string | 否 | none | 文件的压缩编解码器及其支持的细节如下所示:[txt: `lzo` `none`,json: `lzo` `none`,csv: `lzo` `none`,orc: `lzo` `snappy` `lz4` `zlib` `none`,parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none`]。提示:excel类型不支持任何压缩格式。 | | krb5_path | string | 否 | /etc/krb5.conf | kerberos 的 krb5 路径 | | kerberos_principal | string | 否 | - | kerberos 的主体 | | kerberos_keytab_path | string | 否 | - | kerberos 的 keytab 路径 | | compress_codec | string | 否 | none | 压缩编解码器 | | common-options | object | 否 | - | 接收器插件通用参数,请参阅 [接收器通用选项](../sink-common-options.md) 了解详情 | -| enable_header_write | boolean | 否 | false | 仅在 file_format_type 为 text,csv 时使用。
false:不写入表头,true:写入表头。 | +| enable_header_write | boolean | 否 | false | 仅在 file_format_type 为 text,csv 时使用。
false:不写入表头,true:写入表头。 | | max_rows_in_memory | int | 否 | - | 仅当 file_format 为 excel 时使用。当文件格式为 Excel 时,可以缓存在内存中的最大数据项数。 | | sheet_name | string | 否 | Sheet${Random number} | 仅当 file_format 为 excel 时使用。将工作簿的表写入指定的表名 | | remote_user | string | 否 | - | Hdfs的远端用户名。 | diff --git a/docs/zh/connector-v2/sink/LocalFile.md b/docs/zh/connector-v2/sink/LocalFile.md index 877d77213a8..dbd77384882 100644 --- a/docs/zh/connector-v2/sink/LocalFile.md +++ b/docs/zh/connector-v2/sink/LocalFile.md @@ -32,7 +32,7 @@ ## 选项 -| 名称 | 类型 | 是否必需 | 默认值 | 描述 | +| 名称 | 类型 | 是否必需 | 默认值 | 描述 | |---------------------------------------|---------|------|--------------------------------------------|-----------------------------------------------------------------| | path | string | 是 | - | 目标目录路径 | | tmp_path | string | 否 | /tmp/seatunnel | 结果文件将首先写入临时路径,然后使用 `mv` 将临时目录提交到目标目录。 | @@ -49,6 +49,7 @@ | sink_columns | array | 否 | | 当此参数为空时,所有字段都是 sink 列 | | is_enable_transaction | boolean | 否 | true | 是否启用事务 | | batch_size | int | 否 | 1000000 | 批量大小 | +| single_file_mode | boolean | 否 | false | 每个并行度只会输出一个文件,当此参数开启时,batch_size就不会生效。输出的文件名没有文件块后缀。 | | compress_codec | string | 否 | none | 压缩编码 | | common-options | object | 否 | - | 常见选项 | | max_rows_in_memory | int | 否 | - | 仅在 file_format_type 为 excel 时使用 | diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java index 04562330608..6e973040df8 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java @@ -19,16 +19,19 @@ import org.apache.seatunnel.common.constants.JobMode; +import lombok.Getter; + import java.io.Serializable; import java.util.UUID; /** This class is used to store the context of the job. e.g. the job id, job mode ...etc. 
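+ * <p>It also records whether checkpointing is enabled for the job; with the
+ * class-level Lombok {@code @Getter} this flag is exposed as {@code isEnableCheckpoint()}.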
 */
+@Getter
 public final class JobContext implements Serializable {
 
     private static final long serialVersionUID = -1L;
 
     private JobMode jobMode;
-
+    private boolean enableCheckpoint;
     private final String jobId;
 
     public JobContext() {
@@ -44,11 +47,8 @@ public JobContext setJobMode(JobMode jobMode) {
         return this;
     }
 
-    public JobMode getJobMode() {
-        return jobMode;
-    }
-
-    public String getJobId() {
-        return this.jobId;
+    public JobContext setEnableCheckpoint(boolean enableCheckpoint) {
+        this.enableCheckpoint = enableCheckpoint;
+        return this;
     }
 }
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
index 279c4bf4cad..90ff8e5d49e 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
@@ -125,6 +125,11 @@ public static Long getFileLineNumber(@NonNull String filePath) {
         }
     }
 
+    public static boolean isFileExist(String filePath) {
+        File file = new File(filePath);
+        return file.exists();
+    }
+
     /**
      * return the line number of all files in the dirPath
      *
diff --git a/seatunnel-connectors-v2/connector-common/pom.xml b/seatunnel-connectors-v2/connector-common/pom.xml
index 55797e5a35e..02471d5b4ef 100644
--- a/seatunnel-connectors-v2/connector-common/pom.xml
+++ b/seatunnel-connectors-v2/connector-common/pom.xml
@@ -47,6 +47,20 @@
 
 
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <configuration>
+          <skipIfEmpty>false</skipIfEmpty> <!-- element name assumed; only the value is certain -->
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
 
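`JobContext` now tracks checkpoint awareness next to the job mode, and both setters return `this`. A minimal usage sketch, not part of the patch (the wrapper class is hypothetical; `JobContext`, `JobMode`, and the Lombok-generated `isEnableCheckpoint()` come from the changes above):

```java
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.common.constants.JobMode;

class JobContextSketch {
    JobContext buildBatchContext() {
        // Both setters return this, so the context is built fluently;
        // isEnableCheckpoint() is generated by @Getter on the class.
        JobContext context =
                new JobContext().setJobMode(JobMode.BATCH).setEnableCheckpoint(false);
        if (context.isEnableCheckpoint()) {
            throw new IllegalStateException("unreachable in this sketch");
        }
        return context;
    }
}
```

Sinks consult this flag to reject options, such as `single_file_mode`, that cannot work while checkpointing is active.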
diff --git a/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java
new file mode 100644
index 00000000000..cd9b9cca678
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.sink;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.constants.JobMode;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+public class SinkFlowTestUtils {
+
+    public static void runBatchWithCheckpointDisabled(
+            CatalogTable catalogTable,
+            ReadonlyConfig options,
+            TableSinkFactory factory,
+            List<SeaTunnelRow> rows)
+            throws IOException {
+        JobContext context = new JobContext(System.currentTimeMillis());
+        context.setJobMode(JobMode.BATCH);
+        context.setEnableCheckpoint(false);
+        runWithContext(catalogTable, options, factory, rows, context, 1);
+    }
+
+    public static void runBatchWithCheckpointEnabled(
+            CatalogTable catalogTable,
+            ReadonlyConfig options,
+            TableSinkFactory factory,
+            List<SeaTunnelRow> rows)
+            throws IOException {
+        JobContext context = new JobContext(System.currentTimeMillis());
+        context.setJobMode(JobMode.BATCH);
+        context.setEnableCheckpoint(true);
+        // TODO trigger checkpoint with interval
+        runWithContext(catalogTable, options, factory, rows, context, 1);
+    }
+
+    public static void runParallelSubtasksBatchWithCheckpointDisabled(
+            CatalogTable catalogTable,
+            ReadonlyConfig options,
+            TableSinkFactory factory,
+            List<SeaTunnelRow> rows,
+            int parallelism)
+            throws IOException {
+        JobContext context = new JobContext(System.currentTimeMillis());
+        context.setJobMode(JobMode.BATCH);
+        context.setEnableCheckpoint(false);
+        runWithContext(catalogTable, options, factory, rows, context, parallelism);
+    }
+
+    private static void runWithContext(
+            CatalogTable catalogTable,
+            ReadonlyConfig options,
+            TableSinkFactory factory,
+            List<SeaTunnelRow> rows,
+            JobContext context,
+            int parallelism)
+            throws IOException {
+        SeaTunnelSink<SeaTunnelRow, ?, ?, ?> sink =
+                factory.createSink(
+                                new TableSinkFactoryContext(
+                                        catalogTable,
+                                        options,
+                                        Thread.currentThread().getContextClassLoader()))
+                        .createSink();
+        sink.setJobContext(context);
+        List<Object> commitInfos = new ArrayList<>();
+        for (int i = 0; i < parallelism; i++) {
+            SinkWriter<SeaTunnelRow, ?, ?> sinkWriter =
+                    sink.createWriter(new DefaultSinkWriterContext(i, parallelism));
+            for (SeaTunnelRow row : rows) {
+                sinkWriter.write(row);
+            }
+            Optional<?> commitInfo = sinkWriter.prepareCommit(1);
+            sinkWriter.snapshotState(1);
+            sinkWriter.close();
+            if (commitInfo.isPresent()) {
+                commitInfos.add(commitInfo.get());
+            }
+        }
+
+        Optional<? extends SinkCommitter<?>> sinkCommitter = sink.createCommitter();
+        Optional<? extends SinkAggregatedCommitter<?, ?>> aggregatedCommitter =
+                sink.createAggregatedCommitter();
+
+        if (!commitInfos.isEmpty()) {
+            if (aggregatedCommitter.isPresent()) {
+                Object aggregatedCommitInfoT =
+                        ((SinkAggregatedCommitter) aggregatedCommitter.get()).combine(commitInfos);
+                ((SinkAggregatedCommitter) aggregatedCommitter.get())
+                        .commit(Collections.singletonList(aggregatedCommitInfoT));
+                aggregatedCommitter.get().close();
+            } else if (sinkCommitter.isPresent()) {
+                ((SinkCommitter) sinkCommitter.get()).commit(commitInfos);
+            } else {
+                throw new RuntimeException("No committer found");
+            }
+        }
+    }
+}
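`SinkFlowTestUtils` drives a sink through a complete batch lifecycle: one writer per subtask, then `write`, `prepareCommit`, `snapshotState`, `close`, and finally the aggregated committer (preferred) or the plain committer. A hedged call sketch mirroring the `LocalFileTest` added later in this patch (`catalogTable` and `options` stand for whatever the sink under test needs; the wrapper class is illustrative only):

```java
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.file.local.sink.LocalFileSinkFactory;
import org.apache.seatunnel.connectors.seatunnel.sink.SinkFlowTestUtils;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;

class SinkFlowSketch {
    // Runs one batch with checkpointing disabled; any TableSinkFactory works here.
    static void runOnce(CatalogTable catalogTable, Map<String, Object> options)
            throws IOException {
        SinkFlowTestUtils.runBatchWithCheckpointDisabled(
                catalogTable,
                ReadonlyConfig.fromMap(options),
                new LocalFileSinkFactory(),
                Arrays.asList(new SeaTunnelRow(new Object[] {"test"})));
    }
}
```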
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
index bbbf4553115..2957f451b4d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
@@ -43,6 +43,7 @@ public class BaseFileSinkConfig implements DelimiterConfig, Serializable {
     protected int batchSize = BaseSinkConfig.BATCH_SIZE.defaultValue();
     protected String path;
     protected String fileNameExpression = BaseSinkConfig.FILE_NAME_EXPRESSION.defaultValue();
+    protected boolean singleFileMode = BaseSinkConfig.SINGLE_FILE_MODE.defaultValue();
     protected FileFormat fileFormat = FileFormat.TEXT;
     protected DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD;
     protected DateTimeUtils.Formatter datetimeFormat = DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
@@ -82,6 +83,10 @@ public BaseFileSinkConfig(@NonNull Config config) {
             this.fileNameExpression = config.getString(BaseSinkConfig.FILE_NAME_EXPRESSION.key());
         }
 
+        if (config.hasPath(BaseSinkConfig.SINGLE_FILE_MODE.key())) {
+            this.singleFileMode = config.getBoolean(BaseSinkConfig.SINGLE_FILE_MODE.key());
+        }
+
         if (config.hasPath(BaseSinkConfig.FILE_FORMAT_TYPE.key())
                 && !StringUtils.isBlank(config.getString(BaseSinkConfig.FILE_FORMAT_TYPE.key()))) {
             this.fileFormat =
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
index 2ec8ac5db4c..88be37bc106 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
@@ -187,6 +187,13 @@ public class BaseSinkConfig extends KerberosConfig {
                                     + "like `test_${uuid}_${now}`,`${now}` represents the current time, "
                                     + "and its format can be defined by specifying the option `filename_time_format`.");
 
+    public static final Option<Boolean> SINGLE_FILE_MODE =
+            Options.key("single_file_mode")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to write all data to a single file in each parallelism task");
+
     public static final Option<String> FILENAME_TIME_FORMAT =
             Options.key("filename_time_format")
                     .stringType()
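A short sketch of how the new option resolves, under the same `ReadonlyConfig` semantics the patch relies on elsewhere (the class and `main` method are illustrative only):

```java
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;

import java.util.HashMap;
import java.util.Map;

class SingleFileModeOptionSketch {
    static boolean resolve(Map<String, Object> options) {
        // ReadonlyConfig falls back to the Option's declared default (false)
        // when "single_file_mode" is absent from the user's sink block.
        return ReadonlyConfig.fromMap(options).get(BaseSinkConfig.SINGLE_FILE_MODE);
    }

    public static void main(String[] args) {
        Map<String, Object> options = new HashMap<>();
        System.out.println(resolve(options)); // false (default)
        options.put("single_file_mode", true);
        System.out.println(resolve(options)); // true
    }
}
```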
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
index af6003c79ce..a02e41c87b1 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
@@ -30,6 +30,7 @@
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
@@ -52,10 +53,20 @@ public abstract class BaseFileSink
     protected JobContext jobContext;
     protected String jobId;
 
+    public void preCheckConfig() {
+        if (pluginConfig.hasPath(BaseSinkConfig.SINGLE_FILE_MODE.key())
+                && pluginConfig.getBoolean(BaseSinkConfig.SINGLE_FILE_MODE.key())
+                && jobContext.isEnableCheckpoint()) {
+            throw new IllegalArgumentException(
+                    "Single file mode is not supported when checkpoint is enabled or in streaming mode.");
+        }
+    }
+
     @Override
     public void setJobContext(JobContext jobContext) {
         this.jobContext = jobContext;
         this.jobId = jobContext.getJobId();
+        preCheckConfig();
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
index 2527d99d56c..59aee592353 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
@@ -33,6 +33,7 @@
 import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractWriteStrategy;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
@@ -43,6 +44,9 @@
 import java.util.UUID;
 import java.util.stream.Collectors;
 
+import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.DEFAULT_FILE_NAME_EXPRESSION;
+import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_NAME_EXPRESSION;
+
 public class BaseFileSinkWriter
         implements SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState>,
                 SupportMultiTableSinkWriter<WriteStrategy> {
@@ -105,6 +109,25 @@ public BaseFileSinkWriter(
         } else {
             writeStrategy.beginTransaction(1L);
         }
+        preCheckConfig(context);
+    }
+
+    private void preCheckConfig(SinkWriter.Context context) {
+        if (writeStrategy.getFileSinkConfig().isSingleFileMode()
+                && context.getNumberOfParallelSubtasks() > 1) {
+            if (StringUtils.isNotEmpty(writeStrategy.getFileSinkConfig().getFileNameExpression())
+                    && !writeStrategy
+                            .getFileSinkConfig()
+                            .getFileNameExpression()
+                            .contains(DEFAULT_FILE_NAME_EXPRESSION)) {
+                throw new IllegalArgumentException(
+                        "Single file mode is not supported when "
+                                + FILE_NAME_EXPRESSION.key()
+                                + " not contains "
+                                + DEFAULT_FILE_NAME_EXPRESSION
+                                + " but has parallel subtasks.");
+            }
+        }
     }
 
     private List<String> findTransactionList(
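The two `preCheckConfig` guards above interact; their combined behaviour, restated as comments (a summary, not code from the patch):

```java
// Validation outcomes for single_file_mode = true:
//
//   checkpointing enabled, or a STREAMING job
//       -> IllegalArgumentException (BaseFileSink / BaseMultipleTableFileSink)
//
//   parallel subtasks > 1, file_name_expression without "${transactionId}"
//       -> IllegalArgumentException (BaseFileSinkWriter), since every subtask
//          would otherwise write to the same file name
//
//   parallel subtasks > 1, file_name_expression containing "${transactionId}"
//       -> accepted: the transaction id is unique per writer, so each subtask
//          still produces exactly one distinct file
```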
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
index b35c113f8da..9524424db54 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
@@ -59,6 +59,7 @@ public abstract class BaseMultipleTableFileSink
     private final CatalogTable catalogTable;
     private final FileSinkConfig fileSinkConfig;
     private String jobId;
+    private JobContext jobContext;
     private final ReadonlyConfig readonlyConfig;
 
     public abstract String getPluginName();
 
@@ -72,15 +73,25 @@ public BaseMultipleTableFileSink(
         this.catalogTable = catalogTable;
     }
 
+    public void preCheckConfig() {
+        if (readonlyConfig.get(BaseSinkConfig.SINGLE_FILE_MODE)
+                && jobContext.isEnableCheckpoint()) {
+            throw new IllegalArgumentException(
+                    "Single file mode is not supported when checkpoint is enabled or in streaming mode.");
+        }
+    }
+
     @Override
     public void setJobContext(JobContext jobContext) {
-        this.jobId = jobContext.getJobId();
+        this.jobContext = jobContext;
+        preCheckConfig();
     }
 
     @Override
     public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> restoreWriter(
             SinkWriter.Context context, List<FileSinkState> states) {
-        return new BaseFileSinkWriter(createWriteStrategy(), hadoopConf, context, jobId, states);
+        return new BaseFileSinkWriter(
+                createWriteStrategy(), hadoopConf, context, jobContext.getJobId(), states);
     }
 
     @Override
@@ -91,7 +102,8 @@ public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> restoreWriter(
 
     @Override
     public BaseFileSinkWriter createWriter(SinkWriter.Context context) {
-        return new BaseFileSinkWriter(createWriteStrategy(), hadoopConf, context, jobId);
+        return new BaseFileSinkWriter(
+                createWriteStrategy(), hadoopConf, context, jobContext.getJobId());
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index 6091ee52819..e48f8d3729d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -82,6 +82,7 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
     protected Long checkpointId = 0L;
     protected int partId = 0;
     protected int batchSize;
+    protected boolean singleFileMode;
     protected int currentBatchSize = 0;
 
     public AbstractWriteStrategy(FileSinkConfig fileSinkConfig) {
@@ -89,6 +90,7 @@ public AbstractWriteStrategy(FileSinkConfig fileSinkConfig) {
         this.sinkColumnsIndexInRow = fileSinkConfig.getSinkColumnsIndexInRow();
         this.batchSize = fileSinkConfig.getBatchSize();
         this.compressFormat = fileSinkConfig.getCompressFormat();
+        this.singleFileMode = fileSinkConfig.isSingleFileMode();
     }
 
     /**
@@ -107,7 +109,7 @@ public void init(HadoopConf conf, String jobId, String uuidPrefix, int subTaskIn
 
     @Override
     public void write(SeaTunnelRow seaTunnelRow) throws FileConnectorException {
-        if (currentBatchSize >= batchSize) {
+        if (currentBatchSize >= batchSize && !singleFileMode) {
             newFilePart();
             currentBatchSize = 0;
         }
@@ -218,7 +220,7 @@ public LinkedHashMap<String, List<String>> generatorPartitionDir(SeaTunnelRow se
      * @return file name
      */
     @Override
-    public String generateFileName(String transactionId) {
+    public final String generateFileName(String transactionId) {
         String fileNameExpression = fileSinkConfig.getFileNameExpression();
         FileFormat fileFormat = fileSinkConfig.getFileFormat();
         String suffix = fileFormat.getSuffix();
@@ -234,8 +236,10 @@ public String generateFileName(String transactionId) {
         valuesMap.put(Constants.NOW, formattedDate);
         valuesMap.put(timeFormat, formattedDate);
         valuesMap.put(BaseSinkConfig.TRANSACTION_EXPRESSION, transactionId);
-        String substitute =
-                VariablesSubstitute.substitute(fileNameExpression, valuesMap) + "_" + partId;
+        String substitute = VariablesSubstitute.substitute(fileNameExpression, valuesMap);
+        if (!singleFileMode) {
+            substitute += "_" + partId;
+        }
         return substitute + suffix;
     }
 
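The `write`/`generateFileName` changes above determine the on-disk names; a sketch of the resulting layout (names illustrative, and any `is_enable_transaction` prefixing set aside):

```java
// For file_format_type "text" (suffix ".txt") and file_name_expression "out":
//
//   single_file_mode = false:  out_0.txt, out_1.txt, ...
//       a new "_<partId>" part starts whenever batch_size rows have been written
//
//   single_file_mode = true:   out.txt
//       the batch_size rollover is skipped and no part suffix is appended,
//       so each writer produces exactly one file
```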
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
index e0f30ee9077..b728af49514 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
@@ -91,6 +91,8 @@ public OptionRule optionRule() {
                 .optional(BaseSinkConfig.DATE_FORMAT)
                 .optional(BaseSinkConfig.DATETIME_FORMAT)
                 .optional(BaseSinkConfig.TIME_FORMAT)
+                .optional(BaseSinkConfig.SINGLE_FILE_MODE)
+                .optional(BaseSinkConfig.BATCH_SIZE)
                 .build();
     }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
index cfd2351a5c7..3dc48bd3bba 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
@@ -105,6 +105,8 @@ public OptionRule optionRule() {
                 .optional(BaseSinkConfig.DATETIME_FORMAT)
                 .optional(BaseSinkConfig.TIME_FORMAT)
                 .optional(FtpConfigOptions.FTP_CONNECTION_MODE)
+                .optional(BaseSinkConfig.SINGLE_FILE_MODE)
+                .optional(BaseSinkConfig.BATCH_SIZE)
                 .build();
     }
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
index 521456ca091..b937a8bed0a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
@@ -88,6 +88,13 @@ public OptionRule optionRule() {
                 .optional(BaseSinkConfig.DATE_FORMAT)
                 .optional(BaseSinkConfig.DATETIME_FORMAT)
                 .optional(BaseSinkConfig.TIME_FORMAT)
+                .optional(BaseSinkConfig.SINGLE_FILE_MODE)
+                .optional(BaseSinkConfig.BATCH_SIZE)
+                .optional(BaseSinkConfig.HDFS_SITE_PATH)
+                .optional(BaseSinkConfig.KERBEROS_PRINCIPAL)
+                .optional(BaseSinkConfig.KERBEROS_KEYTAB_PATH)
+                .optional(BaseSinkConfig.KRB5_PATH)
+                .optional(BaseSinkConfig.REMOTE_USER)
                 .build();
     }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSinkFactory.java
index 2f29fb19cdc..7ecbb6c3f10 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSinkFactory.java
@@ -91,6 +91,8 @@ public OptionRule optionRule() {
                 .optional(BaseSinkConfig.DATE_FORMAT)
                 .optional(BaseSinkConfig.DATETIME_FORMAT)
                 .optional(BaseSinkConfig.TIME_FORMAT)
+                .optional(BaseSinkConfig.SINGLE_FILE_MODE)
+                .optional(BaseSinkConfig.BATCH_SIZE)
                 .build();
     }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml
index 9c242cd3a2a..517e053ecc3 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml
@@ -48,6 +48,13 @@
 
 
+    <dependency>
+      <groupId>org.apache.seatunnel</groupId>
+      <artifactId>connector-common</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
index 1a9bcc1734f..8450f139994 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
@@ -98,6 +98,8 @@ public OptionRule optionRule() {
                 .optional(BaseSinkConfig.DATE_FORMAT)
                 .optional(BaseSinkConfig.DATETIME_FORMAT)
                 .optional(BaseSinkConfig.TIME_FORMAT)
+                .optional(BaseSinkConfig.SINGLE_FILE_MODE)
+                .optional(BaseSinkConfig.BATCH_SIZE)
                 .build();
     }
 
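Every file sink factory in this patch gains the same two `optional` declarations, since keys missing from `optionRule()` would fail config validation; a condensed sketch of the recurring change (class name hypothetical, `OptionRule` as in the SeaTunnel API). The `LocalFileTest` that follows exercises the resulting behaviour end to end:

```java
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;

class SinkFactoryOptionRuleSketch {
    // The same two lines are added to each factory's optionRule() builder chain.
    OptionRule optionRule() {
        return OptionRule.builder()
                .optional(BaseSinkConfig.SINGLE_FILE_MODE)
                .optional(BaseSinkConfig.BATCH_SIZE)
                .build();
    }
}
```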
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileTest.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileTest.java
new file mode 100644
index 00000000000..8280ec5e2af
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.local;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.utils.FileUtils;
+import org.apache.seatunnel.connectors.seatunnel.file.local.sink.LocalFileSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.sink.SinkFlowTestUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+@DisabledOnOs(
+        value = OS.WINDOWS,
+        disabledReason =
+                "Hadoop has windows problem, please refer https://cwiki.apache.org/confluence/display/HADOOP2/WindowsProblems")
+public class LocalFileTest {
+
+    CatalogTable catalogTable =
+            CatalogTable.of(
+                    TableIdentifier.of("catalog", "database", "table"),
+                    TableSchema.builder()
+                            .column(
+                                    PhysicalColumn.of(
+                                            "test", BasicType.STRING_TYPE, 1L, true, null, ""))
+                            .build(),
+                    Collections.emptyMap(),
+                    Collections.emptyList(),
+                    "comment");
+
+    @Test
+    void testSingleFileMode() throws IOException {
+        Map<String, Object> options =
+                new HashMap<String, Object>() {
+                    {
+                        put("path", "/tmp/seatunnel/LocalFileTest");
+                        put("row_delimiter", "\n");
+                        put("file_name_expression", "only_one_file");
+                        put("file_format_type", "text");
+                        put("is_enable_transaction", false);
+                        put("batch_size", 1);
+                    }
+                };
+        options.put("single_file_mode", true);
+        FileUtils.deleteFile("/tmp/seatunnel/LocalFileTest");
+        SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+                catalogTable,
+                ReadonlyConfig.fromMap(options),
+                new LocalFileSinkFactory(),
+                Arrays.asList(
+                        new SeaTunnelRow(new Object[] {"test"}),
+                        new SeaTunnelRow(new Object[] {"test"})));
+        Assertions.assertEquals(
+                2,
+                (long)
+                        FileUtils.getFileLineNumber(
+                                "/tmp/seatunnel/LocalFileTest/only_one_file.txt"));
+
+        IllegalArgumentException exception =
+                Assertions.assertThrows(
+                        IllegalArgumentException.class,
+                        () ->
+                                SinkFlowTestUtils.runBatchWithCheckpointEnabled(
+                                        catalogTable,
+                                        ReadonlyConfig.fromMap(options),
+                                        new LocalFileSinkFactory(),
+                                        Arrays.asList(
+                                                new SeaTunnelRow(new Object[] {"test"}),
+                                                new SeaTunnelRow(new Object[] {"test"}))));
+        Assertions.assertEquals(
+                "Single file mode is not supported when checkpoint is enabled or in streaming mode.",
+                exception.getMessage());
+
+        IllegalArgumentException exception2 =
Assertions.assertThrows( + IllegalArgumentException.class, + () -> + SinkFlowTestUtils.runParallelSubtasksBatchWithCheckpointDisabled( + catalogTable, + ReadonlyConfig.fromMap(options), + new LocalFileSinkFactory(), + Arrays.asList( + new SeaTunnelRow(new Object[] {"test"}), + new SeaTunnelRow(new Object[] {"test"})), + 2)); + Assertions.assertEquals( + "Single file mode is not supported when file_name_expression not contains ${transactionId} but has parallel subtasks.", + exception2.getMessage()); + + FileUtils.deleteFile("/tmp/seatunnel/LocalFileTest"); + options.put("file_name_expression", "${transactionId}_2"); + SinkFlowTestUtils.runParallelSubtasksBatchWithCheckpointDisabled( + catalogTable, + ReadonlyConfig.fromMap(options), + new LocalFileSinkFactory(), + Arrays.asList( + new SeaTunnelRow(new Object[] {"test"}), + new SeaTunnelRow(new Object[] {"test"})), + 2); + Assertions.assertFalse( + FileUtils.isFileExist("/tmp/seatunnel/LocalFileTest/only_one_file.txt")); + Assertions.assertEquals(2, FileUtils.listFile("/tmp/seatunnel/LocalFileTest").size()); + + options.put("single_file_mode", false); + options.put("file_name_expression", "only_one_file"); + FileUtils.deleteFile("/tmp/seatunnel/LocalFileTest"); + SinkFlowTestUtils.runBatchWithCheckpointDisabled( + catalogTable, + ReadonlyConfig.fromMap(options), + new LocalFileSinkFactory(), + Arrays.asList( + new SeaTunnelRow(new Object[] {"test"}), + new SeaTunnelRow(new Object[] {"test"}))); + Assertions.assertFalse( + FileUtils.isFileExist("/tmp/seatunnel/LocalFileTest/only_one_file.txt")); + Assertions.assertEquals( + 1, + (long) + FileUtils.getFileLineNumber( + "/tmp/seatunnel/LocalFileTest/only_one_file_0.txt")); + Assertions.assertEquals( + 1, + (long) + FileUtils.getFileLineNumber( + "/tmp/seatunnel/LocalFileTest/only_one_file_1.txt")); + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java index 6fd3088ddc9..246c769b76d 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java @@ -103,6 +103,8 @@ public OptionRule optionRule() { .optional(BaseSinkConfig.DATE_FORMAT) .optional(BaseSinkConfig.DATETIME_FORMAT) .optional(BaseSinkConfig.TIME_FORMAT) + .optional(BaseSinkConfig.SINGLE_FILE_MODE) + .optional(BaseSinkConfig.BATCH_SIZE) .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA) .build(); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java index 5c231443e99..950582a860c 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java @@ -103,6 +103,8 @@ public OptionRule optionRule() { 
                 .optional(BaseSinkConfig.DATE_FORMAT)
                 .optional(BaseSinkConfig.DATETIME_FORMAT)
                 .optional(BaseSinkConfig.TIME_FORMAT)
+                .optional(BaseSinkConfig.SINGLE_FILE_MODE)
+                .optional(BaseSinkConfig.BATCH_SIZE)
                 .optional(BaseSinkConfig.TMP_PATH)
                 .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .build();
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
index 22f42d2bf9e..4ff9c6928fd 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
@@ -103,6 +103,8 @@ public OptionRule optionRule() {
                 .optional(BaseSinkConfig.DATE_FORMAT)
                 .optional(BaseSinkConfig.DATETIME_FORMAT)
                 .optional(BaseSinkConfig.TIME_FORMAT)
+                .optional(BaseSinkConfig.SINGLE_FILE_MODE)
+                .optional(BaseSinkConfig.BATCH_SIZE)
                 .build();
     }
 
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/RuntimeEnvironment.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/RuntimeEnvironment.java
index 9814ea58e2b..13fade6aa37 100644
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/RuntimeEnvironment.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/RuntimeEnvironment.java
@@ -19,6 +19,7 @@
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.JobMode;
 
@@ -51,11 +52,22 @@ default void initialize(Config config) {
     static JobMode getJobMode(Config config) {
         JobMode jobMode;
         Config envConfig = config.getConfig("env");
-        if (envConfig.hasPath("job.mode")) {
-            jobMode = envConfig.getEnum(JobMode.class, "job.mode");
+        if (envConfig.hasPath(EnvCommonOptions.JOB_MODE.key())) {
+            jobMode = envConfig.getEnum(JobMode.class, EnvCommonOptions.JOB_MODE.key());
         } else {
             jobMode = JobMode.BATCH;
         }
         return jobMode;
     }
+
+    static boolean getEnableCheckpoint(Config config) {
+        boolean enableCheckpoint;
+        Config envConfig = config.getConfig("env");
+        if (envConfig.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
+            enableCheckpoint = envConfig.getInt(EnvCommonOptions.CHECKPOINT_INTERVAL.key()) > 0;
+        } else {
+            enableCheckpoint = false;
+        }
+        return enableCheckpoint || getJobMode(config) == JobMode.STREAMING;
+    }
 }
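`getEnableCheckpoint` derives an effective flag rather than reading a dedicated switch; restated as a truth table (assuming `checkpoint.interval` and `job.mode` under the `env` block, as in the test below):

```java
// Effective enableCheckpoint:
//
//   job.mode = BATCH,     no checkpoint.interval       -> false
//   job.mode = BATCH,     checkpoint.interval = 10     -> true
//   job.mode = BATCH,     checkpoint.interval = 0      -> false (interval must be > 0)
//   job.mode = STREAMING, with or without an interval  -> true  (streaming always checkpoints)
```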
diff --git a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/execution/RuntimeEnvironmentTest.java b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/execution/RuntimeEnvironmentTest.java
new file mode 100644
index 00000000000..9b77b3c1a94
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/execution/RuntimeEnvironmentTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.execution;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class RuntimeEnvironmentTest {
+
+    @Test
+    void testEnableCheckpoint() {
+        Config config =
+                ConfigFactory.parseString(
+                        "env {\n" + " parallelism = 1\n" + " job.mode = \"BATCH\"\n" + "}");
+        Assertions.assertFalse(RuntimeEnvironment.getEnableCheckpoint(config));
+
+        config =
+                ConfigFactory.parseString(
+                        "env {\n" + " parallelism = 1\n" + " job.mode = \"STREAMING\"\n" + "}");
+        Assertions.assertTrue(RuntimeEnvironment.getEnableCheckpoint(config));
+
+        config =
+                ConfigFactory.parseString(
+                        "env {\n"
+                                + " parallelism = 1\n"
+                                + " job.mode = \"BATCH\"\n"
+                                + " checkpoint.interval = 10\n"
+                                + "}");
+        Assertions.assertTrue(RuntimeEnvironment.getEnableCheckpoint(config));
+    }
+}
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index f3a76adfade..627a7e687a3 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -85,6 +85,8 @@ public FlinkExecution(Config config) {
         registerPlugin(envConfig);
         JobContext jobContext = new JobContext();
         jobContext.setJobMode(RuntimeEnvironment.getJobMode(config));
+        jobContext.setEnableCheckpoint(RuntimeEnvironment.getEnableCheckpoint(config));
+
         this.sourcePluginExecuteProcessor =
                 new SourceExecuteProcessor(
                         jarPaths, envConfig, config.getConfigList(Constants.SOURCE), jobContext);
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
index 55c145bc3b6..efe38241453 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
@@ -48,6 +48,8 @@ public SparkExecution(Config config) {
         this.sparkRuntimeEnvironment = SparkRuntimeEnvironment.getInstance(config);
         JobContext jobContext = new JobContext();
         jobContext.setJobMode(RuntimeEnvironment.getJobMode(config));
+        jobContext.setEnableCheckpoint(RuntimeEnvironment.getEnableCheckpoint(config));
+
         this.sourcePluginExecuteProcessor =
                 new SourceExecuteProcessor(
                         sparkRuntimeEnvironment,
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
index eb7f5ca2154..f2e5036d549 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
@@ -67,6 +67,7 @@ public void testSimpleJobParse() {
         Assertions.assertEquals(
                 "Source[0]-FakeSource", actions.get(0).getUpstream().get(0).getName());
 
+        Assertions.assertFalse(jobConfig.getJobContext().isEnableCheckpoint());
         Assertions.assertEquals(3, actions.get(0).getUpstream().get(0).getParallelism());
         Assertions.assertEquals(3, actions.get(0).getParallelism());
     }
@@ -83,6 +84,7 @@ public void testComplexJobParse() {
         List<Action> actions = parse.getLeft();
         Assertions.assertEquals(1, actions.size());
 
+        Assertions.assertTrue(jobConfig.getJobContext().isEnableCheckpoint());
         Assertions.assertEquals("Sink[0]-LocalFile-fake", actions.get(0).getName());
 
         Assertions.assertEquals(2, actions.get(0).getUpstream().size());
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf
index 0509940bbfd..7bd5f13e83f 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf
@@ -21,6 +21,7 @@
 env {
   parallelism = 1
   job.mode = "BATCH"
+  checkpoint.interval = 10000
 }
 
 source {
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 08df272c43d..61df9abcfe8 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -45,6 +45,7 @@
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
 import org.apache.seatunnel.common.constants.CollectionConstants;
+import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
@@ -311,7 +312,13 @@ private void fillUsedFactoryUrls(List<Action> actions, Set<URL> result) {
     }
 
     private void fillJobConfigAndCommonJars() {
-        jobConfig.getJobContext().setJobMode(envOptions.get(EnvCommonOptions.JOB_MODE));
+        JobMode jobMode = envOptions.get(EnvCommonOptions.JOB_MODE);
+        jobConfig
+                .getJobContext()
+                .setJobMode(jobMode)
+                .setEnableCheckpoint(
+                        (envOptions.get(EnvCommonOptions.CHECKPOINT_INTERVAL) != null)
+                                || jobMode == JobMode.STREAMING);
         if (StringUtils.isEmpty(jobConfig.getName())
                 || jobConfig.getName().equals(Constants.LOGO)
                 || jobConfig.getName().equals(EnvCommonOptions.JOB_NAME.defaultValue())) {