From a551581a99c6336836d1e5c815e3cb8a843ed419 Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Fri, 17 Nov 2023 19:00:00 +0300 Subject: [PATCH] Implement custom deserializer for NetworkMessage --- .../chronosx88/JGUN/examples/MainServer.java | 34 +++++++++++++++-- .../{BaseMessage.java => NetworkMessage.java} | 10 +---- .../models/NetworkMessageDeserializer.java | 37 +++++++++++++++++++ .../chronosx88/JGUN/models/acks/Ack.java | 4 +- .../chronosx88/JGUN/models/acks/GetAck.java | 4 +- .../JGUN/models/requests/GetRequest.java | 5 +-- .../JGUN/models/requests/PutRequest.java | 4 +- .../JGUN/network/GatewayNetworkNode.java | 5 +++ .../JGUN/network/NetworkHandler.java | 10 ++--- .../JGUN/network/NetworkManager.java | 29 ++++++++++----- .../chronosx88/JGUN/network/NetworkNode.java | 6 +++ .../github/chronosx88/JGUN/network/Peer.java | 1 + 12 files changed, 114 insertions(+), 35 deletions(-) rename src/main/java/io/github/chronosx88/JGUN/models/{BaseMessage.java => NetworkMessage.java} (69%) create mode 100644 src/main/java/io/github/chronosx88/JGUN/models/NetworkMessageDeserializer.java diff --git a/src/main/java/io/github/chronosx88/JGUN/examples/MainServer.java b/src/main/java/io/github/chronosx88/JGUN/examples/MainServer.java index 0fe7948..63c31d6 100644 --- a/src/main/java/io/github/chronosx88/JGUN/examples/MainServer.java +++ b/src/main/java/io/github/chronosx88/JGUN/examples/MainServer.java @@ -1,11 +1,39 @@ package io.github.chronosx88.JGUN.examples; +import io.github.chronosx88.JGUN.api.Gun; +import io.github.chronosx88.JGUN.api.graph.ArrayBuilder; +import io.github.chronosx88.JGUN.api.graph.NodeBuilder; +import io.github.chronosx88.JGUN.models.Result; import io.github.chronosx88.JGUN.network.GatewayNetworkNode; +import io.github.chronosx88.JGUN.network.NetworkNode; import io.github.chronosx88.JGUN.storage.MemoryStorage; +import io.github.chronosx88.JGUN.storage.Storage; + +import java.net.Inet4Address; +import java.util.concurrent.ExecutionException; public class MainServer { - public static void main(String[] args) { - GatewayNetworkNode gunSuperNode = new GatewayNetworkNode(5054, new MemoryStorage()); - gunSuperNode.start(); + public static void main(String[] args) throws ExecutionException, InterruptedException { + GatewayNetworkNode peer = new GatewayNetworkNode(5054, new MemoryStorage()); + Storage storage = new MemoryStorage(); + Gun gun = new Gun(storage, peer); + Result result = gun.get("person").put(new NodeBuilder() + .add("firstName", "John") + .add("lastName", "Smith") + .add("age", 25) + .add("address", new NodeBuilder() + .add("streetAddress", "21 2nd Street") + .add("city", "New York") + .add("state", "NY") + .add("postalCode", "10021")) + .add("phoneNumber", new ArrayBuilder() + .add(new NodeBuilder() + .add("type", "home") + .add("number", "212 555-1234")) + .add(new NodeBuilder() + .add("type", "fax") + .add("number", "646 555-4567"))) + .build()).get(); + System.out.println(result.isOk()); } } diff --git a/src/main/java/io/github/chronosx88/JGUN/models/BaseMessage.java b/src/main/java/io/github/chronosx88/JGUN/models/NetworkMessage.java similarity index 69% rename from src/main/java/io/github/chronosx88/JGUN/models/BaseMessage.java rename to src/main/java/io/github/chronosx88/JGUN/models/NetworkMessage.java index 424d6eb..49b302b 100644 --- a/src/main/java/io/github/chronosx88/JGUN/models/BaseMessage.java +++ b/src/main/java/io/github/chronosx88/JGUN/models/NetworkMessage.java @@ -12,15 +12,9 @@ import lombok.Data; import lombok.experimental.SuperBuilder; @Data -@JsonTypeInfo(use = JsonTypeInfo.Id.DEDUCTION) -@JsonSubTypes({ - @JsonSubTypes.Type(GetRequest.class), - @JsonSubTypes.Type(PutRequest.class), - @JsonSubTypes.Type(Ack.class), - @JsonSubTypes.Type(GetAck.class) -}) +@JsonDeserialize(using = NetworkMessageDeserializer.class) @SuperBuilder -public abstract class BaseMessage { +public abstract class NetworkMessage { @JsonProperty("#") private String id; } diff --git a/src/main/java/io/github/chronosx88/JGUN/models/NetworkMessageDeserializer.java b/src/main/java/io/github/chronosx88/JGUN/models/NetworkMessageDeserializer.java new file mode 100644 index 0000000..0a1acc3 --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/models/NetworkMessageDeserializer.java @@ -0,0 +1,37 @@ +package io.github.chronosx88.JGUN.models; + +import com.fasterxml.jackson.core.JacksonException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonNode; +import io.github.chronosx88.JGUN.models.acks.Ack; +import io.github.chronosx88.JGUN.models.acks.GetAck; +import io.github.chronosx88.JGUN.models.requests.GetRequest; +import io.github.chronosx88.JGUN.models.requests.PutRequest; + +import java.io.IOException; + +public class NetworkMessageDeserializer extends JsonDeserializer { + @Override + public NetworkMessage deserialize(JsonParser p, DeserializationContext ctxt) throws IOException, JacksonException { + JsonNode node = p.readValueAsTree(); + + if (node.has("#")) { + // parsing ack + if (node.has("@")) { + if (node.has("put")) { + return p.getCodec().treeToValue(node, GetAck.class); + } + return p.getCodec().treeToValue(node, Ack.class); + } + + if (node.has("get")) { + return p.getCodec().treeToValue(node, GetRequest.class); + } else if (node.has("put")) { + return p.getCodec().treeToValue(node, PutRequest.class); + } + } + throw new IllegalArgumentException("invalid message received"); + } +} diff --git a/src/main/java/io/github/chronosx88/JGUN/models/acks/Ack.java b/src/main/java/io/github/chronosx88/JGUN/models/acks/Ack.java index 2ccc3d8..06592f5 100644 --- a/src/main/java/io/github/chronosx88/JGUN/models/acks/Ack.java +++ b/src/main/java/io/github/chronosx88/JGUN/models/acks/Ack.java @@ -1,7 +1,7 @@ package io.github.chronosx88.JGUN.models.acks; import com.fasterxml.jackson.annotation.JsonProperty; -import io.github.chronosx88.JGUN.models.BaseMessage; +import io.github.chronosx88.JGUN.models.NetworkMessage; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.experimental.SuperBuilder; @@ -11,7 +11,7 @@ import lombok.extern.jackson.Jacksonized; @SuperBuilder @EqualsAndHashCode(callSuper = true) @Jacksonized -public class Ack extends BaseMessage { +public class Ack extends NetworkMessage { @JsonProperty("@") private String replyTo; private boolean ok; diff --git a/src/main/java/io/github/chronosx88/JGUN/models/acks/GetAck.java b/src/main/java/io/github/chronosx88/JGUN/models/acks/GetAck.java index 382b59a..0adbe3a 100644 --- a/src/main/java/io/github/chronosx88/JGUN/models/acks/GetAck.java +++ b/src/main/java/io/github/chronosx88/JGUN/models/acks/GetAck.java @@ -1,7 +1,7 @@ package io.github.chronosx88.JGUN.models.acks; import com.fasterxml.jackson.annotation.JsonProperty; -import io.github.chronosx88.JGUN.models.BaseMessage; +import io.github.chronosx88.JGUN.models.NetworkMessage; import io.github.chronosx88.JGUN.models.graph.MemoryGraph; import lombok.Data; import lombok.EqualsAndHashCode; @@ -12,7 +12,7 @@ import lombok.extern.jackson.Jacksonized; @SuperBuilder @EqualsAndHashCode(callSuper = true) @Jacksonized -public class GetAck extends BaseMessage { +public class GetAck extends NetworkMessage { @JsonProperty("put") private MemoryGraph data; @JsonProperty("@") diff --git a/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequest.java b/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequest.java index f2fb2ca..6aa25cf 100644 --- a/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequest.java +++ b/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequest.java @@ -1,8 +1,7 @@ package io.github.chronosx88.JGUN.models.requests; import com.fasterxml.jackson.annotation.JsonProperty; -import io.github.chronosx88.JGUN.models.BaseMessage; -import lombok.Builder; +import io.github.chronosx88.JGUN.models.NetworkMessage; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.experimental.SuperBuilder; @@ -12,7 +11,7 @@ import lombok.extern.jackson.Jacksonized; @SuperBuilder @EqualsAndHashCode(callSuper = true) @Jacksonized -public class GetRequest extends BaseMessage implements Request { +public class GetRequest extends NetworkMessage implements Request { @JsonProperty("get") private GetRequestParams params; } diff --git a/src/main/java/io/github/chronosx88/JGUN/models/requests/PutRequest.java b/src/main/java/io/github/chronosx88/JGUN/models/requests/PutRequest.java index 4582100..fdbb08a 100644 --- a/src/main/java/io/github/chronosx88/JGUN/models/requests/PutRequest.java +++ b/src/main/java/io/github/chronosx88/JGUN/models/requests/PutRequest.java @@ -1,7 +1,7 @@ package io.github.chronosx88.JGUN.models.requests; import com.fasterxml.jackson.annotation.JsonProperty; -import io.github.chronosx88.JGUN.models.BaseMessage; +import io.github.chronosx88.JGUN.models.NetworkMessage; import io.github.chronosx88.JGUN.models.graph.MemoryGraph; import lombok.Data; import lombok.EqualsAndHashCode; @@ -12,7 +12,7 @@ import lombok.extern.jackson.Jacksonized; @Jacksonized @SuperBuilder @EqualsAndHashCode(callSuper = true) -public class PutRequest extends BaseMessage implements Request { +public class PutRequest extends NetworkMessage implements Request { @JsonProperty("put") private MemoryGraph graph; } diff --git a/src/main/java/io/github/chronosx88/JGUN/network/GatewayNetworkNode.java b/src/main/java/io/github/chronosx88/JGUN/network/GatewayNetworkNode.java index 2cec742..fe99aaf 100644 --- a/src/main/java/io/github/chronosx88/JGUN/network/GatewayNetworkNode.java +++ b/src/main/java/io/github/chronosx88/JGUN/network/GatewayNetworkNode.java @@ -69,4 +69,9 @@ public class GatewayNetworkNode extends WebSocketServer implements Peer { public int getTimeout() { return 60; } + + @Override + public int connectedPeerCount() { + return this.getConnections().size(); + } } diff --git a/src/main/java/io/github/chronosx88/JGUN/network/NetworkHandler.java b/src/main/java/io/github/chronosx88/JGUN/network/NetworkHandler.java index 4b66d51..9891b82 100644 --- a/src/main/java/io/github/chronosx88/JGUN/network/NetworkHandler.java +++ b/src/main/java/io/github/chronosx88/JGUN/network/NetworkHandler.java @@ -7,7 +7,7 @@ import io.github.chronosx88.JGUN.api.FutureGet; import io.github.chronosx88.JGUN.api.FuturePut; import io.github.chronosx88.JGUN.models.GetResult; import io.github.chronosx88.JGUN.models.Result; -import io.github.chronosx88.JGUN.models.BaseMessage; +import io.github.chronosx88.JGUN.models.NetworkMessage; import io.github.chronosx88.JGUN.models.graph.MemoryGraph; import io.github.chronosx88.JGUN.models.graph.Node; import io.github.chronosx88.JGUN.models.graph.NodeMetadata; @@ -52,9 +52,9 @@ public class NetworkHandler { } public void handleIncomingMessage(String message) { - BaseMessage parsedMessage; + NetworkMessage parsedMessage; try { - parsedMessage = objectMapper.readValue(message, BaseMessage.class); + parsedMessage = objectMapper.readValue(message, NetworkMessage.class); } catch (JsonProcessingException e) { throw new RuntimeException(e); } @@ -64,9 +64,9 @@ public class NetworkHandler { return; } - final BaseMessage msg = parsedMessage; + final NetworkMessage msg = parsedMessage; executorService.execute(() -> { - BaseMessage response = null; + NetworkMessage response = null; if (msg instanceof GetRequest) { response = handleGet((GetRequest) msg); } else if (msg instanceof PutRequest) { diff --git a/src/main/java/io/github/chronosx88/JGUN/network/NetworkManager.java b/src/main/java/io/github/chronosx88/JGUN/network/NetworkManager.java index 86ab0b0..224efe9 100644 --- a/src/main/java/io/github/chronosx88/JGUN/network/NetworkManager.java +++ b/src/main/java/io/github/chronosx88/JGUN/network/NetworkManager.java @@ -5,6 +5,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import io.github.chronosx88.JGUN.api.FutureGet; import io.github.chronosx88.JGUN.api.FuturePut; +import io.github.chronosx88.JGUN.models.GetResult; +import io.github.chronosx88.JGUN.models.Result; import io.github.chronosx88.JGUN.models.graph.MemoryGraph; import io.github.chronosx88.JGUN.models.requests.GetRequest; import io.github.chronosx88.JGUN.models.requests.GetRequestParams; @@ -51,23 +53,30 @@ public class NetworkManager { public FuturePut sendPutRequest(MemoryGraph putData) { String id = Dup.random(); - executorService.execute(() -> this.sendRequest(PutRequest.builder() - .id(id) - .graph(putData) - .build())); var requestFuture = new FuturePut(id); - peer.addPendingPutRequest(requestFuture); + if (this.peer.connectedPeerCount() > 0) { + executorService.execute(() -> this.sendRequest(PutRequest.builder() + .id(id) + .graph(putData) + .build())); + peer.addPendingPutRequest(requestFuture); + } + requestFuture.complete(Result.builder().ok(true).build()); return requestFuture; } public FutureGet sendGetRequest(GetRequestParams params) { String id = Dup.random(); - executorService.execute(() -> this.sendRequest(GetRequest.builder() - .id(id) - .params(params) - .build())); var requestFuture = new FutureGet(id, params); - peer.addPendingGetRequest(requestFuture); + if (this.peer.connectedPeerCount() > 0) { + executorService.execute(() -> this.sendRequest(GetRequest.builder() + .id(id) + .params(params) + .build())); + peer.addPendingGetRequest(requestFuture); + } else { + requestFuture.complete(GetResult.builder().data(null).build()); + } return requestFuture; } } diff --git a/src/main/java/io/github/chronosx88/JGUN/network/NetworkNode.java b/src/main/java/io/github/chronosx88/JGUN/network/NetworkNode.java index 0a27379..c808b69 100644 --- a/src/main/java/io/github/chronosx88/JGUN/network/NetworkNode.java +++ b/src/main/java/io/github/chronosx88/JGUN/network/NetworkNode.java @@ -64,4 +64,10 @@ public class NetworkNode extends WebSocketClient implements Peer { public int getTimeout() { return 60; } + + @Override + public int connectedPeerCount() { + if (this.isOpen()) return 1; + return 0; + } } diff --git a/src/main/java/io/github/chronosx88/JGUN/network/Peer.java b/src/main/java/io/github/chronosx88/JGUN/network/Peer.java index bbf87d0..6100f9f 100644 --- a/src/main/java/io/github/chronosx88/JGUN/network/Peer.java +++ b/src/main/java/io/github/chronosx88/JGUN/network/Peer.java @@ -9,4 +9,5 @@ public interface Peer { void addPendingGetRequest(FutureGet futureGet); void start(); int getTimeout(); + int connectedPeerCount(); }