Compare commits

..

3 Commits

Author SHA1 Message Date
ChronosX88
08912a6f67 Implement referencing other nodes in graph 2023-11-17 22:23:38 +03:00
ChronosX88
bcda85a8b1 Make nodes ignoring their own requests, fix putting data to the graph 2023-11-17 22:23:38 +03:00
ChronosX88
a551581a99 Implement custom deserializer for NetworkMessage 2023-11-17 22:23:38 +03:00
15 changed files with 157 additions and 84 deletions

View File

@ -9,8 +9,8 @@ public class Gun {
private final StorageManager storageManager; private final StorageManager storageManager;
private final NetworkManager networkManager; private final NetworkManager networkManager;
public Gun(Storage storage, Peer peer) { public Gun(Storage storage, Peer peer) throws InterruptedException {
this.networkManager = new NetworkManager(peer); this.networkManager = new NetworkManager(peer, peer.getNetworkHandler());
this.storageManager = new StorageManager(storage, this.networkManager); this.storageManager = new StorageManager(storage, this.networkManager);
this.networkManager.start(); this.networkManager.start();
} }

View File

@ -12,10 +12,7 @@ import io.github.chronosx88.JGUN.network.NetworkManager;
import io.github.chronosx88.JGUN.storage.StorageManager; import io.github.chronosx88.JGUN.storage.StorageManager;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.*;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream; import java.util.stream.Stream;
public class PathReference { public class PathReference {
@ -23,12 +20,17 @@ public class PathReference {
private final NetworkManager networkManager; private final NetworkManager networkManager;
private final StorageManager storageManager; private final StorageManager storageManager;
private final Executor executorService = Executors.newCachedThreadPool();
public PathReference(NetworkManager networkManager, StorageManager storageManager) { public PathReference(NetworkManager networkManager, StorageManager storageManager) {
this.networkManager = networkManager; this.networkManager = networkManager;
this.storageManager = storageManager; this.storageManager = storageManager;
} }
public String[] getPath() {
return path.toArray(new String[0]);
}
public PathReference get(String key) { public PathReference get(String key) {
path.add(key); path.add(key);
return this; return this;
@ -85,36 +87,64 @@ public class PathReference {
.build()); .build());
nodeId = newNodeId; nodeId = newNodeId;
} }
graph.nodes.get(NodeBuilder.ROOT_NODE).getMetadata().setNodeID(newNodeId);
graph.nodes.put(newNodeId, graph.nodes.get(NodeBuilder.ROOT_NODE));
graph.nodes.remove(NodeBuilder.ROOT_NODE);
} else { } else {
newNodeId = UUID.randomUUID().toString(); // merge updated node under parent ID
if (pathData.size() > 1) { String parentNodeId = pathData.get(pathData.size()-1);
String parentNodeId = pathData.get(pathData.size()-2); graph.nodes.get(NodeBuilder.ROOT_NODE).getMetadata().setNodeID(parentNodeId);
graph.putNodes(parentNodeId, Node.builder() graph.nodes.put(parentNodeId, graph.nodes.get(NodeBuilder.ROOT_NODE));
.metadata(NodeMetadata.builder()
.nodeID(parentNodeId)
.states(Map.of(path.get(path.size()-1), System.currentTimeMillis()))
.build())
.values(Map.of(path.get(path.size()-1), NodeLinkValue.builder()
.link(newNodeId)
.build()))
.build());
} else {
newNodeId = pathData.get(0);
}
} }
graph.nodes.get(NodeBuilder.ROOT_NODE).getMetadata().setNodeID(newNodeId);
graph.nodes.put(newNodeId, graph.nodes.get(NodeBuilder.ROOT_NODE));
graph.nodes.remove(NodeBuilder.ROOT_NODE); graph.nodes.remove(NodeBuilder.ROOT_NODE);
return this.storageManager.putData(graph); return this.storageManager.putData(graph);
}); });
} }
public void on(NodeChangeListener changeListener) { public void on(NodeChangeListener changeListener) {
storageManager.addChangeListener(String.join("/", path), changeListener); executorService.execute(() -> {
List<String> pathData;
try {
pathData = storageManager.getPathData(path.toArray(new String[0]));
} catch (TimeoutException | ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
storageManager.addChangeListener(pathData.get(pathData.size()-1), changeListener);
});
} }
public void map(NodeChangeListener.Map forEachListener) { public void map(NodeChangeListener.Map mapListener) {
storageManager.addMapChangeListener(String.join("/", path), forEachListener); executorService.execute(() -> {
List<String> pathData;
try {
pathData = storageManager.getPathData(path.toArray(new String[0]));
} catch (TimeoutException | ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
storageManager.addMapChangeListener(pathData.get(pathData.size()-1), mapListener);
});
}
public CompletableFuture<Result> put(PathReference nodeReference) {
String[] pathToAnotherNode = nodeReference.getPath();
return CompletableFuture.supplyAsync(() -> {
try {
return storageManager.getPathData(pathToAnotherNode);
} catch (TimeoutException | ExecutionException | InterruptedException e) {
throw new CompletionException(e);
}
}).thenComposeAsync(pathData -> {
if (pathData.size() < pathToAnotherNode.length) {
return CompletableFuture.failedFuture(new IllegalArgumentException("target node not found"));
}
String nodeId = pathData.get(pathData.size()-1);
MemoryGraph graph = new NodeBuilder()
.add(path.get(path.size() - 1), NodeLinkValue.builder()
.link(nodeId)
.build())
.build();
if (path.size() > 1) this.path.remove(path.size()-1);
return this.put(graph);
});
} }
} }

