diff --git a/agent/src/assembly/start.sh b/agent/src/assembly/start.sh index 3180ed19ba..52ad9a4b4c 100755 --- a/agent/src/assembly/start.sh +++ b/agent/src/assembly/start.sh @@ -43,4 +43,5 @@ ${JDK_SPECIFIC_OPTS} \ -Dlogback.configurationFile=com/walmartlabs/concord/agent/logback.xml \ -Dconcord.conf=${CONCORD_CFG_FILE} \ -cp "${BASE_DIR}/lib/*" \ -com.walmartlabs.concord.agent.Main +com.walmartlabs.concord.agent.Main \ +"$@" diff --git a/agent/src/main/java/com/walmartlabs/concord/agent/Main.java b/agent/src/main/java/com/walmartlabs/concord/agent/Main.java index e59761ae8f..74f3fbd5c9 100644 --- a/agent/src/main/java/com/walmartlabs/concord/agent/Main.java +++ b/agent/src/main/java/com/walmartlabs/concord/agent/Main.java @@ -22,7 +22,6 @@ import com.google.inject.Guice; -import com.google.inject.Injector; import org.eclipse.sisu.space.BeanScanning; import org.eclipse.sisu.space.SpaceModule; import org.eclipse.sisu.space.URLClassSpace; @@ -30,12 +29,24 @@ public class Main { - public static void main(String[] args) { - ClassLoader cl = Main.class.getClassLoader(); + public static void main(String[] args) throws Exception { + // auto-wire all modules + var classLoader = Main.class.getClassLoader(); + var modules = new WireModule(new SpaceModule(new URLClassSpace(classLoader), BeanScanning.GLOBAL_INDEX)); + var injector = Guice.createInjector(modules); - Injector injector = Guice.createInjector(new WireModule(new SpaceModule(new URLClassSpace(cl), BeanScanning.GLOBAL_INDEX))); - - Agent a = injector.getInstance(Agent.class); - a.start(); + if (args.length == 1) { + // one-shot mode - read ProcessResponse directly from the command line, execute the process and exit + // the current $PWD will be used as ${workDir} + var oneShotRunner = injector.getInstance(OneShotRunner.class); + oneShotRunner.run(args[0]); + } else if (args.length == 0) { + // agent mode - connect to the server's websocket and handle ProcessResponses + var agent = injector.getInstance(Agent.class); + agent.start(); + } else { + throw new IllegalArgumentException("Specify the entire ProcessResponse JSON as the first argument to run in " + + "the one-shot mode or run without arguments for the default (agent) mode."); + } } } diff --git a/agent/src/main/java/com/walmartlabs/concord/agent/OneShotRunner.java b/agent/src/main/java/com/walmartlabs/concord/agent/OneShotRunner.java new file mode 100644 index 0000000000..d407764375 --- /dev/null +++ b/agent/src/main/java/com/walmartlabs/concord/agent/OneShotRunner.java @@ -0,0 +1,70 @@ +package com.walmartlabs.concord.agent; + +/*- + * ***** + * Concord + * ----- + * Copyright (C) 2017 - 2024 Walmart Inc. + * ----- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ===== + */ + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Injector; +import com.walmartlabs.concord.agent.cfg.AgentConfiguration; +import com.walmartlabs.concord.agent.guice.WorkerModule; +import com.walmartlabs.concord.common.IOUtils; +import com.walmartlabs.concord.server.queueclient.message.ProcessResponse; + +import javax.inject.Inject; + +import static java.util.Objects.requireNonNull; + +public class OneShotRunner { + + private final AgentConfiguration agentCfg; + private final ObjectMapper objectMapper; + private final Injector injector; + + @Inject + public OneShotRunner(AgentConfiguration agentCfg, + ObjectMapper objectMapper, + Injector injector) { + + this.agentCfg = requireNonNull(agentCfg); + this.objectMapper = requireNonNull(objectMapper); + this.injector = requireNonNull(injector); + } + + public void run(String processResponseJson) throws Exception { + var processResponse = objectMapper.readValue(processResponseJson, ProcessResponse.class); + var workDir = IOUtils.createTempDir(agentCfg.getPayloadDir(), "workDir"); + var jobRequest = JobRequest.from(processResponse, workDir); + + var workerModule = new WorkerModule(agentCfg.getAgentId(), jobRequest.getInstanceId(), jobRequest.getSessionToken()); + var workerFactory = injector.createChildInjector(workerModule).getInstance(WorkerFactory.class); + var worker = workerFactory.create(jobRequest, status -> { + }); + + worker.setThrowOnFailure(true); + + try { + worker.run(); + } catch (Exception e) { + System.exit(1); + } + + System.exit(0); + } +} diff --git a/agent/src/main/java/com/walmartlabs/concord/agent/Worker.java b/agent/src/main/java/com/walmartlabs/concord/agent/Worker.java index c5218430fc..f0aea61328 100644 --- a/agent/src/main/java/com/walmartlabs/concord/agent/Worker.java +++ b/agent/src/main/java/com/walmartlabs/concord/agent/Worker.java @@ -50,6 +50,7 @@ public class Worker implements Runnable { private final JobRequest jobRequest; private JobInstance jobInstance; + private boolean throwOnFailure; // TODO find a better place @Inject public Worker(RepositoryManager repositoryManager, @@ -119,6 +120,10 @@ public void cancel() { jobInstance.cancel(); } + public void setThrowOnFailure(boolean throwOnFailure) { + this.throwOnFailure = throwOnFailure; + } + private void handleError(UUID instanceId, Throwable error) { StatusEnum status = StatusEnum.FAILED; @@ -131,6 +136,13 @@ private void handleError(UUID instanceId, Throwable error) { onStatusChange(instanceId, status); log.info("handleError ['{}'] -> done", instanceId); + + if (throwOnFailure) { + if (error instanceof RuntimeException re) { + throw re; + } + throw new RuntimeException(error); + } } private void onStatusChange(UUID instanceId, StatusEnum status) { diff --git a/agent/src/main/java/com/walmartlabs/concord/agent/executors/runner/DefaultDependencies.java b/agent/src/main/java/com/walmartlabs/concord/agent/executors/runner/DefaultDependencies.java index 4c5d88f5e2..a713cb3da2 100644 --- a/agent/src/main/java/com/walmartlabs/concord/agent/executors/runner/DefaultDependencies.java +++ b/agent/src/main/java/com/walmartlabs/concord/agent/executors/runner/DefaultDependencies.java @@ -32,7 +32,6 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.List; -import java.util.stream.Collectors; import java.util.stream.Stream; public class DefaultDependencies { @@ -45,9 +44,10 @@ public class DefaultDependencies { public DefaultDependencies() { String path = System.getenv(CFG_KEY); if (path != null) { - try (Stream stream = Files.lines(Paths.get(path))) { - this.dependencies = stream.map(DefaultDependencies::parseUri) - .collect(Collectors.toList()); + try (Stream lines = Files.lines(Paths.get(path))) { + this.dependencies = lines.filter(s -> !s.isBlank()) + .map(DefaultDependencies::parseUri) + .toList(); } catch (IOException e) { throw new RuntimeException(e); } @@ -55,9 +55,13 @@ public DefaultDependencies() { log.info("init -> using external default dependencies configuration: {}", path); } else { try (InputStream is = DefaultDependencies.class.getResourceAsStream("default-dependencies")) { + if (is == null) { + throw new RuntimeException("Can't find com/walmartlabs/concord/agent/executors/runner/default-dependencies. " + + "This is most likely a bug or an issue with the local build and/or classpath."); + } this.dependencies = new BufferedReader(new InputStreamReader(is)).lines() - .map(DefaultDependencies::parseUri) - .collect(Collectors.toList()); + .filter(s -> !s.isBlank()) + .map(DefaultDependencies::parseUri).toList(); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/agent/src/main/java/com/walmartlabs/concord/agent/executors/runner/RunnerJobExecutor.java b/agent/src/main/java/com/walmartlabs/concord/agent/executors/runner/RunnerJobExecutor.java index 439584d869..ccfd7c42b4 100644 --- a/agent/src/main/java/com/walmartlabs/concord/agent/executors/runner/RunnerJobExecutor.java +++ b/agent/src/main/java/com/walmartlabs/concord/agent/executors/runner/RunnerJobExecutor.java @@ -57,6 +57,7 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.net.URI; +import java.nio.file.AtomicMoveNotSupportedException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; @@ -513,7 +514,12 @@ protected ProcessEntry startOneTime(RunnerJob job, String[] cmd) throws IOExcept // the job's payload directory, contains all files from the state snapshot including imports Path src = job.getPayloadDir(); - Files.move(src, workDir, StandardCopyOption.ATOMIC_MOVE); + try { + Files.move(src, workDir, StandardCopyOption.ATOMIC_MOVE); + } catch (AtomicMoveNotSupportedException e) { + log.error("startOneTime ['{}'] -> unable to move {} to {} atomically", job.getInstanceId(), src, workDir); + throw e; + } writeInstanceId(job.getInstanceId(), workDir); diff --git a/agent/src/main/resources/concord-agent.conf b/agent/src/main/resources/concord-agent.conf index 218d57348f..69fb5bebdd 100644 --- a/agent/src/main/resources/concord-agent.conf +++ b/agent/src/main/resources/concord-agent.conf @@ -52,6 +52,7 @@ concord-agent { # the value might not longer be valid if the process restarts and # gets a new Agent. workDirBase = "/tmp/concord-agent/workDirs" + workDirBase = ${?WORK_DIR_BASE} # directory to store the process logs # created automatically if not specified @@ -136,6 +137,7 @@ concord-agent { # Generated on Server first start or defined in server.conf at db.changeLogParameters.defaultAgentToken # IMPORTANT! After initialization, create a new token via API and delete initial token apiKey = "" + apiKey = ${?SERVER_API_KEY} # maximum time interval without a heartbeat before the process fails maxNoHeartbeatInterval = "5 minutes" diff --git a/dependency-manager/src/main/java/com/walmartlabs/concord/dependencymanager/DependencyManager.java b/dependency-manager/src/main/java/com/walmartlabs/concord/dependencymanager/DependencyManager.java index a8f3382d52..fd38f9ffa4 100644 --- a/dependency-manager/src/main/java/com/walmartlabs/concord/dependencymanager/DependencyManager.java +++ b/dependency-manager/src/main/java/com/walmartlabs/concord/dependencymanager/DependencyManager.java @@ -373,7 +373,7 @@ private static String getLastPart(URI uri) { if (idx >= 0 && idx + 1 < p.length()) { return p.substring(idx + 1); } - throw new IllegalArgumentException("Invalid dependency URL. Can't get a file name: " + uri); + throw new IllegalArgumentException("Invalid dependency URL. Can't get a file name, uri=" + uri); } private static boolean shouldSkipCache(URI u) { diff --git a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/Main.java b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/Main.java index 7cb6c041dd..755c27fa26 100644 --- a/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/Main.java +++ b/runtime/v2/runner/src/main/java/com/walmartlabs/concord/runtime/v2/runner/Main.java @@ -28,6 +28,7 @@ import com.walmartlabs.concord.imports.NoopImportManager; import com.walmartlabs.concord.runtime.common.ProcessHeartbeat; import com.walmartlabs.concord.runtime.common.StateManager; +import com.walmartlabs.concord.runtime.common.cfg.ApiConfiguration; import com.walmartlabs.concord.runtime.common.cfg.RunnerConfiguration; import com.walmartlabs.concord.runtime.v2.NoopImportsNormalizer; import com.walmartlabs.concord.runtime.v2.ProjectLoaderV2; @@ -85,7 +86,7 @@ public Main(Runner runner, } public static void main(String[] args) throws Exception { - RunnerConfiguration runnerCfg = readRunnerConfiguration(args); + RunnerConfiguration runnerCfg = loadRunnerConfiguration(args); // create the injector with all dependencies and services available before // the actual process' working directory is ready. It allows us to load @@ -116,18 +117,41 @@ public static void main(String[] args) throws Exception { } } - private static RunnerConfiguration readRunnerConfiguration(String[] args) throws IOException { - Path src; + private static RunnerConfiguration loadRunnerConfiguration(String[] args) throws IOException { + RunnerConfiguration result = RunnerConfiguration.builder().build(); + + // load file first, then overlay system env vars + if (args.length > 0) { - src = Paths.get(args[0]); - } else { - throw new IllegalArgumentException("Path to the runner configuration file is required"); + Path src = Paths.get(args[0]); + ObjectMapper om = ObjectMapperProvider.getInstance(); + try (InputStream in = Files.newInputStream(src)) { + result = om.readValue(in, RunnerConfiguration.class); + } } - ObjectMapper om = ObjectMapperProvider.getInstance(); - try (InputStream in = Files.newInputStream(src)) { - return om.readValue(in, RunnerConfiguration.class); + String agentId = System.getenv("RUNNER_AGENT_ID"); + if (agentId != null) { + result = RunnerConfiguration.builder().from(result) + .agentId(agentId) + .build(); } + + String apiBaseUrl = System.getenv("RUNNER_API_BASE_URL"); + if (apiBaseUrl != null) { + result = RunnerConfiguration.builder().from(result) + .api(ApiConfiguration.builder().from(result.api()) + .baseUrl(apiBaseUrl) + .build()) + .build(); + } + + if (result.agentId() == null || result.api() == null || result.api().baseUrl() == null) { + throw new IllegalArgumentException("Specify the path to the runner configuration file or " + + "provide AGENT_ID and API_BASE_URL environment variables."); + } + + return result; } public void execute() throws Exception { diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/ServerResource.java b/server/impl/src/main/java/com/walmartlabs/concord/server/ServerResource.java index c4df164369..8e637875dd 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/ServerResource.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/ServerResource.java @@ -25,7 +25,7 @@ import com.walmartlabs.concord.server.boot.BackgroundTasks; import com.walmartlabs.concord.server.sdk.rest.Resource; import com.walmartlabs.concord.server.task.TaskScheduler; -import com.walmartlabs.concord.server.websocket.WebSocketChannelManager; +import com.walmartlabs.concord.server.websocket.MessageChannelManager; import org.jooq.Configuration; import javax.inject.Inject; @@ -42,18 +42,18 @@ public class ServerResource implements Resource { private final TaskScheduler taskScheduler; private final BackgroundTasks backgroundTasks; - private final WebSocketChannelManager webSocketChannelManager; + private final MessageChannelManager messageChannelManager; private final PingDao pingDao; @Inject public ServerResource(TaskScheduler taskScheduler, BackgroundTasks backgroundTasks, - WebSocketChannelManager webSocketChannelManager, + MessageChannelManager messageChannelManager, PingDao pingDao) { this.taskScheduler = taskScheduler; this.backgroundTasks = backgroundTasks; - this.webSocketChannelManager = webSocketChannelManager; + this.messageChannelManager = messageChannelManager; this.pingDao = pingDao; } @@ -78,7 +78,7 @@ public VersionResponse version() { public void maintenanceMode() { backgroundTasks.stop(); - webSocketChannelManager.shutdown(); + messageChannelManager.shutdown(); taskScheduler.stop(); } diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/agent/AgentManager.java b/server/impl/src/main/java/com/walmartlabs/concord/server/agent/AgentManager.java index bb3967b66a..b650c8afe5 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/agent/AgentManager.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/agent/AgentManager.java @@ -23,8 +23,9 @@ import com.walmartlabs.concord.server.queueclient.message.MessageType; import com.walmartlabs.concord.server.queueclient.message.ProcessRequest; import com.walmartlabs.concord.server.sdk.ProcessKey; +import com.walmartlabs.concord.server.websocket.MessageChannel; +import com.walmartlabs.concord.server.websocket.MessageChannelManager; import com.walmartlabs.concord.server.websocket.WebSocketChannel; -import com.walmartlabs.concord.server.websocket.WebSocketChannelManager; import org.jooq.DSLContext; import javax.inject.Inject; @@ -38,23 +39,24 @@ public class AgentManager { private final AgentCommandsDao commandQueue; - private final WebSocketChannelManager channelManager; + private final MessageChannelManager channelManager; @Inject public AgentManager(AgentCommandsDao commandQueue, - WebSocketChannelManager channelManager) { + MessageChannelManager channelManager) { this.commandQueue = commandQueue; this.channelManager = channelManager; } public Collection getAvailableAgents() { - Map reqs = channelManager.getRequests(MessageType.PROCESS_REQUEST); + Map reqs = channelManager.getRequests(MessageType.PROCESS_REQUEST); return reqs.entrySet().stream() + .filter(r -> r.getKey() instanceof WebSocketChannel) // TODO a better way .map(r -> AgentWorkerEntry.builder() .channelId(r.getKey().getChannelId()) .agentId(r.getKey().getAgentId()) - .userAgent(r.getKey().getUserAgent()) + .userAgent(((WebSocketChannel) r.getKey()).getUserAgent()) .capabilities(r.getValue().getCapabilities()) .build()) .collect(Collectors.toList()); diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/agent/AgentWorkerEntry.java b/server/impl/src/main/java/com/walmartlabs/concord/server/agent/AgentWorkerEntry.java index 4427bb6c92..ddc5740c66 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/agent/AgentWorkerEntry.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/agent/AgentWorkerEntry.java @@ -27,7 +27,6 @@ import javax.annotation.Nullable; import java.util.Map; -import java.util.UUID; @Value.Immutable @JsonInclude(JsonInclude.Include.NON_EMPTY) @@ -35,7 +34,7 @@ @JsonDeserialize(as = ImmutableAgentWorkerEntry.class) public interface AgentWorkerEntry { - UUID channelId(); + String channelId(); @Nullable // backward-compatibility with old agent versions String agentId(); diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/agent/dispatcher/Dispatcher.java b/server/impl/src/main/java/com/walmartlabs/concord/server/agent/dispatcher/Dispatcher.java index 53ddcf9f5f..7873682005 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/agent/dispatcher/Dispatcher.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/agent/dispatcher/Dispatcher.java @@ -35,8 +35,8 @@ import com.walmartlabs.concord.server.queueclient.message.CommandResponse; import com.walmartlabs.concord.server.queueclient.message.MessageType; import com.walmartlabs.concord.server.sdk.metrics.WithTimer; -import com.walmartlabs.concord.server.websocket.WebSocketChannel; -import com.walmartlabs.concord.server.websocket.WebSocketChannelManager; +import com.walmartlabs.concord.server.websocket.MessageChannel; +import com.walmartlabs.concord.server.websocket.MessageChannelManager; import org.jooq.Configuration; import org.jooq.DSLContext; import org.slf4j.Logger; @@ -61,12 +61,12 @@ public class Dispatcher extends PeriodicTask { private static final int BATCH_SIZE = 10; private final DispatcherDao dao; - private final WebSocketChannelManager channelManager; + private final MessageChannelManager channelManager; @Inject public Dispatcher(AgentConfiguration cfg, DispatcherDao dao, - WebSocketChannelManager channelManager) { + MessageChannelManager channelManager) { super(cfg.getCommandPollDelay().toMillis(), ERROR_DELAY); this.dao = dao; @@ -75,7 +75,7 @@ public Dispatcher(AgentConfiguration cfg, @Override protected boolean performTask() { - Map requests = this.channelManager.getRequests(MessageType.COMMAND_REQUEST); + Map requests = this.channelManager.getRequests(MessageType.COMMAND_REQUEST); if (requests.isEmpty()) { return false; } @@ -148,7 +148,6 @@ private AgentCommand findCandidate(CommandRequest req, List candid } private void sendResponse(Match match, AgentCommand response) { - WebSocketChannel channel = match.request.channel; long correlationId = match.request.request.getCorrelationId(); CommandType type = CommandType.valueOf((String) response.getData().remove(Commands.TYPE_KEY)); @@ -157,7 +156,8 @@ private void sendResponse(Match match, AgentCommand response) { payload.put("type", type.toString()); payload.putAll(response.getData()); - boolean success = channelManager.sendResponse(channel.getChannelId(), CommandResponse.cancel(correlationId, payload)); + MessageChannel channel = match.request.channel; + boolean success = channelManager.sendMessage(channel.getChannelId(), CommandResponse.cancel(correlationId, payload)); if (success) { log.info("sendResponse ['{}'] -> done", correlationId); } else { @@ -223,25 +223,10 @@ private AgentCommand convert(AgentCommandsRecord r) { } } - private static final class Match { + private record Match(Request request, AgentCommand command) { - private final Request request; - private final AgentCommand command; - - private Match(Request request, AgentCommand command) { - this.request = request; - this.command = command; - } } - private static final class Request { - - private final WebSocketChannel channel; - private final CommandRequest request; - - private Request(WebSocketChannel channel, CommandRequest request) { - this.channel = channel; - this.request = request; - } + private record Request(MessageChannel channel, CommandRequest request) { } } diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/process/queue/ProcessQueueDao.java b/server/impl/src/main/java/com/walmartlabs/concord/server/process/queue/ProcessQueueDao.java index 6e3bf9a8bc..4506c45592 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/process/queue/ProcessQueueDao.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/process/queue/ProcessQueueDao.java @@ -47,6 +47,7 @@ import org.jooq.impl.DSL; import org.jooq.util.postgres.PostgresDSL; +import javax.annotation.Nullable; import javax.inject.Inject; import javax.inject.Named; import java.time.Duration; @@ -140,7 +141,7 @@ public void insert(DSLContext tx, ProcessKey processKey, ProcessStatus status, P } } - public void updateAgentId(DSLContext tx, ProcessKey processKey, String agentId, ProcessStatus status) { + public void updateAgentId(DSLContext tx, ProcessKey processKey, @Nullable String agentId, ProcessStatus status) { UUID instanceId = processKey.getInstanceId(); int i = tx.update(PROCESS_QUEUE) diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/process/queue/ProcessQueueManager.java b/server/impl/src/main/java/com/walmartlabs/concord/server/process/queue/ProcessQueueManager.java index c9b3e1e67a..b12d26790d 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/process/queue/ProcessQueueManager.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/process/queue/ProcessQueueManager.java @@ -35,6 +35,7 @@ import com.walmartlabs.concord.server.sdk.ProcessStatus; import org.jooq.DSLContext; +import javax.annotation.Nullable; import javax.inject.Inject; import javax.inject.Named; import java.time.Duration; @@ -193,7 +194,7 @@ public void updateAgentId(ProcessKey processKey, String agentId, ProcessStatus s /** * Updates the process' agent ID and status. */ - public void updateAgentId(DSLContext tx, ProcessKey processKey, String agentId, ProcessStatus status) { + public void updateAgentId(DSLContext tx, ProcessKey processKey, @Nullable String agentId, ProcessStatus status) { queueDao.updateAgentId(tx, processKey, agentId, status); notifyStatusChange(tx, processKey, status); } diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/process/queue/dispatcher/Dispatcher.java b/server/impl/src/main/java/com/walmartlabs/concord/server/process/queue/dispatcher/Dispatcher.java index 4d3f7e0a09..0597dde576 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/process/queue/dispatcher/Dispatcher.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/process/queue/dispatcher/Dispatcher.java @@ -44,8 +44,9 @@ import com.walmartlabs.concord.server.sdk.ProcessKey; import com.walmartlabs.concord.server.sdk.ProcessStatus; import com.walmartlabs.concord.server.sdk.metrics.WithTimer; +import com.walmartlabs.concord.server.websocket.MessageChannel; +import com.walmartlabs.concord.server.websocket.MessageChannelManager; import com.walmartlabs.concord.server.websocket.WebSocketChannel; -import com.walmartlabs.concord.server.websocket.WebSocketChannelManager; import org.jooq.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,7 +78,7 @@ public class Dispatcher extends PeriodicTask { private final Locks locks; private final DispatcherDao dao; - private final WebSocketChannelManager channelManager; + private final MessageChannelManager channelManager; private final ProcessLogManager logManager; private final ProcessQueueManager queueManager; private final Set filters; @@ -93,7 +94,7 @@ public class Dispatcher extends PeriodicTask { @Inject public Dispatcher(Locks locks, DispatcherDao dao, - WebSocketChannelManager channelManager, + MessageChannelManager channelManager, ProcessLogManager logManager, ProcessQueueManager queueManager, Set filters, @@ -125,7 +126,7 @@ protected boolean performTask() { // TODO the WebSocketChannelManager business can be replaced with an async jax-rs endpoint and an "inbox" queue // grab the requests w/o responses - Map requests = this.channelManager.getRequests(MessageType.PROCESS_REQUEST); + Map requests = this.channelManager.getRequests(MessageType.PROCESS_REQUEST); if (requests.isEmpty()) { return false; } @@ -206,7 +207,8 @@ private List match(DSLContext tx, List requests) { ProcessQueueEntry candidate = m.response; // mark the process as STARTING - queueManager.updateAgentId(tx, candidate.key(), m.request.channel.getAgentId(), ProcessStatus.STARTING); + String agentId = m.request.channel.getAgentId(); + queueManager.updateAgentId(tx, candidate.key(), agentId, ProcessStatus.STARTING); } return matches; @@ -250,7 +252,6 @@ private boolean pass(DSLContext tx, ProcessQueueEntry e, List } private void sendResponse(Match match) { - WebSocketChannel channel = match.request.channel; long correlationId = match.request.request.getCorrelationId(); ProcessQueueEntry item = match.response; @@ -267,6 +268,7 @@ private void sendResponse(Match match) { ProcessResponse resp = new ProcessResponse(correlationId, sessionTokenCreator.create(item.key()), item.key().getInstanceId(), + item.key().getCreatedAt(), secret != null ? secret.orgName : null, item.repoUrl(), item.repoPath(), @@ -275,17 +277,23 @@ private void sendResponse(Match match) { secret != null ? secret.secretName : null, imports); - if (!channelManager.sendResponse(channel.getChannelId(), resp)) { + MessageChannel channel = match.request.channel; + if (!channelManager.sendMessage(channel.getChannelId(), resp)) { log.warn("sendResponse ['{}'] -> failed", correlationId); } - logManager.info(item.key(), "Acquired by: " + channel.getUserAgent()); + // TODO a way to avoid instanceof here + String userAgent = channel instanceof WebSocketChannel ? ((WebSocketChannel) channel).getUserAgent() : null; + if (userAgent != null) { + logManager.info(item.key(), "Acquired by: " + userAgent); + } } catch (Exception e) { log.error("sendResponse ['{}'] -> failed (instanceId: {})", correlationId, item.key().getInstanceId()); } } @Named + @SuppressWarnings("resource") public static class DispatcherDao extends AbstractDao { private final ConcordObjectMapper objectMapper; @@ -316,20 +324,20 @@ public List next(DSLContext tx, int offset, int limit) { SelectJoinStep> s = tx.select( - q.INSTANCE_ID, - q.CREATED_AT, - q.PROJECT_ID, - orgIdField, - q.INITIATOR_ID, - q.PARENT_INSTANCE_ID, - q.REPO_PATH, - q.REPO_URL, - q.COMMIT_ID, - q.REPO_ID, - q.IMPORTS, - q.REQUIREMENTS, - q.EXCLUSIVE, - q.COMMIT_BRANCH) + q.INSTANCE_ID, + q.CREATED_AT, + q.PROJECT_ID, + orgIdField, + q.INITIATOR_ID, + q.PARENT_INSTANCE_ID, + q.REPO_PATH, + q.REPO_URL, + q.COMMIT_ID, + q.REPO_ID, + q.IMPORTS, + q.REQUIREMENTS, + q.EXCLUSIVE, + q.COMMIT_BRANCH) .from(q); s.where(q.CURRENT_STATUS.eq(ProcessStatus.ENQUEUED.toString()) @@ -369,36 +377,12 @@ public SecretReference getSecretReference(UUID repoId) { } } - private static final class Request { - - private final WebSocketChannel channel; - private final ProcessRequest request; - - private Request(WebSocketChannel channel, ProcessRequest request) { - this.channel = channel; - this.request = request; - } + private record Request(MessageChannel channel, ProcessRequest request) { } - private static final class Match { - - private final Request request; - private final ProcessQueueEntry response; - - private Match(Request request, ProcessQueueEntry response) { - this.request = request; - this.response = response; - } + private record Match(Request request, ProcessQueueEntry response) { } - private static final class SecretReference { - - private final String orgName; - private final String secretName; - - private SecretReference(String orgName, String secretName) { - this.orgName = orgName; - this.secretName = secretName; - } + public record SecretReference(String orgName, String secretName) { } } diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/process/queue/dispatcher/ProcessRequestChannel.java b/server/impl/src/main/java/com/walmartlabs/concord/server/process/queue/dispatcher/ProcessRequestChannel.java new file mode 100644 index 0000000000..bbae8aa8bd --- /dev/null +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/process/queue/dispatcher/ProcessRequestChannel.java @@ -0,0 +1,34 @@ +package com.walmartlabs.concord.server.process.queue.dispatcher; + +/*- + * ***** + * Concord + * ----- + * Copyright (C) 2017 - 2024 Walmart Inc. + * ----- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ===== + */ + +import com.walmartlabs.concord.server.queueclient.message.ProcessRequest; + +import java.util.List; + +/** + * An extension point for {@link Dispatcher}. + * Allows server plugins to provide their own sources of {@link com.walmartlabs.concord.server.queueclient.message.ProcessRequest}. + */ +public interface ProcessRequestChannel { + + List getRequests(); +} diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/websocket/ConcordWebSocketServlet.java b/server/impl/src/main/java/com/walmartlabs/concord/server/websocket/ConcordWebSocketServlet.java index af2048ac81..aac334f165 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/websocket/ConcordWebSocketServlet.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/websocket/ConcordWebSocketServlet.java @@ -24,7 +24,6 @@ import org.eclipse.jetty.ee8.websocket.server.JettyWebSocketServlet; import org.eclipse.jetty.ee8.websocket.server.JettyWebSocketServletFactory; - import javax.inject.Inject; import javax.servlet.annotation.WebServlet; @@ -33,11 +32,11 @@ public class ConcordWebSocketServlet extends JettyWebSocketServlet { private static final long serialVersionUID = 1L; - private final WebSocketChannelManager channelManager; + private final MessageChannelManager channelManager; private final ApiKeyDao apiKeyDao; @Inject - public ConcordWebSocketServlet(WebSocketChannelManager channelManager, ApiKeyDao apiKeyDao) { + public ConcordWebSocketServlet(MessageChannelManager channelManager, ApiKeyDao apiKeyDao) { this.channelManager = channelManager; this.apiKeyDao = apiKeyDao; } diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/websocket/MessageChannel.java b/server/impl/src/main/java/com/walmartlabs/concord/server/websocket/MessageChannel.java new file mode 100644 index 0000000000..99c6278dd3 --- /dev/null +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/websocket/MessageChannel.java @@ -0,0 +1,53 @@ +package com.walmartlabs.concord.server.websocket; + +/*- + * ***** + * Concord + * ----- + * Copyright (C) 2017 - 2024 Walmart Inc. + * ----- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ===== + */ + + +import com.walmartlabs.concord.server.queueclient.message.Message; +import com.walmartlabs.concord.server.queueclient.message.MessageType; + +import java.io.Closeable; +import java.util.Optional; + +public interface MessageChannel extends Closeable { + + /** + * A unique identifier of a channel. + */ + String getChannelId(); + + /** + * An identifier of the agent represented by this MessageChannel. + */ + String getAgentId(); + + /** + * @return {@code true} if the message was sent successfully. + * Returns {@code false} if the message cannot be sent at the moment. + */ + boolean offerMessage(Message msg) throws Exception; + + /** + * @return an empty value if no messages of the specified type can be returned + * at the moment. + */ + Optional getMessage(MessageType messageType) throws Exception; +} diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/websocket/WebSocketChannelManager.java b/server/impl/src/main/java/com/walmartlabs/concord/server/websocket/MessageChannelManager.java similarity index 50% rename from server/impl/src/main/java/com/walmartlabs/concord/server/websocket/WebSocketChannelManager.java rename to server/impl/src/main/java/com/walmartlabs/concord/server/websocket/MessageChannelManager.java index b277a239c7..d8f6c357d2 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/websocket/WebSocketChannelManager.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/websocket/MessageChannelManager.java @@ -9,9 +9,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -30,16 +30,16 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.UUID; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; @Named @Singleton -public class WebSocketChannelManager { +public class MessageChannelManager { - private static final Logger log = LoggerFactory.getLogger(WebSocketChannelManager.class); + private static final Logger log = LoggerFactory.getLogger(MessageChannelManager.class); - private final Map channels = new ConcurrentHashMap<>(); + private final Map channels = new ConcurrentHashMap<>(); private volatile boolean isShutdown; @@ -50,76 +50,81 @@ public boolean isShutdown() { public void shutdown() { isShutdown = true; - channels.forEach((uuid, webSocketChannel) -> webSocketChannel.close()); + channels.forEach((uuid, channel) -> { + try { + channel.close(); + } catch (Exception e) { + log.warn("shutdown -> failed on channel {}: {}", channel.getClass(), e.getMessage()); + } + }); log.info("shutdown -> done"); } - public void close(UUID channelId) { - WebSocketChannel channel = channels.remove(channelId); + public void close(String channelId) { + MessageChannel channel = channels.remove(channelId); if (channel == null) { log.warn("close ['{}'] -> channel not found", channelId); return; } - channel.close(); - - log.info("close ['{}'] -> done", channelId); - } - - public void onRequest(UUID channelId, Message message) { - WebSocketChannel channel = channels.get(channelId); - if (channel == null) { - log.warn("request ['{}', '{}'] -> channel not found", channelId, message); - return; + try { + channel.close(); + } catch (Exception e) { + throw new RuntimeException(e); } - channel.onRequest(message); + log.info("close ['{}'] -> done", channelId); } /** * Sends the response and removes the associated request from the queue. */ - public boolean sendResponse(UUID channelId, Message response) { - WebSocketChannel channel = channels.get(channelId); + public boolean sendMessage(String channelId, Message response) { + MessageChannel channel = channels.get(channelId); if (channel == null) { - log.warn("request ['{}', '{}'] -> channel not found", channelId, response); + log.warn("sendResponse ['{}', '{}'] -> channel not found", channelId, response); return false; } - return channel.sendResponse(response); - } - - public boolean pong(UUID channelId) { - WebSocketChannel channel = channels.get(channelId); - if (channel == null) { - log.warn("pong ['{}'] -> channel not found", channelId); + try { + return channel.offerMessage(response); + } catch (Exception e) { + log.warn("sendResponse ['{}', '{}'] -> failed: {}", channelId, response, e.getMessage()); return false; } - - return channel.pong(); } @SuppressWarnings("unchecked") - public Map getRequests(MessageType requestType) { - Map result = new HashMap<>(); + public Map getRequests(MessageType requestType) { + Map result = new HashMap<>(); channels.forEach((channelId, channel) -> { - Message m = channel.getRequest(requestType); - if (m != null) { - result.put(channel, (E)m); + try { + channel.getMessage(requestType).ifPresent(msg -> { + result.put(channel, (E) msg); + }); + } catch (Exception e) { + log.warn("getRequests ['{}'] -> failed on channel {}: {}", requestType, channel.getClass(), e.getMessage()); } }); return result; } - public void add(UUID channelId, WebSocketChannel channel) { - channels.put(channelId, channel); + public void add(MessageChannel channel) { + channels.put(channel.getChannelId(), channel); } public int connectedClientsCount() { return channels.size(); } - public Map getChannels() { + public Map getChannels() { return Collections.unmodifiableMap(channels); } + + @SuppressWarnings("unchecked") + public Optional getChannel(String channelId, Class klass) { + return Optional.ofNullable(channels.get(channelId)) + .filter(c -> klass.isAssignableFrom(c.getClass())) + .map(c -> (T) c); + } } diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/websocket/WebSocketChannel.java b/server/impl/src/main/java/com/walmartlabs/concord/server/websocket/WebSocketChannel.java index c444dce250..504c382eb3 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/websocket/WebSocketChannel.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/websocket/WebSocketChannel.java @@ -29,28 +29,29 @@ import java.nio.ByteBuffer; import java.util.Map; -import java.util.UUID; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; -public class WebSocketChannel { +public class WebSocketChannel implements MessageChannel { private static final Logger log = LoggerFactory.getLogger(WebSocketChannel.class); - private final UUID channelId; + private final String channelId; private final String agentId; private final String userAgent; private final Session session; private final Map requests = new ConcurrentHashMap<>(); - public WebSocketChannel(UUID channelId, String agentId, Session session, String userAgent) { + public WebSocketChannel(String channelId, String agentId, Session session, String userAgent) { this.channelId = channelId; this.agentId = agentId; this.session = session; this.userAgent = userAgent; } - public UUID getChannelId() { + @Override + public String getChannelId() { return channelId; } @@ -66,18 +67,11 @@ public Session getSession() { return session; } - public void onRequest(Message request) { - Message old = requests.put(request.getCorrelationId(), request); - if (old != null) { - log.error("request ['{}', '{}'] -> duplicate request. closing channel", channelId, request); - close(); - } - } - /** * Sends the response and removes the associated request from the queue. */ - public boolean sendResponse(Message response) { + @Override + public boolean offerMessage(Message response) { if (!session.isOpen()) { log.warn("response ['{}', '{}'] -> session is closed", channelId, response); return false; @@ -98,22 +92,14 @@ public boolean sendResponse(Message response) { } } - public boolean pong() { - try { - session.getRemote().sendPong(ByteBuffer.wrap("pong".getBytes())); - return true; - } catch (Exception e) { - log.error("pong ['{}'] -> error", channelId, e); - return false; - } - } - - public Message getRequest(MessageType requestType) { + @Override + public Optional getMessage(MessageType requestType) { return requests.values().stream() .filter(m -> m.getMessageType() == requestType) - .findAny().orElse(null); + .findAny(); } + @Override public void close() { if (!session.isOpen()) { return; @@ -126,4 +112,20 @@ public void close() { log.warn("close ['{}'] -> error", channelId, e); } } + + void onRequest(Message request) { + Message old = requests.put(request.getCorrelationId(), request); + if (old != null) { + log.error("request ['{}', '{}'] -> duplicate request. closing channel", channelId, request); + close(); + } + } + + void pong() { + try { + session.getRemote().sendPong(ByteBuffer.wrap("pong".getBytes())); + } catch (Exception e) { + log.warn("pong ['{}'] -> error", channelId, e); + } + } } diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/websocket/WebSocketCreator.java b/server/impl/src/main/java/com/walmartlabs/concord/server/websocket/WebSocketCreator.java index 75e1ec1599..46df0acfe7 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/websocket/WebSocketCreator.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/websocket/WebSocketCreator.java @@ -39,10 +39,10 @@ public class WebSocketCreator implements JettyWebSocketCreator { private static final Logger log = LoggerFactory.getLogger(WebSocketCreator.class); - private final WebSocketChannelManager channelManager; + private final MessageChannelManager channelManager; private final ApiKeyDao apiKeyDao; - public WebSocketCreator(WebSocketChannelManager channelManager, ApiKeyDao apiKeyDao) { + public WebSocketCreator(MessageChannelManager channelManager, ApiKeyDao apiKeyDao) { this.channelManager = channelManager; this.apiKeyDao = apiKeyDao; } @@ -71,7 +71,7 @@ public Object createWebSocket(JettyServerUpgradeRequest req, JettyServerUpgradeR return null; } - UUID channelId = UUID.randomUUID(); + String channelId = "ws-" + UUID.randomUUID(); String agentId = req.getHeader(QueueClient.AGENT_ID); String userAgent = req.getHeader(QueueClient.AGENT_UA); return new WebSocketListener(channelManager, channelId, agentId, userAgent); diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/websocket/WebSocketListener.java b/server/impl/src/main/java/com/walmartlabs/concord/server/websocket/WebSocketListener.java index 76d0b3ade3..5c2e7c8360 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/websocket/WebSocketListener.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/websocket/WebSocketListener.java @@ -27,28 +27,31 @@ import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; -import java.util.UUID; + +import static java.util.Objects.requireNonNull; public class WebSocketListener implements org.eclipse.jetty.ee8.websocket.api.WebSocketListener, WebSocketPingPongListener { private static final Logger log = LoggerFactory.getLogger(WebSocketListener.class); - private final WebSocketChannelManager channelManager; + private final MessageChannelManager channelManager; - private final UUID channelId; + private final String channelId; private final String agentId; private final String userAgent; - public WebSocketListener(WebSocketChannelManager channelManager, UUID channelId, String agentId, String userAgent) { - this.channelManager = channelManager; - this.channelId = channelId; - this.agentId = agentId; + public WebSocketListener(MessageChannelManager channelManager, String channelId, String agentId, String userAgent) { + this.channelManager = requireNonNull(channelManager); + this.channelId = requireNonNull(channelId); + this.agentId = requireNonNull(agentId); this.userAgent = sanitize(userAgent); } @Override public void onWebSocketPing(ByteBuffer payload) { - channelManager.pong(channelId); + channelManager.getChannel(channelId, WebSocketChannel.class) + .ifPresentOrElse(WebSocketChannel::pong, + () -> log.warn("onWebSocketPing ['{}'] -> channel not found", channelId)); } @Override @@ -64,8 +67,9 @@ public void onWebSocketBinary(byte[] payload, int offset, int len) { @Override public void onWebSocketText(String message) { - channelManager.onRequest(channelId, MessageSerializer.deserialize(message)); - log.debug("onWebSocketText ['{}', '{}'] -> ok", channelId, message); + channelManager.getChannel(channelId, WebSocketChannel.class) + .ifPresentOrElse(c -> c.onRequest(MessageSerializer.deserialize(message)), + () -> log.warn("onWebSocketText ['{}', '{}'] -> channel not found", channelId, message)); } @Override @@ -76,7 +80,8 @@ public void onWebSocketClose(int statusCode, String reason) { @Override public void onWebSocketConnect(Session session) { - channelManager.add(channelId, new WebSocketChannel(channelId, agentId, session, userAgent)); + var channel = new WebSocketChannel(channelId, agentId, session, userAgent); + channelManager.add(channel); log.debug("onWebSocketConnect ['{}'] -> '{}'", channelId, userAgent); } @@ -85,13 +90,13 @@ public void onWebSocketError(Throwable cause) { log.warn("onWebSocketError ['{}', '{}'] -> error: {}", channelId, userAgent, cause.getMessage()); } - private static String sanitize(String log) { - if (log == null || log.isEmpty()) { - return log; + private static String sanitize(String s) { + if (s == null || s.isEmpty()) { + return s; } - if (log.length() > 128) { - log = log.substring(0, 128) + "...cut"; + if (s.length() > 128) { + s = s.substring(0, 128) + "...cut"; } - return log.replace("\n", "\\n"); + return s.replace("\n", "\\n"); } } diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/websocket/WebSocketMetricsModule.java b/server/impl/src/main/java/com/walmartlabs/concord/server/websocket/WebSocketMetricsModule.java index b6d916439a..9bc8638859 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/websocket/WebSocketMetricsModule.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/websocket/WebSocketMetricsModule.java @@ -34,14 +34,15 @@ public class WebSocketMetricsModule extends AbstractModule { @Override protected void configure() { - Provider channelManagerProvider = getProvider(WebSocketChannelManager.class); + Provider channelManagerProvider = getProvider(MessageChannelManager.class); + @SuppressWarnings("rawtypes") Multibinder gauges = Multibinder.newSetBinder(binder(), GaugeProvider.class); gauges.addBinding().toInstance(createGauge(channelManagerProvider)); gauges.addBinding().toInstance(create(channelManagerProvider)); } - private static GaugeProvider createGauge(Provider channelManagerProvider) { + private static GaugeProvider createGauge(Provider channelManagerProvider) { return new GaugeProvider() { @Override public String name() { @@ -55,8 +56,8 @@ public Gauge gauge() { }; } - private static GaugeProvider create(Provider channelManagerProvider) { - return new GaugeProvider() { + private static GaugeProvider create(Provider channelManagerProvider) { + return new GaugeProvider<>() { @Override public String name() { return "agent-workers-available"; diff --git a/server/queue-client/pom.xml b/server/queue-client/pom.xml index 57500f29e5..fca642afd2 100644 --- a/server/queue-client/pom.xml +++ b/server/queue-client/pom.xml @@ -74,6 +74,10 @@ com.fasterxml.jackson.datatype jackson-datatype-jdk8 + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + com.google.code.findbugs jsr305 diff --git a/server/queue-client/src/main/java/com/walmartlabs/concord/server/queueclient/MessageSerializer.java b/server/queue-client/src/main/java/com/walmartlabs/concord/server/queueclient/MessageSerializer.java index 89bdf4d091..451b1bc10d 100644 --- a/server/queue-client/src/main/java/com/walmartlabs/concord/server/queueclient/MessageSerializer.java +++ b/server/queue-client/src/main/java/com/walmartlabs/concord/server/queueclient/MessageSerializer.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.guava.GuavaModule; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.walmartlabs.concord.server.queueclient.message.Message; import com.walmartlabs.concord.server.queueclient.message.MessageType; @@ -58,6 +59,7 @@ private static ObjectMapper createObjectMapper() { ObjectMapper om = new ObjectMapper(); om.registerModule(new GuavaModule()); om.registerModule(new Jdk8Module()); + om.registerModule(new JavaTimeModule()); return om; } diff --git a/server/queue-client/src/main/java/com/walmartlabs/concord/server/queueclient/message/ProcessResponse.java b/server/queue-client/src/main/java/com/walmartlabs/concord/server/queueclient/message/ProcessResponse.java index fed4e59db5..9f0d9514c4 100644 --- a/server/queue-client/src/main/java/com/walmartlabs/concord/server/queueclient/message/ProcessResponse.java +++ b/server/queue-client/src/main/java/com/walmartlabs/concord/server/queueclient/message/ProcessResponse.java @@ -21,15 +21,19 @@ */ import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonProperty; import com.walmartlabs.concord.imports.Imports; +import java.time.OffsetDateTime; import java.util.UUID; public class ProcessResponse extends Message { private final String sessionToken; private final UUID processId; + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSX") + private final OffsetDateTime processCreatedAt; private final String orgName; // TODO rename to secretOrgName private final String repoUrl; private final String repoPath; @@ -43,6 +47,7 @@ public ProcessResponse( @JsonProperty("correlationId") long correlationId, @JsonProperty("sessionToken") String sessionToken, @JsonProperty("processId") UUID processId, + @JsonProperty("processCreatedAt") OffsetDateTime processCreatedAt, @JsonProperty("orgName") String orgName, @JsonProperty("repoUrl") String repoUrl, @JsonProperty("repoPath") String repoPath, @@ -56,6 +61,7 @@ public ProcessResponse( setCorrelationId(correlationId); this.sessionToken = sessionToken; this.processId = processId; + this.processCreatedAt = processCreatedAt; this.orgName = orgName; this.repoUrl = repoUrl; this.repoPath = repoPath; @@ -73,6 +79,10 @@ public UUID getProcessId() { return processId; } + public OffsetDateTime getProcessCreatedAt() { + return processCreatedAt; + } + public String getOrgName() { return orgName; } @@ -106,6 +116,7 @@ public String toString() { return "ProcessResponse{" + "sessionToken='" + sessionToken + '\'' + ", processId=" + processId + + ", processCreatedAt=" + processCreatedAt + '\'' + ", orgName='" + orgName + '\'' + ", repoUrl='" + repoUrl + '\'' + ", repoPath='" + repoPath + '\'' + diff --git a/server/queue-client/src/test/java/com/walmartlabs/concord/server/queueclient/MessageSerializerTest.java b/server/queue-client/src/test/java/com/walmartlabs/concord/server/queueclient/MessageSerializerTest.java index 2de04123ea..15058cb42b 100644 --- a/server/queue-client/src/test/java/com/walmartlabs/concord/server/queueclient/MessageSerializerTest.java +++ b/server/queue-client/src/test/java/com/walmartlabs/concord/server/queueclient/MessageSerializerTest.java @@ -26,6 +26,9 @@ import com.walmartlabs.concord.server.queueclient.message.*; import org.junit.jupiter.api.Test; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.time.temporal.ChronoUnit; import java.util.Collections; import java.util.UUID; @@ -104,16 +107,18 @@ public void testProcessResponse() { Imports imports = Imports.of(Collections.singletonList(item)); - ProcessResponse r = new ProcessResponse(123, "sesion-token", UUID.randomUUID(), "org-name", "repo-url", "repo-path", "commit-id", "repo-branch", "secret-name", imports); + OffsetDateTime createdAt = OffsetDateTime.now(ZoneId.of("UTC")).minusMinutes(10).truncatedTo(ChronoUnit.MILLIS); // mimic the DB time + ProcessResponse r = new ProcessResponse(123, "sesion-token", UUID.randomUUID(), createdAt, "org-name", "repo-url", "repo-path", "commit-id", "repo-branch", "secret-name", imports); // --- String rSerialized = MessageSerializer.serialize(r); assertNotNull(rSerialized); ProcessResponse rDeserialized = MessageSerializer.deserialize(rSerialized); - assertEquals(r.getMessageType(), MessageType.PROCESS_RESPONSE); + assertEquals(MessageType.PROCESS_RESPONSE, r.getMessageType()); assertEquals(r.getSessionToken(), rDeserialized.getSessionToken()); assertEquals(r.getProcessId(), rDeserialized.getProcessId()); + assertEquals(createdAt, rDeserialized.getProcessCreatedAt()); assertEquals(r.getCorrelationId(), rDeserialized.getCorrelationId()); assertEquals("repo-branch", rDeserialized.getRepoBranch()); }