Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[controller] AdminConsumptionTask fix around timed out or cancelled AdminExecutionTasks #1444

Merged
merged 1 commit into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ private AdminConsumptionTask getAdminConsumptionTaskForCluster(String clusterNam
config.getAdminConsumptionCycleTimeoutMs(),
config.getAdminConsumptionMaxWorkerThreadPoolSize(),
pubSubTopicRepository,
pubSubMessageDeserializer,
config.getRegionName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.pubsub.manager.TopicManager;
Expand All @@ -38,12 +37,14 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -154,13 +155,11 @@ public String toString() {
private volatile long failingOffset = UNASSIGNED_VALUE;
private boolean topicExists;
/**
* A {@link Map} of stores to admin operations belonging to each store. The corresponding kafka offset of each admin
* operation is also attached as the first element of the {@link Pair}.
* A {@link Map} of stores to admin operations belonging to each store. The corresponding kafka offset and other
* metadata of each admin operation are included in the {@link AdminOperationWrapper}.
*/
private final Map<String, Queue<AdminOperationWrapper>> storeAdminOperationsMapWithOffset;

private final ConcurrentHashMap<String, AdminExecutionTask> storeToScheduledTask;

/**
* Map of store names that have encountered some sort of exception during consumption to {@link AdminErrorInfo}
* that has the details about the exception and the offset of the problematic admin message.
Expand All @@ -169,7 +168,7 @@ public String toString() {
private final Queue<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> undelegatedRecords;

private final ExecutionIdAccessor executionIdAccessor;
private final ExecutorService executorService;
private ExecutorService executorService;

private TopicManager sourceKafkaClusterTopicManager;

Expand Down Expand Up @@ -239,10 +238,6 @@ public ExecutorService getExecutorService() {
*/
private long lastUpdateTimeForConsumptionOffsetLag = 0;

private final PubSubTopicRepository pubSubTopicRepository;

private final PubSubMessageDeserializer pubSubMessageDeserializer;

/**
* The local region name of the controller.
*/
Expand All @@ -263,7 +258,6 @@ public AdminConsumptionTask(
long processingCycleTimeoutInMs,
int maxWorkerThreadPoolSize,
PubSubTopicRepository pubSubTopicRepository,
PubSubMessageDeserializer pubSubMessageDeserializer,
String regionName) {
this.clusterName = clusterName;
this.topic = AdminTopicUtils.getTopicNameFromClusterName(clusterName);
Expand All @@ -286,7 +280,6 @@ public AdminConsumptionTask(

this.storeAdminOperationsMapWithOffset = new ConcurrentHashMap<>();
this.problematicStores = new ConcurrentHashMap<>();
this.storeToScheduledTask = new ConcurrentHashMap<>();
// since we use an unbounded queue the core pool size is really the max pool size
this.executorService = new ThreadPoolExecutor(
maxWorkerThreadPoolSize,
Expand All @@ -297,8 +290,6 @@ public AdminConsumptionTask(
new DaemonThreadFactory(String.format("Venice-Admin-Execution-Task-%s", clusterName)));
this.undelegatedRecords = new LinkedList<>();
this.stats.setAdminConsumptionFailedOffset(failingOffset);
this.pubSubTopicRepository = pubSubTopicRepository;
this.pubSubMessageDeserializer = pubSubMessageDeserializer;
this.pubSubTopic = pubSubTopicRepository.getTopic(topic);
this.regionName = regionName;

Expand All @@ -311,6 +302,11 @@ public AdminConsumptionTask(
}
}

// For testing purpose only: lets tests inject a mock/controllable executor in place of the
// ThreadPoolExecutor created in the constructor. Not thread-safe; call before the task runs.
void setAdminExecutionTaskExecutorService(ExecutorService executorService) {
  this.executorService = executorService;
}

@Override
public synchronized void close() throws IOException {
isRunning.getAndSet(false);
Expand Down Expand Up @@ -376,28 +372,29 @@ public void run() {
}

while (!undelegatedRecords.isEmpty()) {
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> record = undelegatedRecords.peek();
if (record == null) {
break;
}
try {
long executionId = delegateMessage(undelegatedRecords.peek());
long executionId = delegateMessage(record);
if (executionId == lastDelegatedExecutionId) {
updateLastOffset(undelegatedRecords.peek().getOffset());
updateLastOffset(record.getOffset());
}
undelegatedRecords.remove();
} catch (DataValidationException dve) {
// Very unlikely but DataValidationException could be thrown here.
LOGGER.error(
"Admin consumption task is blocked due to DataValidationException with offset {}",
undelegatedRecords.peek().getOffset(),
record.getOffset(),
dve);
failingOffset = undelegatedRecords.peek().getOffset();
failingOffset = record.getOffset();
stats.recordFailedAdminConsumption();
stats.recordAdminTopicDIVErrorReportCount();
break;
} catch (Exception e) {
LOGGER.error(
"Admin consumption task is blocked due to Exception with offset {}",
undelegatedRecords.peek().getOffset(),
e);
failingOffset = undelegatedRecords.peek().getOffset();
LOGGER.error("Admin consumption task is blocked due to Exception with offset {}", record.getOffset(), e);
failingOffset = record.getOffset();
stats.recordFailedAdminConsumption();
break;
}
Expand Down Expand Up @@ -461,7 +458,6 @@ private void unSubscribe() {
storeAdminOperationsMapWithOffset.clear();
problematicStores.clear();
undelegatedRecords.clear();
storeToScheduledTask.clear();
failingOffset = UNASSIGNED_VALUE;
offsetToSkip = UNASSIGNED_VALUE;
offsetToSkipDIV = UNASSIGNED_VALUE;
Expand All @@ -482,6 +478,8 @@ private void unSubscribe() {
}

/**
* Package private for testing purposes
*
* Delegate work from the {@code storeAdminOperationsMapWithOffset} to the worker threads. Wait for the worker threads
* to complete or when timeout {@code processingCycleTimeoutInMs} is reached. Collect the result of each thread.
* The result can either be success: all given {@link AdminOperation}s were processed successfully or made progress
Expand All @@ -492,39 +490,51 @@ private void unSubscribe() {
private void executeMessagesAndCollectResults() throws InterruptedException {
lastSucceededExecutionIdMap =
new ConcurrentHashMap<>(executionIdAccessor.getLastSucceededExecutionIdMap(clusterName));
/** This set is used to track which store has a task scheduled, so that we schedule at most one per store. */
Set<String> storesWithScheduledTask = new HashSet<>();
/** List of tasks to be executed by the worker threads. */
List<Callable<Void>> tasks = new ArrayList<>();
/**
* Note that tasks and stores are parallel lists (the elements at each index correspond to one another), and both
* lists are also parallel to the results list, declared later in this function.
*/
List<String> stores = new ArrayList<>();
// Create a task for each store that has admin messages pending to be processed.
boolean skipOffsetCommandHasBeenProcessed = false;
for (Map.Entry<String, Queue<AdminOperationWrapper>> entry: storeAdminOperationsMapWithOffset.entrySet()) {
if (!entry.getValue().isEmpty()) {
long adminMessageOffset = entry.getValue().peek().getOffset();
if (checkOffsetToSkip(adminMessageOffset, false)) {
entry.getValue().remove();
String storeName = entry.getKey();
Queue<AdminOperationWrapper> storeQueue = entry.getValue();
if (!storeQueue.isEmpty()) {
AdminOperationWrapper nextOp = storeQueue.peek();
if (nextOp == null) {
continue;
}
long adminMessageOffset = nextOp.getOffset();
if (checkOffsetToSkip(nextOp.getOffset(), false)) {
storeQueue.remove();
skipOffsetCommandHasBeenProcessed = true;
}
AdminExecutionTask newTask = new AdminExecutionTask(
LOGGER,
clusterName,
entry.getKey(),
storeName,
lastSucceededExecutionIdMap,
lastPersistedExecutionId,
entry.getValue(),
storeQueue,
admin,
executionIdAccessor,
isParentController,
stats,
regionName,
storeToScheduledTask);
regionName);
// Check if there is previously created scheduled task still occupying one thread from the pool.
if (storeToScheduledTask.putIfAbsent(entry.getKey(), newTask) == null) {
if (storesWithScheduledTask.add(storeName)) {
// Log the store name and the offset of the task being added into the task list
LOGGER.info(
"Adding admin message from store {} with offset {} to the task list",
entry.getKey(),
storeName,
adminMessageOffset);
tasks.add(newTask);
stores.add(entry.getKey());
stores.add(storeName);
}
}
}
Expand All @@ -550,44 +560,46 @@ private void executeMessagesAndCollectResults() throws InterruptedException {
try {
result.get();
problematicStores.remove(storeName);
if (internalQueuesEmptied && storeAdminOperationsMapWithOffset.containsKey(storeName)
&& !storeAdminOperationsMapWithOffset.get(storeName).isEmpty()) {
internalQueuesEmptied = false;
if (internalQueuesEmptied) {
Queue<AdminOperationWrapper> storeQueue = storeAdminOperationsMapWithOffset.get(storeName);
if (storeQueue != null && !storeQueue.isEmpty()) {
internalQueuesEmptied = false;
}
}
} catch (ExecutionException | CancellationException e) {
internalQueuesEmptied = false;
AdminErrorInfo errorInfo = new AdminErrorInfo();
int perStorePendingMessagesCount = storeAdminOperationsMapWithOffset.get(storeName).size();
Queue<AdminOperationWrapper> storeQueue = storeAdminOperationsMapWithOffset.get(storeName);
int perStorePendingMessagesCount = storeQueue == null ? 0 : storeQueue.size();
pendingAdminMessagesCount += perStorePendingMessagesCount;
storesWithPendingAdminMessagesCount += perStorePendingMessagesCount > 0 ? 1 : 0;
if (e instanceof CancellationException) {
long lastSucceededId = lastSucceededExecutionIdMap.getOrDefault(storeName, -1L);
long newLastSucceededId = newLastSucceededExecutionIdMap.getOrDefault(storeName, -1L);
long lastSucceededId = lastSucceededExecutionIdMap.getOrDefault(storeName, UNASSIGNED_VALUE);
long newLastSucceededId = newLastSucceededExecutionIdMap.getOrDefault(storeName, UNASSIGNED_VALUE);

if (lastSucceededId == -1) {
if (lastSucceededId == UNASSIGNED_VALUE) {
LOGGER.error("Could not find last successful execution ID for store {}", storeName);
}

if (lastSucceededId == newLastSucceededId && perStorePendingMessagesCount > 0) {
// only mark the store problematic if no progress is made and there are still message(s) in the queue.
errorInfo.exception = new VeniceException(
"Could not finish processing admin message for store " + storeName + " in time");
errorInfo.offset = storeAdminOperationsMapWithOffset.get(storeName).peek().getOffset();
errorInfo.offset = getNextOperationOffsetIfAvailable(storeName);
problematicStores.put(storeName, errorInfo);
LOGGER.warn(errorInfo.exception.getMessage());
}
} else {
errorInfo.exception = e;
errorInfo.offset = storeAdminOperationsMapWithOffset.get(storeName).peek().getOffset();
errorInfo.offset = getNextOperationOffsetIfAvailable(storeName);
problematicStores.put(storeName, errorInfo);
}
} catch (Throwable e) {
long errorMsgOffset = -1;
try {
errorMsgOffset = storeAdminOperationsMapWithOffset.get(storeName).peek().getOffset();
} catch (Exception ex) {
LOGGER.error("Could not get the offset of the problematic admin message for store {}", storeName, ex);
long errorMsgOffset = getNextOperationOffsetIfAvailable(storeName);
if (errorMsgOffset == UNASSIGNED_VALUE) {
LOGGER.error("Could not get the offset of the problematic admin message for store {}", storeName);
}

LOGGER.error(
"Unexpected exception thrown while processing admin message for store {} at offset {}",
storeName,
Expand Down Expand Up @@ -632,6 +644,15 @@ private void executeMessagesAndCollectResults() throws InterruptedException {
}
}

/**
 * Looks up the offset of the next pending admin operation for the given store.
 *
 * @param storeName name of the store whose operation queue is inspected
 * @return the offset of the head operation in the store's queue, or {@link #UNASSIGNED_VALUE}
 *         when the store has no queue registered or its queue is empty
 */
private long getNextOperationOffsetIfAvailable(String storeName) {
  Queue<AdminOperationWrapper> queue = storeAdminOperationsMapWithOffset.get(storeName);
  if (queue == null) {
    return UNASSIGNED_VALUE;
  }
  AdminOperationWrapper head = queue.peek();
  if (head == null) {
    return UNASSIGNED_VALUE;
  }
  return head.getOffset();
}

private void internalClose() {
unSubscribe();
executorService.shutdownNow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ public class AdminExecutionTask implements Callable<Void> {
private final ConcurrentHashMap<String, Long> lastSucceededExecutionIdMap;
private final long lastPersistedExecutionId;

private final Map<String, AdminExecutionTask> storeToScheduledTask;

AdminExecutionTask(
Logger LOGGER,
String clusterName,
Expand All @@ -97,8 +95,7 @@ public class AdminExecutionTask implements Callable<Void> {
ExecutionIdAccessor executionIdAccessor,
boolean isParentController,
AdminConsumptionStats stats,
String regionName,
Map<String, AdminExecutionTask> storeToScheduledTask) {
String regionName) {
this.LOGGER = LOGGER;
this.clusterName = clusterName;
this.storeName = storeName;
Expand All @@ -110,7 +107,6 @@ public class AdminExecutionTask implements Callable<Void> {
this.isParentController = isParentController;
this.stats = stats;
this.regionName = regionName;
this.storeToScheduledTask = storeToScheduledTask;
}

@Override
Expand Down Expand Up @@ -159,12 +155,15 @@ public Void call() {
LOGGER.error("Error {}", logMessage, e);
}
throw e;
} finally {
storeToScheduledTask.remove(storeName);
}
return null;
}

// Package private for testing only: exposes which store this task operates on.
String getStoreName() {
  return storeName;
}

private void processMessage(AdminOperation adminOperation) {
long lastSucceededExecutionId = lastSucceededExecutionIdMap.getOrDefault(storeName, lastPersistedExecutionId);
if (adminOperation.executionId <= lastSucceededExecutionId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@


public class AdminOperationWrapper {
private AdminOperation adminOperation;
private long offset;
private long producerTimestamp;
private long localBrokerTimestamp;
private long delegateTimestamp;
private final AdminOperation adminOperation;
private final long offset;
private final long producerTimestamp;
private final long localBrokerTimestamp;
private final long delegateTimestamp;

private Long startProcessingTimestamp = null;

Expand Down
Loading
Loading