[WIP] Revamp code base

This commit is contained in:
ChronosX88 2023-11-14 03:06:42 +03:00
parent a51e5959da
commit 3338c54680
35 changed files with 571 additions and 913 deletions

View File

@ -24,7 +24,7 @@
A realtime, decentralized, offline-first, mutable graph protocol to sync the Internet. A realtime, decentralized, offline-first, mutable graph protocol to sync the Internet.
## Requirements ## Requirements
* JRE/JDK >= 1.8.0 * JRE/JDK >= 11
## Building ## Building
1. Clone repo: 1. Clone repo:
@ -34,11 +34,8 @@ $ cd JGUN
``` ```
2. Compile it: 2. Compile it:
```bash ```bash
./gradlew shadowJar ./gradlew build
``` ```
3. Compiled JAR located in `./build/libs/`
(Also exists precompiled JARs - see Releases (publishing to Maven coming soon...))
<sub>[⇧ back to top](#contents)</sub> <sub>[⇧ back to top](#contents)</sub>

View File

@ -1,42 +1,26 @@
plugins { plugins {
id 'com.github.johnrengelman.shadow' version '4.0.4'
id 'java' id 'java'
} }
group 'io.github.chronosx88' group 'io.github.chronosx88'
version '0.2.6' version '0.2.6'
sourceCompatibility = 1.8 java {
sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
}
repositories { repositories {
mavenCentral() mavenCentral()
} }
dependencies { dependencies {
implementation 'org.java-websocket:Java-WebSocket:1.4.0' implementation 'org.java-websocket:Java-WebSocket:1.5.4'
implementation 'net.sourceforge.streamsupport:android-retrofuture:1.7.0' implementation 'net.sourceforge.streamsupport:android-retrofuture:1.7.0'
implementation group: 'org.json', name: 'json', version: '20180813' implementation 'com.fasterxml.jackson.core:jackson-databind:2.15.3'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.15.3'
testCompile group: 'junit', name: 'junit', version: '4.12' implementation 'javax.cache:cache-api:1.1.1'
implementation 'com.github.ben-manes.caffeine:jcache:3.1.5'
compileOnly 'org.projectlombok:lombok:1.18.30'
annotationProcessor 'org.projectlombok:lombok:1.18.30'
} }
shadowJar {
relocate 'org.json', 'shadow.org.json'
}
task sourcesJar(type: Jar, dependsOn: classes) {
classifier = 'sources'
from sourceSets.main.allSource
}
task javadocJar(type: Jar, dependsOn: javadoc) {
classifier = 'javadoc'
from javadoc.destinationDir
}
artifacts {
archives sourcesJar
archives javadocJar
}

View File

@ -1,6 +1,5 @@
#Wed May 01 20:08:10 MSK 2019
distributionBase=GRADLE_USER_HOME distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.2-bin.zip
zipStoreBase=GRADLE_USER_HOME zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.10.3-all.zip

View File

@ -1,109 +0,0 @@
package io.github.chronosx88.JGUN;
import io.github.chronosx88.JGUN.futures.BaseCompletableFuture;
import io.github.chronosx88.JGUN.futures.FutureGet;
import io.github.chronosx88.JGUN.futures.FuturePut;
import io.github.chronosx88.JGUN.nodes.Peer;
import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph;
import io.github.chronosx88.JGUN.storageBackends.StorageBackend;
import org.java_websocket.client.WebSocketClient;
import org.json.JSONObject;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class Dispatcher {
private final Map<String, BaseCompletableFuture<?>> pendingFutures = new ConcurrentHashMap<>();
private final Map<String, NodeChangeListener> changeListeners = new ConcurrentHashMap<>();
private final Map<String, NodeChangeListener.ForEach> forEachListeners = new ConcurrentHashMap<>();
private final Peer peer;
private final StorageBackend graphStorage;
private final Dup dup;
private final Executor executorService = Executors.newCachedThreadPool();
public Dispatcher(StorageBackend graphStorage, Peer peer, Dup dup) {
this.graphStorage = graphStorage;
this.peer = peer;
this.dup = dup;
}
public void addPendingFuture(BaseCompletableFuture<?> future) {
pendingFutures.put(future.getFutureID(), future);
}
public void handleIncomingMessage(JSONObject message) {
if(message.has("put")) {
JSONObject ack = handlePut(message);
peer.emit(ack.toString());
}
if(message.has("get")) {
JSONObject ack = handleGet(message);
peer.emit(ack.toString());
}
if(message.has("@")) {
handleIncomingAck(message);
}
peer.emit(message.toString());
}
private JSONObject handleGet(JSONObject getData) {
InMemoryGraph getResults = Utils.getRequest(getData.getJSONObject("get"), graphStorage);
return new JSONObject() // Acknowledgment
.put( "#", dup.track(Dup.random()) )
.put( "@", getData.getString("#") )
.put( "put", getResults.toJSONObject() )
.put( "ok", !(getResults.isEmpty()) );
}
private JSONObject handlePut(JSONObject message) {
boolean success = HAM.mix(new InMemoryGraph(message.getJSONObject("put")), graphStorage, changeListeners, forEachListeners);
return new JSONObject() // Acknowledgment
.put( "#", dup.track(Dup.random()) )
.put( "@", message.getString("#") )
.put( "ok", success);
}
private void handleIncomingAck(JSONObject ack) {
if(ack.has("put")) {
if(pendingFutures.containsKey(ack.getString("@"))) {
BaseCompletableFuture<?> future = pendingFutures.get(ack.getString("@"));
if(future instanceof FutureGet) {
((FutureGet) future).complete(new InMemoryGraph(ack.getJSONObject("put")).toUserJSONObject());
}
}
}
if(ack.has("ok")) {
if(pendingFutures.containsKey(ack.getString("@"))) {
BaseCompletableFuture<?> future = pendingFutures.get(ack.getString("@"));
if(future instanceof FuturePut) {
((FuturePut) future).complete(ack.getBoolean("ok"));
}
}
}
}
public void sendPutRequest(String messageID, JSONObject data) {
executorService.execute(() -> {
InMemoryGraph graph = Utils.prepareDataForPut(data);
peer.emit(Utils.formatPutRequest(messageID, graph.toJSONObject()).toString());
});
}
public void sendGetRequest(String messageID, String key, String field) {
executorService.execute(() -> {
JSONObject jsonGet = Utils.formatGetRequest(messageID, key, field);
peer.emit(jsonGet.toString());
});
}
public void addChangeListener(String soul, NodeChangeListener listener) {
changeListeners.put(soul, listener);
}
public void addForEachChangeListener(String soul, NodeChangeListener.ForEach listener) {
forEachListeners.put(soul, listener);
}
}

View File

@ -1,40 +1,35 @@
package io.github.chronosx88.JGUN; package io.github.chronosx88.JGUN;
import java.util.Map; import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import javax.cache.spi.CachingProvider;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit;
public class Dup { public class Dup {
private static char[] randomSeed = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray(); private final Cache<String, Long> cache;
private Map<String, Long> s = new ConcurrentHashMap<>();
private DupOpt opt = new DupOpt();
private Thread to = null;
public Dup() { public Dup(long age) {
opt.max = 1000; CachingProvider cachingProvider = Caching.getCachingProvider();
opt.age = 1000 * 9; CacheManager cacheManager = cachingProvider.getCacheManager();
MutableConfiguration<String, Long> config = new MutableConfiguration<>();
config.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, age)));
this.cache = cacheManager.createCache("dup", config);
} }
public String track(String id) { private void track(String id) {
s.put(id, System.currentTimeMillis()); cache.put(id, System.currentTimeMillis());
if(to == null) {
Utils.setTimeout(() -> {
for(Map.Entry<String, Long> entry : s.entrySet()) {
if(opt.age > (System.currentTimeMillis() - entry.getValue()))
continue;
s.remove(entry.getKey());
}
to = null;
}, opt.age);
}
return id;
} }
public boolean check(String id) { public boolean isDuplicated(String id) {
if(s.containsKey(id)) { if(cache.containsKey(id)) {
track(id);
return true; return true;
} else { } else {
track(id);
return false; return false;
} }
} }

