Compare commits

...

3 Commits

Author SHA1 Message Date
ChronosX88
08912a6f67 Implement referencing other nodes in graph 2023-11-17 22:23:38 +03:00
ChronosX88
bcda85a8b1 Make nodes ignoring their own requests, fix putting data to the graph 2023-11-17 22:23:38 +03:00
ChronosX88
a551581a99 Implement custom deserializer for NetworkMessage 2023-11-17 22:23:38 +03:00
21 changed files with 270 additions and 118 deletions

View File

@ -9,8 +9,8 @@ public class Gun {
private final StorageManager storageManager; private final StorageManager storageManager;
private final NetworkManager networkManager; private final NetworkManager networkManager;
public Gun(Storage storage, Peer peer) { public Gun(Storage storage, Peer peer) throws InterruptedException {
this.networkManager = new NetworkManager(peer); this.networkManager = new NetworkManager(peer, peer.getNetworkHandler());
this.storageManager = new StorageManager(storage, this.networkManager); this.storageManager = new StorageManager(storage, this.networkManager);
this.networkManager.start(); this.networkManager.start();
} }

View File

@ -12,10 +12,7 @@ import io.github.chronosx88.JGUN.network.NetworkManager;
import io.github.chronosx88.JGUN.storage.StorageManager; import io.github.chronosx88.JGUN.storage.StorageManager;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.*;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream; import java.util.stream.Stream;
public class PathReference { public class PathReference {
@ -23,12 +20,17 @@ public class PathReference {
private final NetworkManager networkManager; private final NetworkManager networkManager;
private final StorageManager storageManager; private final StorageManager storageManager;
private final Executor executorService = Executors.newCachedThreadPool();
public PathReference(NetworkManager networkManager, StorageManager storageManager) { public PathReference(NetworkManager networkManager, StorageManager storageManager) {
this.networkManager = networkManager; this.networkManager = networkManager;
this.storageManager = storageManager; this.storageManager = storageManager;
} }
public String[] getPath() {
return path.toArray(new String[0]);
}
public PathReference get(String key) { public PathReference get(String key) {
path.add(key); path.add(key);
return this; return this;
@ -85,36 +87,64 @@ public class PathReference {
.build()); .build());
nodeId = newNodeId; nodeId = newNodeId;
} }
graph.nodes.get(NodeBuilder.ROOT_NODE).getMetadata().setNodeID(newNodeId);
graph.nodes.put(newNodeId, graph.nodes.get(NodeBuilder.ROOT_NODE));
graph.nodes.remove(NodeBuilder.ROOT_NODE);
} else { } else {
newNodeId = UUID.randomUUID().toString(); // merge updated node under parent ID
if (pathData.size() > 1) { String parentNodeId = pathData.get(pathData.size()-1);
String parentNodeId = pathData.get(pathData.size()-2); graph.nodes.get(NodeBuilder.ROOT_NODE).getMetadata().setNodeID(parentNodeId);
graph.putNodes(parentNodeId, Node.builder() graph.nodes.put(parentNodeId, graph.nodes.get(NodeBuilder.ROOT_NODE));
.metadata(NodeMetadata.builder()
.nodeID(parentNodeId)
.states(Map.of(path.get(path.size()-1), System.currentTimeMillis()))
.build())
.values(Map.of(path.get(path.size()-1), NodeLinkValue.builder()
.link(newNodeId)
.build()))
.build());
} else {
newNodeId = pathData.get(0);
}
} }
graph.nodes.get(NodeBuilder.ROOT_NODE).getMetadata().setNodeID(newNodeId);
graph.nodes.put(newNodeId, graph.nodes.get(NodeBuilder.ROOT_NODE));
graph.nodes.remove(NodeBuilder.ROOT_NODE); graph.nodes.remove(NodeBuilder.ROOT_NODE);
return this.storageManager.putData(graph); return this.storageManager.putData(graph);
}); });
} }
public void on(NodeChangeListener changeListener) { public void on(NodeChangeListener changeListener) {
storageManager.addChangeListener(String.join("/", path), changeListener); executorService.execute(() -> {
List<String> pathData;
try {
pathData = storageManager.getPathData(path.toArray(new String[0]));
} catch (TimeoutException | ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
storageManager.addChangeListener(pathData.get(pathData.size()-1), changeListener);
});
} }
public void map(NodeChangeListener.Map forEachListener) { public void map(NodeChangeListener.Map mapListener) {
storageManager.addMapChangeListener(String.join("/", path), forEachListener); executorService.execute(() -> {
List<String> pathData;
try {
pathData = storageManager.getPathData(path.toArray(new String[0]));
} catch (TimeoutException | ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
storageManager.addMapChangeListener(pathData.get(pathData.size()-1), mapListener);
});
}
public CompletableFuture<Result> put(PathReference nodeReference) {
String[] pathToAnotherNode = nodeReference.getPath();
return CompletableFuture.supplyAsync(() -> {
try {
return storageManager.getPathData(pathToAnotherNode);
} catch (TimeoutException | ExecutionException | InterruptedException e) {
throw new CompletionException(e);
}
}).thenComposeAsync(pathData -> {
if (pathData.size() < pathToAnotherNode.length) {
return CompletableFuture.failedFuture(new IllegalArgumentException("target node not found"));
}
String nodeId = pathData.get(pathData.size()-1);
MemoryGraph graph = new NodeBuilder()
.add(path.get(path.size() - 1), NodeLinkValue.builder()
.link(nodeId)
.build())
.build();
if (path.size() > 1) this.path.remove(path.size()-1);
return this.put(graph);
});
} }
} }

