The FreePastry Tutorial.

This tutorial is designed to get you cooking quickly with the FreePastry API and software toolkit.

Version @tutorial_version@; @tutorial_date@. For FreePastry version @freepastry_version@. Maintained by @maintainer@.

Transport Layers

Modify low level details of FreePastry's network stack.



This tutorial will show you how to use the new TransportLayer interface in the package org.mpisws.p2p.transport. It will also show you how to integrate your TransportLayer to the SocketPastryNodeFactory network stack.

Outline of this tutorial:

Warning: This tutorial is more difficult than the previous tutorials and is intended for developers who wish to make low-level changes to the FreePastry network stack.

Overview

We will start by stating some of the design goals of the the Layered Transport Layer.

Versions of FreePastry before 2.1 had a unified transport layer. When features at this level needed to be added or modified, it was very complicated to get all of the parts to work properly. The new version rearranges network transport into a stack of layers where each has it's own small task.

To give you an idea of a typical layer's task, we will describe the layers that are assembled when creating the SocketPastryNodeFactory in FreePastry 2.1, beginning with the lowest layer:

SocketPastryNodeFactory's Layers:

Note:

Other interesting layers:

Upcoming layers:

Note: The TranportLayer uses a Java language feature that was introduced in Java 1.5, generics. For more informaiton on Generics, see this this tutorial.

The TransportLayer interface:

public interface TransportLayer<Identifier, MessageType> extends Destructable {}

Each transport layer operates on an Identifier (InetSocketAddress, SourceRoute, NodeHandle etc.), and a MessageType (ByteBuffer, RawMessage etc.)

The most important operations are sending messages and opening sockets:

  public MessageRequestHandle<Identifier, MessageType> sendMessage(
    Identifier i, 
    MessageType m, 
    MessageCallback<Identifier, MessageType> deliverAckToMe, 
    Map<String, Object> options);

  public SocketRequestHandle<Identifier> openSocket(
    Identifier i, 
    SocketCallback<Identifier> deliverSocketToMe, 
    Map<String, Object> options);

For these methods, you need the Identifier of the remote node, and (for sendMessage()) the Message to be delivered. Additionally, you may specify some transport-layer specific Options such as Guaranteed/Unguaranteed/Encrypted etc. We will describe some of these options in a later tutorial. Finally you provide a Callback (deliverAckToMe/deliverSocketToMe) to deliver notificaiton of success or failure when the operation completes. These calls are non-blocking and return a RequestHandle. The RequestHandle is like a receipt or a tracking number. You can use the RequestHandle to cancel the existing request if it is no longer necessary. For example if the operaiton takes too long. The next tutorial shows you how to work with a RequestHandle.

The TransportLayerCallback interface:

The TransportLayerCallback provides the inverse operations (the result of a remote node sending a message or opening a socket) and must have the identical Identifier/MessageType:

public interface TransportLayerCallback<Identifier, MessageType> {
  public void messageReceived(Identifier i, MessageType m, Map<String, Object> options) throws IOException;

  public void incomingSocket(P2PSocket<Identifier> s) throws IOException;
}

The P2PSocket is similar to the AppSocket. It is non-blocking and you must register each time you use it as explained in the AppSocket tutorial. We recommend taking the AppSocket tutorial before porceeding.

Here are the calls for the P2PSocket:

  void register(boolean wantToRead, boolean wantToWrite, P2PSocketReceiver receiver);
  long read(ByteBuffer dsts) throws IOException; 
  long write(ByteBuffer srcs) throws IOException; 
  void shutdownOutput();
  void close(); 
  Identifier getIdentifier();
  Map getOptions();

Other calls in the TransportLayer:

This method returns the Identifier of the local node for this layer.

  public Identifier getLocalIdentifier();

These methods can control flow by rejecting new messags/sockets if the local node is being overwhelmed.

  public void acceptSockets(boolean b);
  
  public void acceptMessages(boolean b);

This method sets the callback

  public void setCallback(TransportLayerCallback<Identifier, MessageType> callback);

This method sets the ErrorHandler which is usd for notification of unexpected behavior. Ex: The acceptor socket closes, or an unexpected message arrives

  public void setErrorHandler(ErrorHandler<Identifier> handler);  

This method cleans up the layer, in case you wish to shut down the transport layer w/o exiting the jvm. For example, it will close the ServerSocket that accepts new sockets, as well as clean up any memory.

  public void destroy();

Your first new TransportLayer

Design:

