diff --git a/rskj-core/src/main/java/co/rsk/net/discovery/DiscoveryEvent.java b/rskj-core/src/main/java/co/rsk/net/discovery/DiscoveryEvent.java index d306a09b9c2..a083c4e54fe 100644 --- a/rskj-core/src/main/java/co/rsk/net/discovery/DiscoveryEvent.java +++ b/rskj-core/src/main/java/co/rsk/net/discovery/DiscoveryEvent.java @@ -19,6 +19,7 @@ package co.rsk.net.discovery; import co.rsk.net.discovery.message.PeerDiscoveryMessage; +import org.apache.commons.lang3.builder.ToStringBuilder; import java.net.InetSocketAddress; @@ -38,4 +39,12 @@ public PeerDiscoveryMessage getMessage() { public InetSocketAddress getAddress() { return address; } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("message", this.message) + .append("address", this.address) + .toString(); + } } diff --git a/rskj-core/src/main/java/co/rsk/net/discovery/NodeChallenge.java b/rskj-core/src/main/java/co/rsk/net/discovery/NodeChallenge.java index 759a3c4c20c..6e0928f4774 100644 --- a/rskj-core/src/main/java/co/rsk/net/discovery/NodeChallenge.java +++ b/rskj-core/src/main/java/co/rsk/net/discovery/NodeChallenge.java @@ -18,6 +18,7 @@ package co.rsk.net.discovery; +import org.apache.commons.lang3.builder.ToStringBuilder; import org.ethereum.net.rlpx.Node; /** @@ -45,4 +46,13 @@ public Node getChallenger() { public String getChallengeId() { return challengeId; } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("challengedNode", this.challengedNode) + .append("challenger", this.challenger) + .append("challengeId", this.challengeId) + .toString(); + } } diff --git a/rskj-core/src/main/java/co/rsk/net/discovery/NodeChallengeManager.java b/rskj-core/src/main/java/co/rsk/net/discovery/NodeChallengeManager.java index ef2878b5b14..4400dfe4061 100644 --- a/rskj-core/src/main/java/co/rsk/net/discovery/NodeChallengeManager.java +++ b/rskj-core/src/main/java/co/rsk/net/discovery/NodeChallengeManager.java @@ -21,6 +21,8 @@ import co.rsk.net.discovery.message.PingPeerMessage; import com.google.common.annotations.VisibleForTesting; import org.ethereum.net.rlpx.Node; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -29,9 +31,14 @@ * Created by mario on 22/02/17. */ public class NodeChallengeManager { + private static final Logger logger = LoggerFactory.getLogger(NodeChallengeManager.class); + private Map activeChallenges = new ConcurrentHashMap<>(); public NodeChallenge startChallenge(Node challengedNode, Node challenger, PeerExplorer explorer) { + logger.debug("startChallenge - Starting challenge for node: [{}] by challenger: [{}]", + challengedNode.getHexId(), challenger.getHexId()); + PingPeerMessage pingMessage = explorer.sendPing(challengedNode.getAddress(), 1, challengedNode); String messageId = pingMessage.getMessageId(); NodeChallenge challenge = new NodeChallenge(challengedNode, challenger, messageId); diff --git a/rskj-core/src/main/java/co/rsk/net/discovery/PeerDiscoveryRequest.java b/rskj-core/src/main/java/co/rsk/net/discovery/PeerDiscoveryRequest.java index e1d01208776..9bd7ae77fa4 100644 --- a/rskj-core/src/main/java/co/rsk/net/discovery/PeerDiscoveryRequest.java +++ b/rskj-core/src/main/java/co/rsk/net/discovery/PeerDiscoveryRequest.java @@ -20,6 +20,7 @@ import co.rsk.net.discovery.message.DiscoveryMessageType; import co.rsk.net.discovery.message.PeerDiscoveryMessage; +import org.apache.commons.lang3.builder.ToStringBuilder; import org.ethereum.net.rlpx.Node; import java.net.InetSocketAddress; @@ -73,4 +74,14 @@ public boolean validateMessageResponse(InetSocketAddress responseAddress, PeerDi public boolean hasExpired() { return System.currentTimeMillis() > expirationDate; } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("messageId", this.messageId) + .append("message", this.message) + .append("address", this.address) + .append("relatedNode", this.relatedNode) + .toString(); + } } diff --git a/rskj-core/src/main/java/co/rsk/net/discovery/PeerExplorer.java b/rskj-core/src/main/java/co/rsk/net/discovery/PeerExplorer.java index cc2d86bda19..ce8ed79817c 100644 --- a/rskj-core/src/main/java/co/rsk/net/discovery/PeerExplorer.java +++ b/rskj-core/src/main/java/co/rsk/net/discovery/PeerExplorer.java @@ -19,7 +19,11 @@ package co.rsk.net.discovery; import co.rsk.net.NodeID; -import co.rsk.net.discovery.message.*; +import co.rsk.net.discovery.message.DiscoveryMessageType; +import co.rsk.net.discovery.message.FindNodePeerMessage; +import co.rsk.net.discovery.message.NeighborsPeerMessage; +import co.rsk.net.discovery.message.PingPeerMessage; +import co.rsk.net.discovery.message.PongPeerMessage; import co.rsk.net.discovery.table.NodeDistanceTable; import co.rsk.net.discovery.table.OperationResult; import co.rsk.net.discovery.table.PeerDiscoveryRequestBuilder; @@ -37,7 +41,12 @@ import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.security.SecureRandom; -import java.util.*; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -160,6 +169,12 @@ void setUDPChannel(UDPChannel udpChannel) { } synchronized void handleMessage(DiscoveryEvent event) { + logger.debug("handleMessage - Handling message with " + + "type: [{}], " + + "networkId: [{}]", + event.getMessage().getMessageType(), + event.getMessage().getNetworkId()); + if (state != ExecState.RUNNING) { logger.warn("Cannot handle message as current state is {}", state); return; @@ -170,6 +185,8 @@ synchronized void handleMessage(DiscoveryEvent event) { //have a networkId in the message yet, then just let them through, for now. if (event.getMessage().getNetworkId().isPresent() && event.getMessage().getNetworkId().getAsInt() != this.networkId) { + logger.warn("handleMessage - Message ignored because remote peer's network id: [{}] is different from local network id: [{}]", + event.getMessage().getNetworkId(), this.networkId); return; } if (type == DiscoveryMessageType.PING) { @@ -191,9 +208,14 @@ synchronized void handleMessage(DiscoveryEvent event) { private void handlePingMessage(InetSocketAddress address, PingPeerMessage message) { this.sendPong(address, message); - Node connectedNode = this.establishedConnections.get(message.getNodeId()); + logger.debug("handlePingMessage - Handling ping message with " + + "address: [{}/{}], " + + "nodeId: [{}], " + + "connectedNode: [{}]" + , address.getHostName(), address.getPort(), message.getNodeId(), connectedNode); + if (connectedNode == null) { this.sendPing(address, 1); } else { @@ -204,12 +226,20 @@ private void handlePingMessage(InetSocketAddress address, PingPeerMessage messag private void handlePong(InetSocketAddress pongAddress, PongPeerMessage message) { PeerDiscoveryRequest request = this.pendingPingRequests.get(message.getMessageId()); + logger.debug("handlePong - Handling pong message with " + + "address: [{}/{}], " + + "messageId: [{}], " + + "request: [{}]" + , pongAddress.getHostName(), pongAddress.getPort(), message.getMessageId(), request); + if (request != null && request.validateMessageResponse(pongAddress, message)) { this.pendingPingRequests.remove(message.getMessageId()); NodeChallenge challenge = this.challengeManager.removeChallenge(message.getMessageId()); if (challenge == null) { this.addConnection(message, request.getAddress().getHostString(), request.getAddress().getPort()); } + } else { + logger.warn("handlePong - Peer discovery request with id [{}] is either null or invalid", message.getMessageId()); } } @@ -219,17 +249,29 @@ private void handleFindNode(FindNodePeerMessage message) { if (connectedNode != null) { List nodesToSend = this.distanceTable.getClosestNodes(nodeId); - logger.debug("About to send [{}] neighbors to ip[{}] port[{}] nodeId[{}]", nodesToSend.size(), connectedNode.getHost(), connectedNode.getPort(), connectedNode.getHexId()); + logger.debug("handleFindNode - About to send [{}] neighbors to address: [{}/{}], nodeId: [{}] with messageId: [{}]", + nodesToSend.size(), connectedNode.getHost(), connectedNode.getPort(), connectedNode.getHexId(), message.getMessageId()); this.sendNeighbors(connectedNode.getAddress(), nodesToSend, message.getMessageId()); updateEntry(connectedNode); + } else { + logger.warn("handleFindNode - Node with id: [{}] is not connected. Ignored", nodeId); } } private void handleNeighborsMessage(InetSocketAddress neighborsResponseAddress, NeighborsPeerMessage message) { - Node connectedNode = this.establishedConnections.get(message.getNodeId()); + NodeID nodeId = message.getNodeId(); + Node connectedNode = this.establishedConnections.get(nodeId); if (connectedNode != null) { - logger.debug("Neighbors received from [{}]", connectedNode.getHexId()); + logger.debug("handleNeighborsMessage - Neighbors received from id: [{}], address: [{}/{}] with " + + "nodeId: [{}]," + + "messageId: [{}], " + + "nodesCount: [{}], " + + "nodes: [{}], " + + "connectedNode: [{}]", + connectedNode.getHexId(), neighborsResponseAddress.getHostName(), neighborsResponseAddress.getPort(), message.getNodeId(), + message.getMessageId(), message.countNodes(), message.getNodes(), connectedNode); + PeerDiscoveryRequest request = this.pendingFindNodeRequests.remove(message.getMessageId()); if (request != null && request.validateMessageResponse(neighborsResponseAddress, message)) { @@ -239,6 +281,8 @@ private void handleNeighborsMessage(InetSocketAddress neighborsResponseAddress, this.startConversationWithNewNodes(); } updateEntry(connectedNode); + } else { + logger.warn("handleFindNode - Node with id: [{}] is not connected. Ignored", nodeId); } } @@ -254,6 +298,8 @@ synchronized PingPeerMessage sendPing(InetSocketAddress nodeAddress, int attempt PingPeerMessage nodeMessage = checkPendingPeerToAddress(nodeAddress); if (nodeMessage != null) { + logger.warn("sendPing - No ping message has been sent to address: [{}/{}], as there's pending one", nodeAddress.getHostName(), nodeAddress.getPort()); + return nodeMessage; } @@ -263,6 +309,13 @@ synchronized PingPeerMessage sendPing(InetSocketAddress nodeAddress, int attempt localAddress.getAddress().getHostAddress(), localAddress.getPort(), id, this.key, this.networkId); + + logger.debug("sendPing - Sending ping message to " + + "address: [{}/{}], " + + "attempt: [{}], " + + "nodeMessage: [{}]" + , nodeAddress.getHostName(), nodeAddress.getPort(), attempt, nodeMessage); + udpChannel.write(new DiscoveryEvent(nodeMessage, nodeAddress)); PeerDiscoveryRequest request = PeerDiscoveryRequestBuilder.builder().messageId(id) @@ -275,6 +328,7 @@ synchronized PingPeerMessage sendPing(InetSocketAddress nodeAddress, int attempt } private void updateEntry(Node connectedNode) { + logger.trace("updateEntry - Updating node [{}]", connectedNode.getHexId()); try { updateEntryLock.lock(); this.distanceTable.updateEntry(connectedNode); @@ -296,6 +350,13 @@ private PingPeerMessage checkPendingPeerToAddress(InetSocketAddress address) { private PongPeerMessage sendPong(InetSocketAddress nodeAddress, PingPeerMessage message) { InetSocketAddress localAddress = this.localNode.getAddress(); PongPeerMessage pongPeerMessage = PongPeerMessage.create(localAddress.getHostName(), localAddress.getPort(), message.getMessageId(), this.key, this.networkId); + + logger.debug("sendPong - Sending pong message to " + + "address: [{}/{}], " + + "messageId: [{}], " + + "pongPeerMessage: [{}]" + , nodeAddress.getHostName(), nodeAddress.getPort(), message.getMessageId(), pongPeerMessage); + udpChannel.write(new DiscoveryEvent(pongPeerMessage, nodeAddress)); return pongPeerMessage; @@ -306,10 +367,18 @@ FindNodePeerMessage sendFindNode(Node node) { InetSocketAddress nodeAddress = node.getAddress(); String id = UUID.randomUUID().toString(); FindNodePeerMessage findNodePeerMessage = FindNodePeerMessage.create(this.key.getNodeId(), id, this.key, this.networkId); + udpChannel.write(new DiscoveryEvent(findNodePeerMessage, nodeAddress)); PeerDiscoveryRequest request = PeerDiscoveryRequestBuilder.builder().messageId(id).relatedNode(node) .message(findNodePeerMessage).address(nodeAddress).expectedResponse(DiscoveryMessageType.NEIGHBORS) .expirationPeriod(requestTimeout).build(); + + logger.debug("sendFindNode - Sending find node message to " + + "address: [{}/{}], " + + "findNodePeerMessage: [{}], " + + "request: [{}]" + , nodeAddress.getHostName(), nodeAddress.getPort(), findNodePeerMessage, request); + pendingFindNodeRequests.put(findNodePeerMessage.getMessageId(), request); return findNodePeerMessage; @@ -318,8 +387,16 @@ FindNodePeerMessage sendFindNode(Node node) { private NeighborsPeerMessage sendNeighbors(InetSocketAddress nodeAddress, List nodes, String id) { List nodesToSend = getRandomizeLimitedList(nodes, MAX_NODES_PER_MSG, 5); NeighborsPeerMessage sendNodesMessage = NeighborsPeerMessage.create(nodesToSend, id, this.key, networkId); + + logger.debug("sendNeighbors - Sending neighbors message to " + + "address: [{}/{}], " + + "id: [{}], " + + "nodes: [{}], " + + "nodesToSend: [{}], " + + "sendNodesMessage: [{}]" + , nodeAddress.getHostName(), nodeAddress.getPort(), id, nodes, nodesToSend, sendNodesMessage); + udpChannel.write(new DiscoveryEvent(sendNodesMessage, nodeAddress)); - logger.debug(" [{}] Neighbors Sent to ip:[{}] port:[{}]", nodesToSend.size(), nodeAddress.getAddress().getHostAddress(), nodeAddress.getPort()); return sendNodesMessage; } @@ -340,6 +417,7 @@ synchronized void clean() { return; } + logger.trace("clean - Cleaning obsolete requests"); this.purgeRequests(); } @@ -350,17 +428,28 @@ synchronized void update() { } List closestNodes = this.distanceTable.getClosestNodes(this.localNode.getId()); + + logger.trace("update - closestNodes: [{}]", closestNodes); + this.askForMoreNodes(closestNodes); this.checkPeersPulse(closestNodes); } private void checkPeersPulse(List closestNodes) { List nodesToCheck = this.getRandomizeLimitedList(closestNodes, MAX_NODES_TO_CHECK, 10); + + logger.trace("checkPeersPulse - Checking peers pulse for nodes: [{}], nodesToCheck: [{}]" + , closestNodes, nodesToCheck); + nodesToCheck.forEach(node -> sendPing(node.getAddress(), 1, node)); } private void askForMoreNodes(List closestNodes) { List nodesToAsk = getRandomizeLimitedList(closestNodes, MAX_NODES_TO_ASK, 5); + + logger.trace("askForMoreNodes - Asking for more nodes from closestNodes: [{}], nodesToAsk: [{}]" + , closestNodes, nodesToAsk); + nodesToAsk.forEach(this::sendFindNode); } @@ -369,6 +458,9 @@ private List removeExpiredRequests(Map pendingRequests.remove(r.getMessageId())); + logger.trace("removeExpiredRequests - Removing expired requests from pendingRequests: [{}], requests: [{}]" + , pendingRequests, requests); + return requests; } @@ -377,6 +469,8 @@ private void removeExpiredChallenges(List peerDiscoveryReq } private void resendExpiredPing(List peerDiscoveryRequests) { + logger.trace("resendExpiredPing - Resending expired pings form peerDiscoveryRequests: [{}]", peerDiscoveryRequests); + peerDiscoveryRequests.stream().filter(r -> r.getAttemptNumber() < RETRIES_COUNT) .forEach(r -> sendPing(r.getAddress(), r.getAttemptNumber() + 1, r.getRelatedNode())); } @@ -392,6 +486,12 @@ private void removeConnections(List expiredRequests) { } private void removeConnection(Node node) { + if (logger.isDebugEnabled()) { + InetSocketAddress address = node.getAddress(); + logger.debug("removeConnection - Removing node: [{}], " + + "nodeAddress address: [{}/{}]", node.getHexId(), address.getHostName(), address.getPort()); + } + this.establishedConnections.remove(node.getId()); this.distanceTable.removeNode(node); this.knownHosts.remove(node.getAddressAsString()); @@ -400,6 +500,14 @@ private void removeConnection(Node node) { private void addConnection(PongPeerMessage message, String ip, int port) { Node senderNode = new Node(message.getNodeId().getID(), ip, port); boolean isLocalNode = StringUtils.equals(senderNode.getHexId(), this.localNode.getHexId()); + + logger.debug("addConnection - Adding node with " + + "address: [{}/{}], " + + "messageId: [{}], " + + "allowMultipleConnectionsPerHostPort: [{}], " + + "senderNode: [{}], " + + "isLocalNode: [{}], ", ip, port, message.getMessageId(), this.allowMultipleConnectionsPerHostPort, senderNode, isLocalNode); + if (isLocalNode) { return; } @@ -409,6 +517,9 @@ private void addConnection(PongPeerMessage message, String ip, int port) { } OperationResult result = this.distanceTable.addNode(senderNode); + + logger.debug("addConnection - result: [{}]", result); + if (result.isSuccess()) { this.knownHosts.put(senderNode.getAddressAsString(), senderNode.getId()); this.establishedConnections.put(senderNode.getId(), senderNode); diff --git a/rskj-core/src/main/java/co/rsk/net/discovery/table/BucketEntry.java b/rskj-core/src/main/java/co/rsk/net/discovery/table/BucketEntry.java index c84e4ac5d58..89615c8e340 100644 --- a/rskj-core/src/main/java/co/rsk/net/discovery/table/BucketEntry.java +++ b/rskj-core/src/main/java/co/rsk/net/discovery/table/BucketEntry.java @@ -18,6 +18,7 @@ package co.rsk.net.discovery.table; +import org.apache.commons.lang3.builder.ToStringBuilder; import org.ethereum.net.rlpx.Node; /** @@ -43,4 +44,12 @@ public long lastSeen() { public void updateTime() { this.lastSeenTime = System.currentTimeMillis(); } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("node", node) + .append("lastSeenTime", lastSeenTime) + .toString(); + } } diff --git a/rskj-core/src/main/java/co/rsk/net/discovery/table/OperationResult.java b/rskj-core/src/main/java/co/rsk/net/discovery/table/OperationResult.java index 6b273de8a33..9f147d8d910 100644 --- a/rskj-core/src/main/java/co/rsk/net/discovery/table/OperationResult.java +++ b/rskj-core/src/main/java/co/rsk/net/discovery/table/OperationResult.java @@ -18,6 +18,8 @@ package co.rsk.net.discovery.table; +import org.apache.commons.lang3.builder.ToStringBuilder; + /** * Created by mario on 21/02/17. */ @@ -38,4 +40,12 @@ public boolean isSuccess() { public BucketEntry getAffectedEntry() { return affectedEntry; } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("success", success) + .append("affectedEntry", affectedEntry) + .toString(); + } }