diff --git a/engine/src/main/java/com/arcadedb/GlobalConfiguration.java b/engine/src/main/java/com/arcadedb/GlobalConfiguration.java index 32beb3809e..c62970a0d9 100644 --- a/engine/src/main/java/com/arcadedb/GlobalConfiguration.java +++ b/engine/src/main/java/com/arcadedb/GlobalConfiguration.java @@ -35,27 +35,28 @@ */ public enum GlobalConfiguration { // ENVIRONMENT - DUMP_CONFIG_AT_STARTUP("arcadedb.dumpConfigAtStartup", SCOPE.JVM, "Dumps the configuration at startup", Boolean.class, false, value -> { - //dumpConfiguration(System.out); + DUMP_CONFIG_AT_STARTUP("arcadedb.dumpConfigAtStartup", SCOPE.JVM, "Dumps the configuration at startup", Boolean.class, false, + value -> { + //dumpConfiguration(System.out); - try { - final ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - dumpConfiguration(new PrintStream(buffer)); - if (LogManager.instance() != null) - LogManager.instance().log(buffer, Level.WARNING, new String(buffer.toByteArray())); - else - System.out.println(new String(buffer.toByteArray())); - - buffer.close(); - } catch (IOException e) { - System.out.println("Error on printing initial configuration to log (error=" + e + ")"); - } + try { + final ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + dumpConfiguration(new PrintStream(buffer)); + if (LogManager.instance() != null) + LogManager.instance().log(buffer, Level.WARNING, new String(buffer.toByteArray())); + else + System.out.println(new String(buffer.toByteArray())); - return value; - }), + buffer.close(); + } catch (IOException e) { + System.out.println("Error on printing initial configuration to log (error=" + e + ")"); + } + + return value; + }), - DUMP_METRICS_EVERY("arcadedb.dumpMetricsEvery", SCOPE.JVM, "Dumps the metrics at startup, shutdown and every configurable amount of time (in seconds)", - Long.class, 0, new Callable<>() { + DUMP_METRICS_EVERY("arcadedb.dumpMetricsEvery", SCOPE.JVM, + "Dumps the metrics at startup, shutdown and every configurable amount of time (in seconds)", Long.class, 0, new Callable<>() { @Override public Object call(final Object value) { final long time = (long) value * 1000; @@ -73,69 +74,75 @@ public void run() { } }), - PROFILE("arcadedb.profile", SCOPE.JVM, "Specify the preferred profile among: default, high-performance, low-ram, low-cpu", String.class, "default", + PROFILE("arcadedb.profile", SCOPE.JVM, "Specify the preferred profile among: default, high-performance, low-ram, low-cpu", + String.class, "default", new Callable<>() { + @Override + public Object call(final Object value) { + final String v = value.toString(); + if (v.equalsIgnoreCase("default")) { + // NOT MUCH TO DO HERE, THIS IS THE DEFAULT OPTION + } else if (v.equalsIgnoreCase("high-performance")) { + ASYNC_OPERATIONS_QUEUE_IMPL.setValue("fast"); + + final int cores = Runtime.getRuntime().availableProcessors(); + if (cores > 1) + // USE ONLY HALF OF THE CORES MINUS ONE + ASYNC_WORKER_THREADS.setValue((cores / 2) - 1); + else + ASYNC_WORKER_THREADS.setValue(1); + + } else if (v.equalsIgnoreCase("low-ram")) { + MAX_PAGE_RAM.setValue(16); // 16 MB OF RAM FOR PAGE CACHE + INDEX_COMPACTION_RAM_MB.setValue(16); + INITIAL_PAGE_CACHE_SIZE.setValue(256); + FREE_PAGE_RAM.setValue(100); + ASYNC_OPERATIONS_QUEUE_SIZE.setValue(8); + ASYNC_TX_BATCH_SIZE.setValue(8); + PAGE_FLUSH_QUEUE.setValue(8); + SQL_STATEMENT_CACHE.setValue(16); + HA_REPLICATION_QUEUE_SIZE.setValue(8); + ASYNC_OPERATIONS_QUEUE_IMPL.setValue("standard"); + + } else if (v.equalsIgnoreCase("low-cpu")) { + ASYNC_WORKER_THREADS.setValue(1); + ASYNC_OPERATIONS_QUEUE_IMPL.setValue("standard"); + } else + throw new IllegalArgumentException("Profile '" + v + "' not available"); + + return value; + } + }), + + TEST("arcadedb.test", SCOPE.JVM, + "Tells if it is running in test mode. This enables the calling of callbacks for testing purpose ", Boolean.class, false), + + MAX_PAGE_RAM("arcadedb.maxPageRAM", SCOPE.DATABASE, "Maximum amount of pages (in MB) to keep in RAM", Long.class, 4 * 1024, new Callable<>() { @Override public Object call(final Object value) { - final String v = value.toString(); - if (v.equalsIgnoreCase("default")) { - // NOT MUCH TO DO HERE, THIS IS THE DEFAULT OPTION - } else if (v.equalsIgnoreCase("high-performance")) { - ASYNC_OPERATIONS_QUEUE_IMPL.setValue("fast"); - - final int cores = Runtime.getRuntime().availableProcessors(); - if (cores > 1) - // USE ONLY HALF OF THE CORES MINUS ONE - ASYNC_WORKER_THREADS.setValue((cores / 2) - 1); + final long maxRAM = ((long) value) * 1024 * 1024; // VALUE IN MB + + if (maxRAM > Runtime.getRuntime().maxMemory() * 80 / 100) { + final long newValue = Runtime.getRuntime().maxMemory() / 2; + if (LogManager.instance() != null) + LogManager.instance() + .log(this, Level.WARNING, "Setting '%s=%s' is > than 80%% of maximum heap (%s). Decreasing it to %s", + MAX_PAGE_RAM.key, FileUtils.getSizeAsString(maxRAM), + FileUtils.getSizeAsString(Runtime.getRuntime().maxMemory()), FileUtils.getSizeAsString(newValue)); else - ASYNC_WORKER_THREADS.setValue(1); - - } else if (v.equalsIgnoreCase("low-ram")) { - MAX_PAGE_RAM.setValue(16); // 16 MB OF RAM FOR PAGE CACHE - INDEX_COMPACTION_RAM_MB.setValue(16); - INITIAL_PAGE_CACHE_SIZE.setValue(256); - FREE_PAGE_RAM.setValue(100); - ASYNC_OPERATIONS_QUEUE_SIZE.setValue(8); - ASYNC_TX_BATCH_SIZE.setValue(8); - PAGE_FLUSH_QUEUE.setValue(8); - SQL_STATEMENT_CACHE.setValue(16); - HA_REPLICATION_QUEUE_SIZE.setValue(8); - ASYNC_OPERATIONS_QUEUE_IMPL.setValue("standard"); - - } else if (v.equalsIgnoreCase("low-cpu")) { - ASYNC_WORKER_THREADS.setValue(1); - ASYNC_OPERATIONS_QUEUE_IMPL.setValue("standard"); - } else - throw new IllegalArgumentException("Profile '" + v + "' not available"); + System.out.println( + String.format("Setting '%s=%s' is > than 80%% of maximum heap (%s). Decreasing it to %s", MAX_PAGE_RAM.key, + FileUtils.getSizeAsString(maxRAM), FileUtils.getSizeAsString(Runtime.getRuntime().maxMemory()), + FileUtils.getSizeAsString(newValue))); + return newValue; + } return value; } - }), + }, value -> Runtime.getRuntime().maxMemory() / 4 / 1024 / 1024), - TEST("arcadedb.test", SCOPE.JVM, "Tells if it is running in test mode. This enables the calling of callbacks for testing purpose ", Boolean.class, false), - - MAX_PAGE_RAM("arcadedb.maxPageRAM", SCOPE.DATABASE, "Maximum amount of pages (in MB) to keep in RAM", Long.class, 4 * 1024, new Callable<>() { - @Override - public Object call(final Object value) { - final long maxRAM = ((long) value) * 1024 * 1024; // VALUE IN MB - - if (maxRAM > Runtime.getRuntime().maxMemory() * 80 / 100) { - final long newValue = Runtime.getRuntime().maxMemory() / 2; - if (LogManager.instance() != null) - LogManager.instance().log(this, Level.WARNING, "Setting '%s=%s' is > than 80%% of maximum heap (%s). Decreasing it to %s", MAX_PAGE_RAM.key, - FileUtils.getSizeAsString(maxRAM), FileUtils.getSizeAsString(Runtime.getRuntime().maxMemory()), FileUtils.getSizeAsString(newValue)); - else - System.out.println( - String.format("Setting '%s=%s' is > than 80%% of maximum heap (%s). Decreasing it to %s", MAX_PAGE_RAM.key, FileUtils.getSizeAsString(maxRAM), - FileUtils.getSizeAsString(Runtime.getRuntime().maxMemory()), FileUtils.getSizeAsString(newValue))); - - return newValue; - } - return value; - } - }, value -> Runtime.getRuntime().maxMemory() / 4 / 1024 / 1024), - - INITIAL_PAGE_CACHE_SIZE("arcadedb.initialPageCacheSize", SCOPE.DATABASE, "Initial number of entries for page cache", Integer.class, 65535), + INITIAL_PAGE_CACHE_SIZE("arcadedb.initialPageCacheSize", SCOPE.DATABASE, "Initial number of entries for page cache", + Integer.class, 65535), DATE_IMPLEMENTATION("arcadedb.dateImplementation", SCOPE.DATABASE, "Default date implementation to use on deserialization. By default java.util.Date is used, but the following are supported: java.util.Calendar, java.time.LocalDate", @@ -150,7 +157,8 @@ public Object call(final Object value) { return value; }), - DATE_FORMAT("arcadedb.dateFormat", SCOPE.DATABASE, "Default date format using Java SimpleDateFormat syntax", String.class, "yyyy-MM-dd"), + DATE_FORMAT("arcadedb.dateFormat", SCOPE.DATABASE, "Default date format using Java SimpleDateFormat syntax", String.class, + "yyyy-MM-dd"), DATE_TIME_IMPLEMENTATION("arcadedb.dateTimeImplementation", SCOPE.DATABASE, "Default datetime implementation to use on deserialization. By default java.util.Date is used, but the following are supported: java.util.Calendar, java.time.LocalDateTime, java.time.ZonedDateTime", @@ -165,38 +173,46 @@ public Object call(final Object value) { return value; }), - DATE_TIME_FORMAT("arcadedb.dateTimeFormat", SCOPE.DATABASE, "Default date time format using Java SimpleDateFormat syntax", String.class, - "yyyy-MM-dd HH:mm:ss"), + DATE_TIME_FORMAT("arcadedb.dateTimeFormat", SCOPE.DATABASE, "Default date time format using Java SimpleDateFormat syntax", + String.class, "yyyy-MM-dd HH:mm:ss"), TX_WAL("arcadedb.txWAL", SCOPE.DATABASE, "Uses the WAL", Boolean.class, true), TX_WAL_FLUSH("arcadedb.txWalFlush", SCOPE.DATABASE, - "Flushes the WAL on disk at commit time. It can be 0 = no flush, 1 = flush without metadata and 2 = full flush (fsync)", Integer.class, 0), + "Flushes the WAL on disk at commit time. It can be 0 = no flush, 1 = flush without metadata and 2 = full flush (fsync)", + Integer.class, 0), - FREE_PAGE_RAM("arcadedb.freePageRAM", SCOPE.DATABASE, "Percentage (0-100) of memory to free when Page RAM is full", Integer.class, 50), + FREE_PAGE_RAM("arcadedb.freePageRAM", SCOPE.DATABASE, "Percentage (0-100) of memory to free when Page RAM is full", Integer.class, + 50), - TYPE_DEFAULT_BUCKETS("arcadedb.typeDefaultBuckets", SCOPE.DATABASE, "Default number of buckets to create per type", Integer.class, 8), + TYPE_DEFAULT_BUCKETS("arcadedb.typeDefaultBuckets", SCOPE.DATABASE, "Default number of buckets to create per type", Integer.class, + 8), - BUCKET_DEFAULT_PAGE_SIZE("arcadedb.bucketDefaultPageSize", SCOPE.DATABASE, "Default page size in bytes for buckets. Default is 64KB", Integer.class, 65_536), + BUCKET_DEFAULT_PAGE_SIZE("arcadedb.bucketDefaultPageSize", SCOPE.DATABASE, + "Default page size in bytes for buckets. Default is 64KB", Integer.class, 65_536), - ASYNC_WORKER_THREADS("arcadedb.asyncWorkerThreads", SCOPE.DATABASE, "Number of asynchronous worker threads. 0 (default) = available cores minus 1", - Integer.class, Runtime.getRuntime().availableProcessors() > 1 ? Runtime.getRuntime().availableProcessors() - 1 : 1), + ASYNC_WORKER_THREADS("arcadedb.asyncWorkerThreads", SCOPE.DATABASE, + "Number of asynchronous worker threads. 0 (default) = available cores minus 1", Integer.class, + Runtime.getRuntime().availableProcessors() > 1 ? Runtime.getRuntime().availableProcessors() - 1 : 1), ASYNC_OPERATIONS_QUEUE_IMPL("arcadedb.asyncOperationsQueueImpl", SCOPE.DATABASE, "Queue implementation to use between 'standard' and 'fast'. 'standard' consumes less CPU than the 'fast' implementation, but it could be slower with high loads", String.class, "standard", Set.of((Object[]) new String[] { "standard", "fast" })), ASYNC_OPERATIONS_QUEUE_SIZE("arcadedb.asyncOperationsQueueSize", SCOPE.DATABASE, - "Size of the total asynchronous operation queues (it is divided by the number of parallel threads in the pool)", Integer.class, 1024), + "Size of the total asynchronous operation queues (it is divided by the number of parallel threads in the pool)", + Integer.class, 1024), - ASYNC_TX_BATCH_SIZE("arcadedb.asyncTxBatchSize", SCOPE.DATABASE, "Maximum number of operations to commit in batch by async thread", Integer.class, 1024 * 10), + ASYNC_TX_BATCH_SIZE("arcadedb.asyncTxBatchSize", SCOPE.DATABASE, + "Maximum number of operations to commit in batch by async thread", Integer.class, 1024 * 10), - ASYNC_BACK_PRESSURE("arcadedb.asyncBackPressure", SCOPE.DATABASE, "When the asynchronous queue is full at a certain percentage, back pressure is applied", - Integer.class, 0), + ASYNC_BACK_PRESSURE("arcadedb.asyncBackPressure", SCOPE.DATABASE, + "When the asynchronous queue is full at a certain percentage, back pressure is applied", Integer.class, 0), PAGE_FLUSH_QUEUE("arcadedb.pageFlushQueue", SCOPE.DATABASE, "Size of the asynchronous page flush queue", Integer.class, 512), - COMMIT_LOCK_TIMEOUT("arcadedb.commitLockTimeout", SCOPE.DATABASE, "Timeout in ms to lock resources during commit", Long.class, 5000), + COMMIT_LOCK_TIMEOUT("arcadedb.commitLockTimeout", SCOPE.DATABASE, "Timeout in ms to lock resources during commit", Long.class, + 5000), TX_RETRIES("arcadedb.txRetries", SCOPE.DATABASE, "Number of retries in case of MVCC exception", Integer.class, 3), @@ -205,7 +221,8 @@ public Object call(final Object value) { Integer.class, 100), // SQL - SQL_STATEMENT_CACHE("arcadedb.sqlStatementCache", SCOPE.DATABASE, "Maximum number of parsed statements to keep in cache", Integer.class, 300), + SQL_STATEMENT_CACHE("arcadedb.sqlStatementCache", SCOPE.DATABASE, "Maximum number of parsed statements to keep in cache", + Integer.class, 300), // COMMAND COMMAND_TIMEOUT("arcadedb.command.timeout", SCOPE.DATABASE, "Default timeout for commands (in ms)", Long.class, 0), @@ -217,24 +234,27 @@ public Object call(final Object value) { /** * Not in use anymore after removing Gremlin Executor */ - @Deprecated GREMLIN_COMMAND_TIMEOUT("arcadedb.gremlin.timeout", SCOPE.DATABASE, "Default timeout for gremlin commands (in ms)", Long.class, 30_000), + @Deprecated GREMLIN_COMMAND_TIMEOUT("arcadedb.gremlin.timeout", SCOPE.DATABASE, "Default timeout for gremlin commands (in ms)", + Long.class, 30_000), // USER CODE - POLYGLOT_COMMAND_TIMEOUT("arcadedb.polyglotCommand.timeout", SCOPE.DATABASE, "Default timeout for polyglot commands (in ms)", Long.class, 10_000), + POLYGLOT_COMMAND_TIMEOUT("arcadedb.polyglotCommand.timeout", SCOPE.DATABASE, "Default timeout for polyglot commands (in ms)", + Long.class, 10_000), QUERY_MAX_HEAP_ELEMENTS_ALLOWED_PER_OP("arcadedb.queryMaxHeapElementsAllowedPerOp", SCOPE.DATABASE, "Maximum number of elements (records) allowed in a single query for memory-intensive operations (eg. ORDER BY in heap). " + "If exceeded, the query fails with an OCommandExecutionException. Negative number means no limit." - + "This setting is intended as a safety measure against excessive resource consumption from a single query (eg. prevent OutOfMemory)", Long.class, - 500_000), + + "This setting is intended as a safety measure against excessive resource consumption from a single query (eg. prevent OutOfMemory)", + Long.class, 500_000), // CYPHER CYPHER_STATEMENT_CACHE("arcadedb.cypher.statementCache", SCOPE.DATABASE, - "Max number of entries in the cypher statement cache. Use 0 to disable. Caching statements speeds up execution of the same cypher queries", Integer.class, - 1000), + "Max number of entries in the cypher statement cache. Use 0 to disable. Caching statements speeds up execution of the same cypher queries", + Integer.class, 1000), // INDEXES - INDEX_COMPACTION_RAM_MB("arcadedb.indexCompactionRAM", SCOPE.DATABASE, "Maximum amount of RAM to use for index compaction, in MB", Long.class, 300), + INDEX_COMPACTION_RAM_MB("arcadedb.indexCompactionRAM", SCOPE.DATABASE, "Maximum amount of RAM to use for index compaction, in MB", + Long.class, 300), INDEX_COMPACTION_MIN_PAGES_SCHEDULE("arcadedb.indexCompactionMinPagesSchedule", SCOPE.DATABASE, "Minimum number of mutable pages for an index to be schedule for automatic compaction. 0 = disabled", Integer.class, 10), @@ -246,25 +266,29 @@ public Object call(final Object value) { NETWORK_SSL_KEYSTORE("arcadedb.ssl.keyStore", SCOPE.SERVER, "Path where the SSL certificates are stored", String.class, null), - NETWORK_SSL_KEYSTORE_PASSWORD("arcadedb.ssl.keyStorePassword", SCOPE.SERVER, "Password to open the SSL key store", String.class, null), + NETWORK_SSL_KEYSTORE_PASSWORD("arcadedb.ssl.keyStorePassword", SCOPE.SERVER, "Password to open the SSL key store", String.class, + null), NETWORK_SSL_TRUSTSTORE("arcadedb.ssl.trustStore", SCOPE.SERVER, "Path to the SSL trust store", String.class, null), - NETWORK_SSL_TRUSTSTORE_PASSWORD("arcadedb.ssl.trustStorePassword", SCOPE.SERVER, "Password to open the SSL trust store", String.class, null), + NETWORK_SSL_TRUSTSTORE_PASSWORD("arcadedb.ssl.trustStorePassword", SCOPE.SERVER, "Password to open the SSL trust store", + String.class, null), // SERVER SERVER_NAME("arcadedb.server.name", SCOPE.SERVER, "Server name", String.class, Constants.PRODUCT + "_0"), SERVER_ROOT_PASSWORD("arcadedb.server.rootPassword", SCOPE.SERVER, - "Password for root user to use at first startup of the server. Set this to avoid asking the password to the user", String.class, null), + "Password for root user to use at first startup of the server. Set this to avoid asking the password to the user", + String.class, null), - SERVER_MODE("arcadedb.server.mode", SCOPE.SERVER, "Server mode between 'development', 'test' and 'production'", String.class, "development", - Set.of((Object[]) new String[] { "development", "test", "production" })), + SERVER_MODE("arcadedb.server.mode", SCOPE.SERVER, "Server mode between 'development', 'test' and 'production'", String.class, + "development", Set.of((Object[]) new String[] { "development", "test", "production" })), SERVER_METRICS("arcadedb.serverMetrics", SCOPE.SERVER, "True to enable metrics", Boolean.class, true), SERVER_ROOT_PATH("arcadedb.server.rootPath", SCOPE.SERVER, - "Root path in the file system where the server is looking for files. By default is the current directory", String.class, null), + "Root path in the file system where the server is looking for files. By default is the current directory", String.class, + null), SERVER_DATABASE_DIRECTORY("arcadedb.server.databaseDirectory", SCOPE.JVM, "Directory containing the database", String.class, "${arcadedb.server.rootPath}/databases"), @@ -272,8 +296,8 @@ public Object call(final Object value) { SERVER_BACKUP_DIRECTORY("arcadedb.server.backupDirectory", SCOPE.JVM, "Directory containing the backups", String.class, "${arcadedb.server.rootPath}/backups"), - SERVER_DATABASE_LOADATSTARTUP("arcadedb.server.databaseLoadAtStartup", SCOPE.SERVER, "Open all the available databases at server startup", Boolean.class, - true), + SERVER_DATABASE_LOADATSTARTUP("arcadedb.server.databaseLoadAtStartup", SCOPE.SERVER, + "Open all the available databases at server startup", Boolean.class, true), SERVER_DEFAULT_DATABASES("arcadedb.server.defaultDatabases", SCOPE.SERVER, "The default databases created when the server starts. The format is `([(:[:])[,]*])[{import|restore:}][;]*'. Pay attention on using `;`" @@ -285,11 +309,12 @@ public Object call(final Object value) { + "Databases which are newly created will always be opened READ_WRITE.", String.class, "READ_WRITE", Set.of((Object[]) new String[] { "read_only", "read_write" })), - SERVER_PLUGINS("arcadedb.server.plugins", SCOPE.SERVER, "List of server plugins to install. The format to load a plugin is: `:`", - String.class, ""), + SERVER_PLUGINS("arcadedb.server.plugins", SCOPE.SERVER, + "List of server plugins to install. The format to load a plugin is: `:`", String.class, ""), // SERVER HTTP - SERVER_HTTP_INCOMING_HOST("arcadedb.server.httpIncomingHost", SCOPE.SERVER, "TCP/IP host name used for incoming HTTP connections", String.class, "0.0.0.0"), + SERVER_HTTP_INCOMING_HOST("arcadedb.server.httpIncomingHost", SCOPE.SERVER, "TCP/IP host name used for incoming HTTP connections", + String.class, "0.0.0.0"), SERVER_HTTP_INCOMING_PORT("arcadedb.server.httpIncomingPort", SCOPE.SERVER, "TCP/IP port number used for incoming HTTP connections. Specify a single port or a range ``. Default is 2480-2489 to accept a range of ports in case they are occupied.", @@ -300,77 +325,92 @@ public Object call(final Object value) { String.class, "2490-2499"), SERVER_HTTP_TX_EXPIRE_TIMEOUT("arcadedb.server.httpTxExpireTimeout", SCOPE.SERVER, - "Timeout in seconds for a HTTP transaction to expire. This timeout is computed from the latest command against the transaction", Long.class, 30), + "Timeout in seconds for a HTTP transaction to expire. This timeout is computed from the latest command against the transaction", + Long.class, 30), // SERVER WS - SERVER_WS_EVENT_BUS_QUEUE_SIZE("arcadedb.server.eventBusQueueSize", SCOPE.SERVER, "Size of the queue used as a buffer for unserviced database change events.", - Integer.class, 1000), + SERVER_WS_EVENT_BUS_QUEUE_SIZE("arcadedb.server.eventBusQueueSize", SCOPE.SERVER, + "Size of the queue used as a buffer for unserviced database change events.", Integer.class, 1000), // SERVER SECURITY - SERVER_SECURITY_ALGORITHM("arcadedb.server.securityAlgorithm", SCOPE.SERVER, "Default encryption algorithm used for passwords hashing", String.class, - "PBKDF2WithHmacSHA256"), + SERVER_SECURITY_ALGORITHM("arcadedb.server.securityAlgorithm", SCOPE.SERVER, + "Default encryption algorithm used for passwords hashing", String.class, "PBKDF2WithHmacSHA256"), SERVER_SECURITY_SALT_CACHE_SIZE("arcadedb.server.securitySaltCacheSize", SCOPE.SERVER, "Cache size of hashed salt passwords. The cache works as LRU. Use 0 to disable the cache", Integer.class, 64), SERVER_SECURITY_SALT_ITERATIONS("arcadedb.server.saltIterations", SCOPE.SERVER, - "Number of iterations to generate the salt or user password. Changing this setting does not affect stored passwords", Integer.class, 65536), + "Number of iterations to generate the salt or user password. Changing this setting does not affect stored passwords", + Integer.class, 65536), // HA HA_ENABLED("arcadedb.ha.enabled", SCOPE.SERVER, "True if HA is enabled for the current server", Boolean.class, false), - HA_CLUSTER_NAME("arcadedb.ha.clusterName", SCOPE.SERVER, "Cluster name. By default is 'arcadedb'. Useful in case of multiple clusters in the same network", - String.class, Constants.PRODUCT.toLowerCase()), + HA_SERVER_ROLE("arcadedb.ha.serverRole", SCOPE.SERVER, + "Server role between ANY (default) OR REPLICA to configure replica only servers", String.class, "any", + Set.of(new String[] { "any", "replica" })), + + HA_CLUSTER_NAME("arcadedb.ha.clusterName", SCOPE.SERVER, + "Cluster name. By default is 'arcadedb'. Useful in case of multiple clusters in the same network", String.class, + Constants.PRODUCT.toLowerCase()), HA_SERVER_LIST("arcadedb.ha.serverList", SCOPE.SERVER, - "Servers in the cluster as a list of items separated by comma. Example: localhost:2424,192.168.0.1:2424", String.class, ""), + "Servers in the cluster as a list of items separated by comma. Example: localhost:2424,192.168.0.1:2424", + String.class, ""), - HA_QUORUM("arcadedb.ha.quorum", SCOPE.SERVER, "Default quorum between 'none', 1, 2, 3, 'majority' and 'all' servers. Default is majority", String.class, - "majority", Set.of((Object[]) new String[] { "none", "1", "2", "3", "majority", "all" })), + HA_QUORUM("arcadedb.ha.quorum", SCOPE.SERVER, + "Default quorum between 'none', 1, 2, 3, 'majority' and 'all' servers. Default is majority", String.class, "majority", + Set.of(new String[] { "none", "1", "2", "3", "majority", "all" })), HA_QUORUM_TIMEOUT("arcadedb.ha.quorumTimeout", SCOPE.SERVER, "Timeout waiting for the quorum", Long.class, 10000), - HA_REPLICATION_QUEUE_SIZE("arcadedb.ha.replicationQueueSize", SCOPE.SERVER, "Queue size for replicating messages between servers", Integer.class, 512), + HA_REPLICATION_QUEUE_SIZE("arcadedb.ha.replicationQueueSize", SCOPE.SERVER, "Queue size for replicating messages between servers", + Integer.class, 512), // TODO: USE THIS FOR CREATING NEW FILES - HA_REPLICATION_FILE_MAXSIZE("arcadedb.ha.replicationFileMaxSize", SCOPE.SERVER, "Maximum file size for replicating messages between servers. Default is 1GB", - Long.class, 1024 * 1024 * 1024), + HA_REPLICATION_FILE_MAXSIZE("arcadedb.ha.replicationFileMaxSize", SCOPE.SERVER, + "Maximum file size for replicating messages between servers. Default is 1GB", Long.class, 1024 * 1024 * 1024), HA_REPLICATION_CHUNK_MAXSIZE("arcadedb.ha.replicationChunkMaxSize", SCOPE.SERVER, "Maximum channel chunk size for replicating messages between servers. Default is 16777216", Integer.class, 16384 * 1024), HA_REPLICATION_INCOMING_HOST("arcadedb.ha.replicationIncomingHost", SCOPE.SERVER, - "TCP/IP host name used for incoming replication connections. By default is 0.0.0.0 (listens to all the configured network interfaces)", String.class, - "0.0.0.0"), + "TCP/IP host name used for incoming replication connections. By default is 0.0.0.0 (listens to all the configured network interfaces)", + String.class, "0.0.0.0"), - HA_REPLICATION_INCOMING_PORTS("arcadedb.ha.replicationIncomingPorts", SCOPE.SERVER, "TCP/IP port number used for incoming replication connections", - String.class, "2424-2433"), + HA_REPLICATION_INCOMING_PORTS("arcadedb.ha.replicationIncomingPorts", SCOPE.SERVER, + "TCP/IP port number used for incoming replication connections", String.class, "2424-2433"), + // KUBERNETES HA_K8S("arcadedb.ha.k8s", SCOPE.SERVER, "The server is running inside Kubernetes", Boolean.class, false), HA_K8S_DNS_SUFFIX("arcadedb.ha.k8sSuffix", SCOPE.SERVER, - "When running inside Kubernetes use this suffix to reach the other servers. Example: arcadedb.default.svc.cluster.local", String.class, ""), + "When running inside Kubernetes use this suffix to reach the other servers. Example: arcadedb.default.svc.cluster.local", + String.class, ""), // POSTGRES - POSTGRES_PORT("arcadedb.postgres.port", SCOPE.SERVER, "TCP/IP port number used for incoming connections for Postgres plugin. Default is 5432", Integer.class, - 5432), + POSTGRES_PORT("arcadedb.postgres.port", SCOPE.SERVER, + "TCP/IP port number used for incoming connections for Postgres plugin. Default is 5432", Integer.class, 5432), - POSTGRES_HOST("arcadedb.postgres.host", SCOPE.SERVER, "TCP/IP host name used for incoming connections for Postgres plugin. Default is '0.0.0.0'", - String.class, "0.0.0.0"), + POSTGRES_HOST("arcadedb.postgres.host", SCOPE.SERVER, + "TCP/IP host name used for incoming connections for Postgres plugin. Default is '0.0.0.0'", String.class, "0.0.0.0"), - POSTGRES_DEBUG("arcadedb.postgres.debug", SCOPE.SERVER, "Enables the printing of Postgres protocol to the console. Default is false", Boolean.class, false), + POSTGRES_DEBUG("arcadedb.postgres.debug", SCOPE.SERVER, + "Enables the printing of Postgres protocol to the console. Default is false", Boolean.class, false), // REDIS - REDIS_PORT("arcadedb.redis.port", SCOPE.SERVER, "TCP/IP port number used for incoming connections for Redis plugin. Default is 6379", Integer.class, 6379), + REDIS_PORT("arcadedb.redis.port", SCOPE.SERVER, + "TCP/IP port number used for incoming connections for Redis plugin. Default is 6379", Integer.class, 6379), - REDIS_HOST("arcadedb.redis.host", SCOPE.SERVER, "TCP/IP host name used for incoming connections for Redis plugin. Default is '0.0.0.0'", String.class, - "0.0.0.0"), + REDIS_HOST("arcadedb.redis.host", SCOPE.SERVER, + "TCP/IP host name used for incoming connections for Redis plugin. Default is '0.0.0.0'", String.class, "0.0.0.0"), // MONGO - MONGO_PORT("arcadedb.mongo.port", SCOPE.SERVER, "TCP/IP port number used for incoming connections for Mongo plugin. Default is 27017", Integer.class, 27017), + MONGO_PORT("arcadedb.mongo.port", SCOPE.SERVER, + "TCP/IP port number used for incoming connections for Mongo plugin. Default is 27017", Integer.class, 27017), - MONGO_HOST("arcadedb.mongo.host", SCOPE.SERVER, "TCP/IP host name used for incoming connections for Mongo plugin. Default is '0.0.0.0'", String.class, - "0.0.0.0"), + MONGO_HOST("arcadedb.mongo.host", SCOPE.SERVER, + "TCP/IP host name used for incoming connections for Mongo plugin. Default is '0.0.0.0'", String.class, "0.0.0.0"), ; /** @@ -399,7 +439,8 @@ public enum SCOPE {JVM, SERVER, DATABASE} readConfiguration(); } - GlobalConfiguration(final String iKey, final SCOPE scope, final String iDescription, final Class iType, final Object iDefValue) { + GlobalConfiguration(final String iKey, final SCOPE scope, final String iDescription, final Class iType, + final Object iDefValue) { this(iKey, scope, iDescription, iType, iDefValue, null, null, null); } @@ -643,7 +684,8 @@ else if (type.isEnum()) { if (allowed != null && value != null) if (!allowed.contains(value.toString().toLowerCase())) - throw new IllegalArgumentException("Global setting '" + key + "=" + value + "' is not valid. Allowed values are " + allowed); + throw new IllegalArgumentException( + "Global setting '" + key + "=" + value + "' is not valid. Allowed values are " + allowed); } catch (final Exception e) { // RESTORE THE PREVIOUS VALUE diff --git a/server/src/main/java/com/arcadedb/server/ArcadeDBServer.java b/server/src/main/java/com/arcadedb/server/ArcadeDBServer.java index 292f840770..effff0ffb6 100644 --- a/server/src/main/java/com/arcadedb/server/ArcadeDBServer.java +++ b/server/src/main/java/com/arcadedb/server/ArcadeDBServer.java @@ -65,7 +65,7 @@ public enum STATUS {OFFLINE, STARTING, ONLINE, SHUTTING_DOWN} private final ContextConfiguration configuration; private final String serverName; private String hostAddress; - private final boolean testEnabled; + private final boolean replicationLifecycleEventsEnabled; private FileServerEventLog eventLog; private final Map plugins = new LinkedHashMap<>(); private String serverRootPath; @@ -73,7 +73,7 @@ public enum STATUS {OFFLINE, STARTING, ONLINE, SHUTTING_DOWN} private ServerSecurity security; private HttpServer httpServer; private final ConcurrentMap databases = new ConcurrentHashMap<>(); - private final List testEventListeners = new ArrayList<>(); + private final List testEventListeners = new ArrayList<>(); private volatile STATUS status = STATUS.OFFLINE; private ServerMetrics serverMetrics = new DefaultServerMetrics(); private ServerMonitor serverMonitor; @@ -90,7 +90,7 @@ public ArcadeDBServer() { serverRootPath = IntegrationUtils.setRootPath(configuration); loadConfiguration(); this.serverName = configuration.getValueAsString(GlobalConfiguration.SERVER_NAME); - this.testEnabled = configuration.getValueAsBoolean(GlobalConfiguration.TEST); + this.replicationLifecycleEventsEnabled = configuration.getValueAsBoolean(GlobalConfiguration.TEST); init(); } @@ -98,7 +98,7 @@ public ArcadeDBServer(final ContextConfiguration configuration) { this.configuration = configuration; serverRootPath = IntegrationUtils.setRootPath(configuration); this.serverName = configuration.getValueAsString(GlobalConfiguration.SERVER_NAME); - this.testEnabled = configuration.getValueAsBoolean(GlobalConfiguration.TEST); + this.replicationLifecycleEventsEnabled = configuration.getValueAsBoolean(GlobalConfiguration.TEST); init(); } @@ -123,13 +123,13 @@ public synchronized void start() { eventLog.start(); try { - lifecycleEvent(TestCallback.TYPE.SERVER_STARTING, null); + lifecycleEvent(ReplicationCallback.TYPE.SERVER_STARTING, null); } catch (final Exception e) { throw new ServerException("Error on starting the server '" + serverName + "'"); } - LogManager.instance() - .log(this, Level.INFO, "Starting ArcadeDB Server in %s mode with plugins %s ...", GlobalConfiguration.SERVER_MODE.getValueAsString(), getPluginNames()); + LogManager.instance().log(this, Level.INFO, "Starting ArcadeDB Server in %s mode with plugins %s ...", + GlobalConfiguration.SERVER_MODE.getValueAsString(), getPluginNames()); // START METRICS & CONNECTED JMX REPORTER if (configuration.getValueAsBoolean(GlobalConfiguration.SERVER_METRICS)) { @@ -168,8 +168,8 @@ public synchronized void start() { final String mode = GlobalConfiguration.SERVER_MODE.getValueAsString(); - final String msg = String.format("ArcadeDB Server started in '%s' mode (CPUs=%d MAXRAM=%s)", mode, Runtime.getRuntime().availableProcessors(), - FileUtils.getSizeAsString(Runtime.getRuntime().maxMemory())); + final String msg = String.format("ArcadeDB Server started in '%s' mode (CPUs=%d MAXRAM=%s)", mode, + Runtime.getRuntime().availableProcessors(), FileUtils.getSizeAsString(Runtime.getRuntime().maxMemory())); LogManager.instance().log(this, Level.INFO, msg); getEventLog().reportEvent(ServerEventLog.EVENT_TYPE.INFO, "Server", null, msg); @@ -178,7 +178,7 @@ public synchronized void start() { LogManager.instance().log(this, Level.INFO, "Studio web tool available at http://%s:%d ", hostAddress, httpServer.getPort()); try { - lifecycleEvent(TestCallback.TYPE.SERVER_UP, null); + lifecycleEvent(ReplicationCallback.TYPE.SERVER_UP, null); } catch (final Exception e) { stop(); throw new ServerException("Error on starting the server '" + serverName + "'"); @@ -196,9 +196,10 @@ private void welcomeBanner() { final String vmVendorVersion = System.getProperty("java.vendor.version"); final String vmVersion = System.getProperty("java.version"); LogManager.instance().log(this, Level.INFO, - "Running on " + osName + " " + osVersion + " - " + (vmName != null ? vmName : "Java") + " " + vmVersion + " " + (vmVendorVersion != null ? - "(" + vmVendorVersion + ")" : - "")); + "Running on " + osName + " " + osVersion + " - " + (vmName != null ? vmName : "Java") + " " + vmVersion + " " + ( + vmVendorVersion != null ? + "(" + vmVendorVersion + ")" : + "")); } private Set getPluginNames() { @@ -253,7 +254,7 @@ public synchronized void stop() { serverMonitor.stop(); try { - lifecycleEvent(TestCallback.TYPE.SERVER_SHUTTING_DOWN, null); + lifecycleEvent(ReplicationCallback.TYPE.SERVER_SHUTTING_DOWN, null); } catch (final Exception e) { throw new ServerException("Error on stopping the server '" + serverName + "'"); } @@ -262,7 +263,8 @@ public synchronized void stop() { for (final Map.Entry pEntry : plugins.entrySet()) { LogManager.instance().log(this, Level.INFO, "- Stop %s plugin", pEntry.getKey()); - CodeUtils.executeIgnoringExceptions(() -> pEntry.getValue().stopService(), "Error on halting '" + pEntry.getKey() + "' plugin", false); + CodeUtils.executeIgnoringExceptions(() -> pEntry.getValue().stopService(), + "Error on halting '" + pEntry.getKey() + "' plugin", false); } if (haServer != null) @@ -287,7 +289,7 @@ public synchronized void stop() { LogManager.instance().log(this, Level.INFO, "ArcadeDB Server is down"); try { - lifecycleEvent(TestCallback.TYPE.SERVER_DOWN, null); + lifecycleEvent(ReplicationCallback.TYPE.SERVER_DOWN, null); } catch (final Exception e) { throw new ServerException("Error on stopping the server '" + serverName + "'"); } @@ -340,7 +342,8 @@ public ServerDatabase createDatabase(final String databaseName, final ComponentF throw new IllegalArgumentException("Database '" + databaseName + "' already exists"); final DatabaseFactory factory = new DatabaseFactory( - configuration.getValueAsString(GlobalConfiguration.SERVER_DATABASE_DIRECTORY) + File.separator + databaseName).setAutoTransaction(true); + configuration.getValueAsString(GlobalConfiguration.SERVER_DATABASE_DIRECTORY) + File.separator + + databaseName).setAutoTransaction(true); if (factory.exists()) throw new IllegalArgumentException("Database '" + databaseName + "' already exists"); @@ -387,13 +390,13 @@ public ServerSecurity getSecurity() { return security; } - public void registerTestEventListener(final TestCallback callback) { + public void registerTestEventListener(final ReplicationCallback callback) { testEventListeners.add(callback); } - public void lifecycleEvent(final TestCallback.TYPE type, final Object object) throws Exception { - if (testEnabled) - for (final TestCallback c : testEventListeners) + public void lifecycleEvent(final ReplicationCallback.TYPE type, final Object object) throws Exception { + if (replicationLifecycleEventsEnabled) + for (final ReplicationCallback c : testEventListeners) c.onEvent(type, object, this); } @@ -422,13 +425,15 @@ public ServerDatabase getDatabase(final String databaseName, final boolean creat if (!allowLoad) throw new DatabaseOperationException("Database '" + databaseName + "' is not available"); - final String path = configuration.getValueAsString(GlobalConfiguration.SERVER_DATABASE_DIRECTORY) + File.separator + databaseName; + final String path = + configuration.getValueAsString(GlobalConfiguration.SERVER_DATABASE_DIRECTORY) + File.separator + databaseName; final DatabaseFactory factory = new DatabaseFactory(path).setAutoTransaction(true); factory.setSecurity(getSecurity()); - ComponentFile.MODE defaultDbMode = configuration.getValueAsEnum(GlobalConfiguration.SERVER_DEFAULT_DATABASE_MODE, ComponentFile.MODE.class); + ComponentFile.MODE defaultDbMode = configuration.getValueAsEnum(GlobalConfiguration.SERVER_DEFAULT_DATABASE_MODE, + ComponentFile.MODE.class); if (defaultDbMode == null) defaultDbMode = READ_WRITE; @@ -485,7 +490,8 @@ private void loadDatabases() { private void loadDefaultDatabases() { final String defaultDatabases = configuration.getValueAsString(GlobalConfiguration.SERVER_DEFAULT_DATABASES); if (defaultDatabases != null && !defaultDatabases.isEmpty()) { - ComponentFile.MODE defaultDbMode = configuration.getValueAsEnum(GlobalConfiguration.SERVER_DEFAULT_DATABASE_MODE, ComponentFile.MODE.class); + ComponentFile.MODE defaultDbMode = configuration.getValueAsEnum(GlobalConfiguration.SERVER_DEFAULT_DATABASE_MODE, + ComponentFile.MODE.class); if (defaultDbMode == null) defaultDbMode = READ_WRITE; @@ -527,7 +533,8 @@ private void loadDefaultDatabases() { ((DatabaseInternal) database).getEmbedded().drop(); databases.remove(dbName); } - final String dbPath = configuration.getValueAsString(GlobalConfiguration.SERVER_DATABASE_DIRECTORY) + File.separator + dbName; + final String dbPath = + configuration.getValueAsString(GlobalConfiguration.SERVER_DATABASE_DIRECTORY) + File.separator + dbName; // new Restore(commandParams, dbPath).restoreDatabase(); try { @@ -578,7 +585,8 @@ private void parseCredentials(final String dbName, final String credentials) { if (credentialParts.length < 2) { if (!security.existsUser(credential)) { LogManager.instance() - .log(this, Level.WARNING, "Cannot create user '%s' to access database '%s' because the user does not exist", null, credential, dbName); + .log(this, Level.WARNING, "Cannot create user '%s' to access database '%s' because the user does not exist", null, + credential, dbName); } //FIXME: else if user exists, should we give him access to the dbName? } else { @@ -598,9 +606,9 @@ private void parseCredentials(final String dbName, final String credentials) { security.saveUsers(); } catch (final ServerSecurityException e) { - LogManager.instance() - .log(this, Level.WARNING, "Cannot create database '%s' because the user '%s' already exists with a different password", null, dbName, - userName); + LogManager.instance().log(this, Level.WARNING, + "Cannot create database '%s' because the user '%s' already exists with a different password", null, dbName, + userName); } } else { // UPDATE DB LIST @@ -663,8 +671,8 @@ private String assignHostAddress() { if (configuration.getValueAsBoolean(GlobalConfiguration.HA_K8S)) { if (hostNameEnvVariable == null) { - LogManager.instance() - .log(this, Level.SEVERE, "Error: HOSTNAME environment variable not found but needed when running inside Kubernetes. The server will be halted"); + LogManager.instance().log(this, Level.SEVERE, + "Error: HOSTNAME environment variable not found but needed when running inside Kubernetes. The server will be halted"); stop(); System.exit(1); return null; diff --git a/server/src/main/java/com/arcadedb/server/TestCallback.java b/server/src/main/java/com/arcadedb/server/ReplicationCallback.java similarity index 96% rename from server/src/main/java/com/arcadedb/server/TestCallback.java rename to server/src/main/java/com/arcadedb/server/ReplicationCallback.java index 902a6766c0..9be3684e9f 100644 --- a/server/src/main/java/com/arcadedb/server/TestCallback.java +++ b/server/src/main/java/com/arcadedb/server/ReplicationCallback.java @@ -18,7 +18,7 @@ */ package com.arcadedb.server; -public interface TestCallback { +public interface ReplicationCallback { enum TYPE { SERVER_STARTING, SERVER_UP, diff --git a/server/src/main/java/com/arcadedb/server/ha/HAServer.java b/server/src/main/java/com/arcadedb/server/ha/HAServer.java index 98458f8e67..fe8dc11f4d 100644 --- a/server/src/main/java/com/arcadedb/server/ha/HAServer.java +++ b/server/src/main/java/com/arcadedb/server/ha/HAServer.java @@ -37,7 +37,7 @@ import com.arcadedb.server.ArcadeDBServer; import com.arcadedb.server.ServerException; import com.arcadedb.server.ServerPlugin; -import com.arcadedb.server.TestCallback; +import com.arcadedb.server.ReplicationCallback; import com.arcadedb.server.ha.message.ErrorResponse; import com.arcadedb.server.ha.message.HACommand; import com.arcadedb.server.ha.message.HAMessageFactory; @@ -71,8 +71,10 @@ public class HAServer implements ServerPlugin { protected ReplicationLogFile replicationLogFile; private final AtomicReference leaderConnection = new AtomicReference<>(); private LeaderNetworkListener listener; - private final Map messagesWaitingForQuorum = new ConcurrentHashMap<>(1024); - private final Map forwardMessagesWaitingForResponse = new ConcurrentHashMap<>(1024); + private final Map messagesWaitingForQuorum = new ConcurrentHashMap<>( + 1024); + private final Map forwardMessagesWaitingForResponse = new ConcurrentHashMap<>( + 1024); private long lastConfigurationOutputHash = 0; private final Object sendingLock = new Object(); private String serverAddress; @@ -81,6 +83,7 @@ public class HAServer implements ServerPlugin { protected Pair lastElectionVote; private volatile ELECTION_STATUS electionStatus = ELECTION_STATUS.DONE; private boolean started; + private final SERVER_ROLE serverRole; public enum QUORUM { NONE, ONE, TWO, THREE, MAJORITY, ALL @@ -90,6 +93,10 @@ public enum ELECTION_STATUS { DONE, VOTING_FOR_ME, VOTING_FOR_OTHERS, LEADER_WAITING_FOR_QUORUM } + public enum SERVER_ROLE { + ANY, REPLICA + } + private static class QuorumMessage { public final long sentOn = System.currentTimeMillis(); public final CountDownLatch semaphore; @@ -132,6 +139,7 @@ public HAServer(final ArcadeDBServer server, final ContextConfiguration configur this.bucketName = configuration.getValueAsString(GlobalConfiguration.HA_CLUSTER_NAME); this.startedOn = System.currentTimeMillis(); this.replicationPath = server.getRootPath() + "/replication"; + this.serverRole = SERVER_ROLE.valueOf(configuration.getValueAsString(GlobalConfiguration.HA_SERVER_ROLE).toUpperCase()); } @Override @@ -151,10 +159,12 @@ public void startService() { final ReplicationMessage lastMessage = replicationLogFile.getLastMessage(); if (lastMessage != null) { lastDistributedOperationNumber.set(lastMessage.messageNumber); - LogManager.instance().log(this, Level.FINE, "Found an existent replication log. Starting messages from %d", lastMessage.messageNumber); + LogManager.instance() + .log(this, Level.FINE, "Found an existent replication log. Starting messages from %d", lastMessage.messageNumber); } } catch (final IOException e) { - LogManager.instance().log(this, Level.SEVERE, "Error on creating replication file '%s' for remote server '%s'", fileName, server.getServerName()); + LogManager.instance().log(this, Level.SEVERE, "Error on creating replication file '%s' for remote server '%s'", fileName, + server.getServerName()); stopService(); throw new ReplicationLogException("Error on creating replication file '" + fileName + "'", e); } @@ -171,7 +181,9 @@ public void startService() { configuredServers = serverEntries.length; - LogManager.instance().log(this, Level.FINE, "Connecting to servers %s (cluster=%s configuredServers=%d)", cfgServerList, bucketName, configuredServers); + LogManager.instance() + .log(this, Level.FINE, "Connecting to servers %s (cluster=%s configuredServers=%d)", cfgServerList, bucketName, + configuredServers); checkAllOrNoneAreLocalhosts(serverEntries); @@ -187,11 +199,13 @@ public void startService() { if (leaderConnection.get() == null) { final int majorityOfVotes = (configuredServers / 2) + 1; - LogManager.instance().log(this, Level.INFO, "Unable to find any Leader, start election (cluster=%s configuredServers=%d majorityOfVotes=%d)", bucketName, - configuredServers, majorityOfVotes); + LogManager.instance() + .log(this, Level.INFO, "Unable to find any Leader, start election (cluster=%s configuredServers=%d majorityOfVotes=%d)", + bucketName, configuredServers, majorityOfVotes); - // START ELECTION IN BACKGROUND - new Thread(this::startElection).start(); + if (serverRole != SERVER_ROLE.REPLICA) + // START ELECTION IN BACKGROUND + new Thread(this::startElection).start(); } } @@ -294,7 +308,8 @@ public void startElection() { if (parts.length == 1) parts = new String[] { parts[0], DEFAULT_PORT }; - final ChannelBinaryClient channel = createNetworkConnection(parts[0], Integer.parseInt(parts[1]), ReplicationProtocol.COMMAND_VOTE_FOR_ME); + final ChannelBinaryClient channel = createNetworkConnection(parts[0], Integer.parseInt(parts[1]), + ReplicationProtocol.COMMAND_VOTE_FOR_ME); channel.writeLong(electionTurn); channel.writeLong(lastReplicationMessage); channel.flush(); @@ -305,8 +320,8 @@ public void startElection() { // RECEIVED VOTE ++totalVotes; LogManager.instance() - .log(this, Level.INFO, "Received the vote from server %s (turn=%d totalVotes=%d majority=%d)", serverAddressCopy, electionTurn, totalVotes, - majorityOfVotes); + .log(this, Level.INFO, "Received the vote from server %s (turn=%d totalVotes=%d majority=%d)", serverAddressCopy, + electionTurn, totalVotes, majorityOfVotes); } else { final String otherLeaderName = channel.readString(); @@ -319,15 +334,15 @@ public void startElection() { if (vote == 1) { // NO VOTE, IT ALREADY VOTED FOR SOMEBODY ELSE LogManager.instance() - .log(this, Level.INFO, "Did not receive the vote from server %s (turn=%d totalVotes=%d majority=%d itsLeader=%s)", serverAddressCopy, - electionTurn, totalVotes, majorityOfVotes, otherLeaderName); + .log(this, Level.INFO, "Did not receive the vote from server %s (turn=%d totalVotes=%d majority=%d itsLeader=%s)", + serverAddressCopy, electionTurn, totalVotes, majorityOfVotes, otherLeaderName); } else if (vote == 2) { // NO VOTE, THE OTHER NODE HAS A HIGHER LSN, IT WILL START THE ELECTION electionAborted = true; - LogManager.instance() - .log(this, Level.INFO, "Aborting election because server %s has a higher LSN (turn=%d lastReplicationMessage=%d totalVotes=%d majority=%d)", - serverAddressCopy, electionTurn, lastReplicationMessage, totalVotes, majorityOfVotes); + LogManager.instance().log(this, Level.INFO, + "Aborting election because server %s has a higher LSN (turn=%d lastReplicationMessage=%d totalVotes=%d majority=%d)", + serverAddressCopy, electionTurn, lastReplicationMessage, totalVotes, majorityOfVotes); } } @@ -342,8 +357,8 @@ public void startElection() { if (!electionAborted && totalVotes >= majorityOfVotes) { LogManager.instance() - .log(this, Level.INFO, "Current server elected as new $ANSI{green Leader} (turn=%d totalVotes=%d majority=%d)", electionTurn, totalVotes, - majorityOfVotes); + .log(this, Level.INFO, "Current server elected as new $ANSI{green Leader} (turn=%d totalVotes=%d majority=%d)", + electionTurn, totalVotes, majorityOfVotes); sendNewLeadershipToOtherNodes(); break; } @@ -351,12 +366,13 @@ public void startElection() { if (!otherLeaders.isEmpty()) { // TRY TO CONNECT TO THE EXISTENT LEADER LogManager.instance() - .log(this, Level.INFO, "Other leaders found %s (turn=%d totalVotes=%d majority=%d)", otherLeaders, electionTurn, totalVotes, majorityOfVotes); + .log(this, Level.INFO, "Other leaders found %s (turn=%d totalVotes=%d majority=%d)", otherLeaders, electionTurn, + totalVotes, majorityOfVotes); for (final Map.Entry entry : otherLeaders.entrySet()) { if (entry.getValue() >= majorityOfVotes) { LogManager.instance() - .log(this, Level.INFO, "Trying to connect to the existing leader '%s' (turn=%d totalVotes=%d majority=%d)", entry.getKey(), electionTurn, - entry.getValue(), majorityOfVotes); + .log(this, Level.INFO, "Trying to connect to the existing leader '%s' (turn=%d totalVotes=%d majority=%d)", + entry.getKey(), electionTurn, entry.getValue(), majorityOfVotes); if (!isCurrentServer(entry.getKey()) && connectToLeader(entry.getKey(), null)) break; } @@ -372,8 +388,8 @@ public void startElection() { timeout *= 3; LogManager.instance() - .log(this, Level.INFO, "Not able to be elected as Leader, waiting %dms and retry (turn=%d totalVotes=%d majority=%d)", timeout, electionTurn, - totalVotes, majorityOfVotes); + .log(this, Level.INFO, "Not able to be elected as Leader, waiting %dms and retry (turn=%d totalVotes=%d majority=%d)", + timeout, electionTurn, totalVotes, majorityOfVotes); Thread.sleep(timeout); } catch (final InterruptedException e) { @@ -394,7 +410,8 @@ private boolean checkForExistentLeaderConnection(final long electionTurn) { if (lc != null) { // I AM A REPLICA, NO LEADER ELECTION IS NEEDED LogManager.instance() - .log(this, Level.INFO, "Abort election process, a Leader (%s) has been already found (turn=%d)", lc.getRemoteServerName(), electionTurn); + .log(this, Level.INFO, "Abort election process, a Leader (%s) has been already found (turn=%d)", lc.getRemoteServerName(), + electionTurn); return true; } return false; @@ -405,7 +422,8 @@ private void sendNewLeadershipToOtherNodes() { setElectionStatus(ELECTION_STATUS.LEADER_WAITING_FOR_QUORUM); - LogManager.instance().log(this, Level.INFO, "Contacting all the servers for the new leadership (turn=%d)...", lastElectionVote.getFirst()); + LogManager.instance() + .log(this, Level.INFO, "Contacting all the servers for the new leadership (turn=%d)...", lastElectionVote.getFirst()); for (final String serverAddress : serverAddressList) { if (isCurrentServer(serverAddress)) @@ -417,7 +435,8 @@ private void sendNewLeadershipToOtherNodes() { LogManager.instance().log(this, Level.INFO, "- Sending new Leader to server '%s'...", serverAddress); - final ChannelBinaryClient channel = createNetworkConnection(parts[0], Integer.parseInt(parts[1]), ReplicationProtocol.COMMAND_ELECTION_COMPLETED); + final ChannelBinaryClient channel = createNetworkConnection(parts[0], Integer.parseInt(parts[1]), + ReplicationProtocol.COMMAND_ELECTION_COMPLETED); channel.writeLong(lastElectionVote.getFirst()); channel.flush(); @@ -456,7 +475,8 @@ public void setReplicaStatus(final String remoteServerName, final boolean online c.setStatus(online ? Leader2ReplicaNetworkExecutor.STATUS.ONLINE : Leader2ReplicaNetworkExecutor.STATUS.OFFLINE); try { - server.lifecycleEvent(online ? TestCallback.TYPE.REPLICA_ONLINE : TestCallback.TYPE.REPLICA_OFFLINE, remoteServerName); + server.lifecycleEvent(online ? ReplicationCallback.TYPE.REPLICA_ONLINE : ReplicationCallback.TYPE.REPLICA_OFFLINE, + remoteServerName); } catch (final Exception e) { // IGNORE IT } @@ -615,7 +635,8 @@ else if (forwardedMessage.error.exceptionClass.equals(TransactionException.class else if (forwardedMessage.error.exceptionClass.equals(QuorumNotReachedException.class.getName())) throw new QuorumNotReachedException(forwardedMessage.error.exceptionMessage); - LogManager.instance().log(this, Level.WARNING, "Unexpected error received from forwarding a transaction to the Leader"); + LogManager.instance() + .log(this, Level.WARNING, "Unexpected error received from forwarding a transaction to the Leader"); throw new ReplicationException("Unexpected error received from forwarding a transaction to the Leader"); } @@ -625,7 +646,8 @@ else if (forwardedMessage.error.exceptionClass.equals(QuorumNotReachedException. } catch (final InterruptedException e) { Thread.currentThread().interrupt(); - throw new ReplicationException("No response received from the Leader for request " + opNumber + " because the thread was interrupted"); + throw new ReplicationException( + "No response received from the Leader for request " + opNumber + " because the thread was interrupted"); } } else forwardedMessage.result = new InternalResultSet(new ResultInternal(Map.of("operation", "forwarded to the leader"))); @@ -660,7 +682,8 @@ public void sendCommandToReplicasNoLog(final HACommand command) { replicaConnection.enqueueMessage(buffer.slice(0)); } catch (final ReplicationException e) { // REMOVE THE REPLICA - LogManager.instance().log(this, Level.SEVERE, "Replica '%s' does not respond, setting it as OFFLINE", replicaConnection.getRemoteServerName()); + LogManager.instance().log(this, Level.SEVERE, "Replica '%s' does not respond, setting it as OFFLINE", + replicaConnection.getRemoteServerName()); setReplicaStatus(replicaConnection.getRemoteServerName(), false); } } @@ -702,7 +725,8 @@ public List sendCommandToReplicasWithQuorum(final HACommand command, fin // SEND THE REQUEST TO ALL THE REPLICAS final List replicas = new ArrayList<>(replicaConnections.values()); - LogManager.instance().log(this, Level.FINE, "Sending request %d '%s' to %s (quorum=%d)", opNumber, command, replicas, quorum); + LogManager.instance() + .log(this, Level.FINE, "Sending request %d '%s' to %s (quorum=%d)", opNumber, command, replicas, quorum); for (final Leader2ReplicaNetworkExecutor replicaConnection : replicas) { try { @@ -715,8 +739,8 @@ public List sendCommandToReplicasWithQuorum(final HACommand command, fin } } catch (final ReplicationException e) { - LogManager.instance() - .log(this, Level.SEVERE, "Error on replicating message %d to replica '%s' (error=%s)", opNumber, replicaConnection.getRemoteServerName(), e); + LogManager.instance().log(this, Level.SEVERE, "Error on replicating message %d to replica '%s' (error=%s)", opNumber, + replicaConnection.getRemoteServerName(), e); // REMOVE THE REPLICA AND EXCLUDE IT FROM THE QUORUM if (quorumMessage != null) @@ -727,8 +751,10 @@ public List sendCommandToReplicasWithQuorum(final HACommand command, fin if (sent < quorum - 1) { checkCurrentNodeIsTheLeader(); - LogManager.instance().log(this, Level.WARNING, "Quorum " + quorum + " not reached because only " + (sent + 1) + " server(s) are online"); - throw new QuorumNotReachedException("Quorum " + quorum + " not reached because only " + (sent + 1) + " server(s) are online"); + LogManager.instance() + .log(this, Level.WARNING, "Quorum " + quorum + " not reached because only " + (sent + 1) + " server(s) are online"); + throw new QuorumNotReachedException( + "Quorum " + quorum + " not reached because only " + (sent + 1) + " server(s) are online"); } if (quorumMessage != null) { @@ -743,13 +769,16 @@ public List sendCommandToReplicasWithQuorum(final HACommand command, fin checkCurrentNodeIsTheLeader(); - LogManager.instance().log(this, Level.WARNING, "Timeout waiting for quorum (%d) to be reached for request %d", quorum, opNumber); - throw new QuorumNotReachedException("Timeout waiting for quorum (" + quorum + ") to be reached for request " + opNumber); + LogManager.instance() + .log(this, Level.WARNING, "Timeout waiting for quorum (%d) to be reached for request %d", quorum, opNumber); + throw new QuorumNotReachedException( + "Timeout waiting for quorum (" + quorum + ") to be reached for request " + opNumber); } } catch (final InterruptedException e) { Thread.currentThread().interrupt(); - throw new QuorumNotReachedException("Quorum not reached for request " + opNumber + " because the thread was interrupted"); + throw new QuorumNotReachedException( + "Quorum not reached for request " + opNumber + " because the thread was interrupted"); } } @@ -798,7 +827,8 @@ public void removeServer(final String remoteServerName) { final Leader2ReplicaNetworkExecutor c = replicaConnections.remove(remoteServerName); if (c != null) { //final RemovedServerInfo removedServer = new RemovedServerInfo(remoteServerName, c.getJoinedOn()); - LogManager.instance().log(this, Level.SEVERE, "Replica '%s' seems not active, removing it from the cluster", remoteServerName); + LogManager.instance() + .log(this, Level.SEVERE, "Replica '%s' seems not active, removing it from the cluster", remoteServerName); c.close(); } @@ -932,7 +962,8 @@ public void resendMessagesToReplica(final long fromMessageNumber, final String r final Leader2ReplicaNetworkExecutor replica = replicaConnections.get(replicaName); if (replica == null) - throw new ReplicationException("Server '" + getServerName() + "' cannot sync replica '" + replicaName + "' because it is offline"); + throw new ReplicationException( + "Server '" + getServerName() + "' cannot sync replica '" + replicaName + "' because it is offline"); final long fromPositionInLog = replicationLogFile.findMessagePosition(fromMessageNumber); @@ -947,7 +978,8 @@ public void resendMessagesToReplica(final long fromMessageNumber, final String r // STARTING FROM THE SECOND SERVER, COPY THE BUFFER try { - LogManager.instance().log(this, Level.FINE, "Resending message (%s) to replica '%s'...", entry.getFirst(), replica.getRemoteServerName()); + LogManager.instance() + .log(this, Level.FINE, "Resending message (%s) to replica '%s'...", entry.getFirst(), replica.getRemoteServerName()); if (min == -1) min = entry.getFirst().messageNumber; @@ -961,8 +993,8 @@ public void resendMessagesToReplica(final long fromMessageNumber, final String r } catch (final Exception e) { // REMOVE THE REPLICA - LogManager.instance() - .log(this, Level.SEVERE, "Replica '%s' does not respond, setting it as OFFLINE (error=%s)", replica.getRemoteServerName(), e.toString()); + LogManager.instance().log(this, Level.SEVERE, "Replica '%s' does not respond, setting it as OFFLINE (error=%s)", + replica.getRemoteServerName(), e.toString()); setReplicaStatus(replica.getRemoteServerName(), false); throw new ReplicationException("Cannot resend messages to replica '" + replicaName + "'", e); } @@ -970,7 +1002,8 @@ public void resendMessagesToReplica(final long fromMessageNumber, final String r } LogManager.instance() - .log(this, Level.INFO, "Recovering completed. Sent %d message(s) to replica '%s' (%d-%d)", totalSentMessages.get(), replicaName, min, max); + .log(this, Level.INFO, "Recovering completed. Sent %d message(s) to replica '%s' (%d-%d)", totalSentMessages.get(), + replicaName, min, max); } public boolean connectToLeader(final String serverEntry, final Callable errorCallback) { @@ -986,8 +1019,8 @@ public boolean connectToLeader(final String serverEntry, final Callable 0 && localHostServers < serverEntries.length) - throw new ServerException("Found a localhost (127.0.0.1) in the server list among non-localhost servers. Please fix the server list configuration."); + throw new ServerException( + "Found a localhost (127.0.0.1) in the server list among non-localhost servers. Please fix the server list configuration."); } } diff --git a/server/src/main/java/com/arcadedb/server/ha/Replica2LeaderNetworkExecutor.java b/server/src/main/java/com/arcadedb/server/ha/Replica2LeaderNetworkExecutor.java index c389bd9bbc..52f8f9ab8a 100755 --- a/server/src/main/java/com/arcadedb/server/ha/Replica2LeaderNetworkExecutor.java +++ b/server/src/main/java/com/arcadedb/server/ha/Replica2LeaderNetworkExecutor.java @@ -32,7 +32,7 @@ import com.arcadedb.network.binary.ServerIsNotTheLeaderException; import com.arcadedb.schema.EmbeddedSchema; import com.arcadedb.server.ServerException; -import com.arcadedb.server.TestCallback; +import com.arcadedb.server.ReplicationCallback; import com.arcadedb.server.ha.message.DatabaseStructureRequest; import com.arcadedb.server.ha.message.DatabaseStructureResponse; import com.arcadedb.server.ha.message.FileContentRequest; @@ -55,7 +55,6 @@ public class Replica2LeaderNetworkExecutor extends Thread { private int port; private String leaderServerName = "?"; private String leaderServerHTTPAddress; - private final boolean testOn; private ChannelBinaryClient channel; private volatile boolean shutdown = false; private final Object channelOutputLock = new Object(); @@ -63,8 +62,6 @@ public class Replica2LeaderNetworkExecutor extends Thread { public Replica2LeaderNetworkExecutor(final HAServer ha, final String host, final int port) { this.server = ha; - this.testOn = ha.getServer().getConfiguration().getValueAsBoolean(GlobalConfiguration.TEST); - this.host = host; this.port = port; connect(); @@ -89,7 +86,8 @@ public void run() { final Pair request = server.getMessageFactory().deserializeCommand(buffer, requestBytes); if (request == null) { - LogManager.instance().log(this, Level.SEVERE, "Error on receiving message NULL, reconnecting (threadId=%d)", Thread.currentThread().getId()); + LogManager.instance().log(this, Level.SEVERE, "Error on receiving message NULL, reconnecting (threadId=%d)", + Thread.currentThread().getId()); reconnect(null); continue; } @@ -99,9 +97,11 @@ public void run() { reqId = message.messageNumber; if (reqId > -1) - LogManager.instance().log(this, Level.FINE, "Received request %d from the Leader (threadId=%d)", reqId, Thread.currentThread().getId()); + LogManager.instance() + .log(this, Level.FINE, "Received request %d from the Leader (threadId=%d)", reqId, Thread.currentThread().getId()); else - LogManager.instance().log(this, Level.FINE, "Received response %d from the Leader (threadId=%d)", reqId, Thread.currentThread().getId()); + LogManager.instance() + .log(this, Level.FINE, "Received response %d from the Leader (threadId=%d)", reqId, Thread.currentThread().getId()); // NUMBERS <0 ARE FORWARD FROM REPLICA TO LEADER WITHOUT A VALID SEQUENCE if (reqId > -1) { @@ -109,7 +109,8 @@ public void run() { if (reqId <= lastMessage) { //TODO: CHECK IF THE MESSAGE IS IDENTICAL? - LogManager.instance().log(this, Level.FINE, "Message %d already applied on local server (last=%d). Skip this", reqId, lastMessage); + LogManager.instance() + .log(this, Level.FINE, "Message %d already applied on local server (last=%d). Skip this", reqId, lastMessage); continue; } @@ -136,8 +137,7 @@ public void run() { } } - if (testOn) - server.getServer().lifecycleEvent(TestCallback.TYPE.REPLICA_MSG_RECEIVED, request); + server.getServer().lifecycleEvent(ReplicationCallback.TYPE.REPLICA_MSG_RECEIVED, request); if (response != null) sendCommandToLeader(buffer, response, reqId); @@ -147,13 +147,15 @@ public void run() { // IGNORE IT } catch (final Exception e) { LogManager.instance() - .log(this, Level.INFO, "Exception during execution of request %d (shutdown=%s name=%s error=%s)", reqId, shutdown, getName(), e.toString()); + .log(this, Level.INFO, "Exception during execution of request %d (shutdown=%s name=%s error=%s)", reqId, shutdown, + getName(), e.toString()); reconnect(e); } } LogManager.instance() - .log(this, Level.INFO, "Replica message thread closed (shutdown=%s name=%s threadId=%d)", shutdown, getName(), Thread.currentThread().getId()); + .log(this, Level.INFO, "Replica message thread closed (shutdown=%s name=%s threadId=%d)", shutdown, getName(), + Thread.currentThread().getId()); } public String getRemoteServerName() { @@ -173,22 +175,24 @@ private void reconnect(final Exception e) { if (server.getLeader() != this) { // LEADER ALREADY CONNECTED (RE-ELECTED?) - LogManager.instance().log(this, Level.SEVERE, "Removing connection to the previous Leader ('%s'). New Leader is: %s", getRemoteServerName(), - server.getLeader().getRemoteServerName()); + LogManager.instance() + .log(this, Level.SEVERE, "Removing connection to the previous Leader ('%s'). New Leader is: %s", getRemoteServerName(), + server.getLeader().getRemoteServerName()); close(); return; } - LogManager.instance() - .log(this, Level.SEVERE, "Error on communication between current replica and the Leader ('%s'), reconnecting... (error=%s)", getRemoteServerName(), - e); + LogManager.instance().log(this, Level.SEVERE, + "Error on communication between current replica and the Leader ('%s'), reconnecting... (error=%s)", getRemoteServerName(), + e); if (!shutdown) { try { connect(); startup(); } catch (final Exception e1) { - LogManager.instance().log(this, Level.SEVERE, "Error on re-connecting to the Leader ('%s') (error=%s)", getRemoteServerName(), e1); + LogManager.instance() + .log(this, Level.SEVERE, "Error on re-connecting to the Leader ('%s') (error=%s)", getRemoteServerName(), e1); HashSet serverAddressListCopy = new HashSet<>(Arrays.asList(server.getServerAddressList().split(","))); @@ -208,7 +212,8 @@ private void reconnect(final Exception e) { startup(); return; } catch (final Exception e2) { - LogManager.instance().log(this, Level.SEVERE, "Error on re-connecting to the server '%s' (error=%s)", getRemoteAddress(), e2); + LogManager.instance() + .log(this, Level.SEVERE, "Error on re-connecting to the server '%s' (error=%s)", getRemoteAddress(), e2); } } @@ -231,7 +236,8 @@ private void reconnect(final Exception e) { public void sendCommandToLeader(final Binary buffer, final HACommand response, final long messageNumber) throws IOException { if (messageNumber > -1) - LogManager.instance().log(this, Level.FINE, "Sending message (response to %d) to the Leader '%s'...", messageNumber, response); + LogManager.instance() + .log(this, Level.FINE, "Sending message (response to %d) to the Leader '%s'...", messageNumber, response); else LogManager.instance().log(this, Level.FINE, "Sending message (request %d) to the Leader '%s'...", messageNumber, response); @@ -240,7 +246,8 @@ public void sendCommandToLeader(final Binary buffer, final HACommand response, f synchronized (channelOutputLock) { final ChannelBinaryClient c = channel; if (c == null) - throw new ReplicationException("Error on sending command back to the leader server '" + leaderServerName + "' (cause=socket closed)"); + throw new ReplicationException( + "Error on sending command back to the leader server '" + leaderServerName + "' (cause=socket closed)"); c.writeVarLengthBytes(buffer.getContent(), buffer.size()); c.flush(); @@ -310,31 +317,36 @@ public void connect() { case ReplicationProtocol.ERROR_CONNECT_NOLEADER: final String leaderServerName = channel.readString(); final String leaderAddress = channel.readString(); - LogManager.instance() - .log(this, Level.INFO, "Cannot accept incoming connections: remote server is not a Leader, connecting to the current Leader '%s' (%s)", - leaderServerName, leaderAddress); + LogManager.instance().log(this, Level.INFO, + "Cannot accept incoming connections: remote server is not a Leader, connecting to the current Leader '%s' (%s)", + leaderServerName, leaderAddress); closeChannel(); throw new ServerIsNotTheLeaderException( - "Remote server is not a Leader, connecting to the current Leader '" + leaderServerName + "' (" + leaderAddress + ")", leaderAddress); + "Remote server is not a Leader, connecting to the current Leader '" + leaderServerName + "' (" + leaderAddress + + ")", leaderAddress); case ReplicationProtocol.ERROR_CONNECT_ELECTION_PENDING: - LogManager.instance().log(this, Level.INFO, "Cannot accept incoming connections: an election for the Leader server is in progress"); + LogManager.instance() + .log(this, Level.INFO, "Cannot accept incoming connections: an election for the Leader server is in progress"); closeChannel(); throw new ReplicationException("An election for the Leader server is pending"); case ReplicationProtocol.ERROR_CONNECT_UNSUPPORTEDPROTOCOL: LogManager.instance() - .log(this, Level.INFO, "Cannot accept incoming connections: remote server does not support protocol %d", ReplicationProtocol.PROTOCOL_VERSION); + .log(this, Level.INFO, "Cannot accept incoming connections: remote server does not support protocol %d", + ReplicationProtocol.PROTOCOL_VERSION); break; case ReplicationProtocol.ERROR_CONNECT_WRONGCLUSTERNAME: LogManager.instance() - .log(this, Level.INFO, "Cannot accept incoming connections: remote server joined a different cluster than '%s'", server.getClusterName()); + .log(this, Level.INFO, "Cannot accept incoming connections: remote server joined a different cluster than '%s'", + server.getClusterName()); break; case ReplicationProtocol.ERROR_CONNECT_SAME_SERVERNAME: - LogManager.instance() - .log(this, Level.INFO, "Cannot accept incoming connections: remote server has the same name as the local server '%s'", server.getServerName()); + LogManager.instance().log(this, Level.INFO, + "Cannot accept incoming connections: remote server has the same name as the local server '%s'", + server.getServerName()); break; default: @@ -364,11 +376,14 @@ public void connect() { } public void startup() { - LogManager.instance().log(this, Level.INFO, "Server connected to the Leader server %s:%d, members=[%s]", host, port, server.getServerAddressList()); + LogManager.instance().log(this, Level.INFO, "Server connected to the Leader server %s:%d, members=[%s]", host, port, + server.getServerAddressList()); setName(Constants.PRODUCT + "-ha-replica2leader/" + server.getServerName() + "/" + getRemoteServerName()); - LogManager.instance().log(this, Level.INFO, "Server started as Replica in HA mode (cluster=%s leader=%s:%d)", server.getClusterName(), host, port); + LogManager.instance() + .log(this, Level.INFO, "Server started as Replica in HA mode (cluster=%s leader=%s:%d)", server.getClusterName(), host, + port); installDatabases(); } @@ -389,8 +404,7 @@ private void installDatabases() { if (response instanceof ReplicaConnectFullResyncResponse) { LogManager.instance().log(this, Level.INFO, "Asking for a full resync..."); - if (testOn) - server.getServer().lifecycleEvent(TestCallback.TYPE.REPLICA_FULL_RESYNC, null); + server.getServer().lifecycleEvent(ReplicationCallback.TYPE.REPLICA_FULL_RESYNC, null); final ReplicaConnectFullResyncResponse fullSync = (ReplicaConnectFullResyncResponse) response; @@ -404,8 +418,7 @@ private void installDatabases() { } else { LogManager.instance().log(this, Level.INFO, "Receiving hot resync (from=%d)...", lastLogNumber); - if (testOn) - server.getServer().lifecycleEvent(TestCallback.TYPE.REPLICA_HOT_RESYNC, null); + server.getServer().lifecycleEvent(ReplicationCallback.TYPE.REPLICA_HOT_RESYNC, null); } sendCommandToLeader(buffer, new ReplicaReadyRequest(), -1); @@ -424,8 +437,8 @@ public void requestInstallDatabase(final Binary buffer, final String db) throws installDatabase(buffer, db, dbStructure, database); } - private void installDatabase(final Binary buffer, final String db, final DatabaseStructureResponse dbStructure, final DatabaseInternal database) - throws IOException { + private void installDatabase(final Binary buffer, final String db, final DatabaseStructureResponse dbStructure, + final DatabaseInternal database) throws IOException { // WRITE THE SCHEMA try (final FileWriter schemaFile = new FileWriter(database.getDatabasePath() + File.separator + EmbeddedSchema.SCHEMA_FILE_NAME, @@ -441,8 +454,8 @@ private void installDatabase(final Binary buffer, final String db, final Databas try { databaseSize += installFile(buffer, db, f.getKey(), f.getValue(), 0, -1); } catch (Exception e) { - LogManager.instance() - .log(this, Level.SEVERE, "Error on installing file '%s' (%s %d/%d files)", e, f.getKey(), FileUtils.getSizeAsString(databaseSize), i, list.size()); + LogManager.instance().log(this, Level.SEVERE, "Error on installing file '%s' (%s %d/%d files)", e, f.getKey(), + FileUtils.getSizeAsString(databaseSize), i, list.size()); database.getEmbedded().drop(); throw new ReplicationException("Error on installing database '" + db + "'", e); } @@ -453,12 +466,12 @@ private void installDatabase(final Binary buffer, final String db, final Databas DatabaseContext.INSTANCE.init(database); database.getSchema().getEmbedded().load(ComponentFile.MODE.READ_WRITE, true); - LogManager.instance() - .log(this, Level.INFO, "Database '%s' installed from the cluster (%s - %d files)", null, db, FileUtils.getSizeAsString(databaseSize), list.size()); + LogManager.instance().log(this, Level.INFO, "Database '%s' installed from the cluster (%s - %d files)", null, db, + FileUtils.getSizeAsString(databaseSize), list.size()); } - private long installFile(final Binary buffer, final String db, final int fileId, final String fileName, final int pageFromInclusive, - final int pageToInclusive) throws IOException { + private long installFile(final Binary buffer, final String db, final int fileId, final String fileName, + final int pageFromInclusive, final int pageToInclusive) throws IOException { int from = pageFromInclusive; @@ -485,7 +498,8 @@ private long installFile(final Binary buffer, final String db, final int fileId, from += fileChunk.getPages(); } - LogManager.instance().log(this, Level.FINE, "File '%s' installed (pagesWritten=%d size=%s)", fileName, pagesWritten, FileUtils.getSizeAsString(fileSize)); + LogManager.instance().log(this, Level.FINE, "File '%s' installed (pagesWritten=%d size=%s)", fileName, pagesWritten, + FileUtils.getSizeAsString(fileSize)); return fileSize; } diff --git a/server/src/test/java/com/arcadedb/server/BaseGraphServerTest.java b/server/src/test/java/com/arcadedb/server/BaseGraphServerTest.java index 1327ee8c0e..3a48620d1e 100644 --- a/server/src/test/java/com/arcadedb/server/BaseGraphServerTest.java +++ b/server/src/test/java/com/arcadedb/server/BaseGraphServerTest.java @@ -32,6 +32,7 @@ import com.arcadedb.schema.Schema; import com.arcadedb.schema.VertexType; import com.arcadedb.serializer.json.JSONObject; +import com.arcadedb.server.ha.HAServer; import com.arcadedb.utility.FileUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -42,6 +43,7 @@ import java.nio.charset.*; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.*; import java.util.logging.*; /** @@ -54,9 +56,10 @@ public abstract class BaseGraphServerTest extends StaticBaseServerTest { protected static final String EDGE2_TYPE_NAME = "E2"; private static final int PARALLEL_LEVEL = 4; - protected static RID root; - private ArcadeDBServer[] servers; - private Database[] databases; + protected static RID root; + private ArcadeDBServer[] servers; + private Database[] databases; + protected volatile boolean serversSynchronized = false; protected interface Callback { void call(int serverIndex) throws Exception; @@ -166,10 +169,12 @@ public void endTest() { for (int i = servers.length - 1; i > -1; --i) { if (servers[i] != null && !servers[i].isStarted()) { testLog(" Restarting server %d to force re-alignment", i); - final int oldPort = servers[i].getHttpServer().getPort(); - servers[i].getConfiguration().setValue(GlobalConfiguration.SERVER_HTTP_INCOMING_PORT, oldPort); - servers[i].start(); - anyServerRestarted = true; + if (servers[i].getHttpServer() != null) { + final int oldPort = servers[i].getHttpServer().getPort(); + servers[i].getConfiguration().setValue(GlobalConfiguration.SERVER_HTTP_INCOMING_PORT, oldPort); + servers[i].start(); + anyServerRestarted = true; + } } } } @@ -184,11 +189,11 @@ public void endTest() { } } } finally { - try { LogManager.instance().log(this, Level.FINE, "END OF THE TEST: Check DBS are identical..."); checkDatabasesAreIdentical(); } finally { + GlobalConfiguration.resetAll(); LogManager.instance().log(this, Level.FINE, "TEST: Stopping servers..."); stopServers(); @@ -254,6 +259,7 @@ protected void startServers() { config.setValue(GlobalConfiguration.HA_REPLICATION_INCOMING_HOST, "localhost"); config.setValue(GlobalConfiguration.SERVER_HTTP_INCOMING_HOST, "localhost"); config.setValue(GlobalConfiguration.HA_ENABLED, getServerCount() > 1); + config.setValue(GlobalConfiguration.HA_SERVER_ROLE, getServerRole(i)); //config.setValue(GlobalConfiguration.NETWORK_SOCKET_TIMEOUT, 2000); onServerConfiguration(config); @@ -269,12 +275,16 @@ protected void startServers() { waitAllReplicasAreConnected(); } + protected HAServer.SERVER_ROLE getServerRole(final int serverIndex) { + return serverIndex == 1 ? HAServer.SERVER_ROLE.REPLICA : HAServer.SERVER_ROLE.ANY; + } + protected void waitAllReplicasAreConnected() { final int serverCount = getServerCount(); int lastTotalConnectedReplica = 0; final long beginTime = System.currentTimeMillis(); - while (System.currentTimeMillis() - beginTime < 5_000) { + while (System.currentTimeMillis() - beginTime < 10_000) { for (int i = 0; i < serverCount; ++i) { if (servers[i].getHA() != null && servers[i].getHA().isLeader()) { lastTotalConnectedReplica = servers[i].getHA().getOnlineReplicas(); @@ -290,7 +300,11 @@ protected void waitAllReplicasAreConnected() { } } } - LogManager.instance().log(this, Level.SEVERE, "Timeout on waiting for all servers to get online %d < %d", 1 + lastTotalConnectedReplica, serverCount); + LogManager.instance() + .log(this, Level.SEVERE, "Timeout on waiting for all servers to get online %d < %d", 1 + lastTotalConnectedReplica, + serverCount); + + serversSynchronized = true; } protected void stopServers() { @@ -303,8 +317,8 @@ protected void stopServers() { } } - protected void formatPayload(final HttpURLConnection connection, final String language, final String payloadCommand, final String serializer, - final Map params) throws Exception { + protected void formatPayload(final HttpURLConnection connection, final String language, final String payloadCommand, + final String serializer, final Map params) throws Exception { if (payloadCommand != null) { final JSONObject jsonRequest = new JSONObject(); jsonRequest.put("language", language); @@ -467,13 +481,16 @@ protected void checkDatabasesAreIdentical() { if (db1 == null || db2 == null) continue; - LogManager.instance().log(this, Level.FINE, "TEST: Comparing databases '%s' and '%s' are identical...", db1.getDatabasePath(), db2.getDatabasePath()); + LogManager.instance().log(this, Level.FINE, "TEST: Comparing databases '%s' and '%s' are identical...", db1.getDatabasePath(), + db2.getDatabasePath()); try { new DatabaseComparator().compare(db1, db2); - LogManager.instance().log(this, Level.FINE, "TEST: OK databases '%s' and '%s' are identical", db1.getDatabasePath(), db2.getDatabasePath()); + LogManager.instance() + .log(this, Level.FINE, "TEST: OK databases '%s' and '%s' are identical", db1.getDatabasePath(), db2.getDatabasePath()); } catch (final RuntimeException e) { LogManager.instance() - .log(this, Level.FINE, "ERROR on comparing databases '%s' and '%s': %s", db1.getDatabasePath(), db2.getDatabasePath(), e.getMessage()); + .log(this, Level.FINE, "ERROR on comparing databases '%s' and '%s': %s", db1.getDatabasePath(), db2.getDatabasePath(), + e.getMessage()); throw e; } } @@ -481,13 +498,16 @@ protected void checkDatabasesAreIdentical() { protected void testEachServer(final Callback callback) throws Exception { for (int i = 0; i < getServerCount(); i++) { - LogManager.instance().log(this, Level.FINE, "***********************************************************************************"); + LogManager.instance() + .log(this, Level.FINE, "***********************************************************************************"); LogManager.instance().log(this, Level.FINE, "EXECUTING TEST ON SERVER %d/%d...", i, getServerCount()); - LogManager.instance().log(this, Level.FINE, "***********************************************************************************"); + LogManager.instance() + .log(this, Level.FINE, "***********************************************************************************"); try { callback.call(i); } catch (Exception e) { - LogManager.instance().log(this, Level.SEVERE, "Error on executing test %s on server %d/%d", e, getClass().getName(), i + 1, getServerCount()); + LogManager.instance().log(this, Level.SEVERE, "Error on executing test %s on server %d/%d", e, getClass().getName(), i + 1, + getServerCount()); throw e; } } @@ -499,42 +519,15 @@ private void checkForActiveDatabases() { db.close(); if (!activeDatabases.isEmpty()) - LogManager.instance().log(this, Level.SEVERE, "Found active databases: " + activeDatabases + ". Forced close before starting a new test"); + LogManager.instance() + .log(this, Level.SEVERE, "Found active databases: " + activeDatabases + ". Forced close before starting a new test"); //Assertions.assertTrue(activeDatabases.isEmpty(), "Found active databases: " + activeDatabases); } - protected String createRecord(final int serverIndex, final String payload) throws IOException { - final HttpURLConnection connection = (HttpURLConnection) new URL("http://127.0.0.1:248" + serverIndex + "/api/v1/document/graph").openConnection(); - connection.setRequestMethod("POST"); - connection.setRequestMethod("POST"); - connection.setRequestProperty("Authorization", - "Basic " + Base64.getEncoder().encodeToString(("root:" + BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS).getBytes())); - connection.setDoOutput(true); - - connection.connect(); - - final PrintWriter pw = new PrintWriter(new OutputStreamWriter(connection.getOutputStream())); - pw.write(payload); - pw.close(); - - try { - final String response = readResponse(connection); - - Assertions.assertEquals(200, connection.getResponseCode()); - Assertions.assertEquals("OK", connection.getResponseMessage()); - LogManager.instance().log(this, Level.FINE, "TEST: Response: %s", response); - Assertions.assertTrue(response.contains("#")); - - return response; - - } finally { - connection.disconnect(); - } - } - protected String command(final int serverIndex, final String command) throws Exception { - final HttpURLConnection initialConnection = (HttpURLConnection) new URL("http://127.0.0.1:248" + serverIndex + "/api/v1/command/graph").openConnection(); + final HttpURLConnection initialConnection = (HttpURLConnection) new URL( + "http://127.0.0.1:248" + serverIndex + "/api/v1/command/graph").openConnection(); try { initialConnection.setRequestMethod("POST"); @@ -559,7 +552,8 @@ protected String command(final int serverIndex, final String command) throws Exc } protected JSONObject executeCommand(final int serverIndex, final String language, final String payloadCommand) throws Exception { - final HttpURLConnection connection = (HttpURLConnection) new URL("http://127.0.0.1:248" + serverIndex + "/api/v1/command/graph").openConnection(); + final HttpURLConnection connection = (HttpURLConnection) new URL( + "http://127.0.0.1:248" + serverIndex + "/api/v1/command/graph").openConnection(); connection.setRequestMethod("POST"); connection.setRequestProperty("Authorization", diff --git a/server/src/test/java/com/arcadedb/server/ha/HARandomCrashIT.java b/server/src/test/java/com/arcadedb/server/ha/HARandomCrashIT.java index c81d61a989..a97168b685 100644 --- a/server/src/test/java/com/arcadedb/server/ha/HARandomCrashIT.java +++ b/server/src/test/java/com/arcadedb/server/ha/HARandomCrashIT.java @@ -45,6 +45,11 @@ public void setTestConfiguration() { GlobalConfiguration.HA_QUORUM.setValue("Majority"); } + @Override + protected HAServer.SERVER_ROLE getServerRole(int serverIndex) { + return HAServer.SERVER_ROLE.ANY; + } + @Test public void testReplication() { checkDatabases(); @@ -71,8 +76,9 @@ public void run() { try { final long count = db.countType(VERTEX1_TYPE_NAME, true); if (count > (getTxs() * getVerticesPerTx()) * 9 / 10) { - LogManager.instance().log(this, Level.FINE, "TEST: Skip stop of server because it's close to the end of the test (%d/%d)", null, count, - getTxs() * getVerticesPerTx()); + LogManager.instance() + .log(this, Level.FINE, "TEST: Skip stop of server because it's close to the end of the test (%d/%d)", null, + count, getTxs() * getVerticesPerTx()); return; } } catch (final Exception e) { @@ -123,10 +129,11 @@ public void run() { final String server1Address = getServer(0).getHttpServer().getListeningAddress(); final String[] server1AddressParts = server1Address.split(":"); - final RemoteDatabase db = new RemoteDatabase(server1AddressParts[0], Integer.parseInt(server1AddressParts[1]), getDatabaseName(), "root", - BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS); + final RemoteDatabase db = new RemoteDatabase(server1AddressParts[0], Integer.parseInt(server1AddressParts[1]), + getDatabaseName(), "root", BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS); - LogManager.instance().log(this, Level.FINE, "TEST: Executing %s transactions with %d vertices each...", null, getTxs(), getVerticesPerTx()); + LogManager.instance() + .log(this, Level.FINE, "TEST: Executing %s transactions with %d vertices each...", null, getTxs(), getVerticesPerTx()); long counter = 0; @@ -138,7 +145,8 @@ public void run() { for (int i = 0; i < getVerticesPerTx(); ++i) { - final ResultSet resultSet = db.command("SQL", "CREATE VERTEX " + VERTEX1_TYPE_NAME + " SET id = ?, name = ?", ++counter, "distributed-test"); + final ResultSet resultSet = db.command("SQL", "CREATE VERTEX " + VERTEX1_TYPE_NAME + " SET id = ?, name = ?", ++counter, + "distributed-test"); Assertions.assertTrue(resultSet.hasNext()); final Result result = resultSet.next(); @@ -160,7 +168,8 @@ public void run() { break; } catch (final TransactionException | NeedRetryException | RemoteException e) { - LogManager.instance().log(this, Level.FINE, "TEST: - RECEIVED ERROR: %s (RETRY %d/%d)", null, e.toString(), retry, getMaxRetry()); + LogManager.instance() + .log(this, Level.FINE, "TEST: - RECEIVED ERROR: %s (RETRY %d/%d)", null, e.toString(), retry, getMaxRetry()); if (retry >= getMaxRetry() - 1) throw e; counter = lastGoodCounter; diff --git a/server/src/test/java/com/arcadedb/server/ha/HASplitBrainIT.java b/server/src/test/java/com/arcadedb/server/ha/HASplitBrainIT.java index c351099bcd..148ad5dde6 100644 --- a/server/src/test/java/com/arcadedb/server/ha/HASplitBrainIT.java +++ b/server/src/test/java/com/arcadedb/server/ha/HASplitBrainIT.java @@ -21,7 +21,7 @@ import com.arcadedb.GlobalConfiguration; import com.arcadedb.log.LogManager; import com.arcadedb.server.ArcadeDBServer; -import com.arcadedb.server.TestCallback; +import com.arcadedb.server.ReplicationCallback; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -59,7 +59,7 @@ protected void onAfterTest() { @Override protected void onBeforeStarting(final ArcadeDBServer server) { - server.registerTestEventListener(new TestCallback() { + server.registerTestEventListener(new ReplicationCallback() { @Override public void onEvent(final TYPE type, final Object object, final ArcadeDBServer server) throws IOException { if (type == TYPE.NETWORK_CONNECTION && split) { @@ -96,7 +96,7 @@ public void onEvent(final TYPE type, final Object object, final ArcadeDBServer s if (server.getServerName().equals("ArcadeDB_4")) server.registerTestEventListener((type, object, server1) -> { if (!split) { - if (type == TestCallback.TYPE.REPLICA_MSG_RECEIVED) { + if (type == ReplicationCallback.TYPE.REPLICA_MSG_RECEIVED) { messages.incrementAndGet(); if (messages.get() > 10) { split = true; diff --git a/server/src/test/java/com/arcadedb/server/ha/HTTP2ServersIT.java b/server/src/test/java/com/arcadedb/server/ha/HTTP2ServersIT.java index 0e81815d38..a54bbece1c 100644 --- a/server/src/test/java/com/arcadedb/server/ha/HTTP2ServersIT.java +++ b/server/src/test/java/com/arcadedb/server/ha/HTTP2ServersIT.java @@ -26,6 +26,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.io.*; import java.net.*; import java.util.*; import java.util.logging.*; @@ -39,7 +40,8 @@ protected int getServerCount() { @Test public void testServerInfo() throws Exception { testEachServer((serverIndex) -> { - final HttpURLConnection connection = (HttpURLConnection) new URL("http://127.0.0.1:248" + serverIndex + "/api/v1/server?mode=cluster").openConnection(); + final HttpURLConnection connection = (HttpURLConnection) new URL( + "http://127.0.0.1:248" + serverIndex + "/api/v1/server?mode=cluster").openConnection(); connection.setRequestMethod("GET"); connection.setRequestProperty("Authorization", @@ -61,7 +63,8 @@ public void propagationOfSchema() throws Exception { testEachServer((serverIndex) -> { // CREATE THE SCHEMA ON BOTH SERVER, ONE TYPE PER SERVER final String response = command(serverIndex, "create vertex type VertexType" + serverIndex); - Assertions.assertTrue(response.contains("VertexType" + serverIndex), "Type " + (("VertexType" + serverIndex) + " not found on server " + serverIndex)); + Assertions.assertTrue(response.contains("VertexType" + serverIndex), + "Type " + (("VertexType" + serverIndex) + " not found on server " + serverIndex)); }); Thread.sleep(300); @@ -94,35 +97,6 @@ public void checkQuery() throws Exception { }); } - @Test - public void checkRecordLoading() throws Exception { - testEachServer((serverIndex) -> { - final HttpURLConnection connection = (HttpURLConnection) new URL( - "http://127.0.0.1:248" + serverIndex + "/api/v1/document/graph/" + BaseGraphServerTest.root.getIdentity().toString().substring(1)).openConnection(); - - connection.setRequestMethod("GET"); - connection.setRequestProperty("Authorization", - "Basic " + Base64.getEncoder().encodeToString(("root:" + BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS).getBytes())); - connection.connect(); - - try { - final String response = readResponse(connection); - LogManager.instance().log(this, Level.FINE, "TEST: Response: %s", null, response); - Assertions.assertEquals(200, connection.getResponseCode()); - Assertions.assertEquals("OK", connection.getResponseMessage()); - Assertions.assertTrue(response.contains("V1")); - - } finally { - connection.disconnect(); - } - }); - } - - @Test - public void checkRecordCreate() throws Exception { - testEachServer((serverIndex) -> createRecord(serverIndex, "{\"@type\":\"Person\",\"name\":\"Jay\",\"surname\":\"Miner\",\"age\":69}")); - } - @Test public void checkDeleteGraphElements() throws Exception { @@ -131,7 +105,9 @@ public void checkDeleteGraphElements() throws Exception { testEachServer((serverIndex) -> { LogManager.instance().log(this, Level.FINE, "TESTS SERVER " + serverIndex); - final String v1 = new JSONObject(createRecord(serverIndex, "{\"@type\":\"V1\",\"name\":\"Jay\",\"surname\":\"Miner\",\"age\":69}")).getString("result"); + final String v1 = new JSONObject( + command(serverIndex, "create vertex V1 content {\"name\":\"Jay\",\"surname\":\"Miner\",\"age\":69}")).getJSONArray( + "result").getJSONObject(0).getString("@rid"); if (!getServer(serverIndex).getHA().isLeader()) Thread.sleep(300); @@ -146,7 +122,9 @@ public void checkDeleteGraphElements() throws Exception { } }); - final String v2 = new JSONObject(createRecord(serverIndex, "{\"@type\":\"V1\",\"name\":\"Elon\",\"surname\":\"Musk\",\"age\":50}")).getString("result"); + final String v2 = new JSONObject( + command(serverIndex, "create vertex V1 content {\"name\":\"Elon\",\"surname\":\"Musk\",\"age\":50}")).getJSONArray( + "result").getJSONObject(0).getString("@rid"); if (!getServer(serverIndex).getHA().isLeader()) Thread.sleep(300); @@ -161,8 +139,8 @@ public void checkDeleteGraphElements() throws Exception { } }); - final String e1 = new JSONObject(command(serverIndex, "create edge E1 from " + v1 + " to " + v2)).getJSONArray("result").getJSONObject(0) - .getString("@rid"); + final String e1 = new JSONObject(command(serverIndex, "create edge E1 from " + v1 + " to " + v2)).getJSONArray("result") + .getJSONObject(0).getString("@rid"); if (!getServer(serverIndex).getHA().isLeader()) Thread.sleep(300); @@ -177,8 +155,9 @@ public void checkDeleteGraphElements() throws Exception { } }); - final String v3 = new JSONObject(createRecord(serverIndex, "{\"@type\":\"V1\",\"name\":\"Nikola\",\"surname\":\"Tesla\",\"age\":150}")).getString( - "result"); + final String v3 = new JSONObject( + command(serverIndex, "create vertex V1 content {\"name\":\"Nikola\",\"surname\":\"Tesla\",\"age\":150}")).getJSONArray( + "result").getJSONObject(0).getString("@rid"); if (!getServer(serverIndex).getHA().isLeader()) Thread.sleep(300); @@ -193,8 +172,8 @@ public void checkDeleteGraphElements() throws Exception { } }); - final String e2 = new JSONObject(command(serverIndex, "create edge E2 from " + v2 + " to " + v3)).getJSONArray("result").getJSONObject(0) - .getString("@rid"); + final String e2 = new JSONObject(command(serverIndex, "create edge E2 from " + v2 + " to " + v3)).getJSONArray("result") + .getJSONObject(0).getString("@rid"); if (!getServer(serverIndex).getHA().isLeader()) Thread.sleep(300); @@ -216,13 +195,17 @@ public void checkDeleteGraphElements() throws Exception { testEachServer((checkServer) -> { try { - Assertions.assertTrue(new JSONObject(command(checkServer, "select from " + v1)).getJSONArray("result").isEmpty(), - "executed on server " + serverIndex + " checking on server " + serverIndex); - Assertions.assertTrue(new JSONObject(command(checkServer, "select from " + e1)).getJSONArray("result").isEmpty(), - "executed on server " + serverIndex + " checking on server " + serverIndex); - } catch (final Exception e) { - LogManager.instance().log(this, Level.SEVERE, "Error on checking for right deletion on server " + checkServer); - throw e; + new JSONObject(command(checkServer, "select from " + v1)).getJSONArray("result"); + Assertions.fail("executed on server " + serverIndex + " checking on server " + serverIndex); + } catch (FileNotFoundException e) { + // EXPECTED + } + + try { + new JSONObject(command(checkServer, "select from " + e1)).getJSONArray("result"); + Assertions.fail("executed on server " + serverIndex + " checking on server " + serverIndex); + } catch (FileNotFoundException e) { + // EXPECTED } }); }); @@ -231,7 +214,8 @@ public void checkDeleteGraphElements() throws Exception { @Test public void testHAConfiguration() { for (ArcadeDBServer server : getServers()) { - final RemoteDatabase database = new RemoteDatabase("127.0.0.1", 2480, getDatabaseName(), "root", BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS); + final RemoteDatabase database = new RemoteDatabase("127.0.0.1", 2480, getDatabaseName(), "root", + BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS); Assertions.assertNotNull(database.getLeaderAddress()); Assertions.assertFalse(database.getReplicaAddresses().isEmpty()); } diff --git a/server/src/test/java/com/arcadedb/server/ha/ReplicationChangeSchemaIT.java b/server/src/test/java/com/arcadedb/server/ha/ReplicationChangeSchemaIT.java index 05a75eb1a2..3b65a2fda7 100644 --- a/server/src/test/java/com/arcadedb/server/ha/ReplicationChangeSchemaIT.java +++ b/server/src/test/java/com/arcadedb/server/ha/ReplicationChangeSchemaIT.java @@ -184,14 +184,11 @@ private void checkSchemaFilesAreTheSameOnAllServers() { first = entry.getValue(); else Assertions.assertEquals(first, entry.getValue(), - "Server " + entry.getKey() + " has different schema saved:\nFIRST SERVER:\n" + first + "\n" + entry.getKey() + " SERVER:\n" + entry.getValue()); + "Server " + entry.getKey() + " has different schema saved:\nFIRST SERVER:\n" + first + "\n" + entry.getKey() + + " SERVER:\n" + entry.getValue()); } } - protected int getServerCount() { - return 3; - } - @Override protected int getTxs() { return 10; diff --git a/server/src/test/java/com/arcadedb/server/ha/ReplicationServerIT.java b/server/src/test/java/com/arcadedb/server/ha/ReplicationServerIT.java index d6bab1a26a..a89fc76869 100644 --- a/server/src/test/java/com/arcadedb/server/ha/ReplicationServerIT.java +++ b/server/src/test/java/com/arcadedb/server/ha/ReplicationServerIT.java @@ -68,7 +68,8 @@ public void testReplication(final int serverId) { Assertions.assertEquals(1, db.countType(VERTEX1_TYPE_NAME, true), "TEST: Check for vertex count for server" + 0); - LogManager.instance().log(this, Level.FINE, "TEST: Executing %s transactions with %d vertices each...", null, getTxs(), getVerticesPerTx()); + LogManager.instance() + .log(this, Level.FINE, "TEST: Executing %s transactions with %d vertices each...", null, getTxs(), getVerticesPerTx()); final long total = getTxs() * getVerticesPerTx(); long counter = 0; @@ -89,7 +90,8 @@ public void testReplication(final int serverId) { break; } catch (final TransactionException | NeedRetryException e) { - LogManager.instance().log(this, Level.FINE, "TEST: - RECEIVED ERROR: %s (RETRY %d/%d)", null, e.toString(), retry, getMaxRetry()); + LogManager.instance() + .log(this, Level.FINE, "TEST: - RECEIVED ERROR: %s (RETRY %d/%d)", null, e.toString(), retry, getMaxRetry()); if (retry >= getMaxRetry() - 1) throw e; counter = lastGoodCounter; @@ -110,7 +112,8 @@ public void testReplication(final int serverId) { testLog("Done"); - Assertions.assertEquals(1 + (long) getTxs() * getVerticesPerTx(), db.countType(VERTEX1_TYPE_NAME, true), "Check for vertex count for server" + 0); + Assertions.assertEquals(1 + (long) getTxs() * getVerticesPerTx(), db.countType(VERTEX1_TYPE_NAME, true), + "Check for vertex count for server" + 0); try { Thread.sleep(1000); @@ -155,8 +158,8 @@ protected boolean isPrintingConfigurationAtEveryStep() { return false; } - protected void checkEntriesOnServer(final int s) { - final Database db = getServerDatabase(s, getDatabaseName()); + protected void checkEntriesOnServer(final int serverIndex) { + final Database db = getServerDatabase(serverIndex, getDatabaseName()); // RESET ANY PREVIOUS TRANSACTION IN TL. IN CASE OF STOP/CRASH THE TL COULD HAVE AN OLD INSTANCE THAT POINT TO AN OLD SERVER DatabaseContext.INSTANCE.init((DatabaseInternal) db); @@ -165,7 +168,8 @@ protected void checkEntriesOnServer(final int s) { try { final long recordInDb = db.countType(VERTEX1_TYPE_NAME, true); Assertions.assertTrue(recordInDb <= 1 + getTxs() * getVerticesPerTx(), - "TEST: Check for vertex count for server" + s + " found " + recordInDb + " not less than " + (1 + getTxs() * getVerticesPerTx())); + "TEST: Check for vertex count for server" + serverIndex + " found " + recordInDb + " not less than " + (1 + + getTxs() * getVerticesPerTx())); final TypeIndex index = db.getSchema().getType(VERTEX1_TYPE_NAME).getPolymorphicIndexByProperties("id"); long total = 0; @@ -175,7 +179,8 @@ protected void checkEntriesOnServer(final int s) { ++total; } - LogManager.instance().log(this, Level.FINE, "TEST: Entries in the index (%d) >= records in database (%d)", null, total, recordInDb); + LogManager.instance() + .log(this, Level.FINE, "TEST: Entries in the index (%d) >= records in database (%d)", null, total, recordInDb); final Map> ridsFoundInIndex = new HashMap<>(); long total2 = 0; @@ -201,19 +206,21 @@ record = rid.getRecord(true); } if (record == null) { - LogManager.instance().log(this, Level.FINE, "TEST: - Cannot find record %s in database even if it's present in the index (null)", null, rid); + LogManager.instance() + .log(this, Level.FINE, "TEST: - Cannot find record %s in database even if it's present in the index (null)", null, + rid); missingsCount++; } - } - Assertions.assertEquals(recordInDb, ridsFoundInIndex.size(), "TEST: Found " + missingsCount + " missing records"); + Assertions.assertEquals(recordInDb, ridsFoundInIndex.size(), + "TEST: Found " + missingsCount + " missing records on server " + serverIndex); Assertions.assertEquals(0, missingsCount); Assertions.assertEquals(total, total2); } catch (final Exception e) { e.printStackTrace(); - Assertions.fail("TEST: Error on checking on server" + s); + Assertions.fail("TEST: Error on checking on server" + serverIndex); } }); } diff --git a/server/src/test/java/com/arcadedb/server/ha/ReplicationServerLeaderChanges3TimesIT.java b/server/src/test/java/com/arcadedb/server/ha/ReplicationServerLeaderChanges3TimesIT.java index 76700903b7..347bb7aaf3 100644 --- a/server/src/test/java/com/arcadedb/server/ha/ReplicationServerLeaderChanges3TimesIT.java +++ b/server/src/test/java/com/arcadedb/server/ha/ReplicationServerLeaderChanges3TimesIT.java @@ -29,7 +29,7 @@ import com.arcadedb.remote.RemoteDatabase; import com.arcadedb.server.ArcadeDBServer; import com.arcadedb.server.BaseGraphServerTest; -import com.arcadedb.server.TestCallback; +import com.arcadedb.server.ReplicationCallback; import com.arcadedb.server.ha.message.TxRequest; import com.arcadedb.utility.Pair; import org.junit.jupiter.api.Assertions; @@ -52,6 +52,11 @@ public void setTestConfiguration() { GlobalConfiguration.HA_QUORUM.setValue("Majority"); } + @Override + protected HAServer.SERVER_ROLE getServerRole(int serverIndex) { + return HAServer.SERVER_ROLE.ANY; + } + @Test public void testReplication() { checkDatabases(); @@ -59,19 +64,22 @@ public void testReplication() { final String server1Address = getServer(0).getHttpServer().getListeningAddress(); final String[] server1AddressParts = server1Address.split(":"); - final RemoteDatabase db = new RemoteDatabase(server1AddressParts[0], Integer.parseInt(server1AddressParts[1]), getDatabaseName(), "root", - BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS); + final RemoteDatabase db = new RemoteDatabase(server1AddressParts[0], Integer.parseInt(server1AddressParts[1]), + getDatabaseName(), "root", BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS); - LogManager.instance().log(this, Level.FINE, "Executing %s transactions with %d vertices each...", null, getTxs(), getVerticesPerTx()); + LogManager.instance() + .log(this, Level.FINE, "Executing %s transactions with %d vertices each...", null, getTxs(), getVerticesPerTx()); long counter = 0; final int maxRetry = 10; + int timeouts = 0; for (int tx = 0; tx < getTxs(); ++tx) { for (int retry = 0; retry < 3; ++retry) { try { for (int i = 0; i < getVerticesPerTx(); ++i) { - final ResultSet resultSet = db.command("SQL", "CREATE VERTEX " + VERTEX1_TYPE_NAME + " SET id = ?, name = ?", ++counter, "distributed-test"); + final ResultSet resultSet = db.command("SQL", "CREATE VERTEX " + VERTEX1_TYPE_NAME + " SET id = ?, name = ?", ++counter, + "distributed-test"); Assertions.assertTrue(resultSet.hasNext()); final Result result = resultSet.next(); @@ -85,8 +93,13 @@ public void testReplication() { } } catch (final DuplicatedKeyException | NeedRetryException | TimeoutException | TransactionException e) { + if (e instanceof TimeoutException) { + if (++timeouts > 3) + throw e; + } // IGNORE IT - LogManager.instance().log(this, Level.SEVERE, "Error on creating vertex %d, retrying (retry=%d/%d)...", e, counter, retry, maxRetry); + LogManager.instance() + .log(this, Level.SEVERE, "Error on creating vertex %d, retrying (retry=%d/%d)...", e, counter, retry, maxRetry); try { Thread.sleep(500); } catch (final InterruptedException e1) { @@ -131,7 +144,7 @@ public void testReplication() { @Override protected void onBeforeStarting(final ArcadeDBServer server) { - server.registerTestEventListener(new TestCallback() { + server.registerTestEventListener(new ReplicationCallback() { @Override public void onEvent(final TYPE type, final Object object, final ArcadeDBServer server) { if (type == TYPE.REPLICA_MSG_RECEIVED) { @@ -143,14 +156,16 @@ public void onEvent(final TYPE type, final Object object, final ArcadeDBServer s messagesInTotal.incrementAndGet(); messagesPerRestart.incrementAndGet(); - if (getServer(leaderName).isStarted() && messagesPerRestart.get() > getTxs() / (getServerCount() * 2) && restarts.get() < getServerCount()) { - LogManager.instance().log(this, Level.FINE, "TEST: Found online replicas %d", null, getServer(leaderName).getHA().getOnlineReplicas()); + if (getServer(leaderName).isStarted() && messagesPerRestart.get() > getTxs() / (getServerCount() * 2) + && restarts.get() < getServerCount()) { + LogManager.instance() + .log(this, Level.FINE, "TEST: Found online replicas %d", null, getServer(leaderName).getHA().getOnlineReplicas()); if (getServer(leaderName).getHA().getOnlineReplicas() < getServerCount() - 1) { // NOT ALL THE SERVERS ARE UP, AVOID A QUORUM ERROR - LogManager.instance() - .log(this, Level.FINE, "TEST: Skip restart of the Leader %s because no all replicas are online yet (messages=%d txs=%d) ...", null, - leaderName, messagesInTotal.get(), getTxs()); + LogManager.instance().log(this, Level.FINE, + "TEST: Skip restart of the Leader %s because no all replicas are online yet (messages=%d txs=%d) ...", null, + leaderName, messagesInTotal.get(), getTxs()); return; } @@ -158,7 +173,8 @@ public void onEvent(final TYPE type, final Object object, final ArcadeDBServer s // ANOTHER REPLICA JUST DID IT return; - testLog("Stopping the Leader %s (messages=%d txs=%d restarts=%d) ...", leaderName, messagesInTotal.get(), getTxs(), restarts.get()); + testLog("Stopping the Leader %s (messages=%d txs=%d restarts=%d) ...", leaderName, messagesInTotal.get(), getTxs(), + restarts.get()); getServer(leaderName).stop(); restarts.incrementAndGet(); diff --git a/server/src/test/java/com/arcadedb/server/ha/ReplicationServerLeaderDownIT.java b/server/src/test/java/com/arcadedb/server/ha/ReplicationServerLeaderDownIT.java index e410e77350..b4f86ddf2c 100644 --- a/server/src/test/java/com/arcadedb/server/ha/ReplicationServerLeaderDownIT.java +++ b/server/src/test/java/com/arcadedb/server/ha/ReplicationServerLeaderDownIT.java @@ -26,7 +26,7 @@ import com.arcadedb.remote.RemoteException; import com.arcadedb.server.ArcadeDBServer; import com.arcadedb.server.BaseGraphServerTest; -import com.arcadedb.server.TestCallback; +import com.arcadedb.server.ReplicationCallback; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -46,6 +46,11 @@ public void setTestConfiguration() { GlobalConfiguration.HA_QUORUM.setValue("Majority"); } + @Override + protected HAServer.SERVER_ROLE getServerRole(int serverIndex) { + return HAServer.SERVER_ROLE.ANY; + } + @Test public void testReplication() { checkDatabases(); @@ -53,10 +58,11 @@ public void testReplication() { final String server1Address = getServer(0).getHttpServer().getListeningAddress(); final String[] server1AddressParts = server1Address.split(":"); - final RemoteDatabase db = new RemoteDatabase(server1AddressParts[0], Integer.parseInt(server1AddressParts[1]), getDatabaseName(), "root", - BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS); + final RemoteDatabase db = new RemoteDatabase(server1AddressParts[0], Integer.parseInt(server1AddressParts[1]), + getDatabaseName(), "root", BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS); - LogManager.instance().log(this, Level.FINE, "Executing %s transactions with %d vertices each...", null, getTxs(), getVerticesPerTx()); + LogManager.instance() + .log(this, Level.FINE, "Executing %s transactions with %d vertices each...", null, getTxs(), getVerticesPerTx()); long counter = 0; @@ -66,7 +72,8 @@ public void testReplication() { for (int i = 0; i < getVerticesPerTx(); ++i) { for (int retry = 0; retry < maxRetry; ++retry) { try { - final ResultSet resultSet = db.command("SQL", "CREATE VERTEX " + VERTEX1_TYPE_NAME + " SET id = ?, name = ?", ++counter, "distributed-test"); + final ResultSet resultSet = db.command("SQL", "CREATE VERTEX " + VERTEX1_TYPE_NAME + " SET id = ?, name = ?", ++counter, + "distributed-test"); Assertions.assertTrue(resultSet.hasNext()); final Result result = resultSet.next(); @@ -80,7 +87,8 @@ public void testReplication() { break; } catch (final RemoteException e) { // IGNORE IT - LogManager.instance().log(this, Level.SEVERE, "Error on creating vertex %d, retrying (retry=%d/%d)...", e, counter, retry, maxRetry); + LogManager.instance() + .log(this, Level.SEVERE, "Error on creating vertex %d, retrying (retry=%d/%d)...", e, counter, retry, maxRetry); try { Thread.sleep(500); } catch (final InterruptedException e1) { @@ -117,7 +125,7 @@ public void testReplication() { protected void onBeforeStarting(final ArcadeDBServer server) { if (server.getServerName().equals("ArcadeDB_2")) server.registerTestEventListener((type, object, server1) -> { - if (type == TestCallback.TYPE.REPLICA_MSG_RECEIVED) { + if (type == ReplicationCallback.TYPE.REPLICA_MSG_RECEIVED) { if (messages.incrementAndGet() > 10 && getServer(0).isStarted()) { testLog("TEST: Stopping the Leader..."); diff --git a/server/src/test/java/com/arcadedb/server/ha/ReplicationServerLeaderDownNoTransactionsToForwardIT.java b/server/src/test/java/com/arcadedb/server/ha/ReplicationServerLeaderDownNoTransactionsToForwardIT.java index 09c48f4054..419d86e3ca 100644 --- a/server/src/test/java/com/arcadedb/server/ha/ReplicationServerLeaderDownNoTransactionsToForwardIT.java +++ b/server/src/test/java/com/arcadedb/server/ha/ReplicationServerLeaderDownNoTransactionsToForwardIT.java @@ -26,7 +26,7 @@ import com.arcadedb.remote.RemoteException; import com.arcadedb.server.ArcadeDBServer; import com.arcadedb.server.BaseGraphServerTest; -import com.arcadedb.server.TestCallback; +import com.arcadedb.server.ReplicationCallback; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -43,6 +43,11 @@ public void setTestConfiguration() { GlobalConfiguration.HA_QUORUM.setValue("Majority"); } + @Override + protected HAServer.SERVER_ROLE getServerRole(int serverIndex) { + return HAServer.SERVER_ROLE.ANY; + } + @Test public void testReplication() { checkDatabases(); @@ -114,7 +119,7 @@ public void testReplication() { protected void onBeforeStarting(final ArcadeDBServer server) { if (server.getServerName().equals("ArcadeDB_2")) server.registerTestEventListener((type, object, server1) -> { - if (type == TestCallback.TYPE.REPLICA_MSG_RECEIVED) { + if (type == ReplicationCallback.TYPE.REPLICA_MSG_RECEIVED) { if (messages.incrementAndGet() > 10 && getServer(0).isStarted()) { testLog("TEST: Stopping the Leader..."); diff --git a/server/src/test/java/com/arcadedb/server/ha/ReplicationServerQuorumMajority1ServerOutIT.java b/server/src/test/java/com/arcadedb/server/ha/ReplicationServerQuorumMajority1ServerOutIT.java index c767427913..8d2bdfc31f 100644 --- a/server/src/test/java/com/arcadedb/server/ha/ReplicationServerQuorumMajority1ServerOutIT.java +++ b/server/src/test/java/com/arcadedb/server/ha/ReplicationServerQuorumMajority1ServerOutIT.java @@ -18,10 +18,9 @@ */ package com.arcadedb.server.ha; -import com.arcadedb.GlobalConfiguration; import com.arcadedb.log.LogManager; import com.arcadedb.server.ArcadeDBServer; -import com.arcadedb.server.TestCallback; +import com.arcadedb.server.ReplicationCallback; import java.util.concurrent.atomic.*; import java.util.logging.*; @@ -29,16 +28,15 @@ public class ReplicationServerQuorumMajority1ServerOutIT extends ReplicationServerIT { private final AtomicInteger messages = new AtomicInteger(); - public ReplicationServerQuorumMajority1ServerOutIT() { - GlobalConfiguration.HA_QUORUM.setValue("Majority"); - } - @Override protected void onBeforeStarting(final ArcadeDBServer server) { if (server.getServerName().equals("ArcadeDB_2")) - server.registerTestEventListener(new TestCallback() { + server.registerTestEventListener(new ReplicationCallback() { @Override public void onEvent(final TYPE type, final Object object, final ArcadeDBServer server) { + if (!serversSynchronized) + return; + if (type == TYPE.REPLICA_MSG_RECEIVED) { if (messages.incrementAndGet() > 100) { LogManager.instance().log(this, Level.FINE, "TEST: Stopping Replica 2..."); @@ -50,15 +48,11 @@ public void onEvent(final TYPE type, final Object object, final ArcadeDBServer s } protected int[] getServerToCheck() { - final int[] result = new int[getServerCount() - 1]; - for (int i = 0; i < result.length; ++i) - result[i] = i; - return result; + return new int[] { 0, 1 }; } @Override protected int getTxs() { return 300; } - } diff --git a/server/src/test/java/com/arcadedb/server/ha/ReplicationServerQuorumMajority2ServersOutIT.java b/server/src/test/java/com/arcadedb/server/ha/ReplicationServerQuorumMajority2ServersOutIT.java index 01f1327f82..019c38c2de 100644 --- a/server/src/test/java/com/arcadedb/server/ha/ReplicationServerQuorumMajority2ServersOutIT.java +++ b/server/src/test/java/com/arcadedb/server/ha/ReplicationServerQuorumMajority2ServersOutIT.java @@ -23,7 +23,7 @@ import com.arcadedb.log.LogManager; import com.arcadedb.network.binary.QuorumNotReachedException; import com.arcadedb.server.ArcadeDBServer; -import com.arcadedb.server.TestCallback; +import com.arcadedb.server.ReplicationCallback; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -40,7 +40,7 @@ public ReplicationServerQuorumMajority2ServersOutIT() { @Override protected void onBeforeStarting(final ArcadeDBServer server) { if (server.getServerName().equals("ArcadeDB_1")) - server.registerTestEventListener(new TestCallback() { + server.registerTestEventListener(new ReplicationCallback() { @Override public void onEvent(final TYPE type, final Object object, final ArcadeDBServer server) { if (type == TYPE.REPLICA_MSG_RECEIVED) { @@ -53,7 +53,7 @@ public void onEvent(final TYPE type, final Object object, final ArcadeDBServer s }); if (server.getServerName().equals("ArcadeDB_2")) - server.registerTestEventListener(new TestCallback() { + server.registerTestEventListener(new ReplicationCallback() { @Override public void onEvent(final TYPE type, final Object object, final ArcadeDBServer server) { if (type == TYPE.REPLICA_MSG_RECEIVED) { @@ -80,15 +80,15 @@ protected int[] getServerToCheck() { return new int[] {}; } - protected void checkEntriesOnServer(final int s) { - final Database db = getServerDatabase(s, getDatabaseName()); + protected void checkEntriesOnServer(final int serverIndex) { + final Database db = getServerDatabase(serverIndex, getDatabaseName()); db.begin(); try { - Assertions.assertTrue(1 + getTxs() * getVerticesPerTx() > db.countType(VERTEX1_TYPE_NAME, true), "Check for vertex count for server" + s); + Assertions.assertTrue(1 + getTxs() * getVerticesPerTx() > db.countType(VERTEX1_TYPE_NAME, true), "Check for vertex count for server" + serverIndex); } catch (final Exception e) { e.printStackTrace(); - Assertions.fail("Error on checking on server" + s); + Assertions.fail("Error on checking on server" + serverIndex); } } diff --git a/server/src/test/java/com/arcadedb/server/ha/ReplicationServerReplicaHotResyncIT.java b/server/src/test/java/com/arcadedb/server/ha/ReplicationServerReplicaHotResyncIT.java index c4bad71d65..8c23127a1c 100644 --- a/server/src/test/java/com/arcadedb/server/ha/ReplicationServerReplicaHotResyncIT.java +++ b/server/src/test/java/com/arcadedb/server/ha/ReplicationServerReplicaHotResyncIT.java @@ -21,7 +21,7 @@ import com.arcadedb.GlobalConfiguration; import com.arcadedb.log.LogManager; import com.arcadedb.server.ArcadeDBServer; -import com.arcadedb.server.TestCallback; +import com.arcadedb.server.ReplicationCallback; import org.junit.jupiter.api.Assertions; import java.util.concurrent.atomic.*; @@ -49,7 +49,7 @@ protected void onAfterTest() { @Override protected void onBeforeStarting(final ArcadeDBServer server) { if (server.getServerName().equals("ArcadeDB_2")) - server.registerTestEventListener(new TestCallback() { + server.registerTestEventListener(new ReplicationCallback() { @Override public void onEvent(final TYPE type, final Object object, final ArcadeDBServer server) { if (slowDown) { @@ -76,7 +76,7 @@ public void onEvent(final TYPE type, final Object object, final ArcadeDBServer s }); if (server.getServerName().equals("ArcadeDB_0")) - server.registerTestEventListener(new TestCallback() { + server.registerTestEventListener(new ReplicationCallback() { @Override public void onEvent(final TYPE type, final Object object, final ArcadeDBServer server) { // SLOW DOWN A SERVER diff --git a/server/src/test/java/com/arcadedb/server/ha/ReplicationServerReplicaRestartForceDbInstallIT.java b/server/src/test/java/com/arcadedb/server/ha/ReplicationServerReplicaRestartForceDbInstallIT.java index 3acf5c3e7b..23e1158854 100644 --- a/server/src/test/java/com/arcadedb/server/ha/ReplicationServerReplicaRestartForceDbInstallIT.java +++ b/server/src/test/java/com/arcadedb/server/ha/ReplicationServerReplicaRestartForceDbInstallIT.java @@ -21,9 +21,9 @@ import com.arcadedb.GlobalConfiguration; import com.arcadedb.log.LogManager; import com.arcadedb.server.ArcadeDBServer; -import com.arcadedb.server.TestCallback; -import org.junit.jupiter.api.AfterEach; +import com.arcadedb.server.ReplicationCallback; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; import java.io.*; import java.util.concurrent.atomic.*; @@ -41,13 +41,6 @@ public ReplicationServerReplicaRestartForceDbInstallIT() { GlobalConfiguration.HA_REPLICATION_QUEUE_SIZE.setValue(10); } - @AfterEach - @Override - public void endTest() { - super.endTest(); - GlobalConfiguration.HA_REPLICATION_QUEUE_SIZE.setValue(512); - } - @Override protected void onAfterTest() { Assertions.assertFalse(hotResync); @@ -57,15 +50,18 @@ protected void onAfterTest() { @Override protected void onBeforeStarting(final ArcadeDBServer server) { if (server.getServerName().equals("ArcadeDB_2")) - server.registerTestEventListener(new TestCallback() { + server.registerTestEventListener(new ReplicationCallback() { @Override public void onEvent(final TYPE type, final Object object, final ArcadeDBServer server) { + if (!serversSynchronized) + return; + if (slowDown) { // SLOW DOWN A SERVER AFTER 5TH MESSAGE if (totalMessages.incrementAndGet() > 5) { try { LogManager.instance().log(this, Level.FINE, "TEST: Slowing down response from replica server 2..."); - Thread.sleep(10000); + Thread.sleep(10_000); } catch (final InterruptedException e) { // IGNORE IT LogManager.instance().log(this, Level.SEVERE, "TEST: ArcadeDB_2 HA event listener thread interrupted"); @@ -85,13 +81,16 @@ public void onEvent(final TYPE type, final Object object, final ArcadeDBServer s }); if (server.getServerName().equals("ArcadeDB_0")) - server.registerTestEventListener(new TestCallback() { + server.registerTestEventListener(new ReplicationCallback() { @Override public void onEvent(final TYPE type, final Object object, final ArcadeDBServer server) { + if (!serversSynchronized) + return; + // AS SOON AS SERVER 2 IS OFFLINE, A CLEAN OF REPLICATION LOG AND RESTART IS EXECUTED if ("ArcadeDB_2".equals(object) && type == TYPE.REPLICA_OFFLINE && firstTimeServerShutdown) { - LogManager.instance() - .log(this, Level.FINE, "TEST: Stopping Replica 2, removing latency, delete the replication log file and restart the server..."); + LogManager.instance().log(this, Level.SEVERE, + "TEST: Stopping Replica 2, removing latency, delete the replication log file and restart the server..."); slowDown = false; firstTimeServerShutdown = false; diff --git a/server/src/test/java/com/arcadedb/server/ha/ReplicationServerWriteAgainstReplicaIT.java b/server/src/test/java/com/arcadedb/server/ha/ReplicationServerWriteAgainstReplicaIT.java index d06eeb43f2..315a9ed2a2 100644 --- a/server/src/test/java/com/arcadedb/server/ha/ReplicationServerWriteAgainstReplicaIT.java +++ b/server/src/test/java/com/arcadedb/server/ha/ReplicationServerWriteAgainstReplicaIT.java @@ -18,29 +18,11 @@ */ package com.arcadedb.server.ha; -import com.arcadedb.GlobalConfiguration; import org.junit.jupiter.api.Test; public class ReplicationServerWriteAgainstReplicaIT extends ReplicationServerIT { - public void setTestConfiguration() { - super.setTestConfiguration(); - GlobalConfiguration.HA_QUORUM.setValue("Majority"); - } - - @Override - protected int getServerCount() { - return 3; - } - @Test public void testReplication() { - // WAIT THE LEADERSHIP HAS BEEN DETERMINED - try { - Thread.sleep(3000); - } catch (final InterruptedException e) { - e.printStackTrace(); - } - testReplication(1); } diff --git a/server/src/test/resources/arcadedb-log.properties b/server/src/test/resources/arcadedb-log.properties index 80a014db8d..0c3436d83d 100644 --- a/server/src/test/resources/arcadedb-log.properties +++ b/server/src/test/resources/arcadedb-log.properties @@ -23,14 +23,12 @@ # The following creates two handlers handlers=java.util.logging.ConsoleHandler # Set the default logging level for the root logger -.level=WARNING +.level=INFO io.undertow.level=WARNING -org.xnio.level=WARNING -org.jboss.level=WARNING -com.arcadedb.level=WARNING -com.arcadedb.server.ha.level=WARNING +com.arcadedb.level=INFO +com.arcadedb.server.ha.level=INFO # Set the default logging level for new ConsoleHandler instances -java.util.logging.ConsoleHandler.level=WARNING +java.util.logging.ConsoleHandler.level=INFO # Set the default formatter for new ConsoleHandler instances java.util.logging.ConsoleHandler.formatter=com.arcadedb.utility.AnsiLogFormatter