mirror of
https://github.com/ChronosX88/JGUN.git
synced 2024-11-22 06:22:19 +00:00
Now all futures in JGUN are children of CompletableFuture, correctly putting, getting, deleting.
This commit is contained in:
parent
92b734c511
commit
b899018045
@ -1,21 +1,22 @@
|
|||||||
package io.github.chronosx88.JGUN;
|
package io.github.chronosx88.JGUN;
|
||||||
|
|
||||||
import io.github.chronosx88.JGUN.futures.BaseFuture;
|
import io.github.chronosx88.JGUN.futures.BaseCompletableFuture;
|
||||||
import io.github.chronosx88.JGUN.futures.FutureGet;
|
import io.github.chronosx88.JGUN.futures.FutureGet;
|
||||||
import io.github.chronosx88.JGUN.futures.FuturePut;
|
import io.github.chronosx88.JGUN.futures.FuturePut;
|
||||||
import io.github.chronosx88.JGUN.nodes.Peer;
|
import io.github.chronosx88.JGUN.nodes.Peer;
|
||||||
import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph;
|
import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph;
|
||||||
import io.github.chronosx88.JGUN.storageBackends.StorageBackend;
|
import io.github.chronosx88.JGUN.storageBackends.StorageBackend;
|
||||||
|
import org.java_websocket.client.WebSocketClient;
|
||||||
import org.json.JSONObject;
|
import org.json.JSONObject;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
public class Dispatcher {
|
public class Dispatcher {
|
||||||
private final Map<String, BaseFuture<? extends BaseFuture>> pendingFutures = new ConcurrentHashMap<>();
|
private final Map<String, BaseCompletableFuture<?>> pendingFutures = new ConcurrentHashMap<>();
|
||||||
|
private final Peer peer;
|
||||||
private final StorageBackend graphStorage;
|
private final StorageBackend graphStorage;
|
||||||
private final Dup dup;
|
private final Dup dup;
|
||||||
private final Peer peer;
|
|
||||||
|
|
||||||
public Dispatcher(StorageBackend graphStorage, Peer peer, Dup dup) {
|
public Dispatcher(StorageBackend graphStorage, Peer peer, Dup dup) {
|
||||||
this.graphStorage = graphStorage;
|
this.graphStorage = graphStorage;
|
||||||
@ -23,7 +24,7 @@ public class Dispatcher {
|
|||||||
this.dup = dup;
|
this.dup = dup;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addPendingFuture(BaseFuture<? extends BaseFuture> future) {
|
public void addPendingFuture(BaseCompletableFuture<?> future) {
|
||||||
pendingFutures.put(future.getFutureID(), future);
|
pendingFutures.put(future.getFutureID(), future);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -32,7 +33,7 @@ public class Dispatcher {
|
|||||||
JSONObject ack = handlePut(message);
|
JSONObject ack = handlePut(message);
|
||||||
peer.emit(ack.toString());
|
peer.emit(ack.toString());
|
||||||
}
|
}
|
||||||
if(message.has("get")) {
|
if(message.has("await")) {
|
||||||
JSONObject ack = handleGet(message);
|
JSONObject ack = handleGet(message);
|
||||||
peer.emit(ack.toString());
|
peer.emit(ack.toString());
|
||||||
}
|
}
|
||||||
@ -42,7 +43,7 @@ public class Dispatcher {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private JSONObject handleGet(JSONObject getData) {
|
private JSONObject handleGet(JSONObject getData) {
|
||||||
InMemoryGraph getResults = Utils.getRequest(getData.getJSONObject("get"), graphStorage);
|
InMemoryGraph getResults = Utils.getRequest(getData.getJSONObject("await"), graphStorage);
|
||||||
return new JSONObject() // Acknowledgment
|
return new JSONObject() // Acknowledgment
|
||||||
.put( "#", dup.track(Dup.random()) )
|
.put( "#", dup.track(Dup.random()) )
|
||||||
.put( "@", getData.getString("#") )
|
.put( "@", getData.getString("#") )
|
||||||
@ -61,32 +62,32 @@ public class Dispatcher {
|
|||||||
private void handleIncomingAck(JSONObject ack) {
|
private void handleIncomingAck(JSONObject ack) {
|
||||||
if(ack.has("put")) {
|
if(ack.has("put")) {
|
||||||
if(pendingFutures.containsKey(ack.getString("@"))) {
|
if(pendingFutures.containsKey(ack.getString("@"))) {
|
||||||
BaseFuture<? extends BaseFuture> future = pendingFutures.get(ack.getString("@"));
|
BaseCompletableFuture<?> future = pendingFutures.get(ack.getString("@"));
|
||||||
if(future instanceof FutureGet) {
|
if(future instanceof FutureGet) {
|
||||||
((FutureGet) future).done(ack.getJSONObject("put"));
|
((FutureGet) future).complete(ack.getJSONObject("put"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if(ack.has("ok")) {
|
if(ack.has("ok")) {
|
||||||
if(pendingFutures.containsKey(ack.getString("@"))) {
|
if(pendingFutures.containsKey(ack.getString("@"))) {
|
||||||
BaseFuture<? extends BaseFuture> future = pendingFutures.get(ack.getString("@"));
|
BaseCompletableFuture<?> future = pendingFutures.get(ack.getString("@"));
|
||||||
if(future instanceof FuturePut) {
|
if(future instanceof FuturePut) {
|
||||||
((FuturePut) future).done(ack.getBoolean("ok"));
|
((FuturePut) future).complete(ack.getBoolean("ok"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sendPutRequest(JSONObject data) {
|
public void sendPutRequest(String messageID, JSONObject data) {
|
||||||
new Thread(() -> {
|
new Thread(() -> {
|
||||||
InMemoryGraph graph = Utils.prepareDataForPut(data);
|
InMemoryGraph graph = Utils.prepareDataForPut(data);
|
||||||
peer.emit(Utils.formatPutRequest(graph.toJSONObject()).toString());
|
peer.emit(Utils.formatPutRequest(messageID, graph.toJSONObject()).toString());
|
||||||
}).start();
|
}).start();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sendGetRequest(String key, String field) {
|
public void sendGetRequest(String messageID, String key, String field) {
|
||||||
new Thread(() -> {
|
new Thread(() -> {
|
||||||
JSONObject jsonGet = Utils.formatGetRequest(key, field);
|
JSONObject jsonGet = Utils.formatGetRequest(messageID, key, field);
|
||||||
peer.emit(jsonGet.toString());
|
peer.emit(jsonGet.toString());
|
||||||
}).start();
|
}).start();
|
||||||
}
|
}
|
||||||
|
28
src/main/java/io/github/chronosx88/JGUN/Gun.java
Normal file
28
src/main/java/io/github/chronosx88/JGUN/Gun.java
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
package io.github.chronosx88.JGUN;
|
||||||
|
|
||||||
|
import io.github.chronosx88.JGUN.nodes.GunClient;
|
||||||
|
import io.github.chronosx88.JGUN.storageBackends.StorageBackend;
|
||||||
|
|
||||||
|
import java.net.InetAddress;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
|
||||||
|
public class Gun {
|
||||||
|
private Dispatcher dispatcher;
|
||||||
|
private GunClient gunClient;
|
||||||
|
|
||||||
|
public Gun(InetAddress address, int port, StorageBackend storage) {
|
||||||
|
try {
|
||||||
|
this.gunClient = new GunClient(address, port, storage);
|
||||||
|
this.dispatcher = gunClient.getDispatcher();
|
||||||
|
this.gunClient.connectBlocking();
|
||||||
|
} catch (URISyntaxException | InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public PathRef get(String key) {
|
||||||
|
PathRef pathRef = new PathRef(dispatcher);
|
||||||
|
pathRef.get(key);
|
||||||
|
return pathRef;
|
||||||
|
}
|
||||||
|
}
|
@ -24,43 +24,52 @@ public class PathRef {
|
|||||||
|
|
||||||
public FutureGet getData() {
|
public FutureGet getData() {
|
||||||
FutureGet futureGet = new FutureGet(Dup.random());
|
FutureGet futureGet = new FutureGet(Dup.random());
|
||||||
Iterator<String> iterator = path.iterator();
|
|
||||||
new Thread(() -> {
|
new Thread(() -> {
|
||||||
|
Iterator<String> iterator = path.iterator();
|
||||||
|
String rootSoul = iterator.next();
|
||||||
|
String field = iterator.hasNext() ? iterator.next() : null;
|
||||||
|
|
||||||
|
iterator = path.iterator();
|
||||||
|
iterator.next();
|
||||||
|
|
||||||
CompletableFuture<JSONObject> future = CompletableFuture.supplyAsync(() -> {
|
CompletableFuture<JSONObject> future = CompletableFuture.supplyAsync(() -> {
|
||||||
String rootSoul = iterator.next();
|
|
||||||
String field = iterator.hasNext() ? iterator.next() : null;
|
|
||||||
FutureGet futureGetRootSoul = new FutureGet(Dup.random());
|
FutureGet futureGetRootSoul = new FutureGet(Dup.random());
|
||||||
dispatcher.addPendingFuture(futureGetRootSoul);
|
dispatcher.addPendingFuture(futureGetRootSoul);
|
||||||
dispatcher.sendGetRequest(rootSoul, field);
|
dispatcher.sendGetRequest(futureGetRootSoul.getFutureID(), rootSoul, field);
|
||||||
futureGetRootSoul.awaitUninterruptibly();
|
JSONObject result = futureGetRootSoul.await();
|
||||||
if(futureGetRootSoul.isSuccess() && futureGetRootSoul.getData() != null) {
|
if(result != null && result.isEmpty()) {
|
||||||
return futureGetRootSoul.getData();
|
result = null;
|
||||||
} else {
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
return result == null ? null : result.getJSONObject(rootSoul);
|
||||||
});
|
});
|
||||||
while(iterator.hasNext()) {
|
do {
|
||||||
|
String soul = iterator.hasNext() ? iterator.next() : null;
|
||||||
|
String nextField = iterator.hasNext() ? iterator.next() : null;
|
||||||
future = future.thenApply(jsonObject -> {
|
future = future.thenApply(jsonObject -> {
|
||||||
String soul = iterator.next();
|
|
||||||
String field = iterator.hasNext() ? iterator.next() : null;
|
|
||||||
if(jsonObject != null) {
|
if(jsonObject != null) {
|
||||||
String nodeRef = jsonObject.getJSONObject(soul).getString("#");
|
if(soul != null) {
|
||||||
FutureGet get = new FutureGet(Dup.random());
|
if(jsonObject.get(soul) instanceof JSONObject) {
|
||||||
dispatcher.addPendingFuture(get);
|
String nodeRef = jsonObject.getJSONObject(soul).getString("#");
|
||||||
dispatcher.sendGetRequest(nodeRef, field);
|
FutureGet get = new FutureGet(Dup.random());
|
||||||
get.awaitUninterruptibly();
|
dispatcher.addPendingFuture(get);
|
||||||
if(get.isSuccess() && get.getData() != null) {
|
dispatcher.sendGetRequest(get.getFutureID(), nodeRef, nextField);
|
||||||
return get.getData();
|
JSONObject result = get.await();
|
||||||
|
if(result != null && result.isEmpty()) {
|
||||||
|
result = null;
|
||||||
|
}
|
||||||
|
return result == null ? null : result.getJSONObject(nodeRef);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
return null;
|
return jsonObject;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
});
|
});
|
||||||
}
|
} while(iterator.hasNext());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
JSONObject data = future.get();
|
JSONObject data = future.get();
|
||||||
futureGet.done(data);
|
futureGet.complete(data);
|
||||||
} catch (InterruptedException | ExecutionException e) {
|
} catch (InterruptedException | ExecutionException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
@ -71,7 +80,19 @@ public class PathRef {
|
|||||||
public FuturePut put(JSONObject data) {
|
public FuturePut put(JSONObject data) {
|
||||||
FuturePut futurePut = new FuturePut(Dup.random());
|
FuturePut futurePut = new FuturePut(Dup.random());
|
||||||
dispatcher.addPendingFuture(futurePut);
|
dispatcher.addPendingFuture(futurePut);
|
||||||
dispatcher.sendPutRequest(data);
|
JSONObject temp = new JSONObject();
|
||||||
|
JSONObject temp1 = temp;
|
||||||
|
for (int i = 0; i < path.size(); i++) {
|
||||||
|
if(i != path.size() - 1) {
|
||||||
|
JSONObject object = new JSONObject();
|
||||||
|
temp1.put(path.get(i), object);
|
||||||
|
temp1 = object;
|
||||||
|
} else {
|
||||||
|
temp1.put(path.get(i), data == null ? JSONObject.NULL : data);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
dispatcher.sendPutRequest(futurePut.getFutureID(), temp);
|
||||||
return futurePut;
|
return futurePut;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -59,11 +59,13 @@ public class Utils {
|
|||||||
public static InMemoryGraph prepareDataForPut(JSONObject data) {
|
public static InMemoryGraph prepareDataForPut(JSONObject data) {
|
||||||
InMemoryGraph result = new InMemoryGraph();
|
InMemoryGraph result = new InMemoryGraph();
|
||||||
for (String objectKey : data.keySet()) {
|
for (String objectKey : data.keySet()) {
|
||||||
JSONObject object = data.getJSONObject(objectKey);
|
Object object = data.get(objectKey);
|
||||||
Node node = Utils.newNode(objectKey, object);
|
if(object instanceof JSONObject) {
|
||||||
ArrayList<String> path = new ArrayList<>();
|
Node node = Utils.newNode(objectKey, (JSONObject) object);
|
||||||
path.add(objectKey);
|
ArrayList<String> path = new ArrayList<>();
|
||||||
prepareNodeForPut(node, result, path);
|
path.add(objectKey);
|
||||||
|
prepareNodeForPut(node, result, path);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
@ -86,21 +88,21 @@ public class Utils {
|
|||||||
result.addNode(node.soul, node);
|
result.addNode(node.soul, node);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static JSONObject formatGetRequest(String key, String field) {
|
public static JSONObject formatGetRequest(String messageID, String key, String field) {
|
||||||
JSONObject jsonObject = new JSONObject();
|
JSONObject jsonObject = new JSONObject();
|
||||||
jsonObject.put("#", Dup.random());
|
jsonObject.put("#", messageID);
|
||||||
JSONObject getParameters = new JSONObject();
|
JSONObject getParameters = new JSONObject();
|
||||||
getParameters.put("#", key);
|
getParameters.put("#", key);
|
||||||
if(field != null) {
|
if(field != null) {
|
||||||
getParameters.put(".", field);
|
getParameters.put(".", field);
|
||||||
}
|
}
|
||||||
jsonObject.put("get", getParameters);
|
jsonObject.put("await", getParameters);
|
||||||
return jsonObject;
|
return jsonObject;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static JSONObject formatPutRequest(JSONObject data) {
|
public static JSONObject formatPutRequest(String messageID, JSONObject data) {
|
||||||
JSONObject jsonObject = new JSONObject();
|
JSONObject jsonObject = new JSONObject();
|
||||||
jsonObject.put("#", Dup.random());
|
jsonObject.put("#", messageID);
|
||||||
jsonObject.put("put", data);
|
jsonObject.put("put", data);
|
||||||
return jsonObject;
|
return jsonObject;
|
||||||
}
|
}
|
||||||
|
@ -1,18 +0,0 @@
|
|||||||
package io.github.chronosx88.JGUN.entrypoints;
|
|
||||||
|
|
||||||
import io.github.chronosx88.JGUN.nodes.GunPeer;
|
|
||||||
import io.github.chronosx88.JGUN.nodes.GunSuperPeer;
|
|
||||||
import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph;
|
|
||||||
|
|
||||||
import java.net.Inet4Address;
|
|
||||||
import java.net.URISyntaxException;
|
|
||||||
import java.net.UnknownHostException;
|
|
||||||
|
|
||||||
public class MainClientServer {
|
|
||||||
public static void main(String[] args) throws URISyntaxException, UnknownHostException {
|
|
||||||
GunSuperPeer gunSuperNode = new GunSuperPeer(21334);
|
|
||||||
gunSuperNode.start();
|
|
||||||
GunPeer gunClient = new GunPeer(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 21334, new InMemoryGraph());
|
|
||||||
gunClient.connect();
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,6 +1,6 @@
|
|||||||
package io.github.chronosx88.JGUN.entrypoints;
|
package io.github.chronosx88.JGUN.examples;
|
||||||
|
|
||||||
import io.github.chronosx88.JGUN.nodes.GunPeer;
|
import io.github.chronosx88.JGUN.nodes.GunClient;
|
||||||
import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph;
|
import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph;
|
||||||
|
|
||||||
import java.net.Inet4Address;
|
import java.net.Inet4Address;
|
||||||
@ -9,7 +9,7 @@ import java.net.UnknownHostException;
|
|||||||
|
|
||||||
public class MainClient {
|
public class MainClient {
|
||||||
public static void main(String[] args) throws URISyntaxException, UnknownHostException {
|
public static void main(String[] args) throws URISyntaxException, UnknownHostException {
|
||||||
GunPeer gunClient = new GunPeer(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 5054, new InMemoryGraph());
|
GunClient gunClient = new GunClient(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 5054, new InMemoryGraph());
|
||||||
gunClient.connect();
|
gunClient.connect();
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -0,0 +1,46 @@
|
|||||||
|
package io.github.chronosx88.JGUN.examples;
|
||||||
|
|
||||||
|
import io.github.chronosx88.JGUN.Gun;
|
||||||
|
import io.github.chronosx88.JGUN.futures.FutureGet;
|
||||||
|
import io.github.chronosx88.JGUN.futures.FuturePut;
|
||||||
|
import io.github.chronosx88.JGUN.nodes.GunSuperPeer;
|
||||||
|
import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph;
|
||||||
|
import org.json.JSONObject;
|
||||||
|
|
||||||
|
import java.net.Inet4Address;
|
||||||
|
import java.net.UnknownHostException;
|
||||||
|
|
||||||
|
public class MainClientServer {
|
||||||
|
public static void main(String[] args) {
|
||||||
|
GunSuperPeer gunSuperNode = new GunSuperPeer(21334);
|
||||||
|
gunSuperNode.start();
|
||||||
|
new Thread(() -> {
|
||||||
|
Gun gun = null;
|
||||||
|
try {
|
||||||
|
gun = new Gun(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 21334, new InMemoryGraph());
|
||||||
|
} catch (UnknownHostException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
FuturePut futurePut = gun.get("random").get("dVFtzE9CL").put(new JSONObject().put("hello", "world"));
|
||||||
|
boolean success = futurePut.await();
|
||||||
|
System.out.println("[FuturePut] Success: " + success);
|
||||||
|
FutureGet futureGet = gun.get("random").get("dVFtzE9CL").getData();
|
||||||
|
JSONObject result = futureGet.await();
|
||||||
|
System.out.println("[FutureGet] Result of get: " + result.toString(2));
|
||||||
|
FuturePut futurePut1 = gun.get("random").get("dVFtzE9CL").put(new JSONObject().put("hello", "123"));
|
||||||
|
System.out.println("[FuturePut1] Putting an item again: " + futurePut1.await());
|
||||||
|
FutureGet futureGet1 = gun.get("random").get("dVFtzE9CL").getData();
|
||||||
|
JSONObject result1 = futureGet1.await();
|
||||||
|
System.out.println("[FutureGet] Result of get: " + result1.toString(2));
|
||||||
|
System.out.println("Deleting an item random/dVFtzE9CL");
|
||||||
|
gun.get("random").get("dVFtzE9CL").put(null).await();
|
||||||
|
JSONObject resultNull = gun.get("random").get("dVFtzE9CL").getData().await();
|
||||||
|
if(resultNull == null) {
|
||||||
|
System.out.println("Now random/dVFtzE9CL is null!");
|
||||||
|
}
|
||||||
|
gun.get("random").put(new JSONObject().put("hello", "world")).await();
|
||||||
|
System.out.println(gun.get("random").getData().await().toString(2));
|
||||||
|
}).start();
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
@ -1,4 +1,4 @@
|
|||||||
package io.github.chronosx88.JGUN.entrypoints;
|
package io.github.chronosx88.JGUN.examples;
|
||||||
|
|
||||||
import io.github.chronosx88.JGUN.nodes.GunSuperPeer;
|
import io.github.chronosx88.JGUN.nodes.GunSuperPeer;
|
||||||
|
|
@ -0,0 +1,37 @@
|
|||||||
|
package io.github.chronosx88.JGUN.futures;
|
||||||
|
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
|
||||||
|
public class BaseCompletableFuture<T> extends CompletableFuture<T> {
|
||||||
|
private final String futureID;
|
||||||
|
|
||||||
|
public BaseCompletableFuture(String id) {
|
||||||
|
super();
|
||||||
|
futureID = id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getFutureID() {
|
||||||
|
return futureID;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addListener(final BaseFutureListener<T> listener) {
|
||||||
|
this.whenCompleteAsync((t, throwable) -> {
|
||||||
|
if(throwable == null) {
|
||||||
|
listener.onComplete(t);
|
||||||
|
} else {
|
||||||
|
listener.onError(t, throwable);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public T await() {
|
||||||
|
T t = null;
|
||||||
|
try {
|
||||||
|
t = super.get();
|
||||||
|
} catch (InterruptedException | ExecutionException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
return t;
|
||||||
|
}
|
||||||
|
}
|
@ -1,394 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2019 Thomas Bocek
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
|
|
||||||
* use this file except in compliance with the License. You may obtain a copy of
|
|
||||||
* the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations under
|
|
||||||
* the License.
|
|
||||||
*/
|
|
||||||
package io.github.chronosx88.JGUN.futures;
|
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import java.io.PrintWriter;
|
|
||||||
import java.io.StringWriter;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The base for all BaseFuture implementations. Be aware of possible deadlocks. Never await from a listener. This class
|
|
||||||
* is heavily inspired by MINA and Netty.
|
|
||||||
*
|
|
||||||
* @param <K>
|
|
||||||
* The class that extends BaseFuture and is used to return back the type for method calls.
|
|
||||||
* @author Thomas Bocek
|
|
||||||
*/
|
|
||||||
public abstract class BaseFuture<K extends BaseFuture> {
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(BaseFuture.class);
|
|
||||||
|
|
||||||
public enum FutureType {
|
|
||||||
INIT, OK, FAILED, CANCEL
|
|
||||||
}
|
|
||||||
|
|
||||||
// Listeners that gets notified if the future finished
|
|
||||||
private final List<BaseFutureListener<? extends BaseFuture>> listeners = new ArrayList<>(1);
|
|
||||||
|
|
||||||
// While a future is running, the process may add cancellations for faster
|
|
||||||
// cancel operations, e.g. cancel connection attempt
|
|
||||||
private volatile Cancelable cancel = null;
|
|
||||||
|
|
||||||
private final CountDownLatch listenersFinished = new CountDownLatch(1);
|
|
||||||
|
|
||||||
protected final Object lock;
|
|
||||||
|
|
||||||
// set the ready flag if operation completed
|
|
||||||
protected boolean completed = false;
|
|
||||||
|
|
||||||
// by default false, change in case of success. An unfinished operation is
|
|
||||||
// always set to failed
|
|
||||||
protected FutureType type = FutureType.INIT;
|
|
||||||
|
|
||||||
protected String reason = "unknown";
|
|
||||||
|
|
||||||
private K self;
|
|
||||||
|
|
||||||
private String futureID;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Default constructor that sets the lock object, which is used for synchronization to this instance.
|
|
||||||
*/
|
|
||||||
public BaseFuture(String futureID) {
|
|
||||||
this.lock = this;
|
|
||||||
this.futureID = futureID;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param self2
|
|
||||||
* Set the type so that we are able to return it to the user. This is for making the API much more
|
|
||||||
* usable.
|
|
||||||
*/
|
|
||||||
protected void self(final K self2) {
|
|
||||||
this.self = self2;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return The object that stored this object. This is necessary for the builder pattern when using generics.
|
|
||||||
*/
|
|
||||||
protected K self() {
|
|
||||||
return self;
|
|
||||||
}
|
|
||||||
|
|
||||||
public K await() throws InterruptedException {
|
|
||||||
synchronized (lock) {
|
|
||||||
while (!completed) {
|
|
||||||
lock.wait();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return self;
|
|
||||||
}
|
|
||||||
|
|
||||||
public K awaitUninterruptibly() {
|
|
||||||
synchronized (lock) {
|
|
||||||
while (!completed) {
|
|
||||||
try {
|
|
||||||
lock.wait();
|
|
||||||
} catch (final InterruptedException e) {
|
|
||||||
LOG.debug("interrupted, but ignoring", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return self;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean await(final long timeoutMillis) throws InterruptedException {
|
|
||||||
return await0(timeoutMillis, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean awaitUninterruptibly(final long timeoutMillis) {
|
|
||||||
try {
|
|
||||||
return await0(timeoutMillis, false);
|
|
||||||
} catch (final InterruptedException e) {
|
|
||||||
throw new RuntimeException("This should never ever happen.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Internal await operation that also checks for potential deadlocks.
|
|
||||||
*
|
|
||||||
* @param timeoutMillis
|
|
||||||
* The time to wait
|
|
||||||
* @param interrupt
|
|
||||||
* Flag to indicate if the method can throw an InterruptedException
|
|
||||||
* @return True if this future has finished in timeoutMillis time, false otherwise
|
|
||||||
* @throws InterruptedException
|
|
||||||
* If the flag interrupt is true and this thread has been interrupted.
|
|
||||||
*/
|
|
||||||
private boolean await0(final long timeoutMillis, final boolean interrupt) throws InterruptedException {
|
|
||||||
final long startTime = (timeoutMillis <= 0) ? 0 : System.currentTimeMillis();
|
|
||||||
long waitTime = timeoutMillis;
|
|
||||||
synchronized (lock) {
|
|
||||||
if (completed) {
|
|
||||||
return completed;
|
|
||||||
} else if (waitTime <= 0) {
|
|
||||||
return completed;
|
|
||||||
}
|
|
||||||
while (true) {
|
|
||||||
try {
|
|
||||||
lock.wait(waitTime);
|
|
||||||
} catch (final InterruptedException e) {
|
|
||||||
if (interrupt) {
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (completed) {
|
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
waitTime = timeoutMillis - (System.currentTimeMillis() - startTime);
|
|
||||||
if (waitTime <= 0) {
|
|
||||||
return completed;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isCompleted() {
|
|
||||||
synchronized (lock) {
|
|
||||||
return completed;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isSuccess() {
|
|
||||||
synchronized (lock) {
|
|
||||||
return completed && (type == FutureType.OK);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isFailed() {
|
|
||||||
synchronized (lock) {
|
|
||||||
// failed means failed or canceled
|
|
||||||
return completed && (type != FutureType.OK);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isCanceled() {
|
|
||||||
synchronized (lock) {
|
|
||||||
return completed && (type == FutureType.CANCEL);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public K failed(final BaseFuture origin) {
|
|
||||||
return failed(origin.failedReason());
|
|
||||||
}
|
|
||||||
|
|
||||||
public K failed(final String failed, final BaseFuture origin) {
|
|
||||||
StringBuilder sb = new StringBuilder(failed);
|
|
||||||
return failed(sb.append(" <-> ").append(origin.failedReason()).toString());
|
|
||||||
}
|
|
||||||
|
|
||||||
public K failed(final Throwable t) {
|
|
||||||
StringWriter stringWriter = new StringWriter();
|
|
||||||
PrintWriter printWriter = new PrintWriter(stringWriter);
|
|
||||||
t.printStackTrace(printWriter);
|
|
||||||
return failed(stringWriter.toString());
|
|
||||||
}
|
|
||||||
|
|
||||||
public K failed(final String failed, final Throwable t) {
|
|
||||||
if (t == null) {
|
|
||||||
return failed("n/a");
|
|
||||||
}
|
|
||||||
StringBuilder sb = new StringBuilder(failed);
|
|
||||||
StringWriter stringWriter = new StringWriter();
|
|
||||||
PrintWriter printWriter = new PrintWriter(stringWriter);
|
|
||||||
t.printStackTrace(printWriter);
|
|
||||||
return failed(sb.append(" <-> ").append(stringWriter.toString()).toString());
|
|
||||||
}
|
|
||||||
|
|
||||||
public K failed(final String failed) {
|
|
||||||
synchronized (lock) {
|
|
||||||
if (!completedAndNotify()) {
|
|
||||||
return self;
|
|
||||||
}
|
|
||||||
this.reason = failed;
|
|
||||||
this.type = FutureType.FAILED;
|
|
||||||
}
|
|
||||||
notifyListeners();
|
|
||||||
return self;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String failedReason() {
|
|
||||||
final StringBuffer sb = new StringBuffer("Future (compl/canc):");
|
|
||||||
synchronized (lock) {
|
|
||||||
sb.append(completed).append("/")
|
|
||||||
.append(", ").append(type.name())
|
|
||||||
.append(", ").append(reason);
|
|
||||||
return sb.toString();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public FutureType type() {
|
|
||||||
synchronized (lock) {
|
|
||||||
return type;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Make sure that the calling method has synchronized (lock).
|
|
||||||
*
|
|
||||||
* @return True if notified. It will notify if completed is not set yet.
|
|
||||||
*/
|
|
||||||
protected boolean completedAndNotify() {
|
|
||||||
if (!completed) {
|
|
||||||
completed = true;
|
|
||||||
lock.notifyAll();
|
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public K awaitListeners() throws InterruptedException {
|
|
||||||
boolean wait = false;
|
|
||||||
synchronized (lock) {
|
|
||||||
while (!completed) {
|
|
||||||
lock.wait();
|
|
||||||
}
|
|
||||||
if(listeners.size() > 0) {
|
|
||||||
wait = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if(wait) {
|
|
||||||
listenersFinished.await();
|
|
||||||
}
|
|
||||||
return self;
|
|
||||||
}
|
|
||||||
|
|
||||||
public K awaitListenersUninterruptibly() {
|
|
||||||
boolean wait = false;
|
|
||||||
synchronized (lock) {
|
|
||||||
while (!completed) {
|
|
||||||
try {
|
|
||||||
lock.wait();
|
|
||||||
} catch (final InterruptedException e) {
|
|
||||||
LOG.debug("interrupted, but ignoring", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if(listeners.size() > 0) {
|
|
||||||
wait = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
while(wait) {
|
|
||||||
try {
|
|
||||||
listenersFinished.await();
|
|
||||||
wait = false;
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
LOG.debug("interrupted, but ignoring", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return self;
|
|
||||||
}
|
|
||||||
|
|
||||||
public K addListener(final BaseFutureListener<? extends BaseFuture> listener) {
|
|
||||||
boolean notifyNow = false;
|
|
||||||
synchronized (lock) {
|
|
||||||
if (completed) {
|
|
||||||
notifyNow = true;
|
|
||||||
} else {
|
|
||||||
listeners.add(listener);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// called only once
|
|
||||||
if (notifyNow) {
|
|
||||||
callOperationComplete(listener);
|
|
||||||
}
|
|
||||||
return self;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Call operation complete or call fail listener. If the fail listener fails, its printed as a stack trace.
|
|
||||||
*
|
|
||||||
* @param listener
|
|
||||||
* The listener to call
|
|
||||||
*/
|
|
||||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
|
||||||
private void callOperationComplete(final BaseFutureListener listener) {
|
|
||||||
try {
|
|
||||||
listener.onComplete(this);
|
|
||||||
} catch (final Exception e) {
|
|
||||||
try {
|
|
||||||
listener.onError(this, e);
|
|
||||||
} catch (final Exception e1) {
|
|
||||||
LOG.error("Unexpected exception in exceptionCaught()", e1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Always call this from outside synchronized(lock)!
|
|
||||||
*/
|
|
||||||
protected void notifyListeners() {
|
|
||||||
// if this is synchronized, it will deadlock, so do not lock this!
|
|
||||||
// There won't be any visibility problem or concurrent modification
|
|
||||||
// because 'ready' flag will be checked against both addListener and
|
|
||||||
// removeListener calls.
|
|
||||||
//
|
|
||||||
// This method doesn't need synchronization because:
|
|
||||||
// 1) This method is always called after synchronized (this) block.
|
|
||||||
// Hence any listener list modification happens-before this method.
|
|
||||||
// 2) This method is called only when 'done' is true. Once 'done'
|
|
||||||
// becomes true, the listener list is never modified - see add/removeListener()
|
|
||||||
for (final BaseFutureListener<? extends BaseFuture> listener : listeners) {
|
|
||||||
callOperationComplete(listener);
|
|
||||||
}
|
|
||||||
|
|
||||||
listeners.clear();
|
|
||||||
listenersFinished.countDown();
|
|
||||||
// all events are one time events. It cannot happen that you get
|
|
||||||
// notified twice
|
|
||||||
}
|
|
||||||
|
|
||||||
public K removeListener(final BaseFutureListener<? extends BaseFuture> listener) {
|
|
||||||
synchronized (lock) {
|
|
||||||
if (!completed) {
|
|
||||||
listeners.remove(listener);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return self;
|
|
||||||
}
|
|
||||||
|
|
||||||
public K setCancel(final Cancelable cancel) {
|
|
||||||
synchronized (lock) {
|
|
||||||
if (!completed) {
|
|
||||||
this.cancel = cancel;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return self;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void cancel() {
|
|
||||||
synchronized (lock) {
|
|
||||||
if (!completedAndNotify()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
this.type = FutureType.CANCEL;
|
|
||||||
}
|
|
||||||
if(cancel != null) {
|
|
||||||
cancel.cancel();
|
|
||||||
}
|
|
||||||
notifyListeners();
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getFutureID() {
|
|
||||||
return futureID;
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,6 +1,6 @@
|
|||||||
package io.github.chronosx88.JGUN.futures;
|
package io.github.chronosx88.JGUN.futures;
|
||||||
|
|
||||||
public interface BaseFutureListener<F extends BaseFuture> {
|
public interface BaseFutureListener<T> {
|
||||||
void onComplete(F future);
|
void onComplete(T result);
|
||||||
void onError(F future, Throwable exception);
|
void onError(T result, Throwable exception);
|
||||||
}
|
}
|
||||||
|
@ -1,29 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2009 Thomas Bocek
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
|
|
||||||
* use this file except in compliance with the License. You may obtain a copy of
|
|
||||||
* the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations under
|
|
||||||
* the License.
|
|
||||||
*/
|
|
||||||
package io.github.chronosx88.JGUN.futures;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A cancelable class should implement this interface and use it for future
|
|
||||||
* objects.
|
|
||||||
*
|
|
||||||
* @author Thomas Bocek
|
|
||||||
*/
|
|
||||||
public interface Cancelable {
|
|
||||||
/**
|
|
||||||
* Gets called if a future is cancelled.
|
|
||||||
*/
|
|
||||||
void cancel();
|
|
||||||
}
|
|
@ -2,49 +2,8 @@ package io.github.chronosx88.JGUN.futures;
|
|||||||
|
|
||||||
import org.json.JSONObject;
|
import org.json.JSONObject;
|
||||||
|
|
||||||
public class FutureGet extends BaseFuture<FutureGet> {
|
public class FutureGet extends BaseCompletableFuture<JSONObject> {
|
||||||
private JSONObject data;
|
|
||||||
private GetStatus getStatus;
|
|
||||||
|
|
||||||
enum GetStatus {
|
|
||||||
OK, NOT_FOUND
|
|
||||||
}
|
|
||||||
|
|
||||||
public FutureGet(String id) {
|
public FutureGet(String id) {
|
||||||
super(id);
|
super(id);
|
||||||
self(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
public JSONObject getData() {
|
|
||||||
return data;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isSuccess() {
|
|
||||||
synchronized (lock) {
|
|
||||||
return completed && (getStatus == GetStatus.OK) && (type == FutureType.OK);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public FutureGet done(JSONObject data) {
|
|
||||||
synchronized (lock) {
|
|
||||||
if(data != null) {
|
|
||||||
if (!data.isEmpty()) {
|
|
||||||
this.getStatus = GetStatus.OK;
|
|
||||||
this.type = FutureType.OK;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
this.getStatus = GetStatus.NOT_FOUND;
|
|
||||||
this.type = FutureType.FAILED;
|
|
||||||
this.reason = "Not found";
|
|
||||||
}
|
|
||||||
|
|
||||||
this.data = data;
|
|
||||||
if (!completedAndNotify()) {
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
notifyListeners();
|
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,24 +1,10 @@
|
|||||||
package io.github.chronosx88.JGUN.futures;
|
package io.github.chronosx88.JGUN.futures;
|
||||||
|
|
||||||
public class FuturePut extends BaseFuture<FuturePut> {
|
/**
|
||||||
|
* Return success of PUT operation
|
||||||
|
*/
|
||||||
|
public class FuturePut extends BaseCompletableFuture<Boolean> {
|
||||||
public FuturePut(String id) {
|
public FuturePut(String id) {
|
||||||
super(id);
|
super(id);
|
||||||
self(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
public FuturePut done(boolean success) {
|
|
||||||
synchronized (lock) {
|
|
||||||
if(success) {
|
|
||||||
this.type = FutureType.OK;
|
|
||||||
} else {
|
|
||||||
this.type = FutureType.FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!completedAndNotify()) {
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
notifyListeners();
|
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -12,12 +12,12 @@ import java.net.InetAddress;
|
|||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
|
|
||||||
public class GunPeer extends WebSocketClient implements Peer {
|
public class GunClient extends WebSocketClient implements Peer {
|
||||||
private Dup dup = new Dup();
|
private Dup dup = new Dup();
|
||||||
private final StorageBackend storage;
|
private final StorageBackend storage;
|
||||||
private final Dispatcher dispatcher;
|
private final Dispatcher dispatcher;
|
||||||
|
|
||||||
public GunPeer(InetAddress address, int port, StorageBackend storage) throws URISyntaxException {
|
public GunClient(InetAddress address, int port, StorageBackend storage) throws URISyntaxException {
|
||||||
super(new URI("ws://" + address.getHostAddress() + ":" + port));
|
super(new URI("ws://" + address.getHostAddress() + ":" + port));
|
||||||
this.storage = storage;
|
this.storage = storage;
|
||||||
this.dispatcher = new Dispatcher(storage, this, dup);
|
this.dispatcher = new Dispatcher(storage, this, dup);
|
||||||
@ -47,14 +47,12 @@ public class GunPeer extends WebSocketClient implements Peer {
|
|||||||
ex.printStackTrace();
|
ex.printStackTrace();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Dispatcher getDispatcher() {
|
||||||
|
return dispatcher;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void emit(String data) {
|
public void emit(String data) {
|
||||||
this.send(data);
|
this.send(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
public PathRef get(String key) {
|
|
||||||
PathRef pathRef = new PathRef(dispatcher);
|
|
||||||
pathRef.get(key);
|
|
||||||
return pathRef;
|
|
||||||
}
|
|
||||||
}
|
}
|
@ -1,8 +1,7 @@
|
|||||||
package io.github.chronosx88.JGUN.nodes;
|
package io.github.chronosx88.JGUN.nodes;
|
||||||
|
|
||||||
|
import io.github.chronosx88.JGUN.Dispatcher;
|
||||||
import io.github.chronosx88.JGUN.Dup;
|
import io.github.chronosx88.JGUN.Dup;
|
||||||
import io.github.chronosx88.JGUN.HAM;
|
|
||||||
import io.github.chronosx88.JGUN.Utils;
|
|
||||||
import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph;
|
import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph;
|
||||||
import org.java_websocket.WebSocket;
|
import org.java_websocket.WebSocket;
|
||||||
import org.java_websocket.handshake.ClientHandshake;
|
import org.java_websocket.handshake.ClientHandshake;
|
||||||
@ -14,10 +13,12 @@ import java.net.InetSocketAddress;
|
|||||||
public class GunSuperPeer extends WebSocketServer implements Peer {
|
public class GunSuperPeer extends WebSocketServer implements Peer {
|
||||||
private Dup dup = new Dup();
|
private Dup dup = new Dup();
|
||||||
private InMemoryGraph graph = new InMemoryGraph();
|
private InMemoryGraph graph = new InMemoryGraph();
|
||||||
|
private Dispatcher dispatcher;
|
||||||
|
|
||||||
public GunSuperPeer(int port) {
|
public GunSuperPeer(int port) {
|
||||||
super(new InetSocketAddress(port));
|
super(new InetSocketAddress(port));
|
||||||
setReuseAddr(true);
|
setReuseAddr(true);
|
||||||
|
dispatcher = new Dispatcher(graph, this, dup);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -32,35 +33,17 @@ public class GunSuperPeer extends WebSocketServer implements Peer {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onMessage(WebSocket conn, String message) {
|
public void onMessage(WebSocket conn, String message) {
|
||||||
JSONObject msg = new JSONObject(message);
|
JSONObject jsonMsg = new JSONObject(message);
|
||||||
if(dup.check(msg.getString("#"))) { return; }
|
if(dup.check(jsonMsg.getString("#"))){ return; }
|
||||||
dup.track(msg.getString("#"));
|
dup.track(jsonMsg.getString("#"));
|
||||||
if(msg.opt("put") != null) {
|
dispatcher.handleIncomingMessage(jsonMsg);
|
||||||
HAM.mix(new InMemoryGraph(msg.getJSONObject("put")), graph);
|
|
||||||
}
|
|
||||||
if(msg.opt("get") != null) {
|
|
||||||
InMemoryGraph result = Utils.getRequest(msg.optJSONObject("get"), graph);
|
|
||||||
JSONObject ack = new JSONObject();
|
|
||||||
if(!result.isEmpty()) {
|
|
||||||
emit(ack
|
|
||||||
.put("#", dup.track(Dup.random()))
|
|
||||||
.put("@", msg.getString("#"))
|
|
||||||
.put("put", result.toJSONObject())
|
|
||||||
.toString());
|
|
||||||
} else {
|
|
||||||
emit(ack
|
|
||||||
.put("#", dup.track(Dup.random()))
|
|
||||||
.put("@", msg.getString("#"))
|
|
||||||
.put("ok", false)
|
|
||||||
.toString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
emit(message);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onError(WebSocket conn, Exception ex) {
|
public void onError(WebSocket conn, Exception ex) {
|
||||||
System.out.println("# Exception occured on connection: " + conn.getRemoteSocketAddress());
|
if(conn != null) {
|
||||||
|
System.out.println("# Exception occured on connection: " + conn.getRemoteSocketAddress());
|
||||||
|
}
|
||||||
ex.printStackTrace();
|
ex.printStackTrace();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user