View File

@ -91,6 +91,10 @@ public class NodeBuilder {
return this; return this;
} }
public NodeBuilder add(String name, NodeLinkValue link) {
return addScalar(name, link);
}
public MemoryGraph build() { public MemoryGraph build() {
return this.graph; return this.graph;
} }

View File

@ -18,22 +18,22 @@ public class MainClient {
Storage storage = new MemoryStorage(); Storage storage = new MemoryStorage();
NetworkNode peer = new NetworkNode(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 5054, storage); NetworkNode peer = new NetworkNode(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 5054, storage);
Gun gun = new Gun(storage, peer); Gun gun = new Gun(storage, peer);
Result result = gun.get("person").put(new NodeBuilder() Result result = gun.get("person").put(new NodeBuilder()
.add("firstName", "John") .add("firstName", "Test")
.add("lastName", "Smith") .build()).get();
.add("age", 25) System.out.println(result);
.add("address", new NodeBuilder()
.add("streetAddress", "21 2nd Street") result = gun.get("person").get("address").put(new NodeBuilder()
.add("city", "New York") .add("city", "Dallas")
.add("state", "NY") .build()).get();
.add("postalCode", "10021")) System.out.println(result);
.add("phoneNumber", new ArrayBuilder()
.add(new NodeBuilder() result = gun.get("person").get("homeAddress").put(gun.get("person").get("address")).get();
.add("type", "home") System.out.println(result);
.add("number", "212 555-1234"))
.add(new NodeBuilder() result = gun.get("person").get("homeAddress").put(new NodeBuilder()
.add("type", "fax") .add("city", "New YORK CITY")
.add("number", "646 555-4567")))
.build()).get(); .build()).get();
System.out.println(result); System.out.println(result);
} }

View File

