Fixed DataSerializer with encode/decode signature

This commit is contained in:
ChronosX88 2019-04-16 16:29:09 +03:00
parent 3e719ea69e
commit 8626d1a916
5 changed files with 551 additions and 5 deletions

View File

@ -10,6 +10,7 @@
<option value="$PROJECT_DIR$" />
</set>
</option>
<option name="useAutoImport" value="true" />
<option name="useQualifiedModuleNames" value="true" />
</GradleProjectSettings>
</option>

View File

@ -12,7 +12,7 @@ buildscript {
apply plugin: 'java'
apply plugin: 'com.github.johnrengelman.shadow'
version '0.2'
version '0.2.1'
sourceCompatibility = 1.8

View File

@ -0,0 +1,179 @@
package io.github.chronosx88.dhtBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import net.tomp2p.connection.SignatureFactory;
import net.tomp2p.peers.Number160;
import net.tomp2p.storage.Data;
import org.mapdb.Serializer;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.security.InvalidKeyException;
import java.security.SignatureException;
public class DataSerializer implements Serializer<Data>, Serializable {
private static final long serialVersionUID = 1428836065493792295L;
//TODO: test the performance impact
private static final int MAX_SIZE = 10 * 1024;
final private File path;
final private SignatureFactory signatureFactory;
public DataSerializer(File path, SignatureFactory signatureFactory) {
this.path = path;
this.signatureFactory = signatureFactory;
}
@Override
public void serialize(DataOutput out, Data value) throws IOException {
if (value.length() > MAX_SIZE) {
// header, 1 means stored on disk in a file
out.writeByte(1);
serializeFile(out, value);
} else {
// header, 0 means stored on disk with MapDB
out.writeByte(0);
serializeMapDB(out, value);
}
}
private void serializeMapDB(DataOutput out, Data value) throws IOException {
ByteBuf acb = Unpooled.buffer();
// store data to disk
// header first
value.encodeHeader(acb, signatureFactory);
write(out, acb.nioBuffers());
acb.skipBytes(acb.writerIndex());
// next data - no need to copy to another buffer, just take the data
// from memory
write(out, value.toByteBuffers());
// rest
try {
value.encodeDone(acb, signatureFactory);
write(out, acb.nioBuffers());
} catch (InvalidKeyException e) {
throw new IOException(e);
} catch (SignatureException e) {
throw new IOException(e);
}
}
private void serializeFile(DataOutput out, Data value) throws IOException, FileNotFoundException {
Number160 hash = value.hash();
// store file name
out.write(hash.toByteArray());
// store as external file, create path
RandomAccessFile file = null;
FileChannel rwChannel = null;
ByteBuf acb = null;
try {
file = new RandomAccessFile(new File(path, hash.toString()), "rw");
rwChannel = file.getChannel();
acb = Unpooled.buffer();
// store data to disk
// header first
value.encodeHeader(acb, signatureFactory);
rwChannel.write(acb.nioBuffers());
// next data - no need to copy to another buffer, just take the
// data from memory
rwChannel.write(value.toByteBuffers());
// rest
try {
value.encodeDone(acb, signatureFactory);
rwChannel.write(acb.nioBuffers());
} catch (InvalidKeyException e) {
throw new IOException(e);
} catch (SignatureException e) {
throw new IOException(e);
}
} finally {
if (acb!=null) {
acb.release();
}
if (rwChannel != null) {
rwChannel.close();
}
if (file != null) {
file.close();
}
}
}
private void write(DataOutput out, ByteBuffer[] nioBuffers) throws IOException {
final int length = nioBuffers.length;
for(int i=0;i < length; i++) {
int remaining = nioBuffers[i].remaining();
if(nioBuffers[i].hasArray()) {
out.write(nioBuffers[i].array(), nioBuffers[i].arrayOffset(), remaining);
} else {
byte[] me = new byte[remaining];
nioBuffers[i].get(me);
out.write(me);
}
}
}
@Override
public Data deserialize(DataInput in, int available) throws IOException {
int header = in.readByte();
if(header == 1) {
return deserializeFile(in);
} else if(header == 0) {
return deserializeMapDB(in);
} else {
throw new IOException("unexpected header: " + header);
}
}
private Data deserializeMapDB(DataInput in) throws IOException {
ByteBuf buf = Unpooled.buffer();
Data data = null;
while(data == null) {
buf.writeByte(in.readByte());
data = Data.decodeHeader(buf, signatureFactory);
}
int len = data.length();
byte me[] = new byte[len];
in.readFully(me);
buf = Unpooled.wrappedBuffer(me);
boolean retVal = data.decodeBuffer(buf);
if(!retVal) {
throw new IOException("data could not be read");
}
if(data.isSigned()) {
me = new byte[signatureFactory.signatureSize()];
in.readFully(me);
buf = Unpooled.wrappedBuffer(me);
}
retVal = data.decodeDone(buf, signatureFactory);
if(!retVal) {
throw new IOException("signature could not be read");
}
return data;
}
private Data deserializeFile(DataInput in) throws IOException, FileNotFoundException {
byte[] me = new byte[Number160.BYTE_ARRAY_SIZE];
in.readFully(me);
Number160 hash = new Number160(me);
RandomAccessFile file = new RandomAccessFile(new File(path, hash.toString()), "r");
FileChannel inChannel = file.getChannel();
MappedByteBuffer buffer = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size());
buffer.load();
ByteBuf buf = Unpooled.wrappedBuffer(buffer);
Data data = Data.decodeHeader(buf, signatureFactory);
data.decodeBuffer(buf);
data.decodeDone(buf, signatureFactory);
file.close();
return data;
}
@Override
public int fixedSize() {
return -1;
}
}

