Created a KadConfiguration interface that allows passing in a configuration file as needed.

This allows other applications to create their own configuration files
This commit is contained in:
Joshua Kissoon 2014-03-29 14:37:01 +05:30
parent 6462722227
commit a259579f4a
25 changed files with 397 additions and 194 deletions

View File

@ -1,4 +1,4 @@
package kademlia.core;
package kademlia;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@ -13,6 +13,10 @@ import java.util.List;
import java.util.NoSuchElementException;
import java.util.Timer;
import java.util.TimerTask;
import kademlia.core.DefaultConfiguration;
import kademlia.core.GetParameter;
import kademlia.core.KadConfiguration;
import kademlia.core.KadServer;
import kademlia.dht.DHT;
import kademlia.dht.KadContent;
import kademlia.exceptions.RoutingException;
@ -57,6 +61,7 @@ public class Kademlia
private final transient DHT dht;
private final transient Timer timer;
private final int udpPort;
private KadConfiguration config;
/* Factories */
private final transient MessageFactory messageFactory;
@ -71,19 +76,21 @@ public class Kademlia
* @param localNode The Local Node for this Kad instance
* @param udpPort The UDP port to use for routing messages
* @param dht The DHT for this instance
* @param config
*
* @throws IOException If an error occurred while reading id or local map
* from disk <i>or</i> a network error occurred while
* attempting to bootstrap to the network
* */
public Kademlia(String ownerId, Node localNode, int udpPort, DHT dht) throws IOException
public Kademlia(String ownerId, Node localNode, int udpPort, DHT dht, KadConfiguration config) throws IOException
{
this.ownerId = ownerId;
this.udpPort = udpPort;
this.localNode = localNode;
this.dht = dht;
this.messageFactory = new MessageFactory(localNode, this.dht);
this.server = new KadServer(udpPort, this.messageFactory, this.localNode);
this.config = config;
this.messageFactory = new MessageFactory(localNode, this.dht, this.config);
this.server = new KadServer(udpPort, this.messageFactory, this.localNode, this.config);
this.timer = new Timer(true);
/* Schedule Recurring RestoreOperation */
@ -105,17 +112,22 @@ public class Kademlia
}
},
// Delay // Interval
Configuration.RESTORE_INTERVAL, Configuration.RESTORE_INTERVAL
this.config.restoreInterval(), this.config.restoreInterval()
);
}
public Kademlia(String ownerId, NodeId defaultId, int udpPort, KadConfiguration config) throws IOException
{
this(ownerId, new Node(defaultId, InetAddress.getLocalHost(), udpPort), udpPort, new DHT(ownerId, config), config);
}
public Kademlia(String ownerId, NodeId defaultId, int udpPort) throws IOException
{
this(ownerId, new Node(defaultId, InetAddress.getLocalHost(), udpPort), udpPort, new DHT(ownerId));
this(ownerId, new Node(defaultId, InetAddress.getLocalHost(), udpPort), udpPort, new DHT(ownerId, new DefaultConfiguration()), new DefaultConfiguration());
}
/**
* Load Stored state
* Load Stored state using default configuration
*
* @param ownerId The ID of the owner for the stored state
*
@ -127,35 +139,53 @@ public class Kademlia
* @todo Boot up this Kademlia instance from a saved file state
*/
public static Kademlia loadFromFile(String ownerId) throws FileNotFoundException, IOException, ClassNotFoundException
{
return Kademlia.loadFromFile(ownerId, new DefaultConfiguration());
}
/**
* Load Stored state
*
* @param ownerId The ID of the owner for the stored state
* @param iconfig Configuration information to work with
*
* @return A Kademlia instance loaded from a stored state in a file
*
* @throws java.io.FileNotFoundException
* @throws java.lang.ClassNotFoundException
*
* @todo Boot up this Kademlia instance from a saved file state
*/
public static Kademlia loadFromFile(String ownerId, KadConfiguration iconfig) throws FileNotFoundException, IOException, ClassNotFoundException
{
DataInputStream din;
/**
* @section Read Basic Kad data
*/
din = new DataInputStream(new FileInputStream(getStateStorageFolderName(ownerId) + File.separator + "kad.kns"));
din = new DataInputStream(new FileInputStream(getStateStorageFolderName(ownerId, iconfig) + File.separator + "kad.kns"));
Kademlia ikad = new JsonSerializer<Kademlia>().read(din);
/**
* @section Read the routing table
*/
din = new DataInputStream(new FileInputStream(getStateStorageFolderName(ownerId) + File.separator + "routingtable.kns"));
din = new DataInputStream(new FileInputStream(getStateStorageFolderName(ownerId, iconfig) + File.separator + "routingtable.kns"));
RoutingTable irtbl = new JsonRoutingTableSerializer().read(din);
/**
* @section Read the node state
*/
din = new DataInputStream(new FileInputStream(getStateStorageFolderName(ownerId) + File.separator + "node.kns"));
din = new DataInputStream(new FileInputStream(getStateStorageFolderName(ownerId, iconfig) + File.separator + "node.kns"));
Node inode = new JsonSerializer<Node>().read(din);
inode.setRoutingTable(irtbl);
/**
* @section Read the DHT
*/
din = new DataInputStream(new FileInputStream(getStateStorageFolderName(ownerId) + File.separator + "dht.kns"));
din = new DataInputStream(new FileInputStream(getStateStorageFolderName(ownerId, iconfig) + File.separator + "dht.kns"));
DHT idht = new JsonDHTSerializer().read(din);
return new Kademlia(ownerId, inode, ikad.getPort(), idht);
return new Kademlia(ownerId, inode, ikad.getPort(), idht, ikad.getCurrentConfiguration());
}
/**
@ -182,6 +212,14 @@ public class Kademlia
return this.dht;
}
/**
* @return The current KadConfiguration object being used
*/
public KadConfiguration getCurrentConfiguration()
{
return this.config;
}
/**
* Connect to an existing peer-to-peer network.
*
@ -193,7 +231,7 @@ public class Kademlia
* */
public synchronized final void bootstrap(Node n) throws IOException, RoutingException
{
Operation op = new ConnectOperation(this.server, this.localNode, n);
Operation op = new ConnectOperation(this.server, this.localNode, n, this.config);
op.execute();
}
@ -210,7 +248,7 @@ public class Kademlia
*/
public synchronized int put(KadContent content) throws IOException
{
StoreOperation sop = new StoreOperation(this.server, this.localNode, content, this.dht);
StoreOperation sop = new StoreOperation(this.server, this.localNode, content, this.dht, this.config);
sop.execute();
/* Return how many nodes the content was stored on */
@ -252,7 +290,7 @@ public class Kademlia
else
{
/* Seems like it doesn't exist in our DHT, get it from other Nodes */
ContentLookupOperation clo = new ContentLookupOperation(server, localNode, param, numResultsReq);
ContentLookupOperation clo = new ContentLookupOperation(server, localNode, param, numResultsReq, this.config);
clo.execute();
contentFound = clo.getContentFound();
}
@ -267,7 +305,7 @@ public class Kademlia
*/
public void refresh() throws IOException
{
new KadRefreshOperation(this.server, this.localNode, this.dht).execute();
new KadRefreshOperation(this.server, this.localNode, this.dht, this.config).execute();
}
/**
@ -318,13 +356,13 @@ public class Kademlia
/**
* @section Store Basic Kad data
*/
dout = new DataOutputStream(new FileOutputStream(getStateStorageFolderName(this.ownerId) + File.separator + "kad.kns"));
dout = new DataOutputStream(new FileOutputStream(getStateStorageFolderName(this.ownerId, this.config) + File.separator + "kad.kns"));
new JsonSerializer<Kademlia>().write(this, dout);
/**
* @section Save the node state
*/
dout = new DataOutputStream(new FileOutputStream(getStateStorageFolderName(this.ownerId) + File.separator + "node.kns"));
dout = new DataOutputStream(new FileOutputStream(getStateStorageFolderName(this.ownerId, this.config) + File.separator + "node.kns"));
new JsonSerializer<Node>().write(this.localNode, dout);
/**
@ -332,13 +370,13 @@ public class Kademlia
* We need to save the routing table separate from the node since the routing table will contain the node and the node will contain the routing table
* This will cause a serialization recursion, and in turn a Stack Overflow
*/
dout = new DataOutputStream(new FileOutputStream(getStateStorageFolderName(this.ownerId) + File.separator + "routingtable.kns"));
dout = new DataOutputStream(new FileOutputStream(getStateStorageFolderName(this.ownerId, this.config) + File.separator + "routingtable.kns"));
new JsonRoutingTableSerializer().write(this.localNode.getRoutingTable(), dout);
/**
* @section Save the DHT
*/
dout = new DataOutputStream(new FileOutputStream(getStateStorageFolderName(this.ownerId) + File.separator + "dht.kns"));
dout = new DataOutputStream(new FileOutputStream(getStateStorageFolderName(this.ownerId, this.config) + File.separator + "dht.kns"));
new JsonDHTSerializer().write(this.dht, dout);
}
@ -348,10 +386,10 @@ public class Kademlia
*
* @return String The name of the folder to store node states
*/
private static String getStateStorageFolderName(String ownerId)
private static String getStateStorageFolderName(String ownerId, KadConfiguration iconfig)
{
/* Setup the nodes storage folder if it doesn't exist */
String path = Configuration.getNodeDataFolder(ownerId) + File.separator + "nodeState";
String path = iconfig.getNodeDataFolder(ownerId) + File.separator + "nodeState";
File nodeStateFolder = new File(path);
if (!nodeStateFolder.isDirectory())
{

View File

@ -1,24 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package kademlia;
/**
*
* @author Joshua
*/
public class KademliaBasic
{
/**
* @param args the command line arguments
*/
public static void main(String[] args)
{
// TODO code application logic here
}
}

View File

@ -1,86 +0,0 @@
package kademlia.core;
import java.io.File;
/**
* A set of Kademlia configuration parameters. Default values are
* supplied and can be changed by the application as necessary.
*
*/
public class Configuration
{
/**
* Interval in milliseconds between execution of RestoreOperations.
* */
public static long RESTORE_INTERVAL = 60 * 1000; // Default at 1 hour
/**
* If no reply received from a node in this period (in milliseconds)
* consider the node unresponsive.
* */
public static long RESPONSE_TIMEOUT = 1500;
/**
* Maximum number of milliseconds for performing an operation.
* */
public static long OPERATION_TIMEOUT = 3000;
/**
* Maximum number of concurrent messages in transit.
* */
public static int CONCURRENCY = 10;
/**
* Log base exponent.
* */
public static int B = 2;
/**
* Bucket size.
* */
public static int K = 2;
/**
* Size of replacement cache.
* */
public static int RCSIZE = 3;
/**
* Number of times a node can be marked as stale before it is actually removed.
* */
public static int STALE = 1;
/**
* Local Storage location - Relative to the user's home folder (Cross-Platform)
*/
public static String LOCAL_FOLDER = "kademlia";
/**
* Creates the folder in which this node data is to be stored
*
* @param ownerId
*
* @return The folder path
*/
public static String getNodeDataFolder(String ownerId)
{
/* Setup the main storage folder if it doesn't exist */
String path = System.getProperty("user.home") + File.separator + Configuration.LOCAL_FOLDER;
File folder = new File(path);
if (!folder.isDirectory())
{
folder.mkdir();
}
/* Setup subfolder for this owner if it doesn't exist */
File ownerFolder = new File(folder + File.separator + ownerId);
if (!ownerFolder.isDirectory())
{
ownerFolder.mkdir();
}
/* Return the path */
return ownerFolder.toString();
}
}

View File

@ -0,0 +1,85 @@
package kademlia.core;
import java.io.File;
/**
* A set of Kademlia configuration parameters. Default values are
* supplied and can be changed by the application as necessary.
*
*/
public class DefaultConfiguration implements KadConfiguration
{
private final static long RESTORE_INTERVAL = 60 * 1000; // Default at 1 hour
private final static long RESPONSE_TIMEOUT = 1500;
private final static long OPERATION_TIMEOUT = 3000;
private final static int CONCURRENCY = 10;
private final static int K = 2;
private final static int RCSIZE = 3;
private final static int STALE = 1;
private final static String LOCAL_FOLDER = "kademlia";
@Override
public long restoreInterval()
{
return RESTORE_INTERVAL;
}
@Override
public long responseTimeout()
{
return RESPONSE_TIMEOUT;
}
@Override
public long operationTimeout()
{
return OPERATION_TIMEOUT;
}
@Override
public int maxConcurrentMessagesTransiting()
{
return CONCURRENCY;
}
@Override
public int k()
{
return K;
}
@Override
public int replacementCacheSize()
{
return RCSIZE;
}
@Override
public int stale()
{
return STALE;
}
@Override
public String getNodeDataFolder(String ownerId)
{
/* Setup the main storage folder if it doesn't exist */
String path = System.getProperty("user.home") + File.separator + DefaultConfiguration.LOCAL_FOLDER;
File folder = new File(path);
if (!folder.isDirectory())
{
folder.mkdir();
}
/* Setup subfolder for this owner if it doesn't exist */
File ownerFolder = new File(folder + File.separator + ownerId);
if (!ownerFolder.isDirectory())
{
ownerFolder.mkdir();
}
/* Return the path */
return ownerFolder.toString();
}
}

View File

@ -0,0 +1,58 @@
package kademlia.core;
/**
* Interface that defines a KadConfiguration object
*
* @author Joshua Kissoon
* @since 20140329
*/
public interface KadConfiguration
{
/**
* @return Interval in milliseconds between execution of RestoreOperations.
*/
public long restoreInterval();
/**
* If no reply received from a node in this period (in milliseconds)
* consider the node unresponsive.
*
* @return The time it takes to consider a node unresponsive
*/
public long responseTimeout();
/**
* @return Maximum number of milliseconds for performing an operation.
*/
public long operationTimeout();
/**
* @return Maximum number of concurrent messages in transit.
*/
public int maxConcurrentMessagesTransiting();
/**
* @return K-Value used throughout Kademlia
*/
public int k();
/**
* @return Size of replacement cache.
*/
public int replacementCacheSize();
/**
* @return # of times a node can be marked as stale before it is actually removed.
*/
public int stale();
/**
* Creates the folder in which this node data is to be stored
*
* @param ownerId
*
* @return The folder path
*/
public String getNodeDataFolder(String ownerId);
}

View File

@ -30,6 +30,9 @@ public class KadServer
/* Maximum size of a Datagram Packet */
private static final int DATAGRAM_BUFFER_SIZE = 64 * 1024; // 64KB
/* Basic Kad Objects */
private final KadConfiguration config;
/* Server Objects */
private final int udpPort;
private final DatagramSocket socket;
@ -57,12 +60,15 @@ public class KadServer
* @param udpPort The port to listen on
* @param mFactory Factory used to create messages
* @param localNode Local node on which this server runs on
* @param config
*
* @throws java.net.SocketException
*/
public KadServer(int udpPort, MessageFactory mFactory, Node localNode) throws SocketException
public KadServer(int udpPort, MessageFactory mFactory, Node localNode, KadConfiguration config) throws SocketException
{
this.udpPort = udpPort;
this.config = config;
this.socket = new DatagramSocket(udpPort);
this.localNode = localNode;
@ -116,7 +122,7 @@ public class KadServer
//System.out.println(this.localNode + " Putting Receiver for comm: " + comm + " Receiver: " + recv);
receivers.put(comm, recv);
TimerTask task = new TimeoutTask(comm, recv);
timer.schedule(task, Configuration.RESPONSE_TIMEOUT);
timer.schedule(task, this.config.responseTimeout());
tasks.put(comm, task);
}

View File

@ -9,8 +9,9 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.NoSuchElementException;
import kademlia.core.Configuration;
import kademlia.core.DefaultConfiguration;
import kademlia.core.GetParameter;
import kademlia.core.KadConfiguration;
import kademlia.exceptions.ContentExistException;
import kademlia.exceptions.ContentNotFoundException;
import kademlia.node.NodeId;
@ -27,6 +28,7 @@ public class DHT
private transient StorageEntryManager entriesManager;
private transient final JsonSerializer<KadContent> contentSerializer;
private final KadConfiguration config;
private final String ownerId;
@ -35,9 +37,10 @@ public class DHT
contentSerializer = new JsonSerializer<>();
}
public DHT(String ownerId)
public DHT(String ownerId, KadConfiguration config)
{
this.ownerId = ownerId;
this.config = config;
this.initialize();
}
@ -205,7 +208,7 @@ public class DHT
* The name of the file containing the content is the hash of this content
*/
String folderName = key.hexRepresentation().substring(0, 10);
File contentStorageFolder = new File(Configuration.getNodeDataFolder(ownerId) + File.separator + folderName);
File contentStorageFolder = new File(this.config.getNodeDataFolder(ownerId) + File.separator + folderName);
/* Create the content folder if it doesn't exist */
if (!contentStorageFolder.isDirectory())

View File

@ -1,6 +1,7 @@
package kademlia.message;
import java.io.IOException;
import kademlia.core.KadConfiguration;
import kademlia.core.KadServer;
import kademlia.dht.DHT;
import kademlia.node.Node;
@ -19,12 +20,14 @@ public class ContentLookupReceiver implements Receiver
private final KadServer server;
private final Node localNode;
private final DHT dht;
private final KadConfiguration config;
public ContentLookupReceiver(KadServer server, Node localNode, DHT dht)
public ContentLookupReceiver(KadServer server, Node localNode, DHT dht, KadConfiguration config)
{
this.server = server;
this.localNode = localNode;
this.dht = dht;
this.config = config;
}
@Override
@ -47,7 +50,7 @@ public class ContentLookupReceiver implements Receiver
* We create a NodeLookupReceiver and let this receiver handle this operation
*/
NodeLookupMessage lkpMsg = new NodeLookupMessage(msg.getOrigin(), msg.getParameters().getKey());
new NodeLookupReceiver(server, localNode).receive(lkpMsg, comm);
new NodeLookupReceiver(server, localNode, this.config).receive(lkpMsg, comm);
}
}

