From 87da9656163643a84c27eba35bc63770eb98c06e Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Wed, 15 Nov 2023 00:15:18 +0300 Subject: [PATCH] Reimplement network handler --- build.gradle | 1 - .../java/io/github/chronosx88/JGUN/Dup.java | 25 ++- .../java/io/github/chronosx88/JGUN/Gun.java | 17 +- .../chronosx88/JGUN/NetworkHandler.java | 147 +++++++++++++----- .../github/chronosx88/JGUN/PathReference.java | 11 +- .../JGUN/futures/BaseCompletableFuture.java | 2 +- .../chronosx88/JGUN/futures/FutureGet.java | 4 +- .../chronosx88/JGUN/futures/FuturePut.java | 2 +- .../chronosx88/JGUN/futures/GetResult.java | 11 ++ .../chronosx88/JGUN/futures/Result.java | 12 ++ .../chronosx88/JGUN/models/BaseMessage.java | 14 ++ .../chronosx88/JGUN/models/MemoryGraph.java | 3 + .../chronosx88/JGUN/models/acks/BaseAck.java | 7 +- .../chronosx88/JGUN/models/acks/GetAck.java | 3 +- .../chronosx88/JGUN/models/acks/PutAck.java | 12 -- .../JGUN/models/requests/GetRequest.java | 4 + .../models/requests/GetRequestParams.java | 6 + .../JGUN/models/requests/PutRequest.java | 3 +- .../chronosx88/JGUN/nodes/GunClient.java | 2 +- .../JGUN/storage/MemoryStorage.java | 4 +- .../chronosx88/JGUN/storage/Storage.java | 52 ++++--- 21 files changed, 237 insertions(+), 105 deletions(-) create mode 100644 src/main/java/io/github/chronosx88/JGUN/futures/GetResult.java create mode 100644 src/main/java/io/github/chronosx88/JGUN/futures/Result.java delete mode 100644 src/main/java/io/github/chronosx88/JGUN/models/acks/PutAck.java diff --git a/build.gradle b/build.gradle index 6232692..a53f89f 100644 --- a/build.gradle +++ b/build.gradle @@ -19,7 +19,6 @@ dependencies { implementation 'net.sourceforge.streamsupport:android-retrofuture:1.7.0' implementation 'com.fasterxml.jackson.core:jackson-databind:2.15.3' implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.15.3' - implementation 'javax.cache:cache-api:1.1.1' implementation 'com.github.ben-manes.caffeine:jcache:3.1.5' compileOnly 'org.projectlombok:lombok:1.18.30' annotationProcessor 'org.projectlombok:lombok:1.18.30' diff --git a/src/main/java/io/github/chronosx88/JGUN/Dup.java b/src/main/java/io/github/chronosx88/JGUN/Dup.java index 563a354..acb4171 100644 --- a/src/main/java/io/github/chronosx88/JGUN/Dup.java +++ b/src/main/java/io/github/chronosx88/JGUN/Dup.java @@ -1,12 +1,9 @@ package io.github.chronosx88.JGUN; -import javax.cache.Cache; -import javax.cache.CacheManager; -import javax.cache.Caching; -import javax.cache.configuration.MutableConfiguration; -import javax.cache.expiry.CreatedExpiryPolicy; -import javax.cache.expiry.Duration; -import javax.cache.spi.CachingProvider; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; + +import java.util.Objects; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -14,11 +11,9 @@ public class Dup { private final Cache cache; public Dup(long age) { - CachingProvider cachingProvider = Caching.getCachingProvider(); - CacheManager cacheManager = cachingProvider.getCacheManager(); - MutableConfiguration config = new MutableConfiguration<>(); - config.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, age))); - this.cache = cacheManager.createCache("dup", config); + this.cache = Caffeine.newBuilder() + .expireAfterWrite(age, TimeUnit.SECONDS) + .build(); } private void track(String id) { @@ -26,7 +21,11 @@ public class Dup { } public boolean isDuplicated(String id) { - if(cache.containsKey(id)) { + Long timestamp = null; + try { + timestamp = cache.getIfPresent(id); + } catch (NullPointerException ignored) {} + if(Objects.nonNull(timestamp)) { return true; } else { track(id); diff --git a/src/main/java/io/github/chronosx88/JGUN/Gun.java b/src/main/java/io/github/chronosx88/JGUN/Gun.java index 95247ec..74882d8 100644 --- a/src/main/java/io/github/chronosx88/JGUN/Gun.java +++ b/src/main/java/io/github/chronosx88/JGUN/Gun.java @@ -1,5 +1,6 @@ package io.github.chronosx88.JGUN; +import io.github.chronosx88.JGUN.models.MemoryGraph; import io.github.chronosx88.JGUN.nodes.GunClient; import io.github.chronosx88.JGUN.storage.Storage; @@ -10,10 +11,10 @@ import java.util.concurrent.ConcurrentHashMap; public class Gun { private GunClient gunClient; - private final Map changeListeners = new ConcurrentHashMap<>(); - private final Map mapChangeListeners = new ConcurrentHashMap<>(); + private final Storage storage; public Gun(InetAddress address, int port, Storage storage) { + this.storage = storage; try { this.gunClient = new GunClient(address, port, storage); this.gunClient.connectBlocking(); @@ -29,10 +30,18 @@ public class Gun { } protected void addChangeListener(String nodeID, NodeChangeListener listener) { - changeListeners.put(nodeID, listener); + storage.addChangeListener(nodeID, listener); } protected void addMapChangeListener(String nodeID, NodeChangeListener.Map listener) { - mapChangeListeners.put(nodeID, listener); + storage.addMapChangeListener(nodeID, listener); + } + + protected void sendPutRequest(MemoryGraph data) { + // TODO + } + + protected void sendGetRequest(String key, String field) { + // TODO } } diff --git a/src/main/java/io/github/chronosx88/JGUN/NetworkHandler.java b/src/main/java/io/github/chronosx88/JGUN/NetworkHandler.java index 09d795b..f55ee0d 100644 --- a/src/main/java/io/github/chronosx88/JGUN/NetworkHandler.java +++ b/src/main/java/io/github/chronosx88/JGUN/NetworkHandler.java @@ -1,79 +1,146 @@ package io.github.chronosx88.JGUN; -import io.github.chronosx88.JGUN.futures.BaseCompletableFuture; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.github.chronosx88.JGUN.futures.FutureGet; +import io.github.chronosx88.JGUN.futures.FuturePut; +import io.github.chronosx88.JGUN.futures.GetResult; +import io.github.chronosx88.JGUN.futures.Result; import io.github.chronosx88.JGUN.models.BaseMessage; import io.github.chronosx88.JGUN.models.MemoryGraph; +import io.github.chronosx88.JGUN.models.Node; +import io.github.chronosx88.JGUN.models.NodeMetadata; import io.github.chronosx88.JGUN.models.acks.BaseAck; import io.github.chronosx88.JGUN.models.acks.GetAck; -import io.github.chronosx88.JGUN.models.acks.PutAck; import io.github.chronosx88.JGUN.models.requests.GetRequest; import io.github.chronosx88.JGUN.models.requests.PutRequest; import io.github.chronosx88.JGUN.nodes.Peer; import io.github.chronosx88.JGUN.storage.Storage; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.Executors; public class NetworkHandler { - private final Map> pendingFutures = new ConcurrentHashMap<>(); + private final Map pendingGetRequests = new ConcurrentHashMap<>(); + private final Map pendingPutRequests = new ConcurrentHashMap<>(); private final Peer peer; - private final Storage graphStorage; + private final Storage storage; private final Dup dup; private final Executor executorService = Executors.newCachedThreadPool(); + private final ObjectMapper objectMapper = new ObjectMapper(); - public NetworkHandler(Storage graphStorage, Peer peer, Dup dup) { - this.graphStorage = graphStorage; + public NetworkHandler(Storage storage, Peer peer, Dup dup) { + this.storage = storage; this.peer = peer; this.dup = dup; } - public void addPendingFuture(BaseCompletableFuture future) { - pendingFutures.put(future.getFutureID(), future); + public void addPendingGetRequest(FutureGet future) { + this.pendingGetRequests.put(future.getFutureID(), future); } - public void handleIncomingMessage(BaseMessage message) { - if (message instanceof GetRequest) { - handleGet((GetRequest) message); - } else if (message instanceof PutRequest) { - handlePut((PutRequest) message); - } else if (message instanceof BaseAck) { - handleAck((BaseAck) message); + public void addPendingPutRequest(FuturePut future) { + this.pendingPutRequests.put(future.getFutureID(), future); + } + + public void handleIncomingMessage(String message) { + BaseMessage parsedMessage; + try { + parsedMessage = objectMapper.readValue(message, BaseMessage.class); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); } - peer.emit(message.toString()); + + if (dup.isDuplicated(parsedMessage.getId())) { + // TODO log + return; + } + + final BaseMessage msg = parsedMessage; + executorService.execute(() -> { + BaseMessage response = null; + if (msg instanceof GetRequest) { + response = handleGet((GetRequest) msg); + } else if (msg instanceof PutRequest) { + response = handlePut((PutRequest) msg); + } else if (msg instanceof BaseAck) { + response = handleAck((BaseAck) msg); + } + if (Objects.nonNull(response)) { + String respString; + try { + respString = objectMapper.writeValueAsString(response); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + peer.emit(respString); + } + }); + peer.emit(message); } private GetAck handleGet(GetRequest request) { - // TODO - throw new UnsupportedOperationException("TODO"); - } - - private PutAck handlePut(PutRequest request) { - // TODO - throw new UnsupportedOperationException("TODO"); - } - - private void handleAck(BaseAck ack) { - if (ack instanceof GetAck) { - // TODO - } else if (ack instanceof PutAck) { - // TODO + Node node = storage.getNode(request.getParams().getNodeID()); + if (Objects.isNull(node)) return null; + String fieldName = request.getParams().getField(); + if (Objects.nonNull(fieldName)) { + Object fieldValue = node.getValues().get(fieldName); + if (Objects.nonNull(fieldValue)) { + node = Node.builder() + .values(Map.of(fieldName, fieldValue)) + .metadata(NodeMetadata.builder() + .nodeID(node.getMetadata().getNodeID()) + .states(Map.of(fieldName, node.getMetadata().getStates().get(fieldName))) + .build()) + .build(); + } } - - throw new UnsupportedOperationException("TODO"); + return GetAck.builder() + .id(Dup.random()) + .replyTo(request.getId()) + .data(MemoryGraph.builder() + .nodes(Map.of(node.getMetadata().getNodeID(), node)) + .build()) + .ok(true) + .build(); } - public void sendPutRequest(String messageID, MemoryGraph data) { - executorService.execute(() -> { - // TODO - }); + private BaseAck handlePut(PutRequest request) { + storage.mergeUpdate(request.getGraph()); + return BaseAck.builder() + .id(Dup.random()) + .replyTo(request.getId()) + .ok(true) + .build(); } - public void sendGetRequest(String messageID, String key, String field) { - executorService.execute(() -> { - // TODO - }); + private BaseAck handleAck(BaseAck ack) { + if (ack instanceof GetAck) { + FutureGet future = pendingGetRequests.get(ack.getReplyTo()); + if (Objects.nonNull(future)) { + GetAck getAck = (GetAck) ack; + future.complete(GetResult.builder() + .ok(getAck.isOk()) + .data(getAck.getData()) + .build()); + } + return handlePut(PutRequest + .builder() + .graph(((GetAck) ack).getData()) + .build()); + } else { + FuturePut future = pendingPutRequests.get(ack.getReplyTo()); + if (Objects.nonNull(future)) { + future.complete(Result.builder() + .ok(ack.isOk()) + .build()); + } + System.out.println("Got ack! { #: '" + ack.getId() + "', @: '" + ack.getReplyTo() + "' }"); + return null; + } } } diff --git a/src/main/java/io/github/chronosx88/JGUN/PathReference.java b/src/main/java/io/github/chronosx88/JGUN/PathReference.java index a5958a1..747f80a 100644 --- a/src/main/java/io/github/chronosx88/JGUN/PathReference.java +++ b/src/main/java/io/github/chronosx88/JGUN/PathReference.java @@ -2,6 +2,7 @@ package io.github.chronosx88.JGUN; import io.github.chronosx88.JGUN.futures.FutureGet; import io.github.chronosx88.JGUN.futures.FuturePut; +import io.github.chronosx88.JGUN.nodes.GunClient; import java.util.ArrayList; import java.util.HashMap; @@ -9,10 +10,10 @@ import java.util.HashMap; public class PathReference { private final ArrayList path = new ArrayList<>(); - private Gun database; + private Gun gun; - public PathReference(Gun db) { - this.database = db; + public PathReference(Gun gun) { + this.gun = gun; } public PathReference get(String key) { @@ -31,10 +32,10 @@ public class PathReference { } public void on(NodeChangeListener changeListener) { - database.addChangeListener(String.join("/", path), changeListener); + gun.addChangeListener(String.join("/", path), changeListener); } public void map(NodeChangeListener.Map forEachListener) { - database.addMapChangeListener(String.join("/", path), forEachListener); + gun.addMapChangeListener(String.join("/", path), forEachListener); } } diff --git a/src/main/java/io/github/chronosx88/JGUN/futures/BaseCompletableFuture.java b/src/main/java/io/github/chronosx88/JGUN/futures/BaseCompletableFuture.java index 2801881..813ebe4 100644 --- a/src/main/java/io/github/chronosx88/JGUN/futures/BaseCompletableFuture.java +++ b/src/main/java/io/github/chronosx88/JGUN/futures/BaseCompletableFuture.java @@ -2,7 +2,7 @@ package io.github.chronosx88.JGUN.futures; import java.util.concurrent.ExecutionException; -import java9.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletableFuture; import lombok.Getter; diff --git a/src/main/java/io/github/chronosx88/JGUN/futures/FutureGet.java b/src/main/java/io/github/chronosx88/JGUN/futures/FutureGet.java index b4c79dc..b6064db 100644 --- a/src/main/java/io/github/chronosx88/JGUN/futures/FutureGet.java +++ b/src/main/java/io/github/chronosx88/JGUN/futures/FutureGet.java @@ -1,8 +1,6 @@ package io.github.chronosx88.JGUN.futures; -import io.github.chronosx88.JGUN.models.MemoryGraph; - -public class FutureGet extends BaseCompletableFuture { +public class FutureGet extends BaseCompletableFuture { public FutureGet(String id) { super(id); } diff --git a/src/main/java/io/github/chronosx88/JGUN/futures/FuturePut.java b/src/main/java/io/github/chronosx88/JGUN/futures/FuturePut.java index 1859c58..5be3e73 100644 --- a/src/main/java/io/github/chronosx88/JGUN/futures/FuturePut.java +++ b/src/main/java/io/github/chronosx88/JGUN/futures/FuturePut.java @@ -3,7 +3,7 @@ package io.github.chronosx88.JGUN.futures; /** * Return success of PUT operation */ -public class FuturePut extends BaseCompletableFuture { +public class FuturePut extends BaseCompletableFuture { public FuturePut(String id) { super(id); } diff --git a/src/main/java/io/github/chronosx88/JGUN/futures/GetResult.java b/src/main/java/io/github/chronosx88/JGUN/futures/GetResult.java new file mode 100644 index 0000000..86b545a --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/futures/GetResult.java @@ -0,0 +1,11 @@ +package io.github.chronosx88.JGUN.futures; + +import io.github.chronosx88.JGUN.models.MemoryGraph; +import lombok.Getter; +import lombok.experimental.SuperBuilder; + +@Getter +@SuperBuilder +public class GetResult extends Result { + private final MemoryGraph data; +} diff --git a/src/main/java/io/github/chronosx88/JGUN/futures/Result.java b/src/main/java/io/github/chronosx88/JGUN/futures/Result.java new file mode 100644 index 0000000..e824016 --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/futures/Result.java @@ -0,0 +1,12 @@ +package io.github.chronosx88.JGUN.futures; + +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.experimental.SuperBuilder; + +@Getter +@SuperBuilder +public class Result { + private boolean ok; +} diff --git a/src/main/java/io/github/chronosx88/JGUN/models/BaseMessage.java b/src/main/java/io/github/chronosx88/JGUN/models/BaseMessage.java index 5c9bf30..bbaa3d7 100644 --- a/src/main/java/io/github/chronosx88/JGUN/models/BaseMessage.java +++ b/src/main/java/io/github/chronosx88/JGUN/models/BaseMessage.java @@ -1,9 +1,23 @@ package io.github.chronosx88.JGUN.models; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import io.github.chronosx88.JGUN.models.acks.GetAck; +import io.github.chronosx88.JGUN.models.requests.GetRequest; +import io.github.chronosx88.JGUN.models.requests.PutRequest; +import lombok.Builder; import lombok.Data; +import lombok.experimental.SuperBuilder; @Data +@JsonTypeInfo(use = JsonTypeInfo.Id.DEDUCTION) +@JsonSubTypes({ + @JsonSubTypes.Type(GetRequest.class), + @JsonSubTypes.Type(PutRequest.class), + @JsonSubTypes.Type(GetAck.class) +}) +@SuperBuilder public abstract class BaseMessage { @JsonProperty("#") private String id; diff --git a/src/main/java/io/github/chronosx88/JGUN/models/MemoryGraph.java b/src/main/java/io/github/chronosx88/JGUN/models/MemoryGraph.java index 51ccd9a..ea0505f 100644 --- a/src/main/java/io/github/chronosx88/JGUN/models/MemoryGraph.java +++ b/src/main/java/io/github/chronosx88/JGUN/models/MemoryGraph.java @@ -11,8 +11,11 @@ import java.util.LinkedHashMap; import java.util.Map; @Data +@Builder +@Jacksonized public class MemoryGraph { @JsonIgnore + @Builder.Default public final Map nodes = new LinkedHashMap<>(); @JsonAnyGetter diff --git a/src/main/java/io/github/chronosx88/JGUN/models/acks/BaseAck.java b/src/main/java/io/github/chronosx88/JGUN/models/acks/BaseAck.java index 5a64d68..702336f 100644 --- a/src/main/java/io/github/chronosx88/JGUN/models/acks/BaseAck.java +++ b/src/main/java/io/github/chronosx88/JGUN/models/acks/BaseAck.java @@ -5,12 +5,15 @@ import io.github.chronosx88.JGUN.models.BaseMessage; import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; +import lombok.experimental.SuperBuilder; import lombok.extern.jackson.Jacksonized; @Data +@SuperBuilder @EqualsAndHashCode(callSuper = true) -public abstract class BaseAck extends BaseMessage { +@Jacksonized +public class BaseAck extends BaseMessage { @JsonProperty("@") private String replyTo; - private String ok; + private boolean ok; } diff --git a/src/main/java/io/github/chronosx88/JGUN/models/acks/GetAck.java b/src/main/java/io/github/chronosx88/JGUN/models/acks/GetAck.java index be491fb..3fccfe8 100644 --- a/src/main/java/io/github/chronosx88/JGUN/models/acks/GetAck.java +++ b/src/main/java/io/github/chronosx88/JGUN/models/acks/GetAck.java @@ -5,11 +5,12 @@ import io.github.chronosx88.JGUN.models.MemoryGraph; import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; +import lombok.experimental.SuperBuilder; import lombok.extern.jackson.Jacksonized; @Data +@SuperBuilder @EqualsAndHashCode(callSuper = true) -@Builder @Jacksonized public class GetAck extends BaseAck { @JsonProperty("put") diff --git a/src/main/java/io/github/chronosx88/JGUN/models/acks/PutAck.java b/src/main/java/io/github/chronosx88/JGUN/models/acks/PutAck.java deleted file mode 100644 index 7495e97..0000000 --- a/src/main/java/io/github/chronosx88/JGUN/models/acks/PutAck.java +++ /dev/null @@ -1,12 +0,0 @@ -package io.github.chronosx88.JGUN.models.acks; - -import lombok.Builder; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.extern.jackson.Jacksonized; - -@Data -@EqualsAndHashCode(callSuper = true) -@Builder -@Jacksonized -public class PutAck extends BaseAck {} diff --git a/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequest.java b/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequest.java index 5b557e1..758c277 100644 --- a/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequest.java +++ b/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequest.java @@ -2,11 +2,15 @@ package io.github.chronosx88.JGUN.models.requests; import com.fasterxml.jackson.annotation.JsonProperty; import io.github.chronosx88.JGUN.models.BaseMessage; +import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; +import lombok.extern.jackson.Jacksonized; @Data +@Builder @EqualsAndHashCode(callSuper = true) +@Jacksonized public class GetRequest extends BaseMessage { @JsonProperty("get") private GetRequestParams params; diff --git a/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequestParams.java b/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequestParams.java index 8d3d68a..6b8305a 100644 --- a/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequestParams.java +++ b/src/main/java/io/github/chronosx88/JGUN/models/requests/GetRequestParams.java @@ -1,7 +1,13 @@ package io.github.chronosx88.JGUN.models.requests; import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Builder; +import lombok.Data; +import lombok.extern.jackson.Jacksonized; +@Data +@Builder +@Jacksonized public class GetRequestParams { @JsonProperty("#") private String nodeID; diff --git a/src/main/java/io/github/chronosx88/JGUN/models/requests/PutRequest.java b/src/main/java/io/github/chronosx88/JGUN/models/requests/PutRequest.java index 80e706b..fc8db52 100644 --- a/src/main/java/io/github/chronosx88/JGUN/models/requests/PutRequest.java +++ b/src/main/java/io/github/chronosx88/JGUN/models/requests/PutRequest.java @@ -2,6 +2,7 @@ package io.github.chronosx88.JGUN.models.requests; import com.fasterxml.jackson.annotation.JsonProperty; import io.github.chronosx88.JGUN.models.BaseMessage; +import io.github.chronosx88.JGUN.models.MemoryGraph; import io.github.chronosx88.JGUN.models.Node; import lombok.Builder; import lombok.Data; @@ -14,5 +15,5 @@ import lombok.extern.jackson.Jacksonized; @EqualsAndHashCode(callSuper = true) public class PutRequest extends BaseMessage { @JsonProperty("put") - private Node[] params; + private MemoryGraph graph; } diff --git a/src/main/java/io/github/chronosx88/JGUN/nodes/GunClient.java b/src/main/java/io/github/chronosx88/JGUN/nodes/GunClient.java index e2fcc92..d4ebb42 100644 --- a/src/main/java/io/github/chronosx88/JGUN/nodes/GunClient.java +++ b/src/main/java/io/github/chronosx88/JGUN/nodes/GunClient.java @@ -26,7 +26,7 @@ public class GunClient extends WebSocketClient implements Peer { @Override public void onMessage(String message) { - // TODO + handler.handleIncomingMessage(message); } @Override diff --git a/src/main/java/io/github/chronosx88/JGUN/storage/MemoryStorage.java b/src/main/java/io/github/chronosx88/JGUN/storage/MemoryStorage.java index 17b6cdd..055669b 100644 --- a/src/main/java/io/github/chronosx88/JGUN/storage/MemoryStorage.java +++ b/src/main/java/io/github/chronosx88/JGUN/storage/MemoryStorage.java @@ -45,7 +45,7 @@ public class MemoryStorage extends Storage { } @Override - void updateNode(Node node) { + protected void updateNode(Node node) { Node currentNode = nodes.get(node.getMetadata().getNodeID()); currentNode.values.putAll(node.values); currentNode.getMetadata().getStates().putAll(node.getMetadata().getStates()); @@ -72,7 +72,7 @@ public class MemoryStorage extends Storage { } @Override - void putDeferredNode(DeferredNode node) { + protected void putDeferredNode(DeferredNode node) { deferredNodes.put(node.getMetadata().getNodeID(), node); } } diff --git a/src/main/java/io/github/chronosx88/JGUN/storage/Storage.java b/src/main/java/io/github/chronosx88/JGUN/storage/Storage.java index 326d0c1..ba0fccb 100644 --- a/src/main/java/io/github/chronosx88/JGUN/storage/Storage.java +++ b/src/main/java/io/github/chronosx88/JGUN/storage/Storage.java @@ -7,36 +7,34 @@ import io.github.chronosx88.JGUN.models.MemoryGraph; import io.github.chronosx88.JGUN.models.Node; import io.github.chronosx88.JGUN.models.NodeMetadata; -import java.util.Collection; -import java.util.Map; -import java.util.Objects; -import java.util.Set; +import java.util.*; public abstract class Storage { - abstract Node getNode(String id); + public abstract Node getNode(String id); - abstract void updateNode(Node node); + protected abstract void updateNode(Node node); - abstract void addNode(String id, Node node); + public abstract void addNode(String id, Node node); - abstract boolean hasNode(String id); + public abstract boolean hasNode(String id); - abstract Set> entries(); + public abstract Set> entries(); - abstract Collection nodes(); + public abstract Collection nodes(); - abstract boolean isEmpty(); + public abstract boolean isEmpty(); - abstract void putDeferredNode(DeferredNode node); + protected abstract void putDeferredNode(DeferredNode node); + + private final Map> changeListeners = new HashMap<>(); + private final Map> mapChangeListeners = new HashMap<>(); /** * Merge graph update (usually received from the network) * - * @param update Graph update - * @param changeListeners User callbacks which fired when Node has changed (.on() API) - * @param mapChangeListeners User callbacks which fired when Node has changed (.map() API) + * @param update Graph update */ - public void mergeUpdate(MemoryGraph update, Map changeListeners, Map mapChangeListeners) { + public void mergeUpdate(MemoryGraph update) { long machine = System.currentTimeMillis(); MemoryGraph diff = new MemoryGraph(); for (Map.Entry entry : update.getNodes().entrySet()) { @@ -56,11 +54,11 @@ public abstract class Storage { } if (changeListeners.containsKey(diffEntry.getKey())) { - changeListeners.get(diffEntry.getKey()).onChange(diffEntry.getValue()); + changeListeners.get(diffEntry.getKey()).forEach((e) -> e.onChange(diffEntry.getValue())); } if (mapChangeListeners.containsKey(diffEntry.getKey())) { for (Map.Entry nodeEntry : changedNode.getValues().entrySet()) { - mapChangeListeners.get(nodeEntry.getKey()).onChange(nodeEntry.getKey(), nodeEntry.getValue()); + mapChangeListeners.get(nodeEntry.getKey()).forEach((e) -> e.onChange(nodeEntry.getKey(), nodeEntry.getValue())); } } } @@ -113,4 +111,22 @@ public abstract class Storage { return changedNode; } + + public void addChangeListener(String nodeID, NodeChangeListener listener) { + this.changeListeners.putIfAbsent(nodeID, new ArrayList<>()); + this.changeListeners.get(nodeID).add(listener); + } + + public void clearChangeListeners(String nodeID) { + this.changeListeners.remove(nodeID); + } + + public void addMapChangeListener(String nodeID, NodeChangeListener.Map listener) { + this.mapChangeListeners.putIfAbsent(nodeID, new ArrayList<>()); + this.mapChangeListeners.get(nodeID).add(listener); + } + + public void clearMapChangeListeners(String nodeID) { + this.mapChangeListeners.remove(nodeID); + } }