Skip to content

Commit

Permalink
fix: supported IPV6 in HA configuration (#1860)
Browse files Browse the repository at this point in the history
Fixed issue #1859
  • Loading branch information
lvca authored Dec 10, 2024
1 parent e8314a3 commit f4d38e5
Show file tree
Hide file tree
Showing 12 changed files with 149 additions and 58 deletions.
7 changes: 3 additions & 4 deletions gremlin/src/main/java/com/arcadedb/gremlin/ArcadeGraph.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.arcadedb.gremlin.service.ArcadeServiceRegistry;
import com.arcadedb.gremlin.service.VectorNeighborsFactory;
import com.arcadedb.log.LogManager;
import com.arcadedb.network.HostUtil;
import com.arcadedb.query.sql.executor.ResultSet;
import com.arcadedb.remote.RemoteDatabase;
import com.arcadedb.schema.DocumentType;
Expand Down Expand Up @@ -187,10 +188,8 @@ public GraphTraversalSource traversal() {
remoteAddresses.addAll(remoteDatabase.getReplicaAddresses());

final String[] hosts = new String[remoteAddresses.size()];
for (int i = 0; i < remoteAddresses.size(); i++) {
final String host = remoteAddresses.get(0);
hosts[i] = host.substring(0, host.indexOf(":"));
}
for (int i = 0; i < remoteAddresses.size(); i++)
hosts[i] = HostUtil.parseHostAddress(remoteAddresses.get(0), "" + GREMLIN_SERVER_PORT)[0];

final GraphBinaryMessageSerializerV1 serializer = new GraphBinaryMessageSerializerV1(
new TypeSerializerRegistry.Builder().addRegistry(new ArcadeIoRegistry()));
Expand Down
45 changes: 45 additions & 0 deletions network/src/main/java/com/arcadedb/network/HostUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.arcadedb.network;/*
* Copyright © 2021-present Arcade Data Ltd ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* @author Luca Garulli ([email protected])
*/
public class HostUtil {
public static final String CLIENT_DEFAULT_PORT = "2480";
public static final String HA_DEFAULT_PORT = "2424";

public static String[] parseHostAddress(String host, final String defaultPort) {
if (host == null)
throw new IllegalArgumentException("Host null");

host = host.trim();

if (host.isEmpty())
throw new IllegalArgumentException("Host is empty");

final String[] parts = host.split(":");
if (parts.length == 1 || parts.length == 8)
// ( IPV4 OR IPV6 ) NO PORT
return new String[] { host, defaultPort };
else if (parts.length == 2 || parts.length == 9) {
// ( IPV4 OR IPV6 ) + PORT
final int pos = host.lastIndexOf(":");
return new String[] { host.substring(0, pos), host.substring(pos + 1) };
}

throw new IllegalArgumentException("Invalid host " + host);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.arcadedb.exception.TimeoutException;
import com.arcadedb.exception.TransactionException;
import com.arcadedb.log.LogManager;
import com.arcadedb.network.HostUtil;
import com.arcadedb.network.binary.QuorumNotReachedException;
import com.arcadedb.network.binary.ServerIsNotTheLeaderException;
import com.arcadedb.serializer.json.JSONObject;
Expand Down Expand Up @@ -285,7 +286,9 @@ void requestClusterConfiguration() {
final JSONObject ha = response.getJSONObject("ha");

final String cfgLeaderServer = (String) ha.get("leaderAddress");
final String[] leaderServerParts = cfgLeaderServer.split(":");

final String[] leaderServerParts = HostUtil.parseHostAddress(cfgLeaderServer, HostUtil.HA_DEFAULT_PORT);

leaderServer = new Pair<>(leaderServerParts[0], Integer.parseInt(leaderServerParts[1]));

final String cfgReplicaServers = (String) ha.get("replicaAddresses");
Expand All @@ -297,10 +300,7 @@ void requestClusterConfiguration() {
final String[] serverEntries = cfgReplicaServers.split(",");
for (final String serverEntry : serverEntries) {
try {
final String[] serverParts = serverEntry.split(":");
if (serverParts.length != 2)
LogManager.instance().log(this, Level.WARNING, "No port specified on remote server URL '%s'", null, serverEntry);

final String[] serverParts = HostUtil.parseHostAddress(serverEntry, HostUtil.CLIENT_DEFAULT_PORT);
final String sHost = serverParts[0];
final int sPort = Integer.parseInt(serverParts[1]);

Expand Down Expand Up @@ -417,12 +417,12 @@ protected Exception manageException(final HttpURLConnection connection, final St
} else if (exception.equals(RecordNotFoundException.class.getName())) {
final int begin = detail.indexOf("#");
final int end = detail.indexOf(" ", begin);
return new RecordNotFoundException(detail, new RID( detail.substring(begin, end)));
return new RecordNotFoundException(detail, new RID(detail.substring(begin, end)));
} else if (exception.equals(QuorumNotReachedException.class.getName())) {
return new QuorumNotReachedException(detail);
} else if (exception.equals(DuplicatedKeyException.class.getName()) && exceptionArgs != null) {
final String[] exceptionArgsParts = exceptionArgs.split("\\|");
return new DuplicatedKeyException(exceptionArgsParts[0], exceptionArgsParts[1], new RID( exceptionArgsParts[2]));
return new DuplicatedKeyException(exceptionArgsParts[0], exceptionArgsParts[1], new RID(exceptionArgsParts[2]));
} else if (exception.equals(ConcurrentModificationException.class.getName())) {
return new ConcurrentModificationException(detail);
} else if (exception.equals(TransactionException.class.getName())) {
Expand Down
61 changes: 61 additions & 0 deletions network/src/test/java/com/arcadedb/HostUtilTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright © 2021-present Arcade Data Ltd ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-FileCopyrightText: 2021-present Arcade Data Ltd ([email protected])
* SPDX-License-Identifier: Apache-2.0
*/
package com.arcadedb;

import com.arcadedb.network.HostUtil;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

/**
* @author Luca Garulli ([email protected])
*/
public class HostUtilTest {
@Test
public void testIPv4() {
final String[] parts = HostUtil.parseHostAddress("10.33.5.22", HostUtil.CLIENT_DEFAULT_PORT);
assertThat(parts.length).isEqualTo(2);
assertThat(parts[0]).isEqualTo("10.33.5.22");
assertThat(parts[1]).isEqualTo(HostUtil.CLIENT_DEFAULT_PORT);
}

@Test
public void testIPv4WithPort() {
final String[] parts = HostUtil.parseHostAddress("10.33.5.22:33", HostUtil.CLIENT_DEFAULT_PORT);
assertThat(parts.length).isEqualTo(2);
assertThat(parts[0]).isEqualTo("10.33.5.22");
assertThat(parts[1]).isEqualTo("33");
}

@Test
public void testIPv6() {
final String[] parts = HostUtil.parseHostAddress("fe80:0:0:0:250:56ff:fe9a:6990", HostUtil.CLIENT_DEFAULT_PORT);
assertThat(parts.length).isEqualTo(2);
assertThat(parts[0]).isEqualTo("fe80:0:0:0:250:56ff:fe9a:6990");
assertThat(parts[1]).isEqualTo(HostUtil.CLIENT_DEFAULT_PORT);
}

@Test
public void testIPv6WithPort() {
final String[] parts = HostUtil.parseHostAddress("fe80:0:0:0:250:56ff:fe9a:6990:22", HostUtil.CLIENT_DEFAULT_PORT);
assertThat(parts.length).isEqualTo(2);
assertThat(parts[0]).isEqualTo("fe80:0:0:0:250:56ff:fe9a:6990");
assertThat(parts[1]).isEqualTo("22");
}
}
52 changes: 16 additions & 36 deletions server/src/main/java/com/arcadedb/server/ha/HAServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.arcadedb.log.LogManager;
import com.arcadedb.network.binary.ChannelBinaryClient;
import com.arcadedb.network.binary.ConnectionException;
import com.arcadedb.network.HostUtil;
import com.arcadedb.network.binary.QuorumNotReachedException;
import com.arcadedb.network.binary.ServerIsNotTheLeaderException;
import com.arcadedb.query.sql.executor.InternalResultSet;
Expand All @@ -50,29 +51,15 @@
import com.arcadedb.utility.RecordTableFormatter;
import com.arcadedb.utility.TableFormatter;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.logging.*;

public class HAServer implements ServerPlugin {
public static final String DEFAULT_PORT = "2424";
public static final String DEFAULT_PORT = HostUtil.HA_DEFAULT_PORT;
private final HAMessageFactory messageFactory;
private final ArcadeDBServer server;
private final ContextConfiguration configuration;
Expand Down Expand Up @@ -238,13 +225,10 @@ protected boolean isCurrentServer(final String serverEntry) {
if (serverAddress.equals(serverEntry))
return true;

final String[] localServerParts = serverAddress.split(":");
final String[] localServerParts = HostUtil.parseHostAddress(serverAddress, DEFAULT_PORT);

try {
String[] serverParts = serverEntry.split(":");
if (serverParts.length < 2)
serverParts = new String[] { serverParts[0], DEFAULT_PORT };

final String[] serverParts = HostUtil.parseHostAddress(serverEntry, DEFAULT_PORT);
if (localServerParts[0].equals(serverParts[0]) && localServerParts[1].equals(serverParts[1]))
return true;

Expand Down Expand Up @@ -328,7 +312,7 @@ private void sendNewLeadershipToOtherNodes() {
continue;

try {
final String[] parts = serverAddress.split(":");
final String[] parts = HostUtil.parseHostAddress(serverAddress, DEFAULT_PORT);

LogManager.instance().log(this, Level.INFO, "- Sending new Leader to server '%s'...", serverAddress);

Expand Down Expand Up @@ -967,10 +951,7 @@ public void resendMessagesToReplica(final long fromMessageNumber, final String r
}

public boolean connectToLeader(final String serverEntry, final Callable<Void, Exception> errorCallback) {
String[] serverParts = serverEntry.split(":");
if (serverParts.length == 1)
serverParts = new String[] { serverParts[0], DEFAULT_PORT };

final String[] serverParts = HostUtil.parseHostAddress(serverEntry, DEFAULT_PORT);
try {
connectToLeader(serverParts[0], Integer.parseInt(serverParts[1]));

Expand All @@ -982,7 +963,7 @@ public boolean connectToLeader(final String serverEntry, final Callable<Void, Ex
LogManager.instance().log(this, Level.INFO, "Remote server %s:%d is not the Leader, connecting to %s", serverParts[0],
Integer.parseInt(serverParts[1]), leaderAddress);

final String[] leader = leaderAddress.split(":");
final String[] leader = HostUtil.parseHostAddress(leaderAddress, DEFAULT_PORT);

connectToLeader(leader[0], Integer.parseInt(leader[1]));

Expand Down Expand Up @@ -1134,9 +1115,7 @@ private void startElection() {

try {

String[] parts = serverAddressCopy.split(":");
if (parts.length == 1)
parts = new String[] { parts[0], DEFAULT_PORT };
final String[] parts = HostUtil.parseHostAddress(serverAddressCopy, DEFAULT_PORT);

final ChannelBinaryClient channel = createNetworkConnection(parts[0], Integer.parseInt(parts[1]),
ReplicationProtocol.COMMAND_VOTE_FOR_ME);
Expand Down Expand Up @@ -1178,7 +1157,8 @@ private void startElection() {

channel.close();
} catch (final Exception e) {
LogManager.instance().log(this, Level.INFO, "Error contacting server %s for election", e, serverAddressCopy);
LogManager.instance()
.log(this, Level.INFO, "Error contacting server %s for election: %s", serverAddressCopy, e.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.arcadedb.log.LogManager;
import com.arcadedb.network.binary.ChannelBinaryClient;
import com.arcadedb.network.binary.ConnectionException;
import com.arcadedb.network.HostUtil;
import com.arcadedb.network.binary.NetworkProtocolException;
import com.arcadedb.network.binary.ServerIsNotTheLeaderException;
import com.arcadedb.schema.LocalSchema;
Expand Down Expand Up @@ -212,8 +213,7 @@ private void reconnect(final Exception e) {
// SKIP LOCAL SERVER
continue;

final String[] parts = serverAddress.split(":");

final String[] parts = HostUtil.parseHostAddress(serverAddress, HostUtil.HA_DEFAULT_PORT);
host = parts[0];
port = Integer.parseInt(parts[1]);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.arcadedb.exception.TimeoutException;
import com.arcadedb.exception.TransactionException;
import com.arcadedb.log.LogManager;
import com.arcadedb.network.HostUtil;
import com.arcadedb.query.sql.executor.Result;
import com.arcadedb.query.sql.executor.ResultSet;
import com.arcadedb.remote.RemoteDatabase;
Expand Down Expand Up @@ -131,7 +132,7 @@ public void run() {
}, 15_000, 10_000);

final String server1Address = getServer(0).getHttpServer().getListeningAddress();
final String[] server1AddressParts = server1Address.split(":");
final String[] server1AddressParts = HostUtil.parseHostAddress(server1Address, HostUtil.CLIENT_DEFAULT_PORT);

final RemoteDatabase db = new RemoteDatabase(server1AddressParts[0], Integer.parseInt(server1AddressParts[1]),
getDatabaseName(), "root", BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.arcadedb.GlobalConfiguration;
import com.arcadedb.log.LogManager;
import com.arcadedb.network.HostUtil;
import com.arcadedb.server.ArcadeDBServer;
import com.arcadedb.server.ReplicationCallback;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -75,7 +76,7 @@ public void onEvent(final TYPE type, final Object object, final ArcadeDBServer s
} else if (type == TYPE.NETWORK_CONNECTION && split) {
final String connectTo = (String) object;

final String[] parts = connectTo.split(":");
final String[] parts = HostUtil.parseHostAddress(connectTo, HostUtil.HA_DEFAULT_PORT);
final int port = Integer.parseInt(parts[1]);

if (server.getServerName().equals("ArcadeDB_3") || server.getServerName().equals("ArcadeDB_4")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.arcadedb.exception.TimeoutException;
import com.arcadedb.exception.TransactionException;
import com.arcadedb.log.LogManager;
import com.arcadedb.network.HostUtil;
import com.arcadedb.query.sql.executor.Result;
import com.arcadedb.query.sql.executor.ResultSet;
import com.arcadedb.remote.RemoteDatabase;
Expand Down Expand Up @@ -64,7 +65,7 @@ public void testReplication() {
checkDatabases();

final String server1Address = getServer(0).getHttpServer().getListeningAddress();
final String[] server1AddressParts = server1Address.split(":");
final String[] server1AddressParts = HostUtil.parseHostAddress(server1Address, HostUtil.CLIENT_DEFAULT_PORT);

final RemoteDatabase db = new RemoteDatabase(server1AddressParts[0], Integer.parseInt(server1AddressParts[1]),
getDatabaseName(), "root", BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.arcadedb.GlobalConfiguration;
import com.arcadedb.log.LogManager;
import com.arcadedb.network.HostUtil;
import com.arcadedb.query.sql.executor.Result;
import com.arcadedb.query.sql.executor.ResultSet;
import com.arcadedb.remote.RemoteDatabase;
Expand Down Expand Up @@ -58,8 +59,8 @@ public void testReplication() {
checkDatabases();

final String server1Address = getServer(0).getHttpServer().getListeningAddress();
final String[] server1AddressParts = server1Address.split(":");

final String[] server1AddressParts = HostUtil.parseHostAddress(server1Address, HostUtil.CLIENT_DEFAULT_PORT);
final RemoteDatabase db = new RemoteDatabase(server1AddressParts[0], Integer.parseInt(server1AddressParts[1]),
getDatabaseName(), "root", BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.arcadedb.GlobalConfiguration;
import com.arcadedb.log.LogManager;
import com.arcadedb.network.HostUtil;
import com.arcadedb.query.sql.executor.Result;
import com.arcadedb.query.sql.executor.ResultSet;
import com.arcadedb.remote.RemoteDatabase;
Expand Down Expand Up @@ -55,9 +56,10 @@ public void testReplication() {
checkDatabases();

final String server2Address = getServer(1).getHttpServer().getListeningAddress();
final String[] server1AddressParts = server2Address.split(":");

final RemoteDatabase db = new RemoteDatabase(server1AddressParts[0], Integer.parseInt(server1AddressParts[1]),
final String[] server2AddressParts = HostUtil.parseHostAddress(server2Address, HostUtil.CLIENT_DEFAULT_PORT);

final RemoteDatabase db = new RemoteDatabase(server2AddressParts[0], Integer.parseInt(server2AddressParts[1]),
getDatabaseName(), "root", BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS);

LogManager.instance()
Expand Down
Loading

0 comments on commit f4d38e5

Please sign in to comment.