From b8990180453020dd730f9ba1ddb2991cb3c4f8af Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Thu, 9 May 2019 14:06:21 +0400 Subject: [PATCH] Now all futures in JGUN are children of CompletableFuture, correctly putting, getting, deleting. --- .../io/github/chronosx88/JGUN/Dispatcher.java | 29 +- .../java/io/github/chronosx88/JGUN/Gun.java | 28 ++ .../io/github/chronosx88/JGUN/PathRef.java | 67 ++- .../java/io/github/chronosx88/JGUN/Utils.java | 22 +- .../JGUN/entrypoints/MainClientServer.java | 18 - .../{entrypoints => examples}/MainClient.java | 6 +- .../JGUN/examples/MainClientServer.java | 46 ++ .../{entrypoints => examples}/MainServer.java | 2 +- .../JGUN/futures/BaseCompletableFuture.java | 37 ++ .../chronosx88/JGUN/futures/BaseFuture.java | 394 ------------------ .../JGUN/futures/BaseFutureListener.java | 6 +- .../chronosx88/JGUN/futures/Cancelable.java | 29 -- .../chronosx88/JGUN/futures/FutureGet.java | 43 +- .../chronosx88/JGUN/futures/FuturePut.java | 22 +- .../nodes/{GunPeer.java => GunClient.java} | 14 +- .../chronosx88/JGUN/nodes/GunSuperPeer.java | 37 +- 16 files changed, 210 insertions(+), 590 deletions(-) create mode 100644 src/main/java/io/github/chronosx88/JGUN/Gun.java delete mode 100644 src/main/java/io/github/chronosx88/JGUN/entrypoints/MainClientServer.java rename src/main/java/io/github/chronosx88/JGUN/{entrypoints => examples}/MainClient.java (59%) create mode 100644 src/main/java/io/github/chronosx88/JGUN/examples/MainClientServer.java rename src/main/java/io/github/chronosx88/JGUN/{entrypoints => examples}/MainServer.java (82%) create mode 100644 src/main/java/io/github/chronosx88/JGUN/futures/BaseCompletableFuture.java delete mode 100644 src/main/java/io/github/chronosx88/JGUN/futures/BaseFuture.java delete mode 100644 src/main/java/io/github/chronosx88/JGUN/futures/Cancelable.java rename src/main/java/io/github/chronosx88/JGUN/nodes/{GunPeer.java => GunClient.java} (84%) diff --git a/src/main/java/io/github/chronosx88/JGUN/Dispatcher.java b/src/main/java/io/github/chronosx88/JGUN/Dispatcher.java index bab7f16..f5aa844 100644 --- a/src/main/java/io/github/chronosx88/JGUN/Dispatcher.java +++ b/src/main/java/io/github/chronosx88/JGUN/Dispatcher.java @@ -1,21 +1,22 @@ package io.github.chronosx88.JGUN; -import io.github.chronosx88.JGUN.futures.BaseFuture; +import io.github.chronosx88.JGUN.futures.BaseCompletableFuture; import io.github.chronosx88.JGUN.futures.FutureGet; import io.github.chronosx88.JGUN.futures.FuturePut; import io.github.chronosx88.JGUN.nodes.Peer; import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph; import io.github.chronosx88.JGUN.storageBackends.StorageBackend; +import org.java_websocket.client.WebSocketClient; import org.json.JSONObject; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class Dispatcher { - private final Map> pendingFutures = new ConcurrentHashMap<>(); + private final Map> pendingFutures = new ConcurrentHashMap<>(); + private final Peer peer; private final StorageBackend graphStorage; private final Dup dup; - private final Peer peer; public Dispatcher(StorageBackend graphStorage, Peer peer, Dup dup) { this.graphStorage = graphStorage; @@ -23,7 +24,7 @@ public class Dispatcher { this.dup = dup; } - public void addPendingFuture(BaseFuture future) { + public void addPendingFuture(BaseCompletableFuture future) { pendingFutures.put(future.getFutureID(), future); } @@ -32,7 +33,7 @@ public class Dispatcher { JSONObject ack = handlePut(message); peer.emit(ack.toString()); } - if(message.has("get")) { + if(message.has("await")) { JSONObject ack = handleGet(message); peer.emit(ack.toString()); } @@ -42,7 +43,7 @@ public class Dispatcher { } private JSONObject handleGet(JSONObject getData) { - InMemoryGraph getResults = Utils.getRequest(getData.getJSONObject("get"), graphStorage); + InMemoryGraph getResults = Utils.getRequest(getData.getJSONObject("await"), graphStorage); return new JSONObject() // Acknowledgment .put( "#", dup.track(Dup.random()) ) .put( "@", getData.getString("#") ) @@ -61,32 +62,32 @@ public class Dispatcher { private void handleIncomingAck(JSONObject ack) { if(ack.has("put")) { if(pendingFutures.containsKey(ack.getString("@"))) { - BaseFuture future = pendingFutures.get(ack.getString("@")); + BaseCompletableFuture future = pendingFutures.get(ack.getString("@")); if(future instanceof FutureGet) { - ((FutureGet) future).done(ack.getJSONObject("put")); + ((FutureGet) future).complete(ack.getJSONObject("put")); } } } if(ack.has("ok")) { if(pendingFutures.containsKey(ack.getString("@"))) { - BaseFuture future = pendingFutures.get(ack.getString("@")); + BaseCompletableFuture future = pendingFutures.get(ack.getString("@")); if(future instanceof FuturePut) { - ((FuturePut) future).done(ack.getBoolean("ok")); + ((FuturePut) future).complete(ack.getBoolean("ok")); } } } } - public void sendPutRequest(JSONObject data) { + public void sendPutRequest(String messageID, JSONObject data) { new Thread(() -> { InMemoryGraph graph = Utils.prepareDataForPut(data); - peer.emit(Utils.formatPutRequest(graph.toJSONObject()).toString()); + peer.emit(Utils.formatPutRequest(messageID, graph.toJSONObject()).toString()); }).start(); } - public void sendGetRequest(String key, String field) { + public void sendGetRequest(String messageID, String key, String field) { new Thread(() -> { - JSONObject jsonGet = Utils.formatGetRequest(key, field); + JSONObject jsonGet = Utils.formatGetRequest(messageID, key, field); peer.emit(jsonGet.toString()); }).start(); } diff --git a/src/main/java/io/github/chronosx88/JGUN/Gun.java b/src/main/java/io/github/chronosx88/JGUN/Gun.java new file mode 100644 index 0000000..6c7fe22 --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/Gun.java @@ -0,0 +1,28 @@ +package io.github.chronosx88.JGUN; + +import io.github.chronosx88.JGUN.nodes.GunClient; +import io.github.chronosx88.JGUN.storageBackends.StorageBackend; + +import java.net.InetAddress; +import java.net.URISyntaxException; + +public class Gun { + private Dispatcher dispatcher; + private GunClient gunClient; + + public Gun(InetAddress address, int port, StorageBackend storage) { + try { + this.gunClient = new GunClient(address, port, storage); + this.dispatcher = gunClient.getDispatcher(); + this.gunClient.connectBlocking(); + } catch (URISyntaxException | InterruptedException e) { + e.printStackTrace(); + } + } + + public PathRef get(String key) { + PathRef pathRef = new PathRef(dispatcher); + pathRef.get(key); + return pathRef; + } +} diff --git a/src/main/java/io/github/chronosx88/JGUN/PathRef.java b/src/main/java/io/github/chronosx88/JGUN/PathRef.java index 164d372..27b5f68 100644 --- a/src/main/java/io/github/chronosx88/JGUN/PathRef.java +++ b/src/main/java/io/github/chronosx88/JGUN/PathRef.java @@ -24,43 +24,52 @@ public class PathRef { public FutureGet getData() { FutureGet futureGet = new FutureGet(Dup.random()); - Iterator iterator = path.iterator(); new Thread(() -> { + Iterator iterator = path.iterator(); + String rootSoul = iterator.next(); + String field = iterator.hasNext() ? iterator.next() : null; + + iterator = path.iterator(); + iterator.next(); + CompletableFuture future = CompletableFuture.supplyAsync(() -> { - String rootSoul = iterator.next(); - String field = iterator.hasNext() ? iterator.next() : null; FutureGet futureGetRootSoul = new FutureGet(Dup.random()); dispatcher.addPendingFuture(futureGetRootSoul); - dispatcher.sendGetRequest(rootSoul, field); - futureGetRootSoul.awaitUninterruptibly(); - if(futureGetRootSoul.isSuccess() && futureGetRootSoul.getData() != null) { - return futureGetRootSoul.getData(); - } else { - return null; + dispatcher.sendGetRequest(futureGetRootSoul.getFutureID(), rootSoul, field); + JSONObject result = futureGetRootSoul.await(); + if(result != null && result.isEmpty()) { + result = null; } + return result == null ? null : result.getJSONObject(rootSoul); }); - while(iterator.hasNext()) { + do { + String soul = iterator.hasNext() ? iterator.next() : null; + String nextField = iterator.hasNext() ? iterator.next() : null; future = future.thenApply(jsonObject -> { - String soul = iterator.next(); - String field = iterator.hasNext() ? iterator.next() : null; if(jsonObject != null) { - String nodeRef = jsonObject.getJSONObject(soul).getString("#"); - FutureGet get = new FutureGet(Dup.random()); - dispatcher.addPendingFuture(get); - dispatcher.sendGetRequest(nodeRef, field); - get.awaitUninterruptibly(); - if(get.isSuccess() && get.getData() != null) { - return get.getData(); + if(soul != null) { + if(jsonObject.get(soul) instanceof JSONObject) { + String nodeRef = jsonObject.getJSONObject(soul).getString("#"); + FutureGet get = new FutureGet(Dup.random()); + dispatcher.addPendingFuture(get); + dispatcher.sendGetRequest(get.getFutureID(), nodeRef, nextField); + JSONObject result = get.await(); + if(result != null && result.isEmpty()) { + result = null; + } + return result == null ? null : result.getJSONObject(nodeRef); + } } else { - return null; + return jsonObject; } } return null; }); - } + } while(iterator.hasNext()); + try { JSONObject data = future.get(); - futureGet.done(data); + futureGet.complete(data); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } @@ -71,7 +80,19 @@ public class PathRef { public FuturePut put(JSONObject data) { FuturePut futurePut = new FuturePut(Dup.random()); dispatcher.addPendingFuture(futurePut); - dispatcher.sendPutRequest(data); + JSONObject temp = new JSONObject(); + JSONObject temp1 = temp; + for (int i = 0; i < path.size(); i++) { + if(i != path.size() - 1) { + JSONObject object = new JSONObject(); + temp1.put(path.get(i), object); + temp1 = object; + } else { + temp1.put(path.get(i), data == null ? JSONObject.NULL : data); + } + + } + dispatcher.sendPutRequest(futurePut.getFutureID(), temp); return futurePut; } } diff --git a/src/main/java/io/github/chronosx88/JGUN/Utils.java b/src/main/java/io/github/chronosx88/JGUN/Utils.java index e2a445a..823ef78 100644 --- a/src/main/java/io/github/chronosx88/JGUN/Utils.java +++ b/src/main/java/io/github/chronosx88/JGUN/Utils.java @@ -59,11 +59,13 @@ public class Utils { public static InMemoryGraph prepareDataForPut(JSONObject data) { InMemoryGraph result = new InMemoryGraph(); for (String objectKey : data.keySet()) { - JSONObject object = data.getJSONObject(objectKey); - Node node = Utils.newNode(objectKey, object); - ArrayList path = new ArrayList<>(); - path.add(objectKey); - prepareNodeForPut(node, result, path); + Object object = data.get(objectKey); + if(object instanceof JSONObject) { + Node node = Utils.newNode(objectKey, (JSONObject) object); + ArrayList path = new ArrayList<>(); + path.add(objectKey); + prepareNodeForPut(node, result, path); + } } return result; } @@ -86,21 +88,21 @@ public class Utils { result.addNode(node.soul, node); } - public static JSONObject formatGetRequest(String key, String field) { + public static JSONObject formatGetRequest(String messageID, String key, String field) { JSONObject jsonObject = new JSONObject(); - jsonObject.put("#", Dup.random()); + jsonObject.put("#", messageID); JSONObject getParameters = new JSONObject(); getParameters.put("#", key); if(field != null) { getParameters.put(".", field); } - jsonObject.put("get", getParameters); + jsonObject.put("await", getParameters); return jsonObject; } - public static JSONObject formatPutRequest(JSONObject data) { + public static JSONObject formatPutRequest(String messageID, JSONObject data) { JSONObject jsonObject = new JSONObject(); - jsonObject.put("#", Dup.random()); + jsonObject.put("#", messageID); jsonObject.put("put", data); return jsonObject; } diff --git a/src/main/java/io/github/chronosx88/JGUN/entrypoints/MainClientServer.java b/src/main/java/io/github/chronosx88/JGUN/entrypoints/MainClientServer.java deleted file mode 100644 index 80add5b..0000000 --- a/src/main/java/io/github/chronosx88/JGUN/entrypoints/MainClientServer.java +++ /dev/null @@ -1,18 +0,0 @@ -package io.github.chronosx88.JGUN.entrypoints; - -import io.github.chronosx88.JGUN.nodes.GunPeer; -import io.github.chronosx88.JGUN.nodes.GunSuperPeer; -import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph; - -import java.net.Inet4Address; -import java.net.URISyntaxException; -import java.net.UnknownHostException; - -public class MainClientServer { - public static void main(String[] args) throws URISyntaxException, UnknownHostException { - GunSuperPeer gunSuperNode = new GunSuperPeer(21334); - gunSuperNode.start(); - GunPeer gunClient = new GunPeer(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 21334, new InMemoryGraph()); - gunClient.connect(); - } -} diff --git a/src/main/java/io/github/chronosx88/JGUN/entrypoints/MainClient.java b/src/main/java/io/github/chronosx88/JGUN/examples/MainClient.java similarity index 59% rename from src/main/java/io/github/chronosx88/JGUN/entrypoints/MainClient.java rename to src/main/java/io/github/chronosx88/JGUN/examples/MainClient.java index 5d796f1..2e24531 100644 --- a/src/main/java/io/github/chronosx88/JGUN/entrypoints/MainClient.java +++ b/src/main/java/io/github/chronosx88/JGUN/examples/MainClient.java @@ -1,6 +1,6 @@ -package io.github.chronosx88.JGUN.entrypoints; +package io.github.chronosx88.JGUN.examples; -import io.github.chronosx88.JGUN.nodes.GunPeer; +import io.github.chronosx88.JGUN.nodes.GunClient; import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph; import java.net.Inet4Address; @@ -9,7 +9,7 @@ import java.net.UnknownHostException; public class MainClient { public static void main(String[] args) throws URISyntaxException, UnknownHostException { - GunPeer gunClient = new GunPeer(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 5054, new InMemoryGraph()); + GunClient gunClient = new GunClient(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 5054, new InMemoryGraph()); gunClient.connect(); } } diff --git a/src/main/java/io/github/chronosx88/JGUN/examples/MainClientServer.java b/src/main/java/io/github/chronosx88/JGUN/examples/MainClientServer.java new file mode 100644 index 0000000..8800ca8 --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/examples/MainClientServer.java @@ -0,0 +1,46 @@ +package io.github.chronosx88.JGUN.examples; + +import io.github.chronosx88.JGUN.Gun; +import io.github.chronosx88.JGUN.futures.FutureGet; +import io.github.chronosx88.JGUN.futures.FuturePut; +import io.github.chronosx88.JGUN.nodes.GunSuperPeer; +import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph; +import org.json.JSONObject; + +import java.net.Inet4Address; +import java.net.UnknownHostException; + +public class MainClientServer { + public static void main(String[] args) { + GunSuperPeer gunSuperNode = new GunSuperPeer(21334); + gunSuperNode.start(); + new Thread(() -> { + Gun gun = null; + try { + gun = new Gun(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 21334, new InMemoryGraph()); + } catch (UnknownHostException e) { + e.printStackTrace(); + } + FuturePut futurePut = gun.get("random").get("dVFtzE9CL").put(new JSONObject().put("hello", "world")); + boolean success = futurePut.await(); + System.out.println("[FuturePut] Success: " + success); + FutureGet futureGet = gun.get("random").get("dVFtzE9CL").getData(); + JSONObject result = futureGet.await(); + System.out.println("[FutureGet] Result of get: " + result.toString(2)); + FuturePut futurePut1 = gun.get("random").get("dVFtzE9CL").put(new JSONObject().put("hello", "123")); + System.out.println("[FuturePut1] Putting an item again: " + futurePut1.await()); + FutureGet futureGet1 = gun.get("random").get("dVFtzE9CL").getData(); + JSONObject result1 = futureGet1.await(); + System.out.println("[FutureGet] Result of get: " + result1.toString(2)); + System.out.println("Deleting an item random/dVFtzE9CL"); + gun.get("random").get("dVFtzE9CL").put(null).await(); + JSONObject resultNull = gun.get("random").get("dVFtzE9CL").getData().await(); + if(resultNull == null) { + System.out.println("Now random/dVFtzE9CL is null!"); + } + gun.get("random").put(new JSONObject().put("hello", "world")).await(); + System.out.println(gun.get("random").getData().await().toString(2)); + }).start(); + + } +} diff --git a/src/main/java/io/github/chronosx88/JGUN/entrypoints/MainServer.java b/src/main/java/io/github/chronosx88/JGUN/examples/MainServer.java similarity index 82% rename from src/main/java/io/github/chronosx88/JGUN/entrypoints/MainServer.java rename to src/main/java/io/github/chronosx88/JGUN/examples/MainServer.java index ddb8a6f..41ce278 100644 --- a/src/main/java/io/github/chronosx88/JGUN/entrypoints/MainServer.java +++ b/src/main/java/io/github/chronosx88/JGUN/examples/MainServer.java @@ -1,4 +1,4 @@ -package io.github.chronosx88.JGUN.entrypoints; +package io.github.chronosx88.JGUN.examples; import io.github.chronosx88.JGUN.nodes.GunSuperPeer; diff --git a/src/main/java/io/github/chronosx88/JGUN/futures/BaseCompletableFuture.java b/src/main/java/io/github/chronosx88/JGUN/futures/BaseCompletableFuture.java new file mode 100644 index 0000000..dbb90db --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/futures/BaseCompletableFuture.java @@ -0,0 +1,37 @@ +package io.github.chronosx88.JGUN.futures; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +public class BaseCompletableFuture extends CompletableFuture { + private final String futureID; + + public BaseCompletableFuture(String id) { + super(); + futureID = id; + } + + public String getFutureID() { + return futureID; + } + + public void addListener(final BaseFutureListener listener) { + this.whenCompleteAsync((t, throwable) -> { + if(throwable == null) { + listener.onComplete(t); + } else { + listener.onError(t, throwable); + } + }); + } + + public T await() { + T t = null; + try { + t = super.get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + return t; + } +} diff --git a/src/main/java/io/github/chronosx88/JGUN/futures/BaseFuture.java b/src/main/java/io/github/chronosx88/JGUN/futures/BaseFuture.java deleted file mode 100644 index 65b10d1..0000000 --- a/src/main/java/io/github/chronosx88/JGUN/futures/BaseFuture.java +++ /dev/null @@ -1,394 +0,0 @@ -/* - * Copyright 2019 Thomas Bocek - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package io.github.chronosx88.JGUN.futures; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; - -/** - * The base for all BaseFuture implementations. Be aware of possible deadlocks. Never await from a listener. This class - * is heavily inspired by MINA and Netty. - * - * @param - * The class that extends BaseFuture and is used to return back the type for method calls. - * @author Thomas Bocek - */ -public abstract class BaseFuture { - private static final Logger LOG = LoggerFactory.getLogger(BaseFuture.class); - - public enum FutureType { - INIT, OK, FAILED, CANCEL - } - - // Listeners that gets notified if the future finished - private final List> listeners = new ArrayList<>(1); - - // While a future is running, the process may add cancellations for faster - // cancel operations, e.g. cancel connection attempt - private volatile Cancelable cancel = null; - - private final CountDownLatch listenersFinished = new CountDownLatch(1); - - protected final Object lock; - - // set the ready flag if operation completed - protected boolean completed = false; - - // by default false, change in case of success. An unfinished operation is - // always set to failed - protected FutureType type = FutureType.INIT; - - protected String reason = "unknown"; - - private K self; - - private String futureID; - - /** - * Default constructor that sets the lock object, which is used for synchronization to this instance. - */ - public BaseFuture(String futureID) { - this.lock = this; - this.futureID = futureID; - } - - /** - * @param self2 - * Set the type so that we are able to return it to the user. This is for making the API much more - * usable. - */ - protected void self(final K self2) { - this.self = self2; - } - - /** - * @return The object that stored this object. This is necessary for the builder pattern when using generics. - */ - protected K self() { - return self; - } - - public K await() throws InterruptedException { - synchronized (lock) { - while (!completed) { - lock.wait(); - } - } - return self; - } - - public K awaitUninterruptibly() { - synchronized (lock) { - while (!completed) { - try { - lock.wait(); - } catch (final InterruptedException e) { - LOG.debug("interrupted, but ignoring", e); - } - } - } - return self; - } - - public boolean await(final long timeoutMillis) throws InterruptedException { - return await0(timeoutMillis, true); - } - - public boolean awaitUninterruptibly(final long timeoutMillis) { - try { - return await0(timeoutMillis, false); - } catch (final InterruptedException e) { - throw new RuntimeException("This should never ever happen."); - } - } - - /** - * Internal await operation that also checks for potential deadlocks. - * - * @param timeoutMillis - * The time to wait - * @param interrupt - * Flag to indicate if the method can throw an InterruptedException - * @return True if this future has finished in timeoutMillis time, false otherwise - * @throws InterruptedException - * If the flag interrupt is true and this thread has been interrupted. - */ - private boolean await0(final long timeoutMillis, final boolean interrupt) throws InterruptedException { - final long startTime = (timeoutMillis <= 0) ? 0 : System.currentTimeMillis(); - long waitTime = timeoutMillis; - synchronized (lock) { - if (completed) { - return completed; - } else if (waitTime <= 0) { - return completed; - } - while (true) { - try { - lock.wait(waitTime); - } catch (final InterruptedException e) { - if (interrupt) { - throw e; - } - } - if (completed) { - return true; - } else { - waitTime = timeoutMillis - (System.currentTimeMillis() - startTime); - if (waitTime <= 0) { - return completed; - } - } - } - } - } - - public boolean isCompleted() { - synchronized (lock) { - return completed; - } - } - - public boolean isSuccess() { - synchronized (lock) { - return completed && (type == FutureType.OK); - } - } - - public boolean isFailed() { - synchronized (lock) { - // failed means failed or canceled - return completed && (type != FutureType.OK); - } - } - - public boolean isCanceled() { - synchronized (lock) { - return completed && (type == FutureType.CANCEL); - } - } - - public K failed(final BaseFuture origin) { - return failed(origin.failedReason()); - } - - public K failed(final String failed, final BaseFuture origin) { - StringBuilder sb = new StringBuilder(failed); - return failed(sb.append(" <-> ").append(origin.failedReason()).toString()); - } - - public K failed(final Throwable t) { - StringWriter stringWriter = new StringWriter(); - PrintWriter printWriter = new PrintWriter(stringWriter); - t.printStackTrace(printWriter); - return failed(stringWriter.toString()); - } - - public K failed(final String failed, final Throwable t) { - if (t == null) { - return failed("n/a"); - } - StringBuilder sb = new StringBuilder(failed); - StringWriter stringWriter = new StringWriter(); - PrintWriter printWriter = new PrintWriter(stringWriter); - t.printStackTrace(printWriter); - return failed(sb.append(" <-> ").append(stringWriter.toString()).toString()); - } - - public K failed(final String failed) { - synchronized (lock) { - if (!completedAndNotify()) { - return self; - } - this.reason = failed; - this.type = FutureType.FAILED; - } - notifyListeners(); - return self; - } - - public String failedReason() { - final StringBuffer sb = new StringBuffer("Future (compl/canc):"); - synchronized (lock) { - sb.append(completed).append("/") - .append(", ").append(type.name()) - .append(", ").append(reason); - return sb.toString(); - } - } - - public FutureType type() { - synchronized (lock) { - return type; - } - } - - /** - * Make sure that the calling method has synchronized (lock). - * - * @return True if notified. It will notify if completed is not set yet. - */ - protected boolean completedAndNotify() { - if (!completed) { - completed = true; - lock.notifyAll(); - return true; - } else { - return false; - } - } - - public K awaitListeners() throws InterruptedException { - boolean wait = false; - synchronized (lock) { - while (!completed) { - lock.wait(); - } - if(listeners.size() > 0) { - wait = true; - } - } - if(wait) { - listenersFinished.await(); - } - return self; - } - - public K awaitListenersUninterruptibly() { - boolean wait = false; - synchronized (lock) { - while (!completed) { - try { - lock.wait(); - } catch (final InterruptedException e) { - LOG.debug("interrupted, but ignoring", e); - } - } - if(listeners.size() > 0) { - wait = true; - } - } - while(wait) { - try { - listenersFinished.await(); - wait = false; - } catch (InterruptedException e) { - LOG.debug("interrupted, but ignoring", e); - } - } - return self; - } - - public K addListener(final BaseFutureListener listener) { - boolean notifyNow = false; - synchronized (lock) { - if (completed) { - notifyNow = true; - } else { - listeners.add(listener); - } - } - // called only once - if (notifyNow) { - callOperationComplete(listener); - } - return self; - } - - /** - * Call operation complete or call fail listener. If the fail listener fails, its printed as a stack trace. - * - * @param listener - * The listener to call - */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - private void callOperationComplete(final BaseFutureListener listener) { - try { - listener.onComplete(this); - } catch (final Exception e) { - try { - listener.onError(this, e); - } catch (final Exception e1) { - LOG.error("Unexpected exception in exceptionCaught()", e1); - } - } - } - - - /** - * Always call this from outside synchronized(lock)! - */ - protected void notifyListeners() { - // if this is synchronized, it will deadlock, so do not lock this! - // There won't be any visibility problem or concurrent modification - // because 'ready' flag will be checked against both addListener and - // removeListener calls. - // - // This method doesn't need synchronization because: - // 1) This method is always called after synchronized (this) block. - // Hence any listener list modification happens-before this method. - // 2) This method is called only when 'done' is true. Once 'done' - // becomes true, the listener list is never modified - see add/removeListener() - for (final BaseFutureListener listener : listeners) { - callOperationComplete(listener); - } - - listeners.clear(); - listenersFinished.countDown(); - // all events are one time events. It cannot happen that you get - // notified twice - } - - public K removeListener(final BaseFutureListener listener) { - synchronized (lock) { - if (!completed) { - listeners.remove(listener); - } - } - return self; - } - - public K setCancel(final Cancelable cancel) { - synchronized (lock) { - if (!completed) { - this.cancel = cancel; - } - } - return self; - } - - public void cancel() { - synchronized (lock) { - if (!completedAndNotify()) { - return; - } - this.type = FutureType.CANCEL; - } - if(cancel != null) { - cancel.cancel(); - } - notifyListeners(); - } - - public String getFutureID() { - return futureID; - } -} \ No newline at end of file diff --git a/src/main/java/io/github/chronosx88/JGUN/futures/BaseFutureListener.java b/src/main/java/io/github/chronosx88/JGUN/futures/BaseFutureListener.java index 0b934b3..0bd6361 100644 --- a/src/main/java/io/github/chronosx88/JGUN/futures/BaseFutureListener.java +++ b/src/main/java/io/github/chronosx88/JGUN/futures/BaseFutureListener.java @@ -1,6 +1,6 @@ package io.github.chronosx88.JGUN.futures; -public interface BaseFutureListener { - void onComplete(F future); - void onError(F future, Throwable exception); +public interface BaseFutureListener { + void onComplete(T result); + void onError(T result, Throwable exception); } diff --git a/src/main/java/io/github/chronosx88/JGUN/futures/Cancelable.java b/src/main/java/io/github/chronosx88/JGUN/futures/Cancelable.java deleted file mode 100644 index 315213d..0000000 --- a/src/main/java/io/github/chronosx88/JGUN/futures/Cancelable.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright 2009 Thomas Bocek - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package io.github.chronosx88.JGUN.futures; - -/** - * A cancelable class should implement this interface and use it for future - * objects. - * - * @author Thomas Bocek - */ -public interface Cancelable { - /** - * Gets called if a future is cancelled. - */ - void cancel(); -} \ No newline at end of file diff --git a/src/main/java/io/github/chronosx88/JGUN/futures/FutureGet.java b/src/main/java/io/github/chronosx88/JGUN/futures/FutureGet.java index d8e6750..31a5800 100644 --- a/src/main/java/io/github/chronosx88/JGUN/futures/FutureGet.java +++ b/src/main/java/io/github/chronosx88/JGUN/futures/FutureGet.java @@ -2,49 +2,8 @@ package io.github.chronosx88.JGUN.futures; import org.json.JSONObject; -public class FutureGet extends BaseFuture { - private JSONObject data; - private GetStatus getStatus; - - enum GetStatus { - OK, NOT_FOUND - } - +public class FutureGet extends BaseCompletableFuture { public FutureGet(String id) { super(id); - self(this); - } - - public JSONObject getData() { - return data; - } - - @Override - public boolean isSuccess() { - synchronized (lock) { - return completed && (getStatus == GetStatus.OK) && (type == FutureType.OK); - } - } - - public FutureGet done(JSONObject data) { - synchronized (lock) { - if(data != null) { - if (!data.isEmpty()) { - this.getStatus = GetStatus.OK; - this.type = FutureType.OK; - } - } else { - this.getStatus = GetStatus.NOT_FOUND; - this.type = FutureType.FAILED; - this.reason = "Not found"; - } - - this.data = data; - if (!completedAndNotify()) { - return this; - } - } - notifyListeners(); - return this; } } diff --git a/src/main/java/io/github/chronosx88/JGUN/futures/FuturePut.java b/src/main/java/io/github/chronosx88/JGUN/futures/FuturePut.java index d500f2b..1859c58 100644 --- a/src/main/java/io/github/chronosx88/JGUN/futures/FuturePut.java +++ b/src/main/java/io/github/chronosx88/JGUN/futures/FuturePut.java @@ -1,24 +1,10 @@ package io.github.chronosx88.JGUN.futures; -public class FuturePut extends BaseFuture { +/** + * Return success of PUT operation + */ +public class FuturePut extends BaseCompletableFuture { public FuturePut(String id) { super(id); - self(this); - } - - public FuturePut done(boolean success) { - synchronized (lock) { - if(success) { - this.type = FutureType.OK; - } else { - this.type = FutureType.FAILED; - } - - if (!completedAndNotify()) { - return this; - } - } - notifyListeners(); - return this; } } diff --git a/src/main/java/io/github/chronosx88/JGUN/nodes/GunPeer.java b/src/main/java/io/github/chronosx88/JGUN/nodes/GunClient.java similarity index 84% rename from src/main/java/io/github/chronosx88/JGUN/nodes/GunPeer.java rename to src/main/java/io/github/chronosx88/JGUN/nodes/GunClient.java index 575e657..53a1901 100644 --- a/src/main/java/io/github/chronosx88/JGUN/nodes/GunPeer.java +++ b/src/main/java/io/github/chronosx88/JGUN/nodes/GunClient.java @@ -12,12 +12,12 @@ import java.net.InetAddress; import java.net.URI; import java.net.URISyntaxException; -public class GunPeer extends WebSocketClient implements Peer { +public class GunClient extends WebSocketClient implements Peer { private Dup dup = new Dup(); private final StorageBackend storage; private final Dispatcher dispatcher; - public GunPeer(InetAddress address, int port, StorageBackend storage) throws URISyntaxException { + public GunClient(InetAddress address, int port, StorageBackend storage) throws URISyntaxException { super(new URI("ws://" + address.getHostAddress() + ":" + port)); this.storage = storage; this.dispatcher = new Dispatcher(storage, this, dup); @@ -47,14 +47,12 @@ public class GunPeer extends WebSocketClient implements Peer { ex.printStackTrace(); } + public Dispatcher getDispatcher() { + return dispatcher; + } + @Override public void emit(String data) { this.send(data); } - - public PathRef get(String key) { - PathRef pathRef = new PathRef(dispatcher); - pathRef.get(key); - return pathRef; - } } diff --git a/src/main/java/io/github/chronosx88/JGUN/nodes/GunSuperPeer.java b/src/main/java/io/github/chronosx88/JGUN/nodes/GunSuperPeer.java index fec57dd..5651634 100644 --- a/src/main/java/io/github/chronosx88/JGUN/nodes/GunSuperPeer.java +++ b/src/main/java/io/github/chronosx88/JGUN/nodes/GunSuperPeer.java @@ -1,8 +1,7 @@ package io.github.chronosx88.JGUN.nodes; +import io.github.chronosx88.JGUN.Dispatcher; import io.github.chronosx88.JGUN.Dup; -import io.github.chronosx88.JGUN.HAM; -import io.github.chronosx88.JGUN.Utils; import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph; import org.java_websocket.WebSocket; import org.java_websocket.handshake.ClientHandshake; @@ -14,10 +13,12 @@ import java.net.InetSocketAddress; public class GunSuperPeer extends WebSocketServer implements Peer { private Dup dup = new Dup(); private InMemoryGraph graph = new InMemoryGraph(); + private Dispatcher dispatcher; public GunSuperPeer(int port) { super(new InetSocketAddress(port)); setReuseAddr(true); + dispatcher = new Dispatcher(graph, this, dup); } @Override @@ -32,35 +33,17 @@ public class GunSuperPeer extends WebSocketServer implements Peer { @Override public void onMessage(WebSocket conn, String message) { - JSONObject msg = new JSONObject(message); - if(dup.check(msg.getString("#"))) { return; } - dup.track(msg.getString("#")); - if(msg.opt("put") != null) { - HAM.mix(new InMemoryGraph(msg.getJSONObject("put")), graph); - } - if(msg.opt("get") != null) { - InMemoryGraph result = Utils.getRequest(msg.optJSONObject("get"), graph); - JSONObject ack = new JSONObject(); - if(!result.isEmpty()) { - emit(ack - .put("#", dup.track(Dup.random())) - .put("@", msg.getString("#")) - .put("put", result.toJSONObject()) - .toString()); - } else { - emit(ack - .put("#", dup.track(Dup.random())) - .put("@", msg.getString("#")) - .put("ok", false) - .toString()); - } - } - emit(message); + JSONObject jsonMsg = new JSONObject(message); + if(dup.check(jsonMsg.getString("#"))){ return; } + dup.track(jsonMsg.getString("#")); + dispatcher.handleIncomingMessage(jsonMsg); } @Override public void onError(WebSocket conn, Exception ex) { - System.out.println("# Exception occured on connection: " + conn.getRemoteSocketAddress()); + if(conn != null) { + System.out.println("# Exception occured on connection: " + conn.getRemoteSocketAddress()); + } ex.printStackTrace(); }