Make nodes ignoring their own requests, fix putting data to the graph

This commit is contained in:
ChronosX88 2023-11-17 20:00:00 +03:00
parent a551581a99
commit bcda85a8b1
14 changed files with 118 additions and 84 deletions

View File

@ -9,8 +9,8 @@ public class Gun {
private final StorageManager storageManager; private final StorageManager storageManager;
private final NetworkManager networkManager; private final NetworkManager networkManager;
public Gun(Storage storage, Peer peer) { public Gun(Storage storage, Peer peer) throws InterruptedException {
this.networkManager = new NetworkManager(peer); this.networkManager = new NetworkManager(peer, peer.getNetworkHandler());
this.storageManager = new StorageManager(storage, this.networkManager); this.storageManager = new StorageManager(storage, this.networkManager);
this.networkManager.start(); this.networkManager.start();
} }

View File

@ -12,10 +12,7 @@ import io.github.chronosx88.JGUN.network.NetworkManager;
import io.github.chronosx88.JGUN.storage.StorageManager; import io.github.chronosx88.JGUN.storage.StorageManager;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.*;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream; import java.util.stream.Stream;
public class PathReference { public class PathReference {
@ -23,6 +20,7 @@ public class PathReference {
private final NetworkManager networkManager; private final NetworkManager networkManager;
private final StorageManager storageManager; private final StorageManager storageManager;
private final Executor executorService = Executors.newCachedThreadPool();
public PathReference(NetworkManager networkManager, StorageManager storageManager) { public PathReference(NetworkManager networkManager, StorageManager storageManager) {
this.networkManager = networkManager; this.networkManager = networkManager;
@ -85,36 +83,41 @@ public class PathReference {
.build()); .build());
nodeId = newNodeId; nodeId = newNodeId;
} }
} else {
newNodeId = UUID.randomUUID().toString();
if (pathData.size() > 1) {
String parentNodeId = pathData.get(pathData.size()-2);
graph.putNodes(parentNodeId, Node.builder()
.metadata(NodeMetadata.builder()
.nodeID(parentNodeId)
.states(Map.of(path.get(path.size()-1), System.currentTimeMillis()))
.build())
.values(Map.of(path.get(path.size()-1), NodeLinkValue.builder()
.link(newNodeId)
.build()))
.build());
} else {
newNodeId = pathData.get(0);
}
}
graph.nodes.get(NodeBuilder.ROOT_NODE).getMetadata().setNodeID(newNodeId); graph.nodes.get(NodeBuilder.ROOT_NODE).getMetadata().setNodeID(newNodeId);
graph.nodes.put(newNodeId, graph.nodes.get(NodeBuilder.ROOT_NODE)); graph.nodes.put(newNodeId, graph.nodes.get(NodeBuilder.ROOT_NODE));
graph.nodes.remove(NodeBuilder.ROOT_NODE); graph.nodes.remove(NodeBuilder.ROOT_NODE);
} else {
// merge updated node under parent ID
String parentNodeId = pathData.get(pathData.size()-1);
graph.nodes.get(NodeBuilder.ROOT_NODE).getMetadata().setNodeID(parentNodeId);
graph.nodes.put(parentNodeId, graph.nodes.get(NodeBuilder.ROOT_NODE));
}
graph.nodes.remove(NodeBuilder.ROOT_NODE);
return this.storageManager.putData(graph); return this.storageManager.putData(graph);
}); });
} }
public void on(NodeChangeListener changeListener) { public void on(NodeChangeListener changeListener) {
storageManager.addChangeListener(String.join("/", path), changeListener); executorService.execute(() -> {
List<String> pathData;
try {
pathData = storageManager.getPathData(path.toArray(new String[0]));
} catch (TimeoutException | ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
storageManager.addChangeListener(pathData.get(pathData.size()-1), changeListener);
});
} }
public void map(NodeChangeListener.Map forEachListener) { public void map(NodeChangeListener.Map mapListener) {
storageManager.addMapChangeListener(String.join("/", path), forEachListener); executorService.execute(() -> {
List<String> pathData;
try {
pathData = storageManager.getPathData(path.toArray(new String[0]));
} catch (TimeoutException | ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
storageManager.addMapChangeListener(pathData.get(pathData.size()-1), mapListener);
});
} }
} }

View File

