Python 服务性能提升方式
1 多线程
- 适用于本质上是异步的;需要多个并发活动;每个活动的处理顺序可能是不确定的,或者说是随机的、不可预测的。
- I/O 密集型的 Python 程序要比计算密集型的代码能够更好地利用多线程环境。
- 守护线程:如果把一个线程设置为守护线程,就表示这个线程是不重要的,进程退出时不需要等待这个线程执行完成。
- 整个 Python 程序(可以解读为:主线程)将在所有非守护线程退出之后才退出,换句话说,就是没有剩下存活的非守护线程时。
- 由于 Python 的 GIL 的限制,多线程更适合于 I/O 密集型应用(I/O 释放了 GIL,可以允许更多的并发),而不是计算密集型应用。对于后一种情况而言,为了实现更好的并行性,需要使用多进程,以便让 CPU 的其他内核来执行。
1.1 thread
- 在threading中,并没有实现返回值的方法,我们可以用数据库或者是全局变量来实现返回值的获取。总体思路是利用第三方中介来暂时存取结果信息。这里使用的是全局变量 info_queue。有一点要强调的是GIL只会影响到那些严重依赖CPU的程序(比如计算型的)。 如果你的程序大部分只会涉及到I/O,比如网络交互,那么使用多线程就很合适, 因为它们大部分时间都在等待。实际上,你完全可以放心的创建几百个Python线程, 没有任何压力,没啥可担心的。
# encoding=utf-8
import queue
import random
import threading
import time
# 用于多线程中储存信息,在其他地方取出使用
info_queue = queue.Queue()
def my_task(url):
"""
模拟网络爬虫任务
:param url: 网页地址
:return: None
"""
time.sleep(random.random())
print(url)
info_queue.put(url+100)
def execute_thread(urls, n):
"""
创建执行多线程,一次最多 128个(根据服务器的性能调整)
:param urls: 根据 urls 数量创建多线程
:param n: 创建多线程的批次(一次最多128个,超过128则多批次创建)
:return: None
"""
start = n * 128
end = (n + 1) * 128
threads = [
threading.Thread(
target=my_task,
args=(url,)
)
for i, url in enumerate(urls)
if start <= i < end
]
[i.start() for i in threads]
[i.join() for i in threads]
def main():
urls = [i for i in range(128)]
execute_thread(urls, 0)
while not info_queue.empty():
info = info_queue.get()
print(info)
if __name__ == '__main__':
main()
1.2 线程池:submit 返回无序结果
- concurrent.futures 模块,它提供了 ThreadPoolExecutor 和 ProcessPoolExecutor 两个类,实现了对 threading 和 multiprocessing 的进一步抽象。as_completed() 方法是一个生成器,在没有任务完成的时候,会阻塞,在有某个任务完成的时候,会 yield 这个任务,就能执行 for 循环下面的语句,然后继续阻塞住,循环到所有的任务结束。注意 submit 的返回结果是无序的。
# encoding=utf-8
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
def my_task(url):
"""
模拟网络爬虫任务
:param url: 网页地址
:return: 执行结果
"""
time.sleep(random.random())
print(f"url={url}")
return url + 100
def run_thread_pool(target, params):
"""
创建线程池
:param target: 多线程的目标函数
:param params: 传递给目标函数的所有参数
:return: 目标函数执行结果
"""
with ThreadPoolExecutor(max_workers=8) as t:
res = [
t.submit(target, param)
for param in params
]
return res
def main():
urls = [i for i in range(64)]
res = run_thread_pool(my_task, urls)
for future in as_completed(res):
data = future.result()
print(data)
if __name__ == '__main__':
main()
1.3 线程池:map 返回有序结果
- 此处的 map 方法与 python 标准库中的 map 含义相同,都是将序列中的每个元素都执行同一个函数。
# encoding=utf-8
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
def my_task(url):
"""
模拟网络爬虫任务
:param url: 网页地址
:return: 执行结果
"""
time.sleep(random.random())
print(f"url={url}")
yield url + 100
def run_thread_pool(target, params):
"""
创建线程池
:param target: 多线程的目标函数
:param params: 传递给目标函数的所有参数
:return: 目标函数执行结果
"""
with ThreadPoolExecutor(max_workers=8) as t:
res = t.map(target, params)
return res
def main():
urls = [i for i in range(64)]
res = run_thread_pool(my_task, urls)
for i in res:
for j in i:
print(j)
if __name__ == '__main__':
main()
2 协程
- asyncio 提供了一种机制,使得你可以用协程(coroutines)、IO复用(multiplexing I/O)在单线程环境中编写并发模型。
# encoding=utf-8
import asyncio
import math
import time
coroutines_per_time = 16
async def my_task(url):
"""
模拟网络爬虫任务
:param url: 网页地址
:return: 执行结果
"""
time.sleep(random.random())
print(f"url={url}")
yield url + 100
async def execute_coroutines(urls, n):
start = n*coroutines_per_time
end = (n+1)*coroutines_per_time
tasks = [
my_task(url)
for i, url in enumerate(urls)
if start <= i < end
]
await asyncio.gather(*tasks)
if __name__ == '__main__':
urls = [i for i in range(64)]
coroutine_nums = math.ceil(len(urls) / coroutines_per_time)
for i in range(coroutine_nums):
loop = asyncio.get_event_loop()
loop.run_until_complete(execute_coroutines(urls, i))
3 多进程
- 多进程能很好的避开Python GIL 的问题,每个进程都拥有独立的GIL,互不干扰,可实现真正的并行执行,更好的利用多核CPU的资源。所以一般开发并发编程首选多进程。
# encoding=utf-8
import logging
import multiprocessing
def my_task(url):
"""
模拟网络爬虫任务
:param url: 网页地址
:return: 执行结果
"""
time.sleep(random.random())
print(f"url={url}")
yield url + 100
def main():
logging.basicConfig(level=logging.INFO,
datefmt='%Y/%m/%d %H:%M:%S',
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
cup_num = multiprocessing.cpu_count()
p = multiprocessing.Pool(cup_num)
materials = [i for i in range(64)]
for material in materials:
p.apply_async(my_task, args=(material,))
p.close()
p.join()
if __name__ == '__main__':
main()