
Pandarize Your Spark Dataframes

In the last blog post I gave you an overview of our Data
Science stack based on Python. This time let’s focus on one important component: DataFrames.

DataFrames are a great abstraction for working with structured and semi-structured data. They are basically a collection of rows, organized into named columns. Think of relational
database tables: DataFrames are very similar and allow you to do similar operations on them:

slice data: select subset of rows or columns based on conditions (filters)
sort data by one or more columns
aggregate data and compute summary statistics
join multiple DataFrames

What makes them much more powerful than SQL is the fact that this nice, SQL-like API is actually exposed in a full-fledged programming language, which means we can mix declarative SQL-like operations with arbitrary code written in a general-purpose programming language.

DataFrames were popularized by R and then adopted by other languages and frameworks. For Python we have pandas, a great data analysis library, where DataFrame is one of the key abstractions.


PANDAS LIMITATIONS AND SPARK DATAFRAMES

Pandas won’t work in every case. It is a single machine tool, so we’re constrained by single machine limits. Moreover, pandas doesn’t have any parallelism built in, which means it uses only one CPU core.

It’s likely that you’ll hit a wall even on medium datasets (tens of gigabytes). And if you want to work with even bigger datasets, pandas won’t help you.

Fortunately, a few months ago the Spark community released a new version of Spark with DataFrame support. They have a very similar API, but are designed from the ground up to support big data.

There is a lot of cool engineering behind Spark DataFrames, such as code generation, manual memory management and the Catalyst optimizer. But let’s forget about them and focus on usability.

How hard is it to use Spark DataFrames if you know pandas?
Are there any quirks?
Is any important functionality missing?

We’ll compare pandas and Spark DataFrames on a few examples, and then fix some of the pain points we’ve noticed.

Let’s get started!

Note: This blog-post is based on Spark 1.3.1 and pandas 0.16.1


COMPARISON: THE GOOD PART


READING THE DATA

Pandas:
>>> data = pd.read_csv("/data/adult-dataset-data.csv")
>>> data.head(1)
Out:
age  final_weight   workclass  education_num
0   39         77516   State-gov             13


Spark:
>>> data = sqlc.load("s3://data/adult-dataset-data.csv", "com.databricks.spark.csv")

>>> data.take(1)
Out:
[Row(age=45.0, final_weight=160962.0, workclass=u' Self-emp-not-inc', education_num=10.0)]


Both pandas and Spark DataFrames can easily read multiple formats including CSV, JSON, and some binary formats (some of them require additional libraries).

Note that a Spark DataFrame doesn’t have an index. It doesn’t enumerate rows (which is the default index in pandas). In pandas the index is just a special column, so if we really need one, we should pick one of the columns of the Spark DataFrame to act as an ‘index’.
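If you really do need row numbers, one possible workaround (just a sketch, not from the original examples, using the plain RDD API) is to zip every row with its position:

>>> data.rdd.zipWithIndex().take(1)
Out:
[(Row(age=45.0, final_weight=160962.0, workclass=u' Self-emp-not-inc', education_num=10.0), 0)]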


SLICING

Pandas:
>>> sliced = data[data.workclass.isin([' Local-gov', ' State-gov']) \
                  & (data.education_num > 1)][['age', 'workclass']]

>>> sliced.head(1)
Out:
age   workclass
0   39   State-gov


Spark:
>>> slicedSpark = dataSpark[dataSpark.workclass.inSet([' Local-gov', ' State-gov'])
                            & (dataSpark.education_num > 1)][['age', 'workclass']]

>>> slicedSpark.take(1)
Out:
[Row(age=48.0, workclass=u' State-gov')]


As you can see, they are very similar if you work on a single DataFrame.

There is one important difference. In pandas, boolean slicing expects just a boolean series, which means you can apply a filter built from another DataFrame, as long as it matches in length. In Spark you can only filter data based on columns of the DataFrame you want to filter.
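For example, in pandas nothing stops us from building the mask on a completely separate (here: made-up) DataFrame of the same length:

>>> labels = pd.DataFrame({'is_gov': data.workclass.str.contains('gov')})
>>> data[labels.is_gov][['age', 'workclass']].head(1)
Out:
   age   workclass
0   39   State-gov

In Spark the equivalent mask would have to be expressed as a column of dataSpark itself (or joined in first).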


AGGREGATIONS

Simple value counts:

Pandas:
>>> data.groupby('workclass').workclass.count()
# or shortcut
>>> data.workclass.value_counts()
Out:
workclass
?                    1836
Federal-gov           960
...


Spark:
>>> dataSpark.groupBy('workclass').count().collect()
Out:
[Row(workclass=u' Self-emp-not-inc', count=2541),
Row(workclass=u' Federal-gov', count=960),
Row(workclass=u' ?', count=1836),
...]


Multiple aggregations:

Pandas:
>>> data.groupby('workclass').agg({'final_weight': 'sum', 'age': 'mean'})
Out:
age  final_weight
workclass
?                 40.960240     346115997
Federal-gov       42.590625     177812394
Local-gov         41.751075     394822919
...


Spark:
>>> dataSpark.groupBy('workclass').agg({'final_weight': 'sum', 'age': 'mean'}).collect()

Out:
[Row(workclass=u' Self-emp-not-inc', AVG(age)=44.96969696969697, SUM(final_weight)=446221558.0),
Row(workclass=u' Federal-gov', AVG(age)=42.590625, SUM(final_weight)=177812394.0),
...]


Again, the syntax is very similar and you can easily compute the standard aggregations. Unfortunately, Spark DataFrames currently don’t support custom aggregation functions, so you can use only a handful of built-ins. It’s still possible to aggregate data in a custom way (using a Hive UDAF or dropping down to the raw RDD), but it’s less convenient and less performant.
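For instance, a per-group median (which none of the built-ins cover) could be computed with a rough sketch like this, dropping down to the raw RDD (the median helper below is ours, not part of Spark or the original examples):

def median(values):
    s = sorted(values)
    n = len(s)
    return s[n // 2] if n % 2 else (s[n // 2 - 1] + s[n // 2]) / 2.0

>>> pairs = dataSpark.rdd.map(lambda row: (row.workclass, row.age))
>>> pairs.groupByKey().mapValues(lambda ages: median(list(ages))).collect()

It works, but compared to a one-line agg it’s clearly more effort, and groupByKey has to shuffle all the values around.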


MAPPING

Let’s define a custom function:
def f(workclass, final_weight):
    if "gov" in workclass.lower():
        return final_weight * 2.0
    else:
        return final_weight


Pandas:
>>> pandasF = lambda row: f(row.workclass, row.final_weight)

>>> data.apply(pandasF, axis=1)
Out:
0        155032
1         83311
...


Spark:
>>> sparkF = pyspark.sql.functions.udf(f, pyspark.sql.types.DoubleType())

>>> dataSpark.select(
        sparkF(dataSpark.workclass, dataSpark.final_weight).alias('result')
    ).collect()
Out:
[Row(result=155032.0),
Row(result=83311.0),
...]


As you can see, applying a custom function to one or more columns is very easy in both cases; we were even able to reuse the same function, just wrapped up differently for pandas and Spark. A slight inconvenience with Spark UDFs is that they require us to specify the function’s return type upfront.
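If you prefer to keep all the original columns and just bolt the result on as a new one, withColumn works too (a small variation on the example above; the column name here is just something we picked):

>>> dataSpark.withColumn('adjusted_weight',
        sparkF(dataSpark.workclass, dataSpark.final_weight)).take(1)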


COMPARISON: THE UGLY PART

So far so good, Spark and pandas are very similar and equally easy to use. Spark DataFrames, although much simpler to use than any other Big Data tool, are still a young part of the Spark ecosystem and there are some rough edges. Let’s see a few examples.


JOINS

Let’s define two very simple DataFrames for the join example.
>>> pandasA = pd.DataFrame([
        [1, "te", 1],
        [2, "pandas", 4]],
        columns=['colX', 'colY', 'colW'])

>>> pandasB = pd.DataFrame([
        [3.0, "st", 2],
        [4.0, "spark", 3]],
        columns=['colY', 'colZ', 'colX'])

>>> sparkA = sqlc.createDataFrame(pandasA)
>>> sparkB = sqlc.createDataFrame(pandasB)


Pandas has super-easy join syntax with its merge method:

Pandas:
>>> pandasA.merge(pandasB, on='colX', suffixes=('_A', '_B'), how='left')
Out:
colX  colY_A  colW  colY_B colZ
0     1      te     1     NaN  NaN
1     2  pandas     4       3   st


Pandas offers a few useful features:

you specify join key instead of equality condition
it automatically adds suffixes to common columns
it keeps only one copy of the join key

Now let’s see how it looks in Spark.

Spark:
>>> joined = sparkA.join(sparkB, sparkA.colX == sparkB.colX, 'left_outer')
>>> joined.toPandas()
Out:
colX    colY  colW  colY  colZ  colX
0     1      te     1   NaN  None   NaN
1     2  pandas     4     3    st     2


As you can see, it requires the whole equality condition, keeps two copies of the join key and doesn’t add suffixes. It’s problematic, because now we can’t use the df.col notation to select the columns. It’s even more confusing if you use collect instead of toPandas, because it seems that the second column with the same name overrides the first one (it doesn’t really do that, but it’s very confusing).

Spark:
>>> joined.collect()
Out:
[Row(colX=None, colY=None, colW=1, colY=None, colZ=None, colX=None),
Row(colX=2, colY=3.0, colW=4, colY=3.0, colZ=u'st', colX=2)]
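One thing that helps a little: as long as you keep the original sparkA and sparkB around, their Column objects can still be used to pick a specific side after the join (a small workaround, not a full substitute for suffixes):

>>> joined.select(sparkA.colX, sparkA.colY, sparkB.colY, sparkB.colZ).take(1)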


In order to get the same effect as in pandas, we need to do something like this:

Spark:
>>> sparkARenamed = sparkA \
        .withColumnRenamed('colY', 'colY_A')
>>> sparkBRenamed = sparkB \
        .withColumnRenamed('colX', 'colX_B') \
        .withColumnRenamed('colY', 'colY_B')

>>> sparkARenamed.join(sparkBRenamed, sparkARenamed.colX == sparkBRenamed.colX_B, 'left_outer') \
        .select('colX', 'colY_A', 'colW', 'colY_B', 'colZ') \
        .toPandas()
Out:

colX  colY_A  colW  colY_B  colZ
0     1      te     1     NaN  None
1     2  pandas     4       3    st


Ugh, so ugly!


UNIONS

Now let’s try to concat / union DataFrames:

Pandas:
>>> pd.concat([pandasA, pandasB])
Out:
colW  colX    colY   colZ
0     1     1      te    NaN
1     4     2  pandas    NaN
0   NaN     2       3     st
1   NaN     3       4  spark


It looks reasonable. Pandas matched the columns from both DataFrames and filled the missing cells with NaNs.

Spark:
>>> sparkA.unionAll(sparkB).collect()
Out:
[Row(colX=1.0, colY=u'te', colW=1),
Row(colX=2.0, colY=u'pandas', colW=4),
Row(colX=3.0, colY=u'st', colW=2),
Row(colX=4.0, colY=u'spark', colW=3)]


Well, clearly something is wrong here. Spark’s union is very limited. It only works if the DataFrames have exactly the same schema (including the order of columns), but it doesn’t throw any error, so it’s very easy to fall into this trap.


FIXING THE UGLY PART

As you can see, there are some ugly parts, but can we do something about it?

Well, it looks like it’s just a matter of the input data, so let’s write a wrapper function which will simulate a pandas-like concat on Spark DataFrames.

Spark:
from functools import reduce   # built-in in Python 2, needed on Python 3

def addEmptyColumns(df, colNames):
    exprs = df.columns + ["null as " + colName for colName in colNames]
    return df.selectExpr(*exprs)

def concatTwoDfs(left, right):
    # append columns from right df to left df
    missingColumnsLeft = set(right.columns) - set(left.columns)
    left = addEmptyColumns(left, missingColumnsLeft)

    # append columns from left df to right df
    missingColumnsRight = set(left.columns) - set(right.columns)
    right = addEmptyColumns(right, missingColumnsRight)

    # let's set the same order of columns
    right = right[left.columns]

    # finally, union them
    return left.unionAll(right)

def concat(dfs):
    return reduce(concatTwoDfs, dfs)


Now we can concat those DataFrames in pandas style:

Spark:
>>> concat([sparkA, sparkB]).collect()
Out:
[Row(colX=1, colY=u'te', colW=1, colZ=None),
Row(colX=2, colY=u'pandas', colW=4, colZ=None),
Row(colX=2, colY=u'3.0', colW=None, colZ=u'st'),
Row(colX=3, colY=u'4.0', colW=None, colZ=u'spark')]


You can write a pandas-like merge as an exercise.
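If you want a starting point, here is one rough sketch of such a wrapper (our own take, built on the same renaming trick as above; it only handles a single join key):

def merge(left, right, on, suffixes=('_A', '_B'), how='left_outer'):
    # rename overlapping non-key columns, pandas-style
    common = (set(left.columns) & set(right.columns)) - {on}
    for col in common:
        left = left.withColumnRenamed(col, col + suffixes[0])
        right = right.withColumnRenamed(col, col + suffixes[1])

    # rename the key on the right side so it can be dropped after the join
    rightKey = on + '_right'
    right = right.withColumnRenamed(on, rightKey)

    joined = left.join(right, left[on] == right[rightKey], how)
    return joined.select(*[c for c in joined.columns if c != rightKey])

>>> merge(sparkA, sparkB, on='colX').toPandas()

which should give the same frame as the manual renaming above.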


SUMMARY

I only covered a small subset of pandas & Spark DataFrames functionality, but I hope it gives you a good picture.

Spark DataFrames in their current state are already powerful and easy to use. However, there are some pain points. They still need to catch up to the usability of pandas – in some cases the API is pretty raw and makes unintuitive assumptions. Additionally, some important pieces of functionality, like custom aggregations, are missing. Fortunately these are already on the Spark roadmap, so future releases will likely improve the situation.

On the other hand, it’s very easy to extend basic functionality with custom wrappers. We started using Spark several months ago and we’ve been able to fix almost all of the pain points we’ve encountered.

If you use pandas and want to work on bigger datasets, go for PySpark and DataFrames!

Cover photo by Elizabeth Haslam, licensed under the Attribution-NonCommercial 2.0 Generic License.

Original article: https://lab.getbase.com/pandarize-spark-dataframes/