diff --git a/src/main/java/io/github/chronosx88/JGUN/HAM.java b/src/main/java/io/github/chronosx88/JGUN/HAM.java index e60a577..c35ea70 100644 --- a/src/main/java/io/github/chronosx88/JGUN/HAM.java +++ b/src/main/java/io/github/chronosx88/JGUN/HAM.java @@ -16,7 +16,7 @@ public class HAM { public boolean state = false; } - public static HAMResult ham(long machineState, long incomingState, long currentState, Object incomingValue, Object currentValue) { + public static HAMResult ham(long machineState, long incomingState, long currentState, Object incomingValue, Object currentValue) throws IllegalArgumentException { HAMResult result = new HAMResult(); if(machineState < incomingState) { @@ -55,7 +55,7 @@ public class HAM { throw new IllegalArgumentException("Invalid CRDT Data: "+ incomingValue +" to "+ currentValue +" at "+ incomingState +" to "+ currentState +"!"); } - public static InMemoryGraph mix(InMemoryGraph change, StorageBackend data) { + public static boolean mix(InMemoryGraph change, StorageBackend data, Map changeListeners, Map forEachListeners) { long machine = System.currentTimeMillis(); InMemoryGraph diff = null; for(Map.Entry entry : change.entries()) { @@ -75,18 +75,25 @@ public class HAM { } known = data.getNode(node.soul).values.opt(key) == null ? 0 : data.getNode(node.soul).values.opt(key); } - - HAMResult ham = ham(machine, state, was, value, known); - if(!ham.incoming) { - if(ham.defer) { - System.out.println("DEFER: " + key + " " + value); - // Hack for accessing value in lambda without making the variable final - StorageBackend[] graph = new StorageBackend[] {data}; - Utils.setTimeout(() -> mix(node, graph[0]), (int) (state - System.currentTimeMillis())); - } + HAMResult ham = null; + try { + ham = ham(machine, state, was, value, known); + } catch (IllegalArgumentException e) { continue; } + if(ham != null) { + if(!ham.incoming) { + if(ham.defer) { + System.out.println("DEFER: " + key + " " + value); + // Hack for accessing value in lambda without making the variable final + StorageBackend[] graph = new StorageBackend[] {data}; + Utils.setTimeout(() -> mix(node, graph[0], changeListeners, forEachListeners), (int) (state - machine)); + } + continue; + } + } + if(diff == null) { diff = new InMemoryGraph(); } @@ -99,18 +106,30 @@ public class HAM { data.addNode(node.soul, Utils.newNode(node.soul, new JSONObject())); } - data.getNode(node.soul).values.put(key, value); + Node tmp = data.getNode(node.soul); + tmp.values.put(key, value); + tmp.states.put(key, state); + data.addNode(node.soul, tmp); diff.getNode(node.soul).values.put(key, value); - diff.getNode(node.soul).states.put(key, state); - data.getNode(node.soul).states.put(key, state); } } - - return diff; + if(diff != null) { + for(Map.Entry entry : diff.entries()) { + if(changeListeners.containsKey(entry.getKey())) { + changeListeners.get(entry.getKey()).onChange(entry.getValue().toUserJSONObject()); + } + if(forEachListeners.containsKey(entry.getKey())) { + for(Map.Entry jsonEntry : entry.getValue().values.toMap().entrySet()) { + forEachListeners.get(entry.getKey()).onChange(jsonEntry.getKey(), jsonEntry.getValue()); + } + } + } + } + return true; } - public static InMemoryGraph mix(Node incomingNode, StorageBackend data) { + public static boolean mix(Node incomingNode, StorageBackend data, Map changeListeners, Map forEachListeners) { long machine = System.currentTimeMillis(); InMemoryGraph diff = null; @@ -136,7 +155,7 @@ public class HAM { System.out.println("DEFER: " + key + " " + value); // Hack for accessing value in lambda without making the variable final StorageBackend[] graph = new StorageBackend[] {data}; - Utils.setTimeout(() -> mix(incomingNode, graph[0]), (int) (state - System.currentTimeMillis())); + Utils.setTimeout(() -> mix(incomingNode, graph[0], changeListeners, forEachListeners), (int) (state - machine)); } continue; } @@ -153,12 +172,25 @@ public class HAM { data.addNode(incomingNode.soul, Utils.newNode(incomingNode.soul, new JSONObject())); } - data.getNode(incomingNode.soul).values.put(key, value); + Node tmp = data.getNode(incomingNode.soul); + tmp.values.put(key, value); + tmp.states.put(key, state); + data.addNode(incomingNode.soul, tmp); diff.getNode(incomingNode.soul).values.put(key, value); - diff.getNode(incomingNode.soul).states.put(key, state); - data.getNode(incomingNode.soul).states.put(key, state); } - return diff; + if(diff != null) { + for(Map.Entry entry : diff.entries()) { + if(changeListeners.containsKey(entry.getKey())) { + changeListeners.get(entry.getKey()).onChange(entry.getValue().toUserJSONObject()); + } + if(forEachListeners.containsKey(entry.getKey())) { + for(Map.Entry jsonEntry : entry.getValue().values.toMap().entrySet()) { + forEachListeners.get(entry.getKey()).onChange(jsonEntry.getKey(), jsonEntry.getValue()); + } + } + } + } + return true; } } diff --git a/src/main/java/io/github/chronosx88/JGUN/nodes/GunSuperPeer.java b/src/main/java/io/github/chronosx88/JGUN/nodes/GunSuperPeer.java index 653b4f6..f685e9e 100644 --- a/src/main/java/io/github/chronosx88/JGUN/nodes/GunSuperPeer.java +++ b/src/main/java/io/github/chronosx88/JGUN/nodes/GunSuperPeer.java @@ -29,7 +29,9 @@ public class GunSuperPeer extends WebSocketServer implements Peer { @Override public void onClose(WebSocket conn, int code, String reason, boolean remote) { - System.out.println("Peer " + conn.getRemoteSocketAddress().toString() + " closed the connection for reason (code): " + reason + " (" + code + ")"); + if(conn != null) { + System.out.println("Peer " + conn.getRemoteSocketAddress().toString() + " closed the connection for reason (code): " + reason + " (" + code + ")"); + } } @Override