[SUPPORT]Checkpoint Fails with Timeout while waiting for instant initialize When Migrating Data from Kudu to Hudi COW Bucketed Table #12589
Comments
@pengxianzi How many records are there in the Kudu table? Can you switch to a MOR table instead? MOR is more suitable for large tables. And if possible, can you upgrade to 0.14.1 or 0.15.0, which fix some stability bugs?
Thank you for your response! Here is our further feedback: Regarding Upgrading Hudi Version: Regarding Data Volume: Regarding Switching to MOR Table: Regarding the Impact of Kudu Table:
For the migration, you could use the bulk_insert operation to write the historical data set from Kudu in batch execution mode, then switch to the upsert operation to ingest the incremental inputs into the same Hudi table in streaming mode. The write is slow for COW because each checkpoint triggers a rewrite of the whole table; the same is true for MOR compaction. If the data set itself is huge, partitioning the table by datetime should also help, because that significantly reduces the scope of each rewrite.
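The two-phase approach described above can be sketched as two option maps that differ only in the write operation. This is a minimal illustration, not a definitive setup: the class and method names (MigrationOptions, common, bulkInsertPhase, upsertPhase) are hypothetical, the keys write.operation, index.type, and hoodie.bucket.index.num.buckets are the ones used in the configuration posted in this issue, and table.type is assumed to be the string key behind FlinkOptions.TABLE_TYPE.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: builds the option maps for the two migration phases. */
final class MigrationOptions {

    /** Options shared by both phases; table type and bucket count stay the same across them. */
    static Map<String, String> common(String tableType, int numBuckets) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("table.type", tableType);   // assumed key for FlinkOptions.TABLE_TYPE
        options.put("index.type", "BUCKET");
        options.put("hoodie.bucket.index.num.buckets", String.valueOf(numBuckets));
        return options;
    }

    /** Phase 1: load the historical Kudu data once, in batch execution mode. */
    static Map<String, String> bulkInsertPhase(String tableType, int numBuckets) {
        Map<String, String> options = common(tableType, numBuckets);
        options.put("write.operation", "bulk_insert");
        return options;
    }

    /** Phase 2: apply the incremental inputs as streaming upserts. */
    static Map<String, String> upsertPhase(String tableType, int numBuckets) {
        Map<String, String> options = common(tableType, numBuckets);
        options.put("write.operation", "upsert");
        return options;
    }
}
```

Building both maps from one shared method keeps the index type and bucket count identical in the two jobs, so the streaming job addresses the same buckets that the batch load created.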
Thank you for your suggestions! Our current approach for large tables aligns with your recommendations: we use the bulk_insert operation to migrate historical data from Kudu to the Hudi table, then switch to the upsert operation for incremental writes. However, we encountered the following issue during implementation. Necessity of Bucketing: without bucketing, data duplication occurs during Flink writes. The duplication is resolved only after adding bucketing.
You are right, bulk_insert also supports the bucket index.
We are still facing write failure issues.
What kind of failures did you encounter?
@danny0405 I couldn't find the reason. The error log indicates that the checkpoint failed:
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - bucket_write: default_database.loan_withdraw_order_new_cow - asynchronous part of checkpoint 1 could not be completed.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 2 for operator bucket_write: default_database.loan_withdraw_order_new_cow (3/4). Failure reason: Checkpoint was declined.
Caused by: org.apache.hudi.exception.HoodieException: Timeout(1201000ms) while waiting for instant initialize
switched from RUNNING to FAILED with failure cause: java.io.IOException: Could not perform checkpoint 2 for operator bucket_write: default_database.loan_withdraw_order_new_cow (3/4)#0.
Files Generated in HDFS but No Commits: During task execution, new files are generated in HDFS, but there are no commit records in the Hudi CLI.
Checkpoint Timeout Failure: Flink checkpoints time out and fail without specific reasons in the logs. Extending the checkpoint interval may allow successful writes, but this reduces the data update frequency and affects real-time performance.
Why are new files generated in HDFS while no commit records appear in the Hudi CLI?
How can we solve the checkpoint timeout issue and ensure successful writes while keeping the data close to real time?
@danny0405 We set the primary key with builder.pk(primaryKey), where primaryKey=id,age,name,address,phone,email. However, I am unsure whether this configuration is effective. I couldn't find any documentation on composite primary keys on the official Hudi website. Additionally, when checking through the Hudi CLI, the "total records written" and "total updates written" counts are the same, which makes me suspect that the composite primary key configuration has not been applied correctly. The Flink job is running normally, but no files are being written. Looking forward to your response.
You can check the The COW bucket index table is prone to timing out because each checkpoint triggers a rewrite of the whole table if the table is not well partitioned.
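To make the effect of partitioning concrete, here is a rough sketch that counts how many (partition, bucket) file groups a batch of upserts touches. It is a simplified model under stated assumptions, not Hudi's implementation: record ids are plain integers, a bucket is chosen with a modulo rather than Hudi's actual hash function, and the class and method names (RewriteScope, bucketOf, touchedFileGroups, rewriteFraction) are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.IntFunction;

/** Hypothetical model of how many COW file groups one batch of upserts has to rewrite. */
final class RewriteScope {

    /** Simplified bucket assignment; Hudi's real bucket hashing differs. */
    static int bucketOf(int id, int numBuckets) {
        return Math.floorMod(id, numBuckets);
    }

    /** Counts the distinct (partition, bucket) pairs hit by the given record ids. */
    static int touchedFileGroups(int[] ids, IntFunction<String> partitionOf, int numBuckets) {
        Set<String> touched = new HashSet<>();
        for (int id : ids) {
            touched.add(partitionOf.apply(id) + "/" + bucketOf(id, numBuckets));
        }
        return touched.size();
    }

    /** Share of all file groups in the table that the batch touches. */
    static double rewriteFraction(int touched, int numPartitions, int numBuckets) {
        return (double) touched / ((long) numPartitions * numBuckets);
    }
}
```

In this model, 1000 consecutive ids written to an unpartitioned table with 80 buckets touch all 80 file groups, so the fraction is 1.0. If the same batch falls into a single day of a table with 365 daily partitions, it still touches 80 file groups, but those are only 1/365 of the table.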
Problem Description
We encountered the following issues while using Apache Hudi for data migration and real-time writing:
Scenario 1:
Migrating data from Kudu to a Hudi MOR bucketed table, then writing data from MySQL via Kafka to the Hudi MOR bucketed table works fine.
Scenario 2:
Migrating data from Kudu to a Hudi COW bucketed table, then writing data from MySQL via Kafka to the Hudi COW bucketed table fails to generate commits, and the checkpoint fails.
Error Log
Here is the error log when the task fails:
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - bucket_write: default_database.table_cow - asynchronous part of checkpoint 1 could not be completed.
java.util.concurrent.CancellationException: null
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 2 for operator bucket_write: default_database.table_cow (3/4). Failure reason: Checkpoint was declined.
Caused by: org.apache.hudi.exception.HoodieException: Timeout(1201000ms) while waiting for instant initialize
switched from RUNNING to FAILED with failure cause: java.io.IOException: Could not perform checkpoint 2 for operator bucket_write: default_database.table_cow (3/4)#0.
Configuration Parameters
Here is our Hudi table configuration:
// Shuffle parallelism
options.put("hoodie.upsert.shuffle.parallelism", "20");
options.put("hoodie.insert.shuffle.parallelism", "20");
// Write operation, table type, and pre-combine settings
options.put("write.operation", "upsert");
options.put(FlinkOptions.TABLE_TYPE.key(), name);
options.put(FlinkOptions.PRECOMBINE_FIELD.key(), precombing);
options.put(FlinkOptions.PRE_COMBINE.key(), "true");
// Cleaning and archiving
options.put("hoodie.clean.automatic", "true");
options.put("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS");
options.put("hoodie.cleaner.commits.retained", "5");
options.put("hoodie.clean.async", "true");
options.put("hoodie.archive.min.commits", "20");
options.put("hoodie.archive.max.commits", "30");
options.put("hoodie.clean.parallelism", "20");
options.put("hoodie.archive.parallelism", "20");
// Concurrency control and write parallelism
options.put("hoodie.write.concurrency.mode", "optimistic_concurrency_control");
options.put("write.tasks", "20");
// Bucket index
options.put("index.type", "BUCKET");
options.put("hoodie.bucket.index.num.buckets", "80");
options.put("hoodie.index.bucket.engine", "SIMPLE");
Checkpoint Configuration
We tested various checkpoint timeout and interval configurations, but the issue persists:
env.getCheckpointConfig().setCheckpointTimeout(5601000L);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60*1000L);
Steps to Reproduce
1. Migrate data from Kudu to a Hudi COW bucketed table.
2. Write data from MySQL via Kafka to the Hudi COW bucketed table.
3. The task fails with the error "Timeout while waiting for instant initialize".
Expected Behavior
The task should generate commits normally, and the checkpoint should succeed.
Actual Behavior
The task fails, no commits are generated, and the checkpoint fails with an error.
Hudi version : 0.14.0
Spark version : 2.4.7
Hive version : 3.1.3
Hadoop version : 3.1.1
Further Questions
Checkpoint Timeout Issue:
The error log mentions Timeout while waiting for instant initialize. Is this related to the initialization mechanism of the Hudi COW table? Are there ways to optimize the initialization time?
COW Table Write Performance:
Is the write performance of COW tables slower than MOR tables? Are there optimization suggestions for COW tables?
Impact of Bucketed Table:
Does the bucketed table have a specific impact on the write performance of COW tables? Are there optimization configurations for bucketed tables?
Checkpoint Configuration:
We tried various checkpoint timeout and interval configurations, but the issue persists. Are there recommended checkpoint configurations?
Summary
We would like to know:
Why does the Hudi COW bucketed table encounter checkpoint timeout issues during writes?
Are there optimization suggestions for COW table write performance?
Does the bucketed table have a specific impact on COW table write performance?
Are there recommended checkpoint configurations?
Thank you for your help!