View File

@ -91,6 +91,10 @@ public class NodeBuilder {
return this; return this;
} }
public NodeBuilder add(String name, NodeLinkValue link) {
return addScalar(name, link);
}
public MemoryGraph build() { public MemoryGraph build() {
return this.graph; return this.graph;
} }

View File

@ -18,22 +18,22 @@ public class MainClient {
Storage storage = new MemoryStorage(); Storage storage = new MemoryStorage();
NetworkNode peer = new NetworkNode(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 5054, storage); NetworkNode peer = new NetworkNode(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 5054, storage);
Gun gun = new Gun(storage, peer); Gun gun = new Gun(storage, peer);
Result result = gun.get("person").put(new NodeBuilder() Result result = gun.get("person").put(new NodeBuilder()
.add("firstName", "John") .add("firstName", "Test")
.add("lastName", "Smith") .build()).get();
.add("age", 25) System.out.println(result);
.add("address", new NodeBuilder()
.add("streetAddress", "21 2nd Street") result = gun.get("person").get("address").put(new NodeBuilder()
.add("city", "New York") .add("city", "Dallas")
.add("state", "NY") .build()).get();
.add("postalCode", "10021")) System.out.println(result);
.add("phoneNumber", new ArrayBuilder()
.add(new NodeBuilder() result = gun.get("person").get("homeAddress").put(gun.get("person").get("address")).get();
.add("type", "home") System.out.println(result);
.add("number", "212 555-1234"))
.add(new NodeBuilder() result = gun.get("person").get("homeAddress").put(new NodeBuilder()
.add("type", "fax") .add("city", "New YORK CITY")
.add("number", "646 555-4567")))
.build()).get(); .build()).get();
System.out.println(result); System.out.println(result);
} }

View File