View File

@ -1,6 +0,0 @@
package io.github.chronosx88.JGUN;
public class DupOpt {
public int max;
public int age;
}

View File

@ -1,28 +1,38 @@
package io.github.chronosx88.JGUN; package io.github.chronosx88.JGUN;
import io.github.chronosx88.JGUN.nodes.GunClient; import io.github.chronosx88.JGUN.nodes.GunClient;
import io.github.chronosx88.JGUN.storageBackends.StorageBackend; import io.github.chronosx88.JGUN.storage.Storage;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class Gun { public class Gun {
private Dispatcher dispatcher;
private GunClient gunClient; private GunClient gunClient;
private final Map<String, NodeChangeListener> changeListeners = new ConcurrentHashMap<>();
private final Map<String, NodeChangeListener.ForEach> forEachListeners = new ConcurrentHashMap<>();
public Gun(InetAddress address, int port, StorageBackend storage) { public Gun(InetAddress address, int port, Storage storage) {
try { try {
this.gunClient = new GunClient(address, port, storage); this.gunClient = new GunClient(address, port, storage);
this.dispatcher = gunClient.getDispatcher();
this.gunClient.connectBlocking(); this.gunClient.connectBlocking();
} catch (URISyntaxException | InterruptedException e) { } catch (URISyntaxException | InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
public PathRef get(String key) { public PathReference get(String key) {
PathRef pathRef = new PathRef(dispatcher); PathReference pathRef = new PathReference(this);
pathRef.get(key); pathRef.get(key);
return pathRef; return pathRef;
} }
protected void addChangeListener(String nodeID, NodeChangeListener listener) {
changeListeners.put(nodeID, listener);
}
protected void addForEachChangeListener(String nodeID, NodeChangeListener.ForEach listener) {
forEachListeners.put(nodeID, listener);
}
} }

View File

@ -1,196 +1,36 @@
package io.github.chronosx88.JGUN; package io.github.chronosx88.JGUN;
import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph;
import io.github.chronosx88.JGUN.storageBackends.StorageBackend;
import org.json.JSONObject;
import java.util.Map;
public class HAM { public class HAM {
static class HAMResult { public static class HAMResult {
public boolean defer = false; // Defer means that the current state is greater than our computer time, and should only be processed when our computer time reaches this state public boolean defer = false; // Defer means that the current state is greater than our computer time, and should only be processed when our computer time reaches this state
public boolean historical = false; // Historical means the old state. This is usually ignored. public boolean historical = false; // Historical means the old state. This is usually ignored.
public boolean converge = false; // Everything is fine, you can do merge
public boolean incoming = false; // Leave incoming value public boolean incoming = false; // Leave incoming value
public boolean current = false; // Leave current value public boolean current = false; // Leave current value
public boolean state = false;
} }
public static HAMResult ham(long machineState, long incomingState, long currentState, Object incomingValue, Object currentValue) throws IllegalArgumentException { public static HAMResult ham(long machineState, long incomingState, long currentState, Object incomingValue, Object currentValue) throws IllegalArgumentException {
HAMResult result = new HAMResult(); HAMResult result = new HAMResult();
if(machineState < incomingState) { if (machineState < incomingState) {
// the incoming value is outside the boundary of the machine's state, it must be reprocessed in another state. // the incoming value is outside the boundary of the machine's state, it must be reprocessed in another state.
result.defer = true; result.defer = true;
return result; return result;
} } else if (currentState > incomingState) {
if(incomingState < currentState) {
// the incoming value is within the boundary of the machine's state, but not within the range. // the incoming value is within the boundary of the machine's state, but not within the range.
result.historical = true; result.historical = true;
result.current = true;
return result; return result;
} } else if (currentState < incomingState) {
if(currentState < incomingState) {
// the incoming value is within both the boundary and the range of the machine's state. // the incoming value is within both the boundary and the range of the machine's state.
result.converge = true;
result.incoming = true; result.incoming = true;
return result; return result;
} } else { // if incoming state and current state is the same
if(incomingState == currentState) { if (incomingValue.equals(currentValue)) {
// if incoming state and current state is the same
if(incomingValue.equals(currentValue)) {
result.state = true;
return result;
}
if((incomingValue.toString().compareTo(currentValue.toString())) < 0) {
result.converge = true;
result.current = true; result.current = true;
return result; return result;
} }
if((currentValue.toString().compareTo(incomingValue.toString())) < 0) { result.incoming = true; // always update local value with incoming value if state is the same
result.converge = true;
result.incoming = true;
return result; return result;
} }
} }
throw new IllegalArgumentException("Invalid CRDT Data: "+ incomingValue +" to "+ currentValue +" at "+ incomingState +" to "+ currentState +"!");
}
public static boolean mix(InMemoryGraph change, StorageBackend data, Map<String, NodeChangeListener> changeListeners, Map<String, NodeChangeListener.ForEach> forEachListeners) {
long machine = System.currentTimeMillis();
InMemoryGraph diff = null;
for(Map.Entry<String, Node> entry : change.entries()) {
Node node = entry.getValue();
for(String key : node.values.keySet()) {
Object value = node.values.get(key);
if ("_".equals(key)) { continue; }
long state = node.states.getLong(key);
long was = -1;
Object known = null;
if(data == null) {
data = new InMemoryGraph();
}
if(data.hasNode(node.soul)) {
if(data.getNode(node.soul).states.opt(key) != null) {
was = data.getNode(node.soul).states.getLong(key);
}
known = data.getNode(node.soul).values.opt(key) == null ? 0 : data.getNode(node.soul).values.opt(key);
}
HAMResult ham = null;
try {
ham = ham(machine, state, was, value, known);
} catch (IllegalArgumentException e) {
continue;
}
if(ham != null) {
if(!ham.incoming) {
if(ham.defer) {
System.out.println("DEFER: " + key + " " + value);
// Hack for accessing value in lambda without making the variable final
StorageBackend[] graph = new StorageBackend[] {data};
Utils.setTimeout(() -> mix(node, graph[0], changeListeners, forEachListeners), (int) (state - machine));
}
continue;
}
}
if(diff == null) {
diff = new InMemoryGraph();
}
if(!diff.hasNode(node.soul)) {
diff.addNode(node.soul, Utils.newNode(node.soul, new JSONObject()));
}
if(!data.hasNode(node.soul)) {
data.addNode(node.soul, Utils.newNode(node.soul, new JSONObject()));
}
Node tmp = data.getNode(node.soul);
tmp.values.put(key, value);
tmp.states.put(key, state);
data.addNode(node.soul, tmp);
diff.getNode(node.soul).values.put(key, value);
diff.getNode(node.soul).states.put(key, state);
}
}
if(diff != null) {
for(Map.Entry<String, Node> entry : diff.entries()) {
if(changeListeners.containsKey(entry.getKey())) {
changeListeners.get(entry.getKey()).onChange(entry.getValue().toUserJSONObject());
}
if(forEachListeners.containsKey(entry.getKey())) {
for(Map.Entry<String, Object> jsonEntry : entry.getValue().values.toMap().entrySet()) {
forEachListeners.get(entry.getKey()).onChange(jsonEntry.getKey(), jsonEntry.getValue());
}
}
}
}
return true;
}
public static boolean mix(Node incomingNode, StorageBackend data, Map<String, NodeChangeListener> changeListeners, Map<String, NodeChangeListener.ForEach> forEachListeners) {
long machine = System.currentTimeMillis();
InMemoryGraph diff = null;
for(String key : incomingNode.values.keySet()) {
Object value = incomingNode.values.get(key);
if ("_".equals(key)) { continue; }
long state = incomingNode.states.getLong(key);
long was = -1;
Object known = null;
if(data == null) {
data = new InMemoryGraph();
}
if(data.hasNode(incomingNode.soul)) {
if(data.getNode(incomingNode.soul).states.opt(key) != null) {
was = data.getNode(incomingNode.soul).states.getLong(key);
}
known = data.getNode(incomingNode.soul).values.opt(key) == null ? 0 : data.getNode(incomingNode.soul).values.opt(key);
}
HAMResult ham = ham(machine, state, was, value, known);
if(!ham.incoming) {
if(ham.defer) {
System.out.println("DEFER: " + key + " " + value);
// Hack for accessing value in lambda without making the variable final
StorageBackend[] graph = new StorageBackend[] {data};
Utils.setTimeout(() -> mix(incomingNode, graph[0], changeListeners, forEachListeners), (int) (state - machine));
}
continue;
}
if(diff == null) {
diff = new InMemoryGraph();
}
if(!diff.hasNode(incomingNode.soul)) {
diff.addNode(incomingNode.soul, Utils.newNode(incomingNode.soul, new JSONObject()));
}
if(!data.hasNode(incomingNode.soul)) {
data.addNode(incomingNode.soul, Utils.newNode(incomingNode.soul, new JSONObject()));
}
Node tmp = data.getNode(incomingNode.soul);
tmp.values.put(key, value);
tmp.states.put(key, state);
data.addNode(incomingNode.soul, tmp);
diff.getNode(incomingNode.soul).values.put(key, value);
diff.getNode(incomingNode.soul).states.put(key, state);
}
if(diff != null) {
for(Map.Entry<String, Node> entry : diff.entries()) {
if(changeListeners.containsKey(entry.getKey())) {
changeListeners.get(entry.getKey()).onChange(entry.getValue().toUserJSONObject());
}
if(forEachListeners.containsKey(entry.getKey())) {
for(Map.Entry<String, Object> jsonEntry : entry.getValue().values.toMap().entrySet()) {
forEachListeners.get(entry.getKey()).onChange(jsonEntry.getKey(), jsonEntry.getValue());
}
}
}
}
return true;
}
} }

View File

@ -0,0 +1,79 @@
package io.github.chronosx88.JGUN;
import io.github.chronosx88.JGUN.futures.BaseCompletableFuture;
import io.github.chronosx88.JGUN.models.BaseMessage;
import io.github.chronosx88.JGUN.models.MemoryGraph;
import io.github.chronosx88.JGUN.models.acks.BaseAck;
import io.github.chronosx88.JGUN.models.acks.GetAck;
import io.github.chronosx88.JGUN.models.acks.PutAck;
import io.github.chronosx88.JGUN.models.requests.GetRequest;
import io.github.chronosx88.JGUN.models.requests.PutRequest;
import io.github.chronosx88.JGUN.nodes.Peer;
import io.github.chronosx88.JGUN.storage.Storage;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class NetworkHandler {
private final Map<String, BaseCompletableFuture<?>> pendingFutures = new ConcurrentHashMap<>();
private final Peer peer;
private final Storage graphStorage;
private final Dup dup;
private final Executor executorService = Executors.newCachedThreadPool();
public NetworkHandler(Storage graphStorage, Peer peer, Dup dup) {
this.graphStorage = graphStorage;
this.peer = peer;
this.dup = dup;
}
public void addPendingFuture(BaseCompletableFuture<?> future) {
pendingFutures.put(future.getFutureID(), future);
}
public void handleIncomingMessage(BaseMessage message) {
if (message instanceof GetRequest) {
handleGet((GetRequest) message);
} else if (message instanceof PutRequest) {
handlePut((PutRequest) message);
} else if (message instanceof BaseAck) {
handleAck((BaseAck) message);
}
peer.emit(message.toString());
}
private GetAck handleGet(GetRequest request) {
// TODO
throw new UnsupportedOperationException("TODO");
}
private PutAck handlePut(PutRequest request) {
// TODO
throw new UnsupportedOperationException("TODO");
}
private void handleAck(BaseAck ack) {
if (ack instanceof GetAck) {
// TODO
} else if (ack instanceof PutAck) {
// TODO
}
throw new UnsupportedOperationException("TODO");
}
public void sendPutRequest(String messageID, MemoryGraph data) {
executorService.execute(() -> {
// TODO
});
}
public void sendGetRequest(String messageID, String key, String field) {
executorService.execute(() -> {
// TODO
});
}
}

View File

@ -1,82 +0,0 @@
package io.github.chronosx88.JGUN;
import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph;
import org.json.JSONObject;
import java.io.Serializable;
public class Node implements Comparable<Node>, Serializable {
public JSONObject values; // Data
public JSONObject states; // Metadata for diff
public String soul; // i.e. ID of node
/**
* Create a Peer from a JSON object.
*
* @param rawData JSON object, which contains the data
*/
public Node(JSONObject rawData) {
this.values = new JSONObject(rawData.toString());
this.states = values.getJSONObject("_").getJSONObject(">");
this.soul = values.getJSONObject("_").getString("#");
values.remove("_");
}
@Override
public int compareTo(Node other) {
return soul.compareTo(other.soul);
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other instanceof String)
return soul.equals(other);
if (other instanceof Node)
return compareTo((Node) other) == 0;
return false;
}
@Override
public int hashCode() {
return soul.hashCode();
}
public boolean isNode(String key) {
return values.optJSONObject(key) != null;
}
public Node getNode(String key, InMemoryGraph g) {
String soulRef = values.getJSONObject(key).getString("#");
return g.getNode(soulRef);
}
@Override
public String toString() {
JSONObject jsonObject = new JSONObject(values.toString());
jsonObject.put("_", new JSONObject().put("#", soul).put(">", states));
return jsonObject.toString();
}
public JSONObject toJSONObject() {
JSONObject jsonObject = new JSONObject(values.toString());
jsonObject.put("_", new JSONObject().put("#", soul).put(">", states));
return jsonObject;
}
public JSONObject getMetadata() {
JSONObject jsonObject = new JSONObject();
jsonObject.put("_", new JSONObject().put("#", soul).put(">", states));
return jsonObject;
}
public void setMetadata(JSONObject metadata) {
soul = metadata.getJSONObject("_").getString("#");
states = metadata.getJSONObject("_").getJSONObject(">");
}
public JSONObject toUserJSONObject() {
return values;
}
}

View File

@ -1,10 +1,10 @@
package io.github.chronosx88.JGUN; package io.github.chronosx88.JGUN;
import org.json.JSONObject; import io.github.chronosx88.JGUN.models.Node;
@FunctionalInterface @FunctionalInterface
public interface NodeChangeListener { public interface NodeChangeListener {
void onChange(JSONObject node); void onChange(Node node);
interface ForEach { interface ForEach {
void onChange(String key, Object value); void onChange(String key, Object value);

View File

@ -1,148 +0,0 @@
package io.github.chronosx88.JGUN;
import io.github.chronosx88.JGUN.futures.FutureGet;
import io.github.chronosx88.JGUN.futures.FuturePut;
import java9.util.concurrent.CompletableFuture;
import org.json.JSONObject;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
public class PathRef {
private final ArrayList<String> path = new ArrayList<>();
private Dispatcher dispatcher;
public PathRef(Dispatcher dispatcher) {
this.dispatcher = dispatcher;
}
public PathRef get(String key) {
path.add(key);
return this;
}
public FutureGet getData() {
FutureGet futureGet = new FutureGet(Dup.random());
new Thread(() -> {
Iterator<String> iterator = path.iterator();
String rootSoul = iterator.next();
String field = iterator.hasNext() ? iterator.next() : null;
iterator = path.iterator();
iterator.next();
CompletableFuture<JSONObject> future = CompletableFuture.supplyAsync(() -> {
FutureGet futureGetRootSoul = new FutureGet(Dup.random());
dispatcher.addPendingFuture(futureGetRootSoul);
dispatcher.sendGetRequest(futureGetRootSoul.getFutureID(), rootSoul, field);
JSONObject result = futureGetRootSoul.await();
if(result != null && result.isEmpty()) {
result = null;
}
return result == null ? null : result.getJSONObject(rootSoul);
});
do {
String soul = iterator.hasNext() ? iterator.next() : null;
String nextField = iterator.hasNext() ? iterator.next() : null;
future = future.thenApply(jsonObject -> {
if(jsonObject != null) {
if(soul != null) {
JSONObject tmp = null;
if(jsonObject.keySet().contains("#")) {
String nodeRef = jsonObject.getString("#");
FutureGet get = new FutureGet(Dup.random());
dispatcher.addPendingFuture(get);
dispatcher.sendGetRequest(get.getFutureID(), nodeRef, nextField);
JSONObject result = get.await();
if(result != null && result.isEmpty()) {
result = null;
}
result = result == null ? null : result.getJSONObject(nodeRef);
if(result != null) {
result = nextField == null ? result : result.getJSONObject(nextField);
}
tmp = result;
}
if(tmp != null) {
if(tmp.opt(soul) instanceof JSONObject) {
String nodeRef = tmp.getJSONObject(soul).getString("#");
FutureGet get = new FutureGet(Dup.random());
dispatcher.addPendingFuture(get);
dispatcher.sendGetRequest(get.getFutureID(), nodeRef, nextField);
JSONObject result = get.await();
if(result != null && result.isEmpty()) {
result = null;
}
result = result == null ? null : result.getJSONObject(nodeRef);
if(result != null) {
result = nextField == null ? result : result.getJSONObject(nextField);
}
return result;
} else {
return tmp;
}
} else {
if(jsonObject.opt(soul) instanceof JSONObject) {
String nodeRef = jsonObject.getJSONObject(soul).getString("#");
FutureGet get = new FutureGet(Dup.random());
dispatcher.addPendingFuture(get);
dispatcher.sendGetRequest(get.getFutureID(), nodeRef, nextField);
JSONObject result = get.await();
if(result != null && result.isEmpty()) {
result = null;
}
result = result == null ? null : result.getJSONObject(nodeRef);
if(result != null) {
result = nextField == null ? result : result.getJSONObject(nextField);
}
return result;
} else {
return jsonObject;
}
}
} else {
return jsonObject;
}
}
return null;
});
} while(iterator.hasNext());
try {
JSONObject data = future.get();
futureGet.complete(data);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}).start();
return futureGet;
}
public FuturePut put(JSONObject data) {
FuturePut futurePut = new FuturePut(Dup.random());
dispatcher.addPendingFuture(futurePut);
JSONObject temp = new JSONObject();
JSONObject temp1 = temp;
for (int i = 0; i < path.size(); i++) {
if(i != path.size() - 1) {
JSONObject object = new JSONObject();
temp1.put(path.get(i), object);
temp1 = object;
} else {
temp1.put(path.get(i), data == null ? JSONObject.NULL : data);
}
}
dispatcher.sendPutRequest(futurePut.getFutureID(), temp);
return futurePut;
}
public void on(NodeChangeListener changeListener) {
dispatcher.addChangeListener(Utils.join("/", path), changeListener);
}
public void map(NodeChangeListener.ForEach forEachListener) {
dispatcher.addForEachChangeListener(Utils.join("/", path), forEachListener);
}
}

View File

@ -0,0 +1,40 @@
package io.github.chronosx88.JGUN;
import io.github.chronosx88.JGUN.futures.FutureGet;
import io.github.chronosx88.JGUN.futures.FuturePut;
import java.util.ArrayList;
import java.util.HashMap;
public class PathReference {
private final ArrayList<String> path = new ArrayList<>();
private Gun database;
public PathReference(Gun db) {
this.database = db;
}
public PathReference get(String key) {
path.add(key);
return this;
}
public FutureGet getData() {
// TODO
return null;
}
public FuturePut put(HashMap<String, Object> data) {
// TODO
return null;
}
public void on(NodeChangeListener changeListener) {
database.addChangeListener(String.join("/", path), changeListener);
}
public void map(NodeChangeListener.ForEach forEachListener) {
database.addForEachChangeListener(String.join("/", path), forEachListener);
}
}

View File

@ -1,146 +0,0 @@
package io.github.chronosx88.JGUN;
import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph;
import io.github.chronosx88.JGUN.storageBackends.StorageBackend;
import org.json.JSONObject;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ConcurrentSkipListSet;
public class Utils {
public static Thread setTimeout(Runnable runnable, int delay){
Thread thread = new Thread(() -> {
try {
Thread.sleep(delay);
runnable.run();
}
catch (Exception e){
e.printStackTrace();
}
});
thread.start();
return thread;
}
public static Node newNode(String soul, JSONObject data) {
JSONObject states = new JSONObject();
for (String key : data.keySet()) {
states.put(key, System.currentTimeMillis());
}
data.put("_", new JSONObject().put("#", soul).put(">", states));
return new Node(data);
}
public static InMemoryGraph getRequest(JSONObject lex, StorageBackend graph) {
String soul = lex.getString("#");
String key = lex.optString(".", null);
Node node = graph.getNode(soul);
Object tmp;
if(node == null) {
return new InMemoryGraph();
}
if(key != null) {
tmp = node.values.opt(key);
if(tmp == null) {
return new InMemoryGraph();
}
Node node1 = new Node(node.toJSONObject());
node = Utils.newNode(node.soul, new JSONObject());
node.setMetadata(node1.getMetadata());
node.values.put(key, tmp);
JSONObject tmpStates = node1.states;
node.states.put(key, tmpStates.get(key));
}
InMemoryGraph ack = new InMemoryGraph();
ack.addNode(soul, node);
return ack;
}
public static InMemoryGraph prepareDataForPut(JSONObject data) {
InMemoryGraph result = new InMemoryGraph();
for (String objectKey : data.keySet()) {
Object object = data.get(objectKey);
if(object instanceof JSONObject) {
Node node = Utils.newNode(objectKey, (JSONObject) object);
ArrayList<String> path = new ArrayList<>();
path.add(objectKey);
prepareNodeForPut(node, result, path);
}
}
return result;
}
private static void prepareNodeForPut(Node node, InMemoryGraph result, ArrayList<String> path) {
for(String key : new ConcurrentSkipListSet<>(node.values.keySet())) {
Object value = node.values.get(key);
if(value instanceof JSONObject) {
path.add(key);
String soul = "";
soul = Utils.join("/", path);
Node tmpNode = Utils.newNode(soul, (JSONObject) value);
node.values.remove(key);
node.values.put(key, new JSONObject().put("#", soul));
prepareNodeForPut(tmpNode, result, new ArrayList<>(path));
result.addNode(soul, tmpNode);
path.remove(key);
}
}
result.addNode(node.soul, node);
}
public static JSONObject formatGetRequest(String messageID, String key, String field) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("#", messageID);
JSONObject getParameters = new JSONObject();
getParameters.put("#", key);
if(field != null) {
getParameters.put(".", field);
}
jsonObject.put("get", getParameters);
return jsonObject;
}
public static JSONObject formatPutRequest(String messageID, JSONObject data) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("#", messageID);
jsonObject.put("put", data);
return jsonObject;
}
/**
* This check current nodes for existing IDs in our storage, and if there are existing IDs, it means to replace them.
* Prevents trailing nodes in storage
* @param incomingGraph The graph that came to us over the wire.
* @param graphStorage Graph storage in which the incoming graph will be saved
* @return Prepared graph for saving
*/
/*public static InMemoryGraph checkIncomingNodesForID(InMemoryGraph incomingGraph, StorageBackend graphStorage) {
for (Node node : incomingGraph.nodes()) {
for(node)
}
}*/
/**
* Returns a string containing the tokens joined by delimiters.
*
* @param delimiter a CharSequence that will be inserted between the tokens. If null, the string
* "null" will be used as the delimiter.
* @param tokens an array objects to be joined. Strings will be formed from the objects by
* calling object.toString(). If tokens is null, a NullPointerException will be thrown. If
* tokens is empty, an empty string will be returned.
*/
public static String join(CharSequence delimiter, Iterable tokens) {
final Iterator<?> it = tokens.iterator();
if (!it.hasNext()) {
return "";
}
final StringBuilder sb = new StringBuilder();
sb.append(it.next());
while (it.hasNext()) {
sb.append(delimiter);
sb.append(it.next());
}
return sb.toString();
}
}

View File

@ -1,7 +1,7 @@
package io.github.chronosx88.JGUN.examples; package io.github.chronosx88.JGUN.examples;
import io.github.chronosx88.JGUN.nodes.GunClient; import io.github.chronosx88.JGUN.nodes.GunClient;
import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph; import io.github.chronosx88.JGUN.storage.MemoryStorage;
import java.net.Inet4Address; import java.net.Inet4Address;
import java.net.URISyntaxException; import java.net.URISyntaxException;
@ -9,7 +9,7 @@ import java.net.UnknownHostException;
public class MainClient { public class MainClient {
public static void main(String[] args) throws URISyntaxException, UnknownHostException { public static void main(String[] args) throws URISyntaxException, UnknownHostException {
GunClient gunClient = new GunClient(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 5054, new InMemoryGraph()); GunClient gunClient = new GunClient(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 5054, new MemoryStorage());
gunClient.connect(); gunClient.connect();
} }
} }

View File

@ -1,55 +1,52 @@
package io.github.chronosx88.JGUN.examples; package io.github.chronosx88.JGUN.examples;
import io.github.chronosx88.JGUN.Gun; import io.github.chronosx88.JGUN.Gun;
import io.github.chronosx88.JGUN.futures.FutureGet;
import io.github.chronosx88.JGUN.futures.FuturePut;
import io.github.chronosx88.JGUN.nodes.GunSuperPeer; import io.github.chronosx88.JGUN.nodes.GunSuperPeer;
import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph; import io.github.chronosx88.JGUN.storage.MemoryStorage;
import org.json.JSONObject;
import java.net.Inet4Address; import java.net.Inet4Address;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
public class MainClientServer { //public class MainClientServer {
public static void main(String[] args) { // public static void main(String[] args) {
GunSuperPeer gunSuperNode = new GunSuperPeer(21334, new InMemoryGraph()); // GunSuperPeer gunSuperNode = new GunSuperPeer(21334, new MemoryStorage());
gunSuperNode.start(); // gunSuperNode.start();
Runnable task = () -> { // Runnable task = () -> {
Gun gun = null; // Gun gun = null;
try { // try {
gun = new Gun(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 21334, new InMemoryGraph()); // gun = new Gun(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 21334, new MemoryStorage());
} catch (UnknownHostException e) { // } catch (UnknownHostException e) {
e.printStackTrace(); // e.printStackTrace();
} // }
gun.get("random").on(data -> { // gun.get("random").on(data -> {
if(data != null) { // if(data != null) {
System.out.println("New change in \"random\"! " + data.toString(2)); // System.out.println("New change in \"random\"! " + data.toString(2));
} // }
}); // });
gun.get("random").get("dVFtzE9CL").on(data -> { // gun.get("random").get("dVFtzE9CL").on(data -> {
if(data != null) { // if(data != null) {
System.out.println("New change in \"random/dVFtzE9CL\"! " + data.toString(2)); // System.out.println("New change in \"random/dVFtzE9CL\"! " + data.toString(2));
} else { // } else {
System.out.println("Now random/dVFtzE9CL is null!"); // System.out.println("Now random/dVFtzE9CL is null!");
} // }
//
}); // });
gun.get("random").get("dVFtzE9CL").map(((key, value) -> { // gun.get("random").get("dVFtzE9CL").map(((key, value) -> {
System.out.println("[Map] New change in \"random/dVFtzE9CL\"! " + key + " : " + value.toString()); // System.out.println("[Map] New change in \"random/dVFtzE9CL\"! " + key + " : " + value.toString());
})); // }));
gun.get("random").map(((key, value) -> { // gun.get("random").map(((key, value) -> {
System.out.println("[Map] New change in \"random\"! " + key + " : " + value); // System.out.println("[Map] New change in \"random\"! " + key + " : " + value);
})); // }));
System.out.println("[FuturePut] Success: " + gun.get("random").get("dVFtzE9CL").put(new JSONObject().put("hello", "world")).await()); // System.out.println("[FuturePut] Success: " + gun.get("random").get("dVFtzE9CL").put(new JSONObject().put("hello", "world")).await());
System.out.println("[FuturePut] Putting an item again: " + gun.get("random").get("dVFtzE9CL").put(new JSONObject().put("hello", "123")).await()); // System.out.println("[FuturePut] Putting an item again: " + gun.get("random").get("dVFtzE9CL").put(new JSONObject().put("hello", "123")).await());
System.out.println("Deleting an item random/dVFtzE9CL"); // System.out.println("Deleting an item random/dVFtzE9CL");
gun.get("random").get("dVFtzE9CL").put(null).await(); // gun.get("random").get("dVFtzE9CL").put(null).await();
gun.get("random").put(new JSONObject().put("hello", "world")).await(); // gun.get("random").put(new JSONObject().put("hello", "world")).await();
}; // };
//
Executor executorService = Executors.newSingleThreadExecutor(); // Executor executorService = Executors.newSingleThreadExecutor();
executorService.execute(task); // executorService.execute(task);
} // }
} //}

View File

@ -1,11 +1,11 @@
package io.github.chronosx88.JGUN.examples; package io.github.chronosx88.JGUN.examples;
import io.github.chronosx88.JGUN.nodes.GunSuperPeer; import io.github.chronosx88.JGUN.nodes.GunSuperPeer;
import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph; import io.github.chronosx88.JGUN.storage.MemoryStorage;
public class MainServer { public class MainServer {
public static void main(String[] args) { public static void main(String[] args) {
GunSuperPeer gunSuperNode = new GunSuperPeer(5054, new InMemoryGraph()); GunSuperPeer gunSuperNode = new GunSuperPeer(5054, new MemoryStorage());
gunSuperNode.start(); gunSuperNode.start();
} }
} }

View File

@ -3,8 +3,10 @@ package io.github.chronosx88.JGUN.futures;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java9.util.concurrent.CompletableFuture; import java9.util.concurrent.CompletableFuture;
import lombok.Getter;
@Getter
public class BaseCompletableFuture<T> extends CompletableFuture<T> { public class BaseCompletableFuture<T> extends CompletableFuture<T> {
private final String futureID; private final String futureID;
@ -13,10 +15,6 @@ public class BaseCompletableFuture<T> extends CompletableFuture<T> {
futureID = id; futureID = id;
} }
public String getFutureID() {
return futureID;
}
public void addListener(final BaseFutureListener<T> listener) { public void addListener(final BaseFutureListener<T> listener) {
this.whenCompleteAsync((t, throwable) -> { this.whenCompleteAsync((t, throwable) -> {
if(throwable == null) { if(throwable == null) {

View File

@ -1,8 +1,8 @@
package io.github.chronosx88.JGUN.futures; package io.github.chronosx88.JGUN.futures;
import org.json.JSONObject; import io.github.chronosx88.JGUN.models.MemoryGraph;
public class FutureGet extends BaseCompletableFuture<JSONObject> { public class FutureGet extends BaseCompletableFuture<MemoryGraph> {
public FutureGet(String id) { public FutureGet(String id) {
super(id); super(id);
} }

View File

@ -0,0 +1,10 @@
package io.github.chronosx88.JGUN.models;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
@Data
public abstract class BaseMessage {
@JsonProperty("#")
private String id;
}

View File

@ -0,0 +1,27 @@
package io.github.chronosx88.JGUN.models;
import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Builder;
import lombok.Data;
import lombok.extern.jackson.Jacksonized;
import java.util.LinkedHashMap;
import java.util.Map;
@Data
public class MemoryGraph {
@JsonIgnore
public final Map<String, Node> nodes = new LinkedHashMap<>();
@JsonAnyGetter
public Map<String, Node> nodes() {
return nodes;
}
@JsonAnySetter
public void putNodes(String id, Node node) {
nodes.put(id, node);
}
}

View File

@ -0,0 +1,34 @@
package io.github.chronosx88.JGUN.models;
import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Builder;
import lombok.Data;
import lombok.extern.jackson.Jacksonized;
import java.util.LinkedHashMap;
import java.util.Map;
@Data
@Jacksonized
@Builder
public class Node {
@JsonProperty("_")
private NodeMetadata metadata;
@JsonIgnore
@Builder.Default
public Map<String, Object> values = new LinkedHashMap<>(); // Data
@JsonAnyGetter
public Map<String, Object> getValues() {
return values;
}
@JsonAnySetter
public void allSetter(String key, String value) {
values.put(key, value);
}
}

View File

@ -0,0 +1,21 @@
package io.github.chronosx88.JGUN.models;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Builder;
import lombok.Data;
import lombok.extern.jackson.Jacksonized;
import java.util.HashMap;
import java.util.Map;
@Data
@Builder
@Jacksonized
public class NodeMetadata {
@JsonProperty(">")
@Builder.Default
private Map<String, Long> states = new HashMap<>(); // field -> state
@JsonProperty("#")
private String nodeID;
}

View File

@ -0,0 +1,16 @@
package io.github.chronosx88.JGUN.models.acks;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.github.chronosx88.JGUN.models.BaseMessage;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.jackson.Jacksonized;
@Data
@EqualsAndHashCode(callSuper = true)
public abstract class BaseAck extends BaseMessage {
@JsonProperty("@")
private String replyTo;
private String ok;
}

View File

@ -0,0 +1,17 @@
package io.github.chronosx88.JGUN.models.acks;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.github.chronosx88.JGUN.models.MemoryGraph;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.jackson.Jacksonized;
@Data
@EqualsAndHashCode(callSuper = true)
@Builder
@Jacksonized
public class GetAck extends BaseAck {
@JsonProperty("put")
private MemoryGraph data;
}

View File

@ -0,0 +1,12 @@
package io.github.chronosx88.JGUN.models.acks;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.jackson.Jacksonized;
@Data
@EqualsAndHashCode(callSuper = true)
@Builder
@Jacksonized
public class PutAck extends BaseAck {}

View File

@ -0,0 +1,13 @@
package io.github.chronosx88.JGUN.models.requests;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.github.chronosx88.JGUN.models.BaseMessage;
import lombok.Data;
import lombok.EqualsAndHashCode;
@Data
@EqualsAndHashCode(callSuper = true)
public class GetRequest extends BaseMessage {
@JsonProperty("get")
private GetRequestParams params;
}

View File

@ -0,0 +1,11 @@
package io.github.chronosx88.JGUN.models.requests;
import com.fasterxml.jackson.annotation.JsonProperty;
public class GetRequestParams {
@JsonProperty("#")
private String nodeID;
@JsonProperty(".")
private String field;
}

View File

@ -0,0 +1,18 @@
package io.github.chronosx88.JGUN.models.requests;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.github.chronosx88.JGUN.models.BaseMessage;
import io.github.chronosx88.JGUN.models.Node;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.jackson.Jacksonized;
@Data
@Jacksonized
@Builder
@EqualsAndHashCode(callSuper = true)
public class PutRequest extends BaseMessage {
@JsonProperty("put")
private Node[] params;
}

View File

@ -1,24 +1,22 @@
package io.github.chronosx88.JGUN.nodes; package io.github.chronosx88.JGUN.nodes;
import io.github.chronosx88.JGUN.Dispatcher;
import io.github.chronosx88.JGUN.Dup; import io.github.chronosx88.JGUN.Dup;
import io.github.chronosx88.JGUN.storageBackends.StorageBackend; import io.github.chronosx88.JGUN.NetworkHandler;
import io.github.chronosx88.JGUN.storage.Storage;
import org.java_websocket.client.WebSocketClient; import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake; import org.java_websocket.handshake.ServerHandshake;
import org.json.JSONObject;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
public class GunClient extends WebSocketClient implements Peer { public class GunClient extends WebSocketClient implements Peer {
private Dup dup = new Dup(); private Dup dup = new Dup(1000*9);
private final Dispatcher dispatcher; private final NetworkHandler handler;
public GunClient(InetAddress address, int port, StorageBackend storage) throws URISyntaxException { public GunClient(InetAddress address, int port, Storage storage) throws URISyntaxException {
super(new URI("ws://" + address.getHostAddress() + ":" + port)); super(new URI("ws://" + address.getHostAddress() + ":" + port));
this.dispatcher = new Dispatcher(storage, this, dup); this.handler = new NetworkHandler(storage, this, dup);
} }
@Override @Override
@ -28,10 +26,7 @@ public class GunClient extends WebSocketClient implements Peer {
@Override @Override
public void onMessage(String message) { public void onMessage(String message) {
JSONObject jsonMsg = new JSONObject(message); // TODO
if(dup.check(jsonMsg.getString("#"))){ return; }
dup.track(jsonMsg.getString("#"));
dispatcher.handleIncomingMessage(jsonMsg);
} }
@Override @Override
@ -45,10 +40,6 @@ public class GunClient extends WebSocketClient implements Peer {
ex.printStackTrace(); ex.printStackTrace();
} }
public Dispatcher getDispatcher() {
return dispatcher;
}
@Override @Override
public void emit(String data) { public void emit(String data) {
this.send(data); this.send(data);

View File

@ -1,24 +1,22 @@
package io.github.chronosx88.JGUN.nodes; package io.github.chronosx88.JGUN.nodes;
import io.github.chronosx88.JGUN.Dispatcher;
import io.github.chronosx88.JGUN.Dup; import io.github.chronosx88.JGUN.Dup;
import io.github.chronosx88.JGUN.storageBackends.StorageBackend; import io.github.chronosx88.JGUN.NetworkHandler;
import io.github.chronosx88.JGUN.storage.Storage;
import org.java_websocket.WebSocket; import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake; import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer; import org.java_websocket.server.WebSocketServer;
import org.json.JSONObject;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
public class GunSuperPeer extends WebSocketServer implements Peer { public class GunSuperPeer extends WebSocketServer implements Peer {
private Dup dup = new Dup(); private Dup dup = new Dup(1000*9);
private Dispatcher dispatcher; private NetworkHandler handler;
public GunSuperPeer(int port, StorageBackend storageBackend) { public GunSuperPeer(int port, Storage storage) {
super(new InetSocketAddress(port)); super(new InetSocketAddress(port));
setReuseAddr(true); setReuseAddr(true);
dispatcher = new Dispatcher(storageBackend, this, dup); handler = new NetworkHandler(storage, this, dup);
} }
@Override @Override
@ -35,10 +33,7 @@ public class GunSuperPeer extends WebSocketServer implements Peer {
@Override @Override
public void onMessage(WebSocket conn, String message) { public void onMessage(WebSocket conn, String message) {
JSONObject jsonMsg = new JSONObject(message); // TODO
if(dup.check(jsonMsg.getString("#"))){ return; }
dup.track(jsonMsg.getString("#"));
dispatcher.handleIncomingMessage(jsonMsg);
} }
@Override @Override

View File

@ -0,0 +1,47 @@
package io.github.chronosx88.JGUN.storage;
import io.github.chronosx88.JGUN.models.Node;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
public class MemoryStorage extends Storage {
private final Map<String, Node> nodes;
public MemoryStorage() {
nodes = new LinkedHashMap<>();
}
public Node getNode(String id) {
return nodes.get(id);
}
@Override
void updateNode(Node node) {
// TODO
throw new UnsupportedOperationException("TODO");
}
public void addNode(String id, Node incomingNode) {
nodes.put(id, incomingNode);
}
public boolean hasNode(String id) {
return nodes.containsKey(id);
}
public Set<Map.Entry<String, Node>> entries() {
return nodes.entrySet();
}
public Collection<Node> nodes() {
return nodes.values();
}
public boolean isEmpty() {
return nodes.isEmpty();
}
}

View File

@ -0,0 +1,100 @@
package io.github.chronosx88.JGUN.storage;
import io.github.chronosx88.JGUN.HAM;
import io.github.chronosx88.JGUN.NodeChangeListener;
import io.github.chronosx88.JGUN.models.MemoryGraph;
import io.github.chronosx88.JGUN.models.Node;
import io.github.chronosx88.JGUN.models.NodeMetadata;
import java.util.*;
public abstract class Storage {
abstract Node getNode(String id);
abstract void updateNode(Node node);
abstract void addNode(String id, Node node);
abstract boolean hasNode(String id);
abstract Set<Map.Entry<String, Node>> entries();
abstract Collection<Node> nodes();
abstract boolean isEmpty();
/**
* Merge graph update (usually received from the network)
* @param update Graph update
* @param changeListeners
* @param forEachListeners
*/
public void mergeUpdate(MemoryGraph update, Map<String, NodeChangeListener> changeListeners, Map<String, NodeChangeListener.ForEach> forEachListeners) {
long machine = System.currentTimeMillis();
MemoryGraph diff = new MemoryGraph();
for (Map.Entry<String, Node> entry : update.getNodes().entrySet()) {
Node node = entry.getValue();
Node diffNode = this.mergeNode(node, machine);
if (Objects.nonNull(diffNode)) {
diff.nodes.put(diffNode.getMetadata().getNodeID(), diffNode);
}
}
if (!diff.nodes.isEmpty()) {
for (Map.Entry<String, Node> diffEntry : diff.getNodes().entrySet()) {
Node changedNode = diffEntry.getValue();
if (!this.hasNode(changedNode.getMetadata().getNodeID())) {
this.addNode(changedNode.getMetadata().getNodeID(), changedNode);
} else {
this.updateNode(changedNode);
}
if (changeListeners.containsKey(diffEntry.getKey())) {
changeListeners.get(diffEntry.getKey()).onChange(diffEntry.getValue());
}
if (forEachListeners.containsKey(diffEntry.getKey())) {
for (Map.Entry<String, Object> nodeEntry : changedNode.getValues().entrySet()) {
forEachListeners.get(nodeEntry.getKey()).onChange(nodeEntry.getKey(), nodeEntry.getValue());
}
}
}
}
}
/**
* Merge updated node
* @param incomingNode Updated node
* @param machineState Current machine state
* @return Node with changes or null if no changes
*/
public Node mergeNode(Node incomingNode, long machineState) {
Node changedNode = null;
for (String key : incomingNode.getValues().keySet()) {
Object value = incomingNode.getValues().get(key);
long state = incomingNode.getMetadata().getStates().get(key);
long previousState = -1;
Object currentValue = null;
if (this.hasNode(incomingNode.getMetadata().getNodeID())) {
Node currentNode = this.getNode(incomingNode.getMetadata().getNodeID());
Long prevStateFromStorage = currentNode.getMetadata().getStates().get(key);
if (!Objects.isNull(prevStateFromStorage)) {
previousState = prevStateFromStorage;
}
currentValue = currentNode.getValues().get(key);
}
HAM.HAMResult ham = HAM.ham(machineState, state, previousState, value, currentValue);
if (!ham.incoming) {
if (ham.defer) {
// TODO handle deferred value
}
continue;
}
if (Objects.isNull(changedNode)) {
changedNode = Node.builder()
.metadata(NodeMetadata.builder()
.nodeID(incomingNode.getMetadata().getNodeID())
.build())
.build();
}
changedNode.values.put(key, value);
changedNode.getMetadata().getStates().put(key, state);
}
return changedNode;
}
}

View File

@ -1,82 +0,0 @@
package io.github.chronosx88.JGUN.storageBackends;
import io.github.chronosx88.JGUN.Node;
import org.json.JSONObject;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
public class InMemoryGraph implements StorageBackend {
private final HashMap<String, Node> nodes;
public InMemoryGraph(JSONObject source) {
nodes = new LinkedHashMap<>();
for (String soul : source.keySet())
nodes.put(soul, new Node(source.getJSONObject(soul)));
}
public InMemoryGraph() {
nodes = new LinkedHashMap<>();
}
public Node getNode(String soul) {
return nodes.get(soul);
}
public void addNode(String soul, Node incomingNode) {
nodes.put(soul, incomingNode);
}
public boolean hasNode(String soul) {
return nodes.containsKey(soul);
}
public Set<Map.Entry<String, Node>> entries() {
return nodes.entrySet();
}
public Collection<Node> nodes() { return nodes.values(); }
@Override
public String toString() {
JSONObject jsonObject = new JSONObject();
for(Map.Entry<String, Node> entry : nodes.entrySet()) {
jsonObject.put(entry.getKey(), entry.getValue().toJSONObject());
}
return jsonObject.toString();
}
public String toPrettyString() {
JSONObject jsonObject = new JSONObject();
for(Map.Entry<String, Node> entry : nodes.entrySet()) {
jsonObject.put(entry.getKey(), entry.getValue().toJSONObject());
}
return jsonObject.toString(2);
}
public JSONObject toJSONObject() {
JSONObject jsonObject = new JSONObject();
for(Map.Entry<String, Node> entry : nodes.entrySet()) {
jsonObject.put(entry.getKey(), entry.getValue().toJSONObject());
}
return jsonObject;
}
public JSONObject toUserJSONObject() {
JSONObject jsonObject = new JSONObject();
for(Map.Entry<String, Node> entry : nodes.entrySet()) {
jsonObject.put(entry.getKey(), entry.getValue().toUserJSONObject());
}
return jsonObject;
}
public boolean isEmpty() {
return nodes.isEmpty();
}
}

View File

@ -1,20 +0,0 @@
package io.github.chronosx88.JGUN.storageBackends;
import io.github.chronosx88.JGUN.Node;
import org.json.JSONObject;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
public interface StorageBackend {
Node getNode(String soul);
void addNode(String soul, Node node);
boolean hasNode(String soul);
Set<Map.Entry<String, Node>> entries();
Collection<Node> nodes();
String toString();
String toPrettyString();
JSONObject toJSONObject();
boolean isEmpty();
}