The FreePastry Tutorial.
This tutorial is designed to get you cooking quickly with the FreePastry
API and software toolkit.
Version @tutorial_version@; @tutorial_date@. For FreePastry version @freepastry_version@. Maintained by @maintainer@.
Transport Layers
Modify low level details of FreePastry's network stack.
This tutorial will show you how to use the new TransportLayer interface in the package org.mpisws.p2p.transport. It will also show you how to integrate your TransportLayer into the SocketPastryNodeFactory network stack. Outline of this tutorial:
- Overview of how the transport layers work together.
- Design of our example transport layer.
- Overriding message behavior.
- Constructing a MessageRequestHandle.
- Implementing the MessageCallback to notify the user of success/failure.
- Overriding socket behavior.
- Override registration of read/write
- Intercept openSocket/incomingSocket
- Finishing: put the final touches on the transport layer and integrate it into the SocketPastryNodeFactory.
- Execute the code in an example.
Overview
We will start by stating some of the design goals of the Layered Transport Layer. Versions of FreePastry before 2.1 had a unified transport layer. When features at this level needed to be added or modified, it was very complicated to get all of the parts to work properly. The new version rearranges network transport into a stack of layers, each with its own small task. To give you an idea of a typical layer's task, we will describe the layers that are assembled when creating the SocketPastryNodeFactory in FreePastry 2.1, beginning with the lowest layer.
SocketPastryNodeFactory's Layers:
- Wire -- Opens/Accepts Sockets, Sends/Receives Datagrams
- MagicNumber -- Throws away sockets/datagrams for other applications such as HTTP (if the traffic doesn't match the application-specific magic number)
- LimitSockets -- Prevents exhaustion of file descriptors by closing the least recently used socket.
- MultiInetAddressTransportLayer -- Handles multi-homing (e.g., when behind a NAT a node may have more than one address:port: the internal address, and the NAT's external address that is forwarded)
- SourceRoute -- sends messages/opens sockets along a source route to route around temporary routing anomalies. This layer manages both the endpoints and the intermediate nodes. Note that this layer does not determine the optimal route to an end host, that is done by another layer, the SourceRouteManager.
- LowerIdentity -- This layer, in conjunction with the UpperIdentity, maintains the "intention" of the sent/received message. For example, if a node has restarted with a different NodeId, this layer drops pings/sockets that were intended for the previous node at this address.
- Liveness -- Pings nodes to determine liveness/proximity, implements 2 new interfaces: LivenessProvider, Pinger
- SourceRouteManager -- Chooses the appropriate SourceRoute based on the liveness/proximity exported by the lower layer, implements another new interface ProximityProvider
- Priority -- Uses a single TCP socket to send messages. Can select the order of the messages based on the priority.
- UpperIdentity -- (See Lower Identity) This layer keeps track of the intended destination of the message so that the lower layer can properly encode that intention.
- CommonAPI -- Serializes/Deserializes messages from a RawMessage to a ByteBuffer.
- Some of these layers depend on lower layers and don't make much sense by themselves, e.g. SourceRoute/SourceRouteManager and the Lower/Upper Identity layers.
- Some layers could be re-arranged. For example the LimitSockets layer could probably go anywhere in the stack, yet it seems to make the most sense where it is.
- Some layers read/write bytes in the protocol (MagicNumber, SourceRoute, Liveness) while others only make logical decisions (LimitSockets, SourceRouteManager).
Other interesting layers:
- DirectTransportLayer -- This implements the discrete event simulator, and is used by the DirectPastryNodeFactory.
Upcoming layers:
- SSL -- Provides Crypto/Authentication (typically goes above the SourceRoute layer to provide end-to-end crypto/auth)
- BandwidthLimiting -- Limits the Bandwidth of a node (typically goes between MagicNumber/SourceRoute Layers)
- PeerReview -- Provides protocol accountability (typically goes near the top, such as between CommonAPI/Priority layers)
- STUN -- would likely replace the Wire Layer and provide NAT hole-punching
The TransportLayer interface:
public interface TransportLayer<Identifier, MessageType> extends Destructable {}

Each transport layer operates on an Identifier (InetSocketAddress, SourceRoute, NodeHandle, etc.) and a MessageType (ByteBuffer, RawMessage, etc.). The most important operations are sending messages and opening sockets:
public MessageRequestHandle<Identifier, MessageType> sendMessage(
    Identifier i,
    MessageType m,
    MessageCallback<Identifier, MessageType> deliverAckToMe,
    Map<String, Object> options);

public SocketRequestHandle<Identifier> openSocket(
    Identifier i,
    SocketCallback<Identifier> deliverSocketToMe,
    Map<String, Object> options);

For these methods, you need the Identifier of the remote node and (for sendMessage()) the Message to be delivered. Additionally, you may specify some transport-layer-specific options such as Guaranteed/Unguaranteed/Encrypted etc. We will describe some of these options in a later tutorial. Finally, you provide a callback (deliverAckToMe/deliverSocketToMe) to deliver notification of success or failure when the operation completes. These calls are non-blocking and return a RequestHandle. The RequestHandle is like a receipt or a tracking number: you can use it to cancel the existing request if it is no longer necessary, for example if the operation takes too long. The next tutorial shows you how to work with a RequestHandle.
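For example, a client of a layer whose Identifier is InetSocketAddress and whose MessageType is ByteBuffer (like the lower layers listed above) might send a message like this. This is only a sketch: the variables tl and peer are placeholders, and passing null for the options map is an assumption about the layer in use.

// Send a small payload and keep the handle so we can cancel the request later.
MessageRequestHandle<InetSocketAddress, ByteBuffer> handle = tl.sendMessage(
    peer,                                   // Identifier of the remote node
    ByteBuffer.wrap("hello".getBytes()),    // the message
    new MessageCallback<InetSocketAddress, ByteBuffer>() {
      public void ack(MessageRequestHandle<InetSocketAddress, ByteBuffer> msg) {
        System.out.println("delivered");
      }
      public void sendFailed(MessageRequestHandle<InetSocketAddress, ByteBuffer> msg,
          IOException reason) {
        System.out.println("failed: "+reason);
      }
    },
    null);                                  // no transport-layer-specific options in this sketch

// Later, if the send is no longer necessary:
handle.cancel();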
The TransportLayerCallback interface:
The TransportLayerCallback provides the inverse operations (the result of a remote node sending a message or opening a socket) and must have the identical Identifier/MessageType:

public interface TransportLayerCallback<Identifier, MessageType> {
  public void messageReceived(Identifier i, MessageType m, Map<String, Object> options) throws IOException;
  public void incomingSocket(P2PSocket<Identifier> s) throws IOException;
}

The P2PSocket is similar to the AppSocket. It is non-blocking, and you must register each time you use it, as explained in the AppSocket tutorial. We recommend taking the AppSocket tutorial before proceeding. Here are the calls for the P2PSocket:

void register(boolean wantToRead, boolean wantToWrite, P2PSocketReceiver receiver);
long read(ByteBuffer dsts) throws IOException;
long write(ByteBuffer srcs) throws IOException;
void shutdownOutput();
void close();
Identifier getIdentifier();
Map getOptions();
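As a reminder of that register pattern, here is a rough sketch of writing an entire buffer over a P2PSocket, re-registering until everything has been written. The P2PSocketReceiver method signatures shown here are assumed from the AppSocket tutorial and may differ slightly in your FreePastry version.

// Sketch only: P2PSocketReceiver's exact method signatures are assumed.
static <Identifier> void writeFully(P2PSocket<Identifier> socket, final ByteBuffer data) {
  socket.register(false, true, new P2PSocketReceiver<Identifier>() {
    public void receiveSelectResult(P2PSocket<Identifier> sock,
        boolean canRead, boolean canWrite) throws IOException {
      long ret = sock.write(data);         // may write only part of the buffer
      if (ret < 0) return;                 // the socket was closed
      if (data.hasRemaining()) {
        sock.register(false, true, this);  // not finished: register again
      }
    }
    public void receiveException(P2PSocket<Identifier> sock, Exception e) {
      sock.close();
    }
  });
}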
Other calls in the TransportLayer:
This method returns the Identifier of the local node for this layer:

public Identifier getLocalIdentifier();

These methods can control flow by rejecting new messages/sockets if the local node is being overwhelmed:

public void acceptSockets(boolean b);
public void acceptMessages(boolean b);

This method sets the callback:

public void setCallback(TransportLayerCallback<Identifier, MessageType> callback);

This method sets the ErrorHandler, which is used for notification of unexpected behavior, for example when the acceptor socket closes or an unexpected message arrives:

public void setErrorHandler(ErrorHandler<Identifier> handler);

This method cleans up the layer, in case you wish to shut down the transport layer without exiting the JVM. For example, it will close the ServerSocket that accepts new sockets, as well as clean up any memory:

public void destroy();
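To see how the pieces fit together before we build a real layer, here is a minimal pass-through sketch. It is not a FreePastry class (the import paths are assumed to live in org.mpisws.p2p.transport); it simply forwards every send downward and every delivery upward, which is the skeleton the rest of this tutorial fleshes out.

import java.io.IOException;
import java.util.Map;
import org.mpisws.p2p.transport.*;

// A do-nothing layer: sends go straight down, deliveries go straight up.
public class PassThroughLayer<Identifier, MessageType> implements
    TransportLayer<Identifier, MessageType>,
    TransportLayerCallback<Identifier, MessageType> {

  protected TransportLayer<Identifier, MessageType> tl;               // the layer below us
  protected TransportLayerCallback<Identifier, MessageType> callback; // the layer above us

  public PassThroughLayer(TransportLayer<Identifier, MessageType> tl) {
    this.tl = tl;
    tl.setCallback(this); // deliveries from below now arrive at this layer
  }

  // downward calls
  public MessageRequestHandle<Identifier, MessageType> sendMessage(Identifier i,
      MessageType m, MessageCallback<Identifier, MessageType> deliverAckToMe,
      Map<String, Object> options) {
    return tl.sendMessage(i, m, deliverAckToMe, options);
  }
  public SocketRequestHandle<Identifier> openSocket(Identifier i,
      SocketCallback<Identifier> deliverSocketToMe, Map<String, Object> options) {
    return tl.openSocket(i, deliverSocketToMe, options);
  }
  public Identifier getLocalIdentifier() { return tl.getLocalIdentifier(); }
  public void acceptMessages(boolean b) { tl.acceptMessages(b); }
  public void acceptSockets(boolean b) { tl.acceptSockets(b); }
  public void setErrorHandler(ErrorHandler<Identifier> handler) { tl.setErrorHandler(handler); }
  public void destroy() { tl.destroy(); }

  // upward calls
  public void setCallback(TransportLayerCallback<Identifier, MessageType> callback) {
    this.callback = callback;
  }
  public void messageReceived(Identifier i, MessageType m, Map<String, Object> options)
      throws IOException {
    callback.messageReceived(i, m, options);
  }
  public void incomingSocket(P2PSocket<Identifier> s) throws IOException {
    callback.incomingSocket(s);
  }
}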
Your first new TransportLayer
Design:
- In this tutorial, we will create a new transport layer that caps the peak outgoing bandwidth. We will use a bucket system with a configurable bucket size and bucket time limit. For example, if we want to limit bandwidth to 10K/second, we can allow 10K for any second, or 1K for each 1/10th of a second.
- For simplicity, we won't distinguish between socket and datagram traffic.
- The obvious place for this layer will be just above the Wire layer, but to provide maximum flexibility, we would like this layer to work with any Identifier. Thus we will keep the Identifier parameter generic.
- Because of the nature of the layer, we must specify a message type that has a size. In this case, we'll use the ByteBuffer as our Message type.
- To insert our new layer between two existing layers, we also need to implement a TransportLayerCallback with the same generic parameters.
Download the tutorial files: BandwidthLimitingTransportLayer.java NotEnoughBandwidthException.java DistTutorial.java MyApp.java, MyMsg.java into a directory called rice/tutorial/transportlayer/.
Here is the definition of our new class:

public class BandwidthLimitingTransportLayer<Identifier> implements
    TransportLayer<Identifier, ByteBuffer>,
    TransportLayerCallback<Identifier, ByteBuffer> {
}

Here is the constructor. It takes the TransportLayer immediately below this layer, the bucket size, and the bucket time length. We also need access to the environment so we can create a logger.

public BandwidthLimitingTransportLayer(
    TransportLayer<Identifier, ByteBuffer> tl,
    long bucketSize, int bucketTimelimit,
    Environment env) {
  this.environment = env;
  this.tl = tl;
  BUCKET_SIZE = bucketSize;
  BUCKET_TIME_LIMIT = bucketTimelimit;
  logger = env.getLogManager().getLogger(BandwidthLimitingTransportLayer.class, null);
  tl.setCallback(this);
}

You can look at the code to see the declaration of these fields. The last thing we do is set ourself as the lower layer's callback; this will cause it to deliver messages/sockets to us. This variable is the bucket.
/**
 * When this goes to zero, don't send messages
 */
protected long bucket;

Now let's create a task to refill the bucket.

environment.getSelectorManager().getTimer().schedule(new TimerTask() {
  @Override
  public void run() {
    // always synchronize on the transport layer before modifying the bucket
    synchronized(BandwidthLimitingTransportLayer.this) {
      bucket = BUCKET_SIZE;
    }
  }
}, 0, BUCKET_TIME_LIMIT);

If the TimerTask is unfamiliar, please review the timer tutorial. This interface is slightly different in that it calls run() rather than sending a MessageToSelf, but it is the same idea.
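As a quick illustration of the parameters, here are the two equivalent configurations mentioned in the design notes, written as constructor calls (tl and env stand in for an existing TransportLayer<InetSocketAddress, ByteBuffer> and an Environment):

// Roughly 10K/second, refilled once per second:
BandwidthLimitingTransportLayer<InetSocketAddress> perSecond =
    new BandwidthLimitingTransportLayer<InetSocketAddress>(tl, 10*1024, 1000, env);

// The same average rate, but smoother: 1K every 100 milliseconds.
BandwidthLimitingTransportLayer<InetSocketAddress> smoother =
    new BandwidthLimitingTransportLayer<InetSocketAddress>(tl, 1024, 100, env);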
Limit Message Bandwidth
When the user requests to send the message, it may be buffered or even fail. The transport layer will notify the user of success or failure sending the message. Furthermore, the user can cancel the operation if it hasn't completed. Let's examine the sendMessage call.

public MessageRequestHandle<Identifier, ByteBuffer> sendMessage(
    Identifier i,
    ByteBuffer m,
    MessageCallback<Identifier, ByteBuffer> deliverAckToMe,
    Map<String, Object> options) {
}

This method will attempt to send the message m to the Identifier i. It will notify the MessageCallback deliverAckToMe of success or failure, and it will return a MessageRequestHandle that can be used to cancel the operation. Furthermore, it takes options which could be used, for example, to suppress the bandwidth limitation. However, we won't use the options now; we will merely pass them on to the next layer.
Now let's limit the outgoing message bandwidth. We are going to throw a NotEnoughBandwidthException if there isn't sufficient bandwidth to send a message.
There are three things we must do here.
- Subtract from the bucket or throw the exception.
- Return a proper Message receipt so the task can be cancelled.
- Acknowledge the message when the lower transport layer acknowledges it to us.
First, we will call deliverAckToMe.sendFailed() when there isn't enough bandwidth. Note: the MessageCallback may be null, so we must check that it isn't before calling sendFailed(), otherwise we will get a NullPointerException. Also, so we can detect that this is occurring when we run the code, we will log when the message is dropped.
public MessageRequestHandle<Identifier, ByteBuffer> sendMessage(
    Identifier i,
    ByteBuffer m,
    MessageCallback<Identifier, ByteBuffer> deliverAckToMe,
    Map<String, Object> options) {
  boolean success = true;
  synchronized(this) {
    if (m.remaining() > bucket) {
      success = false;
    } else {
      bucket -= m.remaining();
    }
  }
  if (!success) {
    if (logger.level <= Logger.FINE) logger.log(
        "Dropping message "+m+" because not enough bandwidth:"+bucket);
    if (deliverAckToMe != null) deliverAckToMe.sendFailed(null,
        new NotEnoughBandwidthException(bucket, m.remaining()));
    return null;
  }
  tl.sendMessage(i, m, deliverAckToMe, options);
  return null;
}
MessageRequestHandle
Now we will add code to return a proper MessageRequestHandle. The purpose of this receipt is that it allows the user to cancel the message being sent, but it is also used when we notify the user's code of success or failure. This way, if the user sends the same message a few times, the MessageRequestHandle can be used to disambiguate which instance succeeded or failed.
The release already includes a generic implementation of the MessageRequestHandle: org.mpisws.p2p.transport.util.MessageRequestHandleImpl. Let's take a look at it:
The constructor initializes these 3 member variables:
Identifier identifier;
MessageType msg;
Map<String, Object> options;

There are also corresponding getters. However, we still need to be able to cancel() the operation in the next transport layer. This requires the 4th member variable:

Cancellable subCancellable;

public boolean cancel() {
  if (subCancellable == null) return false;
  return subCancellable.cancel();
}

subCancellable is initialized with a call to setSubCancellable().
Here is the code that now properly returns a MessageRequestHandle:

public MessageRequestHandle<Identifier, ByteBuffer> sendMessage(
    Identifier i,
    ByteBuffer m,
    MessageCallback<Identifier, ByteBuffer> deliverAckToMe,
    Map<String, Object> options) {
  MessageRequestHandleImpl<Identifier, ByteBuffer> returnMe =
      new MessageRequestHandleImpl<Identifier, ByteBuffer>(i, m, options);
  boolean success = true;
  synchronized(this) {
    if (m.remaining() > bucket) {
      success = false;
    } else {
      bucket -= m.remaining();
    }
  }
  if (!success) {
    if (logger.level <= Logger.FINE) logger.log(
        "Dropping message "+m+" because not enough bandwidth:"+bucket);
    if (deliverAckToMe != null) deliverAckToMe.sendFailed(returnMe,
        new NotEnoughBandwidthException(bucket, m.remaining()));
    return returnMe;
  }
  returnMe.setSubCancellable(tl.sendMessage(i, m, deliverAckToMe, options));
  return returnMe;
}

Note how we call returnMe.setSubCancellable() with the handle returned by the call to the lower transport layer.
MessageCallback
There is one more problem in the code. Because we simply pass through the MessageCallback deliverAckToMe, when deliverAckToMe.ack() or deliverAckToMe.sendFailed() is called it will have the wrong MessageRequestHandle, because the lower transport layer notifies it with that layer's handle. Thus, we need to create our own MessageCallback which wraps deliverAckToMe. We'll use an anonymous inner class of the MessageCallback interface.
First, we need to declare deliverAckToMe and returnMe final.
Second, we will create an anonymous inner class of the MessageCallback.
public MessageRequestHandle<Identifier, ByteBuffer> sendMessage(
    Identifier i,
    ByteBuffer m,
    final MessageCallback<Identifier, ByteBuffer> deliverAckToMe,
    Map<String, Object> options) {
  final MessageRequestHandleImpl<Identifier, ByteBuffer> returnMe =
      new MessageRequestHandleImpl<Identifier, ByteBuffer>(i, m, options);
  ...
  returnMe.setSubCancellable(tl.sendMessage(i, m,
      new MessageCallback<Identifier, ByteBuffer>() {
        public void ack(MessageRequestHandle<Identifier, ByteBuffer> msg) {
          if (deliverAckToMe != null) deliverAckToMe.ack(returnMe);
        }
        public void sendFailed(MessageRequestHandle<Identifier, ByteBuffer> msg, IOException reason) {
          if (deliverAckToMe != null) deliverAckToMe.sendFailed(returnMe, reason);
        }
      }, options));
  return returnMe;
}

Our MessageCallback simply calls deliverAckToMe's ack()/sendFailed() methods with returnMe.
Here is the full code for the method:
public MessageRequestHandle<Identifier, ByteBuffer> sendMessage(
    Identifier i,
    ByteBuffer m,
    final MessageCallback<Identifier, ByteBuffer> deliverAckToMe,
    Map<String, Object> options) {
  final MessageRequestHandleImpl<Identifier, ByteBuffer> returnMe =
      new MessageRequestHandleImpl<Identifier, ByteBuffer>(i, m, options);
  boolean success = true;
  synchronized(this) {
    if (m.remaining() > bucket) {
      success = false;
    } else {
      bucket -= m.remaining();
    }
  }
  if (!success) {
    if (logger.level <= Logger.FINE) logger.log(
        "Dropping message "+m+" because not enough bandwidth:"+bucket);
    if (deliverAckToMe != null) deliverAckToMe.sendFailed(returnMe,
        new NotEnoughBandwidthException(bucket, m.remaining()));
    return returnMe;
  }
  returnMe.setSubCancellable(tl.sendMessage(i, m,
      new MessageCallback<Identifier, ByteBuffer>() {
        public void ack(MessageRequestHandle<Identifier, ByteBuffer> msg) {
          if (deliverAckToMe != null) deliverAckToMe.ack(returnMe);
        }
        public void sendFailed(MessageRequestHandle<Identifier, ByteBuffer> msg, IOException reason) {
          if (deliverAckToMe != null) deliverAckToMe.sendFailed(returnMe, reason);
        }
      }, options));
  return returnMe;
}

This may seem a bit overwhelming right now, but this code provides a lot of flexibility for the transport layer while still keeping the calls relatively simple.
Limit Socket Bandwidth
Now our layer limits message bandwidth, but we must also make the sockets respect the bandwidth limitation. Rather than throwing an exception when we exceed the bandwidth, we just need to throttle the traffic by only sending what we are allowed. To do this, we will create a P2PSocket that decrements the bucket on each write. Like the MessageRequestHandleImpl, we already have an implementation of a P2PSocket that wraps another P2PSocket. It is called org.mpisws.p2p.transport.util.SocketWrapperSocket.
The generic parameters of the SocketWrapperSocket are the two kinds of Identifiers that it adapts. Since we are importing and exporting the same Identifier, we declare our Socket like this:
class BandwidthLimitingSocket extends SocketWrapperSocket<Identifier, Identifier> {
}

Now we need to set up the constructor. We will call super() with the wrapped socket's identifier and options. Also, we must pass it a logger.

public BandwidthLimitingSocket(P2PSocket<Identifier> socket) {
  super(socket.getIdentifier(), socket,
      BandwidthLimitingTransportLayer.this.logger, socket.getOptions());
}

Now we need to override this method:
@Override
public long write(ByteBuffer srcs) throws IOException {
}

We can determine how much the user wants to write by calling srcs.remaining(). Then we must return how much was written, or -1 if the socket or outbound channel was closed. Also, note that when we read bytes off of srcs, we are changing the state of srcs. It is critical that we manage the position of srcs properly; this way, if we don't send the whole message, the user code can call write() again with the same message until it has sent the whole thing.
Here is the easy case, where we accept the write() and return the lower layer's result, or else we continue and log a message:
if (srcs.remaining() <= bucket) {
  long ret = super.write(srcs);
  if (ret >= 0) { // EOF is usually -1
    synchronized(this) {
      bucket -= ret;
    }
  }
  return ret;
}
if (logger.level <= Logger.FINE) logger.log(
    "Limiting "+socket+" to "+bucket+" bytes.");

The rest is more complicated, because it is important to leave the incoming ByteBuffer's position in the proper place. We create a ByteBuffer temp that exposes only the number of bytes we are allowed to send (the bucket):

// The user is trying to write more than is allowed.  To handle this we
// will wrap the allowed amount in a temporary ByteBuffer and pass that
// to the next layer.  It is critical to set the proper position of srcs
// before returning.  First, let's record the original position.
int originalPosition = srcs.position();

// now we create temp, whose size is "bucket"
ByteBuffer temp = ByteBuffer.wrap(srcs.array(), originalPosition, (int)bucket);

Now we try to write the data by calling super. This method may throw an IOException or return EOF. Fortunately, the srcs buffer is still at its original position, so we don't need to do anything but return or throw the exception:
// try to write our temp buffer
long ret = super.write(temp);
if (ret < 0) {
  // there was a problem; srcs's position is unchanged, just return
  return ret;
}

If we made it here, there were no problems. Allocate the bandwidth and update the position on the srcs ByteBuffer:
// allocate the bandwidth
synchronized(this) {
  bucket -= ret;
}
// we need to properly set the position
srcs.position(originalPosition + (int)ret);
return ret;

Note that we updated the position with ret, rather than bucket. This is because it is possible that the lower layer could only send a fraction of the message.
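Assembled from the fragments above, the whole write() override looks roughly like this (a sketch; the downloadable tutorial file may differ slightly in details such as the cast and comments):

@Override
public long write(ByteBuffer srcs) throws IOException {
  if (srcs.remaining() <= bucket) {
    // easy case: the whole buffer fits in the remaining bandwidth
    long ret = super.write(srcs);
    if (ret >= 0) { // EOF is usually -1
      synchronized(this) {
        bucket -= ret;
      }
    }
    return ret;
  }
  if (logger.level <= Logger.FINE) logger.log("Limiting "+socket+" to "+bucket+" bytes.");

  // the user is trying to write more than is allowed; hand the next layer
  // a temporary view containing only the allowed number of bytes
  int originalPosition = srcs.position();
  ByteBuffer temp = ByteBuffer.wrap(srcs.array(), originalPosition, (int)bucket);

  long ret = super.write(temp);
  if (ret < 0) return ret; // srcs is untouched, just pass the result up

  // account for the bandwidth and advance srcs by what was actually written
  synchronized(this) {
    bucket -= ret;
  }
  srcs.position(originalPosition + (int)ret);
  return ret;
}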
Managing register()
Before the user can write, they have to register that they want to write; then we respond when the operation can be performed without blocking. If we naively pass the write request to the next layer, we will cause an infinite loop when we run out of bandwidth. Here is the scenario:
- Client: socket.register() // request to write (passed through to the lower layer).
- Lower layer: receiver.receiveSelectResult() // granted request to write
- Client: socket.write()
- BandwidthLimitingLayer: limiting bandwidth to 0 bytes, write returns 0
- repeat -- The user code didn't make any progress, so obviously it will register again.
To fix this, we will:
- Intercept the call to socket.register().
- When more bandwidth is available, notify the registered sockets that they can write.

We add this variable to BandwidthLimitingSocket to store the user's request to write:
/**
 * Store the write requestor.  If this variable is not null
 * it means that the storedWriter wants to write, but there
 * wasn't enough bandwidth.
 */
P2PSocketReceiver<Identifier> storedWriter;

Here is the code to intercept socket.register() in BandwidthLimitingSocket. If we are out of bandwidth and the user wants to write, then we cache the receiver in the storedWriter variable.

@Override
public void register(boolean wantToRead, boolean wantToWrite,
    P2PSocketReceiver<Identifier> receiver) {
  // this variable is what we will pass to super.register()
  boolean canWrite = wantToWrite;
  // if the user wants to write and the bucket is empty, store the writer and set canWrite to false
  if (wantToWrite == true && bucket == 0) {
    canWrite = false;
    storedWriter = receiver;
  }
  // only call super.register() if we have something to do
  if (wantToRead || canWrite) super.register(wantToRead, canWrite, receiver);
}

Now, our refill task needs to notify all BandwidthLimiting sockets when it refills, so they can notify the storedWriter if they have one.
In BandwidthLimitingTransportLayer, we need to keep track of all of our BandwidthLimitingSockets in a Collection called sockets. Whenever we create a BandwidthLimitingSocket we add it to sockets, and whenever we shutdownOutput() or close() the socket, we remove it. Here is the code.

/**
 * Keep track of all of the BandwidthLimitingSockets
 */
Collection<BandwidthLimitingSocket> sockets = new ArrayList<BandwidthLimitingSocket>();

class BandwidthLimitingSocket extends SocketWrapperSocket<Identifier, Identifier> {
  public BandwidthLimitingSocket(P2PSocket<Identifier> socket) {
    super(socket.getIdentifier(), socket,
        BandwidthLimitingTransportLayer.this.logger, socket.getOptions());
    synchronized(BandwidthLimitingTransportLayer.this) {
      sockets.add(this);
    }
  }

  public void close() {
    super.close();
    synchronized(BandwidthLimitingTransportLayer.this) {
      sockets.remove(this);
    }
  }

  public void shutdownOutput() {
    super.shutdownOutput();
    synchronized(BandwidthLimitingTransportLayer.this) {
      sockets.remove(this);
    }
  }

In BandwidthLimitingSocket, we also need a method for the refill task to call. Note that we can't simply call storedWriter.receiveSelectResult(); all we can do now is notify the lower transport layer that we wish to write.

/**
 * Register and clear the storedWriter
 */
public void notifyBandwidthRefilled() {
  if (storedWriter != null) {
    P2PSocketReceiver<Identifier> temp = storedWriter;
    storedWriter = null;
    super.register(false, true, temp);
  }
}

Now, add this code to the refill task (the TimerTask we created in the constructor of BandwidthLimitingTransportLayer) so that it calls notifyBandwidthRefilled() on each socket:
for (BandwidthLimitingSocket s : sockets) {
  s.notifyBandwidthRefilled();
}

Phew, that was a lot of work. Now that we have our BandwidthLimitingSocket, we need to use it.
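Before we do, here is what the complete refill task from the constructor looks like once this notification is added (a sketch combining the two fragments above; in the downloaded file the loop may be placed slightly differently):

environment.getSelectorManager().getTimer().schedule(new TimerTask() {
  @Override
  public void run() {
    // refill the bucket and wake up any sockets that were starved of bandwidth
    synchronized(BandwidthLimitingTransportLayer.this) {
      bucket = BUCKET_SIZE;
      for (BandwidthLimitingSocket s : sockets) {
        s.notifyBandwidthRefilled();
      }
    }
  }
}, 0, BUCKET_TIME_LIMIT);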
There are two ways to get a socket.
- When the upper layer opens a socket.
- When the lower layer accepts a socket.
openSocket():
Here is the code for openSocket(). We don't get the socket right away; we have to wait until it has completed opening. This is similar to a continuation. The first thing we have to do is create a SocketRequestHandle, for the same reasons as we did in the above code with the MessageRequestHandle.
SocketRequestHandleImpl<Identifier> returnMe = new SocketRequestHandleImpl<Identifier>(i, options);
returnMe.setSubCancellable(tl.openSocket(i, ... , options));
return returnMe;

Now, we ask the lower transport layer to open the socket, and then we wrap it with our BandwidthLimitingSocket, which we deliver to deliverSocketToMe. If there is an Exception, we just pass it up to the previous layer. Note that deliverSocketToMe must be non-null, because it's not useful to request opening a socket if you don't receive a handle to it.

tl.openSocket(i, new SocketCallback<Identifier>(){
  public void receiveResult(SocketRequestHandle<Identifier> cancellable,
      P2PSocket<Identifier> sock) {
    deliverSocketToMe.receiveResult(returnMe, new BandwidthLimitingSocket(sock));
  }
  public void receiveException(SocketRequestHandle<Identifier> s, IOException ex) {
    deliverSocketToMe.receiveException(returnMe, ex);
  }
}, options)
incomingSocket():
We have a variable for the lower transport layer: tl. But we don't yet have a member variable for the transport layer above us. To get this variable, we need to implement the setCallback() method in the TransportLayer interface. We will name this transport layer callback.

TransportLayerCallback<Identifier, ByteBuffer> callback;

public void setCallback(TransportLayerCallback<Identifier, ByteBuffer> callback) {
  this.callback = callback;
}

Now it is simple to override incomingSocket():

public void incomingSocket(P2PSocket<Identifier> s) throws IOException {
  callback.incomingSocket(new BandwidthLimitingSocket(s));
}
Finishing the Transport Layer
The last step is to add all of the rest of the methods in TransportLayer and TransportLayerCallback. These are just going to forward the calls down or up as appropriate:

public void acceptMessages(boolean b) { tl.acceptMessages(b); }
public void acceptSockets(boolean b) { tl.acceptSockets(b); }
public Identifier getLocalIdentifier() { return tl.getLocalIdentifier(); }
public void setErrorHandler(ErrorHandler<Identifier> handler) { tl.setErrorHandler(handler); }
public void destroy() { tl.destroy(); }

public void messageReceived(Identifier i, ByteBuffer m, Map<String, Object> options) throws IOException {
  callback.messageReceived(i, m, options);
}
Integration with the SocketPastryNodeFactory
Now we will add static methods on BandwidthLimitingTransportLayer that extend SocketPastryNodeFactory with our BandwidthLimitingTransportLayer inserted in the right place. Where should we put this layer? We will show two options. This is where keeping our layer generic pays off.
Example A
Because we are at the Java level, we can't fully account for the TCP overhead of the bandwidth (retransmission etc.). However, to get the maximum possible effect, we should place it just above Wire. Here, we show how to do this by extending SocketPastryNodeFactory. To construct each layer in the transport stack, SocketPastryNodeFactory executes a method called get...TransportLayer(). To insert a layer, simply override one of these calls and wrap the default layer with our new one.
Since we are going to put our layer right above the wire layer, we will override getWireTransportLayer(). We will first construct the default layer by calling super.getWireTransportLayer(). However, we will return our bandwidth-limiting layer, which wraps the default wire layer.
Note that when we construct our BandwidthLimitingTransportLayer, we export <InetSocketAddress>.
public static PastryNodeFactory exampleA(int bindport, Environment env,
    NodeIdFactory nidFactory, final int amt, final int time) throws IOException {
  // anonymously extend SPNF
  PastryNodeFactory factory = new SocketPastryNodeFactory(nidFactory, bindport, env) {
    /**
     * Override getWireTransportLayer to return the BandwidthLimitingTL wrapping
     * the default wire implementation.
     */
    @Override
    protected TransportLayer<InetSocketAddress, ByteBuffer> getWireTransportLayer(
        InetSocketAddress innermostAddress, TLPastryNode pn) throws IOException {
      // get the default layer
      TransportLayer<InetSocketAddress, ByteBuffer> wtl =
          super.getWireTransportLayer(innermostAddress, pn);

      // wrap it with our layer
      return new BandwidthLimitingTransportLayer<InetSocketAddress>(
          wtl, amt, time, pn.getEnvironment());
    }
  };
  return factory;
}

You can replace the construction of the SocketPastryNodeFactory at the beginning of any of the existing tutorials with this code to add the "bandwidth-limiting" feature.
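For instance, the swap in an earlier tutorial would look something like this (the 10*1024 bytes per 1000 ms limit is just an example value):

// was: PastryNodeFactory factory = new SocketPastryNodeFactory(nidFactory, bindport, env);
PastryNodeFactory factory = BandwidthLimitingTransportLayer.exampleA(
    bindport, env, nidFactory, 10*1024, 1000);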
Example B
Perhaps we don't want to include some of FreePastry's overhead in our bandwidth limits. We can exclude the overhead of liveness checks and of constructing source routes if we put the BandwidthLimitingTransportLayer above the SourceRouteManager. However, the SourceRouteManager also performs the additional functions of providing Liveness and Proximity. Unfortunately, our transport layer doesn't implement these additional interfaces.
In exampleB() we show how easy it is to replace only the TransportLayer functionality of the SourceRouteManager while still returning the existing SourceRouteManager for the Liveness and Proximity functionality. The return type of getSourceRouteManagerLayer() is a TransLivenessProximity<MultiInetSocketAddress, ByteBuffer>. This is a very simple interface that returns 3 objects:
protected interface TransLivenessProximity<Identifier, MessageType> {
  TransportLayer<Identifier, ByteBuffer> getTransportLayer();
  LivenessProvider<Identifier> getLivenessProvider();
  ProximityProvider<Identifier> getProximityProvider();
}

As in Example A, we need to create the default SourceRouteManager layer by calling super.getSourceRouteManagerLayer(). But we must also implement a TransLivenessProximity which returns our BandwidthLimitingTransportLayer for the TransportLayer, but the SourceRouteManager layer for the LivenessProvider and ProximityProvider.
Here is the code for exampleB():
public static PastryNodeFactory exampleB(int bindport, Environment env,
    NodeIdFactory nidFactory, final int amt, final int time) throws IOException {
  PastryNodeFactory factory = new SocketPastryNodeFactory(nidFactory, bindport, env) {
    @Override
    protected TransLivenessProximity<MultiInetSocketAddress, ByteBuffer> getSourceRouteManagerLayer(
        TransportLayer<SourceRoute<MultiInetSocketAddress>, ByteBuffer> ltl,
        LivenessProvider<SourceRoute<MultiInetSocketAddress>> livenessProvider,
        Pinger<SourceRoute<MultiInetSocketAddress>> pinger,
        TLPastryNode pn,
        MultiInetSocketAddress proxyAddress,
        MultiAddressSourceRouteFactory esrFactory) throws IOException {

      // get the default layer
      final TransLivenessProximity<MultiInetSocketAddress, ByteBuffer> srm =
          super.getSourceRouteManagerLayer(
              ltl, livenessProvider, pinger, pn, proxyAddress, esrFactory);

      // wrap the default layer with our layer
      final BandwidthLimitingTransportLayer<MultiInetSocketAddress> bll =
          new BandwidthLimitingTransportLayer<MultiInetSocketAddress>(
              srm.getTransportLayer(), amt, time, pn.getEnvironment());

      return new TransLivenessProximity<MultiInetSocketAddress, ByteBuffer>(){
        public TransportLayer<MultiInetSocketAddress, ByteBuffer> getTransportLayer() {
          return bll;
        }
        public LivenessProvider<MultiInetSocketAddress> getLivenessProvider() {
          return srm.getLivenessProvider();
        }
        public ProximityProvider<MultiInetSocketAddress> getProximityProvider() {
          return srm.getProximityProvider();
        }
      };
    }
  };
  return factory;
}

Note that BandwidthLimitingTransportLayer exports <MultiInetSocketAddress> this time.
Running the code
We modified DistTutorial from lesson 4 to take some more parameters and call BandwidthLimitingTransportLayer.exampleA().

public DistTutorial(int bindport, InetSocketAddress bootaddress, int numNodes,
    Environment env, int bandwidth) throws Exception {
  ...
  // construct the PastryNodeFactory, this is how we use rice.pastry.socket
  PastryNodeFactory factory = BandwidthLimitingTransportLayer.exampleA(
      bindport, env, nidFactory, bandwidth, 1000);
  // PastryNodeFactory factory = BandwidthLimitingTransportLayer.exampleB(
  //     bindport, env, nidFactory, bandwidth, 1000);
  // PastryNodeFactory factory = new SocketPastryNodeFactory(
  //     nidFactory, bindport, env);
  ...
}

/**
 * Usage:
 * java [-cp FreePastry-<version>.jar] rice.tutorial.transportlayer.DistTutorial localbindport bootIP bootPort numNodes bandwidth
 * example java rice.tutorial.transportlayer.DistTutorial 9001 pokey.cs.almamater.edu 9001 10 1000
 */
public static void main(String[] args) throws Exception {
  ...
}

We need to turn on FINE logging in our transport layer, and let's also disable ProximityNeighborSelection:

// Disable PNS for our example
env.getParameters().setBoolean("transport_use_pns", false);

// enable logging on our new layer
env.getParameters().setInt(
    "rice.tutorial.transportlayer.BandwidthLimitingTransportLayer_loglevel",
    Logger.FINE);

Now let's run the code. We must provide the bandwidth; let's try 10K/second:
java -cp .:FreePastry-@freepastry_version@.jar rice.tutorial.transportlayer.DistTutorial 5009 10.9.8.7 5009 10 10000 Finished creating new node TLPastryNode[SNH: <0x9492E7..>/FOO/10.9.8.7:5009] Finished creating new node TLPastryNode[SNH: <0x44C3E7..>/FOO/10.9.8.7:5010] Finished creating new node TLPastryNode[SNH: <0xBE1C77..>/FOO/10.9.8.7:5011] Finished creating new node TLPastryNode[SNH: <0x4C94DA..>/FOO/10.9.8.7:5012] Finished creating new node TLPastryNode[SNH: <0x3FAE2B..>/FOO/10.9.8.7:5013] Finished creating new node TLPastryNode[SNH: <0xA5DBDC..>/FOO/10.9.8.7:5014] Finished creating new node TLPastryNode[SNH: <0x99460E..>/FOO/10.9.8.7:5015] 0xB5DAB7:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222632375:Limiting SM java.nio.channels.SocketChannel[connected local=/10.9.8.7:2771 remote=/10.9.8.7:5014] to 315 bytes. 0xB5DAB7:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222632390:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0 0xB5DAB7:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222632390:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0 Finished creating new node TLPastryNode[SNH: <0xB5DAB7..>/FOO/10.9.8.7:5016] 0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634218:Limiting SM java.nio.channels.SocketChannel[connected local=/10.9.8.7:5017 remote=/10.9.8.7:2775] to 283 bytes. 0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634234:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0 0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634234:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0 0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634234:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0 0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634234:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0 0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634234:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0 0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634234:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0 0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634250:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0 0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634250:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0 Finished creating new node TLPastryNode[SNH: <0xC3183D..>/FOO/10.9.8.7:5017] 0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635843:Limiting SM java.nio.channels.SocketChannel[connected local=/10.9.8.7:2786 remote=/10.9.8.7:5011] to 410 bytes. 0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635843:Limiting SM java.nio.channels.SocketChannel[connected local=/10.9.8.7:2789 remote=/10.9.8.7:5015] to 0 bytes. 
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635843:Limiting SM java.nio.channels.SocketChannel[connected local=/10.9.8.7:2788 remote=/10.9.8.7:5010] to 0 bytes. 0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635843:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0 0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635843:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0 0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635843:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0 0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635859:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0 0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635859:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0 0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635859:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0 0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635859:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0 0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635859:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0 0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635859:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0 0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635859:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0 Finished creating new node TLPastryNode[SNH: <0x317CF2..>/FOO/10.9.8.7:5018] MyApp <0x9492E7..> sending to <0x1CACFE..> MyApp <0x317CF2..> received MyMsg from <0x9492E7..> to <0x1CACFE..> ...Note the logging of
Limiting SM XXX to XXX bytes.

and

Dropping message XXX because not enough bandwidth:XXX

and

0x34578E:org.mpisws.p2p.transport.multiaddress.MultiInetAddressTransportLayerImpl:1197900565238:/10.9.8.7:5014
rice.tutorial.transportlayer.NotEnoughBandwidthException
  at rice.tutorial.transportlayer.BandwidthLimitingTransportLayer.sendMessage(BandwidthLimitingTransportLayer.java:152)
  at rice.tutorial.transportlayer.BandwidthLimitingTransportLayer.sendMessage(BandwidthLimitingTransportLayer.java:1)
  at org.mpisws.p2p.transport.wire.magicnumber.MagicNumberTransportLayer.sendMessage(MagicNumberTransportLayer.java:226)
  at org.mpisws.p2p.transport.wire.magicnumber.MagicNumberTransportLayer.sendMessage(MagicNumberTransportLayer.java:1)
  at org.mpisws.p2p.transport.limitsockets.LimitSocketsTransportLayer.sendMessage(LimitSocketsTransportLayer.java:220)
  at org.mpisws.p2p.transport.multiaddress.MultiInetAddressTransportLayerImpl.sendMessage(MultiInetAddressTransportLayerImpl.java:283)
  at org.mpisws.p2p.transport.multiaddress.MultiInetAddressTransportLayerImpl.sendMessage(MultiInetAddressTransportLayerImpl.java:1)
  at org.mpisws.p2p.transport.sourceroute.SourceRouteTransportLayerImpl.messageReceived(SourceRouteTransportLayerImpl.java:413)
  at org.mpisws.p2p.transport.sourceroute.SourceRouteTransportLayerImpl.messageReceived(SourceRouteTransportLayerImpl.java:1)
  at org.mpisws.p2p.transport.multiaddress.MultiInetAddressTransportLayerImpl.messageReceived(MultiInetAddressTransportLayerImpl.java:321)
  at org.mpisws.p2p.transport.multiaddress.MultiInetAddressTransportLayerImpl.messageReceived(MultiInetAddressTransportLayerImpl.java:1)
  at org.mpisws.p2p.transport.limitsockets.LimitSocketsTransportLayer.messageReceived(LimitSocketsTransportLayer.java:224)
  at org.mpisws.p2p.transport.wire.magicnumber.MagicNumberTransportLayer.messageReceived(MagicNumberTransportLayer.java:251)
  at org.mpisws.p2p.transport.wire.magicnumber.MagicNumberTransportLayer.messageReceived(MagicNumberTransportLayer.java:1)
  at rice.tutorial.transportlayer.BandwidthLimitingTransportLayer.messageReceived(BandwidthLimitingTransportLayer.java:317)
  at rice.tutorial.transportlayer.BandwidthLimitingTransportLayer.messageReceived(BandwidthLimitingTransportLayer.java:1)
  at org.mpisws.p2p.transport.wire.WireTransportLayerImpl.messageReceived(WireTransportLayerImpl.java:183)
  at org.mpisws.p2p.transport.wire.UDPLayer.readHeader(UDPLayer.java:228)
  at org.mpisws.p2p.transport.wire.UDPLayer.read(UDPLayer.java:192)
  at rice.selector.SelectorManager.doSelections(SelectorManager.java:389)
  at rice.selector.SelectorManager.run(SelectorManager.java:255)

You can get rid of these exceptions by installing a proper ErrorHandler on the transport layers. The default one just prints the stack trace. However, the code still runs successfully.