<html><head>
<title>FreePastry Tutorial</title>
<link rel="stylesheet" href="tutorial.css" />
</head>
<body>
<div class="content">
<div class="frontmatter">
<h1>The FreePastry Tutorial.</h1>
<div class="abstract">This tutorial is designed to get you cooking quickly with the FreePastry
API and software toolkit.</div>
<h4>Version @tutorial_version@; @tutorial_date@. For <a href="http://freepastry.org/">FreePastry</a> version @freepastry_version@. Maintained by @maintainer@.</h4>
</div>
<div class="nav">
<span class="nav-left"><a href="tut_app_sockets.html#appsocket">Previous (Application Sockets)</a></span>
<span class="nav-center"><a href="index.html">Contents</a></span>
<span class="nav-right"><a href="tut_raw_serialization.html#raw">Next (Raw Serialization 1)</a></span>
</div><br/><hr/>
<a name="SplitStream"><h1>SplitStream</h1></a>
<h2>Introducing SplitStream.</h2>
<h3>Download the tutorial files:
<a href="./src/splitstream/MySplitStreamClient.java">MySplitStreamClient.java</a>,
<a href="./src/splitstream/SplitStreamTutorial.java">SplitStreamTutorial.java</a> into a directory called rice/tutorial/splitstream/.</h3>
<p/>SplitStream is an application that uses multiple independent Scribe trees to provide high-bandwidth content distribution. For each Channel, SplitStream creates B Stripes, where B is FreePastry's routing base (16 by default). Each Stripe is an independent Scribe tree. To load-balance upstream bandwidth among the nodes in the Channel, SplitStream attempts to make each node an interior node of only 1 Stripe and a leaf of all the other Stripes. Each Stripe delivers data on a best-effort basis. Because of membership changes and unreliable links and nodes, you should not expect to receive all of the data that is sent on every Stripe. A typical use case is to disseminate the data with a redundant encoding (such as <a href="http://en.wikipedia.org/wiki/Erasure_codes">Erasure Codes</a>). The coding mechanisms and recommendations on the number of Stripes to use are outside the scope of this tutorial.
<p/>This tutorial will show you how to get SplitStream up and running. You will learn how to do the following:
<ul>
<li><a href="#Client">Create a SplitStreamClient.</a>&mdash;Similar to a Scribe client. The client can receive data from SplitStream, and is notified of errors.</li>
<li><a href="#Channel">Attach to a Channel.</a>&mdash;Similar to a Scribe topic but has multiple independent Stripes to deliver content.</li>
<li><a href="#Subscribe">Subscribe</a>&mdash;Receive data from the SplitStream Channel.</li>
<li><a href="#Publish">Publish</a>&mdash;Multicast content over (the Stripes of) a Channel.</li>
<li><a href="#Receive">Receive</a>&mdash;Receive the data from each Stripe.</li>
<li><a href="#Run">Run</a>&mdash;Execute the application.</li>
</ul>
<p>This tutorial is based on the <a href="tut_scribe.html">Scribe Tutorial</a> and runs in both Simulator and Socket modes, similar to the <a href="tut_app_sockets.html">Application Sockets tutorial</a>. We suggest becoming familiar with those tutorials before continuing with this one. <b>SplitStreamTutorial.java</b> is simply a combination of these 2 tutorials, so we won't explain that file. We focus on <b>MySplitStreamClient.java</b>, which is very similar to <b>MyScribeClient.java</b> in the Scribe Tutorial.</p>
<a name="Client"></a><h3>Creating a SplitStreamClient.</h3>
<p/>The <code>SplitStreamClient</code> interface has 2 methods. <code>joinFailed()</code> is called if there is an error subscribing to a Stripe. Typically, you should try to re-subscribe when this method is called. <code>deliver()</code> is called when data arrives on a Stripe.
<pre>
public interface SplitStreamClient {
/**
* This is a callback into the application to notify it that one of the stripes was unable to
* find a parent, and is thus unable to receive data.
*
* @param s The stripe on which the join failed
*/
public void joinFailed(Stripe s);
/**
* Called when data is received on a stripe in which this client has registered interest.
*
* @param s The stripe the data was received on
* @param data The data that was received
*/
public void deliver(Stripe s, byte[] data);
}
</pre>
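<p/>The advice to re-subscribe from <code>joinFailed()</code> can be sketched as follows. This is only an illustration under stated assumptions: <code>StripeLike</code> and <code>ClientLike</code> are hypothetical stand-ins for the FreePastry <code>Stripe</code> and <code>SplitStreamClient</code> types so that the example compiles on its own, and <code>MAX_RETRIES</code> is a limit we made up, not a FreePastry setting.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for rice.p2p.splitstream.Stripe (only the call we need).
interface StripeLike {
  void subscribe(ClientLike client);
}

// Hypothetical stand-in for SplitStreamClient, mirroring its two methods.
interface ClientLike {
  void joinFailed(StripeLike s);
  void deliver(StripeLike s, byte[] data);
}

// Re-subscribes after a failed join, but gives up after MAX_RETRIES attempts per stripe.
final class RetryingClient implements ClientLike {
  static final int MAX_RETRIES = 3; // assumed limit, chosen for illustration

  // Number of consecutive join failures seen for each stripe.
  private final Map<StripeLike, Integer> failures = new HashMap<>();

  @Override
  public void joinFailed(StripeLike s) {
    int count = failures.merge(s, 1, Integer::sum);
    if (count <= MAX_RETRIES) {
      s.subscribe(this); // try to join the stripe again
    } else {
      System.out.println("Giving up on a stripe after " + MAX_RETRIES + " retries");
    }
  }

  @Override
  public void deliver(StripeLike s, byte[] data) {
    // Data arrived, so the join succeeded; forget earlier failures.
    failures.remove(s);
  }

  int failuresFor(StripeLike s) {
    return failures.getOrDefault(s, 0);
  }
}
```

In a real client you would probably delay each retry (for example with a timer task) rather than re-subscribing immediately, so that a persistent failure does not turn into a tight loop.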
Now we take a look at the member variables in MySplitStreamClient.
<pre>
/**
* The length of a message in bytes.
*/
public static final int DATA_LENGTH = 10;
/**
* The number of messages to publish.
*/
public static final int NUM_PUBLISHES = 10;
/**
* The message sequence number. Will be incremented after each send.
*
* Out of laziness we are encoding this as a byte in the stream, so the range is limited
*/
byte seqNum = 0;
/**
* My handle to a SplitStream impl.
*/
SplitStream mySplitStream;
/**
* The Id of the only Channel we are subscribing to.
*/
ChannelId myChannelId;
/**
* The channel.
*/
Channel myChannel;
/**
* The stripes... acquired from myChannel.
*/
Stripe[] myStripes;
/**
* Data source...
*/
protected RandomSource random;
</pre>
<p/>The <code>ChannelId</code> is similar to the <code>Topic</code> in Scribe. It is usually a hash of a "human readable" content name.
<p/>In this tutorial, one node will publish 10 messages on all 16 Stripes of the Channel. If every node receives all of the data, each will receive 160 messages. Each message consists of the sequence number, the Stripe number (to show that it corresponds to the correct internal Stripe data structure), and 8 bytes of random data, for a total of 10 bytes.
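<p/>The message layout and the expected message count can be checked with a few lines of plain Java. This is a sketch, not the tutorial's code: it uses <code>java.util.Random</code> in place of FreePastry's <code>RandomSource</code>, and the class name <code>StripeMessage</code> and the constant <code>NUM_STRIPES</code> are ours.

```java
import java.util.Random;

final class StripeMessage {
  static final int DATA_LENGTH = 10;   // same value as in MySplitStreamClient
  static final int NUM_PUBLISHES = 10; // same value as in MySplitStreamClient
  static final int NUM_STRIPES = 16;   // assumed: the default number of Stripes

  // Builds one message: byte 0 = sequence number, byte 1 = stripe, bytes 2..9 = random.
  static byte[] encode(byte seqNum, byte stripe, Random random) {
    byte[] data = new byte[DATA_LENGTH];
    random.nextBytes(data); // fills all 10 bytes; the first two are overwritten below
    data[0] = seqNum;
    data[1] = stripe;
    return data;
  }

  static int seqNum(byte[] data) { return data[0]; }

  static int stripe(byte[] data) { return data[1]; }

  // Number of messages each subscriber sees if nothing is lost.
  static int expectedDeliveries() { return NUM_PUBLISHES * NUM_STRIPES; }

  public static void main(String[] args) {
    byte[] msg = encode((byte) 3, (byte) 12, new Random());
    System.out.println("seq=" + seqNum(msg) + " stripe=" + stripe(msg) + " length=" + msg.length);
    System.out.println("expected deliveries per node: " + expectedDeliveries());
  }
}
```

Because the sequence number is stored in a single signed byte, it wraps from 127 to -128. That is harmless here, since only 10 messages are published, but a longer stream would need a wider field.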
<h4>Here is how we initialize these variables.</h4>
<pre>
public MySplitStreamClient(Node node) {
// you should recognize this from lesson 3
this.endpoint = node.buildEndpoint(this, "myinstance");
// use this to generate data
this.random = endpoint.getEnvironment().getRandomSource();
// construct SplitStream
mySplitStream = new SplitStreamImpl(node,"splitStreamTutorial");
// The ChannelId is built from a normal PastryId
Id temp = new PastryIdFactory(node.getEnvironment()).buildId("my channel");
// construct the ChannelId
myChannelId = new ChannelId(temp);
// now we can receive messages
endpoint.register();
}
</pre>
<p/>The <code>environment</code> already contains our <code>RandomSource</code>.
<p/>We create the ChannelId in the constructor from a regular Pastry Id, which is a hash of the key "my channel". We will initialize myChannel and myStripes later in the <code>subscribe()</code> method.
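<p/>To see what "a hash of the key" means in practice, here is a minimal stand-alone sketch that hashes a readable name to a fixed-length identifier. We assume SHA-1 and UTF-8 encoding purely for illustration. <code>PastryIdFactory</code> may encode or order the bytes differently, so do not expect the value printed here to match the Ids in the tutorial output. The class name <code>ChannelNameHash</code> is ours.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class ChannelNameHash {
  // Hashes a readable name to a 20-byte (160-bit) digest with SHA-1 (assumed hash function).
  static byte[] hashName(String name) {
    try {
      MessageDigest md = MessageDigest.getInstance("SHA-1");
      return md.digest(name.getBytes(StandardCharsets.UTF_8));
    } catch (NoSuchAlgorithmException e) {
      // Every Java platform is required to provide SHA-1.
      throw new IllegalStateException(e);
    }
  }

  // Formats bytes as upper-case hex, two characters per byte.
  static String toHex(byte[] bytes) {
    StringBuilder sb = new StringBuilder();
    for (byte b : bytes) {
      sb.append(String.format("%02X", b & 0xFF));
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    System.out.println("0x" + toHex(hashName("my channel")));
  }
}
```

The useful property is that every node that hashes the same name computes the same identifier, so nodes can agree on a Channel without exchanging its Id first.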
<a name="Channel"></a><h3>Creating a Channel.</h3>
This is trivial once you have the ChannelId. We do this in the <code>MySplitStreamClient.subscribe()</code> method.
<pre>
// attaching makes you part of the Channel, and volunteers to be an internal node of one of the trees
myChannel = mySplitStream.attachChannel(myChannelId);
</pre>
<a name="Subscribe"></a><h3>Subscribing to a group.</h3>
<p/>Attaching to the Channel does not cause you to receive messages published on the Channel. It only volunteers you to become an internal member of a Stripe. To receive data, you must subscribe to each Stripe of the Channel that you want to receive data from. The Id of each Stripe is identical to the ChannelId, except that the first (hexadecimal) digit is replaced with the Stripe number. For example, if your <code>ChannelId</code> is <code><b>0xBB55AA..</b></code>, then your Stripes will be named <code><b>0x0B55AA..</b></code>, <code><b>0x1B55AA..</b></code>, <code><b>0x2B55AA..</b></code>, ... , <code><b>0xFB55AA..</b></code>.
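<p/>The naming rule can be reproduced on hex strings. The sketch below is our own illustration and works on the printed form of an Id rather than on FreePastry's <code>Id</code> objects; the class and method names are hypothetical. It is consistent with the sample output in the Run section, where Stripe 2 appears as <code>0x26F8A8..</code>.

```java
final class StripeNames {
  static final String HEX_DIGITS = "0123456789ABCDEF";

  // Replaces the first hex digit of the channel id with the stripe number (0..15).
  static String stripeIdHex(String channelIdHex, int stripe) {
    if (stripe < 0 || stripe >= HEX_DIGITS.length()) {
      throw new IllegalArgumentException("stripe must be in 0..15");
    }
    if (channelIdHex.isEmpty()) {
      throw new IllegalArgumentException("channel id must not be empty");
    }
    return HEX_DIGITS.charAt(stripe) + channelIdHex.substring(1);
  }

  // Returns the ids of all 16 stripes, in stripe order.
  static String[] allStripeIds(String channelIdHex) {
    String[] ids = new String[HEX_DIGITS.length()];
    for (int stripe = 0; stripe < ids.length; stripe++) {
      ids[stripe] = stripeIdHex(channelIdHex, stripe);
    }
    return ids;
  }

  public static void main(String[] args) {
    for (String id : allStripeIds("BB55AA")) {
      System.out.println("0x" + id + "..");
    }
  }
}
```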
<p/>Your application may <i>not</i> need to publish/subscribe on every Stripe. <i>If you don't use every Stripe in a Channel, it is important to find a mechanism to distribute load over different Stripes in the different Channels. Failure to do so will place all of the forwarding burden onto a few of the nodes in the underlying overlay (FreePastry).</i> For example, if you have 3 Channels and each needs only 4 Stripes, do not use the first 4 Stripes of each Channel. You could use Stripes 0-3 of the first Channel, 4-7 of the second, and 8-B (hexadecimal) of the third. In practice you will need a mechanism to deterministically distribute which Stripes you use for each Channel. Such a mechanism is outside the scope of this tutorial, but we suggest using a hash of the content name to also determine which Stripes to use.
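<p/>One possible way to act on that suggestion is sketched below. It is an assumption on our part, not a mechanism provided by SplitStream: the starting Stripe is derived from <code>String.hashCode()</code> of the channel name, and the requested number of consecutive Stripes is taken from there, wrapping around at 16. The class and method names are hypothetical.

```java
final class StripeSelector {
  static final int NUM_STRIPES = 16; // assumed: the default number of Stripes

  // Picks `count` consecutive stripes, starting at an offset derived from the channel name.
  static int[] stripesFor(String channelName, int count) {
    if (count < 1 || count > NUM_STRIPES) {
      throw new IllegalArgumentException("count must be in 1.." + NUM_STRIPES);
    }
    // floorMod keeps the offset non-negative even when hashCode() is negative.
    int start = Math.floorMod(channelName.hashCode(), NUM_STRIPES);
    int[] stripes = new int[count];
    for (int i = 0; i < count; i++) {
      stripes[i] = (start + i) % NUM_STRIPES;
    }
    return stripes;
  }

  public static void main(String[] args) {
    for (String name : new String[] {"channel A", "channel B", "channel C"}) {
      System.out.println(name + " -> " + java.util.Arrays.toString(stripesFor(name, 4)));
    }
  }
}
```

Every node computes the same Stripes for the same name, which is the property you need. A hash only spreads Channels statistically, so two Channels can still land on overlapping Stripes. If you need the strict 0-3, 4-7, 8-B split from the example above, assign the offsets explicitly instead.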
<p/>Subscribing is also trivial. Just call <code>Stripe.subscribe()</code> for each stripe you wish to join. In this case we are subscribing to all of them.
<pre>
// subscribing notifies your application when data comes through the tree
myStripes = myChannel.getStripes();
for (int curStripe = 0; curStripe &lt; myStripes.length; curStripe++) {
myStripes[curStripe].subscribe(this);
}
</pre>
<a name="Publish"></a><h3>Multicasting content.</h3>
We use a periodic message as originally described in <a href="tut_timertask.html">lesson 5</a>, and used in the <a href="tut_scribe.html#l6Client">Scribe Tutorial</a>.
Unlike Scribe, which sends a ScribeContent, SplitStream disseminates byte arrays.
<pre>
public void publish() {
for (byte curStripe = 0; curStripe &lt; myStripes.length; curStripe++) {
// format of the data:
// first byte: seqNum
// second byte: stripe
// rest: random
byte[] data = new byte[DATA_LENGTH];
// yes, we waste some random bytes here
random.nextBytes(data);
data[0] = seqNum;
data[1] = curStripe;
// print what we are sending
System.out.println("Node "+endpoint.getLocalNodeHandle()+" publishing "+seqNum+" "+printData(data));
// publish the data
myStripes[curStripe].publish(data);
}
// increment the sequence number
seqNum++;
// cancel after sending all the messages
if (seqNum >= NUM_PUBLISHES) publishTask.cancel();
}
</pre>
The above code constructs the <code>byte[] data</code> for each stripe and then calls <code>stripe.publish()</code>. To show that the random data is delivered intact, we created a little helper function called <code>printData()</code>, which prints out the contents of the byte array.
<pre>
private String printData(byte[] data) {
StringBuffer sb = new StringBuffer();
for (int i = 0; i &lt; data.length-1; i++) {
sb.append((int)data[i]);
sb.append(',');
}
sb.append((int)data[data.length-1]);
return sb.toString();
}
</pre>
<a name="Receive"></a><h3>Receiving content.</h3>
Receiving content is as easy as in any other p2p application. The method signature is only
slightly different:
<pre>
public void deliver(Stripe s, byte[] data) {
System.out.println(endpoint.getId()+" deliver("+s+"):seq:"+data[0]+" stripe:"+data[1]+" "+printData(data)+")");
}
</pre>
All we are doing here is printing output to stdout.
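<p/>Because each Stripe is best-effort, a real client will usually want to know what it missed rather than just print what arrived. The following sketch tracks which (sequence number, Stripe) pairs have been seen, using the two header bytes of the tutorial's message format. <code>DeliveryTracker</code> is a hypothetical helper of ours, not part of FreePastry.

```java
import java.util.BitSet;

final class DeliveryTracker {
  private final int numStripes;
  private final int numPublishes;
  private final BitSet seen; // one bit per (sequence number, stripe) pair

  DeliveryTracker(int numStripes, int numPublishes) {
    this.numStripes = numStripes;
    this.numPublishes = numPublishes;
    this.seen = new BitSet(numStripes * numPublishes);
  }

  // Records one delivered message. Returns false for duplicates and malformed headers.
  boolean record(byte[] data) {
    if (data.length < 2) return false;
    int seq = data[0];    // first byte: sequence number
    int stripe = data[1]; // second byte: stripe
    if (seq < 0 || seq >= numPublishes || stripe < 0 || stripe >= numStripes) return false;
    int index = seq * numStripes + stripe;
    if (seen.get(index)) return false;
    seen.set(index);
    return true;
  }

  int received() { return seen.cardinality(); }

  int missing() { return numStripes * numPublishes - received(); }

  public static void main(String[] args) {
    DeliveryTracker tracker = new DeliveryTracker(16, 10);
    tracker.record(new byte[] {0, 2, 104, 44, 50, 113, 26, 65, 104, -114});
    System.out.println("received=" + tracker.received() + " missing=" + tracker.missing());
  }
}
```

You could call <code>record(data)</code> from <code>deliver()</code> and inspect <code>missing()</code> after the publisher has finished.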
<a name="Run"></a><h3>Run the code.</h3>
Now, let's run the code using the <code>-direct</code> flag and 10 nodes.
Your output will resemble:
<pre>
<span class="input">java -cp .:FreePastry-@freepastry_version@.jar rice.tutorial.splitstream.SplitStreamTutorial -direct 10</span>
<span class="output">:rice.pastry.direct.DirectPastryNodeFactory:0:No bootstrap node provided, starting a new ring...
Finished creating new node Pastry node &lt;0x30C9E7..&gt;
Finished creating new node Pastry node &lt;0x5C775C..&gt;
Finished creating new node Pastry node &lt;0x0B86E5..&gt;
Finished creating new node Pastry node &lt;0xD09F58..&gt;
Finished creating new node Pastry node &lt;0x003E86..&gt;
Finished creating new node Pastry node &lt;0x8DA8E3..&gt;
Finished creating new node Pastry node &lt;0x17B578..&gt;
Finished creating new node Pastry node &lt;0x841BF0..&gt;
Finished creating new node Pastry node &lt;0x770070..&gt;
Finished creating new node Pastry node &lt;0x84C88C..&gt;
Node [DNH &lt;0x30C9E7..&gt;] publishing 0 0,0,-124,35,-76,-49,-2,17,32,-64
Node [DNH &lt;0x30C9E7..&gt;] publishing 0 0,1,124,51,60,-82,75,6,126,105
Node [DNH &lt;0x30C9E7..&gt;] publishing 0 0,2,104,44,50,113,26,65,104,-114
&lt;0x30C9E7..&gt; deliver(Stripe [StripeId &lt;0x26F8A8..&gt;]):seq:0 stripe:2 0,2,104,44,50,113,26,65,104,-114)
Node [DNH &lt;0x30C9E7..&gt;] publishing 0 0,3,-124,-85,-80,-20,19,118,-85,5
&lt;0x30C9E7..&gt; deliver(Stripe [StripeId &lt;0x36F8A8..&gt;]):seq:0 stripe:3 0,3,-124,-85,-80,-20,19,118,-85,5)
Node [DNH &lt;0x30C9E7..&gt;] publishing 0 0,4,50,68,88,24,4,-50,106,-93
Node [DNH &lt;0x30C9E7..&gt;] publishing 0 0,5,-26,123,-94,-8,2,-116,4,-109
Node [DNH &lt;0x30C9E7..&gt;] publishing 0 0,6,93,118,-108,-66,108,105,-13,110
Node [DNH &lt;0x30C9E7..&gt;] publishing 0 0,7,-33,-45,-79,70,22,94,1,95
Node [DNH &lt;0x30C9E7..&gt;] publishing 0 0,8,-107,41,-117,-64,-14,46,-97,73
Node [DNH &lt;0x30C9E7..&gt;] publishing 0 0,9,41,13,25,86,-83,-37,-71,56
Node [DNH &lt;0x30C9E7..&gt;] publishing 0 0,10,112,115,6,47,-70,-13,-122,84
Node [DNH &lt;0x30C9E7..&gt;] publishing 0 0,11,-104,13,-9,62,-58,74,-92,-40
Node [DNH &lt;0x30C9E7..&gt;] publishing 0 0,12,99,-11,122,67,-64,-11,122,42
Node [DNH &lt;0x30C9E7..&gt;] publishing 0 0,13,45,53,97,-75,27,-78,28,101
Node [DNH &lt;0x30C9E7..&gt;] publishing 0 0,14,-118,68,52,-125,91,117,98,94
Node [DNH &lt;0x30C9E7..&gt;] publishing 0 0,15,-63,36,-79,-94,-127,-47,-2,85
&lt;0x5C775C..&gt; deliver(Stripe [StripeId &lt;0x66F8A8..&gt;]):seq:0 stripe:6 0,6,93,118,-108,-66,108,105,-13,110)
&lt;0x5C775C..&gt; deliver(Stripe [StripeId &lt;0x26F8A8..&gt;]):seq:0 stripe:2 0,2,104,44,50,113,26,65,104,-114)
&lt;0x5C775C..&gt; deliver(Stripe [StripeId &lt;0x56F8A8..&gt;]):seq:0 stripe:5 0,5,-26,123,-94,-8,2,-116,4,-109)
&lt;0x5C775C..&gt; deliver(Stripe [StripeId &lt;0x36F8A8..&gt;]):seq:0 stripe:3 0,3,-124,-85,-80,-20,19,118,-85,5)
&lt;0x5C775C..&gt; deliver(Stripe [StripeId &lt;0x46F8A8..&gt;]):seq:0 stripe:4 0,4,50,68,88,24,4,-50,106,-93)
&lt;0x8DA8E3..&gt; deliver(Stripe [StripeId &lt;0xA6F8A8..&gt;]):seq:0 stripe:10 0,10,112,115,6,47,-70,-13,-122,84)
&lt;0x8DA8E3..&gt; deliver(Stripe [StripeId &lt;0x96F8A8..&gt;]):seq:0 stripe:9 0,9,41,13,25,86,-83,-37,-71,56)
&lt;0x30C9E7..&gt; deliver(Stripe [StripeId &lt;0x66F8A8..&gt;]):seq:0 stripe:6 0,6,93,118,-108,-66,108,105,-13,110)
&lt;0x30C9E7..&gt; deliver(Stripe [StripeId &lt;0x56F8A8..&gt;]):seq:0 stripe:5 0,5,-26,123,-94,-8,2,-116,4,-109)
&lt;0x30C9E7..&gt; deliver(Stripe [StripeId &lt;0x46F8A8..&gt;]):seq:0 stripe:4 0,4,50,68,88,24,4,-50,106,-93)
&lt;0x8DA8E3..&gt; deliver(Stripe [StripeId &lt;0x76F8A8..&gt;]):seq:0 stripe:7 0,7,-33,-45,-79,70,22,94,1,95)
&lt;0x003E86..&gt; deliver(Stripe [StripeId &lt;0xF6F8A8..&gt;]):seq:0 stripe:15 0,15,-63,36,-79,-94,-127,-47,-2,85)
&lt;0x5C775C..&gt; deliver(Stripe [StripeId &lt;0x86F8A8..&gt;]):seq:0 stripe:8 0,8,-107,41,-117,-64,-14,46,-97,73)
&lt;0x30C9E7..&gt; deliver(Stripe [StripeId &lt;0x86F8A8..&gt;]):seq:0 stripe:8 0,8,-107,41,-117,-64,-14,46,-97,73)
&lt;0x003E86..&gt; deliver(Stripe [StripeId &lt;0x76F8A8..&gt;]):seq:0 stripe:7 0,7,-33,-45,-79,70,22,94,1,95)
&lt;0x0B86E5..&gt; deliver(Stripe [StripeId &lt;0x26F8A8..&gt;]):seq:0 stripe:2 0,2,104,44,50,113,26,65,104,-114)
&lt;0x0B86E5..&gt; deliver(Stripe [StripeId &lt;0x06F8A8..&gt;]):seq:0 stripe:0 0,0,-124,35,-76,-49,-2,17,32,-64)
&lt;0x0B86E5..&gt; deliver(Stripe [StripeId &lt;0x36F8A8..&gt;]):seq:0 stripe:3 0,3,-124,-85,-80,-20,19,118,-85,5)
&lt;0x30C9E7..&gt; deliver(Stripe [StripeId &lt;0x76F8A8..&gt;]):seq:0 stripe:7 0,7,-33,-45,-79,70,22,94,1,95)
&lt;0x30C9E7..&gt; deliver(Stripe [StripeId &lt;0x96F8A8..&gt;]):seq:0 stripe:9 0,9,41,13,25,86,-83,-37,-71,56)
&lt;0x30C9E7..&gt; deliver(Stripe [StripeId &lt;0xA6F8A8..&gt;]):seq:0 stripe:10 0,10,112,115,6,47,-70,-13,-122,84)
&lt;0x17B578..&gt; deliver(Stripe [StripeId &lt;0x16F8A8..&gt;]):seq:0 stripe:1 0,1,124,51,60,-82,75,6,126,105)
&lt;0x17B578..&gt; deliver(Stripe [StripeId &lt;0x06F8A8..&gt;]):seq:0 stripe:0 0,0,-124,35,-76,-49,-2,17,32,-64)
&lt;0x0B86E5..&gt; deliver(Stripe [StripeId &lt;0x86F8A8..&gt;]):seq:0 stripe:8 0,8,-107,41,-117,-64,-14,46,-97,73)
&lt;0x0B86E5..&gt; deliver(Stripe [StripeId &lt;0x16F8A8..&gt;]):seq:0 stripe:1 0,1,124,51,60,-82,75,6,126,105)
&lt;0x5C775C..&gt; deliver(Stripe [StripeId &lt;0xA6F8A8..&gt;]):seq:0 stripe:10 0,10,112,115,6,47,-70,-13,-122,84)
&lt;0x5C775C..&gt; deliver(Stripe [StripeId &lt;0x96F8A8..&gt;]):seq:0 stripe:9 0,9,41,13,25,86,-83,-37,-71,56)
&lt;0x8DA8E3..&gt; deliver(Stripe [StripeId &lt;0xF6F8A8..&gt;]):seq:0 stripe:15 0,15,-63,36,-79,-94,-127,-47,-2,85)
&lt;0x5C775C..&gt; deliver(Stripe [StripeId &lt;0x76F8A8..&gt;]):seq:0 stripe:7 0,7,-33,-45,-79,70,22,94,1,95)
&lt;0xD09F58..&gt; deliver(Stripe [StripeId &lt;0xC6F8A8..&gt;]):seq:0 stripe:12 0,12,99,-11,122,67,-64,-11,122,42)
&lt;0xD09F58..&gt; deliver(Stripe [StripeId &lt;0xE6F8A8..&gt;]):seq:0 stripe:14 0,14,-118,68,52,-125,91,117,98,94)
&lt;0xD09F58..&gt; deliver(Stripe [StripeId &lt;0xB6F8A8..&gt;]):seq:0 stripe:11 0,11,-104,13,-9,62,-58,74,-92,-40)
&lt;0xD09F58..&gt; deliver(Stripe [StripeId &lt;0x36F8A8..&gt;]):seq:0 stripe:3 0,3,-124,-85,-80,-20,19,118,-85,5)
&lt;0xD09F58..&gt; deliver(Stripe [StripeId &lt;0xD6F8A8..&gt;]):seq:0 stripe:13 0,13,45,53,97,-75,27,-78,28,101)
&lt;0xD09F58..&gt; deliver(Stripe [StripeId &lt;0x26F8A8..&gt;]):seq:0 stripe:2 0,2,104,44,50,113,26,65,104,-114)
&lt;0xD09F58..&gt; deliver(Stripe [StripeId &lt;0x16F8A8..&gt;]):seq:0 stripe:1 0,1,124,51,60,-82,75,6,126,105)
&lt;0x0B86E5..&gt; deliver(Stripe [StripeId &lt;0x66F8A8..&gt;]):seq:0 stripe:6 0,6,93,118,-108,-66,108,105,-13,110)
&lt;0x0B86E5..&gt; deliver(Stripe [StripeId &lt;0x56F8A8..&gt;]):seq:0 stripe:5 0,5,-26,123,-94,-8,2,-116,4,-109)
&lt;0x0B86E5..&gt; deliver(Stripe [StripeId &lt;0x46F8A8..&gt;]):seq:0 stripe:4 0,4,50,68,88,24,4,-50,106,-93)
&lt;0xD09F58..&gt; deliver(Stripe [StripeId &lt;0x06F8A8..&gt;]):seq:0 stripe:0 0,0,-124,35,-76,-49,-2,17,32,-64)
...
</span></pre>
Note that the order of delivery across the individual stripes is not preserved. For example, the first message delivered on node <code><b>0x5C775C</b></code> was on <code><b>Stripe 6</b></code>, even though Stripes 0 through 5 were published before it. The delivered data is nevertheless identical to what was published.
<h3>Congratulations! You have built and run your first SplitStream application!</h3><br>
<hr/><div class="nav">
<span class="nav-left"><a href="tut_app_sockets.html#appsocket">Previous (Application Sockets)</a></span>
<span class="nav-center"><a href="index.html">Contents</a></span>
<span class="nav-right"><a href="tut_raw_serialization.html#raw">Next (Raw Serialization 1)</a></span>
</div><br/>
<div class="footer">
Pastry tutorial version @tutorial_version@. &nbsp;&nbsp;&nbsp; Last updated @tutorial_date@.
&nbsp;&nbsp;&nbsp; For FreePastry @freepastry_version@. &nbsp;&nbsp;&nbsp; Maintained by @maintainer@.
</div>
</div>
</body>
</html>