Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

concord-server: refactor WebSocketChannelManager, allow message sources in plugins #1056

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion agent/src/assembly/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ ${JDK_SPECIFIC_OPTS} \
-Dlogback.configurationFile=com/walmartlabs/concord/agent/logback.xml \
-Dconcord.conf=${CONCORD_CFG_FILE} \
-cp "${BASE_DIR}/lib/*" \
com.walmartlabs.concord.agent.Main
com.walmartlabs.concord.agent.Main \
"$@"
25 changes: 18 additions & 7 deletions agent/src/main/java/com/walmartlabs/concord/agent/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,31 @@


import com.google.inject.Guice;
import com.google.inject.Injector;
import org.eclipse.sisu.space.BeanScanning;
import org.eclipse.sisu.space.SpaceModule;
import org.eclipse.sisu.space.URLClassSpace;
import org.eclipse.sisu.wire.WireModule;

public class Main {

public static void main(String[] args) {
ClassLoader cl = Main.class.getClassLoader();
public static void main(String[] args) throws Exception {
// auto-wire all modules
var classLoader = Main.class.getClassLoader();
var modules = new WireModule(new SpaceModule(new URLClassSpace(classLoader), BeanScanning.GLOBAL_INDEX));
var injector = Guice.createInjector(modules);

Injector injector = Guice.createInjector(new WireModule(new SpaceModule(new URLClassSpace(cl), BeanScanning.GLOBAL_INDEX)));

Agent a = injector.getInstance(Agent.class);
a.start();
if (args.length == 1) {
// one-shot mode - read ProcessResponse directly from the command line, execute the process and exit
// the current $PWD will be used as ${workDir}
var oneShotRunner = injector.getInstance(OneShotRunner.class);
oneShotRunner.run(args[0]);
} else if (args.length == 0) {
// agent mode - connect to the server's websocket and handle ProcessResponses
var agent = injector.getInstance(Agent.class);
agent.start();
} else {
throw new IllegalArgumentException("Specify the entire ProcessResponse JSON as the first argument to run in " +
"the one-shot mode or run without arguments for the default (agent) mode.");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.walmartlabs.concord.agent;

/*-
* *****
* Concord
* -----
* Copyright (C) 2017 - 2024 Walmart Inc.
* -----
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =====
*/

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Injector;
import com.walmartlabs.concord.agent.cfg.AgentConfiguration;
import com.walmartlabs.concord.agent.guice.WorkerModule;
import com.walmartlabs.concord.common.IOUtils;
import com.walmartlabs.concord.server.queueclient.message.ProcessResponse;

import javax.inject.Inject;

import static java.util.Objects.requireNonNull;

public class OneShotRunner {

private final AgentConfiguration agentCfg;
private final ObjectMapper objectMapper;
private final Injector injector;

@Inject
public OneShotRunner(AgentConfiguration agentCfg,
ObjectMapper objectMapper,
Injector injector) {

this.agentCfg = requireNonNull(agentCfg);
this.objectMapper = requireNonNull(objectMapper);
this.injector = requireNonNull(injector);
}

public void run(String processResponseJson) throws Exception {
var processResponse = objectMapper.readValue(processResponseJson, ProcessResponse.class);
var workDir = IOUtils.createTempDir(agentCfg.getPayloadDir(), "workDir");
var jobRequest = JobRequest.from(processResponse, workDir);

var workerModule = new WorkerModule(agentCfg.getAgentId(), jobRequest.getInstanceId(), jobRequest.getSessionToken());
var workerFactory = injector.createChildInjector(workerModule).getInstance(WorkerFactory.class);
var worker = workerFactory.create(jobRequest, status -> {
});

worker.setThrowOnFailure(true);

try {
worker.run();
} catch (Exception e) {
System.exit(1);
}

System.exit(0);
}
}
12 changes: 12 additions & 0 deletions agent/src/main/java/com/walmartlabs/concord/agent/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class Worker implements Runnable {
private final JobRequest jobRequest;

private JobInstance jobInstance;
private boolean throwOnFailure; // TODO find a better place

@Inject
public Worker(RepositoryManager repositoryManager,
Expand Down Expand Up @@ -119,6 +120,10 @@ public void cancel() {
jobInstance.cancel();
}

public void setThrowOnFailure(boolean throwOnFailure) {
this.throwOnFailure = throwOnFailure;
}

private void handleError(UUID instanceId, Throwable error) {
StatusEnum status = StatusEnum.FAILED;

Expand All @@ -131,6 +136,13 @@ private void handleError(UUID instanceId, Throwable error) {

onStatusChange(instanceId, status);
log.info("handleError ['{}'] -> done", instanceId);

if (throwOnFailure) {
if (error instanceof RuntimeException re) {
throw re;
}
throw new RuntimeException(error);
}
}

private void onStatusChange(UUID instanceId, StatusEnum status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DefaultDependencies {
Expand All @@ -45,19 +44,24 @@ public class DefaultDependencies {
public DefaultDependencies() {
String path = System.getenv(CFG_KEY);
if (path != null) {
try (Stream<String> stream = Files.lines(Paths.get(path))) {
this.dependencies = stream.map(DefaultDependencies::parseUri)
.collect(Collectors.toList());
try (Stream<String> lines = Files.lines(Paths.get(path))) {
this.dependencies = lines.filter(s -> !s.isBlank())
.map(DefaultDependencies::parseUri)
.toList();
} catch (IOException e) {
throw new RuntimeException(e);
}

log.info("init -> using external default dependencies configuration: {}", path);
} else {
try (InputStream is = DefaultDependencies.class.getResourceAsStream("default-dependencies")) {
if (is == null) {
throw new RuntimeException("Can't find com/walmartlabs/concord/agent/executors/runner/default-dependencies. " +
"This is most likely a bug or an issue with the local build and/or classpath.");
}
this.dependencies = new BufferedReader(new InputStreamReader(is)).lines()
.map(DefaultDependencies::parseUri)
.collect(Collectors.toList());
.filter(s -> !s.isBlank())
.map(DefaultDependencies::parseUri).toList();
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
Expand Down Expand Up @@ -513,7 +514,12 @@ protected ProcessEntry startOneTime(RunnerJob job, String[] cmd) throws IOExcept
// the job's payload directory, contains all files from the state snapshot including imports
Path src = job.getPayloadDir();

Files.move(src, workDir, StandardCopyOption.ATOMIC_MOVE);
try {
Files.move(src, workDir, StandardCopyOption.ATOMIC_MOVE);
} catch (AtomicMoveNotSupportedException e) {
log.error("startOneTime ['{}'] -> unable to move {} to {} atomically", job.getInstanceId(), src, workDir);
throw e;
}

writeInstanceId(job.getInstanceId(), workDir);

Expand Down
2 changes: 2 additions & 0 deletions agent/src/main/resources/concord-agent.conf
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ concord-agent {
# the value might not longer be valid if the process restarts and
# gets a new Agent.
workDirBase = "/tmp/concord-agent/workDirs"
workDirBase = ${?WORK_DIR_BASE}

# directory to store the process logs
# created automatically if not specified
Expand Down Expand Up @@ -136,6 +137,7 @@ concord-agent {
# Generated on Server first start or defined in server.conf at db.changeLogParameters.defaultAgentToken
# IMPORTANT! After initialization, create a new token via API and delete initial token
apiKey = ""
apiKey = ${?SERVER_API_KEY}

# maximum time interval without a heartbeat before the process fails
maxNoHeartbeatInterval = "5 minutes"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ private static String getLastPart(URI uri) {
if (idx >= 0 && idx + 1 < p.length()) {
return p.substring(idx + 1);
}
throw new IllegalArgumentException("Invalid dependency URL. Can't get a file name: " + uri);
throw new IllegalArgumentException("Invalid dependency URL. Can't get a file name, uri=" + uri);
}

private static boolean shouldSkipCache(URI u) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.walmartlabs.concord.imports.NoopImportManager;
import com.walmartlabs.concord.runtime.common.ProcessHeartbeat;
import com.walmartlabs.concord.runtime.common.StateManager;
import com.walmartlabs.concord.runtime.common.cfg.ApiConfiguration;
import com.walmartlabs.concord.runtime.common.cfg.RunnerConfiguration;
import com.walmartlabs.concord.runtime.v2.NoopImportsNormalizer;
import com.walmartlabs.concord.runtime.v2.ProjectLoaderV2;
Expand Down Expand Up @@ -85,7 +86,7 @@ public Main(Runner runner,
}

public static void main(String[] args) throws Exception {
RunnerConfiguration runnerCfg = readRunnerConfiguration(args);
RunnerConfiguration runnerCfg = loadRunnerConfiguration(args);

// create the injector with all dependencies and services available before
// the actual process' working directory is ready. It allows us to load
Expand Down Expand Up @@ -116,18 +117,41 @@ public static void main(String[] args) throws Exception {
}
}

private static RunnerConfiguration readRunnerConfiguration(String[] args) throws IOException {
Path src;
private static RunnerConfiguration loadRunnerConfiguration(String[] args) throws IOException {
RunnerConfiguration result = RunnerConfiguration.builder().build();

// load file first, then overlay system env vars

if (args.length > 0) {
src = Paths.get(args[0]);
} else {
throw new IllegalArgumentException("Path to the runner configuration file is required");
Path src = Paths.get(args[0]);
ObjectMapper om = ObjectMapperProvider.getInstance();
try (InputStream in = Files.newInputStream(src)) {
result = om.readValue(in, RunnerConfiguration.class);
}
}

ObjectMapper om = ObjectMapperProvider.getInstance();
try (InputStream in = Files.newInputStream(src)) {
return om.readValue(in, RunnerConfiguration.class);
String agentId = System.getenv("RUNNER_AGENT_ID");
if (agentId != null) {
result = RunnerConfiguration.builder().from(result)
.agentId(agentId)
.build();
}

String apiBaseUrl = System.getenv("RUNNER_API_BASE_URL");
if (apiBaseUrl != null) {
result = RunnerConfiguration.builder().from(result)
.api(ApiConfiguration.builder().from(result.api())
.baseUrl(apiBaseUrl)
.build())
.build();
}

if (result.agentId() == null || result.api() == null || result.api().baseUrl() == null) {
throw new IllegalArgumentException("Specify the path to the runner configuration file or " +
"provide AGENT_ID and API_BASE_URL environment variables.");
}

return result;
}

public void execute() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import com.walmartlabs.concord.server.boot.BackgroundTasks;
import com.walmartlabs.concord.server.sdk.rest.Resource;
import com.walmartlabs.concord.server.task.TaskScheduler;
import com.walmartlabs.concord.server.websocket.WebSocketChannelManager;
import com.walmartlabs.concord.server.websocket.MessageChannelManager;
import org.jooq.Configuration;

import javax.inject.Inject;
Expand All @@ -42,18 +42,18 @@ public class ServerResource implements Resource {

private final TaskScheduler taskScheduler;
private final BackgroundTasks backgroundTasks;
private final WebSocketChannelManager webSocketChannelManager;
private final MessageChannelManager messageChannelManager;
private final PingDao pingDao;

@Inject
public ServerResource(TaskScheduler taskScheduler,
BackgroundTasks backgroundTasks,
WebSocketChannelManager webSocketChannelManager,
MessageChannelManager messageChannelManager,
PingDao pingDao) {

this.taskScheduler = taskScheduler;
this.backgroundTasks = backgroundTasks;
this.webSocketChannelManager = webSocketChannelManager;
this.messageChannelManager = messageChannelManager;
this.pingDao = pingDao;
}

Expand All @@ -78,7 +78,7 @@ public VersionResponse version() {
public void maintenanceMode() {
backgroundTasks.stop();

webSocketChannelManager.shutdown();
messageChannelManager.shutdown();
taskScheduler.stop();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
import com.walmartlabs.concord.server.queueclient.message.MessageType;
import com.walmartlabs.concord.server.queueclient.message.ProcessRequest;
import com.walmartlabs.concord.server.sdk.ProcessKey;
import com.walmartlabs.concord.server.websocket.MessageChannel;
import com.walmartlabs.concord.server.websocket.MessageChannelManager;
import com.walmartlabs.concord.server.websocket.WebSocketChannel;
import com.walmartlabs.concord.server.websocket.WebSocketChannelManager;
import org.jooq.DSLContext;

import javax.inject.Inject;
Expand All @@ -38,23 +39,24 @@
public class AgentManager {

private final AgentCommandsDao commandQueue;
private final WebSocketChannelManager channelManager;
private final MessageChannelManager channelManager;

@Inject
public AgentManager(AgentCommandsDao commandQueue,
WebSocketChannelManager channelManager) {
MessageChannelManager channelManager) {

this.commandQueue = commandQueue;
this.channelManager = channelManager;
}

public Collection<AgentWorkerEntry> getAvailableAgents() {
Map<WebSocketChannel, ProcessRequest> reqs = channelManager.getRequests(MessageType.PROCESS_REQUEST);
Map<MessageChannel, ProcessRequest> reqs = channelManager.getRequests(MessageType.PROCESS_REQUEST);
return reqs.entrySet().stream()
.filter(r -> r.getKey() instanceof WebSocketChannel) // TODO a better way
.map(r -> AgentWorkerEntry.builder()
.channelId(r.getKey().getChannelId())
.agentId(r.getKey().getAgentId())
.userAgent(r.getKey().getUserAgent())
.userAgent(((WebSocketChannel) r.getKey()).getUserAgent())
.capabilities(r.getValue().getCapabilities())
.build())
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,14 @@

import javax.annotation.Nullable;
import java.util.Map;
import java.util.UUID;

@Value.Immutable
@JsonInclude(JsonInclude.Include.NON_EMPTY)
@JsonSerialize(as = ImmutableAgentWorkerEntry.class)
@JsonDeserialize(as = ImmutableAgentWorkerEntry.class)
public interface AgentWorkerEntry {

UUID channelId();
String channelId();

@Nullable // backward-compatibility with old agent versions
String agentId();
Expand Down
Loading
Loading