diff --git a/settings.gradle b/settings.gradle index 31c437b..ea95be5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,2 +1,2 @@ -rootProject.name = 'GunJava' +rootProject.name = 'JGUN' diff --git a/src/main/java/io/github/chronosx88/GunJava/Client.java b/src/main/java/io/github/chronosx88/GunJava/Client.java deleted file mode 100644 index e353580..0000000 --- a/src/main/java/io/github/chronosx88/GunJava/Client.java +++ /dev/null @@ -1,86 +0,0 @@ -package io.github.chronosx88.GunJava; - -import io.github.chronosx88.GunJava.storageBackends.MemoryBackend; -import io.github.chronosx88.GunJava.storageBackends.StorageBackend; -import org.java_websocket.client.WebSocketClient; -import org.java_websocket.handshake.ServerHandshake; -import org.json.JSONObject; - -import java.net.InetAddress; -import java.net.URI; -import java.net.URISyntaxException; - -public class Client extends WebSocketClient { - private Dup dup = new Dup(); - private StorageBackend graph = new MemoryBackend(); - - public Client(InetAddress address, int port) throws URISyntaxException { - super(new URI("ws://" + address.getHostAddress() + ":" + port)); - } - - @Override - public void onOpen(ServerHandshake handshakeData) { - System.out.println("Connection open. Status: " + handshakeData.getHttpStatus()); - Utils.setTimeout(() -> { - JSONObject msg = new JSONObject(); - msg - .put("#", dup.track(Dup.random())) - .put("get", new JSONObject() - .put("#", "FDSA") - .put(".", "species")); - this.send(msg.toString()); - }, 2000); - - Utils.setTimeout(() -> { - JSONObject msg = new JSONObject(); - msg.put("#", dup.track(Dup.random())); - msg.put("put", new JSONObject() - .put("ASDF", Utils.newNode("ASDF", new JSONObject() - .put("name", "Mark Nadal") - .put("boss", new JSONObject().put("#", "FDSA"))).toJSONObject()) - .put("FDSA", Utils.newNode("FDSA", new JSONObject().put("name", "Fluffy").put("species", "a kitty").put("slave", new JSONObject().put("#", "ASDF"))).toJSONObject())); - this.send(msg.toString()); - }, (int) (1000 * Math.random())); - Utils.setTimeout(() -> { - JSONObject msg = new JSONObject(); - msg.put("#", dup.track(Dup.random())); - msg.put("put", new JSONObject() - .put("ASDF", Utils.newNode("ASDF", new JSONObject() - .put("name", "Mark")).toJSONObject()) - .put("FDSA", Utils.newNode("FDSA", new JSONObject().put("species", "felis silvestris").put("color", "ginger")).toJSONObject())); - this.send(msg.toString()); - }, (int) (1000 * Math.random())); - } - - @Override - public void onMessage(String message) { - JSONObject msg = new JSONObject(message); - if(dup.check(msg.getString("#"))){ return; } - dup.track(msg.getString("#")); - if(msg.opt("put") != null) { - HAM.mix(new MemoryBackend(msg.getJSONObject("put")), graph); - } - if(msg.opt("get") != null) { - MemoryBackend getResults = Utils.getRequest(msg.getJSONObject("get"), graph); - JSONObject ack = new JSONObject() - .put("#", dup.track(Dup.random())) - .put("@", msg.getString("#")) - .put("put", getResults.toJSONObject()); - this.send(ack.toString()); - } - System.out.println("---------------"); - System.out.println(msg.toString(2)); - this.send(message); - } - - @Override - public void onClose(int code, String reason, boolean remote) { - System.out.println("Connection closed. Code/reason/remote: " + code + "/" + reason + "/" + remote); - } - - @Override - public void onError(Exception ex) { - System.out.println("Terrible fail: "); - ex.printStackTrace(); - } -} diff --git a/src/main/java/io/github/chronosx88/GunJava/MainClient.java b/src/main/java/io/github/chronosx88/GunJava/MainClient.java deleted file mode 100644 index 94b14e3..0000000 --- a/src/main/java/io/github/chronosx88/GunJava/MainClient.java +++ /dev/null @@ -1,12 +0,0 @@ -package io.github.chronosx88.GunJava; - -import java.net.Inet4Address; -import java.net.URISyntaxException; -import java.net.UnknownHostException; - -public class MainClient { - public static void main(String[] args) throws URISyntaxException, UnknownHostException { - Client client = new Client(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 5054); - client.connect(); - } -} diff --git a/src/main/java/io/github/chronosx88/GunJava/MainClientServer.java b/src/main/java/io/github/chronosx88/GunJava/MainClientServer.java deleted file mode 100644 index 9c683e8..0000000 --- a/src/main/java/io/github/chronosx88/GunJava/MainClientServer.java +++ /dev/null @@ -1,14 +0,0 @@ -package io.github.chronosx88.GunJava; - -import java.net.Inet4Address; -import java.net.URISyntaxException; -import java.net.UnknownHostException; - -public class MainClientServer { - public static void main(String[] args) throws URISyntaxException, UnknownHostException { - Server server = new Server(21334); - server.start(); - Client client = new Client(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 21334); - client.connect(); - } -} diff --git a/src/main/java/io/github/chronosx88/GunJava/MainServer.java b/src/main/java/io/github/chronosx88/GunJava/MainServer.java deleted file mode 100644 index f0d40ab..0000000 --- a/src/main/java/io/github/chronosx88/GunJava/MainServer.java +++ /dev/null @@ -1,8 +0,0 @@ -package io.github.chronosx88.GunJava; - -public class MainServer { - public static void main(String[] args) { - Server server = new Server(5054); - server.start(); - } -} diff --git a/src/main/java/io/github/chronosx88/JGUN/Dispatcher.java b/src/main/java/io/github/chronosx88/JGUN/Dispatcher.java new file mode 100644 index 0000000..95bd3de --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/Dispatcher.java @@ -0,0 +1,62 @@ +package io.github.chronosx88.JGUN; + +import io.github.chronosx88.JGUN.futures.BaseFuture; +import io.github.chronosx88.JGUN.nodes.Peer; +import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph; +import io.github.chronosx88.JGUN.storageBackends.StorageBackend; +import org.json.JSONObject; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class Dispatcher { + private final Map> pendingFutures = new ConcurrentHashMap<>(); + private final StorageBackend graphStorage; + private final Peer peer; + + public Dispatcher(StorageBackend graphStorage, Peer peer) { + this.graphStorage = graphStorage; + this.peer = peer; + } + + public void addPendingFuture(BaseFuture future) { + pendingFutures.put(future.getFutureID(), future); + } + + public void handleIncomingMessage(JSONObject message) { + // FIXME + } + + private JSONObject handleGet(JSONObject getData) { + return null; // FIXME + } + + private JSONObject handlePut(JSONObject putData) { + return null; // FIXME + } + + private void handleIncomingAck(JSONObject ack) { + // FIXME + } + + public void sendPutRequest(JSONObject data) { + new Thread(() -> { + InMemoryGraph graph = Utils.prepareDataForPut(data); + peer.emit(Utils.formatPutRequest(graph.toJSONObject()).toString()); + }).start(); + } + + public void sendGetRequest(String key) { + new Thread(() -> { + JSONObject jsonGet = Utils.formatGetRequest(key, null); + peer.emit(jsonGet.toString()); + }).start(); + } + + public void sendGetRequest(String key, String field) { + new Thread(() -> { + JSONObject jsonGet = Utils.formatGetRequest(key, field); + peer.emit(jsonGet.toString()); + }).start(); + } +} diff --git a/src/main/java/io/github/chronosx88/JGUN/Dup.java b/src/main/java/io/github/chronosx88/JGUN/Dup.java new file mode 100644 index 0000000..6e19635 --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/Dup.java @@ -0,0 +1,47 @@ +package io.github.chronosx88.JGUN; + +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +public class Dup { + private static char[] randomSeed = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray(); + private static Random random = new Random(System.currentTimeMillis()); + private Map s = new ConcurrentHashMap<>(); + private DupOpt opt = new DupOpt(); + private Thread to = null; + + public Dup() { + opt.max = 1000; + opt.age = 1000 * 9; + } + + public String track(String id) { + s.put(id, System.currentTimeMillis()); + if(to == null) { + Utils.setTimeout(() -> { + for(Map.Entry entry : s.entrySet()) { + if(opt.age > (System.currentTimeMillis() - entry.getValue())) + continue; + s.remove(entry.getKey()); + } + to = null; + }, opt.age); + } + return id; + } + + public boolean check(String id) { + if(s.containsKey(id)) { + track(id); + return true; + } else { + return false; + } + } + + public static String random() { + return UUID.randomUUID().toString(); + } +} diff --git a/src/main/java/io/github/chronosx88/JGUN/DupOpt.java b/src/main/java/io/github/chronosx88/JGUN/DupOpt.java new file mode 100644 index 0000000..4981c13 --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/DupOpt.java @@ -0,0 +1,6 @@ +package io.github.chronosx88.JGUN; + +public class DupOpt { + public int max; + public int age; +} diff --git a/src/main/java/io/github/chronosx88/JGUN/HAM.java b/src/main/java/io/github/chronosx88/JGUN/HAM.java new file mode 100644 index 0000000..fce4129 --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/HAM.java @@ -0,0 +1,164 @@ +package io.github.chronosx88.JGUN; + +import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph; +import io.github.chronosx88.JGUN.storageBackends.StorageBackend; +import org.json.JSONObject; + +import java.util.Map; + +public class HAM { + private static long defer = Long.MAX_VALUE; + + static class HAMResult { + public boolean defer = false; + public boolean historical = false; + public boolean converge = false; + public boolean incoming = false; + public boolean current = false; + public boolean state = false; + public String err = null; + } + + public static HAMResult ham(long machineState, long incomingState, long currentState, Object incomingValue, Object currentValue) { + HAMResult result = new HAMResult(); + + if(machineState < incomingState) { + result.defer = true; + return result; + } + if(incomingState < currentState){ + result.historical = true; + return result; + } + if(currentState < incomingState) { + result.converge = true; + result.incoming = true; + return result; + } + if(incomingState == currentState) { + if(incomingValue.equals(currentValue)) { + result.state = true; + return result; + } + if((incomingValue.toString().compareTo(currentValue.toString())) < 0) { + result.converge = true; + result.current = true; + return result; + } + if((currentValue.toString().compareTo(incomingValue.toString())) < 0) { + result.converge = true; + result.incoming = true; + return result; + } + } + result.err = "Invalid CRDT Data: "+ incomingValue +" to "+ currentValue +" at "+ incomingState +" to "+ currentState +"!"; + return result; + } + + public static InMemoryGraph mix(InMemoryGraph change, StorageBackend data) { + long machine = System.currentTimeMillis(); + InMemoryGraph diff = null; + for(Map.Entry entry : change.entries()) { + Node node = entry.getValue(); + for(String key : node.values.keySet()) { + Object value = node.values.get(key); + if ("_".equals(key)) { continue; } + long state = node.states.getLong(key); + long was = -1; + Object known = null; + if(data == null) { + data = new InMemoryGraph(); + } + if(data.hasNode(node.soul)) { + if(data.getNode(node.soul).states.opt(key) != null) { + was = data.getNode(node.soul).states.getLong(key); + } + known = data.getNode(node.soul).values.opt(key) == null ? 0 : data.getNode(node.soul).values.opt(key); + } + + HAMResult ham = ham(machine, state, was, value, known); + if(!ham.incoming) { + if(ham.defer) { + System.out.println("DEFER: " + key + " " + value); + // Hack for accessing value in lambda without making the variable final + StorageBackend[] graph = new StorageBackend[] {data}; + Utils.setTimeout(() -> mix(node, graph[0]), (int) (state - System.currentTimeMillis())); + } + continue; + } + + if(diff == null) { + diff = new InMemoryGraph(); + } + + if(!diff.hasNode(node.soul)) { + diff.addNode(node.soul, Utils.newNode(node.soul, new JSONObject())); + } + + if(!data.hasNode(node.soul)) { + data.addNode(node.soul, Utils.newNode(node.soul, new JSONObject())); + } + + data.getNode(node.soul).values.put(key, value); + diff.getNode(node.soul).values.put(key, value); + + diff.getNode(node.soul).states.put(key, state); + data.getNode(node.soul).states.put(key, state); + } + } + + return diff; + } + + public static InMemoryGraph mix(Node incomingNode, StorageBackend data) { + long machine = System.currentTimeMillis(); + InMemoryGraph diff = null; + + for(String key : incomingNode.values.keySet()) { + Object value = incomingNode.values.get(key); + if ("_".equals(key)) { continue; } + long state = incomingNode.states.getLong(key); + long was = -1; + Object known = null; + if(data == null) { + data = new InMemoryGraph(); + } + if(data.hasNode(incomingNode.soul)) { + if(data.getNode(incomingNode.soul).states.opt(key) != null) { + was = data.getNode(incomingNode.soul).states.getLong(key); + } + known = data.getNode(incomingNode.soul).values.opt(key) == null ? 0 : data.getNode(incomingNode.soul).values.opt(key); + } + + HAMResult ham = ham(machine, state, was, value, known); + if(!ham.incoming) { + if(ham.defer) { + System.out.println("DEFER: " + key + " " + value); + // Hack for accessing value in lambda without making the variable final + StorageBackend[] graph = new StorageBackend[] {data}; + Utils.setTimeout(() -> mix(incomingNode, graph[0]), (int) (state - System.currentTimeMillis())); + } + continue; + } + + if(diff == null) { + diff = new InMemoryGraph(); + } + + if(!diff.hasNode(incomingNode.soul)) { + diff.addNode(incomingNode.soul, Utils.newNode(incomingNode.soul, new JSONObject())); + } + + if(!data.hasNode(incomingNode.soul)) { + data.addNode(incomingNode.soul, Utils.newNode(incomingNode.soul, new JSONObject())); + } + + data.getNode(incomingNode.soul).values.put(key, value); + diff.getNode(incomingNode.soul).values.put(key, value); + + diff.getNode(incomingNode.soul).states.put(key, state); + data.getNode(incomingNode.soul).states.put(key, state); + } + return diff; + } +} diff --git a/src/main/java/io/github/chronosx88/JGUN/Node.java b/src/main/java/io/github/chronosx88/JGUN/Node.java new file mode 100644 index 0000000..43e44fe --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/Node.java @@ -0,0 +1,76 @@ +package io.github.chronosx88.JGUN; + +import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph; +import org.json.JSONObject; + +public class Node implements Comparable { + public JSONObject values; // Data + public JSONObject states; // Metadata for diff + public String soul; // i.e. ID of node + + /** + * Create a Peer from a JSON object. + * + * @param rawData JSON object, which contains the data + */ + public Node(JSONObject rawData) { + this.values = new JSONObject(rawData.toString()); + this.states = values.getJSONObject("_").getJSONObject(">"); + this.soul = values.getJSONObject("_").getString("#"); + values.remove("_"); + } + + @Override + public int compareTo(Node other) { + return soul.compareTo(other.soul); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other instanceof String) + return soul.equals(other); + if (other instanceof Node) + return compareTo((Node) other) == 0; + return false; + } + + @Override + public int hashCode() { + return soul.hashCode(); + } + + public boolean isNode(String key) { + return values.optJSONObject(key) != null; + } + + public Node getNode(String key, InMemoryGraph g) { + String soulRef = values.getJSONObject(key).getString("#"); + return g.getNode(soulRef); + } + + @Override + public String toString() { + JSONObject jsonObject = new JSONObject(values.toString()); + jsonObject.put("_", new JSONObject().put("#", soul).put(">", states)); + return jsonObject.toString(); + } + + public JSONObject toJSONObject() { + JSONObject jsonObject = new JSONObject(values.toString()); + jsonObject.put("_", new JSONObject().put("#", soul).put(">", states)); + return jsonObject; + } + + public JSONObject getMetadata() { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("_", new JSONObject().put("#", soul).put(">", states)); + return jsonObject; + } + + public void setMetadata(JSONObject metadata) { + soul = metadata.getJSONObject("_").getString("#"); + states = metadata.getJSONObject("_").getJSONObject(">"); + } +} \ No newline at end of file diff --git a/src/main/java/io/github/chronosx88/JGUN/PathRef.java b/src/main/java/io/github/chronosx88/JGUN/PathRef.java new file mode 100644 index 0000000..946185f --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/PathRef.java @@ -0,0 +1,29 @@ +package io.github.chronosx88.JGUN; + +import io.github.chronosx88.JGUN.futures.builders.GetBuilder; +import io.github.chronosx88.JGUN.futures.builders.PutBuilder; +import org.json.JSONObject; + +import java.util.ArrayList; + +public class PathRef { + private final ArrayList path = new ArrayList<>(); + private Dispatcher dispatcher; + + public PathRef(Dispatcher dispatcher) { + this.dispatcher = dispatcher; + } + + public PathRef get(String key) { + path.add(key); + return this; + } + + public GetBuilder getData() { + return new GetBuilder(path); + } + + public PutBuilder put(JSONObject data) { + return new PutBuilder(dispatcher, data, path); + } +} diff --git a/src/main/java/io/github/chronosx88/JGUN/Utils.java b/src/main/java/io/github/chronosx88/JGUN/Utils.java new file mode 100644 index 0000000..661836f --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/Utils.java @@ -0,0 +1,100 @@ +package io.github.chronosx88.JGUN; + +import io.github.chronosx88.JGUN.futures.builders.PutBuilder; +import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph; +import io.github.chronosx88.JGUN.storageBackends.StorageBackend; +import org.json.JSONObject; + +public class Utils { + public static Thread setTimeout(Runnable runnable, int delay){ + Thread thread = new Thread(() -> { + try { + Thread.sleep(delay); + runnable.run(); + } + catch (Exception e){ + e.printStackTrace(); + } + }); + thread.start(); + return thread; + } + + public static Node newNode(String soul, JSONObject data) { + JSONObject states = new JSONObject(); + for (String key : data.keySet()) { + states.put(key, System.currentTimeMillis()); + } + data.put("_", new JSONObject().put("#", soul).put(">", states)); + return new Node(data); + } + + public static InMemoryGraph getRequest(JSONObject lex, StorageBackend graph) { + String soul = lex.getString("#"); + String key = lex.optString(".", null); + Node node = graph.getNode(soul); + Object tmp; + if(node == null) { + return new InMemoryGraph(); + } + if(key != null) { + tmp = node.values.opt(key); + if(tmp == null) { + return new InMemoryGraph(); + } + Node node1 = new Node(node.toJSONObject()); + node = Utils.newNode(node.soul, new JSONObject()); + node.setMetadata(node1.getMetadata()); + node.values.put(key, tmp); + JSONObject tmpStates = node1.states; + node.states.put(key, tmpStates.get(key)); + } + InMemoryGraph ack = new InMemoryGraph(); + ack.addNode(soul, node); + return ack; + } + + public static InMemoryGraph prepareDataForPut(JSONObject data) { + InMemoryGraph result = new InMemoryGraph(); + for (String objectKey : data.keySet()) { + JSONObject object = data.getJSONObject(objectKey); + Node node = Utils.newNode(objectKey, object); + prepareNodeForPut(node, result); + } + return result; + } + + private static void prepareNodeForPut(Node node, InMemoryGraph result) { + for (String key : node.values.keySet()) { + Object value = node.values.get(key); + if(value instanceof JSONObject) { + String soul = Dup.random(); + Node tmpNode = Utils.newNode(soul, (JSONObject) value); + node.values.remove(key); + node.values.put(key, new JSONObject().put("#", soul)); + prepareNodeForPut(tmpNode, result); + result.addNode(soul, tmpNode); + } + } + result.addNode(node.soul, node); + } + + public static JSONObject formatGetRequest(String key, String field) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("#", Dup.random()); + JSONObject getParameters = new JSONObject(); + getParameters.put("#", key); + if(field != null) { + getParameters.put(".", field); + } + jsonObject.put("get", getParameters); + return jsonObject; + } + + public static JSONObject formatPutRequest(JSONObject data) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("#", Dup.random()); + jsonObject.put("put", data); + return jsonObject; + } +} diff --git a/src/main/java/io/github/chronosx88/JGUN/entrypoints/MainClient.java b/src/main/java/io/github/chronosx88/JGUN/entrypoints/MainClient.java new file mode 100644 index 0000000..5d796f1 --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/entrypoints/MainClient.java @@ -0,0 +1,15 @@ +package io.github.chronosx88.JGUN.entrypoints; + +import io.github.chronosx88.JGUN.nodes.GunPeer; +import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph; + +import java.net.Inet4Address; +import java.net.URISyntaxException; +import java.net.UnknownHostException; + +public class MainClient { + public static void main(String[] args) throws URISyntaxException, UnknownHostException { + GunPeer gunClient = new GunPeer(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 5054, new InMemoryGraph()); + gunClient.connect(); + } +} diff --git a/src/main/java/io/github/chronosx88/JGUN/entrypoints/MainClientServer.java b/src/main/java/io/github/chronosx88/JGUN/entrypoints/MainClientServer.java new file mode 100644 index 0000000..80add5b --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/entrypoints/MainClientServer.java @@ -0,0 +1,18 @@ +package io.github.chronosx88.JGUN.entrypoints; + +import io.github.chronosx88.JGUN.nodes.GunPeer; +import io.github.chronosx88.JGUN.nodes.GunSuperPeer; +import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph; + +import java.net.Inet4Address; +import java.net.URISyntaxException; +import java.net.UnknownHostException; + +public class MainClientServer { + public static void main(String[] args) throws URISyntaxException, UnknownHostException { + GunSuperPeer gunSuperNode = new GunSuperPeer(21334); + gunSuperNode.start(); + GunPeer gunClient = new GunPeer(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 21334, new InMemoryGraph()); + gunClient.connect(); + } +} diff --git a/src/main/java/io/github/chronosx88/JGUN/entrypoints/MainServer.java b/src/main/java/io/github/chronosx88/JGUN/entrypoints/MainServer.java new file mode 100644 index 0000000..ddb8a6f --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/entrypoints/MainServer.java @@ -0,0 +1,10 @@ +package io.github.chronosx88.JGUN.entrypoints; + +import io.github.chronosx88.JGUN.nodes.GunSuperPeer; + +public class MainServer { + public static void main(String[] args) { + GunSuperPeer gunSuperNode = new GunSuperPeer(5054); + gunSuperNode.start(); + } +} diff --git a/src/main/java/io/github/chronosx88/JGUN/futures/BaseFuture.java b/src/main/java/io/github/chronosx88/JGUN/futures/BaseFuture.java new file mode 100644 index 0000000..65b10d1 --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/futures/BaseFuture.java @@ -0,0 +1,394 @@ +/* + * Copyright 2019 Thomas Bocek + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.github.chronosx88.JGUN.futures; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +/** + * The base for all BaseFuture implementations. Be aware of possible deadlocks. Never await from a listener. This class + * is heavily inspired by MINA and Netty. + * + * @param + * The class that extends BaseFuture and is used to return back the type for method calls. + * @author Thomas Bocek + */ +public abstract class BaseFuture { + private static final Logger LOG = LoggerFactory.getLogger(BaseFuture.class); + + public enum FutureType { + INIT, OK, FAILED, CANCEL + } + + // Listeners that gets notified if the future finished + private final List> listeners = new ArrayList<>(1); + + // While a future is running, the process may add cancellations for faster + // cancel operations, e.g. cancel connection attempt + private volatile Cancelable cancel = null; + + private final CountDownLatch listenersFinished = new CountDownLatch(1); + + protected final Object lock; + + // set the ready flag if operation completed + protected boolean completed = false; + + // by default false, change in case of success. An unfinished operation is + // always set to failed + protected FutureType type = FutureType.INIT; + + protected String reason = "unknown"; + + private K self; + + private String futureID; + + /** + * Default constructor that sets the lock object, which is used for synchronization to this instance. + */ + public BaseFuture(String futureID) { + this.lock = this; + this.futureID = futureID; + } + + /** + * @param self2 + * Set the type so that we are able to return it to the user. This is for making the API much more + * usable. + */ + protected void self(final K self2) { + this.self = self2; + } + + /** + * @return The object that stored this object. This is necessary for the builder pattern when using generics. + */ + protected K self() { + return self; + } + + public K await() throws InterruptedException { + synchronized (lock) { + while (!completed) { + lock.wait(); + } + } + return self; + } + + public K awaitUninterruptibly() { + synchronized (lock) { + while (!completed) { + try { + lock.wait(); + } catch (final InterruptedException e) { + LOG.debug("interrupted, but ignoring", e); + } + } + } + return self; + } + + public boolean await(final long timeoutMillis) throws InterruptedException { + return await0(timeoutMillis, true); + } + + public boolean awaitUninterruptibly(final long timeoutMillis) { + try { + return await0(timeoutMillis, false); + } catch (final InterruptedException e) { + throw new RuntimeException("This should never ever happen."); + } + } + + /** + * Internal await operation that also checks for potential deadlocks. + * + * @param timeoutMillis + * The time to wait + * @param interrupt + * Flag to indicate if the method can throw an InterruptedException + * @return True if this future has finished in timeoutMillis time, false otherwise + * @throws InterruptedException + * If the flag interrupt is true and this thread has been interrupted. + */ + private boolean await0(final long timeoutMillis, final boolean interrupt) throws InterruptedException { + final long startTime = (timeoutMillis <= 0) ? 0 : System.currentTimeMillis(); + long waitTime = timeoutMillis; + synchronized (lock) { + if (completed) { + return completed; + } else if (waitTime <= 0) { + return completed; + } + while (true) { + try { + lock.wait(waitTime); + } catch (final InterruptedException e) { + if (interrupt) { + throw e; + } + } + if (completed) { + return true; + } else { + waitTime = timeoutMillis - (System.currentTimeMillis() - startTime); + if (waitTime <= 0) { + return completed; + } + } + } + } + } + + public boolean isCompleted() { + synchronized (lock) { + return completed; + } + } + + public boolean isSuccess() { + synchronized (lock) { + return completed && (type == FutureType.OK); + } + } + + public boolean isFailed() { + synchronized (lock) { + // failed means failed or canceled + return completed && (type != FutureType.OK); + } + } + + public boolean isCanceled() { + synchronized (lock) { + return completed && (type == FutureType.CANCEL); + } + } + + public K failed(final BaseFuture origin) { + return failed(origin.failedReason()); + } + + public K failed(final String failed, final BaseFuture origin) { + StringBuilder sb = new StringBuilder(failed); + return failed(sb.append(" <-> ").append(origin.failedReason()).toString()); + } + + public K failed(final Throwable t) { + StringWriter stringWriter = new StringWriter(); + PrintWriter printWriter = new PrintWriter(stringWriter); + t.printStackTrace(printWriter); + return failed(stringWriter.toString()); + } + + public K failed(final String failed, final Throwable t) { + if (t == null) { + return failed("n/a"); + } + StringBuilder sb = new StringBuilder(failed); + StringWriter stringWriter = new StringWriter(); + PrintWriter printWriter = new PrintWriter(stringWriter); + t.printStackTrace(printWriter); + return failed(sb.append(" <-> ").append(stringWriter.toString()).toString()); + } + + public K failed(final String failed) { + synchronized (lock) { + if (!completedAndNotify()) { + return self; + } + this.reason = failed; + this.type = FutureType.FAILED; + } + notifyListeners(); + return self; + } + + public String failedReason() { + final StringBuffer sb = new StringBuffer("Future (compl/canc):"); + synchronized (lock) { + sb.append(completed).append("/") + .append(", ").append(type.name()) + .append(", ").append(reason); + return sb.toString(); + } + } + + public FutureType type() { + synchronized (lock) { + return type; + } + } + + /** + * Make sure that the calling method has synchronized (lock). + * + * @return True if notified. It will notify if completed is not set yet. + */ + protected boolean completedAndNotify() { + if (!completed) { + completed = true; + lock.notifyAll(); + return true; + } else { + return false; + } + } + + public K awaitListeners() throws InterruptedException { + boolean wait = false; + synchronized (lock) { + while (!completed) { + lock.wait(); + } + if(listeners.size() > 0) { + wait = true; + } + } + if(wait) { + listenersFinished.await(); + } + return self; + } + + public K awaitListenersUninterruptibly() { + boolean wait = false; + synchronized (lock) { + while (!completed) { + try { + lock.wait(); + } catch (final InterruptedException e) { + LOG.debug("interrupted, but ignoring", e); + } + } + if(listeners.size() > 0) { + wait = true; + } + } + while(wait) { + try { + listenersFinished.await(); + wait = false; + } catch (InterruptedException e) { + LOG.debug("interrupted, but ignoring", e); + } + } + return self; + } + + public K addListener(final BaseFutureListener listener) { + boolean notifyNow = false; + synchronized (lock) { + if (completed) { + notifyNow = true; + } else { + listeners.add(listener); + } + } + // called only once + if (notifyNow) { + callOperationComplete(listener); + } + return self; + } + + /** + * Call operation complete or call fail listener. If the fail listener fails, its printed as a stack trace. + * + * @param listener + * The listener to call + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + private void callOperationComplete(final BaseFutureListener listener) { + try { + listener.onComplete(this); + } catch (final Exception e) { + try { + listener.onError(this, e); + } catch (final Exception e1) { + LOG.error("Unexpected exception in exceptionCaught()", e1); + } + } + } + + + /** + * Always call this from outside synchronized(lock)! + */ + protected void notifyListeners() { + // if this is synchronized, it will deadlock, so do not lock this! + // There won't be any visibility problem or concurrent modification + // because 'ready' flag will be checked against both addListener and + // removeListener calls. + // + // This method doesn't need synchronization because: + // 1) This method is always called after synchronized (this) block. + // Hence any listener list modification happens-before this method. + // 2) This method is called only when 'done' is true. Once 'done' + // becomes true, the listener list is never modified - see add/removeListener() + for (final BaseFutureListener listener : listeners) { + callOperationComplete(listener); + } + + listeners.clear(); + listenersFinished.countDown(); + // all events are one time events. It cannot happen that you get + // notified twice + } + + public K removeListener(final BaseFutureListener listener) { + synchronized (lock) { + if (!completed) { + listeners.remove(listener); + } + } + return self; + } + + public K setCancel(final Cancelable cancel) { + synchronized (lock) { + if (!completed) { + this.cancel = cancel; + } + } + return self; + } + + public void cancel() { + synchronized (lock) { + if (!completedAndNotify()) { + return; + } + this.type = FutureType.CANCEL; + } + if(cancel != null) { + cancel.cancel(); + } + notifyListeners(); + } + + public String getFutureID() { + return futureID; + } +} \ No newline at end of file diff --git a/src/main/java/io/github/chronosx88/JGUN/futures/BaseFutureListener.java b/src/main/java/io/github/chronosx88/JGUN/futures/BaseFutureListener.java new file mode 100644 index 0000000..0b934b3 --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/futures/BaseFutureListener.java @@ -0,0 +1,6 @@ +package io.github.chronosx88.JGUN.futures; + +public interface BaseFutureListener { + void onComplete(F future); + void onError(F future, Throwable exception); +} diff --git a/src/main/java/io/github/chronosx88/JGUN/futures/Cancelable.java b/src/main/java/io/github/chronosx88/JGUN/futures/Cancelable.java new file mode 100644 index 0000000..315213d --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/futures/Cancelable.java @@ -0,0 +1,29 @@ +/* + * Copyright 2009 Thomas Bocek + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.github.chronosx88.JGUN.futures; + +/** + * A cancelable class should implement this interface and use it for future + * objects. + * + * @author Thomas Bocek + */ +public interface Cancelable { + /** + * Gets called if a future is cancelled. + */ + void cancel(); +} \ No newline at end of file diff --git a/src/main/java/io/github/chronosx88/JGUN/futures/FutureGet.java b/src/main/java/io/github/chronosx88/JGUN/futures/FutureGet.java new file mode 100644 index 0000000..96ca6f4 --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/futures/FutureGet.java @@ -0,0 +1,47 @@ +package io.github.chronosx88.JGUN.futures; + +import org.json.JSONObject; + +public class FutureGet extends BaseFuture { + private JSONObject data; + private GetStatus getStatus; + + enum GetStatus { + OK, NOT_FOUND + } + + public FutureGet(String id) { + super(id); + self(this); + } + + public JSONObject getData() { + return data; + } + + @Override + public boolean isSuccess() { + synchronized (lock) { + return completed && (getStatus == GetStatus.OK) && (type == FutureType.OK); + } + } + + public FutureGet done(JSONObject data) { + synchronized (lock) { + if(!data.isEmpty()) { + this.getStatus = GetStatus.OK; + this.type = FutureType.OK; + } else { + this.getStatus = GetStatus.NOT_FOUND; + this.type = FutureType.FAILED; + this.reason = "Not found"; + } + this.data = data; + if (!completedAndNotify()) { + return this; + } + } + notifyListeners(); + return this; + } +} diff --git a/src/main/java/io/github/chronosx88/JGUN/futures/FuturePut.java b/src/main/java/io/github/chronosx88/JGUN/futures/FuturePut.java new file mode 100644 index 0000000..d500f2b --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/futures/FuturePut.java @@ -0,0 +1,24 @@ +package io.github.chronosx88.JGUN.futures; + +public class FuturePut extends BaseFuture { + public FuturePut(String id) { + super(id); + self(this); + } + + public FuturePut done(boolean success) { + synchronized (lock) { + if(success) { + this.type = FutureType.OK; + } else { + this.type = FutureType.FAILED; + } + + if (!completedAndNotify()) { + return this; + } + } + notifyListeners(); + return this; + } +} diff --git a/src/main/java/io/github/chronosx88/JGUN/futures/builders/GetBuilder.java b/src/main/java/io/github/chronosx88/JGUN/futures/builders/GetBuilder.java new file mode 100644 index 0000000..8cb8ccb --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/futures/builders/GetBuilder.java @@ -0,0 +1,11 @@ +package io.github.chronosx88.JGUN.futures.builders; + +import java.util.ArrayList; + +public class GetBuilder { + private final ArrayList path; + + public GetBuilder(ArrayList path) { + this.path = path; + } +} diff --git a/src/main/java/io/github/chronosx88/JGUN/futures/builders/PutBuilder.java b/src/main/java/io/github/chronosx88/JGUN/futures/builders/PutBuilder.java new file mode 100644 index 0000000..d2c465c --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/futures/builders/PutBuilder.java @@ -0,0 +1,43 @@ +package io.github.chronosx88.JGUN.futures.builders; + +import io.github.chronosx88.JGUN.Dispatcher; +import io.github.chronosx88.JGUN.Dup; +import io.github.chronosx88.JGUN.futures.FuturePut; +import org.json.JSONObject; + +import java.util.ArrayList; + +public class PutBuilder { + private JSONObject data; + private ArrayList path; + private Dispatcher dispatcher; + + public PutBuilder(Dispatcher dispatcher, JSONObject data, ArrayList path) { + this.dispatcher = dispatcher; + this.data = data; + this.path = path; + } + + public JSONObject getData() { + return data; + } + + public ArrayList getPath() { + return path; + } + + public void setData(JSONObject data) { + this.data = data; + } + + public void setPath(ArrayList path) { + this.path = path; + } + + public FuturePut build() { + FuturePut futurePut = new FuturePut(Dup.random()); + dispatcher.addPendingFuture(futurePut); + dispatcher.sendPutRequest(data); + return futurePut; + } +} \ No newline at end of file diff --git a/src/main/java/io/github/chronosx88/JGUN/nodes/GunNodeBuilder.java b/src/main/java/io/github/chronosx88/JGUN/nodes/GunNodeBuilder.java new file mode 100644 index 0000000..dc8221c --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/nodes/GunNodeBuilder.java @@ -0,0 +1,4 @@ +package io.github.chronosx88.JGUN.nodes; + +public class GunNodeBuilder { +} diff --git a/src/main/java/io/github/chronosx88/JGUN/nodes/GunPeer.java b/src/main/java/io/github/chronosx88/JGUN/nodes/GunPeer.java new file mode 100644 index 0000000..79325d2 --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/nodes/GunPeer.java @@ -0,0 +1,72 @@ +package io.github.chronosx88.JGUN.nodes; + +import io.github.chronosx88.JGUN.*; +import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph; +import io.github.chronosx88.JGUN.storageBackends.StorageBackend; +import org.java_websocket.client.WebSocketClient; +import org.java_websocket.handshake.ServerHandshake; +import org.json.JSONObject; + +import java.net.InetAddress; +import java.net.URI; +import java.net.URISyntaxException; + +public class GunPeer extends WebSocketClient implements Peer { + private Dup dup = new Dup(); + private final StorageBackend storage; + private final Dispatcher dispatcher; + + public GunPeer(InetAddress address, int port, StorageBackend storage) throws URISyntaxException { + super(new URI("ws://" + address.getHostAddress() + ":" + port)); + this.storage = storage; + this.dispatcher = new Dispatcher(storage, this); + } + + @Override + public void onOpen(ServerHandshake handshakeData) { + System.out.println("# Connection with SuperNode open. Status: " + handshakeData.getHttpStatus()); + } + + @Override + public void onMessage(String message) { + JSONObject msg = new JSONObject(message); + if(dup.check(msg.getString("#"))){ return; } + dup.track(msg.getString("#")); + if(msg.opt("put") != null) { + HAM.mix(new InMemoryGraph(msg.getJSONObject("put")), storage); + } + if(msg.opt("get") != null) { + InMemoryGraph getResults = Utils.getRequest(msg.getJSONObject("get"), storage); + JSONObject ack = new JSONObject() + .put("#", dup.track(Dup.random())) + .put("@", msg.getString("#")) + .put("put", getResults.toJSONObject()); + emit(ack.toString()); + } + System.out.println("---------------"); + System.out.println(msg.toString(2)); + emit(message); + } + + @Override + public void onClose(int code, String reason, boolean remote) { + System.out.println("Connection closed. Code/reason/remote: " + code + "/" + reason + "/" + remote); + } + + @Override + public void onError(Exception ex) { + System.out.println("Terrible fail: "); + ex.printStackTrace(); + } + + @Override + public void emit(String data) { + this.send(data); + } + + public PathRef get(String key) { + PathRef pathRef = new PathRef(dispatcher); + pathRef.get(key); + return pathRef; + } +} diff --git a/src/main/java/io/github/chronosx88/GunJava/Server.java b/src/main/java/io/github/chronosx88/JGUN/nodes/GunSuperPeer.java similarity index 62% rename from src/main/java/io/github/chronosx88/GunJava/Server.java rename to src/main/java/io/github/chronosx88/JGUN/nodes/GunSuperPeer.java index e06da0c..fec57dd 100644 --- a/src/main/java/io/github/chronosx88/GunJava/Server.java +++ b/src/main/java/io/github/chronosx88/JGUN/nodes/GunSuperPeer.java @@ -1,20 +1,21 @@ -package io.github.chronosx88.GunJava; +package io.github.chronosx88.JGUN.nodes; -import io.github.chronosx88.GunJava.storageBackends.MemoryBackend; +import io.github.chronosx88.JGUN.Dup; +import io.github.chronosx88.JGUN.HAM; +import io.github.chronosx88.JGUN.Utils; +import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph; import org.java_websocket.WebSocket; import org.java_websocket.handshake.ClientHandshake; import org.java_websocket.server.WebSocketServer; import org.json.JSONObject; import java.net.InetSocketAddress; -import java.util.Timer; -public class Server extends WebSocketServer { - private Timer timer = new Timer(true); +public class GunSuperPeer extends WebSocketServer implements Peer { private Dup dup = new Dup(); - private MemoryBackend graph = new MemoryBackend(); + private InMemoryGraph graph = new InMemoryGraph(); - public Server(int port) { + public GunSuperPeer(int port) { super(new InetSocketAddress(port)); setReuseAddr(true); } @@ -35,17 +36,23 @@ public class Server extends WebSocketServer { if(dup.check(msg.getString("#"))) { return; } dup.track(msg.getString("#")); if(msg.opt("put") != null) { - HAM.mix(new MemoryBackend(msg.getJSONObject("put")), graph); + HAM.mix(new InMemoryGraph(msg.getJSONObject("put")), graph); } if(msg.opt("get") != null) { - MemoryBackend result = Utils.getRequest(msg.optJSONObject("get"), graph); + InMemoryGraph result = Utils.getRequest(msg.optJSONObject("get"), graph); + JSONObject ack = new JSONObject(); if(!result.isEmpty()) { - JSONObject ack = new JSONObject(); emit(ack .put("#", dup.track(Dup.random())) .put("@", msg.getString("#")) .put("put", result.toJSONObject()) .toString()); + } else { + emit(ack + .put("#", dup.track(Dup.random())) + .put("@", msg.getString("#")) + .put("ok", false) + .toString()); } } emit(message); @@ -53,12 +60,13 @@ public class Server extends WebSocketServer { @Override public void onError(WebSocket conn, Exception ex) { - // + System.out.println("# Exception occured on connection: " + conn.getRemoteSocketAddress()); + ex.printStackTrace(); } @Override public void onStart() { - System.out.println("Server started on port: " + getPort()); + System.out.println("GunSuperPeer started on port: " + getPort()); } public void emit(String data) { diff --git a/src/main/java/io/github/chronosx88/JGUN/nodes/Peer.java b/src/main/java/io/github/chronosx88/JGUN/nodes/Peer.java new file mode 100644 index 0000000..913c41a --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/nodes/Peer.java @@ -0,0 +1,5 @@ +package io.github.chronosx88.JGUN.nodes; + +public interface Peer { + void emit(String data); +} diff --git a/src/main/java/io/github/chronosx88/JGUN/storageBackends/InMemoryGraph.java b/src/main/java/io/github/chronosx88/JGUN/storageBackends/InMemoryGraph.java new file mode 100644 index 0000000..87f4e2e --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/storageBackends/InMemoryGraph.java @@ -0,0 +1,69 @@ +package io.github.chronosx88.JGUN.storageBackends; + +import io.github.chronosx88.JGUN.Node; +import org.json.JSONObject; + +import java.util.*; + +public class InMemoryGraph implements StorageBackend { + + private final HashMap nodes; + + public InMemoryGraph(JSONObject source) { + nodes = new LinkedHashMap<>(); + + for (String soul : source.keySet()) + nodes.put(soul, new Node(source.getJSONObject(soul))); + } + + public InMemoryGraph() { + nodes = new LinkedHashMap<>(); + } + + public Node getNode(String soul) { + return nodes.getOrDefault(soul, null); + } + + public void addNode(String soul, Node incomingNode) { + nodes.put(soul, incomingNode); + } + + public boolean hasNode(String soul) { + return nodes.containsKey(soul); + } + + public Set> entries() { + return nodes.entrySet(); + } + + public Collection nodes() { return nodes.values(); } + + @Override + public String toString() { + JSONObject jsonObject = new JSONObject(); + for(Map.Entry entry : nodes.entrySet()) { + jsonObject.put(entry.getKey(), entry.getValue().toJSONObject()); + } + return jsonObject.toString(); + } + + public String toPrettyString() { + JSONObject jsonObject = new JSONObject(); + for(Map.Entry entry : nodes.entrySet()) { + jsonObject.put(entry.getKey(), entry.getValue().toJSONObject()); + } + return jsonObject.toString(2); + } + + public JSONObject toJSONObject() { + JSONObject jsonObject = new JSONObject(); + for(Map.Entry entry : nodes.entrySet()) { + jsonObject.put(entry.getKey(), entry.getValue().toJSONObject()); + } + return jsonObject; + } + + public boolean isEmpty() { + return nodes.isEmpty(); + } +} \ No newline at end of file diff --git a/src/main/java/io/github/chronosx88/JGUN/storageBackends/StorageBackend.java b/src/main/java/io/github/chronosx88/JGUN/storageBackends/StorageBackend.java new file mode 100644 index 0000000..e383f96 --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/storageBackends/StorageBackend.java @@ -0,0 +1,20 @@ +package io.github.chronosx88.JGUN.storageBackends; + +import io.github.chronosx88.JGUN.Node; +import org.json.JSONObject; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +public interface StorageBackend { + Node getNode(String soul); + void addNode(String soul, Node node); + boolean hasNode(String soul); + Set> entries(); + Collection nodes(); + String toString(); + String toPrettyString(); + JSONObject toJSONObject(); + boolean isEmpty(); +}