Trying to fix an issue in the nodelookupoperation where the k-Closest nodes system doesn't work.

Lots of extra code added, but we're still in devel! so ...
This commit is contained in:
Joshua Kissoon 2014-03-22 10:53:05 +05:30
parent 22fee9116a
commit 0cc24d42f3
14 changed files with 246 additions and 71 deletions

View File

@ -113,6 +113,7 @@ public class KadServer
if (recv != null)
{
/* Setup the receiver to handle message response */
System.out.println(this.localNode + " Putting Receiver for comm: " + comm + " Receiver: " + recv);
receivers.put(comm, recv);
TimerTask task = new TimeoutTask(comm, recv);
timer.schedule(task, Configuration.RESPONSE_TIMEOUT);
@ -199,15 +200,19 @@ public class KadServer
Message msg = messageFactory.createMessage(messCode, din);
din.close();
//System.out.println(this.localNode.getNodeId() + " Message Received: [Comm: " + comm + "] " + msg);
System.out.println(this.localNode.getNodeId() + " Message Received: [Comm: " + comm + "] " + msg);
/* Get a receiver for this message */
Receiver receiver;
if (this.receivers.containsKey(comm))
{
System.out.println("Receiver found: " + this.receivers.get(comm));
/* If there is a reciever in the receivers to handle this */
synchronized (this)
{
System.out.println("Sync Block Receiver found: " + this.receivers.get(comm));
System.out.println(this.localNode + " Receivers: ");
this.printReceivers();
receiver = this.receivers.remove(comm);
TimerTask task = (TimerTask) tasks.remove(comm);
task.cancel();
@ -296,5 +301,13 @@ public class KadServer
}
}
}
public void printReceivers()
{
for(Integer r : this.receivers.keySet())
{
System.out.println("Receiver for comm: " + r + "; Receiver: " + this.receivers.get(r));
}
}
}

View File

