diff --git a/src/kademlia/core/Kademlia.java b/src/kademlia/core/Kademlia.java index 3081372..3d3a8a0 100644 --- a/src/kademlia/core/Kademlia.java +++ b/src/kademlia/core/Kademlia.java @@ -13,6 +13,7 @@ import kademlia.message.MessageFactory; import kademlia.node.Node; import kademlia.node.NodeId; import kademlia.operation.ConnectOperation; +import kademlia.operation.ContentLookupOperation; import kademlia.operation.Operation; import kademlia.operation.RefreshOperation; import kademlia.operation.StoreOperation; @@ -27,6 +28,7 @@ import kademlia.operation.StoreOperation; * @todo Handle IPv6 Addresses * @todo Handle compressing data * @todo Allow optional storing of content locally using the put method + * @todo Instead of using a StoreContentMessage to send a store RPC and a ContentMessage to receive a FIND rpc, make them 1 message with different operation type */ public class Kademlia { @@ -133,20 +135,23 @@ public class Kademlia */ public int put(KadContent content) throws IOException { - return (int) new StoreOperation(server, localNode, content).execute(); + new StoreOperation(server, localNode, content).execute(); + /*@todo Return how many nodes the content was stored on */ + return 10; } /** * Get some content stored on the DHT * The content returned is a JSON String in byte format; this string is parsed into a class * - * @param param The parameters used to search for the content + * @param param The parameters used to search for the content + * @param numResultsReq How many results are required from different nodes * * @return DHTContent The content * * @throws java.io.IOException */ - public List get(GetParameter param) throws NoSuchElementException, IOException + public List get(GetParameter param, int numResultsReq) throws NoSuchElementException, IOException { if (this.dht.contains(param)) { @@ -156,7 +161,9 @@ public class Kademlia else { /* Seems like it doesn't exist in our DHT, get it from other Nodes */ - return new DataLookupOperation().execute().getContent(); + ContentLookupOperation clo = new ContentLookupOperation(server, localNode, param, numResultsReq); + clo.execute(); + return clo.getContentFound(); } } diff --git a/src/kademlia/message/ContentLookupMessage.java b/src/kademlia/message/ContentLookupMessage.java new file mode 100644 index 0000000..d99ce71 --- /dev/null +++ b/src/kademlia/message/ContentLookupMessage.java @@ -0,0 +1,66 @@ +package kademlia.message; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import kademlia.core.GetParameter; +import kademlia.node.Node; + +/** + * Messages used to send to another node requesting content + * + * @author Joshua Kissoon + * @since 20140226 + */ +public class ContentLookupMessage implements Message +{ + + private static final byte CODE = 0x47; + + private Node origin; + private GetParameter params; + + /** + * @param origin The node where this lookup came from + * @param params The parameters used to find the content + */ + public ContentLookupMessage(Node origin, GetParameter params) + { + this.origin = origin; + this.params = params; + } + + public ContentLookupMessage(DataInputStream in) throws IOException + { + this.fromStream(in); + } + + public GetParameter getParameters() + { + return this.params; + } + + public Node getOrigin() + { + return this.origin; + } + + @Override + public void toStream(DataOutputStream out) throws IOException + { + this.origin.toStream(out); + } + + @Override + public final void fromStream(DataInputStream in) throws IOException + { + this.origin = new Node(in); + } + + @Override + public byte code() + { + return CODE; + } + +} diff --git a/src/kademlia/message/ContentMessage.java b/src/kademlia/message/ContentMessage.java new file mode 100644 index 0000000..78894e5 --- /dev/null +++ b/src/kademlia/message/ContentMessage.java @@ -0,0 +1,85 @@ +package kademlia.message; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import kademlia.dht.KadContent; +import kademlia.node.Node; +import kademlia.serializer.JsonSerializer; + +/** + * A Message used to send content between nodes + * + * @author Joshua Kissoon + * @since 20140226 + */ +public class ContentMessage implements Message +{ + + public static final byte CODE = 0x56; + + private KadContent content; + private Node origin; + + /** + * @param origin Where the message came from + * @param content The content to be stored + * + */ + public ContentMessage(Node origin, KadContent content) + { + this.content = content; + this.origin = origin; + } + + public ContentMessage(DataInputStream in) throws IOException + { + this.fromStream(in); + } + + @Override + public void toStream(DataOutputStream out) throws IOException + { + this.origin.toStream(out); + + /* Serialize the KadContent, then send it to the stream */ + JsonSerializer serializer = new JsonSerializer(); + serializer.write(content, out); + } + + @Override + public final void fromStream(DataInputStream in) throws IOException + { + this.origin = new Node(in); + try + { + this.content = new JsonSerializer().read(in); + } + catch (ClassNotFoundException e) + { + e.printStackTrace(); + } + } + + public Node getOrigin() + { + return this.origin; + } + + public KadContent getContent() + { + return this.content; + } + + @Override + public byte code() + { + return CODE; + } + + @Override + public String toString() + { + return "StoreMessage[origin=" + origin + ",content=" + content + "]"; + } +} diff --git a/src/kademlia/message/ContentStoreMessage.java b/src/kademlia/message/ContentStoreMessage.java deleted file mode 100644 index a2bd183..0000000 --- a/src/kademlia/message/ContentStoreMessage.java +++ /dev/null @@ -1,61 +0,0 @@ -package kademlia.message; - -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; -import kademlia.dht.KadContent; -import kademlia.node.Node; - -/** - * A Message used to send a content store request to another DHT - * - * @author Joshua Kissoon - * @since 20140224 - */ -public class ContentStoreMessage implements Message -{ - - private final Node origin; - private final KadContent content; - - public final static byte CODE = 0x23; - - /** - * @param origin Where did this content come from - it'll always be the local node - * @param content The Content to send - */ - public ContentStoreMessage(Node origin, KadContent content) - { - this.origin = origin; - this.content = content; - } - - @Override - public byte code() - { - return CODE; - } - - @Override - public void fromStream(DataInputStream in) - { - - } - - @Override - public void toStream(DataOutputStream out) - { - /* @todo write the origin and the content to the stream */ - } - - public Node getOrigin() - { - return this.origin; - } - - public KadContent getContent() - { - return this.content; - } -} diff --git a/src/kademlia/operation/ConnectOperation.java b/src/kademlia/operation/ConnectOperation.java index 0c0b1c4..736390e 100644 --- a/src/kademlia/operation/ConnectOperation.java +++ b/src/kademlia/operation/ConnectOperation.java @@ -42,7 +42,7 @@ public class ConnectOperation implements Operation, Receiver * @return null */ @Override - public synchronized Object execute() + public synchronized void execute() { try { @@ -72,15 +72,12 @@ public class ConnectOperation implements Operation, Receiver * I think after the above lookup operation, K buckets will be filled * Not sure if this operation is needed here */ - return null; } catch (IOException | InterruptedException e) { e.printStackTrace(); } - - return null; } /** diff --git a/src/kademlia/operation/ContentLookupOperation.java b/src/kademlia/operation/ContentLookupOperation.java index b55dcf9..a4328c3 100644 --- a/src/kademlia/operation/ContentLookupOperation.java +++ b/src/kademlia/operation/ContentLookupOperation.java @@ -1,31 +1,306 @@ package kademlia.operation; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; +import kademlia.core.Configuration; +import kademlia.core.GetParameter; import kademlia.core.KadServer; +import kademlia.dht.KadContent; +import kademlia.exceptions.RoutingException; +import kademlia.exceptions.UnknownMessageException; +import kademlia.message.ContentLookupMessage; +import kademlia.message.ContentMessage; +import kademlia.message.Message; +import kademlia.message.NodeReplyMessage; import kademlia.node.Node; -import kademlia.node.NodeId; /** * Looks up a specified identifier and returns the value associated with it * * @author Joshua Kissoon * @since 20140226 + * + * @todo When we've retrieved the required amount of versions of the content, stop the operation */ -public class ContentLookupOperation implements Operation +public class ContentLookupOperation implements Operation, Receiver { + /* Constants */ + private static final Byte UNASKED = new Byte((byte) 0x00); + private static final Byte AWAITING = new Byte((byte) 0x01); + private static final Byte ASKED = new Byte((byte) 0x02); + private static final Byte FAILED = new Byte((byte) 0x03); + private final KadServer server; private final Node localNode; - private final NodeId key; + private final GetParameter params; + private final List contentFound; + private final int numResultsReq; + + private final ContentLookupMessage lookupMessage; + + private boolean error, isRunning; + private final SortedMap nodes; + + /* Tracks messages in transit and awaiting reply */ + private final Map messagesTransiting; + + /* Used to sort nodes */ + private final Comparator comparator; + + + { + contentFound = new ArrayList<>(); + messagesTransiting = new HashMap<>(); + isRunning = true; + } /** * @param server * @param localNode - * @param key The key for the content which we need to find + * @param params The parameters to search for the content which we need to find + * @param numResultsReq The number of results for this content from different nodes required */ - public ContentLookupOperation(KadServer server, Node localNode, NodeId key) + public ContentLookupOperation(KadServer server, Node localNode, GetParameter params, int numResultsReq) { + /* Construct our lookup message */ + this.lookupMessage = new ContentLookupMessage(localNode, params); + this.server = server; this.localNode = localNode; - this.key = key; + this.params = params; + this.numResultsReq = numResultsReq; + + /** + * We initialize a TreeMap to store nodes. + * This map will be sorted by which nodes are closest to the lookupId + */ + this.comparator = new Node.DistanceComparator(params.getKey()); + this.nodes = new TreeMap(this.comparator); + } + + /** + * @throws java.io.IOException + * @throws kademlia.exceptions.RoutingException + */ + @Override + public synchronized void execute() throws IOException, RoutingException + { + try + { + error = true; + + /* Set the local node as already asked */ + nodes.put(this.localNode, ASKED); + + this.addNodes(this.localNode.getRoutingTable().getAllNodes()); + + if (!this.askNodesorFinish()) + { + /* If we haven't finished as yet, wait a while */ + wait(Configuration.OPERATION_TIMEOUT); + + /* If we still haven't received any responses by then, do a routing timeout */ + if (error) + { + throw new RoutingException("Lookup Timeout."); + } + } + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + + /** + * Add nodes from this list to the set of nodes to lookup + * + * @param list The list from which to add nodes + */ + public void addNodes(List list) + { + for (Node o : list) + { + /* If this node is not in the list, add the node */ + if (!nodes.containsKey(o)) + { + System.out.println("Adding node " + o.getNodeId()); + nodes.put(o, UNASKED); + } + } + + System.out.println(this.localNode.getNodeId() + " Nodes List: "); + for (Node o : this.nodes.keySet()) + { + System.out.println(o.getNodeId() + " hash: " + o.hashCode()); + } + } + + /** + * Asks some of the K closest nodes seen but not yet queried. + * Assures that no more than Configuration.CONCURRENCY messages are in transit at a time + * + * This method should be called every time a reply is received or a timeout occurs. + * + * If all K closest nodes have been asked and there are no messages in transit, + * the algorithm is finished. + * + * @return true if finished OR false otherwise + */ + private boolean askNodesorFinish() throws IOException + { + /* If >= CONCURRENCY nodes are in transit, don't do anything */ + if (Configuration.CONCURRENCY <= this.messagesTransiting.size()) + { + return false; + } + + /* Get unqueried nodes among the K closest seen that have not FAILED */ + List unasked = this.closestNodesNotFailed(UNASKED); + + if (unasked.isEmpty() && this.messagesTransiting.isEmpty()) + { + /* We have no unasked nodes nor any messages in transit, we're finished! */ + error = false; + return true; + } + + /* Sort nodes according to criteria */ + Collections.sort(unasked, this.comparator); + + /** + * Send messages to nodes in the list; + * making sure than no more than CONCURRENCY messsages are in transit + */ + for (int i = 0; (this.messagesTransiting.size() < Configuration.CONCURRENCY) && (i < unasked.size()); i++) + { + Node n = (Node) unasked.get(i); + + int comm = server.sendMessage(n, lookupMessage, this); + + this.nodes.put(n, AWAITING); + this.messagesTransiting.put(new Integer(comm), n); + } + + /* We're not finished as yet, return false */ + return false; + } + + /** + * Find The K closest nodes to the target lookupId given that have not FAILED. + * From those K, get those that have the specified status + * + * @param status The status of the nodes to return + * + * @return A List of the closest nodes + */ + private List closestNodesNotFailed(Byte status) + { + List closestNodes = new ArrayList<>(Configuration.K); + int remainingSpaces = Configuration.K; + + for (Map.Entry e : this.nodes.entrySet()) + { + if (!FAILED.equals(e.getValue())) + { + if (status.equals(e.getValue())) + { + /* We got one with the required status, now add it */ + closestNodes.add((Node) e.getKey()); + } + + if (--remainingSpaces == 0) + { + break; + } + } + } + + return closestNodes; + } + + @Override + public synchronized void receive(Message incoming, int comm) throws IOException, RoutingException + { + if (incoming instanceof ContentMessage) + { + /* The reply received is a content message with the required content, take it in */ + ContentMessage msg = (ContentMessage) incoming; + + /* Add the origin node to our routing table */ + this.localNode.getRoutingTable().insert(msg.getOrigin()); + + /* Get the Content and check if it satisfies the required parameters */ + KadContent content = msg.getContent(); + + /*@todo Check if the content matches the given criteria */ + this.contentFound.add(content); + + if (this.contentFound.size() == this.numResultsReq) + { + /* We've got all the content required, let's stop the loopup operation */ + } + } + else + { + /* The reply received is a NodeReplyMessage with nodes closest to the content needed */ + NodeReplyMessage msg = (NodeReplyMessage) incoming; + + /* Add the origin node to our routing table */ + Node origin = msg.getOrigin(); + this.localNode.getRoutingTable().insert(origin); + + /* Set that we've completed ASKing the origin node */ + this.nodes.put(origin, ASKED); + + /* Remove this msg from messagesTransiting since it's completed now */ + this.messagesTransiting.remove(new Integer(comm)); + + /* Add the received nodes to our nodes list to query */ + this.addNodes(msg.getNodes()); + this.askNodesorFinish(); + } + } + + /** + * A node does not respond or a packet was lost, we set this node as failed + * + * @param comm + * + * @throws java.io.IOException + */ + @Override + public synchronized void timeout(int comm) throws IOException + { + /* Get the node associated with this communication */ + Node n = this.messagesTransiting.get(new Integer(comm)); + + if (n == null) + { + throw new UnknownMessageException("Unknown comm: " + comm); + } + + /* Mark this node as failed */ + this.nodes.put(n, FAILED); + this.localNode.getRoutingTable().remove(n); + this.messagesTransiting.remove(new Integer(comm)); + + this.askNodesorFinish(); + } + + /** + * @return The list of all content found during the lookup operation + */ + public List getContentFound() + { + return this.contentFound; } } diff --git a/src/kademlia/operation/NodeLookupOperation.java b/src/kademlia/operation/NodeLookupOperation.java index 26bd59c..ea7b310 100644 --- a/src/kademlia/operation/NodeLookupOperation.java +++ b/src/kademlia/operation/NodeLookupOperation.java @@ -1,10 +1,3 @@ -/** - * @author Joshua Kissoon - * @created 20140219 - * @desc Finds the K closest nodes to a specified identifier - * The algorithm terminates when it has gotten responses from the K closest nodes it has seen. - * Nodes that fail to respond are removed from consideration - */ package kademlia.operation; import java.io.IOException; @@ -26,6 +19,14 @@ import kademlia.message.NodeReplyMessage; import kademlia.node.Node; import kademlia.node.NodeId; +/** + * Finds the K closest nodes to a specified identifier + * The algorithm terminates when it has gotten responses from the K closest nodes it has seen. + * Nodes that fail to respond are removed from consideration + * + * @author Joshua Kissoon + * @created 20140219 + */ public class NodeLookupOperation implements Operation, Receiver { @@ -77,13 +78,11 @@ public class NodeLookupOperation implements Operation, Receiver } /** - * @return A list containing the K closest nodes to the lookupId provided - * * @throws java.io.IOException * @throws kademlia.exceptions.RoutingException */ @Override - public synchronized List execute() throws IOException, RoutingException + public synchronized void execute() throws IOException, RoutingException { try { @@ -105,9 +104,6 @@ public class NodeLookupOperation implements Operation, Receiver throw new RoutingException("Lookup Timeout."); } } - - /* So we have finished, lets return the closest nodes */ - return this.closestNodes(ASKED); } catch (InterruptedException e) { @@ -115,6 +111,11 @@ public class NodeLookupOperation implements Operation, Receiver } } + public List getClosestNodes() + { + return this.closestNodes(ASKED); + } + /** * Add nodes from this list to the set of nodes to lookup * @@ -215,7 +216,7 @@ public class NodeLookupOperation implements Operation, Receiver } } } - + return closestNodes; } diff --git a/src/kademlia/operation/Operation.java b/src/kademlia/operation/Operation.java index d3fd135..aec1602 100644 --- a/src/kademlia/operation/Operation.java +++ b/src/kademlia/operation/Operation.java @@ -14,11 +14,9 @@ public interface Operation /** * Starts an operation and returns when the operation is finished * - * @return The return value can differ per operation - * * @throws kademlia.exceptions.RoutingException - * + * * @todo Remove the Object return type, those operations that return things should have a method to return the data */ - public Object execute() throws IOException, RoutingException; + public void execute() throws IOException, RoutingException; } diff --git a/src/kademlia/operation/RefreshOperation.java b/src/kademlia/operation/RefreshOperation.java index 0f00cad..3f0d911 100644 --- a/src/kademlia/operation/RefreshOperation.java +++ b/src/kademlia/operation/RefreshOperation.java @@ -24,7 +24,7 @@ public class RefreshOperation implements Operation } @Override - public synchronized Object execute() throws IOException + public synchronized void execute() throws IOException { /* @todo Do a Node Lookup operation to refresh K-Buckets */ new NodeLookupOperation(this.server, this.localNode, this.localNode.getNodeId()).execute(); @@ -38,6 +38,5 @@ public class RefreshOperation implements Operation * @todo Delete any content on this node that this node is not one of the K-Closest nodes to * @todo Delete any expired content */ - return null; } } diff --git a/src/kademlia/operation/StoreOperation.java b/src/kademlia/operation/StoreOperation.java index 07ce27c..21896be 100644 --- a/src/kademlia/operation/StoreOperation.java +++ b/src/kademlia/operation/StoreOperation.java @@ -34,10 +34,13 @@ public class StoreOperation implements Operation } @Override - public synchronized Object execute() throws IOException + public synchronized void execute() throws IOException { /* Get the nodes on which we need to store the content */ - List nodes = new NodeLookupOperation(this.server, this.localNode, this.content.getKey()).execute(); + NodeLookupOperation ndlo = new NodeLookupOperation(this.server, this.localNode, this.content.getKey()); + ndlo.execute(); + List nodes = ndlo.getClosestNodes(); + System.out.println("Nodes to put content on: " + nodes); /* Create the message */ @@ -55,9 +58,5 @@ public class StoreOperation implements Operation this.server.sendMessage(n, msg, null); } } - - - /* Return how many nodes the content was stored on */ - return nodes.size(); } }