View File

@ -2,6 +2,7 @@ package kademlia.message;
import java.io.DataInputStream;
import java.io.IOException;
import kademlia.core.KadConfiguration;
import kademlia.core.KadServer;
import kademlia.dht.DHT;
import kademlia.node.Node;
@ -18,11 +19,13 @@ public class MessageFactory
private final Node localNode;
private final DHT dht;
private final KadConfiguration config;
public MessageFactory(Node local, DHT dht)
public MessageFactory(Node local, DHT dht, KadConfiguration config)
{
this.localNode = local;
this.dht = dht;
this.config = config;
}
public Message createMessage(byte code, DataInputStream in) throws IOException
@ -59,9 +62,9 @@ public class MessageFactory
case ConnectMessage.CODE:
return new ConnectReceiver(server, this.localNode);
case ContentLookupMessage.CODE:
return new ContentLookupReceiver(server, localNode, dht);
return new ContentLookupReceiver(server, this.localNode, this.dht, this.config);
case NodeLookupMessage.CODE:
return new NodeLookupReceiver(server, this.localNode);
return new NodeLookupReceiver(server, this.localNode, this.config);
case SimpleMessage.CODE:
return new SimpleReceiver();
case StoreContentMessage.CODE:

View File

@ -2,7 +2,7 @@ package kademlia.message;
import java.io.IOException;
import java.util.List;
import kademlia.core.Configuration;
import kademlia.core.KadConfiguration;
import kademlia.core.KadServer;
import kademlia.node.Node;
import kademlia.operation.Receiver;
@ -18,11 +18,13 @@ public class NodeLookupReceiver implements Receiver
private final KadServer server;
private final Node localNode;
private final KadConfiguration config;
public NodeLookupReceiver(KadServer server, Node local)
public NodeLookupReceiver(KadServer server, Node local, KadConfiguration config)
{
this.server = server;
this.localNode = local;
this.config = config;
}
/**
@ -44,7 +46,7 @@ public class NodeLookupReceiver implements Receiver
this.localNode.getRoutingTable().insert(origin);
/* Find nodes closest to the LookupId */
List<Node> nodes = this.localNode.getRoutingTable().findClosest(msg.getLookupId(), Configuration.K);
List<Node> nodes = this.localNode.getRoutingTable().findClosest(msg.getLookupId(), this.config.k());
/* Respond to the NodeLookupMessage */
Message reply = new NodeReplyMessage(this.localNode, nodes);

