From 3338c5468053e8d8af31aaab1fb9f67772cf86c9 Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Tue, 14 Nov 2023 03:06:42 +0300 Subject: [PATCH] [WIP] Revamp code base --- README.md | 7 +- build.gradle | 40 ++-- gradle/wrapper/gradle-wrapper.properties | 3 +- .../io/github/chronosx88/JGUN/Dispatcher.java | 109 ----------- .../java/io/github/chronosx88/JGUN/Dup.java | 45 ++--- .../io/github/chronosx88/JGUN/DupOpt.java | 6 - .../java/io/github/chronosx88/JGUN/Gun.java | 22 ++- .../java/io/github/chronosx88/JGUN/HAM.java | 178 +----------------- .../chronosx88/JGUN/NetworkHandler.java | 79 ++++++++ .../java/io/github/chronosx88/JGUN/Node.java | 82 -------- .../chronosx88/JGUN/NodeChangeListener.java | 4 +- .../io/github/chronosx88/JGUN/PathRef.java | 148 --------------- .../github/chronosx88/JGUN/PathReference.java | 40 ++++ .../java/io/github/chronosx88/JGUN/Utils.java | 146 -------------- .../chronosx88/JGUN/examples/MainClient.java | 4 +- .../JGUN/examples/MainClientServer.java | 87 +++++---- .../chronosx88/JGUN/examples/MainServer.java | 4 +- .../JGUN/futures/BaseCompletableFuture.java | 6 +- .../chronosx88/JGUN/futures/FutureGet.java | 4 +- .../chronosx88/JGUN/models/BaseMessage.java | 10 + .../chronosx88/JGUN/models/MemoryGraph.java | 27 +++ .../github/chronosx88/JGUN/models/Node.java | 34 ++++ .../chronosx88/JGUN/models/NodeMetadata.java | 21 +++ .../chronosx88/JGUN/models/acks/BaseAck.java | 16 ++ .../chronosx88/JGUN/models/acks/GetAck.java | 17 ++ .../chronosx88/JGUN/models/acks/PutAck.java | 12 ++ .../JGUN/models/requests/GetRequest.java | 13 ++ .../models/requests/GetRequestParams.java | 11 ++ .../JGUN/models/requests/PutRequest.java | 18 ++ .../chronosx88/JGUN/nodes/GunClient.java | 23 +-- .../chronosx88/JGUN/nodes/GunSuperPeer.java | 19 +- .../JGUN/storage/MemoryStorage.java | 47 +++++ .../chronosx88/JGUN/storage/Storage.java | 100 ++++++++++ .../JGUN/storageBackends/InMemoryGraph.java | 82 -------- .../JGUN/storageBackends/StorageBackend.java | 20 -- 35 files changed, 571 insertions(+), 913 deletions(-) delete mode 100644 src/main/java/io/github/chronosx88/JGUN/Dispatcher.java delete mode 100644 src/main/java/io/github/chronosx88/JGUN/DupOpt.java create mode 100644 src/main/java/io/github/chronosx88/JGUN/NetworkHandler.java delete mode 100644 src/main/java/io/github/chronosx88/JGUN/Node.java delete mode 100644 src/main/java/io/github/chronosx88/JGUN/PathRef.java create mode 100644 src/main/java/io/github/chronosx88/JGUN/PathReference.java delete mode 100644 src/main/java/io/github/chronosx88/JGUN/Utils.java create mode 100644 src/main/java/io/github/chronosx88/JGUN/models/BaseMessage.java create mode 100644 src/main/java/io/github/chronosx88/JGUN/models/MemoryGraph.java create mode 100644 src/main/java/io/github/chronosx88/JGUN/models/Node.java create mode 100644 src/main/java/io/github/chronosx88/JGUN/models/NodeMetadata.java create mode 100644 src/main/java/io/github/chronosx88/JGUN/models/acks/BaseAck.java create mode 100644 src/main/java/io/github/chronosx88/JGUN/models/acks/GetAck.java create mode 100644 src/main/java/io/github/chronosx88/JGUN/models/acks/PutAck.java create mode 100644 src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequest.java create mode 100644 src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequestParams.java create mode 100644 src/main/java/io/github/chronosx88/JGUN/models/requests/PutRequest.java create mode 100644 src/main/java/io/github/chronosx88/JGUN/storage/MemoryStorage.java create mode 100644 src/main/java/io/github/chronosx88/JGUN/storage/Storage.java delete mode 100644 src/main/java/io/github/chronosx88/JGUN/storageBackends/InMemoryGraph.java delete mode 100644 src/main/java/io/github/chronosx88/JGUN/storageBackends/StorageBackend.java diff --git a/README.md b/README.md index aef8ae4..d5e0979 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ A realtime, decentralized, offline-first, mutable graph protocol to sync the Internet. ## Requirements -* JRE/JDK >= 1.8.0 +* JRE/JDK >= 11 ## Building 1. Clone repo: @@ -34,11 +34,8 @@ $ cd JGUN ``` 2. Compile it: ```bash -./gradlew shadowJar +./gradlew build ``` -3. Compiled JAR located in `./build/libs/` - -(Also exists precompiled JARs - see Releases (publishing to Maven coming soon...)) [⇧ back to top](#contents) diff --git a/build.gradle b/build.gradle index 39f4169..6232692 100644 --- a/build.gradle +++ b/build.gradle @@ -1,42 +1,26 @@ plugins { - id 'com.github.johnrengelman.shadow' version '4.0.4' id 'java' } group 'io.github.chronosx88' version '0.2.6' -sourceCompatibility = 1.8 +java { + sourceCompatibility = JavaVersion.VERSION_11 + targetCompatibility = JavaVersion.VERSION_11 +} repositories { mavenCentral() } dependencies { - implementation 'org.java-websocket:Java-WebSocket:1.4.0' + implementation 'org.java-websocket:Java-WebSocket:1.5.4' implementation 'net.sourceforge.streamsupport:android-retrofuture:1.7.0' - implementation group: 'org.json', name: 'json', version: '20180813' - - testCompile group: 'junit', name: 'junit', version: '4.12' -} - -shadowJar { - relocate 'org.json', 'shadow.org.json' -} - -task sourcesJar(type: Jar, dependsOn: classes) { - classifier = 'sources' - from sourceSets.main.allSource -} - -task javadocJar(type: Jar, dependsOn: javadoc) { - classifier = 'javadoc' - from javadoc.destinationDir -} - -artifacts { - archives sourcesJar - archives javadocJar -} - - + implementation 'com.fasterxml.jackson.core:jackson-databind:2.15.3' + implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.15.3' + implementation 'javax.cache:cache-api:1.1.1' + implementation 'com.github.ben-manes.caffeine:jcache:3.1.5' + compileOnly 'org.projectlombok:lombok:1.18.30' + annotationProcessor 'org.projectlombok:lombok:1.18.30' +} \ No newline at end of file diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 183579e..ffed3a2 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,5 @@ -#Wed May 01 20:08:10 MSK 2019 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-7.2-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-4.10.3-all.zip diff --git a/src/main/java/io/github/chronosx88/JGUN/Dispatcher.java b/src/main/java/io/github/chronosx88/JGUN/Dispatcher.java deleted file mode 100644 index ec6a697..0000000 --- a/src/main/java/io/github/chronosx88/JGUN/Dispatcher.java +++ /dev/null @@ -1,109 +0,0 @@ -package io.github.chronosx88.JGUN; - -import io.github.chronosx88.JGUN.futures.BaseCompletableFuture; -import io.github.chronosx88.JGUN.futures.FutureGet; -import io.github.chronosx88.JGUN.futures.FuturePut; -import io.github.chronosx88.JGUN.nodes.Peer; -import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph; -import io.github.chronosx88.JGUN.storageBackends.StorageBackend; -import org.java_websocket.client.WebSocketClient; -import org.json.JSONObject; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; - -public class Dispatcher { - private final Map> pendingFutures = new ConcurrentHashMap<>(); - private final Map changeListeners = new ConcurrentHashMap<>(); - private final Map forEachListeners = new ConcurrentHashMap<>(); - private final Peer peer; - private final StorageBackend graphStorage; - private final Dup dup; - private final Executor executorService = Executors.newCachedThreadPool(); - - public Dispatcher(StorageBackend graphStorage, Peer peer, Dup dup) { - this.graphStorage = graphStorage; - this.peer = peer; - this.dup = dup; - } - - public void addPendingFuture(BaseCompletableFuture future) { - pendingFutures.put(future.getFutureID(), future); - } - - public void handleIncomingMessage(JSONObject message) { - if(message.has("put")) { - JSONObject ack = handlePut(message); - peer.emit(ack.toString()); - } - if(message.has("get")) { - JSONObject ack = handleGet(message); - peer.emit(ack.toString()); - } - if(message.has("@")) { - handleIncomingAck(message); - } - peer.emit(message.toString()); - } - - private JSONObject handleGet(JSONObject getData) { - InMemoryGraph getResults = Utils.getRequest(getData.getJSONObject("get"), graphStorage); - return new JSONObject() // Acknowledgment - .put( "#", dup.track(Dup.random()) ) - .put( "@", getData.getString("#") ) - .put( "put", getResults.toJSONObject() ) - .put( "ok", !(getResults.isEmpty()) ); - } - - private JSONObject handlePut(JSONObject message) { - boolean success = HAM.mix(new InMemoryGraph(message.getJSONObject("put")), graphStorage, changeListeners, forEachListeners); - return new JSONObject() // Acknowledgment - .put( "#", dup.track(Dup.random()) ) - .put( "@", message.getString("#") ) - .put( "ok", success); - } - - private void handleIncomingAck(JSONObject ack) { - if(ack.has("put")) { - if(pendingFutures.containsKey(ack.getString("@"))) { - BaseCompletableFuture future = pendingFutures.get(ack.getString("@")); - if(future instanceof FutureGet) { - ((FutureGet) future).complete(new InMemoryGraph(ack.getJSONObject("put")).toUserJSONObject()); - } - } - } - if(ack.has("ok")) { - if(pendingFutures.containsKey(ack.getString("@"))) { - BaseCompletableFuture future = pendingFutures.get(ack.getString("@")); - if(future instanceof FuturePut) { - ((FuturePut) future).complete(ack.getBoolean("ok")); - } - } - } - } - - public void sendPutRequest(String messageID, JSONObject data) { - executorService.execute(() -> { - InMemoryGraph graph = Utils.prepareDataForPut(data); - peer.emit(Utils.formatPutRequest(messageID, graph.toJSONObject()).toString()); - }); - } - - public void sendGetRequest(String messageID, String key, String field) { - executorService.execute(() -> { - JSONObject jsonGet = Utils.formatGetRequest(messageID, key, field); - peer.emit(jsonGet.toString()); - }); - } - - public void addChangeListener(String soul, NodeChangeListener listener) { - changeListeners.put(soul, listener); - } - - public void addForEachChangeListener(String soul, NodeChangeListener.ForEach listener) { - forEachListeners.put(soul, listener); - } -} diff --git a/src/main/java/io/github/chronosx88/JGUN/Dup.java b/src/main/java/io/github/chronosx88/JGUN/Dup.java index 874579f..563a354 100644 --- a/src/main/java/io/github/chronosx88/JGUN/Dup.java +++ b/src/main/java/io/github/chronosx88/JGUN/Dup.java @@ -1,40 +1,35 @@ package io.github.chronosx88.JGUN; -import java.util.Map; +import javax.cache.Cache; +import javax.cache.CacheManager; +import javax.cache.Caching; +import javax.cache.configuration.MutableConfiguration; +import javax.cache.expiry.CreatedExpiryPolicy; +import javax.cache.expiry.Duration; +import javax.cache.spi.CachingProvider; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; public class Dup { - private static char[] randomSeed = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray(); - private Map s = new ConcurrentHashMap<>(); - private DupOpt opt = new DupOpt(); - private Thread to = null; + private final Cache cache; - public Dup() { - opt.max = 1000; - opt.age = 1000 * 9; + public Dup(long age) { + CachingProvider cachingProvider = Caching.getCachingProvider(); + CacheManager cacheManager = cachingProvider.getCacheManager(); + MutableConfiguration config = new MutableConfiguration<>(); + config.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, age))); + this.cache = cacheManager.createCache("dup", config); } - public String track(String id) { - s.put(id, System.currentTimeMillis()); - if(to == null) { - Utils.setTimeout(() -> { - for(Map.Entry entry : s.entrySet()) { - if(opt.age > (System.currentTimeMillis() - entry.getValue())) - continue; - s.remove(entry.getKey()); - } - to = null; - }, opt.age); - } - return id; + private void track(String id) { + cache.put(id, System.currentTimeMillis()); } - public boolean check(String id) { - if(s.containsKey(id)) { - track(id); + public boolean isDuplicated(String id) { + if(cache.containsKey(id)) { return true; } else { + track(id); return false; } } diff --git a/src/main/java/io/github/chronosx88/JGUN/DupOpt.java b/src/main/java/io/github/chronosx88/JGUN/DupOpt.java deleted file mode 100644 index 4981c13..0000000 --- a/src/main/java/io/github/chronosx88/JGUN/DupOpt.java +++ /dev/null @@ -1,6 +0,0 @@ -package io.github.chronosx88.JGUN; - -public class DupOpt { - public int max; - public int age; -} diff --git a/src/main/java/io/github/chronosx88/JGUN/Gun.java b/src/main/java/io/github/chronosx88/JGUN/Gun.java index 6c7fe22..197b397 100644 --- a/src/main/java/io/github/chronosx88/JGUN/Gun.java +++ b/src/main/java/io/github/chronosx88/JGUN/Gun.java @@ -1,28 +1,38 @@ package io.github.chronosx88.JGUN; import io.github.chronosx88.JGUN.nodes.GunClient; -import io.github.chronosx88.JGUN.storageBackends.StorageBackend; +import io.github.chronosx88.JGUN.storage.Storage; import java.net.InetAddress; import java.net.URISyntaxException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class Gun { - private Dispatcher dispatcher; private GunClient gunClient; + private final Map changeListeners = new ConcurrentHashMap<>(); + private final Map forEachListeners = new ConcurrentHashMap<>(); - public Gun(InetAddress address, int port, StorageBackend storage) { + public Gun(InetAddress address, int port, Storage storage) { try { this.gunClient = new GunClient(address, port, storage); - this.dispatcher = gunClient.getDispatcher(); this.gunClient.connectBlocking(); } catch (URISyntaxException | InterruptedException e) { e.printStackTrace(); } } - public PathRef get(String key) { - PathRef pathRef = new PathRef(dispatcher); + public PathReference get(String key) { + PathReference pathRef = new PathReference(this); pathRef.get(key); return pathRef; } + + protected void addChangeListener(String nodeID, NodeChangeListener listener) { + changeListeners.put(nodeID, listener); + } + + protected void addForEachChangeListener(String nodeID, NodeChangeListener.ForEach listener) { + forEachListeners.put(nodeID, listener); + } } diff --git a/src/main/java/io/github/chronosx88/JGUN/HAM.java b/src/main/java/io/github/chronosx88/JGUN/HAM.java index c35ea70..df47c80 100644 --- a/src/main/java/io/github/chronosx88/JGUN/HAM.java +++ b/src/main/java/io/github/chronosx88/JGUN/HAM.java @@ -1,196 +1,36 @@ package io.github.chronosx88.JGUN; -import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph; -import io.github.chronosx88.JGUN.storageBackends.StorageBackend; -import org.json.JSONObject; - -import java.util.Map; - public class HAM { - static class HAMResult { + public static class HAMResult { public boolean defer = false; // Defer means that the current state is greater than our computer time, and should only be processed when our computer time reaches this state public boolean historical = false; // Historical means the old state. This is usually ignored. - public boolean converge = false; // Everything is fine, you can do merge public boolean incoming = false; // Leave incoming value public boolean current = false; // Leave current value - public boolean state = false; } public static HAMResult ham(long machineState, long incomingState, long currentState, Object incomingValue, Object currentValue) throws IllegalArgumentException { HAMResult result = new HAMResult(); - if(machineState < incomingState) { + if (machineState < incomingState) { // the incoming value is outside the boundary of the machine's state, it must be reprocessed in another state. result.defer = true; return result; - } - if(incomingState < currentState) { + } else if (currentState > incomingState) { // the incoming value is within the boundary of the machine's state, but not within the range. result.historical = true; + result.current = true; return result; - } - if(currentState < incomingState) { + } else if (currentState < incomingState) { // the incoming value is within both the boundary and the range of the machine's state. - result.converge = true; result.incoming = true; return result; - } - if(incomingState == currentState) { - // if incoming state and current state is the same - if(incomingValue.equals(currentValue)) { - result.state = true; - return result; - } - if((incomingValue.toString().compareTo(currentValue.toString())) < 0) { - result.converge = true; + } else { // if incoming state and current state is the same + if (incomingValue.equals(currentValue)) { result.current = true; return result; } - if((currentValue.toString().compareTo(incomingValue.toString())) < 0) { - result.converge = true; - result.incoming = true; - return result; - } + result.incoming = true; // always update local value with incoming value if state is the same + return result; } - throw new IllegalArgumentException("Invalid CRDT Data: "+ incomingValue +" to "+ currentValue +" at "+ incomingState +" to "+ currentState +"!"); - } - - public static boolean mix(InMemoryGraph change, StorageBackend data, Map changeListeners, Map forEachListeners) { - long machine = System.currentTimeMillis(); - InMemoryGraph diff = null; - for(Map.Entry entry : change.entries()) { - Node node = entry.getValue(); - for(String key : node.values.keySet()) { - Object value = node.values.get(key); - if ("_".equals(key)) { continue; } - long state = node.states.getLong(key); - long was = -1; - Object known = null; - if(data == null) { - data = new InMemoryGraph(); - } - if(data.hasNode(node.soul)) { - if(data.getNode(node.soul).states.opt(key) != null) { - was = data.getNode(node.soul).states.getLong(key); - } - known = data.getNode(node.soul).values.opt(key) == null ? 0 : data.getNode(node.soul).values.opt(key); - } - HAMResult ham = null; - try { - ham = ham(machine, state, was, value, known); - } catch (IllegalArgumentException e) { - continue; - } - - if(ham != null) { - if(!ham.incoming) { - if(ham.defer) { - System.out.println("DEFER: " + key + " " + value); - // Hack for accessing value in lambda without making the variable final - StorageBackend[] graph = new StorageBackend[] {data}; - Utils.setTimeout(() -> mix(node, graph[0], changeListeners, forEachListeners), (int) (state - machine)); - } - continue; - } - } - - if(diff == null) { - diff = new InMemoryGraph(); - } - - if(!diff.hasNode(node.soul)) { - diff.addNode(node.soul, Utils.newNode(node.soul, new JSONObject())); - } - - if(!data.hasNode(node.soul)) { - data.addNode(node.soul, Utils.newNode(node.soul, new JSONObject())); - } - - Node tmp = data.getNode(node.soul); - tmp.values.put(key, value); - tmp.states.put(key, state); - data.addNode(node.soul, tmp); - diff.getNode(node.soul).values.put(key, value); - diff.getNode(node.soul).states.put(key, state); - } - } - if(diff != null) { - for(Map.Entry entry : diff.entries()) { - if(changeListeners.containsKey(entry.getKey())) { - changeListeners.get(entry.getKey()).onChange(entry.getValue().toUserJSONObject()); - } - if(forEachListeners.containsKey(entry.getKey())) { - for(Map.Entry jsonEntry : entry.getValue().values.toMap().entrySet()) { - forEachListeners.get(entry.getKey()).onChange(jsonEntry.getKey(), jsonEntry.getValue()); - } - } - } - } - return true; - } - - public static boolean mix(Node incomingNode, StorageBackend data, Map changeListeners, Map forEachListeners) { - long machine = System.currentTimeMillis(); - InMemoryGraph diff = null; - - for(String key : incomingNode.values.keySet()) { - Object value = incomingNode.values.get(key); - if ("_".equals(key)) { continue; } - long state = incomingNode.states.getLong(key); - long was = -1; - Object known = null; - if(data == null) { - data = new InMemoryGraph(); - } - if(data.hasNode(incomingNode.soul)) { - if(data.getNode(incomingNode.soul).states.opt(key) != null) { - was = data.getNode(incomingNode.soul).states.getLong(key); - } - known = data.getNode(incomingNode.soul).values.opt(key) == null ? 0 : data.getNode(incomingNode.soul).values.opt(key); - } - - HAMResult ham = ham(machine, state, was, value, known); - if(!ham.incoming) { - if(ham.defer) { - System.out.println("DEFER: " + key + " " + value); - // Hack for accessing value in lambda without making the variable final - StorageBackend[] graph = new StorageBackend[] {data}; - Utils.setTimeout(() -> mix(incomingNode, graph[0], changeListeners, forEachListeners), (int) (state - machine)); - } - continue; - } - - if(diff == null) { - diff = new InMemoryGraph(); - } - - if(!diff.hasNode(incomingNode.soul)) { - diff.addNode(incomingNode.soul, Utils.newNode(incomingNode.soul, new JSONObject())); - } - - if(!data.hasNode(incomingNode.soul)) { - data.addNode(incomingNode.soul, Utils.newNode(incomingNode.soul, new JSONObject())); - } - - Node tmp = data.getNode(incomingNode.soul); - tmp.values.put(key, value); - tmp.states.put(key, state); - data.addNode(incomingNode.soul, tmp); - diff.getNode(incomingNode.soul).values.put(key, value); - diff.getNode(incomingNode.soul).states.put(key, state); - } - if(diff != null) { - for(Map.Entry entry : diff.entries()) { - if(changeListeners.containsKey(entry.getKey())) { - changeListeners.get(entry.getKey()).onChange(entry.getValue().toUserJSONObject()); - } - if(forEachListeners.containsKey(entry.getKey())) { - for(Map.Entry jsonEntry : entry.getValue().values.toMap().entrySet()) { - forEachListeners.get(entry.getKey()).onChange(jsonEntry.getKey(), jsonEntry.getValue()); - } - } - } - } - return true; } } diff --git a/src/main/java/io/github/chronosx88/JGUN/NetworkHandler.java b/src/main/java/io/github/chronosx88/JGUN/NetworkHandler.java new file mode 100644 index 0000000..09d795b --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/NetworkHandler.java @@ -0,0 +1,79 @@ +package io.github.chronosx88.JGUN; + +import io.github.chronosx88.JGUN.futures.BaseCompletableFuture; +import io.github.chronosx88.JGUN.models.BaseMessage; +import io.github.chronosx88.JGUN.models.MemoryGraph; +import io.github.chronosx88.JGUN.models.acks.BaseAck; +import io.github.chronosx88.JGUN.models.acks.GetAck; +import io.github.chronosx88.JGUN.models.acks.PutAck; +import io.github.chronosx88.JGUN.models.requests.GetRequest; +import io.github.chronosx88.JGUN.models.requests.PutRequest; +import io.github.chronosx88.JGUN.nodes.Peer; +import io.github.chronosx88.JGUN.storage.Storage; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +public class NetworkHandler { + private final Map> pendingFutures = new ConcurrentHashMap<>(); + + private final Peer peer; + private final Storage graphStorage; + private final Dup dup; + private final Executor executorService = Executors.newCachedThreadPool(); + + public NetworkHandler(Storage graphStorage, Peer peer, Dup dup) { + this.graphStorage = graphStorage; + this.peer = peer; + this.dup = dup; + } + + public void addPendingFuture(BaseCompletableFuture future) { + pendingFutures.put(future.getFutureID(), future); + } + + public void handleIncomingMessage(BaseMessage message) { + if (message instanceof GetRequest) { + handleGet((GetRequest) message); + } else if (message instanceof PutRequest) { + handlePut((PutRequest) message); + } else if (message instanceof BaseAck) { + handleAck((BaseAck) message); + } + peer.emit(message.toString()); + } + + private GetAck handleGet(GetRequest request) { + // TODO + throw new UnsupportedOperationException("TODO"); + } + + private PutAck handlePut(PutRequest request) { + // TODO + throw new UnsupportedOperationException("TODO"); + } + + private void handleAck(BaseAck ack) { + if (ack instanceof GetAck) { + // TODO + } else if (ack instanceof PutAck) { + // TODO + } + + throw new UnsupportedOperationException("TODO"); + } + + public void sendPutRequest(String messageID, MemoryGraph data) { + executorService.execute(() -> { + // TODO + }); + } + + public void sendGetRequest(String messageID, String key, String field) { + executorService.execute(() -> { + // TODO + }); + } +} diff --git a/src/main/java/io/github/chronosx88/JGUN/Node.java b/src/main/java/io/github/chronosx88/JGUN/Node.java deleted file mode 100644 index 0a4cf27..0000000 --- a/src/main/java/io/github/chronosx88/JGUN/Node.java +++ /dev/null @@ -1,82 +0,0 @@ -package io.github.chronosx88.JGUN; - -import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph; -import org.json.JSONObject; - -import java.io.Serializable; - -public class Node implements Comparable, Serializable { - public JSONObject values; // Data - public JSONObject states; // Metadata for diff - public String soul; // i.e. ID of node - - /** - * Create a Peer from a JSON object. - * - * @param rawData JSON object, which contains the data - */ - public Node(JSONObject rawData) { - this.values = new JSONObject(rawData.toString()); - this.states = values.getJSONObject("_").getJSONObject(">"); - this.soul = values.getJSONObject("_").getString("#"); - values.remove("_"); - } - - @Override - public int compareTo(Node other) { - return soul.compareTo(other.soul); - } - - @Override - public boolean equals(Object other) { - if (other == null) - return false; - if (other instanceof String) - return soul.equals(other); - if (other instanceof Node) - return compareTo((Node) other) == 0; - return false; - } - - @Override - public int hashCode() { - return soul.hashCode(); - } - - public boolean isNode(String key) { - return values.optJSONObject(key) != null; - } - - public Node getNode(String key, InMemoryGraph g) { - String soulRef = values.getJSONObject(key).getString("#"); - return g.getNode(soulRef); - } - - @Override - public String toString() { - JSONObject jsonObject = new JSONObject(values.toString()); - jsonObject.put("_", new JSONObject().put("#", soul).put(">", states)); - return jsonObject.toString(); - } - - public JSONObject toJSONObject() { - JSONObject jsonObject = new JSONObject(values.toString()); - jsonObject.put("_", new JSONObject().put("#", soul).put(">", states)); - return jsonObject; - } - - public JSONObject getMetadata() { - JSONObject jsonObject = new JSONObject(); - jsonObject.put("_", new JSONObject().put("#", soul).put(">", states)); - return jsonObject; - } - - public void setMetadata(JSONObject metadata) { - soul = metadata.getJSONObject("_").getString("#"); - states = metadata.getJSONObject("_").getJSONObject(">"); - } - - public JSONObject toUserJSONObject() { - return values; - } -} \ No newline at end of file diff --git a/src/main/java/io/github/chronosx88/JGUN/NodeChangeListener.java b/src/main/java/io/github/chronosx88/JGUN/NodeChangeListener.java index 3c2a8c5..72432af 100644 --- a/src/main/java/io/github/chronosx88/JGUN/NodeChangeListener.java +++ b/src/main/java/io/github/chronosx88/JGUN/NodeChangeListener.java @@ -1,10 +1,10 @@ package io.github.chronosx88.JGUN; -import org.json.JSONObject; +import io.github.chronosx88.JGUN.models.Node; @FunctionalInterface public interface NodeChangeListener { - void onChange(JSONObject node); + void onChange(Node node); interface ForEach { void onChange(String key, Object value); diff --git a/src/main/java/io/github/chronosx88/JGUN/PathRef.java b/src/main/java/io/github/chronosx88/JGUN/PathRef.java deleted file mode 100644 index 37b20e0..0000000 --- a/src/main/java/io/github/chronosx88/JGUN/PathRef.java +++ /dev/null @@ -1,148 +0,0 @@ -package io.github.chronosx88.JGUN; - -import io.github.chronosx88.JGUN.futures.FutureGet; -import io.github.chronosx88.JGUN.futures.FuturePut; -import java9.util.concurrent.CompletableFuture; -import org.json.JSONObject; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.concurrent.ExecutionException; - -public class PathRef { - private final ArrayList path = new ArrayList<>(); - private Dispatcher dispatcher; - - public PathRef(Dispatcher dispatcher) { - this.dispatcher = dispatcher; - } - - public PathRef get(String key) { - path.add(key); - return this; - } - - public FutureGet getData() { - FutureGet futureGet = new FutureGet(Dup.random()); - new Thread(() -> { - Iterator iterator = path.iterator(); - String rootSoul = iterator.next(); - String field = iterator.hasNext() ? iterator.next() : null; - - iterator = path.iterator(); - iterator.next(); - - CompletableFuture future = CompletableFuture.supplyAsync(() -> { - FutureGet futureGetRootSoul = new FutureGet(Dup.random()); - dispatcher.addPendingFuture(futureGetRootSoul); - dispatcher.sendGetRequest(futureGetRootSoul.getFutureID(), rootSoul, field); - JSONObject result = futureGetRootSoul.await(); - if(result != null && result.isEmpty()) { - result = null; - } - return result == null ? null : result.getJSONObject(rootSoul); - }); - do { - String soul = iterator.hasNext() ? iterator.next() : null; - String nextField = iterator.hasNext() ? iterator.next() : null; - future = future.thenApply(jsonObject -> { - if(jsonObject != null) { - if(soul != null) { - JSONObject tmp = null; - if(jsonObject.keySet().contains("#")) { - String nodeRef = jsonObject.getString("#"); - FutureGet get = new FutureGet(Dup.random()); - dispatcher.addPendingFuture(get); - dispatcher.sendGetRequest(get.getFutureID(), nodeRef, nextField); - JSONObject result = get.await(); - if(result != null && result.isEmpty()) { - result = null; - } - result = result == null ? null : result.getJSONObject(nodeRef); - if(result != null) { - result = nextField == null ? result : result.getJSONObject(nextField); - } - tmp = result; - } - if(tmp != null) { - if(tmp.opt(soul) instanceof JSONObject) { - String nodeRef = tmp.getJSONObject(soul).getString("#"); - FutureGet get = new FutureGet(Dup.random()); - dispatcher.addPendingFuture(get); - dispatcher.sendGetRequest(get.getFutureID(), nodeRef, nextField); - JSONObject result = get.await(); - if(result != null && result.isEmpty()) { - result = null; - } - result = result == null ? null : result.getJSONObject(nodeRef); - if(result != null) { - result = nextField == null ? result : result.getJSONObject(nextField); - } - return result; - } else { - return tmp; - } - } else { - if(jsonObject.opt(soul) instanceof JSONObject) { - String nodeRef = jsonObject.getJSONObject(soul).getString("#"); - FutureGet get = new FutureGet(Dup.random()); - dispatcher.addPendingFuture(get); - dispatcher.sendGetRequest(get.getFutureID(), nodeRef, nextField); - JSONObject result = get.await(); - if(result != null && result.isEmpty()) { - result = null; - } - result = result == null ? null : result.getJSONObject(nodeRef); - if(result != null) { - result = nextField == null ? result : result.getJSONObject(nextField); - } - return result; - } else { - return jsonObject; - } - } - } else { - return jsonObject; - } - } - return null; - }); - } while(iterator.hasNext()); - - try { - JSONObject data = future.get(); - futureGet.complete(data); - } catch (InterruptedException | ExecutionException e) { - e.printStackTrace(); - } - }).start(); - return futureGet; - } - - public FuturePut put(JSONObject data) { - FuturePut futurePut = new FuturePut(Dup.random()); - dispatcher.addPendingFuture(futurePut); - JSONObject temp = new JSONObject(); - JSONObject temp1 = temp; - for (int i = 0; i < path.size(); i++) { - if(i != path.size() - 1) { - JSONObject object = new JSONObject(); - temp1.put(path.get(i), object); - temp1 = object; - } else { - temp1.put(path.get(i), data == null ? JSONObject.NULL : data); - } - - } - dispatcher.sendPutRequest(futurePut.getFutureID(), temp); - return futurePut; - } - - public void on(NodeChangeListener changeListener) { - dispatcher.addChangeListener(Utils.join("/", path), changeListener); - } - - public void map(NodeChangeListener.ForEach forEachListener) { - dispatcher.addForEachChangeListener(Utils.join("/", path), forEachListener); - } -} diff --git a/src/main/java/io/github/chronosx88/JGUN/PathReference.java b/src/main/java/io/github/chronosx88/JGUN/PathReference.java new file mode 100644 index 0000000..29b7fd5 --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/PathReference.java @@ -0,0 +1,40 @@ +package io.github.chronosx88.JGUN; + +import io.github.chronosx88.JGUN.futures.FutureGet; +import io.github.chronosx88.JGUN.futures.FuturePut; + +import java.util.ArrayList; +import java.util.HashMap; + +public class PathReference { + private final ArrayList path = new ArrayList<>(); + + private Gun database; + + public PathReference(Gun db) { + this.database = db; + } + + public PathReference get(String key) { + path.add(key); + return this; + } + + public FutureGet getData() { + // TODO + return null; + } + + public FuturePut put(HashMap data) { + // TODO + return null; + } + + public void on(NodeChangeListener changeListener) { + database.addChangeListener(String.join("/", path), changeListener); + } + + public void map(NodeChangeListener.ForEach forEachListener) { + database.addForEachChangeListener(String.join("/", path), forEachListener); + } +} diff --git a/src/main/java/io/github/chronosx88/JGUN/Utils.java b/src/main/java/io/github/chronosx88/JGUN/Utils.java deleted file mode 100644 index 04c8ab7..0000000 --- a/src/main/java/io/github/chronosx88/JGUN/Utils.java +++ /dev/null @@ -1,146 +0,0 @@ -package io.github.chronosx88.JGUN; - -import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph; -import io.github.chronosx88.JGUN.storageBackends.StorageBackend; -import org.json.JSONObject; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.concurrent.ConcurrentSkipListSet; - -public class Utils { - public static Thread setTimeout(Runnable runnable, int delay){ - Thread thread = new Thread(() -> { - try { - Thread.sleep(delay); - runnable.run(); - } - catch (Exception e){ - e.printStackTrace(); - } - }); - thread.start(); - return thread; - } - - public static Node newNode(String soul, JSONObject data) { - JSONObject states = new JSONObject(); - for (String key : data.keySet()) { - states.put(key, System.currentTimeMillis()); - } - data.put("_", new JSONObject().put("#", soul).put(">", states)); - return new Node(data); - } - - public static InMemoryGraph getRequest(JSONObject lex, StorageBackend graph) { - String soul = lex.getString("#"); - String key = lex.optString(".", null); - Node node = graph.getNode(soul); - Object tmp; - if(node == null) { - return new InMemoryGraph(); - } - if(key != null) { - tmp = node.values.opt(key); - if(tmp == null) { - return new InMemoryGraph(); - } - Node node1 = new Node(node.toJSONObject()); - node = Utils.newNode(node.soul, new JSONObject()); - node.setMetadata(node1.getMetadata()); - node.values.put(key, tmp); - JSONObject tmpStates = node1.states; - node.states.put(key, tmpStates.get(key)); - } - InMemoryGraph ack = new InMemoryGraph(); - ack.addNode(soul, node); - return ack; - } - - public static InMemoryGraph prepareDataForPut(JSONObject data) { - InMemoryGraph result = new InMemoryGraph(); - for (String objectKey : data.keySet()) { - Object object = data.get(objectKey); - if(object instanceof JSONObject) { - Node node = Utils.newNode(objectKey, (JSONObject) object); - ArrayList path = new ArrayList<>(); - path.add(objectKey); - prepareNodeForPut(node, result, path); - } - } - return result; - } - - private static void prepareNodeForPut(Node node, InMemoryGraph result, ArrayList path) { - for(String key : new ConcurrentSkipListSet<>(node.values.keySet())) { - Object value = node.values.get(key); - if(value instanceof JSONObject) { - path.add(key); - String soul = ""; - soul = Utils.join("/", path); - Node tmpNode = Utils.newNode(soul, (JSONObject) value); - node.values.remove(key); - node.values.put(key, new JSONObject().put("#", soul)); - prepareNodeForPut(tmpNode, result, new ArrayList<>(path)); - result.addNode(soul, tmpNode); - path.remove(key); - } - } - result.addNode(node.soul, node); - } - - public static JSONObject formatGetRequest(String messageID, String key, String field) { - JSONObject jsonObject = new JSONObject(); - jsonObject.put("#", messageID); - JSONObject getParameters = new JSONObject(); - getParameters.put("#", key); - if(field != null) { - getParameters.put(".", field); - } - jsonObject.put("get", getParameters); - return jsonObject; - } - - public static JSONObject formatPutRequest(String messageID, JSONObject data) { - JSONObject jsonObject = new JSONObject(); - jsonObject.put("#", messageID); - jsonObject.put("put", data); - return jsonObject; - } - - /** - * This check current nodes for existing IDs in our storage, and if there are existing IDs, it means to replace them. - * Prevents trailing nodes in storage - * @param incomingGraph The graph that came to us over the wire. - * @param graphStorage Graph storage in which the incoming graph will be saved - * @return Prepared graph for saving - */ - /*public static InMemoryGraph checkIncomingNodesForID(InMemoryGraph incomingGraph, StorageBackend graphStorage) { - for (Node node : incomingGraph.nodes()) { - for(node) - } - }*/ - - /** - * Returns a string containing the tokens joined by delimiters. - * - * @param delimiter a CharSequence that will be inserted between the tokens. If null, the string - * "null" will be used as the delimiter. - * @param tokens an array objects to be joined. Strings will be formed from the objects by - * calling object.toString(). If tokens is null, a NullPointerException will be thrown. If - * tokens is empty, an empty string will be returned. - */ - public static String join(CharSequence delimiter, Iterable tokens) { - final Iterator it = tokens.iterator(); - if (!it.hasNext()) { - return ""; - } - final StringBuilder sb = new StringBuilder(); - sb.append(it.next()); - while (it.hasNext()) { - sb.append(delimiter); - sb.append(it.next()); - } - return sb.toString(); - } -} diff --git a/src/main/java/io/github/chronosx88/JGUN/examples/MainClient.java b/src/main/java/io/github/chronosx88/JGUN/examples/MainClient.java index 2e24531..3bc557c 100644 --- a/src/main/java/io/github/chronosx88/JGUN/examples/MainClient.java +++ b/src/main/java/io/github/chronosx88/JGUN/examples/MainClient.java @@ -1,7 +1,7 @@ package io.github.chronosx88.JGUN.examples; import io.github.chronosx88.JGUN.nodes.GunClient; -import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph; +import io.github.chronosx88.JGUN.storage.MemoryStorage; import java.net.Inet4Address; import java.net.URISyntaxException; @@ -9,7 +9,7 @@ import java.net.UnknownHostException; public class MainClient { public static void main(String[] args) throws URISyntaxException, UnknownHostException { - GunClient gunClient = new GunClient(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 5054, new InMemoryGraph()); + GunClient gunClient = new GunClient(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 5054, new MemoryStorage()); gunClient.connect(); } } diff --git a/src/main/java/io/github/chronosx88/JGUN/examples/MainClientServer.java b/src/main/java/io/github/chronosx88/JGUN/examples/MainClientServer.java index aa4bd7c..50a68b0 100644 --- a/src/main/java/io/github/chronosx88/JGUN/examples/MainClientServer.java +++ b/src/main/java/io/github/chronosx88/JGUN/examples/MainClientServer.java @@ -1,55 +1,52 @@ package io.github.chronosx88.JGUN.examples; import io.github.chronosx88.JGUN.Gun; -import io.github.chronosx88.JGUN.futures.FutureGet; -import io.github.chronosx88.JGUN.futures.FuturePut; import io.github.chronosx88.JGUN.nodes.GunSuperPeer; -import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph; -import org.json.JSONObject; +import io.github.chronosx88.JGUN.storage.MemoryStorage; import java.net.Inet4Address; import java.net.UnknownHostException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; -public class MainClientServer { - public static void main(String[] args) { - GunSuperPeer gunSuperNode = new GunSuperPeer(21334, new InMemoryGraph()); - gunSuperNode.start(); - Runnable task = () -> { - Gun gun = null; - try { - gun = new Gun(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 21334, new InMemoryGraph()); - } catch (UnknownHostException e) { - e.printStackTrace(); - } - gun.get("random").on(data -> { - if(data != null) { - System.out.println("New change in \"random\"! " + data.toString(2)); - } - }); - gun.get("random").get("dVFtzE9CL").on(data -> { - if(data != null) { - System.out.println("New change in \"random/dVFtzE9CL\"! " + data.toString(2)); - } else { - System.out.println("Now random/dVFtzE9CL is null!"); - } - - }); - gun.get("random").get("dVFtzE9CL").map(((key, value) -> { - System.out.println("[Map] New change in \"random/dVFtzE9CL\"! " + key + " : " + value.toString()); - })); - gun.get("random").map(((key, value) -> { - System.out.println("[Map] New change in \"random\"! " + key + " : " + value); - })); - System.out.println("[FuturePut] Success: " + gun.get("random").get("dVFtzE9CL").put(new JSONObject().put("hello", "world")).await()); - System.out.println("[FuturePut] Putting an item again: " + gun.get("random").get("dVFtzE9CL").put(new JSONObject().put("hello", "123")).await()); - System.out.println("Deleting an item random/dVFtzE9CL"); - gun.get("random").get("dVFtzE9CL").put(null).await(); - gun.get("random").put(new JSONObject().put("hello", "world")).await(); - }; - - Executor executorService = Executors.newSingleThreadExecutor(); - executorService.execute(task); - } -} +//public class MainClientServer { +// public static void main(String[] args) { +// GunSuperPeer gunSuperNode = new GunSuperPeer(21334, new MemoryStorage()); +// gunSuperNode.start(); +// Runnable task = () -> { +// Gun gun = null; +// try { +// gun = new Gun(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 21334, new MemoryStorage()); +// } catch (UnknownHostException e) { +// e.printStackTrace(); +// } +// gun.get("random").on(data -> { +// if(data != null) { +// System.out.println("New change in \"random\"! " + data.toString(2)); +// } +// }); +// gun.get("random").get("dVFtzE9CL").on(data -> { +// if(data != null) { +// System.out.println("New change in \"random/dVFtzE9CL\"! " + data.toString(2)); +// } else { +// System.out.println("Now random/dVFtzE9CL is null!"); +// } +// +// }); +// gun.get("random").get("dVFtzE9CL").map(((key, value) -> { +// System.out.println("[Map] New change in \"random/dVFtzE9CL\"! " + key + " : " + value.toString()); +// })); +// gun.get("random").map(((key, value) -> { +// System.out.println("[Map] New change in \"random\"! " + key + " : " + value); +// })); +// System.out.println("[FuturePut] Success: " + gun.get("random").get("dVFtzE9CL").put(new JSONObject().put("hello", "world")).await()); +// System.out.println("[FuturePut] Putting an item again: " + gun.get("random").get("dVFtzE9CL").put(new JSONObject().put("hello", "123")).await()); +// System.out.println("Deleting an item random/dVFtzE9CL"); +// gun.get("random").get("dVFtzE9CL").put(null).await(); +// gun.get("random").put(new JSONObject().put("hello", "world")).await(); +// }; +// +// Executor executorService = Executors.newSingleThreadExecutor(); +// executorService.execute(task); +// } +//} diff --git a/src/main/java/io/github/chronosx88/JGUN/examples/MainServer.java b/src/main/java/io/github/chronosx88/JGUN/examples/MainServer.java index 873dbef..e6835e1 100644 --- a/src/main/java/io/github/chronosx88/JGUN/examples/MainServer.java +++ b/src/main/java/io/github/chronosx88/JGUN/examples/MainServer.java @@ -1,11 +1,11 @@ package io.github.chronosx88.JGUN.examples; import io.github.chronosx88.JGUN.nodes.GunSuperPeer; -import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph; +import io.github.chronosx88.JGUN.storage.MemoryStorage; public class MainServer { public static void main(String[] args) { - GunSuperPeer gunSuperNode = new GunSuperPeer(5054, new InMemoryGraph()); + GunSuperPeer gunSuperNode = new GunSuperPeer(5054, new MemoryStorage()); gunSuperNode.start(); } } diff --git a/src/main/java/io/github/chronosx88/JGUN/futures/BaseCompletableFuture.java b/src/main/java/io/github/chronosx88/JGUN/futures/BaseCompletableFuture.java index f624d9a..2801881 100644 --- a/src/main/java/io/github/chronosx88/JGUN/futures/BaseCompletableFuture.java +++ b/src/main/java/io/github/chronosx88/JGUN/futures/BaseCompletableFuture.java @@ -3,8 +3,10 @@ package io.github.chronosx88.JGUN.futures; import java.util.concurrent.ExecutionException; import java9.util.concurrent.CompletableFuture; +import lombok.Getter; +@Getter public class BaseCompletableFuture extends CompletableFuture { private final String futureID; @@ -13,10 +15,6 @@ public class BaseCompletableFuture extends CompletableFuture { futureID = id; } - public String getFutureID() { - return futureID; - } - public void addListener(final BaseFutureListener listener) { this.whenCompleteAsync((t, throwable) -> { if(throwable == null) { diff --git a/src/main/java/io/github/chronosx88/JGUN/futures/FutureGet.java b/src/main/java/io/github/chronosx88/JGUN/futures/FutureGet.java index 31a5800..b4c79dc 100644 --- a/src/main/java/io/github/chronosx88/JGUN/futures/FutureGet.java +++ b/src/main/java/io/github/chronosx88/JGUN/futures/FutureGet.java @@ -1,8 +1,8 @@ package io.github.chronosx88.JGUN.futures; -import org.json.JSONObject; +import io.github.chronosx88.JGUN.models.MemoryGraph; -public class FutureGet extends BaseCompletableFuture { +public class FutureGet extends BaseCompletableFuture { public FutureGet(String id) { super(id); } diff --git a/src/main/java/io/github/chronosx88/JGUN/models/BaseMessage.java b/src/main/java/io/github/chronosx88/JGUN/models/BaseMessage.java new file mode 100644 index 0000000..5c9bf30 --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/models/BaseMessage.java @@ -0,0 +1,10 @@ +package io.github.chronosx88.JGUN.models; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +@Data +public abstract class BaseMessage { + @JsonProperty("#") + private String id; +} diff --git a/src/main/java/io/github/chronosx88/JGUN/models/MemoryGraph.java b/src/main/java/io/github/chronosx88/JGUN/models/MemoryGraph.java new file mode 100644 index 0000000..51ccd9a --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/models/MemoryGraph.java @@ -0,0 +1,27 @@ +package io.github.chronosx88.JGUN.models; + +import com.fasterxml.jackson.annotation.JsonAnyGetter; +import com.fasterxml.jackson.annotation.JsonAnySetter; +import com.fasterxml.jackson.annotation.JsonIgnore; +import lombok.Builder; +import lombok.Data; +import lombok.extern.jackson.Jacksonized; + +import java.util.LinkedHashMap; +import java.util.Map; + +@Data +public class MemoryGraph { + @JsonIgnore + public final Map nodes = new LinkedHashMap<>(); + + @JsonAnyGetter + public Map nodes() { + return nodes; + } + + @JsonAnySetter + public void putNodes(String id, Node node) { + nodes.put(id, node); + } +} diff --git a/src/main/java/io/github/chronosx88/JGUN/models/Node.java b/src/main/java/io/github/chronosx88/JGUN/models/Node.java new file mode 100644 index 0000000..49d429d --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/models/Node.java @@ -0,0 +1,34 @@ +package io.github.chronosx88.JGUN.models; + +import com.fasterxml.jackson.annotation.JsonAnyGetter; +import com.fasterxml.jackson.annotation.JsonAnySetter; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Builder; +import lombok.Data; +import lombok.extern.jackson.Jacksonized; + +import java.util.LinkedHashMap; +import java.util.Map; + +@Data +@Jacksonized +@Builder +public class Node { + @JsonProperty("_") + private NodeMetadata metadata; + + @JsonIgnore + @Builder.Default + public Map values = new LinkedHashMap<>(); // Data + + @JsonAnyGetter + public Map getValues() { + return values; + } + + @JsonAnySetter + public void allSetter(String key, String value) { + values.put(key, value); + } +} \ No newline at end of file diff --git a/src/main/java/io/github/chronosx88/JGUN/models/NodeMetadata.java b/src/main/java/io/github/chronosx88/JGUN/models/NodeMetadata.java new file mode 100644 index 0000000..8d6ae11 --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/models/NodeMetadata.java @@ -0,0 +1,21 @@ +package io.github.chronosx88.JGUN.models; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Builder; +import lombok.Data; +import lombok.extern.jackson.Jacksonized; + +import java.util.HashMap; +import java.util.Map; + +@Data +@Builder +@Jacksonized +public class NodeMetadata { + @JsonProperty(">") + @Builder.Default + private Map states = new HashMap<>(); // field -> state + + @JsonProperty("#") + private String nodeID; +} diff --git a/src/main/java/io/github/chronosx88/JGUN/models/acks/BaseAck.java b/src/main/java/io/github/chronosx88/JGUN/models/acks/BaseAck.java new file mode 100644 index 0000000..5a64d68 --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/models/acks/BaseAck.java @@ -0,0 +1,16 @@ +package io.github.chronosx88.JGUN.models.acks; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.github.chronosx88.JGUN.models.BaseMessage; +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.extern.jackson.Jacksonized; + +@Data +@EqualsAndHashCode(callSuper = true) +public abstract class BaseAck extends BaseMessage { + @JsonProperty("@") + private String replyTo; + private String ok; +} diff --git a/src/main/java/io/github/chronosx88/JGUN/models/acks/GetAck.java b/src/main/java/io/github/chronosx88/JGUN/models/acks/GetAck.java new file mode 100644 index 0000000..be491fb --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/models/acks/GetAck.java @@ -0,0 +1,17 @@ +package io.github.chronosx88.JGUN.models.acks; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.github.chronosx88.JGUN.models.MemoryGraph; +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.extern.jackson.Jacksonized; + +@Data +@EqualsAndHashCode(callSuper = true) +@Builder +@Jacksonized +public class GetAck extends BaseAck { + @JsonProperty("put") + private MemoryGraph data; +} diff --git a/src/main/java/io/github/chronosx88/JGUN/models/acks/PutAck.java b/src/main/java/io/github/chronosx88/JGUN/models/acks/PutAck.java new file mode 100644 index 0000000..7495e97 --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/models/acks/PutAck.java @@ -0,0 +1,12 @@ +package io.github.chronosx88.JGUN.models.acks; + +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.extern.jackson.Jacksonized; + +@Data +@EqualsAndHashCode(callSuper = true) +@Builder +@Jacksonized +public class PutAck extends BaseAck {} diff --git a/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequest.java b/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequest.java new file mode 100644 index 0000000..5b557e1 --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequest.java @@ -0,0 +1,13 @@ +package io.github.chronosx88.JGUN.models.requests; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.github.chronosx88.JGUN.models.BaseMessage; +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Data +@EqualsAndHashCode(callSuper = true) +public class GetRequest extends BaseMessage { + @JsonProperty("get") + private GetRequestParams params; +} diff --git a/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequestParams.java b/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequestParams.java new file mode 100644 index 0000000..8d3d68a --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequestParams.java @@ -0,0 +1,11 @@ +package io.github.chronosx88.JGUN.models.requests; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class GetRequestParams { + @JsonProperty("#") + private String nodeID; + + @JsonProperty(".") + private String field; +} diff --git a/src/main/java/io/github/chronosx88/JGUN/models/requests/PutRequest.java b/src/main/java/io/github/chronosx88/JGUN/models/requests/PutRequest.java new file mode 100644 index 0000000..80e706b --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/models/requests/PutRequest.java @@ -0,0 +1,18 @@ +package io.github.chronosx88.JGUN.models.requests; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.github.chronosx88.JGUN.models.BaseMessage; +import io.github.chronosx88.JGUN.models.Node; +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.extern.jackson.Jacksonized; + +@Data +@Jacksonized +@Builder +@EqualsAndHashCode(callSuper = true) +public class PutRequest extends BaseMessage { + @JsonProperty("put") + private Node[] params; +} diff --git a/src/main/java/io/github/chronosx88/JGUN/nodes/GunClient.java b/src/main/java/io/github/chronosx88/JGUN/nodes/GunClient.java index c1c049c..e2fcc92 100644 --- a/src/main/java/io/github/chronosx88/JGUN/nodes/GunClient.java +++ b/src/main/java/io/github/chronosx88/JGUN/nodes/GunClient.java @@ -1,24 +1,22 @@ package io.github.chronosx88.JGUN.nodes; -import io.github.chronosx88.JGUN.Dispatcher; import io.github.chronosx88.JGUN.Dup; -import io.github.chronosx88.JGUN.storageBackends.StorageBackend; - +import io.github.chronosx88.JGUN.NetworkHandler; +import io.github.chronosx88.JGUN.storage.Storage; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; -import org.json.JSONObject; import java.net.InetAddress; import java.net.URI; import java.net.URISyntaxException; public class GunClient extends WebSocketClient implements Peer { - private Dup dup = new Dup(); - private final Dispatcher dispatcher; + private Dup dup = new Dup(1000*9); + private final NetworkHandler handler; - public GunClient(InetAddress address, int port, StorageBackend storage) throws URISyntaxException { + public GunClient(InetAddress address, int port, Storage storage) throws URISyntaxException { super(new URI("ws://" + address.getHostAddress() + ":" + port)); - this.dispatcher = new Dispatcher(storage, this, dup); + this.handler = new NetworkHandler(storage, this, dup); } @Override @@ -28,10 +26,7 @@ public class GunClient extends WebSocketClient implements Peer { @Override public void onMessage(String message) { - JSONObject jsonMsg = new JSONObject(message); - if(dup.check(jsonMsg.getString("#"))){ return; } - dup.track(jsonMsg.getString("#")); - dispatcher.handleIncomingMessage(jsonMsg); + // TODO } @Override @@ -45,10 +40,6 @@ public class GunClient extends WebSocketClient implements Peer { ex.printStackTrace(); } - public Dispatcher getDispatcher() { - return dispatcher; - } - @Override public void emit(String data) { this.send(data); diff --git a/src/main/java/io/github/chronosx88/JGUN/nodes/GunSuperPeer.java b/src/main/java/io/github/chronosx88/JGUN/nodes/GunSuperPeer.java index afd78b3..e052149 100644 --- a/src/main/java/io/github/chronosx88/JGUN/nodes/GunSuperPeer.java +++ b/src/main/java/io/github/chronosx88/JGUN/nodes/GunSuperPeer.java @@ -1,24 +1,22 @@ package io.github.chronosx88.JGUN.nodes; -import io.github.chronosx88.JGUN.Dispatcher; import io.github.chronosx88.JGUN.Dup; -import io.github.chronosx88.JGUN.storageBackends.StorageBackend; - +import io.github.chronosx88.JGUN.NetworkHandler; +import io.github.chronosx88.JGUN.storage.Storage; import org.java_websocket.WebSocket; import org.java_websocket.handshake.ClientHandshake; import org.java_websocket.server.WebSocketServer; -import org.json.JSONObject; import java.net.InetSocketAddress; public class GunSuperPeer extends WebSocketServer implements Peer { - private Dup dup = new Dup(); - private Dispatcher dispatcher; + private Dup dup = new Dup(1000*9); + private NetworkHandler handler; - public GunSuperPeer(int port, StorageBackend storageBackend) { + public GunSuperPeer(int port, Storage storage) { super(new InetSocketAddress(port)); setReuseAddr(true); - dispatcher = new Dispatcher(storageBackend, this, dup); + handler = new NetworkHandler(storage, this, dup); } @Override @@ -35,10 +33,7 @@ public class GunSuperPeer extends WebSocketServer implements Peer { @Override public void onMessage(WebSocket conn, String message) { - JSONObject jsonMsg = new JSONObject(message); - if(dup.check(jsonMsg.getString("#"))){ return; } - dup.track(jsonMsg.getString("#")); - dispatcher.handleIncomingMessage(jsonMsg); + // TODO } @Override diff --git a/src/main/java/io/github/chronosx88/JGUN/storage/MemoryStorage.java b/src/main/java/io/github/chronosx88/JGUN/storage/MemoryStorage.java new file mode 100644 index 0000000..a2de416 --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/storage/MemoryStorage.java @@ -0,0 +1,47 @@ +package io.github.chronosx88.JGUN.storage; + +import io.github.chronosx88.JGUN.models.Node; + +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; + + +public class MemoryStorage extends Storage { + private final Map nodes; + + public MemoryStorage() { + nodes = new LinkedHashMap<>(); + } + + public Node getNode(String id) { + return nodes.get(id); + } + + @Override + void updateNode(Node node) { + // TODO + throw new UnsupportedOperationException("TODO"); + } + + public void addNode(String id, Node incomingNode) { + nodes.put(id, incomingNode); + } + + public boolean hasNode(String id) { + return nodes.containsKey(id); + } + + public Set> entries() { + return nodes.entrySet(); + } + + public Collection nodes() { + return nodes.values(); + } + + public boolean isEmpty() { + return nodes.isEmpty(); + } +} diff --git a/src/main/java/io/github/chronosx88/JGUN/storage/Storage.java b/src/main/java/io/github/chronosx88/JGUN/storage/Storage.java new file mode 100644 index 0000000..d2234c5 --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/storage/Storage.java @@ -0,0 +1,100 @@ +package io.github.chronosx88.JGUN.storage; + +import io.github.chronosx88.JGUN.HAM; +import io.github.chronosx88.JGUN.NodeChangeListener; +import io.github.chronosx88.JGUN.models.MemoryGraph; +import io.github.chronosx88.JGUN.models.Node; +import io.github.chronosx88.JGUN.models.NodeMetadata; + +import java.util.*; + +public abstract class Storage { + abstract Node getNode(String id); + abstract void updateNode(Node node); + abstract void addNode(String id, Node node); + abstract boolean hasNode(String id); + abstract Set> entries(); + abstract Collection nodes(); + abstract boolean isEmpty(); + + /** + * Merge graph update (usually received from the network) + * @param update Graph update + * @param changeListeners + * @param forEachListeners + */ + public void mergeUpdate(MemoryGraph update, Map changeListeners, Map forEachListeners) { + long machine = System.currentTimeMillis(); + MemoryGraph diff = new MemoryGraph(); + for (Map.Entry entry : update.getNodes().entrySet()) { + Node node = entry.getValue(); + Node diffNode = this.mergeNode(node, machine); + if (Objects.nonNull(diffNode)) { + diff.nodes.put(diffNode.getMetadata().getNodeID(), diffNode); + } + } + if (!diff.nodes.isEmpty()) { + for (Map.Entry diffEntry : diff.getNodes().entrySet()) { + Node changedNode = diffEntry.getValue(); + if (!this.hasNode(changedNode.getMetadata().getNodeID())) { + this.addNode(changedNode.getMetadata().getNodeID(), changedNode); + } else { + this.updateNode(changedNode); + } + + if (changeListeners.containsKey(diffEntry.getKey())) { + changeListeners.get(diffEntry.getKey()).onChange(diffEntry.getValue()); + } + if (forEachListeners.containsKey(diffEntry.getKey())) { + for (Map.Entry nodeEntry : changedNode.getValues().entrySet()) { + forEachListeners.get(nodeEntry.getKey()).onChange(nodeEntry.getKey(), nodeEntry.getValue()); + } + } + } + } + } + + /** + * Merge updated node + * @param incomingNode Updated node + * @param machineState Current machine state + * @return Node with changes or null if no changes + */ + public Node mergeNode(Node incomingNode, long machineState) { + Node changedNode = null; + for (String key : incomingNode.getValues().keySet()) { + Object value = incomingNode.getValues().get(key); + long state = incomingNode.getMetadata().getStates().get(key); + long previousState = -1; + Object currentValue = null; + if (this.hasNode(incomingNode.getMetadata().getNodeID())) { + Node currentNode = this.getNode(incomingNode.getMetadata().getNodeID()); + Long prevStateFromStorage = currentNode.getMetadata().getStates().get(key); + if (!Objects.isNull(prevStateFromStorage)) { + previousState = prevStateFromStorage; + } + currentValue = currentNode.getValues().get(key); + } + HAM.HAMResult ham = HAM.ham(machineState, state, previousState, value, currentValue); + + if (!ham.incoming) { + if (ham.defer) { + // TODO handle deferred value + } + continue; + } + + if (Objects.isNull(changedNode)) { + changedNode = Node.builder() + .metadata(NodeMetadata.builder() + .nodeID(incomingNode.getMetadata().getNodeID()) + .build()) + .build(); + } + changedNode.values.put(key, value); + changedNode.getMetadata().getStates().put(key, state); + } + + return changedNode; + } +} diff --git a/src/main/java/io/github/chronosx88/JGUN/storageBackends/InMemoryGraph.java b/src/main/java/io/github/chronosx88/JGUN/storageBackends/InMemoryGraph.java deleted file mode 100644 index f8bee80..0000000 --- a/src/main/java/io/github/chronosx88/JGUN/storageBackends/InMemoryGraph.java +++ /dev/null @@ -1,82 +0,0 @@ -package io.github.chronosx88.JGUN.storageBackends; - -import io.github.chronosx88.JGUN.Node; - -import org.json.JSONObject; - -import java.util.Collection; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Set; - -public class InMemoryGraph implements StorageBackend { - - private final HashMap nodes; - - public InMemoryGraph(JSONObject source) { - nodes = new LinkedHashMap<>(); - - for (String soul : source.keySet()) - nodes.put(soul, new Node(source.getJSONObject(soul))); - } - - public InMemoryGraph() { - nodes = new LinkedHashMap<>(); - } - - public Node getNode(String soul) { - return nodes.get(soul); - } - - public void addNode(String soul, Node incomingNode) { - nodes.put(soul, incomingNode); - } - - public boolean hasNode(String soul) { - return nodes.containsKey(soul); - } - - public Set> entries() { - return nodes.entrySet(); - } - - public Collection nodes() { return nodes.values(); } - - @Override - public String toString() { - JSONObject jsonObject = new JSONObject(); - for(Map.Entry entry : nodes.entrySet()) { - jsonObject.put(entry.getKey(), entry.getValue().toJSONObject()); - } - return jsonObject.toString(); - } - - public String toPrettyString() { - JSONObject jsonObject = new JSONObject(); - for(Map.Entry entry : nodes.entrySet()) { - jsonObject.put(entry.getKey(), entry.getValue().toJSONObject()); - } - return jsonObject.toString(2); - } - - public JSONObject toJSONObject() { - JSONObject jsonObject = new JSONObject(); - for(Map.Entry entry : nodes.entrySet()) { - jsonObject.put(entry.getKey(), entry.getValue().toJSONObject()); - } - return jsonObject; - } - - public JSONObject toUserJSONObject() { - JSONObject jsonObject = new JSONObject(); - for(Map.Entry entry : nodes.entrySet()) { - jsonObject.put(entry.getKey(), entry.getValue().toUserJSONObject()); - } - return jsonObject; - } - - public boolean isEmpty() { - return nodes.isEmpty(); - } -} diff --git a/src/main/java/io/github/chronosx88/JGUN/storageBackends/StorageBackend.java b/src/main/java/io/github/chronosx88/JGUN/storageBackends/StorageBackend.java deleted file mode 100644 index e383f96..0000000 --- a/src/main/java/io/github/chronosx88/JGUN/storageBackends/StorageBackend.java +++ /dev/null @@ -1,20 +0,0 @@ -package io.github.chronosx88.JGUN.storageBackends; - -import io.github.chronosx88.JGUN.Node; -import org.json.JSONObject; - -import java.util.Collection; -import java.util.Map; -import java.util.Set; - -public interface StorageBackend { - Node getNode(String soul); - void addNode(String soul, Node node); - boolean hasNode(String soul); - Set> entries(); - Collection nodes(); - String toString(); - String toPrettyString(); - JSONObject toJSONObject(); - boolean isEmpty(); -}