From 1de348fc729c3cf980abd34b9e9ee42840601ab6 Mon Sep 17 00:00:00 2001 From: Joshua Kissoon Date: Wed, 19 Feb 2014 15:54:15 +0530 Subject: [PATCH] Working on NodeLookups --- .../exceptions/UnknownMessageException.java | 20 ++++ src/kademlia/message/AcknowledgeMessage.java | 2 +- src/kademlia/message/ConnectMessage.java | 2 +- src/kademlia/message/MessageFactory.java | 11 ++- src/kademlia/message/NodeLookupMessage.java | 2 +- src/kademlia/message/NodeReplyMessage.java | 91 +++++++++++++++++++ src/kademlia/message/SimpleMessage.java | 2 +- src/kademlia/node/Node.java | 1 - src/kademlia/node/NodeId.java | 1 - src/kademlia/operation/ConnectOperation.java | 5 +- .../operation/NodeLookupOperation.java | 52 ++++++++++- src/kademlia/routing/KadBucket.java | 22 +++++ src/kademlia/routing/RoutingTable.java | 25 ++++- 13 files changed, 219 insertions(+), 17 deletions(-) create mode 100644 src/kademlia/exceptions/UnknownMessageException.java create mode 100644 src/kademlia/message/NodeReplyMessage.java diff --git a/src/kademlia/exceptions/UnknownMessageException.java b/src/kademlia/exceptions/UnknownMessageException.java new file mode 100644 index 0000000..8a70670 --- /dev/null +++ b/src/kademlia/exceptions/UnknownMessageException.java @@ -0,0 +1,20 @@ +/** + * @author Joshua Kissoon + * @created 20140219 + * @desc An exception used to indicate an unknown message type or communication identifier + */ +package kademlia.exceptions; + +public class UnknownMessageException extends RuntimeException +{ + + public UnknownMessageException() + { + super(); + } + + public UnknownMessageException(String message) + { + super(message); + } +} diff --git a/src/kademlia/message/AcknowledgeMessage.java b/src/kademlia/message/AcknowledgeMessage.java index c6ef459..7b7b01b 100644 --- a/src/kademlia/message/AcknowledgeMessage.java +++ b/src/kademlia/message/AcknowledgeMessage.java @@ -14,7 +14,7 @@ public class AcknowledgeMessage implements Message { private Node origin; - public static final byte CODE = 0x10; + public static final byte CODE = 0x01; public AcknowledgeMessage(Node origin) { diff --git a/src/kademlia/message/ConnectMessage.java b/src/kademlia/message/ConnectMessage.java index 7f885ae..37ce51e 100644 --- a/src/kademlia/message/ConnectMessage.java +++ b/src/kademlia/message/ConnectMessage.java @@ -14,7 +14,7 @@ public class ConnectMessage implements Message { private Node origin; - public static final byte CODE = 0x11; + public static final byte CODE = 0x02; public ConnectMessage(Node origin) { diff --git a/src/kademlia/message/MessageFactory.java b/src/kademlia/message/MessageFactory.java index 8542717..454718b 100644 --- a/src/kademlia/message/MessageFactory.java +++ b/src/kademlia/message/MessageFactory.java @@ -25,13 +25,20 @@ public class MessageFactory { switch (code) { - default: case SimpleMessage.CODE: return new SimpleMessage(in); case ConnectMessage.CODE: return new ConnectMessage(in); case AcknowledgeMessage.CODE: return new AcknowledgeMessage(in); + case NodeReplyMessage.CODE: + return new NodeReplyMessage(in); + case NodeLookupMessage.CODE: + return new NodeLookupMessage(in); + default: + System.out.println("No Message handler found for message. Code: " + code); + return new SimpleMessage(in); + } } @@ -44,6 +51,8 @@ public class MessageFactory return new SimpleReceiver(); case ConnectMessage.CODE: return new ConnectReceiver(server, this.localNode); + case NodeLookupMessage.CODE + return new NodeLookupReceiver(); } } } diff --git a/src/kademlia/message/NodeLookupMessage.java b/src/kademlia/message/NodeLookupMessage.java index dc53c50..afb115a 100644 --- a/src/kademlia/message/NodeLookupMessage.java +++ b/src/kademlia/message/NodeLookupMessage.java @@ -17,7 +17,7 @@ public class NodeLookupMessage implements Message private Node origin; private NodeId lookupId; - public static final byte CODE = 0x13; + public static final byte CODE = 0x03; /** * A new NodeLookupMessage to find nodes diff --git a/src/kademlia/message/NodeReplyMessage.java b/src/kademlia/message/NodeReplyMessage.java new file mode 100644 index 0000000..7c9b042 --- /dev/null +++ b/src/kademlia/message/NodeReplyMessage.java @@ -0,0 +1,91 @@ +/** + * @author Joshua Kissoon + * @created 20140218 + * @desc A message used to connect nodes + */ +package kademlia.message; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import kademlia.node.Node; + +public class NodeReplyMessage implements Message +{ + + private Node origin; + public static final byte CODE = 0x04; + private ArrayList nodes; + + public NodeReplyMessage(Node origin, ArrayList nodes) + { + this.origin = origin; + this.nodes = nodes; + } + + public NodeReplyMessage(DataInput in) throws IOException + { + this.fromStream(in); + } + + @Override + public final void fromStream(DataInput in) throws IOException + { + /* Read in the origin */ + this.origin = new Node(in); + + /* Get the number of incoming nodes */ + int len = in.readInt(); + this.nodes = new ArrayList<>(len); + + /* Read in all nodes */ + for (int i = 0; i < len; i++) + { + this.nodes.add(new Node(in)); + } + } + + @Override + public void toStream(DataOutput out) throws IOException + { + /* Add the origin node to the stream */ + origin.toStream(out); + + /* Add all other nodes to the stream */ + int len = this.nodes.size(); + if (len > 255) + { + throw new IndexOutOfBoundsException("Too many nodes in list to send in NodeReplyMessage. Size: " + len); + } + + /* Writing the nodes to the stream */ + out.writeInt(len); + for (Node n : this.nodes) + { + n.toStream(out); + } + } + + public Node getOrigin() + { + return this.origin; + } + + @Override + public byte code() + { + return CODE; + } + + public ArrayList getNodes() + { + return this.nodes; + } + + @Override + public String toString() + { + return "ConnectMessage[origin NodeId=" + origin.getNodeId() + "]"; + } +} diff --git a/src/kademlia/message/SimpleMessage.java b/src/kademlia/message/SimpleMessage.java index afdc972..d51de42 100644 --- a/src/kademlia/message/SimpleMessage.java +++ b/src/kademlia/message/SimpleMessage.java @@ -13,7 +13,7 @@ public class SimpleMessage implements Message { /* Message constants */ - public static final byte CODE = 0x01; + public static final byte CODE = 0x05; private String content; diff --git a/src/kademlia/node/Node.java b/src/kademlia/node/Node.java index 71fa9ff..2370dcf 100644 --- a/src/kademlia/node/Node.java +++ b/src/kademlia/node/Node.java @@ -8,7 +8,6 @@ package kademlia.node; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.math.BigInteger; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; diff --git a/src/kademlia/node/NodeId.java b/src/kademlia/node/NodeId.java index 6b7e286..17cfd46 100644 --- a/src/kademlia/node/NodeId.java +++ b/src/kademlia/node/NodeId.java @@ -122,7 +122,6 @@ public class NodeId implements Streamable public int getFirstSetBitIndex() { int prefixLength = 0; - System.out.println("Bytes: "); for (byte b : this.keyBytes) { diff --git a/src/kademlia/operation/ConnectOperation.java b/src/kademlia/operation/ConnectOperation.java index f7662c3..3c1393f 100644 --- a/src/kademlia/operation/ConnectOperation.java +++ b/src/kademlia/operation/ConnectOperation.java @@ -64,8 +64,9 @@ public class ConnectOperation implements Operation, Receiver throw new RoutingException("Bootstrap node did not respond: " + bootstrapNode); } - /* @todo Perform lookup for our own ID to get nodes close to us */ - Operation lookup = new No + /* Perform lookup for our own ID to get nodes close to us */ + Operation lookup = new NodeLookupOperation(this.server, this.localNode, this.localNode.getNodeId()); + lookup.execute(); /* @todo Refresh buckets to get a good routing table */ return null; diff --git a/src/kademlia/operation/NodeLookupOperation.java b/src/kademlia/operation/NodeLookupOperation.java index 7aca96c..565315e 100644 --- a/src/kademlia/operation/NodeLookupOperation.java +++ b/src/kademlia/operation/NodeLookupOperation.java @@ -19,8 +19,10 @@ import java.util.TreeMap; import kademlia.core.Configuration; import kademlia.core.KadServer; import kademlia.exceptions.RoutingException; +import kademlia.exceptions.UnknownMessageException; import kademlia.message.Message; import kademlia.message.NodeLookupMessage; +import kademlia.message.NodeReplyMessage; import kademlia.node.Node; import kademlia.node.NodeId; @@ -149,7 +151,7 @@ public class NodeLookupOperation implements Operation, Receiver return false; } - /* Get unqueried nodes among the K closest seen */ + /* Get unqueried nodes among the K closest seen that have not FAILED */ ArrayList unasked = this.closestNodesNotFailed(UNASKED); if (unasked.isEmpty() && this.messagesTransiting.isEmpty()) @@ -239,15 +241,57 @@ public class NodeLookupOperation implements Operation, Receiver return closestNodes; } + /** + * Receive and handle the incoming NodeReplyMessage + * + * @param comm + * + * @throws java.io.IOException + */ @Override - public synchronized void receive(Message incoming, int comm) + public synchronized void receive(Message incoming, int comm) throws IOException { - // NodeRepl + /* We receive a NodeReplyMessage with a set of nodes, read this message */ + NodeReplyMessage msg = (NodeReplyMessage) incoming; + + /* Add the origin node to our routing table */ + Node origin = msg.getOrigin(); + this.localNode.getRoutingTable().insert(origin); + + /* Set that we've completed ASKing the origin node */ + this.nodes.put(origin, ASKED); + + /* Remove this msg from messagesTransiting since it's completed now */ + this.messagesTransiting.remove(new Integer(comm)); + + /* Add the received nodes to our nodes list to query */ + this.addNodes(msg.getNodes()); + this.askNodesorFinish(); } + /** + * A node does not respond or a packet was lost, we set this node as failed + * + * @param comm + * + * @throws java.io.IOException + */ @Override - public synchronized void timeout(int comm) + public synchronized void timeout(int comm) throws IOException { + /* Get the node associated with this communication */ + Node n = this.messagesTransiting.get(new Integer(comm)); + if (n == null) + { + throw new UnknownMessageException("Unknown comm: " + comm); + } + + /* Mark this node as failed */ + this.nodes.put(n, FAILED); + this.localNode.getRoutingTable().remove(n); + this.messagesTransiting.remove(new Integer(comm)); + + this.askNodesorFinish(); } } diff --git a/src/kademlia/routing/KadBucket.java b/src/kademlia/routing/KadBucket.java index b69ec9b..ddea413 100644 --- a/src/kademlia/routing/KadBucket.java +++ b/src/kademlia/routing/KadBucket.java @@ -43,6 +43,28 @@ public class KadBucket implements Bucket } } + /** + * Checks if this bucket contain a node + * + * @param n The node to check for + * + * @return boolean + */ + public boolean containNode(Node n) + { + return this.nodes.contains(n); + } + + /** + * Remove a node from this bucket + * + * @param n The node to remove + */ + public void removeNode(Node n) + { + this.nodes.remove(n); + } + public int numNodes() { return this.nodes.size(); diff --git a/src/kademlia/routing/RoutingTable.java b/src/kademlia/routing/RoutingTable.java index 75354e5..0c05991 100644 --- a/src/kademlia/routing/RoutingTable.java +++ b/src/kademlia/routing/RoutingTable.java @@ -32,17 +32,34 @@ public class RoutingTable } /** - * Adds a new contact to the routing table + * Adds a new node to the routing table * * @param n The contact to add */ public void insert(Node n) { - /* Find the prefix length of how far this node is away from the contact node */ - int prefixLength = this.node.getNodeId().xor(n.getNodeId()).getFirstSetBitIndex(); + /* Find the first set bit: how far this node is away from the contact node */ + int bucketId = this.node.getNodeId().xor(n.getNodeId()).getFirstSetBitIndex(); /* Put this contact to the bucket that stores contacts prefixLength distance away */ - this.buckets[prefixLength].insert(n); + this.buckets[bucketId].insert(n); + } + + /** + * Remove a node from the routing table + * + * @param n The node to remove + */ + public void remove(Node n) + { + /* Find the first set bit: how far this node is away from the contact node */ + int bucketId = this.node.getNodeId().xor(n.getNodeId()).getFirstSetBitIndex(); + + /* If the bucket has the contact, remove it */ + if (this.buckets[bucketId].containNode(n)) + { + this.buckets[bucketId].removeNode(n); + } } /**