The FreePastry Tutorial.

This tutorial is designed to get you cooking quickly with the FreePastry API and software toolkit.

Version @tutorial_version@; @tutorial_date@. For FreePastry version @freepastry_version@. Maintained by @maintainer@.



Scribe

Introducing Scribe.

Download the tutorial files: MyScribeClient.java, MyScribeContent.java, ScribeTutorial.java into a directory called rice/tutorial/scribe/.

Scribe is an application that allows you to subscribe to groups and publish messages to that group. This tutorial will show you how to get scribe up and running. You will learn how to do the following:

Terms:

Creating a topic.

This is fairly straightforward, but each node will have to do this for each topic of interest.
  Topic myTopic = new Topic(new PastryIdFactory(), "example topic");
This constructs a topic with the "common name" of "example topic". It uses the PastryIdFactory to generate an appropriate Id for this topic.

Creating a ScribeClient.

Let's take a look at MyScribeClient. We are only going to subscribe to a single topic. We are going to publish content every 5 seconds using the FreePastry timer. See Timer for more details on the timer. The client is also going to implement rice.p2p.commonapi.Application in addition to rice.p2p.scribe.ScribeClient. This will allow us to send and receive non-scribe messages should this be important. Specifically it will allow us to receive messages queued on the timer.

Here's the constructor and some member variables:
  Scribe myScribe;
  Topic myTopic;
  protected Endpoint endpoint;
  public MyScribeClient(PastryNode node) {
    // you should recognize this from lesson 3
    this.endpoint = node.buildEndpoint(this, "myinstance");

    // construct Scribe
    myScribe = new ScribeImpl(node,"myScribeInstance");

    // construct the topic
    myTopic = new Topic(new PastryIdFactory(node.getEnvironment()), "example topic");
    System.out.println("myTopic = "+myTopic);
    
    // now we can receive messages
    endpoint.register();
  }
The only thing that should be new here is the construction of the ScribeImpl and Topic. The instance name "myScribeInstance" allows you to remain independent of other applications running on the same ring who also use scribe. They will have their own instance of Scribe that won't be confused with your instance.
In this example, one of the nodes is going to publish content every 5 seconds. We use the timer pattern described in the Timer tutorial. Each time we are going to send 1 multicast, and 1 anycast.
  class PublishContent implements Message {
    public int getPriority() {
      return 0;
    }
  }
  public void startPublishTask() {
    publishTask = endpoint.scheduleMessage(new PublishContent(), 5000, 5000);    
  }
  public void deliver(Id id, Message message) {
    if (message instanceof PublishContent) {
      sendMulticast(); 
      sendAnycast();
    }
  }
In case you forgot, the PublishContent is similar to the Timer tutorial's MessageToSelf. The startPublishTask() method schedules this to be delivered locally every 5 seconds. The deliver(id,message) method calls sendMulticast() when the PublishMethod is received. In your application you will likely have some other event that causes content to be published. We will look at sendMulticast() and sendAnycast() shortly.

Subscribing to a group.

Subscribing is very easy. Just call Scribe.subscribe() and provide the topic, and your client.
  public void subscribe() {
    myScribe.subscribe(myTopic, this); 
  }

Multicasting content.

First, we need some content to send. MyScribeContent implements ScribeContent and takes a NodeHandle sender, and an int sequence number. These are just so the output of the program is more interesting.
public class MyScribeContent implements ScribeContent {
  NodeHandle from;
  int seq;
 
  public MyScribeContent(NodeHandle from, int seq) {
    this.from = from;
    this.seq = seq;
  }

  public String toString() {
    return "MyScribeContent #"+seq+" from "+from;
  }  
}
To send the content, simply construct the message, then call Scribe.publish(). You give it the topic and the message. The rest of this function is just to print output and update the sequence number.
  public void sendMulticast() {
    System.out.println("Node "+endpoint.getLocalNodeHandle()+" broadcasting "+seqNum);
    MyScribeContent myMessage = new MyScribeContent(endpoint.getLocalNodeHandle(), seqNum);
    myScribe.publish(myTopic, myMessage); 
    seqNum++;
  }

Receiving content.

Receiving content is as easy as any other p2p application. The method signature is only slightly different:
  public void deliver(Topic topic, ScribeContent content) {
    System.out.println("MyScribeClient.deliver("+topic+","+content+")");
  }
All we are doing here is printing output to stdout.

Anycasting.

