Started working on sending content messages

This commit is contained in:
Joshua Kissoon 2014-02-25 13:01:06 +05:30
parent 44077d344d
commit faef3d03ec
12 changed files with 244 additions and 17 deletions

4
src/kademlia/Todo Normal file
View File

@ -0,0 +1,4 @@
# What's left to add to the implementation
1. Implement the "Optimized Contact Accounting" features as mentioned in the paper
2. Implement the "Accelerated Lookups" featured as described in the paper

View File

@ -0,0 +1,39 @@
package kademlia.core;
import kademlia.node.NodeId;
/**
* A GET request can get content based on Key, Owner, Type, etc
*
* This is a class containing the parameters to be passed in a GET request
*
* We use a class since the number of filtering parameters can change later
*
* @author Joshua Kissoon
* @since 20140224
*/
public class GetParameter
{
private NodeId key;
private String owner = null;
private String type = null;
public GetParameter(NodeId key)
{
this.key = key;
}
public GetParameter(NodeId key, String owner)
{
this(key);
this.owner = owner;
}
public GetParameter(NodeId key, String owner, String type)
{
this(key, owner);
this.type = type;
}
}

View File

@ -12,12 +12,17 @@ import kademlia.node.NodeId;
import kademlia.operation.ConnectOperation; import kademlia.operation.ConnectOperation;
import kademlia.operation.Operation; import kademlia.operation.Operation;
import kademlia.operation.RefreshOperation; import kademlia.operation.RefreshOperation;
import kademlia.operation.StoreOperation;
/** /**
* The main Kademlia network management class * The main Kademlia network management class
* *
* @author Joshua Kissoon * @author Joshua Kissoon
* @since 20140215 * @since 20140215
*
* @todo When we receive a store message - if we have a newer version of the content, re-send this newer version to that node so as to update their version
* @todo Handle IPv6 Addresses
* @todo Handle compressing data
*/ */
public class Kademlia public class Kademlia
{ {
@ -115,22 +120,35 @@ public class Kademlia
* *
* @param content The content to put onto the DHT * @param content The content to put onto the DHT
* *
* @return Integer How many nodes the content was stored on
*
* @throws java.io.IOException
*
*/ */
public boolean put(DHTContent content) public int put(DHTContent content) throws IOException
{ {
return (int) new StoreOperation(server, localNode, content).execute();
return false;
} }
/** /**
* Get some content stored on the DHT * Get some content stored on the DHT
* The content returned is a JSON String in byte format; this string is parsed into a class
* *
* @param key The key of this content * @param param The parameters used to search for the content
* @param c The class to cast the returned object to
* *
* @return DHTContent The content * @return DHTContent The content
*/ */
public DHTContent get(NodeId key) public DHTContent get(GetParameter param, Class c)
{ {
return null; return null;
} }
/**
* @return String The ID of the owner of this local network
*/
public String getOwnerId()
{
return this.ownerId;
}
} }

View File

@ -22,11 +22,12 @@ public interface DHTContent
public String getType(); public String getType();
/** /**
* Each content will have an expiry date for when a user should delete it form his/her machine * Each content will have an created date
* This allows systems to know when to delete a content form his/her machine
* *
* @return long The expiry date of this content * @return long The create date of this content
*/ */
public long getExpiryDate(); public long getCreatedTimestamp();
/** /**
* @return The ID of the owner of this content * @return The ID of the owner of this content

View File

@ -32,7 +32,6 @@ public class ConnectReceiver implements Receiver
@Override @Override
public void receive(Message incoming, int comm) throws IOException public void receive(Message incoming, int comm) throws IOException
{ {
System.out.println("Received incoming connect message, sending acknowledgement message.");
ConnectMessage mess = (ConnectMessage) incoming; ConnectMessage mess = (ConnectMessage) incoming;
/* Update the local space by inserting the origin node. */ /* Update the local space by inserting the origin node. */

View File

@ -0,0 +1,59 @@
package kademlia.message;
import java.io.DataInput;
import java.io.DataOutput;
import kademlia.dht.DHTContent;
import kademlia.node.Node;
/**
* A Message used to send a content store request to another DHT
*
* @author Joshua Kissoon
* @since 20140224
*/
public class ContentStoreMessage implements Message
{
private final Node origin;
private final DHTContent content;
public final static byte CODE = 0x23;
/**
* @param origin Where did this content come from - it'll always be the local node
* @param content The Content to send
*/
public ContentStoreMessage(Node origin, DHTContent content)
{
this.origin = origin;
this.content = content;
}
@Override
public byte code()
{
return CODE;
}
@Override
public void fromStream(DataInput in)
{
}
@Override
public void toStream(DataOutput out)
{
/* @todo write the origin and the content to the stream */
}
public Node getOrigin()
{
return this.origin;
}
public DHTContent getContent()
{
return this.content;
}
}

View File

