Back to BerkeleyDB (and optimized it)

This commit is contained in:
ChronosX88 2019-04-28 13:58:35 +04:00
parent d062ba54b9
commit 6939f81f59
5 changed files with 38 additions and 18 deletions

View File

@ -19,7 +19,7 @@ class DataSerializer(private val signatureFactory: SignatureFactory) : EntryBind
if (databaseEntry.data == null) { if (databaseEntry.data == null) {
return null return null
} }
val dataInput = DataInputStream(ByteArrayInputStream(databaseEntry.data)) val dataInput = ByteArrayInputStream(databaseEntry.data)
var buf = Unpooled.buffer() var buf = Unpooled.buffer()
var data: Data? = null var data: Data? = null
while (data == null) { while (data == null) {
@ -41,7 +41,7 @@ class DataSerializer(private val signatureFactory: SignatureFactory) : EntryBind
} }
if (data.isSigned) { if (data.isSigned) {
me = ByteArray(signatureFactory.signatureSize()) me = ByteArray(signatureFactory.signatureSize())
dataInput.readFully(me) dataInput.read(me)
buf = Unpooled.wrappedBuffer(me) buf = Unpooled.wrappedBuffer(me)
} }
retVal = data.decodeDone(buf, signatureFactory); retVal = data.decodeDone(buf, signatureFactory);
@ -52,8 +52,7 @@ class DataSerializer(private val signatureFactory: SignatureFactory) : EntryBind
} }
override fun objectToEntry(data: Data, databaseEntry: DatabaseEntry) { override fun objectToEntry(data: Data, databaseEntry: DatabaseEntry) {
val baos = ByteArrayOutputStream() val out = ByteArrayOutputStream()
val out = DataOutputStream(baos)
val acb = Unpooled.buffer() val acb = Unpooled.buffer()
// store data to disk // store data to disk
// header first // header first
@ -73,7 +72,7 @@ class DataSerializer(private val signatureFactory: SignatureFactory) : EntryBind
throw IOException(e) throw IOException(e)
} }
out.flush() out.flush()
databaseEntry.data = baos.toByteArray() databaseEntry.data = out.toByteArray()
out.close() out.close()
} }

View File

@ -45,9 +45,11 @@ public class P2PUtils {
.all() .all()
.start() .start()
.awaitUninterruptibly(); .awaitUninterruptibly();
if(futureGet != null && !futureGet.isEmpty()) { if(futureGet != null) {
if(!futureGet.isEmpty()) {
return futureGet.dataMap(); return futureGet.dataMap();
} }
}
return null; return null;
} }

View File

