[Python]多线程和多进程用于代码加速
很久之前,写过一篇短文,用进程池Pool进行代码加速,这篇文章会结合自己最近的实践,进一步讨论一下这个问题。
场景问题是这样的。
给定一个大文件,文件行数10000000。需要读这个文件,然后将文件内容写入另外一个文件。
这是一个简化的场景,“读”这个操作,一般对应我们自己的处理逻辑。
朴素文件读写
data = load_data()
writer = open('out.txt','a')
def save(line):
writer.write(str(line)+'\n')
@timer
def single_process():
for line in data:
save(line)
writer.close()
多进程读写
这里有两种方式,分别是自动并行和按照CPU核心数手动并行。
自动并行(非结果汇总版)
def save(line):
writer.write(str(line)+'\n')
def test(line_num):
return data[line_num]
@timer
def multi_process():
p = mp.Pool()
for i in range(len(data)):#按照文件行分配
p.apply_async(test, (i,), callback=save)
p.close()
p.join()
writer.close()
这里需要注意的是,callback函数和test函数的配合使用。
自动并行(结果汇总版)
def func(idx):
pass
import multiprocessing as mp
num_worker = mp.cpu_count()
pool = mp.Pool(num_worker)
results = []
for idx in idxs:#按照文件行分配
result = pool.apply_async(func, args = (idx,))
results.append(result)
pool.close()
pool.join()
[r.get() for r in tqdm(results)]
手动并行
def save_(lines:list):
for line in lines:
writer.write(str(line)+'\n')
def test_(line):
return line
@timer
def multi_process_():
cpu_nums = mp.cpu_count()
bs = len(data) // cpu_nums
p = mp.Pool()
for i in range(cpu_nums):#按照CPU核心数分配
if i == cpu_nums - 1:
p.apply_async(test_, (data[bs * i:],), callback=save_)
else:
p.apply_async(test_, (data[bs * i: bs * (i+1)],), callback=save_)
p.close()
p.join()
writer.close()
多线程读写
def save__(in_queue, out_queue):
while not in_queue.empty():
text = in_queue.get()
out_queue.put(text)
def write(out_queue):
while not out_queue.empty():
text = out_queue.get()
writer.write(text+'\n')
writer.flush()
@timer
def multi_threading(thread_num):
in_queue = Queue()
out_queue = Queue()
for i,line in enumerate(data):
in_queue.put(line)
tasks = []
for i in range(thread_num):
task = threading.Thread(target=save__,name=str(i),args=(in_queue,out_queue,))
task.start()
tasks.append(task)
task = threading.Thread(target=write,name=str(i), args=(out_queue,))
task.start()
tasks.append(task)
for t in tasks:
t.join()
writer.close()
这里需要注意的是,Queue的应用,将Queue作为一个数据中转结构,读和写发生在Queue中。此外,Queue是读写安全的。(代码的结束逻辑需要处理。)
Spark读写
实际上,针对场景问题的数据规模,远没有到达需要利用Spark的时候。实际上,个人也确实这么尝试了,时间感人,因此就不展开讲了。使用分布式的一些基础计算设置,需要同时考虑IO和计算,当然包括上下文切换的代价,否则就得不偿失了。
其他
基于cache的优化,也是简单高效的方法。该方法需要结合具体场景,能够使用cache的地方,那就放心的用吧。
总结:实际加速效果,还是要看自己的处理的具体问题。
补充材料:
1.浅谈Python中的多线程和多进程(用一个非常简单的例子解释了几种并行方式,比上文写的好。)
collections.deque没用应用场景的支持(一个基础的数据结构);
queue:面向多生产线程,多消费线程;
asyncio.queue:面向多生产协程,多消费协程;
multiprocessing.queue:面向多生产进程,多消费进程;