diff --git a/build.xml b/build.xml new file mode 100644 index 0000000..4167536 --- /dev/null +++ b/build.xml @@ -0,0 +1,73 @@ + + + + + + + + + + + Builds, tests, and runs the project Kademlia. + + + diff --git a/manifest.mf b/manifest.mf new file mode 100644 index 0000000..1574df4 --- /dev/null +++ b/manifest.mf @@ -0,0 +1,3 @@ +Manifest-Version: 1.0 +X-COMMENT: Main-Class will be added automatically by build + diff --git a/nbproject/build-impl.xml b/nbproject/build-impl.xml new file mode 100644 index 0000000..ed7498d --- /dev/null +++ b/nbproject/build-impl.xml @@ -0,0 +1,1407 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Must set src.dir + Must set test.src.dir + Must set build.dir + Must set dist.dir + Must set build.classes.dir + Must set dist.javadoc.dir + Must set build.test.classes.dir + Must set build.test.results.dir + Must set build.classes.excludes + Must set dist.jar + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Must set javac.includes + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + No tests executed. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Must set JVM to use for profiling in profiler.info.jvm + Must set profiler agent JVM arguments in profiler.info.jvmargs.agent + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Must select some files in the IDE or set javac.includes + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + To run this application from the command line without Ant, try: + + java -jar "${dist.jar.resolved}" + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Must select one file in the IDE or set run.class + + + + Must select one file in the IDE or set run.class + + + + + + + + + + + + + + + + + + + + + + + Must select one file in the IDE or set debug.class + + + + + Must select one file in the IDE or set debug.class + + + + + Must set fix.includes + + + + + + + + + + This target only works when run from inside the NetBeans IDE. + + + + + + + + + Must select one file in the IDE or set profile.class + This target only works when run from inside the NetBeans IDE. + + + + + + + + + This target only works when run from inside the NetBeans IDE. + + + + + + + + + + + + + This target only works when run from inside the NetBeans IDE. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Must select one file in the IDE or set run.class + + + + + + Must select some files in the IDE or set test.includes + + + + + Must select one file in the IDE or set run.class + + + + + Must select one file in the IDE or set applet.url + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Must select some files in the IDE or set javac.includes + + + + + + + + + + + + + + + + + + + + Some tests failed; see details above. + + + + + + + + + Must select some files in the IDE or set test.includes + + + + Some tests failed; see details above. + + + + Must select some files in the IDE or set test.class + Must select some method in the IDE or set test.method + + + + Some tests failed; see details above. + + + + + Must select one file in the IDE or set test.class + + + + Must select one file in the IDE or set test.class + Must select some method in the IDE or set test.method + + + + + + + + + + + + + + Must select one file in the IDE or set applet.url + + + + + + + + + Must select one file in the IDE or set applet.url + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/nbproject/genfiles.properties b/nbproject/genfiles.properties new file mode 100644 index 0000000..2dd29bc --- /dev/null +++ b/nbproject/genfiles.properties @@ -0,0 +1,8 @@ +build.xml.data.CRC32=7e563d6e +build.xml.script.CRC32=c3cd04bd +build.xml.stylesheet.CRC32=8064a381@1.68.1.46 +# This file is used by a NetBeans-based IDE to track changes in generated files such as build-impl.xml. +# Do not edit this file. You may delete it but then the IDE will never regenerate such files for you. +nbproject/build-impl.xml.data.CRC32=7e563d6e +nbproject/build-impl.xml.script.CRC32=5c6dd9f7 +nbproject/build-impl.xml.stylesheet.CRC32=5a01deb7@1.68.1.46 diff --git a/nbproject/project.properties b/nbproject/project.properties new file mode 100644 index 0000000..8976072 --- /dev/null +++ b/nbproject/project.properties @@ -0,0 +1,77 @@ +annotation.processing.enabled=true +annotation.processing.enabled.in.editor=false +annotation.processing.processors.list= +annotation.processing.run.all.processors=true +annotation.processing.source.output=${build.generated.sources.dir}/ap-source-output +application.title=Kademlia +application.vendor=Joshua +build.classes.dir=${build.dir}/classes +build.classes.excludes=**/*.java,**/*.form +# This directory is removed when the project is cleaned: +build.dir=build +build.generated.dir=${build.dir}/generated +build.generated.sources.dir=${build.dir}/generated-sources +# Only compile against the classpath explicitly listed here: +build.sysclasspath=ignore +build.test.classes.dir=${build.dir}/test/classes +build.test.results.dir=${build.dir}/test/results +# Uncomment to specify the preferred debugger connection transport: +#debug.transport=dt_socket +debug.classpath=\ + ${run.classpath} +debug.test.classpath=\ + ${run.test.classpath} +# Files in build.classes.dir which should be excluded from distribution jar +dist.archive.excludes= +# This directory is removed when the project is cleaned: +dist.dir=dist +dist.jar=${dist.dir}/Kademlia.jar +dist.javadoc.dir=${dist.dir}/javadoc +endorsed.classpath= +excludes= +file.reference.gson-2.2.4.jar=C:\\Users\\Joshua\\Documents\\NetBeansProjects\\Libraries\\gson-2.2.4.jar +includes=** +jar.compress=false +javac.classpath=\ + ${file.reference.gson-2.2.4.jar} +# Space-separated list of extra javac options +javac.compilerargs= +javac.deprecation=false +javac.processorpath=\ + ${javac.classpath} +javac.source=1.7 +javac.target=1.7 +javac.test.classpath=\ + ${javac.classpath}:\ + ${build.classes.dir} +javac.test.processorpath=\ + ${javac.test.classpath} +javadoc.additionalparam= +javadoc.author=false +javadoc.encoding=${source.encoding} +javadoc.noindex=false +javadoc.nonavbar=false +javadoc.notree=false +javadoc.private=false +javadoc.splitindex=true +javadoc.use=true +javadoc.version=false +javadoc.windowtitle= +main.class=kademlia.KademliaBasic +manifest.file=manifest.mf +meta.inf.dir=${src.dir}/META-INF +mkdist.disabled=false +platform.active=default_platform +run.classpath=\ + ${javac.classpath}:\ + ${build.classes.dir} +# Space-separated list of JVM arguments used when running the project. +# You may also define separate properties like run-sys-prop.name=value instead of -Dname=value. +# To set system properties for unit tests define test-sys-prop.name=value: +run.jvmargs= +run.test.classpath=\ + ${javac.test.classpath}:\ + ${build.test.classes.dir} +source.encoding=UTF-8 +src.dir=src +test.src.dir=test diff --git a/nbproject/project.xml b/nbproject/project.xml new file mode 100644 index 0000000..29e8f9a --- /dev/null +++ b/nbproject/project.xml @@ -0,0 +1,15 @@ + + + org.netbeans.modules.java.j2seproject + + + Kademlia + + + + + + + + + diff --git a/src/kademlia/KademliaBasic.java b/src/kademlia/KademliaBasic.java new file mode 100644 index 0000000..f039034 --- /dev/null +++ b/src/kademlia/KademliaBasic.java @@ -0,0 +1,24 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ + +package kademlia; + +/** + * + * @author Joshua + */ +public class KademliaBasic +{ + + /** + * @param args the command line arguments + */ + public static void main(String[] args) + { + // TODO code application logic here + } + +} diff --git a/src/kademlia/core/Configuration.java b/src/kademlia/core/Configuration.java new file mode 100644 index 0000000..d04da07 --- /dev/null +++ b/src/kademlia/core/Configuration.java @@ -0,0 +1,50 @@ +package kademlia.core; + +/** + * A set of Kademlia configuration parameters. Default values are + * supplied and can be changed by the application as necessary. + * */ +public class Configuration +{ + + /** + * Interval in milliseconds between execution of RestoreOperations. + * */ + public static long RESTORE_INTERVAL = 60 * 60 * 1000; + + /** + * If no reply received from a node in this period (in milliseconds) + * consider the node unresponsive. + * */ + public static long RESPONSE_TIMEOUT = 3000; + + /** + * Maximum number of milliseconds for performing an operation. + * */ + public static long OPERATION_TIMEOUT = 10000; + + /** + * Maximum number of concurrent messages in transit. + * */ + public static int CONCURRENCY = 2; + + /** + * Log base exponent. + * */ + public static int B = 2; + + /** + * Bucket size. + * */ + public static int K = 3; + + /** + * Size of replacement cache. + * */ + public static int RCSIZE = 3; + + /** + * Number of times a node can be marked as stale before it is actually removed. + * */ + public static int STALE = 1; +} diff --git a/src/kademlia/core/KadServer.java b/src/kademlia/core/KadServer.java new file mode 100644 index 0000000..75be02c --- /dev/null +++ b/src/kademlia/core/KadServer.java @@ -0,0 +1,245 @@ +/** + * @author Joshua Kissoon + * @created 20140215 + * @desc This server handles sending and receiving messages + */ +package kademlia.core; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.SocketException; +import java.util.HashMap; +import java.util.Random; +import java.util.Timer; +import java.util.TimerTask; +import kademlia.message.Message; +import kademlia.message.MessageFactory; +import kademlia.node.Node; +import kademlia.operation.Receiver; + +public class KadServer +{ + + /* Constants */ + private static final int DATAGRAM_BUFFER_SIZE = 64 * 1024; // 64KB + + /* Server Objects */ + private final int udpPort; + private final DatagramSocket socket; + private boolean isRunning; + private final HashMap receivers; + private final Timer timer; // Schedule future tasks + + /* Factories */ + private final MessageFactory messageFactory; + + + { + isRunning = true; + } + + public KadServer(int udpPort, MessageFactory mFactory) throws SocketException + { + this.udpPort = udpPort; + this.socket = new DatagramSocket(udpPort); + this.receivers = new HashMap<>(); + this.timer = new Timer(true); + + this.messageFactory = mFactory; + + /* Start listening for incoming requests in a new thread */ + this.startListener(); + } + + /** + * Starts the listener to listen for incoming messages + */ + private void startListener() + { + new Thread() + { + @Override + public void run() + { + listen(); + } + }.start(); + } + + /** + * Sends a message + * + * @param msg The message to send + * @param to The node to send the message to + * @param recv The receiver to handle the response message + * + * @throws IOException + */ + public void sendMessage(Node to, Message msg, Receiver recv) throws IOException + { + if (!isRunning) + { + throw new IllegalStateException("Kad Server is not running."); + } + + /* Generate a random communication ID */ + int comm = new Integer(new Random().nextInt()); + + /* Setup the receiver to handle message response */ + receivers.put(comm, recv); + timer.schedule(new TimeoutTask(comm, recv), Configuration.RESPONSE_TIMEOUT); + + /* Send the message */ + sendMessage(to, msg, comm); + } + + public void reply(Node to, Message msg, int comm) throws IOException + { + if (!isRunning) + { + throw new IllegalStateException("Kad Server is not running."); + } + sendMessage(to, msg, comm); + } + + private void sendMessage(Node to, Message msg, int comm) throws IOException + { + + + /* Setup the message for transmission */ + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + DataOutputStream dout = new DataOutputStream(bout); + dout.writeInt(comm); + dout.writeByte(msg.code()); + msg.toStream(dout); + dout.close(); + + byte[] data = bout.toByteArray(); + + if (data.length > DATAGRAM_BUFFER_SIZE) + { + throw new IOException("MEssage is too big"); + } + + /* Everything is good, now create the packet and send it */ + DatagramPacket pkt = new DatagramPacket(data, 0, data.length); + pkt.setSocketAddress(to.getSocketAddress()); + socket.send(pkt); + } + + /** + * Listen for incoming messages in a separate thread + */ + private void listen() + { + try + { + while (isRunning) + { + try + { + /* Wait for a packet */ + byte[] buffer = new byte[DATAGRAM_BUFFER_SIZE]; + DatagramPacket packet = new DatagramPacket(buffer, buffer.length); + socket.receive(packet); + + /* We've received a packet, now handle it */ + ByteArrayInputStream bin = new ByteArrayInputStream(packet.getData(), packet.getOffset(), packet.getLength()); + DataInputStream din = new DataInputStream(bin); + + /* Read in the conversation Id to know which handler to handle this response */ + int comm = din.readInt(); + byte messCode = din.readByte(); + + Message msg = messageFactory.createMessage(messCode, din); + din.close(); + System.out.println("Message Received: " + msg); + + System.out.println("Receivers: " + receivers); + + /* Get a receiver for this message */ + Receiver receiver; + if (this.receivers.containsKey(comm)) + { + /* If there is a reciever in the receivers to handle this */ + receiver = this.receivers.remove(comm); + } + else + { + /* There is currently no receivers, try to get one */ + receiver = messageFactory.createReceiver(messCode, this); + } + + /* Invoke the receiver */ + if (receiver != null) + { + receiver.receive(msg, comm); + } + } + catch (IOException e) + { + this.isRunning = false; + e.printStackTrace(); + } + } + } + finally + { + if (!socket.isClosed()) + { + socket.close(); + } + this.isRunning = false; + } + } + + /** + * Remove a conversation receiver + * + * @param comm The id of this conversation + */ + private synchronized void unregister(int comm) + { + Integer key = new Integer(comm); + receivers.remove(key); + } + + /** + * Task that gets called by a separate thread if a timeout for a receiver occurs. + * When a reply arrives this task must be cancelled using the cancel() + * method inherited from TimerTask. In this case the caller is + * responsible for removing the task from the tasks map. + * */ + class TimeoutTask extends TimerTask + { + + private final int comm; + private final Receiver recv; + + public TimeoutTask(int comm, Receiver recv) + { + this.comm = comm; + this.recv = recv; + } + + @Override + public void run() + { + try + { + unregister(comm); + recv.timeout(comm); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + } + +} diff --git a/src/kademlia/core/Kademlia.java b/src/kademlia/core/Kademlia.java new file mode 100644 index 0000000..34da90b --- /dev/null +++ b/src/kademlia/core/Kademlia.java @@ -0,0 +1,105 @@ +/** + * @author Joshua Kissoon + * @created 20140215 + * @desc The main Kademlia network management class + */ +package kademlia.core; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.Timer; +import java.util.TimerTask; +import kademlia.exceptions.RoutingException; +import kademlia.message.MessageFactory; +import kademlia.node.Node; +import kademlia.node.NodeId; +import kademlia.operation.ConnectOperation; +import kademlia.operation.Operation; + +public class Kademlia +{ + + /* Kademlia Attributes */ + private final String name; + + /* Objects to be used */ + private final Node localNode; + private final KadServer server; + private final Timer timer; + + /* Factories */ + private final MessageFactory messageFactory; + + /** + * Creates a Kademlia DistributedMap using the specified name as filename base. + * If the id cannot be read from disk the specified defaultId is used. + * The instance is bootstraped to an existing network by specifying the + * address of a bootstrap node in the network. + * + * @param name The Name of this node used for storage + * @param defaultId Default id for the node + * @param udpPort The UDP port to use for routing messages + * + * @throws IOException If an error occurred while reading id or local map + * from disk or a network error occurred while + * attempting to connect to the network + * */ + public Kademlia(String name, NodeId defaultId, int udpPort) throws IOException + { + this.name = name; + this.localNode = new Node(defaultId, InetAddress.getLocalHost(), udpPort); + this.messageFactory = new MessageFactory(localNode); + this.server = new KadServer(udpPort, this.messageFactory); + this.timer = new Timer(true); + + /* Schedule Recurring RestoreOperation */ + timer.schedule( + new TimerTask() + { + @Override + public void run() + { + /** + * @todo Create Operation that + * Refreshes all buckets and sends HashMessages to all nodes that are + * among the K closest to mappings stored at this node. Also deletes any + * mappings that this node is no longer among the K closest to. + * */ + } + }, + // Delay // Interval + Configuration.RESTORE_INTERVAL, Configuration.RESTORE_INTERVAL + ); + } + + /** + * @return Node The local node for this system + */ + public Node getNode() + { + return this.localNode; + } + + /** + * @return The KadServer used to send/receive messages + */ + public KadServer getServer() + { + return this.server; + } + + /** + * Connect to an existing peer-to-peer network. + * + * @param n The known node in the peer-to-peer network + * + * @throws RoutingException If the bootstrap node could not be contacted + * @throws IOException If a network error occurred + * @throws IllegalStateException If this object is closed + * */ + public final void connect(Node n) throws IOException, RoutingException + { + Operation op = new ConnectOperation(this.server, this.localNode, n); + op.execute(); + } +} diff --git a/src/kademlia/exceptions/RoutingException.java b/src/kademlia/exceptions/RoutingException.java new file mode 100644 index 0000000..c28f671 --- /dev/null +++ b/src/kademlia/exceptions/RoutingException.java @@ -0,0 +1,22 @@ +/** + * @author Joshua Kissoon + * @created 20140219 + * @desc An exception to be thrown whenever there is a routing problem + */ +package kademlia.exceptions; + +import java.io.IOException; + +public class RoutingException extends IOException +{ + + public RoutingException() + { + super(); + } + + public RoutingException(String message) + { + super(message); + } +} diff --git a/src/kademlia/message/AcknowledgeMessage.java b/src/kademlia/message/AcknowledgeMessage.java new file mode 100644 index 0000000..c6ef459 --- /dev/null +++ b/src/kademlia/message/AcknowledgeMessage.java @@ -0,0 +1,57 @@ +/** + * @author Joshua Kissoon + * @created 20140218 + * @desc A message used to acknowledge a request from a node + */ +package kademlia.message; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import kademlia.node.Node; + +public class AcknowledgeMessage implements Message +{ + + private Node origin; + public static final byte CODE = 0x10; + + public AcknowledgeMessage(Node origin) + { + this.origin = origin; + } + + public AcknowledgeMessage(DataInput in) throws IOException + { + this.fromStream(in); + } + + @Override + public final void fromStream(DataInput in) throws IOException + { + this.origin = new Node(in); + } + + @Override + public void toStream(DataOutput out) throws IOException + { + origin.toStream(out); + } + + public Node getOrigin() + { + return this.origin; + } + + @Override + public byte code() + { + return CODE; + } + + @Override + public String toString() + { + return "AcknowledgeMessage[origin=" + origin.getNodeId() + "]"; + } +} diff --git a/src/kademlia/message/ConnectMessage.java b/src/kademlia/message/ConnectMessage.java new file mode 100644 index 0000000..7f885ae --- /dev/null +++ b/src/kademlia/message/ConnectMessage.java @@ -0,0 +1,57 @@ +/** + * @author Joshua Kissoon + * @created 20140218 + * @desc A message used to connect nodes + */ +package kademlia.message; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import kademlia.node.Node; + +public class ConnectMessage implements Message +{ + + private Node origin; + public static final byte CODE = 0x11; + + public ConnectMessage(Node origin) + { + this.origin = origin; + } + + public ConnectMessage(DataInput in) throws IOException + { + this.fromStream(in); + } + + @Override + public final void fromStream(DataInput in) throws IOException + { + this.origin = new Node(in); + } + + @Override + public void toStream(DataOutput out) throws IOException + { + origin.toStream(out); + } + + public Node getOrigin() + { + return this.origin; + } + + @Override + public byte code() + { + return CODE; + } + + @Override + public String toString() + { + return "ConnectMessage[origin NodeId=" + origin.getNodeId() + "]"; + } +} diff --git a/src/kademlia/message/ConnectReceiver.java b/src/kademlia/message/ConnectReceiver.java new file mode 100644 index 0000000..dbef95a --- /dev/null +++ b/src/kademlia/message/ConnectReceiver.java @@ -0,0 +1,58 @@ +/** + * @author Joshua Kissoon + * @created 20140219 + * @desc Receives a ConnectMessage and sends an AcknowledgeMessage as reply + */ +package kademlia.message; + +import java.io.IOException; +import kademlia.core.KadServer; +import kademlia.node.Node; +import kademlia.operation.Receiver; + +public class ConnectReceiver implements Receiver +{ + + private final KadServer server; + private final Node localNode; + + public ConnectReceiver(KadServer server, Node local) + { + this.server = server; + this.localNode = local; + } + + /** + * Handle receiving a ConnectMessage + * + * @param comm + * + * @throws java.io.IOException + */ + @Override + public void receive(Message incoming, int comm) throws IOException + { + ConnectMessage mess = (ConnectMessage) incoming; + + /* Update the local space by inserting the origin node. */ + this.localNode.getRoutingTable().insert(mess.getOrigin()); + + /* Respond to the connect request */ + AcknowledgeMessage msg = new AcknowledgeMessage(this.localNode); + + /* Reply to the connect message with an Acknowledgement */ + this.server.reply(mess.getOrigin(), msg, comm); + } + + /** + * We don't need to do anything here + * + * @param comm + * + * @throws java.io.IOException + */ + @Override + public void timeout(int comm) throws IOException + { + } +} diff --git a/src/kademlia/message/Message.java b/src/kademlia/message/Message.java new file mode 100644 index 0000000..890c4c7 --- /dev/null +++ b/src/kademlia/message/Message.java @@ -0,0 +1,10 @@ +package kademlia.message; + +public interface Message extends Streamable { + /** + * The unique code for the message type, used to differentiate all messages + * from each other. Since this is of byte type there can + * be at most 256 different message types. + **/ + public byte code(); +} diff --git a/src/kademlia/message/MessageFactory.java b/src/kademlia/message/MessageFactory.java new file mode 100644 index 0000000..8542717 --- /dev/null +++ b/src/kademlia/message/MessageFactory.java @@ -0,0 +1,49 @@ +/** + * @author Joshua + * @created + * @desc + */ +package kademlia.message; + +import java.io.DataInput; +import java.io.IOException; +import kademlia.core.KadServer; +import kademlia.node.Node; +import kademlia.operation.Receiver; + +public class MessageFactory +{ + + private final Node localNode; + + public MessageFactory(Node local) + { + this.localNode = local; + } + + public Message createMessage(byte code, DataInput in) throws IOException + { + switch (code) + { + default: + case SimpleMessage.CODE: + return new SimpleMessage(in); + case ConnectMessage.CODE: + return new ConnectMessage(in); + case AcknowledgeMessage.CODE: + return new AcknowledgeMessage(in); + } + } + + public Receiver createReceiver(byte code, KadServer server) + { + switch (code) + { + default: + case SimpleMessage.CODE: + return new SimpleReceiver(); + case ConnectMessage.CODE: + return new ConnectReceiver(server, this.localNode); + } + } +} diff --git a/src/kademlia/message/SimpleMessage.java b/src/kademlia/message/SimpleMessage.java new file mode 100644 index 0000000..afdc972 --- /dev/null +++ b/src/kademlia/message/SimpleMessage.java @@ -0,0 +1,72 @@ +/** + * @author Joshua Kissoon + * @created 20140217 + * @desc A simple message used for testing the system + */ +package kademlia.message; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class SimpleMessage implements Message +{ + + /* Message constants */ + public static final byte CODE = 0x01; + + private String content; + + public SimpleMessage(String message) + { + this.content = message; + } + + public SimpleMessage(DataInput in) + { + System.out.println("Creating message from input stream."); + this.fromStream(in); + } + + @Override + public byte code() + { + return CODE; + } + + @Override + public void toStream(DataOutput out) + { + try + { + out.writeInt(this.content.length()); + out.writeBytes(this.content); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + + @Override + public final void fromStream(DataInput in) + { + try + { + byte[] buff = new byte[in.readInt()]; + in.readFully(buff); + + this.content = new String(buff); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + + @Override + public String toString() + { + return this.content; + } +} diff --git a/src/kademlia/message/SimpleReceiver.java b/src/kademlia/message/SimpleReceiver.java new file mode 100644 index 0000000..7a41496 --- /dev/null +++ b/src/kademlia/message/SimpleReceiver.java @@ -0,0 +1,26 @@ +/** + * @author Joshua + * @created + * @desc + */ +package kademlia.message; + +import java.io.IOException; +import kademlia.message.Message; +import kademlia.operation.Receiver; + +public class SimpleReceiver implements Receiver +{ + + @Override + public void receive(Message incoming, int conversationId) + { + System.out.println("Received message: " + incoming); + } + + @Override + public void timeout(int conversationId) throws IOException + { + System.out.println(""); + } +} diff --git a/src/kademlia/message/Streamable.java b/src/kademlia/message/Streamable.java new file mode 100644 index 0000000..593aa4e --- /dev/null +++ b/src/kademlia/message/Streamable.java @@ -0,0 +1,32 @@ +package kademlia.message; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * A Streamable object is able to write it's state to an output stream and + * a class implementing Streamable must be able to recreate an instance of + * the class from an input stream. No information about class name is written + * to the output stream so it must be known what class type is expected when + * reading objects back in from an input stream. This gives a space + * advantage over Serializable. + *

