Implemented basic dispatcher and removed GSON dependency.

This commit is contained in:
ChronosX88 2019-05-07 16:32:32 +03:00
parent 1e3291d780
commit 279fa0b98c
3 changed files with 52 additions and 27 deletions

View File

@ -14,7 +14,6 @@ repositories {
dependencies {
implementation 'org.java-websocket:Java-WebSocket:1.4.0'
implementation group: 'org.json', name: 'json', version: '20180813'
implementation 'com.google.code.gson:gson:2.8.5'
testCompile group: 'junit', name: 'junit', version: '4.12'
}

View File

@ -1,6 +1,8 @@
package io.github.chronosx88.JGUN;
import io.github.chronosx88.JGUN.futures.BaseFuture;
import io.github.chronosx88.JGUN.futures.FutureGet;
import io.github.chronosx88.JGUN.futures.FuturePut;
import io.github.chronosx88.JGUN.nodes.Peer;
import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph;
import io.github.chronosx88.JGUN.storageBackends.StorageBackend;
@ -12,11 +14,13 @@ import java.util.concurrent.ConcurrentHashMap;
public class Dispatcher {
private final Map<String, BaseFuture<? extends BaseFuture>> pendingFutures = new ConcurrentHashMap<>();
private final StorageBackend graphStorage;
private final Dup dup;
private final Peer peer;
public Dispatcher(StorageBackend graphStorage, Peer peer) {
public Dispatcher(StorageBackend graphStorage, Peer peer, Dup dup) {
this.graphStorage = graphStorage;
this.peer = peer;
this.dup = dup;
}
public void addPendingFuture(BaseFuture<? extends BaseFuture> future) {
@ -24,19 +28,53 @@ public class Dispatcher {
}
public void handleIncomingMessage(JSONObject message) {
// FIXME
if(message.has("put")) {
JSONObject ack = handlePut(message);
peer.emit(ack.toString());
}
if(message.has("get")) {
JSONObject ack = handleGet(message);
peer.emit(ack.toString());
}
if(message.has("@")) {
handleIncomingAck(message);
}
}
private JSONObject handleGet(JSONObject getData) {
return null; // FIXME
InMemoryGraph getResults = Utils.getRequest(getData.getJSONObject("get"), graphStorage);
return new JSONObject() // Acknowledgment
.put( "#", dup.track(Dup.random()) )
.put( "@", getData.getString("#") )
.put( "put", getResults.toJSONObject() )
.put( "ok", !(getResults.isEmpty()) );
}
private JSONObject handlePut(JSONObject putData) {
return null; // FIXME
private JSONObject handlePut(JSONObject message) {
HAM.mix(new InMemoryGraph(message.getJSONObject("put")), graphStorage);
return new JSONObject() // Acknowledgment
.put( "#", dup.track(Dup.random()) )
.put( "@", message.getString("#") )
.put( "ok", true);
}
private void handleIncomingAck(JSONObject ack) {
// FIXME
if(ack.has("put")) {
if(pendingFutures.containsKey(ack.getString("@"))) {
BaseFuture<? extends BaseFuture> future = pendingFutures.get(ack.getString("@"));
if(future instanceof FutureGet) {
((FutureGet) future).done(ack.getJSONObject("put"));
}
}
}
if(ack.has("ok")) {
if(pendingFutures.containsKey(ack.getString("@"))) {
BaseFuture<? extends BaseFuture> future = pendingFutures.get(ack.getString("@"));
if(future instanceof FuturePut) {
((FuturePut) future).done(ack.getBoolean("ok"));
}
}
}
}
public void sendPutRequest(JSONObject data) {

View File

@ -1,7 +1,8 @@
package io.github.chronosx88.JGUN.nodes;
import io.github.chronosx88.JGUN.*;
import io.github.chronosx88.JGUN.storageBackends.InMemoryGraph;
import io.github.chronosx88.JGUN.Dispatcher;
import io.github.chronosx88.JGUN.Dup;
import io.github.chronosx88.JGUN.PathRef;
import io.github.chronosx88.JGUN.storageBackends.StorageBackend;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
@ -19,7 +20,7 @@ public class GunPeer extends WebSocketClient implements Peer {
public GunPeer(InetAddress address, int port, StorageBackend storage) throws URISyntaxException {
super(new URI("ws://" + address.getHostAddress() + ":" + port));
this.storage = storage;
this.dispatcher = new Dispatcher(storage, this);
this.dispatcher = new Dispatcher(storage, this, dup);
}
@Override
@ -29,23 +30,10 @@ public class GunPeer extends WebSocketClient implements Peer {
@Override
public void onMessage(String message) {
JSONObject msg = new JSONObject(message);
if(dup.check(msg.getString("#"))){ return; }
dup.track(msg.getString("#"));
if(msg.opt("put") != null) {
HAM.mix(new InMemoryGraph(msg.getJSONObject("put")), storage);
}
if(msg.opt("get") != null) {
InMemoryGraph getResults = Utils.getRequest(msg.getJSONObject("get"), storage);
JSONObject ack = new JSONObject()
.put("#", dup.track(Dup.random()))
.put("@", msg.getString("#"))
.put("put", getResults.toJSONObject());
emit(ack.toString());
}
System.out.println("---------------");
System.out.println(msg.toString(2));
emit(message);
JSONObject jsonMsg = new JSONObject(message);
if(dup.check(jsonMsg.getString("#"))){ return; }
dup.track(jsonMsg.getString("#"));
dispatcher.handleIncomingMessage(jsonMsg);
}
@Override