Implemented basic notification system (on top of FreePastry)

This commit is contained in:
ChronosX88 2019-05-15 20:25:32 +04:00
parent ff8aa48b3e
commit 3d2230bea4
7 changed files with 228 additions and 32 deletions

BIN
app/libs/FreePastry-2.1.jar Normal file

Binary file not shown.

Binary file not shown.

View File

@ -13,7 +13,10 @@ import com.instacart.library.truetime.TrueTime;
import java.io.IOException;
import io.github.chronosx88.influence.notificationSystem.NotificationSystem;
import io.github.chronosx88.influence.observable.MainObservable;
import rice.environment.Environment;
import rice.pastry.PastryNode;
/**
* Extended Application class which designed for centralized getting various objects from anywhere in the application.
@ -25,9 +28,11 @@ public class AppHelper extends MultiDexApplication {
private static String peerID;
private static PeerDHT peerDHT;
private static RoomHelper chatDB;
private static NetworkHandler networkHandler;
private static String username = "";
private static SharedPreferences preferences;
private static PastryNode pastryNode;
private static Environment pastryEnvironment;
private static NotificationSystem notificationSystem;
@Override
public void onCreate() {
@ -67,9 +72,27 @@ public class AppHelper extends MultiDexApplication {
public static RoomHelper getChatDB() { return chatDB; }
public static void initNetworkHandler() { networkHandler = new NetworkHandler(); }
public static SharedPreferences getPreferences() {
return preferences;
}
public static void storePastryNode(PastryNode node) { pastryNode = node; }
public static PastryNode getPastryNode() { return pastryNode; }
public static void storePastryEnvironment(Environment env) {
pastryEnvironment = env;
}
public static Environment getPastryEnvironment() {
return pastryEnvironment;
}
public static void storeNotificationSystem(NotificationSystem system) {
notificationSystem = system;
}
public static NotificationSystem getNotificationSystem() {
return notificationSystem;
}
}

View File

@ -56,6 +56,15 @@ import io.github.chronosx88.influence.helpers.actions.UIActions;
import io.github.chronosx88.influence.models.ChatMetadata;
import io.github.chronosx88.influence.models.NewChatRequestMessage;
import io.github.chronosx88.influence.models.PublicUserProfile;
import io.github.chronosx88.influence.notificationSystem.NotificationHandler;
import io.github.chronosx88.influence.notificationSystem.NotificationSystem;
import rice.environment.Environment;
import rice.p2p.scribe.ScribeContent;
import rice.pastry.NodeHandle;
import rice.pastry.NodeIdFactory;
import rice.pastry.PastryNode;
import rice.pastry.socket.internet.InternetPastryNodeFactory;
import rice.pastry.standard.RandomNodeIdFactory;
public class MainLogic implements CoreContracts.IMainLogicContract {
private static final String LOG_TAG = MainLogic.class.getName();
@ -71,6 +80,7 @@ public class MainLogic implements CoreContracts.IMainLogicContract {
private KeyPairManager keyPairManager;
private Thread checkNewChatsThread = null;
private Storage storage;
private PastryNode pastryNode;
public MainLogic() {
this.context = AppHelper.getContext();
@ -151,19 +161,35 @@ public class MainLogic implements CoreContracts.IMainLogicContract {
return;
}
try {
if(!createPastryNode(7244, new InetSocketAddress(bootstrapAddress, 7244))) {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("action", UIActions.BOOTSTRAP_ERROR);
AppHelper.getObservable().notifyUIObservers(jsonObject);
return;
}
} catch (Exception e) {
e.printStackTrace();
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("action", UIActions.BOOTSTRAP_ERROR);
AppHelper.getObservable().notifyUIObservers(jsonObject);
return;
}
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("action", UIActions.BOOTSTRAP_SUCCESS);
AppHelper.getObservable().notifyUIObservers(jsonObject);
AppHelper.storePeerID(preferences.getString("peerID", null));
AppHelper.updateUsername(preferences.getString("username", null));
AppHelper.storePeerDHT(peerDHT);
AppHelper.initNetworkHandler();
//setReceiveHandler();
AppHelper.storePastryNode(pastryNode);
AppHelper.storeNotificationSystem(new NotificationSystem());
gson = new Gson();
publicProfileToDHT();
SettingsLogic.Companion.publishUsername(AppHelper.getUsername(), AppHelper.getUsername());
NetworkHandler.handlePendingChatRequests();
TimerTask timerTask = new TimerTask() {
AppHelper.getNotificationSystem().subscribe(AppHelper.getPeerID() + "_newChats", notification -> NetworkHandler.handlePendingChatRequests());
/*TimerTask timerTask = new TimerTask() {
@Override
public void run() {
if(checkNewChatsThread == null) {
@ -177,7 +203,7 @@ public class MainLogic implements CoreContracts.IMainLogicContract {
}
};
Timer timer = new Timer();
timer.schedule(timerTask, 1, 5000);
timer.schedule(timerTask, 1, 5000);*/
replication = new IndirectReplication(peerDHT).start();
} catch (IOException e) {
e.printStackTrace();
@ -227,14 +253,6 @@ public class MainLogic implements CoreContracts.IMainLogicContract {
}
}
private void setReceiveHandler() {
AppHelper.getPeerDHT().peer().objectDataReply((s, r) -> {
Log.i(LOG_TAG, "# Incoming message: " + r);
AppHelper.getObservable().notifyNetworkObservers(r);
return null;
});
}
@Override
public void shutdownPeer() {
new Thread(() -> {
@ -274,29 +292,14 @@ public class MainLogic implements CoreContracts.IMainLogicContract {
}
private ChannelClientConfiguration createChannelClientConfig() {
ChannelClientConfiguration channelClientConfiguration = new ChannelClientConfiguration();
channelClientConfiguration.bindings(new Bindings());
channelClientConfiguration.maxPermitsPermanentTCP(250);
channelClientConfiguration.maxPermitsTCP(250);
channelClientConfiguration.maxPermitsUDP(250);
channelClientConfiguration.pipelineFilter(new PeerBuilder.DefaultPipelineFilter());
ChannelClientConfiguration channelClientConfiguration = PeerBuilder.createDefaultChannelClientConfiguration();
channelClientConfiguration.signatureFactory(new RSASignatureFactory());
channelClientConfiguration.senderTCP((new InetSocketAddress(0)).getAddress());
channelClientConfiguration.senderUDP((new InetSocketAddress(0)).getAddress());
channelClientConfiguration.byteBufPool(false);
return channelClientConfiguration;
}
private ChannelServerConfiguration createChannelServerConfig() {
ChannelServerConfiguration channelServerConfiguration = new ChannelServerConfiguration();
channelServerConfiguration.bindings(new Bindings());
//these two values may be overwritten in the peer builder
channelServerConfiguration.ports(new Ports(Ports.DEFAULT_PORT, Ports.DEFAULT_PORT));
channelServerConfiguration.portsForwarding(new Ports(Ports.DEFAULT_PORT, Ports.DEFAULT_PORT));
channelServerConfiguration.behindFirewall(false);
channelServerConfiguration.pipelineFilter(new PeerBuilder.DefaultPipelineFilter());
ChannelServerConfiguration channelServerConfiguration = PeerBuilder.createDefaultChannelServerConfiguration();
channelServerConfiguration.signatureFactory(new RSASignatureFactory());
channelServerConfiguration.byteBufPool(false);
return channelServerConfiguration;
}
@ -369,4 +372,31 @@ public class MainLogic implements CoreContracts.IMainLogicContract {
}
return null;
}
private boolean createPastryNode(int bindPort, InetSocketAddress bootAddress) throws Exception {
Environment env = new Environment();
env.getParameters().setString("probe_for_external_address","true");
AppHelper.storePastryEnvironment(env);
NodeIdFactory nidFactory = new RandomNodeIdFactory(env);
InternetPastryNodeFactory factory = new InternetPastryNodeFactory(nidFactory, bindPort, env);
NodeHandle bootHandle = factory.getNodeHandle(bootAddress);
PastryNode node;
if(bootHandle != null) {
node = factory.newNode(bootHandle);
} else {
return false;
}
// the node may require sending several messages to fully boot into the ring
synchronized(node) {
while(!node.isReady() && !node.joinFailed()) {
node.wait(100);
if (node.joinFailed()) {
return false;
}
}
}
Log.i(LOG_TAG, "# [Pastry] Finished creating new Pastry node: " + node);
this.pastryNode = node;
return true;
}
}

View File

@ -0,0 +1,25 @@
/*
* Copyright (C) 2019 ChronosX88
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package io.github.chronosx88.influence.notificationSystem;
import rice.p2p.scribe.ScribeContent;
@FunctionalInterface
public interface NotificationHandler {
void handleNotification(ScribeContent notification);
}

View File

@ -0,0 +1,36 @@
/*
* Copyright (C) 2019 ChronosX88
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package io.github.chronosx88.influence.notificationSystem;
import rice.p2p.scribe.ScribeContent;
public class NotificationSystem {
private ScribeClient scribeClient = new ScribeClient();
public void subscribe(String topicName, NotificationHandler handler) {
scribeClient.subscribeToTopic(topicName, handler);
}
public void unsubscribe(String topicName) {
scribeClient.unsubscribeFromTopic(topicName);
}
public void publish(String topicName, ScribeContent message) {
scribeClient.publishToTopic(topicName, message);
}
}

View File

@ -0,0 +1,82 @@
/*
* Copyright (C) 2019 ChronosX88
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package io.github.chronosx88.influence.notificationSystem;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import io.github.chronosx88.influence.helpers.AppHelper;
import rice.p2p.commonapi.NodeHandle;
import rice.p2p.scribe.Scribe;
import rice.p2p.scribe.ScribeContent;
import rice.p2p.scribe.ScribeImpl;
import rice.p2p.scribe.Topic;
import rice.pastry.commonapi.PastryIdFactory;
public class ScribeClient implements rice.p2p.scribe.ScribeClient {
private Map<String, Topic> topicMap = new ConcurrentHashMap<>();
private Map<Topic, NotificationHandler> notificationHandlerMap = new ConcurrentHashMap<>();
private Scribe scribe = new ScribeImpl(AppHelper.getPastryNode(), "scribeInstance");
@Override
public boolean anycast(Topic topic, ScribeContent scribeContent) {
// We don't need anycast. Therefore, just suspend the wave.
return true;
}
@Override
public void deliver(Topic topic, ScribeContent scribeContent) {
if(notificationHandlerMap.containsKey(topic)) {
notificationHandlerMap.get(topic).handleNotification(scribeContent);
}
}
@Override
public void childAdded(Topic topic, NodeHandle nodeHandle) {
// Nothing to do
}
@Override
public void childRemoved(Topic topic, NodeHandle nodeHandle) {
// Nothing to do
}
@Override
public void subscribeFailed(Topic topic) {
// Nothing to do
}
public void subscribeToTopic(String topicName, NotificationHandler handler) {
Topic topic = new Topic(new PastryIdFactory(AppHelper.getPastryEnvironment()), topicName);
scribe.subscribe(topic, this);
topicMap.put(topicName, topic);
notificationHandlerMap.put(topic, handler);
}
public void unsubscribeFromTopic(String topicName) {
if(topicMap.containsKey(topicName)) {
scribe.unsubscribe(topicMap.get(topicName), this);
notificationHandlerMap.remove(topicMap.get(topicName));
}
}
public void publishToTopic(String topicName, ScribeContent content) {
Topic topic = new Topic(new PastryIdFactory(AppHelper.getPastryEnvironment()), topicName);
scribe.publish(topic, content);
}
}