@ -19,21 +19,13 @@ public class MainClient {
NetworkNode peer = new NetworkNode(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 5054, storage); NetworkNode peer = new NetworkNode(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 5054, storage);
Gun gun = new Gun(storage, peer); Gun gun = new Gun(storage, peer);
Result result = gun.get("person").put(new NodeBuilder() Result result = gun.get("person").put(new NodeBuilder()
.add("firstName", "John") .add("firstName", "ABCD")
.add("lastName", "Smith") .build()).get();
.add("age", 25) System.out.println(result);
.add("address", new NodeBuilder() result = gun.get("person").get("address").put(new NodeBuilder()
.add("streetAddress", "21 2nd Street") .add("city", "HUY")
.add("city", "New York") .add("ZIP", new NodeBuilder()
.add("state", "NY") .add("post", "pochta rossii"))
.add("postalCode", "10021"))
.add("phoneNumber", new ArrayBuilder()
.add(new NodeBuilder()
.add("type", "home")
.add("number", "212 555-1234"))
.add(new NodeBuilder()
.add("type", "fax")
.add("number", "646 555-4567")))
.build()).get(); .build()).get();
System.out.println(result); System.out.println(result);
} }

View File

@ -1,9 +1,14 @@
package io.github.chronosx88.JGUN.examples; package io.github.chronosx88.JGUN.examples;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.github.chronosx88.JGUN.api.Gun; import io.github.chronosx88.JGUN.api.Gun;
import io.github.chronosx88.JGUN.api.NodeChangeListener;
import io.github.chronosx88.JGUN.api.graph.ArrayBuilder; import io.github.chronosx88.JGUN.api.graph.ArrayBuilder;
import io.github.chronosx88.JGUN.api.graph.NodeBuilder; import io.github.chronosx88.JGUN.api.graph.NodeBuilder;
import io.github.chronosx88.JGUN.models.Result; import io.github.chronosx88.JGUN.models.Result;
import io.github.chronosx88.JGUN.models.graph.Node;
import io.github.chronosx88.JGUN.network.GatewayNetworkNode; import io.github.chronosx88.JGUN.network.GatewayNetworkNode;
import io.github.chronosx88.JGUN.network.NetworkNode; import io.github.chronosx88.JGUN.network.NetworkNode;
import io.github.chronosx88.JGUN.storage.MemoryStorage; import io.github.chronosx88.JGUN.storage.MemoryStorage;
@ -14,8 +19,8 @@ import java.util.concurrent.ExecutionException;
public class MainServer { public class MainServer {
public static void main(String[] args) throws ExecutionException, InterruptedException { public static void main(String[] args) throws ExecutionException, InterruptedException {
GatewayNetworkNode peer = new GatewayNetworkNode(5054, new MemoryStorage());
Storage storage = new MemoryStorage(); Storage storage = new MemoryStorage();
GatewayNetworkNode peer = new GatewayNetworkNode(5054, storage);
Gun gun = new Gun(storage, peer); Gun gun = new Gun(storage, peer);
Result result = gun.get("person").put(new NodeBuilder() Result result = gun.get("person").put(new NodeBuilder()
.add("firstName", "John") .add("firstName", "John")
@ -34,6 +39,22 @@ public class MainServer {
.add("type", "fax") .add("type", "fax")
.add("number", "646 555-4567"))) .add("number", "646 555-4567")))
.build()).get(); .build()).get();
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new Jdk8Module());
gun.get("person").on(node -> {
try {
System.out.println(mapper.writeValueAsString(node));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
});
gun.get("person").get("address").on(node -> {
try {
System.out.println(mapper.writeValueAsString(node));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
});
System.out.println(result.isOk()); System.out.println(result.isOk());
} }
} }

View File

@ -1,3 +1,5 @@
package io.github.chronosx88.JGUN.models.requests; package io.github.chronosx88.JGUN.models.requests;
public interface Request {} public interface Request {
String getId();
}

View File

