Initial Commit

This commit is contained in:
Joshua Kissoon 2014-02-19 02:07:07 +05:30
parent 7ccba43821
commit b3e1403872
31 changed files with 3252 additions and 0 deletions

73
build.xml Normal file
View File

@ -0,0 +1,73 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- You may freely edit this file. See commented blocks below for -->
<!-- some examples of how to customize the build. -->
<!-- (If you delete it and reopen the project it will be recreated.) -->
<!-- By default, only the Clean and Build commands use this build script. -->
<!-- Commands such as Run, Debug, and Test only use this build script if -->
<!-- the Compile on Save feature is turned off for the project. -->
<!-- You can turn off the Compile on Save (or Deploy on Save) setting -->
<!-- in the project's Project Properties dialog box.-->
<project name="Kademlia" default="default" basedir=".">
<description>Builds, tests, and runs the project Kademlia.</description>
<import file="nbproject/build-impl.xml"/>
<!--
There exist several targets which are by default empty and which can be
used for execution of your tasks. These targets are usually executed
before and after some main targets. They are:
-pre-init: called before initialization of project properties
-post-init: called after initialization of project properties
-pre-compile: called before javac compilation
-post-compile: called after javac compilation
-pre-compile-single: called before javac compilation of single file
-post-compile-single: called after javac compilation of single file
-pre-compile-test: called before javac compilation of JUnit tests
-post-compile-test: called after javac compilation of JUnit tests
-pre-compile-test-single: called before javac compilation of single JUnit test
-post-compile-test-single: called after javac compilation of single JUunit test
-pre-jar: called before JAR building
-post-jar: called after JAR building
-post-clean: called after cleaning build products
(Targets beginning with '-' are not intended to be called on their own.)
Example of inserting an obfuscator after compilation could look like this:
<target name="-post-compile">
<obfuscate>
<fileset dir="${build.classes.dir}"/>
</obfuscate>
</target>
For list of available properties check the imported
nbproject/build-impl.xml file.
Another way to customize the build is by overriding existing main targets.
The targets of interest are:
-init-macrodef-javac: defines macro for javac compilation
-init-macrodef-junit: defines macro for junit execution
-init-macrodef-debug: defines macro for class debugging
-init-macrodef-java: defines macro for class execution
-do-jar: JAR building
run: execution of project
-javadoc-build: Javadoc generation
test-report: JUnit report generation
An example of overriding the target for project execution could look like this:
<target name="run" depends="Kademlia-impl.jar">
<exec dir="bin" executable="launcher.exe">
<arg file="${dist.jar}"/>
</exec>
</target>
Notice that the overridden target depends on the jar target and not only on
the compile target as the regular run target does. Again, for a list of available
properties which you can use, check the target you are overriding in the
nbproject/build-impl.xml file.
-->
</project>

3
manifest.mf Normal file
View File

@ -0,0 +1,3 @@
Manifest-Version: 1.0
X-COMMENT: Main-Class will be added automatically by build

1407
nbproject/build-impl.xml Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,8 @@
build.xml.data.CRC32=7e563d6e
build.xml.script.CRC32=c3cd04bd
build.xml.stylesheet.CRC32=8064a381@1.68.1.46
# This file is used by a NetBeans-based IDE to track changes in generated files such as build-impl.xml.
# Do not edit this file. You may delete it but then the IDE will never regenerate such files for you.
nbproject/build-impl.xml.data.CRC32=7e563d6e
nbproject/build-impl.xml.script.CRC32=5c6dd9f7
nbproject/build-impl.xml.stylesheet.CRC32=5a01deb7@1.68.1.46

View File

@ -0,0 +1,77 @@
annotation.processing.enabled=true
annotation.processing.enabled.in.editor=false
annotation.processing.processors.list=
annotation.processing.run.all.processors=true
annotation.processing.source.output=${build.generated.sources.dir}/ap-source-output
application.title=Kademlia
application.vendor=Joshua
build.classes.dir=${build.dir}/classes
build.classes.excludes=**/*.java,**/*.form
# This directory is removed when the project is cleaned:
build.dir=build
build.generated.dir=${build.dir}/generated
build.generated.sources.dir=${build.dir}/generated-sources
# Only compile against the classpath explicitly listed here:
build.sysclasspath=ignore
build.test.classes.dir=${build.dir}/test/classes
build.test.results.dir=${build.dir}/test/results
# Uncomment to specify the preferred debugger connection transport:
#debug.transport=dt_socket
debug.classpath=\
${run.classpath}
debug.test.classpath=\
${run.test.classpath}
# Files in build.classes.dir which should be excluded from distribution jar
dist.archive.excludes=
# This directory is removed when the project is cleaned:
dist.dir=dist
dist.jar=${dist.dir}/Kademlia.jar
dist.javadoc.dir=${dist.dir}/javadoc
endorsed.classpath=
excludes=
file.reference.gson-2.2.4.jar=C:\\Users\\Joshua\\Documents\\NetBeansProjects\\Libraries\\gson-2.2.4.jar
includes=**
jar.compress=false
javac.classpath=\
${file.reference.gson-2.2.4.jar}
# Space-separated list of extra javac options
javac.compilerargs=
javac.deprecation=false
javac.processorpath=\
${javac.classpath}
javac.source=1.7
javac.target=1.7
javac.test.classpath=\
${javac.classpath}:\
${build.classes.dir}
javac.test.processorpath=\
${javac.test.classpath}
javadoc.additionalparam=
javadoc.author=false
javadoc.encoding=${source.encoding}
javadoc.noindex=false
javadoc.nonavbar=false
javadoc.notree=false
javadoc.private=false
javadoc.splitindex=true
javadoc.use=true
javadoc.version=false
javadoc.windowtitle=
main.class=kademlia.KademliaBasic
manifest.file=manifest.mf
meta.inf.dir=${src.dir}/META-INF
mkdist.disabled=false
platform.active=default_platform
run.classpath=\
${javac.classpath}:\
${build.classes.dir}
# Space-separated list of JVM arguments used when running the project.
# You may also define separate properties like run-sys-prop.name=value instead of -Dname=value.
# To set system properties for unit tests define test-sys-prop.name=value:
run.jvmargs=
run.test.classpath=\
${javac.test.classpath}:\
${build.test.classes.dir}
source.encoding=UTF-8
src.dir=src
test.src.dir=test

