[GunSuperPeer] fix: Fix NPE in onClose callback

This commit is contained in:
ChronosX88 2019-10-01 23:37:59 +04:00
parent b980bbd2b3
commit 3a9b86bac0
2 changed files with 57 additions and 23 deletions

View File

@ -16,7 +16,7 @@ public class HAM {
public boolean state = false; public boolean state = false;
} }
public static HAMResult ham(long machineState, long incomingState, long currentState, Object incomingValue, Object currentValue) { public static HAMResult ham(long machineState, long incomingState, long currentState, Object incomingValue, Object currentValue) throws IllegalArgumentException {
HAMResult result = new HAMResult(); HAMResult result = new HAMResult();
if(machineState < incomingState) { if(machineState < incomingState) {
@ -55,7 +55,7 @@ public class HAM {
throw new IllegalArgumentException("Invalid CRDT Data: "+ incomingValue +" to "+ currentValue +" at "+ incomingState +" to "+ currentState +"!"); throw new IllegalArgumentException("Invalid CRDT Data: "+ incomingValue +" to "+ currentValue +" at "+ incomingState +" to "+ currentState +"!");
} }
public static InMemoryGraph mix(InMemoryGraph change, StorageBackend data) { public static boolean mix(InMemoryGraph change, StorageBackend data, Map<String, NodeChangeListener> changeListeners, Map<String, NodeChangeListener.ForEach> forEachListeners) {
long machine = System.currentTimeMillis(); long machine = System.currentTimeMillis();
InMemoryGraph diff = null; InMemoryGraph diff = null;
for(Map.Entry<String, Node> entry : change.entries()) { for(Map.Entry<String, Node> entry : change.entries()) {
@ -75,18 +75,25 @@ public class HAM {
} }
known = data.getNode(node.soul).values.opt(key) == null ? 0 : data.getNode(node.soul).values.opt(key); known = data.getNode(node.soul).values.opt(key) == null ? 0 : data.getNode(node.soul).values.opt(key);
} }
HAMResult ham = null;
HAMResult ham = ham(machine, state, was, value, known); try {
if(!ham.incoming) { ham = ham(machine, state, was, value, known);
if(ham.defer) { } catch (IllegalArgumentException e) {
System.out.println("DEFER: " + key + " " + value);
// Hack for accessing value in lambda without making the variable final
StorageBackend[] graph = new StorageBackend[] {data};
Utils.setTimeout(() -> mix(node, graph[0]), (int) (state - System.currentTimeMillis()));
}
continue; continue;
} }
if(ham != null) {
if(!ham.incoming) {
if(ham.defer) {
System.out.println("DEFER: " + key + " " + value);
// Hack for accessing value in lambda without making the variable final
StorageBackend[] graph = new StorageBackend[] {data};
Utils.setTimeout(() -> mix(node, graph[0], changeListeners, forEachListeners), (int) (state - machine));
}
continue;
}
}
if(diff == null) { if(diff == null) {
diff = new InMemoryGraph(); diff = new InMemoryGraph();
} }
@ -99,18 +106,30 @@ public class HAM {
data.addNode(node.soul, Utils.newNode(node.soul, new JSONObject())); data.addNode(node.soul, Utils.newNode(node.soul, new JSONObject()));
} }
data.getNode(node.soul).values.put(key, value); Node tmp = data.getNode(node.soul);
tmp.values.put(key, value);
tmp.states.put(key, state);
data.addNode(node.soul, tmp);
diff.getNode(node.soul).values.put(key, value); diff.getNode(node.soul).values.put(key, value);
diff.getNode(node.soul).states.put(key, state); diff.getNode(node.soul).states.put(key, state);
data.getNode(node.soul).states.put(key, state);
} }
} }
if(diff != null) {
return diff; for(Map.Entry<String, Node> entry : diff.entries()) {
if(changeListeners.containsKey(entry.getKey())) {
changeListeners.get(entry.getKey()).onChange(entry.getValue().toUserJSONObject());
}
if(forEachListeners.containsKey(entry.getKey())) {
for(Map.Entry<String, Object> jsonEntry : entry.getValue().values.toMap().entrySet()) {
forEachListeners.get(entry.getKey()).onChange(jsonEntry.getKey(), jsonEntry.getValue());
}
}
}
}
return true;
} }
public static InMemoryGraph mix(Node incomingNode, StorageBackend data) { public static boolean mix(Node incomingNode, StorageBackend data, Map<String, NodeChangeListener> changeListeners, Map<String, NodeChangeListener.ForEach> forEachListeners) {
long machine = System.currentTimeMillis(); long machine = System.currentTimeMillis();
InMemoryGraph diff = null; InMemoryGraph diff = null;
@ -136,7 +155,7 @@ public class HAM {
System.out.println("DEFER: " + key + " " + value); System.out.println("DEFER: " + key + " " + value);
// Hack for accessing value in lambda without making the variable final // Hack for accessing value in lambda without making the variable final
StorageBackend[] graph = new StorageBackend[] {data}; StorageBackend[] graph = new StorageBackend[] {data};
Utils.setTimeout(() -> mix(incomingNode, graph[0]), (int) (state - System.currentTimeMillis())); Utils.setTimeout(() -> mix(incomingNode, graph[0], changeListeners, forEachListeners), (int) (state - machine));
} }
continue; continue;
} }
@ -153,12 +172,25 @@ public class HAM {
data.addNode(incomingNode.soul, Utils.newNode(incomingNode.soul, new JSONObject())); data.addNode(incomingNode.soul, Utils.newNode(incomingNode.soul, new JSONObject()));
} }
data.getNode(incomingNode.soul).values.put(key, value); Node tmp = data.getNode(incomingNode.soul);
tmp.values.put(key, value);
tmp.states.put(key, state);
data.addNode(incomingNode.soul, tmp);
diff.getNode(incomingNode.soul).values.put(key, value); diff.getNode(incomingNode.soul).values.put(key, value);
diff.getNode(incomingNode.soul).states.put(key, state); diff.getNode(incomingNode.soul).states.put(key, state);
data.getNode(incomingNode.soul).states.put(key, state);
} }
return diff; if(diff != null) {
for(Map.Entry<String, Node> entry : diff.entries()) {
if(changeListeners.containsKey(entry.getKey())) {
changeListeners.get(entry.getKey()).onChange(entry.getValue().toUserJSONObject());
}
if(forEachListeners.containsKey(entry.getKey())) {
for(Map.Entry<String, Object> jsonEntry : entry.getValue().values.toMap().entrySet()) {
forEachListeners.get(entry.getKey()).onChange(jsonEntry.getKey(), jsonEntry.getValue());
}
}
}
}
return true;
} }
} }

View File

@ -29,7 +29,9 @@ public class GunSuperPeer extends WebSocketServer implements Peer {
@Override @Override
public void onClose(WebSocket conn, int code, String reason, boolean remote) { public void onClose(WebSocket conn, int code, String reason, boolean remote) {
System.out.println("Peer " + conn.getRemoteSocketAddress().toString() + " closed the connection for reason (code): " + reason + " (" + code + ")"); if(conn != null) {
System.out.println("Peer " + conn.getRemoteSocketAddress().toString() + " closed the connection for reason (code): " + reason + " (" + code + ")");
}
} }
@Override @Override