Exploring Wallaroo: Understanding state partitions with an example

This post is aimed at Wallaroo users who are looking to develop an intuitive understanding of state partitioning. Wallaroo is a framework that makes it easy to handle streaming data and write event processing applications quickly. Wallaroo already has a really nice example of state partitioning called alphabet_partitioned, but it is missing a final step illustrating how partitioning helps when there are multiple Wallaroo workers. In this post, we fill that gap. You will learn:
a) how to run Wallaroo in multi-worker mode across multiple Docker containers
b) a simple testing trick to prove that state partitions help in parallelizing tasks


Overview

Wallaroo applications are built for processing millions of messages quickly. However, to make things illustrative, we are going to:
a) control our input to have only 52 messages
b) introduce a sleep time in our computation to make the difference between the 1-worker and 2-worker experiments explicit

We begin with a 1-worker setup and baseline the performance of the application. Then, when we move to a 2-worker setup, we can check whether the total processing time is (almost) halved, since two workers are accessing the state in parallel.

The rest of this guide is structured like this:

  1. Setup
  2. One worker experiment
  3. Two worker experiment
  4. An exercise

1. Setup

We will rely on Wallaroo’s existing alphabet_partitioned application as a starting point to help us understand state partitioning better. To follow along, make sure you are set up with Wallaroo + Python in Docker. The setup instructions are clear and easy to follow.

Once you are done with the Docker setup, start the Docker container for worker 1 by running the command:

docker run --rm -it --privileged -p 4000:4000 \
-v /tmp/wallaroo-docker/wallaroo-src:/src/wallaroo \
-v /tmp/wallaroo-docker/python-virtualenv:/src/python-virtualenv \
--name wally \
wallaroo-labs-docker-wallaroolabs.bintray.io/release/wallaroo:0.5.0

Your output should be similar to the image below:
[Image: Wallaroo: docker run]

At the end of this step, you should have entered a Docker container called ‘wally’. Make sure to move to the alphabet_partitioned directory by running cd /src/wallaroo/examples/python/alphabet_partitioned

Now, figure out the IP address of the container by running ifconfig. For newbies, the IP address will usually be the ‘inet addr’ listed for the interface eth0. Note down this IP address. For the rest of this post, we will refer to this IP address as $IP_Initializer.
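
If ifconfig is not installed or its output is hard to read, the address can also be looked up from Python inside the container. This is a minimal sketch, assuming the container's hostname resolves to its own address (Docker normally arranges this through /etc/hosts on the default bridge network). The function name container_ip is made up for this example. Compare the result with what ifconfig shows before relying on it.

```python
import socket


def container_ip():
    """Return the IPv4 address this host's name resolves to, or None on failure."""
    try:
        return socket.gethostbyname(socket.gethostname())
    except socket.error:
        # Name resolution failed; fall back to reading ifconfig by hand.
        return None


if __name__ == "__main__":
    print(container_ip())
```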


2. One worker experiment

In this part of the experiment, you will have only one Docker container running Wallaroo. You will use this step to get a baseline for how long your application takes to process a given message set.

a. Create your own input file
Let us handcraft a small input file and then use it to send a controlled set of messages. This will help us figure out when the experiment ends by simply looking at the output.

The alphabet_partitioned application expects 9-byte messages:
a) a 4-byte header that holds just the length of the payload
b) a 1-byte letter of the alphabet
c) a 4-byte number that represents the number of votes for that letter

To make things easy for us, we will design the file to have 52 messages. The first 26 messages will each be a letter and the number 1. The second 26 messages will each be a letter and the number 2.
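
To make the layout concrete, here is a small sketch that packs and unpacks a single message with Python's struct module. The helper names encode_vote and decode_vote are made up for this illustration. The big-endian '>IsI' format is the same one the generator script in this post uses.

```python
import struct

# '>' = big-endian with no padding, I = 4-byte unsigned int, s = 1-byte string
MESSAGE_FORMAT = ">IsI"


def encode_vote(letter, votes):
    """Build one 9-byte message: 4-byte length header, 1-byte letter, 4-byte votes."""
    payload_length = 5  # 1 byte for the letter + 4 bytes for the vote count
    return struct.pack(MESSAGE_FORMAT, payload_length, letter.encode("ascii"), votes)


def decode_vote(message):
    """Split one 9-byte message back into (payload_length, letter, votes)."""
    length, letter, votes = struct.unpack(MESSAGE_FORMAT, message)
    return length, letter.decode("ascii"), votes


if __name__ == "__main__":
    message = encode_vote("a", 1)
    print(len(message))  # 9
    print(decode_vote(message))
```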

Save the following code as (say) generate_binary_votes.py in your /src/wallaroo/examples/python/alphabet_partitioned directory.

"""
A scratch script to produce a file similar to votes.msg for Wallaroo's alphabet_partitioned application

DESIGN:
1. Open a file fake_votes.msg in binary mode
2. Build 5-byte payloads: a 1-byte letter followed by a 4-byte vote count
3. Prepend a 4-byte header holding the number 5 (unsigned int) because every payload is 5 bytes
4. Write each 9-byte message to the file
5. To make it easy to test for correctness, the file has only 52 messages
6. The first 26 messages are a letter and the number 1
7. The second 26 messages are a letter and the number 2
"""

import string
import struct

def generate_alphabet_input(filename):
    "Generate a binary file with letters and votes"
    letters = list(string.ascii_lowercase)
    with open(filename, 'wb') as fp:
        for votes in range(1, 3):
            for letter in letters:
                # '>IsI' = big-endian: 4-byte length, 1-byte letter, 4-byte votes.
                # encode() gives struct's 's' code the bytes it requires on
                # Python 3 and is harmless on Python 2.
                msg = struct.pack('>IsI', 5, letter.encode('ascii'), votes)
                fp.write(msg)


#----START OF SCRIPT
if __name__ == '__main__':
    generate_alphabet_input('fake_votes.msg')

Now, run the Python script to produce a new 468-byte (= 52 * 9) binary input file called fake_votes.msg.

python generate_binary_votes.py

At the end of this step, you should have a fake_votes.msg file in your current directory.
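
Before feeding the file to Wallaroo, it can be worth decoding it once to confirm that it holds what we think it holds. The sketch below is one way to do that. The function name read_votes is an assumption of this example, and the script only reads the file if it exists in the current directory.

```python
import os
import struct

MESSAGE_FORMAT = ">IsI"
MESSAGE_SIZE = struct.calcsize(MESSAGE_FORMAT)  # 9 bytes per message


def read_votes(filename):
    """Decode every message in the file into a list of (letter, votes) tuples."""
    votes = []
    with open(filename, "rb") as fp:
        while True:
            chunk = fp.read(MESSAGE_SIZE)
            if not chunk:
                break
            if len(chunk) != MESSAGE_SIZE:
                raise ValueError("trailing partial message of %d bytes" % len(chunk))
            _length, letter, count = struct.unpack(MESSAGE_FORMAT, chunk)
            votes.append((letter.decode("ascii"), count))
    return votes


if __name__ == "__main__" and os.path.exists("fake_votes.msg"):
    decoded = read_votes("fake_votes.msg")
    print("%d messages" % len(decoded))
    print("first: %s, last: %s" % (decoded[0], decoded[-1]))
```

For the file generated above, we would expect 52 messages, starting with the letter a and one vote and ending with the letter z and two votes.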

b. Edit the application to add sleep time
Open alphabet_partitioned.py using vi. Then, add two lines:

a) import time along with the other imports
b) time.sleep(0.5) at the end of the update() method for the class TotalVotes

By doing so, we are forcing the computation to consume ~0.5 seconds per message processed.
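
For orientation, the edited class could look roughly like the sketch below. The class body and the Votes stand-in are simplified assumptions written so that the snippet runs on its own. They are not copied from alphabet_partitioned.py, so keep whatever your copy of the file already contains and add only the two marked lines.

```python
import time  # edit (a): added alongside the other imports
from collections import namedtuple

# Stand-in for the application's vote message; an assumption of this sketch.
Votes = namedtuple("Votes", ["letter", "votes"])


class TotalVotes(object):
    """Simplified stand-in for the state object that tracks one letter's total."""

    def __init__(self):
        self.letter = "X"
        self.votes = 0

    def update(self, votes):
        self.letter = votes.letter
        self.votes += votes.votes
        time.sleep(0.5)  # edit (b): simulate ~0.5 seconds of work per message
```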

c. Start the receiver
Start the receiver by executing:

data_receiver --ponythreads=1 --ponynoblock --listen $IP_Initializer:7002

Note: $IP_Initializer is the IP address you captured in the Setup section.

d. Start the application
In a new terminal prompt, enter the container:

docker exec -it wally env-setup

Then, in the prompt of the Docker container, go to the alphabet_partitioned application’s directory:

cd wallaroo/examples/python/alphabet_partitioned/

Start the application by executing:

machida --application-module alphabet_partitioned \
  --in $IP_Initializer:7010 --out $IP_Initializer:7002 \
  --metrics $IP_Initializer:5001 --control $IP_Initializer:6000 \
  --data $IP_Initializer:6001 \
  --external $IP_Initializer:5050 --cluster-initializer \
  --ponythreads=1 --ponynoblock

e. Start the sender
In a new terminal prompt, enter the container:

docker exec -it wally env-setup

Then, in the prompt of the Docker container, go to the alphabet_partitioned application’s directory:

cd wallaroo/examples/python/alphabet_partitioned/

IMPORTANT: Start a stopwatch as soon as you start the sender. We are sending 52 messages and each one will take ~0.5 seconds (our sleep time) to process. So we expect that the whole experiment will take about 26 seconds to complete. Start the sender by executing:

sender --host $IP_Initializer:7010 \
  --file fake_votes.msg --batch-size 26 --interval 10_000_000 \
  --messages 52 --binary --msg-size 9 --repeat --ponythreads=1 \
  --ponynoblock --no-write

f. Time the app
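Stop your timer as soon as you see ‘z=>3’ in the output of the receiver. This is the last result we expect, because z is the final letter in the file and its two messages add up to 1 + 2 = 3 votes. If things went well, this experiment took (approximately) 26 seconds.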
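
If you would rather not rely on a hand-held stopwatch, a small helper can do the timing. The sketch below is an untested idea rather than part of Wallaroo: it assumes the receiver's output can be piped into a Python script and arrives line by line, which output buffering may prevent. The function name seconds_until_marker is made up for this example.

```python
import time


def seconds_until_marker(lines, marker, clock=time.time):
    """Return the seconds between the first line seen and the first line
    containing `marker`, or None if the marker never appears."""
    start = None
    for line in lines:
        if start is None:
            start = clock()  # the clock starts at the first line of output
        if marker in line:
            return clock() - start
    return None
```

To use it on piped input, call seconds_until_marker(iter(sys.stdin.readline, ""), "z=>3") after importing sys. The clock starts at the first output line rather than when the sender starts, so expect a reading slightly under the stopwatch figure.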

g. Close the cluster
In the sender’s terminal prompt, execute:

cluster_shutdown $IP_Initializer:5050

At this point, the application should shut down. You can shut down the receiver by (Ctrl+c)-ing out of it. Leave the docker container running and all the terminals open since we are going to be using them in the two worker experiment too.


3. Two worker experiment

We are going to introduce a second worker. Then, we give the alphabet_partitioned application the exact same input stream as before. If state partitioning is working right, we should observe that the time it takes to process the same 52 messages is now almost halved.

a. Start the docker container of worker 2
Name the second docker container ‘roo’ (yeah, I know, imaginative!) and start it like this:

docker run --rm -it --privileged -p 4001:4000 \
  -v /tmp/wallaroo-docker/wallaroo-src:/src/wallaroo \
  -v /tmp/wallaroo-docker/python-virtualenv:/src/python-virtualenv \
  --name roo \
  wallaroo-labs-docker-wallaroolabs.bintray.io/release/wallaroo:0.5.0

Make sure to move to the alphabet_partitioned directory by running cd /src/wallaroo/examples/python/alphabet_partitioned

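To be safe, examine the code in alphabet_partitioned.py and confirm that your sleep statements are still there. Both containers mount the same source directory from /tmp/wallaroo-docker/wallaroo-src, so the edit you made on worker 1 should already be visible here.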

b. Start the receiver on worker 1
In the receiver prompt of the first worker, repeat the command:

data_receiver --ponythreads=1 --ponynoblock --listen $IP_Initializer:7002

c. Start the application on worker 1
Worker 1 (‘wally’) will be the cluster initializer. To start the application, run the following command on worker 1:

machida --application-module alphabet_partitioned \
  --in $IP_Initializer:7010 --out $IP_Initializer:7002 \
  --metrics $IP_Initializer:5001 --control $IP_Initializer:6000 \
  --data $IP_Initializer:6001 --external $IP_Initializer:5050 \
  --cluster-initializer --worker-count 2 \
  --ponythreads=1 --ponynoblock

Notice we have added a --worker-count argument telling the application that it has two workers.

d. Start the application on worker 2
Use the terminal prompt in step 3.a and run the following command on worker 2 (‘roo’):

machida --application-module alphabet_partitioned \
  --in $IP_Initializer:7010 --out $IP_Initializer:7002 \
  --metrics $IP_Initializer:5001 --control $IP_Initializer:6000 \
  --ponythreads=1 --ponynoblock --name roo

At the end of this step, the terminal output should report that the application is ready to work.

e. Start the sender on worker 1
This is very similar to step 2.e. Run the following command on worker 1.

IMPORTANT: Start a stopwatch as soon as you start the sender. We are sending 52 messages and each one will take ~0.5 seconds (our sleep time) to process. But 2 messages are going to be processed in parallel. So we expect that the whole experiment will take about 13 seconds (=52*0.5/2) to complete.
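
The arithmetic behind these estimates can be sketched as a tiny simulation. It assumes that the 26 letter partitions are spread evenly across the workers (round-robin here, which is only a stand-in for whatever assignment Wallaroo actually makes), that each worker processes its own messages one at a time, and that network overhead is negligible. The function name estimate_seconds is made up for this example.

```python
import string


def estimate_seconds(letters, worker_count, sleep_seconds=0.5):
    """Estimate wall-clock time when each worker handles its own letters serially."""
    keys = list(string.ascii_lowercase)
    # Assumption: partition keys are dealt out round-robin across the workers.
    owner = dict((key, index % worker_count) for index, key in enumerate(keys))
    load = [0] * worker_count
    for letter in letters:
        load[owner[letter]] += 1
    # Workers run in parallel, so the busiest worker determines the total time.
    return max(load) * sleep_seconds


if __name__ == "__main__":
    stream = list(string.ascii_lowercase) * 2  # the 52 messages in fake_votes.msg
    print(estimate_seconds(stream, 1))  # 26.0
    print(estimate_seconds(stream, 2))  # 13.0
```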

In the sender’s terminal prompt, execute:

sender --host $IP_Initializer:7010 \
  --file fake_votes.msg --batch-size 26 --interval 10_000_000 \
  --messages 52 --binary --msg-size 9 --repeat --ponythreads=1 \
  --ponynoblock --no-write

f. Time the app
Stop your timer as soon as you see ‘z=>3’ in the output of the receiver. If things went well, this experiment took (approximately) 13 seconds.

g. Close the cluster
In the sender’s terminal prompt, execute:

cluster_shutdown $IP_Initializer:5050

At this point, the application should shut down. You can shut down the receiver by (Ctrl+c)-ing out of it.


4. An exercise

To further solidify your understanding through experimentation, try the same experiment with an input file that contains just one message, the letter a with a vote count of 1, repeated 52 times in the same binary format. What do you expect? Is state partitioning going to help reduce the total time taken to process the messages?
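
A possible generator for that input file is sketched below. The function name and the output file name fake_votes_single.msg are made up for this example. The message layout is the same 9-byte format used earlier in the post.

```python
import struct


def generate_single_letter_input(filename, letter="a", votes=1, count=52):
    """Write `count` identical 9-byte messages, all for the same letter."""
    # 4-byte length header (5), 1-byte letter, 4-byte vote count, big-endian.
    message = struct.pack(">IsI", 5, letter.encode("ascii"), votes)
    with open(filename, "wb") as fp:
        for _ in range(count):
            fp.write(message)


if __name__ == "__main__":
    generate_single_letter_input("fake_votes_single.msg")
```

Pass the new file name to the sender's --file option. Since only the total for a changes with this input, the line to watch for in the receiver's output will no longer be ‘z=>3’.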


References

1. Wallaroo Partitioning
2. Stateful multi-stream processing in Python with Wallaroo

