2014-02-18 20:37:07 +00:00
|
|
|
package kademlia.core;
|
|
|
|
|
|
|
|
import java.io.ByteArrayInputStream;
|
|
|
|
import java.io.ByteArrayOutputStream;
|
|
|
|
import java.io.DataInputStream;
|
|
|
|
import java.io.DataOutputStream;
|
|
|
|
import java.io.IOException;
|
|
|
|
import java.net.DatagramPacket;
|
|
|
|
import java.net.DatagramSocket;
|
|
|
|
import java.net.SocketException;
|
|
|
|
import java.util.HashMap;
|
2014-02-26 06:27:59 +00:00
|
|
|
import java.util.Map;
|
2014-02-18 20:37:07 +00:00
|
|
|
import java.util.Random;
|
|
|
|
import java.util.Timer;
|
|
|
|
import java.util.TimerTask;
|
|
|
|
import kademlia.message.Message;
|
|
|
|
import kademlia.message.MessageFactory;
|
|
|
|
import kademlia.node.Node;
|
2014-04-19 14:41:10 +00:00
|
|
|
import kademlia.message.Receiver;
|
2014-02-18 20:37:07 +00:00
|
|
|
|
2014-03-06 05:51:08 +00:00
|
|
|
/**
|
2014-03-06 11:21:42 +00:00
|
|
|
* The server that handles sending and receiving messages between nodes on the Kad Network
|
|
|
|
*
|
2014-03-06 05:51:08 +00:00
|
|
|
* @author Joshua Kissoon
|
|
|
|
* @created 20140215
|
|
|
|
*/
|
2014-02-18 20:37:07 +00:00
|
|
|
public class KadServer
|
|
|
|
{
|
|
|
|
|
2014-03-06 11:21:42 +00:00
|
|
|
/* Maximum size of a Datagram Packet */
|
2014-02-18 20:37:07 +00:00
|
|
|
private static final int DATAGRAM_BUFFER_SIZE = 64 * 1024; // 64KB
|
|
|
|
|
2014-03-29 09:07:01 +00:00
|
|
|
/* Basic Kad Objects */
|
2014-03-31 17:20:57 +00:00
|
|
|
private final transient KadConfiguration config;
|
2014-03-29 09:07:01 +00:00
|
|
|
|
2014-02-18 20:37:07 +00:00
|
|
|
/* Server Objects */
|
|
|
|
private final int udpPort;
|
|
|
|
private final DatagramSocket socket;
|
2014-04-25 10:34:05 +00:00
|
|
|
private transient boolean isRunning;
|
2014-02-26 06:27:59 +00:00
|
|
|
private final Map<Integer, Receiver> receivers;
|
2014-02-18 20:37:07 +00:00
|
|
|
private final Timer timer; // Schedule future tasks
|
2014-02-26 06:27:59 +00:00
|
|
|
private final Map<Integer, TimerTask> tasks; // Keep track of scheduled tasks
|
2014-02-25 08:12:08 +00:00
|
|
|
|
2014-02-22 14:07:04 +00:00
|
|
|
private final Node localNode;
|
2014-02-18 20:37:07 +00:00
|
|
|
|
|
|
|
/* Factories */
|
|
|
|
private final MessageFactory messageFactory;
|
|
|
|
|
|
|
|
|
|
|
|
{
|
|
|
|
isRunning = true;
|
2014-02-19 05:00:42 +00:00
|
|
|
this.tasks = new HashMap<>();
|
|
|
|
this.receivers = new HashMap<>();
|
|
|
|
this.timer = new Timer(true);
|
2014-02-18 20:37:07 +00:00
|
|
|
}
|
|
|
|
|
2014-03-06 11:21:42 +00:00
|
|
|
/**
|
|
|
|
* Initialize our KadServer
|
|
|
|
*
|
|
|
|
* @param udpPort The port to listen on
|
|
|
|
* @param mFactory Factory used to create messages
|
|
|
|
* @param localNode Local node on which this server runs on
|
2014-03-29 09:07:01 +00:00
|
|
|
* @param config
|
2014-03-06 11:21:42 +00:00
|
|
|
*
|
|
|
|
* @throws java.net.SocketException
|
|
|
|
*/
|
2014-03-29 09:07:01 +00:00
|
|
|
public KadServer(int udpPort, MessageFactory mFactory, Node localNode, KadConfiguration config) throws SocketException
|
2014-02-18 20:37:07 +00:00
|
|
|
{
|
|
|
|
this.udpPort = udpPort;
|
2014-03-29 09:07:01 +00:00
|
|
|
this.config = config;
|
|
|
|
|
2014-02-18 20:37:07 +00:00
|
|
|
this.socket = new DatagramSocket(udpPort);
|
2014-02-25 08:12:08 +00:00
|
|
|
|
2014-02-22 14:07:04 +00:00
|
|
|
this.localNode = localNode;
|
2014-02-18 20:37:07 +00:00
|
|
|
|
|
|
|
this.messageFactory = mFactory;
|
|
|
|
|
|
|
|
/* Start listening for incoming requests in a new thread */
|
|
|
|
this.startListener();
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Starts the listener to listen for incoming messages
|
|
|
|
*/
|
|
|
|
private void startListener()
|
|
|
|
{
|
|
|
|
new Thread()
|
|
|
|
{
|
|
|
|
@Override
|
|
|
|
public void run()
|
|
|
|
{
|
|
|
|
listen();
|
|
|
|
}
|
|
|
|
}.start();
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Sends a message
|
|
|
|
*
|
|
|
|
* @param msg The message to send
|
|
|
|
* @param to The node to send the message to
|
|
|
|
* @param recv The receiver to handle the response message
|
|
|
|
*
|
2014-03-06 11:21:42 +00:00
|
|
|
* @return Integer The communication ID of this message
|
|
|
|
*
|
2014-02-18 20:37:07 +00:00
|
|
|
* @throws IOException
|
|
|
|
*/
|
2014-02-19 07:30:29 +00:00
|
|
|
public synchronized int sendMessage(Node to, Message msg, Receiver recv) throws IOException
|
2014-02-18 20:37:07 +00:00
|
|
|
{
|
|
|
|
if (!isRunning)
|
|
|
|
{
|
2014-04-25 10:34:05 +00:00
|
|
|
throw new IllegalStateException("Kad Server is not running on node " + this.localNode);
|
2014-02-18 20:37:07 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/* Generate a random communication ID */
|
2014-03-06 11:21:42 +00:00
|
|
|
int comm = new Random().nextInt();
|
2014-02-18 20:37:07 +00:00
|
|
|
|
2014-02-26 06:10:06 +00:00
|
|
|
/* If we have a receiver */
|
|
|
|
if (recv != null)
|
|
|
|
{
|
|
|
|
/* Setup the receiver to handle message response */
|
2014-03-22 07:05:00 +00:00
|
|
|
//System.out.println(this.localNode + " Putting Receiver for comm: " + comm + " Receiver: " + recv);
|
2014-02-26 06:10:06 +00:00
|
|
|
receivers.put(comm, recv);
|
|
|
|
TimerTask task = new TimeoutTask(comm, recv);
|
2014-03-29 09:07:01 +00:00
|
|
|
timer.schedule(task, this.config.responseTimeout());
|
2014-02-26 06:10:06 +00:00
|
|
|
tasks.put(comm, task);
|
|
|
|
}
|
2014-02-18 20:37:07 +00:00
|
|
|
|
|
|
|
/* Send the message */
|
|
|
|
sendMessage(to, msg, comm);
|
2014-03-06 11:21:42 +00:00
|
|
|
|
2014-02-19 07:30:29 +00:00
|
|
|
return comm;
|
2014-02-18 20:37:07 +00:00
|
|
|
}
|
|
|
|
|
2014-03-06 11:21:42 +00:00
|
|
|
/**
|
|
|
|
* Method called to reply to a message received
|
|
|
|
*
|
|
|
|
* @param to The Node to send the reply to
|
|
|
|
* @param msg The reply message
|
|
|
|
* @param comm The communication ID - the one received
|
|
|
|
*
|
|
|
|
* @throws java.io.IOException
|
|
|
|
*/
|
2014-02-19 07:30:29 +00:00
|
|
|
public synchronized void reply(Node to, Message msg, int comm) throws IOException
|
2014-02-18 20:37:07 +00:00
|
|
|
{
|
|
|
|
if (!isRunning)
|
|
|
|
{
|
|
|
|
throw new IllegalStateException("Kad Server is not running.");
|
|
|
|
}
|
|
|
|
sendMessage(to, msg, comm);
|
|
|
|
}
|
|
|
|
|
2014-03-06 11:21:42 +00:00
|
|
|
/**
|
|
|
|
* Internal sendMessage method called by the public sendMessage method after a communicationId is generated
|
|
|
|
*/
|
2014-02-18 20:37:07 +00:00
|
|
|
private void sendMessage(Node to, Message msg, int comm) throws IOException
|
|
|
|
{
|
2014-03-06 11:21:42 +00:00
|
|
|
/* Use a try-with resource to auto-close streams after usage */
|
|
|
|
try (ByteArrayOutputStream bout = new ByteArrayOutputStream(); DataOutputStream dout = new DataOutputStream(bout);)
|
|
|
|
{
|
|
|
|
/* Setup the message for transmission */
|
|
|
|
dout.writeInt(comm);
|
|
|
|
dout.writeByte(msg.code());
|
|
|
|
msg.toStream(dout);
|
|
|
|
dout.close();
|
2014-02-18 20:37:07 +00:00
|
|
|
|
2014-03-06 11:21:42 +00:00
|
|
|
byte[] data = bout.toByteArray();
|
2014-02-18 20:37:07 +00:00
|
|
|
|
2014-03-06 11:21:42 +00:00
|
|
|
if (data.length > DATAGRAM_BUFFER_SIZE)
|
|
|
|
{
|
|
|
|
throw new IOException("Message is too big");
|
|
|
|
}
|
2014-02-18 20:37:07 +00:00
|
|
|
|
2014-03-06 11:21:42 +00:00
|
|
|
/* Everything is good, now create the packet and send it */
|
|
|
|
DatagramPacket pkt = new DatagramPacket(data, 0, data.length);
|
|
|
|
pkt.setSocketAddress(to.getSocketAddress());
|
|
|
|
socket.send(pkt);
|
|
|
|
}
|
2014-02-18 20:37:07 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Listen for incoming messages in a separate thread
|
|
|
|
*/
|
|
|
|
private void listen()
|
|
|
|
{
|
|
|
|
try
|
|
|
|
{
|
|
|
|
while (isRunning)
|
|
|
|
{
|
|
|
|
try
|
|
|
|
{
|
|
|
|
/* Wait for a packet */
|
|
|
|
byte[] buffer = new byte[DATAGRAM_BUFFER_SIZE];
|
|
|
|
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
|
|
|
|
socket.receive(packet);
|
|
|
|
|
|
|
|
/* We've received a packet, now handle it */
|
2014-03-06 11:21:42 +00:00
|
|
|
try (ByteArrayInputStream bin = new ByteArrayInputStream(packet.getData(), packet.getOffset(), packet.getLength());
|
|
|
|
DataInputStream din = new DataInputStream(bin);)
|
|
|
|
{
|
2014-02-18 20:37:07 +00:00
|
|
|
|
2014-03-06 11:21:42 +00:00
|
|
|
/* Read in the conversation Id to know which handler to handle this response */
|
|
|
|
int comm = din.readInt();
|
|
|
|
byte messCode = din.readByte();
|
2014-02-18 20:37:07 +00:00
|
|
|
|
2014-03-06 11:21:42 +00:00
|
|
|
Message msg = messageFactory.createMessage(messCode, din);
|
|
|
|
din.close();
|
2014-02-18 20:37:07 +00:00
|
|
|
|
2014-03-22 07:05:00 +00:00
|
|
|
//System.out.println(this.localNode.getNodeId() + " Message Received: [Comm: " + comm + "] " + msg);
|
2014-03-06 11:21:42 +00:00
|
|
|
|
|
|
|
/* Get a receiver for this message */
|
|
|
|
Receiver receiver;
|
|
|
|
if (this.receivers.containsKey(comm))
|
2014-02-19 05:00:42 +00:00
|
|
|
{
|
2014-03-06 11:21:42 +00:00
|
|
|
/* If there is a reciever in the receivers to handle this */
|
|
|
|
synchronized (this)
|
|
|
|
{
|
|
|
|
receiver = this.receivers.remove(comm);
|
|
|
|
TimerTask task = (TimerTask) tasks.remove(comm);
|
|
|
|
task.cancel();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
/* There is currently no receivers, try to get one */
|
|
|
|
receiver = messageFactory.createReceiver(messCode, this);
|
2014-02-19 05:00:42 +00:00
|
|
|
}
|
2014-02-18 20:37:07 +00:00
|
|
|
|
2014-03-06 11:21:42 +00:00
|
|
|
/* Invoke the receiver */
|
|
|
|
if (receiver != null)
|
|
|
|
{
|
|
|
|
receiver.receive(msg, comm);
|
|
|
|
}
|
2014-02-18 20:37:07 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
catch (IOException e)
|
|
|
|
{
|
|
|
|
this.isRunning = false;
|
2014-03-22 07:05:00 +00:00
|
|
|
System.err.println("Server ran into a problem in listener method. Message: " + e.getMessage());
|
2014-02-18 20:37:07 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
finally
|
|
|
|
{
|
|
|
|
if (!socket.isClosed())
|
|
|
|
{
|
|
|
|
socket.close();
|
|
|
|
}
|
|
|
|
this.isRunning = false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Remove a conversation receiver
|
|
|
|
*
|
|
|
|
* @param comm The id of this conversation
|
|
|
|
*/
|
|
|
|
private synchronized void unregister(int comm)
|
|
|
|
{
|
2014-03-09 14:42:11 +00:00
|
|
|
receivers.remove(comm);
|
|
|
|
this.tasks.remove(comm);
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Stops listening and shuts down the server
|
|
|
|
*/
|
|
|
|
public void shutdown()
|
|
|
|
{
|
|
|
|
this.isRunning = false;
|
|
|
|
this.socket.close();
|
|
|
|
timer.cancel();
|
2014-02-18 20:37:07 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Task that gets called by a separate thread if a timeout for a receiver occurs.
|
2014-03-09 14:42:11 +00:00
|
|
|
* When a reply arrives this task must be canceled using the <code>cancel()</code>
|
2014-02-18 20:37:07 +00:00
|
|
|
* method inherited from <code>TimerTask</code>. In this case the caller is
|
|
|
|
* responsible for removing the task from the <code>tasks</code> map.
|
|
|
|
* */
|
|
|
|
class TimeoutTask extends TimerTask
|
|
|
|
{
|
|
|
|
|
|
|
|
private final int comm;
|
|
|
|
private final Receiver recv;
|
|
|
|
|
|
|
|
public TimeoutTask(int comm, Receiver recv)
|
|
|
|
{
|
|
|
|
this.comm = comm;
|
|
|
|
this.recv = recv;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void run()
|
|
|
|
{
|
|
|
|
try
|
|
|
|
{
|
|
|
|
unregister(comm);
|
|
|
|
recv.timeout(comm);
|
|
|
|
}
|
|
|
|
catch (IOException e)
|
|
|
|
{
|
2014-03-22 07:05:00 +00:00
|
|
|
System.err.println("Cannot unregister a receiver. Message: " + e.getMessage());
|
2014-02-18 20:37:07 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2014-03-22 07:05:00 +00:00
|
|
|
|
2014-03-22 05:23:05 +00:00
|
|
|
public void printReceivers()
|
|
|
|
{
|
2014-03-22 07:05:00 +00:00
|
|
|
for (Integer r : this.receivers.keySet())
|
2014-03-22 05:23:05 +00:00
|
|
|
{
|
|
|
|
System.out.println("Receiver for comm: " + r + "; Receiver: " + this.receivers.get(r));
|
|
|
|
}
|
|
|
|
}
|
2014-02-18 20:37:07 +00:00
|
|
|
|
|
|
|
}
|