Content Sending working between nodes

This commit is contained in:
Joshua Kissoon 2014-02-25 22:57:46 +05:30
parent fa4b29305e
commit c67e501df6
17 changed files with 214 additions and 53 deletions

View File

@ -118,12 +118,6 @@ public class KadServer
private void sendMessage(Node to, Message msg, int comm) throws IOException private void sendMessage(Node to, Message msg, int comm) throws IOException
{ {
Class c = msg.getClass();
System.out.println(c.getSimpleName());
System.out.println(c.getName());
/* Setup the message for transmission */ /* Setup the message for transmission */
ByteArrayOutputStream bout = new ByteArrayOutputStream(); ByteArrayOutputStream bout = new ByteArrayOutputStream();
DataOutputStream dout = new DataOutputStream(bout); DataOutputStream dout = new DataOutputStream(bout);

View File

@ -6,7 +6,9 @@
package kademlia.message; package kademlia.message;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import kademlia.node.Node; import kademlia.node.Node;
@ -21,19 +23,19 @@ public class AcknowledgeMessage implements Message
this.origin = origin; this.origin = origin;
} }
public AcknowledgeMessage(DataInput in) throws IOException public AcknowledgeMessage(DataInputStream in) throws IOException
{ {
this.fromStream(in); this.fromStream(in);
} }
@Override @Override
public final void fromStream(DataInput in) throws IOException public final void fromStream(DataInputStream in) throws IOException
{ {
this.origin = new Node(in); this.origin = new Node(in);
} }
@Override @Override
public void toStream(DataOutput out) throws IOException public void toStream(DataOutputStream out) throws IOException
{ {
origin.toStream(out); origin.toStream(out);
} }

View File

@ -6,7 +6,9 @@
package kademlia.message; package kademlia.message;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import kademlia.node.Node; import kademlia.node.Node;
@ -21,19 +23,19 @@ public class ConnectMessage implements Message
this.origin = origin; this.origin = origin;
} }
public ConnectMessage(DataInput in) throws IOException public ConnectMessage(DataInputStream in) throws IOException
{ {
this.fromStream(in); this.fromStream(in);
} }
@Override @Override
public final void fromStream(DataInput in) throws IOException public final void fromStream(DataInputStream in) throws IOException
{ {
this.origin = new Node(in); this.origin = new Node(in);
} }
@Override @Override
public void toStream(DataOutput out) throws IOException public void toStream(DataOutputStream out) throws IOException
{ {
origin.toStream(out); origin.toStream(out);
} }

View File

@ -1,7 +1,9 @@
package kademlia.message; package kademlia.message;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.DataOutputStream;
import kademlia.dht.KadContent; import kademlia.dht.KadContent;
import kademlia.node.Node; import kademlia.node.Node;
@ -36,13 +38,13 @@ public class ContentStoreMessage implements Message
} }
@Override @Override
public void fromStream(DataInput in) public void fromStream(DataInputStream in)
{ {
} }
@Override @Override
public void toStream(DataOutput out) public void toStream(DataOutputStream out)
{ {
/* @todo write the origin and the content to the stream */ /* @todo write the origin and the content to the stream */
} }

View File

@ -1,16 +1,17 @@
/**
* @author Joshua
* @created
* @desc
*/
package kademlia.message; package kademlia.message;
import java.io.DataInput; import java.io.DataInputStream;
import java.io.IOException; import java.io.IOException;
import kademlia.core.KadServer; import kademlia.core.KadServer;
import kademlia.node.Node; import kademlia.node.Node;
import kademlia.operation.Receiver; import kademlia.operation.Receiver;
/**
* Handles creating messages and receivers
*
* @author Joshua Kissoon
* @since 20140202
*/
public class MessageFactory public class MessageFactory
{ {
@ -21,7 +22,7 @@ public class MessageFactory
this.localNode = local; this.localNode = local;
} }
public Message createMessage(byte code, DataInput in) throws IOException public Message createMessage(byte code, DataInputStream in) throws IOException
{ {
switch (code) switch (code)
{ {
@ -35,6 +36,8 @@ public class MessageFactory
return new NodeReplyMessage(in); return new NodeReplyMessage(in);
case NodeLookupMessage.CODE: case NodeLookupMessage.CODE:
return new NodeLookupMessage(in); return new NodeLookupMessage(in);
case StoreContentMessage.CODE:
return new StoreContentMessage(in);
default: default:
System.out.println("No Message handler found for message. Code: " + code); System.out.println("No Message handler found for message. Code: " + code);
return new SimpleMessage(in); return new SimpleMessage(in);
@ -53,6 +56,8 @@ public class MessageFactory
return new ConnectReceiver(server, this.localNode); return new ConnectReceiver(server, this.localNode);
case NodeLookupMessage.CODE: case NodeLookupMessage.CODE:
return new NodeLookupReceiver(server, this.localNode); return new NodeLookupReceiver(server, this.localNode);
case StoreContentMessage.CODE:
return new StoreContentReceiver(server, this.localNode);
} }
} }
} }