Download the tutorial files: BandwidthLimitingTransportLayer.java NotEnoughBandwidthException.java DistTutorial.java MyApp.java, MyMsg.java into a directory called rice/tutorial/transportlayer/.

Here is the definition of our new class.

public class BandwidthLimitingTransportLayer<Identifier> implements 
    TransportLayer<Identifier, ByteBuffer>,
    TransportLayerCallback<Identifier, ByteBuffer> {
}

Here is the constructor. It takes the TransportLayer immeadately below this layer, the bucket size, and bucket time length. We also need access to the environment so we can create a logger.

  public BandwidthLimitingTransportLayer(
      TransportLayer<Identifier, ByteBuffer> tl, 
      long bucketSize, int bucketTimelimit, 
      Environment env) {
    this.environment = env;
    this.tl = tl;
    BUCKET_SIZE = bucketSize;
    BUCKET_TIME_LIMIT = bucketTimelimit;    
    logger = env.getLogManager().getLogger(BandwidthLimitingTransportLayer.class, null);
    tl.setCallback(this);
  }

You can look at the code to see the declaration of these fields. The last thing we have to do is set ourself as the lower level's callback. This will cause it to deliver messages/sockets to us.

This variable is the bucket.

  /**
   * When this goes to zero, don't send messages
   */
  protected long bucket;

Now let's create a task to refil the bucket.

    environment.getSelectorManager().getTimer().schedule(new TimerTask(){    
      @Override
      public void run() {
        // always synchronize on "this" before modifying the bucket
        synchronized(this) {
          bucket = BUCKET_SIZE;
        }
      }    
    }, 0, BUCKET_TIME_LIMIT);

If the TimerTask is unfamiliar, please review the timer tutorial. This interface is slightly different in that it calls run(), rather than sending a MessageToSelf, but it is the same idea.

Limit Message Bandwidth

When the user requests to send the message, it may be buffered or even fail. The transport layer will notify the user of success or failure sending the message. Furthermore the user can cancel the operation if it hasn't completed. Let's examine the sendMessage call.
  public MessageRequestHandle<Identifier, ByteBuffer> sendMessage(
      Identifier i, 
      ByteBuffer m, 
      MessageCallback<Identifier, ByteBuffer> deliverAckToMe, 
      Map<String, Object> options) {

  }

This method will attempt to send the message m to the Identifier i. It will notify the MessageCallback deliverAckToMe of success or failure. And it will return a MessageRequestHandle that can be used to cancel the operation. Furthermore it takes options which could be used, for example, to suppress the bandwidth limitation. However, we won't use the options now, we will merely pass them on to the next layer.

Now let's limit the outgoing message bandwidth. We are going to throw a NotEnoughBandwidthException if there isn't sufficient bandwidth to send a message.

There are three things we must do here.

This shows the code to subtract from the bucket or throw the exception. For now, we retun null. All we have to do is call deliverAckToMe.sendFailed() when there isn't enough bandwidth. Note: The MessageCallback may be null, so we must check that it isn't before calling sendFailed() otherwise we will get a NullPointerException. Also, so we can detect that this is occuring when we run the code, we will log when the message is dropped.

  public MessageRequestHandle<Identifier, ByteBuffer> sendMessage(
      Identifier i, 
      ByteBuffer m, 
      MessageCallback<Identifier, ByteBuffer> deliverAckToMe, 
      Map<String, Object> options) {

    boolean success = true;
    synchronized(this) {
      if (m.remaining() > bucket) {
        success = false;
      } else {
        bucket-=m.remaining();
      }
    }
    if (!success) {
      if (logger.level <= Logger.FINE) logger.log("Dropping message "+m+" because not enough bandwidth:"+bucket);
      if (deliverAckToMe != null) 
        deliverAckToMe.sendFailed(null, new NotEnoughBandwidthException(bucket, m.remaining()));
      return null;
    }
    tl.sendMessage(i,m,deliverAckToMe,options);
    return null;
  }  

MessageRequestHandle

Now we will add code to return a proper MessageRequestHandle. The purpose of this receipt is that it allows the user to cancel the message being sent, but it is also used when we notify the user's code of success or failure. This way if the user sends the same message a few times, the MessageRequestHandle can be used to disambiguate which instance was successful/failed.

The release already includes a generic implementation of the MessageRequestHandle: org.mpisws.p2p.transport.util.MessageRequestHandleImpl. Let's take a look at it:

The constructor initializes these 3 member variables:

  Identifier identifier;
  MessageType msg;
  Map<String, Object> options;

