From c67e501df6e3a08f92890c443338dd5fbf73134d Mon Sep 17 00:00:00 2001 From: Joshua Kissoon Date: Tue, 25 Feb 2014 22:57:46 +0530 Subject: [PATCH] Content Sending working between nodes --- src/kademlia/core/KadServer.java | 6 -- src/kademlia/message/AcknowledgeMessage.java | 8 +- src/kademlia/message/ConnectMessage.java | 8 +- src/kademlia/message/ContentStoreMessage.java | 6 +- src/kademlia/message/MessageFactory.java | 19 +++-- src/kademlia/message/NodeLookupMessage.java | 8 +- src/kademlia/message/NodeReplyMessage.java | 8 +- src/kademlia/message/SimpleMessage.java | 8 +- src/kademlia/message/StoreContentMessage.java | 74 +++++++++++++++++++ .../message/StoreContentReceiver.java | 39 ++++++++++ src/kademlia/message/Streamable.java | 26 +++++-- src/kademlia/node/Node.java | 8 +- src/kademlia/node/NodeId.java | 8 +- src/kademlia/operation/StoreOperation.java | 22 +++++- src/kademlia/serializer/JsonSerializer.java | 8 +- .../serializer/KadContentSerializer.java | 6 +- src/kademlia/tests/DHTContentImpl.java | 5 ++ 17 files changed, 214 insertions(+), 53 deletions(-) create mode 100644 src/kademlia/message/StoreContentMessage.java create mode 100644 src/kademlia/message/StoreContentReceiver.java diff --git a/src/kademlia/core/KadServer.java b/src/kademlia/core/KadServer.java index f03ec8e..1cc9dab 100644 --- a/src/kademlia/core/KadServer.java +++ b/src/kademlia/core/KadServer.java @@ -118,12 +118,6 @@ public class KadServer private void sendMessage(Node to, Message msg, int comm) throws IOException { - - Class c = msg.getClass(); - System.out.println(c.getSimpleName()); - System.out.println(c.getName()); - - /* Setup the message for transmission */ ByteArrayOutputStream bout = new ByteArrayOutputStream(); DataOutputStream dout = new DataOutputStream(bout); diff --git a/src/kademlia/message/AcknowledgeMessage.java b/src/kademlia/message/AcknowledgeMessage.java index 7b7b01b..d377973 100644 --- a/src/kademlia/message/AcknowledgeMessage.java +++ b/src/kademlia/message/AcknowledgeMessage.java @@ -6,7 +6,9 @@ package kademlia.message; import java.io.DataInput; +import java.io.DataInputStream; import java.io.DataOutput; +import java.io.DataOutputStream; import java.io.IOException; import kademlia.node.Node; @@ -21,19 +23,19 @@ public class AcknowledgeMessage implements Message this.origin = origin; } - public AcknowledgeMessage(DataInput in) throws IOException + public AcknowledgeMessage(DataInputStream in) throws IOException { this.fromStream(in); } @Override - public final void fromStream(DataInput in) throws IOException + public final void fromStream(DataInputStream in) throws IOException { this.origin = new Node(in); } @Override - public void toStream(DataOutput out) throws IOException + public void toStream(DataOutputStream out) throws IOException { origin.toStream(out); } diff --git a/src/kademlia/message/ConnectMessage.java b/src/kademlia/message/ConnectMessage.java index 37ce51e..5764e66 100644 --- a/src/kademlia/message/ConnectMessage.java +++ b/src/kademlia/message/ConnectMessage.java @@ -6,7 +6,9 @@ package kademlia.message; import java.io.DataInput; +import java.io.DataInputStream; import java.io.DataOutput; +import java.io.DataOutputStream; import java.io.IOException; import kademlia.node.Node; @@ -21,19 +23,19 @@ public class ConnectMessage implements Message this.origin = origin; } - public ConnectMessage(DataInput in) throws IOException + public ConnectMessage(DataInputStream in) throws IOException { this.fromStream(in); } @Override - public final void fromStream(DataInput in) throws IOException + public final void fromStream(DataInputStream in) throws IOException { this.origin = new Node(in); } @Override - public void toStream(DataOutput out) throws IOException + public void toStream(DataOutputStream out) throws IOException { origin.toStream(out); } diff --git a/src/kademlia/message/ContentStoreMessage.java b/src/kademlia/message/ContentStoreMessage.java index 80d6810..a2bd183 100644 --- a/src/kademlia/message/ContentStoreMessage.java +++ b/src/kademlia/message/ContentStoreMessage.java @@ -1,7 +1,9 @@ package kademlia.message; import java.io.DataInput; +import java.io.DataInputStream; import java.io.DataOutput; +import java.io.DataOutputStream; import kademlia.dht.KadContent; import kademlia.node.Node; @@ -36,13 +38,13 @@ public class ContentStoreMessage implements Message } @Override - public void fromStream(DataInput in) + public void fromStream(DataInputStream in) { } @Override - public void toStream(DataOutput out) + public void toStream(DataOutputStream out) { /* @todo write the origin and the content to the stream */ } diff --git a/src/kademlia/message/MessageFactory.java b/src/kademlia/message/MessageFactory.java index 7569a9f..f9570cb 100644 --- a/src/kademlia/message/MessageFactory.java +++ b/src/kademlia/message/MessageFactory.java @@ -1,16 +1,17 @@ -/** - * @author Joshua - * @created - * @desc - */ package kademlia.message; -import java.io.DataInput; +import java.io.DataInputStream; import java.io.IOException; import kademlia.core.KadServer; import kademlia.node.Node; import kademlia.operation.Receiver; +/** + * Handles creating messages and receivers + * + * @author Joshua Kissoon + * @since 20140202 + */ public class MessageFactory { @@ -21,7 +22,7 @@ public class MessageFactory this.localNode = local; } - public Message createMessage(byte code, DataInput in) throws IOException + public Message createMessage(byte code, DataInputStream in) throws IOException { switch (code) { @@ -35,6 +36,8 @@ public class MessageFactory return new NodeReplyMessage(in); case NodeLookupMessage.CODE: return new NodeLookupMessage(in); + case StoreContentMessage.CODE: + return new StoreContentMessage(in); default: System.out.println("No Message handler found for message. Code: " + code); return new SimpleMessage(in); @@ -53,6 +56,8 @@ public class MessageFactory return new ConnectReceiver(server, this.localNode); case NodeLookupMessage.CODE: return new NodeLookupReceiver(server, this.localNode); + case StoreContentMessage.CODE: + return new StoreContentReceiver(server, this.localNode); } } } diff --git a/src/kademlia/message/NodeLookupMessage.java b/src/kademlia/message/NodeLookupMessage.java index afb115a..b21bd1b 100644 --- a/src/kademlia/message/NodeLookupMessage.java +++ b/src/kademlia/message/NodeLookupMessage.java @@ -6,7 +6,9 @@ package kademlia.message; import java.io.DataInput; +import java.io.DataInputStream; import java.io.DataOutput; +import java.io.DataOutputStream; import java.io.IOException; import kademlia.node.Node; import kademlia.node.NodeId; @@ -31,20 +33,20 @@ public class NodeLookupMessage implements Message this.lookupId = lookup; } - public NodeLookupMessage(DataInput in) throws IOException + public NodeLookupMessage(DataInputStream in) throws IOException { this.fromStream(in); } @Override - public final void fromStream(DataInput in) throws IOException + public final void fromStream(DataInputStream in) throws IOException { this.origin = new Node(in); this.lookupId = new NodeId(in); } @Override - public void toStream(DataOutput out) throws IOException + public void toStream(DataOutputStream out) throws IOException { this.origin.toStream(out); this.lookupId.toStream(out); diff --git a/src/kademlia/message/NodeReplyMessage.java b/src/kademlia/message/NodeReplyMessage.java index 7c9b042..746867e 100644 --- a/src/kademlia/message/NodeReplyMessage.java +++ b/src/kademlia/message/NodeReplyMessage.java @@ -6,7 +6,9 @@ package kademlia.message; import java.io.DataInput; +import java.io.DataInputStream; import java.io.DataOutput; +import java.io.DataOutputStream; import java.io.IOException; import java.util.ArrayList; import kademlia.node.Node; @@ -24,13 +26,13 @@ public class NodeReplyMessage implements Message this.nodes = nodes; } - public NodeReplyMessage(DataInput in) throws IOException + public NodeReplyMessage(DataInputStream in) throws IOException { this.fromStream(in); } @Override - public final void fromStream(DataInput in) throws IOException + public final void fromStream(DataInputStream in) throws IOException { /* Read in the origin */ this.origin = new Node(in); @@ -47,7 +49,7 @@ public class NodeReplyMessage implements Message } @Override - public void toStream(DataOutput out) throws IOException + public void toStream(DataOutputStream out) throws IOException { /* Add the origin node to the stream */ origin.toStream(out); diff --git a/src/kademlia/message/SimpleMessage.java b/src/kademlia/message/SimpleMessage.java index d51de42..1f333c2 100644 --- a/src/kademlia/message/SimpleMessage.java +++ b/src/kademlia/message/SimpleMessage.java @@ -6,7 +6,9 @@ package kademlia.message; import java.io.DataInput; +import java.io.DataInputStream; import java.io.DataOutput; +import java.io.DataOutputStream; import java.io.IOException; public class SimpleMessage implements Message @@ -22,7 +24,7 @@ public class SimpleMessage implements Message this.content = message; } - public SimpleMessage(DataInput in) + public SimpleMessage(DataInputStream in) { System.out.println("Creating message from input stream."); this.fromStream(in); @@ -35,7 +37,7 @@ public class SimpleMessage implements Message } @Override - public void toStream(DataOutput out) + public void toStream(DataOutputStream out) { try { @@ -49,7 +51,7 @@ public class SimpleMessage implements Message } @Override - public final void fromStream(DataInput in) + public final void fromStream(DataInputStream in) { try { diff --git a/src/kademlia/message/StoreContentMessage.java b/src/kademlia/message/StoreContentMessage.java new file mode 100644 index 0000000..bdde47c --- /dev/null +++ b/src/kademlia/message/StoreContentMessage.java @@ -0,0 +1,74 @@ +package kademlia.message; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import kademlia.dht.KadContent; +import kademlia.node.Node; +import kademlia.serializer.JsonSerializer; + +/** + * A StoreContentMessage used to send a store message to a node + * + * @author Joshua Kissoon + * @since 20140225 + */ +public class StoreContentMessage implements Message +{ + + public static final byte CODE = 0x55; + + private KadContent content; + private Node origin; + + /** + * @param origin Where the message came from + * @param content The content to be stored + * + */ + public StoreContentMessage(Node origin, KadContent content) + { + this.content = content; + this.origin = origin; + } + + public StoreContentMessage(DataInputStream in) throws IOException + { + this.fromStream(in); + } + + @Override + public void toStream(DataOutputStream out) throws IOException + { + this.origin.toStream(out); + + /* Serialize the KadContent, then send it to the stream */ + JsonSerializer serializer = new JsonSerializer(); + serializer.write(content, out); + } + + @Override + public void fromStream(DataInputStream in) throws IOException + { + this.origin = new Node(in); + try + { + this.content = new JsonSerializer().read(in); + } + catch (ClassNotFoundException e) + { + e.printStackTrace(); + } + } + + @Override + public byte code() + { + return CODE; + } + + public String toString() + { + return "StoreMessage[origin=" + origin + ",content=" + content + "]"; + } +} diff --git a/src/kademlia/message/StoreContentReceiver.java b/src/kademlia/message/StoreContentReceiver.java new file mode 100644 index 0000000..3b437d3 --- /dev/null +++ b/src/kademlia/message/StoreContentReceiver.java @@ -0,0 +1,39 @@ +package kademlia.message; + +import kademlia.core.KadServer; +import kademlia.node.Node; +import kademlia.operation.Receiver; + +/** + * Receiver for incoming StoreContentMessage + * + * @author Joshua Kissoon + * @since 20140225 + */ +public class StoreContentReceiver implements Receiver +{ + + private final KadServer server; + private final Node localNode; + + public StoreContentReceiver(KadServer server, Node localNode) + { + this.server = server; + this.localNode = localNode; + } + + @Override + public void receive(Message incoming, int comm) + { + /* @todo - Insert the message sender into this node's routing table */ + StoreContentMessage msg = (StoreContentMessage) incoming; + System.out.println(this.localNode + " - Received a store content message"); + System.out.println(msg); + } + + @Override + public void timeout(int comm) + { + /* @todo Do something if the request times out */ + } +} diff --git a/src/kademlia/message/Streamable.java b/src/kademlia/message/Streamable.java index 593aa4e..fd038f3 100644 --- a/src/kademlia/message/Streamable.java +++ b/src/kademlia/message/Streamable.java @@ -1,7 +1,7 @@ package kademlia.message; -import java.io.DataInput; -import java.io.DataOutput; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; /** @@ -16,17 +16,27 @@ import java.io.IOException; * that classes implementing Streamble also provide a constructor of the form: *

* Streamable(DataInput in) throws IOException; - **/ -public interface Streamable { + * */ +public interface Streamable +{ + /** * Writes the internal state of the Streamable object to the output stream * in a format that can later be read by the same Streamble class using * the {@link #fromStream} method. - **/ - public void toStream(DataOutput out) throws IOException; + * + * @param out + * + * @throws java.io.IOException + */ + public void toStream(DataOutputStream out) throws IOException; /** * Reads the internal state of the Streamable object from the input stream. - **/ - public void fromStream(DataInput out) throws IOException; + * + * @param out + * + * @throws java.io.IOException + */ + public void fromStream(DataInputStream out) throws IOException; } diff --git a/src/kademlia/node/Node.java b/src/kademlia/node/Node.java index 9bfbe65..cfe9359 100644 --- a/src/kademlia/node/Node.java +++ b/src/kademlia/node/Node.java @@ -1,7 +1,9 @@ package kademlia.node; import java.io.DataInput; +import java.io.DataInputStream; import java.io.DataOutput; +import java.io.DataOutputStream; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -45,7 +47,7 @@ public class Node implements Streamable * * @throws IOException */ - public Node(DataInput in) throws IOException + public Node(DataInputStream in) throws IOException { this.fromStream(in); } @@ -79,7 +81,7 @@ public class Node implements Streamable } @Override - public void toStream(DataOutput out) throws IOException + public void toStream(DataOutputStream out) throws IOException { /* Add the NodeId to the stream */ this.nodeId.toStream(out); @@ -97,7 +99,7 @@ public class Node implements Streamable } @Override - public final void fromStream(DataInput in) throws IOException + public final void fromStream(DataInputStream in) throws IOException { /* Load the NodeId */ this.nodeId = new NodeId(in); diff --git a/src/kademlia/node/NodeId.java b/src/kademlia/node/NodeId.java index 352cbb4..c4386b6 100644 --- a/src/kademlia/node/NodeId.java +++ b/src/kademlia/node/NodeId.java @@ -6,7 +6,9 @@ package kademlia.node; import java.io.DataInput; +import java.io.DataInputStream; import java.io.DataOutput; +import java.io.DataOutputStream; import java.io.IOException; import java.math.BigInteger; import java.util.Arrays; @@ -60,7 +62,7 @@ public class NodeId implements Streamable * * @throws IOException */ - public NodeId(DataInput in) throws IOException + public NodeId(DataInputStream in) throws IOException { this.fromStream(in); } @@ -186,14 +188,14 @@ public class NodeId implements Streamable } @Override - public void toStream(DataOutput out) throws IOException + public void toStream(DataOutputStream out) throws IOException { /* Add the NodeId to the stream */ out.write(this.getBytes()); } @Override - public void fromStream(DataInput in) throws IOException + public void fromStream(DataInputStream in) throws IOException { byte[] input = new byte[ID_LENGTH / 8]; in.readFully(input); diff --git a/src/kademlia/operation/StoreOperation.java b/src/kademlia/operation/StoreOperation.java index c65dbd6..1d8a963 100644 --- a/src/kademlia/operation/StoreOperation.java +++ b/src/kademlia/operation/StoreOperation.java @@ -4,6 +4,8 @@ import java.io.IOException; import java.util.ArrayList; import kademlia.core.KadServer; import kademlia.dht.KadContent; +import kademlia.message.Message; +import kademlia.message.StoreContentMessage; import kademlia.node.Node; /** @@ -36,11 +38,25 @@ public class StoreOperation implements Operation { /* Get the nodes on which we need to store the content */ ArrayList nodes = new NodeLookupOperation(this.server, this.localNode, this.content.getKey()).execute(); - - - System.out.println("Nodes to put content on: " + nodes); + /* Create the message */ + Message msg = new StoreContentMessage(this.localNode, this.content); + + /*Store the message on all of the K-Nodes*/ + for (Node n : nodes) + { + if (n.equals(this.localNode)) + { + /* Store the content locally */ + } + else + { + this.server.sendMessage(n, msg, null); + } + } + + /* Return how many nodes the content was stored on */ return nodes.size(); } diff --git a/src/kademlia/serializer/JsonSerializer.java b/src/kademlia/serializer/JsonSerializer.java index bc2cd0c..9dff46f 100644 --- a/src/kademlia/serializer/JsonSerializer.java +++ b/src/kademlia/serializer/JsonSerializer.java @@ -4,6 +4,7 @@ import com.google.gson.Gson; import com.google.gson.stream.JsonReader; import com.google.gson.stream.JsonWriter; import java.io.DataInputStream; +import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; @@ -29,10 +30,9 @@ public class JsonSerializer implements KadContentSerializer } @Override - public void write(KadContent content, OutputStream out) throws IOException + public void write(KadContent content, DataOutputStream out) throws IOException { - try (DataOutputStream dout = new DataOutputStream(out); - JsonWriter writer = new JsonWriter(new OutputStreamWriter(out))) + try (JsonWriter writer = new JsonWriter(new OutputStreamWriter(out))) { writer.beginArray(); @@ -48,7 +48,7 @@ public class JsonSerializer implements KadContentSerializer } @Override - public KadContent read(InputStream in) throws IOException, ClassNotFoundException + public KadContent read(DataInputStream in) throws IOException, ClassNotFoundException { try (DataInputStream din = new DataInputStream(in); JsonReader reader = new JsonReader(new InputStreamReader(in))) diff --git a/src/kademlia/serializer/KadContentSerializer.java b/src/kademlia/serializer/KadContentSerializer.java index 38f6a84..3a9329c 100644 --- a/src/kademlia/serializer/KadContentSerializer.java +++ b/src/kademlia/serializer/KadContentSerializer.java @@ -1,5 +1,7 @@ package kademlia.serializer; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -24,7 +26,7 @@ public interface KadContentSerializer * * @throws java.io.IOException */ - public void write(KadContent content, OutputStream out) throws IOException; + public void write(KadContent content, DataOutputStream out) throws IOException; /** * Read a KadContent from a DataInput Stream @@ -36,5 +38,5 @@ public interface KadContentSerializer * @throws java.io.IOException * @throws java.lang.ClassNotFoundException */ - public KadContent read(InputStream in) throws IOException, ClassNotFoundException; + public KadContent read(DataInputStream in) throws IOException, ClassNotFoundException; } diff --git a/src/kademlia/tests/DHTContentImpl.java b/src/kademlia/tests/DHTContentImpl.java index f5f4d15..05555af 100644 --- a/src/kademlia/tests/DHTContentImpl.java +++ b/src/kademlia/tests/DHTContentImpl.java @@ -70,4 +70,9 @@ public class DHTContentImpl implements KadContent { return this.createTs; } + + public String toString() + { + return "DHTContentImpl[data=" + this.data + "]"; + } }