Compare commits

...

3 Commits

Author SHA1 Message Date
ChronosX88
08912a6f67 Implement referencing other nodes in graph 2023-11-17 22:23:38 +03:00
ChronosX88
bcda85a8b1 Make nodes ignoring their own requests, fix putting data to the graph 2023-11-17 22:23:38 +03:00
ChronosX88
a551581a99 Implement custom deserializer for NetworkMessage 2023-11-17 22:23:38 +03:00
21 changed files with 270 additions and 118 deletions

View File

@ -9,8 +9,8 @@ public class Gun {
private final StorageManager storageManager;
private final NetworkManager networkManager;
public Gun(Storage storage, Peer peer) {
this.networkManager = new NetworkManager(peer);
public Gun(Storage storage, Peer peer) throws InterruptedException {
this.networkManager = new NetworkManager(peer, peer.getNetworkHandler());
this.storageManager = new StorageManager(storage, this.networkManager);
this.networkManager.start();
}

View File

@ -12,10 +12,7 @@ import io.github.chronosx88.JGUN.network.NetworkManager;
import io.github.chronosx88.JGUN.storage.StorageManager;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.*;
import java.util.stream.Stream;
public class PathReference {
@ -23,12 +20,17 @@ public class PathReference {
private final NetworkManager networkManager;
private final StorageManager storageManager;
private final Executor executorService = Executors.newCachedThreadPool();
public PathReference(NetworkManager networkManager, StorageManager storageManager) {
this.networkManager = networkManager;
this.storageManager = storageManager;
}
public String[] getPath() {
return path.toArray(new String[0]);
}
public PathReference get(String key) {
path.add(key);
return this;
@ -85,36 +87,64 @@ public class PathReference {
.build());
nodeId = newNodeId;
}
graph.nodes.get(NodeBuilder.ROOT_NODE).getMetadata().setNodeID(newNodeId);
graph.nodes.put(newNodeId, graph.nodes.get(NodeBuilder.ROOT_NODE));
graph.nodes.remove(NodeBuilder.ROOT_NODE);
} else {
newNodeId = UUID.randomUUID().toString();
if (pathData.size() > 1) {
String parentNodeId = pathData.get(pathData.size()-2);
graph.putNodes(parentNodeId, Node.builder()
.metadata(NodeMetadata.builder()
.nodeID(parentNodeId)
.states(Map.of(path.get(path.size()-1), System.currentTimeMillis()))
.build())
.values(Map.of(path.get(path.size()-1), NodeLinkValue.builder()
.link(newNodeId)
.build()))
.build());
} else {
newNodeId = pathData.get(0);
}
// merge updated node under parent ID
String parentNodeId = pathData.get(pathData.size()-1);
graph.nodes.get(NodeBuilder.ROOT_NODE).getMetadata().setNodeID(parentNodeId);
graph.nodes.put(parentNodeId, graph.nodes.get(NodeBuilder.ROOT_NODE));
}
graph.nodes.get(NodeBuilder.ROOT_NODE).getMetadata().setNodeID(newNodeId);
graph.nodes.put(newNodeId, graph.nodes.get(NodeBuilder.ROOT_NODE));
graph.nodes.remove(NodeBuilder.ROOT_NODE);
return this.storageManager.putData(graph);
});
}
public void on(NodeChangeListener changeListener) {
storageManager.addChangeListener(String.join("/", path), changeListener);
executorService.execute(() -> {
List<String> pathData;
try {
pathData = storageManager.getPathData(path.toArray(new String[0]));
} catch (TimeoutException | ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
storageManager.addChangeListener(pathData.get(pathData.size()-1), changeListener);
});
}
public void map(NodeChangeListener.Map forEachListener) {
storageManager.addMapChangeListener(String.join("/", path), forEachListener);
public void map(NodeChangeListener.Map mapListener) {
executorService.execute(() -> {
List<String> pathData;
try {
pathData = storageManager.getPathData(path.toArray(new String[0]));
} catch (TimeoutException | ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
storageManager.addMapChangeListener(pathData.get(pathData.size()-1), mapListener);
});
}
public CompletableFuture<Result> put(PathReference nodeReference) {
String[] pathToAnotherNode = nodeReference.getPath();
return CompletableFuture.supplyAsync(() -> {
try {
return storageManager.getPathData(pathToAnotherNode);
} catch (TimeoutException | ExecutionException | InterruptedException e) {
throw new CompletionException(e);
}
}).thenComposeAsync(pathData -> {
if (pathData.size() < pathToAnotherNode.length) {
return CompletableFuture.failedFuture(new IllegalArgumentException("target node not found"));
}
String nodeId = pathData.get(pathData.size()-1);
MemoryGraph graph = new NodeBuilder()
.add(path.get(path.size() - 1), NodeLinkValue.builder()
.link(nodeId)
.build())
.build();
if (path.size() > 1) this.path.remove(path.size()-1);
return this.put(graph);
});
}
}

View File

@ -91,6 +91,10 @@ public class NodeBuilder {
return this;
}
public NodeBuilder add(String name, NodeLinkValue link) {
return addScalar(name, link);
}
public MemoryGraph build() {
return this.graph;
}

View File

@ -18,22 +18,22 @@ public class MainClient {
Storage storage = new MemoryStorage();
NetworkNode peer = new NetworkNode(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 5054, storage);
Gun gun = new Gun(storage, peer);
Result result = gun.get("person").put(new NodeBuilder()
.add("firstName", "John")
.add("lastName", "Smith")
.add("age", 25)
.add("address", new NodeBuilder()
.add("streetAddress", "21 2nd Street")
.add("city", "New York")
.add("state", "NY")
.add("postalCode", "10021"))
.add("phoneNumber", new ArrayBuilder()
.add(new NodeBuilder()
.add("type", "home")
.add("number", "212 555-1234"))
.add(new NodeBuilder()
.add("type", "fax")
.add("number", "646 555-4567")))
.add("firstName", "Test")
.build()).get();
System.out.println(result);
result = gun.get("person").get("address").put(new NodeBuilder()
.add("city", "Dallas")
.build()).get();
System.out.println(result);
result = gun.get("person").get("homeAddress").put(gun.get("person").get("address")).get();
System.out.println(result);
result = gun.get("person").get("homeAddress").put(new NodeBuilder()
.add("city", "New YORK CITY")
.build()).get();
System.out.println(result);
}

View File

@ -1,11 +1,60 @@
package io.github.chronosx88.JGUN.examples;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.github.chronosx88.JGUN.api.Gun;
import io.github.chronosx88.JGUN.api.NodeChangeListener;
import io.github.chronosx88.JGUN.api.graph.ArrayBuilder;
import io.github.chronosx88.JGUN.api.graph.NodeBuilder;
import io.github.chronosx88.JGUN.models.Result;
import io.github.chronosx88.JGUN.models.graph.Node;
import io.github.chronosx88.JGUN.network.GatewayNetworkNode;
import io.github.chronosx88.JGUN.network.NetworkNode;
import io.github.chronosx88.JGUN.storage.MemoryStorage;
import io.github.chronosx88.JGUN.storage.Storage;
import java.net.Inet4Address;
import java.util.concurrent.ExecutionException;
public class MainServer {
public static void main(String[] args) {
GatewayNetworkNode gunSuperNode = new GatewayNetworkNode(5054, new MemoryStorage());
gunSuperNode.start();
public static void main(String[] args) throws ExecutionException, InterruptedException {
Storage storage = new MemoryStorage();
GatewayNetworkNode peer = new GatewayNetworkNode(5054, storage);
Gun gun = new Gun(storage, peer);
Result result = gun.get("person").put(new NodeBuilder()
.add("firstName", "John")
.add("lastName", "Smith")
.add("age", 25)
.add("address", new NodeBuilder()
.add("streetAddress", "21 2nd Street")
.add("city", "New York")
.add("state", "NY")
.add("postalCode", "10021"))
.add("phoneNumber", new ArrayBuilder()
.add(new NodeBuilder()
.add("type", "home")
.add("number", "212 555-1234"))
.add(new NodeBuilder()
.add("type", "fax")
.add("number", "646 555-4567")))
.build()).get();
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new Jdk8Module());
gun.get("person").on(node -> {
try {
System.out.println(mapper.writeValueAsString(node));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
});
gun.get("person").get("address").on(node -> {
try {
System.out.println(mapper.writeValueAsString(node));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
});
System.out.println(result.isOk());
}
}

View File

@ -12,15 +12,9 @@ import lombok.Data;
import lombok.experimental.SuperBuilder;
@Data
@JsonTypeInfo(use = JsonTypeInfo.Id.DEDUCTION)
@JsonSubTypes({
@JsonSubTypes.Type(GetRequest.class),
@JsonSubTypes.Type(PutRequest.class),
@JsonSubTypes.Type(Ack.class),
@JsonSubTypes.Type(GetAck.class)
})
@JsonDeserialize(using = NetworkMessageDeserializer.class)
@SuperBuilder
public abstract class BaseMessage {
public abstract class NetworkMessage {
@JsonProperty("#")
private String id;
}

View File

@ -0,0 +1,37 @@
package io.github.chronosx88.JGUN.models;
import com.fasterxml.jackson.core.JacksonException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;
import io.github.chronosx88.JGUN.models.acks.Ack;
import io.github.chronosx88.JGUN.models.acks.GetAck;
import io.github.chronosx88.JGUN.models.requests.GetRequest;
import io.github.chronosx88.JGUN.models.requests.PutRequest;
import java.io.IOException;
public class NetworkMessageDeserializer extends JsonDeserializer<NetworkMessage> {
@Override
public NetworkMessage deserialize(JsonParser p, DeserializationContext ctxt) throws IOException, JacksonException {
JsonNode node = p.readValueAsTree();
if (node.has("#")) {
// parsing ack
if (node.has("@")) {
if (node.has("put")) {
return p.getCodec().treeToValue(node, GetAck.class);
}
return p.getCodec().treeToValue(node, Ack.class);
}
if (node.has("get")) {
return p.getCodec().treeToValue(node, GetRequest.class);
} else if (node.has("put")) {
return p.getCodec().treeToValue(node, PutRequest.class);
}
}
throw new IllegalArgumentException("invalid message received");
}
}

View File

@ -1,7 +1,7 @@
package io.github.chronosx88.JGUN.models.acks;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.github.chronosx88.JGUN.models.BaseMessage;
import io.github.chronosx88.JGUN.models.NetworkMessage;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.SuperBuilder;
@ -11,7 +11,7 @@ import lombok.extern.jackson.Jacksonized;
@SuperBuilder
@EqualsAndHashCode(callSuper = true)
@Jacksonized
public class Ack extends BaseMessage {
public class Ack extends NetworkMessage {
@JsonProperty("@")
private String replyTo;
private boolean ok;

View File

@ -1,7 +1,7 @@
package io.github.chronosx88.JGUN.models.acks;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.github.chronosx88.JGUN.models.BaseMessage;
import io.github.chronosx88.JGUN.models.NetworkMessage;
import io.github.chronosx88.JGUN.models.graph.MemoryGraph;
import lombok.Data;
import lombok.EqualsAndHashCode;
@ -12,7 +12,7 @@ import lombok.extern.jackson.Jacksonized;
@SuperBuilder
@EqualsAndHashCode(callSuper = true)
@Jacksonized
public class GetAck extends BaseMessage {
public class GetAck extends NetworkMessage {
@JsonProperty("put")
private MemoryGraph data;
@JsonProperty("@")

View File

@ -1,8 +1,7 @@
package io.github.chronosx88.JGUN.models.requests;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.github.chronosx88.JGUN.models.BaseMessage;
import lombok.Builder;
import io.github.chronosx88.JGUN.models.NetworkMessage;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.SuperBuilder;
@ -12,7 +11,7 @@ import lombok.extern.jackson.Jacksonized;
@SuperBuilder
@EqualsAndHashCode(callSuper = true)
@Jacksonized
public class GetRequest extends BaseMessage implements Request {
public class GetRequest extends NetworkMessage implements Request {
@JsonProperty("get")
private GetRequestParams params;
}

View File

@ -1,7 +1,7 @@
package io.github.chronosx88.JGUN.models.requests;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.github.chronosx88.JGUN.models.BaseMessage;
import io.github.chronosx88.JGUN.models.NetworkMessage;
import io.github.chronosx88.JGUN.models.graph.MemoryGraph;
import lombok.Data;
import lombok.EqualsAndHashCode;
@ -12,7 +12,7 @@ import lombok.extern.jackson.Jacksonized;
@Jacksonized
@SuperBuilder
@EqualsAndHashCode(callSuper = true)
public class PutRequest extends BaseMessage implements Request {
public class PutRequest extends NetworkMessage implements Request {
@JsonProperty("put")
private MemoryGraph graph;
}

View File

@ -1,3 +1,5 @@
package io.github.chronosx88.JGUN.models.requests;
public interface Request {}
public interface Request {
String getId();
}

View File

@ -16,11 +16,11 @@ public class Dup {
.build();
}
private void track(String id) {
public void track(String id) {
cache.put(id, System.currentTimeMillis());
}
public boolean isDuplicated(String id) {
public boolean checkDuplicated(String id) {
Long timestamp = null;
try {
timestamp = cache.getIfPresent(id);

View File

@ -69,4 +69,14 @@ public class GatewayNetworkNode extends WebSocketServer implements Peer {
public int getTimeout() {
return 60;
}
@Override
public int connectedPeerCount() {
return this.getConnections().size();
}
@Override
public NetworkHandler getNetworkHandler() {
return handler;
}
}

View File

@ -7,7 +7,7 @@ import io.github.chronosx88.JGUN.api.FutureGet;
import io.github.chronosx88.JGUN.api.FuturePut;
import io.github.chronosx88.JGUN.models.GetResult;
import io.github.chronosx88.JGUN.models.Result;
import io.github.chronosx88.JGUN.models.BaseMessage;
import io.github.chronosx88.JGUN.models.NetworkMessage;
import io.github.chronosx88.JGUN.models.graph.MemoryGraph;
import io.github.chronosx88.JGUN.models.graph.Node;
import io.github.chronosx88.JGUN.models.graph.NodeMetadata;
@ -17,6 +17,7 @@ import io.github.chronosx88.JGUN.models.graph.NodeValue;
import io.github.chronosx88.JGUN.models.requests.GetRequest;
import io.github.chronosx88.JGUN.models.requests.PutRequest;
import io.github.chronosx88.JGUN.storage.Storage;
import lombok.Getter;
import java.util.Map;
import java.util.Objects;
@ -30,6 +31,8 @@ public class NetworkHandler {
private final Peer peer;
private final Storage storage;
@Getter
private final Dup dup;
private final Executor executorService = Executors.newCachedThreadPool();
private final ObjectMapper objectMapper;
@ -52,21 +55,21 @@ public class NetworkHandler {
}
public void handleIncomingMessage(String message) {
BaseMessage parsedMessage;
NetworkMessage parsedMessage;
try {
parsedMessage = objectMapper.readValue(message, BaseMessage.class);
parsedMessage = objectMapper.readValue(message, NetworkMessage.class);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
if (dup.isDuplicated(parsedMessage.getId())) {
if (dup.checkDuplicated(parsedMessage.getId())) {
// TODO log
return;
}
final BaseMessage msg = parsedMessage;
final NetworkMessage msg = parsedMessage;
executorService.execute(() -> {
BaseMessage response = null;
NetworkMessage response = null;
if (msg instanceof GetRequest) {
response = handleGet((GetRequest) msg);
} else if (msg instanceof PutRequest) {
@ -78,6 +81,7 @@ public class NetworkHandler {
handleGetAck(ack.getData(), ack);
}
if (Objects.nonNull(response)) {
this.dup.track(response.getId());
String respString;
try {
respString = objectMapper.writeValueAsString(response);

View File

@ -5,6 +5,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.github.chronosx88.JGUN.api.FutureGet;
import io.github.chronosx88.JGUN.api.FuturePut;
import io.github.chronosx88.JGUN.models.GetResult;
import io.github.chronosx88.JGUN.models.Result;
import io.github.chronosx88.JGUN.models.graph.MemoryGraph;
import io.github.chronosx88.JGUN.models.requests.GetRequest;
import io.github.chronosx88.JGUN.models.requests.GetRequestParams;
@ -19,6 +21,7 @@ public class NetworkManager {
private final ObjectMapper objectMapper;
private final Peer peer;
private final NetworkHandler networkHandler;
private final Executor executorService = Executors.newCachedThreadPool();
@ -28,15 +31,16 @@ public class NetworkManager {
@Getter
private final int timeout;
public NetworkManager(Peer peer) {
public NetworkManager(Peer peer, NetworkHandler networkHandler) {
this.networkHandler = networkHandler;
objectMapper = new ObjectMapper();
objectMapper.registerModule(new Jdk8Module());
this.peer = peer;
this.timeout = peer.getTimeout();
}
public void start() {
executorService.execute(this.peer::start);
public void start() throws InterruptedException {
this.peer.start();
}
private <T extends Request> void sendRequest(T request) {
@ -46,28 +50,36 @@ public class NetworkManager {
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
this.networkHandler.getDup().track(request.getId());
peer.emit(encodedRequest);
}
public FuturePut sendPutRequest(MemoryGraph putData) {
String id = Dup.random();
executorService.execute(() -> this.sendRequest(PutRequest.builder()
.id(id)
.graph(putData)
.build()));
var requestFuture = new FuturePut(id);
peer.addPendingPutRequest(requestFuture);
if (this.peer.connectedPeerCount() > 0) {
executorService.execute(() -> this.sendRequest(PutRequest.builder()
.id(id)
.graph(putData)
.build()));
peer.addPendingPutRequest(requestFuture);
}
requestFuture.complete(Result.builder().ok(true).build());
return requestFuture;
}
public FutureGet sendGetRequest(GetRequestParams params) {
String id = Dup.random();
executorService.execute(() -> this.sendRequest(GetRequest.builder()
.id(id)
.params(params)
.build()));
var requestFuture = new FutureGet(id, params);
peer.addPendingGetRequest(requestFuture);
if (this.peer.connectedPeerCount() > 0) {
executorService.execute(() -> this.sendRequest(GetRequest.builder()
.id(id)
.params(params)
.build()));
peer.addPendingGetRequest(requestFuture);
} else {
requestFuture.complete(GetResult.builder().data(null).build());
}
return requestFuture;
}
}

View File

@ -21,7 +21,7 @@ public class NetworkNode extends WebSocketClient implements Peer {
@Override
public void onOpen(ServerHandshake handshakeData) {
System.out.println("# Connection with SuperNode open. Status: " + handshakeData.getHttpStatus());
System.out.println("# Connection with gateway node is open. Status: " + handshakeData.getHttpStatus());
}
@Override
@ -56,12 +56,23 @@ public class NetworkNode extends WebSocketClient implements Peer {
}
@Override
public void start() {
this.connect();
public void start() throws InterruptedException {
this.connectBlocking();
}
@Override
public int getTimeout() {
return 60;
}
@Override
public int connectedPeerCount() {
if (this.isOpen()) return 1;
return 0;
}
@Override
public NetworkHandler getNetworkHandler() {
return handler;
}
}

View File

@ -7,6 +7,8 @@ public interface Peer {
void emit(String data);
void addPendingPutRequest(FuturePut futurePut);
void addPendingGetRequest(FutureGet futureGet);
void start();
void start() throws InterruptedException;
int getTimeout();
int connectedPeerCount();
NetworkHandler getNetworkHandler();
}

View File

@ -5,6 +5,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import io.github.chronosx88.JGUN.models.graph.DeferredNode;
import io.github.chronosx88.JGUN.models.graph.Node;
import io.github.chronosx88.JGUN.models.graph.NodeMetadata;
import io.github.chronosx88.JGUN.models.graph.NodeValue;
import org.checkerframework.checker.index.qual.NonNegative;
@ -46,21 +47,23 @@ public class MemoryStorage extends Storage {
if (node != null && field != null) {
NodeValue requestedField = node.getValues().get(field);
if (requestedField != null) {
Long requestedFieldState = node.getMetadata().getStates().get(field);
node.getValues().clear();
node.getMetadata().getStates().clear();
node.getValues().put(field, requestedField);
node.getMetadata().getStates().put(field, requestedFieldState);
node = Node.builder()
.metadata(NodeMetadata.builder()
.nodeID(node.getMetadata().getNodeID())
.states(Map.of(field, node.getMetadata().getStates().get(field)))
.build())
.values(Map.of(field, requestedField))
.build();
}
}
return node;
}
@Override
protected void updateNode(Node node) {
Node currentNode = nodes.get(node.getMetadata().getNodeID());
currentNode.values.putAll(node.values);
currentNode.getMetadata().getStates().putAll(node.getMetadata().getStates());
protected void updateNode(Node newNode) {
Node currentNode = nodes.get(newNode.getMetadata().getNodeID());
currentNode.values.putAll(newNode.values);
currentNode.getMetadata().getStates().putAll(newNode.getMetadata().getStates());
}
public void addNode(String id, Node incomingNode) {

View File

@ -95,7 +95,7 @@ public abstract class Storage {
continue;
}
if (Objects.isNull(changedNode)) {
if (changedNode == null) {
changedNode = Node.builder()
.metadata(NodeMetadata.builder()
.nodeID(incomingNode.getMetadata().getNodeID())

View File

@ -40,7 +40,7 @@ public class StorageManager {
}
public List<String> getPathData(String[] path) throws TimeoutException, ExecutionException, InterruptedException {
List<String> nodeIds = new ArrayList<>(List.of(path[0]));
List<String> nodeIds = new ArrayList<>();
String nodeId = path[0];
for (int i = 0; i < path.length; i++) {
String field = null;
@ -49,35 +49,30 @@ public class StorageManager {
}
Node node = storage.getNode(nodeId, field);
if (node != null) {
if (field != null) {
if (node.values.containsKey(field) && node.values.get(field).getValueType() == NodeValue.ValueType.LINK) {
nodeId = ((NodeLinkValue) node.values.get(field)).getLink();
nodeIds.add(nodeId);
continue;
}
} else {
if (field == null) {
nodeIds.add(nodeId);
break;
}
if (node.values.containsKey(field) && node.values.get(field).getValueType() == NodeValue.ValueType.LINK) {
nodeId = ((NodeLinkValue) node.values.get(field)).getLink();
nodeIds.add(nodeId);
continue;
}
}
// proceeds to request from the network
var future = this.networkManager.sendGetRequest(GetRequestParams.builder()
.nodeId(nodeId)
.field(field)
.build());
var result = future.get(networkManager.getTimeout(), TimeUnit.SECONDS);
if (result.getData() != null) {
nodeIds.add(result.getData().getMetadata().getNodeID());
if (field != null) {
if (result.getData().values.containsKey(field) && result.getData().values.get(field).getValueType() == NodeValue.ValueType.LINK) {
nodeId = ((NodeLinkValue) result.getData().values.get(field)).getLink();
nodeIds.add(nodeId);
}
} else {
break;
}
} else {
if (result.getData() == null || field == null) {
nodeIds.add(nodeId);
break;
}
if (result.getData().values.containsKey(field) && result.getData().values.get(field).getValueType() == NodeValue.ValueType.LINK) {
nodeId = ((NodeLinkValue) result.getData().values.get(field)).getLink();
nodeIds.add(nodeId);
}
}
return nodeIds;
}