KademliaDHT/src/kademlia/core/Kademlia.java

391 lines
13 KiB
Java
Raw Normal View History

2014-02-18 20:37:07 +00:00
package kademlia.core;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
2014-02-18 20:37:07 +00:00
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
2014-02-18 20:37:07 +00:00
import java.util.Timer;
import java.util.TimerTask;
import kademlia.dht.DHT;
2014-02-25 08:12:08 +00:00
import kademlia.dht.KadContent;
2014-02-18 20:37:07 +00:00
import kademlia.exceptions.RoutingException;
import kademlia.message.MessageFactory;
import kademlia.node.Node;
import kademlia.node.NodeId;
import kademlia.operation.ConnectOperation;
import kademlia.operation.ContentLookupOperation;
2014-02-18 20:37:07 +00:00
import kademlia.operation.Operation;
import kademlia.operation.KadRefreshOperation;
import kademlia.operation.StoreOperation;
2014-03-10 05:38:51 +00:00
import kademlia.routing.RoutingTable;
import kademlia.serializer.JsonDHTSerializer;
import kademlia.serializer.JsonRoutingTableSerializer;
import kademlia.serializer.JsonSerializer;
2014-02-18 20:37:07 +00:00
/**
 * The main Kademlia network management class
 *
 * @author Joshua Kissoon
 * @since 20140215
 *
 * @todo When we receive a store message - if we have a newer version of the content, re-send this newer version to that node so as to update their version
 * @todo Handle IPv6 Addresses
 * @todo Handle compressing data
 * @todo Allow optional storing of content locally using the put method
 * @todo Instead of using a StoreContentMessage to send a store RPC and a ContentMessage to receive a FIND RPC, make them one message with different operation types
 * @todo If we're trying to send a message to this node, just cancel the sending process and handle the message right here
 * @todo Keep this node in its own routing table - it helps for ContentRefresh operation - easy to check whether this node is one of the k-nodes for a content
 * @todo Move DHT.getContentStorageFolderName to the Configuration class
 * @todo Implement Kademlia.ping() operation.
 *
 */
