Skip to content

Commit

Permalink
Merge branch '7.9.x' into master by armittal-0077
Browse files Browse the repository at this point in the history
  • Loading branch information
semaphore-agent-production[bot] committed Jan 8, 2025
2 parents 19858cd + 1a1c970 commit f6a2b1f
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class CommandRunner implements Closeable {
private static final int MAX_STATEMENT_RETRY_MS = 5 * 1000;
private static final Duration NEW_CMDS_TIMEOUT = Duration.ofMillis(MAX_STATEMENT_RETRY_MS);
private static final int SHUTDOWN_TIMEOUT_MS = 3 * MAX_STATEMENT_RETRY_MS;
private static final int COMMAND_TOPIC_THRESHOLD_LIMIT = 10000;

private final InteractiveStatementExecutor statementExecutor;
private final CommandQueue commandStore;
Expand Down Expand Up @@ -271,6 +272,10 @@ public void processPriorCommands(final PersistentQueryCleanupImpl queryCleanup)
final List<QueuedCommand> compatibleCommands = checkForIncompatibleCommands(restoreCommands);

LOG.info("Restoring previous state from {} commands.", compatibleCommands.size());
if (compatibleCommands.size() > COMMAND_TOPIC_THRESHOLD_LIMIT) {
LOG.warn("Command topic size exceeded. [commands={}, threshold={}]",
compatibleCommands.size(), COMMAND_TOPIC_THRESHOLD_LIMIT);
}

final Optional<QueuedCommand> terminateCmd =
findTerminateCommand(compatibleCommands, commandDeserializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.ksql.rest.server.execution;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -75,10 +76,13 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
public final class ListSourceExecutor {
// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling
private static final Logger LOG = LoggerFactory.getLogger(ListSourceExecutor.class);

private ListSourceExecutor() {
}
Expand Down Expand Up @@ -299,6 +303,8 @@ private static SourceDescriptionWithWarnings describeSource(
);
sourceConstraints = getSourceConstraints(name, ksqlExecutionContext.getMetaStore());
} catch (final KafkaException | KafkaResponseGetFailedException e) {
LOG.warn("Failed to Describe. [Error={}, Topic={}, Source={}]",
Throwables.getRootCause(e), dataSource.getKafkaTopicName(), name.text());
warnings.add(new KsqlWarning("Error from Kafka: " + e.getMessage()));
}

Expand Down

0 comments on commit f6a2b1f

Please sign in to comment.