dask

Dask

Dask是一款用于分析计算的灵活并行计算库。

Dask是一个并行计算库,能在集群中进行分布式计算,能以一种更方便简洁的方式处理大数据量,与Spark这些大数据处理框架相比较,Dask更轻。Dask更侧重与其他框架,如:Numpy,Pandas,Scikit-learning相结合,从而使其能更加方便进行分布式并行计算

入门Dask DataFrame很容易,但是要很好的使用它需要一些经验。

Dask DataFrame是由许多较小的Pandas DataFrame组成的大型并行DataFrame

当内存无法容纳数据时,可以使用数据分块的方法:以数据块的形式分批加载到内存进行处理。这样就可以通过加载数据集的子集,来逐步处理整个数据集。

Dask存在三种最基本的数据结构,分别是:Arrays、Dataframes以及Bags

安装:pip install dask

1
2
3
4
import pandas as pd
import dask.dataframe as dd
tmp = pd.DataFrame({'name': range(10), 'content': [range(i) for i in range(10)]})
ddf = dd.from_pandas(tmp, npartitions=1)

dask.array

1
Dask中的Arrays(位于包dask.arrays下),其实就是对Numpy中的ndarray的部分接口进行了改进,从而方便处理大数据量。对于大数据集,特别是其大小大于内存时,如果我们要对其计算,按照传统的方式,,我们会将其全部塞进内存里,那么这就会报Out-Of-Memory错误,当然,我们也可以一次读取一部分数据,那么我们是否可以提前将大数据集进行分块处理了,我们只需要控制每块数据集不超过内存,从而满足In-Memory计算了?Dask就是这样做的

dask.dataframe(调用Pandas API)

1
2
3
4
5
Dask Dataframe对象则 在处理远大于当前主机内存的表格数据有用。与传统pandas Dataframe在加载完成所有数据后继续数据类型推断不同Dask Datadrame支持部分加载数据时,对表格数据类型进行推断。Dask Dataframe实现了分块并行Dataframe, 对Dask Dataframe的操作将被映射到按索引列划分的子Dataframe上
在Pandas上运行缓慢的操作(例如逐行迭代)在Dask DataFrame上仍然运行缓慢
处理大型数据集,即使这些数据集不适合存储在内存中
通过使用多个内核来加速长计算
使用标准的Pandas操作(例如groupby,join和时间序列计算)在大型数据集上进行分布式计算

dask.bag

1
2
对于Bags,其最主要的是用于半结构化的大数据集,比如日志或者博客等等,我们从其read_text(dask.bag.text.py)中来解析如何创建一个Dask Bag对象
Dataframe是基于Pandas Dataframe改进的一个可以并行处理大数据量的数据结构,即使对大于内存的数据也是能够处理的

四、Dask分布式

dask.delayed 和 dask.bag

1
2
3
4
delayed :延迟计算,并不是立即计算,而是将关系绘制成一个图,可以使用visualize查看并行的可能性,然后使用compute立刻开始计算

bag:相当于spark中的rdd操作,
bg.from_sequence([1,2,3,4,5], npartitions=5).map(lambda x:x**2)
Dask创建和存储数据框
1
2
3
4
5
6
7
8
9
10
11
读:
read_csv 将CSV文件读取到Dask.DataFrame中
read_json 从一组JSON文件创建数据框
read_sql_table 从SQL表创建数据框。
from_pandas 从Pandas DataFrame构造Dask DataFrame
dask不能读excel
存:
to_csv 将Dask DataFrame存储到CSV文件
to_sql 将Dask数据框存储到SQL表
to_json 将数据框写入JSON文本文件
其他见:https://docs.dask.org/en/latest/dataframe-api.html

#####Dask的Sql

1
2
3
4
1.Dask不支持任意文本查询,仅支持整个表和sqlalchemy
import dask.dataframe as dd
en = 'mysql+pymysql://user_rw:1a2s3d4f@192.168.0.251:3306/pre_formal_2?charset=UTF8mb4'
dd.read_sql_table("A61_single_us", en, index_col='id',npartitions=2)

1597109748639

默认情况下,Dask DataFrame使用多线程调度程序。

Dask是为处理大于内存的数据集而设计的

1633751522366

2021