There are also corresponding getters. However, we still need to be able to cancel() the operation in the next transport layer. This requires the 4th member variable:

  Cancellable subCancellable;

  public boolean cancel() {
    if (subCancellable == null) return false;
    return subCancellable.cancel();
  }

subCancellable is initialized with a call to setSubCancellable().

Here is the code that now properly returns a MessageRequestHandle:

  public MessageRequestHandle<Identifier, ByteBuffer> sendMessage(Identifier i, ByteBuffer m, 
    MessageCallback<Identifier, ByteBuffer> deliverAckToMe, Map<String, Object> options) {

    MessageRequestHandleImpl<Identifier, ByteBuffer> returnMe = 
      new MessageRequestHandleImpl<Identifier, ByteBuffer>(i, m, options);
    
    boolean success = true;
    synchronized(this) {
      if (m.remaining() > bucket) {
        success = false;
      } else {
        bucket-=m.remaining();
      }
    }
    if (!success) {
      if (logger.level <= Logger.FINE) logger.log("Dropping message "+m+" because not enough bandwidth:"+bucket);
      if (deliverAckToMe != null) 
        deliverAckToMe.sendFailed(returnMe, new NotEnoughBandwidthException(bucket, m.remaining()));
      return returnMe;
    }
    
    returnMe.setSubCancellable(tl.sendMessage(i,m,deliverAckToMe,options));
    return returnMe;
  }  

Note how we call returnMe.setSubCancellable() with the call to the lower transportLayer.

MessageCallback

There is one more problem in the code. Because we simply pass through the MessageCallback deliverAckToMe, when deliverAckToMe.ack() or deliverAckToMe.sendFailed() is called, it will have the wrong MessageRequestHandle, because the lower transport layer will notify it of success or failure. Thus, we need to create our own MessageRequestHandle which wraps deliverAckToMe. We'll use an Anonymous Inner Class of the MessageCallback interface.

First, we need to declare deliverAckToMe and returnMe final.

Second, we will create an anonymous inner class of the MessageCallback.

  public MessageRequestHandle<Identifier, ByteBuffer> sendMessage(Identifier i, ByteBuffer m, 
      final MessageCallback<Identifier, ByteBuffer> deliverAckToMe, Map<String, Object> options) {

    final MessageRequestHandleImpl<Identifier, ByteBuffer> returnMe = 
      new MessageRequestHandleImpl<Identifier, ByteBuffer>(i, m, options);
    
       ...

    returnMe.setSubCancellable(tl.sendMessage(i,m,new MessageCallback<Identifier, ByteBuffer>() {
      public void ack(MessageRequestHandle<Identifier, ByteBuffer> msg) {
        if (deliverAckToMe != null) deliverAckToMe.ack(returnMe);
      }

      public void sendFailed(MessageRequestHandle<Identifier, ByteBuffer> msg, IOException reason) {
        if (deliverAckToMe != null) deliverAckToMe.sendFailed(returnMe, reason);
      }    
    },options));
    return returnMe;
  }  

Our MessageCallback simply calls deliverAckToMe's ack()/sendFailed() methods with returnMe.

Here is the full code for the method:

  public MessageRequestHandle<Identifier, ByteBuffer> sendMessage(Identifier i, ByteBuffer m, 
      final MessageCallback<Identifier, ByteBuffer> deliverAckToMe, Map<String, Object> options) {

    final MessageRequestHandleImpl<Identifier, ByteBuffer> returnMe = 
      new MessageRequestHandleImpl<Identifier, ByteBuffer>(i, m, options);
    
    boolean success = true;
    synchronized(this) {
      if (m.remaining() > bucket) {
        success = false;
      } else {
        bucket-=m.remaining();
      }
    }
    if (!success) {
      if (logger.level <= Logger.FINE) logger.log("Dropping message "+m+" because not enough bandwidth:"+bucket);
      if (deliverAckToMe != null) deliverAckToMe.sendFailed(returnMe, new NotEnoughBandwidthException(bucket, m.remaining()));
      return returnMe;
    }
    
    returnMe.setSubCancellable(tl.sendMessage(i,m,new MessageCallback<Identifier, ByteBuffer>() {
      public void ack(MessageRequestHandle<Identifier, ByteBuffer> msg) {
        if (deliverAckToMe != null) deliverAckToMe.ack(returnMe);
      }

      public void sendFailed(MessageRequestHandle<Identifier, ByteBuffer> msg, IOException reason) {
        if (deliverAckToMe != null) deliverAckToMe.sendFailed(returnMe, reason);
      }    
    },options));
    return returnMe;
  }  

