Persistent content storage now working perfectly!!!

This commit is contained in:
Joshua Kissoon 2014-02-26 11:40:06 +05:30
parent c67e501df6
commit 9f14c66a31
10 changed files with 250 additions and 21 deletions

View File

@ -47,4 +47,9 @@ public class Configuration
* Number of times a node can be marked as stale before it is actually removed. * Number of times a node can be marked as stale before it is actually removed.
* */ * */
public static int STALE = 1; public static int STALE = 1;
/**
* Local Storage location - Relative to the user's home folder (Cross-Platform)
*/
public static String localFolder = "kademlia";
} }

View File

@ -96,11 +96,15 @@ public class KadServer
/* Generate a random communication ID */ /* Generate a random communication ID */
int comm = new Integer(new Random().nextInt()); int comm = new Integer(new Random().nextInt());
/* Setup the receiver to handle message response */ /* If we have a receiver */
receivers.put(comm, recv); if (recv != null)
TimerTask task = new TimeoutTask(comm, recv); {
timer.schedule(task, Configuration.RESPONSE_TIMEOUT); /* Setup the receiver to handle message response */
tasks.put(comm, task); receivers.put(comm, recv);
TimerTask task = new TimeoutTask(comm, recv);
timer.schedule(task, Configuration.RESPONSE_TIMEOUT);
tasks.put(comm, task);
}
/* Send the message */ /* Send the message */
sendMessage(to, msg, comm); sendMessage(to, msg, comm);

View File

