diff --git a/app/libs/FreePastry-2.1.jar b/app/libs/FreePastry-2.1.jar
new file mode 100644
index 0000000..226a779
Binary files /dev/null and b/app/libs/FreePastry-2.1.jar differ
diff --git a/app/libs/trove.jar b/app/libs/trove.jar
deleted file mode 100644
index ac62eb3..0000000
Binary files a/app/libs/trove.jar and /dev/null differ
diff --git a/app/src/main/java/io/github/chronosx88/influence/helpers/AppHelper.java b/app/src/main/java/io/github/chronosx88/influence/helpers/AppHelper.java
index ab6797f..7b42644 100644
--- a/app/src/main/java/io/github/chronosx88/influence/helpers/AppHelper.java
+++ b/app/src/main/java/io/github/chronosx88/influence/helpers/AppHelper.java
@@ -13,7 +13,10 @@ import com.instacart.library.truetime.TrueTime;
import java.io.IOException;
+import io.github.chronosx88.influence.notificationSystem.NotificationSystem;
import io.github.chronosx88.influence.observable.MainObservable;
+import rice.environment.Environment;
+import rice.pastry.PastryNode;
/**
* Extended Application class which designed for centralized getting various objects from anywhere in the application.
@@ -25,9 +28,11 @@ public class AppHelper extends MultiDexApplication {
private static String peerID;
private static PeerDHT peerDHT;
private static RoomHelper chatDB;
- private static NetworkHandler networkHandler;
private static String username = "";
private static SharedPreferences preferences;
+ private static PastryNode pastryNode;
+ private static Environment pastryEnvironment;
+ private static NotificationSystem notificationSystem;
@Override
public void onCreate() {
@@ -67,9 +72,27 @@ public class AppHelper extends MultiDexApplication {
public static RoomHelper getChatDB() { return chatDB; }
- public static void initNetworkHandler() { networkHandler = new NetworkHandler(); }
-
public static SharedPreferences getPreferences() {
return preferences;
}
+
+ public static void storePastryNode(PastryNode node) { pastryNode = node; }
+
+ public static PastryNode getPastryNode() { return pastryNode; }
+
+ public static void storePastryEnvironment(Environment env) {
+ pastryEnvironment = env;
+ }
+
+ public static Environment getPastryEnvironment() {
+ return pastryEnvironment;
+ }
+
+ public static void storeNotificationSystem(NotificationSystem system) {
+ notificationSystem = system;
+ }
+
+ public static NotificationSystem getNotificationSystem() {
+ return notificationSystem;
+ }
}
\ No newline at end of file
diff --git a/app/src/main/java/io/github/chronosx88/influence/logic/MainLogic.java b/app/src/main/java/io/github/chronosx88/influence/logic/MainLogic.java
index 3ddaaeb..8f67b52 100644
--- a/app/src/main/java/io/github/chronosx88/influence/logic/MainLogic.java
+++ b/app/src/main/java/io/github/chronosx88/influence/logic/MainLogic.java
@@ -56,6 +56,15 @@ import io.github.chronosx88.influence.helpers.actions.UIActions;
import io.github.chronosx88.influence.models.ChatMetadata;
import io.github.chronosx88.influence.models.NewChatRequestMessage;
import io.github.chronosx88.influence.models.PublicUserProfile;
+import io.github.chronosx88.influence.notificationSystem.NotificationHandler;
+import io.github.chronosx88.influence.notificationSystem.NotificationSystem;
+import rice.environment.Environment;
+import rice.p2p.scribe.ScribeContent;
+import rice.pastry.NodeHandle;
+import rice.pastry.NodeIdFactory;
+import rice.pastry.PastryNode;
+import rice.pastry.socket.internet.InternetPastryNodeFactory;
+import rice.pastry.standard.RandomNodeIdFactory;
public class MainLogic implements CoreContracts.IMainLogicContract {
private static final String LOG_TAG = MainLogic.class.getName();
@@ -71,6 +80,7 @@ public class MainLogic implements CoreContracts.IMainLogicContract {
private KeyPairManager keyPairManager;
private Thread checkNewChatsThread = null;
private Storage storage;
+ private PastryNode pastryNode;
public MainLogic() {
this.context = AppHelper.getContext();
@@ -151,19 +161,35 @@ public class MainLogic implements CoreContracts.IMainLogicContract {
return;
}
+ try {
+ if(!createPastryNode(7244, new InetSocketAddress(bootstrapAddress, 7244))) {
+ JsonObject jsonObject = new JsonObject();
+ jsonObject.addProperty("action", UIActions.BOOTSTRAP_ERROR);
+ AppHelper.getObservable().notifyUIObservers(jsonObject);
+ return;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ JsonObject jsonObject = new JsonObject();
+ jsonObject.addProperty("action", UIActions.BOOTSTRAP_ERROR);
+ AppHelper.getObservable().notifyUIObservers(jsonObject);
+ return;
+ }
+
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("action", UIActions.BOOTSTRAP_SUCCESS);
AppHelper.getObservable().notifyUIObservers(jsonObject);
AppHelper.storePeerID(preferences.getString("peerID", null));
AppHelper.updateUsername(preferences.getString("username", null));
AppHelper.storePeerDHT(peerDHT);
- AppHelper.initNetworkHandler();
- //setReceiveHandler();
+ AppHelper.storePastryNode(pastryNode);
+ AppHelper.storeNotificationSystem(new NotificationSystem());
gson = new Gson();
publicProfileToDHT();
SettingsLogic.Companion.publishUsername(AppHelper.getUsername(), AppHelper.getUsername());
NetworkHandler.handlePendingChatRequests();
- TimerTask timerTask = new TimerTask() {
+ AppHelper.getNotificationSystem().subscribe(AppHelper.getPeerID() + "_newChats", notification -> NetworkHandler.handlePendingChatRequests());
+ /*TimerTask timerTask = new TimerTask() {
@Override
public void run() {
if(checkNewChatsThread == null) {
@@ -177,7 +203,7 @@ public class MainLogic implements CoreContracts.IMainLogicContract {
}
};
Timer timer = new Timer();
- timer.schedule(timerTask, 1, 5000);
+ timer.schedule(timerTask, 1, 5000);*/
replication = new IndirectReplication(peerDHT).start();
} catch (IOException e) {
e.printStackTrace();
@@ -227,14 +253,6 @@ public class MainLogic implements CoreContracts.IMainLogicContract {
}
}
- private void setReceiveHandler() {
- AppHelper.getPeerDHT().peer().objectDataReply((s, r) -> {
- Log.i(LOG_TAG, "# Incoming message: " + r);
- AppHelper.getObservable().notifyNetworkObservers(r);
- return null;
- });
- }
-
@Override
public void shutdownPeer() {
new Thread(() -> {
@@ -274,29 +292,14 @@ public class MainLogic implements CoreContracts.IMainLogicContract {
}
private ChannelClientConfiguration createChannelClientConfig() {
- ChannelClientConfiguration channelClientConfiguration = new ChannelClientConfiguration();
- channelClientConfiguration.bindings(new Bindings());
- channelClientConfiguration.maxPermitsPermanentTCP(250);
- channelClientConfiguration.maxPermitsTCP(250);
- channelClientConfiguration.maxPermitsUDP(250);
- channelClientConfiguration.pipelineFilter(new PeerBuilder.DefaultPipelineFilter());
+ ChannelClientConfiguration channelClientConfiguration = PeerBuilder.createDefaultChannelClientConfiguration();
channelClientConfiguration.signatureFactory(new RSASignatureFactory());
- channelClientConfiguration.senderTCP((new InetSocketAddress(0)).getAddress());
- channelClientConfiguration.senderUDP((new InetSocketAddress(0)).getAddress());
- channelClientConfiguration.byteBufPool(false);
return channelClientConfiguration;
}
private ChannelServerConfiguration createChannelServerConfig() {
- ChannelServerConfiguration channelServerConfiguration = new ChannelServerConfiguration();
- channelServerConfiguration.bindings(new Bindings());
- //these two values may be overwritten in the peer builder
- channelServerConfiguration.ports(new Ports(Ports.DEFAULT_PORT, Ports.DEFAULT_PORT));
- channelServerConfiguration.portsForwarding(new Ports(Ports.DEFAULT_PORT, Ports.DEFAULT_PORT));
- channelServerConfiguration.behindFirewall(false);
- channelServerConfiguration.pipelineFilter(new PeerBuilder.DefaultPipelineFilter());
+ ChannelServerConfiguration channelServerConfiguration = PeerBuilder.createDefaultChannelServerConfiguration();
channelServerConfiguration.signatureFactory(new RSASignatureFactory());
- channelServerConfiguration.byteBufPool(false);
return channelServerConfiguration;
}
@@ -369,4 +372,31 @@ public class MainLogic implements CoreContracts.IMainLogicContract {
}
return null;
}
+
+ private boolean createPastryNode(int bindPort, InetSocketAddress bootAddress) throws Exception {
+ Environment env = new Environment();
+ env.getParameters().setString("probe_for_external_address","true");
+ AppHelper.storePastryEnvironment(env);
+ NodeIdFactory nidFactory = new RandomNodeIdFactory(env);
+ InternetPastryNodeFactory factory = new InternetPastryNodeFactory(nidFactory, bindPort, env);
+ NodeHandle bootHandle = factory.getNodeHandle(bootAddress);
+ PastryNode node;
+ if(bootHandle != null) {
+ node = factory.newNode(bootHandle);
+ } else {
+ return false;
+ }
+ // the node may require sending several messages to fully boot into the ring
+ synchronized(node) {
+ while(!node.isReady() && !node.joinFailed()) {
+ node.wait(100);
+ if (node.joinFailed()) {
+ return false;
+ }
+ }
+ }
+ Log.i(LOG_TAG, "# [Pastry] Finished creating new Pastry node: " + node);
+ this.pastryNode = node;
+ return true;
+ }
}
diff --git a/app/src/main/java/io/github/chronosx88/influence/notificationSystem/NotificationHandler.java b/app/src/main/java/io/github/chronosx88/influence/notificationSystem/NotificationHandler.java
new file mode 100644
index 0000000..279d4b2
--- /dev/null
+++ b/app/src/main/java/io/github/chronosx88/influence/notificationSystem/NotificationHandler.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright (C) 2019 ChronosX88
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package io.github.chronosx88.influence.notificationSystem;
+
+import rice.p2p.scribe.ScribeContent;
+
+@FunctionalInterface
+public interface NotificationHandler {
+ void handleNotification(ScribeContent notification);
+}
diff --git a/app/src/main/java/io/github/chronosx88/influence/notificationSystem/NotificationSystem.java b/app/src/main/java/io/github/chronosx88/influence/notificationSystem/NotificationSystem.java
new file mode 100644
index 0000000..c69f579
--- /dev/null
+++ b/app/src/main/java/io/github/chronosx88/influence/notificationSystem/NotificationSystem.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright (C) 2019 ChronosX88
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package io.github.chronosx88.influence.notificationSystem;
+
+import rice.p2p.scribe.ScribeContent;
+
+public class NotificationSystem {
+ private ScribeClient scribeClient = new ScribeClient();
+
+ public void subscribe(String topicName, NotificationHandler handler) {
+ scribeClient.subscribeToTopic(topicName, handler);
+ }
+
+ public void unsubscribe(String topicName) {
+ scribeClient.unsubscribeFromTopic(topicName);
+ }
+
+ public void publish(String topicName, ScribeContent message) {
+ scribeClient.publishToTopic(topicName, message);
+ }
+}
diff --git a/app/src/main/java/io/github/chronosx88/influence/notificationSystem/ScribeClient.java b/app/src/main/java/io/github/chronosx88/influence/notificationSystem/ScribeClient.java
new file mode 100644
index 0000000..3ef8fc5
--- /dev/null
+++ b/app/src/main/java/io/github/chronosx88/influence/notificationSystem/ScribeClient.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright (C) 2019 ChronosX88
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package io.github.chronosx88.influence.notificationSystem;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import io.github.chronosx88.influence.helpers.AppHelper;
+import rice.p2p.commonapi.NodeHandle;
+import rice.p2p.scribe.Scribe;
+import rice.p2p.scribe.ScribeContent;
+import rice.p2p.scribe.ScribeImpl;
+import rice.p2p.scribe.Topic;
+import rice.pastry.commonapi.PastryIdFactory;
+
+public class ScribeClient implements rice.p2p.scribe.ScribeClient {
+ private Map topicMap = new ConcurrentHashMap<>();
+ private Map notificationHandlerMap = new ConcurrentHashMap<>();
+ private Scribe scribe = new ScribeImpl(AppHelper.getPastryNode(), "scribeInstance");
+
+ @Override
+ public boolean anycast(Topic topic, ScribeContent scribeContent) {
+ // We don't need anycast. Therefore, just suspend the wave.
+ return true;
+ }
+
+ @Override
+ public void deliver(Topic topic, ScribeContent scribeContent) {
+ if(notificationHandlerMap.containsKey(topic)) {
+ notificationHandlerMap.get(topic).handleNotification(scribeContent);
+ }
+ }
+
+ @Override
+ public void childAdded(Topic topic, NodeHandle nodeHandle) {
+ // Nothing to do
+ }
+
+ @Override
+ public void childRemoved(Topic topic, NodeHandle nodeHandle) {
+ // Nothing to do
+ }
+
+ @Override
+ public void subscribeFailed(Topic topic) {
+ // Nothing to do
+ }
+
+ public void subscribeToTopic(String topicName, NotificationHandler handler) {
+ Topic topic = new Topic(new PastryIdFactory(AppHelper.getPastryEnvironment()), topicName);
+ scribe.subscribe(topic, this);
+ topicMap.put(topicName, topic);
+ notificationHandlerMap.put(topic, handler);
+ }
+
+ public void unsubscribeFromTopic(String topicName) {
+ if(topicMap.containsKey(topicName)) {
+ scribe.unsubscribe(topicMap.get(topicName), this);
+ notificationHandlerMap.remove(topicMap.get(topicName));
+ }
+ }
+
+ public void publishToTopic(String topicName, ScribeContent content) {
+ Topic topic = new Topic(new PastryIdFactory(AppHelper.getPastryEnvironment()), topicName);
+ scribe.publish(topic, content);
+ }
+}