python | Large-scale data storage and reading, parallel computing: a brief overview of the Dask library
2017-09-20
Dask's data structures closely mirror pandas, so they are easy to pick up.
Official documentation: http://dask.pydata.org/en/latest/index.html
GitHub: https://github.com/dask
Dask covers a lot of ground; below I highlight the parts I find most promising.
I. Reading and storing data
First, let's see what Dask can read.

1. CSV

Note that Dask cannot read Excel files directly.

```python
# pandas
import pandas as pd
df = pd.read_csv('2015-01-01.csv')
df.groupby(df.user_id).value.mean()

# dask
import dask.dataframe as dd
df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean().compute()
```

The two APIs are almost identical; the difference is the final `.compute()`, which triggers the lazy computation.
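For intuition, here is a minimal pure-Python sketch (no Dask required) of what a globbed, partitioned groupby-mean does under the hood: process one file at a time, keep only running sums and counts, and combine at the end. The function name and column names are illustrative, not part of any library.

```python
import csv
import glob
from collections import defaultdict

def grouped_mean(pattern, key, value):
    """Mean of `value` per `key` across every CSV matching `pattern`,
    streaming row by row so no file is held fully in memory."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    for path in sorted(glob.glob(pattern)):
        with open(path, newline='') as f:
            for row in csv.DictReader(f):
                sums[row[key]] += float(row[value])
                counts[row[key]] += 1
    return {k: sums[k] / counts[k] for k in sums}
```

Dask does essentially this per partition, then combines the partial sums and counts, which is why the groupby-mean scales to files larger than memory.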
2. Dask Array: reading HDF5

```python
import numpy as np
import h5py   # needed, but missing from the original snippet
import dask.array as da

f = h5py.File('myfile.hdf5')

# NumPy: loads the dataset fully into memory
x = np.array(f['/small-data'])
x - x.mean(axis=1)

# Dask: builds a lazy, chunked array over the on-disk data
x = da.from_array(f['/big-data'], chunks=(1000, 1000))
(x - x.mean(axis=1)).compute()
```

The first half is plain NumPy, the second half is Dask.
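The `chunks=(1000, 1000)` argument is the key idea: the array is split into blocks and each operation runs block by block. A toy stdlib sketch of chunked row-wise processing (sizes and function names are illustrative):

```python
def iter_chunks(rows, chunk_size):
    """Yield successive blocks of rows, mimicking Dask's chunking."""
    for i in range(0, len(rows), chunk_size):
        yield rows[i:i + chunk_size]

def center_rows(rows, chunk_size=1000):
    """Subtract each row's mean from the row, one chunk at a time,
    so only `chunk_size` rows need to be resident at once."""
    out = []
    for chunk in iter_chunks(rows, chunk_size):
        for row in chunk:
            m = sum(row) / len(row)
            out.append([v - m for v in row])
    return out
```

Real Dask goes further by running independent chunks in parallel, but the memory story is the same: work never needs more than a few chunks at a time.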
3. Dask Bag

```python
import json
import dask.bag as db

b = db.read_text('2015-*-*.json.gz').map(json.loads)
b.pluck('name').frequencies().topk(10, lambda pair: pair[1]).compute()
```

Reading large-scale JSON is easy; hundreds of millions of records are no problem.

```python
>>> b = db.read_text('myfile.txt')
>>> b = db.read_text(['myfile.1.txt', 'myfile.2.txt', ...])
>>> b = db.read_text('myfile.*.txt')
```

Reading text files.

```python
>>> import dask.bag as db
>>> b = db.from_sequence([{'name': 'Alice',   'balance': 100},
...                       {'name': 'Bob',     'balance': 200},
...                       {'name': 'Charlie', 'balance': 300}],
...                      npartitions=2)
>>> df = b.to_dataframe()
```

Converting a Bag into a DataFrame.
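To make the Bag pipeline above concrete, here is what each stage corresponds to in plain Python (a sequential sketch; `topk_names` is my own helper name, not a Dask API):

```python
import json
from collections import Counter

def topk_names(lines, k):
    """pluck('name') -> frequencies() -> topk(k), in plain Python."""
    records = (json.loads(line) for line in lines)  # .map(json.loads)
    names = (rec['name'] for rec in records)        # .pluck('name')
    freqs = Counter(names)                          # .frequencies()
    return freqs.most_common(k)                     # .topk(k, by count)
```

The Bag version distributes exactly this pipeline across partitions and merges the per-partition counters at the end.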
4. Dask Delayed: parallel computation

```python
from dask import delayed

L = []
for fn in filenames:                  # use ordinary for loops to build up the computation
    data = delayed(load)(fn)          # delay execution of the function
    L.append(delayed(process)(data))  # build connections between variables

result = delayed(summarize)(L)
result.compute()
```
5. Custom tasks with a concurrent.futures-style API

```python
from dask.distributed import Client

client = Client('scheduler:port')

futures = []
for fn in filenames:
    future = client.submit(load, fn)
    futures.append(future)

summary = client.submit(summarize, futures)
summary.result()
```
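The `Client.submit` interface deliberately mirrors the standard library's `concurrent.futures`, so the same pattern runs on one machine with no Dask at all. A runnable sketch, where `load` and `summarize` are placeholder functions of my own:

```python
from concurrent.futures import ThreadPoolExecutor

def load(fn):                 # hypothetical loader: here just the name length
    return len(fn)

def summarize(values):
    return sum(values)

filenames = ['a.csv', 'bb.csv', 'ccc.csv']

with ThreadPoolExecutor(max_workers=4) as ex:
    futures = [ex.submit(load, fn) for fn in filenames]
    # unlike dask.distributed, stdlib futures must be resolved locally
    # before being handed to the next stage
    summary = summarize(f.result() for f in futures)

print(summary)  # 5 + 6 + 7 = 18
```

With `dask.distributed` the futures themselves can be passed to `client.submit`, so `summarize` runs on the cluster without pulling intermediate results back first.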
II. The Delayed parallel-computing module

A warm-up example; first the original, serial version:

```python
def inc(x):
    return x + 1

def double(x):
    return x * 2

def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = sum(output)
```

Now the version accelerated with delayed:

```python
from dask import delayed

output = []
for x in data:
    a = delayed(inc)(x)
    b = delayed(double)(x)
    c = delayed(add)(a, b)
    output.append(c)

total = delayed(sum)(output)
```

![](http://dask.pydata.org/en/latest/_images/inc-add.svg)

You can also visualize the computation graph:

```python
total.visualize()  # renders the task graph
```

![](http://dask.pydata.org/en/latest/_images/delayed-inc-double-add.svg)
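To see why nothing runs until `.compute()`, here is a toy stand-in for `delayed`; this is my own illustration, not Dask's actual implementation, which also deduplicates work and schedules independent branches in parallel:

```python
class Delayed:
    """Toy stand-in for dask.delayed: record a function call instead of
    running it, and only evaluate when .compute() is called."""
    def __init__(self, fn, args):
        self.fn = fn
        self.args = args

    def compute(self):
        return self.fn(*[_resolve(a) for a in self.args])

def _resolve(a):
    # walk dependencies depth-first; lists may contain Delayed nodes
    if isinstance(a, Delayed):
        return a.compute()
    if isinstance(a, list):
        return [_resolve(x) for x in a]
    return a

def delayed(fn):
    return lambda *args: Delayed(fn, args)

def inc(x):    return x + 1
def double(x): return x * 2
def add(x, y): return x + y

data = [1, 2, 3, 4, 5]
output = []
for x in data:
    a = delayed(inc)(x)
    b = delayed(double)(x)
    c = delayed(add)(a, b)
    output.append(c)

total = delayed(sum)(output)   # still nothing has run
print(total.compute())         # → 50
```

Building the graph is cheap; only `compute()` walks it, which is what lets Dask reorder and parallelize the work first.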
III. Parallel algorithms alongside scikit-learn

- Generalized linear models (GLM): https://github.com/dask/dask-glm
- TensorFlow deep learning: Dask-Tensorflow

Let's take XGBoost as the example; official repo: https://github.com/dask/dask-xgboost

Here is a worked example.
1. Load the data

```python
import dask.dataframe as dd
from dask import persist
from dask.distributed import progress   # assumes a distributed Client is already running

# subset of the columns to use
cols = ['Year', 'Month', 'DayOfWeek', 'Distance',
        'DepDelay', 'CRSDepTime', 'UniqueCarrier', 'Origin', 'Dest']

# create the dataframe
df = dd.read_csv('s3://dask-data/airline-data/20*.csv',
                 usecols=cols, storage_options={'anon': True})

df = df.sample(frac=0.2)   # we blow out RAM otherwise

is_delayed = (df.DepDelay.fillna(16) > 15)            # label: departure delayed by more than 15 min
df['CRSDepTime'] = df['CRSDepTime'].clip(upper=2399)
del df['DepDelay']

df, is_delayed = persist(df, is_delayed)
progress(df, is_delayed)
```
2. One-hot encoding

```python
df2 = dd.get_dummies(df.categorize()).persist()
```
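For readers new to the idea, one-hot encoding turns a categorical column into one 0/1 column per distinct category. A minimal sketch of the transformation itself (`one_hot` is my own helper, not the pandas/Dask API):

```python
def one_hot(values):
    """Toy one-hot encoding: one 0/1 column per distinct category,
    columns ordered alphabetically."""
    cats = sorted(set(values))
    return cats, [[1 if v == c else 0 for c in cats] for v in values]
```

The `df.categorize()` call matters in Dask because the set of categories must be known globally, across all partitions, before the dummy columns can be created.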
3. Prepare the train/test split, then train

```python
data_train, data_test = df2.random_split([0.9, 0.1], random_state=1234)
labels_train, labels_test = is_delayed.random_split([0.9, 0.1], random_state=1234)
```

Training:

```python
import dask_xgboost as dxgb

params = {'objective': 'binary:logistic', 'nround': 1000,
          'max_depth': 16, 'eta': 0.01,
          'subsample': 0.5, 'min_child_weight': 1}

bst = dxgb.train(client, params, data_train, labels_train)
bst
```
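Note that both `random_split` calls use the same `random_state`, so rows and labels land in matching halves. A toy sketch of the mechanism under that assumption (my own simplified version, not Dask's implementation):

```python
import random

def random_split(rows, fractions, random_state=None):
    """Toy version of random_split: each row goes to one output part
    with probability given by `fractions` (which must sum to 1)."""
    rng = random.Random(random_state)
    bounds, acc = [], 0.0
    for f in fractions:
        acc += f
        bounds.append(acc)          # e.g. [0.9, 1.0] for [0.9, 0.1]
    parts = [[] for _ in fractions]
    for row in rows:
        r = rng.random()
        for i, b in enumerate(bounds):
            if r < b:
                parts[i].append(row)
                break
    return parts
```

Because the assignment is driven only by the seeded random stream, repeating the call with the same seed reproduces the same split, which is exactly why the data/label split above stays aligned.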
4. Prediction

```python
# use the trained model as a normal XGBoost model with plain pandas data
import xgboost as xgb
dtest = xgb.DMatrix(data_test.head())
bst.predict(dtest)
```

Or predict in parallel across the cluster:

```python
predictions = dxgb.predict(client, bst, data_test).persist()
predictions.head()
```
5. Model evaluation

```python
from sklearn.metrics import roc_auc_score, roc_curve

print(roc_auc_score(labels_test.compute(), predictions.compute()))

import matplotlib.pyplot as plt
%matplotlib inline

fpr, tpr, _ = roc_curve(labels_test.compute(), predictions.compute())

# taken from http://scikit-learn.org/stable/auto_examples/model_selection/plot_roc.html#sphx-glr-auto-examples-model-selection-plot-roc-py
plt.figure(figsize=(8, 8))
lw = 2
plt.plot(fpr, tpr, color='darkorange', lw=lw, label='ROC curve')
plt.plot([0, 1], [0, 1], color='navy', lw=lw, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver operating characteristic example')
plt.legend(loc="lower right")
plt.show()
```
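As a sanity check on what `roc_auc_score` measures: AUC is the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one (ties count one half). A small self-contained sketch computing it directly from that definition (quadratic in the number of examples, so for intuition only):

```python
def roc_auc(labels, scores):
    """AUC from its rank definition: P(score of a random positive
    exceeds score of a random negative), ties counting 0.5."""
    pos = [s for y, s in zip(labels, scores) if y]
    neg = [s for y, s in zip(labels, scores) if not y]
    wins = sum((p > n) + 0.5 * (p == n) for p in pos for n in neg)
    return wins / (len(pos) * len(neg))
```

A perfect ranking gives 1.0, a reversed ranking 0.0, and a constant score 0.5, matching `roc_auc_score` on the same inputs.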
IV. Visualizing the computation graph with Dask.array

Source: https://gist.github.com/mrocklin/b61f795004ec0a70e43de350e453e97e

```python
import numpy as np
import dask.array as da

x = da.ones(15, chunks=(5,))
x.visualize('dask.svg')
```

```python
(x + 1).sum().visualize('dask.svg')
```

And a two-dimensional example:

```python
x = da.ones((15, 15), chunks=(5, 5))
x.visualize('dask.svg')

(x.dot(x.T + 1) - x.mean(axis=0)).std().visualize('dask.svg')
```