Back to Blogs
python
performance
thread
coroutine
process

Python 服务性能提升方式

Soloman
2019-09-17

Python 服务性能提升方式

1 多线程

  • 适用于本质上是异步的;需要多个并发活动;每个活动的处理顺序可能是不确定的,或者说是随机的、不可预测的。
  • I/O 密集型的 Python 程序要比计算密集型的代码能够更好地利用多线程环境。
  • 守护线程:如果把一个线程设置为守护线程,就表示这个线程是不重要的,进程退出时不需要等待这个线程执行完成。
  • 整个 Python 程序(可以解读为:主线程)将在所有非守护线程退出之后才退出,换句话说,就是没有剩下存活的非守护线程时。
  • 由于 Python 的 GIL 的限制,多线程更适合于 I/O 密集型应用(I/O 释放了 GIL,可以允许更多的并发),而不是计算密集型应用。对于后一种情况而言,为了实现更好的并行性,需要使用多进程,以便让 CPU 的其他内核来执行。

1.1 thread

  • 在threading中,并没有实现返回值的方法,我们可以用数据库或者是全局变量来实现返回值的获取。总体思路是利用第三方中介来暂时存取结果信息。这里使用的是全局变量 info_queue。有一点要强调的是GIL只会影响到那些严重依赖CPU的程序(比如计算型的)。 如果你的程序大部分只会涉及到I/O,比如网络交互,那么使用多线程就很合适, 因为它们大部分时间都在等待。实际上,你完全可以放心的创建几百个Python线程, 没有任何压力,没啥可担心的。
# encoding=utf-8
import queue
import random
import threading
import time


# 用于多线程中储存信息,在其他地方取出使用
info_queue = queue.Queue()


def my_task(url):
    """
    模拟网络爬虫任务
    :param url: 网页地址
    :return: None
    """
    time.sleep(random.random())
    print(url)
    info_queue.put(url+100)


def execute_thread(urls, n):
    """
    创建执行多线程,一次最多 128个(根据服务器的性能调整)
    :param urls: 根据 urls 数量创建多线程
    :param n: 创建多线程的批次(一次最多128个,超过128则多批次创建)
    :return: None
    """
    start = n * 128
    end = (n + 1) * 128
    threads = [
        threading.Thread(
            target=my_task, 
            args=(url,)
        ) 
        for i, url in enumerate(urls) 
        if start <= i < end
    ]
    [i.start() for i in threads]
    [i.join() for i in threads]


def main():
    urls = [i for i in range(128)]
    execute_thread(urls, 0)
    while not info_queue.empty():
        info = info_queue.get()
        print(info)


if __name__ == '__main__':
    main()

1.2 线程池:submit 返回无序结果

  • concurrent.futures 模块,它提供了 ThreadPoolExecutor 和 ProcessPoolExecutor 两个类,实现了对 threading 和 multiprocessing 的进一步抽象。as_completed() 方法是一个生成器,在没有任务完成的时候,会阻塞,在有某个任务完成的时候,会 yield 这个任务,就能执行 for 循环下面的语句,然后继续阻塞住,循环到所有的任务结束。注意 submit 的返回结果是无序的。
# encoding=utf-8
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def my_task(url):
    """
    模拟网络爬虫任务
    :param url: 网页地址
    :return: 执行结果
    """
    time.sleep(random.random())
    print(f"url={url}")
    return url + 100


def run_thread_pool(target, params):
    """
    创建线程池
    :param target: 多线程的目标函数
    :param params: 传递给目标函数的所有参数
    :return: 目标函数执行结果
    """
    with ThreadPoolExecutor(max_workers=8) as t:
        res = [
            t.submit(target, param) 
            for param in params
        ]
        return res


def main():
    urls = [i for i in range(64)]
    res = run_thread_pool(my_task, urls)
    for future in as_completed(res):
        data = future.result()
        print(data)


if __name__ == '__main__':
    main()

1.3 线程池:map 返回有序结果

  • 此处的 map 方法与 python 标准库中的 map 含义相同,都是将序列中的每个元素都执行同一个函数。
# encoding=utf-8
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def my_task(url):
    """
    模拟网络爬虫任务
    :param url: 网页地址
    :return: 执行结果
    """
    time.sleep(random.random())
    print(f"url={url}")
    yield url + 100


def run_thread_pool(target, params):
    """
    创建线程池
    :param target: 多线程的目标函数
    :param params: 传递给目标函数的所有参数
    :return: 目标函数执行结果
    """
    with ThreadPoolExecutor(max_workers=8) as t:
        res = t.map(target, params)
        return res


def main():
    urls = [i for i in range(64)]
    res = run_thread_pool(my_task, urls)
    for i in res:
        for j in i:
            print(j)


if __name__ == '__main__':
    main()

2 协程

  • asyncio 提供了一种机制,使得你可以用协程(coroutines)、IO复用(multiplexing I/O)在单线程环境中编写并发模型。
# encoding=utf-8
import asyncio
import math
import time


coroutines_per_time = 16


async def my_task(url):
    """
    模拟网络爬虫任务
    :param url: 网页地址
    :return: 执行结果
    """
    time.sleep(random.random())
    print(f"url={url}")
    yield url + 100

    
async def execute_coroutines(urls, n):
    start = n*coroutines_per_time
    end = (n+1)*coroutines_per_time
    tasks = [
        my_task(url) 
        for i, url in enumerate(urls) 
        if start <= i < end
    ]
    await asyncio.gather(*tasks)
  

if __name__ == '__main__':
    urls = [i for i in range(64)]
    coroutine_nums = math.ceil(len(urls) / coroutines_per_time)
    for i in range(coroutine_nums):
        loop = asyncio.get_event_loop()
        loop.run_until_complete(execute_coroutines(urls, i))

3 多进程

  • 多进程能很好的避开Python GIL 的问题,每个进程都拥有独立的GIL,互不干扰,可实现真正的并行执行,更好的利用多核CPU的资源。所以一般开发并发编程首选多进程。
# encoding=utf-8
import logging
import multiprocessing


def my_task(url):
    """
    模拟网络爬虫任务
    :param url: 网页地址
    :return: 执行结果
    """
    time.sleep(random.random())
    print(f"url={url}")
    yield url + 100


def main():
    logging.basicConfig(level=logging.INFO,
                        datefmt='%Y/%m/%d %H:%M:%S',
                        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    cup_num = multiprocessing.cpu_count()
    p = multiprocessing.Pool(cup_num)
    materials = [i for i in range(64)]
    for material in materials:
        p.apply_async(my_task, args=(material,))
    p.close()
    p.join()
    
    
if __name__ == '__main__':
    main()