[INLONG-7056][Sort] Adjust sort resources according to data scale #10915
Fixes #7056
Motivation
Currently, the total amount of resources for a Flink Sort job comes from the configuration file flink-sort-plugin.properties, meaning that all submitted Sort jobs use the same amount of resources. When the data scale is large, the resources may be insufficient; when the data scale is small, resources may be wasted. Dynamically adjusting resources according to the data volume is therefore a much-needed feature.
Modifications
Before a job is submitted to Flink via org.apache.inlong.manager.plugin.flink.FlinkService#submitJobBySavepoint, org.apache.inlong.manager.plugin.flink.FlinkParallelismOptimizer first queries the average data volume over the past hour and adjusts the job's parallelism based on that volume. This feature can be switched on or off, and the maximum number of messages per core can be configured, both in flink-sort-plugin.properties.
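As a rough illustration of the idea, the sketch below derives a parallelism value from the average hourly message volume. Only FlinkParallelismOptimizer and flink-sort-plugin.properties come from this PR; the formula, the on/off flag, the per-core cap, and all other names here are assumptions for illustration, and the actual implementation may differ.

```java
// Minimal sketch of dynamic parallelism, assuming a simple
// "one core per N messages per hour" rule. All names except
// FlinkParallelismOptimizer's general role are hypothetical.
public class ParallelismSketch {

    // Hypothetical values that would be read from flink-sort-plugin.properties.
    private static final boolean DYNAMIC_PARALLELISM_ENABLED = true; // on/off switch
    private static final long MAX_MESSAGES_PER_CORE_PER_HOUR = 2000L; // per-core capacity

    private static final int DEFAULT_PARALLELISM = 1;

    /**
     * Derive parallelism from the average message count over the past hour:
     * one core per MAX_MESSAGES_PER_CORE_PER_HOUR messages, never below
     * the default of 1.
     */
    static int calculateParallelism(long avgMessagesLastHour) {
        if (!DYNAMIC_PARALLELISM_ENABLED || avgMessagesLastHour <= 0) {
            return DEFAULT_PARALLELISM;
        }
        int parallelism = (int) Math.ceil(
                (double) avgMessagesLastHour / MAX_MESSAGES_PER_CORE_PER_HOUR);
        return Math.max(parallelism, DEFAULT_PARALLELISM);
    }

    public static void main(String[] args) {
        // e.g. 9500 messages/hour with 2000 per core -> parallelism 5
        System.out.println(calculateParallelism(9500L));
    }
}
```

With this kind of rule, a low-traffic stream falls back to the default parallelism of 1, while a high-traffic stream scales roughly linearly with its message rate.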
Verifying this change
- [ ] This change is a trivial rework/code cleanup without any test coverage.
- [ ] This change is already covered by existing tests, such as:

  When creating a stream in Data Ingestion, make the source data increase steadily until it reaches a significant volume (roughly more than 2,000 messages per hour), then resubmit the job. The parallelism of the Flink job corresponding to the stream should then be larger than the default value of 1, and the change is also reflected in the manager logs.

- [ ] This change added tests and can be verified as follows: