From bcda85a8b1642cd6c474216ff36ddd962914efe4 Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Fri, 17 Nov 2023 20:00:00 +0300 Subject: [PATCH] Make nodes ignoring their own requests, fix putting data to the graph --- .../io/github/chronosx88/JGUN/api/Gun.java | 4 +- .../chronosx88/JGUN/api/PathReference.java | 53 ++++++++++--------- .../chronosx88/JGUN/examples/MainClient.java | 22 +++----- .../chronosx88/JGUN/examples/MainServer.java | 23 +++++++- .../JGUN/models/requests/Request.java | 4 +- .../github/chronosx88/JGUN/network/Dup.java | 4 +- .../JGUN/network/GatewayNetworkNode.java | 5 ++ .../JGUN/network/NetworkHandler.java | 6 ++- .../JGUN/network/NetworkManager.java | 9 ++-- .../chronosx88/JGUN/network/NetworkNode.java | 11 ++-- .../github/chronosx88/JGUN/network/Peer.java | 3 +- .../JGUN/storage/MemoryStorage.java | 21 ++++---- .../chronosx88/JGUN/storage/Storage.java | 2 +- .../JGUN/storage/StorageManager.java | 35 ++++++------ 14 files changed, 118 insertions(+), 84 deletions(-) diff --git a/src/main/java/io/github/chronosx88/JGUN/api/Gun.java b/src/main/java/io/github/chronosx88/JGUN/api/Gun.java index 4cdaaae..85734fb 100644 --- a/src/main/java/io/github/chronosx88/JGUN/api/Gun.java +++ b/src/main/java/io/github/chronosx88/JGUN/api/Gun.java @@ -9,8 +9,8 @@ public class Gun { private final StorageManager storageManager; private final NetworkManager networkManager; - public Gun(Storage storage, Peer peer) { - this.networkManager = new NetworkManager(peer); + public Gun(Storage storage, Peer peer) throws InterruptedException { + this.networkManager = new NetworkManager(peer, peer.getNetworkHandler()); this.storageManager = new StorageManager(storage, this.networkManager); this.networkManager.start(); } diff --git a/src/main/java/io/github/chronosx88/JGUN/api/PathReference.java b/src/main/java/io/github/chronosx88/JGUN/api/PathReference.java index 340ac16..96dbd7f 100644 --- a/src/main/java/io/github/chronosx88/JGUN/api/PathReference.java +++ b/src/main/java/io/github/chronosx88/JGUN/api/PathReference.java @@ -12,10 +12,7 @@ import io.github.chronosx88.JGUN.network.NetworkManager; import io.github.chronosx88.JGUN.storage.StorageManager; import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.stream.Stream; public class PathReference { @@ -23,6 +20,7 @@ public class PathReference { private final NetworkManager networkManager; private final StorageManager storageManager; + private final Executor executorService = Executors.newCachedThreadPool(); public PathReference(NetworkManager networkManager, StorageManager storageManager) { this.networkManager = networkManager; @@ -85,36 +83,41 @@ public class PathReference { .build()); nodeId = newNodeId; } + graph.nodes.get(NodeBuilder.ROOT_NODE).getMetadata().setNodeID(newNodeId); + graph.nodes.put(newNodeId, graph.nodes.get(NodeBuilder.ROOT_NODE)); + graph.nodes.remove(NodeBuilder.ROOT_NODE); } else { - newNodeId = UUID.randomUUID().toString(); - if (pathData.size() > 1) { - String parentNodeId = pathData.get(pathData.size()-2); - graph.putNodes(parentNodeId, Node.builder() - .metadata(NodeMetadata.builder() - .nodeID(parentNodeId) - .states(Map.of(path.get(path.size()-1), System.currentTimeMillis())) - .build()) - .values(Map.of(path.get(path.size()-1), NodeLinkValue.builder() - .link(newNodeId) - .build())) - .build()); - } else { - newNodeId = pathData.get(0); - } - + // merge updated node under parent ID + String parentNodeId = pathData.get(pathData.size()-1); + graph.nodes.get(NodeBuilder.ROOT_NODE).getMetadata().setNodeID(parentNodeId); + graph.nodes.put(parentNodeId, graph.nodes.get(NodeBuilder.ROOT_NODE)); } - graph.nodes.get(NodeBuilder.ROOT_NODE).getMetadata().setNodeID(newNodeId); - graph.nodes.put(newNodeId, graph.nodes.get(NodeBuilder.ROOT_NODE)); graph.nodes.remove(NodeBuilder.ROOT_NODE); return this.storageManager.putData(graph); }); } public void on(NodeChangeListener changeListener) { - storageManager.addChangeListener(String.join("/", path), changeListener); + executorService.execute(() -> { + List pathData; + try { + pathData = storageManager.getPathData(path.toArray(new String[0])); + } catch (TimeoutException | ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + storageManager.addChangeListener(pathData.get(pathData.size()-1), changeListener); + }); } - public void map(NodeChangeListener.Map forEachListener) { - storageManager.addMapChangeListener(String.join("/", path), forEachListener); + public void map(NodeChangeListener.Map mapListener) { + executorService.execute(() -> { + List pathData; + try { + pathData = storageManager.getPathData(path.toArray(new String[0])); + } catch (TimeoutException | ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + storageManager.addMapChangeListener(pathData.get(pathData.size()-1), mapListener); + }); } } diff --git a/src/main/java/io/github/chronosx88/JGUN/examples/MainClient.java b/src/main/java/io/github/chronosx88/JGUN/examples/MainClient.java index 9c347cd..8021cf9 100644 --- a/src/main/java/io/github/chronosx88/JGUN/examples/MainClient.java +++ b/src/main/java/io/github/chronosx88/JGUN/examples/MainClient.java @@ -19,21 +19,13 @@ public class MainClient { NetworkNode peer = new NetworkNode(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 5054, storage); Gun gun = new Gun(storage, peer); Result result = gun.get("person").put(new NodeBuilder() - .add("firstName", "John") - .add("lastName", "Smith") - .add("age", 25) - .add("address", new NodeBuilder() - .add("streetAddress", "21 2nd Street") - .add("city", "New York") - .add("state", "NY") - .add("postalCode", "10021")) - .add("phoneNumber", new ArrayBuilder() - .add(new NodeBuilder() - .add("type", "home") - .add("number", "212 555-1234")) - .add(new NodeBuilder() - .add("type", "fax") - .add("number", "646 555-4567"))) + .add("firstName", "ABCD") + .build()).get(); + System.out.println(result); + result = gun.get("person").get("address").put(new NodeBuilder() + .add("city", "HUY") + .add("ZIP", new NodeBuilder() + .add("post", "pochta rossii")) .build()).get(); System.out.println(result); } diff --git a/src/main/java/io/github/chronosx88/JGUN/examples/MainServer.java b/src/main/java/io/github/chronosx88/JGUN/examples/MainServer.java index 63c31d6..683911f 100644 --- a/src/main/java/io/github/chronosx88/JGUN/examples/MainServer.java +++ b/src/main/java/io/github/chronosx88/JGUN/examples/MainServer.java @@ -1,9 +1,14 @@ package io.github.chronosx88.JGUN.examples; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import io.github.chronosx88.JGUN.api.Gun; +import io.github.chronosx88.JGUN.api.NodeChangeListener; import io.github.chronosx88.JGUN.api.graph.ArrayBuilder; import io.github.chronosx88.JGUN.api.graph.NodeBuilder; import io.github.chronosx88.JGUN.models.Result; +import io.github.chronosx88.JGUN.models.graph.Node; import io.github.chronosx88.JGUN.network.GatewayNetworkNode; import io.github.chronosx88.JGUN.network.NetworkNode; import io.github.chronosx88.JGUN.storage.MemoryStorage; @@ -14,8 +19,8 @@ import java.util.concurrent.ExecutionException; public class MainServer { public static void main(String[] args) throws ExecutionException, InterruptedException { - GatewayNetworkNode peer = new GatewayNetworkNode(5054, new MemoryStorage()); Storage storage = new MemoryStorage(); + GatewayNetworkNode peer = new GatewayNetworkNode(5054, storage); Gun gun = new Gun(storage, peer); Result result = gun.get("person").put(new NodeBuilder() .add("firstName", "John") @@ -34,6 +39,22 @@ public class MainServer { .add("type", "fax") .add("number", "646 555-4567"))) .build()).get(); + ObjectMapper mapper = new ObjectMapper(); + mapper.registerModule(new Jdk8Module()); + gun.get("person").on(node -> { + try { + System.out.println(mapper.writeValueAsString(node)); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }); + gun.get("person").get("address").on(node -> { + try { + System.out.println(mapper.writeValueAsString(node)); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }); System.out.println(result.isOk()); } } diff --git a/src/main/java/io/github/chronosx88/JGUN/models/requests/Request.java b/src/main/java/io/github/chronosx88/JGUN/models/requests/Request.java index b9eb61f..f808689 100644 --- a/src/main/java/io/github/chronosx88/JGUN/models/requests/Request.java +++ b/src/main/java/io/github/chronosx88/JGUN/models/requests/Request.java @@ -1,3 +1,5 @@ package io.github.chronosx88.JGUN.models.requests; -public interface Request {} +public interface Request { + String getId(); +} diff --git a/src/main/java/io/github/chronosx88/JGUN/network/Dup.java b/src/main/java/io/github/chronosx88/JGUN/network/Dup.java index dff3b9f..f3de0cb 100644 --- a/src/main/java/io/github/chronosx88/JGUN/network/Dup.java +++ b/src/main/java/io/github/chronosx88/JGUN/network/Dup.java @@ -16,11 +16,11 @@ public class Dup { .build(); } - private void track(String id) { + public void track(String id) { cache.put(id, System.currentTimeMillis()); } - public boolean isDuplicated(String id) { + public boolean checkDuplicated(String id) { Long timestamp = null; try { timestamp = cache.getIfPresent(id); diff --git a/src/main/java/io/github/chronosx88/JGUN/network/GatewayNetworkNode.java b/src/main/java/io/github/chronosx88/JGUN/network/GatewayNetworkNode.java index fe99aaf..08b89f0 100644 --- a/src/main/java/io/github/chronosx88/JGUN/network/GatewayNetworkNode.java +++ b/src/main/java/io/github/chronosx88/JGUN/network/GatewayNetworkNode.java @@ -74,4 +74,9 @@ public class GatewayNetworkNode extends WebSocketServer implements Peer { public int connectedPeerCount() { return this.getConnections().size(); } + + @Override + public NetworkHandler getNetworkHandler() { + return handler; + } } diff --git a/src/main/java/io/github/chronosx88/JGUN/network/NetworkHandler.java b/src/main/java/io/github/chronosx88/JGUN/network/NetworkHandler.java index 9891b82..4f11349 100644 --- a/src/main/java/io/github/chronosx88/JGUN/network/NetworkHandler.java +++ b/src/main/java/io/github/chronosx88/JGUN/network/NetworkHandler.java @@ -17,6 +17,7 @@ import io.github.chronosx88.JGUN.models.graph.NodeValue; import io.github.chronosx88.JGUN.models.requests.GetRequest; import io.github.chronosx88.JGUN.models.requests.PutRequest; import io.github.chronosx88.JGUN.storage.Storage; +import lombok.Getter; import java.util.Map; import java.util.Objects; @@ -30,6 +31,8 @@ public class NetworkHandler { private final Peer peer; private final Storage storage; + + @Getter private final Dup dup; private final Executor executorService = Executors.newCachedThreadPool(); private final ObjectMapper objectMapper; @@ -59,7 +62,7 @@ public class NetworkHandler { throw new RuntimeException(e); } - if (dup.isDuplicated(parsedMessage.getId())) { + if (dup.checkDuplicated(parsedMessage.getId())) { // TODO log return; } @@ -78,6 +81,7 @@ public class NetworkHandler { handleGetAck(ack.getData(), ack); } if (Objects.nonNull(response)) { + this.dup.track(response.getId()); String respString; try { respString = objectMapper.writeValueAsString(response); diff --git a/src/main/java/io/github/chronosx88/JGUN/network/NetworkManager.java b/src/main/java/io/github/chronosx88/JGUN/network/NetworkManager.java index 224efe9..e05c0b2 100644 --- a/src/main/java/io/github/chronosx88/JGUN/network/NetworkManager.java +++ b/src/main/java/io/github/chronosx88/JGUN/network/NetworkManager.java @@ -21,6 +21,7 @@ public class NetworkManager { private final ObjectMapper objectMapper; private final Peer peer; + private final NetworkHandler networkHandler; private final Executor executorService = Executors.newCachedThreadPool(); @@ -30,15 +31,16 @@ public class NetworkManager { @Getter private final int timeout; - public NetworkManager(Peer peer) { + public NetworkManager(Peer peer, NetworkHandler networkHandler) { + this.networkHandler = networkHandler; objectMapper = new ObjectMapper(); objectMapper.registerModule(new Jdk8Module()); this.peer = peer; this.timeout = peer.getTimeout(); } - public void start() { - executorService.execute(this.peer::start); + public void start() throws InterruptedException { + this.peer.start(); } private void sendRequest(T request) { @@ -48,6 +50,7 @@ public class NetworkManager { } catch (JsonProcessingException e) { throw new RuntimeException(e); } + this.networkHandler.getDup().track(request.getId()); peer.emit(encodedRequest); } diff --git a/src/main/java/io/github/chronosx88/JGUN/network/NetworkNode.java b/src/main/java/io/github/chronosx88/JGUN/network/NetworkNode.java index c808b69..efece43 100644 --- a/src/main/java/io/github/chronosx88/JGUN/network/NetworkNode.java +++ b/src/main/java/io/github/chronosx88/JGUN/network/NetworkNode.java @@ -21,7 +21,7 @@ public class NetworkNode extends WebSocketClient implements Peer { @Override public void onOpen(ServerHandshake handshakeData) { - System.out.println("# Connection with SuperNode open. Status: " + handshakeData.getHttpStatus()); + System.out.println("# Connection with gateway node is open. Status: " + handshakeData.getHttpStatus()); } @Override @@ -56,8 +56,8 @@ public class NetworkNode extends WebSocketClient implements Peer { } @Override - public void start() { - this.connect(); + public void start() throws InterruptedException { + this.connectBlocking(); } @Override @@ -70,4 +70,9 @@ public class NetworkNode extends WebSocketClient implements Peer { if (this.isOpen()) return 1; return 0; } + + @Override + public NetworkHandler getNetworkHandler() { + return handler; + } } diff --git a/src/main/java/io/github/chronosx88/JGUN/network/Peer.java b/src/main/java/io/github/chronosx88/JGUN/network/Peer.java index 6100f9f..c5d613e 100644 --- a/src/main/java/io/github/chronosx88/JGUN/network/Peer.java +++ b/src/main/java/io/github/chronosx88/JGUN/network/Peer.java @@ -7,7 +7,8 @@ public interface Peer { void emit(String data); void addPendingPutRequest(FuturePut futurePut); void addPendingGetRequest(FutureGet futureGet); - void start(); + void start() throws InterruptedException; int getTimeout(); int connectedPeerCount(); + NetworkHandler getNetworkHandler(); } diff --git a/src/main/java/io/github/chronosx88/JGUN/storage/MemoryStorage.java b/src/main/java/io/github/chronosx88/JGUN/storage/MemoryStorage.java index ac152bd..7b14c0c 100644 --- a/src/main/java/io/github/chronosx88/JGUN/storage/MemoryStorage.java +++ b/src/main/java/io/github/chronosx88/JGUN/storage/MemoryStorage.java @@ -5,6 +5,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Expiry; import io.github.chronosx88.JGUN.models.graph.DeferredNode; import io.github.chronosx88.JGUN.models.graph.Node; +import io.github.chronosx88.JGUN.models.graph.NodeMetadata; import io.github.chronosx88.JGUN.models.graph.NodeValue; import org.checkerframework.checker.index.qual.NonNegative; @@ -46,21 +47,23 @@ public class MemoryStorage extends Storage { if (node != null && field != null) { NodeValue requestedField = node.getValues().get(field); if (requestedField != null) { - Long requestedFieldState = node.getMetadata().getStates().get(field); - node.getValues().clear(); - node.getMetadata().getStates().clear(); - node.getValues().put(field, requestedField); - node.getMetadata().getStates().put(field, requestedFieldState); + node = Node.builder() + .metadata(NodeMetadata.builder() + .nodeID(node.getMetadata().getNodeID()) + .states(Map.of(field, node.getMetadata().getStates().get(field))) + .build()) + .values(Map.of(field, requestedField)) + .build(); } } return node; } @Override - protected void updateNode(Node node) { - Node currentNode = nodes.get(node.getMetadata().getNodeID()); - currentNode.values.putAll(node.values); - currentNode.getMetadata().getStates().putAll(node.getMetadata().getStates()); + protected void updateNode(Node newNode) { + Node currentNode = nodes.get(newNode.getMetadata().getNodeID()); + currentNode.values.putAll(newNode.values); + currentNode.getMetadata().getStates().putAll(newNode.getMetadata().getStates()); } public void addNode(String id, Node incomingNode) { diff --git a/src/main/java/io/github/chronosx88/JGUN/storage/Storage.java b/src/main/java/io/github/chronosx88/JGUN/storage/Storage.java index cc79d03..ae7872a 100644 --- a/src/main/java/io/github/chronosx88/JGUN/storage/Storage.java +++ b/src/main/java/io/github/chronosx88/JGUN/storage/Storage.java @@ -95,7 +95,7 @@ public abstract class Storage { continue; } - if (Objects.isNull(changedNode)) { + if (changedNode == null) { changedNode = Node.builder() .metadata(NodeMetadata.builder() .nodeID(incomingNode.getMetadata().getNodeID()) diff --git a/src/main/java/io/github/chronosx88/JGUN/storage/StorageManager.java b/src/main/java/io/github/chronosx88/JGUN/storage/StorageManager.java index a51bccf..4212d03 100644 --- a/src/main/java/io/github/chronosx88/JGUN/storage/StorageManager.java +++ b/src/main/java/io/github/chronosx88/JGUN/storage/StorageManager.java @@ -40,7 +40,7 @@ public class StorageManager { } public List getPathData(String[] path) throws TimeoutException, ExecutionException, InterruptedException { - List nodeIds = new ArrayList<>(List.of(path[0])); + List nodeIds = new ArrayList<>(); String nodeId = path[0]; for (int i = 0; i < path.length; i++) { String field = null; @@ -49,35 +49,30 @@ public class StorageManager { } Node node = storage.getNode(nodeId, field); if (node != null) { - if (field != null) { - if (node.values.containsKey(field) && node.values.get(field).getValueType() == NodeValue.ValueType.LINK) { - nodeId = ((NodeLinkValue) node.values.get(field)).getLink(); - nodeIds.add(nodeId); - continue; - } - } else { + if (field == null) { + nodeIds.add(nodeId); break; } + if (node.values.containsKey(field) && node.values.get(field).getValueType() == NodeValue.ValueType.LINK) { + nodeId = ((NodeLinkValue) node.values.get(field)).getLink(); + nodeIds.add(nodeId); + continue; + } } - + // proceeds to request from the network var future = this.networkManager.sendGetRequest(GetRequestParams.builder() .nodeId(nodeId) .field(field) .build()); var result = future.get(networkManager.getTimeout(), TimeUnit.SECONDS); - if (result.getData() != null) { - nodeIds.add(result.getData().getMetadata().getNodeID()); - if (field != null) { - if (result.getData().values.containsKey(field) && result.getData().values.get(field).getValueType() == NodeValue.ValueType.LINK) { - nodeId = ((NodeLinkValue) result.getData().values.get(field)).getLink(); - nodeIds.add(nodeId); - } - } else { - break; - } - } else { + if (result.getData() == null || field == null) { + nodeIds.add(nodeId); break; } + if (result.getData().values.containsKey(field) && result.getData().values.get(field).getValueType() == NodeValue.ValueType.LINK) { + nodeId = ((NodeLinkValue) result.getData().values.get(field)).getLink(); + nodeIds.add(nodeId); + } } return nodeIds; }