This may seem a bit overwhelming right now, but this code provides a lot of flexibility for the transport layer while still keeping the calls relatively simple.

Limit Socket Bandwidth

Now our layer limits Message bandwidth, but we must also make the Sockets respect the bandwith limitation. Rather than throwing an exception when we exceed the bandwidth, we just need to throttle the traffic, by only sending what we are allowed. To do this, we will create a P2PSocket that decrements the bucket on each write. Like the MessageRequestHandleImpl, we already have an implementation of a P2PSocket that wraps another P2PSocket. It is called the org.mpisws.p2p.transport.util.SocketWrapperSocket.

The generic parameters of the SocketWrapperSocket are the 2 kinds of Identifiers that it Adapts. Since we are importing and exporting the same Identifier, we declare our Socket like this:

  class BandwidthLimitingSocket extends SocketWrapperSocket<Identifier, Identifier> {
  }

Now we need to set up the constructor. We will call super() with the wrapped socket's identifier and options. Also, we must pass it a logger.

    public BandwidthLimitingSocket(P2PSocket<Identifier> socket) {
      super(socket.getIdentifier(), socket, 
            BandwidthLimitingTransportLayer.this.logger, 
            socket.getOptions());
    }

Now we need to override this method:

    @Override
    public long write(ByteBuffer srcs) throws IOException {}

We can determine how much the user wants to write by calling srcs.remaining(). Then we must return how much was written, or -1 if the socket or outbound channel was closed. Also, note that when we read off bytes from srcs, we are changing the stae of srcs. It is critical that we make sure to manage the position of srcs properly. This way if we don't send the whole message, the user code can call write() again with the same message until it has sent the whole thing.

Here is the easy case, where we accept the write() and return the lower layer's result, or, we continue and log a message:

      if (srcs.remaining() <= bucket) {
        long ret = super.write(srcs);
        if (ret >= 0) {
          // EOF is usually -1
          synchronized(this) {
            bucket -= ret;
          }
        }
        return ret;
      }

      if (logger.level <= Logger.FINE) logger.log("Limiting "+socket+" to "+bucket+" bytes.");

The rest is complicated, because it is important to leave the incoming ByteBuffer's position in the proper place.

We create a variable ByteBuffer temp with the amount of space left in the buffer:

      // The user is trying to write more than is allowed.  To handle this we 
      // will copy the allowed amount into a temporary ByteBuffer and pass that
      // to the next layer.  It is critical set the proper position of scrs 
      // before returning.  First, let's record the original position.
      int originalPosition = srcs.position();
      
      // now we create temp, who's size is "bucket"
      ByteBuffer temp = ByteBuffer.wrap(srcs.array(), originalPosition, bucket);

Now we try to write the data by calling super. This method may throw an IOException or return EOF. Fortunately, the srcs buffer is in its original position, so we don't need to do anything but return/throw exception:

      // try to write our temp buffer
      long ret = super.write(temp);
      
      if (ret < 0) {
        // there was a problem,
        // reset the position, return
        return ret;
      }

If we made it here there were no problems. Allocate the bandwidth and update the position on the srcs ByteBuffer:

      // allocate the bandwidth
      synchronized(this) {
        bucket -= ret;
      }
      
      // we need to properly set the position
      srcs.position(originalPosition+(int)ret);
      return ret;

Note that we updated the position with ret, rahter than bucket. This is because it is possible that the lower layer could only send a fraction of the message.

Managing register()

Before the user can write, he has to register that he wants to write. Then we respond when the operaion can be performed without blocking.

If we naively pass the write request to the next layer we will cause an infinite loop when we run out of bandwidth. Here is the scenario:

To remedy this situation, we need to do two things:

Let's make a member variable in BandwidthLimitingSocket to store the user's request to write:

    /**
     * Store the write requestor.  If this variable is not null 
     * it means that the storedWriter wants to write, but there
     * wasn't enough bandwidth.
     */
    P2PSocketReceiver<Identifier> storedWriter;

Here is the code to intercept socket.register() in BandwidthLimitingSocket. If we are out of bandwidth and the user wants to write, then we cache the receiver in the storedReceiver variable.

    @Override
    public void register(boolean wantToRead, boolean wantToWrite, 
        P2PSocketReceiver<Identifier> receiver) {

      // this variable is what we will pass to super.register()
      boolean canWrite = wantToWrite;
      
      // if the user wants to write, and the bucket is empty, store the writer and set canWrite to false
      if (wantToWrite == true && bucket == 0) {
        canWrite = false;
        storedWriter = receiver;
      }

      // only call super.register() if we have something to do
      if (wantToRead || canWrite) super.register(wantToRead, canWrite, receiver);
    }

