Compare commits

...

3 Commits

Author SHA1 Message Date
ChronosX88
b83e3ee14e Reimplement .put() API 2023-11-15 02:37:19 +03:00
ChronosX88
87da965616 Reimplement network handler 2023-11-15 00:15:18 +03:00
ChronosX88
1eb00914a8 Implement updating nodes for MemoryStorage, handling deferred nodes 2023-11-14 20:36:26 +03:00
31 changed files with 660 additions and 158 deletions

View File

@ -19,8 +19,12 @@ dependencies {
implementation 'net.sourceforge.streamsupport:android-retrofuture:1.7.0' implementation 'net.sourceforge.streamsupport:android-retrofuture:1.7.0'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.15.3' implementation 'com.fasterxml.jackson.core:jackson-databind:2.15.3'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.15.3' implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.15.3'
implementation 'javax.cache:cache-api:1.1.1'
implementation 'com.github.ben-manes.caffeine:jcache:3.1.5' implementation 'com.github.ben-manes.caffeine:jcache:3.1.5'
testImplementation 'org.junit.jupiter:junit-jupiter:5.10.1'
compileOnly 'org.projectlombok:lombok:1.18.30' compileOnly 'org.projectlombok:lombok:1.18.30'
annotationProcessor 'org.projectlombok:lombok:1.18.30' annotationProcessor 'org.projectlombok:lombok:1.18.30'
} }
test {
useJUnitPlatform()
}

View File