View File

@ -6,7 +6,9 @@
package kademlia.message; package kademlia.message;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import kademlia.node.Node; import kademlia.node.Node;
import kademlia.node.NodeId; import kademlia.node.NodeId;
@ -31,20 +33,20 @@ public class NodeLookupMessage implements Message
this.lookupId = lookup; this.lookupId = lookup;
} }
public NodeLookupMessage(DataInput in) throws IOException public NodeLookupMessage(DataInputStream in) throws IOException
{ {
this.fromStream(in); this.fromStream(in);
} }
@Override @Override
public final void fromStream(DataInput in) throws IOException public final void fromStream(DataInputStream in) throws IOException
{ {
this.origin = new Node(in); this.origin = new Node(in);
this.lookupId = new NodeId(in); this.lookupId = new NodeId(in);
} }
@Override @Override
public void toStream(DataOutput out) throws IOException public void toStream(DataOutputStream out) throws IOException
{ {
this.origin.toStream(out); this.origin.toStream(out);
this.lookupId.toStream(out); this.lookupId.toStream(out);

View File

@ -6,7 +6,9 @@
package kademlia.message; package kademlia.message;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import kademlia.node.Node; import kademlia.node.Node;
@ -24,13 +26,13 @@ public class NodeReplyMessage implements Message
this.nodes = nodes; this.nodes = nodes;
} }
public NodeReplyMessage(DataInput in) throws IOException public NodeReplyMessage(DataInputStream in) throws IOException
{ {
this.fromStream(in); this.fromStream(in);
} }
@Override @Override
public final void fromStream(DataInput in) throws IOException public final void fromStream(DataInputStream in) throws IOException
{ {
/* Read in the origin */ /* Read in the origin */
this.origin = new Node(in); this.origin = new Node(in);
@ -47,7 +49,7 @@ public class NodeReplyMessage implements Message
} }
@Override @Override
public void toStream(DataOutput out) throws IOException public void toStream(DataOutputStream out) throws IOException
{ {
/* Add the origin node to the stream */ /* Add the origin node to the stream */
origin.toStream(out); origin.toStream(out);

View File

@ -6,7 +6,9 @@
package kademlia.message; package kademlia.message;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
public class SimpleMessage implements Message public class SimpleMessage implements Message
@ -22,7 +24,7 @@ public class SimpleMessage implements Message
this.content = message; this.content = message;
} }
public SimpleMessage(DataInput in) public SimpleMessage(DataInputStream in)
{ {
System.out.println("Creating message from input stream."); System.out.println("Creating message from input stream.");
this.fromStream(in); this.fromStream(in);
@ -35,7 +37,7 @@ public class SimpleMessage implements Message
} }
@Override @Override
public void toStream(DataOutput out) public void toStream(DataOutputStream out)
{ {
try try
{ {
@ -49,7 +51,7 @@ public class SimpleMessage implements Message
} }
@Override @Override
public final void fromStream(DataInput in) public final void fromStream(DataInputStream in)
{ {
try try
{ {

View File

@ -0,0 +1,74 @@
package kademlia.message;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import kademlia.dht.KadContent;
import kademlia.node.Node;
import kademlia.serializer.JsonSerializer;
/**
* A StoreContentMessage used to send a store message to a node
*
* @author Joshua Kissoon
* @since 20140225
*/
public class StoreContentMessage implements Message
{
public static final byte CODE = 0x55;
private KadContent content;
private Node origin;
/**
* @param origin Where the message came from
* @param content The content to be stored
*
*/
public StoreContentMessage(Node origin, KadContent content)
{
this.content = content;
this.origin = origin;
}
public StoreContentMessage(DataInputStream in) throws IOException
{
this.fromStream(in);
}
@Override
public void toStream(DataOutputStream out) throws IOException
{
this.origin.toStream(out);
/* Serialize the KadContent, then send it to the stream */
JsonSerializer serializer = new JsonSerializer();
serializer.write(content, out);
}
@Override
public void fromStream(DataInputStream in) throws IOException
{
this.origin = new Node(in);
try
{
this.content = new JsonSerializer().read(in);
}
catch (ClassNotFoundException e)
{
e.printStackTrace();
}
}
@Override
public byte code()
{
return CODE;
}
public String toString()
{
return "StoreMessage[origin=" + origin + ",content=" + content + "]";
}
}

View File

@ -0,0 +1,39 @@
package kademlia.message;
import kademlia.core.KadServer;
import kademlia.node.Node;
import kademlia.operation.Receiver;
/**
* Receiver for incoming StoreContentMessage
*
* @author Joshua Kissoon
* @since 20140225
*/
public class StoreContentReceiver implements Receiver
{
private final KadServer server;
private final Node localNode;
public StoreContentReceiver(KadServer server, Node localNode)
{
this.server = server;
this.localNode = localNode;
}
@Override
public void receive(Message incoming, int comm)
{
/* @todo - Insert the message sender into this node's routing table */
StoreContentMessage msg = (StoreContentMessage) incoming;
System.out.println(this.localNode + " - Received a store content message");
System.out.println(msg);
}
@Override
public void timeout(int comm)
{
/* @todo Do something if the request times out */
}
}

View File

@ -1,7 +1,7 @@
package kademlia.message; package kademlia.message;
import java.io.DataInput; import java.io.DataInputStream;
import java.io.DataOutput; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
/** /**
@ -16,17 +16,27 @@ import java.io.IOException;
* that classes implementing Streamble also provide a constructor of the form: * that classes implementing Streamble also provide a constructor of the form:
* <p> * <p>
* <code>Streamable(DataInput in) throws IOException;</code> * <code>Streamable(DataInput in) throws IOException;</code>
**/ * */
public interface Streamable { public interface Streamable
{
/** /**
* Writes the internal state of the Streamable object to the output stream * Writes the internal state of the Streamable object to the output stream
* in a format that can later be read by the same Streamble class using * in a format that can later be read by the same Streamble class using
* the {@link #fromStream} method. * the {@link #fromStream} method.
**/ *
public void toStream(DataOutput out) throws IOException; * @param out
*
* @throws java.io.IOException
*/
public void toStream(DataOutputStream out) throws IOException;
/** /**
* Reads the internal state of the Streamable object from the input stream. * Reads the internal state of the Streamable object from the input stream.
**/ *
public void fromStream(DataInput out) throws IOException; * @param out
*
* @throws java.io.IOException
*/
public void fromStream(DataInputStream out) throws IOException;
} }

View File

@ -1,7 +1,9 @@
package kademlia.node; package kademlia.node;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -45,7 +47,7 @@ public class Node implements Streamable
* *
* @throws IOException * @throws IOException
*/ */
public Node(DataInput in) throws IOException public Node(DataInputStream in) throws IOException
{ {
this.fromStream(in); this.fromStream(in);
} }
@ -79,7 +81,7 @@ public class Node implements Streamable
} }
@Override @Override
public void toStream(DataOutput out) throws IOException public void toStream(DataOutputStream out) throws IOException
{ {
/* Add the NodeId to the stream */ /* Add the NodeId to the stream */
this.nodeId.toStream(out); this.nodeId.toStream(out);
@ -97,7 +99,7 @@ public class Node implements Streamable
} }
@Override @Override
public final void fromStream(DataInput in) throws IOException public final void fromStream(DataInputStream in) throws IOException
{ {
/* Load the NodeId */ /* Load the NodeId */
this.nodeId = new NodeId(in); this.nodeId = new NodeId(in);

View File

@ -6,7 +6,9 @@
package kademlia.node; package kademlia.node;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.math.BigInteger; import java.math.BigInteger;
import java.util.Arrays; import java.util.Arrays;
@ -60,7 +62,7 @@ public class NodeId implements Streamable
* *
* @throws IOException * @throws IOException
*/ */
public NodeId(DataInput in) throws IOException public NodeId(DataInputStream in) throws IOException
{ {
this.fromStream(in); this.fromStream(in);
} }
@ -186,14 +188,14 @@ public class NodeId implements Streamable
} }
@Override @Override
public void toStream(DataOutput out) throws IOException public void toStream(DataOutputStream out) throws IOException
{ {
/* Add the NodeId to the stream */ /* Add the NodeId to the stream */
out.write(this.getBytes()); out.write(this.getBytes());
} }
@Override @Override
public void fromStream(DataInput in) throws IOException public void fromStream(DataInputStream in) throws IOException
{ {
byte[] input = new byte[ID_LENGTH / 8]; byte[] input = new byte[ID_LENGTH / 8];
in.readFully(input); in.readFully(input);

View File

@ -4,6 +4,8 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import kademlia.core.KadServer; import kademlia.core.KadServer;
import kademlia.dht.KadContent; import kademlia.dht.KadContent;
import kademlia.message.Message;
import kademlia.message.StoreContentMessage;
import kademlia.node.Node; import kademlia.node.Node;
/** /**
@ -36,11 +38,25 @@ public class StoreOperation implements Operation
{ {
/* Get the nodes on which we need to store the content */ /* Get the nodes on which we need to store the content */
ArrayList<Node> nodes = new NodeLookupOperation(this.server, this.localNode, this.content.getKey()).execute(); ArrayList<Node> nodes = new NodeLookupOperation(this.server, this.localNode, this.content.getKey()).execute();
System.out.println("Nodes to put content on: " + nodes); System.out.println("Nodes to put content on: " + nodes);
/* Create the message */
Message msg = new StoreContentMessage(this.localNode, this.content);
/*Store the message on all of the K-Nodes*/
for (Node n : nodes)
{
if (n.equals(this.localNode))
{
/* Store the content locally */
}
else
{
this.server.sendMessage(n, msg, null);
}
}
/* Return how many nodes the content was stored on */ /* Return how many nodes the content was stored on */
return nodes.size(); return nodes.size();
} }

View File

@ -4,6 +4,7 @@ import com.google.gson.Gson;
import com.google.gson.stream.JsonReader; import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter; import com.google.gson.stream.JsonWriter;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -29,10 +30,9 @@ public class JsonSerializer implements KadContentSerializer
} }
@Override @Override
public void write(KadContent content, OutputStream out) throws IOException public void write(KadContent content, DataOutputStream out) throws IOException
{ {
try (DataOutputStream dout = new DataOutputStream(out); try (JsonWriter writer = new JsonWriter(new OutputStreamWriter(out)))
JsonWriter writer = new JsonWriter(new OutputStreamWriter(out)))
{ {
writer.beginArray(); writer.beginArray();
@ -48,7 +48,7 @@ public class JsonSerializer implements KadContentSerializer
} }
@Override @Override
public KadContent read(InputStream in) throws IOException, ClassNotFoundException public KadContent read(DataInputStream in) throws IOException, ClassNotFoundException
{ {
try (DataInputStream din = new DataInputStream(in); try (DataInputStream din = new DataInputStream(in);
JsonReader reader = new JsonReader(new InputStreamReader(in))) JsonReader reader = new JsonReader(new InputStreamReader(in)))

View File

@ -1,5 +1,7 @@
package kademlia.serializer; package kademlia.serializer;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
@ -24,7 +26,7 @@ public interface KadContentSerializer
* *
* @throws java.io.IOException * @throws java.io.IOException
*/ */
public void write(KadContent content, OutputStream out) throws IOException; public void write(KadContent content, DataOutputStream out) throws IOException;
/** /**
* Read a KadContent from a DataInput Stream * Read a KadContent from a DataInput Stream
@ -36,5 +38,5 @@ public interface KadContentSerializer
* @throws java.io.IOException * @throws java.io.IOException
* @throws java.lang.ClassNotFoundException * @throws java.lang.ClassNotFoundException
*/ */
public KadContent read(InputStream in) throws IOException, ClassNotFoundException; public KadContent read(DataInputStream in) throws IOException, ClassNotFoundException;
} }

View File

@ -70,4 +70,9 @@ public class DHTContentImpl implements KadContent
{ {
return this.createTs; return this.createTs;
} }
public String toString()
{
return "DHTContentImpl[data=" + this.data + "]";
}
} }