mirror of
https://github.com/ChronosX88/KademliaDHT.git
synced 2024-11-22 10:12:19 +00:00
Finished coding the lookup content functionality
This commit is contained in:
parent
a2d0be6124
commit
58f9157c9d
@ -13,6 +13,7 @@ import kademlia.message.MessageFactory;
|
|||||||
import kademlia.node.Node;
|
import kademlia.node.Node;
|
||||||
import kademlia.node.NodeId;
|
import kademlia.node.NodeId;
|
||||||
import kademlia.operation.ConnectOperation;
|
import kademlia.operation.ConnectOperation;
|
||||||
|
import kademlia.operation.ContentLookupOperation;
|
||||||
import kademlia.operation.Operation;
|
import kademlia.operation.Operation;
|
||||||
import kademlia.operation.RefreshOperation;
|
import kademlia.operation.RefreshOperation;
|
||||||
import kademlia.operation.StoreOperation;
|
import kademlia.operation.StoreOperation;
|
||||||
@ -27,6 +28,7 @@ import kademlia.operation.StoreOperation;
|
|||||||
* @todo Handle IPv6 Addresses
|
* @todo Handle IPv6 Addresses
|
||||||
* @todo Handle compressing data
|
* @todo Handle compressing data
|
||||||
* @todo Allow optional storing of content locally using the put method
|
* @todo Allow optional storing of content locally using the put method
|
||||||
|
* @todo Instead of using a StoreContentMessage to send a store RPC and a ContentMessage to receive a FIND rpc, make them 1 message with different operation type
|
||||||
*/
|
*/
|
||||||
public class Kademlia
|
public class Kademlia
|
||||||
{
|
{
|
||||||
@ -133,20 +135,23 @@ public class Kademlia
|
|||||||
*/
|
*/
|
||||||
public int put(KadContent content) throws IOException
|
public int put(KadContent content) throws IOException
|
||||||
{
|
{
|
||||||
return (int) new StoreOperation(server, localNode, content).execute();
|
new StoreOperation(server, localNode, content).execute();
|
||||||
|
/*@todo Return how many nodes the content was stored on */
|
||||||
|
return 10;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get some content stored on the DHT
|
* Get some content stored on the DHT
|
||||||
* The content returned is a JSON String in byte format; this string is parsed into a class
|
* The content returned is a JSON String in byte format; this string is parsed into a class
|
||||||
*
|
*
|
||||||
* @param param The parameters used to search for the content
|
* @param param The parameters used to search for the content
|
||||||
|
* @param numResultsReq How many results are required from different nodes
|
||||||
*
|
*
|
||||||
* @return DHTContent The content
|
* @return DHTContent The content
|
||||||
*
|
*
|
||||||
* @throws java.io.IOException
|
* @throws java.io.IOException
|
||||||
*/
|
*/
|
||||||
public List<KadContent> get(GetParameter param) throws NoSuchElementException, IOException
|
public List<KadContent> get(GetParameter param, int numResultsReq) throws NoSuchElementException, IOException
|
||||||
{
|
{
|
||||||
if (this.dht.contains(param))
|
if (this.dht.contains(param))
|
||||||
{
|
{
|
||||||
@ -156,7 +161,9 @@ public class Kademlia
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
/* Seems like it doesn't exist in our DHT, get it from other Nodes */
|
/* Seems like it doesn't exist in our DHT, get it from other Nodes */
|
||||||
return new DataLookupOperation().execute().getContent();
|
ContentLookupOperation clo = new ContentLookupOperation(server, localNode, param, numResultsReq);
|
||||||
|
clo.execute();
|
||||||
|
return clo.getContentFound();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
66
src/kademlia/message/ContentLookupMessage.java
Normal file
66
src/kademlia/message/ContentLookupMessage.java
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
package kademlia.message;
|
||||||
|
|
||||||
|
import java.io.DataInputStream;
|
||||||
|
import java.io.DataOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import kademlia.core.GetParameter;
|
||||||
|
import kademlia.node.Node;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Messages used to send to another node requesting content
|
||||||
|
*
|
||||||
|
* @author Joshua Kissoon
|
||||||
|
* @since 20140226
|
||||||
|
*/
|
||||||
|
public class ContentLookupMessage implements Message
|
||||||
|
{
|
||||||
|
|
||||||
|
private static final byte CODE = 0x47;
|
||||||
|
|
||||||
|
private Node origin;
|
||||||
|
private GetParameter params;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param origin The node where this lookup came from
|
||||||
|
* @param params The parameters used to find the content
|
||||||
|
*/
|
||||||
|
public ContentLookupMessage(Node origin, GetParameter params)
|
||||||
|
{
|
||||||
|
this.origin = origin;
|
||||||
|
this.params = params;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ContentLookupMessage(DataInputStream in) throws IOException
|
||||||
|
{
|
||||||
|
this.fromStream(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
public GetParameter getParameters()
|
||||||
|
{
|
||||||
|
return this.params;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Node getOrigin()
|
||||||
|
{
|
||||||
|
return this.origin;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void toStream(DataOutputStream out) throws IOException
|
||||||
|
{
|
||||||
|
this.origin.toStream(out);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public final void fromStream(DataInputStream in) throws IOException
|
||||||
|
{
|
||||||
|
this.origin = new Node(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte code()
|
||||||
|
{
|
||||||
|
return CODE;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
85
src/kademlia/message/ContentMessage.java
Normal file
85
src/kademlia/message/ContentMessage.java
Normal file
@ -0,0 +1,85 @@
|
|||||||
|
package kademlia.message;
|
||||||
|
|
||||||
|
import java.io.DataInputStream;
|
||||||
|
import java.io.DataOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import kademlia.dht.KadContent;
|
||||||
|
import kademlia.node.Node;
|
||||||
|
import kademlia.serializer.JsonSerializer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A Message used to send content between nodes
|
||||||
|
*
|
||||||
|
* @author Joshua Kissoon
|
||||||
|
* @since 20140226
|
||||||
|
*/
|
||||||
|
public class ContentMessage implements Message
|
||||||
|
{
|
||||||
|
|
||||||
|
public static final byte CODE = 0x56;
|
||||||
|
|
||||||
|
private KadContent content;
|
||||||
|
private Node origin;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param origin Where the message came from
|
||||||
|
* @param content The content to be stored
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public ContentMessage(Node origin, KadContent content)
|
||||||
|
{
|
||||||
|
this.content = content;
|
||||||
|
this.origin = origin;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ContentMessage(DataInputStream in) throws IOException
|
||||||
|
{
|
||||||
|
this.fromStream(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void toStream(DataOutputStream out) throws IOException
|
||||||
|
{
|
||||||
|
this.origin.toStream(out);
|
||||||
|
|
||||||
|
/* Serialize the KadContent, then send it to the stream */
|
||||||
|
JsonSerializer serializer = new JsonSerializer();
|
||||||
|
serializer.write(content, out);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public final void fromStream(DataInputStream in) throws IOException
|
||||||
|
{
|
||||||
|
this.origin = new Node(in);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
this.content = new JsonSerializer().read(in);
|
||||||
|
}
|
||||||
|
catch (ClassNotFoundException e)
|
||||||
|
{
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Node getOrigin()
|
||||||
|
{
|
||||||
|
return this.origin;
|
||||||
|
}
|
||||||
|
|
||||||
|
public KadContent getContent()
|
||||||
|
{
|
||||||
|
return this.content;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte code()
|
||||||
|
{
|
||||||
|
return CODE;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString()
|
||||||
|
{
|
||||||
|
return "StoreMessage[origin=" + origin + ",content=" + content + "]";
|
||||||
|
}
|
||||||
|
}
|
@ -1,61 +0,0 @@
|
|||||||
package kademlia.message;
|
|
||||||
|
|
||||||
import java.io.DataInput;
|
|
||||||
import java.io.DataInputStream;
|
|
||||||
import java.io.DataOutput;
|
|
||||||
import java.io.DataOutputStream;
|
|
||||||
import kademlia.dht.KadContent;
|
|
||||||
import kademlia.node.Node;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A Message used to send a content store request to another DHT
|
|
||||||
*
|
|
||||||
* @author Joshua Kissoon
|
|
||||||
* @since 20140224
|
|
||||||
*/
|
|
||||||
public class ContentStoreMessage implements Message
|
|
||||||
{
|
|
||||||
|
|
||||||
private final Node origin;
|
|
||||||
private final KadContent content;
|
|
||||||
|
|
||||||
public final static byte CODE = 0x23;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param origin Where did this content come from - it'll always be the local node
|
|
||||||
* @param content The Content to send
|
|
||||||
*/
|
|
||||||
public ContentStoreMessage(Node origin, KadContent content)
|
|
||||||
{
|
|
||||||
this.origin = origin;
|
|
||||||
this.content = content;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public byte code()
|
|
||||||
{
|
|
||||||
return CODE;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void fromStream(DataInputStream in)
|
|
||||||
{
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void toStream(DataOutputStream out)
|
|
||||||
{
|
|
||||||
/* @todo write the origin and the content to the stream */
|
|
||||||
}
|
|
||||||
|
|
||||||
public Node getOrigin()
|
|
||||||
{
|
|
||||||
return this.origin;
|
|
||||||
}
|
|
||||||
|
|
||||||
public KadContent getContent()
|
|
||||||
{
|
|
||||||
return this.content;
|
|
||||||
}
|
|
||||||
}
|
|
@ -42,7 +42,7 @@ public class ConnectOperation implements Operation, Receiver
|
|||||||
* @return null
|
* @return null
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public synchronized Object execute()
|
public synchronized void execute()
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@ -72,15 +72,12 @@ public class ConnectOperation implements Operation, Receiver
|
|||||||
* I think after the above lookup operation, K buckets will be filled
|
* I think after the above lookup operation, K buckets will be filled
|
||||||
* Not sure if this operation is needed here
|
* Not sure if this operation is needed here
|
||||||
*/
|
*/
|
||||||
return null;
|
|
||||||
|
|
||||||
}
|
}
|
||||||
catch (IOException | InterruptedException e)
|
catch (IOException | InterruptedException e)
|
||||||
{
|
{
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1,31 +1,306 @@
|
|||||||
package kademlia.operation;
|
package kademlia.operation;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.SortedMap;
|
||||||
|
import java.util.TreeMap;
|
||||||
|
import kademlia.core.Configuration;
|
||||||
|
import kademlia.core.GetParameter;
|
||||||
import kademlia.core.KadServer;
|
import kademlia.core.KadServer;
|
||||||
|
import kademlia.dht.KadContent;
|
||||||
|
import kademlia.exceptions.RoutingException;
|
||||||
|
import kademlia.exceptions.UnknownMessageException;
|
||||||
|
import kademlia.message.ContentLookupMessage;
|
||||||
|
import kademlia.message.ContentMessage;
|
||||||
|
import kademlia.message.Message;
|
||||||
|
import kademlia.message.NodeReplyMessage;
|
||||||
import kademlia.node.Node;
|
import kademlia.node.Node;
|
||||||
import kademlia.node.NodeId;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Looks up a specified identifier and returns the value associated with it
|
* Looks up a specified identifier and returns the value associated with it
|
||||||
*
|
*
|
||||||
* @author Joshua Kissoon
|
* @author Joshua Kissoon
|
||||||
* @since 20140226
|
* @since 20140226
|
||||||
|
*
|
||||||
|
* @todo When we've retrieved the required amount of versions of the content, stop the operation
|
||||||
*/
|
*/
|
||||||
public class ContentLookupOperation implements Operation
|
public class ContentLookupOperation implements Operation, Receiver
|
||||||
{
|
{
|
||||||
|
|
||||||
|
/* Constants */
|
||||||
|
private static final Byte UNASKED = new Byte((byte) 0x00);
|
||||||
|
private static final Byte AWAITING = new Byte((byte) 0x01);
|
||||||
|
private static final Byte ASKED = new Byte((byte) 0x02);
|
||||||
|
private static final Byte FAILED = new Byte((byte) 0x03);
|
||||||
|
|
||||||
private final KadServer server;
|
private final KadServer server;
|
||||||
private final Node localNode;
|
private final Node localNode;
|
||||||
private final NodeId key;
|
private final GetParameter params;
|
||||||
|
private final List<KadContent> contentFound;
|
||||||
|
private final int numResultsReq;
|
||||||
|
|
||||||
|
private final ContentLookupMessage lookupMessage;
|
||||||
|
|
||||||
|
private boolean error, isRunning;
|
||||||
|
private final SortedMap<Node, Byte> nodes;
|
||||||
|
|
||||||
|
/* Tracks messages in transit and awaiting reply */
|
||||||
|
private final Map<Integer, Node> messagesTransiting;
|
||||||
|
|
||||||
|
/* Used to sort nodes */
|
||||||
|
private final Comparator comparator;
|
||||||
|
|
||||||
|
|
||||||
|
{
|
||||||
|
contentFound = new ArrayList<>();
|
||||||
|
messagesTransiting = new HashMap<>();
|
||||||
|
isRunning = true;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param server
|
* @param server
|
||||||
* @param localNode
|
* @param localNode
|
||||||
* @param key The key for the content which we need to find
|
* @param params The parameters to search for the content which we need to find
|
||||||
|
* @param numResultsReq The number of results for this content from different nodes required
|
||||||
*/
|
*/
|
||||||
public ContentLookupOperation(KadServer server, Node localNode, NodeId key)
|
public ContentLookupOperation(KadServer server, Node localNode, GetParameter params, int numResultsReq)
|
||||||
{
|
{
|
||||||
|
/* Construct our lookup message */
|
||||||
|
this.lookupMessage = new ContentLookupMessage(localNode, params);
|
||||||
|
|
||||||
this.server = server;
|
this.server = server;
|
||||||
this.localNode = localNode;
|
this.localNode = localNode;
|
||||||
this.key = key;
|
this.params = params;
|
||||||
|
this.numResultsReq = numResultsReq;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* We initialize a TreeMap to store nodes.
|
||||||
|
* This map will be sorted by which nodes are closest to the lookupId
|
||||||
|
*/
|
||||||
|
this.comparator = new Node.DistanceComparator(params.getKey());
|
||||||
|
this.nodes = new TreeMap(this.comparator);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws java.io.IOException
|
||||||
|
* @throws kademlia.exceptions.RoutingException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public synchronized void execute() throws IOException, RoutingException
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
error = true;
|
||||||
|
|
||||||
|
/* Set the local node as already asked */
|
||||||
|
nodes.put(this.localNode, ASKED);
|
||||||
|
|
||||||
|
this.addNodes(this.localNode.getRoutingTable().getAllNodes());
|
||||||
|
|
||||||
|
if (!this.askNodesorFinish())
|
||||||
|
{
|
||||||
|
/* If we haven't finished as yet, wait a while */
|
||||||
|
wait(Configuration.OPERATION_TIMEOUT);
|
||||||
|
|
||||||
|
/* If we still haven't received any responses by then, do a routing timeout */
|
||||||
|
if (error)
|
||||||
|
{
|
||||||
|
throw new RoutingException("Lookup Timeout.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (InterruptedException e)
|
||||||
|
{
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add nodes from this list to the set of nodes to lookup
|
||||||
|
*
|
||||||
|
* @param list The list from which to add nodes
|
||||||
|
*/
|
||||||
|
public void addNodes(List<Node> list)
|
||||||
|
{
|
||||||
|
for (Node o : list)
|
||||||
|
{
|
||||||
|
/* If this node is not in the list, add the node */
|
||||||
|
if (!nodes.containsKey(o))
|
||||||
|
{
|
||||||
|
System.out.println("Adding node " + o.getNodeId());
|
||||||
|
nodes.put(o, UNASKED);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
System.out.println(this.localNode.getNodeId() + " Nodes List: ");
|
||||||
|
for (Node o : this.nodes.keySet())
|
||||||
|
{
|
||||||
|
System.out.println(o.getNodeId() + " hash: " + o.hashCode());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Asks some of the K closest nodes seen but not yet queried.
|
||||||
|
* Assures that no more than Configuration.CONCURRENCY messages are in transit at a time
|
||||||
|
*
|
||||||
|
* This method should be called every time a reply is received or a timeout occurs.
|
||||||
|
*
|
||||||
|
* If all K closest nodes have been asked and there are no messages in transit,
|
||||||
|
* the algorithm is finished.
|
||||||
|
*
|
||||||
|
* @return <code>true</code> if finished OR <code>false</code> otherwise
|
||||||
|
*/
|
||||||
|
private boolean askNodesorFinish() throws IOException
|
||||||
|
{
|
||||||
|
/* If >= CONCURRENCY nodes are in transit, don't do anything */
|
||||||
|
if (Configuration.CONCURRENCY <= this.messagesTransiting.size())
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Get unqueried nodes among the K closest seen that have not FAILED */
|
||||||
|
List<Node> unasked = this.closestNodesNotFailed(UNASKED);
|
||||||
|
|
||||||
|
if (unasked.isEmpty() && this.messagesTransiting.isEmpty())
|
||||||
|
{
|
||||||
|
/* We have no unasked nodes nor any messages in transit, we're finished! */
|
||||||
|
error = false;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Sort nodes according to criteria */
|
||||||
|
Collections.sort(unasked, this.comparator);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send messages to nodes in the list;
|
||||||
|
* making sure than no more than CONCURRENCY messsages are in transit
|
||||||
|
*/
|
||||||
|
for (int i = 0; (this.messagesTransiting.size() < Configuration.CONCURRENCY) && (i < unasked.size()); i++)
|
||||||
|
{
|
||||||
|
Node n = (Node) unasked.get(i);
|
||||||
|
|
||||||
|
int comm = server.sendMessage(n, lookupMessage, this);
|
||||||
|
|
||||||
|
this.nodes.put(n, AWAITING);
|
||||||
|
this.messagesTransiting.put(new Integer(comm), n);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* We're not finished as yet, return false */
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Find The K closest nodes to the target lookupId given that have not FAILED.
|
||||||
|
* From those K, get those that have the specified status
|
||||||
|
*
|
||||||
|
* @param status The status of the nodes to return
|
||||||
|
*
|
||||||
|
* @return A List of the closest nodes
|
||||||
|
*/
|
||||||
|
private List<Node> closestNodesNotFailed(Byte status)
|
||||||
|
{
|
||||||
|
List<Node> closestNodes = new ArrayList<>(Configuration.K);
|
||||||
|
int remainingSpaces = Configuration.K;
|
||||||
|
|
||||||
|
for (Map.Entry e : this.nodes.entrySet())
|
||||||
|
{
|
||||||
|
if (!FAILED.equals(e.getValue()))
|
||||||
|
{
|
||||||
|
if (status.equals(e.getValue()))
|
||||||
|
{
|
||||||
|
/* We got one with the required status, now add it */
|
||||||
|
closestNodes.add((Node) e.getKey());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (--remainingSpaces == 0)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return closestNodes;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void receive(Message incoming, int comm) throws IOException, RoutingException
|
||||||
|
{
|
||||||
|
if (incoming instanceof ContentMessage)
|
||||||
|
{
|
||||||
|
/* The reply received is a content message with the required content, take it in */
|
||||||
|
ContentMessage msg = (ContentMessage) incoming;
|
||||||
|
|
||||||
|
/* Add the origin node to our routing table */
|
||||||
|
this.localNode.getRoutingTable().insert(msg.getOrigin());
|
||||||
|
|
||||||
|
/* Get the Content and check if it satisfies the required parameters */
|
||||||
|
KadContent content = msg.getContent();
|
||||||
|
|
||||||
|
/*@todo Check if the content matches the given criteria */
|
||||||
|
this.contentFound.add(content);
|
||||||
|
|
||||||
|
if (this.contentFound.size() == this.numResultsReq)
|
||||||
|
{
|
||||||
|
/* We've got all the content required, let's stop the loopup operation */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* The reply received is a NodeReplyMessage with nodes closest to the content needed */
|
||||||
|
NodeReplyMessage msg = (NodeReplyMessage) incoming;
|
||||||
|
|
||||||
|
/* Add the origin node to our routing table */
|
||||||
|
Node origin = msg.getOrigin();
|
||||||
|
this.localNode.getRoutingTable().insert(origin);
|
||||||
|
|
||||||
|
/* Set that we've completed ASKing the origin node */
|
||||||
|
this.nodes.put(origin, ASKED);
|
||||||
|
|
||||||
|
/* Remove this msg from messagesTransiting since it's completed now */
|
||||||
|
this.messagesTransiting.remove(new Integer(comm));
|
||||||
|
|
||||||
|
/* Add the received nodes to our nodes list to query */
|
||||||
|
this.addNodes(msg.getNodes());
|
||||||
|
this.askNodesorFinish();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A node does not respond or a packet was lost, we set this node as failed
|
||||||
|
*
|
||||||
|
* @param comm
|
||||||
|
*
|
||||||
|
* @throws java.io.IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public synchronized void timeout(int comm) throws IOException
|
||||||
|
{
|
||||||
|
/* Get the node associated with this communication */
|
||||||
|
Node n = this.messagesTransiting.get(new Integer(comm));
|
||||||
|
|
||||||
|
if (n == null)
|
||||||
|
{
|
||||||
|
throw new UnknownMessageException("Unknown comm: " + comm);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Mark this node as failed */
|
||||||
|
this.nodes.put(n, FAILED);
|
||||||
|
this.localNode.getRoutingTable().remove(n);
|
||||||
|
this.messagesTransiting.remove(new Integer(comm));
|
||||||
|
|
||||||
|
this.askNodesorFinish();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return The list of all content found during the lookup operation
|
||||||
|
*/
|
||||||
|
public List<KadContent> getContentFound()
|
||||||
|
{
|
||||||
|
return this.contentFound;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,10 +1,3 @@
|
|||||||
/**
|
|
||||||
* @author Joshua Kissoon
|
|
||||||
* @created 20140219
|
|
||||||
* @desc Finds the K closest nodes to a specified identifier
|
|
||||||
* The algorithm terminates when it has gotten responses from the K closest nodes it has seen.
|
|
||||||
* Nodes that fail to respond are removed from consideration
|
|
||||||
*/
|
|
||||||
package kademlia.operation;
|
package kademlia.operation;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@ -26,6 +19,14 @@ import kademlia.message.NodeReplyMessage;
|
|||||||
import kademlia.node.Node;
|
import kademlia.node.Node;
|
||||||
import kademlia.node.NodeId;
|
import kademlia.node.NodeId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Finds the K closest nodes to a specified identifier
|
||||||
|
* The algorithm terminates when it has gotten responses from the K closest nodes it has seen.
|
||||||
|
* Nodes that fail to respond are removed from consideration
|
||||||
|
*
|
||||||
|
* @author Joshua Kissoon
|
||||||
|
* @created 20140219
|
||||||
|
*/
|
||||||
public class NodeLookupOperation implements Operation, Receiver
|
public class NodeLookupOperation implements Operation, Receiver
|
||||||
{
|
{
|
||||||
|
|
||||||
@ -77,13 +78,11 @@ public class NodeLookupOperation implements Operation, Receiver
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return A list containing the K closest nodes to the lookupId provided
|
|
||||||
*
|
|
||||||
* @throws java.io.IOException
|
* @throws java.io.IOException
|
||||||
* @throws kademlia.exceptions.RoutingException
|
* @throws kademlia.exceptions.RoutingException
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public synchronized List<Node> execute() throws IOException, RoutingException
|
public synchronized void execute() throws IOException, RoutingException
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@ -105,9 +104,6 @@ public class NodeLookupOperation implements Operation, Receiver
|
|||||||
throw new RoutingException("Lookup Timeout.");
|
throw new RoutingException("Lookup Timeout.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* So we have finished, lets return the closest nodes */
|
|
||||||
return this.closestNodes(ASKED);
|
|
||||||
}
|
}
|
||||||
catch (InterruptedException e)
|
catch (InterruptedException e)
|
||||||
{
|
{
|
||||||
@ -115,6 +111,11 @@ public class NodeLookupOperation implements Operation, Receiver
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public List<Node> getClosestNodes()
|
||||||
|
{
|
||||||
|
return this.closestNodes(ASKED);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add nodes from this list to the set of nodes to lookup
|
* Add nodes from this list to the set of nodes to lookup
|
||||||
*
|
*
|
||||||
|
@ -14,11 +14,9 @@ public interface Operation
|
|||||||
/**
|
/**
|
||||||
* Starts an operation and returns when the operation is finished
|
* Starts an operation and returns when the operation is finished
|
||||||
*
|
*
|
||||||
* @return The return value can differ per operation
|
|
||||||
*
|
|
||||||
* @throws kademlia.exceptions.RoutingException
|
* @throws kademlia.exceptions.RoutingException
|
||||||
*
|
*
|
||||||
* @todo Remove the Object return type, those operations that return things should have a method to return the data
|
* @todo Remove the Object return type, those operations that return things should have a method to return the data
|
||||||
*/
|
*/
|
||||||
public Object execute() throws IOException, RoutingException;
|
public void execute() throws IOException, RoutingException;
|
||||||
}
|
}
|
||||||
|
@ -24,7 +24,7 @@ public class RefreshOperation implements Operation
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized Object execute() throws IOException
|
public synchronized void execute() throws IOException
|
||||||
{
|
{
|
||||||
/* @todo Do a Node Lookup operation to refresh K-Buckets */
|
/* @todo Do a Node Lookup operation to refresh K-Buckets */
|
||||||
new NodeLookupOperation(this.server, this.localNode, this.localNode.getNodeId()).execute();
|
new NodeLookupOperation(this.server, this.localNode, this.localNode.getNodeId()).execute();
|
||||||
@ -38,6 +38,5 @@ public class RefreshOperation implements Operation
|
|||||||
* @todo Delete any content on this node that this node is not one of the K-Closest nodes to
|
* @todo Delete any content on this node that this node is not one of the K-Closest nodes to
|
||||||
* @todo Delete any expired content
|
* @todo Delete any expired content
|
||||||
*/
|
*/
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -34,10 +34,13 @@ public class StoreOperation implements Operation
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized Object execute() throws IOException
|
public synchronized void execute() throws IOException
|
||||||
{
|
{
|
||||||
/* Get the nodes on which we need to store the content */
|
/* Get the nodes on which we need to store the content */
|
||||||
List<Node> nodes = new NodeLookupOperation(this.server, this.localNode, this.content.getKey()).execute();
|
NodeLookupOperation ndlo = new NodeLookupOperation(this.server, this.localNode, this.content.getKey());
|
||||||
|
ndlo.execute();
|
||||||
|
List<Node> nodes = ndlo.getClosestNodes();
|
||||||
|
|
||||||
System.out.println("Nodes to put content on: " + nodes);
|
System.out.println("Nodes to put content on: " + nodes);
|
||||||
|
|
||||||
/* Create the message */
|
/* Create the message */
|
||||||
@ -55,9 +58,5 @@ public class StoreOperation implements Operation
|
|||||||
this.server.sendMessage(n, msg, null);
|
this.server.sendMessage(n, msg, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/* Return how many nodes the content was stored on */
|
|
||||||
return nodes.size();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user