@ -29,6 +29,10 @@ public class NodeId implements Streamable
public NodeId(String data) public NodeId(String data)
{ {
keyBytes = data.getBytes(); keyBytes = data.getBytes();
if (keyBytes.length != ID_LENGTH / 8)
{
throw new IllegalArgumentException("Specified Data need to be " + (ID_LENGTH / 8) + " characters long.");
}
} }
/** /**
@ -42,6 +46,10 @@ public class NodeId implements Streamable
public NodeId(byte[] bytes) public NodeId(byte[] bytes)
{ {
if (bytes.length != ID_LENGTH / 8)
{
throw new IllegalArgumentException("Specified Data need to be " + (ID_LENGTH / 8) + " characters long.");
}
this.keyBytes = bytes; this.keyBytes = bytes;
} }

View File

@ -46,7 +46,6 @@ public class ConnectOperation implements Operation, Receiver
{ {
try try
{ {
System.out.println("Connect Operation executing.");
/* Contact the bootstrap node */ /* Contact the bootstrap node */
this.error = true; this.error = true;
this.attempts = 0; this.attempts = 0;

View File

@ -125,11 +125,9 @@ public class NodeLookupOperation implements Operation, Receiver
for (Node o : list) for (Node o : list)
{ {
/* If this node is not in the list, add the node */ /* If this node is not in the list, add the node */
System.out.println("Trying to add node " + o.getNodeId() + " hash: " + o.hashCode());
System.out.println("Contains Key for this node: " + nodes.containsKey(o));
if (!nodes.containsKey(o)) if (!nodes.containsKey(o))
{ {
System.out.println("Adding unasked node " + o.getNodeId()); System.out.println("Adding node " + o.getNodeId());
nodes.put(o, UNASKED); nodes.put(o, UNASKED);
} }
} }
@ -162,7 +160,6 @@ public class NodeLookupOperation implements Operation, Receiver
/* Get unqueried nodes among the K closest seen that have not FAILED */ /* Get unqueried nodes among the K closest seen that have not FAILED */
ArrayList<Node> unasked = this.closestNodesNotFailed(UNASKED); ArrayList<Node> unasked = this.closestNodesNotFailed(UNASKED);
System.out.println("Unasked nodes found: ");
for (Node nn : unasked) for (Node nn : unasked)
{ {
System.out.println(nn.getNodeId()); System.out.println(nn.getNodeId());
@ -218,7 +215,7 @@ public class NodeLookupOperation implements Operation, Receiver
} }
} }
} }
return closestNodes; return closestNodes;
} }

View File

@ -36,6 +36,9 @@ public class StoreOperation implements Operation
{ {
/* Get the nodes on which we need to store the content */ /* Get the nodes on which we need to store the content */
ArrayList<Node> nodes = new NodeLookupOperation(this.server, this.localNode, this.content.getKey()).execute(); ArrayList<Node> nodes = new NodeLookupOperation(this.server, this.localNode, this.content.getKey()).execute();
return null; System.out.println("Nodes to put content on: " + nodes);
/* Return how many nodes the content was stored on */
return nodes.size();
} }
} }

View File

@ -0,0 +1,39 @@
package kademlia.tests;
import java.io.IOException;
import kademlia.core.Kademlia;
import kademlia.node.NodeId;
/**
* Testing sending and receiving content between 2 Nodes on a network
*
* @author Joshua Kissoon
* @since 20140224
*/
public class ContentSendingTest
{
public static void main(String[] args)
{
try
{
/* Setting up 2 Kad networks */
Kademlia kad1 = new Kademlia("JoshuaK", new NodeId("ASF45678947584567467"), 7574);
System.out.println("Created Node Kad 1: " + kad1.getNode().getNodeId());
Kademlia kad2 = new Kademlia("Crystal", new NodeId("ASERTKJDHGVHERJHGFLK"), 7572);
System.out.println("Created Node Kad 2: " + kad2.getNode().getNodeId());
kad2.connect(kad1.getNode());
/**
* Lets create the content and share it
*/
DHTContentImpl c = new DHTContentImpl(kad2.getOwnerId(), "Some Data");
kad2.put(c);
}
catch (IOException e)
{
e.printStackTrace();
}
}
}

View File

@ -1,12 +1,73 @@
package kademlia.tests; package kademlia.tests;
import kademlia.dht.DHTContent;
import kademlia.node.NodeId;
/** /**
* A simple DHT Content object to test DHT storage * A simple DHT Content object to test DHT storage
* *
* @author Joshua Kissoon * @author Joshua Kissoon
* @since 20140224 * @since 20140224
*/ */
public class DHTContentImpl public class DHTContentImpl implements DHTContent
{ {
private final NodeId key;
private String data;
private final String ownerId;
private final long createTs;
private static final String type = "DHTContentImpl";
{
this.createTs = System.currentTimeMillis() / 1000L;
}
public DHTContentImpl(String ownerId, String data)
{
this.ownerId = ownerId;
this.data = data;
this.key = new NodeId();
}
public DHTContentImpl(NodeId key, String ownerId)
{
this.key = key;
this.ownerId = ownerId;
}
public void setData(String newData)
{
this.data = newData;
}
public String getData()
{
return this.data;
}
@Override
public NodeId getKey()
{
return this.key;
}
@Override
public String getType()
{
return type;
}
@Override
public String getOwnerId()
{
return this.ownerId;
}
@Override
public long getCreatedTimestamp()
{
return this.createTs;
}
} }