@ -51,7 +51,7 @@ class StorageBerkeleyDB(peerId: Number160, path : File, signatureFactory: Signat
envConfig.allowCreate = true envConfig.allowCreate = true
dbEnvironment = Environment(path, envConfig) dbEnvironment = Environment(path, envConfig)
val configMap : HashMap<String, com.sleepycat.je.DatabaseConfig> = HashMap() val configMap : HashMap<String, DatabaseConfig> = HashMap()
val compareNumber640 = CompareNumber640() val compareNumber640 = CompareNumber640()
val compareLong = CompareLong() val compareLong = CompareLong()
@ -98,7 +98,9 @@ class StorageBerkeleyDB(peerId: Number160, path : File, signatureFactory: Signat
} }
override fun put(key: Number640?, value: Data?): Data? { override fun put(key: Number640?, value: Data?): Data? {
return dataMap.put(key, value) val oldData = dataMap.put(key, value)
dbEnvironment.sync()
return oldData
} }
override fun get(key: Number640?): Data? { override fun get(key: Number640?): Data? {
@ -106,7 +108,9 @@ class StorageBerkeleyDB(peerId: Number160, path : File, signatureFactory: Signat
} }
override fun remove(key: Number640?, returnData: Boolean): Data? { override fun remove(key: Number640?, returnData: Boolean): Data? {
return dataMap.remove(key) val oldData = dataMap.remove(key)
dbEnvironment.sync()
return oldData
} }
override fun remove(from: Number640?, to: Number640?): NavigableMap<Number640, Data> { override fun remove(from: Number640?, to: Number640?): NavigableMap<Number640, Data> {
@ -116,6 +120,7 @@ class StorageBerkeleyDB(peerId: Number160, path : File, signatureFactory: Signat
retVal[entry.key] = entry.value retVal[entry.key] = entry.value
} }
tmp.clear() tmp.clear()
dbEnvironment.sync()
return retVal return retVal
} }
@ -126,6 +131,7 @@ class StorageBerkeleyDB(peerId: Number160, path : File, signatureFactory: Signat
return return
} }
removeRevTimeout(key, oldExpiration) removeRevTimeout(key, oldExpiration)
dbEnvironment.sync()
} }
private fun putIfAbsent2(expiration: Long, key: Number640) { private fun putIfAbsent2(expiration: Long, key: Number640) {
@ -136,10 +142,11 @@ class StorageBerkeleyDB(peerId: Number160, path : File, signatureFactory: Signat
} }
(timeouts as MutableSet).add(key) (timeouts as MutableSet).add(key)
timeoutMapRev[expiration] = timeouts timeoutMapRev[expiration] = timeouts
dbEnvironment.sync()
} }
private fun removeRevTimeout(key: Number640, expiration: Long?) { private fun removeRevTimeout(key: Number640, expiration: Long?) {
val tmp = timeoutMapRev[expiration] as MutableSet<Number640> val tmp = timeoutMapRev[expiration] as MutableSet<Number640>?
if (tmp != null) { if (tmp != null) {
tmp.remove(key) tmp.remove(key)
if (tmp.isEmpty()) { if (tmp.isEmpty()) {
@ -148,6 +155,7 @@ class StorageBerkeleyDB(peerId: Number160, path : File, signatureFactory: Signat
timeoutMapRev[expiration!!] = tmp timeoutMapRev[expiration!!] = tmp
} }
} }
dbEnvironment.sync()
} }
override fun updateResponsibilities(locationKey: Number160, peerId: Number160?): Boolean { override fun updateResponsibilities(locationKey: Number160, peerId: Number160?): Boolean {
@ -163,12 +171,13 @@ class StorageBerkeleyDB(peerId: Number160, path : File, signatureFactory: Signat
} else { } else {
hasChanged = true hasChanged = true
} }
var contentIDs: MutableSet<Number160>? = responsibilityMapRev[peerId] as MutableSet var contentIDs: MutableSet<Number160>? = responsibilityMapRev[peerId] as MutableSet?
if (contentIDs == null) { if (contentIDs == null) {
contentIDs = HashSet() contentIDs = HashSet()
} }
contentIDs.add(locationKey) contentIDs.add(locationKey)
responsibilityMapRev[peerId] = contentIDs responsibilityMapRev[peerId] = contentIDs
dbEnvironment.sync()
return hasChanged return hasChanged
} }
@ -182,10 +191,12 @@ class StorageBerkeleyDB(peerId: Number160, path : File, signatureFactory: Signat
responsibilityMapRev[peerId] = contentIDs responsibilityMapRev[peerId] = contentIDs
} }
} }
dbEnvironment.sync()
} }
override fun protectDomain(key: Number320?, publicKey: PublicKey?): Boolean { override fun protectDomain(key: Number320?, publicKey: PublicKey?): Boolean {
protectedDomainMap[key] = publicKey protectedDomainMap[key] = publicKey
dbEnvironment.sync()
return true return true
} }
@ -201,6 +212,8 @@ class StorageBerkeleyDB(peerId: Number160, path : File, signatureFactory: Signat
override fun removeTimeout(key: Number640) { override fun removeTimeout(key: Number640) {
val expiration = timeoutMap.remove(key) ?: return val expiration = timeoutMap.remove(key) ?: return
removeRevTimeout(key, expiration) removeRevTimeout(key, expiration)
timeoutMapDB.sync()
dbEnvironment.sync()
} }
override fun removeResponsibility(locationKey: Number160) { override fun removeResponsibility(locationKey: Number160) {
@ -208,10 +221,12 @@ class StorageBerkeleyDB(peerId: Number160, path : File, signatureFactory: Signat
if (peerId != null) { if (peerId != null) {
removeRevResponsibility(peerId, locationKey) removeRevResponsibility(peerId, locationKey)
} }
dbEnvironment.sync()
} }
override fun protectEntry(key: Number480?, publicKey: PublicKey?): Boolean { override fun protectEntry(key: Number480?, publicKey: PublicKey?): Boolean {
protectedEntryMap[key] = publicKey protectedEntryMap[key] = publicKey
dbEnvironment.sync()
return true return true
} }
@ -270,5 +285,6 @@ class StorageBerkeleyDB(peerId: Number160, path : File, signatureFactory: Signat
protectedEntryMapDB.close() protectedEntryMapDB.close()
responsibilityMapDB.close() responsibilityMapDB.close()
responsibilityMapRevDB.close() responsibilityMapRevDB.close()
dbEnvironment.close()
} }
} }

