Cleaned up the core code, added comments and removed all Netbeans generated warnings

This commit is contained in:
Joshua Kissoon 2014-03-06 16:51:42 +05:30
parent 796e41dd9a
commit 689a35b7bf
4 changed files with 111 additions and 63 deletions

View File

@ -3,7 +3,8 @@ package kademlia.core;
/**
* A set of Kademlia configuration parameters. Default values are
* supplied and can be changed by the application as necessary.
* */
*
*/
public class Configuration
{
@ -47,7 +48,7 @@ public class Configuration
* Number of times a node can be marked as stale before it is actually removed.
* */
public static int STALE = 1;
/**
* Local Storage location - Relative to the user's home folder (Cross-Platform)
*/

View File

@ -19,17 +19,35 @@ public class GetParameter
private String ownerId = null;
private String type = null;
/**
* Construct a GetParameter to search for data by NodeId
*
* @param key
*/
public GetParameter(NodeId key)
{
this.key = key;
}
/**
* Construct a GetParameter to search for data by NodeId and owner
*
* @param key
* @param owner
*/
public GetParameter(NodeId key, String owner)
{
this(key);
this.ownerId = owner;
}
/**
* Construct a GetParameter to search for data by NodeId, owner, type
*
* @param key
* @param owner
* @param type
*/
public GetParameter(NodeId key, String owner, String type)
{
this(key, owner);

View File

@ -19,14 +19,15 @@ import kademlia.node.Node;
import kademlia.operation.Receiver;
/**
* The server that handles sending and receiving messages between nodes on the Kad Network
*
* @author Joshua Kissoon
* @created 20140215
* @desc This server handles sending and receiving messages
*/
public class KadServer
{
/* Constants */
/* Maximum size of a Datagram Packet */
private static final int DATAGRAM_BUFFER_SIZE = 64 * 1024; // 64KB
/* Server Objects */
@ -50,6 +51,15 @@ public class KadServer
this.timer = new Timer(true);
}
/**
* Initialize our KadServer
*
* @param udpPort The port to listen on
* @param mFactory Factory used to create messages
* @param localNode Local node on which this server runs on
*
* @throws java.net.SocketException
*/
public KadServer(int udpPort, MessageFactory mFactory, Node localNode) throws SocketException
{
this.udpPort = udpPort;
@ -85,6 +95,8 @@ public class KadServer
* @param to The node to send the message to
* @param recv The receiver to handle the response message
*
* @return Integer The communication ID of this message
*
* @throws IOException
*/
public synchronized int sendMessage(Node to, Message msg, Receiver recv) throws IOException
@ -95,7 +107,7 @@ public class KadServer
}
/* Generate a random communication ID */
int comm = new Integer(new Random().nextInt());
int comm = new Random().nextInt();
/* If we have a receiver */
if (recv != null)
@ -109,9 +121,19 @@ public class KadServer
/* Send the message */
sendMessage(to, msg, comm);
return comm;
}
/**
* Method called to reply to a message received
*
* @param to The Node to send the reply to
* @param msg The reply message
* @param comm The communication ID - the one received
*
* @throws java.io.IOException
*/
public synchronized void reply(Node to, Message msg, int comm) throws IOException
{
if (!isRunning)
@ -121,27 +143,32 @@ public class KadServer
sendMessage(to, msg, comm);
}
/**
* Internal sendMessage method called by the public sendMessage method after a communicationId is generated
*/
private void sendMessage(Node to, Message msg, int comm) throws IOException
{
/* Setup the message for transmission */
ByteArrayOutputStream bout = new ByteArrayOutputStream();
DataOutputStream dout = new DataOutputStream(bout);
dout.writeInt(comm);
dout.writeByte(msg.code());
msg.toStream(dout);
dout.close();
byte[] data = bout.toByteArray();
if (data.length > DATAGRAM_BUFFER_SIZE)
/* Use a try-with resource to auto-close streams after usage */
try (ByteArrayOutputStream bout = new ByteArrayOutputStream(); DataOutputStream dout = new DataOutputStream(bout);)
{
throw new IOException("Message is too big");
}
/* Setup the message for transmission */
dout.writeInt(comm);
dout.writeByte(msg.code());
msg.toStream(dout);
dout.close();
/* Everything is good, now create the packet and send it */
DatagramPacket pkt = new DatagramPacket(data, 0, data.length);
pkt.setSocketAddress(to.getSocketAddress());
socket.send(pkt);
byte[] data = bout.toByteArray();
if (data.length > DATAGRAM_BUFFER_SIZE)
{
throw new IOException("Message is too big");
}
/* Everything is good, now create the packet and send it */
DatagramPacket pkt = new DatagramPacket(data, 0, data.length);
pkt.setSocketAddress(to.getSocketAddress());
socket.send(pkt);
}
}
/**
@ -161,46 +188,48 @@ public class KadServer
socket.receive(packet);
/* We've received a packet, now handle it */
ByteArrayInputStream bin = new ByteArrayInputStream(packet.getData(), packet.getOffset(), packet.getLength());
DataInputStream din = new DataInputStream(bin);
/* Read in the conversation Id to know which handler to handle this response */
int comm = din.readInt();
byte messCode = din.readByte();
Message msg = messageFactory.createMessage(messCode, din);
din.close();
//System.out.println(this.localNode.getNodeId() + " Message Received: [Comm: " + comm + "] " + msg);
/* Get a receiver for this message */
Receiver receiver;
if (this.receivers.containsKey(comm))
try (ByteArrayInputStream bin = new ByteArrayInputStream(packet.getData(), packet.getOffset(), packet.getLength());
DataInputStream din = new DataInputStream(bin);)
{
//System.out.println("Receiver found");
/* If there is a reciever in the receivers to handle this */
synchronized (this)
/* Read in the conversation Id to know which handler to handle this response */
int comm = din.readInt();
byte messCode = din.readByte();
Message msg = messageFactory.createMessage(messCode, din);
din.close();
//System.out.println(this.localNode.getNodeId() + " Message Received: [Comm: " + comm + "] " + msg);
/* Get a receiver for this message */
Receiver receiver;
if (this.receivers.containsKey(comm))
{
receiver = this.receivers.remove(comm);
TimerTask task = (TimerTask) tasks.remove(comm);
task.cancel();
/* If there is a reciever in the receivers to handle this */
synchronized (this)
{
receiver = this.receivers.remove(comm);
TimerTask task = (TimerTask) tasks.remove(comm);
task.cancel();
}
}
else
{
/* There is currently no receivers, try to get one */
receiver = messageFactory.createReceiver(messCode, this);
}
}
else
{
/* There is currently no receivers, try to get one */
receiver = messageFactory.createReceiver(messCode, this);
}
/* Invoke the receiver */
if (receiver != null)
{
receiver.receive(msg, comm);
/* Invoke the receiver */
if (receiver != null)
{
receiver.receive(msg, comm);
}
}
}
catch (IOException e)
{
this.isRunning = false;
e.printStackTrace();
System.out.println("Server ran into a problem in listener method. Message: " + e.getMessage());
}
}
}
@ -221,7 +250,7 @@ public class KadServer
*/
private synchronized void unregister(int comm)
{
Integer key = new Integer(comm);
Integer key = comm;
receivers.remove(key);
this.tasks.remove(key);
}
@ -254,7 +283,7 @@ public class KadServer
}
catch (IOException e)
{
e.printStackTrace();
System.out.println("Cannot unregister a receiver. Message: " + e.getMessage());
}
}
}