View File

@ -8,10 +8,13 @@ import net.tomp2p.p2p.PeerBuilder;
import net.tomp2p.peers.Number160;
import net.tomp2p.relay.RelayType;
import net.tomp2p.relay.tcp.TCPRelayServerConfig;
import net.tomp2p.replication.AutoReplication;
import net.tomp2p.replication.IndirectReplication;
import net.tomp2p.storage.StorageDisk;
import java.io.*;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Properties;
import java.util.UUID;
@ -28,7 +31,7 @@ public class Main {
File config = new File(DATA_DIR_PATH + "config.properties");
try {
if(!dataDir.exists() && !config.exists()) {
dataDir.mkdir();
dataDir.mkdirs();
config.createNewFile();
props.setProperty("isFirstRun", "false");
props.setProperty("peerID", UUID.randomUUID().toString());
@ -59,7 +62,7 @@ public class Main {
new PeerBuilderNAT(peerDHT.peer())
.addRelayServerConfiguration(RelayType.OPENTCP, new TCPRelayServerConfig())
.start();
new AutoReplication(peerDHT.peer()).start();
new IndirectReplication(peerDHT).start();
} catch (IOException e) {
e.printStackTrace();
}

View File

@ -0,0 +1,363 @@
package io.github.chronosx88.dhtBootstrap;
import net.tomp2p.connection.SignatureFactory;
import net.tomp2p.dht.Storage;
import net.tomp2p.peers.Number160;
import net.tomp2p.peers.Number320;
import net.tomp2p.peers.Number480;
import net.tomp2p.peers.Number640;
import net.tomp2p.storage.Data;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import java.io.File;
import java.security.PublicKey;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
/*
* Copyright 2019 Thomas Bocek, ChronosX88
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
public class StorageMapDB implements Storage {
// Core
final private NavigableMap<Number640, Data> dataMap;
// Maintenance
final private Map<Number640, Long> timeoutMap;
final private ConcurrentNavigableMap<Long, Set<Number640>> timeoutMapRev;
// Protection
final private Map<Number320, PublicKey> protectedDomainMap;
final private Map<Number480, PublicKey> protectedEntryMap;
// Responsibility
final private Map<Number160, Number160> responsibilityMap;
final private Map<Number160, Set<Number160>> responsibilityMapRev;
final private DB db;
final private int storageCheckIntervalMillis;
//for full control
public StorageMapDB(DB db, Number160 peerId, File path, SignatureFactory signatureFactory, int storageCheckIntervalMillis) {
this.db = db;
DataSerializer dataSerializer = new DataSerializer(path, signatureFactory);
this.dataMap = db.createTreeMap("dataMap_" + peerId.toString()).valueSerializer(dataSerializer).makeOrGet();
this.timeoutMap = db.createTreeMap("timeoutMap_" + peerId.toString()).makeOrGet();
this.timeoutMapRev = db.createTreeMap("timeoutMapRev_" + peerId.toString()).makeOrGet();
this.protectedDomainMap = db.createTreeMap("protectedDomainMap_" + peerId.toString()).makeOrGet();
this.protectedEntryMap = db.createTreeMap("protectedEntryMap_" + peerId.toString()).makeOrGet();
this.responsibilityMap = db.createTreeMap("responsibilityMap_" + peerId.toString()).makeOrGet();
this.responsibilityMapRev = db.createTreeMap("responsibilityMapRev_" + peerId.toString()).makeOrGet();
this.storageCheckIntervalMillis = storageCheckIntervalMillis;
}
//set parameter to a reasonable default
public StorageMapDB(Number160 peerId, File path, SignatureFactory signatureFactory) {
this(DBMaker.newFileDB(new File(path, "coreDB.db")).transactionDisable().closeOnJvmShutdown().make(),
peerId, path, signatureFactory, 60 * 1000);
}
@Override
public Data put(Number640 key, Data value) {
Data oldData = dataMap.put(key, value);
db.commit();
return oldData;
}
@Override
public Data get(Number640 key) {
return dataMap.get(key);
}
@Override
public boolean contains(Number640 key) {
return dataMap.containsKey(key);
}
@Override
public int contains(Number640 from, Number640 to) {
NavigableMap<Number640, Data> tmp = dataMap.subMap(from, true, to, true);
return tmp.size();
}
@Override
public Data remove(Number640 key, boolean returnData) {
Data retVal = dataMap.remove(key);
db.commit();
return retVal;
}
@Override
public NavigableMap<Number640, Data> remove(Number640 from, Number640 to) {
NavigableMap<Number640, Data> tmp = dataMap.subMap(from, true, to, true);
// new TreeMap<Number640, Data>(tmp); is not possible as this may lead to no such element exception:
//
// java.util.NoSuchElementException: null
// at java.util.concurrent.ConcurrentSkipListMap$SubMap$SubMapIter.advance(ConcurrentSkipListMap.java:3030) ~[na:1.7.0_60]
// at java.util.concurrent.ConcurrentSkipListMap$SubMap$SubMapEntryIterator.next(ConcurrentSkipListMap.java:3100) ~[na:1.7.0_60]
// at java.util.concurrent.ConcurrentSkipListMap$SubMap$SubMapEntryIterator.next(ConcurrentSkipListMap.java:3096) ~[na:1.7.0_60]
// at java.util.TreeMap.buildFromSorted(TreeMap.java:2394) ~[na:1.7.0_60]
// at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60]
// at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60]
// at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60]
// at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60]
// at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60]
// at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60]
// at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60]
// at java.util.TreeMap.buildFromSorted(TreeMap.java:2344) ~[na:1.7.0_60]
// at java.util.TreeMap.<init>(TreeMap.java:195) ~[na:1.7.0_60]
// at net.tomp2p.dht.StorageMemory.subMap(StorageMemory.java:119) ~[classes/:na]
//
// the reason is that the size in TreeMap.buildFromSorted is stored beforehand, then iteratated. If the size changes,
// then you will call next() that returns null and an exception is thrown.
final NavigableMap<Number640, Data> retVal = new TreeMap<Number640, Data>();
for(final Map.Entry<Number640, Data> entry:tmp.entrySet()) {
retVal.put(entry.getKey(), entry.getValue());
}
tmp.clear();
db.commit();
return retVal;
}
@Override
public NavigableMap<Number640, Data> subMap(Number640 from, Number640 to, int limit, boolean ascending) {
NavigableMap<Number640, Data> tmp = dataMap.subMap(from, true, to, true);
final NavigableMap<Number640, Data> retVal = new TreeMap<Number640, Data>();
if (limit < 0) {
// new TreeMap<Number640, Data>(tmp); is not possible as this may lead to no such element exception:
//
// java.util.NoSuchElementException: null
// at java.util.concurrent.ConcurrentSkipListMap$SubMap$SubMapIter.advance(ConcurrentSkipListMap.java:3030) ~[na:1.7.0_60]
// at java.util.concurrent.ConcurrentSkipListMap$SubMap$SubMapEntryIterator.next(ConcurrentSkipListMap.java:3100) ~[na:1.7.0_60]
// at java.util.concurrent.ConcurrentSkipListMap$SubMap$SubMapEntryIterator.next(ConcurrentSkipListMap.java:3096) ~[na:1.7.0_60]
// at java.util.TreeMap.buildFromSorted(TreeMap.java:2394) ~[na:1.7.0_60]
// at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60]
// at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60]
// at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60]
// at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60]
// at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60]
// at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60]
// at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60]
// at java.util.TreeMap.buildFromSorted(TreeMap.java:2344) ~[na:1.7.0_60]
// at java.util.TreeMap.<init>(TreeMap.java:195) ~[na:1.7.0_60]
// at net.tomp2p.dht.StorageMemory.subMap(StorageMemory.java:119) ~[classes/:na]
//
// the reason is that the size in TreeMap.buildFromSorted is stored beforehand, then iteratated. If the size changes,
// then you will call next() that returns null and an exception is thrown.
for(final Map.Entry<Number640, Data> entry:(ascending ? tmp : tmp.descendingMap()).entrySet()) {
retVal.put(entry.getKey(), entry.getValue());
}
} else {
limit = Math.min(limit, tmp.size());
Iterator<Map.Entry<Number640, Data>> iterator = ascending ? tmp.entrySet().iterator() : tmp
.descendingMap().entrySet().iterator();
for (int i = 0; iterator.hasNext() && i < limit; i++) {
Map.Entry<Number640, Data> entry = iterator.next();
retVal.put(entry.getKey(), entry.getValue());
}
}
return retVal;
}
@Override
public NavigableMap<Number640, Data> map() {
// new TreeMap<Number640, Data>(dataMap); is not possible as this may lead to no such element exception:
//
// java.util.NoSuchElementException: null
// at java.util.concurrent.ConcurrentSkipListMap$SubMap$SubMapIter.advance(ConcurrentSkipListMap.java:3030) ~[na:1.7.0_60]
// at java.util.concurrent.ConcurrentSkipListMap$SubMap$SubMapEntryIterator.next(ConcurrentSkipListMap.java:3100) ~[na:1.7.0_60]
// at java.util.concurrent.ConcurrentSkipListMap$SubMap$SubMapEntryIterator.next(ConcurrentSkipListMap.java:3096) ~[na:1.7.0_60]
// at java.util.TreeMap.buildFromSorted(TreeMap.java:2394) ~[na:1.7.0_60]
// at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60]
// at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60]
// at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60]
// at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60]
// at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60]
// at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60]
// at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60]
// at java.util.TreeMap.buildFromSorted(TreeMap.java:2344) ~[na:1.7.0_60]
// at java.util.TreeMap.<init>(TreeMap.java:195) ~[na:1.7.0_60]
// at net.tomp2p.dht.StorageMemory.subMap(StorageMemory.java:119) ~[classes/:na]
//
// the reason is that the size in TreeMap.buildFromSorted is stored beforehand, then iteratated. If the size changes,
// then you will call next() that returns null and an exception is thrown.
final NavigableMap<Number640, Data> retVal = new TreeMap<Number640, Data>();
for(final Map.Entry<Number640, Data> entry:dataMap.entrySet()) {
retVal.put(entry.getKey(), entry.getValue());
}
return retVal;
}
// Maintenance
@Override
public void addTimeout(Number640 key, long expiration) {
Long oldExpiration = timeoutMap.put(key, expiration);
putIfAbsent2(expiration, key);
if (oldExpiration == null) {
return;
}
removeRevTimeout(key, oldExpiration);
db.commit();
}
private void putIfAbsent2(long expiration, Number640 key) {
Set<Number640> timeouts = timeoutMapRev.get(expiration);
if(timeouts == null) {
timeouts = Collections.newSetFromMap(new ConcurrentHashMap<Number640, Boolean>());
}
timeouts.add(key);
timeoutMapRev.put(expiration, timeouts);
}
@Override
public void removeTimeout(Number640 key) {
Long expiration = timeoutMap.remove(key);
if (expiration == null) {
return;
}
removeRevTimeout(key, expiration);
db.commit();
}
private void removeRevTimeout(Number640 key, Long expiration) {
Set<Number640> tmp = timeoutMapRev.get(expiration);
if (tmp != null) {
tmp.remove(key);
if (tmp.isEmpty()) {
timeoutMapRev.remove(expiration);
} else {
timeoutMapRev.put(expiration, tmp);
}
}
}
@Override
public Collection<Number640> subMapTimeout(long to) {
SortedMap<Long, Set<Number640>> tmp = timeoutMapRev.subMap(0L, to);
Collection<Number640> toRemove = new ArrayList<Number640>();
for (Set<Number640> set : tmp.values()) {
toRemove.addAll(set);
}
return toRemove;
}
// Responsibility
@Override
public Number160 findPeerIDsForResponsibleContent(Number160 locationKey) {
return responsibilityMap.get(locationKey);
}
@Override
public Collection<Number160> findContentForResponsiblePeerID(Number160 peerID) {
return responsibilityMapRev.get(peerID);
}
@Override
public boolean updateResponsibilities(Number160 locationKey, Number160 peerId) {
final Number160 oldPeerID = responsibilityMap.put(locationKey, peerId);
final boolean hasChanged;
if(oldPeerID != null) {
if(oldPeerID.equals(peerId)) {
hasChanged = false;
} else {
removeRevResponsibility(oldPeerID, locationKey);
hasChanged = true;
}
} else {
hasChanged = true;
}
Set<Number160> contentIDs = responsibilityMapRev.get(peerId);
if(contentIDs == null) {
contentIDs = new HashSet<Number160>();
}
contentIDs.add(locationKey);
responsibilityMapRev.put(peerId, contentIDs);
db.commit();
return hasChanged;
}
@Override
public void removeResponsibility(Number160 locationKey) {
final Number160 peerId = responsibilityMap.remove(locationKey);
if(peerId != null) {
removeRevResponsibility(peerId, locationKey);
}
db.commit();
}
private void removeRevResponsibility(Number160 peerId, Number160 locationKey) {
Set<Number160> contentIDs = responsibilityMapRev.get(peerId);
if (contentIDs != null) {
contentIDs.remove(locationKey);
if (contentIDs.isEmpty()) {
responsibilityMapRev.remove(peerId);
} else {
responsibilityMapRev.put(peerId, contentIDs);
}
}
}
// Misc
@Override
public void close() {
db.close();
}
// Protection Domain
@Override
public boolean protectDomain(Number320 key, PublicKey publicKey) {
protectedDomainMap.put(key, publicKey);
return true;
}
@Override
public boolean isDomainProtectedByOthers(Number320 key, PublicKey publicKey) {
PublicKey other = protectedDomainMap.get(key);
if (other == null) {
return false;
}
return !other.equals(publicKey);
}
// Protection Entry
@Override
public boolean protectEntry(Number480 key, PublicKey publicKey) {
protectedEntryMap.put(key, publicKey);
return true;
}
@Override
public boolean isEntryProtectedByOthers(Number480 key, PublicKey publicKey) {
PublicKey other = protectedEntryMap.get(key);
if (other == null) {
return false;
}
return !other.equals(publicKey);
}
@Override
public int storageCheckIntervalMillis() {
return storageCheckIntervalMillis;
}
}