<html><head>
<title>FreePastry Tutorial</title>
<link rel="stylesheet" href="tutorial.css" />
</head>
<body>
<div class="content">
<div class="frontmatter">
<h1>The FreePastry Tutorial.</h1>
<div class="abstract">This tutorial is designed to get you cooking quickly with the FreePastry
API and software toolkit.</div>
<h4>Version @tutorial_version@; @tutorial_date@. For <a
href="http://freepastry.org/">FreePastry</a> version @freepastry_version@. Maintained by @maintainer@.</h4>
</div>
<a name="layer"><h1>Transport Layers</h1></a>
<h2>Modify low level details of FreePastry's network stack.</h2>
<div class="nav">
<span class="nav-left"><a href="tut_forward.html#forward">Previous (Forwarding)</a></span>
<span class="nav-center"><a href="index.html">Contents</a></span>
<span class="nav-right"><a href="tut_cancellable_msg.html">Next (Cancellable Messages)</a></span>
</div><br/><hr/>
<p/>This tutorial shows you how to use the new TransportLayer interface in the package org.mpisws.p2p.transport. It also shows you how to integrate your TransportLayer into the SocketPastryNodeFactory network stack.
<p/>Outline of this tutorial:
<ul>
<li><a href="#overview">Overview</a> of how the transport layers work together.</li>
<li><a href="#design">Design</a> of our example transport layer.</li>
<li>Overriding <a href="#message">message</a> behavior.
<ul>
<li>Constructing a <a href="#handle">MessageRequestHandle</a>.</li>
<li>Implementing the <a href="#m_callback">MessageCallback</a> to notify the user of success/failure.</li>
</ul></li>
<li>Overriding <a href="#socket">socket</a> behavior.
<ul>
<li>Override <a href="#register">registration</a> of read/write</li>
<li>Intercept <a href="#open">openSocket</a>/<a href="#incoming">incomingSocket</a></li>
</ul></li>
<li>Put the <a href="#finishing">final touches</a> on the transport layer and <a href="#integration">integrate it into the SocketPastryNodeFactory</a>.</li>
<li><a href="#run">Execute the code</a> in an example.</li>
</ul>
<p/><b>Warning:</b> This tutorial is more difficult than the previous tutorials and is intended for developers who wish to make low-level changes to the FreePastry network stack.
<a name="overview"/><h2>Overview</h2>
<p/>We will start by stating some of the design goals of the <i>Layered Transport Layer</i>.
<p/>Versions of FreePastry before 2.1 had a unified transport layer. When features at this level needed to be added or modified, it was very complicated to get all of the parts to work properly. The new version rearranges network transport into a stack of layers, each with its own small task.
<p/>To give you an idea of a typical layer's task, we will describe the layers that are assembled when creating the SocketPastryNodeFactory in FreePastry 2.1, beginning with the lowest layer:
<h3>SocketPastryNodeFactory's Layers:</h3>
<ul>
<li><b>Wire</b> -- Opens/Accepts Sockets, Sends/Receives Datagrams</li>
<li><b>MagicNumber</b> -- Throws away sockets/datagrams for other applications such as HTTP (if it doesn't match the application specific magic number)</li>
<li><b>LimitSockets</b> -- Prevents exhaustion of file descriptors by closing the least recently used socket.</li>
<li><b>MultiInetAddressTransportLayer</b> -- Handles multi-homing (for example, a node behind a NAT may have more than one address:port: its internal address and the NAT's external address that is forwarded to it).</li>
<li><b>SourceRoute</b> -- Sends messages/opens sockets along a source route to route around temporary routing anomalies. This layer manages both the endpoints and the intermediate nodes. Note that this layer does <i>not</i> determine the optimal route to an end host; that is done by another layer, the <i>SourceRouteManager</i>.</li>
<li><b>LowerIdentity</b> -- This layer, in conjunction with the <i>UpperIdentity</i>, maintains the "intention" of the sent/received message. For example, if a node has restarted with a different NodeId, this layer drops pings/sockets if they were intended for the previous node at this address.</li>
<li><b>Liveness</b> -- Pings nodes to determine liveness/proximity, implements 2 new interfaces: <i>LivenessProvider, Pinger</i> </li>
<li><b>SourceRouteManager</b> -- Chooses the appropriate SourceRoute based on the liveness/proximity exported by the lower layer, implements another new interface <i>ProximityProvider</i></li>
<li><b>Priority</b> -- Uses a single TCP socket to send messages. Can select the order of the messages based on the priority.</li>
<li><b>UpperIdentity</b> -- (See Lower Identity) This layer keeps track of the intended destination of the message so that the lower layer can properly encode that intention.</li>
<li><b>CommonAPI</b> -- Serializes/Deserializes messages from a RawMessage to a ByteBuffer.</li>
</ul>
<p/>Note:
<ul>
<li>Some of these layers depend on lower layers and don't make much sense by themselves, for example the <b>SourceRoute/SourceRouteManager</b> and <b>Lower/Upper Identity</b> layers.</li>
<li>Some layers could be re-arranged. For example the LimitSockets layer could probably go anywhere in the stack, yet it seems to make the most sense where it is.</li>
<li>Some layers read/write bytes in the protocol (MagicNumber, SourceRoute, Liveness) while others only make logical decisions (LimitSockets, SourceRouteManager).</li>
</ul>
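<p/>The stacking idea can be sketched in a few lines. This is only an illustration under simplifying assumptions: <code>LayerSketch</code>, <code>LayerStackSketch</code> and the "MAGIC:" header are made-up names, and the real <code>TransportLayer</code> interface is considerably richer. It shows one layer that changes the bytes and one that only makes a decision, each wrapping the layer below it.

```java
/** Hypothetical one-method layer; the real TransportLayer interface is much richer. */
interface LayerSketch {
  String send(String payload);
}

class LayerStackSketch {
  /** Lowest layer: pretends to put the payload on the wire and returns what was sent. */
  static LayerSketch wire() {
    return payload -> payload;
  }

  /** A layer that changes the bytes: prepends a made-up header, then passes down. */
  static LayerSketch header(final String magic, final LayerSketch lower) {
    return payload -> lower.send(magic + payload);
  }

  /** A layer that only makes a decision: refuses oversized payloads, otherwise passes down. */
  static LayerSketch sizeLimit(final int maxLength, final LayerSketch lower) {
    return payload -> payload.length() > maxLength ? null : lower.send(payload);
  }

  public static void main(String[] args) {
    // Build the stack bottom-up: wire, then header, then sizeLimit on top.
    LayerSketch stack = sizeLimit(5, header("MAGIC:", wire()));
    System.out.println(stack.send("hello"));    // MAGIC:hello
    System.out.println(stack.send("too long")); // null, rejected by the top layer
  }
}
```

<p/>Because every layer only talks to the one directly below it, a layer can be inserted or removed without touching the others, as long as the types it imports and exports line up.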
<h3>Other interesting layers:</h3>
<ul>
<li><b>DirectTransportLayer</b> -- This implements the discrete event simulator, and is used by the DirectPastryNodeFactory.</li>
</ul>
<h3>Upcoming layers:</h3>
<ul>
<li><b>SSL</b> -- Provides Crypto/Authentication (typically goes above the SourceRoute layer to provide end-to-end crypto/auth)</li>
<li><b>BandwidthLimiting</b> -- Limits the Bandwidth of a node (typically goes between MagicNumber/SourceRoute Layers)</li>
<li><b>PeerReview</b> -- Provides protocol accountability (typically goes near the top, such as between CommonAPI/Priority layers)</li>
<li><b>STUN</b> -- would likely replace the Wire Layer and provide NAT hole-punching</li>
</ul>
<p/>Note: The TransportLayer uses generics, a Java language feature that was introduced in Java 1.5. For more information on generics, see <a href="http://java.sun.com/j2se/1.5/pdf/generics-tutorial.pdf">this tutorial</a>.
<h3>The TransportLayer interface:</h3>
<pre>
public interface TransportLayer&lt;Identifier, MessageType&gt; extends Destructable {}
</pre>
<p/>Each transport layer operates on an <i>Identifier</i> (<b>InetSocketAddress</b>, <b>SourceRoute</b>, <b>NodeHandle</b> etc.), and a <i>MessageType</i> (<b>ByteBuffer</b>, <b>RawMessage</b> etc.)
<p/>The most important operations are <i>sending messages</i> and <i>opening sockets</i>:
<pre>
public MessageRequestHandle&lt;Identifier, MessageType&gt; sendMessage(
Identifier i,
MessageType m,
MessageCallback&lt;Identifier, MessageType&gt; deliverAckToMe,
Map&lt;String, Object&gt; options);
public SocketRequestHandle&lt;Identifier&gt; openSocket(
Identifier i,
SocketCallback&lt;Identifier&gt; deliverSocketToMe,
Map&lt;String, Object&gt; options);
</pre>
<p/>For these methods, you need the <b>Identifier</b> of the remote node, and (for <code>sendMessage()</code>) the <b>Message</b> to be delivered. Additionally, you may specify some transport-layer specific <b>Options</b> such as <i>Guaranteed</i>/<i>Unguaranteed</i>/<i>Encrypted</i> etc. We will describe some of these options in a later tutorial. Finally, you provide a <b>Callback</b> (<code>deliverAckToMe</code>/<code>deliverSocketToMe</code>) to deliver notification of success or failure when the operation completes. These calls are non-blocking and return a <b>RequestHandle</b>. The RequestHandle is like a receipt or a tracking number. You can use the RequestHandle to cancel the existing request if it is no longer necessary, for example if the operation takes too long. The <a href="tut_cancellable_msg.html">next tutorial</a> shows you how to work with a RequestHandle.
<a name="callback"><h3>The TransportLayerCallback interface:</h3></a>
<p/>The <code>TransportLayerCallback</code> provides the inverse operations (the result of a remote node sending a message or opening a socket) and must have the identical <i>Identifier</i>/<i>MessageType</i>:
<pre>
public interface TransportLayerCallback&lt;Identifier, MessageType&gt; {
public void messageReceived(Identifier i, MessageType m, Map&lt;String, Object&gt; options) throws IOException;
public void incomingSocket(P2PSocket&lt;Identifier&gt; s) throws IOException;
}
</pre>
<p/>The P2PSocket is similar to the <a href="tut_app_sockets.html">AppSocket</a>. It is non-blocking and you must register each time you use it, as explained in the AppSocket tutorial. <i>We recommend taking the AppSocket tutorial before proceeding.</i>
<p/>Here are the calls for the P2PSocket:
<pre>
void register(boolean wantToRead, boolean wantToWrite, P2PSocketReceiver&lt;Identifier&gt; receiver);
long read(ByteBuffer dsts) throws IOException;
long write(ByteBuffer srcs) throws IOException;
void shutdownOutput();
void close();
Identifier getIdentifier();
Map&lt;String, Object&gt; getOptions();
</pre>
<h3>Other calls in the TransportLayer:</h3>
<p/>This method returns the Identifier of the local node for this layer.
<pre>
public Identifier getLocalIdentifier();
</pre>
<p/>These methods can control flow by rejecting new messages/sockets if the local node is being overwhelmed.
<pre>
public void acceptSockets(boolean b);
public void acceptMessages(boolean b);
</pre>
<p/>This method sets the <a href="#callback">callback</a>
<pre>
public void setCallback(TransportLayerCallback&lt;Identifier, MessageType&gt; callback);
</pre>
<p/>This method sets the <code>ErrorHandler</code>, which is used for notification of unexpected behavior, for example when the acceptor socket closes or an unexpected message arrives.
<pre>
public void setErrorHandler(ErrorHandler&lt;Identifier&gt; handler);
</pre>
<p/>This method cleans up the layer, in case you wish to shut down the transport layer without exiting the JVM. For example, it will close the ServerSocket that accepts new sockets, as well as clean up any memory.
<pre>
public void destroy();
</pre>
<h2>Your first new TransportLayer</h2>
<a name="design"/><h3>Design:</h3>
<ul>
<li>In this tutorial, we will create a new transport layer that caps the peak outgoing bandwidth. We will use a bucket system with configurable bandwidth and bucket time limit. For example, if we want to limit bandwidth to 10K/second, we can allow 10K in any one second, or 1K in any 1/10th of a second.</li>
<li>For simplicity, we won't distinguish between socket and datagram traffic.</li>
<li>The obvious place for this layer will be just above the Wire layer, but to provide maximum flexibility, we would like this layer to work with any <i>Identifier</i>. Thus we will keep the Identifier parameter <i>generic</i>.</li>
<li>Because of the nature of the layer, we must specify a message type that has a size. In this case, we'll use the <code>ByteBuffer</code> as our Message type.</li>
<li>To insert our new layer between two existing layers, we also need to implement a <code>TransportLayerCallback</code> with the same generic parameters.</li>
</ul>
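<p/>Before looking at the real class, here is a minimal sketch of the bucket idea on its own. It is not the tutorial's implementation: <code>BucketSketch</code>, <code>tryConsume()</code> and <code>available()</code> are hypothetical names, and the refill is triggered by hand instead of by a timer.

```java
/** Hypothetical, simplified bucket; not the tutorial's BandwidthLimitingTransportLayer. */
class BucketSketch {
  private final long bucketSize;  // bytes allowed per interval
  private long bucket;            // bytes still available in the current interval

  BucketSketch(long bucketSize) {
    this.bucketSize = bucketSize;
    this.bucket = bucketSize;
  }

  /** Called once per interval, for example by a timer. */
  synchronized void refill() {
    bucket = bucketSize;
  }

  /** Takes bytes from the bucket if enough remain; returns false otherwise. */
  synchronized boolean tryConsume(long bytes) {
    if (bytes > bucket) return false;
    bucket -= bytes;
    return true;
  }

  synchronized long available() {
    return bucket;
  }

  public static void main(String[] args) {
    // 10K per second expressed as 1K per 1/10th of a second
    BucketSketch b = new BucketSketch(1024);
    System.out.println(b.tryConsume(600));  // true
    System.out.println(b.tryConsume(600));  // false, only 424 bytes remain
    b.refill();
    System.out.println(b.tryConsume(600));  // true again after the refill
  }
}
```

<p/>A shorter interval with a proportionally smaller bucket gives the same average rate but a lower peak, which is the trade-off between the two configurable parameters.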
<h3>Download the tutorial files:
<a href="./src/transportlayer/BandwidthLimitingTransportLayer.java">BandwidthLimitingTransportLayer.java</a>,
<a href="./src/transportlayer/NotEnoughBandwidthException.java">NotEnoughBandwidthException.java</a>,
<a href="./src/transportlayer/DistTutorial.java">DistTutorial.java</a>,
<a href="./src/transportlayer/MyApp.java">MyApp.java</a>, and
<a href="./src/transportlayer/MyMsg.java">MyMsg.java</a> into a directory called rice/tutorial/transportlayer/.</h3>
<p/>Here is the definition of our new class.
<pre>
public class BandwidthLimitingTransportLayer&lt;Identifier&gt; implements
TransportLayer&lt;Identifier, ByteBuffer&gt;,
TransportLayerCallback&lt;Identifier, ByteBuffer&gt; {
}
</pre>
<p/>Here is the constructor. It takes the <code>TransportLayer</code> immediately below this layer, the bucket size, and the bucket time length. We also need access to the environment so we can create a logger.
<pre>
public BandwidthLimitingTransportLayer(
TransportLayer&lt;Identifier, ByteBuffer&gt; tl,
long bucketSize, int bucketTimelimit,
Environment env) {
this.environment = env;
this.tl = tl;
BUCKET_SIZE = bucketSize;
BUCKET_TIME_LIMIT = bucketTimelimit;
logger = env.getLogManager().getLogger(BandwidthLimitingTransportLayer.class, null);
tl.setCallback(this);
}
</pre>
<p/>You can look at the code to see the declaration of these fields. The last thing we have to do is set ourselves as the lower level's callback. This will cause it to deliver messages/sockets to us.
<p/>This variable is the bucket.
<pre>
/**
* When this goes to zero, don't send messages
*/
protected long bucket;
</pre>
<p/>Now let's create a task to refill the bucket.
<pre>
environment.getSelectorManager().getTimer().schedule(new TimerTask(){
@Override
public void run() {
// always synchronize on the transport layer before modifying the bucket;
// a bare "this" here would refer to the TimerTask
synchronized(BandwidthLimitingTransportLayer.this) {
bucket = BUCKET_SIZE;
}
}
}, 0, BUCKET_TIME_LIMIT);
</pre>
<p/>If the TimerTask is unfamiliar, please review <a href="tut_timertask.html#timer">the timer tutorial</a>. This interface is slightly different in that it calls <code>run()</code>, rather than sending a <code>MessageToSelf</code>, but it is the same idea.
<a name="message"/><h2>Limit Message Bandwidth</h2>
<p/>When the user requests to send the message, it may be buffered or even fail. The transport layer will notify the user of success or failure sending the message. Furthermore, the user can cancel the operation if it hasn't completed. Let's examine the sendMessage call.
<pre>
public MessageRequestHandle&lt;Identifier, ByteBuffer&gt; sendMessage(
Identifier i,
ByteBuffer m,
MessageCallback&lt;Identifier, ByteBuffer&gt; deliverAckToMe,
Map&lt;String, Object&gt; options) {
}
</pre>
<p/>This method will attempt to send the message <b>m</b> to the Identifier <b>i</b>. It will notify the <code>MessageCallback</code> <b>deliverAckToMe</b> of success or failure, and it will return a <b>MessageRequestHandle</b> that can be used to cancel the operation. Furthermore, it takes <b>options</b> which could be used, for example, to suppress the bandwidth limitation. However, we won't use the options now; we will merely pass them on to the next layer.
<p/>Now let's limit the outgoing message bandwidth. We are going to throw a <code>NotEnoughBandwidthException</code> if there isn't sufficient bandwidth to send a message.
<p/>There are three things we must do here.
<ul>
<li>Subtract from the bucket or throw the exception.</li>
<li>Return a proper Message receipt so the task can be cancelled.</li>
<li>Acknowledge the message when the lower transport layer acknowledges it to us.</li>
</ul>
<p/>This shows the code to subtract from the bucket or throw the exception. For now, we return null. All we have to do is call <code>deliverAckToMe.sendFailed()</code> when there isn't enough bandwidth. Note: The <code>MessageCallback</code> may be null, so we must check that it isn't before calling sendFailed(); otherwise we will get a NullPointerException. Also, so we can detect that this is occurring when we run the code, we will log when the message is dropped.
<pre>
public MessageRequestHandle&lt;Identifier, ByteBuffer&gt; sendMessage(
Identifier i,
ByteBuffer m,
MessageCallback&lt;Identifier, ByteBuffer&gt; deliverAckToMe,
Map&lt;String, Object&gt; options) {
boolean success = true;
synchronized(this) {
if (m.remaining() > bucket) {
success = false;
} else {
bucket-=m.remaining();
}
}
if (!success) {
if (logger.level <= Logger.FINE) logger.log("Dropping message "+m+" because not enough bandwidth:"+bucket);
if (deliverAckToMe != null)
deliverAckToMe.sendFailed(null, new NotEnoughBandwidthException(bucket, m.remaining()));
return null;
}
tl.sendMessage(i,m,deliverAckToMe,options);
return null;
}
</pre>
<a name="handle"/><h3>MessageRequestHandle</h3>
<p/>Now we will add code to return a proper <code>MessageRequestHandle</code>. The purpose of this receipt is that it allows the user to cancel the message being sent, but it is also used when we notify the user's code of success or failure. This way if the user sends the same message a few times, the MessageRequestHandle can be used to disambiguate which instance was successful/failed.
<p/>The release already includes a generic implementation of the MessageRequestHandle: org.mpisws.p2p.transport.util.MessageRequestHandleImpl. Let's take a look at it:
<p/>The constructor initializes these 3 member variables:
<pre>
Identifier identifier;
MessageType msg;
Map&lt;String, Object&gt; options;
</pre>
<p/>There are also corresponding getters. However, we still need to be able to <code>cancel()</code> the operation in the next transport layer. This requires the 4th member variable:
<pre>
Cancellable subCancellable;
public boolean cancel() {
if (subCancellable == null) return false;
return subCancellable.cancel();
}
</pre>
<p/><code>subCancellable</code> is initialized with a call to <code>setSubCancellable()</code>.
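<p/>To see the delegation in isolation, here is a minimal sketch. <code>CancellableSketch</code> and <code>HandleSketch</code> are hypothetical stand-ins, not the FreePastry classes; they only mirror the <code>subCancellable</code> pattern shown above.

```java
/** Hypothetical stand-in for FreePastry's Cancellable. */
interface CancellableSketch {
  boolean cancel();
}

/** Hypothetical handle that forwards cancel() to the layer below, if one was set. */
class HandleSketch implements CancellableSketch {
  private CancellableSketch subCancellable;

  void setSubCancellable(CancellableSketch sub) {
    this.subCancellable = sub;
  }

  public boolean cancel() {
    if (subCancellable == null) return false;  // nothing below us to cancel
    return subCancellable.cancel();
  }

  public static void main(String[] args) {
    HandleSketch upper = new HandleSketch();
    System.out.println(upper.cancel());   // false: no lower handle yet

    HandleSketch lower = new HandleSketch();
    lower.setSubCancellable(() -> true);  // pretend the lowest layer cancels successfully
    upper.setSubCancellable(lower);
    System.out.println(upper.cancel());   // true: the call travelled down two layers
  }
}
```

<p/>Each layer hands out its own handle and keeps the lower layer's handle inside it, so a single <code>cancel()</code> from the user reaches every layer that is still working on the request.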
<p/>Here is the code that now properly returns a <code>MessageRequestHandle</code>:
<pre>
public MessageRequestHandle&lt;Identifier, ByteBuffer&gt; sendMessage(Identifier i, ByteBuffer m,
MessageCallback&lt;Identifier, ByteBuffer&gt; deliverAckToMe, Map&lt;String, Object&gt; options) {
MessageRequestHandleImpl&lt;Identifier, ByteBuffer&gt; returnMe =
new MessageRequestHandleImpl&lt;Identifier, ByteBuffer&gt;(i, m, options);
boolean success = true;
synchronized(this) {
if (m.remaining() > bucket) {
success = false;
} else {
bucket-=m.remaining();
}
}
if (!success) {
if (logger.level <= Logger.FINE) logger.log("Dropping message "+m+" because not enough bandwidth:"+bucket);
if (deliverAckToMe != null)
deliverAckToMe.sendFailed(returnMe, new NotEnoughBandwidthException(bucket, m.remaining()));
return returnMe;
}
returnMe.setSubCancellable(tl.sendMessage(i,m,deliverAckToMe,options));
return returnMe;
}
</pre>
<p/>Note how we call <code>returnMe.setSubCancellable()</code> with the call to the lower transportLayer.
<a name="m_callback"><h3>MessageCallback</h3></a>
<p/>There is one more problem in the code. Because we simply pass the MessageCallback <code>deliverAckToMe</code> through to the lower transport layer, that layer calls <code>deliverAckToMe.ack()</code> or <code>deliverAckToMe.sendFailed()</code> with its own MessageRequestHandle rather than the one we returned to the user. Thus, we need to create our own MessageCallback which wraps <code>deliverAckToMe</code>. We'll use an anonymous inner class of the MessageCallback interface.
<p/>First, we need to declare <code>deliverAckToMe</code> and <code>returnMe</code> <i>final</i>.
<p/>Second, we will create an anonymous inner class of the <code>MessageCallback</code>.
<pre>
public MessageRequestHandle&lt;Identifier, ByteBuffer&gt; sendMessage(Identifier i, ByteBuffer m,
final MessageCallback&lt;Identifier, ByteBuffer&gt; deliverAckToMe, Map&lt;String, Object&gt; options) {
final MessageRequestHandleImpl&lt;Identifier, ByteBuffer&gt; returnMe =
new MessageRequestHandleImpl&lt;Identifier, ByteBuffer&gt;(i, m, options);
...
returnMe.setSubCancellable(tl.sendMessage(i,m,new MessageCallback&lt;Identifier, ByteBuffer&gt;() {
public void ack(MessageRequestHandle&lt;Identifier, ByteBuffer&gt; msg) {
if (deliverAckToMe != null) deliverAckToMe.ack(returnMe);
}
public void sendFailed(MessageRequestHandle&lt;Identifier, ByteBuffer&gt; msg, IOException reason) {
if (deliverAckToMe != null) deliverAckToMe.sendFailed(returnMe, reason);
}
},options));
return returnMe;
}
</pre>
<p/>Our <code>MessageCallback</code> simply calls <b>deliverAckToMe</b>'s <code>ack()/sendFailed()</code> methods with <b>returnMe</b>.
<p/>Here is the full code for the method:
<pre>
public MessageRequestHandle&lt;Identifier, ByteBuffer&gt; sendMessage(Identifier i, ByteBuffer m,
final MessageCallback&lt;Identifier, ByteBuffer&gt; deliverAckToMe, Map&lt;String, Object&gt; options) {
final MessageRequestHandleImpl&lt;Identifier, ByteBuffer&gt; returnMe =
new MessageRequestHandleImpl&lt;Identifier, ByteBuffer&gt;(i, m, options);
boolean success = true;
synchronized(this) {
if (m.remaining() > bucket) {
success = false;
} else {
bucket-=m.remaining();
}
}
if (!success) {
if (logger.level <= Logger.FINE) logger.log("Dropping message "+m+" because not enough bandwidth:"+bucket);
if (deliverAckToMe != null) deliverAckToMe.sendFailed(returnMe, new NotEnoughBandwidthException(bucket, m.remaining()));
return returnMe;
}
returnMe.setSubCancellable(tl.sendMessage(i,m,new MessageCallback&lt;Identifier, ByteBuffer&gt;() {
public void ack(MessageRequestHandle&lt;Identifier, ByteBuffer&gt; msg) {
if (deliverAckToMe != null) deliverAckToMe.ack(returnMe);
}
public void sendFailed(MessageRequestHandle&lt;Identifier, ByteBuffer&gt; msg, IOException reason) {
if (deliverAckToMe != null) deliverAckToMe.sendFailed(returnMe, reason);
}
},options));
return returnMe;
}
</pre>
<p/>This may seem a bit overwhelming right now, but this code provides a lot of flexibility for the transport layer while still keeping the calls relatively simple.
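<p/>Stripped of generics and bandwidth accounting, the handle substitution looks like the following sketch. The names <code>AckCallbackSketch</code> and <code>CallbackWrapSketch</code> are hypothetical, and handles are plain <code>Object</code>s here purely to keep the example short.

```java
import java.io.IOException;

/** Hypothetical, simplified callback; the real MessageCallback is generic. */
interface AckCallbackSketch {
  void ack(Object handle);
  void sendFailed(Object handle, IOException reason);
}

class CallbackWrapSketch {
  /** Returns a callback that reports ourHandle upward, whatever handle the lower layer passes in. */
  static AckCallbackSketch wrap(final AckCallbackSketch userCallback, final Object ourHandle) {
    return new AckCallbackSketch() {
      public void ack(Object lowerHandle) {
        // the user's callback may be null, so check before notifying
        if (userCallback != null) userCallback.ack(ourHandle);
      }
      public void sendFailed(Object lowerHandle, IOException reason) {
        if (userCallback != null) userCallback.sendFailed(ourHandle, reason);
      }
    };
  }

  public static void main(String[] args) {
    final Object ourHandle = "upper handle";
    AckCallbackSketch user = new AckCallbackSketch() {
      public void ack(Object handle) {
        System.out.println("ack for " + handle);
      }
      public void sendFailed(Object handle, IOException reason) {
        System.out.println("failed for " + handle);
      }
    };
    // The lower layer only knows its own handle, yet the user sees ours.
    wrap(user, ourHandle).ack("lower handle");  // prints "ack for upper handle"
  }
}
```

<p/>The user can then compare the handle received in the callback with the one returned by <code>sendMessage()</code> and be sure they are the same object.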
<a name="socket"><h2>Limit Socket Bandwidth</h2></a>
<p/>Now our layer limits Message bandwidth, but we must also make the Sockets respect the bandwidth limitation. Rather than throwing an exception when we exceed the bandwidth, we just need to throttle the traffic by only sending what we are allowed. To do this, we will create a <code>P2PSocket</code> that decrements the bucket on each write. Like the <code>MessageRequestHandleImpl</code>, we already have an implementation of a <code>P2PSocket</code> that wraps another <code>P2PSocket</code>. It is called org.mpisws.p2p.transport.util.SocketWrapperSocket.
<p/>The generic parameters of the <code>SocketWrapperSocket</code> are the 2 kinds of Identifiers that it Adapts. Since we are importing and exporting the same Identifier, we declare our Socket like this:
<pre>
class BandwidthLimitingSocket extends SocketWrapperSocket&lt;Identifier, Identifier&gt; {
}
</pre>
<p/>Now we need to set up the constructor. We will call <code>super()</code> with the wrapped socket's identifier and options. Also, we must pass it a <code>logger</code>.
<pre>
public BandwidthLimitingSocket(P2PSocket&lt;Identifier&gt; socket) {
super(socket.getIdentifier(), socket,
BandwidthLimitingTransportLayer.this.logger,
socket.getOptions());
}
</pre>
<p/>Now we need to override this method:
<pre>
@Override
public long write(ByteBuffer srcs) throws IOException {}
</pre>
<p/>We can determine how much the user wants to write by calling <code>srcs.remaining()</code>. Then we must return how much was written, or -1 if the socket or outbound channel was closed. Also, note that when we read bytes off srcs, we are changing the state of srcs. It is critical that we manage the position of srcs properly. This way, if we don't send the whole message, the user code can call write() again with the same message until it has sent the whole thing.
<p/>Here is the easy case, where the bucket can cover the whole <code>write()</code>: we pass it down and return the lower layer's result. Otherwise, we log a message and continue:
<pre>
if (srcs.remaining() <= bucket) {
long ret = super.write(srcs);
if (ret >= 0) {
// EOF is usually -1
synchronized(BandwidthLimitingTransportLayer.this) {
bucket -= ret;
}
}
return ret;
}
if (logger.level <= Logger.FINE) logger.log("Limiting "+socket+" to "+bucket+" bytes.");
</pre>
<p/>The rest is complicated, because it is important to leave the incoming ByteBuffer's position in the proper place.
<p/>We create a variable <code>ByteBuffer temp</code> sized to the amount of bandwidth left in the bucket:
<pre>
// The user is trying to write more than is allowed. To handle this we
// will copy the allowed amount into a temporary ByteBuffer and pass that
// to the next layer. It is critical to set the proper position of srcs
// before returning. First, let's record the original position.
int originalPosition = srcs.position();
// now we create temp, whose size is "bucket" (cast to int because bucket is a long)
ByteBuffer temp = ByteBuffer.wrap(srcs.array(), originalPosition, (int)bucket);
</pre>
<p/>Now we try to write the data by calling super. This method may throw an IOException or return EOF. Fortunately, the <code>srcs</code> buffer is in its original position, so we don't need to do anything but return/throw exception:
<pre>
// try to write our temp buffer
long ret = super.write(temp);
if (ret < 0) {
// there was a problem; srcs is still at its
// original position, so just return
return ret;
}
</pre>
<p/>If we made it here there were no problems. Allocate the bandwidth and update the position on the <code>srcs</code> ByteBuffer:
<pre>
// allocate the bandwidth
synchronized(BandwidthLimitingTransportLayer.this) {
bucket -= ret;
}
// we need to properly set the position
srcs.position(originalPosition+(int)ret);
return ret;
</pre>
<p/>Note that we updated the position with <code>ret</code>, rather than <code>bucket</code>. This is because it is possible that the lower layer could only send a fraction of the message.
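<p/>The position bookkeeping can be tried out without any sockets. The sketch below is an assumption-laden variant, not the tutorial's code: <code>LimitedWriteSketch</code> and its <code>Sink</code> interface are hypothetical, and it uses <code>ByteBuffer.duplicate()</code> plus <code>limit()</code> instead of <code>ByteBuffer.wrap(srcs.array(), ...)</code>, so it would also work for buffers that have no backing array.

```java
import java.nio.ByteBuffer;

class LimitedWriteSketch {
  /** Hypothetical sink standing in for the lower layer's write(). */
  interface Sink {
    long write(ByteBuffer src);
  }

  /** Offers at most allowed bytes of srcs to sink and advances srcs by what the sink took. */
  static long writeLimited(ByteBuffer srcs, long allowed, Sink sink) {
    int originalPosition = srcs.position();
    int n = (int) Math.min(allowed, srcs.remaining());
    // duplicate() shares the bytes but has its own position and limit
    ByteBuffer temp = srcs.duplicate();
    temp.limit(originalPosition + n);
    long ret = sink.write(temp);
    if (ret < 0) return ret;  // closed: srcs is still at its original position
    srcs.position(originalPosition + (int) ret);
    return ret;
  }

  public static void main(String[] args) {
    ByteBuffer srcs = ByteBuffer.wrap(new byte[10]);
    // A sink that accepts at most 3 bytes per call, like a congested lower layer.
    Sink threeAtATime = src -> {
      int k = Math.min(3, src.remaining());
      src.position(src.position() + k);
      return (long) k;
    };
    long ret = writeLimited(srcs, 4, threeAtATime);
    System.out.println(ret + " written, position " + srcs.position());  // 3 written, position 3
  }
}
```

<p/>Even though 4 bytes were allowed, the position only moves by the 3 bytes the sink actually took, so a later call resumes at the right place.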
<a name="register"/><h3>Managing register()</h3>
<p/>Before the user can write, they have to register that they want to write. Then we respond when the operation can be performed without blocking.
<p/>If we naively pass the write request to the next layer we will cause an infinite loop when we run out of bandwidth. Here is the scenario:
<ul>
<li><b>Client:</b> socket.register() // request to write (passed through to the lower layer).</li>
<li><b>Lower layer:</b> receiver.receiveSelectResult() // granted request to write</li>
<li><b>Client:</b> socket.write()</li>
<li><b>BandwidthLimitingLayer:</b> limiting bandwidth to 0 bytes, write returns 0</li>
<li><i>repeat</i> -- The user code didn't make any progress, so obviously it will register again.</li>
</ul>
<p/>To remedy this situation, we need to do two things:
<ul>
<li>Intercept the call to <code>socket.register()</code></li>
<li>When more bandwidth is available, notify the registered sockets that they can write.</li>
</ul>
<p/>Let's make a member variable in <code>BandwidthLimitingSocket</code> to store the user's request to write:
<pre>
/**
* Store the write requestor. If this variable is not null
* it means that the storedWriter wants to write, but there
* wasn't enough bandwidth.
*/
P2PSocketReceiver&lt;Identifier&gt; storedWriter;
</pre>
<p/>Here is the code to intercept <code>socket.register()</code> in <code>BandwidthLimitingSocket</code>. If we are out of bandwidth and the user wants to write, then we cache the receiver in the <code>storedWriter</code> variable.
<pre>
@Override
public void register(boolean wantToRead, boolean wantToWrite,
P2PSocketReceiver&lt;Identifier&gt; receiver) {
// this variable is what we will pass to super.register()
boolean canWrite = wantToWrite;
// if the user wants to write, and the bucket is empty, store the writer and set canWrite to false
if (wantToWrite == true && bucket == 0) {
canWrite = false;
storedWriter = receiver;
}
// only call super.register() if we have something to do
if (wantToRead || canWrite) super.register(wantToRead, canWrite, receiver);
}
</pre>
<p/>Now, our refill task needs to notify all BandwidthLimiting sockets when it refills, so they can notify the <code>storedWriter</code> if they have one.
<p/>In BandwidthLimitingTransportLayer, we need to keep track of all of our <code>BandwidthLimitingSocket</code>s in a Collection called sockets. Whenever we create a <code>BandwidthLimitingSocket</code> we add it to sockets, and whenever we <code>shutdownOutput()</code> or <code>close()</code> the socket, we remove it. Here is the code.
<pre>
/**
* Keep track of all of the BandwidthLimitingSockets
*/
Collection&lt;BandwidthLimitingSocket&gt; sockets = new ArrayList&lt;BandwidthLimitingSocket&gt;();
class BandwidthLimitingSocket extends SocketWrapperSocket&lt;Identifier, Identifier&gt; {
public BandwidthLimitingSocket(P2PSocket&lt;Identifier&gt; socket) {
super(socket.getIdentifier(), socket,
BandwidthLimitingTransportLayer.this.logger, socket.getOptions());
synchronized(BandwidthLimitingTransportLayer.this) {
sockets.add(this);
}
}
public void close() {
super.close();
synchronized(BandwidthLimitingTransportLayer.this) {
sockets.remove(this);
}
}
public void shutdownOutput() {
super.shutdownOutput();
synchronized(BandwidthLimitingTransportLayer.this) {
sockets.remove(this);
}
}
</pre>
<p/>In BandwidthLimitingSocket, we need a method for the refill task to call. Note that we can't simply call <code>storedWriter.receiveSelectResult()</code>; all we can do now is notify the lower transport layer that we wish to write.
<pre>
/**
* Register and clear the storedWriter
*/
public void notifyBandwidthRefilled() {
if (storedWriter != null) {
P2PSocketReceiver&lt;Identifier&gt; temp = storedWriter;
storedWriter = null;
super.register(false, true, temp);
}
}
</pre>
<p/>Now, add this code to the refill task, so that it calls <code>notifyBandwidthRefilled()</code>. <i>This code was in the constructor for BandwidthLimitingTransportLayer.</i>
<pre>
for (BandwidthLimitingSocket s : sockets) {
s.notifyBandwidthRefilled();
}
</pre>
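<p/>The park-and-wake pattern used by <code>register()</code> and the refill task can be sketched on its own. This is a simplified model under stated assumptions: <code>StoredWriterSketch</code> is a hypothetical class, a <code>Runnable</code> stands in for the <code>P2PSocketReceiver</code>, and running it immediately stands in for registering with the lower layer.

```java
/** Hypothetical model of one socket's stored writer; not the tutorial's class. */
class StoredWriterSketch {
  private long bucket;            // bytes available; 0 means out of bandwidth
  private Runnable storedWriter;  // parked write request, or null

  StoredWriterSketch(long initialBucket) {
    this.bucket = initialBucket;
  }

  /** Stand-in for register(): runs writer now if bandwidth remains, otherwise parks it. */
  void registerToWrite(Runnable writer) {
    synchronized (this) {
      if (bucket == 0) {
        storedWriter = writer;
        return;
      }
    }
    writer.run();
  }

  /** Stand-in for the refill task: tops up the bucket and wakes the parked writer once. */
  void refill(long bucketSize) {
    Runnable temp;
    synchronized (this) {
      bucket = bucketSize;
      temp = storedWriter;
      storedWriter = null;  // clear first so the writer is not woken twice
    }
    if (temp != null) temp.run();
  }

  public static void main(String[] args) {
    StoredWriterSketch socket = new StoredWriterSketch(0);
    socket.registerToWrite(() -> System.out.println("writer woken"));  // parked, prints nothing
    socket.refill(1024);  // prints "writer woken"
    socket.refill(1024);  // prints nothing, the writer was already cleared
  }
}
```

<p/>Parking the writer instead of passing the request down is what breaks the register/write/register loop described earlier: nothing runs again until there is bandwidth to spend.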
<p/>Phew, that was a lot of work. Now that we have our <code>BandwidthLimitingSocket</code>, we need to use it.
<p/>There are two ways to get a socket.
<ul>
<li>When the upper layer opens a socket.</li>
<li>When the lower layer accepts a socket.</li>
</ul>
<a name="open"/><h3>openSocket():</h3>
<p/>Here is the code for <code>openSocket()</code>. We don't get the socket right away. We have to wait until it has completed opening. This is similar to a <a href="tut_continuations.html#lesson0a">continuation</a>. The first thing we have to do is create a <code>SocketRequestHandle</code> for the same reasons as we did in the above code with <code>MessageRequestHandle</code>.
<pre>
SocketRequestHandleImpl&lt;Identifier&gt; returnMe = new SocketRequestHandleImpl&lt;Identifier&gt;(i,options);
returnMe.setSubCancellable(tl.openSocket(i, ... , options));
return returnMe;
</pre>
<p/>Now, we ask the lower transport layer to open the socket, and then we will wrap it with our <code>BandwidthLimitingSocket</code> which we will return to <b>deliverSocketToMe</b>. If there is an Exception, we just pass it up to the previous layer. Note that <code>deliverSocketToMe</code> must be non-null, because it's not useful to request opening a socket if you don't receive a handle to it.
<pre>
tl.openSocket(i, new SocketCallback&lt;Identifier&gt;(){
public void receiveResult(SocketRequestHandle&lt;Identifier&gt; cancellable, P2PSocket&lt;Identifier&gt; sock) {
deliverSocketToMe.receiveResult(returnMe, new BandwidthLimitingSocket(sock));
}
public void receiveException(SocketRequestHandle&lt;Identifier&gt; s, IOException ex) {
deliverSocketToMe.receiveException(returnMe, ex);
}
}, options)
</pre>
<a name="incoming"/><h3>incomingSocket():</h3>
<p/>We have a variable for the lower transport layer: <code>tl</code>. But we don't yet have a member variable for the transport layer above us. To get this variable, we need to implement the <code>setCallback()</code> method in <code>TransportLayer</code> interface. We will name this transport layer <code>callback</code>.
<pre>
TransportLayerCallback&lt;Identifier, ByteBuffer&gt; callback;
public void setCallback(TransportLayerCallback&lt;Identifier, ByteBuffer&gt; callback) {
this.callback = callback;
}
</pre>
<p/>Now it is simple to implement <code>incomingSocket()</code>:
<pre>
public void incomingSocket(P2PSocket&lt;Identifier&gt; s) throws IOException {
callback.incomingSocket(new BandwidthLimitingSocket(s));
}
</pre>
<a name="finishing"/><h4>Finishing the Transport Layer</h4>
<p/>The last step is to add all of the rest of the methods in TransportLayer and TransportLayerCallback. These are just going to forward the calls down or up as appropriate:
<pre>
public void acceptMessages(boolean b) {
tl.acceptMessages(b);
}
public void acceptSockets(boolean b) {
tl.acceptSockets(b);
}
public Identifier getLocalIdentifier() {
return tl.getLocalIdentifier();
}
public void setErrorHandler(ErrorHandler&lt;Identifier&gt; handler) {
tl.setErrorHandler(handler);
}
public void destroy() {
tl.destroy();
}
public void messageReceived(Identifier i, ByteBuffer m, Map&lt;String, Object&gt; options) throws IOException {
callback.messageReceived(i, m, options);
}
</pre>
<a name="integration"/><h2>Integration with the SocketPastryNodeFactory</h2>
<p/>Now we will add static methods to <code>BandwidthLimitingTransportLayer</code> that extend <code>SocketPastryNodeFactory</code>, inserting our <code>BandwidthLimitingTransportLayer</code> at the right place in the stack. Where should we put this layer? We will show two options. This is where keeping our layer generic pays off.
<h4>Example A</h4>
<p/>Because we are at the Java level, we can't fully account for TCP's bandwidth overhead (retransmissions, etc.). However, to get the maximum possible effect, we should place our layer just above the Wire layer. Here, we show how to do this by extending SocketPastryNodeFactory.
<p/>To construct each layer in the transport stack, <code>SocketPastryNodeFactory</code> executes a method called <code>get...TransportLayer()</code>. To insert a layer, simply override one of these calls and wrap the default layer with our new one.
<p/>Since we are going to put our layer right above the wire layer, we will override <code>getWireTransportLayer()</code>. We will first construct the default layer by calling <code>super.getWireTransportLayer()</code>. However, we will return our Bandwidth-Limiting layer which wraps the default wire layer.
<p/>Note that when we construct our <code>BandwidthLimitingTransportLayer</code>, we export <code>&lt;InetSocketAddress&gt;</code>.
<pre>
public static PastryNodeFactory exampleA(int bindport, Environment env,
NodeIdFactory nidFactory, final int amt, final int time) throws IOException {
// anonymously extend SPNF
PastryNodeFactory factory = new SocketPastryNodeFactory(nidFactory, bindport, env) {
/**
* Override getWireTransportLayer to return the BandwidthLimitingTL wrapping
* the default wire implementation.
*/
@Override
protected TransportLayer&lt;InetSocketAddress, ByteBuffer&gt; getWireTransportLayer(
InetSocketAddress innermostAddress, TLPastryNode pn) throws IOException {
// get the default layer
TransportLayer&lt;InetSocketAddress, ByteBuffer&gt; wtl =
super.getWireTransportLayer(innermostAddress, pn);
// wrap it with our layer
return new BandwidthLimitingTransportLayer&lt;InetSocketAddress&gt;(
wtl, amt, time, pn.getEnvironment());
}
};
return factory;
}
</pre>
<p/>You can replace the construction of the SocketPastryNodeFactory at the beginning of any of the existing tutorials with this code to add the "bandwidth-limiting" feature.
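<p/>The override-and-wrap idea used in <code>exampleA()</code> does not depend on FreePastry. Here is a minimal sketch under stated assumptions: <code>Layer</code>, <code>NamedLayer</code> and <code>StackFactory</code> are hypothetical stand-ins, with <code>StackFactory</code> playing the role of a factory that builds its stack through overridable <code>get...Layer()</code> hooks.

```java
/** Hypothetical stand-in for TransportLayer. */
interface Layer {
  String describe();
}

/** A named layer that optionally sits on top of another layer. */
class NamedLayer implements Layer {
  private final String name;
  private final Layer below; // null for the bottom of the stack

  NamedLayer(String name, Layer below) {
    this.name = name;
    this.below = below;
  }

  public String describe() {
    return below == null ? name : name + "(" + below.describe() + ")";
  }
}

/** Hypothetical stand-in for a factory that builds its stack through overridable hooks. */
class StackFactory {
  protected Layer getWireLayer() { return new NamedLayer("wire", null); }
  protected Layer getUpperLayer(Layer below) { return new NamedLayer("upper", below); }

  /** Builds the stack bottom-up by calling each hook in turn. */
  final Layer build() { return getUpperLayer(getWireLayer()); }
}

class FactoryDemo {
  /** Anonymous subclass that slips a "limit" layer in just above the wire layer. */
  static StackFactory limitingFactory() {
    return new StackFactory() {
      @Override
      protected Layer getWireLayer() {
        Layer wire = super.getWireLayer();     // build the default layer first
        return new NamedLayer("limit", wire);  // then return our wrapper instead
      }
    };
  }

  public static void main(String[] args) {
    System.out.println(new StackFactory().build().describe());
    System.out.println(limitingFactory().build().describe());
  }
}
```

<p/>The layers above the overridden hook are built exactly as before; they simply receive the wrapper instead of the default layer.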
<h4>Example B</h4>
<p/>Perhaps we don't want to include some of FreePastry's overhead in our bandwidth limits. If we put the <code>BandwidthLimitingTransportLayer</code> above the <code>SourceRouteManager</code>, the overhead of liveness checks and of constructing source routes is no longer counted against the limit. However, the <code>SourceRouteManager</code> also provides <i>Liveness</i> and <i>Proximity</i>, and our transport layer doesn't implement these additional interfaces.
<p/>In <code>exampleB()</code> we show how easy it is to replace only the <i>TransportLayer</i> functionality of the <code>SourceRouteManager</code> while still returning the existing <code>SourceRouteManager</code> for the <i>Liveness</i> and <i>Proximity</i> functionality. The object returned by <code>getSourceRouteManagerLayer()</code> is a <code>TransLivenessProximity&lt;MultiInetSocketAddress, ByteBuffer&gt;</code>. This is a very simple interface with three getters:
<pre>
protected interface TransLivenessProximity&lt;Identifier, MessageType&gt; {
TransportLayer&lt;Identifier, ByteBuffer&gt; getTransportLayer();
LivenessProvider&lt;Identifier&gt; getLivenessProvider();
ProximityProvider&lt;Identifier&gt; getProximityProvider();
}
</pre>
<p/>As in Example A, we need to create the default <code>SourceRouteManagerLayer</code> by calling <code>super.getSourceRouteManagerLayer()</code>. But we must also implement a <code>TransLivenessProximity</code> which returns our <code>BandwidthLimitingTransportLayer</code> for the <i>TransportLayer</i>, but the <code>SourceRouteManagerLayer</code> for the <i>LivenessProvider</i> and <i>ProximityProvider</i>.
<p/>Here is the code for <code>exampleB()</code>:
<pre>
public static PastryNodeFactory exampleB(int bindport, Environment env,
NodeIdFactory nidFactory, final int amt, final int time) throws IOException {
PastryNodeFactory factory = new SocketPastryNodeFactory(nidFactory, bindport, env) {
@Override
protected TransLivenessProximity&lt;MultiInetSocketAddress, ByteBuffer&gt; getSourceRouteManagerLayer(
TransportLayer&lt;SourceRoute&lt;MultiInetSocketAddress&gt;, ByteBuffer&gt; ltl,
LivenessProvider&lt;SourceRoute&lt;MultiInetSocketAddress&gt;&gt; livenessProvider,
Pinger&lt;SourceRoute&lt;MultiInetSocketAddress&gt;&gt; pinger,
TLPastryNode pn,
MultiInetSocketAddress proxyAddress,
MultiAddressSourceRouteFactory esrFactory) throws IOException {
// get the default layer
final TransLivenessProximity&lt;MultiInetSocketAddress, ByteBuffer&gt; srm =
super.getSourceRouteManagerLayer(
ltl, livenessProvider, pinger, pn, proxyAddress, esrFactory);
// wrap the default layer with our layer
final BandwidthLimitingTransportLayer&lt;MultiInetSocketAddress&gt; bll =
new BandwidthLimitingTransportLayer&lt;MultiInetSocketAddress&gt;(
srm.getTransportLayer(), amt, time, pn.getEnvironment());
return new TransLivenessProximity&lt;MultiInetSocketAddress, ByteBuffer&gt;(){
public TransportLayer&lt;MultiInetSocketAddress, ByteBuffer&gt; getTransportLayer() {
return bll;
}
public LivenessProvider&lt;MultiInetSocketAddress&gt; getLivenessProvider() {
return srm.getLivenessProvider();
}
public ProximityProvider&lt;MultiInetSocketAddress&gt; getProximityProvider() {
return srm.getProximityProvider();
}
};
}
};
return factory;
}
</pre>
<p/>Note that <code>BandwidthLimitingTransportLayer</code> exports <code>&lt;MultiInetSocketAddress&gt;</code> this time.
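<p/>The partial replacement in <code>exampleB()</code> boils down to returning a new three-part object in which one part is wrapped and the other two are delegated. A minimal sketch, assuming hypothetical stand-in types (<code>Transport</code>, <code>Liveness</code>, <code>Proximity</code>, <code>Bundle</code> and <code>Manager</code> are illustrative names, not FreePastry classes):

```java
/** Hypothetical stand-ins for the three roles; names are illustrative only. */
interface Transport { String name(); }
interface Liveness { boolean isAlive(String id); }
interface Proximity { int distanceTo(String id); }

/** Plays the role of TransLivenessProximity: one object exposing three parts. */
interface Bundle {
  Transport getTransport();
  Liveness getLiveness();
  Proximity getProximity();
}

/** Plays the role of the default manager, which implements all three roles itself. */
class Manager implements Transport, Liveness, Proximity, Bundle {
  public String name() { return "srm"; }
  public boolean isAlive(String id) { return true; }
  public int distanceTo(String id) { return 42; }
  public Transport getTransport() { return this; }
  public Liveness getLiveness() { return this; }
  public Proximity getProximity() { return this; }
}

/** Plays the role of our wrapping transport layer. */
class Limit implements Transport {
  private final Transport inner;
  Limit(Transport inner) { this.inner = inner; }
  public String name() { return "limit(" + inner.name() + ")"; }
}

class BundleDemo {
  /** Returns a bundle whose transport is wrapped while liveness and proximity are delegated untouched. */
  static Bundle wrapTransportOnly(final Bundle original) {
    final Transport wrapped = new Limit(original.getTransport());
    return new Bundle() {
      public Transport getTransport() { return wrapped; }
      public Liveness getLiveness() { return original.getLiveness(); }
      public Proximity getProximity() { return original.getProximity(); }
    };
  }

  public static void main(String[] args) {
    Bundle srm = new Manager();
    Bundle b = wrapTransportOnly(srm);
    System.out.println(b.getTransport().name());
    System.out.println(b.getLiveness() == srm.getLiveness());
  }
}
```

<p/>Because the liveness and proximity parts are the very same objects as before, layers higher in the stack keep working unchanged; only traffic that goes through the transport part sees the new layer.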
<a name="run"/><h3>Running the code</h3>
<p/>We modified DistTutorial from <a href="">lesson4</a> to take some more parameters and call <code>BandwidthLimitingTransportLayer.exampleA()</code>.
<pre>
public DistTutorial(int bindport, InetSocketAddress bootaddress,
int numNodes, Environment env, int bandwidth) throws Exception {
...
// construct the PastryNodeFactory, this is how we use rice.pastry.socket
PastryNodeFactory factory = BandwidthLimitingTransportLayer.exampleA(
bindport, env, nidFactory, bandwidth, 1000);
// PastryNodeFactory factory = BandwidthLimitingTransportLayer.exampleB(
// bindport, env, nidFactory, bandwidth, 1000);
// PastryNodeFactory factory = new SocketPastryNodeFactory(
// nidFactory, bindport, env);
...
}
/**
* Usage:
 * java [-cp FreePastry-&lt;version&gt;.jar] rice.tutorial.transportlayer.DistTutorial localbindport bootIP bootPort numNodes bandwidth
 * example: java rice.tutorial.transportlayer.DistTutorial 9001 pokey.cs.almamater.edu 9001 10 1000
*/
public static void main(String[] args) throws Exception {
...
}
</pre>
<p/>We need to turn on FINE logging in our transport layer, and let's also disable ProximityNeighborSelection:
<pre>
// Disable PNS for our example
env.getParameters().setBoolean("transport_use_pns", false);
// enable logging on our new layer
env.getParameters().setInt(
"rice.tutorial.transportlayer.BandwidthLimitingTransportLayer_loglevel",
Logger.FINE);
</pre>
<p/>Now let's run the code. This time we must also provide the bandwidth. Let's try 10K/second:
<pre>
<span class="input">java -cp .:FreePastry-@freepastry_version@.jar rice.tutorial.transportlayer.DistTutorial 5009 10.9.8.7 5009 10 10000</span>
<span class="output">
Finished creating new node TLPastryNode[SNH: &lt;0x9492E7..&gt;/FOO/10.9.8.7:5009]
Finished creating new node TLPastryNode[SNH: &lt;0x44C3E7..&gt;/FOO/10.9.8.7:5010]
Finished creating new node TLPastryNode[SNH: &lt;0xBE1C77..&gt;/FOO/10.9.8.7:5011]
Finished creating new node TLPastryNode[SNH: &lt;0x4C94DA..&gt;/FOO/10.9.8.7:5012]
Finished creating new node TLPastryNode[SNH: &lt;0x3FAE2B..&gt;/FOO/10.9.8.7:5013]
Finished creating new node TLPastryNode[SNH: &lt;0xA5DBDC..&gt;/FOO/10.9.8.7:5014]
Finished creating new node TLPastryNode[SNH: &lt;0x99460E..&gt;/FOO/10.9.8.7:5015]
0xB5DAB7:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222632375:Limiting SM java.nio.channels.SocketChannel[connected local=/10.9.8.7:2771 remote=/10.9.8.7:5014] to 315 bytes.
0xB5DAB7:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222632390:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0xB5DAB7:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222632390:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
Finished creating new node TLPastryNode[SNH: &lt;0xB5DAB7..&gt;/FOO/10.9.8.7:5016]
0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634218:Limiting SM java.nio.channels.SocketChannel[connected local=/10.9.8.7:5017 remote=/10.9.8.7:2775] to 283 bytes.
0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634234:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634234:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634234:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634234:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634234:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634234:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634250:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634250:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
Finished creating new node TLPastryNode[SNH: &lt;0xC3183D..&gt;/FOO/10.9.8.7:5017]
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635843:Limiting SM java.nio.channels.SocketChannel[connected local=/10.9.8.7:2786 remote=/10.9.8.7:5011] to 410 bytes.
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635843:Limiting SM java.nio.channels.SocketChannel[connected local=/10.9.8.7:2789 remote=/10.9.8.7:5015] to 0 bytes.
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635843:Limiting SM java.nio.channels.SocketChannel[connected local=/10.9.8.7:2788 remote=/10.9.8.7:5010] to 0 bytes.
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635843:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635843:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635843:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635859:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635859:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635859:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635859:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635859:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635859:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635859:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
Finished creating new node TLPastryNode[SNH: &lt;0x317CF2..&gt;/FOO/10.9.8.7:5018]
MyApp &lt;0x9492E7..&gt; sending to &lt;0x1CACFE..&gt;
MyApp &lt;0x317CF2..&gt; received MyMsg from &lt;0x9492E7..&gt; to &lt;0x1CACFE..&gt;
...
</span></pre>
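<p/>The bandwidth log lines in this output are consistent with a simple byte budget that is reset once per period. The following sketch is an assumption about that accounting, not the tutorial's actual implementation: <code>ByteBucket</code> and its methods are hypothetical. It treats messages as all-or-nothing (dropped when the budget is too small) and socket writes as truncated to whatever budget remains.

```java
/** Hypothetical byte budget that is reset to amt once per period. */
class ByteBucket {
  private final int amt;
  private int remaining;

  ByteBucket(int amt) {
    this.amt = amt;
    this.remaining = amt;
  }

  /** Called once per period (for example by a timer) to restore the full budget. */
  synchronized void refill() {
    remaining = amt;
  }

  /** Datagram-style send: all or nothing. Returns false when the message must be dropped. */
  synchronized boolean trySend(int size) {
    if (size > remaining) {
      return false;
    }
    remaining -= size;
    return true;
  }

  /** Stream-style write: returns how many of the requested bytes may be written now. */
  synchronized int limitWrite(int requested) {
    int allowed = Math.min(requested, remaining);
    remaining -= allowed;
    return allowed;
  }

  synchronized int remaining() {
    return remaining;
  }
}

class ByteBucketDemo {
  public static void main(String[] args) {
    ByteBucket bucket = new ByteBucket(10000);   // 10000 bytes per period
    System.out.println(bucket.limitWrite(9685)); // fits: the whole write is allowed
    System.out.println(bucket.limitWrite(500));  // truncated to the 315 bytes that remain
    System.out.println(bucket.trySend(92));      // budget is now 0, so the message is dropped
    bucket.refill();                             // a new period starts
    System.out.println(bucket.trySend(92));      // the same message now goes out
  }
}
```

<p/>With a budget of 10000 bytes per 1000 ms, a burst of traffic while a node joins can exhaust the budget quickly, after which every 92-byte message is dropped until the next reset.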
Note the logging of
<pre>
Limiting SM XXX to XXX bytes.
</pre>
and
<pre>
Dropping message XXX because not enough bandwidth:XXX
</pre>
and
<pre>
0x34578E:org.mpisws.p2p.transport.multiaddress.MultiInetAddressTransportLayerImpl:1197900565238:/10.9.8.7:5014 rice.tutorial.transportlayer.NotEnoughBandwidthException
at rice.tutorial.transportlayer.BandwidthLimitingTransportLayer.sendMessage(BandwidthLimitingTransportLayer.java:152)
at rice.tutorial.transportlayer.BandwidthLimitingTransportLayer.sendMessage(BandwidthLimitingTransportLayer.java:1)
at org.mpisws.p2p.transport.wire.magicnumber.MagicNumberTransportLayer.sendMessage(MagicNumberTransportLayer.java:226)
at org.mpisws.p2p.transport.wire.magicnumber.MagicNumberTransportLayer.sendMessage(MagicNumberTransportLayer.java:1)
at org.mpisws.p2p.transport.limitsockets.LimitSocketsTransportLayer.sendMessage(LimitSocketsTransportLayer.java:220)
at org.mpisws.p2p.transport.multiaddress.MultiInetAddressTransportLayerImpl.sendMessage(MultiInetAddressTransportLayerImpl.java:283)
at org.mpisws.p2p.transport.multiaddress.MultiInetAddressTransportLayerImpl.sendMessage(MultiInetAddressTransportLayerImpl.java:1)
at org.mpisws.p2p.transport.sourceroute.SourceRouteTransportLayerImpl.messageReceived(SourceRouteTransportLayerImpl.java:413)
at org.mpisws.p2p.transport.sourceroute.SourceRouteTransportLayerImpl.messageReceived(SourceRouteTransportLayerImpl.java:1)
at org.mpisws.p2p.transport.multiaddress.MultiInetAddressTransportLayerImpl.messageReceived(MultiInetAddressTransportLayerImpl.java:321)
at org.mpisws.p2p.transport.multiaddress.MultiInetAddressTransportLayerImpl.messageReceived(MultiInetAddressTransportLayerImpl.java:1)
at org.mpisws.p2p.transport.limitsockets.LimitSocketsTransportLayer.messageReceived(LimitSocketsTransportLayer.java:224)
at org.mpisws.p2p.transport.wire.magicnumber.MagicNumberTransportLayer.messageReceived(MagicNumberTransportLayer.java:251)
at org.mpisws.p2p.transport.wire.magicnumber.MagicNumberTransportLayer.messageReceived(MagicNumberTransportLayer.java:1)
at rice.tutorial.transportlayer.BandwidthLimitingTransportLayer.messageReceived(BandwidthLimitingTransportLayer.java:317)
at rice.tutorial.transportlayer.BandwidthLimitingTransportLayer.messageReceived(BandwidthLimitingTransportLayer.java:1)
at org.mpisws.p2p.transport.wire.WireTransportLayerImpl.messageReceived(WireTransportLayerImpl.java:183)
at org.mpisws.p2p.transport.wire.UDPLayer.readHeader(UDPLayer.java:228)
at org.mpisws.p2p.transport.wire.UDPLayer.read(UDPLayer.java:192)
at rice.selector.SelectorManager.doSelections(SelectorManager.java:389)
at rice.selector.SelectorManager.run(SelectorManager.java:255)
</pre>
<p/>The default error handler just prints the stack trace. You can get rid of these exceptions by installing your own <code>ErrorHandler</code> on the transport layers with <code>setErrorHandler()</code>.
<p/>Despite these exceptions, the code still runs successfully.
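<p/>One way to quiet the stack traces is a handler that treats bandwidth drops as expected and reports everything else. The sketch below is self-contained and makes assumptions: <code>SimpleErrorHandler</code> is a hypothetical, simplified interface (check the real <code>ErrorHandler</code> interface for its actual methods before adapting this), and the exception class here is a local stand-in for the tutorial's <code>NotEnoughBandwidthException</code>.

```java
import java.io.IOException;

/** Hypothetical, simplified error-handler interface; the real one may differ. */
interface SimpleErrorHandler {
  void receivedException(Object identifier, Throwable error);
}

/** Local stand-in for the tutorial's NotEnoughBandwidthException. */
class NotEnoughBandwidthException extends IOException {
  NotEnoughBandwidthException(String msg) { super(msg); }
}

/** Counts bandwidth drops quietly and records every other error. */
class QuietBandwidthHandler implements SimpleErrorHandler {
  private int dropped = 0;
  private final StringBuilder reported = new StringBuilder();

  public void receivedException(Object identifier, Throwable error) {
    if (error instanceof NotEnoughBandwidthException) {
      dropped++; // expected under load: count it, but print no stack trace
      return;
    }
    // anything else is unexpected, so keep a record of who caused what
    reported.append(identifier).append(": ").append(error).append('\n');
  }

  int dropped() { return dropped; }
  String reported() { return reported.toString(); }
}

class ErrorHandlerDemo {
  public static void main(String[] args) {
    QuietBandwidthHandler handler = new QuietBandwidthHandler();
    handler.receivedException("/10.9.8.7:5014", new NotEnoughBandwidthException("bucket empty"));
    handler.receivedException("/10.9.8.7:5015", new IOException("connection reset"));
    System.out.println("dropped: " + handler.dropped());
    System.out.print(handler.reported());
  }
}
```

<p/>Counting the drops instead of discarding them keeps the information available for diagnostics without flooding the console.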
<h3>Congratulations! You have successfully extended FreePastry's transport layer.</h3>
<hr/>
<div class="nav">
<span class="nav-left"><a href="tut_forward.html#forward">Previous (Forwarding)</a></span>
<span class="nav-center"><a href="index.html">Contents</a></span>
<span class="nav-right"><a href="tut_cancellable_msg.html">Next (Cancellable Messages)</a></span>
</div><br/>
<div class="footer">
Pastry tutorial version @tutorial_version@. &nbsp;&nbsp;&nbsp; Last updated @tutorial_date@.
&nbsp;&nbsp;&nbsp; For FreePastry @freepastry_version@. &nbsp;&nbsp;&nbsp; Maintained by @maintainer@.
</div>
</div>
</body>
</html>