From baf0a01684cbf9c73f67b1b9ab8ad49af1b08c61 Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Fri, 3 May 2019 12:20:14 +0400 Subject: [PATCH] Implemented HAM (this is conflict resolution system) --- build.gradle | 1 + .../io/github/chronosx88/GunJava/Client.java | 61 ++++++++++ .../io/github/chronosx88/GunJava/Dup.java | 4 +- .../io/github/chronosx88/GunJava/Graph.java | 62 ++++++++++ .../io/github/chronosx88/GunJava/HAM.java | 107 ++++++++++++++++++ .../io/github/chronosx88/GunJava/Main.java | 40 ------- .../github/chronosx88/GunJava/MainClient.java | 12 ++ .../github/chronosx88/GunJava/MainServer.java | 8 ++ .../io/github/chronosx88/GunJava/Node.java | 64 +++++++++++ .../io/github/chronosx88/GunJava/Server.java | 26 ++--- .../io/github/chronosx88/GunJava/Utils.java | 11 ++ 11 files changed, 341 insertions(+), 55 deletions(-) create mode 100644 src/main/java/io/github/chronosx88/GunJava/Client.java create mode 100644 src/main/java/io/github/chronosx88/GunJava/Graph.java create mode 100644 src/main/java/io/github/chronosx88/GunJava/HAM.java delete mode 100644 src/main/java/io/github/chronosx88/GunJava/Main.java create mode 100644 src/main/java/io/github/chronosx88/GunJava/MainClient.java create mode 100644 src/main/java/io/github/chronosx88/GunJava/MainServer.java create mode 100644 src/main/java/io/github/chronosx88/GunJava/Node.java diff --git a/build.gradle b/build.gradle index ba76629..7649820 100644 --- a/build.gradle +++ b/build.gradle @@ -14,6 +14,7 @@ repositories { dependencies { implementation 'org.java-websocket:Java-WebSocket:1.4.0' implementation group: 'org.json', name: 'json', version: '20180813' + implementation 'com.google.code.gson:gson:2.8.5' testCompile group: 'junit', name: 'junit', version: '4.12' } diff --git a/src/main/java/io/github/chronosx88/GunJava/Client.java b/src/main/java/io/github/chronosx88/GunJava/Client.java new file mode 100644 index 0000000..33ec4d8 --- /dev/null +++ b/src/main/java/io/github/chronosx88/GunJava/Client.java @@ -0,0 +1,61 @@ +package io.github.chronosx88.GunJava; + +import org.java_websocket.client.WebSocketClient; +import org.java_websocket.handshake.ServerHandshake; +import org.json.JSONObject; + +import java.net.InetAddress; +import java.net.URI; +import java.net.URISyntaxException; + +public class Client extends WebSocketClient { + private Dup dup = new Dup(); + + public Client(InetAddress address, int port) throws URISyntaxException { + super(new URI("ws://" + address.getHostAddress() + ":" + port)); + } + + @Override + public void onOpen(ServerHandshake handshakeData) { + System.out.println("Connection open. Status: " + handshakeData.getHttpStatus()); + Utils.setTimeout(() -> { + JSONObject msg = new JSONObject(); + msg.put("#", dup.track(Dup.random(3))); + msg.put("put", new JSONObject() + .put("ASDF", Utils.newNode("ASDF", new JSONObject() + .put("name", "Mark Nadal") + .put("boss", new JSONObject().put("#", "FDSA"))).toJSONObject()) + .put("FDSA", Utils.newNode("FDSA", new JSONObject().put("name", "Fluffy").put("species", "a kitty").put("slave", new JSONObject().put("#", "ASDF"))).toJSONObject())); + this.send(msg.toString()); + }, 1000); + Utils.setTimeout(() -> { + JSONObject msg = new JSONObject(); + msg.put("#", dup.track(Dup.random(3))); + msg.put("put", new JSONObject() + .put("ASDF", Utils.newNode("ASDF", new JSONObject() + .put("name", "Mark")).toJSONObject()) + .put("FDSA", Utils.newNode("FDSA", new JSONObject().put("species", "felis silvestris").put("color", "ginger")).toJSONObject())); + this.send(msg.toString()); + }, 2000); + } + + @Override + public void onMessage(String message) { + JSONObject msg = new JSONObject(message); + if(dup.check(msg.getString("#"))) { return; } + dup.track(msg.getString("#")); + System.out.println(msg.toString()); + this.send(message); + } + + @Override + public void onClose(int code, String reason, boolean remote) { + System.out.println("Connection closed. Code/reason/remote: " + code + "/" + reason + "/" + remote); + } + + @Override + public void onError(Exception ex) { + System.out.println("Terrible fail: "); + ex.printStackTrace(); + } +} diff --git a/src/main/java/io/github/chronosx88/GunJava/Dup.java b/src/main/java/io/github/chronosx88/GunJava/Dup.java index d033851..5faf3d9 100644 --- a/src/main/java/io/github/chronosx88/GunJava/Dup.java +++ b/src/main/java/io/github/chronosx88/GunJava/Dup.java @@ -40,9 +40,9 @@ public class Dup { } } - public String random() { + public static String random(int len) { StringBuilder sb = new StringBuilder(); - for (int i = 0; i < randomPack.length; i++) { + for (int i = 0; i < len; i++) { sb.append(randomPack[random.nextInt(randomPack.length)]); } return sb.toString(); diff --git a/src/main/java/io/github/chronosx88/GunJava/Graph.java b/src/main/java/io/github/chronosx88/GunJava/Graph.java new file mode 100644 index 0000000..7d0355f --- /dev/null +++ b/src/main/java/io/github/chronosx88/GunJava/Graph.java @@ -0,0 +1,62 @@ +package io.github.chronosx88.GunJava; + +import org.json.JSONObject; + +import java.util.*; + +public class Graph { + + private final HashMap nodes; + + public Graph(JSONObject source) { + nodes = new LinkedHashMap<>(); + + for (String soul : source.keySet()) + nodes.put(soul, new Node(source.getJSONObject(soul))); + } + + public Graph() { + nodes = new LinkedHashMap<>(); + } + + public Node getNode(String soul) { + return nodes.getOrDefault(soul, null); + } + + public void addNode(String soul, Node incomingNode) { + nodes.put(soul, incomingNode); + } + + public boolean hasNode(String soul) { + return nodes.containsKey(soul); + } + + public Set> entries() { + return nodes.entrySet(); + } + + @Override + public String toString() { + JSONObject jsonObject = new JSONObject(); + for(Map.Entry entry : nodes.entrySet()) { + jsonObject.put(entry.getKey(), entry.getValue().toJSONObject()); + } + return jsonObject.toString(); + } + + public String toPrettyString() { + JSONObject jsonObject = new JSONObject(); + for(Map.Entry entry : nodes.entrySet()) { + jsonObject.put(entry.getKey(), entry.getValue().toJSONObject()); + } + return jsonObject.toString(2); + } + + public JSONObject toJSONObject() { + JSONObject jsonObject = new JSONObject(); + for(Map.Entry entry : nodes.entrySet()) { + jsonObject.put(entry.getKey(), entry.getValue().toJSONObject()); + } + return jsonObject; + } +} \ No newline at end of file diff --git a/src/main/java/io/github/chronosx88/GunJava/HAM.java b/src/main/java/io/github/chronosx88/GunJava/HAM.java new file mode 100644 index 0000000..e5c3de5 --- /dev/null +++ b/src/main/java/io/github/chronosx88/GunJava/HAM.java @@ -0,0 +1,107 @@ +package io.github.chronosx88.GunJava; + +import org.json.JSONObject; + +import java.util.Iterator; +import java.util.Map; + +public class HAM { + static class HAMResult { + public boolean defer = false; + public boolean historical = false; + public boolean converge = false; + public boolean incoming = false; + public boolean current = false; + public boolean state = false; + public String err = null; + } + + public static HAMResult ham(long machineState, long incomingState, long currentState, Object incomingValue, Object currentValue) { + HAMResult result = new HAMResult(); + + if(machineState < incomingState) { + result.defer = true; + return result; + } + if(incomingState < currentState){ + result.historical = true; + return result; + } + if(currentState < incomingState) { + result.converge = true; + result.incoming = true; + return result; + } + if(incomingState == currentState) { + if(incomingValue.equals(currentValue)) { + result.state = true; + return result; + } + if((incomingValue.toString().compareTo(currentValue.toString())) < 0) { + result.converge = true; + result.current = true; + return result; + } + if((currentValue.toString().compareTo(incomingValue.toString())) < 0) { + result.converge = true; + result.incoming = true; + return result; + } + } + result.err = "Invalid CRDT Data: "+ incomingValue +" to "+ currentValue +" at "+ incomingState +" to "+ currentState +"!"; + return result; + } + + public static Graph mix(Graph change, Graph graph) { + long machine = System.currentTimeMillis(); + Graph diff = null; + for(Map.Entry entry : change.entries()) { + Node node = entry.getValue(); + for(String key : node.values.keySet()) { + Object value = node.values.get(key); + if ("_".equals(key)) { continue; } + long state = node.states.getLong(key); + long was = -1; + Object known = null; + if(graph == null) { + graph = new Graph(); + } + if(graph.hasNode(node.soul)) { + if(graph.getNode(node.soul).states.opt(key) != null) { + was = graph.getNode(node.soul).states.getLong(key); + } + known = graph.getNode(node.soul).values.opt(key) == null ? 0 : graph.getNode(node.soul).values.opt(key); + } + + HAMResult ham = ham(machine, state, was, value, known); + if(!ham.incoming) { + if(ham.defer) { + System.out.println("DEFER: " + key + " " + value); + // FIXME + } + continue; + } + + if(diff == null) { + diff = new Graph(); + } + + if(!diff.hasNode(node.soul)) { + diff.addNode(node.soul, Utils.newNode(node.soul, new JSONObject())); + } + + if(!graph.hasNode(node.soul)) { + graph.addNode(node.soul, Utils.newNode(node.soul, new JSONObject())); + } + + graph.getNode(node.soul).values.put(key, value); + diff.getNode(node.soul).values.put(key, value); + + diff.getNode(node.soul).states.put(key, state); + graph.getNode(node.soul).states.put(key, state); + } + } + + return diff; + } +} diff --git a/src/main/java/io/github/chronosx88/GunJava/Main.java b/src/main/java/io/github/chronosx88/GunJava/Main.java deleted file mode 100644 index 3a647ad..0000000 --- a/src/main/java/io/github/chronosx88/GunJava/Main.java +++ /dev/null @@ -1,40 +0,0 @@ -package io.github.chronosx88.GunJava; - -import org.java_websocket.client.WebSocketClient; -import org.java_websocket.handshake.ServerHandshake; -import org.json.JSONObject; - -import java.net.URI; -import java.net.URISyntaxException; - -public class Main { - public static void main(String[] args) throws URISyntaxException { - Server server = new Server(5054); - server.start(); - WebSocketClient client = new WebSocketClient(new URI("ws://127.0.0.1:5054")) { - @Override - public void onOpen(ServerHandshake handshakeData) { - System.out.println("open " + handshakeData.getHttpStatusMessage()); - } - - @Override - public void onMessage(String message) { - JSONObject msg = new JSONObject(message); - System.out.println(msg); - this.send(message); - } - - @Override - public void onClose(int code, String reason, boolean remote) { - System.out.println("close " + code + " " + reason + " " + remote); - } - - @Override - public void onError(Exception ex) { - System.out.println("error"); - ex.printStackTrace(); - } - }; - client.connect(); - } -} diff --git a/src/main/java/io/github/chronosx88/GunJava/MainClient.java b/src/main/java/io/github/chronosx88/GunJava/MainClient.java new file mode 100644 index 0000000..94b14e3 --- /dev/null +++ b/src/main/java/io/github/chronosx88/GunJava/MainClient.java @@ -0,0 +1,12 @@ +package io.github.chronosx88.GunJava; + +import java.net.Inet4Address; +import java.net.URISyntaxException; +import java.net.UnknownHostException; + +public class MainClient { + public static void main(String[] args) throws URISyntaxException, UnknownHostException { + Client client = new Client(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 5054); + client.connect(); + } +} diff --git a/src/main/java/io/github/chronosx88/GunJava/MainServer.java b/src/main/java/io/github/chronosx88/GunJava/MainServer.java new file mode 100644 index 0000000..f0d40ab --- /dev/null +++ b/src/main/java/io/github/chronosx88/GunJava/MainServer.java @@ -0,0 +1,8 @@ +package io.github.chronosx88.GunJava; + +public class MainServer { + public static void main(String[] args) { + Server server = new Server(5054); + server.start(); + } +} diff --git a/src/main/java/io/github/chronosx88/GunJava/Node.java b/src/main/java/io/github/chronosx88/GunJava/Node.java new file mode 100644 index 0000000..9fb48c2 --- /dev/null +++ b/src/main/java/io/github/chronosx88/GunJava/Node.java @@ -0,0 +1,64 @@ +package io.github.chronosx88.GunJava; + +import org.json.JSONObject; + +public class Node implements Comparable { + public JSONObject values; // Data + public final JSONObject states; // Metadata for diff + public final String soul; // i.e. ID of node + + /** + * Create a Node from a JSON object. + * + * @param rawData JSON object, which contains the data + */ + public Node(JSONObject rawData) { + this.values = rawData; + this.states = values.getJSONObject("_").getJSONObject(">"); + this.soul = values.getJSONObject("_").getString("#"); + values.remove("_"); + } + + @Override + public int compareTo(Node other) { + return soul.compareTo(other.soul); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other instanceof String) + return soul.equals(other); + if (other instanceof Node) + return compareTo((Node) other) == 0; + return false; + } + + @Override + public int hashCode() { + return soul.hashCode(); + } + + public boolean isNode(String key) { + return values.optJSONObject(key) != null; + } + + public Node getNode(String key, Graph g) { + String soulRef = values.getJSONObject(key).getString("#"); + return g.getNode(soulRef); + } + + @Override + public String toString() { + JSONObject jsonObject = new JSONObject(values.toString()); + jsonObject.put("_", new JSONObject().put("#", soul).put(">", states)); + return jsonObject.toString(); + } + + public JSONObject toJSONObject() { + JSONObject jsonObject = new JSONObject(values.toString()); + jsonObject.put("_", new JSONObject().put("#", soul).put(">", states)); + return jsonObject; + } +} \ No newline at end of file diff --git a/src/main/java/io/github/chronosx88/GunJava/Server.java b/src/main/java/io/github/chronosx88/GunJava/Server.java index f1a584b..049f7a7 100644 --- a/src/main/java/io/github/chronosx88/GunJava/Server.java +++ b/src/main/java/io/github/chronosx88/GunJava/Server.java @@ -6,12 +6,14 @@ import org.java_websocket.server.WebSocketServer; import org.json.JSONObject; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Timer; -import java.util.TimerTask; public class Server extends WebSocketServer { private Timer timer = new Timer(true); private Dup dup = new Dup(); + private ArrayList peers = new ArrayList<>(); + private Graph graph = new Graph(); public Server(int port) { super(new InetSocketAddress(port)); @@ -19,16 +21,7 @@ public class Server extends WebSocketServer { @Override public void onOpen(WebSocket conn, ClientHandshake handshake) { - final int[] count = {0}; - timer.schedule(new TimerTask() { - @Override - public void run() { - count[0] += 1; - JSONObject msg = new JSONObject(); - msg.put("#", dup.track(String.valueOf(count[0]))); - conn.send(msg.toString()); - } - }, 0, 1000); + peers.add(conn); } @Override @@ -39,9 +32,16 @@ public class Server extends WebSocketServer { @Override public void onMessage(WebSocket conn, String message) { JSONObject msg = new JSONObject(message); - //if(dup.check(msg.getString("#"))) { return; } + if(dup.check(msg.getString("#"))) { return; } dup.track(msg.getString("#")); - System.out.println("received: " + msg.toString()); + if(msg.opt("put") != null) { + HAM.mix(new Graph(msg.getJSONObject("put")), graph); + System.out.println("----------------"); + System.out.println(graph.toPrettyString()); + } + for (WebSocket peer : peers) { + peer.send(message); + } } @Override diff --git a/src/main/java/io/github/chronosx88/GunJava/Utils.java b/src/main/java/io/github/chronosx88/GunJava/Utils.java index 14d87cc..3032336 100644 --- a/src/main/java/io/github/chronosx88/GunJava/Utils.java +++ b/src/main/java/io/github/chronosx88/GunJava/Utils.java @@ -1,5 +1,7 @@ package io.github.chronosx88.GunJava; +import org.json.JSONObject; + public class Utils { public static Thread setTimeout(Runnable runnable, int delay){ Thread thread = new Thread(() -> { @@ -14,4 +16,13 @@ public class Utils { thread.start(); return thread; } + + public static Node newNode(String soul, JSONObject data) { + JSONObject states = new JSONObject(); + for (String key : data.keySet()) { + states.put(key, System.currentTimeMillis()); + } + data.put("_", new JSONObject().put("#", soul).put(">", states)); + return new Node(data); + } }