已复制
全屏展示
复制代码

Python线程池处理多任务并发


· 3 min read

使用场景1:多线程任务

假如现在要启动多个长期运行的线程,每个线程去消费redis中的数据并处理,这个处理任务可能是请求耗时的http等,要同时保证每个任务在运行,如果被外部kill掉,需要等待正在运行的任务执行完以后再退出,避免数据丢失。

如果是要CPU计算型任务,替换ThreadPoolExecutor为ProcessPoolExecutor即可。

import time
import signal
import concurrent.futures
import traceback

global executor, program_running


def consume_data_from_redis(task_id):
    global executor, program_running
    while program_running:
        try:
            print("start  ", task_id)
            time.sleep(10)  # 耗时任务:这里可以消费 redis 中的任务并处理
            print("finish ", task_id)
        except Exception:
            # 保证现场内部不会崩掉,如果崩掉了不至于线程退出
            traceback.print_exc()
            time.sleep(2)


def graceful_exit():
    global executor, program_running
    program_running = False
    print("等待所有任务完成...")
    executor.shutdown(wait=True)
    print("所有任务已完成,程序退出。")


def signal_handler(signum, frame):
    print("收到中断,等待所有任务结束后,准备退出.")
    graceful_exit()


def main():
    global program_running, executor
    program_running = True
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    task_count = 5  # 初始任务数量
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=task_count)
    futures = [executor.submit(consume_data_from_redis, i) for i in range(task_count)]
    try:
        concurrent.futures.wait(futures)
    except KeyboardInterrupt:
        print("KeyboardInterrupt 收到中断,等待所有任务结束后,准备退出.")
    graceful_exit()


if __name__ == "__main__":
    # 切勿使用 kill -9 pid 杀死进程
    # 直接 kill pid 等待所有任务完成后再退出,避免任务丢失
    main()

使用场景2: 并发测试

假设现在每秒钟去请求第三方接口10次,总共提交20秒,也就是总共提交200个接口,可以使用如下代码:

import time
import uuid
import random
import concurrent
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor

batch_size = 10  # 每个批次提交的任务数量
total_batches = 20  # 总共提交多少个批次,如果 interval_seconds=1,那么提交任务将耗时20秒
interval_seconds = 1  # 一批任务提交后的间隔(秒),通常情况下为1,提交一批任务几乎是不耗时的。
program_running = True  # 如果提交的任务完成,则标记为 False


def process_task(task_id):
    # print("start  " + task_id)
    time.sleep(random.random() * 5)  # 耗时任务
    # print("finish " + task_id)


def submit_tasks(executor, processing_queue):
    submit_cnt = 0
    while submit_cnt < total_batches:
        submit_cnt += 1
        print(f"启动{batch_size}个任务")
        for _ in range(batch_size):
            task_id = str(uuid.uuid1())
            future = executor.submit(process_task, task_id)
            processing_queue.put(future)
        print("current processing tasks: " + str(processing_queue.qsize()))
        time.sleep(interval_seconds)  # 几秒后继续提交
    global program_running

    # 表示任务已经提交完成,让接收的任务结果的线程看到这个消息后就可以退出了
    program_running = False


def process_response(processing_queue: Queue, finished_queue: Queue):
    while True:
        try:
            # 60秒后都还没有任务超时抛出错误,此时可能没有任务了
            future = processing_queue.get(block=True, timeout=10)
            res_data = future.result()
            processing_queue.task_done()
            finished_queue.put(1)
            print("already finished tasks: " + str(finished_queue.qsize()))
        except Empty:
            if not program_running:
                # 此时说明确实提交任务已经结束了,可以退出了
                break


def main():
    processing_queue = Queue(-1)  # 统计正在处理的任务数量
    finished_queue = Queue(-1)  # 统计已经完成的任务数量
    with ThreadPoolExecutor(max_workers=10000000) as executor:
        # 提交任务的线程:专门用来提交任务
        executor.submit(submit_tasks, executor, processing_queue)
        
        # 启动多个线程去接收完成的任务的返回结果
        futures = []
        for _ in range(total_batches):
            futures.append(executor.submit(process_response, processing_queue, finished_queue))
        concurrent.futures.wait(futures)


if __name__ == "__main__":
    main()
🔗

文章推荐