diff --git a/build.gradle b/build.gradle index 7649820..ba76629 100644 --- a/build.gradle +++ b/build.gradle @@ -14,7 +14,6 @@ repositories { dependencies { implementation 'org.java-websocket:Java-WebSocket:1.4.0' implementation group: 'org.json', name: 'json', version: '20180813' - implementation 'com.google.code.gson:gson:2.8.5' testCompile group: 'junit', name: 'junit', version: '4.12' } diff --git a/src/main/java/io/github/chronosx88/JGUN/Dispatcher.java b/src/main/java/io/github/chronosx88/JGUN/Dispatcher.java index 85f3537..bab7f16 100644 --- a/src/main/java/io/github/chronosx88/JGUN/Dispatcher.java +++ b/src/main/java/io/github/chronosx88/JGUN/Dispatcher.java @@ -1,6 +1,8 @@ package io.github.chronosx88.JGUN; import io.github.chronosx88.JGUN.futures.BaseFuture; +import io.github.chronosx88.JGUN.futures.FutureGet; +import io.github.chronosx88.JGUN.futures.FuturePut; import io.github.chronosx88.JGUN.nodes.Peer; import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph; import io.github.chronosx88.JGUN.storageBackends.StorageBackend; @@ -12,11 +14,13 @@ import java.util.concurrent.ConcurrentHashMap; public class Dispatcher { private final Map> pendingFutures = new ConcurrentHashMap<>(); private final StorageBackend graphStorage; + private final Dup dup; private final Peer peer; - public Dispatcher(StorageBackend graphStorage, Peer peer) { + public Dispatcher(StorageBackend graphStorage, Peer peer, Dup dup) { this.graphStorage = graphStorage; this.peer = peer; + this.dup = dup; } public void addPendingFuture(BaseFuture future) { @@ -24,19 +28,53 @@ public class Dispatcher { } public void handleIncomingMessage(JSONObject message) { - // FIXME + if(message.has("put")) { + JSONObject ack = handlePut(message); + peer.emit(ack.toString()); + } + if(message.has("get")) { + JSONObject ack = handleGet(message); + peer.emit(ack.toString()); + } + if(message.has("@")) { + handleIncomingAck(message); + } } private JSONObject handleGet(JSONObject getData) { - return null; // FIXME + InMemoryGraph getResults = Utils.getRequest(getData.getJSONObject("get"), graphStorage); + return new JSONObject() // Acknowledgment + .put( "#", dup.track(Dup.random()) ) + .put( "@", getData.getString("#") ) + .put( "put", getResults.toJSONObject() ) + .put( "ok", !(getResults.isEmpty()) ); } - private JSONObject handlePut(JSONObject putData) { - return null; // FIXME + private JSONObject handlePut(JSONObject message) { + HAM.mix(new InMemoryGraph(message.getJSONObject("put")), graphStorage); + return new JSONObject() // Acknowledgment + .put( "#", dup.track(Dup.random()) ) + .put( "@", message.getString("#") ) + .put( "ok", true); } private void handleIncomingAck(JSONObject ack) { - // FIXME + if(ack.has("put")) { + if(pendingFutures.containsKey(ack.getString("@"))) { + BaseFuture future = pendingFutures.get(ack.getString("@")); + if(future instanceof FutureGet) { + ((FutureGet) future).done(ack.getJSONObject("put")); + } + } + } + if(ack.has("ok")) { + if(pendingFutures.containsKey(ack.getString("@"))) { + BaseFuture future = pendingFutures.get(ack.getString("@")); + if(future instanceof FuturePut) { + ((FuturePut) future).done(ack.getBoolean("ok")); + } + } + } } public void sendPutRequest(JSONObject data) { diff --git a/src/main/java/io/github/chronosx88/JGUN/nodes/GunPeer.java b/src/main/java/io/github/chronosx88/JGUN/nodes/GunPeer.java index 79325d2..575e657 100644 --- a/src/main/java/io/github/chronosx88/JGUN/nodes/GunPeer.java +++ b/src/main/java/io/github/chronosx88/JGUN/nodes/GunPeer.java @@ -1,7 +1,8 @@ package io.github.chronosx88.JGUN.nodes; -import io.github.chronosx88.JGUN.*; -import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph; +import io.github.chronosx88.JGUN.Dispatcher; +import io.github.chronosx88.JGUN.Dup; +import io.github.chronosx88.JGUN.PathRef; import io.github.chronosx88.JGUN.storageBackends.StorageBackend; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; @@ -19,7 +20,7 @@ public class GunPeer extends WebSocketClient implements Peer { public GunPeer(InetAddress address, int port, StorageBackend storage) throws URISyntaxException { super(new URI("ws://" + address.getHostAddress() + ":" + port)); this.storage = storage; - this.dispatcher = new Dispatcher(storage, this); + this.dispatcher = new Dispatcher(storage, this, dup); } @Override @@ -29,23 +30,10 @@ public class GunPeer extends WebSocketClient implements Peer { @Override public void onMessage(String message) { - JSONObject msg = new JSONObject(message); - if(dup.check(msg.getString("#"))){ return; } - dup.track(msg.getString("#")); - if(msg.opt("put") != null) { - HAM.mix(new InMemoryGraph(msg.getJSONObject("put")), storage); - } - if(msg.opt("get") != null) { - InMemoryGraph getResults = Utils.getRequest(msg.getJSONObject("get"), storage); - JSONObject ack = new JSONObject() - .put("#", dup.track(Dup.random())) - .put("@", msg.getString("#")) - .put("put", getResults.toJSONObject()); - emit(ack.toString()); - } - System.out.println("---------------"); - System.out.println(msg.toString(2)); - emit(message); + JSONObject jsonMsg = new JSONObject(message); + if(dup.check(jsonMsg.getString("#"))){ return; } + dup.track(jsonMsg.getString("#")); + dispatcher.handleIncomingMessage(jsonMsg); } @Override