View File

@ -1,6 +1,7 @@
package kademlia.operation;
import java.io.IOException;
import kademlia.core.KadConfiguration;
import kademlia.core.KadServer;
import kademlia.node.Node;
import kademlia.node.NodeId;
@ -17,11 +18,13 @@ public class BucketRefreshOperation implements Operation
private final KadServer server;
private final Node localNode;
private final KadConfiguration config;
public BucketRefreshOperation(KadServer server, Node localNode)
public BucketRefreshOperation(KadServer server, Node localNode, KadConfiguration config)
{
this.server = server;
this.localNode = localNode;
this.config = config;
}
/**
@ -50,7 +53,7 @@ public class BucketRefreshOperation implements Operation
{
try
{
new NodeLookupOperation(server, localNode, localNode.getNodeId()).execute();
new NodeLookupOperation(server, localNode, localNode.getNodeId(), BucketRefreshOperation.this.config).execute();
}
catch (IOException e)
{

View File

@ -6,7 +6,8 @@
package kademlia.operation;
import java.io.IOException;
import kademlia.core.Configuration;
import kademlia.core.DefaultConfiguration;
import kademlia.core.KadConfiguration;
import kademlia.core.KadServer;
import kademlia.exceptions.RoutingException;
import kademlia.message.AcknowledgeMessage;
@ -22,6 +23,7 @@ public class ConnectOperation implements Operation, Receiver
private final KadServer server;
private final Node localNode;
private final Node bootstrapNode;
private final KadConfiguration config;
private boolean error;
private int attempts;
@ -30,12 +32,14 @@ public class ConnectOperation implements Operation, Receiver
* @param server The message server used to send/receive messages
* @param local The local node
* @param bootstrap Node to use to bootstrap the local node onto the network
* @param config
*/
public ConnectOperation(KadServer server, Node local, Node bootstrap)
public ConnectOperation(KadServer server, Node local, Node bootstrap, KadConfiguration config)
{
this.server = server;
this.localNode = local;
this.bootstrapNode = bootstrap;
this.config = config;
}
@Override
@ -52,7 +56,7 @@ public class ConnectOperation implements Operation, Receiver
server.sendMessage(this.bootstrapNode, m, this);
/* Wait for a while */
wait(Configuration.OPERATION_TIMEOUT);
wait(config.operationTimeout());
if (error)
{
@ -61,7 +65,7 @@ public class ConnectOperation implements Operation, Receiver
}
/* Perform lookup for our own ID to get nodes close to us */
Operation lookup = new NodeLookupOperation(this.server, this.localNode, this.localNode.getNodeId());
Operation lookup = new NodeLookupOperation(this.server, this.localNode, this.localNode.getNodeId(), this.config);
lookup.execute();
/**

View File

@ -9,8 +9,9 @@ import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import kademlia.core.Configuration;
import kademlia.core.DefaultConfiguration;
import kademlia.core.GetParameter;
import kademlia.core.KadConfiguration;
import kademlia.core.KadServer;
import kademlia.dht.KadContent;
import kademlia.exceptions.RoutingException;
@ -44,6 +45,7 @@ public class ContentLookupOperation implements Operation, Receiver
private final GetParameter params;
private final List<KadContent> contentFound;
private final int numResultsReq;
private final KadConfiguration config;
private final ContentLookupMessage lookupMessage;
@ -68,8 +70,9 @@ public class ContentLookupOperation implements Operation, Receiver
* @param localNode
* @param params The parameters to search for the content which we need to find
* @param numResultsReq The number of results for this content from different nodes required
* @param config
*/
public ContentLookupOperation(KadServer server, Node localNode, GetParameter params, int numResultsReq)
public ContentLookupOperation(KadServer server, Node localNode, GetParameter params, int numResultsReq, KadConfiguration config)
{
/* Construct our lookup message */
this.lookupMessage = new ContentLookupMessage(localNode, params);
@ -78,6 +81,7 @@ public class ContentLookupOperation implements Operation, Receiver
this.localNode = localNode;
this.params = params;
this.numResultsReq = numResultsReq;
this.config = config;
/**
* We initialize a TreeMap to store nodes.
@ -107,19 +111,19 @@ public class ContentLookupOperation implements Operation, Receiver
{
/* If we haven't finished as yet, wait a while */
/**
* @todo Get rid of this wait here!
* @todo Get rid of this wait here!
* We should run this until there are no nodes left to ask from the K closest nodes
* and only pause for short intervals in between
*
*
* @todo Do the same for the NodeLookupOperation
*/
wait(Configuration.OPERATION_TIMEOUT);
wait(this.config.operationTimeout());
/* If we still haven't received any responses by then, do a routing timeout */
if (error)
{
/* Lets not throw any exception */
//throw new RoutingException("Content Lookup Operation Timeout.");
}
}
@ -149,7 +153,7 @@ public class ContentLookupOperation implements Operation, Receiver
/**
* Asks some of the K closest nodes seen but not yet queried.
* Assures that no more than Configuration.CONCURRENCY messages are in transit at a time
* Assures that no more than DefaultConfiguration.CONCURRENCY messages are in transit at a time
*
* This method should be called every time a reply is received or a timeout occurs.
*
@ -161,7 +165,7 @@ public class ContentLookupOperation implements Operation, Receiver
private boolean askNodesorFinish() throws IOException
{
/* If >= CONCURRENCY nodes are in transit, don't do anything */
if (Configuration.CONCURRENCY <= this.messagesTransiting.size())
if (this.config.maxConcurrentMessagesTransiting() <= this.messagesTransiting.size())
{
return false;
}
@ -183,7 +187,7 @@ public class ContentLookupOperation implements Operation, Receiver
* Send messages to nodes in the list;
* making sure than no more than CONCURRENCY messsages are in transit
*/
for (int i = 0; (this.messagesTransiting.size() < Configuration.CONCURRENCY) && (i < unasked.size()); i++)
for (int i = 0; (this.messagesTransiting.size() < this.config.maxConcurrentMessagesTransiting()) && (i < unasked.size()); i++)
{
Node n = (Node) unasked.get(i);
@ -207,8 +211,8 @@ public class ContentLookupOperation implements Operation, Receiver
*/
private List<Node> closestNodesNotFailed(Byte status)
{
List<Node> closestNodes = new ArrayList<>(Configuration.K);
int remainingSpaces = Configuration.K;
List<Node> closestNodes = new ArrayList<>(this.config.k());
int remainingSpaces = this.config.k();
for (Map.Entry e : this.nodes.entrySet())
{
@ -237,7 +241,7 @@ public class ContentLookupOperation implements Operation, Receiver
{
return;
}
if (incoming instanceof ContentMessage)
{
/* The reply received is a content message with the required content, take it in */

View File

@ -2,7 +2,8 @@ package kademlia.operation;
import java.io.IOException;
import java.util.List;
import kademlia.core.Configuration;
import kademlia.core.DefaultConfiguration;
import kademlia.core.KadConfiguration;
import kademlia.core.KadServer;
import kademlia.dht.DHT;
import kademlia.dht.StorageEntry;
@ -23,12 +24,14 @@ public class ContentRefreshOperation implements Operation
private final KadServer server;
private final Node localNode;
private final DHT dht;
private final KadConfiguration config;
public ContentRefreshOperation(KadServer server, Node localNode, DHT dht)
public ContentRefreshOperation(KadServer server, Node localNode, DHT dht, KadConfiguration config)
{
this.server = server;
this.localNode = localNode;
this.dht = dht;
this.config = config;
}
/**
@ -54,7 +57,7 @@ public class ContentRefreshOperation implements Operation
* only distribute it if it has been last updated > 1 hour ago
*/
/* Get the K closest nodes to this entries */
List<Node> closestNodes = this.localNode.getRoutingTable().findClosest(e.getKey(), Configuration.K);
List<Node> closestNodes = this.localNode.getRoutingTable().findClosest(e.getKey(), this.config.k());
/* Create the message */
Message msg = new StoreContentMessage(this.localNode, dht.get(e));

View File

@ -1,6 +1,7 @@
package kademlia.operation;
import java.io.IOException;
import kademlia.core.KadConfiguration;
import kademlia.core.KadServer;
import kademlia.dht.DHT;
import kademlia.node.Node;
@ -17,21 +18,23 @@ public class KadRefreshOperation implements Operation
private final KadServer server;
private final Node localNode;
private final DHT dht;
private final KadConfiguration config;
public KadRefreshOperation(KadServer server, Node localNode, DHT dht)
public KadRefreshOperation(KadServer server, Node localNode, DHT dht, KadConfiguration config)
{
this.server = server;
this.localNode = localNode;
this.dht = dht;
this.config = config;
}
@Override
public void execute() throws IOException
{
/* Run our BucketRefreshOperation to refresh buckets */
new BucketRefreshOperation(server, localNode).execute();
new BucketRefreshOperation(this.server, this.localNode, this.config).execute();
/* After buckets have been refreshed, we refresh content */
new ContentRefreshOperation(server, localNode, dht).execute();
new ContentRefreshOperation(this.server, this.localNode, this.dht, this.config).execute();
}
}

View File

@ -7,7 +7,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import kademlia.core.Configuration;
import kademlia.core.DefaultConfiguration;
import kademlia.core.KadConfiguration;
import kademlia.core.KadServer;
import kademlia.exceptions.RoutingException;
import kademlia.exceptions.UnknownMessageException;
@ -38,6 +39,7 @@ public class NodeLookupOperation implements Operation, Receiver
private final KadServer server;
private final Node localNode;
private final NodeId lookupId;
private final KadConfiguration config;
private boolean error;
@ -59,12 +61,14 @@ public class NodeLookupOperation implements Operation, Receiver
* @param server KadServer used for communication
* @param localNode The local node making the communication
* @param lookupId The ID for which to find nodes close to
* @param config
*/
public NodeLookupOperation(KadServer server, Node localNode, NodeId lookupId)
public NodeLookupOperation(KadServer server, Node localNode, NodeId lookupId, KadConfiguration config)
{
this.server = server;
this.localNode = localNode;
this.lookupId = lookupId;
this.config = config;
this.lookupMessage = new NodeLookupMessage(localNode, lookupId);
@ -96,7 +100,7 @@ public class NodeLookupOperation implements Operation, Receiver
if (!this.askNodesorFinish())
{
/* If we haven't finished as yet, wait a while */
wait(Configuration.OPERATION_TIMEOUT);
wait(this.config.operationTimeout());
/* If we still haven't received any responses by then, do a routing timeout */
if (error)
@ -135,7 +139,7 @@ public class NodeLookupOperation implements Operation, Receiver
/**
* Asks some of the K closest nodes seen but not yet queried.
* Assures that no more than Configuration.CONCURRENCY messages are in transit at a time
* Assures that no more than DefaultConfiguration.CONCURRENCY messages are in transit at a time
*
* This method should be called every time a reply is received or a timeout occurs.
*
@ -147,7 +151,7 @@ public class NodeLookupOperation implements Operation, Receiver
private boolean askNodesorFinish() throws IOException
{
/* If >= CONCURRENCY nodes are in transit, don't do anything */
if (Configuration.CONCURRENCY <= this.messagesTransiting.size())
if (this.config.maxConcurrentMessagesTransiting() <= this.messagesTransiting.size())
{
return false;
}
@ -166,7 +170,7 @@ public class NodeLookupOperation implements Operation, Receiver
* Send messages to nodes in the list;
* making sure than no more than CONCURRENCY messsages are in transit
*/
for (int i = 0; (this.messagesTransiting.size() < Configuration.CONCURRENCY) && (i < unasked.size()); i++)
for (int i = 0; (this.messagesTransiting.size() < this.config.maxConcurrentMessagesTransiting()) && (i < unasked.size()); i++)
{
Node n = (Node) unasked.get(i);
@ -187,8 +191,8 @@ public class NodeLookupOperation implements Operation, Receiver
*/
private List<Node> closestNodes(String status)
{
List<Node> closestNodes = new ArrayList<>(Configuration.K);
int remainingSpaces = Configuration.K;
List<Node> closestNodes = new ArrayList<>(this.config.k());
int remainingSpaces = this.config.k();
for (Map.Entry e : this.nodes.entrySet())
{
@ -216,8 +220,8 @@ public class NodeLookupOperation implements Operation, Receiver
*/
private List<Node> closestNodesNotFailed(String status)
{
List<Node> closestNodes = new ArrayList<>(Configuration.K);
int remainingSpaces = Configuration.K;
List<Node> closestNodes = new ArrayList<>(this.config.k());
int remainingSpaces = this.config.k();
for (Map.Entry<Node, String> e : this.nodes.entrySet())
{

View File

@ -2,6 +2,7 @@ package kademlia.operation;
import java.io.IOException;
import java.util.List;
import kademlia.core.KadConfiguration;
import kademlia.core.KadServer;
import kademlia.dht.DHT;
import kademlia.dht.KadContent;
@ -22,26 +23,29 @@ public class StoreOperation implements Operation
private final Node localNode;
private final KadContent content;
private final DHT localDht;
private final KadConfiguration config;
/**
* @param server
* @param localNode
* @param content The content to be stored on the DHT
* @param localDht The local DHT
* @param config
*/
public StoreOperation(KadServer server, Node localNode, KadContent content, DHT localDht)
public StoreOperation(KadServer server, Node localNode, KadContent content, DHT localDht, KadConfiguration config)
{
this.server = server;
this.localNode = localNode;
this.content = content;
this.localDht = localDht;
this.config = config;
}
@Override
public synchronized void execute() throws IOException
{
/* Get the nodes on which we need to store the content */
NodeLookupOperation ndlo = new NodeLookupOperation(this.server, this.localNode, this.content.getKey());
NodeLookupOperation ndlo = new NodeLookupOperation(this.server, this.localNode, this.content.getKey(), this.config);
ndlo.execute();
List<Node> nodes = ndlo.getClosestNodes();

View File

@ -0,0 +1,87 @@
package kademlia.routing;
import kademlia.node.NodeId;
/**
* A GET request can get content based on Key, Owner, Type, etc
*
* This is a class containing the parameters to be passed in a GET request
*
* We use a class since the number of filtering parameters can change later
*
* @author Joshua Kissoon
* @since 20140224
*/
public class GetParameter
{
private NodeId key;
private String ownerId = null;
private String type = null;
/**
* Construct a GetParameter to search for data by NodeId
*
* @param key
*/
public GetParameter(NodeId key)
{
this.key = key;
}
/**
* Construct a GetParameter to search for data by NodeId and owner
*
* @param key
* @param owner
*/
public GetParameter(NodeId key, String owner)
{
this(key);
this.ownerId = owner;
}
/**
* Construct a GetParameter to search for data by NodeId, owner, type
*
* @param key
* @param owner
* @param type
*/
public GetParameter(NodeId key, String owner, String type)
{
this(key, owner);
this.type = type;
}
public NodeId getKey()
{
return this.key;
}
public void setOwnerId(String ownerId)
{
this.ownerId = ownerId;
}
public String getOwnerId()
{
return this.ownerId;
}
public void setType(String type)
{
this.type = type;
}
public String getType()
{
return this.type;
}
@Override
public String toString()
{
return "GetParameter - [Key: " + key + "][Owner: " + this.ownerId + "][Type: " + this.type + "]";
}
}

View File

@ -2,8 +2,9 @@ package kademlia.tests;
import java.util.Timer;
import java.util.TimerTask;
import kademlia.core.Configuration;
import kademlia.core.Kademlia;
import kademlia.core.DefaultConfiguration;
import kademlia.Kademlia;
import kademlia.core.KadConfiguration;
import kademlia.node.NodeId;
/**
@ -53,6 +54,7 @@ public class AutoRefreshOperationTest
System.out.println(kad5);
/* Print the node states every few minutes */
KadConfiguration config = new DefaultConfiguration();
Timer timer = new Timer(true);
timer.schedule(
new TimerTask()
@ -68,7 +70,7 @@ public class AutoRefreshOperationTest
}
},
// Delay // Interval
Configuration.RESTORE_INTERVAL, Configuration.RESTORE_INTERVAL
config.restoreInterval(), config.restoreInterval()
);
}

View File

@ -2,8 +2,9 @@ package kademlia.tests;
import java.util.Timer;
import java.util.TimerTask;
import kademlia.core.Configuration;
import kademlia.core.Kademlia;
import kademlia.core.DefaultConfiguration;
import kademlia.Kademlia;
import kademlia.core.KadConfiguration;
import kademlia.node.NodeId;
/**
@ -32,8 +33,7 @@ public class AutoRefreshOperationTest2
DHTContentImpl c = new DHTContentImpl(new NodeId("AS84k678947584567465"), kad1.getOwnerId());
c.setData("Setting the data");
kad1.putLocally(c);
System.out.println("\n Content ID: " + c.getKey());
System.out.println(kad1.getNode() + " Distance from content: " + kad1.getNode().getNodeId().getDistance(c.getKey()));
System.out.println(kad2.getNode() + " Distance from content: " + kad2.getNode().getNodeId().getDistance(c.getKey()));
@ -45,6 +45,7 @@ public class AutoRefreshOperationTest2
System.out.println(kad3);
/* Print the node states every few minutes */
KadConfiguration config = new DefaultConfiguration();
Timer timer = new Timer(true);
timer.schedule(
new TimerTask()
@ -58,7 +59,7 @@ public class AutoRefreshOperationTest2
}
},
// Delay // Interval
Configuration.RESTORE_INTERVAL, Configuration.RESTORE_INTERVAL
config.restoreInterval(), config.restoreInterval()
);
}

View File

@ -3,7 +3,7 @@ package kademlia.tests;
import java.io.IOException;
import java.util.List;
import kademlia.core.GetParameter;
import kademlia.core.Kademlia;
import kademlia.Kademlia;
import kademlia.dht.KadContent;
import kademlia.node.NodeId;

View File

@ -1,7 +1,7 @@
package kademlia.tests;
import java.io.IOException;
import kademlia.core.Kademlia;
import kademlia.Kademlia;
import kademlia.node.NodeId;
/**

View File

@ -3,7 +3,7 @@ package kademlia.tests;
import java.io.IOException;
import java.util.List;
import kademlia.core.GetParameter;
import kademlia.core.Kademlia;
import kademlia.Kademlia;
import kademlia.dht.KadContent;
import kademlia.node.NodeId;

View File

@ -1,6 +1,6 @@
package kademlia.tests;
import kademlia.core.Kademlia;
import kademlia.Kademlia;
import kademlia.node.NodeId;
/**

View File

@ -1,7 +1,7 @@
package kademlia.tests;
import java.io.IOException;
import kademlia.core.Kademlia;
import kademlia.Kademlia;
import kademlia.message.SimpleMessage;
import kademlia.node.NodeId;
import kademlia.message.SimpleReceiver;