Implement .put()/get() API properly

This commit is contained in:
ChronosX88 2023-11-17 07:10:59 +03:00
parent 45e013098f
commit 36581fa943
Signed by: ChronosXYZ
GPG Key ID: 52A90DE5862D8321
34 changed files with 530 additions and 240 deletions

View File

@ -1,72 +0,0 @@
package io.github.chronosx88.JGUN;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.github.chronosx88.JGUN.futures.FuturePut;
import io.github.chronosx88.JGUN.models.MemoryGraph;
import io.github.chronosx88.JGUN.models.requests.PutRequest;
import io.github.chronosx88.JGUN.nodes.GunClient;
import io.github.chronosx88.JGUN.storage.Storage;
import java.net.InetAddress;
import java.net.URISyntaxException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class Gun {
private GunClient peer;
private final Storage storage;
private final ObjectMapper objectMapper;
private final Executor executorService = Executors.newCachedThreadPool();
public Gun(InetAddress address, int port, Storage storage) {
this.objectMapper = new ObjectMapper();
objectMapper.registerModule(new Jdk8Module());
this.storage = storage;
try {
this.peer = new GunClient(address, port, storage);
this.peer.connectBlocking();
} catch (URISyntaxException | InterruptedException e) {
throw new RuntimeException(e);
}
}
public PathReference get(String key) {
PathReference pathRef = new PathReference(this);
pathRef.get(key);
return pathRef;
}
protected void addChangeListener(String nodeID, NodeChangeListener listener) {
storage.addChangeListener(nodeID, listener);
}
protected void addMapChangeListener(String nodeID, NodeChangeListener.Map listener) {
storage.addMapChangeListener(nodeID, listener);
}
protected FuturePut sendPutRequest(MemoryGraph data) {
String reqID = Dup.random();
executorService.execute(() -> {
storage.mergeUpdate(data);
var request = PutRequest.builder()
.id(reqID)
.graph(data)
.build();
String encodedRequest;
try {
encodedRequest = this.objectMapper.writeValueAsString(request);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
peer.emit(encodedRequest);
});
return new FuturePut(reqID);
}
protected void sendGetRequest(String key, String field) {
// TODO
throw new UnsupportedOperationException("TODO");
}
}

View File

@ -1,40 +0,0 @@
package io.github.chronosx88.JGUN;
import io.github.chronosx88.JGUN.futures.FutureGet;
import io.github.chronosx88.JGUN.futures.FuturePut;
import io.github.chronosx88.JGUN.models.MemoryGraph;
import java.util.ArrayList;
import java.util.List;
public class PathReference {
private final List<String> path = new ArrayList<>();
private final Gun gun;
public PathReference(Gun gun) {
this.gun = gun;
}
public PathReference get(String key) {
path.add(key);
return this;
}
public FutureGet once() {
// TODO
throw new UnsupportedOperationException("TODO");
}
public FuturePut put(MemoryGraph graph) {
return gun.sendPutRequest(graph);
}
public void on(NodeChangeListener changeListener) {
gun.addChangeListener(String.join("/", path), changeListener);
}
public void map(NodeChangeListener.Map forEachListener) {
gun.addMapChangeListener(String.join("/", path), forEachListener);
}
}

View File

@ -1,4 +1,4 @@
package io.github.chronosx88.JGUN.futures;
package io.github.chronosx88.JGUN.api;
import lombok.Getter;

View File

@ -0,0 +1,15 @@
package io.github.chronosx88.JGUN.api;
import io.github.chronosx88.JGUN.models.GetResult;
import io.github.chronosx88.JGUN.models.requests.GetRequestParams;
import lombok.Getter;
@Getter
public class FutureGet extends BaseCompletableFuture<GetResult> {
private final GetRequestParams params;
public FutureGet(String id, GetRequestParams params) {
super(id);
this.params = params;
}
}

View File

@ -1,4 +1,6 @@
package io.github.chronosx88.JGUN.futures;
package io.github.chronosx88.JGUN.api;
import io.github.chronosx88.JGUN.models.Result;
/**
* Return success of PUT operation

View File

@ -0,0 +1,23 @@
package io.github.chronosx88.JGUN.api;
import io.github.chronosx88.JGUN.network.NetworkManager;
import io.github.chronosx88.JGUN.network.Peer;
import io.github.chronosx88.JGUN.storage.Storage;
import io.github.chronosx88.JGUN.storage.StorageManager;
public class Gun {
private final StorageManager storageManager;
private final NetworkManager networkManager;
public Gun(Storage storage, Peer peer) {
this.networkManager = new NetworkManager(peer);
this.storageManager = new StorageManager(storage, this.networkManager);
this.networkManager.start();
}
public PathReference get(String key) {
PathReference pathRef = new PathReference(networkManager, storageManager);
pathRef.get(key);
return pathRef;
}
}

View File

@ -0,0 +1,120 @@
package io.github.chronosx88.JGUN.api;
import io.github.chronosx88.JGUN.api.graph.NodeBuilder;
import io.github.chronosx88.JGUN.models.GetResult;
import io.github.chronosx88.JGUN.models.Result;
import io.github.chronosx88.JGUN.models.graph.MemoryGraph;
import io.github.chronosx88.JGUN.models.graph.Node;
import io.github.chronosx88.JGUN.models.graph.NodeMetadata;
import io.github.chronosx88.JGUN.models.graph.values.NodeLinkValue;
import io.github.chronosx88.JGUN.models.requests.GetRequestParams;
import io.github.chronosx88.JGUN.network.NetworkManager;
import io.github.chronosx88.JGUN.storage.StorageManager;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
public class PathReference {
private final List<String> path = new ArrayList<>();
private final NetworkManager networkManager;
private final StorageManager storageManager;
public PathReference(NetworkManager networkManager, StorageManager storageManager) {
this.networkManager = networkManager;
this.storageManager = storageManager;
}
public PathReference get(String key) {
path.add(key);
return this;
}
public CompletableFuture<GetResult> once() {
return CompletableFuture.supplyAsync(() -> {
try {
return storageManager.getPathData(path.toArray(new String[0]));
} catch (TimeoutException | ExecutionException | InterruptedException e) {
throw new CompletionException(e);
}
})
.thenComposeAsync(pathData -> {
if (pathData.size() < path.size()-1) {
return CompletableFuture.completedFuture(GetResult.builder().data(null).build());
}
String field = null;
if (path.size() - pathData.size() == 1) {
field = path.get(path.size()-1);
}
return storageManager.fetchNodeId(GetRequestParams.builder()
.nodeId(pathData.get(pathData.size()-1))
.field(field)
.build());
});
}
public CompletableFuture<Result> put(MemoryGraph graph) {
return CompletableFuture.supplyAsync(() -> {
try {
return storageManager.getPathData(path.toArray(new String[0]));
} catch (TimeoutException | ExecutionException | InterruptedException e) {
throw new CompletionException(e);
}
})
.thenComposeAsync(pathData -> {
String newNodeId = null;
if (pathData.size() < path.size()) {
String nodeId = pathData.get(pathData.size()-1);
int newNodeCount = path.size() - pathData.size();
String[] pathNewItems = Arrays.stream(path.toArray(new String[0]), pathData.size(), path.size()).toArray(String[]::new);
for (int i = 0; i < newNodeCount; i++) {
newNodeId = UUID.randomUUID().toString();
graph.putNodes(nodeId, Node.builder()
.metadata(NodeMetadata.builder()
.nodeID(nodeId)
.states(Map.of(pathNewItems[i], System.currentTimeMillis()))
.build())
.values(Map.of(pathNewItems[i], NodeLinkValue.builder()
.link(newNodeId)
.build()))
.build());
nodeId = newNodeId;
}
} else {
newNodeId = UUID.randomUUID().toString();
if (pathData.size() > 1) {
String parentNodeId = pathData.get(pathData.size()-2);
graph.putNodes(parentNodeId, Node.builder()
.metadata(NodeMetadata.builder()
.nodeID(parentNodeId)
.states(Map.of(path.get(path.size()-1), System.currentTimeMillis()))
.build())
.values(Map.of(path.get(path.size()-1), NodeLinkValue.builder()
.link(newNodeId)
.build()))
.build());
} else {
newNodeId = pathData.get(0);
}
}
graph.nodes.get(NodeBuilder.ROOT_NODE).getMetadata().setNodeID(newNodeId);
graph.nodes.put(newNodeId, graph.nodes.get(NodeBuilder.ROOT_NODE));
graph.nodes.remove(NodeBuilder.ROOT_NODE);
return this.storageManager.putData(graph);
});
}
public void on(NodeChangeListener changeListener) {
storageManager.addChangeListener(String.join("/", path), changeListener);
}
public void map(NodeChangeListener.Map forEachListener) {
storageManager.addMapChangeListener(String.join("/", path), forEachListener);
}
}

View File

@ -12,7 +12,7 @@ import java.util.UUID;
public class NodeBuilder {
private final MemoryGraph graph;
private final Node rootNode;
protected static final String ROOT_NODE = "__ROOT__";
public static final String ROOT_NODE = "__ROOT__";
public NodeBuilder() {
this.graph = new MemoryGraph();
@ -67,6 +67,7 @@ public class NodeBuilder {
rootNode.values.put(name, NodeLinkValue.builder()
.link(newNodeID)
.build());
rootNode.getMetadata().getStates().put(name, System.currentTimeMillis());
MemoryGraph innerGraph = builder.build();
innerGraph.nodes.get(ROOT_NODE).getMetadata().setNodeID(newNodeID);
innerGraph.nodes.put(newNodeID, innerGraph.nodes.get(ROOT_NODE));

View File

@ -1,15 +1,40 @@
package io.github.chronosx88.JGUN.examples;
import io.github.chronosx88.JGUN.nodes.GunClient;
import io.github.chronosx88.JGUN.api.Gun;
import io.github.chronosx88.JGUN.api.graph.ArrayBuilder;
import io.github.chronosx88.JGUN.api.graph.NodeBuilder;
import io.github.chronosx88.JGUN.models.Result;
import io.github.chronosx88.JGUN.network.NetworkNode;
import io.github.chronosx88.JGUN.storage.MemoryStorage;
import io.github.chronosx88.JGUN.storage.Storage;
import java.net.Inet4Address;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutionException;
public class MainClient {
public static void main(String[] args) throws URISyntaxException, UnknownHostException {
GunClient gunClient = new GunClient(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 5054, new MemoryStorage());
gunClient.connect();
public static void main(String[] args) throws URISyntaxException, UnknownHostException, ExecutionException, InterruptedException {
Storage storage = new MemoryStorage();
NetworkNode peer = new NetworkNode(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 5054, storage);
Gun gun = new Gun(storage, peer);
Result result = gun.get("person").put(new NodeBuilder()
.add("firstName", "John")
.add("lastName", "Smith")
.add("age", 25)
.add("address", new NodeBuilder()
.add("streetAddress", "21 2nd Street")
.add("city", "New York")
.add("state", "NY")
.add("postalCode", "10021"))
.add("phoneNumber", new ArrayBuilder()
.add(new NodeBuilder()
.add("type", "home")
.add("number", "212 555-1234"))
.add(new NodeBuilder()
.add("type", "fax")
.add("number", "646 555-4567")))
.build()).get();
System.out.println(result);
}
}

View File

@ -1,14 +1,5 @@
package io.github.chronosx88.JGUN.examples;
import io.github.chronosx88.JGUN.Gun;
import io.github.chronosx88.JGUN.nodes.GunSuperPeer;
import io.github.chronosx88.JGUN.storage.MemoryStorage;
import java.net.Inet4Address;
import java.net.UnknownHostException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
//public class MainClientServer {
// public static void main(String[] args) {
// GunSuperPeer gunSuperNode = new GunSuperPeer(21334, new MemoryStorage());

View File

@ -1,11 +1,11 @@
package io.github.chronosx88.JGUN.examples;
import io.github.chronosx88.JGUN.nodes.GunSuperPeer;
import io.github.chronosx88.JGUN.network.GatewayNetworkNode;
import io.github.chronosx88.JGUN.storage.MemoryStorage;
public class MainServer {
public static void main(String[] args) {
GunSuperPeer gunSuperNode = new GunSuperPeer(5054, new MemoryStorage());
GatewayNetworkNode gunSuperNode = new GatewayNetworkNode(5054, new MemoryStorage());
gunSuperNode.start();
}
}

View File

@ -1,7 +0,0 @@
package io.github.chronosx88.JGUN.futures;
public class FutureGet extends BaseCompletableFuture<GetResult> {
public FutureGet(String id) {
super(id);
}
}

View File

@ -1,11 +0,0 @@
package io.github.chronosx88.JGUN.futures;
import io.github.chronosx88.JGUN.models.MemoryGraph;
import lombok.Getter;
import lombok.experimental.SuperBuilder;
@Getter
@SuperBuilder
public class GetResult extends Result {
private final MemoryGraph data;
}

View File

@ -3,10 +3,11 @@ package io.github.chronosx88.JGUN.models;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import io.github.chronosx88.JGUN.models.acks.Ack;
import io.github.chronosx88.JGUN.models.acks.GetAck;
import io.github.chronosx88.JGUN.models.requests.GetRequest;
import io.github.chronosx88.JGUN.models.requests.PutRequest;
import lombok.Builder;
import lombok.Data;
import lombok.experimental.SuperBuilder;
@ -15,6 +16,7 @@ import lombok.experimental.SuperBuilder;
@JsonSubTypes({
@JsonSubTypes.Type(GetRequest.class),
@JsonSubTypes.Type(PutRequest.class),
@JsonSubTypes.Type(Ack.class),
@JsonSubTypes.Type(GetAck.class)
})
@SuperBuilder

View File

@ -0,0 +1,11 @@
package io.github.chronosx88.JGUN.models;
import io.github.chronosx88.JGUN.models.graph.Node;
import lombok.Getter;
import lombok.experimental.SuperBuilder;
@Getter
@SuperBuilder
public class GetResult extends Result {
private final Node data;
}

View File

@ -1,7 +1,5 @@
package io.github.chronosx88.JGUN.futures;
package io.github.chronosx88.JGUN.models;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.experimental.SuperBuilder;

View File

@ -2,7 +2,6 @@ package io.github.chronosx88.JGUN.models.acks;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.github.chronosx88.JGUN.models.BaseMessage;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.SuperBuilder;
@ -12,7 +11,7 @@ import lombok.extern.jackson.Jacksonized;
@SuperBuilder
@EqualsAndHashCode(callSuper = true)
@Jacksonized
public class BaseAck extends BaseMessage {
public class Ack extends BaseMessage {
@JsonProperty("@")
private String replyTo;
private boolean ok;

View File

@ -1,6 +1,7 @@
package io.github.chronosx88.JGUN.models.acks;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.github.chronosx88.JGUN.models.BaseMessage;
import io.github.chronosx88.JGUN.models.graph.MemoryGraph;
import lombok.Data;
import lombok.EqualsAndHashCode;
@ -11,7 +12,10 @@ import lombok.extern.jackson.Jacksonized;
@SuperBuilder
@EqualsAndHashCode(callSuper = true)
@Jacksonized
public class GetAck extends BaseAck {
public class GetAck extends BaseMessage {
@JsonProperty("put")
private MemoryGraph data;
@JsonProperty("@")
private String replyTo;
private boolean ok;
}

View File

@ -12,7 +12,7 @@ import lombok.extern.jackson.Jacksonized;
@SuperBuilder
@EqualsAndHashCode(callSuper = true)
@Jacksonized
public class GetRequest extends BaseMessage {
public class GetRequest extends BaseMessage implements Request {
@JsonProperty("get")
private GetRequestParams params;
}

View File

@ -10,7 +10,7 @@ import lombok.extern.jackson.Jacksonized;
@Jacksonized
public class GetRequestParams {
@JsonProperty("#")
private String nodeID;
private String nodeId;
@JsonProperty(".")
private String field;

View File

@ -2,9 +2,7 @@ package io.github.chronosx88.JGUN.models.requests;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.github.chronosx88.JGUN.models.BaseMessage;
import io.github.chronosx88.JGUN.models.MemoryGraph;
import io.github.chronosx88.JGUN.models.Node;
import lombok.Builder;
import io.github.chronosx88.JGUN.models.graph.MemoryGraph;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.SuperBuilder;
@ -14,7 +12,7 @@ import lombok.extern.jackson.Jacksonized;
@Jacksonized
@SuperBuilder
@EqualsAndHashCode(callSuper = true)
public class PutRequest extends BaseMessage {
public class PutRequest extends BaseMessage implements Request {
@JsonProperty("put")
private MemoryGraph graph;
}

View File

@ -0,0 +1,3 @@
package io.github.chronosx88.JGUN.models.requests;
public interface Request {}

View File

@ -1,4 +1,4 @@
package io.github.chronosx88.JGUN;
package io.github.chronosx88.JGUN.network;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

View File

@ -1,7 +1,7 @@
package io.github.chronosx88.JGUN.nodes;
package io.github.chronosx88.JGUN.network;
import io.github.chronosx88.JGUN.Dup;
import io.github.chronosx88.JGUN.NetworkHandler;
import io.github.chronosx88.JGUN.api.FutureGet;
import io.github.chronosx88.JGUN.api.FuturePut;
import io.github.chronosx88.JGUN.storage.Storage;
import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake;
@ -9,13 +9,13 @@ import org.java_websocket.server.WebSocketServer;
import java.net.InetSocketAddress;
public class GunSuperPeer extends WebSocketServer implements Peer {
private Dup dup = new Dup(1000*9);
private NetworkHandler handler;
public class GatewayNetworkNode extends WebSocketServer implements Peer {
private final NetworkHandler handler;
public GunSuperPeer(int port, Storage storage) {
public GatewayNetworkNode(int port, Storage storage) {
super(new InetSocketAddress(port));
setReuseAddr(true);
Dup dup = new Dup(1000 * 9);
handler = new NetworkHandler(storage, this, dup);
}
@ -33,13 +33,13 @@ public class GunSuperPeer extends WebSocketServer implements Peer {
@Override
public void onMessage(WebSocket conn, String message) {
// TODO
handler.handleIncomingMessage(message);
}
@Override
public void onError(WebSocket conn, Exception ex) {
if(conn != null) {
System.out.println("# Exception occured on connection: " + conn.getRemoteSocketAddress());
System.out.println("# Exception occurred on connection: " + conn.getRemoteSocketAddress());
}
ex.printStackTrace();
}
@ -54,4 +54,19 @@ public class GunSuperPeer extends WebSocketServer implements Peer {
conn.send(data);
}
}
@Override
public void addPendingPutRequest(FuturePut futurePut) {
throw new UnsupportedOperationException("TODO"); // TODO
}
@Override
public void addPendingGetRequest(FutureGet futureGet) {
throw new UnsupportedOperationException("TODO"); // TODO
}
@Override
public int getTimeout() {
return 60;
}
}

View File

@ -1,21 +1,21 @@
package io.github.chronosx88.JGUN;
package io.github.chronosx88.JGUN.network;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.github.chronosx88.JGUN.futures.FutureGet;
import io.github.chronosx88.JGUN.futures.FuturePut;
import io.github.chronosx88.JGUN.futures.GetResult;
import io.github.chronosx88.JGUN.futures.Result;
import io.github.chronosx88.JGUN.api.FutureGet;
import io.github.chronosx88.JGUN.api.FuturePut;
import io.github.chronosx88.JGUN.models.GetResult;
import io.github.chronosx88.JGUN.models.Result;
import io.github.chronosx88.JGUN.models.BaseMessage;
import io.github.chronosx88.JGUN.models.MemoryGraph;
import io.github.chronosx88.JGUN.models.Node;
import io.github.chronosx88.JGUN.models.NodeMetadata;
import io.github.chronosx88.JGUN.models.acks.BaseAck;
import io.github.chronosx88.JGUN.models.graph.MemoryGraph;
import io.github.chronosx88.JGUN.models.graph.Node;
import io.github.chronosx88.JGUN.models.graph.NodeMetadata;
import io.github.chronosx88.JGUN.models.acks.Ack;
import io.github.chronosx88.JGUN.models.acks.GetAck;
import io.github.chronosx88.JGUN.models.graph.NodeValue;
import io.github.chronosx88.JGUN.models.requests.GetRequest;
import io.github.chronosx88.JGUN.models.requests.PutRequest;
import io.github.chronosx88.JGUN.nodes.Peer;
import io.github.chronosx88.JGUN.storage.Storage;
import java.util.Map;
@ -71,8 +71,11 @@ public class NetworkHandler {
response = handleGet((GetRequest) msg);
} else if (msg instanceof PutRequest) {
response = handlePut((PutRequest) msg);
} else if (msg instanceof BaseAck) {
response = handleAck((BaseAck) msg);
} else if (msg instanceof Ack) {
handleAck((Ack) msg);
} else if (msg instanceof GetAck) {
var ack = (GetAck) msg;
handleGetAck(ack.getData(), ack);
}
if (Objects.nonNull(response)) {
String respString;
@ -88,11 +91,16 @@ public class NetworkHandler {
}
private GetAck handleGet(GetRequest request) {
Node node = storage.getNode(request.getParams().getNodeID());
if (Objects.isNull(node)) return null;
Node node = storage.getNode(request.getParams().getNodeId(), request.getParams().getField());
if (Objects.isNull(node)) return GetAck.builder()
.id(Dup.random())
.replyTo(request.getId())
.data(new MemoryGraph())
.ok(true)
.build();
String fieldName = request.getParams().getField();
if (Objects.nonNull(fieldName)) {
Object fieldValue = node.getValues().get(fieldName);
NodeValue fieldValue = node.values.get(fieldName);
if (Objects.nonNull(fieldValue)) {
node = Node.builder()
.values(Map.of(fieldName, fieldValue))
@ -103,48 +111,46 @@ public class NetworkHandler {
.build();
}
}
MemoryGraph data = new MemoryGraph();
data.nodes = Map.of(node.getMetadata().getNodeID(), node);
return GetAck.builder()
.id(Dup.random())
.replyTo(request.getId())
.data(MemoryGraph.builder()
.nodes(Map.of(node.getMetadata().getNodeID(), node))
.build())
.data(data)
.ok(true)
.build();
}
private BaseAck handlePut(PutRequest request) {
private Ack handlePut(PutRequest request) {
storage.mergeUpdate(request.getGraph());
return BaseAck.builder()
return Ack.builder()
.id(Dup.random())
.replyTo(request.getId())
.ok(true)
.build();
}
private BaseAck handleAck(BaseAck ack) {
if (ack instanceof GetAck) {
FutureGet future = pendingGetRequests.get(ack.getReplyTo());
if (Objects.nonNull(future)) {
GetAck getAck = (GetAck) ack;
future.complete(GetResult.builder()
.ok(getAck.isOk())
.data(getAck.getData())
.build());
}
return handlePut(PutRequest
.builder()
.graph(((GetAck) ack).getData())
private void handleGetAck(MemoryGraph graph, GetAck ack) {
storage.mergeUpdate(graph);
FutureGet future = pendingGetRequests.get(ack.getReplyTo());
if (future != null) {
GetAck getAck = (GetAck) ack;
Node node = storage.getNode(future.getParams().getNodeId(), future.getParams().getField());
future.complete(GetResult.builder()
.ok(getAck.isOk())
.data(node)
.build());
} else {
FuturePut future = pendingPutRequests.get(ack.getReplyTo());
if (Objects.nonNull(future)) {
future.complete(Result.builder()
.ok(ack.isOk())
.build());
}
System.out.println("Got ack! { #: '" + ack.getId() + "', @: '" + ack.getReplyTo() + "' }");
return null;
}
}
private void handleAck(Ack ack) {
FuturePut future = pendingPutRequests.get(ack.getReplyTo());
if (Objects.nonNull(future)) {
future.complete(Result.builder()
.ok(ack.isOk())
.build());
}
System.out.println("Got ack! { #: '" + ack.getId() + "', @: '" + ack.getReplyTo() + "' }");
}
}

View File

@ -0,0 +1,73 @@
package io.github.chronosx88.JGUN.network;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.github.chronosx88.JGUN.api.FutureGet;
import io.github.chronosx88.JGUN.api.FuturePut;
import io.github.chronosx88.JGUN.models.graph.MemoryGraph;
import io.github.chronosx88.JGUN.models.requests.GetRequest;
import io.github.chronosx88.JGUN.models.requests.GetRequestParams;
import io.github.chronosx88.JGUN.models.requests.PutRequest;
import io.github.chronosx88.JGUN.models.requests.Request;
import lombok.Getter;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class NetworkManager {
private final ObjectMapper objectMapper;
private final Peer peer;
private final Executor executorService = Executors.newCachedThreadPool();
/**
* Default network timeout (in seconds)
*/
@Getter
private final int timeout;
public NetworkManager(Peer peer) {
objectMapper = new ObjectMapper();
objectMapper.registerModule(new Jdk8Module());
this.peer = peer;
this.timeout = peer.getTimeout();
}
public void start() {
executorService.execute(this.peer::start);
}
private <T extends Request> void sendRequest(T request) {
String encodedRequest;
try {
encodedRequest = this.objectMapper.writeValueAsString(request);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
peer.emit(encodedRequest);
}
public FuturePut sendPutRequest(MemoryGraph putData) {
String id = Dup.random();
executorService.execute(() -> this.sendRequest(PutRequest.builder()
.id(id)
.graph(putData)
.build()));
var requestFuture = new FuturePut(id);
peer.addPendingPutRequest(requestFuture);
return requestFuture;
}
public FutureGet sendGetRequest(GetRequestParams params) {
String id = Dup.random();
executorService.execute(() -> this.sendRequest(GetRequest.builder()
.id(id)
.params(params)
.build()));
var requestFuture = new FutureGet(id, params);
peer.addPendingGetRequest(requestFuture);
return requestFuture;
}
}

View File

@ -1,9 +1,7 @@
package io.github.chronosx88.JGUN.nodes;
package io.github.chronosx88.JGUN.network;
import io.github.chronosx88.JGUN.Dup;
import io.github.chronosx88.JGUN.NetworkHandler;
import io.github.chronosx88.JGUN.futures.FutureGet;
import io.github.chronosx88.JGUN.futures.FuturePut;
import io.github.chronosx88.JGUN.api.FutureGet;
import io.github.chronosx88.JGUN.api.FuturePut;
import io.github.chronosx88.JGUN.storage.Storage;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
@ -12,12 +10,12 @@ import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
public class GunClient extends WebSocketClient implements Peer {
private Dup dup = new Dup(1000*9);
public class NetworkNode extends WebSocketClient implements Peer {
private final NetworkHandler handler;
public GunClient(InetAddress address, int port, Storage storage) throws URISyntaxException {
public NetworkNode(InetAddress address, int port, Storage storage) throws URISyntaxException {
super(new URI("ws://" + address.getHostAddress() + ":" + port));
Dup dup = new Dup(1000 * 9);
this.handler = new NetworkHandler(storage, this, dup);
}
@ -61,4 +59,9 @@ public class GunClient extends WebSocketClient implements Peer {
public void start() {
this.connect();
}
@Override
public int getTimeout() {
return 60;
}
}

View File

@ -0,0 +1,12 @@
package io.github.chronosx88.JGUN.network;
import io.github.chronosx88.JGUN.api.FutureGet;
import io.github.chronosx88.JGUN.api.FuturePut;
public interface Peer {
void emit(String data);
void addPendingPutRequest(FuturePut futurePut);
void addPendingGetRequest(FutureGet futureGet);
void start();
int getTimeout();
}

View File

@ -1,12 +0,0 @@
package io.github.chronosx88.JGUN.nodes;
import io.github.chronosx88.JGUN.futures.BaseCompletableFuture;
import io.github.chronosx88.JGUN.futures.FutureGet;
import io.github.chronosx88.JGUN.futures.FuturePut;
public interface Peer {
void emit(String data);
void addPendingPutRequest(FuturePut futurePut);
void addPendingGetRequest(FutureGet futureGet);
void start();
}

View File

@ -1,4 +1,6 @@
package io.github.chronosx88.JGUN;
package io.github.chronosx88.JGUN.storage;
import io.github.chronosx88.JGUN.models.graph.NodeValue;
public class HAM {
public static class HAMResult {
@ -8,7 +10,11 @@ public class HAM {
public boolean current = false; // Leave current value
}
public static HAMResult ham(long machineState, long incomingState, long currentState, Object incomingValue, Object currentValue) throws IllegalArgumentException {
public static HAMResult ham(long machineState,
long incomingState,
long currentState,
NodeValue incomingValue,
NodeValue currentValue) {
HAMResult result = new HAMResult();
if (machineState < incomingState) {

View File

@ -5,6 +5,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import io.github.chronosx88.JGUN.models.graph.DeferredNode;
import io.github.chronosx88.JGUN.models.graph.Node;
import io.github.chronosx88.JGUN.models.graph.NodeValue;
import org.checkerframework.checker.index.qual.NonNegative;
import java.util.Collection;
@ -40,8 +41,19 @@ public class MemoryStorage extends Storage {
}).build();
}
public Node getNode(String id) {
return nodes.get(id);
public Node getNode(String id, String field) {
Node node = nodes.get(id);
if (node != null && field != null) {
NodeValue requestedField = node.getValues().get(field);
if (requestedField != null) {
Long requestedFieldState = node.getMetadata().getStates().get(field);
node.getValues().clear();
node.getMetadata().getStates().clear();
node.getValues().put(field, requestedField);
node.getMetadata().getStates().put(field, requestedFieldState);
}
}
return node;
}
@Override

View File

@ -7,7 +7,7 @@ import io.github.chronosx88.JGUN.models.graph.NodeValue;
import java.util.*;
public abstract class Storage {
public abstract Node getNode(String id);
public abstract Node getNode(String id, String field);
protected abstract void updateNode(Node node);
@ -75,9 +75,9 @@ public abstract class Storage {
NodeValue value = incomingNode.getValues().get(key);
long state = incomingNode.getMetadata().getStates().get(key);
long previousState = -1;
Object currentValue = null;
NodeValue currentValue = null;
if (this.hasNode(incomingNode.getMetadata().getNodeID())) {
Node currentNode = this.getNode(incomingNode.getMetadata().getNodeID());
Node currentNode = this.getNode(incomingNode.getMetadata().getNodeID(), key);
Long prevStateFromStorage = currentNode.getMetadata().getStates().get(key);
if (!Objects.isNull(prevStateFromStorage)) {
previousState = prevStateFromStorage;

View File

@ -0,0 +1,102 @@
package io.github.chronosx88.JGUN.storage;
import io.github.chronosx88.JGUN.api.FuturePut;
import io.github.chronosx88.JGUN.api.NodeChangeListener;
import io.github.chronosx88.JGUN.models.GetResult;
import io.github.chronosx88.JGUN.models.graph.MemoryGraph;
import io.github.chronosx88.JGUN.models.graph.Node;
import io.github.chronosx88.JGUN.models.graph.NodeValue;
import io.github.chronosx88.JGUN.models.graph.values.NodeLinkValue;
import io.github.chronosx88.JGUN.models.requests.GetRequestParams;
import io.github.chronosx88.JGUN.network.NetworkManager;
import io.github.chronosx88.JGUN.utils.Pair;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class StorageManager {
private final Storage storage;
private final NetworkManager networkManager;
private final Executor executorService = Executors.newCachedThreadPool();
public StorageManager(Storage storage, NetworkManager networkManager) {
this.storage = storage;
this.networkManager = networkManager;
}
public void addChangeListener(String nodeID, NodeChangeListener listener) {
storage.addChangeListener(nodeID, listener);
}
public void addMapChangeListener(String nodeID, NodeChangeListener.Map listener) {
storage.addMapChangeListener(nodeID, listener);
}
public void mergeUpdate(MemoryGraph update) {
executorService.execute(() -> {
this.storage.mergeUpdate(update);
});
}
public List<String> getPathData(String[] path) throws TimeoutException, ExecutionException, InterruptedException {
List<String> nodeIds = new ArrayList<>(List.of(path[0]));
String nodeId = path[0];
for (int i = 0; i < path.length; i++) {
String field = null;
if (i+1 < path.length) {
field = path[i+1];
}
Node node = storage.getNode(nodeId, field);
if (node != null) {
if (field != null) {
if (node.values.containsKey(field) && node.values.get(field).getValueType() == NodeValue.ValueType.LINK) {
nodeId = ((NodeLinkValue) node.values.get(field)).getLink();
nodeIds.add(nodeId);
continue;
}
} else {
break;
}
}
var future = this.networkManager.sendGetRequest(GetRequestParams.builder()
.nodeId(nodeId)
.field(field)
.build());
var result = future.get(networkManager.getTimeout(), TimeUnit.SECONDS);
if (result.getData() != null) {
nodeIds.add(result.getData().getMetadata().getNodeID());
if (field != null) {
if (result.getData().values.containsKey(field) && result.getData().values.get(field).getValueType() == NodeValue.ValueType.LINK) {
nodeId = ((NodeLinkValue) result.getData().values.get(field)).getLink();
nodeIds.add(nodeId);
}
} else {
break;
}
} else {
break;
}
}
return nodeIds;
}
public FuturePut putData(MemoryGraph graph) {
this.storage.mergeUpdate(graph);
return this.networkManager.sendPutRequest(graph);
}
public CompletableFuture<GetResult> fetchNodeId(GetRequestParams params) {
return CompletableFuture.supplyAsync(() -> this.storage.getNode(params.getNodeId(), params.getField()))
.thenCompose(node -> {
if (node != null) {
return CompletableFuture.completedFuture(GetResult.builder()
.ok(true)
.data(node)
.build());
}
return networkManager.sendGetRequest(params);
});
}
}

View File

@ -0,0 +1,11 @@
package io.github.chronosx88.JGUN.utils;
import lombok.AllArgsConstructor;
import lombok.Getter;
@Getter
@AllArgsConstructor
public class Pair<K, V> {
private K first;
private V second;
}