Removed local Kademlia lib, changed to Maven Central.

This commit is contained in:
ChronosX88 2019-03-03 11:26:38 +04:00
parent 5200454447
commit 39669a6ebf
Signed by: ChronosXYZ
GPG Key ID: 085A69A82C8C511A
65 changed files with 8 additions and 6589 deletions

View File

@ -12,6 +12,7 @@ repositories {
dependencies { dependencies {
implementation 'com.google.code.gson:gson:2.8.5' implementation 'com.google.code.gson:gson:2.8.5'
implementation 'io.github.chronosx88:kademliadht:1.0'
} }
jar { jar {

View File

@ -1,15 +1,19 @@
package io.github.chronosx88.dhtBootstrap; package io.github.chronosx88.dhtBootstrap;
import io.github.chronosx88.dhtBootstrap.kademlia.JKademliaNode; import io.github.chronosx88.kademliadht.DefaultConfiguration;
import io.github.chronosx88.dhtBootstrap.kademlia.node.KademliaId; import io.github.chronosx88.kademliadht.JKademliaNode;
import io.github.chronosx88.kademliadht.node.KademliaId;
import io.github.chronosx88.kademliadht.node.Node;
import java.io.IOException; import java.io.IOException;
import java.net.Inet4Address;
public class Main { public class Main {
private static JKademliaNode node; private static JKademliaNode node;
public static void main(String[] args) { public static void main(String[] args) {
try { try {
node = new JKademliaNode("Main Bootstrap Node", new KademliaId("D65D56E189E513A6AB8E38370E6B33386EB639D6"), 7243); KademliaId kadID = new KademliaId("sgCZ+fg49g4N8FU43kW84cNVPTw=");
node = new JKademliaNode("Main Bootstrap Node", new Node(kadID, Inet4Address.getLocalHost(), 7243), 7243, new DefaultConfiguration());
System.out.println(node.getNode().getNodeId().toString()); System.out.println(node.getNode().getNodeId().toString());
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();

View File

@ -1,101 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia;
import java.io.File;
/**
* A set of Kademlia configuration parameters. Default values are
* supplied and can be changed by the application as necessary.
*
*/
public class DefaultConfiguration implements KadConfiguration
{
private final static long RESTORE_INTERVAL = 60 * 1000; // in milliseconds
private final static long RESPONSE_TIMEOUT = 2000;
private final static long OPERATION_TIMEOUT = 2000;
private final static int CONCURRENCY = 10;
private final static int K = 5;
private final static int RCSIZE = 3;
private final static int STALE = 1;
private final static String LOCAL_FOLDER = "kademlia";
private final static boolean IS_TESTING = true;
/**
* Default constructor to support Gson Serialization
*/
public DefaultConfiguration()
{
}
@Override
public long restoreInterval()
{
return RESTORE_INTERVAL;
}
@Override
public long responseTimeout()
{
return RESPONSE_TIMEOUT;
}
@Override
public long operationTimeout()
{
return OPERATION_TIMEOUT;
}
@Override
public int maxConcurrentMessagesTransiting()
{
return CONCURRENCY;
}
@Override
public int k()
{
return K;
}
@Override
public int replacementCacheSize()
{
return RCSIZE;
}
@Override
public int stale()
{
return STALE;
}
@Override
public String getNodeDataFolder(String ownerId)
{
/* Setup the main storage folder if it doesn't exist */
String path = System.getProperty("user.home") + File.separator + DefaultConfiguration.LOCAL_FOLDER;
File folder = new File(path);
if (!folder.isDirectory())
{
folder.mkdir();
}
/* Setup subfolder for this owner if it doesn't exist */
File ownerFolder = new File(folder + File.separator + ownerId);
if (!ownerFolder.isDirectory())
{
ownerFolder.mkdir();
}
/* Return the path */
return ownerFolder.toString();
}
@Override
public boolean isTesting()
{
return IS_TESTING;
}
}

View File

@ -1,428 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.util.NoSuchElementException;
import java.util.Timer;
import java.util.TimerTask;
import io.github.chronosx88.dhtBootstrap.kademlia.dht.GetParameter;
import io.github.chronosx88.dhtBootstrap.kademlia.dht.DHT;
import io.github.chronosx88.dhtBootstrap.kademlia.dht.KadContent;
import io.github.chronosx88.dhtBootstrap.kademlia.dht.KademliaDHT;
import io.github.chronosx88.dhtBootstrap.kademlia.dht.JKademliaStorageEntry;
import io.github.chronosx88.dhtBootstrap.kademlia.exceptions.ContentNotFoundException;
import io.github.chronosx88.dhtBootstrap.kademlia.exceptions.RoutingException;
import io.github.chronosx88.dhtBootstrap.kademlia.message.MessageFactory;
import io.github.chronosx88.dhtBootstrap.kademlia.node.Node;
import io.github.chronosx88.dhtBootstrap.kademlia.node.KademliaId;
import io.github.chronosx88.dhtBootstrap.kademlia.operation.ConnectOperation;
import io.github.chronosx88.dhtBootstrap.kademlia.operation.ContentLookupOperation;
import io.github.chronosx88.dhtBootstrap.kademlia.operation.Operation;
import io.github.chronosx88.dhtBootstrap.kademlia.operation.KadRefreshOperation;
import io.github.chronosx88.dhtBootstrap.kademlia.operation.StoreOperation;
import io.github.chronosx88.dhtBootstrap.kademlia.routing.JKademliaRoutingTable;
import io.github.chronosx88.dhtBootstrap.kademlia.routing.KademliaRoutingTable;
import io.github.chronosx88.dhtBootstrap.kademlia.util.serializer.JsonDHTSerializer;
import io.github.chronosx88.dhtBootstrap.kademlia.util.serializer.JsonRoutingTableSerializer;
import io.github.chronosx88.dhtBootstrap.kademlia.util.serializer.JsonSerializer;
/**
* The main Kademlia Node on the network, this node manages everything for this local system.
*
* @author Joshua Kissoon
* @since 20140215
*
* @todo When we receive a store message - if we have a newer version of the content, re-send this newer version to that node so as to update their version
* @todo Handle IPv6 Addresses
*
*/
public class JKademliaNode implements KademliaNode
{
/* Kademlia Attributes */
private final String ownerId;
/* Objects to be used */
private final transient Node localNode;
private final transient KadServer server;
private final transient KademliaDHT dht;
private transient KademliaRoutingTable routingTable;
private final int udpPort;
private transient KadConfiguration config;
/* Timer used to execute refresh operations */
private transient Timer refreshOperationTimer;
private transient TimerTask refreshOperationTTask;
/* Factories */
private final transient MessageFactory messageFactory;
/* Statistics */
private final transient KadStatistician statistician;
{
statistician = new Statistician();
}
/**
* Creates a Kademlia DistributedMap using the specified name as filename base.
* If the id cannot be read from disk the specified defaultId is used.
* The instance is bootstraped to an existing network by specifying the
* address of a bootstrap node in the network.
*
* @param ownerId The Name of this node used for storage
* @param localNode The Local Node for this Kad instance
* @param udpPort The UDP port to use for routing messages
* @param dht The DHT for this instance
* @param config
* @param routingTable
*
* @throws IOException If an error occurred while reading id or local map
* from disk <i>or</i> a network error occurred while
* attempting to bootstrap to the network
* */
public JKademliaNode(String ownerId, Node localNode, int udpPort, KademliaDHT dht, KademliaRoutingTable routingTable, KadConfiguration config) throws IOException
{
this.ownerId = ownerId;
this.udpPort = udpPort;
this.localNode = localNode;
this.dht = dht;
this.config = config;
this.routingTable = routingTable;
this.messageFactory = new MessageFactory(this, this.dht, this.config);
this.server = new KadServer(udpPort, this.messageFactory, this.localNode, this.config, this.statistician);
this.startRefreshOperation();
}
@Override
public final void startRefreshOperation()
{
this.refreshOperationTimer = new Timer(true);
refreshOperationTTask = new TimerTask()
{
@Override
public void run()
{
try
{
/* Runs a DHT RefreshOperation */
JKademliaNode.this.refresh();
}
catch (IOException e)
{
System.err.println("KademliaNode: Refresh Operation Failed; Message: " + e.getMessage());
}
}
};
refreshOperationTimer.schedule(refreshOperationTTask, this.config.restoreInterval(), this.config.restoreInterval());
}
@Override
public final void stopRefreshOperation()
{
/* Close off the timer tasks */
this.refreshOperationTTask.cancel();
this.refreshOperationTimer.cancel();
this.refreshOperationTimer.purge();
}
public JKademliaNode(String ownerId, Node node, int udpPort, KademliaRoutingTable routingTable, KadConfiguration config) throws IOException
{
this(
ownerId,
node,
udpPort,
new DHT(ownerId, config),
routingTable,
config
);
}
public JKademliaNode(String ownerId, Node node, int udpPort, KadConfiguration config) throws IOException
{
this(
ownerId,
node,
udpPort,
new JKademliaRoutingTable(node, config),
config
);
}
public JKademliaNode(String ownerId, KademliaId defaultId, int udpPort) throws IOException
{
this(
ownerId,
new Node(defaultId, InetAddress.getLocalHost(), udpPort),
udpPort,
new DefaultConfiguration()
);
}
/**
* Load Stored state using default configuration
*
* @param ownerId The ID of the owner for the stored state
*
* @return A Kademlia instance loaded from a stored state in a file
*
* @throws FileNotFoundException
* @throws ClassNotFoundException
*/
public static JKademliaNode loadFromFile(String ownerId) throws FileNotFoundException, IOException, ClassNotFoundException
{
return JKademliaNode.loadFromFile(ownerId, new DefaultConfiguration());
}
/**
* Load Stored state
*
* @param ownerId The ID of the owner for the stored state
* @param iconfig Configuration information to work with
*
* @return A Kademlia instance loaded from a stored state in a file
*
* @throws FileNotFoundException
* @throws ClassNotFoundException
*/
public static JKademliaNode loadFromFile(String ownerId, KadConfiguration iconfig) throws FileNotFoundException, IOException, ClassNotFoundException
{
DataInputStream din;
/**
* @section Read Basic Kad data
*/
din = new DataInputStream(new FileInputStream(getStateStorageFolderName(ownerId, iconfig) + File.separator + "kad.kns"));
JKademliaNode ikad = new JsonSerializer<JKademliaNode>().read(din);
/**
* @section Read the routing table
*/
din = new DataInputStream(new FileInputStream(getStateStorageFolderName(ownerId, iconfig) + File.separator + "routingtable.kns"));
KademliaRoutingTable irtbl = new JsonRoutingTableSerializer(iconfig).read(din);
/**
* @section Read the node state
*/
din = new DataInputStream(new FileInputStream(getStateStorageFolderName(ownerId, iconfig) + File.separator + "node.kns"));
Node inode = new JsonSerializer<Node>().read(din);
/**
* @section Read the DHT
*/
din = new DataInputStream(new FileInputStream(getStateStorageFolderName(ownerId, iconfig) + File.separator + "dht.kns"));
KademliaDHT idht = new JsonDHTSerializer().read(din);
idht.setConfiguration(iconfig);
return new JKademliaNode(ownerId, inode, ikad.getPort(), idht, irtbl, iconfig);
}
@Override
public Node getNode()
{
return this.localNode;
}
@Override
public KadServer getServer()
{
return this.server;
}
@Override
public KademliaDHT getDHT()
{
return this.dht;
}
@Override
public KadConfiguration getCurrentConfiguration()
{
return this.config;
}
@Override
public synchronized final void bootstrap(Node n) throws IOException, RoutingException
{
long startTime = System.nanoTime();
Operation op = new ConnectOperation(this.server, this, n, this.config);
op.execute();
long endTime = System.nanoTime();
this.statistician.setBootstrapTime(endTime - startTime);
}
@Override
public int put(KadContent content) throws IOException
{
return this.put(new JKademliaStorageEntry(content));
}
@Override
public int put(JKademliaStorageEntry entry) throws IOException
{
StoreOperation sop = new StoreOperation(this.server, this, entry, this.dht, this.config);
sop.execute();
/* Return how many nodes the content was stored on */
return sop.numNodesStoredAt();
}
@Override
public void putLocally(KadContent content) throws IOException
{
this.dht.store(new JKademliaStorageEntry(content));
}
@Override
public JKademliaStorageEntry get(GetParameter param) throws NoSuchElementException, IOException, ContentNotFoundException
{
if (this.dht.contains(param))
{
/* If the content exist in our own DHT, then return it. */
return this.dht.get(param);
}
/* Seems like it doesn't exist in our DHT, get it from other Nodes */
long startTime = System.nanoTime();
ContentLookupOperation clo = new ContentLookupOperation(server, this, param, this.config);
clo.execute();
long endTime = System.nanoTime();
this.statistician.addContentLookup(endTime - startTime, clo.routeLength(), clo.isContentFound());
return clo.getContentFound();
}
@Override
public void refresh() throws IOException
{
new KadRefreshOperation(this.server, this, this.dht, this.config).execute();
}
@Override
public String getOwnerId()
{
return this.ownerId;
}
@Override
public int getPort()
{
return this.udpPort;
}
@Override
public void shutdown(final boolean saveState) throws IOException
{
/* Shut down the server */
this.server.shutdown();
this.stopRefreshOperation();
/* Save this Kademlia instance's state if required */
if (saveState)
{
/* Save the system state */
this.saveKadState();
}
}
@Override
public void saveKadState() throws IOException
{
DataOutputStream dout;
/**
* @section Store Basic Kad data
*/
dout = new DataOutputStream(new FileOutputStream(getStateStorageFolderName(this.ownerId, this.config) + File.separator + "kad.kns"));
new JsonSerializer<JKademliaNode>().write(this, dout);
/**
* @section Save the node state
*/
dout = new DataOutputStream(new FileOutputStream(getStateStorageFolderName(this.ownerId, this.config) + File.separator + "node.kns"));
new JsonSerializer<Node>().write(this.localNode, dout);
/**
* @section Save the routing table
* We need to save the routing table separate from the node since the routing table will contain the node and the node will contain the routing table
* This will cause a serialization recursion, and in turn a Stack Overflow
*/
dout = new DataOutputStream(new FileOutputStream(getStateStorageFolderName(this.ownerId, this.config) + File.separator + "routingtable.kns"));
new JsonRoutingTableSerializer(this.config).write(this.getRoutingTable(), dout);
/**
* @section Save the DHT
*/
dout = new DataOutputStream(new FileOutputStream(getStateStorageFolderName(this.ownerId, this.config) + File.separator + "dht.kns"));
new JsonDHTSerializer().write(this.dht, dout);
}
/**
* Get the name of the folder for which a content should be stored
*
* @return String The name of the folder to store node states
*/
private static String getStateStorageFolderName(String ownerId, KadConfiguration iconfig)
{
/* Setup the nodes storage folder if it doesn't exist */
String path = iconfig.getNodeDataFolder(ownerId) + File.separator + "nodeState";
File nodeStateFolder = new File(path);
if (!nodeStateFolder.isDirectory())
{
nodeStateFolder.mkdir();
}
return nodeStateFolder.toString();
}
@Override
public KademliaRoutingTable getRoutingTable()
{
return this.routingTable;
}
@Override
public KadStatistician getStatistician()
{
return this.statistician;
}
/**
* Creates a string containing all data about this Kademlia instance
*
* @return The string representation of this Kad instance
*/
@Override
public String toString()
{
StringBuilder sb = new StringBuilder("\n\nPrinting Kad State for instance with owner: ");
sb.append(this.ownerId);
sb.append("\n\n");
sb.append("\n");
sb.append("Local Node");
sb.append(this.localNode);
sb.append("\n");
sb.append("\n");
sb.append("Routing Table: ");
sb.append(this.getRoutingTable());
sb.append("\n");
sb.append("\n");
sb.append("DHT: ");
sb.append(this.dht);
sb.append("\n");
sb.append("\n\n\n");
return sb.toString();
}
}

View File

@ -1,63 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia;
/**
* Interface that defines a KadConfiguration object
*
* @author Joshua Kissoon
* @since 20140329
*/
public interface KadConfiguration
{
/**
* @return Interval in milliseconds between execution of RestoreOperations.
*/
public long restoreInterval();
/**
* If no reply received from a node in this period (in milliseconds)
* consider the node unresponsive.
*
* @return The time it takes to consider a node unresponsive
*/
public long responseTimeout();
/**
* @return Maximum number of milliseconds for performing an operation.
*/
public long operationTimeout();
/**
* @return Maximum number of concurrent messages in transit.
*/
public int maxConcurrentMessagesTransiting();
/**
* @return K-Value used throughout Kademlia
*/
public int k();
/**
* @return Size of replacement cache.
*/
public int replacementCacheSize();
/**
* @return # of times a node can be marked as stale before it is actually removed.
*/
public int stale();
/**
* Creates the folder in which this node data is to be stored.
*
* @param ownerId
*
* @return The folder path
*/
public String getNodeDataFolder(String ownerId);
/**
* @return Whether we're in a testing or production system.
*/
public boolean isTesting();
}

View File

@ -1,356 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import io.github.chronosx88.dhtBootstrap.kademlia.exceptions.KadServerDownException;
import io.github.chronosx88.dhtBootstrap.kademlia.message.KademliaMessageFactory;
import io.github.chronosx88.dhtBootstrap.kademlia.message.Message;
import io.github.chronosx88.dhtBootstrap.kademlia.node.Node;
import io.github.chronosx88.dhtBootstrap.kademlia.message.Receiver;
/**
* The server that handles sending and receiving messages between nodes on the Kad Network
*
* @author Joshua Kissoon
* @created 20140215
*/
public class KadServer
{
/* Maximum size of a Datagram Packet */
private static final int DATAGRAM_BUFFER_SIZE = 64 * 1024; // 64KB
/* Basic Kad Objects */
private final transient KadConfiguration config;
/* Server Objects */
private final DatagramSocket socket;
private transient boolean isRunning;
private final Map<Integer, Receiver> receivers;
private final Timer timer; // Schedule future tasks
private final Map<Integer, TimerTask> tasks; // Keep track of scheduled tasks
private final Node localNode;
/* Factories */
private final KademliaMessageFactory messageFactory;
private final KadStatistician statistician;
{
isRunning = true;
this.tasks = new HashMap<>();
this.receivers = new HashMap<>();
this.timer = new Timer(true);
}
/**
* Initialize our KadServer
*
* @param udpPort The port to listen on
* @param mFactory Factory used to create messages
* @param localNode Local node on which this server runs on
* @param config
* @param statistician A statistician to manage the server statistics
*
* @throws SocketException
*/
public KadServer(int udpPort, KademliaMessageFactory mFactory, Node localNode, KadConfiguration config, KadStatistician statistician) throws SocketException
{
this.config = config;
this.socket = new DatagramSocket(udpPort);
this.localNode = localNode;
this.messageFactory = mFactory;
this.statistician = statistician;
/* Start listening for incoming requests in a new thread */
this.startListener();
}
/**
* Starts the listener to listen for incoming messages
*/
private void startListener()
{
new Thread()
{
@Override
public void run()
{
listen();
}
}.start();
}
/**
* Sends a message
*
* @param msg The message to send
* @param to The node to send the message to
* @param recv The receiver to handle the response message
*
* @return Integer The communication ID of this message
*
* @throws IOException
* @throws KadServerDownException
*/
public synchronized int sendMessage(Node to, Message msg, Receiver recv) throws IOException, KadServerDownException
{
if (!isRunning)
{
throw new KadServerDownException(this.localNode + " - Kad Server is not running.");
}
/* Generate a random communication ID */
int comm = new Random().nextInt();
/* If we have a receiver */
if (recv != null)
{
try
{
/* Setup the receiver to handle message response */
receivers.put(comm, recv);
TimerTask task = new TimeoutTask(comm, recv);
timer.schedule(task, this.config.responseTimeout());
tasks.put(comm, task);
}
catch (IllegalStateException ex)
{
/* The timer is already cancelled so we cannot do anything here really */
}
}
/* Send the message */
sendMessage(to, msg, comm);
return comm;
}
/**
* Method called to reply to a message received
*
* @param to The Node to send the reply to
* @param msg The reply message
* @param comm The communication ID - the one received
*
* @throws IOException
*/
public synchronized void reply(Node to, Message msg, int comm) throws IOException
{
if (!isRunning)
{
throw new IllegalStateException("Kad Server is not running.");
}
sendMessage(to, msg, comm);
}
/**
* Internal sendMessage method called by the public sendMessage method after a communicationId is generated
*/
private void sendMessage(Node to, Message msg, int comm) throws IOException
{
/* Use a try-with resource to auto-close streams after usage */
try (ByteArrayOutputStream bout = new ByteArrayOutputStream(); DataOutputStream dout = new DataOutputStream(bout);)
{
/* Setup the message for transmission */
dout.writeInt(comm);
dout.writeByte(msg.code());
msg.toStream(dout);
dout.close();
byte[] data = bout.toByteArray();
if (data.length > DATAGRAM_BUFFER_SIZE)
{
throw new IOException("Message is too big");
}
/* Everything is good, now create the packet and send it */
DatagramPacket pkt = new DatagramPacket(data, 0, data.length);
pkt.setSocketAddress(to.getSocketAddress());
socket.send(pkt);
/* Lets inform the statistician that we've sent some data */
this.statistician.sentData(data.length);
}
}
/**
* Listen for incoming messages in a separate thread
*/
private void listen()
{
try
{
while (isRunning)
{
try
{
/* Wait for a packet */
byte[] buffer = new byte[DATAGRAM_BUFFER_SIZE];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
socket.receive(packet);
/* Lets inform the statistician that we've received some data */
this.statistician.receivedData(packet.getLength());
if (this.config.isTesting())
{
/**
* Simulating network latency
* We pause for 1 millisecond/100 bytes
*/
int pause = packet.getLength() / 100;
try
{
Thread.sleep(pause);
}
catch (InterruptedException ex)
{
}
}
/* We've received a packet, now handle it */
try (ByteArrayInputStream bin = new ByteArrayInputStream(packet.getData(), packet.getOffset(), packet.getLength());
DataInputStream din = new DataInputStream(bin);)
{
/* Read in the conversation Id to know which handler to handle this response */
int comm = din.readInt();
byte messCode = din.readByte();
Message msg = messageFactory.createMessage(messCode, din);
din.close();
/* Get a receiver for this message */
Receiver receiver;
if (this.receivers.containsKey(comm))
{
/* If there is a reciever in the receivers to handle this */
synchronized (this)
{
receiver = this.receivers.remove(comm);
TimerTask task = (TimerTask) tasks.remove(comm);
if (task != null)
{
task.cancel();
}
}
}
else
{
/* There is currently no receivers, try to get one */
receiver = messageFactory.createReceiver(messCode, this);
}
/* Invoke the receiver */
if (receiver != null)
{
receiver.receive(msg, comm);
}
}
}
catch (IOException e)
{
//this.isRunning = false;
System.err.println("Server ran into a problem in listener method. Message: " + e.getMessage());
}
}
}
finally
{
if (!socket.isClosed())
{
socket.close();
}
this.isRunning = false;
}
}
/**
* Remove a conversation receiver
*
* @param comm The id of this conversation
*/
private synchronized void unregister(int comm)
{
receivers.remove(comm);
this.tasks.remove(comm);
}
/**
* Stops listening and shuts down the server
*/
public synchronized void shutdown()
{
this.isRunning = false;
this.socket.close();
timer.cancel();
}
/**
* Task that gets called by a separate thread if a timeout for a receiver occurs.
* When a reply arrives this task must be canceled using the <code>cancel()</code>
* method inherited from <code>TimerTask</code>. In this case the caller is
* responsible for removing the task from the <code>tasks</code> map.
* */
class TimeoutTask extends TimerTask
{
private final int comm;
private final Receiver recv;
public TimeoutTask(int comm, Receiver recv)
{
this.comm = comm;
this.recv = recv;
}
@Override
public void run()
{
if (!KadServer.this.isRunning)
{
return;
}
try
{
unregister(comm);
recv.timeout(comm);
}
catch (IOException e)
{
System.err.println("Cannot unregister a receiver. Message: " + e.getMessage());
}
}
}
public void printReceivers()
{
for (Integer r : this.receivers.keySet())
{
System.out.println("Receiver for comm: " + r + "; Receiver: " + this.receivers.get(r));
}
}
public boolean isRunning()
{
return this.isRunning;
}
}

View File

@ -1,87 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia;
/**
* Specification for class that keeps statistics for a Kademlia instance.
*
* These statistics are temporary and will be lost when Kad is shut down.
*
* @author Joshua Kissoon
* @since 20140507
*/
public interface KadStatistician
{
/**
* Used to indicate some data is sent
*
* @param size The size of the data sent
*/
public void sentData(long size);
/**
* @return The total data sent in KiloBytes
*/
public long getTotalDataSent();
/**
* Used to indicate some data was received
*
* @param size The size of the data received
*/
public void receivedData(long size);
/**
* @return The total data received in KiloBytes
*/
public long getTotalDataReceived();
/**
* Sets the bootstrap time for this Kademlia Node
*
* @param time The bootstrap time in nanoseconds
*/
public void setBootstrapTime(long time);
/**
* @return How long the system took to bootstrap in milliseconds
*/
public long getBootstrapTime();
/**
* Add the timing for a new content lookup operation that took place
*
* @param time The time the content lookup took in nanoseconds
* @param routeLength The length of the route it took to get the content
* @param isSuccessful Whether the content lookup was successful or not
*/
public void addContentLookup(long time, int routeLength, boolean isSuccessful);
/**
* @return The total number of content lookups performed.
*/
public int numContentLookups();
/**
* @return How many content lookups have failed.
*/
public int numFailedContentLookups();
/**
* @return The total time spent on content lookups.
*/
public long totalContentLookupTime();
/**
* Compute the average time a content lookup took
*
* @return The average time in milliseconds
*/
public double averageContentLookupTime();
/**
* Compute the average route length of content lookup operations.
*
* @return The average route length
*/
public double averageContentLookupRouteLength();
}

View File

@ -1,154 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia;
import java.io.IOException;
import java.util.NoSuchElementException;
import io.github.chronosx88.dhtBootstrap.kademlia.dht.GetParameter;
import io.github.chronosx88.dhtBootstrap.kademlia.dht.JKademliaStorageEntry;
import io.github.chronosx88.dhtBootstrap.kademlia.dht.KadContent;
import io.github.chronosx88.dhtBootstrap.kademlia.dht.KademliaDHT;
import io.github.chronosx88.dhtBootstrap.kademlia.exceptions.ContentNotFoundException;
import io.github.chronosx88.dhtBootstrap.kademlia.exceptions.RoutingException;
import io.github.chronosx88.dhtBootstrap.kademlia.node.Node;
import io.github.chronosx88.dhtBootstrap.kademlia.routing.KademliaRoutingTable;
/**
* The main Kademlia Node on the network, this node manages everything for this local system.
*
* @author Joshua Kissoon
* @since 20140523
*
*/
public interface KademliaNode
{
/**
* Schedule the recurring refresh operation
*/
public void startRefreshOperation();
/**
* Stop the recurring refresh operation
*/
public void stopRefreshOperation();
/**
* @return Node The local node for this system
*/
public Node getNode();
/**
* @return The KadServer used to send/receive messages
*/
public KadServer getServer();
/**
* @return The DHT for this kad instance
*/
public KademliaDHT getDHT();
/**
* @return The current KadConfiguration object being used
*/
public KadConfiguration getCurrentConfiguration();
/**
* Connect to an existing peer-to-peer network.
*
* @param n The known node in the peer-to-peer network
*
* @throws RoutingException If the bootstrap node could not be contacted
* @throws IOException If a network error occurred
* @throws IllegalStateException If this object is closed
* */
public void bootstrap(Node n) throws IOException, RoutingException;
/**
* Stores the specified value under the given key
* This value is stored on K nodes on the network, or all nodes if there are > K total nodes in the network
*
* @param content The content to put onto the DHT
*
* @return Integer How many nodes the content was stored on
*
* @throws IOException
*
*/
public int put(KadContent content) throws IOException;
/**
* Stores the specified value under the given key
* This value is stored on K nodes on the network, or all nodes if there are > K total nodes in the network
*
* @param entry The StorageEntry with the content to put onto the DHT
*
* @return Integer How many nodes the content was stored on
*
* @throws IOException
*
*/
public int put(JKademliaStorageEntry entry) throws IOException;
/**
* Store a content on the local node's DHT
*
* @param content The content to put on the DHT
*
* @throws IOException
*/
public void putLocally(KadContent content) throws IOException;
/**
* Get some content stored on the DHT
*
* @param param The parameters used to search for the content
*
* @return DHTContent The content
*
* @throws IOException
* @throws ContentNotFoundException
*/
public JKademliaStorageEntry get(GetParameter param) throws NoSuchElementException, IOException, ContentNotFoundException;
/**
* Allow the user of the System to call refresh even out of the normal Kad refresh timing
*
* @throws IOException
*/
public void refresh() throws IOException;
/**
* @return String The ID of the owner of this local network
*/
public String getOwnerId();
/**
* @return Integer The port on which this kad instance is running
*/
public int getPort();
/**
* Here we handle properly shutting down the Kademlia instance
*
* @param saveState Whether to save the application state or not
*
* @throws java.io.FileNotFoundException
*/
public void shutdown(final boolean saveState) throws IOException;
/**
* Saves the node state to a text file
*
* @throws java.io.FileNotFoundException
*/
public void saveKadState() throws IOException;
/**
* @return The routing table for this node.
*/
public KademliaRoutingTable getRoutingTable();
/**
* @return The statistician that manages all statistics
*/
public KadStatistician getStatistician();
}

View File

@ -1,182 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia;
import java.text.DecimalFormat;
/**
* Class that keeps statistics for this Kademlia instance.
*
* These statistics are temporary and will be lost when Kad is shut down.
*
* @author Joshua Kissoon
* @since 20140505
*/
public class Statistician implements KadStatistician
{
/* How much data was sent and received by the server over the network */
private long totalDataSent, totalDataReceived;
private long numDataSent, numDataReceived;
/* Bootstrap timings */
private long bootstrapTime;
/* Content lookup operation timing & route length */
private int numContentLookups, numFailedContentLookups;
private long totalContentLookupTime;
private long totalRouteLength;
{
this.totalDataSent = 0;
this.totalDataReceived = 0;
this.bootstrapTime = 0;
this.numContentLookups = 0;
this.totalContentLookupTime = 0;
this.totalRouteLength = 0;
}
@Override
public void sentData(long size)
{
this.totalDataSent += size;
this.numDataSent++;
}
@Override
public long getTotalDataSent()
{
if (this.totalDataSent == 0)
{
return 0L;
}
return this.totalDataSent / 1000L;
}
@Override
public void receivedData(long size)
{
this.totalDataReceived += size;
this.numDataReceived++;
}
@Override
public long getTotalDataReceived()
{
if (this.totalDataReceived == 0)
{
return 0L;
}
return this.totalDataReceived / 1000L;
}
@Override
public void setBootstrapTime(long time)
{
this.bootstrapTime = time;
}
@Override
public long getBootstrapTime()
{
return this.bootstrapTime / 1000000L;
}
@Override
public void addContentLookup(long time, int routeLength, boolean isSuccessful)
{
if (isSuccessful)
{
this.numContentLookups++;
this.totalContentLookupTime += time;
this.totalRouteLength += routeLength;
}
else
{
this.numFailedContentLookups++;
}
}
@Override
public int numContentLookups()
{
return this.numContentLookups;
}
@Override
public int numFailedContentLookups()
{
return this.numFailedContentLookups;
}
@Override
public long totalContentLookupTime()
{
return this.totalContentLookupTime;
}
@Override
public double averageContentLookupTime()
{
if (this.numContentLookups == 0)
{
return 0D;
}
double avg = (double) ((double) this.totalContentLookupTime / (double) this.numContentLookups) / 1000000D;
DecimalFormat df = new DecimalFormat("#.00");
return new Double(df.format(avg));
}
@Override
public double averageContentLookupRouteLength()
{
if (this.numContentLookups == 0)
{
return 0D;
}
double avg = (double) ((double) this.totalRouteLength / (double) this.numContentLookups);
DecimalFormat df = new DecimalFormat("#.00");
return new Double(df.format(avg));
}
@Override
public String toString()
{
StringBuilder sb = new StringBuilder("Statistician: [");
sb.append("Bootstrap Time: ");
sb.append(this.getBootstrapTime());
sb.append("; ");
sb.append("Data Sent: ");
sb.append("(");
sb.append(this.numDataSent);
sb.append(") ");
sb.append(this.getTotalDataSent());
sb.append(" bytes; ");
sb.append("Data Received: ");
sb.append("(");
sb.append(this.numDataReceived);
sb.append(") ");
sb.append(this.getTotalDataReceived());
sb.append(" bytes; ");
sb.append("Num Content Lookups: ");
sb.append(this.numContentLookups());
sb.append("; ");
sb.append("Avg Content Lookup Time: ");
sb.append(this.averageContentLookupTime());
sb.append("; ");
sb.append("Avg Content Lookup Route Lth: ");
sb.append(this.averageContentLookupRouteLength());
sb.append("; ");
sb.append("]");
return sb.toString();
}
}

View File

@ -1,265 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.dht;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.NoSuchElementException;
import io.github.chronosx88.dhtBootstrap.kademlia.KadConfiguration;
import io.github.chronosx88.dhtBootstrap.kademlia.exceptions.ContentExistException;
import io.github.chronosx88.dhtBootstrap.kademlia.exceptions.ContentNotFoundException;
import io.github.chronosx88.dhtBootstrap.kademlia.node.KademliaId;
import io.github.chronosx88.dhtBootstrap.kademlia.util.serializer.JsonSerializer;
import io.github.chronosx88.dhtBootstrap.kademlia.util.serializer.KadSerializer;
/**
* The main Distributed Hash Table class that manages the entire DHT
*
* @author Joshua Kissoon
* @since 20140226
*/
public class DHT implements KademliaDHT
{
private transient StoredContentManager contentManager;
private transient KadSerializer<JKademliaStorageEntry> serializer = null;
private transient KadConfiguration config;
private final String ownerId;
public DHT(String ownerId, KadConfiguration config)
{
this.ownerId = ownerId;
this.config = config;
this.initialize();
}
@Override
public final void initialize()
{
contentManager = new StoredContentManager();
}
@Override
public void setConfiguration(KadConfiguration con)
{
this.config = con;
}
@Override
public KadSerializer<JKademliaStorageEntry> getSerializer()
{
if (null == serializer)
{
serializer = new JsonSerializer<>();
}
return serializer;
}
@Override
public boolean store(JKademliaStorageEntry content) throws IOException
{
/* Lets check if we have this content and it's the updated version */
if (this.contentManager.contains(content.getContentMetadata()))
{
KademliaStorageEntryMetadata current = this.contentManager.get(content.getContentMetadata());
/* update the last republished time */
current.updateLastRepublished();
if (current.getLastUpdatedTimestamp() >= content.getContentMetadata().getLastUpdatedTimestamp())
{
/* We have the current content, no need to update it! just leave this method now */
return false;
}
else
{
/* We have this content, but not the latest version, lets delete it so the new version will be added below */
try
{
//System.out.println("Removing older content to update it");
this.remove(content.getContentMetadata());
}
catch (ContentNotFoundException ex)
{
/* This won't ever happen at this point since we only get here if the content is found, lets ignore it */
}
}
}
/**
* If we got here means we don't have this content, or we need to update the content
* If we need to update the content, the code above would've already deleted it, so we just need to re-add it
*/
try
{
//System.out.println("Adding new content.");
/* Keep track of this content in the entries manager */
KademliaStorageEntryMetadata sEntry = this.contentManager.put(content.getContentMetadata());
/* Now we store the content locally in a file */
String contentStorageFolder = this.getContentStorageFolderName(content.getContentMetadata().getKey());
try (FileOutputStream fout = new FileOutputStream(contentStorageFolder + File.separator + sEntry.hashCode() + ".kct");
DataOutputStream dout = new DataOutputStream(fout))
{
this.getSerializer().write(content, dout);
}
return true;
}
catch (ContentExistException e)
{
/**
* Content already exist on the DHT
* This won't happen because above takes care of removing the content if it's older and needs to be updated,
* or returning if we already have the current content version.
*/
return false;
}
}
@Override
public boolean store(KadContent content) throws IOException
{
return this.store(new JKademliaStorageEntry(content));
}
@Override
public JKademliaStorageEntry retrieve(KademliaId key, int hashCode) throws FileNotFoundException, IOException, ClassNotFoundException
{
String folder = this.getContentStorageFolderName(key);
DataInputStream din = new DataInputStream(new FileInputStream(folder + File.separator + hashCode + ".kct"));
return this.getSerializer().read(din);
}
@Override
public boolean contains(GetParameter param)
{
return this.contentManager.contains(param);
}
@Override
public JKademliaStorageEntry get(KademliaStorageEntryMetadata entry) throws IOException, NoSuchElementException
{
try
{
return this.retrieve(entry.getKey(), entry.hashCode());
}
catch (FileNotFoundException e)
{
System.err.println("Error while loading file for content. Message: " + e.getMessage());
}
catch (ClassNotFoundException e)
{
System.err.println("The class for some content was not found. Message: " + e.getMessage());
}
/* If we got here, means we got no entries */
throw new NoSuchElementException();
}
@Override
public JKademliaStorageEntry get(GetParameter param) throws NoSuchElementException, IOException
{
/* Load a KadContent if any exist for the given criteria */
try
{
KademliaStorageEntryMetadata e = this.contentManager.get(param);
return this.retrieve(e.getKey(), e.hashCode());
}
catch (FileNotFoundException e)
{
System.err.println("Error while loading file for content. Message: " + e.getMessage());
}
catch (ClassNotFoundException e)
{
System.err.println("The class for some content was not found. Message: " + e.getMessage());
}
/* If we got here, means we got no entries */
throw new NoSuchElementException();
}
@Override
public void remove(KadContent content) throws ContentNotFoundException
{
this.remove(new StorageEntryMetadata(content));
}
@Override
public void remove(KademliaStorageEntryMetadata entry) throws ContentNotFoundException
{
String folder = this.getContentStorageFolderName(entry.getKey());
File file = new File(folder + File.separator + entry.hashCode() + ".kct");
contentManager.remove(entry);
if (file.exists())
{
file.delete();
}
else
{
throw new ContentNotFoundException();
}
}
/**
* Get the name of the folder for which a content should be stored
*
* @param key The key of the content
*
* @return String The name of the folder
*/
private String getContentStorageFolderName(KademliaId key)
{
/**
* Each content is stored in a folder named after the first 2 characters of the NodeId
*
* The name of the file containing the content is the hash of this content
*/
String folderName = key.hexRepresentation().substring(0, 2);
File contentStorageFolder = new File(this.config.getNodeDataFolder(ownerId) + File.separator + folderName);
/* Create the content folder if it doesn't exist */
if (!contentStorageFolder.isDirectory())
{
contentStorageFolder.mkdir();
}
return contentStorageFolder.toString();
}
@Override
public List<KademliaStorageEntryMetadata> getStorageEntries()
{
return contentManager.getAllEntries();
}
@Override
public void putStorageEntries(List<KademliaStorageEntryMetadata> ientries)
{
for (KademliaStorageEntryMetadata e : ientries)
{
try
{
this.contentManager.put(e);
}
catch (ContentExistException ex)
{
/* Entry already exist, no need to store it again */
}
}
}
@Override
public synchronized String toString()
{
return this.contentManager.toString();
}
}

View File

@ -1,117 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.dht;
import io.github.chronosx88.dhtBootstrap.kademlia.node.KademliaId;
/**
* A GET request can get content based on Key, Owner, Type, etc
*
* This is a class containing the parameters to be passed in a GET request
*
* We use a class since the number of filtering parameters can change later
*
* @author Joshua Kissoon
* @since 20140224
*/
public class GetParameter
{
private KademliaId key;
private String ownerId = null;
private String type = null;
/**
* Construct a GetParameter to search for data by NodeId and owner
*
* @param key
* @param type
*/
public GetParameter(KademliaId key, String type)
{
this.key = key;
this.type = type;
}
/**
* Construct a GetParameter to search for data by NodeId, owner, type
*
* @param key
* @param type
* @param owner
*/
public GetParameter(KademliaId key, String type, String owner)
{
this(key, type);
this.ownerId = owner;
}
/**
* Construct our get parameter from a Content
*
* @param c
*/
public GetParameter(KadContent c)
{
this.key = c.getKey();
if (c.getType() != null)
{
this.type = c.getType();
}
if (c.getOwnerId() != null)
{
this.ownerId = c.getOwnerId();
}
}
/**
* Construct our get parameter from a StorageEntryMeta data
*
* @param md
*/
public GetParameter(KademliaStorageEntryMetadata md)
{
this.key = md.getKey();
if (md.getType() != null)
{
this.type = md.getType();
}
if (md.getOwnerId() != null)
{
this.ownerId = md.getOwnerId();
}
}
public KademliaId getKey()
{
return this.key;
}
public void setOwnerId(String ownerId)
{
this.ownerId = ownerId;
}
public String getOwnerId()
{
return this.ownerId;
}
public void setType(String type)
{
this.type = type;
}
public String getType()
{
return this.type;
}
@Override
public String toString()
{
return "GetParameter - [Key: " + key + "][Owner: " + this.ownerId + "][Type: " + this.type + "]";
}
}

View File

@ -1,59 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.dht;
/**
* A JKademliaStorageEntry class that is used to store a content on the DHT
*
* @author Joshua Kissoon
* @since 20140402
*/
public class JKademliaStorageEntry implements KademliaStorageEntry
{
private String content;
private final StorageEntryMetadata metadata;
public JKademliaStorageEntry(final KadContent content)
{
this(content, new StorageEntryMetadata(content));
}
public JKademliaStorageEntry(final KadContent content, final StorageEntryMetadata metadata)
{
this.setContent(content.toSerializedForm());
this.metadata = metadata;
}
@Override
public final void setContent(final byte[] data)
{
this.content = new String(data);
}
@Override
public final byte[] getContent()
{
return this.content.getBytes();
}
@Override
public final KademliaStorageEntryMetadata getContentMetadata()
{
return this.metadata;
}
@Override
public String toString()
{
StringBuilder sb = new StringBuilder("[StorageEntry: ");
sb.append("[Content: ");
sb.append(this.getContent());
sb.append("]");
sb.append(this.getContentMetadata());
sb.append("]");
return sb.toString();
}
}

View File

@ -1,65 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.dht;
import io.github.chronosx88.dhtBootstrap.kademlia.node.KademliaId;
/**
* Any piece of content that needs to be stored on the DHT
*
* @author Joshua Kissoon
*
* @since 20140224
*/
public interface KadContent
{
/**
* @return NodeId The DHT key for this content
*/
public KademliaId getKey();
/**
* @return String The type of content
*/
public String getType();
/**
* Each content will have an created date
* This allows systems to know when to delete a content form his/her machine
*
* @return long The create date of this content
*/
public long getCreatedTimestamp();
/**
* Each content will have an update timestamp
* This allows the DHT to keep only the latest version of a content
*
* @return long The timestamp of when this content was last updated
*/
public long getLastUpdatedTimestamp();
/**
* @return The ID of the owner of this content
*/
public String getOwnerId();
/**
* Each content needs to be in byte format for transporting and storage,
* this method takes care of that.
*
* Each object is responsible for transforming itself to byte format since the
* structure of methods may differ.
*
* @return The content in byte format
*/
public byte[] toSerializedForm();
/**
* Given the Content in byte format, read it
*
* @param data The object in byte format
*
* @return A new object from the given
*/
public KadContent fromSerializedForm(byte[] data);
}

View File

@ -1,122 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.dht;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import java.util.NoSuchElementException;
import io.github.chronosx88.dhtBootstrap.kademlia.KadConfiguration;
import io.github.chronosx88.dhtBootstrap.kademlia.exceptions.ContentNotFoundException;
import io.github.chronosx88.dhtBootstrap.kademlia.node.KademliaId;
import io.github.chronosx88.dhtBootstrap.kademlia.util.serializer.KadSerializer;
/**
* The main Distributed Hash Table interface that manages the entire DHT
*
* @author Joshua Kissoon
* @since 20140523
*/
public interface KademliaDHT
{
/**
* Initialize this DHT to it's default state
*/
public void initialize();
/**
* Set a new configuration. Mainly used when we restore the DHT state from a file
*
* @param con The new configuration file
*/
public void setConfiguration(KadConfiguration con);
/**
* Creates a new Serializer or returns an existing serializer
*
* @return The new ContentSerializer
*/
public KadSerializer<JKademliaStorageEntry> getSerializer();
/**
* Handle storing content locally
*
* @param content The DHT content to store
*
* @return boolean true if we stored the content, false if the content already exists and is up to date
*
* @throws IOException
*/
public boolean store(JKademliaStorageEntry content) throws IOException;
public boolean store(KadContent content) throws IOException;
/**
* Retrieves a Content from local storage
*
* @param key The Key of the content to retrieve
* @param hashCode The hash code of the content to retrieve
*
* @return A KadContent object
*
* @throws FileNotFoundException
* @throws ClassNotFoundException
*/
public JKademliaStorageEntry retrieve(KademliaId key, int hashCode) throws FileNotFoundException, IOException, ClassNotFoundException;
/**
* Check if any content for the given criteria exists in this DHT
*
* @param param The content search criteria
*
* @return boolean Whether any content exist that satisfy the criteria
*/
public boolean contains(GetParameter param);
/**
* Retrieve and create a KadContent object given the StorageEntry object
*
* @param entry The StorageEntry used to retrieve this content
*
* @return KadContent The content object
*
* @throws IOException
*/
public JKademliaStorageEntry get(KademliaStorageEntryMetadata entry) throws IOException, NoSuchElementException;
/**
* Get the StorageEntry for the content if any exist.
*
* @param param The parameters used to filter the content needed
*
* @return KadContent A KadContent found on the DHT satisfying the given criteria
*
* @throws IOException
*/
public JKademliaStorageEntry get(GetParameter param) throws NoSuchElementException, IOException;
/**
* Delete a content from local storage
*
* @param content The Content to Remove
*
*
* @throws ContentNotFoundException
*/
public void remove(KadContent content) throws ContentNotFoundException;
public void remove(KademliaStorageEntryMetadata entry) throws ContentNotFoundException;
/**
* @return A List of all StorageEntries for this node
*/
public List<KademliaStorageEntryMetadata> getStorageEntries();
/**
* Used to add a list of storage entries for existing content to the DHT.
* Mainly used when retrieving StorageEntries from a saved state file.
*
* @param ientries The entries to add
*/
public void putStorageEntries(List<KademliaStorageEntryMetadata> ientries);
}

View File

@ -1,32 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.dht;
/**
* A StorageEntry interface for the storage entry class used to store a content on the DHT
*
* @author Joshua Kissoon
* @since 20140523
*/
public interface KademliaStorageEntry
{
/**
* Add the content to the storage entry
*
* @param data The content data in byte[] format
*/
public void setContent(final byte[] data);
/**
* Get the content from this storage entry
*
* @return The content in byte format
*/
public byte[] getContent();
/**
* Get the metadata for this storage entry
*
* @return the storage entry metadata
*/
public KademliaStorageEntryMetadata getContentMetadata();
}

View File

@ -1,59 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.dht;
import io.github.chronosx88.dhtBootstrap.kademlia.node.KademliaId;
/**
* Keeps track of data for a Content stored in the DHT
* Used by the StorageEntryManager class
*
* @author Joshua Kissoon
* @since 20140226
*/
public interface KademliaStorageEntryMetadata
{
/**
* @return The Kademlia ID of this content
*/
public KademliaId getKey();
/**
* @return The content's owner ID
*/
public String getOwnerId();
/**
* @return The type of this content
*/
public String getType();
/**
* @return A hash of the content
*/
public int getContentHash();
/**
* @return The last time this content was updated
*/
public long getLastUpdatedTimestamp();
/**
* When a node is looking for content, he sends the search criteria in a GetParameter object
* Here we take this GetParameter object and check if this StorageEntry satisfies the given parameters
*
* @param params
*
* @return boolean Whether this content satisfies the parameters
*/
public boolean satisfiesParameters(GetParameter params);
/**
* @return The timestamp for the last time this content was republished
*/
public long lastRepublished();
/**
* Whenever we republish a content or get this content from the network, we update the last republished time
*/
public void updateLastRepublished();
}

View File

@ -1,152 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.dht;
import java.util.Objects;
import io.github.chronosx88.dhtBootstrap.kademlia.node.KademliaId;
/**
* Keeps track of data for a Content stored in the DHT
* Used by the StorageEntryManager class
*
* @author Joshua Kissoon
* @since 20140226
*/
public class StorageEntryMetadata implements KademliaStorageEntryMetadata
{
private final KademliaId key;
private final String ownerId;
private final String type;
private final int contentHash;
private final long updatedTs;
/* This value is the last time this content was last updated from the network */
private long lastRepublished;
public StorageEntryMetadata(KadContent content)
{
this.key = content.getKey();
this.ownerId = content.getOwnerId();
this.type = content.getType();
this.contentHash = content.hashCode();
this.updatedTs = content.getLastUpdatedTimestamp();
this.lastRepublished = System.currentTimeMillis() / 1000L;
}
@Override
public KademliaId getKey()
{
return this.key;
}
@Override
public String getOwnerId()
{
return this.ownerId;
}
@Override
public String getType()
{
return this.type;
}
@Override
public int getContentHash()
{
return this.contentHash;
}
@Override
public long getLastUpdatedTimestamp()
{
return this.updatedTs;
}
/**
* When a node is looking for content, he sends the search criteria in a GetParameter object
* Here we take this GetParameter object and check if this StorageEntry satisfies the given parameters
*
* @param params
*
* @return boolean Whether this content satisfies the parameters
*/
@Override
public boolean satisfiesParameters(GetParameter params)
{
/* Check that owner id matches */
if ((params.getOwnerId() != null) && (!params.getOwnerId().equals(this.ownerId)))
{
return false;
}
/* Check that type matches */
if ((params.getType() != null) && (!params.getType().equals(this.type)))
{
return false;
}
/* Check that key matches */
if ((params.getKey() != null) && (!params.getKey().equals(this.key)))
{
return false;
}
return true;
}
@Override
public long lastRepublished()
{
return this.lastRepublished;
}
/**
* Whenever we republish a content or get this content from the network, we update the last republished time
*/
@Override
public void updateLastRepublished()
{
this.lastRepublished = System.currentTimeMillis() / 1000L;
}
@Override
public boolean equals(Object o)
{
if (o instanceof KademliaStorageEntryMetadata)
{
return this.hashCode() == o.hashCode();
}
return false;
}
@Override
public int hashCode()
{
int hash = 3;
hash = 23 * hash + Objects.hashCode(this.key);
hash = 23 * hash + Objects.hashCode(this.ownerId);
hash = 23 * hash + Objects.hashCode(this.type);
return hash;
}
@Override
public String toString()
{
StringBuilder sb = new StringBuilder("[StorageEntry: ");
sb.append("{Key: ");
sb.append(this.key);
sb.append("} ");
sb.append("{Owner: ");
sb.append(this.ownerId);
sb.append("} ");
sb.append("{Type: ");
sb.append(this.type);
sb.append("} ");
sb.append("]");
return sb.toString();
}
}

View File

@ -1,202 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.dht;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import io.github.chronosx88.dhtBootstrap.kademlia.exceptions.ContentExistException;
import io.github.chronosx88.dhtBootstrap.kademlia.exceptions.ContentNotFoundException;
import io.github.chronosx88.dhtBootstrap.kademlia.node.KademliaId;
/**
* It would be infeasible to keep all content in memory to be send when requested
* Instead we store content into files
* We use this Class to keep track of all content stored
*
* @author Joshua Kissoon
* @since 20140226
*/
class StoredContentManager
{
private final Map<KademliaId, List<KademliaStorageEntryMetadata>> entries;
{
entries = new HashMap<>();
}
/**
* Add a new entry to our storage
*
* @param content The content to store a reference to
*/
public KademliaStorageEntryMetadata put(KadContent content) throws ContentExistException
{
return this.put(new StorageEntryMetadata(content));
}
/**
* Add a new entry to our storage
*
* @param entry The StorageEntry to store
*/
public KademliaStorageEntryMetadata put(KademliaStorageEntryMetadata entry) throws ContentExistException
{
if (!this.entries.containsKey(entry.getKey()))
{
this.entries.put(entry.getKey(), new ArrayList<>());
}
/* If this entry doesn't already exist, then we add it */
if (!this.contains(entry))
{
this.entries.get(entry.getKey()).add(entry);
return entry;
}
else
{
throw new ContentExistException("Content already exists on this DHT");
}
}
/**
* Checks if our DHT has a Content for the given criteria
*
* @param param The parameters used to search for a content
*
* @return boolean
*/
public synchronized boolean contains(GetParameter param)
{
if (this.entries.containsKey(param.getKey()))
{
/* Content with this key exist, check if any match the rest of the search criteria */
for (KademliaStorageEntryMetadata e : this.entries.get(param.getKey()))
{
/* If any entry satisfies the given parameters, return true */
if (e.satisfiesParameters(param))
{
return true;
}
}
}
else
{
}
return false;
}
/**
* Check if a content exist in the DHT
*/
public synchronized boolean contains(KadContent content)
{
return this.contains(new GetParameter(content));
}
/**
* Check if a StorageEntry exist on this DHT
*/
public synchronized boolean contains(KademliaStorageEntryMetadata entry)
{
return this.contains(new GetParameter(entry));
}
/**
* Checks if our DHT has a Content for the given criteria
*
* @param param The parameters used to search for a content
*
* @return List of content for the specific search parameters
*/
public KademliaStorageEntryMetadata get(GetParameter param) throws NoSuchElementException
{
if (this.entries.containsKey(param.getKey()))
{
/* Content with this key exist, check if any match the rest of the search criteria */
for (KademliaStorageEntryMetadata e : this.entries.get(param.getKey()))
{
/* If any entry satisfies the given parameters, return true */
if (e.satisfiesParameters(param))
{
return e;
}
}
/* If we got here, means we didn't find any entry */
throw new NoSuchElementException();
}
else
{
throw new NoSuchElementException("No content exist for the given parameters");
}
}
public KademliaStorageEntryMetadata get(KademliaStorageEntryMetadata md)
{
return this.get(new GetParameter(md));
}
/**
* @return A list of all storage entries
*/
public synchronized List<KademliaStorageEntryMetadata> getAllEntries()
{
List<KademliaStorageEntryMetadata> entriesRet = new ArrayList<>();
for (List<KademliaStorageEntryMetadata> entrySet : this.entries.values())
{
if (entrySet.size() > 0)
{
entriesRet.addAll(entrySet);
}
}
return entriesRet;
}
public void remove(KadContent content) throws ContentNotFoundException
{
this.remove(new StorageEntryMetadata(content));
}
public void remove(KademliaStorageEntryMetadata entry) throws ContentNotFoundException
{
if (contains(entry))
{
this.entries.get(entry.getKey()).remove(entry);
}
else
{
throw new ContentNotFoundException("This content does not exist in the Storage Entries");
}
}
@Override
public synchronized String toString()
{
StringBuilder sb = new StringBuilder("Stored Content: \n");
int count = 0;
for (List<KademliaStorageEntryMetadata> es : this.entries.values())
{
if (entries.size() < 1)
{
continue;
}
for (KademliaStorageEntryMetadata e : es)
{
sb.append(++count);
sb.append(". ");
sb.append(e);
sb.append("\n");
}
}
sb.append("\n");
return sb.toString();
}
}

View File

@ -1,21 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.exceptions;
/**
* An exception used to indicate that a content already exist on the DHT
*
* @author Joshua Kissoon
* @created 20140322
*/
public class ContentExistException extends Exception
{
public ContentExistException()
{
super();
}
public ContentExistException(String message)
{
super(message);
}
}

View File

@ -1,21 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.exceptions;
/**
* An exception used to indicate that a content does not exist on the DHT
*
* @author Joshua Kissoon
* @created 20140322
*/
public class ContentNotFoundException extends Exception
{
public ContentNotFoundException()
{
super();
}
public ContentNotFoundException(String message)
{
super(message);
}
}

View File

@ -1,21 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.exceptions;
/**
* An exception to be thrown whenever the Kad Server is down
*
* @author Joshua Kissoon
* @created 20140428
*/
public class KadServerDownException extends RoutingException
{
public KadServerDownException()
{
super();
}
public KadServerDownException(String message)
{
super(message);
}
}

View File

@ -1,23 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.exceptions;
import java.io.IOException;
/**
* An exception to be thrown whenever there is a routing problem
*
* @author Joshua Kissoon
* @created 20140219
*/
public class RoutingException extends IOException
{
public RoutingException()
{
super();
}
public RoutingException(String message)
{
super(message);
}
}

View File

@ -1,21 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.exceptions;
/**
* An exception used to indicate an unknown message type or communication identifier
*
* @author Joshua Kissoon
* @created 20140219
*/
public class UnknownMessageException extends RuntimeException
{
public UnknownMessageException()
{
super();
}
public UnknownMessageException(String message)
{
super(message);
}
}

View File

@ -1,59 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.message;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import io.github.chronosx88.dhtBootstrap.kademlia.node.Node;
/**
* A message used to acknowledge a request from a node; can be used in many situations.
* - Mainly used to acknowledge a connect message
*
* @author Joshua Kissoon
* @created 20140218
*/
public class AcknowledgeMessage implements Message
{
private Node origin;
public static final byte CODE = 0x01;
public AcknowledgeMessage(Node origin)
{
this.origin = origin;
}
public AcknowledgeMessage(DataInputStream in) throws IOException
{
this.fromStream(in);
}
@Override
public final void fromStream(DataInputStream in) throws IOException
{
this.origin = new Node(in);
}
@Override
public void toStream(DataOutputStream out) throws IOException
{
origin.toStream(out);
}
public Node getOrigin()
{
return this.origin;
}
@Override
public byte code()
{
return CODE;
}
@Override
public String toString()
{
return "AcknowledgeMessage[origin=" + origin.getNodeId() + "]";
}
}

View File

@ -1,58 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.message;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import io.github.chronosx88.dhtBootstrap.kademlia.node.Node;
/**
* A message sent to another node requesting to connect to them.
*
* @author Joshua Kissoon
* @created 20140218
*/
public class ConnectMessage implements Message
{
private Node origin;
public static final byte CODE = 0x02;
public ConnectMessage(Node origin)
{
this.origin = origin;
}
public ConnectMessage(DataInputStream in) throws IOException
{
this.fromStream(in);
}
@Override
public final void fromStream(DataInputStream in) throws IOException
{
this.origin = new Node(in);
}
@Override
public void toStream(DataOutputStream out) throws IOException
{
origin.toStream(out);
}
public Node getOrigin()
{
return this.origin;
}
@Override
public byte code()
{
return CODE;
}
@Override
public String toString()
{
return "ConnectMessage[origin NodeId=" + origin.getNodeId() + "]";
}
}

View File

@ -1,58 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.message;
import java.io.IOException;
import io.github.chronosx88.dhtBootstrap.kademlia.KadServer;
import io.github.chronosx88.dhtBootstrap.kademlia.KademliaNode;
/**
* Receives a ConnectMessage and sends an AcknowledgeMessage as reply.
*
* @author Joshua Kissoon
* @created 20140219
*/
public class ConnectReceiver implements Receiver
{
private final KadServer server;
private final KademliaNode localNode;
public ConnectReceiver(KadServer server, KademliaNode local)
{
this.server = server;
this.localNode = local;
}
/**
* Handle receiving a ConnectMessage
*
* @param comm
*
* @throws IOException
*/
@Override
public void receive(Message incoming, int comm) throws IOException
{
ConnectMessage mess = (ConnectMessage) incoming;
/* Update the local space by inserting the origin node. */
this.localNode.getRoutingTable().insert(mess.getOrigin());
/* Respond to the connect request */
AcknowledgeMessage msg = new AcknowledgeMessage(this.localNode.getNode());
/* Reply to the connect message with an Acknowledgement */
this.server.reply(mess.getOrigin(), msg, comm);
}
/**
* We don't need to do anything here
*
* @param comm
*
* @throws IOException
*/
@Override
public void timeout(int comm) throws IOException
{
}
}

View File

@ -1,80 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.message;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import io.github.chronosx88.dhtBootstrap.kademlia.dht.GetParameter;
import io.github.chronosx88.dhtBootstrap.kademlia.node.Node;
import io.github.chronosx88.dhtBootstrap.kademlia.util.serializer.JsonSerializer;
/**
* Messages used to send to another node requesting content.
*
* @author Joshua Kissoon
* @since 20140226
*/
public class ContentLookupMessage implements Message
{
public static final byte CODE = 0x03;
private Node origin;
private GetParameter params;
/**
* @param origin The node where this lookup came from
* @param params The parameters used to find the content
*/
public ContentLookupMessage(Node origin, GetParameter params)
{
this.origin = origin;
this.params = params;
}
public ContentLookupMessage(DataInputStream in) throws IOException
{
this.fromStream(in);
}
public GetParameter getParameters()
{
return this.params;
}
public Node getOrigin()
{
return this.origin;
}
@Override
public void toStream(DataOutputStream out) throws IOException
{
this.origin.toStream(out);
/* Write the params to the stream */
new JsonSerializer<GetParameter>().write(this.params, out);
}
@Override
public final void fromStream(DataInputStream in) throws IOException
{
this.origin = new Node(in);
/* Read the params from the stream */
try
{
this.params = new JsonSerializer<GetParameter>().read(in);
}
catch (ClassNotFoundException e)
{
e.printStackTrace();
}
}
@Override
public byte code()
{
return CODE;
}
}

View File

@ -1,69 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.message;
import java.io.IOException;
import java.util.NoSuchElementException;
import io.github.chronosx88.dhtBootstrap.kademlia.KadConfiguration;
import io.github.chronosx88.dhtBootstrap.kademlia.KadServer;
import io.github.chronosx88.dhtBootstrap.kademlia.KademliaNode;
import io.github.chronosx88.dhtBootstrap.kademlia.dht.KademliaDHT;
/**
* Responds to a ContentLookupMessage by sending a ContentMessage containing the requested content;
* if the requested content is not found, a NodeReplyMessage containing the K closest nodes to the request key is sent.
*
* @author Joshua Kissoon
* @since 20140226
*/
public class ContentLookupReceiver implements Receiver
{
private final KadServer server;
private final KademliaNode localNode;
private final KademliaDHT dht;
private final KadConfiguration config;
public ContentLookupReceiver(KadServer server, KademliaNode localNode, KademliaDHT dht, KadConfiguration config)
{
this.server = server;
this.localNode = localNode;
this.dht = dht;
this.config = config;
}
@Override
public void receive(Message incoming, int comm) throws IOException
{
ContentLookupMessage msg = (ContentLookupMessage) incoming;
this.localNode.getRoutingTable().insert(msg.getOrigin());
/* Check if we can have this data */
if (this.dht.contains(msg.getParameters()))
{
try
{
/* Return a ContentMessage with the required data */
ContentMessage cMsg = new ContentMessage(localNode.getNode(), this.dht.get(msg.getParameters()));
server.reply(msg.getOrigin(), cMsg, comm);
}
catch (NoSuchElementException ex)
{
/* @todo Not sure why this exception is thrown here, checkup the system when tests are writtem*/
}
}
else
{
/**
* Return a the K closest nodes to this content identifier
* We create a NodeLookupReceiver and let this receiver handle this operation
*/
NodeLookupMessage lkpMsg = new NodeLookupMessage(msg.getOrigin(), msg.getParameters().getKey());
new NodeLookupReceiver(server, localNode, this.config).receive(lkpMsg, comm);
}
}
@Override
public void timeout(int comm)
{
}
}

View File

@ -1,85 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.message;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import io.github.chronosx88.dhtBootstrap.kademlia.dht.JKademliaStorageEntry;
import io.github.chronosx88.dhtBootstrap.kademlia.node.Node;
import io.github.chronosx88.dhtBootstrap.kademlia.util.serializer.JsonSerializer;
/**
* A Message used to send content between nodes
*
* @author Joshua Kissoon
* @since 20140226
*/
public class ContentMessage implements Message
{
public static final byte CODE = 0x04;
private JKademliaStorageEntry content;
private Node origin;
/**
* @param origin Where the message came from
* @param content The content to be stored
*
*/
public ContentMessage(Node origin, JKademliaStorageEntry content)
{
this.content = content;
this.origin = origin;
}
public ContentMessage(DataInputStream in) throws IOException
{
this.fromStream(in);
}
@Override
public void toStream(DataOutputStream out) throws IOException
{
this.origin.toStream(out);
/* Serialize the KadContent, then send it to the stream */
new JsonSerializer<JKademliaStorageEntry>().write(content, out);
}
@Override
public final void fromStream(DataInputStream in) throws IOException
{
this.origin = new Node(in);
try
{
this.content = new JsonSerializer<JKademliaStorageEntry>().read(in);
}
catch (ClassNotFoundException e)
{
System.err.println("ClassNotFoundException when reading StorageEntry; Message: " + e.getMessage());
}
}
public Node getOrigin()
{
return this.origin;
}
public JKademliaStorageEntry getContent()
{
return this.content;
}
@Override
public byte code()
{
return CODE;
}
@Override
public String toString()
{
return "ContentMessage[origin=" + origin + ",content=" + content + "]";
}
}

View File

@ -1,37 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.message;
import java.io.DataInputStream;
import java.io.IOException;
import io.github.chronosx88.dhtBootstrap.kademlia.KadServer;
/**
* A factory that handles creating messages and receivers
*
* @author Joshua Kissoon
* @since 20140523
*/
public interface KademliaMessageFactory
{
/**
* Method that creates a message based on the code and input stream
*
* @param code The message code
* @param in An input stream with the message data
*
* @return A message
*
* @throws IOException
*/
public Message createMessage(byte code, DataInputStream in) throws IOException;
/**
* Method that returns a receiver to handle a specific type of message
*
* @param code The message code
* @param server
*
* @return A receiver
*/
public Receiver createReceiver(byte code, KadServer server);
}

View File

@ -1,14 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.message;
public interface Message extends Streamable
{
/**
* The unique code for the message type, used to differentiate all messages
* from each other. Since this is of <code>byte</code> type there can
* be at most 256 different message types.
*
* @return byte A unique code representing the message type
* */
public byte code();
}

View File

@ -1,76 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.message;
import java.io.DataInputStream;
import java.io.IOException;
import io.github.chronosx88.dhtBootstrap.kademlia.KadConfiguration;
import io.github.chronosx88.dhtBootstrap.kademlia.KadServer;
import io.github.chronosx88.dhtBootstrap.kademlia.KademliaNode;
import io.github.chronosx88.dhtBootstrap.kademlia.dht.KademliaDHT;
/**
* Handles creating messages and receivers
*
* @author Joshua Kissoon
* @since 20140202
*/
public class MessageFactory implements KademliaMessageFactory
{
private final KademliaNode localNode;
private final KademliaDHT dht;
private final KadConfiguration config;
public MessageFactory(KademliaNode local, KademliaDHT dht, KadConfiguration config)
{
this.localNode = local;
this.dht = dht;
this.config = config;
}
@Override
public Message createMessage(byte code, DataInputStream in) throws IOException
{
switch (code)
{
case AcknowledgeMessage.CODE:
return new AcknowledgeMessage(in);
case ConnectMessage.CODE:
return new ConnectMessage(in);
case ContentMessage.CODE:
return new ContentMessage(in);
case ContentLookupMessage.CODE:
return new ContentLookupMessage(in);
case NodeLookupMessage.CODE:
return new NodeLookupMessage(in);
case NodeReplyMessage.CODE:
return new NodeReplyMessage(in);
case SimpleMessage.CODE:
return new SimpleMessage(in);
case StoreContentMessage.CODE:
return new StoreContentMessage(in);
default:
//System.out.println(this.localNode + " - No Message handler found for message. Code: " + code);
return new SimpleMessage(in);
}
}
@Override
public Receiver createReceiver(byte code, KadServer server)
{
switch (code)
{
case ConnectMessage.CODE:
return new ConnectReceiver(server, this.localNode);
case ContentLookupMessage.CODE:
return new ContentLookupReceiver(server, this.localNode, this.dht, this.config);
case NodeLookupMessage.CODE:
return new NodeLookupReceiver(server, this.localNode, this.config);
case StoreContentMessage.CODE:
return new StoreContentReceiver(server, this.localNode, this.dht);
default:
//System.out.println("No receiver found for message. Code: " + code);
return new SimpleReceiver();
}
}
}

View File

@ -1,75 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.message;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import io.github.chronosx88.dhtBootstrap.kademlia.node.Node;
import io.github.chronosx88.dhtBootstrap.kademlia.node.KademliaId;
/**
* A message sent to other nodes requesting the K-Closest nodes to a key sent in this message.
*
* @author Joshua Kissoon
* @created 20140218
*/
public class NodeLookupMessage implements Message
{
private Node origin;
private KademliaId lookupId;
public static final byte CODE = 0x05;
/**
* A new NodeLookupMessage to find nodes
*
* @param origin The Node from which the message is coming from
* @param lookup The key for which to lookup nodes for
*/
public NodeLookupMessage(Node origin, KademliaId lookup)
{
this.origin = origin;
this.lookupId = lookup;
}
public NodeLookupMessage(DataInputStream in) throws IOException
{
this.fromStream(in);
}
@Override
public final void fromStream(DataInputStream in) throws IOException
{
this.origin = new Node(in);
this.lookupId = new KademliaId(in);
}
@Override
public void toStream(DataOutputStream out) throws IOException
{
this.origin.toStream(out);
this.lookupId.toStream(out);
}
public Node getOrigin()
{
return this.origin;
}
public KademliaId getLookupId()
{
return this.lookupId;
}
@Override
public byte code()
{
return CODE;
}
@Override
public String toString()
{
return "NodeLookupMessage[origin=" + origin + ",lookup=" + lookupId + "]";
}
}

View File

@ -1,72 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.message;
import java.io.IOException;
import java.util.List;
import io.github.chronosx88.dhtBootstrap.kademlia.KadConfiguration;
import io.github.chronosx88.dhtBootstrap.kademlia.KadServer;
import io.github.chronosx88.dhtBootstrap.kademlia.KademliaNode;
import io.github.chronosx88.dhtBootstrap.kademlia.node.Node;
/**
* Receives a NodeLookupMessage and sends a NodeReplyMessage as reply with the K-Closest nodes to the ID sent.
*
* @author Joshua Kissoon
* @created 20140219
*/
public class NodeLookupReceiver implements Receiver
{
private final KadServer server;
private final KademliaNode localNode;
private final KadConfiguration config;
public NodeLookupReceiver(KadServer server, KademliaNode local, KadConfiguration config)
{
this.server = server;
this.localNode = local;
this.config = config;
}
/**
* Handle receiving a NodeLookupMessage
* Find the set of K nodes closest to the lookup ID and return them
*
* @param comm
*
* @throws IOException
*/
@Override
public void receive(Message incoming, int comm) throws IOException
{
NodeLookupMessage msg = (NodeLookupMessage) incoming;
Node origin = msg.getOrigin();
/* Update the local space by inserting the origin node. */
this.localNode.getRoutingTable().insert(origin);
/* Find nodes closest to the LookupId */
List<Node> nodes = this.localNode.getRoutingTable().findClosest(msg.getLookupId(), this.config.k());
/* Respond to the NodeLookupMessage */
Message reply = new NodeReplyMessage(this.localNode.getNode(), nodes);
if (this.server.isRunning())
{
/* Let the Server send the reply */
this.server.reply(origin, reply, comm);
}
}
/**
* We don't need to do anything here
*
* @param comm
*
* @throws IOException
*/
@Override
public void timeout(int comm) throws IOException
{
}
}

View File

@ -1,94 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.message;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import io.github.chronosx88.dhtBootstrap.kademlia.node.Node;
/**
* A message used to connect nodes.
* When a NodeLookup Request comes in, we respond with a NodeReplyMessage.
*
* @author Joshua Kissoon
* @created 20140218
*/
public class NodeReplyMessage implements Message
{
private Node origin;
public static final byte CODE = 0x06;
private List<Node> nodes;
public NodeReplyMessage(Node origin, List<Node> nodes)
{
this.origin = origin;
this.nodes = nodes;
}
public NodeReplyMessage(DataInputStream in) throws IOException
{
this.fromStream(in);
}
@Override
public final void fromStream(DataInputStream in) throws IOException
{
/* Read in the origin */
this.origin = new Node(in);
/* Get the number of incoming nodes */
int len = in.readInt();
this.nodes = new ArrayList<>(len);
/* Read in all nodes */
for (int i = 0; i < len; i++)
{
this.nodes.add(new Node(in));
}
}
@Override
public void toStream(DataOutputStream out) throws IOException
{
/* Add the origin node to the stream */
origin.toStream(out);
/* Add all other nodes to the stream */
int len = this.nodes.size();
if (len > 255)
{
throw new IndexOutOfBoundsException("Too many nodes in list to send in NodeReplyMessage. Size: " + len);
}
/* Writing the nodes to the stream */
out.writeInt(len);
for (Node n : this.nodes)
{
n.toStream(out);
}
}
public Node getOrigin()
{
return this.origin;
}
@Override
public byte code()
{
return CODE;
}
public List<Node> getNodes()
{
return this.nodes;
}
@Override
public String toString()
{
return "NodeReplyMessage[origin NodeId=" + origin.getNodeId() + "]";
}
}

View File

@ -1,33 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.message;
import java.io.IOException;
/**
* A receiver waits for incoming messages and perform some action when the message is received
*
* @author Joshua Kissoon
* @created 20140218
*/
public interface Receiver
{
/**
* Message is received, now handle it
*
* @param conversationId The ID of this conversation, used for further conversations
* @param incoming The incoming
*
* @throws IOException
*/
public void receive(Message incoming, int conversationId) throws IOException;
/**
* If no reply is received in <code>MessageServer.TIMEOUT</code> seconds for the
* message with communication id <code>comm</code>, the MessageServer calls this method
*
* @param conversationId The conversation ID of this communication
*
* @throws IOException if an I/O error occurs
* */
public void timeout(int conversationId) throws IOException;
}

View File

@ -1,72 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.message;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
/**
* A simple message used for testing the system; Default message constructed if the message type sent is not available
*
* @author Joshua Kissoon
* @created 20140217
*/
public class SimpleMessage implements Message
{
/* Message constants */
public static final byte CODE = 0x07;
private String content;
public SimpleMessage(String message)
{
this.content = message;
}
public SimpleMessage(DataInputStream in)
{
this.fromStream(in);
}
@Override
public byte code()
{
return CODE;
}
@Override
public void toStream(DataOutputStream out)
{
try
{
out.writeInt(this.content.length());
out.writeBytes(this.content);
}
catch (IOException e)
{
e.printStackTrace();
}
}
@Override
public final void fromStream(DataInputStream in)
{
try
{
byte[] buff = new byte[in.readInt()];
in.readFully(buff);
this.content = new String(buff);
}
catch (IOException e)
{
e.printStackTrace();
}
}
@Override
public String toString()
{
return this.content;
}
}

View File

@ -1,25 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.message;
import java.io.IOException;
/**
* Default receiver if none other is called
*
* @author Joshua Kissoon
* @created 20140202
*/
public class SimpleReceiver implements Receiver
{
@Override
public void receive(Message incoming, int conversationId)
{
//System.out.println("Received message: " + incoming);
}
@Override
public void timeout(int conversationId) throws IOException
{
//System.out.println("SimpleReceiver message timeout.");
}
}

View File

@ -1,84 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.message;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import io.github.chronosx88.dhtBootstrap.kademlia.dht.JKademliaStorageEntry;
import io.github.chronosx88.dhtBootstrap.kademlia.node.Node;
import io.github.chronosx88.dhtBootstrap.kademlia.util.serializer.JsonSerializer;
/**
* A StoreContentMessage used to send a store message to a node
*
* @author Joshua Kissoon
* @since 20140225
*/
public class StoreContentMessage implements Message
{
public static final byte CODE = 0x08;
private JKademliaStorageEntry content;
private Node origin;
/**
* @param origin Where the message came from
* @param content The content to be stored
*
*/
public StoreContentMessage(Node origin, JKademliaStorageEntry content)
{
this.content = content;
this.origin = origin;
}
public StoreContentMessage(DataInputStream in) throws IOException
{
this.fromStream(in);
}
@Override
public void toStream(DataOutputStream out) throws IOException
{
this.origin.toStream(out);
/* Serialize the KadContent, then send it to the stream */
new JsonSerializer<JKademliaStorageEntry>().write(content, out);
}
@Override
public final void fromStream(DataInputStream in) throws IOException
{
this.origin = new Node(in);
try
{
this.content = new JsonSerializer<JKademliaStorageEntry>().read(in);
}
catch (ClassNotFoundException e)
{
e.printStackTrace();
}
}
public Node getOrigin()
{
return this.origin;
}
public JKademliaStorageEntry getContent()
{
return this.content;
}
@Override
public byte code()
{
return CODE;
}
@Override
public String toString()
{
return "StoreContentMessage[origin=" + origin + ",content=" + content + "]";
}
}

View File

@ -1,57 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.message;
import java.io.IOException;
import io.github.chronosx88.dhtBootstrap.kademlia.KadServer;
import io.github.chronosx88.dhtBootstrap.kademlia.KademliaNode;
import io.github.chronosx88.dhtBootstrap.kademlia.dht.KademliaDHT;
/**
* Receiver for incoming StoreContentMessage
*
* @author Joshua Kissoon
* @since 20140225
*/
public class StoreContentReceiver implements Receiver
{
private final KadServer server;
private final KademliaNode localNode;
private final KademliaDHT dht;
public StoreContentReceiver(KadServer server, KademliaNode localNode, KademliaDHT dht)
{
this.server = server;
this.localNode = localNode;
this.dht = dht;
}
@Override
public void receive(Message incoming, int comm)
{
/* It's a StoreContentMessage we're receiving */
StoreContentMessage msg = (StoreContentMessage) incoming;
/* Insert the message sender into this node's routing table */
this.localNode.getRoutingTable().insert(msg.getOrigin());
try
{
/* Store this Content into the DHT */
this.dht.store(msg.getContent());
}
catch (IOException e)
{
System.err.println("Unable to store received content; Message: " + e.getMessage());
}
}
@Override
public void timeout(int comm)
{
/**
* This receiver only handles Receiving content when we've received the message,
* so no timeout will happen with this receiver.
*/
}
}

View File

@ -1,42 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.message;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
/**
* A Streamable object is able to write it's state to an output stream and
* a class implementing Streamable must be able to recreate an instance of
* the class from an input stream. No information about class name is written
* to the output stream so it must be known what class type is expected when
* reading objects back in from an input stream. This gives a space
* advantage over Serializable.
* <p>
* Since the exact class must be known anyway prior to reading, it is incouraged
* that classes implementing Streamble also provide a constructor of the form:
* <p>
* <code>Streamable(DataInput in) throws IOException;</code>
* */
public interface Streamable
{
/**
* Writes the internal state of the Streamable object to the output stream
* in a format that can later be read by the same Streamble class using
* the {@link #fromStream} method.
*
* @param out
*
* @throws IOException
*/
public void toStream(DataOutputStream out) throws IOException;
/**
* Reads the internal state of the Streamable object from the input stream.
*
* @param out
*
* @throws IOException
*/
public void fromStream(DataInputStream out) throws IOException;
}

View File

@ -1,264 +0,0 @@
/**
* @author Joshua Kissoon
* @created 20140215
* @desc Represents a Kademlia Node ID
*/
package io.github.chronosx88.dhtBootstrap.kademlia.node;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Random;
import io.github.chronosx88.dhtBootstrap.kademlia.message.Streamable;
import javax.xml.bind.DatatypeConverter;
public class KademliaId implements Streamable, Serializable
{
public final transient static int ID_LENGTH = 160;
private byte[] keyBytes;
/**
* Construct the NodeId from some string
*
* @param data The user generated key string
*/
public KademliaId(String data)
{
keyBytes = DatatypeConverter.parseHexBinary(data);
if (keyBytes.length != ID_LENGTH / 8)
{
throw new IllegalArgumentException("Specified Data need to be " + (ID_LENGTH / 8) + " characters long.");
}
}
/**
* Generate a random key
*/
public KademliaId()
{
keyBytes = new byte[ID_LENGTH / 8];
new Random().nextBytes(keyBytes);
}
/**
* Generate the NodeId from a given byte[]
*
* @param bytes
*/
public KademliaId(byte[] bytes)
{
/*if (bytes.length != ID_LENGTH / 8)
{
throw new IllegalArgumentException("Specified Data need to be " + (ID_LENGTH / 8) + " characters long. Data Given: '" + new String(bytes) + "'");
}*/
this.keyBytes = bytes;
}
/**
* Load the NodeId from a DataInput stream
*
* @param in The stream from which to load the NodeId
*
* @throws IOException
*/
public KademliaId(DataInputStream in) throws IOException
{
this.fromStream(in);
}
public byte[] getBytes()
{
return this.keyBytes;
}
/**
* @return The BigInteger representation of the key
*/
public BigInteger getInt()
{
return new BigInteger(1, this.getBytes());
}
/**
* Compares a NodeId to this NodeId
*
* @param o The NodeId to compare to this NodeId
*
* @return boolean Whether the 2 NodeIds are equal
*/
@Override
public boolean equals(Object o)
{
if (o instanceof KademliaId)
{
KademliaId nid = (KademliaId) o;
return this.hashCode() == nid.hashCode();
}
return false;
}
@Override
public int hashCode()
{
int hash = 7;
hash = 83 * hash + Arrays.hashCode(this.keyBytes);
return hash;
}
/**
* Checks the distance between this and another NodeId
*
* @param nid
*
* @return The distance of this NodeId from the given NodeId
*/
public KademliaId xor(KademliaId nid)
{
byte[] result = new byte[ID_LENGTH / 8];
byte[] nidBytes = nid.getBytes();
for (int i = 0; i < ID_LENGTH / 8; i++)
{
result[i] = (byte) (this.keyBytes[i] ^ nidBytes[i]);
}
KademliaId resNid = new KademliaId(result);
return resNid;
}
/**
* Generates a NodeId that is some distance away from this NodeId
*
* @param distance in number of bits
*
* @return NodeId The newly generated NodeId
*/
public KademliaId generateNodeIdByDistance(int distance)
{
byte[] result = new byte[ID_LENGTH / 8];
/* Since distance = ID_LENGTH - prefixLength, we need to fill that amount with 0's */
int numByteZeroes = (ID_LENGTH - distance) / 8;
int numBitZeroes = 8 - (distance % 8);
/* Filling byte zeroes */
for (int i = 0; i < numByteZeroes; i++)
{
result[i] = 0;
}
/* Filling bit zeroes */
BitSet bits = new BitSet(8);
bits.set(0, 8);
for (int i = 0; i < numBitZeroes; i++)
{
/* Shift 1 zero into the start of the value */
bits.clear(i);
}
bits.flip(0, 8); // Flip the bits since they're in reverse order
result[numByteZeroes] = (byte) bits.toByteArray()[0];
/* Set the remaining bytes to Maximum value */
for (int i = numByteZeroes + 1; i < result.length; i++)
{
result[i] = Byte.MAX_VALUE;
}
return this.xor(new KademliaId(result));
}
/**
* Counts the number of leading 0's in this NodeId
*
* @return Integer The number of leading 0's
*/
public int getFirstSetBitIndex()
{
int prefixLength = 0;
for (byte b : this.keyBytes)
{
if (b == 0)
{
prefixLength += 8;
}
else
{
/* If the byte is not 0, we need to count how many MSBs are 0 */
int count = 0;
for (int i = 7; i >= 0; i--)
{
boolean a = (b & (1 << i)) == 0;
if (a)
{
count++;
}
else
{
break; // Reset the count if we encounter a non-zero number
}
}
/* Add the count of MSB 0s to the prefix length */
prefixLength += count;
/* Break here since we've now covered the MSB 0s */
break;
}
}
return prefixLength;
}
/**
* Gets the distance from this NodeId to another NodeId
*
* @param to
*
* @return Integer The distance
*/
public int getDistance(KademliaId to)
{
/**
* Compute the xor of this and to
* Get the index i of the first set bit of the xor returned NodeId
* The distance between them is ID_LENGTH - i
*/
return ID_LENGTH - this.xor(to).getFirstSetBitIndex();
}
@Override
public void toStream(DataOutputStream out) throws IOException
{
/* Add the NodeId to the stream */
out.write(this.getBytes());
}
@Override
public final void fromStream(DataInputStream in) throws IOException
{
byte[] input = new byte[ID_LENGTH / 8];
in.readFully(input);
this.keyBytes = input;
}
public String hexRepresentation()
{
/* Returns the hex format of this NodeId */
return DatatypeConverter.printHexBinary(keyBytes);
}
@Override
public String toString()
{
return this.hexRepresentation();
}
}

View File

@ -1,44 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.node;
import java.math.BigInteger;
import java.util.Comparator;
/**
* A Comparator to compare 2 keys to a given key
*
* @author Joshua Kissoon
* @since 20140322
*/
public class KeyComparator implements Comparator<Node>
{
private final BigInteger key;
/**
* @param key The NodeId relative to which the distance should be measured.
*/
public KeyComparator(KademliaId key)
{
this.key = key.getInt();
}
/**
* Compare two objects which must both be of type <code>Node</code>
* and determine which is closest to the identifier specified in the
* constructor.
*
* @param n1 Node 1 to compare distance from the key
* @param n2 Node 2 to compare distance from the key
*/
@Override
public int compare(Node n1, Node n2)
{
BigInteger b1 = n1.getNodeId().getInt();
BigInteger b2 = n2.getNodeId().getInt();
b1 = b1.xor(key);
b2 = b2.xor(key);
return b1.abs().compareTo(b2.abs());
}
}

View File

@ -1,134 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.node;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import io.github.chronosx88.dhtBootstrap.kademlia.message.Streamable;
/**
* A Node in the Kademlia network - Contains basic node network information.
*
* @author Joshua Kissoon
* @since 20140202
* @version 0.1
*/
public class Node implements Streamable, Serializable
{
private KademliaId nodeId;
private InetAddress inetAddress;
private int port;
private final String strRep;
public Node(KademliaId nid, InetAddress ip, int port)
{
this.nodeId = nid;
this.inetAddress = ip;
this.port = port;
this.strRep = this.nodeId.toString();
}
/**
* Load the Node's data from a DataInput stream
*
* @param in
*
* @throws IOException
*/
public Node(DataInputStream in) throws IOException
{
this.fromStream(in);
this.strRep = this.nodeId.toString();
}
/**
* Set the InetAddress of this node
*
* @param addr The new InetAddress of this node
*/
public void setInetAddress(InetAddress addr)
{
this.inetAddress = addr;
}
/**
* @return The NodeId object of this node
*/
public KademliaId getNodeId()
{
return this.nodeId;
}
/**
* Creates a SocketAddress for this node
*
* @return
*/
public InetSocketAddress getSocketAddress()
{
return new InetSocketAddress(this.inetAddress, this.port);
}
@Override
public void toStream(DataOutputStream out) throws IOException
{
/* Add the NodeId to the stream */
this.nodeId.toStream(out);
/* Add the Node's IP address to the stream */
byte[] a = inetAddress.getAddress();
if (a.length != 4)
{
throw new RuntimeException("Expected InetAddress of 4 bytes, got " + a.length);
}
out.write(a);
/* Add the port to the stream */
out.writeInt(port);
}
@Override
public final void fromStream(DataInputStream in) throws IOException
{
/* Load the NodeId */
this.nodeId = new KademliaId(in);
/* Load the IP Address */
byte[] ip = new byte[4];
in.readFully(ip);
this.inetAddress = InetAddress.getByAddress(ip);
/* Read in the port */
this.port = in.readInt();
}
@Override
public boolean equals(Object o)
{
if (o instanceof Node)
{
Node n = (Node) o;
if (n == this)
{
return true;
}
return this.getNodeId().equals(n.getNodeId());
}
return false;
}
@Override
public int hashCode()
{
return this.getNodeId().hashCode();
}
@Override
public String toString()
{
return this.getNodeId().toString();
}
}

View File

@ -1,66 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.operation;
import java.io.IOException;
import io.github.chronosx88.dhtBootstrap.kademlia.KadConfiguration;
import io.github.chronosx88.dhtBootstrap.kademlia.KadServer;
import io.github.chronosx88.dhtBootstrap.kademlia.KademliaNode;
import io.github.chronosx88.dhtBootstrap.kademlia.node.KademliaId;
/**
* At each time interval t, nodes need to refresh their K-Buckets
* This operation takes care of refreshing this node's K-Buckets
*
* @author Joshua Kissoon
* @created 20140224
*/
public class BucketRefreshOperation implements Operation
{
private final KadServer server;
private final KademliaNode localNode;
private final KadConfiguration config;
public BucketRefreshOperation(KadServer server, KademliaNode localNode, KadConfiguration config)
{
this.server = server;
this.localNode = localNode;
this.config = config;
}
/**
* Each bucket need to be refreshed at every time interval t.
* Find an identifier in each bucket's range, use it to look for nodes closest to this identifier
* allowing the bucket to be refreshed.
*
* Then Do a NodeLookupOperation for each of the generated NodeIds,
* This will find the K-Closest nodes to that ID, and update the necessary K-Bucket
*
* @throws IOException
*/
@Override
public synchronized void execute() throws IOException
{
for (int i = 1; i < KademliaId.ID_LENGTH; i++)
{
/* Construct a NodeId that is i bits away from the current node Id */
final KademliaId current = this.localNode.getNode().getNodeId().generateNodeIdByDistance(i);
/* Run the Node Lookup Operation, each in a different thread to speed up things */
new Thread()
{
@Override
public void run()
{
try
{
new NodeLookupOperation(server, localNode, current, BucketRefreshOperation.this.config).execute();
}
catch (IOException e)
{
//System.err.println("Bucket Refresh Operation Failed. Msg: " + e.getMessage());
}
}
}.start();
}
}
}

View File

@ -1,140 +0,0 @@
/**
* @author Joshua Kissoon
* @created 20140218
* @desc Operation that handles connecting to an existing Kademlia network using a bootstrap node
*/
package io.github.chronosx88.dhtBootstrap.kademlia.operation;
import io.github.chronosx88.dhtBootstrap.kademlia.message.Receiver;
import java.io.IOException;
import io.github.chronosx88.dhtBootstrap.kademlia.KadConfiguration;
import io.github.chronosx88.dhtBootstrap.kademlia.KadServer;
import io.github.chronosx88.dhtBootstrap.kademlia.KademliaNode;
import io.github.chronosx88.dhtBootstrap.kademlia.exceptions.RoutingException;
import io.github.chronosx88.dhtBootstrap.kademlia.message.AcknowledgeMessage;
import io.github.chronosx88.dhtBootstrap.kademlia.message.ConnectMessage;
import io.github.chronosx88.dhtBootstrap.kademlia.message.Message;
import io.github.chronosx88.dhtBootstrap.kademlia.node.Node;
public class ConnectOperation implements Operation, Receiver
{
public static final int MAX_CONNECT_ATTEMPTS = 5; // Try 5 times to connect to a node
private final KadServer server;
private final KademliaNode localNode;
private final Node bootstrapNode;
private final KadConfiguration config;
private boolean error;
private int attempts;
/**
* @param server The message server used to send/receive messages
* @param local The local node
* @param bootstrap Node to use to bootstrap the local node onto the network
* @param config
*/
public ConnectOperation(KadServer server, KademliaNode local, Node bootstrap, KadConfiguration config)
{
this.server = server;
this.localNode = local;
this.bootstrapNode = bootstrap;
this.config = config;
}
@Override
public synchronized void execute() throws IOException
{
try
{
/* Contact the bootstrap node */
this.error = true;
this.attempts = 0;
Message m = new ConnectMessage(this.localNode.getNode());
/* Send a connect message to the bootstrap node */
server.sendMessage(this.bootstrapNode, m, this);
/* If we haven't finished as yet, wait for a maximum of config.operationTimeout() time */
int totalTimeWaited = 0;
int timeInterval = 50; // We re-check every 300 milliseconds
while (totalTimeWaited < this.config.operationTimeout())
{
if (error)
{
wait(timeInterval);
totalTimeWaited += timeInterval;
}
else
{
break;
}
}
if (error)
{
/* If we still haven't received any responses by then, do a routing timeout */
throw new RoutingException("ConnectOperation: Bootstrap node did not respond: " + bootstrapNode);
}
/* Perform lookup for our own ID to get nodes close to us */
Operation lookup = new NodeLookupOperation(this.server, this.localNode, this.localNode.getNode().getNodeId(), this.config);
lookup.execute();
/**
* Refresh buckets to get a good routing table
* After the above lookup operation, K nodes will be in our routing table,
* Now we try to populate all of our buckets.
*/
new BucketRefreshOperation(this.server, this.localNode, this.config).execute();
}
catch (InterruptedException e)
{
System.err.println("Connect operation was interrupted. ");
}
}
/**
* Receives an AcknowledgeMessage from the bootstrap node.
*
* @param comm
*/
@Override
public synchronized void receive(Message incoming, int comm)
{
/* The incoming message will be an acknowledgement message */
AcknowledgeMessage msg = (AcknowledgeMessage) incoming;
/* The bootstrap node has responded, insert it into our space */
this.localNode.getRoutingTable().insert(this.bootstrapNode);
/* We got a response, so the error is false */
error = false;
/* Wake up any waiting thread */
notify();
}
/**
* Resends a ConnectMessage to the boot strap node a maximum of MAX_ATTEMPTS
* times.
*
* @param comm
*
* @throws IOException
*/
@Override
public synchronized void timeout(int comm) throws IOException
{
if (++this.attempts < MAX_CONNECT_ATTEMPTS)
{
this.server.sendMessage(this.bootstrapNode, new ConnectMessage(this.localNode.getNode()), this);
}
else
{
/* We just exit, so notify all other threads that are possibly waiting */
notify();
}
}
}

View File

@ -1,342 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.operation;
import io.github.chronosx88.dhtBootstrap.kademlia.message.Receiver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import io.github.chronosx88.dhtBootstrap.kademlia.JKademliaNode;
import io.github.chronosx88.dhtBootstrap.kademlia.dht.GetParameter;
import io.github.chronosx88.dhtBootstrap.kademlia.KadConfiguration;
import io.github.chronosx88.dhtBootstrap.kademlia.KadServer;
import io.github.chronosx88.dhtBootstrap.kademlia.dht.JKademliaStorageEntry;
import io.github.chronosx88.dhtBootstrap.kademlia.exceptions.ContentNotFoundException;
import io.github.chronosx88.dhtBootstrap.kademlia.exceptions.RoutingException;
import io.github.chronosx88.dhtBootstrap.kademlia.exceptions.UnknownMessageException;
import io.github.chronosx88.dhtBootstrap.kademlia.message.ContentLookupMessage;
import io.github.chronosx88.dhtBootstrap.kademlia.message.ContentMessage;
import io.github.chronosx88.dhtBootstrap.kademlia.message.Message;
import io.github.chronosx88.dhtBootstrap.kademlia.message.NodeReplyMessage;
import io.github.chronosx88.dhtBootstrap.kademlia.node.KeyComparator;
import io.github.chronosx88.dhtBootstrap.kademlia.node.Node;
import io.github.chronosx88.dhtBootstrap.kademlia.util.RouteLengthChecker;
/**
* Looks up a specified identifier and returns the value associated with it
*
* @author Joshua Kissoon
* @since 20140226
*/
public class ContentLookupOperation implements Operation, Receiver
{
/* Constants */
private static final Byte UNASKED = (byte) 0x00;
private static final Byte AWAITING = (byte) 0x01;
private static final Byte ASKED = (byte) 0x02;
private static final Byte FAILED = (byte) 0x03;
private final KadServer server;
private final JKademliaNode localNode;
private JKademliaStorageEntry contentFound = null;
private final KadConfiguration config;
private final ContentLookupMessage lookupMessage;
private boolean isContentFound;
private final SortedMap<Node, Byte> nodes;
/* Tracks messages in transit and awaiting reply */
private final Map<Integer, Node> messagesTransiting;
/* Used to sort nodes */
private final Comparator comparator;
/* Statistical information */
private final RouteLengthChecker routeLengthChecker;
{
messagesTransiting = new HashMap<>();
isContentFound = false;
routeLengthChecker = new RouteLengthChecker();
}
/**
* @param server
* @param localNode
* @param params The parameters to search for the content which we need to find
* @param config
*/
public ContentLookupOperation(KadServer server, JKademliaNode localNode, GetParameter params, KadConfiguration config)
{
/* Construct our lookup message */
this.lookupMessage = new ContentLookupMessage(localNode.getNode(), params);
this.server = server;
this.localNode = localNode;
this.config = config;
/**
* We initialize a TreeMap to store nodes.
* This map will be sorted by which nodes are closest to the lookupId
*/
this.comparator = new KeyComparator(params.getKey());
this.nodes = new TreeMap(this.comparator);
}
/**
* @throws IOException
* @throws RoutingException
*/
@Override
public synchronized void execute() throws IOException, RoutingException
{
try
{
/* Set the local node as already asked */
nodes.put(this.localNode.getNode(), ASKED);
/**
* We add all nodes here instead of the K-Closest because there may be the case that the K-Closest are offline
* - The operation takes care of looking at the K-Closest.
*/
List<Node> allNodes = this.localNode.getRoutingTable().getAllNodes();
this.addNodes(allNodes);
/* Also add the initial set of nodes to the routeLengthChecker */
this.routeLengthChecker.addInitialNodes(allNodes);
/**
* If we haven't found the requested amount of content as yet,
* keey trying until config.operationTimeout() time has expired
*/
int totalTimeWaited = 0;
int timeInterval = 10; // We re-check every n milliseconds
while (totalTimeWaited < this.config.operationTimeout())
{
if (!this.askNodesorFinish() && !isContentFound)
{
wait(timeInterval);
totalTimeWaited += timeInterval;
}
else
{
break;
}
}
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
}
/**
* Add nodes from this list to the set of nodes to lookup
*
* @param list The list from which to add nodes
*/
public void addNodes(List<Node> list)
{
for (Node o : list)
{
/* If this node is not in the list, add the node */
if (!nodes.containsKey(o))
{
nodes.put(o, UNASKED);
}
}
}
/**
* Asks some of the K closest nodes seen but not yet queried.
* Assures that no more than DefaultConfiguration.CONCURRENCY messages are in transit at a time
*
* This method should be called every time a reply is received or a timeout occurs.
*
* If all K closest nodes have been asked and there are no messages in transit,
* the algorithm is finished.
*
* @return <code>true</code> if finished OR <code>false</code> otherwise
*/
private boolean askNodesorFinish() throws IOException
{
/* If >= CONCURRENCY nodes are in transit, don't do anything */
if (this.config.maxConcurrentMessagesTransiting() <= this.messagesTransiting.size())
{
return false;
}
/* Get unqueried nodes among the K closest seen that have not FAILED */
List<Node> unasked = this.closestNodesNotFailed(UNASKED);
if (unasked.isEmpty() && this.messagesTransiting.isEmpty())
{
/* We have no unasked nodes nor any messages in transit, we're finished! */
return true;
}
/* Sort nodes according to criteria */
Collections.sort(unasked, this.comparator);
/**
* Send messages to nodes in the list;
* making sure than no more than CONCURRENCY messsages are in transit
*/
for (int i = 0; (this.messagesTransiting.size() < this.config.maxConcurrentMessagesTransiting()) && (i < unasked.size()); i++)
{
Node n = (Node) unasked.get(i);
int comm = server.sendMessage(n, lookupMessage, this);
this.nodes.put(n, AWAITING);
this.messagesTransiting.put(comm, n);
}
/* We're not finished as yet, return false */
return false;
}
/**
* Find The K closest nodes to the target lookupId given that have not FAILED.
* From those K, get those that have the specified status
*
* @param status The status of the nodes to return
*
* @return A List of the closest nodes
*/
private List<Node> closestNodesNotFailed(Byte status)
{
List<Node> closestNodes = new ArrayList<>(this.config.k());
int remainingSpaces = this.config.k();
for (Map.Entry e : this.nodes.entrySet())
{
if (!FAILED.equals(e.getValue()))
{
if (status.equals(e.getValue()))
{
/* We got one with the required status, now add it */
closestNodes.add((Node) e.getKey());
}
if (--remainingSpaces == 0)
{
break;
}
}
}
return closestNodes;
}
@Override
public synchronized void receive(Message incoming, int comm) throws IOException, RoutingException
{
if (this.isContentFound)
{
return;
}
if (incoming instanceof ContentMessage)
{
/* The reply received is a content message with the required content, take it in */
ContentMessage msg = (ContentMessage) incoming;
/* Add the origin node to our routing table */
this.localNode.getRoutingTable().insert(msg.getOrigin());
/* Get the Content and check if it satisfies the required parameters */
JKademliaStorageEntry content = msg.getContent();
this.contentFound = content;
this.isContentFound = true;
}
else
{
/* The reply received is a NodeReplyMessage with nodes closest to the content needed */
NodeReplyMessage msg = (NodeReplyMessage) incoming;
/* Add the origin node to our routing table */
Node origin = msg.getOrigin();
this.localNode.getRoutingTable().insert(origin);
/* Set that we've completed ASKing the origin node */
this.nodes.put(origin, ASKED);
/* Remove this msg from messagesTransiting since it's completed now */
this.messagesTransiting.remove(comm);
/* Add the received nodes to the routeLengthChecker */
this.routeLengthChecker.addNodes(msg.getNodes(), origin);
/* Add the received nodes to our nodes list to query */
this.addNodes(msg.getNodes());
this.askNodesorFinish();
}
}
/**
* A node does not respond or a packet was lost, we set this node as failed
*
* @param comm
*
* @throws IOException
*/
@Override
public synchronized void timeout(int comm) throws IOException
{
/* Get the node associated with this communication */
Node n = this.messagesTransiting.get(new Integer(comm));
if (n == null)
{
throw new UnknownMessageException("Unknown comm: " + comm);
}
/* Mark this node as failed and inform the routing table that it's unresponsive */
this.nodes.put(n, FAILED);
this.localNode.getRoutingTable().setUnresponsiveContact(n);
this.messagesTransiting.remove(comm);
this.askNodesorFinish();
}
/**
* @return Whether the content was found or not.
*/
public boolean isContentFound()
{
return this.isContentFound;
}
/**
* @return The list of all content found during the lookup operation
*
* @throws ContentNotFoundException
*/
public JKademliaStorageEntry getContentFound() throws ContentNotFoundException
{
if (this.isContentFound)
{
return this.contentFound;
}
else
{
throw new ContentNotFoundException("No Value was found for the given key.");
}
}
/**
* @return How many hops it took in order to get to the content.
*/
public int routeLength()
{
return this.routeLengthChecker.getRouteLength();
}
}

View File

@ -1,99 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.operation;
import java.io.IOException;
import java.util.List;
import io.github.chronosx88.dhtBootstrap.kademlia.KadConfiguration;
import io.github.chronosx88.dhtBootstrap.kademlia.KadServer;
import io.github.chronosx88.dhtBootstrap.kademlia.KademliaNode;
import io.github.chronosx88.dhtBootstrap.kademlia.dht.KademliaDHT;
import io.github.chronosx88.dhtBootstrap.kademlia.dht.KademliaStorageEntryMetadata;
import io.github.chronosx88.dhtBootstrap.kademlia.exceptions.ContentNotFoundException;
import io.github.chronosx88.dhtBootstrap.kademlia.message.Message;
import io.github.chronosx88.dhtBootstrap.kademlia.message.StoreContentMessage;
import io.github.chronosx88.dhtBootstrap.kademlia.node.Node;
/**
* Refresh/Restore the data on this node by sending the data to the K-Closest nodes to the data
*
* @author Joshua Kissoon
* @since 20140306
*/
public class ContentRefreshOperation implements Operation
{
private final KadServer server;
private final KademliaNode localNode;
private final KademliaDHT dht;
private final KadConfiguration config;
public ContentRefreshOperation(KadServer server, KademliaNode localNode, KademliaDHT dht, KadConfiguration config)
{
this.server = server;
this.localNode = localNode;
this.dht = dht;
this.config = config;
}
/**
* For each content stored on this DHT, distribute it to the K closest nodes
Also delete the content if this node is no longer one of the K closest nodes
We assume that our JKademliaRoutingTable is updated, and we can get the K closest nodes from that table
*
* @throws IOException
*/
@Override
public void execute() throws IOException
{
/* Get a list of all storage entries for content */
List<KademliaStorageEntryMetadata> entries = this.dht.getStorageEntries();
/* If a content was last republished before this time, then we need to republish it */
final long minRepublishTime = (System.currentTimeMillis() / 1000L) - this.config.restoreInterval();
/* For each storage entry, distribute it */
for (KademliaStorageEntryMetadata e : entries)
{
/* Check last update time of this entry and only distribute it if it has been last updated > 1 hour ago */
if (e.lastRepublished() > minRepublishTime)
{
continue;
}
/* Set that this content is now republished */
e.updateLastRepublished();
/* Get the K closest nodes to this entries */
List<Node> closestNodes = this.localNode.getRoutingTable().findClosest(e.getKey(), this.config.k());
/* Create the message */
Message msg = new StoreContentMessage(this.localNode.getNode(), dht.get(e));
/*Store the message on all of the K-Nodes*/
for (Node n : closestNodes)
{
/*We don't need to again store the content locally, it's already here*/
if (!n.equals(this.localNode.getNode()))
{
/* Send a contentstore operation to the K-Closest nodes */
this.server.sendMessage(n, msg, null);
}
}
/* Delete any content on this node that this node is not one of the K-Closest nodes to */
try
{
if (!closestNodes.contains(this.localNode.getNode()))
{
this.dht.remove(e);
}
}
catch (ContentNotFoundException cnfe)
{
/* It would be weird if the content is not found here */
System.err.println("ContentRefreshOperation: Removing content from local node, content not found... Message: " + cnfe.getMessage());
}
}
}
}

View File

@ -1,40 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.operation;
import java.io.IOException;
import io.github.chronosx88.dhtBootstrap.kademlia.KadConfiguration;
import io.github.chronosx88.dhtBootstrap.kademlia.KadServer;
import io.github.chronosx88.dhtBootstrap.kademlia.KademliaNode;
import io.github.chronosx88.dhtBootstrap.kademlia.dht.KademliaDHT;
/**
* An operation that handles refreshing the entire Kademlia Systems including buckets and content
*
* @author Joshua Kissoon
* @since 20140306
*/
public class KadRefreshOperation implements Operation
{
private final KadServer server;
private final KademliaNode localNode;
private final KademliaDHT dht;
private final KadConfiguration config;
public KadRefreshOperation(KadServer server, KademliaNode localNode, KademliaDHT dht, KadConfiguration config)
{
this.server = server;
this.localNode = localNode;
this.dht = dht;
this.config = config;
}
@Override
public void execute() throws IOException
{
/* Run our BucketRefreshOperation to refresh buckets */
new BucketRefreshOperation(this.server, this.localNode, this.config).execute();
/* After buckets have been refreshed, we refresh content */
new ContentRefreshOperation(this.server, this.localNode, this.dht, this.config).execute();
}
}

View File

@ -1,323 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.operation;
import io.github.chronosx88.dhtBootstrap.kademlia.message.Receiver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import io.github.chronosx88.dhtBootstrap.kademlia.KadConfiguration;
import io.github.chronosx88.dhtBootstrap.kademlia.KadServer;
import io.github.chronosx88.dhtBootstrap.kademlia.KademliaNode;
import io.github.chronosx88.dhtBootstrap.kademlia.exceptions.RoutingException;
import io.github.chronosx88.dhtBootstrap.kademlia.message.Message;
import io.github.chronosx88.dhtBootstrap.kademlia.message.NodeLookupMessage;
import io.github.chronosx88.dhtBootstrap.kademlia.message.NodeReplyMessage;
import io.github.chronosx88.dhtBootstrap.kademlia.node.KeyComparator;
import io.github.chronosx88.dhtBootstrap.kademlia.node.Node;
import io.github.chronosx88.dhtBootstrap.kademlia.node.KademliaId;
/**
* Finds the K closest nodes to a specified identifier
* The algorithm terminates when it has gotten responses from the K closest nodes it has seen.
* Nodes that fail to respond are removed from consideration
*
* @author Joshua Kissoon
* @created 20140219
*/
public class NodeLookupOperation implements Operation, Receiver
{
/* Constants */
private static final String UNASKED = "UnAsked";
private static final String AWAITING = "Awaiting";
private static final String ASKED = "Asked";
private static final String FAILED = "Failed";
private final KadServer server;
private final KademliaNode localNode;
private final KadConfiguration config;
private final Message lookupMessage; // Message sent to each peer
private final Map<Node, String> nodes;
/* Tracks messages in transit and awaiting reply */
private final Map<Integer, Node> messagesTransiting;
/* Used to sort nodes */
private final Comparator comparator;
{
messagesTransiting = new HashMap<>();
}
/**
* @param server KadServer used for communication
* @param localNode The local node making the communication
* @param lookupId The ID for which to find nodes close to
* @param config
*/
public NodeLookupOperation(KadServer server, KademliaNode localNode, KademliaId lookupId, KadConfiguration config)
{
this.server = server;
this.localNode = localNode;
this.config = config;
this.lookupMessage = new NodeLookupMessage(localNode.getNode(), lookupId);
/**
* We initialize a TreeMap to store nodes.
* This map will be sorted by which nodes are closest to the lookupId
*/
this.comparator = new KeyComparator(lookupId);
this.nodes = new TreeMap(this.comparator);
}
/**
* @throws IOException
* @throws RoutingException
*/
@Override
public synchronized void execute() throws IOException, RoutingException
{
try
{
/* Set the local node as already asked */
nodes.put(this.localNode.getNode(), ASKED);
/**
* We add all nodes here instead of the K-Closest because there may be the case that the K-Closest are offline
* - The operation takes care of looking at the K-Closest.
*/
this.addNodes(this.localNode.getRoutingTable().getAllNodes());
/* If we haven't finished as yet, wait for a maximum of config.operationTimeout() time */
int totalTimeWaited = 0;
int timeInterval = 10; // We re-check every n milliseconds
while (totalTimeWaited < this.config.operationTimeout())
{
if (!this.askNodesorFinish())
{
wait(timeInterval);
totalTimeWaited += timeInterval;
}
else
{
break;
}
}
/* Now after we've finished, we would have an idea of offline nodes, lets update our routing table */
this.localNode.getRoutingTable().setUnresponsiveContacts(this.getFailedNodes());
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
}
public List<Node> getClosestNodes()
{
return this.closestNodes(ASKED);
}
/**
* Add nodes from this list to the set of nodes to lookup
*
* @param list The list from which to add nodes
*/
public void addNodes(List<Node> list)
{
for (Node o : list)
{
/* If this node is not in the list, add the node */
if (!nodes.containsKey(o))
{
nodes.put(o, UNASKED);
}
}
}
/**
* Asks some of the K closest nodes seen but not yet queried.
* Assures that no more than DefaultConfiguration.CONCURRENCY messages are in transit at a time
*
* This method should be called every time a reply is received or a timeout occurs.
*
* If all K closest nodes have been asked and there are no messages in transit,
* the algorithm is finished.
*
* @return <code>true</code> if finished OR <code>false</code> otherwise
*/
private boolean askNodesorFinish() throws IOException
{
/* If >= CONCURRENCY nodes are in transit, don't do anything */
if (this.config.maxConcurrentMessagesTransiting() <= this.messagesTransiting.size())
{
return false;
}
/* Get unqueried nodes among the K closest seen that have not FAILED */
List<Node> unasked = this.closestNodesNotFailed(UNASKED);
if (unasked.isEmpty() && this.messagesTransiting.isEmpty())
{
/* We have no unasked nodes nor any messages in transit, we're finished! */
return true;
}
/**
* Send messages to nodes in the list;
* making sure than no more than CONCURRENCY messsages are in transit
*/
for (int i = 0; (this.messagesTransiting.size() < this.config.maxConcurrentMessagesTransiting()) && (i < unasked.size()); i++)
{
Node n = (Node) unasked.get(i);
int comm = server.sendMessage(n, lookupMessage, this);
this.nodes.put(n, AWAITING);
this.messagesTransiting.put(comm, n);
}
/* We're not finished as yet, return false */
return false;
}
/**
* @param status The status of the nodes to return
*
* @return The K closest nodes to the target lookupId given that have the specified status
*/
private List<Node> closestNodes(String status)
{
List<Node> closestNodes = new ArrayList<>(this.config.k());
int remainingSpaces = this.config.k();
for (Map.Entry e : this.nodes.entrySet())
{
if (status.equals(e.getValue()))
{
/* We got one with the required status, now add it */
closestNodes.add((Node) e.getKey());
if (--remainingSpaces == 0)
{
break;
}
}
}
return closestNodes;
}
/**
* Find The K closest nodes to the target lookupId given that have not FAILED.
* From those K, get those that have the specified status
*
* @param status The status of the nodes to return
*
* @return A List of the closest nodes
*/
private List<Node> closestNodesNotFailed(String status)
{
List<Node> closestNodes = new ArrayList<>(this.config.k());
int remainingSpaces = this.config.k();
for (Map.Entry<Node, String> e : this.nodes.entrySet())
{
if (!FAILED.equals(e.getValue()))
{
if (status.equals(e.getValue()))
{
/* We got one with the required status, now add it */
closestNodes.add(e.getKey());
}
if (--remainingSpaces == 0)
{
break;
}
}
}
return closestNodes;
}
/**
* Receive and handle the incoming NodeReplyMessage
*
* @param comm
*
* @throws IOException
*/
@Override
public synchronized void receive(Message incoming, int comm) throws IOException
{
if (!(incoming instanceof NodeReplyMessage))
{
/* Not sure why we get a message of a different type here... @todo Figure it out. */
return;
}
/* We receive a NodeReplyMessage with a set of nodes, read this message */
NodeReplyMessage msg = (NodeReplyMessage) incoming;
/* Add the origin node to our routing table */
Node origin = msg.getOrigin();
this.localNode.getRoutingTable().insert(origin);
/* Set that we've completed ASKing the origin node */
this.nodes.put(origin, ASKED);
/* Remove this msg from messagesTransiting since it's completed now */
this.messagesTransiting.remove(comm);
/* Add the received nodes to our nodes list to query */
this.addNodes(msg.getNodes());
this.askNodesorFinish();
}
/**
* A node does not respond or a packet was lost, we set this node as failed
*
* @param comm
*
* @throws IOException
*/
@Override
public synchronized void timeout(int comm) throws IOException
{
/* Get the node associated with this communication */
Node n = this.messagesTransiting.get(comm);
if (n == null)
{
return;
}
/* Mark this node as failed and inform the routing table that it is unresponsive */
this.nodes.put(n, FAILED);
this.localNode.getRoutingTable().setUnresponsiveContact(n);
this.messagesTransiting.remove(comm);
this.askNodesorFinish();
}
public List<Node> getFailedNodes()
{
List<Node> failedNodes = new ArrayList<>();
for (Map.Entry<Node, String> e : this.nodes.entrySet())
{
if (e.getValue().equals(FAILED))
{
failedNodes.add(e.getKey());
}
}
return failedNodes;
}
}

View File

@ -1,21 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.operation;
import java.io.IOException;
import io.github.chronosx88.dhtBootstrap.kademlia.exceptions.RoutingException;
/**
* An operation in the Kademlia routing protocol
*
* @author Joshua Kissoon
* @created 20140218
*/
public interface Operation
{
/**
* Starts an operation and returns when the operation is finished
*
* @throws RoutingException
*/
public void execute() throws IOException, RoutingException;
}

View File

@ -1,39 +0,0 @@
/**
* Implementation of the Kademlia Ping operation,
* This is on hold at the moment since I'm not sure if we'll use ping given the improvements mentioned in the paper.
*
* @author Joshua Kissoon
* @since 20140218
*/
package io.github.chronosx88.dhtBootstrap.kademlia.operation;
import java.io.IOException;
import io.github.chronosx88.dhtBootstrap.kademlia.KadServer;
import io.github.chronosx88.dhtBootstrap.kademlia.exceptions.RoutingException;
import io.github.chronosx88.dhtBootstrap.kademlia.node.Node;
public class PingOperation implements Operation
{
private final KadServer server;
private final Node localNode;
private final Node toPing;
/**
* @param server The Kademlia server used to send & receive messages
* @param local The local node
* @param toPing The node to send the ping message to
*/
public PingOperation(KadServer server, Node local, Node toPing)
{
this.server = server;
this.localNode = local;
this.toPing = toPing;
}
@Override
public void execute() throws IOException, RoutingException
{
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
}
}

View File

@ -1,83 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.operation;
import java.io.IOException;
import java.util.List;
import io.github.chronosx88.dhtBootstrap.kademlia.KadConfiguration;
import io.github.chronosx88.dhtBootstrap.kademlia.KadServer;
import io.github.chronosx88.dhtBootstrap.kademlia.KademliaNode;
import io.github.chronosx88.dhtBootstrap.kademlia.dht.JKademliaStorageEntry;
import io.github.chronosx88.dhtBootstrap.kademlia.dht.KademliaDHT;
import io.github.chronosx88.dhtBootstrap.kademlia.message.Message;
import io.github.chronosx88.dhtBootstrap.kademlia.message.StoreContentMessage;
import io.github.chronosx88.dhtBootstrap.kademlia.node.Node;
/**
* Operation that stores a DHT Content onto the K closest nodes to the content Key
*
* @author Joshua Kissoon
* @since 20140224
*/
public class StoreOperation implements Operation
{
private final KadServer server;
private final KademliaNode localNode;
private final JKademliaStorageEntry storageEntry;
private final KademliaDHT localDht;
private final KadConfiguration config;
/**
* @param server
* @param localNode
* @param storageEntry The content to be stored on the DHT
* @param localDht The local DHT
* @param config
*/
public StoreOperation(KadServer server, KademliaNode localNode, JKademliaStorageEntry storageEntry, KademliaDHT localDht, KadConfiguration config)
{
this.server = server;
this.localNode = localNode;
this.storageEntry = storageEntry;
this.localDht = localDht;
this.config = config;
}
@Override
public synchronized void execute() throws IOException
{
/* Get the nodes on which we need to store the content */
NodeLookupOperation ndlo = new NodeLookupOperation(this.server, this.localNode, this.storageEntry.getContentMetadata().getKey(), this.config);
ndlo.execute();
List<Node> nodes = ndlo.getClosestNodes();
/* Create the message */
Message msg = new StoreContentMessage(this.localNode.getNode(), this.storageEntry);
/*Store the message on all of the K-Nodes*/
for (Node n : nodes)
{
if (n.equals(this.localNode.getNode()))
{
/* Store the content locally */
this.localDht.store(this.storageEntry);
}
else
{
/**
* @todo Create a receiver that receives a store acknowledgement message to count how many nodes a content have been stored at
*/
this.server.sendMessage(n, msg, null);
}
}
}
/**
* @return The number of nodes that have stored this content
*
* @todo Implement this method
*/
public int numNodesStoredAt()
{
return 1;
}
}

View File

@ -1,118 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.routing;
import io.github.chronosx88.dhtBootstrap.kademlia.node.Node;
/**
* Keeps information about contacts of the Node; Contacts are stored in the Buckets in the Routing Table.
*
* Contacts are used instead of nodes because more information is needed than just the node information.
* - Information such as
* -- Last seen time
*
* @author Joshua Kissoon
* @since 20140425
* @updated 20140426
*/
public class Contact implements Comparable<Contact>
{
private final Node n;
private long lastSeen;
/**
* Stale as described by Kademlia paper page 64
* When a contact fails to respond, if the replacement cache is empty and there is no replacement for the contact,
* just mark it as stale.
*
* Now when a new contact is added, if the contact is stale, it is removed.
*/
private int staleCount;
/**
* Create a contact object
*
* @param n The node associated with this contact
*/
public Contact(Node n)
{
this.n = n;
this.lastSeen = System.currentTimeMillis() / 1000L;
}
public Node getNode()
{
return this.n;
}
/**
* When a Node sees a contact a gain, the Node will want to update that it's seen recently,
* this method updates the last seen timestamp for this contact.
*/
public void setSeenNow()
{
this.lastSeen = System.currentTimeMillis() / 1000L;
}
/**
* When last was this contact seen?
*
* @return long The last time this contact was seen.
*/
public long lastSeen()
{
return this.lastSeen;
}
@Override
public boolean equals(Object c)
{
if (c instanceof Contact)
{
return ((Contact) c).getNode().equals(this.getNode());
}
return false;
}
/**
* Increments the amount of times this count has failed to respond to a request.
*/
public void incrementStaleCount()
{
staleCount++;
}
/**
* @return Integer Stale count
*/
public int staleCount()
{
return this.staleCount;
}
/**
* Reset the stale count of the contact if it's recently seen
*/
public void resetStaleCount()
{
this.staleCount = 0;
}
@Override
public int compareTo(Contact o)
{
if (this.getNode().equals(o.getNode()))
{
return 0;
}
return (this.lastSeen() > o.lastSeen()) ? 1 : -1;
}
@Override
public int hashCode()
{
return this.getNode().hashCode();
}
}

View File

@ -1,34 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.routing;
import java.util.Comparator;
/**
* A Comparator to compare 2 contacts by their last seen time
*
* @author Joshua Kissoon
* @since 20140426
*/
public class ContactLastSeenComparator implements Comparator<Contact>
{
/**
* Compare two contacts to determine their order in the Bucket,
* Contacts are ordered by their last seen timestamp.
*
* @param c1 Contact 1
* @param c2 Contact 2
*/
@Override
public int compare(Contact c1, Contact c2)
{
if (c1.getNode().equals(c2.getNode()))
{
return 0;
}
else
{
/* We may have 2 different contacts with same last seen values so we can't return 0 here */
return c1.lastSeen() > c2.lastSeen() ? 1 : -1;
}
}
}

View File

@ -1,275 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.routing;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.TreeSet;
import io.github.chronosx88.dhtBootstrap.kademlia.KadConfiguration;
import io.github.chronosx88.dhtBootstrap.kademlia.node.Node;
/**
* A bucket in the Kademlia routing table
*
* @author Joshua Kissoon
* @created 20140215
*/
public class JKademliaBucket implements KademliaBucket
{
/* How deep is this bucket in the Routing Table */
private final int depth;
/* Contacts stored in this routing table */
private final TreeSet<Contact> contacts;
/* A set of last seen contacts that can replace any current contact that is unresponsive */
private final TreeSet<Contact> replacementCache;
private final KadConfiguration config;
{
contacts = new TreeSet<>();
replacementCache = new TreeSet<>();
}
/**
* @param depth How deep in the routing tree is this bucket
* @param config
*/
public JKademliaBucket(int depth, KadConfiguration config)
{
this.depth = depth;
this.config = config;
}
@Override
public synchronized void insert(Contact c)
{
if (this.contacts.contains(c))
{
/**
* If the contact is already in the bucket, lets update that we've seen it
* We need to remove and re-add the contact to get the Sorted Set to update sort order
*/
Contact tmp = this.removeFromContacts(c.getNode());
tmp.setSeenNow();
tmp.resetStaleCount();
this.contacts.add(tmp);
}
else
{
/* If the bucket is filled, so put the contacts in the replacement cache */
if (contacts.size() >= this.config.k())
{
/* If the cache is empty, we check if any contacts are stale and replace the stalest one */
Contact stalest = null;
for (Contact tmp : this.contacts)
{
if (tmp.staleCount() >= this.config.stale())
{
/* Contact is stale */
if (stalest == null)
{
stalest = tmp;
}
else if (tmp.staleCount() > stalest.staleCount())
{
stalest = tmp;
}
}
}
/* If we have a stale contact, remove it and add the new contact to the bucket */
if (stalest != null)
{
this.contacts.remove(stalest);
this.contacts.add(c);
}
else
{
/* No stale contact, lets insert this into replacement cache */
this.insertIntoReplacementCache(c);
}
}
else
{
this.contacts.add(c);
}
}
}
@Override
public synchronized void insert(Node n)
{
this.insert(new Contact(n));
}
@Override
public synchronized boolean containsContact(Contact c)
{
return this.contacts.contains(c);
}
@Override
public synchronized boolean containsNode(Node n)
{
return this.containsContact(new Contact(n));
}
@Override
public synchronized boolean removeContact(Contact c)
{
/* If the contact does not exist, then we failed to remove it */
if (!this.contacts.contains(c))
{
return false;
}
/* Contact exist, lets remove it only if our replacement cache has a replacement */
if (!this.replacementCache.isEmpty())
{
/* Replace the contact with one from the replacement cache */
this.contacts.remove(c);
Contact replacement = this.replacementCache.first();
this.contacts.add(replacement);
this.replacementCache.remove(replacement);
}
else
{
/* There is no replacement, just increment the contact's stale count */
this.getFromContacts(c.getNode()).incrementStaleCount();
}
return true;
}
private synchronized Contact getFromContacts(Node n)
{
for (Contact c : this.contacts)
{
if (c.getNode().equals(n))
{
return c;
}
}
/* This contact does not exist */
throw new NoSuchElementException("The contact does not exist in the contacts list.");
}
private synchronized Contact removeFromContacts(Node n)
{
for (Contact c : this.contacts)
{
if (c.getNode().equals(n))
{
this.contacts.remove(c);
return c;
}
}
/* We got here means this element does not exist */
throw new NoSuchElementException("Node does not exist in the replacement cache. ");
}
@Override
public synchronized boolean removeNode(Node n)
{
return this.removeContact(new Contact(n));
}
@Override
public synchronized int numContacts()
{
return this.contacts.size();
}
@Override
public synchronized int getDepth()
{
return this.depth;
}
@Override
public synchronized List<Contact> getContacts()
{
final ArrayList<Contact> ret = new ArrayList<>();
/* If we have no contacts, return the blank arraylist */
if (this.contacts.isEmpty())
{
return ret;
}
/* We have contacts, lets copy put them into the arraylist and return */
for (Contact c : this.contacts)
{
ret.add(c);
}
return ret;
}
/**
* When the bucket is filled, we keep extra contacts in the replacement cache.
*/
private synchronized void insertIntoReplacementCache(Contact c)
{
/* Just return if this contact is already in our replacement cache */
if (this.replacementCache.contains(c))
{
/**
* If the contact is already in the bucket, lets update that we've seen it
* We need to remove and re-add the contact to get the Sorted Set to update sort order
*/
Contact tmp = this.removeFromReplacementCache(c.getNode());
tmp.setSeenNow();
this.replacementCache.add(tmp);
}
else if (this.replacementCache.size() > this.config.k())
{
/* if our cache is filled, we remove the least recently seen contact */
this.replacementCache.remove(this.replacementCache.last());
this.replacementCache.add(c);
}
else
{
this.replacementCache.add(c);
}
}
private synchronized Contact removeFromReplacementCache(Node n)
{
for (Contact c : this.replacementCache)
{
if (c.getNode().equals(n))
{
this.replacementCache.remove(c);
return c;
}
}
/* We got here means this element does not exist */
throw new NoSuchElementException("Node does not exist in the replacement cache. ");
}
@Override
public synchronized String toString()
{
StringBuilder sb = new StringBuilder("Bucket at depth: ");
sb.append(this.depth);
sb.append("\n Nodes: \n");
for (Contact n : this.contacts)
{
sb.append("Node: ");
sb.append(n.getNode().getNodeId().toString());
sb.append(" (stale: ");
sb.append(n.staleCount());
sb.append(")");
sb.append("\n");
}
return sb.toString();
}
}

View File

@ -1,238 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.routing;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
import io.github.chronosx88.dhtBootstrap.kademlia.KadConfiguration;
import io.github.chronosx88.dhtBootstrap.kademlia.node.KeyComparator;
import io.github.chronosx88.dhtBootstrap.kademlia.node.Node;
import io.github.chronosx88.dhtBootstrap.kademlia.node.KademliaId;
/**
* Implementation of a Kademlia routing table
*
* @author Joshua Kissoon
* @created 20140215
*/
public class JKademliaRoutingTable implements KademliaRoutingTable
{
private final Node localNode; // The current node
private transient KademliaBucket[] buckets;
private transient KadConfiguration config;
public JKademliaRoutingTable(Node localNode, KadConfiguration config)
{
this.localNode = localNode;
this.config = config;
/* Initialize all of the buckets to a specific depth */
this.initialize();
/* Insert the local node */
this.insert(localNode);
}
/**
* Initialize the JKademliaRoutingTable to it's default state
*/
@Override
public final void initialize()
{
this.buckets = new KademliaBucket[KademliaId.ID_LENGTH];
for (int i = 0; i < KademliaId.ID_LENGTH; i++)
{
buckets[i] = new JKademliaBucket(i, this.config);
}
}
@Override
public void setConfiguration(KadConfiguration config)
{
this.config = config;
}
/**
* Adds a contact to the routing table based on how far it is from the LocalNode.
*
* @param c The contact to add
*/
@Override
public synchronized final void insert(Contact c)
{
this.buckets[this.getBucketId(c.getNode().getNodeId())].insert(c);
}
/**
* Adds a node to the routing table based on how far it is from the LocalNode.
*
* @param n The node to add
*/
@Override
public synchronized final void insert(Node n)
{
this.buckets[this.getBucketId(n.getNodeId())].insert(n);
}
/**
* Compute the bucket ID in which a given node should be placed; the bucketId is computed based on how far the node is away from the Local Node.
*
* @param nid The NodeId for which we want to find which bucket it belong to
*
* @return Integer The bucket ID in which the given node should be placed.
*/
@Override
public final int getBucketId(KademliaId nid)
{
int bId = this.localNode.getNodeId().getDistance(nid) - 1;
/* If we are trying to insert a node into it's own routing table, then the bucket ID will be -1, so let's just keep it in bucket 0 */
return bId < 0 ? 0 : bId;
}
/**
* Find the closest set of contacts to a given NodeId
*
* @param target The NodeId to find contacts close to
* @param numNodesRequired The number of contacts to find
*
* @return List A List of contacts closest to target
*/
@Override
public synchronized final List<Node> findClosest(KademliaId target, int numNodesRequired)
{
TreeSet<Node> sortedSet = new TreeSet<>(new KeyComparator(target));
sortedSet.addAll(this.getAllNodes());
List<Node> closest = new ArrayList<>(numNodesRequired);
/* Now we have the sorted set, lets get the top numRequired */
int count = 0;
for (Node n : sortedSet)
{
closest.add(n);
if (++count == numNodesRequired)
{
break;
}
}
return closest;
}
/**
* @return List A List of all Nodes in this JKademliaRoutingTable
*/
@Override
public synchronized final List<Node> getAllNodes()
{
List<Node> nodes = new ArrayList<>();
for (KademliaBucket b : this.buckets)
{
for (Contact c : b.getContacts())
{
nodes.add(c.getNode());
}
}
return nodes;
}
/**
* @return List A List of all Nodes in this JKademliaRoutingTable
*/
@Override
public final List<Contact> getAllContacts()
{
List<Contact> contacts = new ArrayList<>();
for (KademliaBucket b : this.buckets)
{
contacts.addAll(b.getContacts());
}
return contacts;
}
/**
* @return Bucket[] The buckets in this Kad Instance
*/
@Override
public final KademliaBucket[] getBuckets()
{
return this.buckets;
}
/**
* Set the KadBuckets of this routing table, mainly used when retrieving saved state
*
* @param buckets
*/
public final void setBuckets(KademliaBucket[] buckets)
{
this.buckets = buckets;
}
/**
* Method used by operations to notify the routing table of any contacts that have been unresponsive.
*
* @param contacts The set of unresponsive contacts
*/
@Override
public void setUnresponsiveContacts(List<Node> contacts)
{
if (contacts.isEmpty())
{
return;
}
for (Node n : contacts)
{
this.setUnresponsiveContact(n);
}
}
/**
* Method used by operations to notify the routing table of any contacts that have been unresponsive.
*
* @param n
*/
@Override
public synchronized void setUnresponsiveContact(Node n)
{
int bucketId = this.getBucketId(n.getNodeId());
/* Remove the contact from the bucket */
this.buckets[bucketId].removeNode(n);
}
@Override
public synchronized final String toString()
{
StringBuilder sb = new StringBuilder("\nPrinting Routing Table Started ***************** \n");
int totalContacts = 0;
for (KademliaBucket b : this.buckets)
{
if (b.numContacts() > 0)
{
totalContacts += b.numContacts();
sb.append("# nodes in Bucket with depth ");
sb.append(b.getDepth());
sb.append(": ");
sb.append(b.numContacts());
sb.append("\n");
sb.append(b.toString());
sb.append("\n");
}
}
sb.append("\nTotal Contacts: ");
sb.append(totalContacts);
sb.append("\n\n");
sb.append("Printing Routing Table Ended ******************** ");
return sb.toString();
}
}

View File

@ -1,87 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.routing;
import java.util.List;
import io.github.chronosx88.dhtBootstrap.kademlia.node.Node;
/**
* A bucket used to store Contacts in the routing table.
*
* @author Joshua Kissoon
* @created 20140215
*/
public interface KademliaBucket
{
/**
* Adds a contact to the bucket
*
* @param c the new contact
*/
public void insert(Contact c);
/**
* Create a new contact and insert it into the bucket.
*
* @param n The node to create the contact from
*/
public void insert(Node n);
/**
* Checks if this bucket contain a contact
*
* @param c The contact to check for
*
* @return boolean
*/
public boolean containsContact(Contact c);
/**
* Checks if this bucket contain a node
*
* @param n The node to check for
*
* @return boolean
*/
public boolean containsNode(Node n);
/**
* Remove a contact from this bucket.
*
* If there are replacement contacts in the replacement cache,
* select the last seen one and put it into the bucket while removing the required contact.
*
* If there are no contacts in the replacement cache, then we just mark the contact requested to be removed as stale.
* Marking as stale would actually be incrementing the stale count of the contact.
*
* @param c The contact to remove
*
* @return Boolean whether the removal was successful.
*/
public boolean removeContact(Contact c);
/**
* Remove the contact object related to a node from this bucket
*
* @param n The node of the contact to remove
*
* @return Boolean whether the removal was successful.
*/
public boolean removeNode(Node n);
/**
* Counts the number of contacts in this bucket.
*
* @return Integer The number of contacts in this bucket
*/
public int numContacts();
/**
* @return Integer The depth of this bucket in the RoutingTable
*/
public int getDepth();
/**
* @return An Iterable structure with all contacts in this bucket
*/
public List<Contact> getContacts();
}

View File

@ -1,91 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.routing;
import java.util.List;
import io.github.chronosx88.dhtBootstrap.kademlia.KadConfiguration;
import io.github.chronosx88.dhtBootstrap.kademlia.node.Node;
import io.github.chronosx88.dhtBootstrap.kademlia.node.KademliaId;
/**
* Specification for Kademlia's Routing Table
*
* @author Joshua Kissoon
* @since 20140501
*/
public interface KademliaRoutingTable
{
/**
* Initialize the RoutingTable to it's default state
*/
public void initialize();
/**
* Sets the configuration file for this routing table
*
* @param config
*/
public void setConfiguration(KadConfiguration config);
/**
* Adds a contact to the routing table based on how far it is from the LocalNode.
*
* @param c The contact to add
*/
public void insert(Contact c);
/**
* Adds a node to the routing table based on how far it is from the LocalNode.
*
* @param n The node to add
*/
public void insert(Node n);
/**
* Compute the bucket ID in which a given node should be placed; the bucketId is computed based on how far the node is away from the Local Node.
*
* @param nid The NodeId for which we want to find which bucket it belong to
*
* @return Integer The bucket ID in which the given node should be placed.
*/
public int getBucketId(KademliaId nid);
/**
* Find the closest set of contacts to a given NodeId
*
* @param target The NodeId to find contacts close to
* @param numNodesRequired The number of contacts to find
*
* @return List A List of contacts closest to target
*/
public List<Node> findClosest(KademliaId target, int numNodesRequired);
/**
* @return List A List of all Nodes in this RoutingTable
*/
public List getAllNodes();
/**
* @return List A List of all Nodes in this RoutingTable
*/
public List getAllContacts();
/**
* @return Bucket[] The buckets in this Kad Instance
*/
public KademliaBucket[] getBuckets();
/**
* Method used by operations to notify the routing table of any contacts that have been unresponsive.
*
* @param contacts The set of unresponsive contacts
*/
public void setUnresponsiveContacts(List<Node> contacts);
/**
* Method used by operations to notify the routing table of any contacts that have been unresponsive.
*
* @param n
*/
public void setUnresponsiveContact(Node n);
}

View File

@ -1,100 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.util;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
/**
* A class that is used to calculate the hash of strings.
*
* @author Joshua Kissoon
* @since 20140405
*/
public class HashCalculator
{
/**
* Computes the SHA-1 Hash.
*
* @param toHash The string to hash
*
* @return byte[20] The hashed string
*
* @throws NoSuchAlgorithmException
*/
public static byte[] sha1Hash(String toHash) throws NoSuchAlgorithmException
{
/* Create a MessageDigest */
MessageDigest md = MessageDigest.getInstance("SHA-1");
/* Add password bytes to digest */
md.update(toHash.getBytes());
/* Get the hashed bytes */
return md.digest();
}
/**
* Computes the SHA-1 Hash using a Salt.
*
* @param toHash The string to hash
* @param salt A salt used to blind the hash
*
* @return byte[20] The hashed string
*
* @throws NoSuchAlgorithmException
*/
public static byte[] sha1Hash(String toHash, String salt) throws NoSuchAlgorithmException
{
/* Create a MessageDigest */
MessageDigest md = MessageDigest.getInstance("SHA-1");
/* Add password bytes to digest */
md.update(toHash.getBytes());
/* Get the hashed bytes */
return md.digest(salt.getBytes());
}
/**
* Computes the MD5 Hash.
*
* @param toHash The string to hash
*
* @return byte[16] The hashed string
*
* @throws NoSuchAlgorithmException
*/
public static byte[] md5Hash(String toHash) throws NoSuchAlgorithmException
{
/* Create a MessageDigest */
MessageDigest md = MessageDigest.getInstance("MD5");
/* Add password bytes to digest */
md.update(toHash.getBytes());
/* Get the hashed bytes */
return md.digest();
}
/**
* Computes the MD5 Hash using a salt.
*
* @param toHash The string to hash
* @param salt A salt used to blind the hash
*
* @return byte[16] The hashed string
*
* @throws NoSuchAlgorithmException
*/
public static byte[] md5Hash(String toHash, String salt) throws NoSuchAlgorithmException
{
/* Create a MessageDigest */
MessageDigest md = MessageDigest.getInstance("MD5");
/* Add password bytes to digest */
md.update(toHash.getBytes());
/* Get the hashed bytes */
return md.digest(salt.getBytes());
}
}

View File

@ -1,92 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.util;
import java.util.Collection;
import java.util.HashMap;
import io.github.chronosx88.dhtBootstrap.kademlia.node.Node;
/**
* Class that helps compute the route length taken to complete an operation.
*
* Only used for routing operations - mainly the NodeLookup and ContentLookup Operations.
*
* Idea:
* - Add the original set of nodes with route length 0;
* - When we get a node reply with a set of nodes, we add those nodes and set the route length to their sender route length + 1
*
* @author Joshua Kissoon
* @since 20140510
*/
public class RouteLengthChecker
{
/* Store the nodes and their route length (RL) */
private final HashMap<Node, Integer> nodes;
/* Lets cache the max route length instead of having to go and search for it later */
private int maxRouteLength;
{
this.nodes = new HashMap<>();
this.maxRouteLength = 1;
}
/**
* Add the initial nodes in the routing operation
*
* @param initialNodes The set of initial nodes
*/
public void addInitialNodes(Collection<Node> initialNodes)
{
for (Node n : initialNodes)
{
this.nodes.put(n, 1);
}
}
/**
* Add any nodes that we get from a node reply.
*
* The route length of these nodes will be their sender + 1;
*
* @param inputSet The set of nodes we receive
* @param sender The node who send the set
*/
public void addNodes(Collection<Node> inputSet, Node sender)
{
if (!this.nodes.containsKey(sender))
{
return;
}
/* Get the route length of the input set - sender RL + 1 */
int inputSetRL = this.nodes.get(sender) + 1;
if (inputSetRL > this.maxRouteLength)
{
this.maxRouteLength = inputSetRL;
}
/* Add the nodes to our set */
for (Node n : inputSet)
{
/* We only add if the node is not already there... */
if (!this.nodes.containsKey(n))
{
this.nodes.put(n, inputSetRL);
}
}
}
/**
* Get the route length of the operation!
*
* It will be the max route length of all the nodes here.
*
* @return The route length
*/
public int getRouteLength()
{
return this.maxRouteLength;
}
}

View File

@ -1,95 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.util.serializer;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.lang.reflect.Type;
import java.util.List;
import io.github.chronosx88.dhtBootstrap.kademlia.dht.DHT;
import io.github.chronosx88.dhtBootstrap.kademlia.dht.KademliaDHT;
import io.github.chronosx88.dhtBootstrap.kademlia.dht.KademliaStorageEntryMetadata;
/**
* A KadSerializer that serializes DHT to JSON format
* The generic serializer is not working for DHT
*
* Why a DHT specific serializer?
* The DHT structure:
* - DHT
* -- StorageEntriesManager
* --- Map<NodeId, List<StorageEntry>>
* ---- NodeId:KeyBytes
* ---- List<StorageEntry>
* ----- StorageEntry: Key, OwnerId, Type, Hash
*
* The above structure seems to be causing some problem for Gson, especially at the Map part.
*
* Solution
* - Make the StorageEntriesManager transient
* - Simply store all StorageEntry in the serialized object
* - When reloading, re-add all StorageEntry to the DHT
*
* @author Joshua Kissoon
*
* @since 20140310
*/
public class JsonDHTSerializer implements KadSerializer<KademliaDHT>
{
private final Gson gson;
private final Type storageEntriesCollectionType;
{
gson = new Gson();
storageEntriesCollectionType = new TypeToken<List<KademliaStorageEntryMetadata>>()
{
}.getType();
}
@Override
public void write(KademliaDHT data, DataOutputStream out) throws IOException
{
try (JsonWriter writer = new JsonWriter(new OutputStreamWriter(out)))
{
writer.beginArray();
/* Write the basic DHT */
gson.toJson(data, DHT.class, writer);
/* Now Store the Entries */
gson.toJson(data.getStorageEntries(), this.storageEntriesCollectionType, writer);
writer.endArray();
}
}
@Override
public KademliaDHT read(DataInputStream in) throws IOException, ClassNotFoundException
{
try (DataInputStream din = new DataInputStream(in);
JsonReader reader = new JsonReader(new InputStreamReader(in)))
{
reader.beginArray();
/* Read the basic DHT */
DHT dht = gson.fromJson(reader, DHT.class);
dht.initialize();
/* Now get the entries and add them back to the DHT */
List<KademliaStorageEntryMetadata> entries = gson.fromJson(reader, this.storageEntriesCollectionType);
dht.putStorageEntries(entries);
reader.endArray();
return dht;
}
}
}

View File

@ -1,112 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.util.serializer;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import io.github.chronosx88.dhtBootstrap.kademlia.routing.JKademliaRoutingTable;
import java.lang.reflect.Type;
import java.util.List;
import io.github.chronosx88.dhtBootstrap.kademlia.KadConfiguration;
import io.github.chronosx88.dhtBootstrap.kademlia.routing.Contact;
import io.github.chronosx88.dhtBootstrap.kademlia.routing.KademliaRoutingTable;
/**
* A KadSerializer that serializes routing tables to JSON format
The generic serializer is not working for routing tables
Why a JKademliaRoutingTable specific serializer?
The routing table structure:
- JKademliaRoutingTable
-- Buckets[]
--- Map<NodeId, Node>
* ---- NodeId:KeyBytes
* ---- Node: NodeId, InetAddress, Port
*
* The above structure seems to be causing some problem for Gson,
* especially at the Map part.
*
* Solution
- Make the Buckets[] transient
- Simply store all Nodes in the serialized object
- When reloading, re-add all nodes to the JKademliaRoutingTable
*
* @author Joshua Kissoon
*
* @since 20140310
*/
public class JsonRoutingTableSerializer implements KadSerializer<KademliaRoutingTable>
{
private final Gson gson;
Type contactCollectionType = new TypeToken<List<Contact>>()
{
}.getType();
private final KadConfiguration config;
{
gson = new Gson();
}
/**
* Initialize the class
*
* @param config
*/
public JsonRoutingTableSerializer(KadConfiguration config)
{
this.config = config;
}
@Override
public void write(KademliaRoutingTable data, DataOutputStream out) throws IOException
{
try (JsonWriter writer = new JsonWriter(new OutputStreamWriter(out)))
{
writer.beginArray();
/* Write the basic JKademliaRoutingTable */
gson.toJson(data, JKademliaRoutingTable.class, writer);
/* Now Store the Contacts */
gson.toJson(data.getAllContacts(), contactCollectionType, writer);
writer.endArray();
}
}
@Override
public KademliaRoutingTable read(DataInputStream in) throws IOException, ClassNotFoundException
{
try (DataInputStream din = new DataInputStream(in);
JsonReader reader = new JsonReader(new InputStreamReader(in)))
{
reader.beginArray();
/* Read the basic JKademliaRoutingTable */
KademliaRoutingTable tbl = gson.fromJson(reader, KademliaRoutingTable.class);
tbl.setConfiguration(config);
/* Now get the Contacts and add them back to the JKademliaRoutingTable */
List<Contact> contacts = gson.fromJson(reader, contactCollectionType);
tbl.initialize();
for (Contact c : contacts)
{
tbl.insert(c);
}
reader.endArray();
/* Read and return the Content*/
return tbl;
}
}
}

View File

@ -1,67 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.util.serializer;
import com.google.gson.Gson;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
/**
* A KadSerializer that serializes content to JSON format
*
* @param <T> The type of content to serialize
*
* @author Joshua Kissoon
*
* @since 20140225
*/
public class JsonSerializer<T> implements KadSerializer<T>
{
private final Gson gson;
{
gson = new Gson();
}
@Override
public void write(T data, DataOutputStream out) throws IOException
{
try (JsonWriter writer = new JsonWriter(new OutputStreamWriter(out)))
{
writer.beginArray();
/* Store the content type */
gson.toJson(data.getClass().getName(), String.class, writer);
/* Now Store the content */
gson.toJson(data, data.getClass(), writer);
writer.endArray();
}
}
@Override
public T read(DataInputStream in) throws IOException, ClassNotFoundException
{
try (DataInputStream din = new DataInputStream(in);
JsonReader reader = new JsonReader(new InputStreamReader(in)))
{
reader.beginArray();
/* Read the class name */
String className = gson.fromJson(reader, String.class);
/* Read and return the Content*/
T ret = gson.fromJson(reader, Class.forName(className));
reader.endArray();
return ret;
}
}
}

View File

@ -1,41 +0,0 @@
package io.github.chronosx88.dhtBootstrap.kademlia.util.serializer;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
/**
* A Serializer is used to transform data to and from a specified form.
*
* Here we define the structure of any Serializer used in Kademlia
*
* @author Joshua Kissoon
* @param <T> The type of content being serialized
*
* @since 20140225
*/
public interface KadSerializer<T>
{
/**
* Write a KadContent to a DataOutput stream
*
* @param data The data to write
* @param out The output Stream to write to
*
* @throws IOException
*/
public void write(T data, DataOutputStream out) throws IOException;
/**
* Read data of type T from a DataInput Stream
*
* @param in The InputStream to read the data from
*
* @return T Data of type T
*
* @throws IOException
* @throws ClassNotFoundException
*/
public T read(DataInputStream in) throws IOException, ClassNotFoundException;
}