@ -4,6 +4,7 @@ import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import kademlia.dht.DHT;
import kademlia.dht.KadContent; import kademlia.dht.KadContent;
import kademlia.exceptions.RoutingException; import kademlia.exceptions.RoutingException;
import kademlia.message.MessageFactory; import kademlia.message.MessageFactory;
@ -33,6 +34,7 @@ public class Kademlia
/* Objects to be used */ /* Objects to be used */
private final Node localNode; private final Node localNode;
private final KadServer server; private final KadServer server;
private final DHT dht;
private final Timer timer; private final Timer timer;
/* Factories */ /* Factories */
@ -56,7 +58,8 @@ public class Kademlia
{ {
this.ownerId = ownerId; this.ownerId = ownerId;
this.localNode = new Node(defaultId, InetAddress.getLocalHost(), udpPort); this.localNode = new Node(defaultId, InetAddress.getLocalHost(), udpPort);
this.messageFactory = new MessageFactory(localNode); this.dht = new DHT();
this.messageFactory = new MessageFactory(localNode, this.dht);
this.server = new KadServer(udpPort, this.messageFactory, this.localNode); this.server = new KadServer(udpPort, this.messageFactory, this.localNode);
this.timer = new Timer(true); this.timer = new Timer(true);

66
src/kademlia/dht/DHT.java Normal file
View File

@ -0,0 +1,66 @@
package kademlia.dht;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import kademlia.core.Configuration;
import kademlia.serializer.JsonSerializer;
/**
* The main Distributed Hash Table class that manages the entire DHT
*
* @author Joshua Kissoon
* @since 20140226
*/
public class DHT
{
private final StorageEntryManager entriesManager;
{
entriesManager = new StorageEntryManager();
}
/**
* Handle storing content locally
*
* @param content The DHT content to store
*
* @throws java.io.IOException
*/
public void store(KadContent content) throws IOException
{
/* Keep track of this content in the entries manager */
this.entriesManager.put(new StorageEntry(content));
/**
* Now we store the content locally in a file
* Each content is stored in a folder named after the first 10 characters of the NodeId
*
* The name of the file containing the content is the hash of this content
*/
String storagePath = System.getProperty("user.home") + File.separator + Configuration.localFolder;
File mainStorageFolder = new File(storagePath);
/* Create the folder if it doesn't exist */
if (!mainStorageFolder.isDirectory())
{
mainStorageFolder.mkdir();
}
/* Check if a folder after the first 10 characters of Hex(nodeId) exist, if not, create it */
String folderName = content.getKey().hexRepresentation().substring(0, 20);
File contentStorageFolder = new File(mainStorageFolder + File.separator + folderName);
if (!contentStorageFolder.isDirectory())
{
contentStorageFolder.mkdir();
}
/* Write the content to a file and store it in the folder */
File contentFile = new File(String.valueOf(content.hashCode()) + ".kct");
DataOutputStream dout = new DataOutputStream(new FileOutputStream(contentStorageFolder + File.separator + contentFile));
new JsonSerializer().write(content, dout);
}
}

View File

@ -0,0 +1,52 @@
package kademlia.dht;
import java.util.Objects;
import kademlia.node.NodeId;
/**
* Keeps track of data for a Content stored in the DHT
* Used by the StorageEntryManager class
*
* @author Joshua Kissoon
* @since 20140226
*/
public class StorageEntry
{
private final NodeId key;
private final String ownerId;
private final String type;
public StorageEntry(KadContent content)
{
this.key = content.getKey();
this.ownerId = content.getOwnerId();
this.type = content.getType();
}
public NodeId getKey()
{
return this.key;
}
@Override
public boolean equals(Object o)
{
if (o instanceof StorageEntry)
{
return this.hashCode() == o.hashCode();
}
return false;
}
@Override
public int hashCode()
{
int hash = 3;
hash = 23 * hash + Objects.hashCode(this.key);
hash = 23 * hash + Objects.hashCode(this.ownerId);
hash = 23 * hash + Objects.hashCode(this.type);
return hash;
}
}

View File

@ -0,0 +1,68 @@
package kademlia.dht;
import java.util.ArrayList;
import java.util.HashMap;
import kademlia.node.NodeId;
/**
* It would be infeasible to keep all content in memory to be send when requested
* Instead we store content into files
* We use this Class to keep track of all content stored
*
* @author Joshua Kissoon
* @since 20140226
*/
public class StorageEntryManager
{
private final HashMap<NodeId, ArrayList<StorageEntry>> entries;
{
entries = new HashMap<>();
}
/**
* Add a new entry to our storage
*
* @param entry
*/
public void put(StorageEntry entry)
{
if (!this.entries.containsKey(entry.getKey()))
{
this.entries.put(entry.getKey(), new ArrayList<StorageEntry>());
}
this.entries.get(entry.getKey()).add(entry);
}
/**
* Checks if our DHT has a Content for the given criteria
*
* @todo Add searching for content by type and ownerID
*
* @param key
*
* @return boolean
*/
public boolean contains(NodeId key)
{
return this.entries.containsKey(key);
}
/**
* Checks if our DHT has a Content for the given criteria
*
* @todo Add finding for content by type and ownerID
*
* @param key
*
* @return List of content for the specific search parameters
*/
public ArrayList<StorageEntry> get(NodeId key)
{
return this.entries.get(key);
}
}

View File

@ -3,6 +3,7 @@ package kademlia.message;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.IOException; import java.io.IOException;
import kademlia.core.KadServer; import kademlia.core.KadServer;
import kademlia.dht.DHT;
import kademlia.node.Node; import kademlia.node.Node;
import kademlia.operation.Receiver; import kademlia.operation.Receiver;
@ -16,10 +17,12 @@ public class MessageFactory
{ {
private final Node localNode; private final Node localNode;
private final DHT dht;
public MessageFactory(Node local) public MessageFactory(Node local, DHT dht)
{ {
this.localNode = local; this.localNode = local;
this.dht = dht;
} }
public Message createMessage(byte code, DataInputStream in) throws IOException public Message createMessage(byte code, DataInputStream in) throws IOException
@ -57,7 +60,7 @@ public class MessageFactory
case NodeLookupMessage.CODE: case NodeLookupMessage.CODE:
return new NodeLookupReceiver(server, this.localNode); return new NodeLookupReceiver(server, this.localNode);
case StoreContentMessage.CODE: case StoreContentMessage.CODE:
return new StoreContentReceiver(server, this.localNode); return new StoreContentReceiver(server, this.localNode, this.dht);
} }
} }
} }

