From 9f14c66a314de93b0efe51324bacdfa33f9ef0fd Mon Sep 17 00:00:00 2001 From: Joshua Kissoon Date: Wed, 26 Feb 2014 11:40:06 +0530 Subject: [PATCH] Persistent content storage now working perfectly!!! --- src/kademlia/core/Configuration.java | 5 ++ src/kademlia/core/KadServer.java | 14 ++-- src/kademlia/core/Kademlia.java | 5 +- src/kademlia/dht/DHT.java | 66 ++++++++++++++++++ src/kademlia/dht/StorageEntry.java | 52 ++++++++++++++ src/kademlia/dht/StorageEntryManager.java | 68 +++++++++++++++++++ src/kademlia/message/MessageFactory.java | 7 +- src/kademlia/message/StoreContentMessage.java | 13 +++- .../message/StoreContentReceiver.java | 23 ++++++- src/kademlia/node/NodeId.java | 18 +++-- 10 files changed, 250 insertions(+), 21 deletions(-) create mode 100644 src/kademlia/dht/DHT.java create mode 100644 src/kademlia/dht/StorageEntry.java create mode 100644 src/kademlia/dht/StorageEntryManager.java diff --git a/src/kademlia/core/Configuration.java b/src/kademlia/core/Configuration.java index 81e0e29..61bd624 100644 --- a/src/kademlia/core/Configuration.java +++ b/src/kademlia/core/Configuration.java @@ -47,4 +47,9 @@ public class Configuration * Number of times a node can be marked as stale before it is actually removed. * */ public static int STALE = 1; + + /** + * Local Storage location - Relative to the user's home folder (Cross-Platform) + */ + public static String localFolder = "kademlia"; } diff --git a/src/kademlia/core/KadServer.java b/src/kademlia/core/KadServer.java index 1cc9dab..5c07fac 100644 --- a/src/kademlia/core/KadServer.java +++ b/src/kademlia/core/KadServer.java @@ -96,11 +96,15 @@ public class KadServer /* Generate a random communication ID */ int comm = new Integer(new Random().nextInt()); - /* Setup the receiver to handle message response */ - receivers.put(comm, recv); - TimerTask task = new TimeoutTask(comm, recv); - timer.schedule(task, Configuration.RESPONSE_TIMEOUT); - tasks.put(comm, task); + /* If we have a receiver */ + if (recv != null) + { + /* Setup the receiver to handle message response */ + receivers.put(comm, recv); + TimerTask task = new TimeoutTask(comm, recv); + timer.schedule(task, Configuration.RESPONSE_TIMEOUT); + tasks.put(comm, task); + } /* Send the message */ sendMessage(to, msg, comm); diff --git a/src/kademlia/core/Kademlia.java b/src/kademlia/core/Kademlia.java index 7823920..616cfbd 100644 --- a/src/kademlia/core/Kademlia.java +++ b/src/kademlia/core/Kademlia.java @@ -4,6 +4,7 @@ import java.io.IOException; import java.net.InetAddress; import java.util.Timer; import java.util.TimerTask; +import kademlia.dht.DHT; import kademlia.dht.KadContent; import kademlia.exceptions.RoutingException; import kademlia.message.MessageFactory; @@ -33,6 +34,7 @@ public class Kademlia /* Objects to be used */ private final Node localNode; private final KadServer server; + private final DHT dht; private final Timer timer; /* Factories */ @@ -56,7 +58,8 @@ public class Kademlia { this.ownerId = ownerId; this.localNode = new Node(defaultId, InetAddress.getLocalHost(), udpPort); - this.messageFactory = new MessageFactory(localNode); + this.dht = new DHT(); + this.messageFactory = new MessageFactory(localNode, this.dht); this.server = new KadServer(udpPort, this.messageFactory, this.localNode); this.timer = new Timer(true); diff --git a/src/kademlia/dht/DHT.java b/src/kademlia/dht/DHT.java new file mode 100644 index 0000000..87c373d --- /dev/null +++ b/src/kademlia/dht/DHT.java @@ -0,0 +1,66 @@ +package kademlia.dht; + +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import kademlia.core.Configuration; +import kademlia.serializer.JsonSerializer; + +/** + * The main Distributed Hash Table class that manages the entire DHT + * + * @author Joshua Kissoon + * @since 20140226 + */ +public class DHT +{ + + private final StorageEntryManager entriesManager; + + + { + entriesManager = new StorageEntryManager(); + } + + /** + * Handle storing content locally + * + * @param content The DHT content to store + * + * @throws java.io.IOException + */ + public void store(KadContent content) throws IOException + { + /* Keep track of this content in the entries manager */ + this.entriesManager.put(new StorageEntry(content)); + + /** + * Now we store the content locally in a file + * Each content is stored in a folder named after the first 10 characters of the NodeId + * + * The name of the file containing the content is the hash of this content + */ + String storagePath = System.getProperty("user.home") + File.separator + Configuration.localFolder; + File mainStorageFolder = new File(storagePath); + + /* Create the folder if it doesn't exist */ + if (!mainStorageFolder.isDirectory()) + { + mainStorageFolder.mkdir(); + } + + /* Check if a folder after the first 10 characters of Hex(nodeId) exist, if not, create it */ + String folderName = content.getKey().hexRepresentation().substring(0, 20); + File contentStorageFolder = new File(mainStorageFolder + File.separator + folderName); + if (!contentStorageFolder.isDirectory()) + { + contentStorageFolder.mkdir(); + } + + /* Write the content to a file and store it in the folder */ + File contentFile = new File(String.valueOf(content.hashCode()) + ".kct"); + DataOutputStream dout = new DataOutputStream(new FileOutputStream(contentStorageFolder + File.separator + contentFile)); + new JsonSerializer().write(content, dout); + } +} diff --git a/src/kademlia/dht/StorageEntry.java b/src/kademlia/dht/StorageEntry.java new file mode 100644 index 0000000..673a53b --- /dev/null +++ b/src/kademlia/dht/StorageEntry.java @@ -0,0 +1,52 @@ +package kademlia.dht; + +import java.util.Objects; +import kademlia.node.NodeId; + +/** + * Keeps track of data for a Content stored in the DHT + * Used by the StorageEntryManager class + * + * @author Joshua Kissoon + * @since 20140226 + */ +public class StorageEntry +{ + + private final NodeId key; + private final String ownerId; + private final String type; + + public StorageEntry(KadContent content) + { + this.key = content.getKey(); + this.ownerId = content.getOwnerId(); + this.type = content.getType(); + } + + public NodeId getKey() + { + return this.key; + } + + @Override + public boolean equals(Object o) + { + if (o instanceof StorageEntry) + { + return this.hashCode() == o.hashCode(); + } + + return false; + } + + @Override + public int hashCode() + { + int hash = 3; + hash = 23 * hash + Objects.hashCode(this.key); + hash = 23 * hash + Objects.hashCode(this.ownerId); + hash = 23 * hash + Objects.hashCode(this.type); + return hash; + } +} diff --git a/src/kademlia/dht/StorageEntryManager.java b/src/kademlia/dht/StorageEntryManager.java new file mode 100644 index 0000000..e54d79c --- /dev/null +++ b/src/kademlia/dht/StorageEntryManager.java @@ -0,0 +1,68 @@ +package kademlia.dht; + +import java.util.ArrayList; +import java.util.HashMap; +import kademlia.node.NodeId; + +/** + * It would be infeasible to keep all content in memory to be send when requested + * Instead we store content into files + * We use this Class to keep track of all content stored + * + * @author Joshua Kissoon + * @since 20140226 + */ +public class StorageEntryManager +{ + + private final HashMap> entries; + + + { + entries = new HashMap<>(); + } + + /** + * Add a new entry to our storage + * + * @param entry + */ + public void put(StorageEntry entry) + { + if (!this.entries.containsKey(entry.getKey())) + { + this.entries.put(entry.getKey(), new ArrayList()); + } + + this.entries.get(entry.getKey()).add(entry); + } + + /** + * Checks if our DHT has a Content for the given criteria + * + * @todo Add searching for content by type and ownerID + * + * @param key + * + * @return boolean + */ + public boolean contains(NodeId key) + { + return this.entries.containsKey(key); + } + + /** + * Checks if our DHT has a Content for the given criteria + * + * @todo Add finding for content by type and ownerID + * + * @param key + * + * @return List of content for the specific search parameters + */ + public ArrayList get(NodeId key) + { + return this.entries.get(key); + } + +} diff --git a/src/kademlia/message/MessageFactory.java b/src/kademlia/message/MessageFactory.java index f9570cb..19b30bf 100644 --- a/src/kademlia/message/MessageFactory.java +++ b/src/kademlia/message/MessageFactory.java @@ -3,6 +3,7 @@ package kademlia.message; import java.io.DataInputStream; import java.io.IOException; import kademlia.core.KadServer; +import kademlia.dht.DHT; import kademlia.node.Node; import kademlia.operation.Receiver; @@ -16,10 +17,12 @@ public class MessageFactory { private final Node localNode; + private final DHT dht; - public MessageFactory(Node local) + public MessageFactory(Node local, DHT dht) { this.localNode = local; + this.dht = dht; } public Message createMessage(byte code, DataInputStream in) throws IOException @@ -57,7 +60,7 @@ public class MessageFactory case NodeLookupMessage.CODE: return new NodeLookupReceiver(server, this.localNode); case StoreContentMessage.CODE: - return new StoreContentReceiver(server, this.localNode); + return new StoreContentReceiver(server, this.localNode, this.dht); } } } diff --git a/src/kademlia/message/StoreContentMessage.java b/src/kademlia/message/StoreContentMessage.java index bdde47c..fca1673 100644 --- a/src/kademlia/message/StoreContentMessage.java +++ b/src/kademlia/message/StoreContentMessage.java @@ -48,7 +48,7 @@ public class StoreContentMessage implements Message } @Override - public void fromStream(DataInputStream in) throws IOException + public final void fromStream(DataInputStream in) throws IOException { this.origin = new Node(in); try @@ -61,12 +61,23 @@ public class StoreContentMessage implements Message } } + public Node getOrigin() + { + return this.origin; + } + + public KadContent getContent() + { + return this.content; + } + @Override public byte code() { return CODE; } + @Override public String toString() { return "StoreMessage[origin=" + origin + ",content=" + content + "]"; diff --git a/src/kademlia/message/StoreContentReceiver.java b/src/kademlia/message/StoreContentReceiver.java index 3b437d3..b6832df 100644 --- a/src/kademlia/message/StoreContentReceiver.java +++ b/src/kademlia/message/StoreContentReceiver.java @@ -1,6 +1,8 @@ package kademlia.message; +import java.io.IOException; import kademlia.core.KadServer; +import kademlia.dht.DHT; import kademlia.node.Node; import kademlia.operation.Receiver; @@ -15,20 +17,37 @@ public class StoreContentReceiver implements Receiver private final KadServer server; private final Node localNode; + private final DHT dht; - public StoreContentReceiver(KadServer server, Node localNode) + public StoreContentReceiver(KadServer server, Node localNode, DHT dht) { this.server = server; this.localNode = localNode; + this.dht = dht; } @Override public void receive(Message incoming, int comm) { - /* @todo - Insert the message sender into this node's routing table */ + /* It's a StoreContentMessage we're receiving */ StoreContentMessage msg = (StoreContentMessage) incoming; + + /* Insert the message sender into this node's routing table */ + this.localNode.getRoutingTable().insert(msg.getOrigin()); + System.out.println(this.localNode + " - Received a store content message"); System.out.println(msg); + + try + { + /* Store this Content into the DHT */ + this.dht.store(msg.getContent()); + } + catch (IOException e) + { + System.err.println("Unable to store received content; Message: " + e.getMessage()); + } + } @Override diff --git a/src/kademlia/node/NodeId.java b/src/kademlia/node/NodeId.java index c4386b6..21b9b1f 100644 --- a/src/kademlia/node/NodeId.java +++ b/src/kademlia/node/NodeId.java @@ -5,9 +5,7 @@ */ package kademlia.node; -import java.io.DataInput; import java.io.DataInputStream; -import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.math.BigInteger; @@ -25,8 +23,6 @@ public class NodeId implements Streamable * Construct the NodeId from some string * * @param data The user generated key string - * - * @todo Throw an exception if the key is too short or too long */ public NodeId(String data) { @@ -202,15 +198,17 @@ public class NodeId implements Streamable this.keyBytes = input; } + public String hexRepresentation() + { + /* Returns the hex format of this NodeId */ + BigInteger bi = new BigInteger(1, this.keyBytes); + return String.format("%0" + (this.keyBytes.length << 1) + "X", bi); + } + @Override public String toString() { - // StringBuilder sb = new StringBuilder("NodeId: "); - BigInteger bi = new BigInteger(1, this.keyBytes); - return String.format("%0" + (this.keyBytes.length << 1) + "X", bi); - //sb.append(Hex.encodeBase64URLSafeString(this.keyBytes)); - - //return sb.toString(); + return this.hexRepresentation(); } }