找到速度瓶颈是代码加速的的第一个工作。经过我们的分析,瓶颈主要集中在两个地方:特征生成预测。特征工程中,采用人工构造特征,共11个特征,其中6个是统计特征,统计特征的计算随着移动窗口的不同,计算量也会不同,其中涉及的典型运算包括:求和,排序。另外两类特征是天气和节假日,天气数据通过爬虫(会跳街舞的队友的工作)获取,节假日数据通过第三方接口(easybots)获取,之后二者本地持久化。这样的好处之一是避免了在特征生成阶段由于接口访问速度较慢的原因降低特征生成速度。预测阶段细分为两个工作,第一是预测结果绘图并持久化,第二是读取特征文件,利用移动窗口生成预测当日特征,将预测结果写文件。

由于我们之前已经将生成特征本地持久化,故需要关注的地方在预测加速

groot项目中利用C++多线程做了Random Forest的并行加速,但是由于比赛代码是用python写的,对python多线程早有耳闻。此处有名言,“进程是资源分配的最小单位,线程是CPU调度的最小单位”,OS将CPU时间片分配给多个线程,每个线程在指定的时间片内完成。在CPython解释器中,由于GIL(Global Interpreter Lock)的存在,同一时间只有一个获得GIL的线程在跑,其他线程处于等待状态,使得python多线程在多核CPU上,只对I/O密集型计算产生正面效果,而当有至少一个CPU密集型线程存在时,多线程效率会大幅下降。问题是,怎么知道我们的任务是CPU密集型还是I/O密集型?

使用多线程总不至于使我们的任务执行的更慢,我们关注的问题是执行速度的大幅提升。当然,有一种更直接的方法,使用多进程

在开始问题讨论前,给出任务的具体描述:

给定2000个商家过去92天的流量特征和流量值,预测商家在未来14天的流量值。将92天的数据作为训练集,未来14天作为测试集,同时给出训练误差。

比赛方案的设计中,我们对商家单独建模,故商家之间彼此不相关。一种简单的想法是:将1份代码拷贝成4份,第1份代码处理商家1到500,第2份代码处理商家501到1000,第3份代码处理商家1001到1500,第4份代码处理商家1501到2000。同时运行4个程序,将每个程序得到的训练误差手动求和就是最终的训练误差。

这或许是多进程最初的状态?此处应该有掌声。

鼓掌

好,看看这样做的问题:第一,代码拷贝。假设需要开启多个进程,就需要多份代码拷贝。第二,代码调整。在拷贝的多份代码中,只有商家的ID是改变的,其余都不变。第三,手动求和训练误差。当然可以将每个进程运行的结果存入文件,在所有预测进程执行完毕之后,开启另外一个计算进程用于对训练误差求和。或许还有很多其他的方式,不过在填坑的过程中又挖了很多坑,或者坑越填越深。NAIVE!

利用python中的multiprocessing库,可以相对优雅的解决上述问题。此处安利:

from multiprocessing import Pool #多进程

from multiprocessing.dummy import Pool #多线程

第一,利用multiprocessing.Process抽象代码拷贝的过程

第二,利用 multiprocessing.Process(target=target_name,args=(arg_name,))中的参数args抽象代码调整的过程

第三,利用multiProcessing.Value,Array,Queue抽象共享内存队列通信,解决训练误差求和的问题

在比赛代码中,我们用Queue做训练误差求和的桥梁,分别采用Pool和Process的方式来做多进程优化。算法伪代码如下:

Process方式:

from multiprocessing import Process,Queue,Manager,Pool
# compute train_loss and do something
def worker(q,i,40):
    train_loss = q.get()
    for j in range(i,i+40)
            #do something
            loss = getLoss()
    train_loss += loss
    q.put(train_loss)
# create processes
q = Queue()
q.put(train_loss)
jobs= []
for i in range(1,2000,40):
    p = Process(target=worker,args=(q,i,40))
    p.start()
    jobs.append(p)
for p in jobs:
    p.join()
# print total train_loss
print q.get()

Pool方式:

# def worker is the same as above
# create pool
manager = Manager()
q = manager.Queue()
q.put(train_loss)
pool = Pool(50)
for i in range(1,2000,40)
    pool.apply_async(func=worker,args=(q,i,40))
pool.close()
pool.join()
# print total train_loss
print q.get()

针对上述两段伪代码,谈几个有意思的点儿。首先是一个关于“概念设计”的问题,”概念设计”是本人杜撰出来的术语,表明了好的概念需要设计的态度,这样的例子在数学领域层出不穷,基本特征是简单深刻

join在官方文档中的意思是:阻塞当前进程,直到调用join方法的那个进程执行完,再继续执行当前进程。

注意到,

print q.get()

该行代码是输出总的训练误差,在主进程中,需要在其他所有进程计算完毕之后,才能输出该值,或者说,此时的训练误差是有意义的训练误差总和。当代码运行到该行,由于子进程没有执行完毕,故主进程阻塞等待子进程执行完毕,然后执行该行。如果要控制子进程的执行次序,也可以利用join实现。举个例子:

p1.start()
p1.join()
p2.start()
p2.join()

p2.start()在主进程中,目的是开启一个子进程p2(是不是有点拗口?)。当要执行该行代码时,一看p1要join,好吧,只能等待p1执行完毕之后才能开启子进程p2了。将这个例子和伪代码结合起来看,join的含义是不是有点儿意思?

其次,注意到,

from multiprocessing import Process,Queue,Manager,Pool

那么问题来了:multiprocessing.Queue和Queue.Queue有什么不同呢?(感谢万能的StackOverflow)

Queue.Queue is just an in-memory queue that knows how to deal with multiple threads using it at the same time.
It only works if both the producer and the consumer are in the same process.

Once you have them in separate system processes, which is what the multiprocessing library is about,
things are a little more complicated, because the processes no longer share the same memory.
You need some kind of inter-process communication method to allow the two processes to talk to each other.
It can be a shared memory, a pipe or a socket, or possibly something else. This is what multiprocessing.Queue does.
It uses pipes to provide a way for two processes to communicate.
It just happens to implement the same API as Queue.Queue, because most Python programmers are already familiar with it.
@Lukáš Lalinský

关于队列,还有一个问题,那就是queue和Queue有什么不同?(我也是够无聊,可是这个真是个问题)

python的官方文档在note中指出:

The Queue module has been renamed to queue in Python 3

请看图:

queue&Queue

说了这么多,到底加速效果怎样?我的Mac是4核,理论上时间能够缩短到原来的四分之一,但是实验结果表明,缩短了不足二分之一。TERRIBLE!从40分钟降低到20分钟,似乎这点时间不起眼,但是从400小时降低到200小时呢?

仔细分析问题,我们发现,在并行方案设计中,还有一个地方是可以并行化的。我们原来分4个进程,每个进程处理500个商家,由于对每个商家的处理是不相关的,故此处也可以并行。如果将任务分给M个机器,每个机器有N个核,充分利用M*N个核,或许是个不错的想法,这就是分布式计算了。

本文聊了python的多线程,多进程,针对我们遇到的具体问题,设计并行方案,进行加速试验。同时,在对方案进一步细化思考之后,提出了分布式计算的想法。在李沐的博士这五年中,看到一句话,“对于系统而言,设计是一门艺术而不是科学,这是设计者审美和哲学理念的体现”,与诸君共勉。