The FreePastry Tutorial.
This tutorial is designed to get you cooking quickly with the FreePastry
API and software toolkit.
Version @tutorial_version@; @tutorial_date@. For FreePastry version @freepastry_version@. Maintained by @maintainer@.
Scribe
Introducing Scribe.
Download the tutorial files: MyScribeClient.java, MyScribeContent.java, ScribeTutorial.java into a directory called rice/tutorial/scribe/.
Scribe is an application that allows you to subscribe to groups and publish messages to that group. This tutorial will show you how to get scribe up and running. You will learn how to do the following:- Create a topic.
- Create a ScribeClient.
- Subscribe to a topic.
- Publish (Multicast) content.
- Receive content.
- Anycast content.
- Introspect into the tree.
- Advanced Scribe Lesson: Policies, will show you how to control tree formation and anycast selection.
Terms:
- Scribe—A scalable group communication system for topic-based publish-subscribe applications.Scribe builds an efficient multicast tree for dissemination of events to a topic.
- Topic—Group. A topic builds a hash of the group name which is used as a unique identifier for the topic, as well used as a rendezvous point in Pastry.
- IdFactory—A hash function. An IdFactory implements a hash function to build Ids that are compatible with pastry.
- PastryIdFactory—A commonly used IdFactory in Pastry. The PastryIdFactory uses SHA1 as it's underlieing hash function.
- ScribeContent—A scribe message.
- ScribeClient—An application that receives ScribeContent. The client can subscribe to one or more topics.
- Multicast—A broadcast received by everyone subscribed to the corresponding topic.
- Anycast—A message that is received by a single node in a group. Anycast is most commonly used to find a single available service provider. The anycast message will be rejected until it finds a node willing to supply the service that is being requested.
- ScribePolicy—A policy to determine application specific details of Scribe. This includes formation and anycast selection.
Creating a topic.
This is fairly straightforward, but each node will have to do this for each topic of interest.Topic myTopic = new Topic(new PastryIdFactory(), "example topic");This constructs a topic with the "common name" of "example topic". It uses the PastryIdFactory to generate an appropriate Id for this topic.
Creating a ScribeClient.
Let's take a look at MyScribeClient. We are only going to subscribe to a single topic. We are going to publish content every 5 seconds using the FreePastry timer. See Timer for more details on the timer. The client is also going to implement rice.p2p.commonapi.Application in addition to rice.p2p.scribe.ScribeClient. This will allow us to send and receive non-scribe messages should this be important. Specifically it will allow us to receive messages queued on the timer.Here's the constructor and some member variables:
Scribe myScribe; Topic myTopic; protected Endpoint endpoint; public MyScribeClient(PastryNode node) { // you should recognize this from lesson 3 this.endpoint = node.buildEndpoint(this, "myinstance"); // construct Scribe myScribe = new ScribeImpl(node,"myScribeInstance"); // construct the topic myTopic = new Topic(new PastryIdFactory(node.getEnvironment()), "example topic"); System.out.println("myTopic = "+myTopic); // now we can receive messages endpoint.register(); }The only thing that should be new here is the construction of the ScribeImpl and Topic. The instance name "myScribeInstance" allows you to remain independent of other applications running on the same ring who also use scribe. They will have their own instance of Scribe that won't be confused with your instance.
In this example, one of the nodes is going to publish content every 5 seconds. We use the timer pattern described in the Timer tutorial. Each time we are going to send 1 multicast, and 1 anycast.
class PublishContent implements Message { public int getPriority() { return 0; } } public void startPublishTask() { publishTask = endpoint.scheduleMessage(new PublishContent(), 5000, 5000); } public void deliver(Id id, Message message) { if (message instanceof PublishContent) { sendMulticast(); sendAnycast(); } }In case you forgot, the
PublishContent
is similar to the Timer tutorial's MessageToSelf
.
The startPublishTask()
method schedules this to be delivered locally every 5 seconds.
The deliver(id,message)
method calls sendMulticast()
when the PublishMethod
is received.
In your application you will likely have some other event that causes content to be published.
We will look at sendMulticast()
and sendAnycast()
shortly.
Subscribing to a group.
Subscribing is very easy. Just callScribe.subscribe()
and provide the topic, and your client.
public void subscribe() { myScribe.subscribe(myTopic, this); }
Multicasting content.
First, we need some content to send. MyScribeContent implements ScribeContent and takes a NodeHandle sender, and an int sequence number. These are just so the output of the program is more interesting.public class MyScribeContent implements ScribeContent { NodeHandle from; int seq; public MyScribeContent(NodeHandle from, int seq) { this.from = from; this.seq = seq; } public String toString() { return "MyScribeContent #"+seq+" from "+from; } }To send the content, simply construct the message, then call Scribe.publish(). You give it the topic and the message. The rest of this function is just to print output and update the sequence number.
public void sendMulticast() { System.out.println("Node "+endpoint.getLocalNodeHandle()+" broadcasting "+seqNum); MyScribeContent myMessage = new MyScribeContent(endpoint.getLocalNodeHandle(), seqNum); myScribe.publish(myTopic, myMessage); seqNum++; }
Receiving content.
Receiving content is as easy as any other p2p application. The method signature is only slightly different:public void deliver(Topic topic, ScribeContent content) { System.out.println("MyScribeClient.deliver("+topic+","+content+")"); }All we are doing here is printing output to stdout.
Anycasting.
Anycast will get called on your clients until one returns true. This occurs on a call toScribeClient.anycast()
. To make this interesting, we're going to only accept the message
1/3 of the time, randomly. Your application will ususally want to do something more interesting, such as see
if a requested resource is available.Here is
sendAnycast()
which is nearly identical to sendMulticast()
except for the call to
Scribe.anycast()
instead of Scribe.publish()
.
public void sendAnycast() { System.out.println("Node "+endpoint.getLocalNodeHandle()+" anycasting "+seqNum); MyScribeContent myMessage = new MyScribeContent(endpoint.getLocalNodeHandle(), seqNum); myScribe.anycast(myTopic, myMessage); seqNum++; }Here is the code that only accepts the anycast 1/3 of the time. This will allow us to see that the anycast message can be rejected and sent elsewhere.
public boolean anycast(Topic topic, ScribeContent content) { boolean returnValue = rng.nextInt(3) == 0; System.out.println("MyScribeClient.anycast("+topic+","+content+"):"+returnValue); return returnValue; }
Examining the tree.
Lastly we have code to introspect the tree. Note that it is only easy to print out the entire tree because we are running all of the nodes in the same VM and have global information. It is significantly more difficult to print the scribe tree in an actual distributed environment, as scribe does not provide this information automatically. Furthermore this information could change rapidly as nodes join and leave.Because we are only going to run the application with a small number of nodes, the tree will most likely be only 1 level deep. (The root being level 0). However, this will show you how to print out your parent and children.
The first thing to note is that we have 3 accessor methods at the bottom of
MyScribeContent
:
public boolean isRoot() { return myScribe.isRoot(myTopic); } public NodeHandle getParent() { return myScribe.getParent(myTopic); } public NodeHandle[] getChildren() { return myScribe.getChildren(myTopic); }Note that these simply call through to the same method on myScribe, with the correct topic.
This code can be found in
ScribeTutorial.java
printTree()
does the following:
- Create a table mapping
NodeHandle
toMyScribeClient
. - Recursively traverse the tree to the root, using the helper:
getRoot()
. - Recursively traverse the tree down from the root, depth first and print the nodes using the helper:
recursivelyPrintChildren()
.
public static void printTree(Vector apps) { // build a hashtable of the apps, keyed by nodehandle Hashtable appTable = new Hashtable(); Iterator i = apps.iterator(); while (i.hasNext()) { MyScribeClient app = (MyScribeClient)i.next(); appTable.put(app.endpoint.getLocalNodeHandle(), app); } NodeHandle seed = ((MyScribeClient)apps.get(0)).endpoint.getLocalNodeHandle(); // get the root NodeHandle root = getRoot(seed, appTable); // print the tree from the root down recursivelyPrintChildren(root, 0, appTable); }
getRoot()
looks up the client for the seed handle from the appTable.- If the seed is the root, it is returned.
- Otherwise, it calls
getRoot()
on the parent.
public static NodeHandle getRoot(NodeHandle seed, Hashtable appTable) { MyScribeClient app = (MyScribeClient)appTable.get(seed); if (app.isRoot()) return seed; NodeHandle nextSeed = app.getParent(); return getRoot(nextSeed, appTable); }
recursivelyPrintChildren()
prints the curNode with appropriate whitespace based on the depth in the tree.- Then calls recursivelyPrintChildren() on all children (if it has any)
public static void recursivelyPrintChildren(NodeHandle curNode, int recursionDepth, Hashtable appTable) { // print self at appropriate tab level String s = ""; for (int numTabs = 0; numTabs < recursionDepth; numTabs++) { s+=" "; } s+=curNode.getId().toString(); System.out.println(s); // recursively print all children MyScribeClient app = (MyScribeClient)appTable.get(curNode); NodeHandle[] children = app.getChildren(); for (int curChild = 0; curChild < children.length; curChild++) { recursivelyPrintChildren(children[curChild], recursionDepth+1, appTable); } }
Initializing the apps.
The majority of the ScribeTutorial is identical to Lesson 4 where we ran multiple nodes within the same JVM. The last part is listed below. On each app, we callsubscribe()
, and on the first one we call startPublishTask()
.
After that we wait a few seconds then print the tree.
Iterator i = apps.iterator(); MyScribeClient app = (MyScribeClient)i.next(); app.subscribe(); app.startPublishTask(); while(i.hasNext()) { app = (MyScribeClient)i.next(); app.subscribe(); } env.getTimeSource().sleep(3000); printTree(apps);
Execution.
The parameters are identical to those in Lesson 4:- Local bind port.
- Bootstrap host. (the local host address)
- Bootstrap port. (usually whatever you passed in the first arg)
- The number of nodes to launch.
java -cp .:FreePastry-@freepastry_version@.jar rice.tutorial.scribe.ScribeTutorial 9001 10.9.8.7 9001 10 :1122932166578:Error connecting to address /10.9.8.7:9001: java.net.ConnectException: Connection refused: no further information :1122932166578:No bootstrap node provided, starting a new ring... Finished creating new node SocketNodeHandle (<0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]) myTopic = [TOPIC <0x19A8F5..>] Finished creating new node SocketNodeHandle (<0x8904D9..>/FOO/10.9.8.7:9002 [4410881318286179245]) myTopic = [TOPIC <0x19A8F5..>] Finished creating new node SocketNodeHandle (<0x281817..>/FOO/10.9.8.7:9003 [1144941711194723161]) ... <0x281817..> <0x8904D9..> <0x061BB8..> <0x85DA64..> <0x489BCB..> <0x0A9FCC..> <0x39CE29..> <0xA20DF8..> <0x7D350E..> <0xCF76F1..> Node [SNH: <0x489BCB..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]] broadcasting 0 Node [SNH: <0x489BCB..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]] anycasting 1 MyScribeClient.anycast([TOPIC <0x19A8F5..>],MyScribeContent #1 from [SNH: <0x489BCB..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]):false MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #0 from [SNH: <0x281817..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) MyScribeClient.anycast([TOPIC <0x19A8F5..>],MyScribeContent #1 from [SNH: <0x281817..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]):true MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #0 from [SNH: <0x8904D9..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #0 from [SNH: <0xA20DF8..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #0 from [SNH: <0xCF76F1..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #0 from [SNH: <0x39CE29..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #0 from [SNH: <0x0A9FCC..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #0 from [SNH: <0x7D350E..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #0 from [SNH: <0x85DA64..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #0 from [SNH: <0x489BCB..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #0 from [SNH: <0x061BB8..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) Node [SNH: <0x489BCB..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]] broadcasting 2 Node [SNH: <0x489BCB..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]] anycasting 3 MyScribeClient.anycast([TOPIC <0x19A8F5..>],MyScribeContent #3 from [SNH: <0x489BCB..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]):false MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #2 from [SNH: <0x281817..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) MyScribeClient.anycast([TOPIC <0x19A8F5..>],MyScribeContent #3 from [SNH: <0x281817..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]):true MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #2 from [SNH: <0x8904D9..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #2 from [SNH: <0xA20DF8..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #2 from [SNH: <0xCF76F1..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #2 from [SNH: <0x39CE29..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #2 from [SNH: <0x0A9FCC..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #2 from [SNH: <0x7D350E..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #2 from [SNH: <0x85DA64..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #2 from [SNH: <0x489BCB..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #2 from [SNH: <0x061BB8..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) Node [SNH: <0x489BCB..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]] broadcasting 4 Node [SNH: <0x489BCB..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]] anycasting 5 MyScribeClient.anycast([TOPIC <0x19A8F5..>],MyScribeContent #5 from [SNH: <0x489BCB..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]):false MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #4 from [SNH: <0x281817..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) MyScribeClient.anycast([TOPIC <0x19A8F5..>],MyScribeContent #5 from [SNH: <0x281817..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]):false MyScribeClient.anycast([TOPIC <0x19A8F5..>],MyScribeContent #5 from [SNH: <0x281817..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]):false MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #4 from [SNH: <0x8904D9..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #4 from [SNH: <0xA20DF8..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #4 from [SNH: <0xCF76F1..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #4 from [SNH: <0x39CE29..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #4 from [SNH: <0x0A9FCC..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #4 from [SNH: <0x7D350E..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #4 from [SNH: <0x85DA64..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #4 from [SNH: <0x489BCB..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) MyScribeClient.deliver([TOPIC <0x19A8F5..>],MyScribeContent #4 from [SNH: <0x061BB8..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]) MyScribeClient.anycast([TOPIC <0x19A8F5..>],MyScribeContent #5 from [SNH: <0x39CE29..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]):false MyScribeClient.anycast([TOPIC <0x19A8F5..>],MyScribeContent #5 from [SNH: <0x39CE29..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]):false MyScribeClient.anycast([TOPIC <0x19A8F5..>],MyScribeContent #5 from [SNH: <0x8904D9..> -> <0x489BCB..>/FOO/10.9.8.7:9001 [3776408266594462189]]):trueNote that the tree is only 1 level deep.
Note how each publish message is delivered to each node in the group.
Note that anycast is called on different nodes until a node returns true.