Now, our refill task needs to notify all BandwidthLimiting sockets when it refills, so they can notify the storedReceiver if they have one.

In BandwidthLimitingTransportLayer, we need to keep track of all of our BandwidthLimitingSockets in a Collection called sockets. Whenever we create a BandwidthLimitingSocket we add it to sockets, and whenever we shutdownOutput() or close() the socket, we remove it. Here is the code.

  /**
   * Keep track of all of the BandwidthLimitingSockets
   */
  Collection<BandwidthLimitingSocket> sockets = new ArrayList<BandwidthLimitingSocket>();
  
  class BandwidthLimitingSocket extends SocketWrapperSocket<Identifier, Identifier> {
    public BandwidthLimitingSocket(P2PSocket<Identifier> socket) {

      super(socket.getIdentifier(), socket, 
            BandwidthLimitingTransportLayer.this.logger, socket.getOptions());

      synchronized(BandwidthLimitingTransportLayer.this) {
        sockets.add(this);
      }
    }

    public void close() {
      super.close();
      synchronized(BandwidthLimitingTransportLayer.this) {
        sockets.remove(this);      
      }
    }
    
    public void shutdownOutput() {
      super.shutdownOutput();
      synchronized(BandwidthLimitingTransportLayer.this) {
        sockets.remove(this);
      }
    }

In BandwidthLimitingSocket, we need a method for the refill task to call. Note that we can't simply call storedWriter.receiveSelectResult(), all we can do now is notify the lower transport layer that we wish to write.

    /**
     * Register and clear the storedWriter
     */
    public void notifyBandwidthRefilled() {
      if (storedWriter != null) {
        P2PSocketReceiver<Identifier> temp = storedWriter;
        storedWriter = null;
        super.register(false, true, temp);
      }
    }

Now, add this code to the refill task, so that it calls notifyBandwidthRefilled(). This code was in the constructor for BandwidthLimitingTransportLayer.

          for (BandwidthLimitingSocket s : sockets) {
            s.notifyBandwidthRefilled();
          }

Phew, that was lot of work. Now that we have our BandwidthLimitingSocket, we need to use it.

There are two ways to get a socket.

openSocket():

Here is the code for openSocket(). We don't get the socket right away. We have to wait until it has completed opening. This is similar to a continuation. The first thing we have to do is create a SocketRequestHandle for the same reasons as we did in the above code with MessageRequestHandle.

    SocketRequestHandleImpl<Identifier> returnMe = new SocketRequestHandleImpl<Identifier>(i,options);
    returnMe.setSubCancellable(tl.openSocket(i, ... , options));  
    return returnMe;

Now, we ask the lower transport layer to open the socket, and then we will wrap it with our BandwidthLimitingSocket which we will return to deliverSocketToMe. If there is an Exception, we just pass it up to the previous layer. Note that deliverSocketToMe must be non-null, because it's not useful to request opening a socket if you don't receive a handle to it.

  tl.openSocket(i, new SocketCallback<Identifier>(){
      public void receiveResult(SocketRequestHandle<Identifier> cancellable, P2PSocket<Identifier> sock) {
        deliverSocketToMe.receiveResult(returnMe, new BandwidthLimitingSocket(sock));
      }
    
      public void receiveException(SocketRequestHandle<Identifier> s, IOException ex) {
        deliverSocketToMe.receiveException(returnMe, ex);
      }
    }, options)

incomingSocket():

We have a variable for the lower transport layer: tl. But we don't yet have a member variable for the transport layer above us. To get this variable, we need to implement the setCallback() method in TransportLayer interface. We will name this transport layer callback.

  TransportLayerCallback<Identifier, ByteBuffer> callback;
  public void setCallback(TransportLayerCallback<Identifier, ByteBuffer> callback) {
    this.callback = callback;
  }

Now it is simple to override incomingSocket()

  public void incomingSocket(P2PSocket<Identifier> s) throws IOException {
    callback.incomingSocket(new BandwidthLimitingSocket(s));
  }

Finishing the Transprot Layer

The last step is to add all of the rest of the methods in TransportLayer and TransportLayerCallback. These are just going to forward the calls down or up as appropriate:

  public void acceptMessages(boolean b) {
    tl.acceptMessages(b);
  }

  public void acceptSockets(boolean b) {
    tl.acceptSockets(b);
  }

  public Identifier getLocalIdentifier() {
    return tl.getLocalIdentifier();
  }

  public void setErrorHandler(ErrorHandler<Identifier> handler) {
    tl.setErrorHandler(handler);
  }

  public void destroy() {
    tl.destroy();
  }

  public void messageReceived(Identifier i, ByteBuffer m, Map<String, Object> options) throws IOException {
    callback.messageReceived(i, m, options);
  }

Integration with the SocketPastryNodeFactory

Now we will add static methods on BandwidthLimitingTransportLayer that extended SocketPastryNodeFactory with our BandwidthLimitingTransportLayer in the right place. Where should we put this layer? We will show two options. This is where keeping our layer generic pays off.

Example A

Because we are at the Java level, we can't fully account for the TCP overhead of the bandwidth (retransmission etc). However to get the maximum possible effect, we should place it just above Wire. Here, we show how to do this by extending SocketPastryNodeFactory.

To construct each layer in the transport stack, SocketPastryNodeFactory executes a method called get...TransportLayer(). To insert a layer, simply override one of these calls and wrap the default layer with our new one.

Since we are going to put our layer right above the wire layer, we will override getWireTransportLayer(). We will first construct the default layer by calling super.getWireTransportLayer(). However, we will return our Bandwidth-Limiting layer which wraps the default wire layer.

Note that when we construct our BandwidthLimitingTransportLayer, we export <InetSocketAddress>.

  public static PastryNodeFactory exampleA(int bindport, Environment env, 
      NodeIdFactory nidFactory, final int amt, final int time) throws IOException {  
    
    // anonymously extend SPNF
    PastryNodeFactory factory = new SocketPastryNodeFactory(nidFactory, bindport, env) {
      
      /**
       * Override getWireTransportLayer to return the BandwidthLimitingTL wrapping
       * the default wire implementation.
       */
      @Override
      protected TransportLayer<InetSocketAddress, ByteBuffer> getWireTransportLayer(
          InetSocketAddress innermostAddress, TLPastryNode pn) throws IOException {
        // get the default layer
        TransportLayer<InetSocketAddress, ByteBuffer> wtl = 
          super.getWireTransportLayer(innermostAddress, pn);        
        
        // wrap it with our layer
        return new BandwidthLimitingTransportLayer<InetSocketAddress>(
            wtl, amt, time, pn.getEnvironment());
      }      
    };
    return factory;
  } 

You can replace the construction of the SocketPastryNodeFactory at the beginning of any of the existing tutorials with this code to add the "bandwidth-limiting" feature.

Example B

Perhaps we don't want to include some of FreePastry's overhead in our bandwidth limits. We can remove the calculated overhead for liveness checks and from constructing source routes if we put the BandwidthLimitingTransportLayer above the SourceRouteManager. However, the SourceRouteManager also performs additional functions of providing Liveness and Proximity. Unfortunately our transport layer doesn't implement these additional interfaces.

In exampleB() we show how easy it is to replace only the TransportLayer functionality of the SourceRouteManager while still returning the existing SourceRouteManager for the Liveness and Proximity functionality. The returned object for getSourceRouteManagerLayer() is a TransLivenessProximity<MultiInetSocketAddress, ByteBuffer>. This is a very simple interface that returns 3 objects:

  protected interface TransLivenessProximity<Identifier, MessageType> {
    TransportLayer<Identifier, ByteBuffer> getTransportLayer(); 
    LivenessProvider<Identifier> getLivenessProvider();
    ProximityProvider<Identifier> getProximityProvider();
  }

As in Example A, we need to create the default SourceRouteManagerLayer by calling super.getSourceRouteManagerLayer(). But we must also implement a TransLivenessProximity which returns our BandwidthLimitingTrasnportLayer for the TransportLayer, but the SourceRouteManagerLayer for the LivenessProvider and ProximityProvider.

Here is the code for exampleB():

  public static PastryNodeFactory exampleB(int bindport, Environment env, 
      NodeIdFactory nidFactory, final int amt, final int time) throws IOException {    
    PastryNodeFactory factory = new SocketPastryNodeFactory(nidFactory, bindport, env) {

      @Override
      protected TransLivenessProximity<MultiInetSocketAddress, ByteBuffer> getSourceRouteManagerLayer(
          TransportLayer<SourceRoute<MultiInetSocketAddress>, ByteBuffer> ltl, 
          LivenessProvider<SourceRoute<MultiInetSocketAddress>> livenessProvider, 
          Pinger<SourceRoute<MultiInetSocketAddress>> pinger, 
          TLPastryNode pn, 
          MultiInetSocketAddress proxyAddress, 
          MultiAddressSourceRouteFactory esrFactory) throws IOException {
        
        // get the default layer
        final TransLivenessProximity<MultiInetSocketAddress, ByteBuffer> srm = 
          super.getSourceRouteManagerLayer(
            ltl, livenessProvider, pinger, pn, proxyAddress, esrFactory);
        
        // wrap the default layer with our layer
        final BandwidthLimitingTransportLayer<MultiInetSocketAddress> bll = 
          new BandwidthLimitingTransportLayer<MultiInetSocketAddress>(
            srm.getTransportLayer(), amt, time, pn.getEnvironment());
        
        return new TransLivenessProximity<MultiInetSocketAddress, ByteBuffer>(){
          public TransportLayer<MultiInetSocketAddress, ByteBuffer> getTransportLayer() {
            return bll;
          }        
          public LivenessProvider<MultiInetSocketAddress> getLivenessProvider() {
            return srm.getLivenessProvider();
          }
          public ProximityProvider<MultiInetSocketAddress> getProximityProvider() {
            return srm.getProximityProvider();
          }
        };
      }
    };
    return factory;    
  }  

Note that BandwidthLimitingTransportLayer exports <MultiInetSocketAddress> this time.

Running the code

We modified DistTutorial from lesson4 to take some more parameters and call BandwidthLimitingTransportLayer.exampleA().

  public DistTutorial(int bindport, InetSocketAddress bootaddress, 
      int numNodes, Environment env, int bandwidth) throws Exception {
    ...

    // construct the PastryNodeFactory, this is how we use rice.pastry.socket
    PastryNodeFactory factory = BandwidthLimitingTransportLayer.exampleA(
      bindport, env, nidFactory, bandwidth, 1000);
    //  PastryNodeFactory factory = BandwidthLimitingTransportLayer.exampleB(
    //    bindport, env, nidFactory, bandwidth, 1000);
    //  PastryNodeFactory factory = new SocketPastryNodeFactory(
    //    nidFactory, bindport, env);

    ...
  }

  /**
   * Usage: 
   * java [-cp FreePastry-<version>.jar] rice.tutorial.transportlayer.DistTutorial localbindport bootIP bootPort numNodes bandwidth;
   * example java rice.tutorial.transportlayer.DistTutorial 9001 pokey.cs.almamater.edu 9001 10 1000
   */
  public static void main(String[] args) throws Exception {
    ...
  }

We need to turn on FINE logging in our transport layer, and let's also disable ProximityNeighborSelection:

    // Disable PNS for our example
    env.getParameters().setBoolean("transport_use_pns", false);
    
    // enable logging on our new layer
    env.getParameters().setInt(
      "rice.tutorial.transportlayer.BandwidthLimitingTransportLayer_loglevel", 
      Logger.FINE);    

Now lets run the code, but we must provide the bandwidth. Let's try 10K/second:

java -cp .:FreePastry-@freepastry_version@.jar rice.tutorial.transportlayer.DistTutorial 5009 10.9.8.7 5009 10 10000

Finished creating new node TLPastryNode[SNH: <0x9492E7..>/FOO/10.9.8.7:5009]
Finished creating new node TLPastryNode[SNH: <0x44C3E7..>/FOO/10.9.8.7:5010]
Finished creating new node TLPastryNode[SNH: <0xBE1C77..>/FOO/10.9.8.7:5011]
Finished creating new node TLPastryNode[SNH: <0x4C94DA..>/FOO/10.9.8.7:5012]
Finished creating new node TLPastryNode[SNH: <0x3FAE2B..>/FOO/10.9.8.7:5013]
Finished creating new node TLPastryNode[SNH: <0xA5DBDC..>/FOO/10.9.8.7:5014]
Finished creating new node TLPastryNode[SNH: <0x99460E..>/FOO/10.9.8.7:5015]
0xB5DAB7:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222632375:Limiting SM java.nio.channels.SocketChannel[connected local=/10.9.8.7:2771 remote=/10.9.8.7:5014] to 315 bytes.
0xB5DAB7:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222632390:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0xB5DAB7:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222632390:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
Finished creating new node TLPastryNode[SNH: <0xB5DAB7..>/FOO/10.9.8.7:5016]
0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634218:Limiting SM java.nio.channels.SocketChannel[connected local=/10.9.8.7:5017 remote=/10.9.8.7:2775] to 283 bytes.
0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634234:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634234:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634234:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634234:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634234:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634234:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634250:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634250:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
Finished creating new node TLPastryNode[SNH: <0xC3183D..>/FOO/10.9.8.7:5017]
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635843:Limiting SM java.nio.channels.SocketChannel[connected local=/10.9.8.7:2786 remote=/10.9.8.7:5011] to 410 bytes.
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635843:Limiting SM java.nio.channels.SocketChannel[connected local=/10.9.8.7:2789 remote=/10.9.8.7:5015] to 0 bytes.
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635843:Limiting SM java.nio.channels.SocketChannel[connected local=/10.9.8.7:2788 remote=/10.9.8.7:5010] to 0 bytes.
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635843:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635843:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635843:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635859:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635859:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635859:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635859:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635859:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635859:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635859:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
Finished creating new node TLPastryNode[SNH: <0x317CF2..>/FOO/10.9.8.7:5018]
MyApp <0x9492E7..> sending to <0x1CACFE..>
MyApp <0x317CF2..> received MyMsg from <0x9492E7..> to <0x1CACFE..>

...
Note the logging of
Limiting SM XXX to XXX bytes.
and
Dropping message XXX because not enough bandwidth:XXX
and
0x34578E:org.mpisws.p2p.transport.multiaddress.MultiInetAddressTransportLayerImpl:1197900565238:/10.9.8.7:5014 rice.tutorial.transportlayer.NotEnoughBandwidthException
	at rice.tutorial.transportlayer.BandwidthLimitingTransportLayer.sendMessage(BandwidthLimitingTransportLayer.java:152)
	at rice.tutorial.transportlayer.BandwidthLimitingTransportLayer.sendMessage(BandwidthLimitingTransportLayer.java:1)
	at org.mpisws.p2p.transport.wire.magicnumber.MagicNumberTransportLayer.sendMessage(MagicNumberTransportLayer.java:226)
	at org.mpisws.p2p.transport.wire.magicnumber.MagicNumberTransportLayer.sendMessage(MagicNumberTransportLayer.java:1)
	at org.mpisws.p2p.transport.limitsockets.LimitSocketsTransportLayer.sendMessage(LimitSocketsTransportLayer.java:220)
	at org.mpisws.p2p.transport.multiaddress.MultiInetAddressTransportLayerImpl.sendMessage(MultiInetAddressTransportLayerImpl.java:283)
	at org.mpisws.p2p.transport.multiaddress.MultiInetAddressTransportLayerImpl.sendMessage(MultiInetAddressTransportLayerImpl.java:1)
	at org.mpisws.p2p.transport.sourceroute.SourceRouteTransportLayerImpl.messageReceived(SourceRouteTransportLayerImpl.java:413)
	at org.mpisws.p2p.transport.sourceroute.SourceRouteTransportLayerImpl.messageReceived(SourceRouteTransportLayerImpl.java:1)
	at org.mpisws.p2p.transport.multiaddress.MultiInetAddressTransportLayerImpl.messageReceived(MultiInetAddressTransportLayerImpl.java:321)
	at org.mpisws.p2p.transport.multiaddress.MultiInetAddressTransportLayerImpl.messageReceived(MultiInetAddressTransportLayerImpl.java:1)
	at org.mpisws.p2p.transport.limitsockets.LimitSocketsTransportLayer.messageReceived(LimitSocketsTransportLayer.java:224)
	at org.mpisws.p2p.transport.wire.magicnumber.MagicNumberTransportLayer.messageReceived(MagicNumberTransportLayer.java:251)
	at org.mpisws.p2p.transport.wire.magicnumber.MagicNumberTransportLayer.messageReceived(MagicNumberTransportLayer.java:1)
	at rice.tutorial.transportlayer.BandwidthLimitingTransportLayer.messageReceived(BandwidthLimitingTransportLayer.java:317)
	at rice.tutorial.transportlayer.BandwidthLimitingTransportLayer.messageReceived(BandwidthLimitingTransportLayer.java:1)
	at org.mpisws.p2p.transport.wire.WireTransportLayerImpl.messageReceived(WireTransportLayerImpl.java:183)
	at org.mpisws.p2p.transport.wire.UDPLayer.readHeader(UDPLayer.java:228)
	at org.mpisws.p2p.transport.wire.UDPLayer.read(UDPLayer.java:192)
	at rice.selector.SelectorManager.doSelections(SelectorManager.java:389)
	at rice.selector.SelectorManager.run(SelectorManager.java:255)

You can get rid of these exceptions by installing a proper errorHandler on the transport layers. The default one just prints the stack trace.

However, the code still runs successfully.

Congratulations! You have successfully extended FreePastry's transport layer.