@ -75,7 +75,7 @@ public class Kademlia
*
* @throws IOException If an error occurred while reading id or local map
* from disk <i>or</i> a network error occurred while
* attempting to connect to the network
attempting to bootstrap to the network
* */
public Kademlia(String ownerId, Node localNode, int udpPort, DHT dht) throws IOException
{
@ -184,7 +184,7 @@ public class Kademlia
* @throws IOException If a network error occurred
* @throws IllegalStateException If this object is closed
* */
public final void connect(Node n) throws IOException, RoutingException
public synchronized final void bootstrap(Node n) throws IOException, RoutingException
{
Operation op = new ConnectOperation(this.server, this.localNode, n);
op.execute();
@ -201,7 +201,7 @@ public class Kademlia
* @throws java.io.IOException
*
*/
public int put(KadContent content) throws IOException
public synchronized int put(KadContent content) throws IOException
{
StoreOperation sop = new StoreOperation(server, localNode, content);
sop.execute();

View File

@ -38,6 +38,7 @@ public class ConnectReceiver implements Receiver
this.localNode.getRoutingTable().insert(mess.getOrigin());
/* Respond to the connect request */
System.out.println(this.localNode + " Connect message received, sending an AcknowledgementMessage to " + mess.getOrigin());
AcknowledgeMessage msg = new AcknowledgeMessage(this.localNode);
/* Reply to the connect message with an Acknowledgement */

View File

@ -39,22 +39,21 @@ public class NodeLookupReceiver implements Receiver
Node origin = msg.getOrigin();
System.out.println(this.localNode.getNodeId() + ": Received NodeLookupMessage, sending NodeReplyMessage to: " + origin.getNodeId());
/* Update the local space by inserting the origin node. */
this.localNode.getRoutingTable().insert(origin);
/* Find nodes closest to the LookupId */
List<Node> nodes = this.localNode.getRoutingTable().findClosest(msg.getLookupId(), Configuration.K);
System.out.println("\nClosest Nodes: ");
for (Node n : nodes)
{
System.out.println(n.getNodeId());
}
System.out.println();
// System.out.println("\n" + this.localNode + " Closest Nodes: ");
// for (Node n : nodes)
// {
// System.out.println(n.getNodeId());
// }
// System.out.println();
/* Respond to the NodeLookupMessage */
System.out.println(this.localNode + " Sending NodeReplyMessage to " + origin);
Message reply = new NodeReplyMessage(this.localNode, nodes);
/* Let the Server send the reply */

View File

@ -23,6 +23,7 @@ public class Node implements Streamable
private NodeId nodeId;
private InetAddress inetAddress;
private int port;
private String strRep;
private transient RoutingTable routingTable;
@ -36,6 +37,7 @@ public class Node implements Streamable
this.nodeId = nid;
this.inetAddress = ip;
this.port = port;
this.strRep = this.nodeId.toString();
}
/**
@ -48,6 +50,7 @@ public class Node implements Streamable
public Node(DataInputStream in) throws IOException
{
this.fromStream(in);
this.strRep = this.nodeId.toString();
}
/**
@ -134,6 +137,11 @@ public class Node implements Streamable
{
if (o instanceof Node)
{
Node n = (Node) o;
if (o == this)
{
return true;
}
return this.getNodeId().equals(((Node) o).getNodeId());
}
return false;
@ -157,16 +165,16 @@ public class Node implements Streamable
public static class DistanceComparator implements Comparator
{
private final NodeId nodeId;
private final NodeId nid;
/**
* The NodeId relative to which the distance should be measured.
*
* @param nodeId
* @param nid
* */
public DistanceComparator(NodeId nodeId)
public DistanceComparator(NodeId nid)
{
this.nodeId = nodeId;
this.nid = nid;
}
/**
@ -179,27 +187,26 @@ public class Node implements Streamable
{
Node n1 = (Node) o1;
Node n2 = (Node) o2;
// System.out.println("\nDistance Comparator: " + nodeId);
// System.out.println("Distance Comparator: " + n1.getNodeId());
// System.out.println("Distance Comparator: " + n2.getNodeId());
/* Check if they are equal and return 0 */
if (n1.getNodeId().equals(n2.getNodeId()))
{
return 0;
}
// if (n1.equals(n2))
// {
// //System.out.println("Distance Comparator: Return 0");
// return 0;
// }
//System.out.println("\n **************** Compare Starting **************** ");
//System.out.println("Comparing to: " + this.nodeId);
int distance1 = nodeId.getDistance(n1.getNodeId());
int distance1 = nid.getDistance(n1.getNodeId());
//System.out.println("Node " + n1.getNodeId() + " distance: " + index1);
int distance2 = nodeId.getDistance(n2.getNodeId());
int distance2 = nid.getDistance(n2.getNodeId());
//System.out.println("Node " + n2.getNodeId() + " distance: " + index2);
int retval;
if (distance1 < distance2)
{
/* If the first node is closer to the given node, return 1 */
retval = 1;
}
else
if ((distance1 == distance2) && n1.equals(n2))
{
/**
* If the first node is farther to the given node, return 1
@ -207,7 +214,18 @@ public class Node implements Streamable
* @note -1 will also be returned if both nodes are the same distance away
* This really don't make a difference though, since they need to be sorted.
*/
retval = -1;
//System.out.println("Distance Comparator: Return -1");
retval = 0;
}
else if (distance1 < distance2)
{
/* If the first node is closer to the given node, return 1 */
//System.out.println("Distance Comparator: Return 1");
retval = 1;
}
else
{
return -1;
}
//System.out.println("Returned: " + retval);

View File

@ -38,9 +38,6 @@ public class ConnectOperation implements Operation, Receiver
this.bootstrapNode = bootstrap;
}
/**
* @return null
*/
@Override
public synchronized void execute()
{
@ -64,6 +61,7 @@ public class ConnectOperation implements Operation, Receiver
}
/* Perform lookup for our own ID to get nodes close to us */
System.out.println("Looking up for nodes with our own ID");
Operation lookup = new NodeLookupOperation(this.server, this.localNode, this.localNode.getNodeId());
lookup.execute();

View File

@ -2,7 +2,6 @@ package kademlia.operation;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@ -31,10 +30,10 @@ public class NodeLookupOperation implements Operation, Receiver
{
/* Constants */
private static final Byte UNASKED = (byte) 0x00;
private static final Byte AWAITING = (byte) 0x01;
private static final Byte ASKED = (byte) 0x02;
private static final Byte FAILED = (byte) 0x03;
private static final String UNASKED = "UnAsked";
private static final String AWAITING = "Awaiting";
private static final String ASKED = "Asked";
private static final String FAILED = "Failed";
private final KadServer server;
private final Node localNode;
@ -43,7 +42,7 @@ public class NodeLookupOperation implements Operation, Receiver
private boolean error;
private final Message lookupMessage; // Message sent to each peer
private final SortedMap<Node, Byte> nodes;
private final Map<Node, String> nodes;
/* Tracks messages in transit and awaiting reply */
private final Map<Integer, Node> messagesTransiting;
@ -74,7 +73,8 @@ public class NodeLookupOperation implements Operation, Receiver
* This map will be sorted by which nodes are closest to the lookupId
*/
this.comparator = new Node.DistanceComparator(lookupId);
this.nodes = new TreeMap(this.comparator);
//this.nodes = new TreeMap(this.comparator);
this.nodes = new HashMap<>();
}
/**
@ -126,9 +126,24 @@ public class NodeLookupOperation implements Operation, Receiver
for (Node o : list)
{
/* If this node is not in the list, add the node */
if (!nodes.containsKey(o))
System.out.println("Current Nodes & Their Status: ");
for (Node e : this.nodes.keySet())
{
//System.out.println("Adding node " + o.getNodeId());
System.out.println(e + ": " + this.nodes.get(e));
}
System.out.println("Nodes Received to add: ");
for (Node e : list)
{
System.out.println(e);
}
/**
* @note We add the extra check since for some reason, treemap returns that it doesn't contain the localNode even though it does
*/
if (!nodes.containsKey(o) /*&& !o.equals(this.localNode)*/)
{
System.out.println(localNode + ": Adding node " + o.getNodeId() + " Equal to local: " + o.equals(this.localNode));
nodes.put(o, UNASKED);
}
}
@ -161,10 +176,12 @@ public class NodeLookupOperation implements Operation, Receiver
/* Get unqueried nodes among the K closest seen that have not FAILED */
List<Node> unasked = this.closestNodesNotFailed(UNASKED);
// for (Node nn : unasked)
// {
// System.out.println(nn.getNodeId());
// }
System.out.println("Getting closest unasked Nodes not failed. Nodes ");
for (Node nn : unasked)
{
System.out.println(nn.getNodeId());
}
System.out.println("Unasked printing finished");
if (unasked.isEmpty() && this.messagesTransiting.isEmpty())
{
@ -174,8 +191,7 @@ public class NodeLookupOperation implements Operation, Receiver
}
/* Sort nodes according to criteria */
Collections.sort(unasked, this.comparator);
//Collections.sort(unasked, this.comparator);
/**
* Send messages to nodes in the list;
* making sure than no more than CONCURRENCY messsages are in transit
@ -186,7 +202,15 @@ public class NodeLookupOperation implements Operation, Receiver
int comm = server.sendMessage(n, lookupMessage, this);
System.out.println(this.localNode + "\n\n\n ************** Sent lookup message to: " + n + "; Comm: " + comm);
this.nodes.put(n, AWAITING);
System.out.println("Awaiting: " + AWAITING);
System.out.println("Node: " + n + " status: " + this.nodes.get(n));
System.out.println("\n\nPrinting entries: ");
for (Map.Entry e : this.nodes.entrySet())
{
System.out.println("Node: " + e.getKey() + "; Value: " + e.getValue());
}
this.messagesTransiting.put(comm, n);
}
@ -199,7 +223,7 @@ public class NodeLookupOperation implements Operation, Receiver
*
* @return The K closest nodes to the target lookupId given that have the specified status
*/
private List<Node> closestNodes(Byte status)
private List<Node> closestNodes(String status)
{
List<Node> closestNodes = new ArrayList<>(Configuration.K);
int remainingSpaces = Configuration.K;
@ -228,19 +252,22 @@ public class NodeLookupOperation implements Operation, Receiver
*
* @return A List of the closest nodes
*/
private List<Node> closestNodesNotFailed(Byte status)
private List<Node> closestNodesNotFailed(String status)
{
System.out.println(this.localNode + " - closestNodesNotFailed called, Status looking for: " + status);
List<Node> closestNodes = new ArrayList<>(Configuration.K);
int remainingSpaces = Configuration.K;
for (Map.Entry e : this.nodes.entrySet())
for (Map.Entry<Node, String> e : this.nodes.entrySet())
{
if (!FAILED.equals(e.getValue()))
{
if (status.equals(e.getValue()))
{
/* We got one with the required status, now add it */
closestNodes.add((Node) e.getKey());
System.out.println("Adding " + e.getKey() + "; status: " + e.getValue());
closestNodes.add(e.getKey());
}
if (--remainingSpaces == 0)
@ -272,6 +299,7 @@ public class NodeLookupOperation implements Operation, Receiver
this.localNode.getRoutingTable().insert(origin);
/* Set that we've completed ASKing the origin node */
System.out.println("Setting " + origin + " as " + ASKED);
this.nodes.put(origin, ASKED);
/* Remove this msg from messagesTransiting since it's completed now */

View File

@ -61,6 +61,8 @@ public class StoreOperation implements Operation
this.server.sendMessage(n, msg, null);
}
}
System.out.println("\n\n\n\nSTORE CONTENT FINISHED");
}
/**

View File

@ -210,7 +210,7 @@ public class RoutingTable
sb.append("\n");
}
}
sb.append("\nPrinting Routing Table Ended ******************** ");
sb.append("Printing Routing Table Ended ******************** ");
return sb.toString();
}

View File

@ -25,7 +25,7 @@ public class ContentSendingTest
System.out.println("Created Node Kad 1: " + kad1.getNode().getNodeId());
Kademlia kad2 = new Kademlia("Crystal", new NodeId("ASERTKJDHGVHERJHGFLK"), 7572);
System.out.println("Created Node Kad 2: " + kad2.getNode().getNodeId());
kad2.connect(kad1.getNode());
kad2.bootstrap(kad1.getNode());
/**
* Lets create the content and share it

View File

@ -29,7 +29,7 @@ public class NodeConnectionTest
/* Connecting 2 to 1 */
System.out.println("Connecting Kad 1 and Kad 2");
kad1.connect(kad2.getNode());
kad1.bootstrap(kad2.getNode());
// System.out.println("Kad 1: ");
// System.out.println(kad1.getNode().getRoutingTable());
@ -41,7 +41,7 @@ public class NodeConnectionTest
System.out.println("\n\n\n\n\n\nCreated Node Kad 3: " + kad3.getNode().getNodeId());
System.out.println("Connecting Kad 3 and Kad 2");
kad3.connect(kad2.getNode());
kad3.bootstrap(kad2.getNode());
// NodeId diff32 = kad3.getNode().getNodeId().xor(kad2.getNode().getNodeId());
// NodeId diff31 = kad1.getNode().getNodeId().xor(kad3.getNode().getNodeId());
@ -51,7 +51,7 @@ public class NodeConnectionTest
System.out.println("\n\n\n\n\n\nCreated Node Kad 4: " + kad4.getNode().getNodeId());
System.out.println("Connecting Kad 4 and Kad 2");
kad4.connect(kad2.getNode());
kad4.bootstrap(kad2.getNode());
System.out.println("\n\nKad 1: " + kad1.getNode().getNodeId() + " Routing Table: ");
System.out.println(kad1.getNode().getRoutingTable());

View File

@ -23,7 +23,7 @@ public class RefreshOperationTest
/* Setting up 2 Kad networks */
Kademlia kad1 = new Kademlia("JoshuaK", new NodeId("ASF45678947584567467"), 7574);
Kademlia kad2 = new Kademlia("Crystal", new NodeId("ASERTKJDHGVHERJHGFLK"), 7572);
kad2.connect(kad1.getNode());
kad2.bootstrap(kad1.getNode());
/* Lets create the content and share it */
DHTContentImpl c = new DHTContentImpl(kad2.getOwnerId(), "Some Data");

View File

@ -1,5 +1,7 @@
package kademlia.tests;
import java.io.IOException;
import kademlia.core.Configuration;
import kademlia.core.Kademlia;
import kademlia.node.NodeId;
@ -17,34 +19,75 @@ public class SaveStateTest
try
{
/* Setting up 2 Kad networks */
Kademlia kad1 = new Kademlia("JoshuaK", new NodeId("ASF45678947584567467"), 7529);
Kademlia kad2 = new Kademlia("Crystal", new NodeId("ASERTKJDHGVHERJHGFLK"), 7532);
Kademlia kad1 = new Kademlia("JoshuaK", new NodeId("ASF45678947584567467"), 12049);
Kademlia kad2 = new Kademlia("Crystal", new NodeId("AfERTKdvHGVHERJHGFdh"), 4585);
Kademlia kad3 = new Kademlia("Shameer", new NodeId("ASERTKyrHGVHERfHGFsy"), 8104);
Kademlia kad4 = new Kademlia("Lokesh", new NodeId("AS3RTKJsdjVHERJHGF94"), 8335);
Kademlia kad5 = new Kademlia("Chandu", new NodeId("ASERT47kfeVHERJHGF15"), 13345);
/* Connecting 2 to 1 */
System.out.println("Connecting Kad 1 and Kad 2");
kad1.connect(kad2.getNode());
System.out.println("Connecting Nodes 1 & 2");
kad2.bootstrap(kad1.getNode());
System.out.println(kad1);
System.out.println(kad2);
kad3.bootstrap(kad2.getNode());
System.out.println(kad1);
System.out.println(kad2);
System.out.println(kad3);
kad4.bootstrap(kad2.getNode());
System.out.println(kad1);
System.out.println(kad2);
System.out.println(kad3);
System.out.println(kad4);
kad5.bootstrap(kad4.getNode());
System.out.println(kad1);
System.out.println(kad2);
System.out.println(kad3);
System.out.println(kad4);
System.out.println(kad5);
synchronized (this)
{
System.out.println("\n\n\n\nSTORING CONTENT 1\n\n\n\n");
DHTContentImpl c = new DHTContentImpl(kad2.getOwnerId(), "Some Data");
System.out.println(c);
kad2.put(c);
kad2.put(c);
}
synchronized (this)
{
System.out.println("\n\n\n\nSTORING CONTENT 2\n\n\n\n");
DHTContentImpl c2 = new DHTContentImpl(kad2.getOwnerId(), "Some other Data");
System.out.println(c2);
kad4.put(c2);
}
System.out.println(kad1);
System.out.println(kad2);
System.out.println(kad3);
System.out.println(kad4);
System.out.println(kad5);
/* Shutting down kad1 and restarting it */
System.out.println("\n\n\nShutting down Kad instance");
System.out.println(kad2);
kad1.shutdown();
System.out.println("\n\n\nReloading down Kad instance from file");
Kademlia kad3 = Kademlia.loadFromFile("JoshuaK");
System.out.println(kad3);
}
System.out.println("\n\n\nReloading Kad instance from file");
Kademlia kadR2 = Kademlia.loadFromFile("JoshuaK");
System.out.println(kadR2);
}
// catch(RoutingException e)
// {
// System.err.println("Routing Exception");
// }
catch (Exception e)
{
e.printStackTrace();;
e.printStackTrace();
}
}