@ -1,12 +1,9 @@
package io.github.chronosx88.JGUN; package io.github.chronosx88.JGUN;
import javax.cache.Cache; import com.github.benmanes.caffeine.cache.Cache;
import javax.cache.CacheManager; import com.github.benmanes.caffeine.cache.Caffeine;
import javax.cache.Caching;
import javax.cache.configuration.MutableConfiguration; import java.util.Objects;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import javax.cache.spi.CachingProvider;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -14,11 +11,9 @@ public class Dup {
private final Cache<String, Long> cache; private final Cache<String, Long> cache;
public Dup(long age) { public Dup(long age) {
CachingProvider cachingProvider = Caching.getCachingProvider(); this.cache = Caffeine.newBuilder()
CacheManager cacheManager = cachingProvider.getCacheManager(); .expireAfterWrite(age, TimeUnit.SECONDS)
MutableConfiguration<String, Long> config = new MutableConfiguration<>(); .build();
config.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, age)));
this.cache = cacheManager.createCache("dup", config);
} }
private void track(String id) { private void track(String id) {
@ -26,7 +21,11 @@ public class Dup {
} }
public boolean isDuplicated(String id) { public boolean isDuplicated(String id) {
if(cache.containsKey(id)) { Long timestamp = null;
try {
timestamp = cache.getIfPresent(id);
} catch (NullPointerException ignored) {}
if(Objects.nonNull(timestamp)) {
return true; return true;
} else { } else {
track(id); track(id);

View File

@ -0,0 +1,99 @@
package io.github.chronosx88.JGUN;
import io.github.chronosx88.JGUN.models.MemoryGraph;
import io.github.chronosx88.JGUN.models.Node;
import io.github.chronosx88.JGUN.models.NodeLink;
import io.github.chronosx88.JGUN.models.NodeMetadata;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.List;
import java.util.UUID;
public class GraphNodeBuilder {
private final MemoryGraph graph;
private final Node rootNode;
protected static final String ROOT_NODE = "__ROOT__";
public GraphNodeBuilder() {
this.graph = MemoryGraph.builder().build();
this.rootNode = Node.builder()
.metadata(NodeMetadata.builder()
.nodeID(null)
.build())
.build();
graph.nodes.put(ROOT_NODE, this.rootNode);
}
private GraphNodeBuilder addScalar(String name, Object value) {
rootNode.values.put(name, value);
rootNode.getMetadata().getStates().put(name, System.currentTimeMillis());
return this;
}
public GraphNodeBuilder add(String name, String value) {
return addScalar(name, value);
}
public GraphNodeBuilder add(String name, BigInteger value) {
return addScalar(name, value);
}
public GraphNodeBuilder add(String name, BigDecimal value) {
return addScalar(name, value);
}
public GraphNodeBuilder add(String name, int value) {
return addScalar(name, value);
}
public GraphNodeBuilder add(String name, long value) {
return addScalar(name, value);
}
public GraphNodeBuilder add(String name, double value) {
return addScalar(name, value);
}
public GraphNodeBuilder add(String name, boolean value) {
return addScalar(name, value);
}
public GraphNodeBuilder addNull(String name) {
return addScalar(name, null);
}
public GraphNodeBuilder add(String name, GraphNodeBuilder builder) {
String newNodeID = UUID.randomUUID().toString();
rootNode.values.put(name, NodeLink.builder()
.link(newNodeID)
.build());
MemoryGraph innerGraph = builder.build();
innerGraph.nodes.get(ROOT_NODE).getMetadata().setNodeID(newNodeID);
innerGraph.nodes.put(newNodeID, innerGraph.nodes.get(ROOT_NODE));
innerGraph.nodes.remove(ROOT_NODE);
graph.nodes.putAll(innerGraph.nodes);
return this;
}
public GraphNodeBuilder add(String name, NodeArrayBuilder builder) {
MemoryGraph innerGraph = builder.build();
//noinspection unchecked
var innerArray = (List<Object>) innerGraph.nodes.get(ROOT_NODE).values.get(NodeArrayBuilder.ARRAY_FIELD);
rootNode.values.put(name, innerArray);
rootNode.getMetadata().getStates().put(name, innerGraph
.nodes
.get(ROOT_NODE)
.getMetadata()
.getStates()
.get(NodeArrayBuilder.ARRAY_FIELD));
innerGraph.nodes.remove(ROOT_NODE);
graph.nodes.putAll(innerGraph.nodes);
return this;
}
public MemoryGraph build() {
return this.graph;
}
}

View File

@ -1,24 +1,34 @@
package io.github.chronosx88.JGUN; package io.github.chronosx88.JGUN;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.github.chronosx88.JGUN.futures.FuturePut;
import io.github.chronosx88.JGUN.models.MemoryGraph;
import io.github.chronosx88.JGUN.models.requests.PutRequest;
import io.github.chronosx88.JGUN.nodes.GunClient; import io.github.chronosx88.JGUN.nodes.GunClient;
import io.github.chronosx88.JGUN.storage.Storage; import io.github.chronosx88.JGUN.storage.Storage;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Map; import java.util.concurrent.Executor;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors;
public class Gun { public class Gun {
private GunClient gunClient; private GunClient peer;
private final Map<String, NodeChangeListener> changeListeners = new ConcurrentHashMap<>(); private final Storage storage;
private final Map<String, NodeChangeListener.ForEach> forEachListeners = new ConcurrentHashMap<>(); private final ObjectMapper objectMapper;
private final Executor executorService = Executors.newCachedThreadPool();
public Gun(InetAddress address, int port, Storage storage) { public Gun(InetAddress address, int port, Storage storage) {
this.objectMapper = new ObjectMapper();
objectMapper.registerModule(new Jdk8Module());
this.storage = storage;
try { try {
this.gunClient = new GunClient(address, port, storage); this.peer = new GunClient(address, port, storage);
this.gunClient.connectBlocking(); this.peer.connectBlocking();
} catch (URISyntaxException | InterruptedException e) { } catch (URISyntaxException | InterruptedException e) {
e.printStackTrace(); throw new RuntimeException(e);
} }
} }
@ -29,10 +39,34 @@ public class Gun {
} }
protected void addChangeListener(String nodeID, NodeChangeListener listener) { protected void addChangeListener(String nodeID, NodeChangeListener listener) {
changeListeners.put(nodeID, listener); storage.addChangeListener(nodeID, listener);
} }
protected void addForEachChangeListener(String nodeID, NodeChangeListener.ForEach listener) { protected void addMapChangeListener(String nodeID, NodeChangeListener.Map listener) {
forEachListeners.put(nodeID, listener); storage.addMapChangeListener(nodeID, listener);
}
protected FuturePut sendPutRequest(MemoryGraph data) {
String reqID = Dup.random();
executorService.execute(() -> {
storage.mergeUpdate(data);
var request = PutRequest.builder()
.id(reqID)
.graph(data)
.build();
String encodedRequest;
try {
encodedRequest = this.objectMapper.writeValueAsString(request);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
peer.emit(encodedRequest);
});
return new FuturePut(reqID);
}
protected void sendGetRequest(String key, String field) {
// TODO
throw new UnsupportedOperationException("TODO");
} }
} }

View File

@ -1,79 +1,150 @@
package io.github.chronosx88.JGUN; package io.github.chronosx88.JGUN;
import io.github.chronosx88.JGUN.futures.BaseCompletableFuture; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.github.chronosx88.JGUN.futures.FutureGet;
import io.github.chronosx88.JGUN.futures.FuturePut;
import io.github.chronosx88.JGUN.futures.GetResult;
import io.github.chronosx88.JGUN.futures.Result;
import io.github.chronosx88.JGUN.models.BaseMessage; import io.github.chronosx88.JGUN.models.BaseMessage;
import io.github.chronosx88.JGUN.models.MemoryGraph; import io.github.chronosx88.JGUN.models.MemoryGraph;
import io.github.chronosx88.JGUN.models.Node;
import io.github.chronosx88.JGUN.models.NodeMetadata;
import io.github.chronosx88.JGUN.models.acks.BaseAck; import io.github.chronosx88.JGUN.models.acks.BaseAck;
import io.github.chronosx88.JGUN.models.acks.GetAck; import io.github.chronosx88.JGUN.models.acks.GetAck;
import io.github.chronosx88.JGUN.models.acks.PutAck;
import io.github.chronosx88.JGUN.models.requests.GetRequest; import io.github.chronosx88.JGUN.models.requests.GetRequest;
import io.github.chronosx88.JGUN.models.requests.PutRequest; import io.github.chronosx88.JGUN.models.requests.PutRequest;
import io.github.chronosx88.JGUN.nodes.Peer; import io.github.chronosx88.JGUN.nodes.Peer;
import io.github.chronosx88.JGUN.storage.Storage; import io.github.chronosx88.JGUN.storage.Storage;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
public class NetworkHandler { public class NetworkHandler {
private final Map<String, BaseCompletableFuture<?>> pendingFutures = new ConcurrentHashMap<>(); private final Map<String, FutureGet> pendingGetRequests = new ConcurrentHashMap<>();
private final Map<String, FuturePut> pendingPutRequests = new ConcurrentHashMap<>();
private final Peer peer; private final Peer peer;
private final Storage graphStorage; private final Storage storage;
private final Dup dup; private final Dup dup;
private final Executor executorService = Executors.newCachedThreadPool(); private final Executor executorService = Executors.newCachedThreadPool();
private final ObjectMapper objectMapper;
public NetworkHandler(Storage graphStorage, Peer peer, Dup dup) { public NetworkHandler(Storage storage, Peer peer, Dup dup) {
this.graphStorage = graphStorage; this.storage = storage;
this.peer = peer; this.peer = peer;
this.dup = dup; this.dup = dup;
this.objectMapper = new ObjectMapper();
objectMapper.registerModule(new Jdk8Module());
} }
public void addPendingFuture(BaseCompletableFuture<?> future) { public void addPendingGetRequest(FutureGet future) {
pendingFutures.put(future.getFutureID(), future); this.pendingGetRequests.put(future.getFutureID(), future);
} }
public void handleIncomingMessage(BaseMessage message) { public void addPendingPutRequest(FuturePut future) {
if (message instanceof GetRequest) { this.pendingPutRequests.put(future.getFutureID(), future);
handleGet((GetRequest) message); }
} else if (message instanceof PutRequest) {
handlePut((PutRequest) message); public void handleIncomingMessage(String message) {
} else if (message instanceof BaseAck) { BaseMessage parsedMessage;
handleAck((BaseAck) message); try {
parsedMessage = objectMapper.readValue(message, BaseMessage.class);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
} }
peer.emit(message.toString());
if (dup.isDuplicated(parsedMessage.getId())) {
// TODO log
return;
}
final BaseMessage msg = parsedMessage;
executorService.execute(() -> {
BaseMessage response = null;
if (msg instanceof GetRequest) {
response = handleGet((GetRequest) msg);
} else if (msg instanceof PutRequest) {
response = handlePut((PutRequest) msg);
} else if (msg instanceof BaseAck) {
response = handleAck((BaseAck) msg);
}
if (Objects.nonNull(response)) {
String respString;
try {
respString = objectMapper.writeValueAsString(response);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
peer.emit(respString);
}
});
peer.emit(message);
} }
private GetAck handleGet(GetRequest request) { private GetAck handleGet(GetRequest request) {
// TODO Node node = storage.getNode(request.getParams().getNodeID());
throw new UnsupportedOperationException("TODO"); if (Objects.isNull(node)) return null;
} String fieldName = request.getParams().getField();
if (Objects.nonNull(fieldName)) {
private PutAck handlePut(PutRequest request) { Object fieldValue = node.getValues().get(fieldName);
// TODO if (Objects.nonNull(fieldValue)) {
throw new UnsupportedOperationException("TODO"); node = Node.builder()
} .values(Map.of(fieldName, fieldValue))
.metadata(NodeMetadata.builder()
private void handleAck(BaseAck ack) { .nodeID(node.getMetadata().getNodeID())
if (ack instanceof GetAck) { .states(Map.of(fieldName, node.getMetadata().getStates().get(fieldName)))
// TODO .build())
} else if (ack instanceof PutAck) { .build();
// TODO }
} }
return GetAck.builder()
throw new UnsupportedOperationException("TODO"); .id(Dup.random())
.replyTo(request.getId())
.data(MemoryGraph.builder()
.nodes(Map.of(node.getMetadata().getNodeID(), node))
.build())
.ok(true)
.build();
} }
public void sendPutRequest(String messageID, MemoryGraph data) { private BaseAck handlePut(PutRequest request) {
executorService.execute(() -> { storage.mergeUpdate(request.getGraph());
// TODO return BaseAck.builder()
}); .id(Dup.random())
.replyTo(request.getId())
.ok(true)
.build();
} }
public void sendGetRequest(String messageID, String key, String field) { private BaseAck handleAck(BaseAck ack) {
executorService.execute(() -> { if (ack instanceof GetAck) {
// TODO FutureGet future = pendingGetRequests.get(ack.getReplyTo());
}); if (Objects.nonNull(future)) {
GetAck getAck = (GetAck) ack;
future.complete(GetResult.builder()
.ok(getAck.isOk())
.data(getAck.getData())
.build());
}
return handlePut(PutRequest
.builder()
.graph(((GetAck) ack).getData())
.build());
} else {
FuturePut future = pendingPutRequests.get(ack.getReplyTo());
if (Objects.nonNull(future)) {
future.complete(Result.builder()
.ok(ack.isOk())
.build());
}
System.out.println("Got ack! { #: '" + ack.getId() + "', @: '" + ack.getReplyTo() + "' }");
return null;
}
} }
} }

View File

@ -0,0 +1,88 @@
package io.github.chronosx88.JGUN;
import io.github.chronosx88.JGUN.models.MemoryGraph;
import io.github.chronosx88.JGUN.models.Node;
import io.github.chronosx88.JGUN.models.NodeLink;
import io.github.chronosx88.JGUN.models.NodeMetadata;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.*;
public class NodeArrayBuilder {
private final MemoryGraph graph;
private final Node rootNode;
private final List<Object> innerArray;
protected static final String ARRAY_FIELD = "__ARRAY__";
public NodeArrayBuilder() {
this.graph = MemoryGraph.builder().build();
this.innerArray = new ArrayList<>();
this.rootNode = Node.builder()
.metadata(NodeMetadata.builder()
.nodeID(null)
.states(new HashMap<>(Map.of(ARRAY_FIELD, System.currentTimeMillis())))
.build())
.values(Map.of(ARRAY_FIELD, innerArray))
.build();
graph.nodes.put(GraphNodeBuilder.ROOT_NODE, this.rootNode);
}
private NodeArrayBuilder addScalar(Object value) {
this.innerArray.add(value);
this.rootNode.getMetadata().getStates().put(ARRAY_FIELD, System.currentTimeMillis());
return this;
}
public NodeArrayBuilder add(String value) {
return addScalar(value);
}
public NodeArrayBuilder add(BigInteger value) {
return addScalar(value);
}
public NodeArrayBuilder add(BigDecimal value) {
return addScalar(value);
}
public NodeArrayBuilder add(int value) {
return addScalar(value);
}
public NodeArrayBuilder add(long value) {
return addScalar(value);
}
public NodeArrayBuilder add(double value) {
return addScalar(value);
}
public NodeArrayBuilder add(boolean value) {
return addScalar(value);
}
public NodeArrayBuilder addNull(String name) {
return addScalar(null);
}
public NodeArrayBuilder add(GraphNodeBuilder builder) {
String newNodeID = UUID.randomUUID().toString();
//noinspection unchecked
List<Object> innerArray = (List<Object>) rootNode.values.get(ARRAY_FIELD);
innerArray.add(NodeLink.builder()
.link(newNodeID)
.build());
MemoryGraph innerGraph = builder.build();
innerGraph.nodes.get(GraphNodeBuilder.ROOT_NODE).getMetadata().setNodeID(newNodeID);
innerGraph.nodes.put(newNodeID, innerGraph.nodes.get(GraphNodeBuilder.ROOT_NODE));
innerGraph.nodes.remove(GraphNodeBuilder.ROOT_NODE);
this.graph.nodes.putAll(innerGraph.nodes);
return this;
}
public MemoryGraph build() {
return this.graph;
}
}

View File

@ -6,7 +6,7 @@ import io.github.chronosx88.JGUN.models.Node;
public interface NodeChangeListener { public interface NodeChangeListener {
void onChange(Node node); void onChange(Node node);
interface ForEach { interface Map {
void onChange(String key, Object value); void onChange(String key, Object value);
} }
} }

View File

@ -2,17 +2,18 @@ package io.github.chronosx88.JGUN;
import io.github.chronosx88.JGUN.futures.FutureGet; import io.github.chronosx88.JGUN.futures.FutureGet;
import io.github.chronosx88.JGUN.futures.FuturePut; import io.github.chronosx88.JGUN.futures.FuturePut;
import io.github.chronosx88.JGUN.models.MemoryGraph;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.List;
public class PathReference { public class PathReference {
private final ArrayList<String> path = new ArrayList<>(); private final List<String> path = new ArrayList<>();
private Gun database; private final Gun gun;
public PathReference(Gun db) { public PathReference(Gun gun) {
this.database = db; this.gun = gun;
} }
public PathReference get(String key) { public PathReference get(String key) {
@ -20,21 +21,20 @@ public class PathReference {
return this; return this;
} }
public FutureGet getData() { public FutureGet once() {
// TODO // TODO
return null; throw new UnsupportedOperationException("TODO");
} }
public FuturePut put(HashMap<String, Object> data) { public FuturePut put(MemoryGraph graph) {
// TODO return gun.sendPutRequest(graph);
return null;
} }
public void on(NodeChangeListener changeListener) { public void on(NodeChangeListener changeListener) {
database.addChangeListener(String.join("/", path), changeListener); gun.addChangeListener(String.join("/", path), changeListener);
} }
public void map(NodeChangeListener.ForEach forEachListener) { public void map(NodeChangeListener.Map forEachListener) {
database.addForEachChangeListener(String.join("/", path), forEachListener); gun.addMapChangeListener(String.join("/", path), forEachListener);
} }
} }

View File

@ -1,10 +1,8 @@
package io.github.chronosx88.JGUN.futures; package io.github.chronosx88.JGUN.futures;
import java.util.concurrent.ExecutionException;
import java9.util.concurrent.CompletableFuture;
import lombok.Getter; import lombok.Getter;
import java.util.concurrent.CompletableFuture;
@Getter @Getter
public class BaseCompletableFuture<T> extends CompletableFuture<T> { public class BaseCompletableFuture<T> extends CompletableFuture<T> {
@ -14,24 +12,4 @@ public class BaseCompletableFuture<T> extends CompletableFuture<T> {
super(); super();
futureID = id; futureID = id;
} }
public void addListener(final BaseFutureListener<T> listener) {
this.whenCompleteAsync((t, throwable) -> {
if(throwable == null) {
listener.onComplete(t);
} else {
listener.onError(t, throwable);
}
});
}
public T await() {
T t = null;
try {
t = super.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return t;
}
} }

View File

@ -1,6 +0,0 @@
package io.github.chronosx88.JGUN.futures;
public interface BaseFutureListener<T> {
void onComplete(T result);
void onError(T result, Throwable exception);
}

View File

@ -1,8 +1,6 @@
package io.github.chronosx88.JGUN.futures; package io.github.chronosx88.JGUN.futures;
import io.github.chronosx88.JGUN.models.MemoryGraph; public class FutureGet extends BaseCompletableFuture<GetResult> {
public class FutureGet extends BaseCompletableFuture<MemoryGraph> {
public FutureGet(String id) { public FutureGet(String id) {
super(id); super(id);
} }

View File

@ -3,7 +3,7 @@ package io.github.chronosx88.JGUN.futures;
/** /**
* Return success of PUT operation * Return success of PUT operation
*/ */
public class FuturePut extends BaseCompletableFuture<Boolean> { public class FuturePut extends BaseCompletableFuture<Result> {
public FuturePut(String id) { public FuturePut(String id) {
super(id); super(id);
} }

View File

@ -0,0 +1,11 @@
package io.github.chronosx88.JGUN.futures;
import io.github.chronosx88.JGUN.models.MemoryGraph;
import lombok.Getter;
import lombok.experimental.SuperBuilder;
@Getter
@SuperBuilder
public class GetResult extends Result {
private final MemoryGraph data;
}

View File

@ -0,0 +1,12 @@
package io.github.chronosx88.JGUN.futures;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.experimental.SuperBuilder;
@Getter
@SuperBuilder
public class Result {
private boolean ok;
}

View File

@ -1,9 +1,23 @@
package io.github.chronosx88.JGUN.models; package io.github.chronosx88.JGUN.models;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.github.chronosx88.JGUN.models.acks.GetAck;
import io.github.chronosx88.JGUN.models.requests.GetRequest;
import io.github.chronosx88.JGUN.models.requests.PutRequest;
import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.experimental.SuperBuilder;
@Data @Data
@JsonTypeInfo(use = JsonTypeInfo.Id.DEDUCTION)
@JsonSubTypes({
@JsonSubTypes.Type(GetRequest.class),
@JsonSubTypes.Type(PutRequest.class),
@JsonSubTypes.Type(GetAck.class)
})
@SuperBuilder
public abstract class BaseMessage { public abstract class BaseMessage {
@JsonProperty("#") @JsonProperty("#")
private String id; private String id;

View File

@ -0,0 +1,31 @@
package io.github.chronosx88.JGUN.models;
import lombok.Getter;
import java.util.Map;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class DeferredNode extends Node implements Delayed {
private long deferredUntil = 0;
DeferredNode(NodeMetadata metadata, Map<String, Object> values) {
super(metadata, values);
}
@Override
public long getDelay(TimeUnit timeUnit) {
long delay = deferredUntil - System.currentTimeMillis();
return timeUnit.convert(delay, TimeUnit.MILLISECONDS);
}
public void setDelay(long delayDuration) {
this.deferredUntil = System.currentTimeMillis() + delayDuration;
}
@Override
public int compareTo(Delayed delayed) {
return Math.toIntExact(this.deferredUntil - ((DeferredNode) delayed).deferredUntil);
}
}

View File

@ -11,8 +11,11 @@ import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
@Data @Data
@Builder
@Jacksonized
public class MemoryGraph { public class MemoryGraph {
@JsonIgnore @JsonIgnore
@Builder.Default
public final Map<String, Node> nodes = new LinkedHashMap<>(); public final Map<String, Node> nodes = new LinkedHashMap<>();
@JsonAnyGetter @JsonAnyGetter

View File

@ -0,0 +1,14 @@
package io.github.chronosx88.JGUN.models;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Builder;
import lombok.Data;
import lombok.extern.jackson.Jacksonized;
@Data
@Builder
@Jacksonized
public class NodeLink {
@JsonProperty("#")
String link;
}

View File

@ -6,6 +6,7 @@ import lombok.Data;
import lombok.extern.jackson.Jacksonized; import lombok.extern.jackson.Jacksonized;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
@Data @Data
@ -14,7 +15,7 @@ import java.util.Map;
public class NodeMetadata { public class NodeMetadata {
@JsonProperty(">") @JsonProperty(">")
@Builder.Default @Builder.Default
private Map<String, Long> states = new HashMap<>(); // field -> state private Map<String, Long> states = new LinkedHashMap<>(); // field -> state
@JsonProperty("#") @JsonProperty("#")
private String nodeID; private String nodeID;

View File

@ -5,12 +5,15 @@ import io.github.chronosx88.JGUN.models.BaseMessage;
import lombok.Builder; import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.experimental.SuperBuilder;
import lombok.extern.jackson.Jacksonized; import lombok.extern.jackson.Jacksonized;
@Data @Data
@SuperBuilder
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
public abstract class BaseAck extends BaseMessage { @Jacksonized
public class BaseAck extends BaseMessage {
@JsonProperty("@") @JsonProperty("@")
private String replyTo; private String replyTo;
private String ok; private boolean ok;
} }

View File

@ -5,11 +5,12 @@ import io.github.chronosx88.JGUN.models.MemoryGraph;
import lombok.Builder; import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.experimental.SuperBuilder;
import lombok.extern.jackson.Jacksonized; import lombok.extern.jackson.Jacksonized;
@Data @Data
@SuperBuilder
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
@Builder
@Jacksonized @Jacksonized
public class GetAck extends BaseAck { public class GetAck extends BaseAck {
@JsonProperty("put") @JsonProperty("put")

View File

@ -1,12 +0,0 @@
package io.github.chronosx88.JGUN.models.acks;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.jackson.Jacksonized;
@Data
@EqualsAndHashCode(callSuper = true)
@Builder
@Jacksonized
public class PutAck extends BaseAck {}

View File

@ -2,11 +2,16 @@ package io.github.chronosx88.JGUN.models.requests;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import io.github.chronosx88.JGUN.models.BaseMessage; import io.github.chronosx88.JGUN.models.BaseMessage;
import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.experimental.SuperBuilder;
import lombok.extern.jackson.Jacksonized;
@Data @Data
@SuperBuilder
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
@Jacksonized
public class GetRequest extends BaseMessage { public class GetRequest extends BaseMessage {
@JsonProperty("get") @JsonProperty("get")
private GetRequestParams params; private GetRequestParams params;

View File

@ -1,7 +1,13 @@
package io.github.chronosx88.JGUN.models.requests; package io.github.chronosx88.JGUN.models.requests;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Builder;
import lombok.Data;
import lombok.extern.jackson.Jacksonized;
@Data
@Builder
@Jacksonized
public class GetRequestParams { public class GetRequestParams {
@JsonProperty("#") @JsonProperty("#")
private String nodeID; private String nodeID;

View File

@ -2,17 +2,19 @@ package io.github.chronosx88.JGUN.models.requests;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import io.github.chronosx88.JGUN.models.BaseMessage; import io.github.chronosx88.JGUN.models.BaseMessage;
import io.github.chronosx88.JGUN.models.MemoryGraph;
import io.github.chronosx88.JGUN.models.Node; import io.github.chronosx88.JGUN.models.Node;
import lombok.Builder; import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.experimental.SuperBuilder;
import lombok.extern.jackson.Jacksonized; import lombok.extern.jackson.Jacksonized;
@Data @Data
@Jacksonized @Jacksonized
@Builder @SuperBuilder
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
public class PutRequest extends BaseMessage { public class PutRequest extends BaseMessage {
@JsonProperty("put") @JsonProperty("put")
private Node[] params; private MemoryGraph graph;
} }

View File

@ -2,6 +2,8 @@ package io.github.chronosx88.JGUN.nodes;
import io.github.chronosx88.JGUN.Dup; import io.github.chronosx88.JGUN.Dup;
import io.github.chronosx88.JGUN.NetworkHandler; import io.github.chronosx88.JGUN.NetworkHandler;
import io.github.chronosx88.JGUN.futures.FutureGet;
import io.github.chronosx88.JGUN.futures.FuturePut;
import io.github.chronosx88.JGUN.storage.Storage; import io.github.chronosx88.JGUN.storage.Storage;
import org.java_websocket.client.WebSocketClient; import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake; import org.java_websocket.handshake.ServerHandshake;
@ -26,7 +28,7 @@ public class GunClient extends WebSocketClient implements Peer {
@Override @Override
public void onMessage(String message) { public void onMessage(String message) {
// TODO handler.handleIncomingMessage(message);
} }
@Override @Override
@ -44,4 +46,19 @@ public class GunClient extends WebSocketClient implements Peer {
public void emit(String data) { public void emit(String data) {
this.send(data); this.send(data);
} }
@Override
public void addPendingPutRequest(FuturePut futurePut) {
this.handler.addPendingPutRequest(futurePut);
}
@Override
public void addPendingGetRequest(FutureGet futureGet) {
this.handler.addPendingGetRequest(futureGet);
}
@Override
public void start() {
this.connect();
}
} }

View File

@ -1,4 +0,0 @@
package io.github.chronosx88.JGUN.nodes;
public class GunNodeBuilder {
}

View File

@ -1,5 +1,12 @@
package io.github.chronosx88.JGUN.nodes; package io.github.chronosx88.JGUN.nodes;
import io.github.chronosx88.JGUN.futures.BaseCompletableFuture;
import io.github.chronosx88.JGUN.futures.FutureGet;
import io.github.chronosx88.JGUN.futures.FuturePut;
public interface Peer { public interface Peer {
void emit(String data); void emit(String data);
void addPendingPutRequest(FuturePut futurePut);
void addPendingGetRequest(FutureGet futureGet);
void start();
} }

View File

@ -1,18 +1,43 @@
package io.github.chronosx88.JGUN.storage; package io.github.chronosx88.JGUN.storage;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import io.github.chronosx88.JGUN.models.DeferredNode;
import io.github.chronosx88.JGUN.models.Node; import io.github.chronosx88.JGUN.models.Node;
import org.checkerframework.checker.index.qual.NonNegative;
import java.util.Collection; import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
public class MemoryStorage extends Storage { public class MemoryStorage extends Storage {
private final Map<String, Node> nodes; private final Map<String, Node> nodes = new ConcurrentHashMap<>();
private final Cache<String, DeferredNode> deferredNodes;
public MemoryStorage() { public MemoryStorage() {
nodes = new LinkedHashMap<>(); deferredNodes = Caffeine.newBuilder().expireAfter(new Expiry<String, DeferredNode>() {
@Override
public long expireAfterCreate(String key, DeferredNode value, long currentTime) {
return value.getDelay(TimeUnit.NANOSECONDS);
}
@Override
public long expireAfterUpdate(String key, DeferredNode value, long currentTime, @NonNegative long currentDuration) {
return Long.MAX_VALUE;
}
@Override
public long expireAfterRead(String key, DeferredNode value, long currentTime, @NonNegative long currentDuration) {
return Long.MAX_VALUE;
}
})
.evictionListener((key, value, cause) -> {
assert value != null;
this.mergeNode(value, System.currentTimeMillis());
}).build();
} }
public Node getNode(String id) { public Node getNode(String id) {
@ -20,9 +45,10 @@ public class MemoryStorage extends Storage {
} }
@Override @Override
void updateNode(Node node) { protected void updateNode(Node node) {
// TODO Node currentNode = nodes.get(node.getMetadata().getNodeID());
throw new UnsupportedOperationException("TODO"); currentNode.values.putAll(node.values);
currentNode.getMetadata().getStates().putAll(node.getMetadata().getStates());
} }
public void addNode(String id, Node incomingNode) { public void addNode(String id, Node incomingNode) {
@ -44,4 +70,9 @@ public class MemoryStorage extends Storage {
public boolean isEmpty() { public boolean isEmpty() {
return nodes.isEmpty(); return nodes.isEmpty();
} }
@Override
protected void putDeferredNode(DeferredNode node) {
deferredNodes.put(node.getMetadata().getNodeID(), node);
}
} }

View File

@ -2,6 +2,7 @@ package io.github.chronosx88.JGUN.storage;
import io.github.chronosx88.JGUN.HAM; import io.github.chronosx88.JGUN.HAM;
import io.github.chronosx88.JGUN.NodeChangeListener; import io.github.chronosx88.JGUN.NodeChangeListener;
import io.github.chronosx88.JGUN.models.DeferredNode;
import io.github.chronosx88.JGUN.models.MemoryGraph; import io.github.chronosx88.JGUN.models.MemoryGraph;
import io.github.chronosx88.JGUN.models.Node; import io.github.chronosx88.JGUN.models.Node;
import io.github.chronosx88.JGUN.models.NodeMetadata; import io.github.chronosx88.JGUN.models.NodeMetadata;
@ -9,23 +10,33 @@ import io.github.chronosx88.JGUN.models.NodeMetadata;
import java.util.*; import java.util.*;
public abstract class Storage { public abstract class Storage {
abstract Node getNode(String id); public abstract Node getNode(String id);
abstract void updateNode(Node node);
abstract void addNode(String id, Node node); protected abstract void updateNode(Node node);
abstract boolean hasNode(String id);
abstract Set<Map.Entry<String, Node>> entries(); public abstract void addNode(String id, Node node);
abstract Collection<Node> nodes();
abstract boolean isEmpty(); public abstract boolean hasNode(String id);
public abstract Set<Map.Entry<String, Node>> entries();
public abstract Collection<Node> nodes();
public abstract boolean isEmpty();
protected abstract void putDeferredNode(DeferredNode node);
private final Map<String, List<NodeChangeListener>> changeListeners = new HashMap<>();
private final Map<String, List<NodeChangeListener.Map>> mapChangeListeners = new HashMap<>();
/** /**
* Merge graph update (usually received from the network) * Merge graph update (usually received from the network)
*
* @param update Graph update * @param update Graph update
* @param changeListeners
* @param forEachListeners
*/ */
public void mergeUpdate(MemoryGraph update, Map<String, NodeChangeListener> changeListeners, Map<String, NodeChangeListener.ForEach> forEachListeners) { public void mergeUpdate(MemoryGraph update) {
long machine = System.currentTimeMillis(); long machine = System.currentTimeMillis();
MemoryGraph diff = new MemoryGraph(); MemoryGraph diff = MemoryGraph.builder().build();
for (Map.Entry<String, Node> entry : update.getNodes().entrySet()) { for (Map.Entry<String, Node> entry : update.getNodes().entrySet()) {
Node node = entry.getValue(); Node node = entry.getValue();
Node diffNode = this.mergeNode(node, machine); Node diffNode = this.mergeNode(node, machine);
@ -43,11 +54,11 @@ public abstract class Storage {
} }
if (changeListeners.containsKey(diffEntry.getKey())) { if (changeListeners.containsKey(diffEntry.getKey())) {
changeListeners.get(diffEntry.getKey()).onChange(diffEntry.getValue()); changeListeners.get(diffEntry.getKey()).forEach((e) -> e.onChange(diffEntry.getValue()));
} }
if (forEachListeners.containsKey(diffEntry.getKey())) { if (mapChangeListeners.containsKey(diffEntry.getKey())) {
for (Map.Entry<String, Object> nodeEntry : changedNode.getValues().entrySet()) { for (Map.Entry<String, Object> nodeEntry : changedNode.getValues().entrySet()) {
forEachListeners.get(nodeEntry.getKey()).onChange(nodeEntry.getKey(), nodeEntry.getValue()); mapChangeListeners.get(nodeEntry.getKey()).forEach((e) -> e.onChange(nodeEntry.getKey(), nodeEntry.getValue()));
} }
} }
} }
@ -56,6 +67,7 @@ public abstract class Storage {
/** /**
* Merge updated node * Merge updated node
*
* @param incomingNode Updated node * @param incomingNode Updated node
* @param machineState Current machine state * @param machineState Current machine state
* @return Node with changes or null if no changes * @return Node with changes or null if no changes
@ -79,7 +91,9 @@ public abstract class Storage {
if (!ham.incoming) { if (!ham.incoming) {
if (ham.defer) { if (ham.defer) {
// TODO handle deferred value DeferredNode deferred = (DeferredNode) incomingNode;
deferred.setDelay(state - machineState);
this.putDeferredNode(deferred);
} }
continue; continue;
} }
@ -97,4 +111,22 @@ public abstract class Storage {
return changedNode; return changedNode;
} }
public void addChangeListener(String nodeID, NodeChangeListener listener) {
this.changeListeners.putIfAbsent(nodeID, new ArrayList<>());
this.changeListeners.get(nodeID).add(listener);
}
public void clearChangeListeners(String nodeID) {
this.changeListeners.remove(nodeID);
}
public void addMapChangeListener(String nodeID, NodeChangeListener.Map listener) {
this.mapChangeListeners.putIfAbsent(nodeID, new ArrayList<>());
this.mapChangeListeners.get(nodeID).add(listener);
}
public void clearMapChangeListeners(String nodeID) {
this.mapChangeListeners.remove(nodeID);
}
} }

View File

@ -0,0 +1,63 @@
package io.github.chronosx88.JGUN;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import org.junit.jupiter.api.Test;
class GraphNodeBuilderTest {
@Test
void Test_sampleGraph1() {
var objectMapper = new ObjectMapper();
objectMapper.registerModule(new Jdk8Module());
var graph = new GraphNodeBuilder()
.add("firstName", "John")
.add("lastName", "Smith")
.add("age", 25)
.add("address", new GraphNodeBuilder()
.add("streetAddress", "21 2nd Street")
.add("city", "New York")
.add("state", "NY")
.add("postalCode", "10021"))
.add("phoneNumber", new NodeArrayBuilder()
.add(new GraphNodeBuilder()
.add("type", "home")
.add("number", "212 555-1234"))
.add(new GraphNodeBuilder()
.add("type", "fax")
.add("number", "646 555-4567")))
.build();
String graphJSON = null;
try {
graphJSON = objectMapper.writeValueAsString(graph);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
System.out.println(graphJSON);
}
@Test
void Test_sampleGraph2() {
var objectMapper = new ObjectMapper();
objectMapper.registerModule(new Jdk8Module());
var graph = new GraphNodeBuilder()
.add("a", new NodeArrayBuilder()
.add(new GraphNodeBuilder()
.add("b", new GraphNodeBuilder()
.add("c", true)))
.add(0))
.build();
String graphJSON = null;
try {
graphJSON = objectMapper.writeValueAsString(graph);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
System.out.println(graphJSON);
}
}