diff --git a/README.md b/README.md
index aef8ae4..d5e0979 100644
--- a/README.md
+++ b/README.md
@@ -24,7 +24,7 @@
A realtime, decentralized, offline-first, mutable graph protocol to sync the Internet.
## Requirements
-* JRE/JDK >= 1.8.0
+* JRE/JDK >= 11
## Building
1. Clone repo:
@@ -34,11 +34,8 @@ $ cd JGUN
```
2. Compile it:
```bash
-./gradlew shadowJar
+./gradlew build
```
-3. Compiled JAR located in `./build/libs/`
-
-(Also exists precompiled JARs - see Releases (publishing to Maven coming soon...))
[⇧ back to top](#contents)
diff --git a/build.gradle b/build.gradle
index 39f4169..6232692 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,42 +1,26 @@
plugins {
- id 'com.github.johnrengelman.shadow' version '4.0.4'
id 'java'
}
group 'io.github.chronosx88'
version '0.2.6'
-sourceCompatibility = 1.8
+java {
+ sourceCompatibility = JavaVersion.VERSION_11
+ targetCompatibility = JavaVersion.VERSION_11
+}
repositories {
mavenCentral()
}
dependencies {
- implementation 'org.java-websocket:Java-WebSocket:1.4.0'
+ implementation 'org.java-websocket:Java-WebSocket:1.5.4'
implementation 'net.sourceforge.streamsupport:android-retrofuture:1.7.0'
- implementation group: 'org.json', name: 'json', version: '20180813'
-
- testCompile group: 'junit', name: 'junit', version: '4.12'
-}
-
-shadowJar {
- relocate 'org.json', 'shadow.org.json'
-}
-
-task sourcesJar(type: Jar, dependsOn: classes) {
- classifier = 'sources'
- from sourceSets.main.allSource
-}
-
-task javadocJar(type: Jar, dependsOn: javadoc) {
- classifier = 'javadoc'
- from javadoc.destinationDir
-}
-
-artifacts {
- archives sourcesJar
- archives javadocJar
-}
-
-
+ implementation 'com.fasterxml.jackson.core:jackson-databind:2.15.3'
+ implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.15.3'
+ implementation 'javax.cache:cache-api:1.1.1'
+ implementation 'com.github.ben-manes.caffeine:jcache:3.1.5'
+ compileOnly 'org.projectlombok:lombok:1.18.30'
+ annotationProcessor 'org.projectlombok:lombok:1.18.30'
+}
\ No newline at end of file
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 183579e..ffed3a2 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,5 @@
-#Wed May 01 20:08:10 MSK 2019
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
+distributionUrl=https\://services.gradle.org/distributions/gradle-7.2-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-4.10.3-all.zip
diff --git a/src/main/java/io/github/chronosx88/JGUN/Dispatcher.java b/src/main/java/io/github/chronosx88/JGUN/Dispatcher.java
deleted file mode 100644
index ec6a697..0000000
--- a/src/main/java/io/github/chronosx88/JGUN/Dispatcher.java
+++ /dev/null
@@ -1,109 +0,0 @@
-package io.github.chronosx88.JGUN;
-
-import io.github.chronosx88.JGUN.futures.BaseCompletableFuture;
-import io.github.chronosx88.JGUN.futures.FutureGet;
-import io.github.chronosx88.JGUN.futures.FuturePut;
-import io.github.chronosx88.JGUN.nodes.Peer;
-import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph;
-import io.github.chronosx88.JGUN.storageBackends.StorageBackend;
-import org.java_websocket.client.WebSocketClient;
-import org.json.JSONObject;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
-public class Dispatcher {
- private final Map> pendingFutures = new ConcurrentHashMap<>();
- private final Map changeListeners = new ConcurrentHashMap<>();
- private final Map forEachListeners = new ConcurrentHashMap<>();
- private final Peer peer;
- private final StorageBackend graphStorage;
- private final Dup dup;
- private final Executor executorService = Executors.newCachedThreadPool();
-
- public Dispatcher(StorageBackend graphStorage, Peer peer, Dup dup) {
- this.graphStorage = graphStorage;
- this.peer = peer;
- this.dup = dup;
- }
-
- public void addPendingFuture(BaseCompletableFuture> future) {
- pendingFutures.put(future.getFutureID(), future);
- }
-
- public void handleIncomingMessage(JSONObject message) {
- if(message.has("put")) {
- JSONObject ack = handlePut(message);
- peer.emit(ack.toString());
- }
- if(message.has("get")) {
- JSONObject ack = handleGet(message);
- peer.emit(ack.toString());
- }
- if(message.has("@")) {
- handleIncomingAck(message);
- }
- peer.emit(message.toString());
- }
-
- private JSONObject handleGet(JSONObject getData) {
- InMemoryGraph getResults = Utils.getRequest(getData.getJSONObject("get"), graphStorage);
- return new JSONObject() // Acknowledgment
- .put( "#", dup.track(Dup.random()) )
- .put( "@", getData.getString("#") )
- .put( "put", getResults.toJSONObject() )
- .put( "ok", !(getResults.isEmpty()) );
- }
-
- private JSONObject handlePut(JSONObject message) {
- boolean success = HAM.mix(new InMemoryGraph(message.getJSONObject("put")), graphStorage, changeListeners, forEachListeners);
- return new JSONObject() // Acknowledgment
- .put( "#", dup.track(Dup.random()) )
- .put( "@", message.getString("#") )
- .put( "ok", success);
- }
-
- private void handleIncomingAck(JSONObject ack) {
- if(ack.has("put")) {
- if(pendingFutures.containsKey(ack.getString("@"))) {
- BaseCompletableFuture> future = pendingFutures.get(ack.getString("@"));
- if(future instanceof FutureGet) {
- ((FutureGet) future).complete(new InMemoryGraph(ack.getJSONObject("put")).toUserJSONObject());
- }
- }
- }
- if(ack.has("ok")) {
- if(pendingFutures.containsKey(ack.getString("@"))) {
- BaseCompletableFuture> future = pendingFutures.get(ack.getString("@"));
- if(future instanceof FuturePut) {
- ((FuturePut) future).complete(ack.getBoolean("ok"));
- }
- }
- }
- }
-
- public void sendPutRequest(String messageID, JSONObject data) {
- executorService.execute(() -> {
- InMemoryGraph graph = Utils.prepareDataForPut(data);
- peer.emit(Utils.formatPutRequest(messageID, graph.toJSONObject()).toString());
- });
- }
-
- public void sendGetRequest(String messageID, String key, String field) {
- executorService.execute(() -> {
- JSONObject jsonGet = Utils.formatGetRequest(messageID, key, field);
- peer.emit(jsonGet.toString());
- });
- }
-
- public void addChangeListener(String soul, NodeChangeListener listener) {
- changeListeners.put(soul, listener);
- }
-
- public void addForEachChangeListener(String soul, NodeChangeListener.ForEach listener) {
- forEachListeners.put(soul, listener);
- }
-}
diff --git a/src/main/java/io/github/chronosx88/JGUN/Dup.java b/src/main/java/io/github/chronosx88/JGUN/Dup.java
index 874579f..563a354 100644
--- a/src/main/java/io/github/chronosx88/JGUN/Dup.java
+++ b/src/main/java/io/github/chronosx88/JGUN/Dup.java
@@ -1,40 +1,35 @@
package io.github.chronosx88.JGUN;
-import java.util.Map;
+import javax.cache.Cache;
+import javax.cache.CacheManager;
+import javax.cache.Caching;
+import javax.cache.configuration.MutableConfiguration;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import javax.cache.spi.CachingProvider;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
public class Dup {
- private static char[] randomSeed = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray();
- private Map s = new ConcurrentHashMap<>();
- private DupOpt opt = new DupOpt();
- private Thread to = null;
+ private final Cache cache;
- public Dup() {
- opt.max = 1000;
- opt.age = 1000 * 9;
+ public Dup(long age) {
+ CachingProvider cachingProvider = Caching.getCachingProvider();
+ CacheManager cacheManager = cachingProvider.getCacheManager();
+ MutableConfiguration config = new MutableConfiguration<>();
+ config.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, age)));
+ this.cache = cacheManager.createCache("dup", config);
}
- public String track(String id) {
- s.put(id, System.currentTimeMillis());
- if(to == null) {
- Utils.setTimeout(() -> {
- for(Map.Entry entry : s.entrySet()) {
- if(opt.age > (System.currentTimeMillis() - entry.getValue()))
- continue;
- s.remove(entry.getKey());
- }
- to = null;
- }, opt.age);
- }
- return id;
+ private void track(String id) {
+ cache.put(id, System.currentTimeMillis());
}
- public boolean check(String id) {
- if(s.containsKey(id)) {
- track(id);
+ public boolean isDuplicated(String id) {
+ if(cache.containsKey(id)) {
return true;
} else {
+ track(id);
return false;
}
}
diff --git a/src/main/java/io/github/chronosx88/JGUN/DupOpt.java b/src/main/java/io/github/chronosx88/JGUN/DupOpt.java
deleted file mode 100644
index 4981c13..0000000
--- a/src/main/java/io/github/chronosx88/JGUN/DupOpt.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package io.github.chronosx88.JGUN;
-
-public class DupOpt {
- public int max;
- public int age;
-}
diff --git a/src/main/java/io/github/chronosx88/JGUN/Gun.java b/src/main/java/io/github/chronosx88/JGUN/Gun.java
index 6c7fe22..197b397 100644
--- a/src/main/java/io/github/chronosx88/JGUN/Gun.java
+++ b/src/main/java/io/github/chronosx88/JGUN/Gun.java
@@ -1,28 +1,38 @@
package io.github.chronosx88.JGUN;
import io.github.chronosx88.JGUN.nodes.GunClient;
-import io.github.chronosx88.JGUN.storageBackends.StorageBackend;
+import io.github.chronosx88.JGUN.storage.Storage;
import java.net.InetAddress;
import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
public class Gun {
- private Dispatcher dispatcher;
private GunClient gunClient;
+ private final Map changeListeners = new ConcurrentHashMap<>();
+ private final Map forEachListeners = new ConcurrentHashMap<>();
- public Gun(InetAddress address, int port, StorageBackend storage) {
+ public Gun(InetAddress address, int port, Storage storage) {
try {
this.gunClient = new GunClient(address, port, storage);
- this.dispatcher = gunClient.getDispatcher();
this.gunClient.connectBlocking();
} catch (URISyntaxException | InterruptedException e) {
e.printStackTrace();
}
}
- public PathRef get(String key) {
- PathRef pathRef = new PathRef(dispatcher);
+ public PathReference get(String key) {
+ PathReference pathRef = new PathReference(this);
pathRef.get(key);
return pathRef;
}
+
+ protected void addChangeListener(String nodeID, NodeChangeListener listener) {
+ changeListeners.put(nodeID, listener);
+ }
+
+ protected void addForEachChangeListener(String nodeID, NodeChangeListener.ForEach listener) {
+ forEachListeners.put(nodeID, listener);
+ }
}
diff --git a/src/main/java/io/github/chronosx88/JGUN/HAM.java b/src/main/java/io/github/chronosx88/JGUN/HAM.java
index c35ea70..df47c80 100644
--- a/src/main/java/io/github/chronosx88/JGUN/HAM.java
+++ b/src/main/java/io/github/chronosx88/JGUN/HAM.java
@@ -1,196 +1,36 @@
package io.github.chronosx88.JGUN;
-import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph;
-import io.github.chronosx88.JGUN.storageBackends.StorageBackend;
-import org.json.JSONObject;
-
-import java.util.Map;
-
public class HAM {
- static class HAMResult {
+ public static class HAMResult {
public boolean defer = false; // Defer means that the current state is greater than our computer time, and should only be processed when our computer time reaches this state
public boolean historical = false; // Historical means the old state. This is usually ignored.
- public boolean converge = false; // Everything is fine, you can do merge
public boolean incoming = false; // Leave incoming value
public boolean current = false; // Leave current value
- public boolean state = false;
}
public static HAMResult ham(long machineState, long incomingState, long currentState, Object incomingValue, Object currentValue) throws IllegalArgumentException {
HAMResult result = new HAMResult();
- if(machineState < incomingState) {
+ if (machineState < incomingState) {
// the incoming value is outside the boundary of the machine's state, it must be reprocessed in another state.
result.defer = true;
return result;
- }
- if(incomingState < currentState) {
+ } else if (currentState > incomingState) {
// the incoming value is within the boundary of the machine's state, but not within the range.
result.historical = true;
+ result.current = true;
return result;
- }
- if(currentState < incomingState) {
+ } else if (currentState < incomingState) {
// the incoming value is within both the boundary and the range of the machine's state.
- result.converge = true;
result.incoming = true;
return result;
- }
- if(incomingState == currentState) {
- // if incoming state and current state is the same
- if(incomingValue.equals(currentValue)) {
- result.state = true;
- return result;
- }
- if((incomingValue.toString().compareTo(currentValue.toString())) < 0) {
- result.converge = true;
+ } else { // if incoming state and current state is the same
+ if (incomingValue.equals(currentValue)) {
result.current = true;
return result;
}
- if((currentValue.toString().compareTo(incomingValue.toString())) < 0) {
- result.converge = true;
- result.incoming = true;
- return result;
- }
+ result.incoming = true; // always update local value with incoming value if state is the same
+ return result;
}
- throw new IllegalArgumentException("Invalid CRDT Data: "+ incomingValue +" to "+ currentValue +" at "+ incomingState +" to "+ currentState +"!");
- }
-
- public static boolean mix(InMemoryGraph change, StorageBackend data, Map changeListeners, Map forEachListeners) {
- long machine = System.currentTimeMillis();
- InMemoryGraph diff = null;
- for(Map.Entry entry : change.entries()) {
- Node node = entry.getValue();
- for(String key : node.values.keySet()) {
- Object value = node.values.get(key);
- if ("_".equals(key)) { continue; }
- long state = node.states.getLong(key);
- long was = -1;
- Object known = null;
- if(data == null) {
- data = new InMemoryGraph();
- }
- if(data.hasNode(node.soul)) {
- if(data.getNode(node.soul).states.opt(key) != null) {
- was = data.getNode(node.soul).states.getLong(key);
- }
- known = data.getNode(node.soul).values.opt(key) == null ? 0 : data.getNode(node.soul).values.opt(key);
- }
- HAMResult ham = null;
- try {
- ham = ham(machine, state, was, value, known);
- } catch (IllegalArgumentException e) {
- continue;
- }
-
- if(ham != null) {
- if(!ham.incoming) {
- if(ham.defer) {
- System.out.println("DEFER: " + key + " " + value);
- // Hack for accessing value in lambda without making the variable final
- StorageBackend[] graph = new StorageBackend[] {data};
- Utils.setTimeout(() -> mix(node, graph[0], changeListeners, forEachListeners), (int) (state - machine));
- }
- continue;
- }
- }
-
- if(diff == null) {
- diff = new InMemoryGraph();
- }
-
- if(!diff.hasNode(node.soul)) {
- diff.addNode(node.soul, Utils.newNode(node.soul, new JSONObject()));
- }
-
- if(!data.hasNode(node.soul)) {
- data.addNode(node.soul, Utils.newNode(node.soul, new JSONObject()));
- }
-
- Node tmp = data.getNode(node.soul);
- tmp.values.put(key, value);
- tmp.states.put(key, state);
- data.addNode(node.soul, tmp);
- diff.getNode(node.soul).values.put(key, value);
- diff.getNode(node.soul).states.put(key, state);
- }
- }
- if(diff != null) {
- for(Map.Entry entry : diff.entries()) {
- if(changeListeners.containsKey(entry.getKey())) {
- changeListeners.get(entry.getKey()).onChange(entry.getValue().toUserJSONObject());
- }
- if(forEachListeners.containsKey(entry.getKey())) {
- for(Map.Entry jsonEntry : entry.getValue().values.toMap().entrySet()) {
- forEachListeners.get(entry.getKey()).onChange(jsonEntry.getKey(), jsonEntry.getValue());
- }
- }
- }
- }
- return true;
- }
-
- public static boolean mix(Node incomingNode, StorageBackend data, Map changeListeners, Map forEachListeners) {
- long machine = System.currentTimeMillis();
- InMemoryGraph diff = null;
-
- for(String key : incomingNode.values.keySet()) {
- Object value = incomingNode.values.get(key);
- if ("_".equals(key)) { continue; }
- long state = incomingNode.states.getLong(key);
- long was = -1;
- Object known = null;
- if(data == null) {
- data = new InMemoryGraph();
- }
- if(data.hasNode(incomingNode.soul)) {
- if(data.getNode(incomingNode.soul).states.opt(key) != null) {
- was = data.getNode(incomingNode.soul).states.getLong(key);
- }
- known = data.getNode(incomingNode.soul).values.opt(key) == null ? 0 : data.getNode(incomingNode.soul).values.opt(key);
- }
-
- HAMResult ham = ham(machine, state, was, value, known);
- if(!ham.incoming) {
- if(ham.defer) {
- System.out.println("DEFER: " + key + " " + value);
- // Hack for accessing value in lambda without making the variable final
- StorageBackend[] graph = new StorageBackend[] {data};
- Utils.setTimeout(() -> mix(incomingNode, graph[0], changeListeners, forEachListeners), (int) (state - machine));
- }
- continue;
- }
-
- if(diff == null) {
- diff = new InMemoryGraph();
- }
-
- if(!diff.hasNode(incomingNode.soul)) {
- diff.addNode(incomingNode.soul, Utils.newNode(incomingNode.soul, new JSONObject()));
- }
-
- if(!data.hasNode(incomingNode.soul)) {
- data.addNode(incomingNode.soul, Utils.newNode(incomingNode.soul, new JSONObject()));
- }
-
- Node tmp = data.getNode(incomingNode.soul);
- tmp.values.put(key, value);
- tmp.states.put(key, state);
- data.addNode(incomingNode.soul, tmp);
- diff.getNode(incomingNode.soul).values.put(key, value);
- diff.getNode(incomingNode.soul).states.put(key, state);
- }
- if(diff != null) {
- for(Map.Entry entry : diff.entries()) {
- if(changeListeners.containsKey(entry.getKey())) {
- changeListeners.get(entry.getKey()).onChange(entry.getValue().toUserJSONObject());
- }
- if(forEachListeners.containsKey(entry.getKey())) {
- for(Map.Entry jsonEntry : entry.getValue().values.toMap().entrySet()) {
- forEachListeners.get(entry.getKey()).onChange(jsonEntry.getKey(), jsonEntry.getValue());
- }
- }
- }
- }
- return true;
}
}
diff --git a/src/main/java/io/github/chronosx88/JGUN/NetworkHandler.java b/src/main/java/io/github/chronosx88/JGUN/NetworkHandler.java
new file mode 100644
index 0000000..09d795b
--- /dev/null
+++ b/src/main/java/io/github/chronosx88/JGUN/NetworkHandler.java
@@ -0,0 +1,79 @@
+package io.github.chronosx88.JGUN;
+
+import io.github.chronosx88.JGUN.futures.BaseCompletableFuture;
+import io.github.chronosx88.JGUN.models.BaseMessage;
+import io.github.chronosx88.JGUN.models.MemoryGraph;
+import io.github.chronosx88.JGUN.models.acks.BaseAck;
+import io.github.chronosx88.JGUN.models.acks.GetAck;
+import io.github.chronosx88.JGUN.models.acks.PutAck;
+import io.github.chronosx88.JGUN.models.requests.GetRequest;
+import io.github.chronosx88.JGUN.models.requests.PutRequest;
+import io.github.chronosx88.JGUN.nodes.Peer;
+import io.github.chronosx88.JGUN.storage.Storage;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+public class NetworkHandler {
+ private final Map> pendingFutures = new ConcurrentHashMap<>();
+
+ private final Peer peer;
+ private final Storage graphStorage;
+ private final Dup dup;
+ private final Executor executorService = Executors.newCachedThreadPool();
+
+ public NetworkHandler(Storage graphStorage, Peer peer, Dup dup) {
+ this.graphStorage = graphStorage;
+ this.peer = peer;
+ this.dup = dup;
+ }
+
+ public void addPendingFuture(BaseCompletableFuture> future) {
+ pendingFutures.put(future.getFutureID(), future);
+ }
+
+ public void handleIncomingMessage(BaseMessage message) {
+ if (message instanceof GetRequest) {
+ handleGet((GetRequest) message);
+ } else if (message instanceof PutRequest) {
+ handlePut((PutRequest) message);
+ } else if (message instanceof BaseAck) {
+ handleAck((BaseAck) message);
+ }
+ peer.emit(message.toString());
+ }
+
+ private GetAck handleGet(GetRequest request) {
+ // TODO
+ throw new UnsupportedOperationException("TODO");
+ }
+
+ private PutAck handlePut(PutRequest request) {
+ // TODO
+ throw new UnsupportedOperationException("TODO");
+ }
+
+ private void handleAck(BaseAck ack) {
+ if (ack instanceof GetAck) {
+ // TODO
+ } else if (ack instanceof PutAck) {
+ // TODO
+ }
+
+ throw new UnsupportedOperationException("TODO");
+ }
+
+ public void sendPutRequest(String messageID, MemoryGraph data) {
+ executorService.execute(() -> {
+ // TODO
+ });
+ }
+
+ public void sendGetRequest(String messageID, String key, String field) {
+ executorService.execute(() -> {
+ // TODO
+ });
+ }
+}
diff --git a/src/main/java/io/github/chronosx88/JGUN/Node.java b/src/main/java/io/github/chronosx88/JGUN/Node.java
deleted file mode 100644
index 0a4cf27..0000000
--- a/src/main/java/io/github/chronosx88/JGUN/Node.java
+++ /dev/null
@@ -1,82 +0,0 @@
-package io.github.chronosx88.JGUN;
-
-import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph;
-import org.json.JSONObject;
-
-import java.io.Serializable;
-
-public class Node implements Comparable, Serializable {
- public JSONObject values; // Data
- public JSONObject states; // Metadata for diff
- public String soul; // i.e. ID of node
-
- /**
- * Create a Peer from a JSON object.
- *
- * @param rawData JSON object, which contains the data
- */
- public Node(JSONObject rawData) {
- this.values = new JSONObject(rawData.toString());
- this.states = values.getJSONObject("_").getJSONObject(">");
- this.soul = values.getJSONObject("_").getString("#");
- values.remove("_");
- }
-
- @Override
- public int compareTo(Node other) {
- return soul.compareTo(other.soul);
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null)
- return false;
- if (other instanceof String)
- return soul.equals(other);
- if (other instanceof Node)
- return compareTo((Node) other) == 0;
- return false;
- }
-
- @Override
- public int hashCode() {
- return soul.hashCode();
- }
-
- public boolean isNode(String key) {
- return values.optJSONObject(key) != null;
- }
-
- public Node getNode(String key, InMemoryGraph g) {
- String soulRef = values.getJSONObject(key).getString("#");
- return g.getNode(soulRef);
- }
-
- @Override
- public String toString() {
- JSONObject jsonObject = new JSONObject(values.toString());
- jsonObject.put("_", new JSONObject().put("#", soul).put(">", states));
- return jsonObject.toString();
- }
-
- public JSONObject toJSONObject() {
- JSONObject jsonObject = new JSONObject(values.toString());
- jsonObject.put("_", new JSONObject().put("#", soul).put(">", states));
- return jsonObject;
- }
-
- public JSONObject getMetadata() {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("_", new JSONObject().put("#", soul).put(">", states));
- return jsonObject;
- }
-
- public void setMetadata(JSONObject metadata) {
- soul = metadata.getJSONObject("_").getString("#");
- states = metadata.getJSONObject("_").getJSONObject(">");
- }
-
- public JSONObject toUserJSONObject() {
- return values;
- }
-}
\ No newline at end of file
diff --git a/src/main/java/io/github/chronosx88/JGUN/NodeChangeListener.java b/src/main/java/io/github/chronosx88/JGUN/NodeChangeListener.java
index 3c2a8c5..72432af 100644
--- a/src/main/java/io/github/chronosx88/JGUN/NodeChangeListener.java
+++ b/src/main/java/io/github/chronosx88/JGUN/NodeChangeListener.java
@@ -1,10 +1,10 @@
package io.github.chronosx88.JGUN;
-import org.json.JSONObject;
+import io.github.chronosx88.JGUN.models.Node;
@FunctionalInterface
public interface NodeChangeListener {
- void onChange(JSONObject node);
+ void onChange(Node node);
interface ForEach {
void onChange(String key, Object value);
diff --git a/src/main/java/io/github/chronosx88/JGUN/PathRef.java b/src/main/java/io/github/chronosx88/JGUN/PathRef.java
deleted file mode 100644
index 37b20e0..0000000
--- a/src/main/java/io/github/chronosx88/JGUN/PathRef.java
+++ /dev/null
@@ -1,148 +0,0 @@
-package io.github.chronosx88.JGUN;
-
-import io.github.chronosx88.JGUN.futures.FutureGet;
-import io.github.chronosx88.JGUN.futures.FuturePut;
-import java9.util.concurrent.CompletableFuture;
-import org.json.JSONObject;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.concurrent.ExecutionException;
-
-public class PathRef {
- private final ArrayList path = new ArrayList<>();
- private Dispatcher dispatcher;
-
- public PathRef(Dispatcher dispatcher) {
- this.dispatcher = dispatcher;
- }
-
- public PathRef get(String key) {
- path.add(key);
- return this;
- }
-
- public FutureGet getData() {
- FutureGet futureGet = new FutureGet(Dup.random());
- new Thread(() -> {
- Iterator iterator = path.iterator();
- String rootSoul = iterator.next();
- String field = iterator.hasNext() ? iterator.next() : null;
-
- iterator = path.iterator();
- iterator.next();
-
- CompletableFuture future = CompletableFuture.supplyAsync(() -> {
- FutureGet futureGetRootSoul = new FutureGet(Dup.random());
- dispatcher.addPendingFuture(futureGetRootSoul);
- dispatcher.sendGetRequest(futureGetRootSoul.getFutureID(), rootSoul, field);
- JSONObject result = futureGetRootSoul.await();
- if(result != null && result.isEmpty()) {
- result = null;
- }
- return result == null ? null : result.getJSONObject(rootSoul);
- });
- do {
- String soul = iterator.hasNext() ? iterator.next() : null;
- String nextField = iterator.hasNext() ? iterator.next() : null;
- future = future.thenApply(jsonObject -> {
- if(jsonObject != null) {
- if(soul != null) {
- JSONObject tmp = null;
- if(jsonObject.keySet().contains("#")) {
- String nodeRef = jsonObject.getString("#");
- FutureGet get = new FutureGet(Dup.random());
- dispatcher.addPendingFuture(get);
- dispatcher.sendGetRequest(get.getFutureID(), nodeRef, nextField);
- JSONObject result = get.await();
- if(result != null && result.isEmpty()) {
- result = null;
- }
- result = result == null ? null : result.getJSONObject(nodeRef);
- if(result != null) {
- result = nextField == null ? result : result.getJSONObject(nextField);
- }
- tmp = result;
- }
- if(tmp != null) {
- if(tmp.opt(soul) instanceof JSONObject) {
- String nodeRef = tmp.getJSONObject(soul).getString("#");
- FutureGet get = new FutureGet(Dup.random());
- dispatcher.addPendingFuture(get);
- dispatcher.sendGetRequest(get.getFutureID(), nodeRef, nextField);
- JSONObject result = get.await();
- if(result != null && result.isEmpty()) {
- result = null;
- }
- result = result == null ? null : result.getJSONObject(nodeRef);
- if(result != null) {
- result = nextField == null ? result : result.getJSONObject(nextField);
- }
- return result;
- } else {
- return tmp;
- }
- } else {
- if(jsonObject.opt(soul) instanceof JSONObject) {
- String nodeRef = jsonObject.getJSONObject(soul).getString("#");
- FutureGet get = new FutureGet(Dup.random());
- dispatcher.addPendingFuture(get);
- dispatcher.sendGetRequest(get.getFutureID(), nodeRef, nextField);
- JSONObject result = get.await();
- if(result != null && result.isEmpty()) {
- result = null;
- }
- result = result == null ? null : result.getJSONObject(nodeRef);
- if(result != null) {
- result = nextField == null ? result : result.getJSONObject(nextField);
- }
- return result;
- } else {
- return jsonObject;
- }
- }
- } else {
- return jsonObject;
- }
- }
- return null;
- });
- } while(iterator.hasNext());
-
- try {
- JSONObject data = future.get();
- futureGet.complete(data);
- } catch (InterruptedException | ExecutionException e) {
- e.printStackTrace();
- }
- }).start();
- return futureGet;
- }
-
- public FuturePut put(JSONObject data) {
- FuturePut futurePut = new FuturePut(Dup.random());
- dispatcher.addPendingFuture(futurePut);
- JSONObject temp = new JSONObject();
- JSONObject temp1 = temp;
- for (int i = 0; i < path.size(); i++) {
- if(i != path.size() - 1) {
- JSONObject object = new JSONObject();
- temp1.put(path.get(i), object);
- temp1 = object;
- } else {
- temp1.put(path.get(i), data == null ? JSONObject.NULL : data);
- }
-
- }
- dispatcher.sendPutRequest(futurePut.getFutureID(), temp);
- return futurePut;
- }
-
- public void on(NodeChangeListener changeListener) {
- dispatcher.addChangeListener(Utils.join("/", path), changeListener);
- }
-
- public void map(NodeChangeListener.ForEach forEachListener) {
- dispatcher.addForEachChangeListener(Utils.join("/", path), forEachListener);
- }
-}
diff --git a/src/main/java/io/github/chronosx88/JGUN/PathReference.java b/src/main/java/io/github/chronosx88/JGUN/PathReference.java
new file mode 100644
index 0000000..29b7fd5
--- /dev/null
+++ b/src/main/java/io/github/chronosx88/JGUN/PathReference.java
@@ -0,0 +1,40 @@
+package io.github.chronosx88.JGUN;
+
+import io.github.chronosx88.JGUN.futures.FutureGet;
+import io.github.chronosx88.JGUN.futures.FuturePut;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+public class PathReference {
+ private final ArrayList path = new ArrayList<>();
+
+ private Gun database;
+
+ public PathReference(Gun db) {
+ this.database = db;
+ }
+
+ public PathReference get(String key) {
+ path.add(key);
+ return this;
+ }
+
+ public FutureGet getData() {
+ // TODO
+ return null;
+ }
+
+ public FuturePut put(HashMap data) {
+ // TODO
+ return null;
+ }
+
+ public void on(NodeChangeListener changeListener) {
+ database.addChangeListener(String.join("/", path), changeListener);
+ }
+
+ public void map(NodeChangeListener.ForEach forEachListener) {
+ database.addForEachChangeListener(String.join("/", path), forEachListener);
+ }
+}
diff --git a/src/main/java/io/github/chronosx88/JGUN/Utils.java b/src/main/java/io/github/chronosx88/JGUN/Utils.java
deleted file mode 100644
index 04c8ab7..0000000
--- a/src/main/java/io/github/chronosx88/JGUN/Utils.java
+++ /dev/null
@@ -1,146 +0,0 @@
-package io.github.chronosx88.JGUN;
-
-import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph;
-import io.github.chronosx88.JGUN.storageBackends.StorageBackend;
-import org.json.JSONObject;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentSkipListSet;
-
-public class Utils {
- public static Thread setTimeout(Runnable runnable, int delay){
- Thread thread = new Thread(() -> {
- try {
- Thread.sleep(delay);
- runnable.run();
- }
- catch (Exception e){
- e.printStackTrace();
- }
- });
- thread.start();
- return thread;
- }
-
- public static Node newNode(String soul, JSONObject data) {
- JSONObject states = new JSONObject();
- for (String key : data.keySet()) {
- states.put(key, System.currentTimeMillis());
- }
- data.put("_", new JSONObject().put("#", soul).put(">", states));
- return new Node(data);
- }
-
- public static InMemoryGraph getRequest(JSONObject lex, StorageBackend graph) {
- String soul = lex.getString("#");
- String key = lex.optString(".", null);
- Node node = graph.getNode(soul);
- Object tmp;
- if(node == null) {
- return new InMemoryGraph();
- }
- if(key != null) {
- tmp = node.values.opt(key);
- if(tmp == null) {
- return new InMemoryGraph();
- }
- Node node1 = new Node(node.toJSONObject());
- node = Utils.newNode(node.soul, new JSONObject());
- node.setMetadata(node1.getMetadata());
- node.values.put(key, tmp);
- JSONObject tmpStates = node1.states;
- node.states.put(key, tmpStates.get(key));
- }
- InMemoryGraph ack = new InMemoryGraph();
- ack.addNode(soul, node);
- return ack;
- }
-
- public static InMemoryGraph prepareDataForPut(JSONObject data) {
- InMemoryGraph result = new InMemoryGraph();
- for (String objectKey : data.keySet()) {
- Object object = data.get(objectKey);
- if(object instanceof JSONObject) {
- Node node = Utils.newNode(objectKey, (JSONObject) object);
- ArrayList path = new ArrayList<>();
- path.add(objectKey);
- prepareNodeForPut(node, result, path);
- }
- }
- return result;
- }
-
- private static void prepareNodeForPut(Node node, InMemoryGraph result, ArrayList path) {
- for(String key : new ConcurrentSkipListSet<>(node.values.keySet())) {
- Object value = node.values.get(key);
- if(value instanceof JSONObject) {
- path.add(key);
- String soul = "";
- soul = Utils.join("/", path);
- Node tmpNode = Utils.newNode(soul, (JSONObject) value);
- node.values.remove(key);
- node.values.put(key, new JSONObject().put("#", soul));
- prepareNodeForPut(tmpNode, result, new ArrayList<>(path));
- result.addNode(soul, tmpNode);
- path.remove(key);
- }
- }
- result.addNode(node.soul, node);
- }
-
- public static JSONObject formatGetRequest(String messageID, String key, String field) {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("#", messageID);
- JSONObject getParameters = new JSONObject();
- getParameters.put("#", key);
- if(field != null) {
- getParameters.put(".", field);
- }
- jsonObject.put("get", getParameters);
- return jsonObject;
- }
-
- public static JSONObject formatPutRequest(String messageID, JSONObject data) {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("#", messageID);
- jsonObject.put("put", data);
- return jsonObject;
- }
-
- /**
- * This check current nodes for existing IDs in our storage, and if there are existing IDs, it means to replace them.
- * Prevents trailing nodes in storage
- * @param incomingGraph The graph that came to us over the wire.
- * @param graphStorage Graph storage in which the incoming graph will be saved
- * @return Prepared graph for saving
- */
- /*public static InMemoryGraph checkIncomingNodesForID(InMemoryGraph incomingGraph, StorageBackend graphStorage) {
- for (Node node : incomingGraph.nodes()) {
- for(node)
- }
- }*/
-
- /**
- * Returns a string containing the tokens joined by delimiters.
- *
- * @param delimiter a CharSequence that will be inserted between the tokens. If null, the string
- * "null" will be used as the delimiter.
- * @param tokens an array objects to be joined. Strings will be formed from the objects by
- * calling object.toString(). If tokens is null, a NullPointerException will be thrown. If
- * tokens is empty, an empty string will be returned.
- */
- public static String join(CharSequence delimiter, Iterable tokens) {
- final Iterator> it = tokens.iterator();
- if (!it.hasNext()) {
- return "";
- }
- final StringBuilder sb = new StringBuilder();
- sb.append(it.next());
- while (it.hasNext()) {
- sb.append(delimiter);
- sb.append(it.next());
- }
- return sb.toString();
- }
-}
diff --git a/src/main/java/io/github/chronosx88/JGUN/examples/MainClient.java b/src/main/java/io/github/chronosx88/JGUN/examples/MainClient.java
index 2e24531..3bc557c 100644
--- a/src/main/java/io/github/chronosx88/JGUN/examples/MainClient.java
+++ b/src/main/java/io/github/chronosx88/JGUN/examples/MainClient.java
@@ -1,7 +1,7 @@
package io.github.chronosx88.JGUN.examples;
import io.github.chronosx88.JGUN.nodes.GunClient;
-import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph;
+import io.github.chronosx88.JGUN.storage.MemoryStorage;
import java.net.Inet4Address;
import java.net.URISyntaxException;
@@ -9,7 +9,7 @@ import java.net.UnknownHostException;
public class MainClient {
public static void main(String[] args) throws URISyntaxException, UnknownHostException {
- GunClient gunClient = new GunClient(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 5054, new InMemoryGraph());
+ GunClient gunClient = new GunClient(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 5054, new MemoryStorage());
gunClient.connect();
}
}
diff --git a/src/main/java/io/github/chronosx88/JGUN/examples/MainClientServer.java b/src/main/java/io/github/chronosx88/JGUN/examples/MainClientServer.java
index aa4bd7c..50a68b0 100644
--- a/src/main/java/io/github/chronosx88/JGUN/examples/MainClientServer.java
+++ b/src/main/java/io/github/chronosx88/JGUN/examples/MainClientServer.java
@@ -1,55 +1,52 @@
package io.github.chronosx88.JGUN.examples;
import io.github.chronosx88.JGUN.Gun;
-import io.github.chronosx88.JGUN.futures.FutureGet;
-import io.github.chronosx88.JGUN.futures.FuturePut;
import io.github.chronosx88.JGUN.nodes.GunSuperPeer;
-import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph;
-import org.json.JSONObject;
+import io.github.chronosx88.JGUN.storage.MemoryStorage;
import java.net.Inet4Address;
import java.net.UnknownHostException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
-public class MainClientServer {
- public static void main(String[] args) {
- GunSuperPeer gunSuperNode = new GunSuperPeer(21334, new InMemoryGraph());
- gunSuperNode.start();
- Runnable task = () -> {
- Gun gun = null;
- try {
- gun = new Gun(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 21334, new InMemoryGraph());
- } catch (UnknownHostException e) {
- e.printStackTrace();
- }
- gun.get("random").on(data -> {
- if(data != null) {
- System.out.println("New change in \"random\"! " + data.toString(2));
- }
- });
- gun.get("random").get("dVFtzE9CL").on(data -> {
- if(data != null) {
- System.out.println("New change in \"random/dVFtzE9CL\"! " + data.toString(2));
- } else {
- System.out.println("Now random/dVFtzE9CL is null!");
- }
-
- });
- gun.get("random").get("dVFtzE9CL").map(((key, value) -> {
- System.out.println("[Map] New change in \"random/dVFtzE9CL\"! " + key + " : " + value.toString());
- }));
- gun.get("random").map(((key, value) -> {
- System.out.println("[Map] New change in \"random\"! " + key + " : " + value);
- }));
- System.out.println("[FuturePut] Success: " + gun.get("random").get("dVFtzE9CL").put(new JSONObject().put("hello", "world")).await());
- System.out.println("[FuturePut] Putting an item again: " + gun.get("random").get("dVFtzE9CL").put(new JSONObject().put("hello", "123")).await());
- System.out.println("Deleting an item random/dVFtzE9CL");
- gun.get("random").get("dVFtzE9CL").put(null).await();
- gun.get("random").put(new JSONObject().put("hello", "world")).await();
- };
-
- Executor executorService = Executors.newSingleThreadExecutor();
- executorService.execute(task);
- }
-}
+//public class MainClientServer {
+// public static void main(String[] args) {
+// GunSuperPeer gunSuperNode = new GunSuperPeer(21334, new MemoryStorage());
+// gunSuperNode.start();
+// Runnable task = () -> {
+// Gun gun = null;
+// try {
+// gun = new Gun(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 21334, new MemoryStorage());
+// } catch (UnknownHostException e) {
+// e.printStackTrace();
+// }
+// gun.get("random").on(data -> {
+// if(data != null) {
+// System.out.println("New change in \"random\"! " + data.toString(2));
+// }
+// });
+// gun.get("random").get("dVFtzE9CL").on(data -> {
+// if(data != null) {
+// System.out.println("New change in \"random/dVFtzE9CL\"! " + data.toString(2));
+// } else {
+// System.out.println("Now random/dVFtzE9CL is null!");
+// }
+//
+// });
+// gun.get("random").get("dVFtzE9CL").map(((key, value) -> {
+// System.out.println("[Map] New change in \"random/dVFtzE9CL\"! " + key + " : " + value.toString());
+// }));
+// gun.get("random").map(((key, value) -> {
+// System.out.println("[Map] New change in \"random\"! " + key + " : " + value);
+// }));
+// System.out.println("[FuturePut] Success: " + gun.get("random").get("dVFtzE9CL").put(new JSONObject().put("hello", "world")).await());
+// System.out.println("[FuturePut] Putting an item again: " + gun.get("random").get("dVFtzE9CL").put(new JSONObject().put("hello", "123")).await());
+// System.out.println("Deleting an item random/dVFtzE9CL");
+// gun.get("random").get("dVFtzE9CL").put(null).await();
+// gun.get("random").put(new JSONObject().put("hello", "world")).await();
+// };
+//
+// Executor executorService = Executors.newSingleThreadExecutor();
+// executorService.execute(task);
+// }
+//}
diff --git a/src/main/java/io/github/chronosx88/JGUN/examples/MainServer.java b/src/main/java/io/github/chronosx88/JGUN/examples/MainServer.java
index 873dbef..e6835e1 100644
--- a/src/main/java/io/github/chronosx88/JGUN/examples/MainServer.java
+++ b/src/main/java/io/github/chronosx88/JGUN/examples/MainServer.java
@@ -1,11 +1,11 @@
package io.github.chronosx88.JGUN.examples;
import io.github.chronosx88.JGUN.nodes.GunSuperPeer;
-import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph;
+import io.github.chronosx88.JGUN.storage.MemoryStorage;
public class MainServer {
public static void main(String[] args) {
- GunSuperPeer gunSuperNode = new GunSuperPeer(5054, new InMemoryGraph());
+ GunSuperPeer gunSuperNode = new GunSuperPeer(5054, new MemoryStorage());
gunSuperNode.start();
}
}
diff --git a/src/main/java/io/github/chronosx88/JGUN/futures/BaseCompletableFuture.java b/src/main/java/io/github/chronosx88/JGUN/futures/BaseCompletableFuture.java
index f624d9a..2801881 100644
--- a/src/main/java/io/github/chronosx88/JGUN/futures/BaseCompletableFuture.java
+++ b/src/main/java/io/github/chronosx88/JGUN/futures/BaseCompletableFuture.java
@@ -3,8 +3,10 @@ package io.github.chronosx88.JGUN.futures;
import java.util.concurrent.ExecutionException;
import java9.util.concurrent.CompletableFuture;
+import lombok.Getter;
+@Getter
public class BaseCompletableFuture extends CompletableFuture {
private final String futureID;
@@ -13,10 +15,6 @@ public class BaseCompletableFuture extends CompletableFuture {
futureID = id;
}
- public String getFutureID() {
- return futureID;
- }
-
public void addListener(final BaseFutureListener listener) {
this.whenCompleteAsync((t, throwable) -> {
if(throwable == null) {
diff --git a/src/main/java/io/github/chronosx88/JGUN/futures/FutureGet.java b/src/main/java/io/github/chronosx88/JGUN/futures/FutureGet.java
index 31a5800..b4c79dc 100644
--- a/src/main/java/io/github/chronosx88/JGUN/futures/FutureGet.java
+++ b/src/main/java/io/github/chronosx88/JGUN/futures/FutureGet.java
@@ -1,8 +1,8 @@
package io.github.chronosx88.JGUN.futures;
-import org.json.JSONObject;
+import io.github.chronosx88.JGUN.models.MemoryGraph;
-public class FutureGet extends BaseCompletableFuture {
+public class FutureGet extends BaseCompletableFuture {
public FutureGet(String id) {
super(id);
}
diff --git a/src/main/java/io/github/chronosx88/JGUN/models/BaseMessage.java b/src/main/java/io/github/chronosx88/JGUN/models/BaseMessage.java
new file mode 100644
index 0000000..5c9bf30
--- /dev/null
+++ b/src/main/java/io/github/chronosx88/JGUN/models/BaseMessage.java
@@ -0,0 +1,10 @@
+package io.github.chronosx88.JGUN.models;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Data;
+
+@Data
+public abstract class BaseMessage {
+ @JsonProperty("#")
+ private String id;
+}
diff --git a/src/main/java/io/github/chronosx88/JGUN/models/MemoryGraph.java b/src/main/java/io/github/chronosx88/JGUN/models/MemoryGraph.java
new file mode 100644
index 0000000..51ccd9a
--- /dev/null
+++ b/src/main/java/io/github/chronosx88/JGUN/models/MemoryGraph.java
@@ -0,0 +1,27 @@
+package io.github.chronosx88.JGUN.models;
+
+import com.fasterxml.jackson.annotation.JsonAnyGetter;
+import com.fasterxml.jackson.annotation.JsonAnySetter;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Builder;
+import lombok.Data;
+import lombok.extern.jackson.Jacksonized;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+@Data
+public class MemoryGraph {
+ @JsonIgnore
+ public final Map nodes = new LinkedHashMap<>();
+
+ @JsonAnyGetter
+ public Map nodes() {
+ return nodes;
+ }
+
+ @JsonAnySetter
+ public void putNodes(String id, Node node) {
+ nodes.put(id, node);
+ }
+}
diff --git a/src/main/java/io/github/chronosx88/JGUN/models/Node.java b/src/main/java/io/github/chronosx88/JGUN/models/Node.java
new file mode 100644
index 0000000..49d429d
--- /dev/null
+++ b/src/main/java/io/github/chronosx88/JGUN/models/Node.java
@@ -0,0 +1,34 @@
+package io.github.chronosx88.JGUN.models;
+
+import com.fasterxml.jackson.annotation.JsonAnyGetter;
+import com.fasterxml.jackson.annotation.JsonAnySetter;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Builder;
+import lombok.Data;
+import lombok.extern.jackson.Jacksonized;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+@Data
+@Jacksonized
+@Builder
+public class Node {
+ @JsonProperty("_")
+ private NodeMetadata metadata;
+
+ @JsonIgnore
+ @Builder.Default
+ public Map values = new LinkedHashMap<>(); // Data
+
+ @JsonAnyGetter
+ public Map getValues() {
+ return values;
+ }
+
+ @JsonAnySetter
+ public void allSetter(String key, String value) {
+ values.put(key, value);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/io/github/chronosx88/JGUN/models/NodeMetadata.java b/src/main/java/io/github/chronosx88/JGUN/models/NodeMetadata.java
new file mode 100644
index 0000000..8d6ae11
--- /dev/null
+++ b/src/main/java/io/github/chronosx88/JGUN/models/NodeMetadata.java
@@ -0,0 +1,21 @@
+package io.github.chronosx88.JGUN.models;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Builder;
+import lombok.Data;
+import lombok.extern.jackson.Jacksonized;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Data
+@Builder
+@Jacksonized
+public class NodeMetadata {
+ @JsonProperty(">")
+ @Builder.Default
+ private Map states = new HashMap<>(); // field -> state
+
+ @JsonProperty("#")
+ private String nodeID;
+}
diff --git a/src/main/java/io/github/chronosx88/JGUN/models/acks/BaseAck.java b/src/main/java/io/github/chronosx88/JGUN/models/acks/BaseAck.java
new file mode 100644
index 0000000..5a64d68
--- /dev/null
+++ b/src/main/java/io/github/chronosx88/JGUN/models/acks/BaseAck.java
@@ -0,0 +1,16 @@
+package io.github.chronosx88.JGUN.models.acks;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.github.chronosx88.JGUN.models.BaseMessage;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.extern.jackson.Jacksonized;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public abstract class BaseAck extends BaseMessage {
+ @JsonProperty("@")
+ private String replyTo;
+ private String ok;
+}
diff --git a/src/main/java/io/github/chronosx88/JGUN/models/acks/GetAck.java b/src/main/java/io/github/chronosx88/JGUN/models/acks/GetAck.java
new file mode 100644
index 0000000..be491fb
--- /dev/null
+++ b/src/main/java/io/github/chronosx88/JGUN/models/acks/GetAck.java
@@ -0,0 +1,17 @@
+package io.github.chronosx88.JGUN.models.acks;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.github.chronosx88.JGUN.models.MemoryGraph;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.extern.jackson.Jacksonized;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+@Builder
+@Jacksonized
+public class GetAck extends BaseAck {
+ @JsonProperty("put")
+ private MemoryGraph data;
+}
diff --git a/src/main/java/io/github/chronosx88/JGUN/models/acks/PutAck.java b/src/main/java/io/github/chronosx88/JGUN/models/acks/PutAck.java
new file mode 100644
index 0000000..7495e97
--- /dev/null
+++ b/src/main/java/io/github/chronosx88/JGUN/models/acks/PutAck.java
@@ -0,0 +1,12 @@
+package io.github.chronosx88.JGUN.models.acks;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.extern.jackson.Jacksonized;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+@Builder
+@Jacksonized
+public class PutAck extends BaseAck {}
diff --git a/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequest.java b/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequest.java
new file mode 100644
index 0000000..5b557e1
--- /dev/null
+++ b/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequest.java
@@ -0,0 +1,13 @@
+package io.github.chronosx88.JGUN.models.requests;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.github.chronosx88.JGUN.models.BaseMessage;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class GetRequest extends BaseMessage {
+ @JsonProperty("get")
+ private GetRequestParams params;
+}
diff --git a/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequestParams.java b/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequestParams.java
new file mode 100644
index 0000000..8d3d68a
--- /dev/null
+++ b/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequestParams.java
@@ -0,0 +1,11 @@
+package io.github.chronosx88.JGUN.models.requests;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class GetRequestParams {
+ @JsonProperty("#")
+ private String nodeID;
+
+ @JsonProperty(".")
+ private String field;
+}
diff --git a/src/main/java/io/github/chronosx88/JGUN/models/requests/PutRequest.java b/src/main/java/io/github/chronosx88/JGUN/models/requests/PutRequest.java
new file mode 100644
index 0000000..80e706b
--- /dev/null
+++ b/src/main/java/io/github/chronosx88/JGUN/models/requests/PutRequest.java
@@ -0,0 +1,18 @@
+package io.github.chronosx88.JGUN.models.requests;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.github.chronosx88.JGUN.models.BaseMessage;
+import io.github.chronosx88.JGUN.models.Node;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.extern.jackson.Jacksonized;
+
+@Data
+@Jacksonized
+@Builder
+@EqualsAndHashCode(callSuper = true)
+public class PutRequest extends BaseMessage {
+ @JsonProperty("put")
+ private Node[] params;
+}
diff --git a/src/main/java/io/github/chronosx88/JGUN/nodes/GunClient.java b/src/main/java/io/github/chronosx88/JGUN/nodes/GunClient.java
index c1c049c..e2fcc92 100644
--- a/src/main/java/io/github/chronosx88/JGUN/nodes/GunClient.java
+++ b/src/main/java/io/github/chronosx88/JGUN/nodes/GunClient.java
@@ -1,24 +1,22 @@
package io.github.chronosx88.JGUN.nodes;
-import io.github.chronosx88.JGUN.Dispatcher;
import io.github.chronosx88.JGUN.Dup;
-import io.github.chronosx88.JGUN.storageBackends.StorageBackend;
-
+import io.github.chronosx88.JGUN.NetworkHandler;
+import io.github.chronosx88.JGUN.storage.Storage;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
-import org.json.JSONObject;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
public class GunClient extends WebSocketClient implements Peer {
- private Dup dup = new Dup();
- private final Dispatcher dispatcher;
+ private Dup dup = new Dup(1000*9);
+ private final NetworkHandler handler;
- public GunClient(InetAddress address, int port, StorageBackend storage) throws URISyntaxException {
+ public GunClient(InetAddress address, int port, Storage storage) throws URISyntaxException {
super(new URI("ws://" + address.getHostAddress() + ":" + port));
- this.dispatcher = new Dispatcher(storage, this, dup);
+ this.handler = new NetworkHandler(storage, this, dup);
}
@Override
@@ -28,10 +26,7 @@ public class GunClient extends WebSocketClient implements Peer {
@Override
public void onMessage(String message) {
- JSONObject jsonMsg = new JSONObject(message);
- if(dup.check(jsonMsg.getString("#"))){ return; }
- dup.track(jsonMsg.getString("#"));
- dispatcher.handleIncomingMessage(jsonMsg);
+ // TODO
}
@Override
@@ -45,10 +40,6 @@ public class GunClient extends WebSocketClient implements Peer {
ex.printStackTrace();
}
- public Dispatcher getDispatcher() {
- return dispatcher;
- }
-
@Override
public void emit(String data) {
this.send(data);
diff --git a/src/main/java/io/github/chronosx88/JGUN/nodes/GunSuperPeer.java b/src/main/java/io/github/chronosx88/JGUN/nodes/GunSuperPeer.java
index afd78b3..e052149 100644
--- a/src/main/java/io/github/chronosx88/JGUN/nodes/GunSuperPeer.java
+++ b/src/main/java/io/github/chronosx88/JGUN/nodes/GunSuperPeer.java
@@ -1,24 +1,22 @@
package io.github.chronosx88.JGUN.nodes;
-import io.github.chronosx88.JGUN.Dispatcher;
import io.github.chronosx88.JGUN.Dup;
-import io.github.chronosx88.JGUN.storageBackends.StorageBackend;
-
+import io.github.chronosx88.JGUN.NetworkHandler;
+import io.github.chronosx88.JGUN.storage.Storage;
import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer;
-import org.json.JSONObject;
import java.net.InetSocketAddress;
public class GunSuperPeer extends WebSocketServer implements Peer {
- private Dup dup = new Dup();
- private Dispatcher dispatcher;
+ private Dup dup = new Dup(1000*9);
+ private NetworkHandler handler;
- public GunSuperPeer(int port, StorageBackend storageBackend) {
+ public GunSuperPeer(int port, Storage storage) {
super(new InetSocketAddress(port));
setReuseAddr(true);
- dispatcher = new Dispatcher(storageBackend, this, dup);
+ handler = new NetworkHandler(storage, this, dup);
}
@Override
@@ -35,10 +33,7 @@ public class GunSuperPeer extends WebSocketServer implements Peer {
@Override
public void onMessage(WebSocket conn, String message) {
- JSONObject jsonMsg = new JSONObject(message);
- if(dup.check(jsonMsg.getString("#"))){ return; }
- dup.track(jsonMsg.getString("#"));
- dispatcher.handleIncomingMessage(jsonMsg);
+ // TODO
}
@Override
diff --git a/src/main/java/io/github/chronosx88/JGUN/storage/MemoryStorage.java b/src/main/java/io/github/chronosx88/JGUN/storage/MemoryStorage.java
new file mode 100644
index 0000000..a2de416
--- /dev/null
+++ b/src/main/java/io/github/chronosx88/JGUN/storage/MemoryStorage.java
@@ -0,0 +1,47 @@
+package io.github.chronosx88.JGUN.storage;
+
+import io.github.chronosx88.JGUN.models.Node;
+
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+
+public class MemoryStorage extends Storage {
+ private final Map nodes;
+
+ public MemoryStorage() {
+ nodes = new LinkedHashMap<>();
+ }
+
+ public Node getNode(String id) {
+ return nodes.get(id);
+ }
+
+ @Override
+ void updateNode(Node node) {
+ // TODO
+ throw new UnsupportedOperationException("TODO");
+ }
+
+ public void addNode(String id, Node incomingNode) {
+ nodes.put(id, incomingNode);
+ }
+
+ public boolean hasNode(String id) {
+ return nodes.containsKey(id);
+ }
+
+ public Set> entries() {
+ return nodes.entrySet();
+ }
+
+ public Collection nodes() {
+ return nodes.values();
+ }
+
+ public boolean isEmpty() {
+ return nodes.isEmpty();
+ }
+}
diff --git a/src/main/java/io/github/chronosx88/JGUN/storage/Storage.java b/src/main/java/io/github/chronosx88/JGUN/storage/Storage.java
new file mode 100644
index 0000000..d2234c5
--- /dev/null
+++ b/src/main/java/io/github/chronosx88/JGUN/storage/Storage.java
@@ -0,0 +1,100 @@
+package io.github.chronosx88.JGUN.storage;
+
+import io.github.chronosx88.JGUN.HAM;
+import io.github.chronosx88.JGUN.NodeChangeListener;
+import io.github.chronosx88.JGUN.models.MemoryGraph;
+import io.github.chronosx88.JGUN.models.Node;
+import io.github.chronosx88.JGUN.models.NodeMetadata;
+
+import java.util.*;
+
+public abstract class Storage {
+ abstract Node getNode(String id);
+ abstract void updateNode(Node node);
+ abstract void addNode(String id, Node node);
+ abstract boolean hasNode(String id);
+ abstract Set> entries();
+ abstract Collection nodes();
+ abstract boolean isEmpty();
+
+ /**
+ * Merge graph update (usually received from the network)
+ * @param update Graph update
+ * @param changeListeners
+ * @param forEachListeners
+ */
+ public void mergeUpdate(MemoryGraph update, Map changeListeners, Map forEachListeners) {
+ long machine = System.currentTimeMillis();
+ MemoryGraph diff = new MemoryGraph();
+ for (Map.Entry entry : update.getNodes().entrySet()) {
+ Node node = entry.getValue();
+ Node diffNode = this.mergeNode(node, machine);
+ if (Objects.nonNull(diffNode)) {
+ diff.nodes.put(diffNode.getMetadata().getNodeID(), diffNode);
+ }
+ }
+ if (!diff.nodes.isEmpty()) {
+ for (Map.Entry diffEntry : diff.getNodes().entrySet()) {
+ Node changedNode = diffEntry.getValue();
+ if (!this.hasNode(changedNode.getMetadata().getNodeID())) {
+ this.addNode(changedNode.getMetadata().getNodeID(), changedNode);
+ } else {
+ this.updateNode(changedNode);
+ }
+
+ if (changeListeners.containsKey(diffEntry.getKey())) {
+ changeListeners.get(diffEntry.getKey()).onChange(diffEntry.getValue());
+ }
+ if (forEachListeners.containsKey(diffEntry.getKey())) {
+ for (Map.Entry nodeEntry : changedNode.getValues().entrySet()) {
+ forEachListeners.get(nodeEntry.getKey()).onChange(nodeEntry.getKey(), nodeEntry.getValue());
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Merge updated node
+ * @param incomingNode Updated node
+ * @param machineState Current machine state
+ * @return Node with changes or null if no changes
+ */
+ public Node mergeNode(Node incomingNode, long machineState) {
+ Node changedNode = null;
+ for (String key : incomingNode.getValues().keySet()) {
+ Object value = incomingNode.getValues().get(key);
+ long state = incomingNode.getMetadata().getStates().get(key);
+ long previousState = -1;
+ Object currentValue = null;
+ if (this.hasNode(incomingNode.getMetadata().getNodeID())) {
+ Node currentNode = this.getNode(incomingNode.getMetadata().getNodeID());
+ Long prevStateFromStorage = currentNode.getMetadata().getStates().get(key);
+ if (!Objects.isNull(prevStateFromStorage)) {
+ previousState = prevStateFromStorage;
+ }
+ currentValue = currentNode.getValues().get(key);
+ }
+ HAM.HAMResult ham = HAM.ham(machineState, state, previousState, value, currentValue);
+
+ if (!ham.incoming) {
+ if (ham.defer) {
+ // TODO handle deferred value
+ }
+ continue;
+ }
+
+ if (Objects.isNull(changedNode)) {
+ changedNode = Node.builder()
+ .metadata(NodeMetadata.builder()
+ .nodeID(incomingNode.getMetadata().getNodeID())
+ .build())
+ .build();
+ }
+ changedNode.values.put(key, value);
+ changedNode.getMetadata().getStates().put(key, state);
+ }
+
+ return changedNode;
+ }
+}
diff --git a/src/main/java/io/github/chronosx88/JGUN/storageBackends/InMemoryGraph.java b/src/main/java/io/github/chronosx88/JGUN/storageBackends/InMemoryGraph.java
deleted file mode 100644
index f8bee80..0000000
--- a/src/main/java/io/github/chronosx88/JGUN/storageBackends/InMemoryGraph.java
+++ /dev/null
@@ -1,82 +0,0 @@
-package io.github.chronosx88.JGUN.storageBackends;
-
-import io.github.chronosx88.JGUN.Node;
-
-import org.json.JSONObject;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
-
-public class InMemoryGraph implements StorageBackend {
-
- private final HashMap nodes;
-
- public InMemoryGraph(JSONObject source) {
- nodes = new LinkedHashMap<>();
-
- for (String soul : source.keySet())
- nodes.put(soul, new Node(source.getJSONObject(soul)));
- }
-
- public InMemoryGraph() {
- nodes = new LinkedHashMap<>();
- }
-
- public Node getNode(String soul) {
- return nodes.get(soul);
- }
-
- public void addNode(String soul, Node incomingNode) {
- nodes.put(soul, incomingNode);
- }
-
- public boolean hasNode(String soul) {
- return nodes.containsKey(soul);
- }
-
- public Set> entries() {
- return nodes.entrySet();
- }
-
- public Collection nodes() { return nodes.values(); }
-
- @Override
- public String toString() {
- JSONObject jsonObject = new JSONObject();
- for(Map.Entry entry : nodes.entrySet()) {
- jsonObject.put(entry.getKey(), entry.getValue().toJSONObject());
- }
- return jsonObject.toString();
- }
-
- public String toPrettyString() {
- JSONObject jsonObject = new JSONObject();
- for(Map.Entry entry : nodes.entrySet()) {
- jsonObject.put(entry.getKey(), entry.getValue().toJSONObject());
- }
- return jsonObject.toString(2);
- }
-
- public JSONObject toJSONObject() {
- JSONObject jsonObject = new JSONObject();
- for(Map.Entry entry : nodes.entrySet()) {
- jsonObject.put(entry.getKey(), entry.getValue().toJSONObject());
- }
- return jsonObject;
- }
-
- public JSONObject toUserJSONObject() {
- JSONObject jsonObject = new JSONObject();
- for(Map.Entry entry : nodes.entrySet()) {
- jsonObject.put(entry.getKey(), entry.getValue().toUserJSONObject());
- }
- return jsonObject;
- }
-
- public boolean isEmpty() {
- return nodes.isEmpty();
- }
-}
diff --git a/src/main/java/io/github/chronosx88/JGUN/storageBackends/StorageBackend.java b/src/main/java/io/github/chronosx88/JGUN/storageBackends/StorageBackend.java
deleted file mode 100644
index e383f96..0000000
--- a/src/main/java/io/github/chronosx88/JGUN/storageBackends/StorageBackend.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package io.github.chronosx88.JGUN.storageBackends;
-
-import io.github.chronosx88.JGUN.Node;
-import org.json.JSONObject;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-
-public interface StorageBackend {
- Node getNode(String soul);
- void addNode(String soul, Node node);
- boolean hasNode(String soul);
- Set> entries();
- Collection nodes();
- String toString();
- String toPrettyString();
- JSONObject toJSONObject();
- boolean isEmpty();
-}