2014-02-18 20:37:07 +00:00
public class Kademlia
{
/* Kademlia Attributes */
private final String ownerId;
2014-02-18 20:37:07 +00:00
/* Objects to be used */
2014-03-10 05:38:51 +00:00
private final transient Node localNode;
private final transient KadServer server;
private final transient DHT dht;
private final transient Timer timer;
private final int udpPort;
2014-02-18 20:37:07 +00:00
/* Factories */
2014-03-10 05:38:51 +00:00
private final transient MessageFactory messageFactory;
2014-02-18 20:37:07 +00:00
/**
 * Creates a Kademlia instance around an existing local node and DHT.
 * <p>
 * The constructor wires up the message factory and the server, then schedules a
 * recurring refresh on a daemon timer. It does <b>not</b> contact any other node;
 * call {@link #bootstrap(Node)} to join an existing network.
 *
 * @param ownerId   The name of this node, used to locate its stored state
 * @param localNode The local Node for this Kad instance
 * @param udpPort   The UDP port handed to the server for routing messages
 * @param dht       The DHT for this instance
 *
 * @throws IOException If one of the collaborators created here fails with an I/O error
 *                     (presumably the KadServer when it opens its port - confirm)
 */
public Kademlia(String ownerId, Node localNode, int udpPort, DHT dht) throws IOException
{
    this.ownerId = ownerId;
    this.udpPort = udpPort;
    this.localNode = localNode;
    this.dht = dht;
    this.messageFactory = new MessageFactory(localNode, this.dht);
    this.server = new KadServer(udpPort, this.messageFactory, this.localNode);

    /* Daemon timer: it must never keep the JVM alive on its own */
    this.timer = new Timer(true);

    /* Schedule the recurring refresh operation */
    timer.schedule(
            new TimerTask()
            {
                @Override
                public void run()
                {
                    try
                    {
                        /* Runs a DHT RefreshOperation */
                        Kademlia.this.refresh();
                    }
                    catch (IOException | RuntimeException e)
                    {
                        /*
                         * RuntimeException is caught as well: an exception escaping run()
                         * terminates the Timer thread, which would silently stop every
                         * later refresh. One failed round must not end the schedule.
                         */
                        System.err.println("Refresh Operation Failed; Message: " + e.getMessage());
                    }
                }
            },
            // Delay                        // Interval
            Configuration.RESTORE_INTERVAL, Configuration.RESTORE_INTERVAL
    );
}
/**
 * Convenience constructor for a brand new instance.
 * <p>
 * Builds the local node from the given id, the address returned by
 * {@link InetAddress#getLocalHost()} and the given port, pairs it with a newly
 * constructed DHT, and delegates to the four-argument constructor.
 *
 * @param ownerId   The name of this node, used to locate its stored state
 * @param defaultId The NodeId to give the local node
 * @param udpPort   The UDP port to use for routing messages
 *
 * @throws IOException If the local host address cannot be resolved, or if the
 *                     delegated constructor fails
 */
public Kademlia(String ownerId, NodeId defaultId, int udpPort) throws IOException
{
    this(
            ownerId,
            new Node(defaultId, InetAddress.getLocalHost(), udpPort),
            udpPort,
            new DHT()
    );
}
/**
 * Builds a Kademlia instance from the state files previously written for an owner.
 * <p>
 * Four files are read from the owner's state folder, in this order:
 * {@code kad.kns} (basic Kad data, used here only for the port),
 * {@code routingtable.kns}, {@code node.kns} and {@code dht.kns}.
 * The routing table is attached to the loaded node before the new instance is created.
 * Every file is closed as soon as it has been read.
 *
 * @param ownerId The ID of the owner for the stored state
 *
 * @return A Kademlia instance loaded from a stored state in a file
 *
 * @throws java.io.FileNotFoundException    If one of the state files does not exist
 * @throws java.io.IOException              If reading a state file fails, or creating the new instance fails
 * @throws java.lang.ClassNotFoundException Declared for the serializers' read calls
 *                                          (presumably raised when a stored class cannot be resolved - confirm)
 *
 * @todo Boot up this Kademlia instance from a saved file state
 */
public static Kademlia loadFromFile(String ownerId) throws FileNotFoundException, IOException, ClassNotFoundException
{
    /* Resolve the folder once; every state file lives directly inside it */
    String stateFolder = getStateStorageFolderName(ownerId) + File.separator;

    /**
     * @section Read Basic Kad data
     */
    Kademlia ikad;
    try (DataInputStream din = new DataInputStream(new FileInputStream(stateFolder + "kad.kns")))
    {
        ikad = new JsonSerializer<Kademlia>().read(din);
    }

    /**
     * @section Read the routing table
     */
    RoutingTable irtbl;
    try (DataInputStream din = new DataInputStream(new FileInputStream(stateFolder + "routingtable.kns")))
    {
        irtbl = new JsonRoutingTableSerializer().read(din);
    }

    /**
     * @section Read the node state
     */
    Node inode;
    try (DataInputStream din = new DataInputStream(new FileInputStream(stateFolder + "node.kns")))
    {
        inode = new JsonSerializer<Node>().read(din);
    }
    /* Node and routing table are stored separately, so re-link them here */
    inode.setRoutingTable(irtbl);

    /**
     * @section Read the DHT
     */
    DHT idht;
    try (DataInputStream din = new DataInputStream(new FileInputStream(stateFolder + "dht.kns")))
    {
        idht = new JsonDHTSerializer().read(din);
    }

    return new Kademlia(ownerId, inode, ikad.getPort(), idht);
}
2014-02-18 20:37:07 +00:00
/**
 * Gives access to the node that represents this instance on the network.
 *
 * @return The local node this Kad instance was constructed with
 */
public Node getNode()
{
    return localNode;
}
/**
 * Gives access to the server created by the constructor.
 *
 * @return The KadServer used to send/receive messages
 */
public KadServer getServer()
{
    return server;
}
/**
* Connect to an existing peer-to-peer network.
*
* @param n The known node in the peer-to-peer network
*
* @throws RoutingException If the bootstrap node could not be contacted
* @throws IOException If a network error occurred
* @throws IllegalStateException If this object is closed
* */
public synchronized final void bootstrap(Node n) throws IOException, RoutingException
2014-02-18 20:37:07 +00:00
{
Operation op = new ConnectOperation(this.server, this.localNode, n);
op.execute();
}
/**
 * Publishes the given content to the network by running a StoreOperation.
 * <p>
 * NOTE(review): the original documentation describes the content as being stored on
 * K nodes, or on all nodes when the network has fewer than K nodes - confirm against
 * StoreOperation.
 *
 * @param content The content to put onto the DHT
 *
 * @return The number of nodes reported by the store operation once it has executed
 *
 * @throws java.io.IOException If the store operation fails with an I/O error
 */
public synchronized int put(KadContent content) throws IOException
{
    final StoreOperation storeOp = new StoreOperation(this.server, this.localNode, content);
    storeOp.execute();

    /* The operation keeps count of the nodes it stored the content at */
    final int storedAt = storeOp.numNodesStoredAt();
    return storedAt;
}
/**
 * Gets content stored on the DHT.
 * <p>
 * The local DHT is consulted first. If it holds matching content, exactly that one
 * entry is returned and {@code numResultsReq} is not consulted. Otherwise a
 * ContentLookupOperation is run and whatever it found is returned.
 *
 * @param param         The parameters used to search for the content
 * @param numResultsReq How many results are required from different nodes
 *                      (only used when the content is not available locally)
 *
 * @return The content found, either a single local entry or the lookup operation's results
 *
 * @throws NoSuchElementException Declared by the original signature
 *                                (presumably raised when no matching content exists - confirm)
 * @throws java.io.IOException    If reading the local content or the network lookup fails
 */
public List<KadContent> get(GetParameter param, int numResultsReq) throws NoSuchElementException, IOException
{
    if (this.dht.contains(param))
    {
        /* The content exists in our own DHT, so return it without touching the network */
        List<KadContent> localResult = new ArrayList<>();
        localResult.add(this.dht.get(param));
        return localResult;
    }

    /* It doesn't exist in our DHT, get it from other Nodes */
    ContentLookupOperation clo = new ContentLookupOperation(server, localNode, param, numResultsReq);
    clo.execute();
    return clo.getContentFound();
}
/**
 * Runs a KadRefreshOperation immediately, independent of the scheduled refresh timing.
 *
 * @throws java.io.IOException If the refresh operation fails with an I/O error
 */
public void refresh() throws IOException
{
    final KadRefreshOperation refreshOp = new KadRefreshOperation(server, localNode, dht);
    refreshOp.execute();
}
/**
 * Gives the owner id this instance was constructed with.
 *
 * @return The ID of the owner of this instance
 */
public String getOwnerId()
{
    return ownerId;
}
/**
 * Gives the UDP port this instance was constructed with.
 *
 * @return The port on which this kad instance is running
 */
public int getPort()
{
    return udpPort;
}
/**
 * Shuts this Kademlia instance down.
 * <p>
 * The recurring refresh is cancelled first so that no refresh is started against a
 * server that is going away. Then the server is shut down and, when
 * {@code Configuration.SAVE_STATE_ON_SHUTDOWN} is set, the current state is written to disk.
 *
 * @throws java.io.FileNotFoundException If a state file cannot be opened for writing
 * @throws java.io.IOException           If shutting down the server or writing the state fails
 */
public void shutdown() throws FileNotFoundException, IOException
{
    /* Stop the scheduled refresh; without this the task keeps firing after shutdown */
    this.timer.cancel();

    /* Shut down the server */
    this.server.shutdown();

    /* Save this Kademlia instance's state if required */
    if (Configuration.SAVE_STATE_ON_SHUTDOWN)
    {
        /* Save the system state */
        this.saveKadState();
    }
}
/**
* Saves the node state to a text file
*
* @throws java.io.FileNotFoundException
*/
2014-03-10 05:38:51 +00:00
private void saveKadState() throws FileNotFoundException, IOException
{
DataOutputStream dout;
2014-03-10 05:38:51 +00:00
/**
* @section Store Basic Kad data
*/
dout = new DataOutputStream(new FileOutputStream(getStateStorageFolderName(this.ownerId) + File.separator + "kad.kns"));
new JsonSerializer<Kademlia>().write(this, dout);
2014-03-10 05:38:51 +00:00
/**
* @section Save the node state
*/
dout = new DataOutputStream(new FileOutputStream(getStateStorageFolderName(this.ownerId) + File.separator + "node.kns"));
new JsonSerializer<Node>().write(this.localNode, dout);
2014-03-10 05:38:51 +00:00
/**
* @section Save the routing table
* We need to save the routing table separate from the node since the routing table will contain the node and the node will contain the routing table
* This will cause a serialization recursion, and in turn a Stack Overflow
*/
dout = new DataOutputStream(new FileOutputStream(getStateStorageFolderName(this.ownerId) + File.separator + "routingtable.kns"));
new JsonRoutingTableSerializer().write(this.localNode.getRoutingTable(), dout);
2014-03-10 05:38:51 +00:00
/**
* @section Save the DHT
*/
dout = new DataOutputStream(new FileOutputStream(getStateStorageFolderName(this.ownerId) + File.separator + "dht.kns"));
new JsonDHTSerializer().write(this.dht, dout);
2014-03-10 05:38:51 +00:00
}
/**
* Get the name of the folder for which a content should be stored
*
* @return String The name of the folder to store node states
*/
2014-03-10 05:38:51 +00:00
private static String getStateStorageFolderName(String ownerId)
{
2014-03-10 05:38:51 +00:00
String path = System.getProperty("user.home") + File.separator + Configuration.LOCAL_FOLDER;
File folder = new File(path);
/* Create the main storage folder if it doesn't exist */
2014-03-10 05:38:51 +00:00
if (!folder.isDirectory())
{
2014-03-10 05:38:51 +00:00
folder.mkdir();
}
2014-03-10 05:38:51 +00:00
/* Create the nodes storage folder if it doesn't exist */
path = folder + File.separator + "nodes";
folder = new File(path);
if (!folder.isDirectory())
{
2014-03-10 05:38:51 +00:00
folder.mkdir();
}
2014-03-10 05:38:51 +00:00
/* Create this Kad instance storage folder */
path += File.separator + ownerId;
folder = new File(path);
if (!folder.isDirectory())
{
folder.mkdir();
}
return folder.toString();
}
/**
 * Builds a multi-line dump of this instance: the owner id, the local node,
 * the local node's routing table and the DHT, in that order.
 *
 * @return The string representation of this Kad instance
 */
@Override
public String toString()
{
    return new StringBuilder("\n\nPrinting Kad State for instance with owner: ")
            .append(this.ownerId)
            .append("\n\n")
            /* Local node section */
            .append("\n")
            .append("Local Node")
            .append(this.localNode)
            .append("\n")
            /* Routing table section */
            .append("\n")
            .append("Routing Table: ")
            .append(this.localNode.getRoutingTable())
            .append("\n")
            /* DHT section */
            .append("\n")
            .append("DHT: ")
            .append(this.dht)
            .append("\n")
            .append("\n\n\n")
            .toString();
}
2014-02-18 20:37:07 +00:00
}