Python 海量数据处理之 _ 单机优化

1. 说明

  数据处理时,可能会遇到数千万以及上亿条数据的情况。一次处理所有数据,会遇到内存不够,计算时间太长等问题。一般的解法是:先拆分,再处理,最后将处理的结果合并(当然数据少的时候不需要这么麻烦)。本文将介绍在单机上,只使用 Python 如何处理大量数据。

2. 实例

  本例是天池大数据竞赛中的“淘宝穿衣搭配”比赛,这是一个新人赛,只要注册参赛,即可下载数据。目标是根据商品信息,专家推荐,用户购物信息,计算出最佳商品组合。

  本例中处理的是用户购物信息“表 1”:每条记录包含用户号 uid,商品号 mid,购物时间 time。

1
2
3
4
uid,mid,time
4371603,8,20150418
8034236,8,20150516
6135829,8,20150405

需要统计每个用户都购买了什么物品,即生成“表 2”:记录包含用户号 uid,商品组合 mids。

1
2
3
uid,mids
15 "1795974,1852545,98106,654166"
20 "2639977,79267"

  赛题提供了千万级的购物数据,其中含有百万级的用户,全部 load 到内存再计算生成新的结构,虽然能运行,但内存占用让机器变得非常慢,普通计算只用到单 CPU,我的机器用 10 个小时才处理了 200 多万条数据,优化之后半小时以内处理完所有数据。下面看看具体实现。

3. 切分数据

(1) 目标

  把数据切分成十份,分别存入文件

(2) 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
user = pd.read_csv("../../data/user_bought_history.txt", sep=" ")
user.columns = ['uid','mid','time']

dur = len(user)/10
ifrom = 0
idx = 0
while ifrom < len(user):
ito = ifrom + dur
data = user[ifrom:ito]
print("from ", ifrom, "to ", (ito-1), "total", len(data))
data.to_csv('../../data/user_bought_' + str(idx) + '.csv', index=False)
ifrom = ito
idx += 1

4. 处理数据

(1) 目标

  用多线程方式处理切分后的数据,将表 1 转换成表 2 格式

(1) 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def do_conv(index):
path = "../../data/user_bought_" + str(index) + ".csv"
if not os.path.exists(path):
return
user = pd.read_csv(path)
user['mid']=user['mid'].astype(str)
grp=user.groupby('uid')
print(index, len(grp))

user_buy_count_data = pd.DataFrame(columns=['uid','mids'])
idx=0
arr_uid=[]
arr_mid=[]

for name, group in grp:
mids = ",".join(group['mid'])
arr_uid.append(name)
arr_mid.append(mids)
if idx % 10000 == 0:
show_info.show_time(str(index) + " : " + str(len(arr_uid)))
idx+=1

user_buy_count_data['uid']=arr_uid
user_buy_count_data['mids']=arr_mid

user_buy_count_data.to_csv("../../data/user_" + str(index) + ".csv", index=False)

if __name__ == '__main__':
param_list = range(0, 11) # 线程参数
pool = threadpool.ThreadPool(3) # 同时最多开3个线程
requests = threadpool.makeRequests(do_conv, param_list)
[pool.putRequest(req) for req in requests]
pool.wait() # 等待所有线程结束

(3) 技术点

  1. 统一处理数据格式

从文件中读出的数据默认为 int 型,用 astype 函数将整个数据表的 mid 字段变为 str 型,相对于每次处理时再转换更节约时间。

  1. 使用 groupby

groupby 函数将数据按不同的 uid 划分为成多个表格,groupby 还带有多种统计功能,相对于用字典方式统计数据效率高得多。

  1. 多线程

现在的机器都是多核的,能明显提高计算速度。python 中提供了几种不同的多线程方式,这时使用了线程池,它可以控制线程的数量,以免本例中太多线程占用大量内存让机器变慢。使用之前需要安装 threadpool 库。

1
sudo pip install threadpool

5. 合并数据

(1) 目标

  将转换完的数据合并,当同一个 user 在两个表中同时出现时,将 mids 累加在一起。

(2) 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def do_add(x):
if x.m == 'nan':
return x.mids
if x.mids == 'nan':
return x.m
return str(x.mids) + "," + str(x.m)

def do_merge(data, path):
if not os.path.exists(path):
return data
data.columns = ['uid','m']
ex = pd.read_csv(path)
print(len(data),len(ex))
data = pd.merge(data, ex, how='outer')
data['m']=data['m'].astype(str)
data['mids']=data['mids'].astype(str)
data['mids']=data.apply(do_add, axis=1)
data = data.drop('m',axis=1)
print(data.head())
return data

data = pd.DataFrame(columns=['uid','mids'])
for index in range(0, 11):
data = do_merge(data, "../../data/user_" + str(index) + ".csv")
show_info.show_time("")
print("after merge ", index, "len", len(data))
data.to_csv('../../data/user_all.csv',index=False)

6. 相关工具

  1. top 命令

 top 是 linux 系统中统计系统资源占用的工具,默认为每秒统计一次,打开后按 1 键,可看到多核的占用情况。

7. 总结

  在特征工程和算法的计算过程中,都可以使用先拆分再组合的方式,但前提是切分数据不会造成数据意义的变化。本文介绍了单机处理大数据的优化方式,下篇将介绍用 Hadoop 集群方案处理海量数据。