View File

@ -31,10 +31,10 @@ public class NodeLookupOperation implements Operation, Receiver
{
/* Constants */
private static final Byte UNASKED = new Byte((byte) 0x00);
private static final Byte AWAITING = new Byte((byte) 0x01);
private static final Byte ASKED = new Byte((byte) 0x02);
private static final Byte FAILED = new Byte((byte) 0x03);
private static final Byte UNASKED = (byte) 0x00;
private static final Byte AWAITING = (byte) 0x01;
private static final Byte ASKED = (byte) 0x02;
private static final Byte FAILED = (byte) 0x03;
private final KadServer server;
private final Node localNode;
@ -187,7 +187,7 @@ public class NodeLookupOperation implements Operation, Receiver
int comm = server.sendMessage(n, lookupMessage, this);
this.nodes.put(n, AWAITING);
this.messagesTransiting.put(new Integer(comm), n);
this.messagesTransiting.put(comm, n);
}
/* We're not finished as yet, return false */
@ -268,14 +268,14 @@ public class NodeLookupOperation implements Operation, Receiver
/* Add the origin node to our routing table */
Node origin = msg.getOrigin();
System.out.println(this.localNode.getNodeId() + " Lookup Operation Response From: " + origin.getNodeId());
//System.out.println(this.localNode.getNodeId() + " Lookup Operation Response From: " + origin.getNodeId());
this.localNode.getRoutingTable().insert(origin);
/* Set that we've completed ASKing the origin node */
this.nodes.put(origin, ASKED);
/* Remove this msg from messagesTransiting since it's completed now */
this.messagesTransiting.remove(new Integer(comm));
this.messagesTransiting.remove(comm);
/* Add the received nodes to our nodes list to query */
this.addNodes(msg.getNodes());
@ -303,7 +303,7 @@ public class NodeLookupOperation implements Operation, Receiver
/* Mark this node as failed */
this.nodes.put(n, FAILED);
this.localNode.getRoutingTable().remove(n);
this.messagesTransiting.remove(new Integer(comm));
this.messagesTransiting.remove(comm);
this.askNodesorFinish();
}