Merge pull request #6 from ashatch/executors-and-tidyup

use Executor services for thread pooling/management; some tidy up
This commit is contained in:
ChronosX88 2019-10-07 15:08:51 +04:00 committed by GitHub
commit f853d8539e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 24 additions and 17 deletions

View File

@ -12,6 +12,8 @@ import org.json.JSONObject;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class Dispatcher {
private final Map<String, BaseCompletableFuture<?>> pendingFutures = new ConcurrentHashMap<>();
@ -20,6 +22,7 @@ public class Dispatcher {
private final Peer peer;
private final StorageBackend graphStorage;
private final Dup dup;
private final Executor executorService = Executors.newCachedThreadPool();
public Dispatcher(StorageBackend graphStorage, Peer peer, Dup dup) {
this.graphStorage = graphStorage;
@ -83,17 +86,17 @@ public class Dispatcher {
}
public void sendPutRequest(String messageID, JSONObject data) {
new Thread(() -> {
executorService.execute(() -> {
InMemoryGraph graph = Utils.prepareDataForPut(data);
peer.emit(Utils.formatPutRequest(messageID, graph.toJSONObject()).toString());
}).start();
});
}
public void sendGetRequest(String messageID, String key, String field) {
new Thread(() -> {
executorService.execute(() -> {
JSONObject jsonGet = Utils.formatGetRequest(messageID, key, field);
peer.emit(jsonGet.toString());
}).start();
});
}
public void addChangeListener(String soul, NodeChangeListener listener) {

View File

@ -1,13 +1,11 @@
package io.github.chronosx88.JGUN;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
public class Dup {
private static char[] randomSeed = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray();
private static Random random = new Random(System.currentTimeMillis());
private Map<String, Long> s = new ConcurrentHashMap<>();
private DupOpt opt = new DupOpt();
private Thread to = null;

View File

@ -9,12 +9,14 @@ import org.json.JSONObject;
import java.net.Inet4Address;
import java.net.UnknownHostException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class MainClientServer {
public static void main(String[] args) {
GunSuperPeer gunSuperNode = new GunSuperPeer(21334, new InMemoryGraph());
gunSuperNode.start();
new Thread(() -> {
Runnable task = () -> {
Gun gun = null;
try {
gun = new Gun(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), 21334, new InMemoryGraph());
@ -45,7 +47,9 @@ public class MainClientServer {
System.out.println("Deleting an item random/dVFtzE9CL");
gun.get("random").get("dVFtzE9CL").put(null).await();
gun.get("random").put(new JSONObject().put("hello", "world")).await();
}).start();
};
Executor executorService = Executors.newSingleThreadExecutor();
executorService.execute(task);
}
}

View File

@ -1,9 +1,9 @@
package io.github.chronosx88.JGUN.futures;
import java9.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java9.util.concurrent.CompletableFuture;
public class BaseCompletableFuture<T> extends CompletableFuture<T> {
private final String futureID;

View File

@ -2,8 +2,8 @@ package io.github.chronosx88.JGUN.nodes;
import io.github.chronosx88.JGUN.Dispatcher;
import io.github.chronosx88.JGUN.Dup;
import io.github.chronosx88.JGUN.PathRef;
import io.github.chronosx88.JGUN.storageBackends.StorageBackend;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import org.json.JSONObject;
@ -14,12 +14,10 @@ import java.net.URISyntaxException;
public class GunClient extends WebSocketClient implements Peer {
private Dup dup = new Dup();
private final StorageBackend storage;
private final Dispatcher dispatcher;
public GunClient(InetAddress address, int port, StorageBackend storage) throws URISyntaxException {
super(new URI("ws://" + address.getHostAddress() + ":" + port));
this.storage = storage;
this.dispatcher = new Dispatcher(storage, this, dup);
}

View File

@ -3,6 +3,7 @@ package io.github.chronosx88.JGUN.nodes;
import io.github.chronosx88.JGUN.Dispatcher;
import io.github.chronosx88.JGUN.Dup;
import io.github.chronosx88.JGUN.storageBackends.StorageBackend;
import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer;
@ -12,14 +13,12 @@ import java.net.InetSocketAddress;
public class GunSuperPeer extends WebSocketServer implements Peer {
private Dup dup = new Dup();
private StorageBackend graph;
private Dispatcher dispatcher;
public GunSuperPeer(int port, StorageBackend storageBackend) {
super(new InetSocketAddress(port));
setReuseAddr(true);
dispatcher = new Dispatcher(storageBackend, this, dup);
this.graph = storageBackend;
}
@Override

View File

@ -1,9 +1,14 @@
package io.github.chronosx88.JGUN.storageBackends;
import io.github.chronosx88.JGUN.Node;
import org.json.JSONObject;
import java.util.*;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
public class InMemoryGraph implements StorageBackend {