Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include the task scheduling errors in schedule response #14754

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -642,21 +641,35 @@ public Map<String, String> scheduleTasks(
@ApiParam(value = "Minion Instance tag to schedule the task explicitly on") @QueryParam("minionInstanceTag")
@Nullable String minionInstanceTag, @Context HttpHeaders headers) {
String database = headers != null ? headers.getHeaderString(DATABASE) : DEFAULT_DATABASE;
Map<String, String> response = new HashMap<>();
List<String> generationErrors = new ArrayList<>();
List<String> schedulingErrors = new ArrayList<>();
if (taskType != null) {
// Schedule task for the given task type
List<String> taskNames = tableName != null ? _pinotTaskManager.scheduleTaskForTable(taskType,
DatabaseUtils.translateTableName(tableName, headers), minionInstanceTag)
PinotTaskManager.TaskSchedulingInfo taskInfos = tableName != null
? _pinotTaskManager.scheduleTaskForTable(taskType, DatabaseUtils.translateTableName(tableName, headers),
minionInstanceTag)
: _pinotTaskManager.scheduleTaskForDatabase(taskType, database, minionInstanceTag);
return Collections.singletonMap(taskType, taskNames == null ? null : StringUtils.join(taskNames, ','));
response.put(taskType, StringUtils.join(taskInfos.getScheduledTaskNames(), ','));
generationErrors.addAll(taskInfos.getGenerationErrors());
schedulingErrors.addAll(taskInfos.getSchedulingErrors());
} else {
// Schedule tasks for all task types
Map<String, List<String>> allTaskNames = tableName != null ? _pinotTaskManager.scheduleAllTasksForTable(
DatabaseUtils.translateTableName(tableName, headers), minionInstanceTag)
Map<String, PinotTaskManager.TaskSchedulingInfo> allTaskInfos = tableName != null
? _pinotTaskManager.scheduleAllTasksForTable(DatabaseUtils.translateTableName(tableName, headers),
minionInstanceTag)
: _pinotTaskManager.scheduleAllTasksForDatabase(database, minionInstanceTag);
Map<String, String> result = allTaskNames.entrySet().stream().filter(entry -> entry.getValue() != null)
.collect(Collectors.toMap(Map.Entry::getKey, entry -> String.join(",", entry.getValue())));
return result.isEmpty() ? null : result;
allTaskInfos.forEach((key, value) -> {
if (value.getScheduledTaskNames() != null) {
response.put(key, String.join(",", value.getScheduledTaskNames()));
}
generationErrors.addAll(value.getGenerationErrors());
schedulingErrors.addAll(value.getSchedulingErrors());
});
}
response.put("generationErrors", String.join(",", generationErrors));
response.put("schedulingErrors", String.join(",", schedulingErrors));
return response;
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,70 +486,79 @@ public void registerTaskGenerator(PinotTaskGenerator taskGenerator) {
/**
* Schedules tasks (all task types) for all tables.
* It might be called from the non-leader controller.
* Returns a map from the task type to the list of tasks scheduled.
* Returns a map from the task type to the {@link TaskSchedulingInfo} of tasks scheduled.
*/
public synchronized Map<String, List<String>> scheduleAllTasksForAllTables(@Nullable String minionInstanceTag) {
public synchronized Map<String, TaskSchedulingInfo> scheduleAllTasksForAllTables(@Nullable String minionInstanceTag) {
return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false, minionInstanceTag);
}

/**
* Schedules tasks (all task types) for all tables in the given database.
* It might be called from the non-leader controller.
* Returns a map from the task type to the list of tasks scheduled.
* Returns a map from the task type to the {@link TaskSchedulingInfo} of tasks scheduled.
*/
public synchronized Map<String, List<String>> scheduleAllTasksForDatabase(@Nullable String database,
public synchronized Map<String, TaskSchedulingInfo> scheduleAllTasksForDatabase(@Nullable String database,
@Nullable String minionInstanceTag) {
return scheduleTasks(_pinotHelixResourceManager.getAllTables(database), false, minionInstanceTag);
}

/**
* Schedules tasks (all task types) for the given table.
* It might be called from the non-leader controller.
* Returns a map from the task type to the list of tasks scheduled.
* Returns a map from the task type to the {@link TaskSchedulingInfo} of tasks scheduled.
*/
public synchronized Map<String, List<String>> scheduleAllTasksForTable(String tableNameWithType,
public synchronized Map<String, TaskSchedulingInfo> scheduleAllTasksForTable(String tableNameWithType,
@Nullable String minionInstanceTag) {
return scheduleTasks(List.of(tableNameWithType), false, minionInstanceTag);
}

/**
* Schedules task for the given task type for all tables.
* It might be called from the non-leader controller.
* Returns a list of tasks scheduled, or {@code null} if no task is scheduled.
* Returns {@link TaskSchedulingInfo} which consists
* - list of scheduled task names (empty list if nothing to schedule),
* or {@code null} if no task is scheduled due to scheduling errors.
* - list of task generation errors if any
* - list of task scheduling errors if any
*/
@Nullable
public synchronized List<String> scheduleTaskForAllTables(String taskType, @Nullable String minionInstanceTag) {
public synchronized TaskSchedulingInfo scheduleTaskForAllTables(String taskType, @Nullable String minionInstanceTag) {
return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(), minionInstanceTag);
}

/**
* Schedules task for the given task type for all tables in the given database.
* It might be called from the non-leader controller.
* Returns a list of tasks scheduled, or {@code null} if no task is scheduled.
* Returns {@link TaskSchedulingInfo} which consists
* - list of scheduled task names (empty list if nothing to schedule),
* or {@code null} if no task is scheduled due to scheduling errors.
* - list of task generation errors if any
* - list of task scheduling errors if any
*/
@Nullable
public synchronized List<String> scheduleTaskForDatabase(String taskType, @Nullable String database,
public synchronized TaskSchedulingInfo scheduleTaskForDatabase(String taskType, @Nullable String database,
@Nullable String minionInstanceTag) {
return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(database), minionInstanceTag);
}

/**
* Schedules task for the given task type for the give table.
* It might be called from the non-leader controller.
* Returns a list of tasks scheduled, or {@code null} if no task is scheduled.
* Returns {@link TaskSchedulingInfo} which consists
* - list of scheduled task names (empty list if nothing to schedule),
* or {@code null} if no task is scheduled due to scheduling errors.
* - list of task generation errors if any
* - list of task scheduling errors if any
*/
@Nullable
public synchronized List<String> scheduleTaskForTable(String taskType, String tableNameWithType,
public synchronized TaskSchedulingInfo scheduleTaskForTable(String taskType, String tableNameWithType,
@Nullable String minionInstanceTag) {
return scheduleTask(taskType, List.of(tableNameWithType), minionInstanceTag);
}

/**
* Helper method to schedule tasks (all task types) for the given tables that have the tasks enabled. Returns a map
* from the task type to the list of the tasks scheduled.
* Helper method to schedule tasks (all task types) for the given tables that have the tasks enabled.
* Returns a map from the task type to the {@link TaskSchedulingInfo} of the tasks scheduled.
*/
protected synchronized Map<String, List<String>> scheduleTasks(List<String> tableNamesWithType, boolean isLeader,
@Nullable String minionInstanceTag) {
protected synchronized Map<String, TaskSchedulingInfo> scheduleTasks(List<String> tableNamesWithType,
boolean isLeader, @Nullable String minionInstanceTag) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);

// Scan all table configs to get the tables with tasks enabled
Expand All @@ -565,7 +574,7 @@ protected synchronized Map<String, List<String>> scheduleTasks(List<String> tabl
}

// Generate each type of tasks
Map<String, List<String>> tasksScheduled = new HashMap<>();
Map<String, TaskSchedulingInfo> tasksScheduled = new HashMap<>();
for (Map.Entry<String, List<TableConfig>> entry : enabledTableConfigMap.entrySet()) {
String taskType = entry.getKey();
List<TableConfig> enabledTableConfigs = entry.getValue();
Expand All @@ -577,16 +586,18 @@ protected synchronized Map<String, List<String>> scheduleTasks(List<String> tabl
addTaskTypeMetricsUpdaterIfNeeded(taskType);
tasksScheduled.put(taskType, scheduleTask(taskGenerator, enabledTableConfigs, isLeader, minionInstanceTag));
} else {
LOGGER.warn("Task type: {} is not registered, cannot enable it for tables: {}", taskType, enabledTables);
tasksScheduled.put(taskType, null);
String message = "Task type: " + taskType + " is not registered, cannot enable it for tables: " + enabledTables;
LOGGER.warn(message);
TaskSchedulingInfo taskSchedulingInfo = new TaskSchedulingInfo();
taskSchedulingInfo.addSchedulingError(message);
tasksScheduled.put(taskType, taskSchedulingInfo);
}
}

return tasksScheduled;
}

@Nullable
protected synchronized List<String> scheduleTask(String taskType, List<String> tables,
protected synchronized TaskSchedulingInfo scheduleTask(String taskType, List<String> tables,
@Nullable String minionInstanceTag) {
PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType);
Expand All @@ -608,17 +619,23 @@ protected synchronized List<String> scheduleTask(String taskType, List<String> t

/**
* Helper method to schedule task with the given task generator for the given tables that have the task enabled.
* Returns the list of task names, or {@code null} if no task is scheduled.
* Returns
* - list of scheduled task names (empty list if nothing to schedule),
* or {@code null} if no task is scheduled due to scheduling errors.
* - list of task generation errors if any
* - list of task scheduling errors if any
*/
@Nullable
protected List<String> scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig> enabledTableConfigs,
protected TaskSchedulingInfo scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig> enabledTableConfigs,
boolean isLeader, @Nullable String minionInstanceTagForTask) {
TaskSchedulingInfo response = new TaskSchedulingInfo();
String taskType = taskGenerator.getTaskType();
List<String> enabledTables =
enabledTableConfigs.stream().map(TableConfig::getTableName).collect(Collectors.toList());
LOGGER.info("Trying to schedule task type: {}, for tables: {}, isLeader: {}", taskType, enabledTables, isLeader);
if (!isTaskSchedulable(taskType, enabledTables)) {
return null;
response.addSchedulingError("Unable to start scheduling for task type " + taskType
+ " as task queue may be stopped. Please check the task queue status.");
return response;
}
Map<String, List<PinotTaskConfig>> minionInstanceTagToTaskConfigs = new HashMap<>();
for (TableConfig tableConfig : enabledTableConfigs) {
Expand All @@ -645,6 +662,8 @@ protected List<String> scheduleTask(PinotTaskGenerator taskGenerator, List<Table
try (PrintWriter pw = new PrintWriter(errors)) {
e.printStackTrace(pw);
}
response.addGenerationError("Failed to generate tasks for task type " + taskType + " for table " + tableName
+ "\n Reason : " + errors);
long failureRunTimestamp = System.currentTimeMillis();
_taskManagerStatusCache.saveTaskGeneratorInfo(tableName, taskType,
taskGeneratorMostRecentRunInfo -> taskGeneratorMostRecentRunInfo.addErrorRunMessage(failureRunTimestamp,
Expand Down Expand Up @@ -684,17 +703,17 @@ protected List<String> scheduleTask(PinotTaskGenerator taskGenerator, List<Table
numErrorTasksScheduled++;
LOGGER.error("Failed to schedule task type {} on minion instance {} with task configs: {}", taskType,
minionInstanceTag, pinotTaskConfigs, e);
response.addSchedulingError(e.getMessage());
}
}
if (numErrorTasksScheduled > 0) {
LOGGER.warn("Failed to schedule {} tasks for task type type {}", numErrorTasksScheduled, taskType);
// No job got scheduled due to errors
if (numErrorTasksScheduled == minionInstanceTagToTaskConfigs.size()) {
return response;
}
}
// No job got scheduled
if (numErrorTasksScheduled == minionInstanceTagToTaskConfigs.size() || submittedTaskNames.isEmpty()) {
return null;
}
// atleast one job got scheduled
return submittedTaskNames;
return response.setScheduledTaskNames(submittedTaskNames);
}

@Override
Expand Down Expand Up @@ -762,4 +781,36 @@ protected boolean isTaskSchedulable(String taskType, List<String> tables) {
}
return true;
}

public static class TaskSchedulingInfo {
private List<String> _scheduledTaskNames;
private final List<String> _generationErrors = new ArrayList<>();
private final List<String> _schedulingErrors = new ArrayList<>();

@Nullable
public List<String> getScheduledTaskNames() {
return _scheduledTaskNames;
}

public TaskSchedulingInfo setScheduledTaskNames(List<String> scheduledTaskNames) {
_scheduledTaskNames = scheduledTaskNames;
return this;
}

public List<String> getGenerationErrors() {
return _generationErrors;
}

public void addGenerationError(String generationError) {
_generationErrors.add(generationError);
}

public List<String> getSchedulingErrors() {
return _schedulingErrors;
}

public void addSchedulingError(String schedulingError) {
_schedulingErrors.add(schedulingError);
}
}
}
21 changes: 20 additions & 1 deletion pinot-controller/src/main/resources/app/pages/TaskQueueTable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import PinotMethodUtils from '../utils/PinotMethodUtils';
import useScheduleAdhocModal from '../components/useScheduleAdhocModal';
import useMinionMetadata from '../components/useMinionMetaData';
import useTaskListing from '../components/useTaskListing';
import { Typography } from '@material-ui/core';

const jsonoptions = {
lineNumbers: true,
Expand Down Expand Up @@ -110,7 +111,25 @@ const TaskQueueTable = (props) => {
if (get(res, `${taskType}`, null) === null) {
dispatch({
type: 'error',
message: `Could not schedule task`,
message: (
<Box>
<Typography>
Could not schedule task
</Typography>
<Typography>
Task generation errors : {get(res, 'generationErrors', 'none')}
</Typography>
<Typography>
Task scheduling errors : {get(res, 'schedulingErrors', 'none')}
</Typography>
</Box>
),
show: true
});
} else if (get(res, `${taskType}`, null) === '') {
dispatch({
type: 'success',
message: `No task to schedule`,
show: true
});
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ public void testPinotTaskManagerScheduleTaskWithStoppedTaskQueue()
throws Exception {
testValidateTaskGeneration(taskManager -> {
// Validate schedule tasks for table when task queue is in stopped state
List<String> taskIDs = taskManager.scheduleTaskForTable("SegmentGenerationAndPushTask", "myTable", null);
List<String> taskIDs = taskManager.scheduleTaskForTable("SegmentGenerationAndPushTask", "myTable", null)
.getScheduledTaskNames();
assertNull(taskIDs);
return null;
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.integration.tests;

import java.util.Map;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;

import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;


public class MinionTaskTestUtils {
private MinionTaskTestUtils() {
}

public static void assertNoTaskSchedule(String offlineTableName, String taskType, PinotTaskManager taskManager) {
PinotTaskManager.TaskSchedulingInfo info =
taskManager.scheduleAllTasksForTable(offlineTableName, null).get(taskType);
shounakmk219 marked this conversation as resolved.
Show resolved Hide resolved
assertNoTaskSchedule(info);
}

public static void assertNoTaskSchedule(String taskType, PinotTaskManager taskManager) {
PinotTaskManager.TaskSchedulingInfo info = taskManager.scheduleTaskForAllTables(taskType, null);
assertNoTaskSchedule(info);
}

public static void assertNoTaskSchedule(PinotTaskManager taskManager) {
Map<String, PinotTaskManager.TaskSchedulingInfo> infoMap = taskManager.scheduleAllTasksForAllTables(null);
infoMap.forEach((key, value) -> assertNoTaskSchedule(value));
}

public static void assertNoTaskSchedule(PinotTaskManager.TaskSchedulingInfo info) {
assertNotNull(info.getScheduledTaskNames());
assertTrue(info.getScheduledTaskNames().isEmpty());
assertNotNull(info.getGenerationErrors());
assertTrue(info.getGenerationErrors().isEmpty());
assertNotNull(info.getSchedulingErrors());
assertTrue(info.getSchedulingErrors().isEmpty());
Comment on lines +51 to +54
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure iiuc, but errors can be non-empty too if no task is scheduled due to scheduling / generation errors right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tests are using this util methods right now to assert that tasks are not being scheduled due to no work or tasks already in progress. In that flow we need to make sure that there are no errors as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm would suggest to move the last 4 assertions to a separate method maybe -- assertNoError kind of. But can be done in a follow-up PR as well.

}
}
Loading
Loading