Understanding the Parallelism of a Storm Topology

Oct 16th, 2012

Table of Contents

What is Storm?

What makes a running topology: worker processes, executors and tasks

Configuring the parallelism of a topology

Configuring parallelism on multi-tenant Storm clusters

Example of a running topology

How to change the parallelism of a running topology

References for this article

Summary

In the past few days I have been test-driving Twitter’s Storm project, which is a distributed real-time data processing platform. One of my findings so far has been that the quality of Storm’s documentation and example code is pretty good – it is very easy to get up and running with Storm. Big props to the Storm developers! At the same time, I found the sections on how a Storm topology runs in a cluster not perfectly clear, and learned that the recent releases of Storm changed some of its behavior in a way that is not yet fully reflected in the Storm wiki and in the API docs.

In this article I want to share my own understanding of the parallelism of a Storm topology after reading the documentation and writing some first prototype code. More specifically, I describe the relationships of worker processes, executors (threads) and tasks, and how you can configure them according to your needs. This article is based on Storm release 0.8.1, the latest version as of October 2012.

Update 2012-11-05: This blog post has been merged into Storm’s documentation.

What is Storm?

For those readers unfamiliar with Storm here is a brief description taken from its homepage:

Storm is a free and open source distributed realtime computation system. Storm makes it easy to reliably process unbounded streams of data, doing for realtime processing what Hadoop did for batch processing. Storm is simple, can be used with any programming language, and is a lot of fun to use!

Storm has many use cases: realtime analytics, online machine learning, continuous computation, distributed RPC, ETL, and more. Storm is fast: a benchmark clocked it at over a million tuples processed per second per node. It is scalable, fault-tolerant, guarantees your data will be processed, and is easy to set up and operate.

What makes a running topology: worker processes, executors and tasks

Storm distinguishes between the following three main entities that are used to actually run a topology in a Storm cluster:

Worker processes

Executors (threads)

Tasks

Here is a simple illustration of their relationships:



Figure 1: The relationships of worker processes, executors (threads) and tasks in Storm

A worker process executes a subset of a topology, and runs in its own JVM. A worker process belongs to a specific topology and may run one or more executors for one or more components (spouts or bolts) of this topology. A running topology consists of many such processes running on many machines within a Storm cluster.

An executor is a thread that is spawned by a worker process and runs within the worker’s JVM. An executor may run one or more tasks for the same component (spout or bolt). An executor always has one thread that it uses for all of its tasks, which means that tasks run serially on an executor.

A task performs the actual data processing and is run within its parent executor’s thread of execution. Each spout or bolt that you implement in your code executes as many tasks across the cluster. The number of tasks for a component is always the same throughout the lifetime of a topology, but the number of executors (threads) for a component can change over time. This means that the following condition holds true: #threads <= #tasks. By default, the number of tasks is set to be the same as the number of executors, i.e. Storm will run one task per thread (which is usually what you want anyways).

Also be aware that:

The number of executor threads can be changed after the topology has been started (see the storm rebalance command below).

The number of tasks of a topology is static.

See Understanding the Internal Message Buffers of Storm for another view on the various threads that are running within the lifetime of a worker process and its associated executors and tasks.
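To make the difference between executors and tasks a bit more tangible, here is a minimal sketch of my own (not taken from the Storm docs) of a bolt that logs, in its prepare() method, which task it is and how many tasks Storm created for its component. Since prepare() is called once per task, configuring e.g. two executors and four tasks for this bolt should produce four such log lines spread across two threads.

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

// A bolt that reports its own task id and the total number of tasks of its component.
public class TaskAwareBolt extends BaseRichBolt {

  private OutputCollector collector;

  @Override
  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
    // prepare() is invoked once per task instance, not once per executor.
    System.out.println("Task " + context.getThisTaskId()
        + " of component '" + context.getThisComponentId() + "'; this component runs "
        + context.getComponentTasks(context.getThisComponentId()).size() + " tasks in total.");
  }

  @Override
  public void execute(Tuple tuple) {
    collector.ack(tuple); // no real processing in this sketch
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // this bolt does not emit anything
  }
}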

Configuring the parallelism of a topology

Note that in Storm’s terminology “parallelism” is specifically used to describe the so-called parallelism hint, which means the initial number of executors (threads) of a component. In this article though I use the term “parallelism” in a more general sense to describe how you can configure not only the number of executors but also the number of worker processes and the number of tasks of a Storm topology. I will specifically call out when “parallelism” is used in the narrow definition of Storm.

The following table gives an overview of the various configuration options and how to set them in your code. There is more than one way of setting these options though, and the table lists only some of them. Storm currently has the following order of precedence for configuration settings: defaults.yaml < storm.yaml < topology-specific configuration < internal component-specific configuration < external component-specific configuration. Please take a look at the Storm documentation for more details.

#worker processes
  Description: How many worker processes to create for the topology across machines in the cluster.
  Configuration option: TOPOLOGY_WORKERS
  How to set in your code (examples): Config#setNumWorkers

#executors (threads)
  Description: How many executors to spawn per component.
  Configuration option: ?
  How to set in your code (examples): TopologyBuilder#setSpout() and TopologyBuilder#setBolt(). Note that as of Storm 0.8 the parallelism_hint parameter now specifies the initial number of executors (not tasks!) for that bolt.

#tasks
  Description: How many tasks to create per component.
  Configuration option: TOPOLOGY_TASKS
  How to set in your code (examples): ComponentConfigurationDeclarer#setNumTasks()
Here is an example code snippet to show these settings in practice:

Configuring the parallelism of a Storm bolt

topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
               .setNumTasks(4)
               .shuffleGrouping("blue-spout");
In the above code we configured Storm to run the bolt GreenBolt with an initial number of two executors and four associated tasks. Storm will run two tasks per executor (thread). If you do not explicitly configure the number of tasks, Storm will run by default one task per executor.

Configuring parallelism on multi-tenant Storm clusters

Storm 0.8.2 introduced the Isolation Scheduler that makes it easy and safe to share a cluster among many topologies, i.e. it solves the multi-tenancy problem (avoiding resource contention between topologies) by providing full isolation between topologies.

When you use the isolation scheduler Nathan recommends you set num workers to a multiple of number of machines. And parallelism hint to a multiple of the number of workers. If you do call setNumTasks() (which most people don’t), you should set that to a multiple of the parallelism hint. If you do this, then what happens is your workload is uniform distributed. Each machine and jvm process will have the same number of threads, and roughly the same amount of work.

Jason Jackson on storm-user
grokbase.com/t/gg/storm-user/…
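To make this rule of thumb concrete, here is a sketch of my own; the numbers and the component classes InputSpout and HeavyBolt are purely hypothetical placeholders. Assuming a dedicated slice of four machines, you could use 8 workers (two per machine), a parallelism hint of 16 for the bolt (two executors per worker) and 32 tasks (two tasks per executor).

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class SizedTopology {

  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    // InputSpout and HeavyBolt are hypothetical stand-ins for your own components.
    builder.setSpout("input-spout", new InputSpout(), 8);   // one executor per worker
    builder.setBolt("heavy-bolt", new HeavyBolt(), 16)      // two executors per worker
           .setNumTasks(32)                                 // two tasks per executor
           .shuffleGrouping("input-spout");

    Config conf = new Config();
    conf.setNumWorkers(8); // two worker processes per machine on a 4-machine slice

    StormSubmitter.submitTopology("sized-topology", conf, builder.createTopology());
  }
}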

Example of a running topology

The following illustration shows how a simple topology looks in operation. The topology consists of three components: one spout called BlueSpout and two bolts called GreenBolt and YellowBolt. The components are linked such that BlueSpout sends its output to GreenBolt, which in turn sends its own output to YellowBolt.



Figure 2: Example of a running topology

The GreenBolt was configured as per the code snippet above whereas BlueSpout and YellowBolt only set the parallelism hint (number of executors). Here is the relevant code:

Configuring the parallelism of a simple Storm topology

Config conf = new Config();
conf.setNumWorkers(2); // use two worker processes

topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // parallelism hint

topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
               .setNumTasks(4)
               .shuffleGrouping("blue-spout");

topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
               .shuffleGrouping("green-bolt");

StormSubmitter.submitTopology(
    "mytopology",
    conf,
    topologyBuilder.createTopology()
);
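If you add up the numbers in this snippet, the topology runs across two worker processes with a total of 2 + 2 + 6 = 10 executors (threads) and 2 + 4 + 6 = 12 tasks, i.e. on average five executor threads per worker process.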
And of course Storm comes with additional configuration settings to control the parallelism of a topology, including:

TOPOLOGY_MAX_TASK_PARALLELISM: This setting puts a ceiling on the number of executors that can be spawned for a single component. It is typically used during testing to limit the number of threads spawned when running a topology in local mode. You can set this option via e.g. Config#setMaxTaskParallelism().
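As an illustration, a minimal local-mode test that caps the parallelism might look like the following sketch; the cap of 3, the topology name and the ten-second run time are arbitrary examples of mine, and BlueSpout stands in for whatever spout you are testing.

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

public class LocalParallelismTest {

  public static void main(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("blue-spout", new BlueSpout(), 10); // parallelism hint of 10 ...

    Config conf = new Config();
    conf.setMaxTaskParallelism(3); // ... but at most 3 executors per component in this test

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("test-topology", conf, builder.createTopology());

    Utils.sleep(10000); // let the topology run for a while
    cluster.killTopology("test-topology");
    cluster.shutdown();
  }
}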

Update Oct 18: Nathan Marz informed me that TOPOLOGY_OPTIMIZE will be removed in a future release. I have therefore removed its entry from the configuration list above.

How to change the parallelism of a running topology

A nifty feature of Storm is that you can increase or decrease the number of worker processes and/or executors without being required to restart the cluster or the topology. The act of doing so is called rebalancing.

You have two options to rebalance a topology:

Use the Storm web UI to rebalance the topology.

Use the CLI tool storm rebalance as described below.

Here is an example of using the CLI tool:

# Reconfigure the topology "mytopology" to use 5 worker processes,
# the spout "blue-spout" to use 3 executors and
# the bolt "yellow-bolt" to use 10 executors.

$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
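If I read the CLI documentation correctly, storm rebalance also accepts an optional -w flag that specifies how many seconds Storm should wait, with the topology deactivated, before it redistributes the workers.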

References for this article

To compile this article (and to write my related test code) I used information primarily from the following sources:

The Storm wiki, most notably the pages Concepts, Running topologies on a production cluster, Local mode, and Tutorial

Storm 0.8.1 API documentation, most notably the class Config

The announcement of the Storm 0.8.0 release on the storm-user mailing list.

Summary

My personal impression is that Storm is a very promising tool. On the one hand I like its clean and elegant design, and on the other hand I loved to find out that a young open source tool can still have excellent documentation. In this article I tried to summarize my own understanding of the parallelism of topologies, which may or may not be 100% correct – feel free to let me know if there are any mistakes in the description above!

Interested in more? You can subscribe to this blog, or follow me on Twitter.

Posted by Michael G. Noll Oct 16th, 2012
Filed under Programming, Real-time, Storm