@ -1,9 +1,14 @@
package io.github.chronosx88.JGUN.examples; package io.github.chronosx88.JGUN.examples;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.github.chronosx88.JGUN.api.Gun; import io.github.chronosx88.JGUN.api.Gun;
import io.github.chronosx88.JGUN.api.NodeChangeListener;
import io.github.chronosx88.JGUN.api.graph.ArrayBuilder; import io.github.chronosx88.JGUN.api.graph.ArrayBuilder;
import io.github.chronosx88.JGUN.api.graph.NodeBuilder; import io.github.chronosx88.JGUN.api.graph.NodeBuilder;
import io.github.chronosx88.JGUN.models.Result; import io.github.chronosx88.JGUN.models.Result;
import io.github.chronosx88.JGUN.models.graph.Node;
import io.github.chronosx88.JGUN.network.GatewayNetworkNode; import io.github.chronosx88.JGUN.network.GatewayNetworkNode;
import io.github.chronosx88.JGUN.network.NetworkNode; import io.github.chronosx88.JGUN.network.NetworkNode;
import io.github.chronosx88.JGUN.storage.MemoryStorage; import io.github.chronosx88.JGUN.storage.MemoryStorage;
@ -14,8 +19,8 @@ import java.util.concurrent.ExecutionException;
public class MainServer { public class MainServer {
public static void main(String[] args) throws ExecutionException, InterruptedException { public static void main(String[] args) throws ExecutionException, InterruptedException {
GatewayNetworkNode peer = new GatewayNetworkNode(5054, new MemoryStorage());
Storage storage = new MemoryStorage(); Storage storage = new MemoryStorage();
GatewayNetworkNode peer = new GatewayNetworkNode(5054, storage);
Gun gun = new Gun(storage, peer); Gun gun = new Gun(storage, peer);
Result result = gun.get("person").put(new NodeBuilder() Result result = gun.get("person").put(new NodeBuilder()
.add("firstName", "John") .add("firstName", "John")
@ -34,6 +39,22 @@ public class MainServer {
.add("type", "fax") .add("type", "fax")
.add("number", "646 555-4567"))) .add("number", "646 555-4567")))
.build()).get(); .build()).get();
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new Jdk8Module());
gun.get("person").on(node -> {
try {
System.out.println(mapper.writeValueAsString(node));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
});
gun.get("person").get("address").on(node -> {
try {
System.out.println(mapper.writeValueAsString(node));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
});
System.out.println(result.isOk()); System.out.println(result.isOk());
} }
} }

View File

@ -1,3 +1,5 @@
package io.github.chronosx88.JGUN.models.requests; package io.github.chronosx88.JGUN.models.requests;
public interface Request {} public interface Request {
String getId();
}

View File