+ * Since the exact class must be known anyway prior to reading, it is incouraged + * that classes implementing Streamble also provide a constructor of the form: + *

+ * Streamable(DataInput in) throws IOException; + **/ +public interface Streamable { + /** + * Writes the internal state of the Streamable object to the output stream + * in a format that can later be read by the same Streamble class using + * the {@link #fromStream} method. + **/ + public void toStream(DataOutput out) throws IOException; + + /** + * Reads the internal state of the Streamable object from the input stream. + **/ + public void fromStream(DataInput out) throws IOException; +} diff --git a/src/kademlia/node/Node.java b/src/kademlia/node/Node.java new file mode 100644 index 0000000..b85fe03 --- /dev/null +++ b/src/kademlia/node/Node.java @@ -0,0 +1,118 @@ +/** + * @author Joshua + * @created + * @desc + */ +package kademlia.node; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import kademlia.message.Streamable; +import kademlia.routing.RoutingTable; + +public class Node implements Streamable +{ + + private NodeId nodeId; + private InetAddress inetAddress; + private int port; + + private final RoutingTable routingTable; + + + { + this.routingTable = new RoutingTable(this); + } + + public Node(NodeId nid, InetAddress ip, int port) + { + this.nodeId = nid; + this.inetAddress = ip; + this.port = port; + } + + /** + * Load the Node's data from a DataInput stream + * + * @param in + * + * @throws IOException + */ + public Node(DataInput in) throws IOException + { + this.fromStream(in); + } + + /** + * Set the InetAddress of this node + * + * @param addr The new InetAddress of this node + */ + public void setInetAddress(InetAddress addr) + { + this.inetAddress = addr; + } + + /** + * @return The NodeId object of this node + */ + public NodeId getNodeId() + { + return this.nodeId; + } + + /** + * Creates a SocketAddress for this node + * + * @return + */ + public SocketAddress getSocketAddress() + { + return new InetSocketAddress(this.inetAddress, this.port); + } + + @Override + public void toStream(DataOutput out) throws IOException + { + /* Add the NodeId to the stream */ + this.nodeId.toStream(out); + + /* Add the Node's IP address to the stream */ + byte[] a = inetAddress.getAddress(); + if (a.length != 4) + { + throw new RuntimeException("Expected InetAddress of 4 bytes, got " + a.length); + } + out.write(a); + + /* Add the port to the stream */ + out.writeInt(port); + } + + @Override + public final void fromStream(DataInput in) throws IOException + { + /* Load the NodeId */ + this.nodeId = new NodeId(in); + + /* Load the IP Address */ + byte[] ip = new byte[4]; + in.readFully(ip); + this.inetAddress = InetAddress.getByAddress(ip); + + /* Read in the port */ + this.port = in.readInt(); + } + + /** + * @return The RoutingTable of this Node + */ + public RoutingTable getRoutingTable() + { + return this.routingTable; + } +} diff --git a/src/kademlia/node/NodeId.java b/src/kademlia/node/NodeId.java new file mode 100644 index 0000000..5711608 --- /dev/null +++ b/src/kademlia/node/NodeId.java @@ -0,0 +1,165 @@ +/** + * @author Joshua Kissoon + * @created 20140215 + * @desc Represents a Kademlia Node ID + */ +package kademlia.node; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Arrays; +import java.util.Random; +import kademlia.message.Streamable; + +public class NodeId implements Streamable +{ + + public final static int ID_LENGTH = 160; + private byte[] keyBytes; + + /** + * Construct the NodeId from some string + * + * @param data The user generated key string + * + * @todo Throw an exception if the key is too short or too long + */ + public NodeId(String data) + { + keyBytes = data.getBytes(); + } + + /** + * Generate a random key + */ + public NodeId() + { + keyBytes = new byte[ID_LENGTH]; + new Random().nextBytes(keyBytes); + } + + public NodeId(byte[] bytes) + { + this.keyBytes = bytes; + } + + /** + * Load the NodeId from a DataInput stream + * + * @param in The stream from which to load the NodeId + * + * @throws IOException + */ + public NodeId(DataInput in) throws IOException + { + this.fromStream(in); + } + + public byte[] getBytes() + { + return this.keyBytes; + } + + /** + * Compares a NodeId to this NodeId + * + * @param nid The NodeId to compare to this NodeId + * + * @return boolean Whether the 2 NodeIds are equal + */ + public boolean equals(NodeId nid) + { + return Arrays.equals(keyBytes, nid.getBytes()); + } + + /** + * Checks if a given NodeId is less than this NodeId + * + * @param nid The NodeId to compare to this NodeId + * + * @return boolean Whether the given NodeId is less than this NodeId + */ + public boolean lessThan(NodeId nid) + { + byte[] nidBytes = nid.getBytes(); + for (int i = 0; i < ID_LENGTH; i++) + { + if (this.keyBytes[i] != nidBytes[i]) + { + return this.keyBytes[i] < nidBytes[i]; + } + } + + /* We got here means they're equal */ + return false; + } + + /** + * Checks the distance between this and another NodeId + * + * @param nid + * + * @return The distance of this NodeId from the given NodeId + */ + public NodeId xor(NodeId nid) + { + byte[] result = new byte[ID_LENGTH]; + byte[] nidBytes = nid.getBytes(); + for (int i = 0; i < ID_LENGTH / 8; i++) + { + result[i] = (byte) (this.keyBytes[i] ^ nidBytes[i]); + } + + return new NodeId(result); + } + + /** + * Checks the number of leading 0's in this NodeId + * + * @return int The number of leading 0's + */ + public int prefixLength() + { + int prefixLength = 0; + + for (byte b : this.keyBytes) + { + if (b == 0) + { + prefixLength++; + } + else + { + break; + } + } + + return prefixLength; + } + + @Override + public void toStream(DataOutput out) throws IOException + { + /* Add the NodeId to the stream */ + out.write(this.getBytes()); + } + + @Override + public void fromStream(DataInput in) throws IOException + { + byte[] input = new byte[ID_LENGTH / 8]; + in.readFully(input); + this.keyBytes = input; + } + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder("NodeId: "); + sb.append(new String(this.keyBytes)); + + return sb.toString(); + } + +} diff --git a/src/kademlia/operation/ConnectOperation.java b/src/kademlia/operation/ConnectOperation.java new file mode 100644 index 0000000..ae24b84 --- /dev/null +++ b/src/kademlia/operation/ConnectOperation.java @@ -0,0 +1,122 @@ +/** + * @author Joshua Kissoon + * @created 20140218 + * @desc Operation that handles connecting to an existing Kademlia network using a bootstrap node + */ +package kademlia.operation; + +import java.io.IOException; +import kademlia.core.Configuration; +import kademlia.core.KadServer; +import kademlia.exceptions.RoutingException; +import kademlia.message.AcknowledgeMessage; +import kademlia.message.ConnectMessage; +import kademlia.message.Message; +import kademlia.node.Node; + +public class ConnectOperation implements Operation, Receiver +{ + + public static final int MAX_CONNECT_ATTEMPTS = 5; // Try 5 times to connect to a node + + private final KadServer server; + private final Node localNode; + private final Node bootstrapNode; + + private boolean error; + private int attempts; + + /** + * @param server The message server used to send/receive messages + * @param local The local node + * @param bootstrap Node to use to bootstrap the local node onto the network + */ + public ConnectOperation(KadServer server, Node local, Node bootstrap) + { + this.server = server; + this.localNode = local; + this.bootstrapNode = bootstrap; + } + + /** + * @return null + */ + @Override + public synchronized Object execute() + { + try + { + /* Contact the bootstrap node */ + this.error = true; + this.attempts = 0; + Message m = new ConnectMessage(this.localNode); + + /* Send a connect message to the bootstrap node */ + server.sendMessage(this.bootstrapNode, m, this); + + /* Wait for a while */ + wait(Configuration.OPERATION_TIMEOUT); + + if (error) + { + /* Means the contact failed */ + throw new RoutingException("Bootstrap node did not respond: " + bootstrapNode); + } + + /* @todo Perform lookup for our own ID to get nodes close to us */ + /* @todo Refresh buckets to get a good routing table */ + return null; + + } + catch (IOException | InterruptedException e) + { + e.printStackTrace(); + } + + return null; + } + + /** + * Receives an AcknowledgeMessage from the bootstrap node. + * + * @param comm + */ + @Override + public synchronized void receive(Message incoming, int comm) + { + /* The incoming message will be an acknowledgement message */ + AcknowledgeMessage msg = (AcknowledgeMessage) incoming; + System.out.println("ConnectOperation now handling Acknowledgement Message: " + msg); + + /* The bootstrap node has responded, insert it into our space */ + this.localNode.getRoutingTable().insert(this.bootstrapNode); + + /* We got a response, so the error is false */ + error = false; + + /* Wake up any waiting thread */ + notify(); + } + + /** + * Resends a ConnectMessage to the boot strap node a maximum of MAX_ATTEMPTS + * times. + * + * @param comm + * + * @throws java.io.IOException + */ + @Override + public synchronized void timeout(int comm) throws IOException + { + if (++this.attempts < MAX_CONNECT_ATTEMPTS) + { + this.server.sendMessage(this.bootstrapNode, new ConnectMessage(this.localNode), this); + } + else + { + /* We just exit, so notify all other threads that are possibly waiting */ + notify(); + } + } +} diff --git a/src/kademlia/operation/Operation.java b/src/kademlia/operation/Operation.java new file mode 100644 index 0000000..3b8bbaf --- /dev/null +++ b/src/kademlia/operation/Operation.java @@ -0,0 +1,17 @@ +/** + * @author Joshua Kissoon + * @created 20140218 + * @desc Interface for different Kademlia operations + */ +package kademlia.operation; + +public interface Operation +{ + + /** + * Starts an operation and returns when the operation is finished + * + * @return The return value can differ per operation + */ + public Object execute(); +} diff --git a/src/kademlia/operation/PingOperation.java b/src/kademlia/operation/PingOperation.java new file mode 100644 index 0000000..ee9e870 --- /dev/null +++ b/src/kademlia/operation/PingOperation.java @@ -0,0 +1,36 @@ +///** +// * @author Joshua Kissoon +// * @created 20140218 +// * @desc Implementation of the Kademlia Ping operation +// */ +//package kademlia.operation; +// +//import kademlia.core.KadServer; +//import kademlia.node.Node; +// +//public class PingOperation implements Operation +//{ +// +// private final KadServer server; +// private final Node localNode; +// private final Node toPing; +// +// /** +// * @param server The Kademlia server used to send & receive messages +// * @param local The local node +// * @param toPing The node to send the ping message to +// */ +// public PingOperation(KadServer server, Node local, Node toPing) +// { +// this.server = server; +// this.localNode = local; +// this.toPing = toPing; +// } +// +// @Override +// public Object execute() +// { +// /* @todo Create a pingmessage and send this message to the toPing node, +// then handle the reply from this node using a reciever */ +// } +//} diff --git a/src/kademlia/operation/Receiver.java b/src/kademlia/operation/Receiver.java new file mode 100644 index 0000000..f63596b --- /dev/null +++ b/src/kademlia/operation/Receiver.java @@ -0,0 +1,31 @@ +/** + * @author Joshua Kissoon + * @created 20140218 + * @desc A receiver waits for incoming messages and perform some action when the message is received + */ +package kademlia.operation; + +import java.io.IOException; +import kademlia.message.Message; + +public interface Receiver +{ + + /** + * Message is received, now handle it + * + * @param conversationId The ID of this conversation, used for further conversations + * @param incoming The incoming + */ + public void receive(Message incoming, int conversationId) throws IOException; + + /** + * If no reply is received in MessageServer.TIMEOUT seconds for the + * message with communication id comm, the MessageServer calls this method + * + * @param conversationId The conversation ID of this communication + * + * @throws IOException if an I/O error occurs + * */ + public void timeout(int conversationId) throws IOException; +} diff --git a/src/kademlia/routing/Bucket.java b/src/kademlia/routing/Bucket.java new file mode 100644 index 0000000..b27dd87 --- /dev/null +++ b/src/kademlia/routing/Bucket.java @@ -0,0 +1,27 @@ +/** + * @author Joshua Kissoon + * @created 20140215 + * @desc A bucket for the DHT Protocol + */ +package kademlia.routing; + +import kademlia.node.Node; + +public interface Bucket +{ + + /** + * Adds a new node to the bucket + * + * @param n the new node + */ + public void insert(Node n); + + /** + * Marks a node as dead: the dead node will be replace if + * insert was invoked + * + * @param n the dead node + */ + public void markDead(Node n); +} diff --git a/src/kademlia/routing/KadBucket.java b/src/kademlia/routing/KadBucket.java new file mode 100644 index 0000000..27fb0b8 --- /dev/null +++ b/src/kademlia/routing/KadBucket.java @@ -0,0 +1,82 @@ +/** + * @author Joshua Kissoon + * @created 20140215 + * @desc A bucket in the Kademlia routing table + */ +package kademlia.routing; + +import java.util.ArrayList; +import kademlia.node.Node; + +public class KadBucket implements Bucket +{ + + private final int depth; + private final ArrayList nodes; + + + { + nodes = new ArrayList<>(); + } + + /** + * @param depth How deep in the routing tree is this bucket + */ + public KadBucket(int depth) + { + this.depth = depth; + } + + @Override + public void insert(Node n) + { + /*@todo Check if the bucket is filled already and handle this */ + /* Check if the contact is already in the bucket */ + if (this.nodes.contains(n)) + { + /* @todo If it is, then move it to the front */ + /* @todo Possibly use a doubly linked list instead of an ArrayList */ + } + else + { + nodes.add(n); + } + } + + public int numNodes() + { + return this.nodes.size(); + } + + public int getDepth() + { + return this.depth; + } + + @Override + public void markDead(Node n) + { + this.nodes.remove(n); + } + + public ArrayList getContacts() + { + return this.nodes; + } + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder("Printing bucket at depth: "); + sb.append(this.depth); + sb.append("\n Nodes: \n"); + for (Node n : this.nodes) + { + sb.append("Node: "); + sb.append(n.getNodeId().toString()); + sb.append("\n"); + } + + return sb.toString(); + } +} diff --git a/src/kademlia/routing/RoutingTable.java b/src/kademlia/routing/RoutingTable.java new file mode 100644 index 0000000..ec23aed --- /dev/null +++ b/src/kademlia/routing/RoutingTable.java @@ -0,0 +1,146 @@ +/** + * @author Joshua Kissoon + * @created 20140215 + * @desc Implementation of a Kademlia routing table + */ +package kademlia.routing; + +import java.util.ArrayList; +import kademlia.node.Node; +import kademlia.node.NodeId; + +public class RoutingTable +{ + + private final Node node; // The current node + private final KadBucket[] buckets; + + + { + buckets = new KadBucket[NodeId.ID_LENGTH]; // 160 buckets; 1 for each level in the tree + } + + public RoutingTable(Node node) + { + this.node = node; + + /* Initialize all of the buckets to a specific depth */ + for (int i = 0; i < NodeId.ID_LENGTH; i++) + { + buckets[i] = new KadBucket(i); + } + } + + /** + * Adds a new contact to the routing table + * + * @param n The contact to add + */ + public void insert(Node n) + { + /* Find the prefix length of how far this node is away from the contact node */ + int prefixLength = this.node.getNodeId().xor(n.getNodeId()).prefixLength(); + + /* Put this contact to the bucket that stores contacts prefixLength distance away */ + this.buckets[prefixLength].insert(n); + } + + /** + * Find the closest set of contacts to a given NodeId + * + * @param target The NodeId to find contacts close to + * @param num The number of contacts to find + * + * @return ArrayList An ArrayList of num contacts closest to target + */ + public ArrayList findClosest(NodeId target, int num) + { + ArrayList closest = new ArrayList<>(num); + + /* Get the bucket number to search for closest from */ + int bucketNumber = this.node.getNodeId().xor(target).prefixLength(); + + /* Add the contacts from this bucket to the return contacts */ + for (Node c : this.buckets[bucketNumber].getContacts()) + { + if (closest.size() < num) + { + closest.add(c); + } + else + { + break; + } + } + + if (closest.size() >= num) + { + return closest; + } + + /* If we still need more nodes, we add from buckets on either side of the closest bucket */ + for (int i = 1; ((bucketNumber - i) >= 0 || (bucketNumber + i) < NodeId.ID_LENGTH); i++) + { + /* Check the bucket on the left side */ + if (bucketNumber - i > 0) + { + for (Node c : this.buckets[bucketNumber - i].getContacts()) + { + if (closest.size() < num) + { + closest.add(c); + } + else + { + break; + } + } + } + + /* Check the bucket on the right side */ + if (bucketNumber + i < NodeId.ID_LENGTH) + { + for (Node c : this.buckets[bucketNumber + i].getContacts()) + { + if (closest.size() < num) + { + closest.add(c); + } + else + { + break; + } + } + } + + /* If we have enough contacts, then stop adding */ + if (closest.size() >= num) + { + break; + } + } + + return closest; + } + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder("\nPrinting Routing Table Started ***************** \n"); + for (KadBucket b : this.buckets) + { + if (b.numNodes() > 0) + { + sb.append("# nodes in Bucket with depth "); + sb.append(b.getDepth()); + sb.append(": "); + sb.append(b.numNodes()); + sb.append("\n"); + } + } + sb.append("\nPrinting Routing Table Ended ******************** "); + + return sb.toString(); + } + +} diff --git a/src/kademlia/tests/NodeConnectionTest.java b/src/kademlia/tests/NodeConnectionTest.java new file mode 100644 index 0000000..2bba791 --- /dev/null +++ b/src/kademlia/tests/NodeConnectionTest.java @@ -0,0 +1,40 @@ +/** + * @author Joshua Kissoon + * @created 20140219 + * @desc Testing connecting 2 nodes + */ +package kademlia.tests; + +import java.io.IOException; +import kademlia.core.Kademlia; +import kademlia.node.NodeId; + +public class NodeConnectionTest +{ + + public static void main(String[] args) + { + try + { + /* Setting up 2 Kad networks */ + Kademlia kad1 = new Kademlia("Joshua", new NodeId("12345678901234567890"), 7574); + System.out.println("Kad 1 Before: "); + System.out.println(kad1.getNode().getRoutingTable()); + + Kademlia kad2 = new Kademlia("Crystal", new NodeId("12345678901234567891"), 7572); + System.out.println("Kad 2 Before: "); + System.out.println(kad2.getNode().getRoutingTable()); + + /* Connecting 2 to 1 */ + kad1.connect(kad2.getNode()); + System.out.println("Kad 1 After: "); + System.out.println(kad1.getNode().getRoutingTable()); + System.out.println("Kad 2 After: "); + System.out.println(kad2.getNode().getRoutingTable()); + } + catch (IOException e) + { + e.printStackTrace(); + } + } +} diff --git a/src/kademlia/tests/SimpleMessageTest.java b/src/kademlia/tests/SimpleMessageTest.java new file mode 100644 index 0000000..8ee01c3 --- /dev/null +++ b/src/kademlia/tests/SimpleMessageTest.java @@ -0,0 +1,31 @@ +/** + * @author Joshua Kissoon + * @created 20140218 + * @desc Testing a simple send message + */ +package kademlia.tests; + +import java.io.IOException; +import kademlia.core.Kademlia; +import kademlia.message.SimpleMessage; +import kademlia.node.NodeId; +import kademlia.message.SimpleReceiver; + +public class SimpleMessageTest +{ + + public static void main(String[] args) + { + try + { + Kademlia kad1 = new Kademlia("Joshua", new NodeId("12345678901234567890"), 7574); + Kademlia kad2 = new Kademlia("Crystal", new NodeId("12345678901234567891"), 7572); + + kad1.getServer().sendMessage(kad2.getNode(), new SimpleMessage("Some Message"), new SimpleReceiver()); + } + catch (IOException e) + { + e.printStackTrace(); + } + } +} diff --git a/src/kademlia/util/JsonSerializer.java b/src/kademlia/util/JsonSerializer.java new file mode 100644 index 0000000..1075803 --- /dev/null +++ b/src/kademlia/util/JsonSerializer.java @@ -0,0 +1,47 @@ +/** + * @author Joshua Kissoon + * @created 20140218 + * @desc Serializes a message into a json message + */ +package kademlia.util; + +import com.google.gson.Gson; +import com.google.gson.stream.JsonWriter; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import kademlia.message.Message; + +public class JsonSerializer +{ + + private final Gson gson; + + public JsonSerializer() + { + this.gson = new Gson(); + } + + /** + * Writes a message to an output stream + * + * @param msg The message to write + * @param out The output stream to write the message to + */ + public void write(Message msg, OutputStream out) + { + try (JsonWriter writer = new JsonWriter(new OutputStreamWriter(out))) + { + writer.beginArray(); + + this.gson.toJson(msg, msg.getClass(), writer); + + writer.endArray(); + } + catch (IOException e) + { + + } + } + +}