Implement custom deserializer for NetworkMessage

This commit is contained in:
ChronosX88 2023-11-17 19:00:00 +03:00
parent 9963d39219
commit a551581a99
12 changed files with 114 additions and 35 deletions

View File

@ -1,11 +1,39 @@
package io.github.chronosx88.JGUN.examples; package io.github.chronosx88.JGUN.examples;
import io.github.chronosx88.JGUN.api.Gun;
import io.github.chronosx88.JGUN.api.graph.ArrayBuilder;
import io.github.chronosx88.JGUN.api.graph.NodeBuilder;
import io.github.chronosx88.JGUN.models.Result;
import io.github.chronosx88.JGUN.network.GatewayNetworkNode; import io.github.chronosx88.JGUN.network.GatewayNetworkNode;
import io.github.chronosx88.JGUN.network.NetworkNode;
import io.github.chronosx88.JGUN.storage.MemoryStorage; import io.github.chronosx88.JGUN.storage.MemoryStorage;
import io.github.chronosx88.JGUN.storage.Storage;
import java.net.Inet4Address;
import java.util.concurrent.ExecutionException;
public class MainServer { public class MainServer {
public static void main(String[] args) { public static void main(String[] args) throws ExecutionException, InterruptedException {
GatewayNetworkNode gunSuperNode = new GatewayNetworkNode(5054, new MemoryStorage()); GatewayNetworkNode peer = new GatewayNetworkNode(5054, new MemoryStorage());
gunSuperNode.start(); Storage storage = new MemoryStorage();
Gun gun = new Gun(storage, peer);
Result result = gun.get("person").put(new NodeBuilder()
.add("firstName", "John")
.add("lastName", "Smith")
.add("age", 25)
.add("address", new NodeBuilder()
.add("streetAddress", "21 2nd Street")
.add("city", "New York")
.add("state", "NY")
.add("postalCode", "10021"))
.add("phoneNumber", new ArrayBuilder()
.add(new NodeBuilder()
.add("type", "home")
.add("number", "212 555-1234"))
.add(new NodeBuilder()
.add("type", "fax")
.add("number", "646 555-4567")))
.build()).get();
System.out.println(result.isOk());
} }
} }

View File

@ -12,15 +12,9 @@ import lombok.Data;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
@Data @Data
@JsonTypeInfo(use = JsonTypeInfo.Id.DEDUCTION) @JsonDeserialize(using = NetworkMessageDeserializer.class)
@JsonSubTypes({
@JsonSubTypes.Type(GetRequest.class),
@JsonSubTypes.Type(PutRequest.class),
@JsonSubTypes.Type(Ack.class),
@JsonSubTypes.Type(GetAck.class)
})
@SuperBuilder @SuperBuilder
public abstract class BaseMessage { public abstract class NetworkMessage {
@JsonProperty("#") @JsonProperty("#")
private String id; private String id;
} }

View File

@ -0,0 +1,37 @@
package io.github.chronosx88.JGUN.models;
import com.fasterxml.jackson.core.JacksonException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;
import io.github.chronosx88.JGUN.models.acks.Ack;
import io.github.chronosx88.JGUN.models.acks.GetAck;
import io.github.chronosx88.JGUN.models.requests.GetRequest;
import io.github.chronosx88.JGUN.models.requests.PutRequest;
import java.io.IOException;
public class NetworkMessageDeserializer extends JsonDeserializer<NetworkMessage> {
@Override
public NetworkMessage deserialize(JsonParser p, DeserializationContext ctxt) throws IOException, JacksonException {
JsonNode node = p.readValueAsTree();
if (node.has("#")) {
// parsing ack
if (node.has("@")) {
if (node.has("put")) {
return p.getCodec().treeToValue(node, GetAck.class);
}
return p.getCodec().treeToValue(node, Ack.class);
}
if (node.has("get")) {
return p.getCodec().treeToValue(node, GetRequest.class);
} else if (node.has("put")) {
return p.getCodec().treeToValue(node, PutRequest.class);
}
}
throw new IllegalArgumentException("invalid message received");
}
}

View File

