Skip to content

Commit

Permalink
feat: implemented replica only servers
Browse files Browse the repository at this point in the history
Fixed issue #1208
  • Loading branch information
lvca committed Oct 23, 2023
1 parent 459e14c commit 96621a6
Show file tree
Hide file tree
Showing 20 changed files with 572 additions and 476 deletions.
318 changes: 180 additions & 138 deletions engine/src/main/java/com/arcadedb/GlobalConfiguration.java

Large diffs are not rendered by default.

70 changes: 39 additions & 31 deletions server/src/main/java/com/arcadedb/server/ArcadeDBServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,15 @@ public enum STATUS {OFFLINE, STARTING, ONLINE, SHUTTING_DOWN}
private final ContextConfiguration configuration;
private final String serverName;
private String hostAddress;
private final boolean testEnabled;
private final boolean replicationLifecycleEventsEnabled;
private FileServerEventLog eventLog;
private final Map<String, ServerPlugin> plugins = new LinkedHashMap<>();
private String serverRootPath;
private HAServer haServer;
private ServerSecurity security;
private HttpServer httpServer;
private final ConcurrentMap<String, ServerDatabase> databases = new ConcurrentHashMap<>();
private final List<TestCallback> testEventListeners = new ArrayList<>();
private final List<ReplicationCallback> testEventListeners = new ArrayList<>();
private volatile STATUS status = STATUS.OFFLINE;
private ServerMetrics serverMetrics = new DefaultServerMetrics();
private ServerMonitor serverMonitor;
Expand All @@ -90,15 +90,15 @@ public ArcadeDBServer() {
serverRootPath = IntegrationUtils.setRootPath(configuration);
loadConfiguration();
this.serverName = configuration.getValueAsString(GlobalConfiguration.SERVER_NAME);
this.testEnabled = configuration.getValueAsBoolean(GlobalConfiguration.TEST);
this.replicationLifecycleEventsEnabled = configuration.getValueAsBoolean(GlobalConfiguration.TEST);
init();
}

public ArcadeDBServer(final ContextConfiguration configuration) {
this.configuration = configuration;
serverRootPath = IntegrationUtils.setRootPath(configuration);
this.serverName = configuration.getValueAsString(GlobalConfiguration.SERVER_NAME);
this.testEnabled = configuration.getValueAsBoolean(GlobalConfiguration.TEST);
this.replicationLifecycleEventsEnabled = configuration.getValueAsBoolean(GlobalConfiguration.TEST);
init();
}

