From 36581fa9437ce3b1cb2bf560f609d5c991b9d84f Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Fri, 17 Nov 2023 07:10:59 +0300 Subject: [PATCH] Implement .put()/get() API properly --- .../java/io/github/chronosx88/JGUN/Gun.java | 72 ----------- .../github/chronosx88/JGUN/PathReference.java | 40 ------ .../BaseCompletableFuture.java | 2 +- .../github/chronosx88/JGUN/api/FutureGet.java | 15 +++ .../JGUN/{futures => api}/FuturePut.java | 4 +- .../io/github/chronosx88/JGUN/api/Gun.java | 23 ++++ .../chronosx88/JGUN/api/PathReference.java | 120 ++++++++++++++++++ .../JGUN/api/graph/NodeBuilder.java | 3 +- .../chronosx88/JGUN/examples/MainClient.java | 33 ++++- .../JGUN/examples/MainClientServer.java | 9 -- .../chronosx88/JGUN/examples/MainServer.java | 4 +- .../chronosx88/JGUN/futures/FutureGet.java | 7 - .../chronosx88/JGUN/futures/GetResult.java | 11 -- .../chronosx88/JGUN/models/BaseMessage.java | 4 +- .../chronosx88/JGUN/models/GetResult.java | 11 ++ .../JGUN/{futures => models}/Result.java | 4 +- .../models/acks/{BaseAck.java => Ack.java} | 3 +- .../chronosx88/JGUN/models/acks/GetAck.java | 6 +- .../JGUN/models/requests/GetRequest.java | 2 +- .../models/requests/GetRequestParams.java | 2 +- .../JGUN/models/requests/PutRequest.java | 6 +- .../JGUN/models/requests/Request.java | 3 + .../chronosx88/JGUN/{ => network}/Dup.java | 2 +- .../GatewayNetworkNode.java} | 33 +++-- .../JGUN/{ => network}/NetworkHandler.java | 90 +++++++------ .../JGUN/network/NetworkManager.java | 73 +++++++++++ .../NetworkNode.java} | 19 +-- .../github/chronosx88/JGUN/network/Peer.java | 12 ++ .../io/github/chronosx88/JGUN/nodes/Peer.java | 12 -- .../chronosx88/JGUN/{ => storage}/HAM.java | 10 +- .../JGUN/storage/MemoryStorage.java | 16 ++- .../chronosx88/JGUN/storage/Storage.java | 6 +- .../JGUN/storage/StorageManager.java | 102 +++++++++++++++ .../io/github/chronosx88/JGUN/utils/Pair.java | 11 ++ 34 files changed, 530 insertions(+), 240 deletions(-) delete mode 100644 src/main/java/io/github/chronosx88/JGUN/Gun.java delete mode 100644 src/main/java/io/github/chronosx88/JGUN/PathReference.java rename src/main/java/io/github/chronosx88/JGUN/{futures => api}/BaseCompletableFuture.java (86%) create mode 100644 src/main/java/io/github/chronosx88/JGUN/api/FutureGet.java rename src/main/java/io/github/chronosx88/JGUN/{futures => api}/FuturePut.java (65%) create mode 100644 src/main/java/io/github/chronosx88/JGUN/api/Gun.java create mode 100644 src/main/java/io/github/chronosx88/JGUN/api/PathReference.java delete mode 100644 src/main/java/io/github/chronosx88/JGUN/futures/FutureGet.java delete mode 100644 src/main/java/io/github/chronosx88/JGUN/futures/GetResult.java create mode 100644 src/main/java/io/github/chronosx88/JGUN/models/GetResult.java rename src/main/java/io/github/chronosx88/JGUN/{futures => models}/Result.java (56%) rename src/main/java/io/github/chronosx88/JGUN/models/acks/{BaseAck.java => Ack.java} (86%) create mode 100644 src/main/java/io/github/chronosx88/JGUN/models/requests/Request.java rename src/main/java/io/github/chronosx88/JGUN/{ => network}/Dup.java (95%) rename src/main/java/io/github/chronosx88/JGUN/{nodes/GunSuperPeer.java => network/GatewayNetworkNode.java} (60%) rename src/main/java/io/github/chronosx88/JGUN/{ => network}/NetworkHandler.java (64%) create mode 100644 src/main/java/io/github/chronosx88/JGUN/network/NetworkManager.java rename src/main/java/io/github/chronosx88/JGUN/{nodes/GunClient.java => network/NetworkNode.java} (77%) create mode 100644 src/main/java/io/github/chronosx88/JGUN/network/Peer.java delete mode 100644 src/main/java/io/github/chronosx88/JGUN/nodes/Peer.java rename src/main/java/io/github/chronosx88/JGUN/{ => storage}/HAM.java (80%) create mode 100644 src/main/java/io/github/chronosx88/JGUN/storage/StorageManager.java create mode 100644 src/main/java/io/github/chronosx88/JGUN/utils/Pair.java diff --git a/src/main/java/io/github/chronosx88/JGUN/Gun.java b/src/main/java/io/github/chronosx88/JGUN/Gun.java deleted file mode 100644 index 141ef98..0000000 --- a/src/main/java/io/github/chronosx88/JGUN/Gun.java +++ /dev/null @@ -1,72 +0,0 @@ -package io.github.chronosx88.JGUN; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; -import io.github.chronosx88.JGUN.futures.FuturePut; -import io.github.chronosx88.JGUN.models.MemoryGraph; -import io.github.chronosx88.JGUN.models.requests.PutRequest; -import io.github.chronosx88.JGUN.nodes.GunClient; -import io.github.chronosx88.JGUN.storage.Storage; - -import java.net.InetAddress; -import java.net.URISyntaxException; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; - -public class Gun { - private GunClient peer; - private final Storage storage; - private final ObjectMapper objectMapper; - private final Executor executorService = Executors.newCachedThreadPool(); - - public Gun(InetAddress address, int port, Storage storage) { - this.objectMapper = new ObjectMapper(); - objectMapper.registerModule(new Jdk8Module()); - this.storage = storage; - try { - this.peer = new GunClient(address, port, storage); - this.peer.connectBlocking(); - } catch (URISyntaxException | InterruptedException e) { - throw new RuntimeException(e); - } - } - - public PathReference get(String key) { - PathReference pathRef = new PathReference(this); - pathRef.get(key); - return pathRef; - } - - protected void addChangeListener(String nodeID, NodeChangeListener listener) { - storage.addChangeListener(nodeID, listener); - } - - protected void addMapChangeListener(String nodeID, NodeChangeListener.Map listener) { - storage.addMapChangeListener(nodeID, listener); - } - - protected FuturePut sendPutRequest(MemoryGraph data) { - String reqID = Dup.random(); - executorService.execute(() -> { - storage.mergeUpdate(data); - var request = PutRequest.builder() - .id(reqID) - .graph(data) - .build(); - String encodedRequest; - try { - encodedRequest = this.objectMapper.writeValueAsString(request); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - peer.emit(encodedRequest); - }); - return new FuturePut(reqID); - } - - protected void sendGetRequest(String key, String field) { - // TODO - throw new UnsupportedOperationException("TODO"); - } -} diff --git a/src/main/java/io/github/chronosx88/JGUN/PathReference.java b/src/main/java/io/github/chronosx88/JGUN/PathReference.java deleted file mode 100644 index f872add..0000000 --- a/src/main/java/io/github/chronosx88/JGUN/PathReference.java +++ /dev/null @@ -1,40 +0,0 @@ -package io.github.chronosx88.JGUN; - -import io.github.chronosx88.JGUN.futures.FutureGet; -import io.github.chronosx88.JGUN.futures.FuturePut; -import io.github.chronosx88.JGUN.models.MemoryGraph; - -import java.util.ArrayList; -import java.util.List; - -public class PathReference { - private final List path = new ArrayList<>(); - - private final Gun gun; - - public PathReference(Gun gun) { - this.gun = gun; - } - - public PathReference get(String key) { - path.add(key); - return this; - } - - public FutureGet once() { - // TODO - throw new UnsupportedOperationException("TODO"); - } - - public FuturePut put(MemoryGraph graph) { - return gun.sendPutRequest(graph); - } - - public void on(NodeChangeListener changeListener) { - gun.addChangeListener(String.join("/", path), changeListener); - } - - public void map(NodeChangeListener.Map forEachListener) { - gun.addMapChangeListener(String.join("/", path), forEachListener); - } -} diff --git a/src/main/java/io/github/chronosx88/JGUN/futures/BaseCompletableFuture.java b/src/main/java/io/github/chronosx88/JGUN/api/BaseCompletableFuture.java similarity index 86% rename from src/main/java/io/github/chronosx88/JGUN/futures/BaseCompletableFuture.java rename to src/main/java/io/github/chronosx88/JGUN/api/BaseCompletableFuture.java index 26021a2..dc52b4e 100644 --- a/src/main/java/io/github/chronosx88/JGUN/futures/BaseCompletableFuture.java +++ b/src/main/java/io/github/chronosx88/JGUN/api/BaseCompletableFuture.java @@ -1,4 +1,4 @@ -package io.github.chronosx88.JGUN.futures; +package io.github.chronosx88.JGUN.api; import lombok.Getter; diff --git a/src/main/java/io/github/chronosx88/JGUN/api/FutureGet.java b/src/main/java/io/github/chronosx88/JGUN/api/FutureGet.java new file mode 100644 index 0000000..2ec21ee --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/api/FutureGet.java @@ -0,0 +1,15 @@ +package io.github.chronosx88.JGUN.api; + +import io.github.chronosx88.JGUN.models.GetResult; +import io.github.chronosx88.JGUN.models.requests.GetRequestParams; +import lombok.Getter; + +@Getter +public class FutureGet extends BaseCompletableFuture { + private final GetRequestParams params; + + public FutureGet(String id, GetRequestParams params) { + super(id); + this.params = params; + } +} diff --git a/src/main/java/io/github/chronosx88/JGUN/futures/FuturePut.java b/src/main/java/io/github/chronosx88/JGUN/api/FuturePut.java similarity index 65% rename from src/main/java/io/github/chronosx88/JGUN/futures/FuturePut.java rename to src/main/java/io/github/chronosx88/JGUN/api/FuturePut.java index 5be3e73..52bb37e 100644 --- a/src/main/java/io/github/chronosx88/JGUN/futures/FuturePut.java +++ b/src/main/java/io/github/chronosx88/JGUN/api/FuturePut.java @@ -1,4 +1,6 @@ -package io.github.chronosx88.JGUN.futures; +package io.github.chronosx88.JGUN.api; + +import io.github.chronosx88.JGUN.models.Result; /** * Return success of PUT operation diff --git a/src/main/java/io/github/chronosx88/JGUN/api/Gun.java b/src/main/java/io/github/chronosx88/JGUN/api/Gun.java new file mode 100644 index 0000000..4cdaaae --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/api/Gun.java @@ -0,0 +1,23 @@ +package io.github.chronosx88.JGUN.api; + +import io.github.chronosx88.JGUN.network.NetworkManager; +import io.github.chronosx88.JGUN.network.Peer; +import io.github.chronosx88.JGUN.storage.Storage; +import io.github.chronosx88.JGUN.storage.StorageManager; + +public class Gun { + private final StorageManager storageManager; + private final NetworkManager networkManager; + + public Gun(Storage storage, Peer peer) { + this.networkManager = new NetworkManager(peer); + this.storageManager = new StorageManager(storage, this.networkManager); + this.networkManager.start(); + } + + public PathReference get(String key) { + PathReference pathRef = new PathReference(networkManager, storageManager); + pathRef.get(key); + return pathRef; + } +} diff --git a/src/main/java/io/github/chronosx88/JGUN/api/PathReference.java b/src/main/java/io/github/chronosx88/JGUN/api/PathReference.java new file mode 100644 index 0000000..340ac16 --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/api/PathReference.java @@ -0,0 +1,120 @@ +package io.github.chronosx88.JGUN.api; + +import io.github.chronosx88.JGUN.api.graph.NodeBuilder; +import io.github.chronosx88.JGUN.models.GetResult; +import io.github.chronosx88.JGUN.models.Result; +import io.github.chronosx88.JGUN.models.graph.MemoryGraph; +import io.github.chronosx88.JGUN.models.graph.Node; +import io.github.chronosx88.JGUN.models.graph.NodeMetadata; +import io.github.chronosx88.JGUN.models.graph.values.NodeLinkValue; +import io.github.chronosx88.JGUN.models.requests.GetRequestParams; +import io.github.chronosx88.JGUN.network.NetworkManager; +import io.github.chronosx88.JGUN.storage.StorageManager; + +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.stream.Stream; + +public class PathReference { + private final List path = new ArrayList<>(); + + private final NetworkManager networkManager; + private final StorageManager storageManager; + + public PathReference(NetworkManager networkManager, StorageManager storageManager) { + this.networkManager = networkManager; + this.storageManager = storageManager; + } + + public PathReference get(String key) { + path.add(key); + return this; + } + + public CompletableFuture once() { + return CompletableFuture.supplyAsync(() -> { + try { + return storageManager.getPathData(path.toArray(new String[0])); + } catch (TimeoutException | ExecutionException | InterruptedException e) { + throw new CompletionException(e); + } + }) + .thenComposeAsync(pathData -> { + if (pathData.size() < path.size()-1) { + return CompletableFuture.completedFuture(GetResult.builder().data(null).build()); + } + String field = null; + if (path.size() - pathData.size() == 1) { + field = path.get(path.size()-1); + } + + return storageManager.fetchNodeId(GetRequestParams.builder() + .nodeId(pathData.get(pathData.size()-1)) + .field(field) + .build()); + }); + } + + public CompletableFuture put(MemoryGraph graph) { + return CompletableFuture.supplyAsync(() -> { + try { + return storageManager.getPathData(path.toArray(new String[0])); + } catch (TimeoutException | ExecutionException | InterruptedException e) { + throw new CompletionException(e); + } + }) + .thenComposeAsync(pathData -> { + String newNodeId = null; + if (pathData.size() < path.size()) { + String nodeId = pathData.get(pathData.size()-1); + int newNodeCount = path.size() - pathData.size(); + String[] pathNewItems = Arrays.stream(path.toArray(new String[0]), pathData.size(), path.size()).toArray(String[]::new); + for (int i = 0; i < newNodeCount; i++) { + newNodeId = UUID.randomUUID().toString(); + graph.putNodes(nodeId, Node.builder() + .metadata(NodeMetadata.builder() + .nodeID(nodeId) + .states(Map.of(pathNewItems[i], System.currentTimeMillis())) + .build()) + .values(Map.of(pathNewItems[i], NodeLinkValue.builder() + .link(newNodeId) + .build())) + .build()); + nodeId = newNodeId; + } + } else { + newNodeId = UUID.randomUUID().toString(); + if (pathData.size() > 1) { + String parentNodeId = pathData.get(pathData.size()-2); + graph.putNodes(parentNodeId, Node.builder() + .metadata(NodeMetadata.builder() + .nodeID(parentNodeId) + .states(Map.of(path.get(path.size()-1), System.currentTimeMillis())) + .build()) + .values(Map.of(path.get(path.size()-1), NodeLinkValue.builder() + .link(newNodeId) + .build())) + .build()); + } else { + newNodeId = pathData.get(0); + } + + } + graph.nodes.get(NodeBuilder.ROOT_NODE).getMetadata().setNodeID(newNodeId); + graph.nodes.put(newNodeId, graph.nodes.get(NodeBuilder.ROOT_NODE)); + graph.nodes.remove(NodeBuilder.ROOT_NODE); + return this.storageManager.putData(graph); + }); + } + + public void on(NodeChangeListener changeListener) { + storageManager.addChangeListener(String.join("/", path), changeListener); + } + + public void map(NodeChangeListener.Map forEachListener) { + storageManager.addMapChangeListener(String.join("/", path), forEachListener); + } +} diff --git a/src/main/java/io/github/chronosx88/JGUN/api/graph/NodeBuilder.java b/src/main/java/io/github/chronosx88/JGUN/api/graph/NodeBuilder.java index a4be063..05496b3 100644 --- a/src/main/java/io/github/chronosx88/JGUN/api/graph/NodeBuilder.java +++ b/src/main/java/io/github/chronosx88/JGUN/api/graph/NodeBuilder.java @@ -12,7 +12,7 @@ import java.util.UUID; public class NodeBuilder { private final MemoryGraph graph; private final Node rootNode; - protected static final String ROOT_NODE = "__ROOT__"; + public static final String ROOT_NODE = "__ROOT__"; public NodeBuilder() { this.graph = new MemoryGraph(); @@ -67,6 +67,7 @@ public class NodeBuilder { rootNode.values.put(name, NodeLinkValue.builder() .link(newNodeID) .build()); + rootNode.getMetadata().getStates().put(name, System.currentTimeMillis()); MemoryGraph innerGraph = builder.build(); innerGraph.nodes.get(ROOT_NODE).getMetadata().setNodeID(newNodeID); innerGraph.nodes.put(newNodeID, innerGraph.nodes.get(ROOT_NODE)); diff --git a/src/main/java/io/github/chronosx88/JGUN/examples/MainClient.java b/src/main/java/io/github/chronosx88/JGUN/examples/MainClient.java index 3bc557c..9c347cd 100644 --- a/src/main/java/io/github/chronosx88/JGUN/examples/MainClient.java +++ b/src/main/java/io/github/chronosx88/JGUN/examples/MainClient.java @@ -1,15 +1,40 @@ package io.github.chronosx88.JGUN.examples; -import io.github.chronosx88.JGUN.nodes.GunClient; +import io.github.chronosx88.JGUN.api.Gun; +import io.github.chronosx88.JGUN.api.graph.ArrayBuilder; +import io.github.chronosx88.JGUN.api.graph.NodeBuilder; +import io.github.chronosx88.JGUN.models.Result; +import io.github.chronosx88.JGUN.network.NetworkNode; import io.github.chronosx88.JGUN.storage.MemoryStorage; +import io.github.chronosx88.JGUN.storage.Storage; import java.net.Inet4Address; import java.net.URISyntaxException; import java.net.UnknownHostException; +import java.util.concurrent.ExecutionException; public class MainClient { - public static void main(String[] args) throws URISyntaxException, UnknownHostException { - GunClient gunClient = new GunClient(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 5054, new MemoryStorage()); - gunClient.connect(); + public static void main(String[] args) throws URISyntaxException, UnknownHostException, ExecutionException, InterruptedException { + Storage storage = new MemoryStorage(); + NetworkNode peer = new NetworkNode(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 5054, storage); + Gun gun = new Gun(storage, peer); + Result result = gun.get("person").put(new NodeBuilder() + .add("firstName", "John") + .add("lastName", "Smith") + .add("age", 25) + .add("address", new NodeBuilder() + .add("streetAddress", "21 2nd Street") + .add("city", "New York") + .add("state", "NY") + .add("postalCode", "10021")) + .add("phoneNumber", new ArrayBuilder() + .add(new NodeBuilder() + .add("type", "home") + .add("number", "212 555-1234")) + .add(new NodeBuilder() + .add("type", "fax") + .add("number", "646 555-4567"))) + .build()).get(); + System.out.println(result); } } diff --git a/src/main/java/io/github/chronosx88/JGUN/examples/MainClientServer.java b/src/main/java/io/github/chronosx88/JGUN/examples/MainClientServer.java index 50a68b0..98309e7 100644 --- a/src/main/java/io/github/chronosx88/JGUN/examples/MainClientServer.java +++ b/src/main/java/io/github/chronosx88/JGUN/examples/MainClientServer.java @@ -1,14 +1,5 @@ package io.github.chronosx88.JGUN.examples; -import io.github.chronosx88.JGUN.Gun; -import io.github.chronosx88.JGUN.nodes.GunSuperPeer; -import io.github.chronosx88.JGUN.storage.MemoryStorage; - -import java.net.Inet4Address; -import java.net.UnknownHostException; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; - //public class MainClientServer { // public static void main(String[] args) { // GunSuperPeer gunSuperNode = new GunSuperPeer(21334, new MemoryStorage()); diff --git a/src/main/java/io/github/chronosx88/JGUN/examples/MainServer.java b/src/main/java/io/github/chronosx88/JGUN/examples/MainServer.java index e6835e1..0fe7948 100644 --- a/src/main/java/io/github/chronosx88/JGUN/examples/MainServer.java +++ b/src/main/java/io/github/chronosx88/JGUN/examples/MainServer.java @@ -1,11 +1,11 @@ package io.github.chronosx88.JGUN.examples; -import io.github.chronosx88.JGUN.nodes.GunSuperPeer; +import io.github.chronosx88.JGUN.network.GatewayNetworkNode; import io.github.chronosx88.JGUN.storage.MemoryStorage; public class MainServer { public static void main(String[] args) { - GunSuperPeer gunSuperNode = new GunSuperPeer(5054, new MemoryStorage()); + GatewayNetworkNode gunSuperNode = new GatewayNetworkNode(5054, new MemoryStorage()); gunSuperNode.start(); } } diff --git a/src/main/java/io/github/chronosx88/JGUN/futures/FutureGet.java b/src/main/java/io/github/chronosx88/JGUN/futures/FutureGet.java deleted file mode 100644 index b6064db..0000000 --- a/src/main/java/io/github/chronosx88/JGUN/futures/FutureGet.java +++ /dev/null @@ -1,7 +0,0 @@ -package io.github.chronosx88.JGUN.futures; - -public class FutureGet extends BaseCompletableFuture { - public FutureGet(String id) { - super(id); - } -} diff --git a/src/main/java/io/github/chronosx88/JGUN/futures/GetResult.java b/src/main/java/io/github/chronosx88/JGUN/futures/GetResult.java deleted file mode 100644 index 86b545a..0000000 --- a/src/main/java/io/github/chronosx88/JGUN/futures/GetResult.java +++ /dev/null @@ -1,11 +0,0 @@ -package io.github.chronosx88.JGUN.futures; - -import io.github.chronosx88.JGUN.models.MemoryGraph; -import lombok.Getter; -import lombok.experimental.SuperBuilder; - -@Getter -@SuperBuilder -public class GetResult extends Result { - private final MemoryGraph data; -} diff --git a/src/main/java/io/github/chronosx88/JGUN/models/BaseMessage.java b/src/main/java/io/github/chronosx88/JGUN/models/BaseMessage.java index bbaa3d7..424d6eb 100644 --- a/src/main/java/io/github/chronosx88/JGUN/models/BaseMessage.java +++ b/src/main/java/io/github/chronosx88/JGUN/models/BaseMessage.java @@ -3,10 +3,11 @@ package io.github.chronosx88.JGUN.models; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import io.github.chronosx88.JGUN.models.acks.Ack; import io.github.chronosx88.JGUN.models.acks.GetAck; import io.github.chronosx88.JGUN.models.requests.GetRequest; import io.github.chronosx88.JGUN.models.requests.PutRequest; -import lombok.Builder; import lombok.Data; import lombok.experimental.SuperBuilder; @@ -15,6 +16,7 @@ import lombok.experimental.SuperBuilder; @JsonSubTypes({ @JsonSubTypes.Type(GetRequest.class), @JsonSubTypes.Type(PutRequest.class), + @JsonSubTypes.Type(Ack.class), @JsonSubTypes.Type(GetAck.class) }) @SuperBuilder diff --git a/src/main/java/io/github/chronosx88/JGUN/models/GetResult.java b/src/main/java/io/github/chronosx88/JGUN/models/GetResult.java new file mode 100644 index 0000000..a5eac4a --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/models/GetResult.java @@ -0,0 +1,11 @@ +package io.github.chronosx88.JGUN.models; + +import io.github.chronosx88.JGUN.models.graph.Node; +import lombok.Getter; +import lombok.experimental.SuperBuilder; + +@Getter +@SuperBuilder +public class GetResult extends Result { + private final Node data; +} diff --git a/src/main/java/io/github/chronosx88/JGUN/futures/Result.java b/src/main/java/io/github/chronosx88/JGUN/models/Result.java similarity index 56% rename from src/main/java/io/github/chronosx88/JGUN/futures/Result.java rename to src/main/java/io/github/chronosx88/JGUN/models/Result.java index e824016..99c011f 100644 --- a/src/main/java/io/github/chronosx88/JGUN/futures/Result.java +++ b/src/main/java/io/github/chronosx88/JGUN/models/Result.java @@ -1,7 +1,5 @@ -package io.github.chronosx88.JGUN.futures; +package io.github.chronosx88.JGUN.models; -import lombok.AccessLevel; -import lombok.AllArgsConstructor; import lombok.Getter; import lombok.experimental.SuperBuilder; diff --git a/src/main/java/io/github/chronosx88/JGUN/models/acks/BaseAck.java b/src/main/java/io/github/chronosx88/JGUN/models/acks/Ack.java similarity index 86% rename from src/main/java/io/github/chronosx88/JGUN/models/acks/BaseAck.java rename to src/main/java/io/github/chronosx88/JGUN/models/acks/Ack.java index 702336f..2ccc3d8 100644 --- a/src/main/java/io/github/chronosx88/JGUN/models/acks/BaseAck.java +++ b/src/main/java/io/github/chronosx88/JGUN/models/acks/Ack.java @@ -2,7 +2,6 @@ package io.github.chronosx88.JGUN.models.acks; import com.fasterxml.jackson.annotation.JsonProperty; import io.github.chronosx88.JGUN.models.BaseMessage; -import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.experimental.SuperBuilder; @@ -12,7 +11,7 @@ import lombok.extern.jackson.Jacksonized; @SuperBuilder @EqualsAndHashCode(callSuper = true) @Jacksonized -public class BaseAck extends BaseMessage { +public class Ack extends BaseMessage { @JsonProperty("@") private String replyTo; private boolean ok; diff --git a/src/main/java/io/github/chronosx88/JGUN/models/acks/GetAck.java b/src/main/java/io/github/chronosx88/JGUN/models/acks/GetAck.java index 89d4d1c..382b59a 100644 --- a/src/main/java/io/github/chronosx88/JGUN/models/acks/GetAck.java +++ b/src/main/java/io/github/chronosx88/JGUN/models/acks/GetAck.java @@ -1,6 +1,7 @@ package io.github.chronosx88.JGUN.models.acks; import com.fasterxml.jackson.annotation.JsonProperty; +import io.github.chronosx88.JGUN.models.BaseMessage; import io.github.chronosx88.JGUN.models.graph.MemoryGraph; import lombok.Data; import lombok.EqualsAndHashCode; @@ -11,7 +12,10 @@ import lombok.extern.jackson.Jacksonized; @SuperBuilder @EqualsAndHashCode(callSuper = true) @Jacksonized -public class GetAck extends BaseAck { +public class GetAck extends BaseMessage { @JsonProperty("put") private MemoryGraph data; + @JsonProperty("@") + private String replyTo; + private boolean ok; } diff --git a/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequest.java b/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequest.java index 67bdb18..f2fb2ca 100644 --- a/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequest.java +++ b/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequest.java @@ -12,7 +12,7 @@ import lombok.extern.jackson.Jacksonized; @SuperBuilder @EqualsAndHashCode(callSuper = true) @Jacksonized -public class GetRequest extends BaseMessage { +public class GetRequest extends BaseMessage implements Request { @JsonProperty("get") private GetRequestParams params; } diff --git a/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequestParams.java b/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequestParams.java index 6b8305a..dbffbbf 100644 --- a/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequestParams.java +++ b/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequestParams.java @@ -10,7 +10,7 @@ import lombok.extern.jackson.Jacksonized; @Jacksonized public class GetRequestParams { @JsonProperty("#") - private String nodeID; + private String nodeId; @JsonProperty(".") private String field; diff --git a/src/main/java/io/github/chronosx88/JGUN/models/requests/PutRequest.java b/src/main/java/io/github/chronosx88/JGUN/models/requests/PutRequest.java index d866790..4582100 100644 --- a/src/main/java/io/github/chronosx88/JGUN/models/requests/PutRequest.java +++ b/src/main/java/io/github/chronosx88/JGUN/models/requests/PutRequest.java @@ -2,9 +2,7 @@ package io.github.chronosx88.JGUN.models.requests; import com.fasterxml.jackson.annotation.JsonProperty; import io.github.chronosx88.JGUN.models.BaseMessage; -import io.github.chronosx88.JGUN.models.MemoryGraph; -import io.github.chronosx88.JGUN.models.Node; -import lombok.Builder; +import io.github.chronosx88.JGUN.models.graph.MemoryGraph; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.experimental.SuperBuilder; @@ -14,7 +12,7 @@ import lombok.extern.jackson.Jacksonized; @Jacksonized @SuperBuilder @EqualsAndHashCode(callSuper = true) -public class PutRequest extends BaseMessage { +public class PutRequest extends BaseMessage implements Request { @JsonProperty("put") private MemoryGraph graph; } diff --git a/src/main/java/io/github/chronosx88/JGUN/models/requests/Request.java b/src/main/java/io/github/chronosx88/JGUN/models/requests/Request.java new file mode 100644 index 0000000..b9eb61f --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/models/requests/Request.java @@ -0,0 +1,3 @@ +package io.github.chronosx88.JGUN.models.requests; + +public interface Request {} diff --git a/src/main/java/io/github/chronosx88/JGUN/Dup.java b/src/main/java/io/github/chronosx88/JGUN/network/Dup.java similarity index 95% rename from src/main/java/io/github/chronosx88/JGUN/Dup.java rename to src/main/java/io/github/chronosx88/JGUN/network/Dup.java index acb4171..dff3b9f 100644 --- a/src/main/java/io/github/chronosx88/JGUN/Dup.java +++ b/src/main/java/io/github/chronosx88/JGUN/network/Dup.java @@ -1,4 +1,4 @@ -package io.github.chronosx88.JGUN; +package io.github.chronosx88.JGUN.network; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; diff --git a/src/main/java/io/github/chronosx88/JGUN/nodes/GunSuperPeer.java b/src/main/java/io/github/chronosx88/JGUN/network/GatewayNetworkNode.java similarity index 60% rename from src/main/java/io/github/chronosx88/JGUN/nodes/GunSuperPeer.java rename to src/main/java/io/github/chronosx88/JGUN/network/GatewayNetworkNode.java index e052149..ddb8a0c 100644 --- a/src/main/java/io/github/chronosx88/JGUN/nodes/GunSuperPeer.java +++ b/src/main/java/io/github/chronosx88/JGUN/network/GatewayNetworkNode.java @@ -1,7 +1,7 @@ -package io.github.chronosx88.JGUN.nodes; +package io.github.chronosx88.JGUN.network; -import io.github.chronosx88.JGUN.Dup; -import io.github.chronosx88.JGUN.NetworkHandler; +import io.github.chronosx88.JGUN.api.FutureGet; +import io.github.chronosx88.JGUN.api.FuturePut; import io.github.chronosx88.JGUN.storage.Storage; import org.java_websocket.WebSocket; import org.java_websocket.handshake.ClientHandshake; @@ -9,13 +9,13 @@ import org.java_websocket.server.WebSocketServer; import java.net.InetSocketAddress; -public class GunSuperPeer extends WebSocketServer implements Peer { - private Dup dup = new Dup(1000*9); - private NetworkHandler handler; +public class GatewayNetworkNode extends WebSocketServer implements Peer { + private final NetworkHandler handler; - public GunSuperPeer(int port, Storage storage) { + public GatewayNetworkNode(int port, Storage storage) { super(new InetSocketAddress(port)); setReuseAddr(true); + Dup dup = new Dup(1000 * 9); handler = new NetworkHandler(storage, this, dup); } @@ -33,13 +33,13 @@ public class GunSuperPeer extends WebSocketServer implements Peer { @Override public void onMessage(WebSocket conn, String message) { - // TODO + handler.handleIncomingMessage(message); } @Override public void onError(WebSocket conn, Exception ex) { if(conn != null) { - System.out.println("# Exception occured on connection: " + conn.getRemoteSocketAddress()); + System.out.println("# Exception occurred on connection: " + conn.getRemoteSocketAddress()); } ex.printStackTrace(); } @@ -54,4 +54,19 @@ public class GunSuperPeer extends WebSocketServer implements Peer { conn.send(data); } } + + @Override + public void addPendingPutRequest(FuturePut futurePut) { + throw new UnsupportedOperationException("TODO"); // TODO + } + + @Override + public void addPendingGetRequest(FutureGet futureGet) { + throw new UnsupportedOperationException("TODO"); // TODO + } + + @Override + public int getTimeout() { + return 60; + } } diff --git a/src/main/java/io/github/chronosx88/JGUN/NetworkHandler.java b/src/main/java/io/github/chronosx88/JGUN/network/NetworkHandler.java similarity index 64% rename from src/main/java/io/github/chronosx88/JGUN/NetworkHandler.java rename to src/main/java/io/github/chronosx88/JGUN/network/NetworkHandler.java index b97c6c8..4b66d51 100644 --- a/src/main/java/io/github/chronosx88/JGUN/NetworkHandler.java +++ b/src/main/java/io/github/chronosx88/JGUN/network/NetworkHandler.java @@ -1,21 +1,21 @@ -package io.github.chronosx88.JGUN; +package io.github.chronosx88.JGUN.network; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; -import io.github.chronosx88.JGUN.futures.FutureGet; -import io.github.chronosx88.JGUN.futures.FuturePut; -import io.github.chronosx88.JGUN.futures.GetResult; -import io.github.chronosx88.JGUN.futures.Result; +import io.github.chronosx88.JGUN.api.FutureGet; +import io.github.chronosx88.JGUN.api.FuturePut; +import io.github.chronosx88.JGUN.models.GetResult; +import io.github.chronosx88.JGUN.models.Result; import io.github.chronosx88.JGUN.models.BaseMessage; -import io.github.chronosx88.JGUN.models.MemoryGraph; -import io.github.chronosx88.JGUN.models.Node; -import io.github.chronosx88.JGUN.models.NodeMetadata; -import io.github.chronosx88.JGUN.models.acks.BaseAck; +import io.github.chronosx88.JGUN.models.graph.MemoryGraph; +import io.github.chronosx88.JGUN.models.graph.Node; +import io.github.chronosx88.JGUN.models.graph.NodeMetadata; +import io.github.chronosx88.JGUN.models.acks.Ack; import io.github.chronosx88.JGUN.models.acks.GetAck; +import io.github.chronosx88.JGUN.models.graph.NodeValue; import io.github.chronosx88.JGUN.models.requests.GetRequest; import io.github.chronosx88.JGUN.models.requests.PutRequest; -import io.github.chronosx88.JGUN.nodes.Peer; import io.github.chronosx88.JGUN.storage.Storage; import java.util.Map; @@ -71,8 +71,11 @@ public class NetworkHandler { response = handleGet((GetRequest) msg); } else if (msg instanceof PutRequest) { response = handlePut((PutRequest) msg); - } else if (msg instanceof BaseAck) { - response = handleAck((BaseAck) msg); + } else if (msg instanceof Ack) { + handleAck((Ack) msg); + } else if (msg instanceof GetAck) { + var ack = (GetAck) msg; + handleGetAck(ack.getData(), ack); } if (Objects.nonNull(response)) { String respString; @@ -88,11 +91,16 @@ public class NetworkHandler { } private GetAck handleGet(GetRequest request) { - Node node = storage.getNode(request.getParams().getNodeID()); - if (Objects.isNull(node)) return null; + Node node = storage.getNode(request.getParams().getNodeId(), request.getParams().getField()); + if (Objects.isNull(node)) return GetAck.builder() + .id(Dup.random()) + .replyTo(request.getId()) + .data(new MemoryGraph()) + .ok(true) + .build(); String fieldName = request.getParams().getField(); if (Objects.nonNull(fieldName)) { - Object fieldValue = node.getValues().get(fieldName); + NodeValue fieldValue = node.values.get(fieldName); if (Objects.nonNull(fieldValue)) { node = Node.builder() .values(Map.of(fieldName, fieldValue)) @@ -103,48 +111,46 @@ public class NetworkHandler { .build(); } } + MemoryGraph data = new MemoryGraph(); + data.nodes = Map.of(node.getMetadata().getNodeID(), node); return GetAck.builder() .id(Dup.random()) .replyTo(request.getId()) - .data(MemoryGraph.builder() - .nodes(Map.of(node.getMetadata().getNodeID(), node)) - .build()) + .data(data) .ok(true) .build(); } - private BaseAck handlePut(PutRequest request) { + private Ack handlePut(PutRequest request) { storage.mergeUpdate(request.getGraph()); - return BaseAck.builder() + + return Ack.builder() .id(Dup.random()) .replyTo(request.getId()) .ok(true) .build(); } - private BaseAck handleAck(BaseAck ack) { - if (ack instanceof GetAck) { - FutureGet future = pendingGetRequests.get(ack.getReplyTo()); - if (Objects.nonNull(future)) { - GetAck getAck = (GetAck) ack; - future.complete(GetResult.builder() - .ok(getAck.isOk()) - .data(getAck.getData()) - .build()); - } - return handlePut(PutRequest - .builder() - .graph(((GetAck) ack).getData()) + private void handleGetAck(MemoryGraph graph, GetAck ack) { + storage.mergeUpdate(graph); + FutureGet future = pendingGetRequests.get(ack.getReplyTo()); + if (future != null) { + GetAck getAck = (GetAck) ack; + Node node = storage.getNode(future.getParams().getNodeId(), future.getParams().getField()); + future.complete(GetResult.builder() + .ok(getAck.isOk()) + .data(node) .build()); - } else { - FuturePut future = pendingPutRequests.get(ack.getReplyTo()); - if (Objects.nonNull(future)) { - future.complete(Result.builder() - .ok(ack.isOk()) - .build()); - } - System.out.println("Got ack! { #: '" + ack.getId() + "', @: '" + ack.getReplyTo() + "' }"); - return null; } } + + private void handleAck(Ack ack) { + FuturePut future = pendingPutRequests.get(ack.getReplyTo()); + if (Objects.nonNull(future)) { + future.complete(Result.builder() + .ok(ack.isOk()) + .build()); + } + System.out.println("Got ack! { #: '" + ack.getId() + "', @: '" + ack.getReplyTo() + "' }"); + } } diff --git a/src/main/java/io/github/chronosx88/JGUN/network/NetworkManager.java b/src/main/java/io/github/chronosx88/JGUN/network/NetworkManager.java new file mode 100644 index 0000000..86ab0b0 --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/network/NetworkManager.java @@ -0,0 +1,73 @@ +package io.github.chronosx88.JGUN.network; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import io.github.chronosx88.JGUN.api.FutureGet; +import io.github.chronosx88.JGUN.api.FuturePut; +import io.github.chronosx88.JGUN.models.graph.MemoryGraph; +import io.github.chronosx88.JGUN.models.requests.GetRequest; +import io.github.chronosx88.JGUN.models.requests.GetRequestParams; +import io.github.chronosx88.JGUN.models.requests.PutRequest; +import io.github.chronosx88.JGUN.models.requests.Request; +import lombok.Getter; + +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +public class NetworkManager { + private final ObjectMapper objectMapper; + + private final Peer peer; + + private final Executor executorService = Executors.newCachedThreadPool(); + + /** + * Default network timeout (in seconds) + */ + @Getter + private final int timeout; + + public NetworkManager(Peer peer) { + objectMapper = new ObjectMapper(); + objectMapper.registerModule(new Jdk8Module()); + this.peer = peer; + this.timeout = peer.getTimeout(); + } + + public void start() { + executorService.execute(this.peer::start); + } + + private void sendRequest(T request) { + String encodedRequest; + try { + encodedRequest = this.objectMapper.writeValueAsString(request); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + peer.emit(encodedRequest); + } + + public FuturePut sendPutRequest(MemoryGraph putData) { + String id = Dup.random(); + executorService.execute(() -> this.sendRequest(PutRequest.builder() + .id(id) + .graph(putData) + .build())); + var requestFuture = new FuturePut(id); + peer.addPendingPutRequest(requestFuture); + return requestFuture; + } + + public FutureGet sendGetRequest(GetRequestParams params) { + String id = Dup.random(); + executorService.execute(() -> this.sendRequest(GetRequest.builder() + .id(id) + .params(params) + .build())); + var requestFuture = new FutureGet(id, params); + peer.addPendingGetRequest(requestFuture); + return requestFuture; + } +} diff --git a/src/main/java/io/github/chronosx88/JGUN/nodes/GunClient.java b/src/main/java/io/github/chronosx88/JGUN/network/NetworkNode.java similarity index 77% rename from src/main/java/io/github/chronosx88/JGUN/nodes/GunClient.java rename to src/main/java/io/github/chronosx88/JGUN/network/NetworkNode.java index bb78945..0a27379 100644 --- a/src/main/java/io/github/chronosx88/JGUN/nodes/GunClient.java +++ b/src/main/java/io/github/chronosx88/JGUN/network/NetworkNode.java @@ -1,9 +1,7 @@ -package io.github.chronosx88.JGUN.nodes; +package io.github.chronosx88.JGUN.network; -import io.github.chronosx88.JGUN.Dup; -import io.github.chronosx88.JGUN.NetworkHandler; -import io.github.chronosx88.JGUN.futures.FutureGet; -import io.github.chronosx88.JGUN.futures.FuturePut; +import io.github.chronosx88.JGUN.api.FutureGet; +import io.github.chronosx88.JGUN.api.FuturePut; import io.github.chronosx88.JGUN.storage.Storage; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; @@ -12,12 +10,12 @@ import java.net.InetAddress; import java.net.URI; import java.net.URISyntaxException; -public class GunClient extends WebSocketClient implements Peer { - private Dup dup = new Dup(1000*9); +public class NetworkNode extends WebSocketClient implements Peer { private final NetworkHandler handler; - public GunClient(InetAddress address, int port, Storage storage) throws URISyntaxException { + public NetworkNode(InetAddress address, int port, Storage storage) throws URISyntaxException { super(new URI("ws://" + address.getHostAddress() + ":" + port)); + Dup dup = new Dup(1000 * 9); this.handler = new NetworkHandler(storage, this, dup); } @@ -61,4 +59,9 @@ public class GunClient extends WebSocketClient implements Peer { public void start() { this.connect(); } + + @Override + public int getTimeout() { + return 60; + } } diff --git a/src/main/java/io/github/chronosx88/JGUN/network/Peer.java b/src/main/java/io/github/chronosx88/JGUN/network/Peer.java new file mode 100644 index 0000000..bbf87d0 --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/network/Peer.java @@ -0,0 +1,12 @@ +package io.github.chronosx88.JGUN.network; + +import io.github.chronosx88.JGUN.api.FutureGet; +import io.github.chronosx88.JGUN.api.FuturePut; + +public interface Peer { + void emit(String data); + void addPendingPutRequest(FuturePut futurePut); + void addPendingGetRequest(FutureGet futureGet); + void start(); + int getTimeout(); +} diff --git a/src/main/java/io/github/chronosx88/JGUN/nodes/Peer.java b/src/main/java/io/github/chronosx88/JGUN/nodes/Peer.java deleted file mode 100644 index 8abfa46..0000000 --- a/src/main/java/io/github/chronosx88/JGUN/nodes/Peer.java +++ /dev/null @@ -1,12 +0,0 @@ -package io.github.chronosx88.JGUN.nodes; - -import io.github.chronosx88.JGUN.futures.BaseCompletableFuture; -import io.github.chronosx88.JGUN.futures.FutureGet; -import io.github.chronosx88.JGUN.futures.FuturePut; - -public interface Peer { - void emit(String data); - void addPendingPutRequest(FuturePut futurePut); - void addPendingGetRequest(FutureGet futureGet); - void start(); -} diff --git a/src/main/java/io/github/chronosx88/JGUN/HAM.java b/src/main/java/io/github/chronosx88/JGUN/storage/HAM.java similarity index 80% rename from src/main/java/io/github/chronosx88/JGUN/HAM.java rename to src/main/java/io/github/chronosx88/JGUN/storage/HAM.java index df47c80..e615f27 100644 --- a/src/main/java/io/github/chronosx88/JGUN/HAM.java +++ b/src/main/java/io/github/chronosx88/JGUN/storage/HAM.java @@ -1,4 +1,6 @@ -package io.github.chronosx88.JGUN; +package io.github.chronosx88.JGUN.storage; + +import io.github.chronosx88.JGUN.models.graph.NodeValue; public class HAM { public static class HAMResult { @@ -8,7 +10,11 @@ public class HAM { public boolean current = false; // Leave current value } - public static HAMResult ham(long machineState, long incomingState, long currentState, Object incomingValue, Object currentValue) throws IllegalArgumentException { + public static HAMResult ham(long machineState, + long incomingState, + long currentState, + NodeValue incomingValue, + NodeValue currentValue) { HAMResult result = new HAMResult(); if (machineState < incomingState) { diff --git a/src/main/java/io/github/chronosx88/JGUN/storage/MemoryStorage.java b/src/main/java/io/github/chronosx88/JGUN/storage/MemoryStorage.java index feaa9c7..ac152bd 100644 --- a/src/main/java/io/github/chronosx88/JGUN/storage/MemoryStorage.java +++ b/src/main/java/io/github/chronosx88/JGUN/storage/MemoryStorage.java @@ -5,6 +5,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Expiry; import io.github.chronosx88.JGUN.models.graph.DeferredNode; import io.github.chronosx88.JGUN.models.graph.Node; +import io.github.chronosx88.JGUN.models.graph.NodeValue; import org.checkerframework.checker.index.qual.NonNegative; import java.util.Collection; @@ -40,8 +41,19 @@ public class MemoryStorage extends Storage { }).build(); } - public Node getNode(String id) { - return nodes.get(id); + public Node getNode(String id, String field) { + Node node = nodes.get(id); + if (node != null && field != null) { + NodeValue requestedField = node.getValues().get(field); + if (requestedField != null) { + Long requestedFieldState = node.getMetadata().getStates().get(field); + node.getValues().clear(); + node.getMetadata().getStates().clear(); + node.getValues().put(field, requestedField); + node.getMetadata().getStates().put(field, requestedFieldState); + } + } + return node; } @Override diff --git a/src/main/java/io/github/chronosx88/JGUN/storage/Storage.java b/src/main/java/io/github/chronosx88/JGUN/storage/Storage.java index 200be29..cc79d03 100644 --- a/src/main/java/io/github/chronosx88/JGUN/storage/Storage.java +++ b/src/main/java/io/github/chronosx88/JGUN/storage/Storage.java @@ -7,7 +7,7 @@ import io.github.chronosx88.JGUN.models.graph.NodeValue; import java.util.*; public abstract class Storage { - public abstract Node getNode(String id); + public abstract Node getNode(String id, String field); protected abstract void updateNode(Node node); @@ -75,9 +75,9 @@ public abstract class Storage { NodeValue value = incomingNode.getValues().get(key); long state = incomingNode.getMetadata().getStates().get(key); long previousState = -1; - Object currentValue = null; + NodeValue currentValue = null; if (this.hasNode(incomingNode.getMetadata().getNodeID())) { - Node currentNode = this.getNode(incomingNode.getMetadata().getNodeID()); + Node currentNode = this.getNode(incomingNode.getMetadata().getNodeID(), key); Long prevStateFromStorage = currentNode.getMetadata().getStates().get(key); if (!Objects.isNull(prevStateFromStorage)) { previousState = prevStateFromStorage; diff --git a/src/main/java/io/github/chronosx88/JGUN/storage/StorageManager.java b/src/main/java/io/github/chronosx88/JGUN/storage/StorageManager.java new file mode 100644 index 0000000..a51bccf --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/storage/StorageManager.java @@ -0,0 +1,102 @@ +package io.github.chronosx88.JGUN.storage; + +import io.github.chronosx88.JGUN.api.FuturePut; +import io.github.chronosx88.JGUN.api.NodeChangeListener; +import io.github.chronosx88.JGUN.models.GetResult; +import io.github.chronosx88.JGUN.models.graph.MemoryGraph; +import io.github.chronosx88.JGUN.models.graph.Node; +import io.github.chronosx88.JGUN.models.graph.NodeValue; +import io.github.chronosx88.JGUN.models.graph.values.NodeLinkValue; +import io.github.chronosx88.JGUN.models.requests.GetRequestParams; +import io.github.chronosx88.JGUN.network.NetworkManager; +import io.github.chronosx88.JGUN.utils.Pair; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; + +public class StorageManager { + private final Storage storage; + private final NetworkManager networkManager; + private final Executor executorService = Executors.newCachedThreadPool(); + + public StorageManager(Storage storage, NetworkManager networkManager) { + this.storage = storage; + this.networkManager = networkManager; + } + + public void addChangeListener(String nodeID, NodeChangeListener listener) { + storage.addChangeListener(nodeID, listener); + } + + public void addMapChangeListener(String nodeID, NodeChangeListener.Map listener) { + storage.addMapChangeListener(nodeID, listener); + } + + public void mergeUpdate(MemoryGraph update) { + executorService.execute(() -> { + this.storage.mergeUpdate(update); + }); + } + + public List getPathData(String[] path) throws TimeoutException, ExecutionException, InterruptedException { + List nodeIds = new ArrayList<>(List.of(path[0])); + String nodeId = path[0]; + for (int i = 0; i < path.length; i++) { + String field = null; + if (i+1 < path.length) { + field = path[i+1]; + } + Node node = storage.getNode(nodeId, field); + if (node != null) { + if (field != null) { + if (node.values.containsKey(field) && node.values.get(field).getValueType() == NodeValue.ValueType.LINK) { + nodeId = ((NodeLinkValue) node.values.get(field)).getLink(); + nodeIds.add(nodeId); + continue; + } + } else { + break; + } + } + + var future = this.networkManager.sendGetRequest(GetRequestParams.builder() + .nodeId(nodeId) + .field(field) + .build()); + var result = future.get(networkManager.getTimeout(), TimeUnit.SECONDS); + if (result.getData() != null) { + nodeIds.add(result.getData().getMetadata().getNodeID()); + if (field != null) { + if (result.getData().values.containsKey(field) && result.getData().values.get(field).getValueType() == NodeValue.ValueType.LINK) { + nodeId = ((NodeLinkValue) result.getData().values.get(field)).getLink(); + nodeIds.add(nodeId); + } + } else { + break; + } + } else { + break; + } + } + return nodeIds; + } + + public FuturePut putData(MemoryGraph graph) { + this.storage.mergeUpdate(graph); + return this.networkManager.sendPutRequest(graph); + } + + public CompletableFuture fetchNodeId(GetRequestParams params) { + return CompletableFuture.supplyAsync(() -> this.storage.getNode(params.getNodeId(), params.getField())) + .thenCompose(node -> { + if (node != null) { + return CompletableFuture.completedFuture(GetResult.builder() + .ok(true) + .data(node) + .build()); + } + return networkManager.sendGetRequest(params); + }); + } +} diff --git a/src/main/java/io/github/chronosx88/JGUN/utils/Pair.java b/src/main/java/io/github/chronosx88/JGUN/utils/Pair.java new file mode 100644 index 0000000..bf877f8 --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/utils/Pair.java @@ -0,0 +1,11 @@ +package io.github.chronosx88.JGUN.utils; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +@Getter +@AllArgsConstructor +public class Pair { + private K first; + private V second; +} \ No newline at end of file