15
nbproject/project.xml Normal file
View File

@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://www.netbeans.org/ns/project/1">
<type>org.netbeans.modules.java.j2seproject</type>
<configuration>
<data xmlns="http://www.netbeans.org/ns/j2se-project/3">
<name>Kademlia</name>
<source-roots>
<root id="src.dir"/>
</source-roots>
<test-roots>
<root id="test.src.dir"/>
</test-roots>
</data>
</configuration>
</project>

View File

@ -0,0 +1,24 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package kademlia;
/**
*
* @author Joshua
*/
public class KademliaBasic
{
/**
* @param args the command line arguments
*/
public static void main(String[] args)
{
// TODO code application logic here
}
}

View File

@ -0,0 +1,50 @@
package kademlia.core;
/**
* A set of Kademlia configuration parameters. Default values are
* supplied and can be changed by the application as necessary.
* */
public class Configuration
{
/**
* Interval in milliseconds between execution of RestoreOperations.
* */
public static long RESTORE_INTERVAL = 60 * 60 * 1000;
/**
* If no reply received from a node in this period (in milliseconds)
* consider the node unresponsive.
* */
public static long RESPONSE_TIMEOUT = 3000;
/**
* Maximum number of milliseconds for performing an operation.
* */
public static long OPERATION_TIMEOUT = 10000;
/**
* Maximum number of concurrent messages in transit.
* */
public static int CONCURRENCY = 2;
/**
* Log base exponent.
* */
public static int B = 2;
/**
* Bucket size.
* */
public static int K = 3;
/**
* Size of replacement cache.
* */
public static int RCSIZE = 3;
/**
* Number of times a node can be marked as stale before it is actually removed.
* */
public static int STALE = 1;
}

View File

