本文介绍如何使用 `asyncio.as_completed()` 动态管理可变长度任务队列,始终保持 n 个并发任务运行;当任务失败时立即重新入队并获得最高优先级,确保快速重试,避免批量提交导致的调度延迟。
在实际异步任务调度场景中(如网络请求、文件下载或批量 API 调用),我们常需满足三个关键约束:
此时,若一次性创建全部 1000 个 Task(如 asyncio.create_task() 批量调用),不仅内存开销大,更会导致失败任务被“埋没”——因为 asyncio.as_completed(task_list) 仅作用于已创建的 Task 对象集合,无法动态插入新任务或调整优先级。
✅ 正确解法是:只维护一个大小可控的活跃任务池(pool),通过循环“取—执行—回收—补充”实现流式调度。核心思路如下:
协程,便于失败后精准重试; 以下是生产就绪的参考实现(含错误处理与资源清理):
import asyncio
from typing import List, Coroutine, Any, Iterator
async def run_with_priority_retry(
task_coros: List[Coroutine],
max_concurrent: int = 8,
) -> Iterator[Any]:
"""
动态维持 max_concurrent 个并发任务,失败任务立即重试(LIFO 优先)。
Args:
task_coros: 初始任务协程列表(可修改)
max_concurrent: 最大并发数
Yields:
成功执行的结果
"""
# 使用 list 模拟优先队列:append → 高优;pop() → 取最高优
pending = list(task_coros)
task_pool = {} # asyncio.Task -> Coroutine
while pending or task_pool:
# ✅ 补充任务至满额
while pending and len(task_pool) < max_concurrent:
coro = pending.pop() # LIFO:最后加入的最先执行(高优)
task = asyncio.create_task(coro)
task_pool[task] = coro
if not task_pool:
break
# ✅ 等待任意一个完成(as_completed 返回迭代器,next 即首个)
done, _ = await asyncio.wait(
task_pool.keys(),
return_when=asyncio.FIRST_COMPLETED
)
completed_task = done.pop()
# ✅ 提取原始协程并清理池
coro = task_pool.pop(completed_task)
try:
result = await completed_task
yield result
except Exception as e:
print(f"Task failed with {type(e).__name__}: {e} — re-queued with high priority")
pending.append(coro) # 失败任务插到末尾,下次 pop 优先执行
# ✅ 清理已完成 task 引用(防止内存泄漏)
completed_task.cancel()
try:
await completed_task
except (asyncio.CancelledError, Exception):
pass? 关键注意事项:
该模式已被广泛应用于异步爬虫、批量数据同步及微服务熔断降级等场景,兼顾性能、可控性与健壮性。