Started working on the NodeLookupOperation

Created some new Messages
Updated a few minor things
This commit is contained in:
Joshua Kissoon 2014-02-19 13:00:29 +05:30
parent 6ff6e8dc21
commit 58df7be300
9 changed files with 407 additions and 10 deletions

View File

@ -82,7 +82,7 @@ public class KadServer
*
* @throws IOException
*/
public void sendMessage(Node to, Message msg, Receiver recv) throws IOException
public synchronized int sendMessage(Node to, Message msg, Receiver recv) throws IOException
{
if (!isRunning)
{
@ -100,9 +100,10 @@ public class KadServer
/* Send the message */
sendMessage(to, msg, comm);
return comm;
}
public void reply(Node to, Message msg, int comm) throws IOException
public synchronized void reply(Node to, Message msg, int comm) throws IOException
{
if (!isRunning)
{

View File

@ -0,0 +1,74 @@
/**
* @author Joshua Kissoon
* @created 20140218
* @desc A message used to connect nodes
*/
package kademlia.message;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import kademlia.node.Node;
import kademlia.node.NodeId;
public class NodeLookupMessage implements Message
{
private Node origin;
private NodeId lookupId;
public static final byte CODE = 0x13;
/**
* A new NodeLookupMessage to find nodes
*
* @param origin The Node from which the message is coming from
* @param lookup The key for which to lookup nodes for
*/
public NodeLookupMessage(Node origin, NodeId lookup)
{
this.origin = origin;
this.lookupId = lookup;
}
public NodeLookupMessage(DataInput in) throws IOException
{
this.fromStream(in);
}
@Override
public final void fromStream(DataInput in) throws IOException
{
this.origin = new Node(in);
this.lookupId = new NodeId(in);
}
@Override
public void toStream(DataOutput out) throws IOException
{
this.origin.toStream(out);
this.lookupId.toStream(out);
}
public Node getOrigin()
{
return this.origin;
}
public NodeId getLookupId()
{
return this.lookupId;
}
@Override
public byte code()
{
return CODE;
}
@Override
public String toString()
{
return "NodeLookupMessage[origin=" + origin + ",lookup=" + lookupId + "]";
}
}

View File

@ -8,9 +8,11 @@ package kademlia.node;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Comparator;
import kademlia.message.Streamable;
import kademlia.routing.RoutingTable;
@ -115,4 +117,52 @@ public class Node implements Streamable
{
return this.routingTable;
}
/**
* A DistanceComparator is used to compare Node objects based on their closeness
* */
public static class DistanceComparator implements Comparator
{
private final NodeId nodeId;
/**
* The NodeId relative to which the distance should be measured.
*
* @param nodeId
* */
public DistanceComparator(NodeId nodeId)
{
this.nodeId = nodeId;
}
/**
* Compare two objects which must both be of type <code>Node</code>
* and determine which is closest to the identifier specified in the
* constructor.
* */
@Override
public int compare(Object o1, Object o2)
{
Node n1 = (Node) o1;
Node n2 = (Node) o2;
int index1 = nodeId.xor(n1.getNodeId()).getFirstSetBitIndex();
int index2 = nodeId.xor(n2.getNodeId()).getFirstSetBitIndex();
/* If the first node is closer to the given node, return 1 */
if (index1 < index2)
{
return 1;
}
else if (index1 > index2)
{
return -1;
}
else
{
return 0;
}
}
}
}

View File

@ -119,7 +119,7 @@ public class NodeId implements Streamable
*
* @return int The number of leading 0's
*/
public int prefixLength()
public int getFirstSetBitIndex()
{
int prefixLength = 0;
System.out.println("Bytes: ");

View File

@ -65,6 +65,8 @@ public class ConnectOperation implements Operation, Receiver
}
/* @todo Perform lookup for our own ID to get nodes close to us */
Operation lookup = new No
/* @todo Refresh buckets to get a good routing table */
return null;

View File

@ -0,0 +1,253 @@
/**
* @author Joshua Kissoon
* @created 20140219
* @desc Finds the K closest nodes to a specified identifier
* The algorithm terminates when it has gotten responses from the K closest nodes it has seen.
* Nodes that fail to respond are removed from consideration
*/
package kademlia.operation;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import kademlia.core.Configuration;
import kademlia.core.KadServer;
import kademlia.exceptions.RoutingException;
import kademlia.message.Message;
import kademlia.message.NodeLookupMessage;
import kademlia.node.Node;
import kademlia.node.NodeId;
public class NodeLookupOperation implements Operation, Receiver
{
/* Constants */
private static final Byte UNASKED = new Byte((byte) 0x00);
private static final Byte AWAITING = new Byte((byte) 0x01);
private static final Byte ASKED = new Byte((byte) 0x02);
private static final Byte FAILED = new Byte((byte) 0x03);
private final KadServer server;
private final Node localNode;
private final NodeId lookupId;
private boolean error;
private final Message lookupMessage; // Message sent to each peer
private final SortedMap<Node, Byte> nodes;
/* Tracks messages in transit and awaiting reply */
private final HashMap<Integer, Node> messagesTransiting;
/* Used to sort nodes */
private final Comparator comparator;
{
messagesTransiting = new HashMap<>();
}
/**
* @param server KadServer used for communication
* @param localNode The local node making the communication
* @param lookupId The ID for which to find nodes close to
*/
public NodeLookupOperation(KadServer server, Node localNode, NodeId lookupId)
{
this.server = server;
this.localNode = localNode;
this.lookupId = lookupId;
this.lookupMessage = new NodeLookupMessage(localNode, lookupId);
/**
* We initialize a TreeMap to store nodes.
* This map will be sorted by which nodes are closest to the lookupId
*/
this.comparator = new Node.DistanceComparator(lookupId);
this.nodes = new TreeMap(this.comparator);
}
/**
* @return A list containing the K closest nodes to the lookupId provided
*
* @throws java.io.IOException
* @throws kademlia.exceptions.RoutingException
*/
@Override
public synchronized ArrayList<Node> execute() throws IOException, RoutingException
{
try
{
error = true;
/* Set the local node as already asked */
nodes.put(this.localNode, ASKED);
this.addNodes(this.localNode.getRoutingTable().getAllNodes());
if (!this.askNodesorFinish())
{
/* If we haven't finished as yet, wait a while */
wait(Configuration.OPERATION_TIMEOUT);
/* If we still haven't received any responses by then, do a routing timeout */
if (error)
{
throw new RoutingException("Lookup Timeout.");
}
}
/* So we have finished, lets return the closest nodes */
return this.closestNodes(ASKED);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
}
/**
* Add nodes from this list to the set of nodes to lookup
*
* @param list The list from which to add nodes
*/
public void addNodes(List<Node> list)
{
for (Node o : list)
{
/* If this node is not in the list, add the node */
if (!nodes.containsKey(o))
{
nodes.put(o, UNASKED);
}
}
}
/**
* Asks some of the K closest nodes seen but not yet queried.
* Assures that no more than Configuration.CONCURRENCY messages are in transit at a time
*
* This method should be called every time a reply is received or a timeout occurs.
*
* If all K closest nodes have been asked and there are no messages in transit,
* the algorithm is finished.
*
* @return <code>true</code> if finished OR <code>false</code> otherwise
*/
private boolean askNodesorFinish() throws IOException
{
/* If >= CONCURRENCY nodes are in transit, don't do anything */
if (Configuration.CONCURRENCY <= this.messagesTransiting.size())
{
return false;
}
/* Get unqueried nodes among the K closest seen */
ArrayList<Node> unasked = this.closestNodesNotFailed(UNASKED);
if (unasked.isEmpty() && this.messagesTransiting.isEmpty())
{
/* We have no unasked nodes nor any messages in transit, we're finished! */
error = false;
return true;
}
/* Sort nodes according to criteria */
Collections.sort(unasked, this.comparator);
/**
* Send messages to nodes in the list;
* making sure than no more than CONCURRENCY messsages are in transit
*/
for (int i = 0; (this.messagesTransiting.size() < Configuration.CONCURRENCY) && (i < unasked.size()); i++)
{
Node n = (Node) unasked.get(i);
int comm = server.sendMessage(n, lookupMessage, this);
this.nodes.put(n, AWAITING);
this.messagesTransiting.put(new Integer(comm), n);
}
/* We're not finished as yet, return false */
return false;
}
/**
* @param status The status of the nodes to return
*
* @return The K closest nodes to the target lookupId given that have the specified status
*/
private ArrayList<Node> closestNodes(Byte status)
{
ArrayList<Node> closestNodes = new ArrayList<>(Configuration.K);
int remainingSpaces = Configuration.K;
for (Map.Entry e : this.nodes.entrySet())
{
if (status.equals(e.getValue()))
{
/* We got one with the required status, now add it */
closestNodes.add((Node) e.getKey());
if (--remainingSpaces == 0)
{
break;
}
}
}
return closestNodes;
}
/**
* Find The K closest nodes to the target lookupId given that have not FAILED.
* From those K, get those that have the specified status
*
* @param status The status of the nodes to return
*
* @return A List of the closest nodes
*/
private ArrayList<Node> closestNodesNotFailed(Byte status)
{
ArrayList<Node> closestNodes = new ArrayList<>(Configuration.K);
int remainingSpaces = Configuration.K;
for (Map.Entry e : this.nodes.entrySet())
{
if (!FAILED.equals(e.getValue()))
{
if (status.equals(e.getValue()))
{
/* We got one with the required status, now add it */
closestNodes.add((Node) e.getKey());
}
if (--remainingSpaces == 0)
{
break;
}
}
}
return closestNodes;
}
@Override
public synchronized void receive(Message incoming, int comm)
{
// NodeRepl
}
@Override
public synchronized void timeout(int comm)
{
}
}

View File

@ -5,6 +5,9 @@
*/
package kademlia.operation;
import java.io.IOException;
import kademlia.exceptions.RoutingException;
public interface Operation
{
@ -12,6 +15,8 @@ public interface Operation
* Starts an operation and returns when the operation is finished
*
* @return The return value can differ per operation
*
* @throws kademlia.exceptions.RoutingException
*/
public Object execute();
public Object execute() throws IOException, RoutingException;
}

View File

@ -59,7 +59,7 @@ public class KadBucket implements Bucket
this.nodes.remove(n);
}
public ArrayList<Node> getContacts()
public ArrayList<Node> getNodes()
{
return this.nodes;
}

View File

@ -39,7 +39,7 @@ public class RoutingTable
public void insert(Node n)
{
/* Find the prefix length of how far this node is away from the contact node */
int prefixLength = this.node.getNodeId().xor(n.getNodeId()).prefixLength();
int prefixLength = this.node.getNodeId().xor(n.getNodeId()).getFirstSetBitIndex();
/* Put this contact to the bucket that stores contacts prefixLength distance away */
this.buckets[prefixLength].insert(n);
@ -58,10 +58,10 @@ public class RoutingTable
ArrayList<Node> closest = new ArrayList<>(num);
/* Get the bucket number to search for closest from */
int bucketNumber = this.node.getNodeId().xor(target).prefixLength();
int bucketNumber = this.node.getNodeId().xor(target).getFirstSetBitIndex();
/* Add the contacts from this bucket to the return contacts */
for (Node c : this.buckets[bucketNumber].getContacts())
for (Node c : this.buckets[bucketNumber].getNodes())
{
if (closest.size() < num)
{
@ -84,7 +84,7 @@ public class RoutingTable
/* Check the bucket on the left side */
if (bucketNumber - i > 0)
{
for (Node c : this.buckets[bucketNumber - i].getContacts())
for (Node c : this.buckets[bucketNumber - i].getNodes())
{
if (closest.size() < num)
{
@ -100,7 +100,7 @@ public class RoutingTable
/* Check the bucket on the right side */
if (bucketNumber + i < NodeId.ID_LENGTH)
{
for (Node c : this.buckets[bucketNumber + i].getContacts())
for (Node c : this.buckets[bucketNumber + i].getNodes())
{
if (closest.size() < num)
{
@ -123,6 +123,18 @@ public class RoutingTable
return closest;
}
public ArrayList<Node> getAllNodes()
{
ArrayList<Node> nodes = new ArrayList<>();
for (KadBucket b : this.buckets)
{
nodes.addAll(b.getNodes());
}
return nodes;
}
@Override
public String toString()
{