Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][flink] Support CALL statement #4145

Merged
merged 1 commit into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ public void initTenantByTaskId(Integer id) {
Integer tenantId = baseMapper.getTenantByTaskId(id);
Asserts.checkNull(tenantId, Status.TASK_NOT_EXIST.getMessage());
TenantContextHolder.set(tenantId);
log.info("Init task tenan finished..");
log.info("Init task tenant finished..");
}

@Override
Expand Down Expand Up @@ -995,7 +995,7 @@ public Result<Tree<Integer>> queryAllCatalogue() {
@Override
public LineageResult getTaskLineage(Integer id) {
TaskDTO task = getTaskInfoById(id);
if (!Dialect.isCommonSql(task.getDialect())) {
if (Dialect.isCommonSql(task.getDialect())) {
if (Asserts.isNull(task.getDatabaseId())) {
return null;
}
Expand Down
7 changes: 5 additions & 2 deletions dinky-common/src/main/java/org/dinky/data/job/SqlType.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,21 @@ public enum SqlType {

WITH("WITH", "^WITH.*", SqlCategory.DQL),

CALL("CALL", "^CALL.*", SqlCategory.DQL),

UNKNOWN("UNKNOWN", "^UNKNOWN.*", SqlCategory.UNKNOWN);

private String type;
private final Pattern pattern;
private final SqlCategory category;

private static final List<SqlType> TRANS_SQL_TYPES =
Lists.newArrayList(INSERT, SELECT, WITH, SHOW, DESCRIBE, DESC, CTAS, RTAS, UPDATE, DELETE);
Lists.newArrayList(INSERT, SELECT, WITH, SHOW, DESCRIBE, DESC, CTAS, RTAS, UPDATE, DELETE, CALL);

private static final List<SqlType> CTAS_TYPES = Lists.newArrayList(CTAS, RTAS, PRINT);

private static final List<SqlType> PIPELINE_SQL_TYPES = Lists.newArrayList(INSERT, SELECT, WITH, CTAS, RTAS, PRINT);
private static final List<SqlType> PIPELINE_SQL_TYPES =
Lists.newArrayList(INSERT, SELECT, WITH, CTAS, RTAS, PRINT, CALL);

private static final List<SqlType> SINK_MODIFY_SQL_TYPES = Lists.newArrayList(INSERT, CTAS, RTAS, PRINT);

Expand Down
2 changes: 0 additions & 2 deletions dinky-core/src/main/java/org/dinky/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,6 @@ public JobResult executeJarSql(String statement) throws Exception {
.collect(Collectors.toList());
statement = String.join(";\n", statements);
jobStatementPlan = Explainer.build(this).parseStatements(SqlUtil.getStatements(statement));
jobStatementPlan.setSubmissionMode(config.isSubmissionMode());
jobStatementPlan.buildFinalStatement();
job = Job.build(runMode, config, executorConfig, executor, statement, useGateway);
ready();
Expand Down Expand Up @@ -282,7 +281,6 @@ public JobResult executeSql(String statement) throws Exception {
ready();
try {
jobStatementPlan = Explainer.build(this).parseStatements(SqlUtil.getStatements(statement));
jobStatementPlan.setSubmissionMode(config.isSubmissionMode());
jobStatementPlan.buildFinalStatement();
JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(this);
for (JobStatement jobStatement : jobStatementPlan.getJobStatementList()) {
Expand Down
43 changes: 2 additions & 41 deletions dinky-core/src/main/java/org/dinky/job/JobStatementPlan.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,13 @@
public class JobStatementPlan {

private List<JobStatement> jobStatementList = new ArrayList<>();
private boolean isSubmissionMode;

public JobStatementPlan() {}

public List<JobStatement> getJobStatementList() {
return jobStatementList;
}

public boolean isSubmissionMode() {
return isSubmissionMode;
}

public void setSubmissionMode(boolean isSubmissionMode) {
this.isSubmissionMode = isSubmissionMode;
}

public void addJobStatement(String statement, JobStatementType statementType, SqlType sqlType) {
jobStatementList.add(new JobStatement(jobStatementList.size() + 1, statement, statementType, sqlType));
}
Expand All @@ -76,8 +67,8 @@ public void buildFinalStatement() {
if (executableIndex >= 0) {
jobStatementList.get(executableIndex).asFinalExecutableStatement();
} else {
// If there is no INSERT/CTAS/RTAS statement, use the first SELECT/WITH/SHOW/DESC SQL statement as the final
// statement.
// If there is no INSERT/CTAS/RTAS/CALL statement, use the first SELECT/WITH/SHOW/DESC SQL statement as the
// final statement.
for (int i = 0; i < jobStatementList.size(); i++) {
if (jobStatementList.get(i).getStatementType().equals(JobStatementType.SQL)) {
jobStatementList.get(i).asFinalExecutableStatement();
Expand All @@ -94,43 +85,13 @@ public void buildFinalStatement() {

public void checkStatement() {
checkEmptyStatement();
checkSubmissionStatement();
checkPipelineStatement();
}

private void checkEmptyStatement() {
if (jobStatementList.isEmpty()) {
throw new DinkyException("None of valid statement is executed. Please check your statements.");
}
boolean hasSqlStatement = false;
for (JobStatement jobStatement : jobStatementList) {
if (jobStatement.getStatement().trim().isEmpty()) {
throw new DinkyException("The statement cannot be empty. Please check your statements.");
}
if (jobStatement.getStatementType().equals(JobStatementType.SQL)
|| jobStatement.getStatementType().equals(JobStatementType.PIPELINE)
|| jobStatement.getStatementType().equals(JobStatementType.EXECUTE_JAR)) {
hasSqlStatement = true;
}
}
if (!hasSqlStatement) {
throw new DinkyException(
"The statements must contain at least one INSERT/CTAS/RTAS/SELECT/WITH/SHOW/DESC statement.");
}
}

private void checkSubmissionStatement() {
if (isSubmissionMode) {
for (JobStatement jobStatement : jobStatementList) {
if (jobStatement.getStatementType().equals(JobStatementType.SQL)) {
if (!jobStatement.getSqlType().isSinkyModify()) {
throw new DinkyException(
"The submission mode cannot contain one statement which is not a sink operation."
+ "\nThe valid statement is: " + jobStatement.getStatement());
}
}
}
}
}

private void checkPipelineStatement() {
Expand Down
23 changes: 1 addition & 22 deletions dinky-core/src/test/java/org/dinky/job/JobStatementPlanTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,28 +37,7 @@ void testEmptyStatements() {
@Test
void testEmptyStatement() {
JobStatementPlan jobStatementPlan = new JobStatementPlan();
jobStatementPlan.addJobStatement("", JobStatementType.SQL, SqlType.UNKNOWN);
checkInvalidStatement(jobStatementPlan, "The statement cannot be empty. Please check your statements.");
}

@Test
void testNoSqlStatement() {
JobStatementPlan jobStatementPlan = new JobStatementPlan();
jobStatementPlan.addJobStatement("set 'parallelism.default' = '2';\n", JobStatementType.DDL, SqlType.SET);
checkInvalidStatement(
jobStatementPlan,
"The statements must contain at least one INSERT/CTAS/RTAS/SELECT/WITH/SHOW/DESC statement.");
}

@Test
void testSubmissionWithQueryStatement() {
JobStatementPlan jobStatementPlan = new JobStatementPlan();
jobStatementPlan.setSubmissionMode(true);
jobStatementPlan.addJobStatement("select 'A' as name;\n", JobStatementType.SQL, SqlType.SET);
checkInvalidStatement(
jobStatementPlan,
"The submission mode cannot contain one statement which is not a sink operation."
+ "\nThe valid statement is: select 'A' as name;\n");
checkInvalidStatement(jobStatementPlan, "None of valid statement is executed. Please check your statements.");
}

@Test
Expand Down
Loading