@ -16,11 +16,11 @@ public class Dup {
.build(); .build();
} }
private void track(String id) { public void track(String id) {
cache.put(id, System.currentTimeMillis()); cache.put(id, System.currentTimeMillis());
} }
public boolean isDuplicated(String id) { public boolean checkDuplicated(String id) {
Long timestamp = null; Long timestamp = null;
try { try {
timestamp = cache.getIfPresent(id); timestamp = cache.getIfPresent(id);

View File

@ -74,4 +74,9 @@ public class GatewayNetworkNode extends WebSocketServer implements Peer {
public int connectedPeerCount() { public int connectedPeerCount() {
return this.getConnections().size(); return this.getConnections().size();
} }
@Override
public NetworkHandler getNetworkHandler() {
return handler;
}
} }

View File

@ -17,6 +17,7 @@ import io.github.chronosx88.JGUN.models.graph.NodeValue;
import io.github.chronosx88.JGUN.models.requests.GetRequest; import io.github.chronosx88.JGUN.models.requests.GetRequest;
import io.github.chronosx88.JGUN.models.requests.PutRequest; import io.github.chronosx88.JGUN.models.requests.PutRequest;
import io.github.chronosx88.JGUN.storage.Storage; import io.github.chronosx88.JGUN.storage.Storage;
import lombok.Getter;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
@ -30,6 +31,8 @@ public class NetworkHandler {
private final Peer peer; private final Peer peer;
private final Storage storage; private final Storage storage;
@Getter
private final Dup dup; private final Dup dup;
private final Executor executorService = Executors.newCachedThreadPool(); private final Executor executorService = Executors.newCachedThreadPool();
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
@ -59,7 +62,7 @@ public class NetworkHandler {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
if (dup.isDuplicated(parsedMessage.getId())) { if (dup.checkDuplicated(parsedMessage.getId())) {
// TODO log // TODO log
return; return;
} }
@ -78,6 +81,7 @@ public class NetworkHandler {
handleGetAck(ack.getData(), ack); handleGetAck(ack.getData(), ack);
} }
if (Objects.nonNull(response)) { if (Objects.nonNull(response)) {
this.dup.track(response.getId());
String respString; String respString;
try { try {
respString = objectMapper.writeValueAsString(response); respString = objectMapper.writeValueAsString(response);

View File

@ -21,6 +21,7 @@ public class NetworkManager {
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
private final Peer peer; private final Peer peer;
private final NetworkHandler networkHandler;
private final Executor executorService = Executors.newCachedThreadPool(); private final Executor executorService = Executors.newCachedThreadPool();
@ -30,15 +31,16 @@ public class NetworkManager {
@Getter @Getter
private final int timeout; private final int timeout;
public NetworkManager(Peer peer) { public NetworkManager(Peer peer, NetworkHandler networkHandler) {
this.networkHandler = networkHandler;
objectMapper = new ObjectMapper(); objectMapper = new ObjectMapper();
objectMapper.registerModule(new Jdk8Module()); objectMapper.registerModule(new Jdk8Module());
this.peer = peer; this.peer = peer;
this.timeout = peer.getTimeout(); this.timeout = peer.getTimeout();
} }
public void start() { public void start() throws InterruptedException {
executorService.execute(this.peer::start); this.peer.start();
} }
private <T extends Request> void sendRequest(T request) { private <T extends Request> void sendRequest(T request) {
@ -48,6 +50,7 @@ public class NetworkManager {
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
this.networkHandler.getDup().track(request.getId());
peer.emit(encodedRequest); peer.emit(encodedRequest);
} }

View File

@ -21,7 +21,7 @@ public class NetworkNode extends WebSocketClient implements Peer {
@Override @Override
public void onOpen(ServerHandshake handshakeData) { public void onOpen(ServerHandshake handshakeData) {
System.out.println("# Connection with SuperNode open. Status: " + handshakeData.getHttpStatus()); System.out.println("# Connection with gateway node is open. Status: " + handshakeData.getHttpStatus());
} }
@Override @Override
@ -56,8 +56,8 @@ public class NetworkNode extends WebSocketClient implements Peer {
} }
@Override @Override
public void start() { public void start() throws InterruptedException {
this.connect(); this.connectBlocking();
} }
@Override @Override
@ -70,4 +70,9 @@ public class NetworkNode extends WebSocketClient implements Peer {
if (this.isOpen()) return 1; if (this.isOpen()) return 1;
return 0; return 0;
} }
@Override
public NetworkHandler getNetworkHandler() {
return handler;
}
} }

View File

@ -7,7 +7,8 @@ public interface Peer {
void emit(String data); void emit(String data);
void addPendingPutRequest(FuturePut futurePut); void addPendingPutRequest(FuturePut futurePut);
void addPendingGetRequest(FutureGet futureGet); void addPendingGetRequest(FutureGet futureGet);
void start(); void start() throws InterruptedException;
int getTimeout(); int getTimeout();
int connectedPeerCount(); int connectedPeerCount();
NetworkHandler getNetworkHandler();
} }

View File

@ -5,6 +5,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry; import com.github.benmanes.caffeine.cache.Expiry;
import io.github.chronosx88.JGUN.models.graph.DeferredNode; import io.github.chronosx88.JGUN.models.graph.DeferredNode;
import io.github.chronosx88.JGUN.models.graph.Node; import io.github.chronosx88.JGUN.models.graph.Node;
import io.github.chronosx88.JGUN.models.graph.NodeMetadata;
import io.github.chronosx88.JGUN.models.graph.NodeValue; import io.github.chronosx88.JGUN.models.graph.NodeValue;
import org.checkerframework.checker.index.qual.NonNegative; import org.checkerframework.checker.index.qual.NonNegative;
@ -46,21 +47,23 @@ public class MemoryStorage extends Storage {
if (node != null && field != null) { if (node != null && field != null) {
NodeValue requestedField = node.getValues().get(field); NodeValue requestedField = node.getValues().get(field);
if (requestedField != null) { if (requestedField != null) {
Long requestedFieldState = node.getMetadata().getStates().get(field); node = Node.builder()
node.getValues().clear(); .metadata(NodeMetadata.builder()
node.getMetadata().getStates().clear(); .nodeID(node.getMetadata().getNodeID())
node.getValues().put(field, requestedField); .states(Map.of(field, node.getMetadata().getStates().get(field)))
node.getMetadata().getStates().put(field, requestedFieldState); .build())
.values(Map.of(field, requestedField))
.build();
} }
} }
return node; return node;
} }
@Override @Override
protected void updateNode(Node node) { protected void updateNode(Node newNode) {
Node currentNode = nodes.get(node.getMetadata().getNodeID()); Node currentNode = nodes.get(newNode.getMetadata().getNodeID());
currentNode.values.putAll(node.values); currentNode.values.putAll(newNode.values);
currentNode.getMetadata().getStates().putAll(node.getMetadata().getStates()); currentNode.getMetadata().getStates().putAll(newNode.getMetadata().getStates());
} }
public void addNode(String id, Node incomingNode) { public void addNode(String id, Node incomingNode) {

View File

@ -95,7 +95,7 @@ public abstract class Storage {
continue; continue;
} }
if (Objects.isNull(changedNode)) { if (changedNode == null) {
changedNode = Node.builder() changedNode = Node.builder()
.metadata(NodeMetadata.builder() .metadata(NodeMetadata.builder()
.nodeID(incomingNode.getMetadata().getNodeID()) .nodeID(incomingNode.getMetadata().getNodeID())

View File

@ -40,7 +40,7 @@ public class StorageManager {
} }
public List<String> getPathData(String[] path) throws TimeoutException, ExecutionException, InterruptedException { public List<String> getPathData(String[] path) throws TimeoutException, ExecutionException, InterruptedException {
List<String> nodeIds = new ArrayList<>(List.of(path[0])); List<String> nodeIds = new ArrayList<>();
String nodeId = path[0]; String nodeId = path[0];
for (int i = 0; i < path.length; i++) { for (int i = 0; i < path.length; i++) {
String field = null; String field = null;
@ -49,35 +49,30 @@ public class StorageManager {
} }
Node node = storage.getNode(nodeId, field); Node node = storage.getNode(nodeId, field);
if (node != null) { if (node != null) {
if (field != null) { if (field == null) {
if (node.values.containsKey(field) && node.values.get(field).getValueType() == NodeValue.ValueType.LINK) { nodeIds.add(nodeId);
nodeId = ((NodeLinkValue) node.values.get(field)).getLink();
nodeIds.add(nodeId);
continue;
}
} else {
break; break;
} }
if (node.values.containsKey(field) && node.values.get(field).getValueType() == NodeValue.ValueType.LINK) {
nodeId = ((NodeLinkValue) node.values.get(field)).getLink();
nodeIds.add(nodeId);
continue;
}
} }
// proceeds to request from the network
var future = this.networkManager.sendGetRequest(GetRequestParams.builder() var future = this.networkManager.sendGetRequest(GetRequestParams.builder()
.nodeId(nodeId) .nodeId(nodeId)
.field(field) .field(field)
.build()); .build());
var result = future.get(networkManager.getTimeout(), TimeUnit.SECONDS); var result = future.get(networkManager.getTimeout(), TimeUnit.SECONDS);
if (result.getData() != null) { if (result.getData() == null || field == null) {
nodeIds.add(result.getData().getMetadata().getNodeID()); nodeIds.add(nodeId);
if (field != null) {
if (result.getData().values.containsKey(field) && result.getData().values.get(field).getValueType() == NodeValue.ValueType.LINK) {
nodeId = ((NodeLinkValue) result.getData().values.get(field)).getLink();
nodeIds.add(nodeId);
}
} else {
break;
}
} else {
break; break;
} }
if (result.getData().values.containsKey(field) && result.getData().values.get(field).getValueType() == NodeValue.ValueType.LINK) {
nodeId = ((NodeLinkValue) result.getData().values.get(field)).getLink();
nodeIds.add(nodeId);
}
} }
return nodeIds; return nodeIds;
} }