Implement updating nodes for MemoryStorage, handling deferred nodes

This commit is contained in:
ChronosX88 2023-11-14 20:36:26 +03:00
parent 3338c54680
commit 1eb00914a8
6 changed files with 99 additions and 21 deletions

View File

@ -11,7 +11,7 @@ import java.util.concurrent.ConcurrentHashMap;
public class Gun {
private GunClient gunClient;
private final Map<String, NodeChangeListener> changeListeners = new ConcurrentHashMap<>();
private final Map<String, NodeChangeListener.ForEach> forEachListeners = new ConcurrentHashMap<>();
private final Map<String, NodeChangeListener.Map> mapChangeListeners = new ConcurrentHashMap<>();
public Gun(InetAddress address, int port, Storage storage) {
try {
@ -32,7 +32,7 @@ public class Gun {
changeListeners.put(nodeID, listener);
}
protected void addForEachChangeListener(String nodeID, NodeChangeListener.ForEach listener) {
forEachListeners.put(nodeID, listener);
protected void addMapChangeListener(String nodeID, NodeChangeListener.Map listener) {
mapChangeListeners.put(nodeID, listener);
}
}

View File

@ -6,7 +6,7 @@ import io.github.chronosx88.JGUN.models.Node;
public interface NodeChangeListener {
void onChange(Node node);
interface ForEach {
interface Map {
void onChange(String key, Object value);
}
}

View File

@ -34,7 +34,7 @@ public class PathReference {
database.addChangeListener(String.join("/", path), changeListener);
}
public void map(NodeChangeListener.ForEach forEachListener) {
database.addForEachChangeListener(String.join("/", path), forEachListener);
public void map(NodeChangeListener.Map forEachListener) {
database.addMapChangeListener(String.join("/", path), forEachListener);
}
}

View File

@ -0,0 +1,31 @@
package io.github.chronosx88.JGUN.models;
import lombok.Getter;
import java.util.Map;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class DeferredNode extends Node implements Delayed {
private long deferredUntil = 0;
DeferredNode(NodeMetadata metadata, Map<String, Object> values) {
super(metadata, values);
}
@Override
public long getDelay(TimeUnit timeUnit) {
long delay = deferredUntil - System.currentTimeMillis();
return timeUnit.convert(delay, TimeUnit.MILLISECONDS);
}
public void setDelay(long delayDuration) {
this.deferredUntil = System.currentTimeMillis() + delayDuration;
}
@Override
public int compareTo(Delayed delayed) {
return Math.toIntExact(this.deferredUntil - ((DeferredNode) delayed).deferredUntil);
}
}

View File

@ -1,18 +1,43 @@
package io.github.chronosx88.JGUN.storage;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import io.github.chronosx88.JGUN.models.DeferredNode;
import io.github.chronosx88.JGUN.models.Node;
import org.checkerframework.checker.index.qual.NonNegative;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
public class MemoryStorage extends Storage {
private final Map<String, Node> nodes;
private final Map<String, Node> nodes = new ConcurrentHashMap<>();
private final Cache<String, DeferredNode> deferredNodes;
public MemoryStorage() {
nodes = new LinkedHashMap<>();
public MemoryStorage() {
deferredNodes = Caffeine.newBuilder().expireAfter(new Expiry<String, DeferredNode>() {
@Override
public long expireAfterCreate(String key, DeferredNode value, long currentTime) {
return value.getDelay(TimeUnit.NANOSECONDS);
}
@Override
public long expireAfterUpdate(String key, DeferredNode value, long currentTime, @NonNegative long currentDuration) {
return Long.MAX_VALUE;
}
@Override
public long expireAfterRead(String key, DeferredNode value, long currentTime, @NonNegative long currentDuration) {
return Long.MAX_VALUE;
}
})
.evictionListener((key, value, cause) -> {
assert value != null;
this.mergeNode(value, System.currentTimeMillis());
}).build();
}
public Node getNode(String id) {
@ -21,8 +46,9 @@ public class MemoryStorage extends Storage {
@Override
void updateNode(Node node) {
// TODO
throw new UnsupportedOperationException("TODO");
Node currentNode = nodes.get(node.getMetadata().getNodeID());
currentNode.values.putAll(node.values);
currentNode.getMetadata().getStates().putAll(node.getMetadata().getStates());
}
public void addNode(String id, Node incomingNode) {
@ -44,4 +70,9 @@ public class MemoryStorage extends Storage {
public boolean isEmpty() {
return nodes.isEmpty();
}
@Override
void putDeferredNode(DeferredNode node) {
deferredNodes.put(node.getMetadata().getNodeID(), node);
}
}

View File

@ -2,28 +2,41 @@ package io.github.chronosx88.JGUN.storage;
import io.github.chronosx88.JGUN.HAM;
import io.github.chronosx88.JGUN.NodeChangeListener;
import io.github.chronosx88.JGUN.models.DeferredNode;
import io.github.chronosx88.JGUN.models.MemoryGraph;
import io.github.chronosx88.JGUN.models.Node;
import io.github.chronosx88.JGUN.models.NodeMetadata;
import java.util.*;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
public abstract class Storage {
abstract Node getNode(String id);
abstract void updateNode(Node node);
abstract void addNode(String id, Node node);
abstract boolean hasNode(String id);
abstract Set<Map.Entry<String, Node>> entries();
abstract Collection<Node> nodes();
abstract boolean isEmpty();
abstract void putDeferredNode(DeferredNode node);
/**
* Merge graph update (usually received from the network)
* @param update Graph update
* @param changeListeners
* @param forEachListeners
*
* @param update Graph update
* @param changeListeners User callbacks which fired when Node has changed (.on() API)
* @param mapChangeListeners User callbacks which fired when Node has changed (.map() API)
*/
public void mergeUpdate(MemoryGraph update, Map<String, NodeChangeListener> changeListeners, Map<String, NodeChangeListener.ForEach> forEachListeners) {
public void mergeUpdate(MemoryGraph update, Map<String, NodeChangeListener> changeListeners, Map<String, NodeChangeListener.Map> mapChangeListeners) {
long machine = System.currentTimeMillis();
MemoryGraph diff = new MemoryGraph();
for (Map.Entry<String, Node> entry : update.getNodes().entrySet()) {
@ -45,9 +58,9 @@ public abstract class Storage {
if (changeListeners.containsKey(diffEntry.getKey())) {
changeListeners.get(diffEntry.getKey()).onChange(diffEntry.getValue());
}
if (forEachListeners.containsKey(diffEntry.getKey())) {
if (mapChangeListeners.containsKey(diffEntry.getKey())) {
for (Map.Entry<String, Object> nodeEntry : changedNode.getValues().entrySet()) {
forEachListeners.get(nodeEntry.getKey()).onChange(nodeEntry.getKey(), nodeEntry.getValue());
mapChangeListeners.get(nodeEntry.getKey()).onChange(nodeEntry.getKey(), nodeEntry.getValue());
}
}
}
@ -56,6 +69,7 @@ public abstract class Storage {
/**
* Merge updated node
*
* @param incomingNode Updated node
* @param machineState Current machine state
* @return Node with changes or null if no changes
@ -79,7 +93,9 @@ public abstract class Storage {
if (!ham.incoming) {
if (ham.defer) {
// TODO handle deferred value
DeferredNode deferred = (DeferredNode) incomingNode;
deferred.setDelay(state - machineState);
this.putDeferredNode(deferred);
}
continue;
}