Reimplement network handler

This commit is contained in:
ChronosX88 2023-11-15 00:15:18 +03:00
parent 1eb00914a8
commit 87da965616
21 changed files with 237 additions and 105 deletions

View File

@ -19,7 +19,6 @@ dependencies {
implementation 'net.sourceforge.streamsupport:android-retrofuture:1.7.0'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.15.3'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.15.3'
implementation 'javax.cache:cache-api:1.1.1'
implementation 'com.github.ben-manes.caffeine:jcache:3.1.5'
compileOnly 'org.projectlombok:lombok:1.18.30'
annotationProcessor 'org.projectlombok:lombok:1.18.30'

View File

@ -1,12 +1,9 @@
package io.github.chronosx88.JGUN;
import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import javax.cache.spi.CachingProvider;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@ -14,11 +11,9 @@ public class Dup {
private final Cache<String, Long> cache;
public Dup(long age) {
CachingProvider cachingProvider = Caching.getCachingProvider();
CacheManager cacheManager = cachingProvider.getCacheManager();
MutableConfiguration<String, Long> config = new MutableConfiguration<>();
config.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, age)));
this.cache = cacheManager.createCache("dup", config);
this.cache = Caffeine.newBuilder()
.expireAfterWrite(age, TimeUnit.SECONDS)
.build();
}
private void track(String id) {
@ -26,7 +21,11 @@ public class Dup {
}
public boolean isDuplicated(String id) {
if(cache.containsKey(id)) {
Long timestamp = null;
try {
timestamp = cache.getIfPresent(id);
} catch (NullPointerException ignored) {}
if(Objects.nonNull(timestamp)) {
return true;
} else {
track(id);

View File

@ -1,5 +1,6 @@
package io.github.chronosx88.JGUN;
import io.github.chronosx88.JGUN.models.MemoryGraph;
import io.github.chronosx88.JGUN.nodes.GunClient;
import io.github.chronosx88.JGUN.storage.Storage;
@ -10,10 +11,10 @@ import java.util.concurrent.ConcurrentHashMap;
public class Gun {
private GunClient gunClient;
private final Map<String, NodeChangeListener> changeListeners = new ConcurrentHashMap<>();
private final Map<String, NodeChangeListener.Map> mapChangeListeners = new ConcurrentHashMap<>();
private final Storage storage;
public Gun(InetAddress address, int port, Storage storage) {
this.storage = storage;
try {
this.gunClient = new GunClient(address, port, storage);
this.gunClient.connectBlocking();
@ -29,10 +30,18 @@ public class Gun {
}
protected void addChangeListener(String nodeID, NodeChangeListener listener) {
changeListeners.put(nodeID, listener);
storage.addChangeListener(nodeID, listener);
}
protected void addMapChangeListener(String nodeID, NodeChangeListener.Map listener) {
mapChangeListeners.put(nodeID, listener);
storage.addMapChangeListener(nodeID, listener);
}
protected void sendPutRequest(MemoryGraph data) {
// TODO
}
protected void sendGetRequest(String key, String field) {
// TODO
}
}

View File

@ -1,79 +1,146 @@
package io.github.chronosx88.JGUN;
import io.github.chronosx88.JGUN.futures.BaseCompletableFuture;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.github.chronosx88.JGUN.futures.FutureGet;
import io.github.chronosx88.JGUN.futures.FuturePut;
import io.github.chronosx88.JGUN.futures.GetResult;
import io.github.chronosx88.JGUN.futures.Result;
import io.github.chronosx88.JGUN.models.BaseMessage;
import io.github.chronosx88.JGUN.models.MemoryGraph;
import io.github.chronosx88.JGUN.models.Node;
import io.github.chronosx88.JGUN.models.NodeMetadata;
import io.github.chronosx88.JGUN.models.acks.BaseAck;
import io.github.chronosx88.JGUN.models.acks.GetAck;
import io.github.chronosx88.JGUN.models.acks.PutAck;
import io.github.chronosx88.JGUN.models.requests.GetRequest;
import io.github.chronosx88.JGUN.models.requests.PutRequest;
import io.github.chronosx88.JGUN.nodes.Peer;
import io.github.chronosx88.JGUN.storage.Storage;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class NetworkHandler {
private final Map<String, BaseCompletableFuture<?>> pendingFutures = new ConcurrentHashMap<>();
private final Map<String, FutureGet> pendingGetRequests = new ConcurrentHashMap<>();
private final Map<String, FuturePut> pendingPutRequests = new ConcurrentHashMap<>();
private final Peer peer;
private final Storage graphStorage;
private final Storage storage;
private final Dup dup;
private final Executor executorService = Executors.newCachedThreadPool();
private final ObjectMapper objectMapper = new ObjectMapper();
public NetworkHandler(Storage graphStorage, Peer peer, Dup dup) {
this.graphStorage = graphStorage;
public NetworkHandler(Storage storage, Peer peer, Dup dup) {
this.storage = storage;
this.peer = peer;
this.dup = dup;
}
public void addPendingFuture(BaseCompletableFuture<?> future) {
pendingFutures.put(future.getFutureID(), future);
public void addPendingGetRequest(FutureGet future) {
this.pendingGetRequests.put(future.getFutureID(), future);
}
public void handleIncomingMessage(BaseMessage message) {
if (message instanceof GetRequest) {
handleGet((GetRequest) message);
} else if (message instanceof PutRequest) {
handlePut((PutRequest) message);
} else if (message instanceof BaseAck) {
handleAck((BaseAck) message);
public void addPendingPutRequest(FuturePut future) {
this.pendingPutRequests.put(future.getFutureID(), future);
}
peer.emit(message.toString());
public void handleIncomingMessage(String message) {
BaseMessage parsedMessage;
try {
parsedMessage = objectMapper.readValue(message, BaseMessage.class);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
if (dup.isDuplicated(parsedMessage.getId())) {
// TODO log
return;
}
final BaseMessage msg = parsedMessage;
executorService.execute(() -> {
BaseMessage response = null;
if (msg instanceof GetRequest) {
response = handleGet((GetRequest) msg);
} else if (msg instanceof PutRequest) {
response = handlePut((PutRequest) msg);
} else if (msg instanceof BaseAck) {
response = handleAck((BaseAck) msg);
}
if (Objects.nonNull(response)) {
String respString;
try {
respString = objectMapper.writeValueAsString(response);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
peer.emit(respString);
}
});
peer.emit(message);
}
private GetAck handleGet(GetRequest request) {
// TODO
throw new UnsupportedOperationException("TODO");
Node node = storage.getNode(request.getParams().getNodeID());
if (Objects.isNull(node)) return null;
String fieldName = request.getParams().getField();
if (Objects.nonNull(fieldName)) {
Object fieldValue = node.getValues().get(fieldName);
if (Objects.nonNull(fieldValue)) {
node = Node.builder()
.values(Map.of(fieldName, fieldValue))
.metadata(NodeMetadata.builder()
.nodeID(node.getMetadata().getNodeID())
.states(Map.of(fieldName, node.getMetadata().getStates().get(fieldName)))
.build())
.build();
}
}
return GetAck.builder()
.id(Dup.random())
.replyTo(request.getId())
.data(MemoryGraph.builder()
.nodes(Map.of(node.getMetadata().getNodeID(), node))
.build())
.ok(true)
.build();
}
private PutAck handlePut(PutRequest request) {
// TODO
throw new UnsupportedOperationException("TODO");
private BaseAck handlePut(PutRequest request) {
storage.mergeUpdate(request.getGraph());
return BaseAck.builder()
.id(Dup.random())
.replyTo(request.getId())
.ok(true)
.build();
}
private void handleAck(BaseAck ack) {
private BaseAck handleAck(BaseAck ack) {
if (ack instanceof GetAck) {
// TODO
} else if (ack instanceof PutAck) {
// TODO
FutureGet future = pendingGetRequests.get(ack.getReplyTo());
if (Objects.nonNull(future)) {
GetAck getAck = (GetAck) ack;
future.complete(GetResult.builder()
.ok(getAck.isOk())
.data(getAck.getData())
.build());
}
throw new UnsupportedOperationException("TODO");
return handlePut(PutRequest
.builder()
.graph(((GetAck) ack).getData())
.build());
} else {
FuturePut future = pendingPutRequests.get(ack.getReplyTo());
if (Objects.nonNull(future)) {
future.complete(Result.builder()
.ok(ack.isOk())
.build());
}
public void sendPutRequest(String messageID, MemoryGraph data) {
executorService.execute(() -> {
// TODO
});
System.out.println("Got ack! { #: '" + ack.getId() + "', @: '" + ack.getReplyTo() + "' }");
return null;
}
public void sendGetRequest(String messageID, String key, String field) {
executorService.execute(() -> {
// TODO
});
}
}

View File

@ -2,6 +2,7 @@ package io.github.chronosx88.JGUN;
import io.github.chronosx88.JGUN.futures.FutureGet;
import io.github.chronosx88.JGUN.futures.FuturePut;
import io.github.chronosx88.JGUN.nodes.GunClient;
import java.util.ArrayList;
import java.util.HashMap;
@ -9,10 +10,10 @@ import java.util.HashMap;
public class PathReference {
private final ArrayList<String> path = new ArrayList<>();
private Gun database;
private Gun gun;
public PathReference(Gun db) {
this.database = db;
public PathReference(Gun gun) {
this.gun = gun;
}
public PathReference get(String key) {
@ -31,10 +32,10 @@ public class PathReference {
}
public void on(NodeChangeListener changeListener) {
database.addChangeListener(String.join("/", path), changeListener);
gun.addChangeListener(String.join("/", path), changeListener);
}
public void map(NodeChangeListener.Map forEachListener) {
database.addMapChangeListener(String.join("/", path), forEachListener);
gun.addMapChangeListener(String.join("/", path), forEachListener);
}
}

View File

@ -2,7 +2,7 @@ package io.github.chronosx88.JGUN.futures;
import java.util.concurrent.ExecutionException;
import java9.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletableFuture;
import lombok.Getter;

View File

@ -1,8 +1,6 @@
package io.github.chronosx88.JGUN.futures;
import io.github.chronosx88.JGUN.models.MemoryGraph;
public class FutureGet extends BaseCompletableFuture<MemoryGraph> {
public class FutureGet extends BaseCompletableFuture<GetResult> {
public FutureGet(String id) {
super(id);
}

View File

@ -3,7 +3,7 @@ package io.github.chronosx88.JGUN.futures;
/**
* Return success of PUT operation
*/
public class FuturePut extends BaseCompletableFuture<Boolean> {
public class FuturePut extends BaseCompletableFuture<Result> {
public FuturePut(String id) {
super(id);
}

View File

@ -0,0 +1,11 @@
package io.github.chronosx88.JGUN.futures;
import io.github.chronosx88.JGUN.models.MemoryGraph;
import lombok.Getter;
import lombok.experimental.SuperBuilder;
@Getter
@SuperBuilder
public class GetResult extends Result {
private final MemoryGraph data;
}

View File

@ -0,0 +1,12 @@
package io.github.chronosx88.JGUN.futures;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.experimental.SuperBuilder;
@Getter
@SuperBuilder
public class Result {
private boolean ok;
}

View File

@ -1,9 +1,23 @@
package io.github.chronosx88.JGUN.models;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.github.chronosx88.JGUN.models.acks.GetAck;
import io.github.chronosx88.JGUN.models.requests.GetRequest;
import io.github.chronosx88.JGUN.models.requests.PutRequest;
import lombok.Builder;
import lombok.Data;
import lombok.experimental.SuperBuilder;
@Data
@JsonTypeInfo(use = JsonTypeInfo.Id.DEDUCTION)
@JsonSubTypes({
@JsonSubTypes.Type(GetRequest.class),
@JsonSubTypes.Type(PutRequest.class),
@JsonSubTypes.Type(GetAck.class)
})
@SuperBuilder
public abstract class BaseMessage {
@JsonProperty("#")
private String id;

View File

@ -11,8 +11,11 @@ import java.util.LinkedHashMap;
import java.util.Map;
@Data
@Builder
@Jacksonized
public class MemoryGraph {
@JsonIgnore
@Builder.Default
public final Map<String, Node> nodes = new LinkedHashMap<>();
@JsonAnyGetter

View File

@ -5,12 +5,15 @@ import io.github.chronosx88.JGUN.models.BaseMessage;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.SuperBuilder;
import lombok.extern.jackson.Jacksonized;
@Data
@SuperBuilder
@EqualsAndHashCode(callSuper = true)
public abstract class BaseAck extends BaseMessage {
@Jacksonized
public class BaseAck extends BaseMessage {
@JsonProperty("@")
private String replyTo;
private String ok;
private boolean ok;
}

View File

@ -5,11 +5,12 @@ import io.github.chronosx88.JGUN.models.MemoryGraph;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.SuperBuilder;
import lombok.extern.jackson.Jacksonized;
@Data
@SuperBuilder
@EqualsAndHashCode(callSuper = true)
@Builder
@Jacksonized
public class GetAck extends BaseAck {
@JsonProperty("put")

View File

@ -1,12 +0,0 @@
package io.github.chronosx88.JGUN.models.acks;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.jackson.Jacksonized;
@Data
@EqualsAndHashCode(callSuper = true)
@Builder
@Jacksonized
public class PutAck extends BaseAck {}

View File

@ -2,11 +2,15 @@ package io.github.chronosx88.JGUN.models.requests;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.github.chronosx88.JGUN.models.BaseMessage;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.jackson.Jacksonized;
@Data
@Builder
@EqualsAndHashCode(callSuper = true)
@Jacksonized
public class GetRequest extends BaseMessage {
@JsonProperty("get")
private GetRequestParams params;

View File

@ -1,7 +1,13 @@
package io.github.chronosx88.JGUN.models.requests;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Builder;
import lombok.Data;
import lombok.extern.jackson.Jacksonized;
@Data
@Builder
@Jacksonized
public class GetRequestParams {
@JsonProperty("#")
private String nodeID;

View File

@ -2,6 +2,7 @@ package io.github.chronosx88.JGUN.models.requests;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.github.chronosx88.JGUN.models.BaseMessage;
import io.github.chronosx88.JGUN.models.MemoryGraph;
import io.github.chronosx88.JGUN.models.Node;
import lombok.Builder;
import lombok.Data;
@ -14,5 +15,5 @@ import lombok.extern.jackson.Jacksonized;
@EqualsAndHashCode(callSuper = true)
public class PutRequest extends BaseMessage {
@JsonProperty("put")
private Node[] params;
private MemoryGraph graph;
}

View File

@ -26,7 +26,7 @@ public class GunClient extends WebSocketClient implements Peer {
@Override
public void onMessage(String message) {
// TODO
handler.handleIncomingMessage(message);
}
@Override

View File

@ -45,7 +45,7 @@ public class MemoryStorage extends Storage {
}
@Override
void updateNode(Node node) {
protected void updateNode(Node node) {
Node currentNode = nodes.get(node.getMetadata().getNodeID());
currentNode.values.putAll(node.values);
currentNode.getMetadata().getStates().putAll(node.getMetadata().getStates());
@ -72,7 +72,7 @@ public class MemoryStorage extends Storage {
}
@Override
void putDeferredNode(DeferredNode node) {
protected void putDeferredNode(DeferredNode node) {
deferredNodes.put(node.getMetadata().getNodeID(), node);
}
}

View File

@ -7,36 +7,34 @@ import io.github.chronosx88.JGUN.models.MemoryGraph;
import io.github.chronosx88.JGUN.models.Node;
import io.github.chronosx88.JGUN.models.NodeMetadata;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.*;
public abstract class Storage {
abstract Node getNode(String id);
public abstract Node getNode(String id);
abstract void updateNode(Node node);
protected abstract void updateNode(Node node);
abstract void addNode(String id, Node node);
public abstract void addNode(String id, Node node);
abstract boolean hasNode(String id);
public abstract boolean hasNode(String id);
abstract Set<Map.Entry<String, Node>> entries();
public abstract Set<Map.Entry<String, Node>> entries();
abstract Collection<Node> nodes();
public abstract Collection<Node> nodes();
abstract boolean isEmpty();
public abstract boolean isEmpty();
abstract void putDeferredNode(DeferredNode node);
protected abstract void putDeferredNode(DeferredNode node);
private final Map<String, List<NodeChangeListener>> changeListeners = new HashMap<>();
private final Map<String, List<NodeChangeListener.Map>> mapChangeListeners = new HashMap<>();
/**
* Merge graph update (usually received from the network)
*
* @param update Graph update
* @param changeListeners User callbacks which fired when Node has changed (.on() API)
* @param mapChangeListeners User callbacks which fired when Node has changed (.map() API)
*/
public void mergeUpdate(MemoryGraph update, Map<String, NodeChangeListener> changeListeners, Map<String, NodeChangeListener.Map> mapChangeListeners) {
public void mergeUpdate(MemoryGraph update) {
long machine = System.currentTimeMillis();
MemoryGraph diff = new MemoryGraph();
for (Map.Entry<String, Node> entry : update.getNodes().entrySet()) {
@ -56,11 +54,11 @@ public abstract class Storage {
}
if (changeListeners.containsKey(diffEntry.getKey())) {
changeListeners.get(diffEntry.getKey()).onChange(diffEntry.getValue());
changeListeners.get(diffEntry.getKey()).forEach((e) -> e.onChange(diffEntry.getValue()));
}
if (mapChangeListeners.containsKey(diffEntry.getKey())) {
for (Map.Entry<String, Object> nodeEntry : changedNode.getValues().entrySet()) {
mapChangeListeners.get(nodeEntry.getKey()).onChange(nodeEntry.getKey(), nodeEntry.getValue());
mapChangeListeners.get(nodeEntry.getKey()).forEach((e) -> e.onChange(nodeEntry.getKey(), nodeEntry.getValue()));
}
}
}
@ -113,4 +111,22 @@ public abstract class Storage {
return changedNode;
}
public void addChangeListener(String nodeID, NodeChangeListener listener) {
this.changeListeners.putIfAbsent(nodeID, new ArrayList<>());
this.changeListeners.get(nodeID).add(listener);
}
public void clearChangeListeners(String nodeID) {
this.changeListeners.remove(nodeID);
}
public void addMapChangeListener(String nodeID, NodeChangeListener.Map listener) {
this.mapChangeListeners.putIfAbsent(nodeID, new ArrayList<>());
this.mapChangeListeners.get(nodeID).add(listener);
}
public void clearMapChangeListeners(String nodeID) {
this.mapChangeListeners.remove(nodeID);
}
}