mirror of
https://github.com/ChronosX88/JGUN.git
synced 2024-10-23 10:11:01 +00:00
Compare commits
3 Commits
637d31d124
...
08912a6f67
Author | SHA1 | Date | |
---|---|---|---|
|
08912a6f67 | ||
|
bcda85a8b1 | ||
|
a551581a99 |
@ -9,8 +9,8 @@ public class Gun {
|
|||||||
private final StorageManager storageManager;
|
private final StorageManager storageManager;
|
||||||
private final NetworkManager networkManager;
|
private final NetworkManager networkManager;
|
||||||
|
|
||||||
public Gun(Storage storage, Peer peer) {
|
public Gun(Storage storage, Peer peer) throws InterruptedException {
|
||||||
this.networkManager = new NetworkManager(peer);
|
this.networkManager = new NetworkManager(peer, peer.getNetworkHandler());
|
||||||
this.storageManager = new StorageManager(storage, this.networkManager);
|
this.storageManager = new StorageManager(storage, this.networkManager);
|
||||||
this.networkManager.start();
|
this.networkManager.start();
|
||||||
}
|
}
|
||||||
|
@ -12,10 +12,7 @@ import io.github.chronosx88.JGUN.network.NetworkManager;
|
|||||||
import io.github.chronosx88.JGUN.storage.StorageManager;
|
import io.github.chronosx88.JGUN.storage.StorageManager;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.*;
|
||||||
import java.util.concurrent.CompletionException;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.concurrent.TimeoutException;
|
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
public class PathReference {
|
public class PathReference {
|
||||||
@ -23,12 +20,17 @@ public class PathReference {
|
|||||||
|
|
||||||
private final NetworkManager networkManager;
|
private final NetworkManager networkManager;
|
||||||
private final StorageManager storageManager;
|
private final StorageManager storageManager;
|
||||||
|
private final Executor executorService = Executors.newCachedThreadPool();
|
||||||
|
|
||||||
public PathReference(NetworkManager networkManager, StorageManager storageManager) {
|
public PathReference(NetworkManager networkManager, StorageManager storageManager) {
|
||||||
this.networkManager = networkManager;
|
this.networkManager = networkManager;
|
||||||
this.storageManager = storageManager;
|
this.storageManager = storageManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String[] getPath() {
|
||||||
|
return path.toArray(new String[0]);
|
||||||
|
}
|
||||||
|
|
||||||
public PathReference get(String key) {
|
public PathReference get(String key) {
|
||||||
path.add(key);
|
path.add(key);
|
||||||
return this;
|
return this;
|
||||||
@ -85,36 +87,64 @@ public class PathReference {
|
|||||||
.build());
|
.build());
|
||||||
nodeId = newNodeId;
|
nodeId = newNodeId;
|
||||||
}
|
}
|
||||||
|
graph.nodes.get(NodeBuilder.ROOT_NODE).getMetadata().setNodeID(newNodeId);
|
||||||
|
graph.nodes.put(newNodeId, graph.nodes.get(NodeBuilder.ROOT_NODE));
|
||||||
|
graph.nodes.remove(NodeBuilder.ROOT_NODE);
|
||||||
} else {
|
} else {
|
||||||
newNodeId = UUID.randomUUID().toString();
|
// merge updated node under parent ID
|
||||||
if (pathData.size() > 1) {
|
String parentNodeId = pathData.get(pathData.size()-1);
|
||||||
String parentNodeId = pathData.get(pathData.size()-2);
|
graph.nodes.get(NodeBuilder.ROOT_NODE).getMetadata().setNodeID(parentNodeId);
|
||||||
graph.putNodes(parentNodeId, Node.builder()
|
graph.nodes.put(parentNodeId, graph.nodes.get(NodeBuilder.ROOT_NODE));
|
||||||
.metadata(NodeMetadata.builder()
|
|
||||||
.nodeID(parentNodeId)
|
|
||||||
.states(Map.of(path.get(path.size()-1), System.currentTimeMillis()))
|
|
||||||
.build())
|
|
||||||
.values(Map.of(path.get(path.size()-1), NodeLinkValue.builder()
|
|
||||||
.link(newNodeId)
|
|
||||||
.build()))
|
|
||||||
.build());
|
|
||||||
} else {
|
|
||||||
newNodeId = pathData.get(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
graph.nodes.get(NodeBuilder.ROOT_NODE).getMetadata().setNodeID(newNodeId);
|
|
||||||
graph.nodes.put(newNodeId, graph.nodes.get(NodeBuilder.ROOT_NODE));
|
|
||||||
graph.nodes.remove(NodeBuilder.ROOT_NODE);
|
graph.nodes.remove(NodeBuilder.ROOT_NODE);
|
||||||
return this.storageManager.putData(graph);
|
return this.storageManager.putData(graph);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void on(NodeChangeListener changeListener) {
|
public void on(NodeChangeListener changeListener) {
|
||||||
storageManager.addChangeListener(String.join("/", path), changeListener);
|
executorService.execute(() -> {
|
||||||
|
List<String> pathData;
|
||||||
|
try {
|
||||||
|
pathData = storageManager.getPathData(path.toArray(new String[0]));
|
||||||
|
} catch (TimeoutException | ExecutionException | InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
storageManager.addChangeListener(pathData.get(pathData.size()-1), changeListener);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void map(NodeChangeListener.Map forEachListener) {
|
public void map(NodeChangeListener.Map mapListener) {
|
||||||
storageManager.addMapChangeListener(String.join("/", path), forEachListener);
|
executorService.execute(() -> {
|
||||||
|
List<String> pathData;
|
||||||
|
try {
|
||||||
|
pathData = storageManager.getPathData(path.toArray(new String[0]));
|
||||||
|
} catch (TimeoutException | ExecutionException | InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
storageManager.addMapChangeListener(pathData.get(pathData.size()-1), mapListener);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public CompletableFuture<Result> put(PathReference nodeReference) {
|
||||||
|
String[] pathToAnotherNode = nodeReference.getPath();
|
||||||
|
return CompletableFuture.supplyAsync(() -> {
|
||||||
|
try {
|
||||||
|
return storageManager.getPathData(pathToAnotherNode);
|
||||||
|
} catch (TimeoutException | ExecutionException | InterruptedException e) {
|
||||||
|
throw new CompletionException(e);
|
||||||
|
}
|
||||||
|
}).thenComposeAsync(pathData -> {
|
||||||
|
if (pathData.size() < pathToAnotherNode.length) {
|
||||||
|
return CompletableFuture.failedFuture(new IllegalArgumentException("target node not found"));
|
||||||
|
}
|
||||||
|
String nodeId = pathData.get(pathData.size()-1);
|
||||||
|
MemoryGraph graph = new NodeBuilder()
|
||||||
|
.add(path.get(path.size() - 1), NodeLinkValue.builder()
|
||||||
|
.link(nodeId)
|
||||||
|
.build())
|
||||||
|
.build();
|
||||||
|
if (path.size() > 1) this.path.remove(path.size()-1);
|
||||||
|
return this.put(graph);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -91,6 +91,10 @@ public class NodeBuilder {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public NodeBuilder add(String name, NodeLinkValue link) {
|
||||||
|
return addScalar(name, link);
|
||||||
|
}
|
||||||
|
|
||||||
public MemoryGraph build() {
|
public MemoryGraph build() {
|
||||||
return this.graph;
|
return this.graph;
|
||||||
}
|
}
|
||||||
|
@ -18,22 +18,22 @@ public class MainClient {
|
|||||||
Storage storage = new MemoryStorage();
|
Storage storage = new MemoryStorage();
|
||||||
NetworkNode peer = new NetworkNode(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 5054, storage);
|
NetworkNode peer = new NetworkNode(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 5054, storage);
|
||||||
Gun gun = new Gun(storage, peer);
|
Gun gun = new Gun(storage, peer);
|
||||||
|
|
||||||
Result result = gun.get("person").put(new NodeBuilder()
|
Result result = gun.get("person").put(new NodeBuilder()
|
||||||
.add("firstName", "John")
|
.add("firstName", "Test")
|
||||||
.add("lastName", "Smith")
|
.build()).get();
|
||||||
.add("age", 25)
|
System.out.println(result);
|
||||||
.add("address", new NodeBuilder()
|
|
||||||
.add("streetAddress", "21 2nd Street")
|
result = gun.get("person").get("address").put(new NodeBuilder()
|
||||||
.add("city", "New York")
|
.add("city", "Dallas")
|
||||||
.add("state", "NY")
|
.build()).get();
|
||||||
.add("postalCode", "10021"))
|
System.out.println(result);
|
||||||
.add("phoneNumber", new ArrayBuilder()
|
|
||||||
.add(new NodeBuilder()
|
result = gun.get("person").get("homeAddress").put(gun.get("person").get("address")).get();
|
||||||
.add("type", "home")
|
System.out.println(result);
|
||||||
.add("number", "212 555-1234"))
|
|
||||||
.add(new NodeBuilder()
|
result = gun.get("person").get("homeAddress").put(new NodeBuilder()
|
||||||
.add("type", "fax")
|
.add("city", "New YORK CITY")
|
||||||
.add("number", "646 555-4567")))
|
|
||||||
.build()).get();
|
.build()).get();
|
||||||
System.out.println(result);
|
System.out.println(result);
|
||||||
}
|
}
|
||||||
|
@ -1,11 +1,60 @@
|
|||||||
package io.github.chronosx88.JGUN.examples;
|
package io.github.chronosx88.JGUN.examples;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
|
||||||
|
import io.github.chronosx88.JGUN.api.Gun;
|
||||||
|
import io.github.chronosx88.JGUN.api.NodeChangeListener;
|
||||||
|
import io.github.chronosx88.JGUN.api.graph.ArrayBuilder;
|
||||||
|
import io.github.chronosx88.JGUN.api.graph.NodeBuilder;
|
||||||
|
import io.github.chronosx88.JGUN.models.Result;
|
||||||
|
import io.github.chronosx88.JGUN.models.graph.Node;
|
||||||
import io.github.chronosx88.JGUN.network.GatewayNetworkNode;
|
import io.github.chronosx88.JGUN.network.GatewayNetworkNode;
|
||||||
|
import io.github.chronosx88.JGUN.network.NetworkNode;
|
||||||
import io.github.chronosx88.JGUN.storage.MemoryStorage;
|
import io.github.chronosx88.JGUN.storage.MemoryStorage;
|
||||||
|
import io.github.chronosx88.JGUN.storage.Storage;
|
||||||
|
|
||||||
|
import java.net.Inet4Address;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
|
||||||
public class MainServer {
|
public class MainServer {
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) throws ExecutionException, InterruptedException {
|
||||||
GatewayNetworkNode gunSuperNode = new GatewayNetworkNode(5054, new MemoryStorage());
|
Storage storage = new MemoryStorage();
|
||||||
gunSuperNode.start();
|
GatewayNetworkNode peer = new GatewayNetworkNode(5054, storage);
|
||||||
|
Gun gun = new Gun(storage, peer);
|
||||||
|
Result result = gun.get("person").put(new NodeBuilder()
|
||||||
|
.add("firstName", "John")
|
||||||
|
.add("lastName", "Smith")
|
||||||
|
.add("age", 25)
|
||||||
|
.add("address", new NodeBuilder()
|
||||||
|
.add("streetAddress", "21 2nd Street")
|
||||||
|
.add("city", "New York")
|
||||||
|
.add("state", "NY")
|
||||||
|
.add("postalCode", "10021"))
|
||||||
|
.add("phoneNumber", new ArrayBuilder()
|
||||||
|
.add(new NodeBuilder()
|
||||||
|
.add("type", "home")
|
||||||
|
.add("number", "212 555-1234"))
|
||||||
|
.add(new NodeBuilder()
|
||||||
|
.add("type", "fax")
|
||||||
|
.add("number", "646 555-4567")))
|
||||||
|
.build()).get();
|
||||||
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
|
mapper.registerModule(new Jdk8Module());
|
||||||
|
gun.get("person").on(node -> {
|
||||||
|
try {
|
||||||
|
System.out.println(mapper.writeValueAsString(node));
|
||||||
|
} catch (JsonProcessingException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
gun.get("person").get("address").on(node -> {
|
||||||
|
try {
|
||||||
|
System.out.println(mapper.writeValueAsString(node));
|
||||||
|
} catch (JsonProcessingException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
System.out.println(result.isOk());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -12,15 +12,9 @@ import lombok.Data;
|
|||||||
import lombok.experimental.SuperBuilder;
|
import lombok.experimental.SuperBuilder;
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
@JsonTypeInfo(use = JsonTypeInfo.Id.DEDUCTION)
|
@JsonDeserialize(using = NetworkMessageDeserializer.class)
|
||||||
@JsonSubTypes({
|
|
||||||
@JsonSubTypes.Type(GetRequest.class),
|
|
||||||
@JsonSubTypes.Type(PutRequest.class),
|
|
||||||
@JsonSubTypes.Type(Ack.class),
|
|
||||||
@JsonSubTypes.Type(GetAck.class)
|
|
||||||
})
|
|
||||||
@SuperBuilder
|
@SuperBuilder
|
||||||
public abstract class BaseMessage {
|
public abstract class NetworkMessage {
|
||||||
@JsonProperty("#")
|
@JsonProperty("#")
|
||||||
private String id;
|
private String id;
|
||||||
}
|
}
|
@ -0,0 +1,37 @@
|
|||||||
|
package io.github.chronosx88.JGUN.models;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JacksonException;
|
||||||
|
import com.fasterxml.jackson.core.JsonParser;
|
||||||
|
import com.fasterxml.jackson.databind.DeserializationContext;
|
||||||
|
import com.fasterxml.jackson.databind.JsonDeserializer;
|
||||||
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
|
import io.github.chronosx88.JGUN.models.acks.Ack;
|
||||||
|
import io.github.chronosx88.JGUN.models.acks.GetAck;
|
||||||
|
import io.github.chronosx88.JGUN.models.requests.GetRequest;
|
||||||
|
import io.github.chronosx88.JGUN.models.requests.PutRequest;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
public class NetworkMessageDeserializer extends JsonDeserializer<NetworkMessage> {
|
||||||
|
@Override
|
||||||
|
public NetworkMessage deserialize(JsonParser p, DeserializationContext ctxt) throws IOException, JacksonException {
|
||||||
|
JsonNode node = p.readValueAsTree();
|
||||||
|
|
||||||
|
if (node.has("#")) {
|
||||||
|
// parsing ack
|
||||||
|
if (node.has("@")) {
|
||||||
|
if (node.has("put")) {
|
||||||
|
return p.getCodec().treeToValue(node, GetAck.class);
|
||||||
|
}
|
||||||
|
return p.getCodec().treeToValue(node, Ack.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (node.has("get")) {
|
||||||
|
return p.getCodec().treeToValue(node, GetRequest.class);
|
||||||
|
} else if (node.has("put")) {
|
||||||
|
return p.getCodec().treeToValue(node, PutRequest.class);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw new IllegalArgumentException("invalid message received");
|
||||||
|
}
|
||||||
|
}
|
@ -1,7 +1,7 @@
|
|||||||
package io.github.chronosx88.JGUN.models.acks;
|
package io.github.chronosx88.JGUN.models.acks;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import io.github.chronosx88.JGUN.models.BaseMessage;
|
import io.github.chronosx88.JGUN.models.NetworkMessage;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.EqualsAndHashCode;
|
import lombok.EqualsAndHashCode;
|
||||||
import lombok.experimental.SuperBuilder;
|
import lombok.experimental.SuperBuilder;
|
||||||
@ -11,7 +11,7 @@ import lombok.extern.jackson.Jacksonized;
|
|||||||
@SuperBuilder
|
@SuperBuilder
|
||||||
@EqualsAndHashCode(callSuper = true)
|
@EqualsAndHashCode(callSuper = true)
|
||||||
@Jacksonized
|
@Jacksonized
|
||||||
public class Ack extends BaseMessage {
|
public class Ack extends NetworkMessage {
|
||||||
@JsonProperty("@")
|
@JsonProperty("@")
|
||||||
private String replyTo;
|
private String replyTo;
|
||||||
private boolean ok;
|
private boolean ok;
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
package io.github.chronosx88.JGUN.models.acks;
|
package io.github.chronosx88.JGUN.models.acks;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import io.github.chronosx88.JGUN.models.BaseMessage;
|
import io.github.chronosx88.JGUN.models.NetworkMessage;
|
||||||
import io.github.chronosx88.JGUN.models.graph.MemoryGraph;
|
import io.github.chronosx88.JGUN.models.graph.MemoryGraph;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.EqualsAndHashCode;
|
import lombok.EqualsAndHashCode;
|
||||||
@ -12,7 +12,7 @@ import lombok.extern.jackson.Jacksonized;
|
|||||||
@SuperBuilder
|
@SuperBuilder
|
||||||
@EqualsAndHashCode(callSuper = true)
|
@EqualsAndHashCode(callSuper = true)
|
||||||
@Jacksonized
|
@Jacksonized
|
||||||
public class GetAck extends BaseMessage {
|
public class GetAck extends NetworkMessage {
|
||||||
@JsonProperty("put")
|
@JsonProperty("put")
|
||||||
private MemoryGraph data;
|
private MemoryGraph data;
|
||||||
@JsonProperty("@")
|
@JsonProperty("@")
|
||||||
|
@ -1,8 +1,7 @@
|
|||||||
package io.github.chronosx88.JGUN.models.requests;
|
package io.github.chronosx88.JGUN.models.requests;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import io.github.chronosx88.JGUN.models.BaseMessage;
|
import io.github.chronosx88.JGUN.models.NetworkMessage;
|
||||||
import lombok.Builder;
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.EqualsAndHashCode;
|
import lombok.EqualsAndHashCode;
|
||||||
import lombok.experimental.SuperBuilder;
|
import lombok.experimental.SuperBuilder;
|
||||||
@ -12,7 +11,7 @@ import lombok.extern.jackson.Jacksonized;
|
|||||||
@SuperBuilder
|
@SuperBuilder
|
||||||
@EqualsAndHashCode(callSuper = true)
|
@EqualsAndHashCode(callSuper = true)
|
||||||
@Jacksonized
|
@Jacksonized
|
||||||
public class GetRequest extends BaseMessage implements Request {
|
public class GetRequest extends NetworkMessage implements Request {
|
||||||
@JsonProperty("get")
|
@JsonProperty("get")
|
||||||
private GetRequestParams params;
|
private GetRequestParams params;
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
package io.github.chronosx88.JGUN.models.requests;
|
package io.github.chronosx88.JGUN.models.requests;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import io.github.chronosx88.JGUN.models.BaseMessage;
|
import io.github.chronosx88.JGUN.models.NetworkMessage;
|
||||||
import io.github.chronosx88.JGUN.models.graph.MemoryGraph;
|
import io.github.chronosx88.JGUN.models.graph.MemoryGraph;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.EqualsAndHashCode;
|
import lombok.EqualsAndHashCode;
|
||||||
@ -12,7 +12,7 @@ import lombok.extern.jackson.Jacksonized;
|
|||||||
@Jacksonized
|
@Jacksonized
|
||||||
@SuperBuilder
|
@SuperBuilder
|
||||||
@EqualsAndHashCode(callSuper = true)
|
@EqualsAndHashCode(callSuper = true)
|
||||||
public class PutRequest extends BaseMessage implements Request {
|
public class PutRequest extends NetworkMessage implements Request {
|
||||||
@JsonProperty("put")
|
@JsonProperty("put")
|
||||||
private MemoryGraph graph;
|
private MemoryGraph graph;
|
||||||
}
|
}
|
||||||
|
@ -1,3 +1,5 @@
|
|||||||
package io.github.chronosx88.JGUN.models.requests;
|
package io.github.chronosx88.JGUN.models.requests;
|
||||||
|
|
||||||
public interface Request {}
|
public interface Request {
|
||||||
|
String getId();
|
||||||
|
}
|
||||||
|
@ -16,11 +16,11 @@ public class Dup {
|
|||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void track(String id) {
|
public void track(String id) {
|
||||||
cache.put(id, System.currentTimeMillis());
|
cache.put(id, System.currentTimeMillis());
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isDuplicated(String id) {
|
public boolean checkDuplicated(String id) {
|
||||||
Long timestamp = null;
|
Long timestamp = null;
|
||||||
try {
|
try {
|
||||||
timestamp = cache.getIfPresent(id);
|
timestamp = cache.getIfPresent(id);
|
||||||
|
@ -69,4 +69,14 @@ public class GatewayNetworkNode extends WebSocketServer implements Peer {
|
|||||||
public int getTimeout() {
|
public int getTimeout() {
|
||||||
return 60;
|
return 60;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int connectedPeerCount() {
|
||||||
|
return this.getConnections().size();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NetworkHandler getNetworkHandler() {
|
||||||
|
return handler;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -7,7 +7,7 @@ import io.github.chronosx88.JGUN.api.FutureGet;
|
|||||||
import io.github.chronosx88.JGUN.api.FuturePut;
|
import io.github.chronosx88.JGUN.api.FuturePut;
|
||||||
import io.github.chronosx88.JGUN.models.GetResult;
|
import io.github.chronosx88.JGUN.models.GetResult;
|
||||||
import io.github.chronosx88.JGUN.models.Result;
|
import io.github.chronosx88.JGUN.models.Result;
|
||||||
import io.github.chronosx88.JGUN.models.BaseMessage;
|
import io.github.chronosx88.JGUN.models.NetworkMessage;
|
||||||
import io.github.chronosx88.JGUN.models.graph.MemoryGraph;
|
import io.github.chronosx88.JGUN.models.graph.MemoryGraph;
|
||||||
import io.github.chronosx88.JGUN.models.graph.Node;
|
import io.github.chronosx88.JGUN.models.graph.Node;
|
||||||
import io.github.chronosx88.JGUN.models.graph.NodeMetadata;
|
import io.github.chronosx88.JGUN.models.graph.NodeMetadata;
|
||||||
@ -17,6 +17,7 @@ import io.github.chronosx88.JGUN.models.graph.NodeValue;
|
|||||||
import io.github.chronosx88.JGUN.models.requests.GetRequest;
|
import io.github.chronosx88.JGUN.models.requests.GetRequest;
|
||||||
import io.github.chronosx88.JGUN.models.requests.PutRequest;
|
import io.github.chronosx88.JGUN.models.requests.PutRequest;
|
||||||
import io.github.chronosx88.JGUN.storage.Storage;
|
import io.github.chronosx88.JGUN.storage.Storage;
|
||||||
|
import lombok.Getter;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
@ -30,6 +31,8 @@ public class NetworkHandler {
|
|||||||
|
|
||||||
private final Peer peer;
|
private final Peer peer;
|
||||||
private final Storage storage;
|
private final Storage storage;
|
||||||
|
|
||||||
|
@Getter
|
||||||
private final Dup dup;
|
private final Dup dup;
|
||||||
private final Executor executorService = Executors.newCachedThreadPool();
|
private final Executor executorService = Executors.newCachedThreadPool();
|
||||||
private final ObjectMapper objectMapper;
|
private final ObjectMapper objectMapper;
|
||||||
@ -52,21 +55,21 @@ public class NetworkHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void handleIncomingMessage(String message) {
|
public void handleIncomingMessage(String message) {
|
||||||
BaseMessage parsedMessage;
|
NetworkMessage parsedMessage;
|
||||||
try {
|
try {
|
||||||
parsedMessage = objectMapper.readValue(message, BaseMessage.class);
|
parsedMessage = objectMapper.readValue(message, NetworkMessage.class);
|
||||||
} catch (JsonProcessingException e) {
|
} catch (JsonProcessingException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dup.isDuplicated(parsedMessage.getId())) {
|
if (dup.checkDuplicated(parsedMessage.getId())) {
|
||||||
// TODO log
|
// TODO log
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
final BaseMessage msg = parsedMessage;
|
final NetworkMessage msg = parsedMessage;
|
||||||
executorService.execute(() -> {
|
executorService.execute(() -> {
|
||||||
BaseMessage response = null;
|
NetworkMessage response = null;
|
||||||
if (msg instanceof GetRequest) {
|
if (msg instanceof GetRequest) {
|
||||||
response = handleGet((GetRequest) msg);
|
response = handleGet((GetRequest) msg);
|
||||||
} else if (msg instanceof PutRequest) {
|
} else if (msg instanceof PutRequest) {
|
||||||
@ -78,6 +81,7 @@ public class NetworkHandler {
|
|||||||
handleGetAck(ack.getData(), ack);
|
handleGetAck(ack.getData(), ack);
|
||||||
}
|
}
|
||||||
if (Objects.nonNull(response)) {
|
if (Objects.nonNull(response)) {
|
||||||
|
this.dup.track(response.getId());
|
||||||
String respString;
|
String respString;
|
||||||
try {
|
try {
|
||||||
respString = objectMapper.writeValueAsString(response);
|
respString = objectMapper.writeValueAsString(response);
|
||||||
|
@ -5,6 +5,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||||||
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
|
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
|
||||||
import io.github.chronosx88.JGUN.api.FutureGet;
|
import io.github.chronosx88.JGUN.api.FutureGet;
|
||||||
import io.github.chronosx88.JGUN.api.FuturePut;
|
import io.github.chronosx88.JGUN.api.FuturePut;
|
||||||
|
import io.github.chronosx88.JGUN.models.GetResult;
|
||||||
|
import io.github.chronosx88.JGUN.models.Result;
|
||||||
import io.github.chronosx88.JGUN.models.graph.MemoryGraph;
|
import io.github.chronosx88.JGUN.models.graph.MemoryGraph;
|
||||||
import io.github.chronosx88.JGUN.models.requests.GetRequest;
|
import io.github.chronosx88.JGUN.models.requests.GetRequest;
|
||||||
import io.github.chronosx88.JGUN.models.requests.GetRequestParams;
|
import io.github.chronosx88.JGUN.models.requests.GetRequestParams;
|
||||||
@ -19,6 +21,7 @@ public class NetworkManager {
|
|||||||
private final ObjectMapper objectMapper;
|
private final ObjectMapper objectMapper;
|
||||||
|
|
||||||
private final Peer peer;
|
private final Peer peer;
|
||||||
|
private final NetworkHandler networkHandler;
|
||||||
|
|
||||||
private final Executor executorService = Executors.newCachedThreadPool();
|
private final Executor executorService = Executors.newCachedThreadPool();
|
||||||
|
|
||||||
@ -28,15 +31,16 @@ public class NetworkManager {
|
|||||||
@Getter
|
@Getter
|
||||||
private final int timeout;
|
private final int timeout;
|
||||||
|
|
||||||
public NetworkManager(Peer peer) {
|
public NetworkManager(Peer peer, NetworkHandler networkHandler) {
|
||||||
|
this.networkHandler = networkHandler;
|
||||||
objectMapper = new ObjectMapper();
|
objectMapper = new ObjectMapper();
|
||||||
objectMapper.registerModule(new Jdk8Module());
|
objectMapper.registerModule(new Jdk8Module());
|
||||||
this.peer = peer;
|
this.peer = peer;
|
||||||
this.timeout = peer.getTimeout();
|
this.timeout = peer.getTimeout();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start() {
|
public void start() throws InterruptedException {
|
||||||
executorService.execute(this.peer::start);
|
this.peer.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T extends Request> void sendRequest(T request) {
|
private <T extends Request> void sendRequest(T request) {
|
||||||
@ -46,28 +50,36 @@ public class NetworkManager {
|
|||||||
} catch (JsonProcessingException e) {
|
} catch (JsonProcessingException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
|
this.networkHandler.getDup().track(request.getId());
|
||||||
peer.emit(encodedRequest);
|
peer.emit(encodedRequest);
|
||||||
}
|
}
|
||||||
|
|
||||||
public FuturePut sendPutRequest(MemoryGraph putData) {
|
public FuturePut sendPutRequest(MemoryGraph putData) {
|
||||||
String id = Dup.random();
|
String id = Dup.random();
|
||||||
executorService.execute(() -> this.sendRequest(PutRequest.builder()
|
|
||||||
.id(id)
|
|
||||||
.graph(putData)
|
|
||||||
.build()));
|
|
||||||
var requestFuture = new FuturePut(id);
|
var requestFuture = new FuturePut(id);
|
||||||
peer.addPendingPutRequest(requestFuture);
|
if (this.peer.connectedPeerCount() > 0) {
|
||||||
|
executorService.execute(() -> this.sendRequest(PutRequest.builder()
|
||||||
|
.id(id)
|
||||||
|
.graph(putData)
|
||||||
|
.build()));
|
||||||
|
peer.addPendingPutRequest(requestFuture);
|
||||||
|
}
|
||||||
|
requestFuture.complete(Result.builder().ok(true).build());
|
||||||
return requestFuture;
|
return requestFuture;
|
||||||
}
|
}
|
||||||
|
|
||||||
public FutureGet sendGetRequest(GetRequestParams params) {
|
public FutureGet sendGetRequest(GetRequestParams params) {
|
||||||
String id = Dup.random();
|
String id = Dup.random();
|
||||||
executorService.execute(() -> this.sendRequest(GetRequest.builder()
|
|
||||||
.id(id)
|
|
||||||
.params(params)
|
|
||||||
.build()));
|
|
||||||
var requestFuture = new FutureGet(id, params);
|
var requestFuture = new FutureGet(id, params);
|
||||||
peer.addPendingGetRequest(requestFuture);
|
if (this.peer.connectedPeerCount() > 0) {
|
||||||
|
executorService.execute(() -> this.sendRequest(GetRequest.builder()
|
||||||
|
.id(id)
|
||||||
|
.params(params)
|
||||||
|
.build()));
|
||||||
|
peer.addPendingGetRequest(requestFuture);
|
||||||
|
} else {
|
||||||
|
requestFuture.complete(GetResult.builder().data(null).build());
|
||||||
|
}
|
||||||
return requestFuture;
|
return requestFuture;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -21,7 +21,7 @@ public class NetworkNode extends WebSocketClient implements Peer {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onOpen(ServerHandshake handshakeData) {
|
public void onOpen(ServerHandshake handshakeData) {
|
||||||
System.out.println("# Connection with SuperNode open. Status: " + handshakeData.getHttpStatus());
|
System.out.println("# Connection with gateway node is open. Status: " + handshakeData.getHttpStatus());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -56,12 +56,23 @@ public class NetworkNode extends WebSocketClient implements Peer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start() {
|
public void start() throws InterruptedException {
|
||||||
this.connect();
|
this.connectBlocking();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getTimeout() {
|
public int getTimeout() {
|
||||||
return 60;
|
return 60;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int connectedPeerCount() {
|
||||||
|
if (this.isOpen()) return 1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NetworkHandler getNetworkHandler() {
|
||||||
|
return handler;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -7,6 +7,8 @@ public interface Peer {
|
|||||||
void emit(String data);
|
void emit(String data);
|
||||||
void addPendingPutRequest(FuturePut futurePut);
|
void addPendingPutRequest(FuturePut futurePut);
|
||||||
void addPendingGetRequest(FutureGet futureGet);
|
void addPendingGetRequest(FutureGet futureGet);
|
||||||
void start();
|
void start() throws InterruptedException;
|
||||||
int getTimeout();
|
int getTimeout();
|
||||||
|
int connectedPeerCount();
|
||||||
|
NetworkHandler getNetworkHandler();
|
||||||
}
|
}
|
||||||
|
@ -5,6 +5,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
|
|||||||
import com.github.benmanes.caffeine.cache.Expiry;
|
import com.github.benmanes.caffeine.cache.Expiry;
|
||||||
import io.github.chronosx88.JGUN.models.graph.DeferredNode;
|
import io.github.chronosx88.JGUN.models.graph.DeferredNode;
|
||||||
import io.github.chronosx88.JGUN.models.graph.Node;
|
import io.github.chronosx88.JGUN.models.graph.Node;
|
||||||
|
import io.github.chronosx88.JGUN.models.graph.NodeMetadata;
|
||||||
import io.github.chronosx88.JGUN.models.graph.NodeValue;
|
import io.github.chronosx88.JGUN.models.graph.NodeValue;
|
||||||
import org.checkerframework.checker.index.qual.NonNegative;
|
import org.checkerframework.checker.index.qual.NonNegative;
|
||||||
|
|
||||||
@ -46,21 +47,23 @@ public class MemoryStorage extends Storage {
|
|||||||
if (node != null && field != null) {
|
if (node != null && field != null) {
|
||||||
NodeValue requestedField = node.getValues().get(field);
|
NodeValue requestedField = node.getValues().get(field);
|
||||||
if (requestedField != null) {
|
if (requestedField != null) {
|
||||||
Long requestedFieldState = node.getMetadata().getStates().get(field);
|
node = Node.builder()
|
||||||
node.getValues().clear();
|
.metadata(NodeMetadata.builder()
|
||||||
node.getMetadata().getStates().clear();
|
.nodeID(node.getMetadata().getNodeID())
|
||||||
node.getValues().put(field, requestedField);
|
.states(Map.of(field, node.getMetadata().getStates().get(field)))
|
||||||
node.getMetadata().getStates().put(field, requestedFieldState);
|
.build())
|
||||||
|
.values(Map.of(field, requestedField))
|
||||||
|
.build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return node;
|
return node;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void updateNode(Node node) {
|
protected void updateNode(Node newNode) {
|
||||||
Node currentNode = nodes.get(node.getMetadata().getNodeID());
|
Node currentNode = nodes.get(newNode.getMetadata().getNodeID());
|
||||||
currentNode.values.putAll(node.values);
|
currentNode.values.putAll(newNode.values);
|
||||||
currentNode.getMetadata().getStates().putAll(node.getMetadata().getStates());
|
currentNode.getMetadata().getStates().putAll(newNode.getMetadata().getStates());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addNode(String id, Node incomingNode) {
|
public void addNode(String id, Node incomingNode) {
|
||||||
|
@ -95,7 +95,7 @@ public abstract class Storage {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (Objects.isNull(changedNode)) {
|
if (changedNode == null) {
|
||||||
changedNode = Node.builder()
|
changedNode = Node.builder()
|
||||||
.metadata(NodeMetadata.builder()
|
.metadata(NodeMetadata.builder()
|
||||||
.nodeID(incomingNode.getMetadata().getNodeID())
|
.nodeID(incomingNode.getMetadata().getNodeID())
|
||||||
|
@ -40,7 +40,7 @@ public class StorageManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public List<String> getPathData(String[] path) throws TimeoutException, ExecutionException, InterruptedException {
|
public List<String> getPathData(String[] path) throws TimeoutException, ExecutionException, InterruptedException {
|
||||||
List<String> nodeIds = new ArrayList<>(List.of(path[0]));
|
List<String> nodeIds = new ArrayList<>();
|
||||||
String nodeId = path[0];
|
String nodeId = path[0];
|
||||||
for (int i = 0; i < path.length; i++) {
|
for (int i = 0; i < path.length; i++) {
|
||||||
String field = null;
|
String field = null;
|
||||||
@ -49,35 +49,30 @@ public class StorageManager {
|
|||||||
}
|
}
|
||||||
Node node = storage.getNode(nodeId, field);
|
Node node = storage.getNode(nodeId, field);
|
||||||
if (node != null) {
|
if (node != null) {
|
||||||
if (field != null) {
|
if (field == null) {
|
||||||
if (node.values.containsKey(field) && node.values.get(field).getValueType() == NodeValue.ValueType.LINK) {
|
nodeIds.add(nodeId);
|
||||||
nodeId = ((NodeLinkValue) node.values.get(field)).getLink();
|
|
||||||
nodeIds.add(nodeId);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
if (node.values.containsKey(field) && node.values.get(field).getValueType() == NodeValue.ValueType.LINK) {
|
||||||
|
nodeId = ((NodeLinkValue) node.values.get(field)).getLink();
|
||||||
|
nodeIds.add(nodeId);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
// proceeds to request from the network
|
||||||
var future = this.networkManager.sendGetRequest(GetRequestParams.builder()
|
var future = this.networkManager.sendGetRequest(GetRequestParams.builder()
|
||||||
.nodeId(nodeId)
|
.nodeId(nodeId)
|
||||||
.field(field)
|
.field(field)
|
||||||
.build());
|
.build());
|
||||||
var result = future.get(networkManager.getTimeout(), TimeUnit.SECONDS);
|
var result = future.get(networkManager.getTimeout(), TimeUnit.SECONDS);
|
||||||
if (result.getData() != null) {
|
if (result.getData() == null || field == null) {
|
||||||
nodeIds.add(result.getData().getMetadata().getNodeID());
|
nodeIds.add(nodeId);
|
||||||
if (field != null) {
|
|
||||||
if (result.getData().values.containsKey(field) && result.getData().values.get(field).getValueType() == NodeValue.ValueType.LINK) {
|
|
||||||
nodeId = ((NodeLinkValue) result.getData().values.get(field)).getLink();
|
|
||||||
nodeIds.add(nodeId);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
if (result.getData().values.containsKey(field) && result.getData().values.get(field).getValueType() == NodeValue.ValueType.LINK) {
|
||||||
|
nodeId = ((NodeLinkValue) result.getData().values.get(field)).getLink();
|
||||||
|
nodeIds.add(nodeId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nodeIds;
|
return nodeIds;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user