@ -0,0 +1,245 @@
/**
* @author Joshua Kissoon
* @created 20140215
* @desc This server handles sending and receiving messages
*/
package kademlia.core;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;
import java.util.HashMap;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import kademlia.message.Message;
import kademlia.message.MessageFactory;
import kademlia.node.Node;
import kademlia.operation.Receiver;
public class KadServer
{
/* Constants */
private static final int DATAGRAM_BUFFER_SIZE = 64 * 1024; // 64KB
/* Server Objects */
private final int udpPort;
private final DatagramSocket socket;
private boolean isRunning;
private final HashMap<Integer, Receiver> receivers;
private final Timer timer; // Schedule future tasks
/* Factories */
private final MessageFactory messageFactory;
{
isRunning = true;
}
public KadServer(int udpPort, MessageFactory mFactory) throws SocketException
{
this.udpPort = udpPort;
this.socket = new DatagramSocket(udpPort);
this.receivers = new HashMap<>();
this.timer = new Timer(true);
this.messageFactory = mFactory;
/* Start listening for incoming requests in a new thread */
this.startListener();
}
/**
* Starts the listener to listen for incoming messages
*/
private void startListener()
{
new Thread()
{
@Override
public void run()
{
listen();
}
}.start();
}
/**
* Sends a message
*
* @param msg The message to send
* @param to The node to send the message to
* @param recv The receiver to handle the response message
*
* @throws IOException
*/
public void sendMessage(Node to, Message msg, Receiver recv) throws IOException
{
if (!isRunning)
{
throw new IllegalStateException("Kad Server is not running.");
}
/* Generate a random communication ID */
int comm = new Integer(new Random().nextInt());
/* Setup the receiver to handle message response */
receivers.put(comm, recv);
timer.schedule(new TimeoutTask(comm, recv), Configuration.RESPONSE_TIMEOUT);
/* Send the message */
sendMessage(to, msg, comm);
}
public void reply(Node to, Message msg, int comm) throws IOException
{
if (!isRunning)
{
throw new IllegalStateException("Kad Server is not running.");
}
sendMessage(to, msg, comm);
}
private void sendMessage(Node to, Message msg, int comm) throws IOException
{
/* Setup the message for transmission */
ByteArrayOutputStream bout = new ByteArrayOutputStream();
DataOutputStream dout = new DataOutputStream(bout);
dout.writeInt(comm);
dout.writeByte(msg.code());
msg.toStream(dout);
dout.close();
byte[] data = bout.toByteArray();
if (data.length > DATAGRAM_BUFFER_SIZE)
{
throw new IOException("MEssage is too big");
}
/* Everything is good, now create the packet and send it */
DatagramPacket pkt = new DatagramPacket(data, 0, data.length);
pkt.setSocketAddress(to.getSocketAddress());
socket.send(pkt);
}
/**
* Listen for incoming messages in a separate thread
*/
private void listen()
{
try
{
while (isRunning)
{
try
{
/* Wait for a packet */
byte[] buffer = new byte[DATAGRAM_BUFFER_SIZE];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
socket.receive(packet);
/* We've received a packet, now handle it */
ByteArrayInputStream bin = new ByteArrayInputStream(packet.getData(), packet.getOffset(), packet.getLength());
DataInputStream din = new DataInputStream(bin);
/* Read in the conversation Id to know which handler to handle this response */
int comm = din.readInt();
byte messCode = din.readByte();
Message msg = messageFactory.createMessage(messCode, din);
din.close();
System.out.println("Message Received: " + msg);
System.out.println("Receivers: " + receivers);
/* Get a receiver for this message */
Receiver receiver;
if (this.receivers.containsKey(comm))
{
/* If there is a reciever in the receivers to handle this */
receiver = this.receivers.remove(comm);
}
else
{
/* There is currently no receivers, try to get one */
receiver = messageFactory.createReceiver(messCode, this);
}
/* Invoke the receiver */
if (receiver != null)
{
receiver.receive(msg, comm);
}
}
catch (IOException e)
{
this.isRunning = false;
e.printStackTrace();
}
}
}
finally
{
if (!socket.isClosed())
{
socket.close();
}
this.isRunning = false;
}
}
/**
* Remove a conversation receiver
*
* @param comm The id of this conversation
*/
private synchronized void unregister(int comm)
{
Integer key = new Integer(comm);
receivers.remove(key);
}
/**
* Task that gets called by a separate thread if a timeout for a receiver occurs.
* When a reply arrives this task must be cancelled using the <code>cancel()</code>
* method inherited from <code>TimerTask</code>. In this case the caller is
* responsible for removing the task from the <code>tasks</code> map.
* */
class TimeoutTask extends TimerTask
{
private final int comm;
private final Receiver recv;
public TimeoutTask(int comm, Receiver recv)
{
this.comm = comm;
this.recv = recv;
}
@Override
public void run()
{
try
{
unregister(comm);
recv.timeout(comm);
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
}

View File

@ -0,0 +1,105 @@
/**
* @author Joshua Kissoon
* @created 20140215
* @desc The main Kademlia network management class
*/
package kademlia.core;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Timer;
import java.util.TimerTask;
import kademlia.exceptions.RoutingException;
import kademlia.message.MessageFactory;
import kademlia.node.Node;
import kademlia.node.NodeId;
import kademlia.operation.ConnectOperation;
import kademlia.operation.Operation;
public class Kademlia
{
/* Kademlia Attributes */
private final String name;
/* Objects to be used */
private final Node localNode;
private final KadServer server;
private final Timer timer;
/* Factories */
private final MessageFactory messageFactory;
/**
* Creates a Kademlia DistributedMap using the specified name as filename base.
* If the id cannot be read from disk the specified defaultId is used.
* The instance is bootstraped to an existing network by specifying the
* address of a bootstrap node in the network.
*
* @param name The Name of this node used for storage
* @param defaultId Default id for the node
* @param udpPort The UDP port to use for routing messages
*
* @throws IOException If an error occurred while reading id or local map
* from disk <i>or</i> a network error occurred while
* attempting to connect to the network
* */
public Kademlia(String name, NodeId defaultId, int udpPort) throws IOException
{
this.name = name;
this.localNode = new Node(defaultId, InetAddress.getLocalHost(), udpPort);
this.messageFactory = new MessageFactory(localNode);
this.server = new KadServer(udpPort, this.messageFactory);
this.timer = new Timer(true);
/* Schedule Recurring RestoreOperation */
timer.schedule(
new TimerTask()
{
@Override
public void run()
{
/**
* @todo Create Operation that
* Refreshes all buckets and sends HashMessages to all nodes that are
* among the K closest to mappings stored at this node. Also deletes any
* mappings that this node is no longer among the K closest to.
* */
}
},
// Delay // Interval
Configuration.RESTORE_INTERVAL, Configuration.RESTORE_INTERVAL
);
}
/**
* @return Node The local node for this system
*/
public Node getNode()
{
return this.localNode;
}
/**
* @return The KadServer used to send/receive messages
*/
public KadServer getServer()
{
return this.server;
}
/**
* Connect to an existing peer-to-peer network.
*
* @param n The known node in the peer-to-peer network
*
* @throws RoutingException If the bootstrap node could not be contacted
* @throws IOException If a network error occurred
* @throws IllegalStateException If this object is closed
* */
public final void connect(Node n) throws IOException, RoutingException
{
Operation op = new ConnectOperation(this.server, this.localNode, n);
op.execute();
}
}

View File

@ -0,0 +1,22 @@
/**
* @author Joshua Kissoon
* @created 20140219
* @desc An exception to be thrown whenever there is a routing problem
*/
package kademlia.exceptions;
import java.io.IOException;
public class RoutingException extends IOException
{
public RoutingException()
{
super();
}
public RoutingException(String message)
{
super(message);
}
}

View File

@ -0,0 +1,57 @@
/**
* @author Joshua Kissoon
* @created 20140218
* @desc A message used to acknowledge a request from a node
*/
package kademlia.message;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import kademlia.node.Node;
public class AcknowledgeMessage implements Message
{
private Node origin;
public static final byte CODE = 0x10;
public AcknowledgeMessage(Node origin)
{
this.origin = origin;
}
public AcknowledgeMessage(DataInput in) throws IOException
{
this.fromStream(in);
}
@Override
public final void fromStream(DataInput in) throws IOException
{
this.origin = new Node(in);
}
@Override
public void toStream(DataOutput out) throws IOException
{
origin.toStream(out);
}
public Node getOrigin()
{
return this.origin;
}
@Override
public byte code()
{
return CODE;
}
@Override
public String toString()
{
return "AcknowledgeMessage[origin=" + origin.getNodeId() + "]";
}
}

View File

@ -0,0 +1,57 @@
/**
* @author Joshua Kissoon
* @created 20140218
* @desc A message used to connect nodes
*/
package kademlia.message;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import kademlia.node.Node;
public class ConnectMessage implements Message
{
private Node origin;
public static final byte CODE = 0x11;
public ConnectMessage(Node origin)
{
this.origin = origin;
}
public ConnectMessage(DataInput in) throws IOException
{
this.fromStream(in);
}
@Override
public final void fromStream(DataInput in) throws IOException
{
this.origin = new Node(in);
}
@Override
public void toStream(DataOutput out) throws IOException
{
origin.toStream(out);
}
public Node getOrigin()
{
return this.origin;
}
@Override
public byte code()
{
return CODE;
}
@Override
public String toString()
{
return "ConnectMessage[origin NodeId=" + origin.getNodeId() + "]";
}
}

View File

@ -0,0 +1,58 @@
/**
* @author Joshua Kissoon
* @created 20140219
* @desc Receives a ConnectMessage and sends an AcknowledgeMessage as reply
*/
package kademlia.message;
import java.io.IOException;
import kademlia.core.KadServer;
import kademlia.node.Node;
import kademlia.operation.Receiver;
public class ConnectReceiver implements Receiver
{
private final KadServer server;
private final Node localNode;
public ConnectReceiver(KadServer server, Node local)
{
this.server = server;
this.localNode = local;
}
/**
* Handle receiving a ConnectMessage
*
* @param comm
*
* @throws java.io.IOException
*/
@Override
public void receive(Message incoming, int comm) throws IOException
{
ConnectMessage mess = (ConnectMessage) incoming;
/* Update the local space by inserting the origin node. */
this.localNode.getRoutingTable().insert(mess.getOrigin());
/* Respond to the connect request */
AcknowledgeMessage msg = new AcknowledgeMessage(this.localNode);
/* Reply to the connect message with an Acknowledgement */
this.server.reply(mess.getOrigin(), msg, comm);
}
/**
* We don't need to do anything here
*
* @param comm
*
* @throws java.io.IOException
*/
@Override
public void timeout(int comm) throws IOException
{
}
}

View File

@ -0,0 +1,10 @@
package kademlia.message;
public interface Message extends Streamable {
/**
* The unique code for the message type, used to differentiate all messages
* from each other. Since this is of <code>byte</code> type there can
* be at most 256 different message types.
**/
public byte code();
}

View File

@ -0,0 +1,49 @@
/**
* @author Joshua
* @created
* @desc
*/
package kademlia.message;
import java.io.DataInput;
import java.io.IOException;
import kademlia.core.KadServer;
import kademlia.node.Node;
import kademlia.operation.Receiver;
public class MessageFactory
{
private final Node localNode;
public MessageFactory(Node local)
{
this.localNode = local;
}
public Message createMessage(byte code, DataInput in) throws IOException
{
switch (code)
{
default:
case SimpleMessage.CODE:
return new SimpleMessage(in);
case ConnectMessage.CODE:
return new ConnectMessage(in);
case AcknowledgeMessage.CODE:
return new AcknowledgeMessage(in);
}
}
public Receiver createReceiver(byte code, KadServer server)
{
switch (code)
{
default:
case SimpleMessage.CODE:
return new SimpleReceiver();
case ConnectMessage.CODE:
return new ConnectReceiver(server, this.localNode);
}
}
}

View File

@ -0,0 +1,72 @@
/**
* @author Joshua Kissoon
* @created 20140217
* @desc A simple message used for testing the system
*/
package kademlia.message;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class SimpleMessage implements Message
{
/* Message constants */
public static final byte CODE = 0x01;
private String content;
public SimpleMessage(String message)
{
this.content = message;
}
public SimpleMessage(DataInput in)
{
System.out.println("Creating message from input stream.");
this.fromStream(in);
}
@Override
public byte code()
{
return CODE;
}
@Override
public void toStream(DataOutput out)
{
try
{
out.writeInt(this.content.length());
out.writeBytes(this.content);
}
catch (IOException e)
{
e.printStackTrace();
}
}
@Override
public final void fromStream(DataInput in)
{
try
{
byte[] buff = new byte[in.readInt()];
in.readFully(buff);
this.content = new String(buff);
}
catch (IOException e)
{
e.printStackTrace();
}
}
@Override
public String toString()
{
return this.content;
}
}

View File

@ -0,0 +1,26 @@
/**
* @author Joshua
* @created
* @desc
*/
package kademlia.message;
import java.io.IOException;
import kademlia.message.Message;
import kademlia.operation.Receiver;
public class SimpleReceiver implements Receiver
{
@Override
public void receive(Message incoming, int conversationId)
{
System.out.println("Received message: " + incoming);
}
@Override
public void timeout(int conversationId) throws IOException
{
System.out.println("");
}
}

View File

@ -0,0 +1,32 @@
package kademlia.message;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* A Streamable object is able to write it's state to an output stream and
* a class implementing Streamable must be able to recreate an instance of
* the class from an input stream. No information about class name is written
* to the output stream so it must be known what class type is expected when
* reading objects back in from an input stream. This gives a space
* advantage over Serializable.
* <p>
* Since the exact class must be known anyway prior to reading, it is incouraged
* that classes implementing Streamble also provide a constructor of the form:
* <p>
* <code>Streamable(DataInput in) throws IOException;</code>
**/
public interface Streamable {
/**
* Writes the internal state of the Streamable object to the output stream
* in a format that can later be read by the same Streamble class using
* the {@link #fromStream} method.
**/
public void toStream(DataOutput out) throws IOException;
/**
* Reads the internal state of the Streamable object from the input stream.
**/
public void fromStream(DataInput out) throws IOException;
}

118
src/kademlia/node/Node.java Normal file
View File

@ -0,0 +1,118 @@
/**
* @author Joshua
* @created
* @desc
*/
package kademlia.node;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import kademlia.message.Streamable;
import kademlia.routing.RoutingTable;
public class Node implements Streamable
{
private NodeId nodeId;
private InetAddress inetAddress;
private int port;
private final RoutingTable routingTable;
{
this.routingTable = new RoutingTable(this);
}
public Node(NodeId nid, InetAddress ip, int port)
{
this.nodeId = nid;
this.inetAddress = ip;
this.port = port;
}
/**
* Load the Node's data from a DataInput stream
*
* @param in
*
* @throws IOException
*/
public Node(DataInput in) throws IOException
{
this.fromStream(in);
}
/**
* Set the InetAddress of this node
*
* @param addr The new InetAddress of this node
*/
public void setInetAddress(InetAddress addr)
{
this.inetAddress = addr;
}
/**
* @return The NodeId object of this node
*/
public NodeId getNodeId()
{
return this.nodeId;
}
/**
* Creates a SocketAddress for this node
*
* @return
*/
public SocketAddress getSocketAddress()
{
return new InetSocketAddress(this.inetAddress, this.port);
}
@Override
public void toStream(DataOutput out) throws IOException
{
/* Add the NodeId to the stream */
this.nodeId.toStream(out);
/* Add the Node's IP address to the stream */
byte[] a = inetAddress.getAddress();
if (a.length != 4)
{
throw new RuntimeException("Expected InetAddress of 4 bytes, got " + a.length);
}
out.write(a);
/* Add the port to the stream */
out.writeInt(port);
}
@Override
public final void fromStream(DataInput in) throws IOException
{
/* Load the NodeId */
this.nodeId = new NodeId(in);
/* Load the IP Address */
byte[] ip = new byte[4];
in.readFully(ip);
this.inetAddress = InetAddress.getByAddress(ip);
/* Read in the port */
this.port = in.readInt();
}
/**
* @return The RoutingTable of this Node
*/
public RoutingTable getRoutingTable()
{
return this.routingTable;
}
}

View File

@ -0,0 +1,165 @@
/**
* @author Joshua Kissoon
* @created 20140215
* @desc Represents a Kademlia Node ID
*/
package kademlia.node;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import kademlia.message.Streamable;
public class NodeId implements Streamable
{
public final static int ID_LENGTH = 160;
private byte[] keyBytes;
/**
* Construct the NodeId from some string
*
* @param data The user generated key string
*
* @todo Throw an exception if the key is too short or too long
*/
public NodeId(String data)
{
keyBytes = data.getBytes();
}
/**
* Generate a random key
*/
public NodeId()
{
keyBytes = new byte[ID_LENGTH];
new Random().nextBytes(keyBytes);
}
public NodeId(byte[] bytes)
{
this.keyBytes = bytes;
}
/**
* Load the NodeId from a DataInput stream
*
* @param in The stream from which to load the NodeId
*
* @throws IOException
*/
public NodeId(DataInput in) throws IOException
{
this.fromStream(in);
}
public byte[] getBytes()
{
return this.keyBytes;
}
/**
* Compares a NodeId to this NodeId
*
* @param nid The NodeId to compare to this NodeId
*
* @return boolean Whether the 2 NodeIds are equal
*/
public boolean equals(NodeId nid)
{
return Arrays.equals(keyBytes, nid.getBytes());
}
/**
* Checks if a given NodeId is less than this NodeId
*
* @param nid The NodeId to compare to this NodeId
*
* @return boolean Whether the given NodeId is less than this NodeId
*/
public boolean lessThan(NodeId nid)
{
byte[] nidBytes = nid.getBytes();
for (int i = 0; i < ID_LENGTH; i++)
{
if (this.keyBytes[i] != nidBytes[i])
{
return this.keyBytes[i] < nidBytes[i];
}
}
/* We got here means they're equal */
return false;
}
/**
* Checks the distance between this and another NodeId
*
* @param nid
*
* @return The distance of this NodeId from the given NodeId
*/
public NodeId xor(NodeId nid)
{
byte[] result = new byte[ID_LENGTH];
byte[] nidBytes = nid.getBytes();
for (int i = 0; i < ID_LENGTH / 8; i++)
{
result[i] = (byte) (this.keyBytes[i] ^ nidBytes[i]);
}
return new NodeId(result);
}
/**
* Checks the number of leading 0's in this NodeId
*
* @return int The number of leading 0's
*/
public int prefixLength()
{
int prefixLength = 0;
for (byte b : this.keyBytes)
{
if (b == 0)
{
prefixLength++;
}
else
{
break;
}
}
return prefixLength;
}
@Override
public void toStream(DataOutput out) throws IOException
{
/* Add the NodeId to the stream */
out.write(this.getBytes());
}
@Override
public void fromStream(DataInput in) throws IOException
{
byte[] input = new byte[ID_LENGTH / 8];
in.readFully(input);
this.keyBytes = input;
}
@Override
public String toString()
{
StringBuilder sb = new StringBuilder("NodeId: ");
sb.append(new String(this.keyBytes));
return sb.toString();
}
}

View File

@ -0,0 +1,122 @@
/**
* @author Joshua Kissoon
* @created 20140218
* @desc Operation that handles connecting to an existing Kademlia network using a bootstrap node
*/
package kademlia.operation;
import java.io.IOException;
import kademlia.core.Configuration;
import kademlia.core.KadServer;
import kademlia.exceptions.RoutingException;
import kademlia.message.AcknowledgeMessage;
import kademlia.message.ConnectMessage;
import kademlia.message.Message;
import kademlia.node.Node;
public class ConnectOperation implements Operation, Receiver
{
public static final int MAX_CONNECT_ATTEMPTS = 5; // Try 5 times to connect to a node
private final KadServer server;
private final Node localNode;
private final Node bootstrapNode;
private boolean error;
private int attempts;
/**
* @param server The message server used to send/receive messages
* @param local The local node
* @param bootstrap Node to use to bootstrap the local node onto the network
*/
public ConnectOperation(KadServer server, Node local, Node bootstrap)
{
this.server = server;
this.localNode = local;
this.bootstrapNode = bootstrap;
}
/**
* @return null
*/
@Override
public synchronized Object execute()
{
try
{
/* Contact the bootstrap node */
this.error = true;
this.attempts = 0;
Message m = new ConnectMessage(this.localNode);
/* Send a connect message to the bootstrap node */
server.sendMessage(this.bootstrapNode, m, this);
/* Wait for a while */
wait(Configuration.OPERATION_TIMEOUT);
if (error)
{
/* Means the contact failed */
throw new RoutingException("Bootstrap node did not respond: " + bootstrapNode);
}
/* @todo Perform lookup for our own ID to get nodes close to us */
/* @todo Refresh buckets to get a good routing table */
return null;
}
catch (IOException | InterruptedException e)
{
e.printStackTrace();
}
return null;
}
/**
* Receives an AcknowledgeMessage from the bootstrap node.
*
* @param comm
*/
@Override
public synchronized void receive(Message incoming, int comm)
{
/* The incoming message will be an acknowledgement message */
AcknowledgeMessage msg = (AcknowledgeMessage) incoming;
System.out.println("ConnectOperation now handling Acknowledgement Message: " + msg);
/* The bootstrap node has responded, insert it into our space */
this.localNode.getRoutingTable().insert(this.bootstrapNode);
/* We got a response, so the error is false */
error = false;
/* Wake up any waiting thread */
notify();
}
/**
* Resends a ConnectMessage to the boot strap node a maximum of MAX_ATTEMPTS
* times.
*
* @param comm
*
* @throws java.io.IOException
*/
@Override
public synchronized void timeout(int comm) throws IOException
{
if (++this.attempts < MAX_CONNECT_ATTEMPTS)
{
this.server.sendMessage(this.bootstrapNode, new ConnectMessage(this.localNode), this);
}
else
{
/* We just exit, so notify all other threads that are possibly waiting */
notify();
}
}
}

View File

@ -0,0 +1,17 @@
/**
* @author Joshua Kissoon
* @created 20140218
* @desc Interface for different Kademlia operations
*/
package kademlia.operation;
public interface Operation
{
/**
* Starts an operation and returns when the operation is finished
*
* @return The return value can differ per operation
*/
public Object execute();
}

View File

@ -0,0 +1,36 @@
///**
// * @author Joshua Kissoon
// * @created 20140218
// * @desc Implementation of the Kademlia Ping operation
// */
//package kademlia.operation;
//
//import kademlia.core.KadServer;
//import kademlia.node.Node;
//
//public class PingOperation implements Operation
//{
//
// private final KadServer server;
// private final Node localNode;
// private final Node toPing;
//
// /**
// * @param server The Kademlia server used to send & receive messages
// * @param local The local node
// * @param toPing The node to send the ping message to
// */
// public PingOperation(KadServer server, Node local, Node toPing)
// {
// this.server = server;
// this.localNode = local;
// this.toPing = toPing;
// }
//
// @Override
// public Object execute()
// {
// /* @todo Create a pingmessage and send this message to the toPing node,
// then handle the reply from this node using a reciever */
// }
//}

View File

@ -0,0 +1,31 @@
/**
* @author Joshua Kissoon
* @created 20140218
* @desc A receiver waits for incoming messages and perform some action when the message is received
*/
package kademlia.operation;
import java.io.IOException;
import kademlia.message.Message;
public interface Receiver
{
/**
* Message is received, now handle it
*
* @param conversationId The ID of this conversation, used for further conversations
* @param incoming The incoming
*/
public void receive(Message incoming, int conversationId) throws IOException;
/**
* If no reply is received in <code>MessageServer.TIMEOUT</code> seconds for the
* message with communication id <code>comm</code>, the MessageServer calls this method
*
* @param conversationId The conversation ID of this communication
*
* @throws IOException if an I/O error occurs
* */
public void timeout(int conversationId) throws IOException;
}

View File

@ -0,0 +1,27 @@
/**
* @author Joshua Kissoon
* @created 20140215
* @desc A bucket for the DHT Protocol
*/
package kademlia.routing;
import kademlia.node.Node;
public interface Bucket
{
/**
* Adds a new node to the bucket
*
* @param n the new node
*/
public void insert(Node n);
/**
* Marks a node as dead: the dead node will be replace if
* insert was invoked
*
* @param n the dead node
*/
public void markDead(Node n);
}

View File

@ -0,0 +1,82 @@
/**
* @author Joshua Kissoon
* @created 20140215
* @desc A bucket in the Kademlia routing table
*/
package kademlia.routing;
import java.util.ArrayList;
import kademlia.node.Node;
public class KadBucket implements Bucket
{
private final int depth;
private final ArrayList<Node> nodes;
{
nodes = new ArrayList<>();
}
/**
* @param depth How deep in the routing tree is this bucket
*/
public KadBucket(int depth)
{
this.depth = depth;
}
@Override
public void insert(Node n)
{
/*@todo Check if the bucket is filled already and handle this */
/* Check if the contact is already in the bucket */
if (this.nodes.contains(n))
{
/* @todo If it is, then move it to the front */
/* @todo Possibly use a doubly linked list instead of an ArrayList */
}
else
{
nodes.add(n);
}
}
public int numNodes()
{
return this.nodes.size();
}
public int getDepth()
{
return this.depth;
}
@Override
public void markDead(Node n)
{
this.nodes.remove(n);
}
public ArrayList<Node> getContacts()
{
return this.nodes;
}
@Override
public String toString()
{
StringBuilder sb = new StringBuilder("Printing bucket at depth: ");
sb.append(this.depth);
sb.append("\n Nodes: \n");
for (Node n : this.nodes)
{
sb.append("Node: ");
sb.append(n.getNodeId().toString());
sb.append("\n");
}
return sb.toString();
}
}

View File

@ -0,0 +1,146 @@
/**
* @author Joshua Kissoon
* @created 20140215
* @desc Implementation of a Kademlia routing table
*/
package kademlia.routing;
import java.util.ArrayList;
import kademlia.node.Node;
import kademlia.node.NodeId;
public class RoutingTable
{
private final Node node; // The current node
private final KadBucket[] buckets;
{
buckets = new KadBucket[NodeId.ID_LENGTH]; // 160 buckets; 1 for each level in the tree
}
public RoutingTable(Node node)
{
this.node = node;
/* Initialize all of the buckets to a specific depth */
for (int i = 0; i < NodeId.ID_LENGTH; i++)
{
buckets[i] = new KadBucket(i);
}
}
/**
* Adds a new contact to the routing table
*
* @param n The contact to add
*/
public void insert(Node n)
{
/* Find the prefix length of how far this node is away from the contact node */
int prefixLength = this.node.getNodeId().xor(n.getNodeId()).prefixLength();
/* Put this contact to the bucket that stores contacts prefixLength distance away */
this.buckets[prefixLength].insert(n);
}
/**
* Find the closest set of contacts to a given NodeId
*
* @param target The NodeId to find contacts close to
* @param num The number of contacts to find
*
* @return ArrayList<Contact> An ArrayList of num contacts closest to target
*/
public ArrayList<Node> findClosest(NodeId target, int num)
{
ArrayList<Node> closest = new ArrayList<>(num);
/* Get the bucket number to search for closest from */
int bucketNumber = this.node.getNodeId().xor(target).prefixLength();
/* Add the contacts from this bucket to the return contacts */
for (Node c : this.buckets[bucketNumber].getContacts())
{
if (closest.size() < num)
{
closest.add(c);
}
else
{
break;
}
}
if (closest.size() >= num)
{
return closest;
}
/* If we still need more nodes, we add from buckets on either side of the closest bucket */
for (int i = 1; ((bucketNumber - i) >= 0 || (bucketNumber + i) < NodeId.ID_LENGTH); i++)
{
/* Check the bucket on the left side */
if (bucketNumber - i > 0)
{
for (Node c : this.buckets[bucketNumber - i].getContacts())
{
if (closest.size() < num)
{
closest.add(c);
}
else
{
break;
}
}
}
/* Check the bucket on the right side */
if (bucketNumber + i < NodeId.ID_LENGTH)
{
for (Node c : this.buckets[bucketNumber + i].getContacts())
{
if (closest.size() < num)
{
closest.add(c);
}
else
{
break;
}
}
}
/* If we have enough contacts, then stop adding */
if (closest.size() >= num)
{
break;
}
}
return closest;
}
@Override
public String toString()
{
StringBuilder sb = new StringBuilder("\nPrinting Routing Table Started ***************** \n");
for (KadBucket b : this.buckets)
{
if (b.numNodes() > 0)
{
sb.append("# nodes in Bucket with depth ");
sb.append(b.getDepth());
sb.append(": ");
sb.append(b.numNodes());
sb.append("\n");
}
}
sb.append("\nPrinting Routing Table Ended ******************** ");
return sb.toString();
}
}

View File

@ -0,0 +1,40 @@
/**
* @author Joshua Kissoon
* @created 20140219
* @desc Testing connecting 2 nodes
*/
package kademlia.tests;
import java.io.IOException;
import kademlia.core.Kademlia;
import kademlia.node.NodeId;
public class NodeConnectionTest
{
public static void main(String[] args)
{
try
{
/* Setting up 2 Kad networks */
Kademlia kad1 = new Kademlia("Joshua", new NodeId("12345678901234567890"), 7574);
System.out.println("Kad 1 Before: ");
System.out.println(kad1.getNode().getRoutingTable());
Kademlia kad2 = new Kademlia("Crystal", new NodeId("12345678901234567891"), 7572);
System.out.println("Kad 2 Before: ");
System.out.println(kad2.getNode().getRoutingTable());
/* Connecting 2 to 1 */
kad1.connect(kad2.getNode());
System.out.println("Kad 1 After: ");
System.out.println(kad1.getNode().getRoutingTable());
System.out.println("Kad 2 After: ");
System.out.println(kad2.getNode().getRoutingTable());
}
catch (IOException e)
{
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,31 @@
/**
* @author Joshua Kissoon
* @created 20140218
* @desc Testing a simple send message
*/
package kademlia.tests;
import java.io.IOException;
import kademlia.core.Kademlia;
import kademlia.message.SimpleMessage;
import kademlia.node.NodeId;
import kademlia.message.SimpleReceiver;
public class SimpleMessageTest
{
public static void main(String[] args)
{
try
{
Kademlia kad1 = new Kademlia("Joshua", new NodeId("12345678901234567890"), 7574);
Kademlia kad2 = new Kademlia("Crystal", new NodeId("12345678901234567891"), 7572);
kad1.getServer().sendMessage(kad2.getNode(), new SimpleMessage("Some Message"), new SimpleReceiver());
}
catch (IOException e)
{
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,47 @@
/**
* @author Joshua Kissoon
* @created 20140218
* @desc Serializes a message into a json message
*/
package kademlia.util;
import com.google.gson.Gson;
import com.google.gson.stream.JsonWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import kademlia.message.Message;
public class JsonSerializer
{
private final Gson gson;
public JsonSerializer()
{
this.gson = new Gson();
}
/**
* Writes a message to an output stream
*
* @param msg The message to write
* @param out The output stream to write the message to
*/
public void write(Message msg, OutputStream out)
{
try (JsonWriter writer = new JsonWriter(new OutputStreamWriter(out)))
{
writer.beginArray();
this.gson.toJson(msg, msg.getClass(), writer);
writer.endArray();
}
catch (IOException e)
{
}
}
}