Python线程池处理多任务并发
使用场景1:多线程任务
假如现在要启动多个长期运行的线程,每个线程去消费redis中的数据并处理,这个处理任务可能是请求耗时的http等,要同时保证每个任务在运行,如果被外部kill掉,需要等待正在运行的任务执行完以后再退出,避免数据丢失。
如果是要CPU计算型任务,替换ThreadPoolExecutor为ProcessPoolExecutor即可。
import time
import signal
import concurrent.futures
import traceback
global executor, program_running
def consume_data_from_redis(task_id):
global executor, program_running
while program_running:
try:
print("start ", task_id)
time.sleep(10) # 耗时任务:这里可以消费 redis 中的任务并处理
print("finish ", task_id)
except Exception:
# 保证现场内部不会崩掉,如果崩掉了不至于线程退出
traceback.print_exc()
time.sleep(2)
def graceful_exit():
global executor, program_running
program_running = False
print("等待所有任务完成...")
executor.shutdown(wait=True)
print("所有任务已完成,程序退出。")
def signal_handler(signum, frame):
print("收到中断,等待所有任务结束后,准备退出.")
graceful_exit()
def main():
global program_running, executor
program_running = True
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
task_count = 5 # 初始任务数量
executor = concurrent.futures.ThreadPoolExecutor(max_workers=task_count)
futures = [executor.submit(consume_data_from_redis, i) for i in range(task_count)]
try:
concurrent.futures.wait(futures)
except KeyboardInterrupt:
print("KeyboardInterrupt 收到中断,等待所有任务结束后,准备退出.")
graceful_exit()
if __name__ == "__main__":
# 切勿使用 kill -9 pid 杀死进程
# 直接 kill pid 等待所有任务完成后再退出,避免任务丢失
main()
使用场景2: 并发测试
假设现在每秒钟去请求第三方接口10次,总共提交20秒,也就是总共提交200个接口,可以使用如下代码:
import time
import uuid
import random
import concurrent
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor
batch_size = 10 # 每个批次提交的任务数量
total_batches = 20 # 总共提交多少个批次,如果 interval_seconds=1,那么提交任务将耗时20秒
interval_seconds = 1 # 一批任务提交后的间隔(秒),通常情况下为1,提交一批任务几乎是不耗时的。
program_running = True # 如果提交的任务完成,则标记为 False
def process_task(task_id):
# print("start " + task_id)
time.sleep(random.random() * 5) # 耗时任务
# print("finish " + task_id)
def submit_tasks(executor, processing_queue):
submit_cnt = 0
while submit_cnt < total_batches:
submit_cnt += 1
print(f"启动{batch_size}个任务")
for _ in range(batch_size):
task_id = str(uuid.uuid1())
future = executor.submit(process_task, task_id)
processing_queue.put(future)
print("current processing tasks: " + str(processing_queue.qsize()))
time.sleep(interval_seconds) # 几秒后继续提交
global program_running
# 表示任务已经提交完成,让接收的任务结果的线程看到这个消息后就可以退出了
program_running = False
def process_response(processing_queue: Queue, finished_queue: Queue):
while True:
try:
# 60秒后都还没有任务超时抛出错误,此时可能没有任务了
future = processing_queue.get(block=True, timeout=10)
res_data = future.result()
processing_queue.task_done()
finished_queue.put(1)
print("already finished tasks: " + str(finished_queue.qsize()))
except Empty:
if not program_running:
# 此时说明确实提交任务已经结束了,可以退出了
break
def main():
processing_queue = Queue(-1) # 统计正在处理的任务数量
finished_queue = Queue(-1) # 统计已经完成的任务数量
with ThreadPoolExecutor(max_workers=10000000) as executor:
# 提交任务的线程:专门用来提交任务
executor.submit(submit_tasks, executor, processing_queue)
# 启动多个线程去接收完成的任务的返回结果
futures = []
for _ in range(total_batches):
futures.append(executor.submit(process_response, processing_queue, finished_queue))
concurrent.futures.wait(futures)
if __name__ == "__main__":
main()