{"id":9601,"date":"2018-08-13T06:09:51","date_gmt":"2018-08-13T10:09:51","guid":{"rendered":"https:\/\/qxf2.com\/blog\/?p=9601"},"modified":"2018-09-02T09:43:42","modified_gmt":"2018-09-02T13:43:42","slug":"exploring-wallaroo-understanding-state-partitions-with-an-example","status":"publish","type":"post","link":"https:\/\/qxf2.com\/blog\/exploring-wallaroo-understanding-state-partitions-with-an-example\/","title":{"rendered":"Exploring Wallaroo: Understanding state partitions with an example"},"content":{"rendered":"<p>This post is aimed at <a href=\"https:\/\/www.wallaroolabs.com\/\">Wallaroo<\/a> users who are looking to develop an intuitive understanding of state partitioning. Wallaroo is a framework that makes it easy to handle streaming data and write event processing applications quickly. Wallaroo already has a really nice example of state partitioning called <code>alphabet_partitioned<\/code> but it is missing a final step illustrating how partitioning helps when there are multiple Wallaroo workers. In this post, we fill the gap. You will learn:<br \/>\na) how to run Wallaroo in multi-worker mode across multiple Docker containers<br \/>\nb) a simple testing trick to prove that state partitions help in parallelizing tasks<\/p>\n<hr>\n<h3>Overview<\/h3>\n<p>Wallaroo applications are built for processing millions of messages quickly. However, to make things illustrative, we are going to:<br \/>\na) control our input to have only 52 messages<br \/>\nb) introduce a sleep time in our computation to make the difference between the 1-worker and 2-worker experiments explicit<\/p>\n<p>We begin with a 1-worker setup and baseline the performance of the application. Then, when we move to a 2-worker setup, we can check whether the total processing time is (almost) halved, since the two workers are accessing the state in parallel. 
<\/p>\n<p>The rest of this guide is structured like this:<\/p>\n<ol>\n<li>Setup<\/li>\n<li>One worker experiment<\/li>\n<li>Two worker experiment<\/li>\n<li>An exercise<\/li>\n<\/ol>\n<hr>\n<h3>1. Setup<\/h3>\n<p>We will rely on Wallaroo\u2019s existing <a href=\"https:\/\/docs.wallaroolabs.com\/book\/python\/writing-your-own-partitioned-stateful-application.html\">alphabet_partitioned<\/a> application as a starting point to help us understand state partitioning better. To follow along, make sure you are set up with <a href=\"https:\/\/docs.wallaroolabs.com\/book\/getting-started\/docker-setup.html\">Wallaroo + Python<\/a> in Docker. The setup instructions are clear and easy to follow.<\/p>\n<p>Once you are done with the Docker setup, start the Docker container for worker 1 by running the command:<\/p>\n<pre lang=\"bash\">\r\ndocker run --rm -it --privileged -p 4000:4000 \\\r\n-v \/tmp\/wallaroo-docker\/wallaroo-src:\/src\/wallaroo \\\r\n-v \/tmp\/wallaroo-docker\/python-virtualenv:\/src\/python-virtualenv \\\r\n--name wally \\\r\nwallaroo-labs-docker-wallaroolabs.bintray.io\/release\/wallaroo:0.5.0\r\n<\/pre>\n<p>Your output should be similar to the image below:<br \/>\n<img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/qxf2.com\/blog\/wp-content\/uploads\/2018\/08\/Screen-Shot-2018-08-08-at-2.20.23-PM.png\" alt=\"Wallaroo: docker run\" width=\"816\" height=\"250\" class=\"aligncenter size-full wp-image-9568\" srcset=\"https:\/\/qxf2.com\/blog\/wp-content\/uploads\/2018\/08\/Screen-Shot-2018-08-08-at-2.20.23-PM.png 816w, https:\/\/qxf2.com\/blog\/wp-content\/uploads\/2018\/08\/Screen-Shot-2018-08-08-at-2.20.23-PM-300x92.png 300w, https:\/\/qxf2.com\/blog\/wp-content\/uploads\/2018\/08\/Screen-Shot-2018-08-08-at-2.20.23-PM-768x235.png 768w\" sizes=\"auto, (max-width: 816px) 100vw, 816px\" \/><\/p>\n<p>At the end of this step, you should have entered a Docker container called &#8216;wally&#8217;. 
Make sure to move to the <code>alphabet_partitioned<\/code> directory by <code>cd \/src\/wallaroo\/examples\/python\/alphabet_partitioned<\/code><\/p>\n<p>Now, figure out the IP address of the container by running <code>ifconfig<\/code>. For newbies, the IP address will usually be the &#8216;inet addr&#8217; listed for the interface <code>eth0<\/code>. Note down this IP address. For the rest of this post, we will refer to this IP address as <code>$IP_Initializer<\/code>.<\/p>\n<hr>\n<h3>2. One worker experiment <\/h3>\n<p>In this part of the experiment, you will have only one Docker container running Wallaroo. You will use this step to get a baseline for how long your application takes to process a given message set.<\/p>\n<p><strong>a. Create your own input file<\/strong><br \/>\nLet us handcraft a small input file and then use it to send a controlled set of messages. This will help us figure out when the experiment ends by simply looking at the output. <\/p>\n<p>The <code>alphabet_partitioned<\/code> application expects 9-byte messages:<br \/>\na) a 4-byte header that has just the length of the payload<br \/>\nb) a 1-byte alphabet<br \/>\nc) a 4-byte number that represents the number of votes for the alphabet<\/p>\n<p>To make things easy for us, we will design the file to have 52 messages. The first 26 messages will be an alphabet and the number 1. The second 26 messages will be an alphabet and the number 2.<\/p>\n<p>Save the following code as (say) <code>generate_binary_votes.py<\/code> in your \/src\/wallaroo\/examples\/python\/alphabet_partitioned directory.<\/p>\n<pre lang=\"python\">\r\n\"\"\"\r\nA scratch script to produce a file similar to votes.msg for Wallaroo's alphabet_partitioned application\r\n\r\nDESIGN:\r\n1. Open a file fake_votes.msg in binary format\r\n2. Write 5 byte messages alphabet,vote \r\n3. Prepend a header with the number 5 (unsigned int) in it because all messages are 5 bytes\r\n4. Write this binary string to a file\r\n5. 
To make it easy to test for correctness, we will design the file to have only 52 messages \r\n6. The first 26 messages will be an alphabet and the number 1\r\n7. The second 26 messages will be an alphabet and the number 2\r\n\"\"\"\r\n\r\nimport string\r\nimport struct\r\n\r\ndef generate_alphabet_input(filename):\r\n    \"Generate a binary file with alphabets and votes\"\r\n    alphabets = list(string.ascii_lowercase)\r\n    with open(filename,'wb') as fp: \r\n        for i in range(1,3):\r\n            for alphabet in alphabets:\r\n                # encode() keeps the 's' field valid on both Python 2 and Python 3\r\n                msg = struct.pack('>IsI',5,alphabet.encode('ascii'),i)\r\n                fp.write(msg)\r\n\r\n\r\n#----START OF SCRIPT\r\nif __name__=='__main__':\r\n    generate_alphabet_input('fake_votes.msg')\r\n<\/pre>\n<p>Now, run the Python script to produce a new 468-byte (=52*9) binary input file called <code>fake_votes.msg<\/code>.<\/p>\n<pre lang=\"bash\">\r\npython generate_binary_votes.py\r\n<\/pre>\n<p>At the end of this step, you should have a <code>fake_votes.msg<\/code> file in your current directory.<\/p>\n<p><strong>b. Edit the application to add sleep time<\/strong><br \/>\nOpen <code>alphabet_partitioned.py<\/code> using vi. Then, add two lines:<\/p>\n<p>a) <code>import time<\/code> along with the other imports<br \/>\nb) <code>time.sleep(0.5)<\/code> at the end of the <code>update()<\/code> method for the class <code>TotalVotes<\/code><\/p>\n<p>By doing so, we are forcing the computation to consume ~0.5 seconds per message processed.<\/p>\n<p><strong>c. Start the receiver<\/strong><br \/>\nStart the receiver by executing:<\/p>\n<pre lang=\"bash\">\r\ndata_receiver --ponythreads=1 --ponynoblock   --listen $IP_Initializer:7002\r\n<\/pre>\n<p>Note: <code>$IP_Initializer<\/code> is the IP address you captured in the Setup section.<\/p>\n<p><strong>d. 
Start the application<\/strong><br \/>\nIn a new terminal prompt, enter the container<\/p>\n<pre lang=\"bash\">\r\ndocker exec -it wally env-setup\r\n<\/pre>\n<p>Then, in the prompt of the Docker container, go to the <code>alphabet_partitioned<\/code> application&#8217;s directory<\/p>\n<pre lang=\"bash\">\r\ncd wallaroo\/examples\/python\/alphabet_partitioned\/\r\n<\/pre>\n<p>Start the application by executing:<\/p>\n<pre lang=\"bash\">\r\nmachida --application-module alphabet_partitioned \\\r\n  --in $IP_Initializer:7010 --out $IP_Initializer:7002 \\\r\n  --metrics $IP_Initializer:5001 --control $IP_Initializer:6000 \\\r\n  --data $IP_Initializer:6001 \\\r\n  --external $IP_Initializer:5050 --cluster-initializer\\\r\n  --ponythreads=1 --ponynoblock\r\n<\/pre>\n<p><strong>e. Start the sender<\/strong><br \/>\nIn a new terminal prompt, enter the container<\/p>\n<pre lang=\"bash\">\r\ndocker exec -it wally env-setup\r\n<\/pre>\n<p>Then, in the prompt of the Docker container, go to the <code>alphabet_partitioned<\/code> application&#8217;s directory<\/p>\n<pre lang=\"bash\">\r\ncd wallaroo\/examples\/python\/alphabet_partitioned\/\r\n<\/pre>\n<p><strong>IMPORTANT:<\/strong> Start a stopwatch as soon as you start the sender. We are sending 52 messages and each one will take ~0.5 seconds (our sleep time) to process. So we expect that the whole experiment will take about 26 seconds to complete. Start the sender by executing:<\/p>\n<pre lang=\"bash\">\r\nsender --host $IP_Initializer:7010 \\\r\n  --file fake_votes.msg --batch-size 26 --interval 10_000_000 \\\r\n  --messages 52 --binary --msg-size 9 --repeat --ponythreads=1 \\\r\n  --ponynoblock --no-write \r\n<\/pre>\n<p><strong>f. Time the app<\/strong><br \/>\nStop your timer as soon as you see &#8216;z=>3&#8217; in the output. If things went well, this experiment took (approximately) 26 seconds.<\/p>\n<p><strong>g. 
Close the cluster<\/strong><br \/>\nIn the sender&#8217;s terminal prompt, execute <\/p>\n<pre lang=\"bash\">\r\ncluster_shutdown $IP_Initializer:5050\r\n<\/pre>\n<p>At this point, the application should shut down. You can shut down the receiver by (Ctrl+c)-ing out of it. Leave the Docker container running and all the terminals open since we are going to be using them in the two worker experiment too.<\/p>\n<hr>\n<h3>3. Two worker experiment<\/h3>\n<p>We are going to introduce a second worker. Then, we give the <code>alphabet_partitioned<\/code> application the exact same input stream as before. If state partitioning is working right, we should observe that the time it takes to process the same 52 messages is now almost halved.  <\/p>\n<p><strong>a. Start the Docker container of worker 2<\/strong><br \/>\nName the second Docker container &#8216;roo&#8217; (yeah, I know, imaginative!) and start it like this:<\/p>\n<pre lang=\"bash\">\r\ndocker run --rm -it --privileged -p 4001:4000 \\\r\n  -v \/tmp\/wallaroo-docker\/wallaroo-src:\/src\/wallaroo \\\r\n  -v \/tmp\/wallaroo-docker\/python-virtualenv:\/src\/python-virtualenv \\\r\n  --name roo \\\r\n  wallaroo-labs-docker-wallaroolabs.bintray.io\/release\/wallaroo:0.5.0\r\n<\/pre>\n<p>Make sure to move to the <code>alphabet_partitioned<\/code> directory by <code>cd \/src\/wallaroo\/examples\/python\/alphabet_partitioned<\/code> <\/p>\n<p>To be safe, examine the code in <code>alphabet_partitioned.py<\/code> and confirm that your sleep statements are still there.<\/p>\n<p><strong>b. Start the receiver on worker 1<\/strong><br \/>\nIn the receiver prompt of the first worker repeat the command<\/p>\n<pre lang=\"bash\">\r\ndata_receiver --ponythreads=1 --ponynoblock   --listen $IP_Initializer:7002\r\n<\/pre>\n<p><strong>c. Start the application on worker 1<\/strong><br \/>\nWorker 1 (&#8216;wally&#8217;) will be the cluster initializer. 
To start the application, run the following command on worker 1:<\/p>\n<pre lang=\"bash\">\r\nmachida --application-module alphabet_partitioned \\\r\n  --in $IP_Initializer:7010 --out $IP_Initializer:7002 \\\r\n  --metrics $IP_Initializer:5001 --control $IP_Initializer:6000 \\\r\n  --data $IP_Initializer:6001 --external $IP_Initializer:5050 \\\r\n  --cluster-initializer --worker-count 2\\\r\n  --ponythreads=1 --ponynoblock\r\n<\/pre>\n<p>Notice we have added a <code>worker-count<\/code> argument telling the application that it has two workers.<\/p>\n<p><strong>d. Start the application on worker 2<\/strong><br \/>\nUse the terminal prompt in step 3.a and run the following command on worker 2 (&#8216;roo&#8217;):<\/p>\n<pre lang=\"bash\">\r\nmachida --application-module alphabet_partitioned \\\r\n  --in $IP_Initializer:7010 --out $IP_Initializer:7002 \\\r\n  --metrics $IP_Initializer:5001 --control $IP_Initializer:6000 \\\r\n  --ponythreads=1 --ponynoblock --name roo\r\n<\/pre>\n<p>At the end of this step, you should see the output of the terminal claiming that the application is ready to work.<\/p>\n<p><strong>e. Start the sender on worker 1<\/strong><br \/>\nThis is very similar to step 2.e. Run the following command on worker 1.<\/p>\n<p><strong>IMPORTANT:<\/strong> Start a stopwatch as soon as you start the sender. We are sending 52 messages and each one will take ~0.5 seconds (our sleep time) to process. But 2 messages are going to be processed in parallel. So we expect that the whole experiment will take about 13 seconds (=52*0.5\/2) to complete.<\/p>\n<p>In the sender&#8217;s terminal prompt, execute:<\/p>\n<pre lang=\"bash\">\r\nsender --host $IP_Initializer:7010 \\\r\n  --file fake_votes.msg --batch-size 26 --interval 10_000_000 \\\r\n  --messages 52 --binary --msg-size 9 --repeat --ponythreads=1 \\\r\n  --ponynoblock --no-write \r\n<\/pre>\n<p><strong>f. 
Time the app<\/strong><br \/>\nStop your timer as soon as you see &#8216;z=>3&#8217; in the output of the receiver. If things went well, this experiment took (approximately) 13 seconds.<\/p>\n<p><strong>g. Close the cluster<\/strong><br \/>\nIn the sender&#8217;s terminal prompt, execute <\/p>\n<pre lang=\"bash\">\r\ncluster_shutdown $IP_Initializer:5050\r\n<\/pre>\n<p>At this point, the application should shut down. You can shut down the receiver by (Ctrl+c)-ing out of it. <\/p>\n<hr>\n<h3>4. An exercise<\/h3>\n<p>To further solidify your understanding through experimentation, try the same experiment with an input file that contains just the message a1 (the letter a with 1 vote) in binary format, repeated 52 times. What do you expect? Is state partitioning going to help reduce the total time taken to process the messages?<\/p>\n<hr>\n<h3>References<\/h3>\n<p>1. <a href=\"https:\/\/docs.wallaroolabs.com\/book\/core-concepts\/partitioning.html\">Wallaroo Partitioning<\/a><br \/>\n2. <a href=\"https:\/\/dzone.com\/articles\/stateful-multi-stream-processing-in-python-with-wa\">Stateful multi-stream processing in Python with Wallaroo<\/a><\/p>\n<hr>\n","protected":false},"excerpt":{"rendered":"<p>This post is aimed at Wallaroo users who are looking to develop an intuitive understanding of state partitioning. Wallaroo is a framework that makes it easy to handle streaming data and write event processing applications quickly. 
Wallaroo already has a really nice example of state partitioning called alphabet_partitioned but it is missing a final step illustrating how partitioning helps when [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[154],"tags":[],"class_list":["post-9601","post","type-post","status-publish","format-standard","hentry","category-wallaroo"],"_links":{"self":[{"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/posts\/9601","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/comments?post=9601"}],"version-history":[{"count":22,"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/posts\/9601\/revisions"}],"predecessor-version":[{"id":9755,"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/posts\/9601\/revisions\/9755"}],"wp:attachment":[{"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/media?parent=9601"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/categories?post=9601"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/qxf2.com\/blog\/wp-json\/wp\/v2\/tags?post=9601"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}