Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include the task scheduling errors in schedule response #14754

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -642,21 +641,35 @@ public Map<String, String> scheduleTasks(
@ApiParam(value = "Minion Instance tag to schedule the task explicitly on") @QueryParam("minionInstanceTag")
@Nullable String minionInstanceTag, @Context HttpHeaders headers) {
String database = headers != null ? headers.getHeaderString(DATABASE) : DEFAULT_DATABASE;
Map<String, String> response = new HashMap<>();
List<String> generationErrors = new ArrayList<>();
List<String> schedulingErrors = new ArrayList<>();
if (taskType != null) {
// Schedule task for the given task type
List<String> taskNames = tableName != null ? _pinotTaskManager.scheduleTaskForTable(taskType,
DatabaseUtils.translateTableName(tableName, headers), minionInstanceTag)
PinotTaskManager.TaskSchedulingInfo taskInfos = tableName != null
? _pinotTaskManager.scheduleTaskForTable(taskType, DatabaseUtils.translateTableName(tableName, headers),
minionInstanceTag)
: _pinotTaskManager.scheduleTaskForDatabase(taskType, database, minionInstanceTag);
return Collections.singletonMap(taskType, taskNames == null ? null : StringUtils.join(taskNames, ','));
response.put(taskType, StringUtils.join(taskInfos.getScheduledTaskNames(), ','));
generationErrors.addAll(taskInfos.getGenerationErrors());
schedulingErrors.addAll(taskInfos.getSchedulingErrors());
} else {
// Schedule tasks for all task types
Map<String, List<String>> allTaskNames = tableName != null ? _pinotTaskManager.scheduleAllTasksForTable(
DatabaseUtils.translateTableName(tableName, headers), minionInstanceTag)
Map<String, PinotTaskManager.TaskSchedulingInfo> allTaskInfos = tableName != null
? _pinotTaskManager.scheduleAllTasksForTable(DatabaseUtils.translateTableName(tableName, headers),
minionInstanceTag)
: _pinotTaskManager.scheduleAllTasksForDatabase(database, minionInstanceTag);
Map<String, String> result = allTaskNames.entrySet().stream().filter(entry -> entry.getValue() != null)
.collect(Collectors.toMap(Map.Entry::getKey, entry -> String.join(",", entry.getValue())));
return result.isEmpty() ? null : result;
allTaskInfos.forEach((key, value) -> {
if (value.getScheduledTaskNames() != null) {
response.put(key, String.join(",", value.getScheduledTaskNames()));
}
generationErrors.addAll(value.getGenerationErrors());
schedulingErrors.addAll(value.getSchedulingErrors());
});
}
response.put("generationErrors", String.join(",", generationErrors));
response.put("schedulingErrors", String.join(",", schedulingErrors));
return response;
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,70 +486,79 @@ public void registerTaskGenerator(PinotTaskGenerator taskGenerator) {
/**
* Schedules tasks (all task types) for all tables.
* It might be called from the non-leader controller.
* Returns a map from the task type to the list of tasks scheduled.
* Returns a map from the task type to the {@link TaskSchedulingInfo} of tasks scheduled.
*/
public synchronized Map<String, List<String>> scheduleAllTasksForAllTables(@Nullable String minionInstanceTag) {
public synchronized Map<String, TaskSchedulingInfo> scheduleAllTasksForAllTables(@Nullable String minionInstanceTag) {
return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false, minionInstanceTag);
}

/**
* Schedules tasks (all task types) for all tables in the given database.
* It might be called from the non-leader controller.
* Returns a map from the task type to the list of tasks scheduled.
* Returns a map from the task type to the {@link TaskSchedulingInfo} of tasks scheduled.
*/
public synchronized Map<String, List<String>> scheduleAllTasksForDatabase(@Nullable String database,
public synchronized Map<String, TaskSchedulingInfo> scheduleAllTasksForDatabase(@Nullable String database,
@Nullable String minionInstanceTag) {
return scheduleTasks(_pinotHelixResourceManager.getAllTables(database), false, minionInstanceTag);
}

/**
* Schedules tasks (all task types) for the given table.
* It might be called from the non-leader controller.
* Returns a map from the task type to the list of tasks scheduled.
* Returns a map from the task type to the {@link TaskSchedulingInfo} of tasks scheduled.
*/
public synchronized Map<String, List<String>> scheduleAllTasksForTable(String tableNameWithType,
public synchronized Map<String, TaskSchedulingInfo> scheduleAllTasksForTable(String tableNameWithType,
@Nullable String minionInstanceTag) {
return scheduleTasks(List.of(tableNameWithType), false, minionInstanceTag);
}

/**
* Schedules task for the given task type for all tables.
* It might be called from the non-leader controller.
* Returns a list of tasks scheduled, or {@code null} if no task is scheduled.
* Returns {@link TaskSchedulingInfo} which consists of:
* - list of scheduled task names (empty list if nothing to schedule),
* or {@code null} if no task is scheduled due to scheduling errors.
* - list of task generation errors if any
* - list of task scheduling errors if any
*/
@Nullable
public synchronized List<String> scheduleTaskForAllTables(String taskType, @Nullable String minionInstanceTag) {
public synchronized TaskSchedulingInfo scheduleTaskForAllTables(String taskType, @Nullable String minionInstanceTag) {
return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(), minionInstanceTag);
}

/**
* Schedules task for the given task type for all tables in the given database.
* It might be called from the non-leader controller.
* Returns a list of tasks scheduled, or {@code null} if no task is scheduled.
* Returns {@link TaskSchedulingInfo} which consists of:
* - list of scheduled task names (empty list if nothing to schedule),
* or {@code null} if no task is scheduled due to scheduling errors.
* - list of task generation errors if any
* - list of task scheduling errors if any
*/
@Nullable
public synchronized List<String> scheduleTaskForDatabase(String taskType, @Nullable String database,
public synchronized TaskSchedulingInfo scheduleTaskForDatabase(String taskType, @Nullable String database,
@Nullable String minionInstanceTag) {
return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(database), minionInstanceTag);
}

/**
* Schedules task for the given task type for the given table.
* It might be called from the non-leader controller.
* Returns a list of tasks scheduled, or {@code null} if no task is scheduled.
* Returns {@link TaskSchedulingInfo} which consists of:
* - list of scheduled task names (empty list if nothing to schedule),
* or {@code null} if no task is scheduled due to scheduling errors.
* - list of task generation errors if any
* - list of task scheduling errors if any
*/
@Nullable
public synchronized List<String> scheduleTaskForTable(String taskType, String tableNameWithType,
public synchronized TaskSchedulingInfo scheduleTaskForTable(String taskType, String tableNameWithType,
@Nullable String minionInstanceTag) {
return scheduleTask(taskType, List.of(tableNameWithType), minionInstanceTag);
}

/**
* Helper method to schedule tasks (all task types) for the given tables that have the tasks enabled. Returns a map
* from the task type to the list of the tasks scheduled.
* Helper method to schedule tasks (all task types) for the given tables that have the tasks enabled.
* Returns a map from the task type to the {@link TaskSchedulingInfo} of the tasks scheduled.
*/
protected synchronized Map<String, List<String>> scheduleTasks(List<String> tableNamesWithType, boolean isLeader,
@Nullable String minionInstanceTag) {
protected synchronized Map<String, TaskSchedulingInfo> scheduleTasks(List<String> tableNamesWithType,
boolean isLeader, @Nullable String minionInstanceTag) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);

// Scan all table configs to get the tables with tasks enabled
Expand All @@ -565,7 +574,7 @@ protected synchronized Map<String, List<String>> scheduleTasks(List<String> tabl
}

// Generate each type of tasks
Map<String, List<String>> tasksScheduled = new HashMap<>();
Map<String, TaskSchedulingInfo> tasksScheduled = new HashMap<>();
for (Map.Entry<String, List<TableConfig>> entry : enabledTableConfigMap.entrySet()) {
String taskType = entry.getKey();
List<TableConfig> enabledTableConfigs = entry.getValue();
Expand All @@ -577,16 +586,18 @@ protected synchronized Map<String, List<String>> scheduleTasks(List<String> tabl
addTaskTypeMetricsUpdaterIfNeeded(taskType);
tasksScheduled.put(taskType, scheduleTask(taskGenerator, enabledTableConfigs, isLeader, minionInstanceTag));
} else {
LOGGER.warn("Task type: {} is not registered, cannot enable it for tables: {}", taskType, enabledTables);
tasksScheduled.put(taskType, null);
String message = "Task type: " + taskType + " is not registered, cannot enable it for tables: " + enabledTables;
LOGGER.warn(message);
TaskSchedulingInfo taskSchedulingInfo = new TaskSchedulingInfo();
taskSchedulingInfo.addSchedulingError(message);
tasksScheduled.put(taskType, taskSchedulingInfo);
}
}

return tasksScheduled;
}

@Nullable
protected synchronized List<String> scheduleTask(String taskType, List<String> tables,
protected synchronized TaskSchedulingInfo scheduleTask(String taskType, List<String> tables,
@Nullable String minionInstanceTag) {
PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType);
Expand All @@ -608,17 +619,23 @@ protected synchronized List<String> scheduleTask(String taskType, List<String> t

/**
* Helper method to schedule task with the given task generator for the given tables that have the task enabled.
* Returns the list of task names, or {@code null} if no task is scheduled.
* Returns
* - list of scheduled task names (empty list if nothing to schedule),
* or {@code null} if no task is scheduled due to scheduling errors.
* - list of task generation errors if any
* - list of task scheduling errors if any
*/
@Nullable
protected List<String> scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig> enabledTableConfigs,
protected TaskSchedulingInfo scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig> enabledTableConfigs,
boolean isLeader, @Nullable String minionInstanceTagForTask) {
TaskSchedulingInfo response = new TaskSchedulingInfo();
String taskType = taskGenerator.getTaskType();
List<String> enabledTables =
enabledTableConfigs.stream().map(TableConfig::getTableName).collect(Collectors.toList());
LOGGER.info("Trying to schedule task type: {}, for tables: {}, isLeader: {}", taskType, enabledTables, isLeader);
if (!isTaskSchedulable(taskType, enabledTables)) {
return null;
response.addSchedulingError("Unable to start scheduling for task type " + taskType
+ " as task queue may be stopped. Please check the task queue status.");
return response;
}
Map<String, List<PinotTaskConfig>> minionInstanceTagToTaskConfigs = new HashMap<>();
for (TableConfig tableConfig : enabledTableConfigs) {
Expand All @@ -645,6 +662,8 @@ protected List<String> scheduleTask(PinotTaskGenerator taskGenerator, List<Table
try (PrintWriter pw = new PrintWriter(errors)) {
e.printStackTrace(pw);
}
response.addGenerationError("Failed to generate tasks for task type " + taskType + " for table " + tableName
+ "\n Reason : " + errors);
long failureRunTimestamp = System.currentTimeMillis();
_taskManagerStatusCache.saveTaskGeneratorInfo(tableName, taskType,
taskGeneratorMostRecentRunInfo -> taskGeneratorMostRecentRunInfo.addErrorRunMessage(failureRunTimestamp,
Expand Down Expand Up @@ -684,17 +703,17 @@ protected List<String> scheduleTask(PinotTaskGenerator taskGenerator, List<Table
numErrorTasksScheduled++;
LOGGER.error("Failed to schedule task type {} on minion instance {} with task configs: {}", taskType,
minionInstanceTag, pinotTaskConfigs, e);
response.addSchedulingError(e.getMessage());
}
}
if (numErrorTasksScheduled > 0) {
LOGGER.warn("Failed to schedule {} tasks for task type type {}", numErrorTasksScheduled, taskType);
// No job got scheduled due to errors
if (numErrorTasksScheduled == minionInstanceTagToTaskConfigs.size()) {
return response;
}
}
// No job got scheduled
if (numErrorTasksScheduled == minionInstanceTagToTaskConfigs.size() || submittedTaskNames.isEmpty()) {
return null;
}
// at least one job got scheduled
return submittedTaskNames;
return response.setScheduledTaskNames(submittedTaskNames);
}

@Override
Expand Down Expand Up @@ -762,4 +781,36 @@ protected boolean isTaskSchedulable(String taskType, List<String> tables) {
}
return true;
}

/**
 * Result holder for a task scheduling attempt. It carries three pieces of information:
 * <ul>
 *   <li>the names of the tasks that were scheduled (may be {@code null}, see
 *       {@link #getScheduledTaskNames()}),</li>
 *   <li>the errors recorded while generating tasks,</li>
 *   <li>the errors recorded while scheduling tasks.</li>
 * </ul>
 * Errors are appended through {@link #addGenerationError(String)} and
 * {@link #addSchedulingError(String)}; the error getters hand out read-only views so that callers
 * cannot modify the internal lists behind the holder's back.
 * <p>This class performs no synchronization of its own.
 */
public static class TaskSchedulingInfo {
  // Left unset (null) until setScheduledTaskNames is called.
  private List<String> _scheduledTaskNames;
  private final List<String> _generationErrors = new ArrayList<>();
  private final List<String> _schedulingErrors = new ArrayList<>();

  /**
   * Returns the list passed to {@link #setScheduledTaskNames(List)}, or {@code null} if it was never set.
   * The list is returned as-is (no copy, no wrapping).
   */
  @Nullable
  public List<String> getScheduledTaskNames() {
    return _scheduledTaskNames;
  }

  /**
   * Stores the given list of scheduled task names (the reference is kept, not copied).
   *
   * @param scheduledTaskNames names of the scheduled tasks; {@code null} is accepted and is what the getter
   *                           will then return
   * @return this instance, to allow call chaining
   */
  public TaskSchedulingInfo setScheduledTaskNames(@Nullable List<String> scheduledTaskNames) {
    _scheduledTaskNames = scheduledTaskNames;
    return this;
  }

  /**
   * Returns a read-only, live view of the recorded task generation errors (never {@code null}).
   * Entries added later through {@link #addGenerationError(String)} are visible through the returned view.
   */
  public List<String> getGenerationErrors() {
    // Fully qualified: java.util.Collections may not be imported by the enclosing file.
    return java.util.Collections.unmodifiableList(_generationErrors);
  }

  /**
   * Appends one task generation error message.
   *
   * @param generationError message to record; stored as given
   */
  public void addGenerationError(String generationError) {
    _generationErrors.add(generationError);
  }

  /**
   * Returns a read-only, live view of the recorded task scheduling errors (never {@code null}).
   * Entries added later through {@link #addSchedulingError(String)} are visible through the returned view.
   */
  public List<String> getSchedulingErrors() {
    // Fully qualified: java.util.Collections may not be imported by the enclosing file.
    return java.util.Collections.unmodifiableList(_schedulingErrors);
  }

  /**
   * Appends one task scheduling error message.
   *
   * @param schedulingError message to record; stored as given
   */
  public void addSchedulingError(String schedulingError) {
    _schedulingErrors.add(schedulingError);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,13 @@ const TaskQueueTable = (props) => {
if (get(res, `${taskType}`, null) === null) {
dispatch({
type: 'error',
message: `Could not schedule task`,
message: `Could not schedule task.\nTask generation errors : ${get(res, 'generationErrors', 'none')}.\nTask scheduling errors : ${get(res, 'schedulingErrors', 'none')}`,
show: true
});
} else if (get(res, `${taskType}`, null) === '') {
dispatch({
type: 'success',
message: `No task to schedule`,
shounakmk219 marked this conversation as resolved.
Show resolved Hide resolved
show: true
});
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ public void testPinotTaskManagerScheduleTaskWithStoppedTaskQueue()
throws Exception {
testValidateTaskGeneration(taskManager -> {
// Validate schedule tasks for table when task queue is in stopped state
List<String> taskIDs = taskManager.scheduleTaskForTable("SegmentGenerationAndPushTask", "myTable", null);
List<String> taskIDs = taskManager.scheduleTaskForTable("SegmentGenerationAndPushTask", "myTable", null)
.getScheduledTaskNames();
assertNull(taskIDs);
return null;
});
Expand Down
Loading
Loading