diff --git a/src/main/java/io/github/chronosx88/JGUN/Gun.java b/src/main/java/io/github/chronosx88/JGUN/Gun.java index 197b397..95247ec 100644 --- a/src/main/java/io/github/chronosx88/JGUN/Gun.java +++ b/src/main/java/io/github/chronosx88/JGUN/Gun.java @@ -11,7 +11,7 @@ import java.util.concurrent.ConcurrentHashMap; public class Gun { private GunClient gunClient; private final Map changeListeners = new ConcurrentHashMap<>(); - private final Map forEachListeners = new ConcurrentHashMap<>(); + private final Map mapChangeListeners = new ConcurrentHashMap<>(); public Gun(InetAddress address, int port, Storage storage) { try { @@ -32,7 +32,7 @@ public class Gun { changeListeners.put(nodeID, listener); } - protected void addForEachChangeListener(String nodeID, NodeChangeListener.ForEach listener) { - forEachListeners.put(nodeID, listener); + protected void addMapChangeListener(String nodeID, NodeChangeListener.Map listener) { + mapChangeListeners.put(nodeID, listener); } } diff --git a/src/main/java/io/github/chronosx88/JGUN/NodeChangeListener.java b/src/main/java/io/github/chronosx88/JGUN/NodeChangeListener.java index 72432af..4d3d58e 100644 --- a/src/main/java/io/github/chronosx88/JGUN/NodeChangeListener.java +++ b/src/main/java/io/github/chronosx88/JGUN/NodeChangeListener.java @@ -6,7 +6,7 @@ import io.github.chronosx88.JGUN.models.Node; public interface NodeChangeListener { void onChange(Node node); - interface ForEach { + interface Map { void onChange(String key, Object value); } } diff --git a/src/main/java/io/github/chronosx88/JGUN/PathReference.java b/src/main/java/io/github/chronosx88/JGUN/PathReference.java index 29b7fd5..a5958a1 100644 --- a/src/main/java/io/github/chronosx88/JGUN/PathReference.java +++ b/src/main/java/io/github/chronosx88/JGUN/PathReference.java @@ -34,7 +34,7 @@ public class PathReference { database.addChangeListener(String.join("/", path), changeListener); } - public void map(NodeChangeListener.ForEach forEachListener) { - database.addForEachChangeListener(String.join("/", path), forEachListener); + public void map(NodeChangeListener.Map forEachListener) { + database.addMapChangeListener(String.join("/", path), forEachListener); } } diff --git a/src/main/java/io/github/chronosx88/JGUN/models/DeferredNode.java b/src/main/java/io/github/chronosx88/JGUN/models/DeferredNode.java new file mode 100644 index 0000000..2fbceea --- /dev/null +++ b/src/main/java/io/github/chronosx88/JGUN/models/DeferredNode.java @@ -0,0 +1,31 @@ +package io.github.chronosx88.JGUN.models; + +import lombok.Getter; + +import java.util.Map; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; + +public class DeferredNode extends Node implements Delayed { + private long deferredUntil = 0; + + DeferredNode(NodeMetadata metadata, Map values) { + super(metadata, values); + } + + @Override + public long getDelay(TimeUnit timeUnit) { + long delay = deferredUntil - System.currentTimeMillis(); + return timeUnit.convert(delay, TimeUnit.MILLISECONDS); + } + + public void setDelay(long delayDuration) { + this.deferredUntil = System.currentTimeMillis() + delayDuration; + } + + @Override + public int compareTo(Delayed delayed) { + return Math.toIntExact(this.deferredUntil - ((DeferredNode) delayed).deferredUntil); + } + +} diff --git a/src/main/java/io/github/chronosx88/JGUN/storage/MemoryStorage.java b/src/main/java/io/github/chronosx88/JGUN/storage/MemoryStorage.java index a2de416..17b6cdd 100644 --- a/src/main/java/io/github/chronosx88/JGUN/storage/MemoryStorage.java +++ b/src/main/java/io/github/chronosx88/JGUN/storage/MemoryStorage.java @@ -1,18 +1,43 @@ package io.github.chronosx88.JGUN.storage; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Expiry; +import io.github.chronosx88.JGUN.models.DeferredNode; import io.github.chronosx88.JGUN.models.Node; +import org.checkerframework.checker.index.qual.NonNegative; import java.util.Collection; -import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; - +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; public class MemoryStorage extends Storage { - private final Map nodes; + private final Map nodes = new ConcurrentHashMap<>(); + private final Cache deferredNodes; - public MemoryStorage() { - nodes = new LinkedHashMap<>(); + public MemoryStorage() { + deferredNodes = Caffeine.newBuilder().expireAfter(new Expiry() { + @Override + public long expireAfterCreate(String key, DeferredNode value, long currentTime) { + return value.getDelay(TimeUnit.NANOSECONDS); + } + + @Override + public long expireAfterUpdate(String key, DeferredNode value, long currentTime, @NonNegative long currentDuration) { + return Long.MAX_VALUE; + } + + @Override + public long expireAfterRead(String key, DeferredNode value, long currentTime, @NonNegative long currentDuration) { + return Long.MAX_VALUE; + } + }) + .evictionListener((key, value, cause) -> { + assert value != null; + this.mergeNode(value, System.currentTimeMillis()); + }).build(); } public Node getNode(String id) { @@ -21,8 +46,9 @@ public class MemoryStorage extends Storage { @Override void updateNode(Node node) { - // TODO - throw new UnsupportedOperationException("TODO"); + Node currentNode = nodes.get(node.getMetadata().getNodeID()); + currentNode.values.putAll(node.values); + currentNode.getMetadata().getStates().putAll(node.getMetadata().getStates()); } public void addNode(String id, Node incomingNode) { @@ -44,4 +70,9 @@ public class MemoryStorage extends Storage { public boolean isEmpty() { return nodes.isEmpty(); } + + @Override + void putDeferredNode(DeferredNode node) { + deferredNodes.put(node.getMetadata().getNodeID(), node); + } } diff --git a/src/main/java/io/github/chronosx88/JGUN/storage/Storage.java b/src/main/java/io/github/chronosx88/JGUN/storage/Storage.java index d2234c5..326d0c1 100644 --- a/src/main/java/io/github/chronosx88/JGUN/storage/Storage.java +++ b/src/main/java/io/github/chronosx88/JGUN/storage/Storage.java @@ -2,28 +2,41 @@ package io.github.chronosx88.JGUN.storage; import io.github.chronosx88.JGUN.HAM; import io.github.chronosx88.JGUN.NodeChangeListener; +import io.github.chronosx88.JGUN.models.DeferredNode; import io.github.chronosx88.JGUN.models.MemoryGraph; import io.github.chronosx88.JGUN.models.Node; import io.github.chronosx88.JGUN.models.NodeMetadata; -import java.util.*; +import java.util.Collection; +import java.util.Map; +import java.util.Objects; +import java.util.Set; public abstract class Storage { abstract Node getNode(String id); + abstract void updateNode(Node node); + abstract void addNode(String id, Node node); + abstract boolean hasNode(String id); + abstract Set> entries(); + abstract Collection nodes(); + abstract boolean isEmpty(); + abstract void putDeferredNode(DeferredNode node); + /** * Merge graph update (usually received from the network) - * @param update Graph update - * @param changeListeners - * @param forEachListeners + * + * @param update Graph update + * @param changeListeners User callbacks which fired when Node has changed (.on() API) + * @param mapChangeListeners User callbacks which fired when Node has changed (.map() API) */ - public void mergeUpdate(MemoryGraph update, Map changeListeners, Map forEachListeners) { + public void mergeUpdate(MemoryGraph update, Map changeListeners, Map mapChangeListeners) { long machine = System.currentTimeMillis(); MemoryGraph diff = new MemoryGraph(); for (Map.Entry entry : update.getNodes().entrySet()) { @@ -45,9 +58,9 @@ public abstract class Storage { if (changeListeners.containsKey(diffEntry.getKey())) { changeListeners.get(diffEntry.getKey()).onChange(diffEntry.getValue()); } - if (forEachListeners.containsKey(diffEntry.getKey())) { + if (mapChangeListeners.containsKey(diffEntry.getKey())) { for (Map.Entry nodeEntry : changedNode.getValues().entrySet()) { - forEachListeners.get(nodeEntry.getKey()).onChange(nodeEntry.getKey(), nodeEntry.getValue()); + mapChangeListeners.get(nodeEntry.getKey()).onChange(nodeEntry.getKey(), nodeEntry.getValue()); } } } @@ -56,6 +69,7 @@ public abstract class Storage { /** * Merge updated node + * * @param incomingNode Updated node * @param machineState Current machine state * @return Node with changes or null if no changes @@ -79,7 +93,9 @@ public abstract class Storage { if (!ham.incoming) { if (ham.defer) { - // TODO handle deferred value + DeferredNode deferred = (DeferredNode) incomingNode; + deferred.setDelay(state - machineState); + this.putDeferredNode(deferred); } continue; }