很久之前,写过一篇短文,用进程池Pool进行代码加速,这篇文章会结合自己最近的实践,进一步讨论一下这个问题。

场景问题是这样的。

给定一个大文件,文件行数10000000。需要读这个文件,然后将文件内容写入另外一个文件。

这是一个简化的场景,“读”这个操作,一般对应我们自己的处理逻辑。

朴素文件读写

data = load_data()
writer = open('out.txt','a')
def save(line):
    writer.write(str(line)+'\n')
@timer
def single_process():
    for line in data:
        save(line)
writer.close()

多进程读写

这里有两种方式,分别是自动并行和按照CPU核心数手动并行。

自动并行(非结果汇总版)
def save(line):
    writer.write(str(line)+'\n')
def test(line_num):
    return data[line_num]

@timer
def multi_process():
    p = mp.Pool()
    for i in range(len(data)):#按照文件行分配
        p.apply_async(test, (i,), callback=save)
    p.close()
    p.join()
    writer.close()

这里需要注意的是,callback函数和test函数的配合使用。

自动并行(结果汇总版)
def func(idx):
	pass

import multiprocessing as mp
num_worker = mp.cpu_count()
pool = mp.Pool(num_worker)

results = []
for idx in idxs:#按照文件行分配
	result = pool.apply_async(func, args = (idx,))
	results.append(result)
pool.close()
pool.join()
[r.get() for r in tqdm(results)]
手动并行
def save_(lines:list):
    for line in lines:
        writer.write(str(line)+'\n')
def test_(line):
    return line

@timer
def multi_process_():
    cpu_nums = mp.cpu_count()
    bs = len(data) // cpu_nums
    p = mp.Pool()
    for i in range(cpu_nums):#按照CPU核心数分配
        if i == cpu_nums - 1:
            p.apply_async(test_, (data[bs * i:],), callback=save_)
        else:
            p.apply_async(test_, (data[bs * i: bs * (i+1)],), callback=save_)
    p.close()
    p.join()
    writer.close()

多线程读写

def save__(in_queue, out_queue):
    while not in_queue.empty():
        text = in_queue.get()
        out_queue.put(text)

def write(out_queue):
    while not out_queue.empty():
        text = out_queue.get()
        writer.write(text+'\n')
        writer.flush()
@timer
def multi_threading(thread_num):
    in_queue = Queue()
    out_queue = Queue()
    for i,line in enumerate(data):
        in_queue.put(line)
    tasks = []
    for i in range(thread_num):
        task = threading.Thread(target=save__,name=str(i),args=(in_queue,out_queue,))
        task.start()
        tasks.append(task)
    task = threading.Thread(target=write,name=str(i), args=(out_queue,))
    task.start()
    tasks.append(task)
    for t in tasks:
        t.join()
    writer.close()

这里需要注意的是,Queue的应用,将Queue作为一个数据中转结构,读和写发生在Queue中。此外,Queue是读写安全的。(代码的结束逻辑需要处理。)

Spark读写

实际上,针对场景问题的数据规模,远没有到达需要利用Spark的时候。实际上,个人也确实这么尝试了,时间感人,因此就不展开讲了。使用分布式的一些基础计算设置,需要同时考虑IO和计算,当然包括上下文切换的代价,否则就得不偿失了。

其他

基于cache的优化,也是简单高效的方法。该方法需要结合具体场景,能够使用cache的地方,那就放心的用吧。

总结:实际加速效果,还是要看自己的处理的具体问题。

补充材料:

1.浅谈Python中的多线程和多进程(用一个非常简单的例子解释了几种并行方式,比上文写的好。)

2.pandas中apply函数加速百倍的技巧

3.如何在数分钟内处理完别人1天的数据?

4.python中的四种队列

collections.deque没用应用场景的支持(一个基础的数据结构);

queue:面向多生产线程,多消费线程;

asyncio.queue:面向多生产协程,多消费协程;

multiprocessing.queue:面向多生产进程,多消费进程;