dask分布式
1
2
3
4
5
6
7
1.python -m pip install dask distributed --upgrade

2.在某个节点启用调度节点,dask-scheduler

3.dask-worker 192.168.0.227:8786 --nprocs 20 --nthreads 2 创建工人,并指向调度节点
--memory-limit 2e10 设置内存
若启动失败:pip3 install click==7.1.2

使用

#####同时使用某函数——>等价分布式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#==================================1=================================
from dask.distributed import Client
client = Client('192.168.0.227:8786') #使用分布式

def test(x):
ls = []
for i in range(x):
for j in range(x):
ls.append(i+j)
print(sum(ls))
return sum(ls)

a = client.map(test, [10000,10,20,100,50])
client.gather(a)
client.restart() #重启
client.close() #关闭
#==================================2=================================
from dask.distributed import Client
client = Client() #使用当前节点多进程
from datetime import datetime
import os
import pandas as pd
def func(num):
t1 = datetime.now()
ls = []
for i in range(num):
for j in range(num):
ls.append(i+j)
print(num,datetime.now()-t1,os.getpid())
return pd.DataFrame(range(num)).sum().values[0]

a = client.map(func,range(2000,3000,10))
client.gather(a)
#====================================3================================
from dask.distributed import Client, progress
cl(n_workers=20,threads_per_worker=3) #单机设置

#传参
[dask_cl.submit(func_patent_expect_life,dic,i) for i in group_name_list]
[future] = dask_cl.scatter([df_data.values],broadcast=True) #广播变量,将数据放到调度器中
#replicate复制数据到节点
client_dask.replicate([group_name_num])

分布式传参案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# -*- coding: utf-8 -*-
# author = 'kl'
# create_date: 2021/10/14
import pandas as pd
from itertools import chain
from clickhouse_driver import Client
import warnings
warnings.filterwarnings("ignore")
from dask.distributed import Client as cl
from datetime import datetime

def func_bfpm_history(company_list_short, group_name_num, jiqun_company, year,dic_com):
try:
group_name_num = pd.DataFrame(group_name_num,columns=['applicant_name','group_names','group_num'])
jiqun_company = pd.DataFrame(jiqun_company,columns=['group_name','applicants'])
storage_table = 'history_jsbj' + '_20211008'
data_all = []
company_list_short_ = [company_list_short[i:i+50] for i in range(0,len(company_list_short),50)]
for companys in company_list_short_:
df_100 = group_name_num[group_name_num.applicant_name.isin(companys)]
df_100['applicants'] = df_100.group_names.apply(lambda x: list(set(chain(*jiqun_company.loc[jiqun_company['group_name'].isin(x)].applicants.tolist()))))
df_100_split = df_100.explode('applicants')
df_100_split['num_com'] = df_100_split['applicants'].apply(lambda x: dic_com[x] if x in dic_com else 0)
df_res = df_100_split.groupby('applicant_name').apply(lambda x: round(len(x[x.num_com >= x.group_num.values[0]]) / x.shape[0],4)).reset_index()
df_res.columns = ['applicant_name','bf_pm']
df_res['year'] = year
df_final = df_res.merge(df_100[['applicant_name','group_names','group_num']],on=['applicant_name'],how='left')
df_final.loc[df_final.bf_pm == 1,'bf_pm'] = 0.9999
df_final.loc[df_final.bf_pm == 0, 'bf_pm'] = 0.0001
data_all.append(df_final)
data_final = pd.concat(data_all)
data_final.rename(columns = {'group_names':'groups'},inplace=True)
data_final_tuple = [tuple(i) for i in data_final.values]
client = Client(host='192.168.0.170', port='9000', user='algorithm', password='1a2s3d4f', database='algorithm_dis')
sql = "insert into algorithm_dis.{} (`applicant_name`,`bf_pm`,`year`,`groups`,`group_num`) VALUES".format(storage_table)
client.execute(sql, data_final_tuple)
client.disconnect()
print('存完l')
return 1
except Exception as e:
print(e)