View File

@ -30,7 +30,7 @@ import io.github.chronosx88.influence.models.roomEntities.MessageEntity;
public class ChatLogic implements CoreContracts.IChatLogicContract { public class ChatLogic implements CoreContracts.IChatLogicContract {
private static Gson gson = new Gson(); private static Gson gson = new Gson();
private String chatID; private String chatID;
private String newMessage = ""; private volatile String newMessage = "";
private ChatEntity chatEntity; private ChatEntity chatEntity;
private Thread checkNewMessagesThread = null; private Thread checkNewMessagesThread = null;
private KeyPairManager keyPairManager; private KeyPairManager keyPairManager;

View File

@ -44,6 +44,7 @@ import io.github.chronosx88.influence.helpers.JVMShutdownHook;
import io.github.chronosx88.influence.helpers.KeyPairManager; import io.github.chronosx88.influence.helpers.KeyPairManager;
import io.github.chronosx88.influence.helpers.NetworkHandler; import io.github.chronosx88.influence.helpers.NetworkHandler;
import io.github.chronosx88.influence.helpers.P2PUtils; import io.github.chronosx88.influence.helpers.P2PUtils;
import io.github.chronosx88.influence.helpers.StorageBerkeleyDB;
import io.github.chronosx88.influence.helpers.StorageMapDB; import io.github.chronosx88.influence.helpers.StorageMapDB;
import io.github.chronosx88.influence.helpers.actions.UIActions; import io.github.chronosx88.influence.helpers.actions.UIActions;
import io.github.chronosx88.influence.models.PublicUserProfile; import io.github.chronosx88.influence.models.PublicUserProfile;
@ -85,9 +86,11 @@ public class MainLogic implements CoreContracts.IMainLogicContract {
new Thread(() -> { new Thread(() -> {
try { try {
//StorageBerkeleyDB storageBerkeleyDB = new StorageBerkeleyDB(peerID, context.getFilesDir(), new RSASignatureFactory()); File dhtDBEnv = new File(context.getFilesDir(), "dhtDBEnv");
Storage storageMapDB = new StorageMapDB(peerID, new File(context.getFilesDir(), "dhtDB.db"), new RSASignatureFactory()); if(!dhtDBEnv.exists())
this.storage = storageMapDB; dhtDBEnv.mkdirs();
Storage storage = new StorageBerkeleyDB(peerID, dhtDBEnv, new RSASignatureFactory());
this.storage = storage;
peerDHT = new PeerBuilderDHT( peerDHT = new PeerBuilderDHT(
new PeerBuilder(peerID) new PeerBuilder(peerID)
.ports(7243) .ports(7243)
@ -95,9 +98,9 @@ public class MainLogic implements CoreContracts.IMainLogicContract {
.channelServerConfiguration(createChannelServerConfig()) .channelServerConfiguration(createChannelServerConfig())
.start() .start()
) )
.storage(storageMapDB) .storage(storage)
.start(); .start();
Runtime.getRuntime().addShutdownHook(new JVMShutdownHook(storageMapDB)); Runtime.getRuntime().addShutdownHook(new JVMShutdownHook(storage));
try { try {
String bootstrapIP = this.preferences.getString("bootstrapAddress", null); String bootstrapIP = this.preferences.getString("bootstrapAddress", null);
if(bootstrapIP == null) { if(bootstrapIP == null) {