Expand All @@ -123,13 +123,13 @@ public synchronized void start() {
eventLog.start();

try {
lifecycleEvent(TestCallback.TYPE.SERVER_STARTING, null);
lifecycleEvent(ReplicationCallback.TYPE.SERVER_STARTING, null);
} catch (final Exception e) {
throw new ServerException("Error on starting the server '" + serverName + "'");
}

LogManager.instance()
.log(this, Level.INFO, "Starting ArcadeDB Server in %s mode with plugins %s ...", GlobalConfiguration.SERVER_MODE.getValueAsString(), getPluginNames());
LogManager.instance().log(this, Level.INFO, "Starting ArcadeDB Server in %s mode with plugins %s ...",
GlobalConfiguration.SERVER_MODE.getValueAsString(), getPluginNames());

// START METRICS & CONNECTED JMX REPORTER
if (configuration.getValueAsBoolean(GlobalConfiguration.SERVER_METRICS)) {
Expand Down Expand Up @@ -168,8 +168,8 @@ public synchronized void start() {

final String mode = GlobalConfiguration.SERVER_MODE.getValueAsString();

final String msg = String.format("ArcadeDB Server started in '%s' mode (CPUs=%d MAXRAM=%s)", mode, Runtime.getRuntime().availableProcessors(),
FileUtils.getSizeAsString(Runtime.getRuntime().maxMemory()));
final String msg = String.format("ArcadeDB Server started in '%s' mode (CPUs=%d MAXRAM=%s)", mode,
Runtime.getRuntime().availableProcessors(), FileUtils.getSizeAsString(Runtime.getRuntime().maxMemory()));
LogManager.instance().log(this, Level.INFO, msg);

getEventLog().reportEvent(ServerEventLog.EVENT_TYPE.INFO, "Server", null, msg);
Expand All @@ -178,7 +178,7 @@ public synchronized void start() {
LogManager.instance().log(this, Level.INFO, "Studio web tool available at http://%s:%d ", hostAddress, httpServer.getPort());

try {
lifecycleEvent(TestCallback.TYPE.SERVER_UP, null);
lifecycleEvent(ReplicationCallback.TYPE.SERVER_UP, null);
} catch (final Exception e) {
stop();
throw new ServerException("Error on starting the server '" + serverName + "'");
Expand All @@ -196,9 +196,10 @@ private void welcomeBanner() {
final String vmVendorVersion = System.getProperty("java.vendor.version");
final String vmVersion = System.getProperty("java.version");
LogManager.instance().log(this, Level.INFO,
"Running on " + osName + " " + osVersion + " - " + (vmName != null ? vmName : "Java") + " " + vmVersion + " " + (vmVendorVersion != null ?
"(" + vmVendorVersion + ")" :
""));
"Running on " + osName + " " + osVersion + " - " + (vmName != null ? vmName : "Java") + " " + vmVersion + " " + (
vmVendorVersion != null ?
"(" + vmVendorVersion + ")" :
""));
}

private Set<String> getPluginNames() {
Expand Down Expand Up @@ -253,7 +254,7 @@ public synchronized void stop() {
serverMonitor.stop();

try {
lifecycleEvent(TestCallback.TYPE.SERVER_SHUTTING_DOWN, null);
lifecycleEvent(ReplicationCallback.TYPE.SERVER_SHUTTING_DOWN, null);
} catch (final Exception e) {
throw new ServerException("Error on stopping the server '" + serverName + "'");
}
Expand All @@ -262,7 +263,8 @@ public synchronized void stop() {

for (final Map.Entry<String, ServerPlugin> pEntry : plugins.entrySet()) {
LogManager.instance().log(this, Level.INFO, "- Stop %s plugin", pEntry.getKey());
CodeUtils.executeIgnoringExceptions(() -> pEntry.getValue().stopService(), "Error on halting '" + pEntry.getKey() + "' plugin", false);
CodeUtils.executeIgnoringExceptions(() -> pEntry.getValue().stopService(),
"Error on halting '" + pEntry.getKey() + "' plugin", false);
}

if (haServer != null)
Expand All @@ -287,7 +289,7 @@ public synchronized void stop() {
LogManager.instance().log(this, Level.INFO, "ArcadeDB Server is down");

try {
lifecycleEvent(TestCallback.TYPE.SERVER_DOWN, null);
lifecycleEvent(ReplicationCallback.TYPE.SERVER_DOWN, null);
} catch (final Exception e) {
throw new ServerException("Error on stopping the server '" + serverName + "'");
}
Expand Down Expand Up @@ -340,7 +342,8 @@ public ServerDatabase createDatabase(final String databaseName, final ComponentF
throw new IllegalArgumentException("Database '" + databaseName + "' already exists");

final DatabaseFactory factory = new DatabaseFactory(
configuration.getValueAsString(GlobalConfiguration.SERVER_DATABASE_DIRECTORY) + File.separator + databaseName).setAutoTransaction(true);
configuration.getValueAsString(GlobalConfiguration.SERVER_DATABASE_DIRECTORY) + File.separator
+ databaseName).setAutoTransaction(true);

if (factory.exists())
throw new IllegalArgumentException("Database '" + databaseName + "' already exists");
Expand Down Expand Up @@ -387,13 +390,13 @@ public ServerSecurity getSecurity() {
return security;
}

public void registerTestEventListener(final TestCallback callback) {
public void registerTestEventListener(final ReplicationCallback callback) {
testEventListeners.add(callback);
}

public void lifecycleEvent(final TestCallback.TYPE type, final Object object) throws Exception {
if (testEnabled)
for (final TestCallback c : testEventListeners)
public void lifecycleEvent(final ReplicationCallback.TYPE type, final Object object) throws Exception {
if (replicationLifecycleEventsEnabled)
for (final ReplicationCallback c : testEventListeners)
c.onEvent(type, object, this);
}

Expand Down Expand Up @@ -422,13 +425,15 @@ public ServerDatabase getDatabase(final String databaseName, final boolean creat
if (!allowLoad)
throw new DatabaseOperationException("Database '" + databaseName + "' is not available");

final String path = configuration.getValueAsString(GlobalConfiguration.SERVER_DATABASE_DIRECTORY) + File.separator + databaseName;
final String path =
configuration.getValueAsString(GlobalConfiguration.SERVER_DATABASE_DIRECTORY) + File.separator + databaseName;

final DatabaseFactory factory = new DatabaseFactory(path).setAutoTransaction(true);

factory.setSecurity(getSecurity());

ComponentFile.MODE defaultDbMode = configuration.getValueAsEnum(GlobalConfiguration.SERVER_DEFAULT_DATABASE_MODE, ComponentFile.MODE.class);
ComponentFile.MODE defaultDbMode = configuration.getValueAsEnum(GlobalConfiguration.SERVER_DEFAULT_DATABASE_MODE,
ComponentFile.MODE.class);
if (defaultDbMode == null)
defaultDbMode = READ_WRITE;

Expand Down Expand Up @@ -485,7 +490,8 @@ private void loadDatabases() {
private void loadDefaultDatabases() {
final String defaultDatabases = configuration.getValueAsString(GlobalConfiguration.SERVER_DEFAULT_DATABASES);
if (defaultDatabases != null && !defaultDatabases.isEmpty()) {
ComponentFile.MODE defaultDbMode = configuration.getValueAsEnum(GlobalConfiguration.SERVER_DEFAULT_DATABASE_MODE, ComponentFile.MODE.class);
ComponentFile.MODE defaultDbMode = configuration.getValueAsEnum(GlobalConfiguration.SERVER_DEFAULT_DATABASE_MODE,
ComponentFile.MODE.class);
if (defaultDbMode == null)
defaultDbMode = READ_WRITE;

Expand Down Expand Up @@ -527,7 +533,8 @@ private void loadDefaultDatabases() {
((DatabaseInternal) database).getEmbedded().drop();
databases.remove(dbName);
}
final String dbPath = configuration.getValueAsString(GlobalConfiguration.SERVER_DATABASE_DIRECTORY) + File.separator + dbName;
final String dbPath =
configuration.getValueAsString(GlobalConfiguration.SERVER_DATABASE_DIRECTORY) + File.separator + dbName;
// new Restore(commandParams, dbPath).restoreDatabase();

try {
Expand Down Expand Up @@ -578,7 +585,8 @@ private void parseCredentials(final String dbName, final String credentials) {
if (credentialParts.length < 2) {
if (!security.existsUser(credential)) {
LogManager.instance()
.log(this, Level.WARNING, "Cannot create user '%s' to access database '%s' because the user does not exist", null, credential, dbName);
.log(this, Level.WARNING, "Cannot create user '%s' to access database '%s' because the user does not exist", null,
credential, dbName);
}
//FIXME: else if user exists, should we give him access to the dbName?
} else {
Expand All @@ -598,9 +606,9 @@ private void parseCredentials(final String dbName, final String credentials) {
security.saveUsers();

} catch (final ServerSecurityException e) {
LogManager.instance()
.log(this, Level.WARNING, "Cannot create database '%s' because the user '%s' already exists with a different password", null, dbName,
userName);
LogManager.instance().log(this, Level.WARNING,
"Cannot create database '%s' because the user '%s' already exists with a different password", null, dbName,
userName);
}
} else {
// UPDATE DB LIST
Expand Down Expand Up @@ -663,8 +671,8 @@ private String assignHostAddress() {

if (configuration.getValueAsBoolean(GlobalConfiguration.HA_K8S)) {
if (hostNameEnvVariable == null) {
LogManager.instance()
.log(this, Level.SEVERE, "Error: HOSTNAME environment variable not found but needed when running inside Kubernetes. The server will be halted");
LogManager.instance().log(this, Level.SEVERE,
"Error: HOSTNAME environment variable not found but needed when running inside Kubernetes. The server will be halted");
stop();
System.exit(1);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
package com.arcadedb.server;

public interface TestCallback {
public interface ReplicationCallback {
enum TYPE {
SERVER_STARTING,
SERVER_UP,
Expand Down
Loading

0 comments on commit 96621a6

Please sign in to comment.