View File

@ -48,7 +48,7 @@ public class StoreContentMessage implements Message
} }
@Override @Override
public void fromStream(DataInputStream in) throws IOException public final void fromStream(DataInputStream in) throws IOException
{ {
this.origin = new Node(in); this.origin = new Node(in);
try try
@ -61,12 +61,23 @@ public class StoreContentMessage implements Message
} }
} }
public Node getOrigin()
{
return this.origin;
}
public KadContent getContent()
{
return this.content;
}
@Override @Override
public byte code() public byte code()
{ {
return CODE; return CODE;
} }
@Override
public String toString() public String toString()
{ {
return "StoreMessage[origin=" + origin + ",content=" + content + "]"; return "StoreMessage[origin=" + origin + ",content=" + content + "]";

View File

@ -1,6 +1,8 @@
package kademlia.message; package kademlia.message;
import java.io.IOException;
import kademlia.core.KadServer; import kademlia.core.KadServer;
import kademlia.dht.DHT;
import kademlia.node.Node; import kademlia.node.Node;
import kademlia.operation.Receiver; import kademlia.operation.Receiver;
@ -15,20 +17,37 @@ public class StoreContentReceiver implements Receiver
private final KadServer server; private final KadServer server;
private final Node localNode; private final Node localNode;
private final DHT dht;
public StoreContentReceiver(KadServer server, Node localNode) public StoreContentReceiver(KadServer server, Node localNode, DHT dht)
{ {
this.server = server; this.server = server;
this.localNode = localNode; this.localNode = localNode;
this.dht = dht;
} }
@Override @Override
public void receive(Message incoming, int comm) public void receive(Message incoming, int comm)
{ {
/* @todo - Insert the message sender into this node's routing table */ /* It's a StoreContentMessage we're receiving */
StoreContentMessage msg = (StoreContentMessage) incoming; StoreContentMessage msg = (StoreContentMessage) incoming;
/* Insert the message sender into this node's routing table */
this.localNode.getRoutingTable().insert(msg.getOrigin());
System.out.println(this.localNode + " - Received a store content message"); System.out.println(this.localNode + " - Received a store content message");
System.out.println(msg); System.out.println(msg);
try
{
/* Store this Content into the DHT */
this.dht.store(msg.getContent());
}
catch (IOException e)
{
System.err.println("Unable to store received content; Message: " + e.getMessage());
}
} }
@Override @Override

View File

@ -5,9 +5,7 @@
*/ */
package kademlia.node; package kademlia.node;
import java.io.DataInput;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.math.BigInteger; import java.math.BigInteger;
@ -25,8 +23,6 @@ public class NodeId implements Streamable
* Construct the NodeId from some string * Construct the NodeId from some string
* *
* @param data The user generated key string * @param data The user generated key string
*
* @todo Throw an exception if the key is too short or too long
*/ */
public NodeId(String data) public NodeId(String data)
{ {
@ -202,15 +198,17 @@ public class NodeId implements Streamable
this.keyBytes = input; this.keyBytes = input;
} }
public String hexRepresentation()
{
/* Returns the hex format of this NodeId */
BigInteger bi = new BigInteger(1, this.keyBytes);
return String.format("%0" + (this.keyBytes.length << 1) + "X", bi);
}
@Override @Override
public String toString() public String toString()
{ {
// StringBuilder sb = new StringBuilder("NodeId: "); return this.hexRepresentation();
BigInteger bi = new BigInteger(1, this.keyBytes);
return String.format("%0" + (this.keyBytes.length << 1) + "X", bi);
//sb.append(Hex.encodeBase64URLSafeString(this.keyBytes));
//return sb.toString();
} }
} }