Anycast will get called on your clients until one returns true. This occurs on a call to ScribeClient.anycast(). To make this interesting, we're going to only accept the message 1/3 of the time, randomly. Your application will ususally want to do something more interesting, such as see if a requested resource is available.
Here is sendAnycast() which is nearly identical to sendMulticast() except for the call to Scribe.anycast() instead of Scribe.publish().
  public void sendAnycast() {
    System.out.println("Node "+endpoint.getLocalNodeHandle()+" anycasting "+seqNum);
    MyScribeContent myMessage = new MyScribeContent(endpoint.getLocalNodeHandle(), seqNum);
    myScribe.anycast(myTopic, myMessage); 
    seqNum++;
  }
Here is the code that only accepts the anycast 1/3 of the time. This will allow us to see that the anycast message can be rejected and sent elsewhere.
  public boolean anycast(Topic topic, ScribeContent content) {
    boolean returnValue = rng.nextInt(3) == 0;
    System.out.println("MyScribeClient.anycast("+topic+","+content+"):"+returnValue);
    return returnValue;
  }

Examining the tree.

Lastly we have code to introspect the tree. Note that it is only easy to print out the entire tree because we are running all of the nodes in the same VM and have global information. It is significantly more difficult to print the scribe tree in an actual distributed environment, as scribe does not provide this information automatically. Furthermore this information could change rapidly as nodes join and leave.

Because we are only going to run the application with a small number of nodes, the tree will most likely be only 1 level deep. (The root being level 0). However, this will show you how to print out your parent and children.

The first thing to note is that we have 3 accessor methods at the bottom of MyScribeContent:
  public boolean isRoot() {
    return myScribe.isRoot(myTopic);
  }
  
  public NodeHandle getParent() {
    return myScribe.getParent(myTopic); 
  }
  
  public NodeHandle[] getChildren() {
    return myScribe.getChildren(myTopic); 
  }
Note that these simply call through to the same method on myScribe, with the correct topic.

This code can be found in ScribeTutorial.java

printTree() does the following:
  1. Create a table mapping NodeHandle to MyScribeClient.
  2. Recursively traverse the tree to the root, using the helper: getRoot().
  3. Recursively traverse the tree down from the root, depth first and print the nodes using the helper: recursivelyPrintChildren().
  public static void printTree(Vector apps) {
    // build a hashtable of the apps, keyed by nodehandle
    Hashtable appTable = new Hashtable();
    Iterator i = apps.iterator();
    while (i.hasNext()) {
      MyScribeClient app = (MyScribeClient)i.next();
      appTable.put(app.endpoint.getLocalNodeHandle(), app);
    }
    NodeHandle seed = ((MyScribeClient)apps.get(0)).endpoint.getLocalNodeHandle();
    
    // get the root 
    NodeHandle root = getRoot(seed, appTable);
    
    // print the tree from the root down
    recursivelyPrintChildren(root, 0, appTable);
  }
  1. getRoot() looks up the client for the seed handle from the appTable.
  2. If the seed is the root, it is returned.
  3. Otherwise, it calls getRoot() on the parent.
  public static NodeHandle getRoot(NodeHandle seed, Hashtable appTable) {
    MyScribeClient app = (MyScribeClient)appTable.get(seed);
    if (app.isRoot()) return seed;
    NodeHandle nextSeed = app.getParent();
    return getRoot(nextSeed, appTable);
  }
  1. recursivelyPrintChildren() prints the curNode with appropriate whitespace based on the depth in the tree.
  2. Then calls recursivelyPrintChildren() on all children (if it has any)
  public static void recursivelyPrintChildren(NodeHandle curNode, int recursionDepth, Hashtable appTable) {
    // print self at appropriate tab level
    String s = "";
    for (int numTabs = 0; numTabs < recursionDepth; numTabs++) {
      s+="  "; 
    }
    s+=curNode.getId().toString();
    System.out.println(s);
    
    // recursively print all children
    MyScribeClient app = (MyScribeClient)appTable.get(curNode);
    NodeHandle[] children = app.getChildren();
    for (int curChild = 0; curChild < children.length; curChild++) {
      recursivelyPrintChildren(children[curChild], recursionDepth+1, appTable);
    }    
  }

Initializing the apps.

The majority of the ScribeTutorial is identical to Lesson 4 where we ran multiple nodes within the same JVM. The last part is listed below. On each app, we call subscribe(), and on the first one we call startPublishTask(). After that we wait a few seconds then print the tree.
    Iterator i = apps.iterator();    
    MyScribeClient app = (MyScribeClient)i.next();
    app.subscribe();
    app.startPublishTask();
    while(i.hasNext()) {
      app = (MyScribeClient)i.next();
      app.subscribe();
    }
    
    env.getTimeSource().sleep(3000);
    printTree(apps);

Execution.