@ -1,7 +1,7 @@
package io.github.chronosx88.JGUN.models.acks; package io.github.chronosx88.JGUN.models.acks;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import io.github.chronosx88.JGUN.models.BaseMessage; import io.github.chronosx88.JGUN.models.NetworkMessage;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
@ -11,7 +11,7 @@ import lombok.extern.jackson.Jacksonized;
@SuperBuilder @SuperBuilder
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
@Jacksonized @Jacksonized
public class Ack extends BaseMessage { public class Ack extends NetworkMessage {
@JsonProperty("@") @JsonProperty("@")
private String replyTo; private String replyTo;
private boolean ok; private boolean ok;

View File

@ -1,7 +1,7 @@
package io.github.chronosx88.JGUN.models.acks; package io.github.chronosx88.JGUN.models.acks;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import io.github.chronosx88.JGUN.models.BaseMessage; import io.github.chronosx88.JGUN.models.NetworkMessage;
import io.github.chronosx88.JGUN.models.graph.MemoryGraph; import io.github.chronosx88.JGUN.models.graph.MemoryGraph;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
@ -12,7 +12,7 @@ import lombok.extern.jackson.Jacksonized;
@SuperBuilder @SuperBuilder
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
@Jacksonized @Jacksonized
public class GetAck extends BaseMessage { public class GetAck extends NetworkMessage {
@JsonProperty("put") @JsonProperty("put")
private MemoryGraph data; private MemoryGraph data;
@JsonProperty("@") @JsonProperty("@")

View File

@ -1,8 +1,7 @@
package io.github.chronosx88.JGUN.models.requests; package io.github.chronosx88.JGUN.models.requests;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import io.github.chronosx88.JGUN.models.BaseMessage; import io.github.chronosx88.JGUN.models.NetworkMessage;
import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
@ -12,7 +11,7 @@ import lombok.extern.jackson.Jacksonized;
@SuperBuilder @SuperBuilder
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
@Jacksonized @Jacksonized
public class GetRequest extends BaseMessage implements Request { public class GetRequest extends NetworkMessage implements Request {
@JsonProperty("get") @JsonProperty("get")
private GetRequestParams params; private GetRequestParams params;
} }

View File

@ -1,7 +1,7 @@
package io.github.chronosx88.JGUN.models.requests; package io.github.chronosx88.JGUN.models.requests;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import io.github.chronosx88.JGUN.models.BaseMessage; import io.github.chronosx88.JGUN.models.NetworkMessage;
import io.github.chronosx88.JGUN.models.graph.MemoryGraph; import io.github.chronosx88.JGUN.models.graph.MemoryGraph;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
@ -12,7 +12,7 @@ import lombok.extern.jackson.Jacksonized;
@Jacksonized @Jacksonized
@SuperBuilder @SuperBuilder
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
public class PutRequest extends BaseMessage implements Request { public class PutRequest extends NetworkMessage implements Request {
@JsonProperty("put") @JsonProperty("put")
private MemoryGraph graph; private MemoryGraph graph;
} }

View File

@ -69,4 +69,9 @@ public class GatewayNetworkNode extends WebSocketServer implements Peer {
public int getTimeout() { public int getTimeout() {
return 60; return 60;
} }
@Override
public int connectedPeerCount() {
return this.getConnections().size();
}
} }

View File

@ -7,7 +7,7 @@ import io.github.chronosx88.JGUN.api.FutureGet;
import io.github.chronosx88.JGUN.api.FuturePut; import io.github.chronosx88.JGUN.api.FuturePut;
import io.github.chronosx88.JGUN.models.GetResult; import io.github.chronosx88.JGUN.models.GetResult;
import io.github.chronosx88.JGUN.models.Result; import io.github.chronosx88.JGUN.models.Result;
import io.github.chronosx88.JGUN.models.BaseMessage; import io.github.chronosx88.JGUN.models.NetworkMessage;
import io.github.chronosx88.JGUN.models.graph.MemoryGraph; import io.github.chronosx88.JGUN.models.graph.MemoryGraph;
import io.github.chronosx88.JGUN.models.graph.Node; import io.github.chronosx88.JGUN.models.graph.Node;
import io.github.chronosx88.JGUN.models.graph.NodeMetadata; import io.github.chronosx88.JGUN.models.graph.NodeMetadata;
@ -52,9 +52,9 @@ public class NetworkHandler {
} }
public void handleIncomingMessage(String message) { public void handleIncomingMessage(String message) {
BaseMessage parsedMessage; NetworkMessage parsedMessage;
try { try {
parsedMessage = objectMapper.readValue(message, BaseMessage.class); parsedMessage = objectMapper.readValue(message, NetworkMessage.class);
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@ -64,9 +64,9 @@ public class NetworkHandler {
return; return;
} }
final BaseMessage msg = parsedMessage; final NetworkMessage msg = parsedMessage;
executorService.execute(() -> { executorService.execute(() -> {
BaseMessage response = null; NetworkMessage response = null;
if (msg instanceof GetRequest) { if (msg instanceof GetRequest) {
response = handleGet((GetRequest) msg); response = handleGet((GetRequest) msg);
} else if (msg instanceof PutRequest) { } else if (msg instanceof PutRequest) {

View File

@ -5,6 +5,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.github.chronosx88.JGUN.api.FutureGet; import io.github.chronosx88.JGUN.api.FutureGet;
import io.github.chronosx88.JGUN.api.FuturePut; import io.github.chronosx88.JGUN.api.FuturePut;
import io.github.chronosx88.JGUN.models.GetResult;
import io.github.chronosx88.JGUN.models.Result;
import io.github.chronosx88.JGUN.models.graph.MemoryGraph; import io.github.chronosx88.JGUN.models.graph.MemoryGraph;
import io.github.chronosx88.JGUN.models.requests.GetRequest; import io.github.chronosx88.JGUN.models.requests.GetRequest;
import io.github.chronosx88.JGUN.models.requests.GetRequestParams; import io.github.chronosx88.JGUN.models.requests.GetRequestParams;
@ -51,23 +53,30 @@ public class NetworkManager {
public FuturePut sendPutRequest(MemoryGraph putData) { public FuturePut sendPutRequest(MemoryGraph putData) {
String id = Dup.random(); String id = Dup.random();
executorService.execute(() -> this.sendRequest(PutRequest.builder()
.id(id)
.graph(putData)
.build()));
var requestFuture = new FuturePut(id); var requestFuture = new FuturePut(id);
peer.addPendingPutRequest(requestFuture); if (this.peer.connectedPeerCount() > 0) {
executorService.execute(() -> this.sendRequest(PutRequest.builder()
.id(id)
.graph(putData)
.build()));
peer.addPendingPutRequest(requestFuture);
}
requestFuture.complete(Result.builder().ok(true).build());
return requestFuture; return requestFuture;
} }
public FutureGet sendGetRequest(GetRequestParams params) { public FutureGet sendGetRequest(GetRequestParams params) {
String id = Dup.random(); String id = Dup.random();
executorService.execute(() -> this.sendRequest(GetRequest.builder()
.id(id)
.params(params)
.build()));
var requestFuture = new FutureGet(id, params); var requestFuture = new FutureGet(id, params);
peer.addPendingGetRequest(requestFuture); if (this.peer.connectedPeerCount() > 0) {
executorService.execute(() -> this.sendRequest(GetRequest.builder()
.id(id)
.params(params)
.build()));
peer.addPendingGetRequest(requestFuture);
} else {
requestFuture.complete(GetResult.builder().data(null).build());
}
return requestFuture; return requestFuture;
} }
} }

View File

@ -64,4 +64,10 @@ public class NetworkNode extends WebSocketClient implements Peer {
public int getTimeout() { public int getTimeout() {
return 60; return 60;
} }
@Override
public int connectedPeerCount() {
if (this.isOpen()) return 1;
return 0;
}
} }

View File

@ -9,4 +9,5 @@ public interface Peer {
void addPendingGetRequest(FutureGet futureGet); void addPendingGetRequest(FutureGet futureGet);
void start(); void start();
int getTimeout(); int getTimeout();
int connectedPeerCount();
} }