if __name__ == '__main__':
n = 10000 # 10000 开一个进程
table_tail = '_20210908' # 版本表后缀(开始计算的时间或者)
company_formername = 'company_formername_20210908' # 企业最新名跟曾用名的唯一对应 这个后期会有变化,每次都记得调整
similer_company = 'similer_company' # 企业名称别名
es_time = '20210901' # es更新截至的时间 用于计算多个时间相关的数据
country_code = 'country_code' # 国家缩写表
ipc_split_10000_c_list_v = 'ipc_split_10000_c_list_v3' # 集群内ipc表
history_group_patent_num = 'history_group_patent_num' + table_tail
zl_zu_L = 'zl_zu_L' + table_tail
t1 = datetime.now()
for year in range(1985, int(es_time[:4]) + 1):
t0 = datetime.now()
try:
## 获取每个年份内企业的集群数
client = Client(host='192.168.0.170', port='9000', user='algorithm', password='1a2s3d4f', database='algorithm_dis')
sql = '''
SELECT
applicant_name,
arrayDistinct(arrayFlatten(groupArray(group_name))) AS `group_names`,
length(group_names) AS group_num
FROM
(
SELECT
arrayJoin(applicants) AS applicant_name,
group_name
FROM {}
WHERE (applicant_name GLOBAL IN
(
SELECT arrayJoin(arrayDistinct(arrayFlatten(groupArray(applicants))))
FROM {}
WHERE year = {}
GROUP BY year
)) AND (toYear(app_date) <= {})
)
GROUP BY applicant_name
'''.format(zl_zu_L, history_group_patent_num, year, year)
group_name_num = client.execute(sql, columnar=False, with_column_types=True)
group_name_num = pd.DataFrame(group_name_num[0], columns=[i[0] for i in group_name_num[1]])
dic_com = dict(zip(group_name_num.applicant_name, group_name_num.group_num))
sql = "select group_name,applicants from {} where `year` = {}".format(history_group_patent_num, year)
jiqun_company = pd.DataFrame(client.execute(sql, columnar=False), columns=['group_name', 'applicants'])
client.disconnect()
company_list = group_name_num['applicant_name'].unique().tolist()
company_list_short = [company_list[i:i+1000] for i in range(0,len(company_list),1000)]
client_dask = cl('192.168.0.227:8786')
[future] = client_dask.scatter([group_name_num.values], broadcast=True)
[future2] = client_dask.scatter([jiqun_company.values], broadcast=True)
[future3] = client_dask.scatter([dic_com], broadcast=True)
a = client_dask.map(func_bfpm_history,company_list_short, [future]*len(company_list_short), [future2]*len(company_list_short), [year]*len(company_list_short), [future3]*len(company_list_short))
client_dask.gather(a)
client_dask.close()
except Exception as e:
print(e)
print(year,'完成,耗时:',datetime.now() - t0)
print('总耗时:',datetime.now() - t1)

`

1
2
3
4
client.persist(df) #持久化dataframe,主要针对大的数据集
client.compute() #处理较小的结果集

要点:pandas基于内存的计算和分布式dask混合使用

#####pandas groupby 优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
1.multiprocessing
2.joblib
3.parallel_apply
=======================delayed===================================
from joblib import Parallel, delayed
import multiprocessing
t1 = datetime.now()
def beta_cal_mult(one_fund_df):
return pd.DataFrame([[one_fund_df.app_text.values[0],one_fund_df['dict'].tolist()]],columns=['app_text','dict_new'])

def applyParallel(dfGrouped, func):
retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
return pd.concat(retLst)

app_text_list_json = applyParallel(app_text_list.groupby('app_text'), beta_cal_mult)
print(datetime.now()-t1)

==========================parallel_apply=============================
app_text_list.groupby(by='app_text').parallel_apply(lambda x: list(x.dict)).rename('dict_new').reset_index()
不能写成
app_text_list.groupby(by='app_text')['dict'].parallel_apply(lambda x: list(x)).rename('dict_new').reset_index()


===================================swifter===============
cython

numba
parallel_apply的速度有时候并不一定比单个apply快,注意使用时机,针对多进程能够明显提升,

注意:parallel_apply应用的函数不应该是lambda函数===>尽量

np.vectorize,矢量化函数,矢量化操作,可用来优化apply
1
2
3
4
5
6
7
8
def func(a,b):
if a== b:
return 1
else:
return 0
fun = np.vectorize(func)
fun([1,2,3,4],[4,5,3,4])
array([0, 0, 1, 1])

#####可视化失败时候

1633751522366