The parameters are identical to those in Lesson 4:
  1. Local bind port.
  2. Bootstrap host. (the local host address)
  3. Bootstrap port. (usually whatever you passed in the first arg)
  4. The number of nodes to launch.
Your output will resemble:
java -cp .:FreePastry-@freepastry_version@.jar rice.tutorial.scribe.ScribeTutorial 9001 10.9.8.7 9001 10
:1122932166578:Error connecting to address /10.9.8.7:9001: java.net.ConnectException: Connection refused: no further information
:1122932166578:No bootstrap node provided, starting a new ring...
Finished creating new node SocketNodeHandle (<0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189])
myTopic = [TOPIC <0x19A8F5..>]
Finished creating new node SocketNodeHandle (<0x8904D9..>/FOO/10.9.8.7:9002 [4410881318286179245])
myTopic = [TOPIC <0x19A8F5..>]
Finished creating new node SocketNodeHandle (<0x281817..>/FOO/10.9.8.7:9003 [1144941711194723161])

...

<0x281817..>
  <0x8904D9..>
  <0x061BB8..>
  <0x85DA64..>
  <0x489BCB..>
  <0x0A9FCC..>
  <0x39CE29..>
  <0xA20DF8..>
  <0x7D350E..>
  <0xCF76F1..>
Node [SNH: <0x489BCB..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]] broadcasting 0
Node [SNH: <0x489BCB..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]] anycasting 1
MyScribeClient.anycast([TOPIC <0x19A8F5..>],MyScribeContent #1 from [SNH: <0x489BCB..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]):false
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #0 from [SNH: <0x281817..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
MyScribeClient.anycast([TOPIC <0x19A8F5..>],MyScribeContent #1 from [SNH: <0x281817..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]):true
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #0 from [SNH: <0x8904D9..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #0 from [SNH: <0xA20DF8..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #0 from [SNH: <0xCF76F1..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #0 from [SNH: <0x39CE29..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #0 from [SNH: <0x0A9FCC..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #0 from [SNH: <0x7D350E..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #0 from [SNH: <0x85DA64..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #0 from [SNH: <0x489BCB..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #0 from [SNH: <0x061BB8..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
Node [SNH: <0x489BCB..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]] broadcasting 2
Node [SNH: <0x489BCB..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]] anycasting 3
MyScribeClient.anycast([TOPIC <0x19A8F5..>],MyScribeContent #3 from [SNH: <0x489BCB..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]):false
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #2 from [SNH: <0x281817..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
MyScribeClient.anycast([TOPIC <0x19A8F5..>],MyScribeContent #3 from [SNH: <0x281817..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]):true
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #2 from [SNH: <0x8904D9..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #2 from [SNH: <0xA20DF8..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #2 from [SNH: <0xCF76F1..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #2 from [SNH: <0x39CE29..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #2 from [SNH: <0x0A9FCC..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #2 from [SNH: <0x7D350E..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #2 from [SNH: <0x85DA64..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #2 from [SNH: <0x489BCB..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #2 from [SNH: <0x061BB8..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
Node [SNH: <0x489BCB..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]] broadcasting 4
Node [SNH: <0x489BCB..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]] anycasting 5
MyScribeClient.anycast([TOPIC <0x19A8F5..>],MyScribeContent #5 from [SNH: <0x489BCB..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]):false
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #4 from [SNH: <0x281817..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
MyScribeClient.anycast([TOPIC <0x19A8F5..>],MyScribeContent #5 from [SNH: <0x281817..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]):false
MyScribeClient.anycast([TOPIC <0x19A8F5..>],MyScribeContent #5 from [SNH: <0x281817..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]):false
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #4 from [SNH: <0x8904D9..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #4 from [SNH: <0xA20DF8..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #4 from [SNH: <0xCF76F1..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #4 from [SNH: <0x39CE29..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #4 from [SNH: <0x0A9FCC..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #4 from [SNH: <0x7D350E..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #4 from [SNH: <0x85DA64..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #4 from [SNH: <0x489BCB..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #4 from [SNH: <0x061BB8..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]])
MyScribeClient.anycast([TOPIC <0x19A8F5..>],MyScribeContent #5 from [SNH: <0x39CE29..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]):false
MyScribeClient.anycast([TOPIC <0x19A8F5..>],MyScribeContent #5 from [SNH: <0x39CE29..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]):false
MyScribeClient.anycast([TOPIC <0x19A8F5..>],MyScribeContent #5 from [SNH: <0x8904D9..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]):true
Note that the tree is only 1 level deep.
Note how each publish message is delivered to each node in the group.
Note that anycast is called on different nodes until a node returns true.

Congratulations! You have built and run your first scribe application!