@ -1,11 +1,60 @@
package io.github.chronosx88.JGUN.examples; package io.github.chronosx88.JGUN.examples;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.github.chronosx88.JGUN.api.Gun;
import io.github.chronosx88.JGUN.api.NodeChangeListener;
import io.github.chronosx88.JGUN.api.graph.ArrayBuilder;
import io.github.chronosx88.JGUN.api.graph.NodeBuilder;
import io.github.chronosx88.JGUN.models.Result;
import io.github.chronosx88.JGUN.models.graph.Node;
import io.github.chronosx88.JGUN.network.GatewayNetworkNode; import io.github.chronosx88.JGUN.network.GatewayNetworkNode;
import io.github.chronosx88.JGUN.network.NetworkNode;
import io.github.chronosx88.JGUN.storage.MemoryStorage; import io.github.chronosx88.JGUN.storage.MemoryStorage;
import io.github.chronosx88.JGUN.storage.Storage;
import java.net.Inet4Address;
import java.util.concurrent.ExecutionException;
public class MainServer { public class MainServer {
public static void main(String[] args) { public static void main(String[] args) throws ExecutionException, InterruptedException {
GatewayNetworkNode gunSuperNode = new GatewayNetworkNode(5054, new MemoryStorage()); Storage storage = new MemoryStorage();
gunSuperNode.start(); GatewayNetworkNode peer = new GatewayNetworkNode(5054, storage);
Gun gun = new Gun(storage, peer);
Result result = gun.get("person").put(new NodeBuilder()
.add("firstName", "John")
.add("lastName", "Smith")
.add("age", 25)
.add("address", new NodeBuilder()
.add("streetAddress", "21 2nd Street")
.add("city", "New York")
.add("state", "NY")
.add("postalCode", "10021"))
.add("phoneNumber", new ArrayBuilder()
.add(new NodeBuilder()
.add("type", "home")
.add("number", "212 555-1234"))
.add(new NodeBuilder()
.add("type", "fax")
.add("number", "646 555-4567")))
.build()).get();
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new Jdk8Module());
gun.get("person").on(node -> {
try {
System.out.println(mapper.writeValueAsString(node));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
});
gun.get("person").get("address").on(node -> {
try {
System.out.println(mapper.writeValueAsString(node));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
});
System.out.println(result.isOk());
} }
} }

View File

@ -12,15 +12,9 @@ import lombok.Data;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
@Data @Data
@JsonTypeInfo(use = JsonTypeInfo.Id.DEDUCTION) @JsonDeserialize(using = NetworkMessageDeserializer.class)
@JsonSubTypes({
@JsonSubTypes.Type(GetRequest.class),
@JsonSubTypes.Type(PutRequest.class),
@JsonSubTypes.Type(Ack.class),
@JsonSubTypes.Type(GetAck.class)
})
@SuperBuilder @SuperBuilder
public abstract class BaseMessage { public abstract class NetworkMessage {
@JsonProperty("#") @JsonProperty("#")
private String id; private String id;
} }

View File

@ -0,0 +1,37 @@
package io.github.chronosx88.JGUN.models;
import com.fasterxml.jackson.core.JacksonException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;
import io.github.chronosx88.JGUN.models.acks.Ack;
import io.github.chronosx88.JGUN.models.acks.GetAck;
import io.github.chronosx88.JGUN.models.requests.GetRequest;
import io.github.chronosx88.JGUN.models.requests.PutRequest;
import java.io.IOException;
public class NetworkMessageDeserializer extends JsonDeserializer<NetworkMessage> {
@Override
public NetworkMessage deserialize(JsonParser p, DeserializationContext ctxt) throws IOException, JacksonException {
JsonNode node = p.readValueAsTree();
if (node.has("#")) {
// parsing ack
if (node.has("@")) {
if (node.has("put")) {
return p.getCodec().treeToValue(node, GetAck.class);
}
return p.getCodec().treeToValue(node, Ack.class);
}
if (node.has("get")) {
return p.getCodec().treeToValue(node, GetRequest.class);
} else if (node.has("put")) {
return p.getCodec().treeToValue(node, PutRequest.class);
}
}
throw new IllegalArgumentException("invalid message received");
}
}

View File

@ -1,7 +1,7 @@
package io.github.chronosx88.JGUN.models.acks; package io.github.chronosx88.JGUN.models.acks;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import io.github.chronosx88.JGUN.models.BaseMessage; import io.github.chronosx88.JGUN.models.NetworkMessage;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
@ -11,7 +11,7 @@ import lombok.extern.jackson.Jacksonized;
@SuperBuilder @SuperBuilder
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
@Jacksonized @Jacksonized
public class Ack extends BaseMessage { public class Ack extends NetworkMessage {
@JsonProperty("@") @JsonProperty("@")
private String replyTo; private String replyTo;
private boolean ok; private boolean ok;

View File

@ -1,7 +1,7 @@
package io.github.chronosx88.JGUN.models.acks; package io.github.chronosx88.JGUN.models.acks;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import io.github.chronosx88.JGUN.models.BaseMessage; import io.github.chronosx88.JGUN.models.NetworkMessage;
import io.github.chronosx88.JGUN.models.graph.MemoryGraph; import io.github.chronosx88.JGUN.models.graph.MemoryGraph;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
@ -12,7 +12,7 @@ import lombok.extern.jackson.Jacksonized;
@SuperBuilder @SuperBuilder
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
@Jacksonized @Jacksonized
public class GetAck extends BaseMessage { public class GetAck extends NetworkMessage {
@JsonProperty("put") @JsonProperty("put")
private MemoryGraph data; private MemoryGraph data;
@JsonProperty("@") @JsonProperty("@")

View File

@ -1,8 +1,7 @@
package io.github.chronosx88.JGUN.models.requests; package io.github.chronosx88.JGUN.models.requests;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import io.github.chronosx88.JGUN.models.BaseMessage; import io.github.chronosx88.JGUN.models.NetworkMessage;
import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
@ -12,7 +11,7 @@ import lombok.extern.jackson.Jacksonized;
@SuperBuilder @SuperBuilder
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
@Jacksonized @Jacksonized
public class GetRequest extends BaseMessage implements Request { public class GetRequest extends NetworkMessage implements Request {
@JsonProperty("get") @JsonProperty("get")
private GetRequestParams params; private GetRequestParams params;
} }

View File

@ -1,7 +1,7 @@
package io.github.chronosx88.JGUN.models.requests; package io.github.chronosx88.JGUN.models.requests;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import io.github.chronosx88.JGUN.models.BaseMessage; import io.github.chronosx88.JGUN.models.NetworkMessage;
import io.github.chronosx88.JGUN.models.graph.MemoryGraph; import io.github.chronosx88.JGUN.models.graph.MemoryGraph;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
@ -12,7 +12,7 @@ import lombok.extern.jackson.Jacksonized;
@Jacksonized @Jacksonized
@SuperBuilder @SuperBuilder
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
public class PutRequest extends BaseMessage implements Request { public class PutRequest extends NetworkMessage implements Request {
@JsonProperty("put") @JsonProperty("put")
private MemoryGraph graph; private MemoryGraph graph;
} }

View File

@ -1,3 +1,5 @@
package io.github.chronosx88.JGUN.models.requests; package io.github.chronosx88.JGUN.models.requests;
public interface Request {} public interface Request {
String getId();
}

View File

@ -16,11 +16,11 @@ public class Dup {
.build(); .build();
} }
private void track(String id) { public void track(String id) {
cache.put(id, System.currentTimeMillis()); cache.put(id, System.currentTimeMillis());
} }
public boolean isDuplicated(String id) { public boolean checkDuplicated(String id) {
Long timestamp = null; Long timestamp = null;
try { try {
timestamp = cache.getIfPresent(id); timestamp = cache.getIfPresent(id);

View File

@ -69,4 +69,14 @@ public class GatewayNetworkNode extends WebSocketServer implements Peer {
public int getTimeout() { public int getTimeout() {
return 60; return 60;
} }
@Override
public int connectedPeerCount() {
return this.getConnections().size();
}
@Override
public NetworkHandler getNetworkHandler() {
return handler;
}
} }

View File

@ -7,7 +7,7 @@ import io.github.chronosx88.JGUN.api.FutureGet;
import io.github.chronosx88.JGUN.api.FuturePut; import io.github.chronosx88.JGUN.api.FuturePut;
import io.github.chronosx88.JGUN.models.GetResult; import io.github.chronosx88.JGUN.models.GetResult;
import io.github.chronosx88.JGUN.models.Result; import io.github.chronosx88.JGUN.models.Result;
import io.github.chronosx88.JGUN.models.BaseMessage; import io.github.chronosx88.JGUN.models.NetworkMessage;
import io.github.chronosx88.JGUN.models.graph.MemoryGraph; import io.github.chronosx88.JGUN.models.graph.MemoryGraph;
import io.github.chronosx88.JGUN.models.graph.Node; import io.github.chronosx88.JGUN.models.graph.Node;
import io.github.chronosx88.JGUN.models.graph.NodeMetadata; import io.github.chronosx88.JGUN.models.graph.NodeMetadata;
@ -17,6 +17,7 @@ import io.github.chronosx88.JGUN.models.graph.NodeValue;
import io.github.chronosx88.JGUN.models.requests.GetRequest; import io.github.chronosx88.JGUN.models.requests.GetRequest;
import io.github.chronosx88.JGUN.models.requests.PutRequest; import io.github.chronosx88.JGUN.models.requests.PutRequest;
import io.github.chronosx88.JGUN.storage.Storage; import io.github.chronosx88.JGUN.storage.Storage;
import lombok.Getter;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
@ -30,6 +31,8 @@ public class NetworkHandler {
private final Peer peer; private final Peer peer;
private final Storage storage; private final Storage storage;
@Getter
private final Dup dup; private final Dup dup;
private final Executor executorService = Executors.newCachedThreadPool(); private final Executor executorService = Executors.newCachedThreadPool();
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
@ -52,21 +55,21 @@ public class NetworkHandler {
} }
public void handleIncomingMessage(String message) { public void handleIncomingMessage(String message) {
BaseMessage parsedMessage; NetworkMessage parsedMessage;
try { try {
parsedMessage = objectMapper.readValue(message, BaseMessage.class); parsedMessage = objectMapper.readValue(message, NetworkMessage.class);
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
if (dup.isDuplicated(parsedMessage.getId())) { if (dup.checkDuplicated(parsedMessage.getId())) {
// TODO log // TODO log
return; return;
} }
final BaseMessage msg = parsedMessage; final NetworkMessage msg = parsedMessage;
executorService.execute(() -> { executorService.execute(() -> {
BaseMessage response = null; NetworkMessage response = null;
if (msg instanceof GetRequest) { if (msg instanceof GetRequest) {
response = handleGet((GetRequest) msg); response = handleGet((GetRequest) msg);
} else if (msg instanceof PutRequest) { } else if (msg instanceof PutRequest) {
@ -78,6 +81,7 @@ public class NetworkHandler {
handleGetAck(ack.getData(), ack); handleGetAck(ack.getData(), ack);
} }
if (Objects.nonNull(response)) { if (Objects.nonNull(response)) {
this.dup.track(response.getId());
String respString; String respString;
try { try {
respString = objectMapper.writeValueAsString(response); respString = objectMapper.writeValueAsString(response);

View File

@ -5,6 +5,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.github.chronosx88.JGUN.api.FutureGet; import io.github.chronosx88.JGUN.api.FutureGet;
import io.github.chronosx88.JGUN.api.FuturePut; import io.github.chronosx88.JGUN.api.FuturePut;
import io.github.chronosx88.JGUN.models.GetResult;
import io.github.chronosx88.JGUN.models.Result;
import io.github.chronosx88.JGUN.models.graph.MemoryGraph; import io.github.chronosx88.JGUN.models.graph.MemoryGraph;
import io.github.chronosx88.JGUN.models.requests.GetRequest; import io.github.chronosx88.JGUN.models.requests.GetRequest;
import io.github.chronosx88.JGUN.models.requests.GetRequestParams; import io.github.chronosx88.JGUN.models.requests.GetRequestParams;
@ -19,6 +21,7 @@ public class NetworkManager {
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
private final Peer peer; private final Peer peer;
private final NetworkHandler networkHandler;
private final Executor executorService = Executors.newCachedThreadPool(); private final Executor executorService = Executors.newCachedThreadPool();
@ -28,15 +31,16 @@ public class NetworkManager {
@Getter @Getter
private final int timeout; private final int timeout;
public NetworkManager(Peer peer) { public NetworkManager(Peer peer, NetworkHandler networkHandler) {
this.networkHandler = networkHandler;
objectMapper = new ObjectMapper(); objectMapper = new ObjectMapper();
objectMapper.registerModule(new Jdk8Module()); objectMapper.registerModule(new Jdk8Module());
this.peer = peer; this.peer = peer;
this.timeout = peer.getTimeout(); this.timeout = peer.getTimeout();
} }
public void start() { public void start() throws InterruptedException {
executorService.execute(this.peer::start); this.peer.start();
} }
private <T extends Request> void sendRequest(T request) { private <T extends Request> void sendRequest(T request) {
@ -46,28 +50,36 @@ public class NetworkManager {
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
this.networkHandler.getDup().track(request.getId());
peer.emit(encodedRequest); peer.emit(encodedRequest);
} }
public FuturePut sendPutRequest(MemoryGraph putData) { public FuturePut sendPutRequest(MemoryGraph putData) {
String id = Dup.random(); String id = Dup.random();
executorService.execute(() -> this.sendRequest(PutRequest.builder()
.id(id)
.graph(putData)
.build()));
var requestFuture = new FuturePut(id); var requestFuture = new FuturePut(id);
peer.addPendingPutRequest(requestFuture); if (this.peer.connectedPeerCount() > 0) {
executorService.execute(() -> this.sendRequest(PutRequest.builder()
.id(id)
.graph(putData)
.build()));
peer.addPendingPutRequest(requestFuture);
}
requestFuture.complete(Result.builder().ok(true).build());
return requestFuture; return requestFuture;
} }
public FutureGet sendGetRequest(GetRequestParams params) { public FutureGet sendGetRequest(GetRequestParams params) {
String id = Dup.random(); String id = Dup.random();
executorService.execute(() -> this.sendRequest(GetRequest.builder()
.id(id)
.params(params)
.build()));
var requestFuture = new FutureGet(id, params); var requestFuture = new FutureGet(id, params);
peer.addPendingGetRequest(requestFuture); if (this.peer.connectedPeerCount() > 0) {
executorService.execute(() -> this.sendRequest(GetRequest.builder()
.id(id)
.params(params)
.build()));
peer.addPendingGetRequest(requestFuture);
} else {
requestFuture.complete(GetResult.builder().data(null).build());
}
return requestFuture; return requestFuture;
} }
} }

View File

@ -21,7 +21,7 @@ public class NetworkNode extends WebSocketClient implements Peer {
@Override @Override
public void onOpen(ServerHandshake handshakeData) { public void onOpen(ServerHandshake handshakeData) {
System.out.println("# Connection with SuperNode open. Status: " + handshakeData.getHttpStatus()); System.out.println("# Connection with gateway node is open. Status: " + handshakeData.getHttpStatus());
} }
@Override @Override
@ -56,12 +56,23 @@ public class NetworkNode extends WebSocketClient implements Peer {
} }
@Override @Override
public void start() { public void start() throws InterruptedException {
this.connect(); this.connectBlocking();
} }
@Override @Override
public int getTimeout() { public int getTimeout() {
return 60; return 60;
} }
@Override
public int connectedPeerCount() {
if (this.isOpen()) return 1;
return 0;
}
@Override
public NetworkHandler getNetworkHandler() {
return handler;
}
} }

View File

@ -7,6 +7,8 @@ public interface Peer {
void emit(String data); void emit(String data);
void addPendingPutRequest(FuturePut futurePut); void addPendingPutRequest(FuturePut futurePut);
void addPendingGetRequest(FutureGet futureGet); void addPendingGetRequest(FutureGet futureGet);
void start(); void start() throws InterruptedException;
int getTimeout(); int getTimeout();
int connectedPeerCount();
NetworkHandler getNetworkHandler();
} }

View File

@ -5,6 +5,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry; import com.github.benmanes.caffeine.cache.Expiry;
import io.github.chronosx88.JGUN.models.graph.DeferredNode; import io.github.chronosx88.JGUN.models.graph.DeferredNode;
import io.github.chronosx88.JGUN.models.graph.Node; import io.github.chronosx88.JGUN.models.graph.Node;
import io.github.chronosx88.JGUN.models.graph.NodeMetadata;
import io.github.chronosx88.JGUN.models.graph.NodeValue; import io.github.chronosx88.JGUN.models.graph.NodeValue;
import org.checkerframework.checker.index.qual.NonNegative; import org.checkerframework.checker.index.qual.NonNegative;
@ -46,21 +47,23 @@ public class MemoryStorage extends Storage {
if (node != null && field != null) { if (node != null && field != null) {
NodeValue requestedField = node.getValues().get(field); NodeValue requestedField = node.getValues().get(field);
if (requestedField != null) { if (requestedField != null) {
Long requestedFieldState = node.getMetadata().getStates().get(field); node = Node.builder()
node.getValues().clear(); .metadata(NodeMetadata.builder()
node.getMetadata().getStates().clear(); .nodeID(node.getMetadata().getNodeID())
node.getValues().put(field, requestedField); .states(Map.of(field, node.getMetadata().getStates().get(field)))
node.getMetadata().getStates().put(field, requestedFieldState); .build())
.values(Map.of(field, requestedField))
.build();
} }
} }
return node; return node;
} }
@Override @Override
protected void updateNode(Node node) { protected void updateNode(Node newNode) {
Node currentNode = nodes.get(node.getMetadata().getNodeID()); Node currentNode = nodes.get(newNode.getMetadata().getNodeID());
currentNode.values.putAll(node.values); currentNode.values.putAll(newNode.values);
currentNode.getMetadata().getStates().putAll(node.getMetadata().getStates()); currentNode.getMetadata().getStates().putAll(newNode.getMetadata().getStates());
} }
public void addNode(String id, Node incomingNode) { public void addNode(String id, Node incomingNode) {

View File

@ -95,7 +95,7 @@ public abstract class Storage {
continue; continue;
} }
if (Objects.isNull(changedNode)) { if (changedNode == null) {
changedNode = Node.builder() changedNode = Node.builder()
.metadata(NodeMetadata.builder() .metadata(NodeMetadata.builder()
.nodeID(incomingNode.getMetadata().getNodeID()) .nodeID(incomingNode.getMetadata().getNodeID())

View File

@ -40,7 +40,7 @@ public class StorageManager {
} }
public List<String> getPathData(String[] path) throws TimeoutException, ExecutionException, InterruptedException { public List<String> getPathData(String[] path) throws TimeoutException, ExecutionException, InterruptedException {
List<String> nodeIds = new ArrayList<>(List.of(path[0])); List<String> nodeIds = new ArrayList<>();
String nodeId = path[0]; String nodeId = path[0];
for (int i = 0; i < path.length; i++) { for (int i = 0; i < path.length; i++) {
String field = null; String field = null;
@ -49,35 +49,30 @@ public class StorageManager {
} }
Node node = storage.getNode(nodeId, field); Node node = storage.getNode(nodeId, field);
if (node != null) { if (node != null) {
if (field != null) { if (field == null) {
if (node.values.containsKey(field) && node.values.get(field).getValueType() == NodeValue.ValueType.LINK) { nodeIds.add(nodeId);
nodeId = ((NodeLinkValue) node.values.get(field)).getLink();
nodeIds.add(nodeId);
continue;
}
} else {
break; break;
} }
if (node.values.containsKey(field) && node.values.get(field).getValueType() == NodeValue.ValueType.LINK) {
nodeId = ((NodeLinkValue) node.values.get(field)).getLink();
nodeIds.add(nodeId);
continue;
}
} }
// proceeds to request from the network
var future = this.networkManager.sendGetRequest(GetRequestParams.builder() var future = this.networkManager.sendGetRequest(GetRequestParams.builder()
.nodeId(nodeId) .nodeId(nodeId)
.field(field) .field(field)
.build()); .build());
var result = future.get(networkManager.getTimeout(), TimeUnit.SECONDS); var result = future.get(networkManager.getTimeout(), TimeUnit.SECONDS);
if (result.getData() != null) { if (result.getData() == null || field == null) {
nodeIds.add(result.getData().getMetadata().getNodeID()); nodeIds.add(nodeId);
if (field != null) {
if (result.getData().values.containsKey(field) && result.getData().values.get(field).getValueType() == NodeValue.ValueType.LINK) {
nodeId = ((NodeLinkValue) result.getData().values.get(field)).getLink();
nodeIds.add(nodeId);
}
} else {
break;
}
} else {
break; break;
} }
if (result.getData().values.containsKey(field) && result.getData().values.get(field).getValueType() == NodeValue.ValueType.LINK) {
nodeId = ((NodeLinkValue) result.getData().values.get(field)).getLink();
nodeIds.add(nodeId);
}
} }
return nodeIds; return nodeIds;
} }