@ -16,11 +16,11 @@ public class Dup {
.build(); .build();
} }
private void track(String id) { public void track(String id) {
cache.put(id, System.currentTimeMillis()); cache.put(id, System.currentTimeMillis());
} }
public boolean isDuplicated(String id) { public boolean checkDuplicated(String id) {
Long timestamp = null; Long timestamp = null;
try { try {
timestamp = cache.getIfPresent(id); timestamp = cache.getIfPresent(id);

View File

@ -74,4 +74,9 @@ public class GatewayNetworkNode extends WebSocketServer implements Peer {
public int connectedPeerCount() { public int connectedPeerCount() {
return this.getConnections().size(); return this.getConnections().size();
} }
@Override
public NetworkHandler getNetworkHandler() {
return handler;
}
} }

View File

@ -17,6 +17,7 @@ import io.github.chronosx88.JGUN.models.graph.NodeValue;
import io.github.chronosx88.JGUN.models.requests.GetRequest; import io.github.chronosx88.JGUN.models.requests.GetRequest;
import io.github.chronosx88.JGUN.models.requests.PutRequest; import io.github.chronosx88.JGUN.models.requests.PutRequest;
import io.github.chronosx88.JGUN.storage.Storage; import io.github.chronosx88.JGUN.storage.Storage;
import lombok.Getter;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
@ -30,6 +31,8 @@ public class NetworkHandler {
private final Peer peer; private final Peer peer;
private final Storage storage; private final Storage storage;
@Getter
private final Dup dup; private final Dup dup;
private final Executor executorService = Executors.newCachedThreadPool(); private final Executor executorService = Executors.newCachedThreadPool();
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
@ -59,7 +62,7 @@ public class NetworkHandler {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
if (dup.isDuplicated(parsedMessage.getId())) { if (dup.checkDuplicated(parsedMessage.getId())) {
// TODO log // TODO log
return; return;
} }
@ -78,6 +81,7 @@ public class NetworkHandler {
handleGetAck(ack.getData(), ack); handleGetAck(ack.getData(), ack);
} }
if (Objects.nonNull(response)) { if (Objects.nonNull(response)) {
this.dup.track(response.getId());
String respString; String respString;
try { try {
respString = objectMapper.writeValueAsString(response); respString = objectMapper.writeValueAsString(response);

View File

@ -21,6 +21,7 @@ public class NetworkManager {
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
private final Peer peer; private final Peer peer;
private final NetworkHandler networkHandler;
private final Executor executorService = Executors.newCachedThreadPool(); private final Executor executorService = Executors.newCachedThreadPool();
@ -30,15 +31,16 @@ public class NetworkManager {
@Getter @Getter
private final int timeout; private final int timeout;
public NetworkManager(Peer peer) { public NetworkManager(Peer peer, NetworkHandler networkHandler) {
this.networkHandler = networkHandler;
objectMapper = new ObjectMapper(); objectMapper = new ObjectMapper();
objectMapper.registerModule(new Jdk8Module()); objectMapper.registerModule(new Jdk8Module());
this.peer = peer; this.peer = peer;
this.timeout = peer.getTimeout(); this.timeout = peer.getTimeout();
} }
public void start() { public void start() throws InterruptedException {
executorService.execute(this.peer::start); this.peer.start();
} }
private <T extends Request> void sendRequest(T request) { private <T extends Request> void sendRequest(T request) {
@ -48,6 +50,7 @@ public class NetworkManager {
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
this.networkHandler.getDup().track(request.getId());
peer.emit(encodedRequest); peer.emit(encodedRequest);
} }

View File

@ -21,7 +21,7 @@ public class NetworkNode extends WebSocketClient implements Peer {
@Override @Override
public void onOpen(ServerHandshake handshakeData) { public void onOpen(ServerHandshake handshakeData) {
System.out.println("# Connection with SuperNode open. Status: " + handshakeData.getHttpStatus()); System.out.println("# Connection with gateway node is open. Status: " + handshakeData.getHttpStatus());
} }
@Override @Override
@ -56,8 +56,8 @@ public class NetworkNode extends WebSocketClient implements Peer {
} }
@Override @Override
public void start() { public void start() throws InterruptedException {
this.connect(); this.connectBlocking();
} }
@Override @Override
@ -70,4 +70,9 @@ public class NetworkNode extends WebSocketClient implements Peer {
if (this.isOpen()) return 1; if (this.isOpen()) return 1;
return 0; return 0;
} }
@Override
public NetworkHandler getNetworkHandler() {
return handler;
}
} }

View File

@ -7,7 +7,8 @@ public interface Peer {
void emit(String data); void emit(String data);
void addPendingPutRequest(FuturePut futurePut); void addPendingPutRequest(FuturePut futurePut);
void addPendingGetRequest(FutureGet futureGet); void addPendingGetRequest(FutureGet futureGet);
void start(); void start() throws InterruptedException;
int getTimeout(); int getTimeout();
int connectedPeerCount(); int connectedPeerCount();
NetworkHandler getNetworkHandler();
} }

View File

@ -5,6 +5,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry; import com.github.benmanes.caffeine.cache.Expiry;
import io.github.chronosx88.JGUN.models.graph.DeferredNode; import io.github.chronosx88.JGUN.models.graph.DeferredNode;
import io.github.chronosx88.JGUN.models.graph.Node; import io.github.chronosx88.JGUN.models.graph.Node;
import io.github.chronosx88.JGUN.models.graph.NodeMetadata;
import io.github.chronosx88.JGUN.models.graph.NodeValue; import io.github.chronosx88.JGUN.models.graph.NodeValue;
import org.checkerframework.checker.index.qual.NonNegative; import org.checkerframework.checker.index.qual.NonNegative;
@ -46,21 +47,23 @@ public class MemoryStorage extends Storage {
if (node != null && field != null) { if (node != null && field != null) {
NodeValue requestedField = node.getValues().get(field); NodeValue requestedField = node.getValues().get(field);
if (requestedField != null) { if (requestedField != null) {
Long requestedFieldState = node.getMetadata().getStates().get(field); node = Node.builder()
node.getValues().clear(); .metadata(NodeMetadata.builder()
node.getMetadata().getStates().clear(); .nodeID(node.getMetadata().getNodeID())
node.getValues().put(field, requestedField); .states(Map.of(field, node.getMetadata().getStates().get(field)))
node.getMetadata().getStates().put(field, requestedFieldState); .build())
.values(Map.of(field, requestedField))
.build();
} }
} }
return node; return node;
} }
@Override @Override
protected void updateNode(Node node) { protected void updateNode(Node newNode) {
Node currentNode = nodes.get(node.getMetadata().getNodeID()); Node currentNode = nodes.get(newNode.getMetadata().getNodeID());
currentNode.values.putAll(node.values); currentNode.values.putAll(newNode.values);
currentNode.getMetadata().getStates().putAll(node.getMetadata().getStates()); currentNode.getMetadata().getStates().putAll(newNode.getMetadata().getStates());
} }
public void addNode(String id, Node incomingNode) { public void addNode(String id, Node incomingNode) {

View File

@ -95,7 +95,7 @@ public abstract class Storage {
continue; continue;
} }
if (Objects.isNull(changedNode)) { if (changedNode == null) {
changedNode = Node.builder() changedNode = Node.builder()
.metadata(NodeMetadata.builder() .metadata(NodeMetadata.builder()
.nodeID(incomingNode.getMetadata().getNodeID()) .nodeID(incomingNode.getMetadata().getNodeID())

View File

@ -40,7 +40,7 @@ public class StorageManager {
} }
public List<String> getPathData(String[] path) throws TimeoutException, ExecutionException, InterruptedException { public List<String> getPathData(String[] path) throws TimeoutException, ExecutionException, InterruptedException {
List<String> nodeIds = new ArrayList<>(List.of(path[0])); List<String> nodeIds = new ArrayList<>();
String nodeId = path[0]; String nodeId = path[0];
for (int i = 0; i < path.length; i++) { for (int i = 0; i < path.length; i++) {
String field = null; String field = null;
@ -49,35 +49,30 @@ public class StorageManager {
} }
Node node = storage.getNode(nodeId, field); Node node = storage.getNode(nodeId, field);
if (node != null) { if (node != null) {
if (field != null) { if (field == null) {
nodeIds.add(nodeId);
break;
}
if (node.values.containsKey(field) && node.values.get(field).getValueType() == NodeValue.ValueType.LINK) { if (node.values.containsKey(field) && node.values.get(field).getValueType() == NodeValue.ValueType.LINK) {
nodeId = ((NodeLinkValue) node.values.get(field)).getLink(); nodeId = ((NodeLinkValue) node.values.get(field)).getLink();
nodeIds.add(nodeId); nodeIds.add(nodeId);
continue; continue;
} }
} else {
break;
} }
} // proceeds to request from the network
var future = this.networkManager.sendGetRequest(GetRequestParams.builder() var future = this.networkManager.sendGetRequest(GetRequestParams.builder()
.nodeId(nodeId) .nodeId(nodeId)
.field(field) .field(field)
.build()); .build());
var result = future.get(networkManager.getTimeout(), TimeUnit.SECONDS); var result = future.get(networkManager.getTimeout(), TimeUnit.SECONDS);
if (result.getData() != null) { if (result.getData() == null || field == null) {
nodeIds.add(result.getData().getMetadata().getNodeID()); nodeIds.add(nodeId);
if (field != null) { break;
}
if (result.getData().values.containsKey(field) && result.getData().values.get(field).getValueType() == NodeValue.ValueType.LINK) { if (result.getData().values.containsKey(field) && result.getData().values.get(field).getValueType() == NodeValue.ValueType.LINK) {
nodeId = ((NodeLinkValue) result.getData().values.get(field)).getLink(); nodeId = ((NodeLinkValue) result.getData().values.get(field)).getLink();
nodeIds.add(nodeId); nodeIds.add(nodeId);
} }
} else {
break;
}
} else {
break;
}
} }
return nodeIds; return nodeIds;
} }