Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logs peer discovery #2157

Merged
merged 17 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package co.rsk.net.discovery;

import co.rsk.net.discovery.message.PeerDiscoveryMessage;
import org.apache.commons.lang3.builder.ToStringBuilder;

import java.net.InetSocketAddress;

Expand All @@ -38,4 +39,12 @@ public PeerDiscoveryMessage getMessage() {
public InetSocketAddress getAddress() {
return address;
}

@Override
public String toString() {
return new ToStringBuilder(this)
.append("message", this.message)
.append("address", this.address)
.toString();
}
}
10 changes: 10 additions & 0 deletions rskj-core/src/main/java/co/rsk/net/discovery/NodeChallenge.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package co.rsk.net.discovery;

import org.apache.commons.lang3.builder.ToStringBuilder;
import org.ethereum.net.rlpx.Node;

/**
Expand Down Expand Up @@ -45,4 +46,13 @@ public Node getChallenger() {
public String getChallengeId() {
return challengeId;
}

@Override
public String toString() {
return new ToStringBuilder(this)
.append("challengedNode", this.challengedNode)
.append("challenger", this.challenger)
.append("challengeId", this.challengeId)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import co.rsk.net.discovery.message.PingPeerMessage;
import com.google.common.annotations.VisibleForTesting;
import org.ethereum.net.rlpx.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -29,9 +31,14 @@
* Created by mario on 22/02/17.
*/
public class NodeChallengeManager {
private static final Logger logger = LoggerFactory.getLogger(NodeChallengeManager.class);

private Map<String, NodeChallenge> activeChallenges = new ConcurrentHashMap<>();

public NodeChallenge startChallenge(Node challengedNode, Node challenger, PeerExplorer explorer) {
logger.debug("startChallenge - Starting challenge for node: [{}] by challenger: [{}]",
challengedNode.getHexId(), challenger.getHexId());

PingPeerMessage pingMessage = explorer.sendPing(challengedNode.getAddress(), 1, challengedNode);
String messageId = pingMessage.getMessageId();
NodeChallenge challenge = new NodeChallenge(challengedNode, challenger, messageId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import co.rsk.net.discovery.message.DiscoveryMessageType;
import co.rsk.net.discovery.message.PeerDiscoveryMessage;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.ethereum.net.rlpx.Node;

import java.net.InetSocketAddress;
Expand Down Expand Up @@ -73,4 +74,14 @@ public boolean validateMessageResponse(InetSocketAddress responseAddress, PeerDi
public boolean hasExpired() {
return System.currentTimeMillis() > expirationDate;
}

@Override
public String toString() {
return new ToStringBuilder(this)
.append("messageId", this.messageId)
.append("message", this.message)
.append("address", this.address)
.append("relatedNode", this.relatedNode)
.toString();
}
}
125 changes: 118 additions & 7 deletions rskj-core/src/main/java/co/rsk/net/discovery/PeerExplorer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
package co.rsk.net.discovery;

import co.rsk.net.NodeID;
import co.rsk.net.discovery.message.*;
import co.rsk.net.discovery.message.DiscoveryMessageType;
import co.rsk.net.discovery.message.FindNodePeerMessage;
import co.rsk.net.discovery.message.NeighborsPeerMessage;
import co.rsk.net.discovery.message.PingPeerMessage;
import co.rsk.net.discovery.message.PongPeerMessage;
import co.rsk.net.discovery.table.NodeDistanceTable;
import co.rsk.net.discovery.table.OperationResult;
import co.rsk.net.discovery.table.PeerDiscoveryRequestBuilder;
Expand All @@ -37,7 +41,12 @@
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.security.SecureRandom;
import java.util.*;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -160,6 +169,12 @@ void setUDPChannel(UDPChannel udpChannel) {
}

synchronized void handleMessage(DiscoveryEvent event) {
logger.debug("handleMessage - Handling message with " +
"type: [{}], " +
"networkId: [{}]",
event.getMessage().getMessageType(),
event.getMessage().getNetworkId());
Dismissed Show dismissed Hide dismissed

if (state != ExecState.RUNNING) {
logger.warn("Cannot handle message as current state is {}", state);
return;
Expand All @@ -170,6 +185,8 @@ synchronized void handleMessage(DiscoveryEvent event) {
//have a networkId in the message yet, then just let them through, for now.
if (event.getMessage().getNetworkId().isPresent() &&
event.getMessage().getNetworkId().getAsInt() != this.networkId) {
logger.warn("handleMessage - Message ignored because remote peer's network id: [{}] is different from local network id: [{}]",
event.getMessage().getNetworkId(), this.networkId);
Dismissed Show dismissed Hide dismissed
return;
}
if (type == DiscoveryMessageType.PING) {
Expand All @@ -191,9 +208,14 @@ synchronized void handleMessage(DiscoveryEvent event) {

private void handlePingMessage(InetSocketAddress address, PingPeerMessage message) {
this.sendPong(address, message);

Node connectedNode = this.establishedConnections.get(message.getNodeId());

logger.debug("handlePingMessage - Handling ping message with " +
"address: [{}/{}], " +
"nodeId: [{}], " +
"connectedNode: [{}]"
, address.getHostName(), address.getPort(), message.getNodeId(), connectedNode);

if (connectedNode == null) {
this.sendPing(address, 1);
} else {
Expand All @@ -204,12 +226,20 @@ private void handlePingMessage(InetSocketAddress address, PingPeerMessage messag
private void handlePong(InetSocketAddress pongAddress, PongPeerMessage message) {
PeerDiscoveryRequest request = this.pendingPingRequests.get(message.getMessageId());

logger.debug("handlePong - Handling pong message with " +
"address: [{}/{}], " +
"messageId: [{}], " +
"request: [{}]"
, pongAddress.getHostName(), pongAddress.getPort(), message.getMessageId(), request);

if (request != null && request.validateMessageResponse(pongAddress, message)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we should print a WARN here if request is null or invalid

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

this.pendingPingRequests.remove(message.getMessageId());
NodeChallenge challenge = this.challengeManager.removeChallenge(message.getMessageId());
if (challenge == null) {
this.addConnection(message, request.getAddress().getHostString(), request.getAddress().getPort());
}
} else {
logger.warn("handlePong - Peer discovery request with id [{}] is either null or invalid", message.getMessageId());
}
}

Expand All @@ -219,17 +249,29 @@ private void handleFindNode(FindNodePeerMessage message) {

if (connectedNode != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we should print a WARN here if connectedNode is null. So that we may not need the one above - logger.debug("handleFindNode - Handling find node message with ... - as we could extend the one bellow - logger.debug("About to send [{}] neighbors to ip[{}] ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

List<Node> nodesToSend = this.distanceTable.getClosestNodes(nodeId);
logger.debug("About to send [{}] neighbors to ip[{}] port[{}] nodeId[{}]", nodesToSend.size(), connectedNode.getHost(), connectedNode.getPort(), connectedNode.getHexId());
logger.debug("handleFindNode - About to send [{}] neighbors to address: [{}/{}], nodeId: [{}] with messageId: [{}]",
nodesToSend.size(), connectedNode.getHost(), connectedNode.getPort(), connectedNode.getHexId(), message.getMessageId());
this.sendNeighbors(connectedNode.getAddress(), nodesToSend, message.getMessageId());
updateEntry(connectedNode);
} else {
logger.warn("handleFindNode - Node with id: [{}] is not connected. Ignored", nodeId);
}
}

private void handleNeighborsMessage(InetSocketAddress neighborsResponseAddress, NeighborsPeerMessage message) {
Node connectedNode = this.establishedConnections.get(message.getNodeId());
NodeID nodeId = message.getNodeId();
Node connectedNode = this.establishedConnections.get(nodeId);

if (connectedNode != null) {
logger.debug("Neighbors received from [{}]", connectedNode.getHexId());
logger.debug("handleNeighborsMessage - Neighbors received from id: [{}], address: [{}/{}] with " +
"nodeId: [{}]," +
"messageId: [{}], " +
"nodesCount: [{}], " +
"nodes: [{}], " +
"connectedNode: [{}]",
connectedNode.getHexId(), neighborsResponseAddress.getHostName(), neighborsResponseAddress.getPort(), message.getNodeId(),
message.getMessageId(), message.countNodes(), message.getNodes(), connectedNode);

PeerDiscoveryRequest request = this.pendingFindNodeRequests.remove(message.getMessageId());

if (request != null && request.validateMessageResponse(neighborsResponseAddress, message)) {
Expand All @@ -239,6 +281,8 @@ private void handleNeighborsMessage(InetSocketAddress neighborsResponseAddress,
this.startConversationWithNewNodes();
}
updateEntry(connectedNode);
} else {
logger.warn("handleFindNode - Node with id: [{}] is not connected. Ignored", nodeId);
}
}

Expand All @@ -254,6 +298,8 @@ synchronized PingPeerMessage sendPing(InetSocketAddress nodeAddress, int attempt
PingPeerMessage nodeMessage = checkPendingPeerToAddress(nodeAddress);

if (nodeMessage != null) {
logger.warn("sendPing - No ping message has been sent to address: [{}/{}], as there's pending one", nodeAddress.getHostName(), nodeAddress.getPort());

return nodeMessage;
}

Expand All @@ -263,6 +309,13 @@ synchronized PingPeerMessage sendPing(InetSocketAddress nodeAddress, int attempt
localAddress.getAddress().getHostAddress(),
localAddress.getPort(),
id, this.key, this.networkId);

logger.debug("sendPing - Sending ping message to " +
"address: [{}/{}], " +
"attempt: [{}], " +
"nodeMessage: [{}]"
, nodeAddress.getHostName(), nodeAddress.getPort(), attempt, nodeMessage);

udpChannel.write(new DiscoveryEvent(nodeMessage, nodeAddress));

PeerDiscoveryRequest request = PeerDiscoveryRequestBuilder.builder().messageId(id)
Expand All @@ -275,6 +328,7 @@ synchronized PingPeerMessage sendPing(InetSocketAddress nodeAddress, int attempt
}

private void updateEntry(Node connectedNode) {
logger.trace("updateEntry - Updating node [{}]", connectedNode.getHexId());
try {
updateEntryLock.lock();
this.distanceTable.updateEntry(connectedNode);
Expand All @@ -296,6 +350,13 @@ private PingPeerMessage checkPendingPeerToAddress(InetSocketAddress address) {
private PongPeerMessage sendPong(InetSocketAddress nodeAddress, PingPeerMessage message) {
InetSocketAddress localAddress = this.localNode.getAddress();
PongPeerMessage pongPeerMessage = PongPeerMessage.create(localAddress.getHostName(), localAddress.getPort(), message.getMessageId(), this.key, this.networkId);

logger.debug("sendPong - Sending pong message to " +
"address: [{}/{}], " +
"messageId: [{}], " +
"pongPeerMessage: [{}]"
, nodeAddress.getHostName(), nodeAddress.getPort(), message.getMessageId(), pongPeerMessage);

udpChannel.write(new DiscoveryEvent(pongPeerMessage, nodeAddress));

return pongPeerMessage;
Expand All @@ -306,10 +367,18 @@ FindNodePeerMessage sendFindNode(Node node) {
InetSocketAddress nodeAddress = node.getAddress();
String id = UUID.randomUUID().toString();
FindNodePeerMessage findNodePeerMessage = FindNodePeerMessage.create(this.key.getNodeId(), id, this.key, this.networkId);

udpChannel.write(new DiscoveryEvent(findNodePeerMessage, nodeAddress));
PeerDiscoveryRequest request = PeerDiscoveryRequestBuilder.builder().messageId(id).relatedNode(node)
.message(findNodePeerMessage).address(nodeAddress).expectedResponse(DiscoveryMessageType.NEIGHBORS)
.expirationPeriod(requestTimeout).build();

logger.debug("sendFindNode - Sending find node message to " +
"address: [{}/{}], " +
"findNodePeerMessage: [{}], " +
"request: [{}]"
, nodeAddress.getHostName(), nodeAddress.getPort(), findNodePeerMessage, request);

pendingFindNodeRequests.put(findNodePeerMessage.getMessageId(), request);

return findNodePeerMessage;
Expand All @@ -318,8 +387,16 @@ FindNodePeerMessage sendFindNode(Node node) {
private NeighborsPeerMessage sendNeighbors(InetSocketAddress nodeAddress, List<Node> nodes, String id) {
List<Node> nodesToSend = getRandomizeLimitedList(nodes, MAX_NODES_PER_MSG, 5);
NeighborsPeerMessage sendNodesMessage = NeighborsPeerMessage.create(nodesToSend, id, this.key, networkId);

logger.debug("sendNeighbors - Sending neighbors message to " +
"address: [{}/{}], " +
"id: [{}], " +
"nodes: [{}], " +
"nodesToSend: [{}], " +
"sendNodesMessage: [{}]"
, nodeAddress.getHostName(), nodeAddress.getPort(), id, nodes, nodesToSend, sendNodesMessage);

udpChannel.write(new DiscoveryEvent(sendNodesMessage, nodeAddress));
logger.debug(" [{}] Neighbors Sent to ip:[{}] port:[{}]", nodesToSend.size(), nodeAddress.getAddress().getHostAddress(), nodeAddress.getPort());

return sendNodesMessage;
}
Expand All @@ -340,6 +417,7 @@ synchronized void clean() {
return;
}

logger.trace("clean - Cleaning obsolete requests");
this.purgeRequests();
}

Expand All @@ -350,17 +428,28 @@ synchronized void update() {
}

List<Node> closestNodes = this.distanceTable.getClosestNodes(this.localNode.getId());

logger.trace("update - closestNodes: [{}]", closestNodes);

this.askForMoreNodes(closestNodes);
this.checkPeersPulse(closestNodes);
}

private void checkPeersPulse(List<Node> closestNodes) {
List<Node> nodesToCheck = this.getRandomizeLimitedList(closestNodes, MAX_NODES_TO_CHECK, 10);

logger.trace("checkPeersPulse - Checking peers pulse for nodes: [{}], nodesToCheck: [{}]"
, closestNodes, nodesToCheck);

nodesToCheck.forEach(node -> sendPing(node.getAddress(), 1, node));
}

private void askForMoreNodes(List<Node> closestNodes) {
List<Node> nodesToAsk = getRandomizeLimitedList(closestNodes, MAX_NODES_TO_ASK, 5);

logger.trace("askForMoreNodes - Asking for more nodes from closestNodes: [{}], nodesToAsk: [{}]"
, closestNodes, nodesToAsk);

nodesToAsk.forEach(this::sendFindNode);
}

Expand All @@ -369,6 +458,9 @@ private List<PeerDiscoveryRequest> removeExpiredRequests(Map<String, PeerDiscove
.filter(PeerDiscoveryRequest::hasExpired).collect(Collectors.toList());
requests.forEach(r -> pendingRequests.remove(r.getMessageId()));

logger.trace("removeExpiredRequests - Removing expired requests from pendingRequests: [{}], requests: [{}]"
, pendingRequests, requests);

return requests;
}

Expand All @@ -377,6 +469,8 @@ private void removeExpiredChallenges(List<PeerDiscoveryRequest> peerDiscoveryReq
}

private void resendExpiredPing(List<PeerDiscoveryRequest> peerDiscoveryRequests) {
logger.trace("resendExpiredPing - Resending expired pings form peerDiscoveryRequests: [{}]", peerDiscoveryRequests);

peerDiscoveryRequests.stream().filter(r -> r.getAttemptNumber() < RETRIES_COUNT)
.forEach(r -> sendPing(r.getAddress(), r.getAttemptNumber() + 1, r.getRelatedNode()));
}
Expand All @@ -392,6 +486,12 @@ private void removeConnections(List<PeerDiscoveryRequest> expiredRequests) {
}

private void removeConnection(Node node) {
if (logger.isDebugEnabled()) {
InetSocketAddress address = node.getAddress();
logger.debug("removeConnection - Removing node: [{}], " +
"nodeAddress address: [{}/{}]", node.getHexId(), address.getHostName(), address.getPort());
}

this.establishedConnections.remove(node.getId());
this.distanceTable.removeNode(node);
this.knownHosts.remove(node.getAddressAsString());
Expand All @@ -400,6 +500,14 @@ private void removeConnection(Node node) {
private void addConnection(PongPeerMessage message, String ip, int port) {
Node senderNode = new Node(message.getNodeId().getID(), ip, port);
boolean isLocalNode = StringUtils.equals(senderNode.getHexId(), this.localNode.getHexId());

logger.debug("addConnection - Adding node with " +
"address: [{}/{}], " +
"messageId: [{}], " +
"allowMultipleConnectionsPerHostPort: [{}], " +
"senderNode: [{}], " +
"isLocalNode: [{}], ", ip, port, message.getMessageId(), this.allowMultipleConnectionsPerHostPort, senderNode, isLocalNode);

if (isLocalNode) {
return;
}
Expand All @@ -409,6 +517,9 @@ private void addConnection(PongPeerMessage message, String ip, int port) {
}

OperationResult result = this.distanceTable.addNode(senderNode);

logger.debug("addConnection - result: [{}]", result);

if (result.isSuccess()) {
this.knownHosts.put(senderNode.getAddressAsString(), senderNode.getId());
this.establishedConnections.put(senderNode.getId(), senderNode);
Expand Down
Loading