From 8626d1a916b8fa4504747335869fd86e373c9003 Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Tue, 16 Apr 2019 16:29:09 +0300 Subject: [PATCH] Fixed DataSerializer with encode/decode signature --- .idea/gradle.xml | 1 + build.gradle | 2 +- .../dhtBootstrap/DataSerializer.java | 179 +++++++++ .../github/chronosx88/dhtBootstrap/Main.java | 11 +- .../chronosx88/dhtBootstrap/StorageMapDB.java | 363 ++++++++++++++++++ 5 files changed, 551 insertions(+), 5 deletions(-) create mode 100644 src/main/java/io/github/chronosx88/dhtBootstrap/DataSerializer.java create mode 100644 src/main/java/io/github/chronosx88/dhtBootstrap/StorageMapDB.java diff --git a/.idea/gradle.xml b/.idea/gradle.xml index 310e0f2..3163861 100644 --- a/.idea/gradle.xml +++ b/.idea/gradle.xml @@ -10,6 +10,7 @@ + diff --git a/build.gradle b/build.gradle index 29e72c8..d2323b2 100644 --- a/build.gradle +++ b/build.gradle @@ -12,7 +12,7 @@ buildscript { apply plugin: 'java' apply plugin: 'com.github.johnrengelman.shadow' -version '0.2' +version '0.2.1' sourceCompatibility = 1.8 diff --git a/src/main/java/io/github/chronosx88/dhtBootstrap/DataSerializer.java b/src/main/java/io/github/chronosx88/dhtBootstrap/DataSerializer.java new file mode 100644 index 0000000..90fd7c1 --- /dev/null +++ b/src/main/java/io/github/chronosx88/dhtBootstrap/DataSerializer.java @@ -0,0 +1,179 @@ +package io.github.chronosx88.dhtBootstrap; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import net.tomp2p.connection.SignatureFactory; +import net.tomp2p.peers.Number160; +import net.tomp2p.storage.Data; +import org.mapdb.Serializer; + +import java.io.*; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.security.InvalidKeyException; +import java.security.SignatureException; + +public class DataSerializer implements Serializer, Serializable { + + private static final long serialVersionUID = 1428836065493792295L; + //TODO: test the performance impact + private static final int MAX_SIZE = 10 * 1024; + + final private File path; + final private SignatureFactory signatureFactory; + + public DataSerializer(File path, SignatureFactory signatureFactory) { + this.path = path; + this.signatureFactory = signatureFactory; + } + + @Override + public void serialize(DataOutput out, Data value) throws IOException { + if (value.length() > MAX_SIZE) { + // header, 1 means stored on disk in a file + out.writeByte(1); + serializeFile(out, value); + } else { + // header, 0 means stored on disk with MapDB + out.writeByte(0); + serializeMapDB(out, value); + } + } + + private void serializeMapDB(DataOutput out, Data value) throws IOException { + ByteBuf acb = Unpooled.buffer(); + // store data to disk + // header first + value.encodeHeader(acb, signatureFactory); + write(out, acb.nioBuffers()); + acb.skipBytes(acb.writerIndex()); + // next data - no need to copy to another buffer, just take the data + // from memory + write(out, value.toByteBuffers()); + // rest + try { + value.encodeDone(acb, signatureFactory); + write(out, acb.nioBuffers()); + } catch (InvalidKeyException e) { + throw new IOException(e); + } catch (SignatureException e) { + throw new IOException(e); + } + } + + private void serializeFile(DataOutput out, Data value) throws IOException, FileNotFoundException { + Number160 hash = value.hash(); + // store file name + out.write(hash.toByteArray()); + // store as external file, create path + RandomAccessFile file = null; + FileChannel rwChannel = null; + ByteBuf acb = null; + try { + file = new RandomAccessFile(new File(path, hash.toString()), "rw"); + rwChannel = file.getChannel(); + acb = Unpooled.buffer(); + // store data to disk + // header first + value.encodeHeader(acb, signatureFactory); + rwChannel.write(acb.nioBuffers()); + // next data - no need to copy to another buffer, just take the + // data from memory + rwChannel.write(value.toByteBuffers()); + // rest + try { + value.encodeDone(acb, signatureFactory); + rwChannel.write(acb.nioBuffers()); + } catch (InvalidKeyException e) { + throw new IOException(e); + } catch (SignatureException e) { + throw new IOException(e); + } + } finally { + if (acb!=null) { + acb.release(); + } + if (rwChannel != null) { + rwChannel.close(); + } + if (file != null) { + file.close(); + } + } + } + + private void write(DataOutput out, ByteBuffer[] nioBuffers) throws IOException { + final int length = nioBuffers.length; + for(int i=0;i < length; i++) { + int remaining = nioBuffers[i].remaining(); + if(nioBuffers[i].hasArray()) { + out.write(nioBuffers[i].array(), nioBuffers[i].arrayOffset(), remaining); + } else { + byte[] me = new byte[remaining]; + nioBuffers[i].get(me); + out.write(me); + } + } + } + + @Override + public Data deserialize(DataInput in, int available) throws IOException { + int header = in.readByte(); + if(header == 1) { + return deserializeFile(in); + } else if(header == 0) { + return deserializeMapDB(in); + } else { + throw new IOException("unexpected header: " + header); + } + } + + private Data deserializeMapDB(DataInput in) throws IOException { + ByteBuf buf = Unpooled.buffer(); + Data data = null; + while(data == null) { + buf.writeByte(in.readByte()); + data = Data.decodeHeader(buf, signatureFactory); + } + int len = data.length(); + byte me[] = new byte[len]; + in.readFully(me); + buf = Unpooled.wrappedBuffer(me); + boolean retVal = data.decodeBuffer(buf); + if(!retVal) { + throw new IOException("data could not be read"); + } + if(data.isSigned()) { + me = new byte[signatureFactory.signatureSize()]; + in.readFully(me); + buf = Unpooled.wrappedBuffer(me); + } + retVal = data.decodeDone(buf, signatureFactory); + if(!retVal) { + throw new IOException("signature could not be read"); + } + return data; + } + + private Data deserializeFile(DataInput in) throws IOException, FileNotFoundException { + byte[] me = new byte[Number160.BYTE_ARRAY_SIZE]; + in.readFully(me); + Number160 hash = new Number160(me); + RandomAccessFile file = new RandomAccessFile(new File(path, hash.toString()), "r"); + FileChannel inChannel = file.getChannel(); + MappedByteBuffer buffer = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size()); + buffer.load(); + ByteBuf buf = Unpooled.wrappedBuffer(buffer); + Data data = Data.decodeHeader(buf, signatureFactory); + data.decodeBuffer(buf); + data.decodeDone(buf, signatureFactory); + file.close(); + return data; + } + + @Override + public int fixedSize() { + return -1; + } +} diff --git a/src/main/java/io/github/chronosx88/dhtBootstrap/Main.java b/src/main/java/io/github/chronosx88/dhtBootstrap/Main.java index 54e6ea4..96d15ba 100644 --- a/src/main/java/io/github/chronosx88/dhtBootstrap/Main.java +++ b/src/main/java/io/github/chronosx88/dhtBootstrap/Main.java @@ -8,10 +8,13 @@ import net.tomp2p.p2p.PeerBuilder; import net.tomp2p.peers.Number160; import net.tomp2p.relay.RelayType; import net.tomp2p.relay.tcp.TCPRelayServerConfig; -import net.tomp2p.replication.AutoReplication; +import net.tomp2p.replication.IndirectReplication; import net.tomp2p.storage.StorageDisk; -import java.io.*; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileWriter; +import java.io.IOException; import java.util.Properties; import java.util.UUID; @@ -28,7 +31,7 @@ public class Main { File config = new File(DATA_DIR_PATH + "config.properties"); try { if(!dataDir.exists() && !config.exists()) { - dataDir.mkdir(); + dataDir.mkdirs(); config.createNewFile(); props.setProperty("isFirstRun", "false"); props.setProperty("peerID", UUID.randomUUID().toString()); @@ -59,7 +62,7 @@ public class Main { new PeerBuilderNAT(peerDHT.peer()) .addRelayServerConfiguration(RelayType.OPENTCP, new TCPRelayServerConfig()) .start(); - new AutoReplication(peerDHT.peer()).start(); + new IndirectReplication(peerDHT).start(); } catch (IOException e) { e.printStackTrace(); } diff --git a/src/main/java/io/github/chronosx88/dhtBootstrap/StorageMapDB.java b/src/main/java/io/github/chronosx88/dhtBootstrap/StorageMapDB.java new file mode 100644 index 0000000..baa9b91 --- /dev/null +++ b/src/main/java/io/github/chronosx88/dhtBootstrap/StorageMapDB.java @@ -0,0 +1,363 @@ +package io.github.chronosx88.dhtBootstrap; + +import net.tomp2p.connection.SignatureFactory; +import net.tomp2p.dht.Storage; +import net.tomp2p.peers.Number160; +import net.tomp2p.peers.Number320; +import net.tomp2p.peers.Number480; +import net.tomp2p.peers.Number640; +import net.tomp2p.storage.Data; +import org.mapdb.DB; +import org.mapdb.DBMaker; + +import java.io.File; +import java.security.PublicKey; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentNavigableMap; + +/* + * Copyright 2019 Thomas Bocek, ChronosX88 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +public class StorageMapDB implements Storage { + + // Core + final private NavigableMap dataMap; + // Maintenance + final private Map timeoutMap; + final private ConcurrentNavigableMap> timeoutMapRev; + // Protection + final private Map protectedDomainMap; + final private Map protectedEntryMap; + // Responsibility + final private Map responsibilityMap; + final private Map> responsibilityMapRev; + + final private DB db; + + final private int storageCheckIntervalMillis; + + //for full control + public StorageMapDB(DB db, Number160 peerId, File path, SignatureFactory signatureFactory, int storageCheckIntervalMillis) { + this.db = db; + DataSerializer dataSerializer = new DataSerializer(path, signatureFactory); + this.dataMap = db.createTreeMap("dataMap_" + peerId.toString()).valueSerializer(dataSerializer).makeOrGet(); + this.timeoutMap = db.createTreeMap("timeoutMap_" + peerId.toString()).makeOrGet(); + this.timeoutMapRev = db.createTreeMap("timeoutMapRev_" + peerId.toString()).makeOrGet(); + this.protectedDomainMap = db.createTreeMap("protectedDomainMap_" + peerId.toString()).makeOrGet(); + this.protectedEntryMap = db.createTreeMap("protectedEntryMap_" + peerId.toString()).makeOrGet(); + this.responsibilityMap = db.createTreeMap("responsibilityMap_" + peerId.toString()).makeOrGet(); + this.responsibilityMapRev = db.createTreeMap("responsibilityMapRev_" + peerId.toString()).makeOrGet(); + this.storageCheckIntervalMillis = storageCheckIntervalMillis; + } + + //set parameter to a reasonable default + public StorageMapDB(Number160 peerId, File path, SignatureFactory signatureFactory) { + this(DBMaker.newFileDB(new File(path, "coreDB.db")).transactionDisable().closeOnJvmShutdown().make(), + peerId, path, signatureFactory, 60 * 1000); + } + + @Override + public Data put(Number640 key, Data value) { + Data oldData = dataMap.put(key, value); + db.commit(); + return oldData; + } + + @Override + public Data get(Number640 key) { + return dataMap.get(key); + } + + @Override + public boolean contains(Number640 key) { + return dataMap.containsKey(key); + } + + @Override + public int contains(Number640 from, Number640 to) { + NavigableMap tmp = dataMap.subMap(from, true, to, true); + return tmp.size(); + } + + @Override + public Data remove(Number640 key, boolean returnData) { + Data retVal = dataMap.remove(key); + db.commit(); + return retVal; + } + + @Override + public NavigableMap remove(Number640 from, Number640 to) { + NavigableMap tmp = dataMap.subMap(from, true, to, true); + + // new TreeMap(tmp); is not possible as this may lead to no such element exception: + // + // java.util.NoSuchElementException: null + // at java.util.concurrent.ConcurrentSkipListMap$SubMap$SubMapIter.advance(ConcurrentSkipListMap.java:3030) ~[na:1.7.0_60] + // at java.util.concurrent.ConcurrentSkipListMap$SubMap$SubMapEntryIterator.next(ConcurrentSkipListMap.java:3100) ~[na:1.7.0_60] + // at java.util.concurrent.ConcurrentSkipListMap$SubMap$SubMapEntryIterator.next(ConcurrentSkipListMap.java:3096) ~[na:1.7.0_60] + // at java.util.TreeMap.buildFromSorted(TreeMap.java:2394) ~[na:1.7.0_60] + // at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60] + // at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60] + // at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60] + // at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60] + // at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60] + // at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60] + // at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60] + // at java.util.TreeMap.buildFromSorted(TreeMap.java:2344) ~[na:1.7.0_60] + // at java.util.TreeMap.(TreeMap.java:195) ~[na:1.7.0_60] + // at net.tomp2p.dht.StorageMemory.subMap(StorageMemory.java:119) ~[classes/:na] + // + // the reason is that the size in TreeMap.buildFromSorted is stored beforehand, then iteratated. If the size changes, + // then you will call next() that returns null and an exception is thrown. + final NavigableMap retVal = new TreeMap(); + for(final Map.Entry entry:tmp.entrySet()) { + retVal.put(entry.getKey(), entry.getValue()); + } + + tmp.clear(); + db.commit(); + return retVal; + } + + @Override + public NavigableMap subMap(Number640 from, Number640 to, int limit, boolean ascending) { + NavigableMap tmp = dataMap.subMap(from, true, to, true); + final NavigableMap retVal = new TreeMap(); + if (limit < 0) { + + // new TreeMap(tmp); is not possible as this may lead to no such element exception: + // + // java.util.NoSuchElementException: null + // at java.util.concurrent.ConcurrentSkipListMap$SubMap$SubMapIter.advance(ConcurrentSkipListMap.java:3030) ~[na:1.7.0_60] + // at java.util.concurrent.ConcurrentSkipListMap$SubMap$SubMapEntryIterator.next(ConcurrentSkipListMap.java:3100) ~[na:1.7.0_60] + // at java.util.concurrent.ConcurrentSkipListMap$SubMap$SubMapEntryIterator.next(ConcurrentSkipListMap.java:3096) ~[na:1.7.0_60] + // at java.util.TreeMap.buildFromSorted(TreeMap.java:2394) ~[na:1.7.0_60] + // at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60] + // at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60] + // at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60] + // at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60] + // at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60] + // at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60] + // at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60] + // at java.util.TreeMap.buildFromSorted(TreeMap.java:2344) ~[na:1.7.0_60] + // at java.util.TreeMap.(TreeMap.java:195) ~[na:1.7.0_60] + // at net.tomp2p.dht.StorageMemory.subMap(StorageMemory.java:119) ~[classes/:na] + // + // the reason is that the size in TreeMap.buildFromSorted is stored beforehand, then iteratated. If the size changes, + // then you will call next() that returns null and an exception is thrown. + + for(final Map.Entry entry:(ascending ? tmp : tmp.descendingMap()).entrySet()) { + retVal.put(entry.getKey(), entry.getValue()); + } + } else { + limit = Math.min(limit, tmp.size()); + Iterator> iterator = ascending ? tmp.entrySet().iterator() : tmp + .descendingMap().entrySet().iterator(); + for (int i = 0; iterator.hasNext() && i < limit; i++) { + Map.Entry entry = iterator.next(); + retVal.put(entry.getKey(), entry.getValue()); + } + } + return retVal; + } + + @Override + public NavigableMap map() { + + // new TreeMap(dataMap); is not possible as this may lead to no such element exception: + // + // java.util.NoSuchElementException: null + // at java.util.concurrent.ConcurrentSkipListMap$SubMap$SubMapIter.advance(ConcurrentSkipListMap.java:3030) ~[na:1.7.0_60] + // at java.util.concurrent.ConcurrentSkipListMap$SubMap$SubMapEntryIterator.next(ConcurrentSkipListMap.java:3100) ~[na:1.7.0_60] + // at java.util.concurrent.ConcurrentSkipListMap$SubMap$SubMapEntryIterator.next(ConcurrentSkipListMap.java:3096) ~[na:1.7.0_60] + // at java.util.TreeMap.buildFromSorted(TreeMap.java:2394) ~[na:1.7.0_60] + // at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60] + // at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60] + // at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60] + // at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60] + // at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60] + // at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60] + // at java.util.TreeMap.buildFromSorted(TreeMap.java:2418) ~[na:1.7.0_60] + // at java.util.TreeMap.buildFromSorted(TreeMap.java:2344) ~[na:1.7.0_60] + // at java.util.TreeMap.(TreeMap.java:195) ~[na:1.7.0_60] + // at net.tomp2p.dht.StorageMemory.subMap(StorageMemory.java:119) ~[classes/:na] + // + // the reason is that the size in TreeMap.buildFromSorted is stored beforehand, then iteratated. If the size changes, + // then you will call next() that returns null and an exception is thrown. + final NavigableMap retVal = new TreeMap(); + for(final Map.Entry entry:dataMap.entrySet()) { + retVal.put(entry.getKey(), entry.getValue()); + } + + return retVal; + } + + // Maintenance + @Override + public void addTimeout(Number640 key, long expiration) { + Long oldExpiration = timeoutMap.put(key, expiration); + putIfAbsent2(expiration, key); + if (oldExpiration == null) { + return; + } + removeRevTimeout(key, oldExpiration); + db.commit(); + } + + private void putIfAbsent2(long expiration, Number640 key) { + Set timeouts = timeoutMapRev.get(expiration); + if(timeouts == null) { + timeouts = Collections.newSetFromMap(new ConcurrentHashMap()); + } + timeouts.add(key); + timeoutMapRev.put(expiration, timeouts); + } + + @Override + public void removeTimeout(Number640 key) { + Long expiration = timeoutMap.remove(key); + if (expiration == null) { + return; + } + removeRevTimeout(key, expiration); + db.commit(); + } + + private void removeRevTimeout(Number640 key, Long expiration) { + Set tmp = timeoutMapRev.get(expiration); + if (tmp != null) { + tmp.remove(key); + if (tmp.isEmpty()) { + timeoutMapRev.remove(expiration); + } else { + timeoutMapRev.put(expiration, tmp); + } + } + } + + @Override + public Collection subMapTimeout(long to) { + SortedMap> tmp = timeoutMapRev.subMap(0L, to); + Collection toRemove = new ArrayList(); + for (Set set : tmp.values()) { + toRemove.addAll(set); + } + return toRemove; + } + + + + // Responsibility + @Override + public Number160 findPeerIDsForResponsibleContent(Number160 locationKey) { + return responsibilityMap.get(locationKey); + } + + @Override + public Collection findContentForResponsiblePeerID(Number160 peerID) { + return responsibilityMapRev.get(peerID); + } + + @Override + public boolean updateResponsibilities(Number160 locationKey, Number160 peerId) { + final Number160 oldPeerID = responsibilityMap.put(locationKey, peerId); + final boolean hasChanged; + if(oldPeerID != null) { + if(oldPeerID.equals(peerId)) { + hasChanged = false; + } else { + removeRevResponsibility(oldPeerID, locationKey); + hasChanged = true; + } + } else { + hasChanged = true; + } + Set contentIDs = responsibilityMapRev.get(peerId); + if(contentIDs == null) { + contentIDs = new HashSet(); + } + contentIDs.add(locationKey); + responsibilityMapRev.put(peerId, contentIDs); + db.commit(); + return hasChanged; + } + + @Override + public void removeResponsibility(Number160 locationKey) { + final Number160 peerId = responsibilityMap.remove(locationKey); + if(peerId != null) { + removeRevResponsibility(peerId, locationKey); + } + db.commit(); + } + + private void removeRevResponsibility(Number160 peerId, Number160 locationKey) { + Set contentIDs = responsibilityMapRev.get(peerId); + if (contentIDs != null) { + contentIDs.remove(locationKey); + if (contentIDs.isEmpty()) { + responsibilityMapRev.remove(peerId); + } else { + responsibilityMapRev.put(peerId, contentIDs); + } + } + } + + // Misc + @Override + public void close() { + db.close(); + } + + // Protection Domain + @Override + public boolean protectDomain(Number320 key, PublicKey publicKey) { + protectedDomainMap.put(key, publicKey); + return true; + } + + @Override + public boolean isDomainProtectedByOthers(Number320 key, PublicKey publicKey) { + PublicKey other = protectedDomainMap.get(key); + if (other == null) { + return false; + } + return !other.equals(publicKey); + } + + // Protection Entry + @Override + public boolean protectEntry(Number480 key, PublicKey publicKey) { + protectedEntryMap.put(key, publicKey); + return true; + } + + @Override + public boolean isEntryProtectedByOthers(Number480 key, PublicKey publicKey) { + PublicKey other = protectedEntryMap.get(key); + if (other == null) { + return false; + } + return !other.equals(publicKey); + } + + @Override + public int storageCheckIntervalMillis() { + return storageCheckIntervalMillis; + } + +}