Kad Server

- Added a specific exception to throw when the server is down

Kademlia Node
- Setup Shutdown proerly
- Updated a few things
This commit is contained in:
Joshua Kissoon 2014-04-28 16:46:32 +05:30
parent 54ac3fe740
commit fa47aceda9
6 changed files with 56 additions and 24 deletions

View File

@ -54,10 +54,16 @@ public class KademliaNode
private final transient KadServer server; private final transient KadServer server;
private final transient DHT dht; private final transient DHT dht;
private transient RoutingTable routingTable; private transient RoutingTable routingTable;
private final transient Timer timer;
private final int udpPort; private final int udpPort;
private transient KadConfiguration config; private transient KadConfiguration config;
/* Timer used to execute refresh operations */
private final transient Timer refreshOperationTimer;
private final transient TimerTask refreshOperationTTask;
/* Whether this node is up and running */
private boolean isRunning = false;
/* Factories */ /* Factories */
private final transient MessageFactory messageFactory; private final transient MessageFactory messageFactory;
@ -88,11 +94,10 @@ public class KademliaNode
this.routingTable = routingTable; this.routingTable = routingTable;
this.messageFactory = new MessageFactory(this, this.dht, this.config); this.messageFactory = new MessageFactory(this, this.dht, this.config);
this.server = new KadServer(udpPort, this.messageFactory, this.localNode, this.config); this.server = new KadServer(udpPort, this.messageFactory, this.localNode, this.config);
this.timer = new Timer(true); this.refreshOperationTimer = new Timer(true);
/* Schedule Recurring RestoreOperation */ /* Schedule Recurring RestoreOperation */
timer.schedule( refreshOperationTTask = new TimerTask()
new TimerTask()
{ {
@Override @Override
public void run() public void run()
@ -107,19 +112,22 @@ public class KademliaNode
System.err.println("Refresh Operation Failed; Message: " + e.getMessage()); System.err.println("Refresh Operation Failed; Message: " + e.getMessage());
} }
} }
}, };
// Delay // Interval refreshOperationTimer.schedule(refreshOperationTTask, this.config.restoreInterval(), this.config.restoreInterval());
this.config.restoreInterval(), this.config.restoreInterval()
); this.isRunning = true;
} }
public KademliaNode(String ownerId, NodeId defaultId, int udpPort, RoutingTable routingTable, KadConfiguration config) throws IOException public KademliaNode(String ownerId, Node node, int udpPort, RoutingTable routingTable, KadConfiguration config) throws IOException
{ {
this(ownerId, this(
new Node(defaultId, InetAddress.getLocalHost(), udpPort, config), ownerId,
node,
udpPort, udpPort,
new DHT(ownerId, config),
routingTable, routingTable,
new DHT(ownerId, config), config); config
);
} }
public KademliaNode(String ownerId, Node node, int udpPort, KadConfiguration config) throws IOException public KademliaNode(String ownerId, Node node, int udpPort, KadConfiguration config) throws IOException
@ -128,7 +136,6 @@ public class KademliaNode
ownerId, ownerId,
node, node,
udpPort, udpPort,
new DHT(ownerId, config),
new RoutingTable(node, config), new RoutingTable(node, config),
config config
); );
@ -283,7 +290,6 @@ public class KademliaNode
/** /**
* Get some content stored on the DHT * Get some content stored on the DHT
* The content returned is a JSON String in byte format; this string is parsed into a class
* *
* @param param The parameters used to search for the content * @param param The parameters used to search for the content
* *
@ -344,6 +350,13 @@ public class KademliaNode
/* Shut down the server */ /* Shut down the server */
this.server.shutdown(); this.server.shutdown();
/* Close off the timer tasks */
this.refreshOperationTTask.cancel();
this.refreshOperationTimer.cancel();
this.refreshOperationTimer.purge();
this.isRunning = false;
/* Save this Kademlia instance's state if required */ /* Save this Kademlia instance's state if required */
if (saveState) if (saveState)
{ {

View File

@ -10,7 +10,7 @@ import java.io.File;
public class DefaultConfiguration implements KadConfiguration public class DefaultConfiguration implements KadConfiguration
{ {
private final static long RESTORE_INTERVAL = 1000 * 1000; // Default at 1 hour private final static long RESTORE_INTERVAL = 10 * 1000; // Default at 1 hour
private final static long RESPONSE_TIMEOUT = 1500; private final static long RESPONSE_TIMEOUT = 1500;
private final static long OPERATION_TIMEOUT = 3000; private final static long OPERATION_TIMEOUT = 3000;
private final static int CONCURRENCY = 10; private final static int CONCURRENCY = 10;

View File

@ -13,6 +13,7 @@ import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import kademlia.exceptions.KadServerDownException;
import kademlia.message.Message; import kademlia.message.Message;
import kademlia.message.MessageFactory; import kademlia.message.MessageFactory;
import kademlia.node.Node; import kademlia.node.Node;
@ -104,12 +105,13 @@ public class KadServer
* @return Integer The communication ID of this message * @return Integer The communication ID of this message
* *
* @throws IOException * @throws IOException
* @throws kademlia.exceptions.KadServerDownException
*/ */
public synchronized int sendMessage(Node to, Message msg, Receiver recv) throws IOException public synchronized int sendMessage(Node to, Message msg, Receiver recv) throws IOException, KadServerDownException
{ {
if (!isRunning) if (!isRunning)
{ {
throw new IllegalStateException("Kad Server is not running on node " + this.localNode); throw new KadServerDownException("Kad Server is not running on node " + this.localNode);
} }
/* Generate a random communication ID */ /* Generate a random communication ID */

View File

@ -0,0 +1,21 @@
package kademlia.exceptions;
/**
* An exception to be thrown whenever the Kad Server is down
*
* @author Joshua Kissoon
* @created 20140428
*/
public class KadServerDownException extends RoutingException
{
public KadServerDownException()
{
super();
}
public KadServerDownException(String message)
{
super(message);
}
}

View File

@ -44,7 +44,7 @@ public class ConnectOperation implements Operation, Receiver
} }
@Override @Override
public synchronized void execute() public synchronized void execute() throws IOException
{ {
try try
{ {
@ -88,9 +88,9 @@ public class ConnectOperation implements Operation, Receiver
*/ */
new BucketRefreshOperation(this.server, this.localNode, this.config).execute(); new BucketRefreshOperation(this.server, this.localNode, this.config).execute();
} }
catch (IOException | InterruptedException e) catch (InterruptedException e)
{ {
e.printStackTrace(); System.err.println("Connect operation was interrupted. ");
} }
} }

View File

@ -43,10 +43,6 @@ public class RoutingTableStateTesting
kad9.bootstrap(kad0.getNode()); kad9.bootstrap(kad0.getNode());
/* Lets shut down a node and then try putting a content on the network. We'll then see how the un-responsive contacts work */ /* Lets shut down a node and then try putting a content on the network. We'll then see how the un-responsive contacts work */
}
catch (IllegalStateException e)
{
} }
catch (Exception e) catch (Exception e)
{ {