The FreePastry Tutorial.
Version @tutorial_version@; @tutorial_date@. For FreePastry version @freepastry_version@. Maintained by @maintainer@.
Application Level Socket Interface
Manage your application's communication more precicely.
Organization of the Tutorial.
Explanation
Overview
Similar to the selection mechanism in Java's non-blocking I/O facility, FreePastry notifies you of events (can read / can write / socket available), and then your code can handle the events immeadiately or later. FreePastry always makes these notifications by calling your implementation of the AppSocketReceiver on the Slector Thread. Since these calls are issued on the FreePastry's network thread, if you must do computationally intense or blocking I/O operations you need to do so on a different thread. If you spend too much time on FreePastry's selector you will interfere with overlay maintenance and responses to liveness checks, which will result in degraded performance. You can do time consuming operations on your own thread or on FreePastry's Processor or Blocking-I/O threads before calling read()
/write()
. See the Environment tutorial for more information.
Unlike FreePastry's routing/messaging interface, the AppSocket interface does not use any serialization technique. You will simply be able to send/receive ByteBuffers.
Why may applications need their own socket interface?
- Complete control of serialization of data: No need to chunk or aggregate into individual messages. No need to use Java Serialization or FreePastry serialization which may not be optimal for bulk data transfers.
- Control resources more closely, and provide end to end flow control:
For example, your application is disk bound. It is receiving a bulk data transfer over FreePastry. You accomplished this by chunking the data into multiple messages. If your receiver cannot write the items to disk fast enough, you have 3 options:
- Drop messages, and build retry-logic.
- Try to queue them, and risk running out of memory (RAM).
- Attempt to build some kind of flow control mechanism over FreePastry's message interface.
With the socket interface, you gain more control of your application's communication channel, and often the obvious answer becomes the correct answer. In this case, don't read from the TCP buffer until the previous disk IO completed. This will cause back-pressure to the source and that node will not waste bandwidth by sending data that you cannot process.
Fully utilize the connection without interfering with FreePastry's overlay traffic, other applications running on the node, or other applications running on the machine/network.
FreePastry uses only a single TCP connection for it's "Message Based" traffic. This includes FreePastry's maintenance traffic as well as Application's message traffic. FreePastry does not employ a chunking mechanism. Thus sending very large messages in FreePastry which could take several minutes to transfer will stall FreePastry's overlay maintenance messages, which could result in poor overlay performance.
With these limitations, it is difficult to utilize all available network resources on high-capacity nodes, while still allowing FreePastry to operate properly on lower capacity nodes.
With the Application Socket Interface, your application will have its own socket, which will be subject to all the benefits of TCP (such as congestion control) that make it interoperate well with other applications. You have more direct information about network performance in your particular environment.
Why not just open my own sockets?
FreePastry has made some effort to work properly in reduced connectivity environments such as NATs and in the presence of temporary routing anomlies. For example FreePastry only requires 1 (UDP/TCP) port to be forwarded, and does not require any specific port, thus making it possible to run multiple nodes behind the same NAT. Also, FreePastry uses source routing to bridge connections during temporary internet anomalies. If your application opens its own sockets then it will be required to allocate additional ports, open them on NATs, and have its own source-routing infrastructure to get through the communication difficluties that FreePastry already handles. As we attempt to handle additional connectivity issues in the future, we plan to continue to support the AppSocket interface.
The AppSocket interface is compatable with FreePastry's simulator. The sockets are simulated rather than requiring sys-calls. Our solution scales better than opening potentially n-squared connections on a single computer that is simulating tens or hundreds of thousands of nodes.
Note that the Application socket interface is no substitute for FreePastry's Routing mechanism which provides all of the advantages of the Key Based Routing primitive.
Combining Simulator/Wire-Protocol
This tutorial starts by combining both the Simulator tutorial and the Lesson 4 tutorials. The advantage is that by changing the command line argument, you can run in a "real" or simulated environment.
Let's take a look at the code to do this.
Download the tutorial files: Tutorial.java MyApp.java, into a directory called rice/tutorial/appsocket/.
First the command line args are more sophisticated. You can either use the previous syntax of localbindport bootIP bootPort numNodes
to use sockets, or specify -direct numNodes
to use the simulator.
Usage: java [-cp FreePastry-.jar] rice.tutorial.appsocket.Tutorial localbindport bootIP bootPort numNodes or java [-cp FreePastry- .jar] rice.tutorial.appsocket.Tutorial -direct numNodes example java rice.tutorial.DistTutorial 9001 pokey.cs.almamater.edu 9001 10 example java rice.tutorial.DistTutorial -direct 10
Here is the code that accepts the arguments from the command line, and builds the appropriate environment. I won't spend time explaining it because it is not very specific to FreePastry.
boolean useDirect; if (args[0].equalsIgnoreCase("-direct")) { useDirect = true; } else { useDirect = false; } // Loads pastry settings Environment env; if (useDirect) { env = Environment.directEnvironment(); } else { env = new Environment(); // disable the UPnP setting (in case you are testing this on a NATted LAN) env.getParameters().setString("nat_search_policy","never"); } int bindport = 0; InetSocketAddress bootaddress = null; // the number of nodes to use is always the last param int numNodes = Integer.parseInt(args[args.length-1]); if (!useDirect) { // the port to use locally bindport = Integer.parseInt(args[0]); // build the bootaddress from the command line args InetAddress bootaddr = InetAddress.getByName(args[1]); int bootport = Integer.parseInt(args[2]); bootaddress = new InetSocketAddress(bootaddr,bootport); }The constructor is similar to previous tutorials, and will simply ignore the irrevelent arguments if using the simulator.
/** * This constructor launches numNodes PastryNodes. They will bootstrap * to an existing ring if one exists at the specified location, otherwise * it will start a new ring. * * @param bindport the local port to bind to * @param bootaddress the IP:port of the node to boot from * @param numNodes the number of nodes to create in this JVM * @param env the environment for these nodes * @param useDirect true for the simulator, false for the socket protocol */ public DistTutorial(int bindport, InetSocketAddress bootaddress, int numNodes, Environment env, boolean useDirect) throws Exception {This code builds the appropriate PastryNodeFactory.
// construct the PastryNodeFactory PastryNodeFactory factory; if (useDirect) { NetworkSimulator sim = new EuclideanNetwork(env); factory = new DirectPastryNodeFactory(nidFactory, sim, env); } else { factory = new SocketPastryNodeFactory(nidFactory, bindport, env); }This code properly manages the bootHandle.
if (bootHandle == null) { if (useDirect) { bootHandle = node.getLocalHandle(); } else { // This will return null if we there is no node at that location bootHandle = ((SocketPastryNodeFactory)factory).getNodeHandle(bootaddress); } }
Additional changes to Tutorial.java from Lesson 4
We must note a few more changes to the code before proceeding.- We removed the "route 10 messages" loop in the tutorial. You cannot "route" in the overlay with the direct sockets. To route or lookup the nearest node to an Id, you must use the messaging layer as in lesson 4. To use the AppSocket interface, you must have a NodeHandle to a specific Node in the network that you wish to open the connection to. See Lessons 1, 2 and 3 for the explanation of these terms.
- We changed the name of the method from
routeMyMessageDirect()
tosendMyMessageDirect()
. - We want to show that AppSockets also work when connecting to yourself (the local node). Thus in the sending messages to the leafset loop, we took out the "
if (i != 0) {
" conditional, so that we also send to ourself.
AppSockets
When requested, FreePastry will provide your application with an AppSocket which you will use to read and write your data. The read()
and write()
calls are non-blocking, and calling them when the socket is not ready to read or write will be a waste of time: it will read or write zero bytes. Thus FreePastry needs to notify you when the socket is ready.
The AppSocketReceiver interface is how FreePastry's Application Socket Interface notifies your application that it may read or write or that a new socket is available. As you can see from the JavaDoc, it has 3 callbacks:
receiveSocket()
-- called when the socket has been established.receiveSelectResult()
-- called whenever your app has permission to read or write.receiveException()
-- called when there is a problem.
Acquiring a socket
Sockets are inheritently an asynchronus connection. There is always a connector and a connectee or usually we call these the client and server. Typically all of your nodes will both be servers and clients. The server/client designation only relates to the asymmetry of the connection process, not necessarally the intent of the application.
For example, we have node Alice who would like to get()
a large object from a DHT. It turns out that the primary replica holder of the requested key is Bob. But Alice doesn't know that yet. Here is how this could work:
- Alice calls
get(key)
which routes the request to the primary replica of the "key." The request contains the key of the object to be returned, and her NodeHandle. This is done using FreePastry's routing/message interface which you learned in lesson 3. - The request is routed to Bob, the primary replica holder.
- Bob uses Alice's NodeHandle to connect to Alice using FreePastry's Application Socket Interface.
- When the socket becomes available for writing, Bob sends the requested data.
- Once Alice receives the object, she checks the key of the object she received to determine which request the opened socket referred to.
Note that there is no need for Bob to read from the socket he opened, and there is no need for Alice to write on the socket she received. In this case, even thouhgh Alice is the requester, she is the server, and must accept the socket to handle the response. Note that both the server and the client need access to an AppSocket:
- Alice, the server -- so she can read the response
- Bob, the client -- so he can send the response.
Our example will skip the routing/messaging component from the above example. We expect that you should be able to accomplish this if you have completed tutorial less #4.
Accepting a Socket
Take a look at the constructor of MyApp.java.To accept a socket, you must call endpoint.accept()
with an AppSocketReceiver. When another node's application attempts to connect to your application, the new AppSocket will be delivered via the receiveSocket()
method on this AppSocketReceiver.
// register the endpoint this.endpoint = node.buildEndpoint(this, "myinstance"); ... // example receiver interface endpoint.accept(new AppSocketReceiver() { /** * When we accept a new socket. */ public void receiveSocket(AppSocket socket) { ... handle socket here ... // it's critical to call this to be able to accept multiple times endpoint.accept(this); } ... rest of the interface ... } ... // register after we have set the AppSocketReceiver endpoint.register();Note that we call
endpoint.accept()
before calling endpoint.register() so we can guarantee that it is ready before the applicaion completes registration. If we do this later in the code, it is possible that a connecting application will receive an NoReceiverAvailableException.
You may receive an AppNotRegisteredException if you attempt to open a socket on a remote node who has not yet registered the application.
Note the other call toendpoint.accept()
in your receiveSocket()
method. Each call to accept()
is only valid for accepting 1 socket. To accept the next socket, you must call accept()
again. This allows a simple flow-control mechanism for accepting sockets. If you only want to handle transaction at a time, don't call accept()
here, but instead do it later when you have completed your previous transaction. If you wish to limit the number of concurrent connections to say 5 simultanious transactions, you can do so by only calling accept()
when you have less than 5 outstanding transactions. Note, it is not possible to "pre-call" accept()
with multiple AppSocketReceivers. If you call accept()
twice without first getting an AppSocketReceiver.receive() call, you will only overwrite the previous AppSocketReceiver in your endpoint.
Congratulations! You now know how to accept sockets in FreePastry's Application Socket Interface! Next, you will learn how to read from the AppSocket.
Reading from a Socket
As stated before, the read() method of AppSocket is non-blocking. If you attempt to read from it when it is not ready, you will be wasting time because you will read zero bytes. Thus you must be notified when it is ready. You do this with the AppSocketReceiver interface.
As in this tutorial, when you receive the AppSocket in the receiveSocket()
method, you will usually immeadiately call socket.register()
with an AppSocketReceiver. This will cause receiveSelectResult() to be called when the socket is ready for I/O.
Here is the documentation and method signature for socket.register().
/** * Must be called every time a Read/Write occurs to continue operation. * * @param wantToRead if you want to read from this socket * @param wantToWrite if you want to write to this socket * @param timeout // the socket's timeout value (this is a TCP level param) * @param receiver will have receiveSelectResult() called on it * note that you must call select() each time receiveSelectResult() is called. This is so * your application can properly handle flow control */ void register(boolean wantToRead, boolean wantToWrite, int timeout, AppSocketReceiver receiver);As you can see, our example code reuses "this" AppSocketReceiver, and registers for reading only; with a 30 second timeout.
// this code reuses "this" AppSocketReceiver, and registers for reading only, and a timeout of 30000. socket.register(true, false, 30000, this);Here is the code for our AppSocketReceiver.receiveSelectResult()
public void receiveSelectResult(AppSocket socket, boolean canRead, boolean canWrite) { ... // read from the socket into ins long ret = socket.read(ins, 0, ins.length); ... // only need to do this if expecting more messages // socket.register(true, false, 3000, this); }
The AppSocket is the socket who can now read or write. The booleans canRead and canWrite tell you what operations you can do on the socket. Since we only registered to read on this socket, we don't need to check these arguments. If you registerd to read and write when you registered the AppSocketReceiver, it would be important to examine these booleans before reading/writing from the AppSocket.
The call to read() is pretty straightforward. It is nearly identical to SocketChannel.read().
The socket.register() call works in a similar way to endpoint.accept(). You only get notified once each time you register, and must re-register for additional data once you are ready to read or write more data. Usually, you will re-register with the same arguments as the initial registration. In our example, we don't expect to send additional data, thus we don't re-retister. (The call is commented out.)
Congratulations! You now know how to register for notificaton of read events and read from the AppSocket.
Connecting a Socket
This code is in thesendMyMsgDirect()
method of MyApp.java. It is similar to the accept code earlier in this tutorial. The additional parameters in endpoint.connect()
are the NodeHandle to connect to, and the timeout for the connection to occur (in millis).
endpoint.connect(nh, new AppSocketReceiver() { /** * Called when the socket comes available. */ public void receiveSocket(AppSocket socket) { // handle the socket once it connects ... } /** * Called if there is a problem. */ public void receiveException(AppSocket socket, Exception e) { e.printStackTrace(); } ... }, 30000);
Writing to a Socket
This is nearly identical to registering for reading from a socket.// register for writing socket.register(false, true, 30000, this);And similarly in the
receiveSelectResult()
method, we have a call to socket.write()
and socket.register()
.
/** * Example of how to write some bytes */ public void receiveSelectResult(AppSocket socket, boolean canRead, boolean canWrite) { ... long ret = socket.write(outs,0,outs.length); ... // keep writing socket.register(false, true, 30000, this); ... } }, 30000); }The rest of the new code in this tutorial involves setting up the ByteBuffer code to properly read/write. The details of this are out of the scope of this tutorial. Note, it is possible to register a different AppSocketReceiver for reading vs. writing. It is also possible to register the same AppSocketReceiver for both reading and writing. However re-registering for an event-type (read or write) will overwrite the previously registered receiver.