View File

@ -0,0 +1,73 @@
package kademlia.tests;
import java.io.IOException;
import java.util.Comparator;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import kademlia.core.Kademlia;
import kademlia.node.Node;
import kademlia.node.NodeId;
/**
* Had some problems with treemap, so trying out testing treemaps
*
* @author Joshua Kissoon
* @since 20140311
*/
public class TreeMapTest
{
/* Constants */
private static final Byte UNASKED = (byte) 0x00;
private static final Byte AWAITING = (byte) 0x01;
private static final Byte ASKED = (byte) 0x02;
private static final Byte FAILED = (byte) 0x03;
private final SortedMap<Node, Byte> nodes;
/* Used to sort nodes */
private final Comparator comparator;
public TreeMapTest() throws IOException
{
/* Setting up 2 Kad networks */
Kademlia kad1 = new Kademlia("JoshuaK", new NodeId("ASF45678947584567467"), 8888);
Kademlia kad2 = new Kademlia("Crystal", new NodeId("AfERTKdvHGVHERJHGFdh"), 8889);
Kademlia kad3 = new Kademlia("Shameer", new NodeId("ASERTKyrHGVHERfHGFsy"), 8890);
Kademlia kad4 = new Kademlia("Lokesh", new NodeId("AS3RTKJsdjVHERJHGF94"), 8891);
Kademlia kad5 = new Kademlia("Chandu", new NodeId("ASERT47kfeVHERJHGF15"), 8892);
this.comparator = new Node.DistanceComparator(kad1.getNode().getNodeId());
this.nodes = new TreeMap(this.comparator);
/* Add all nodes as unasked */
this.nodes.put(kad1.getNode(), ASKED);
this.nodes.put(kad2.getNode(), UNASKED);
this.nodes.put(kad3.getNode(), UNASKED);
this.nodes.put(kad4.getNode(), UNASKED);
this.nodes.put(kad5.getNode(), UNASKED);
this.printTree();
}
private void printTree()
{
for (Map.Entry<Node, Byte> e : this.nodes.entrySet())
{
System.out.println("Node: " + e.getKey() + "; Value: " + e.getValue());
}
}
public static void main(String[] args)
{
try
{
new TreeMapTest();
}
catch (IOException e)
{
e.printStackTrace();
}
}
}