As you pointed out yourself, asyncio.Semaphore is the wrong tool to use if you want to vary the number of active tasks. I would recommend encapsulating the desired behavior in a small class.
I want to make a couple of remarks first. You say that you want to limit the number of simultaneous tasks, but I don't think that's exactly what you mean; I can't think of any reason why you would ever want to do that. What your example program does is limit the number of tasks that are simultaneously executing a particular function: counter. That's not quite the same thing, and it is a common requirement; for example, you might want to limit the number of parallel requests to a server.
This chunk of code:
    for i in range(n):
        for j in range(n):
            for k in range(n):
                pass
. . . isn't a good thing to have in an asyncio program. It's number crunching without any await expression. So once a task enters that piece of code, it won't allow any other task to run until it's finished. If this CPU-intensive code is really a good model for your application, you should consider using multiprocessing in order to bring multiple CPU cores into play. As part of your example, it makes it difficult to observe the structure of the asyncio program. In my code, I replaced this with a variable asyncio.sleep delay.
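To make the blocking effect concrete, here is a minimal sketch; the names busy and run are hypothetical and not part of your program. Two tasks each do three chunks of number crunching. Without any await, the first task finishes all of its chunks before the second one starts; with an await asyncio.sleep(0) between chunks, the two tasks take turns.

```python
import asyncio


async def busy(name, log, cooperative):
    for step in range(3):
        # Stand-in for a chunk of CPU-bound work (no await inside).
        sum(i * i for i in range(10_000))
        log.append((name, step))
        if cooperative:
            # Hand control back to the event loop between chunks.
            await asyncio.sleep(0)


async def run(cooperative):
    log = []
    await asyncio.gather(busy("A", log, cooperative),
                         busy("B", log, cooperative))
    return log


blocking_log = asyncio.run(run(False))
cooperative_log = asyncio.run(run(True))
print(blocking_log)
print(cooperative_log)
```

Note that yielding like this only lets other tasks get a turn; it does not make the number crunching itself run in parallel.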
Here is a class that does what you want. You set the maximum count in the constructor. No more than max_count tasks will get past the gate in calc and run counter at the same time. I use an asyncio.Condition and a simple counter to gate the number of parallel tasks.
The class also contains a method change_max that allows dynamic adjustment of max_count. When you call change_max, it signals the Condition object in case there are gated tasks that are now allowed to proceed.
To illustrate how this works, a delayed callback issues a call to change_max after two seconds. I start the event loop with asyncio.run.
    import asyncio
    import random


    class Safe:
        def __init__(self, max_count: int):
            self.max_count = max_count
            self.task_count = 0
            self.ready = asyncio.Condition()

        def change_max(self, new_max: int):
            self.max_count = new_max
            # Wake any gated tasks so they can re-check the new limit.
            # Keep a reference so the task is not garbage-collected early.
            self._notifier = asyncio.create_task(self.check_count())
            print("Max count is now", new_max)

        async def calc(self, n: int):
            async with self.ready:
                # Block until a slot is free, then claim it while holding the lock.
                await self.ready.wait_for(lambda: self.task_count < self.max_count)
                self.task_count += 1
            try:
                await self.counter(n)
            finally:
                # Free the slot and let waiting tasks re-check the condition.
                self.task_count -= 1
                await self.check_count()

        async def check_count(self):
            async with self.ready:
                self.ready.notify_all()

        async def counter(self, n: int):
            print(f'counter with argument {n} has been launched')
            await asyncio.sleep(1.0 + n / 1000)
            print(f'counter with argument {n} has FINISHED')


    max_tasks = 12


    async def main():
        safe = Safe(3)
        # After two seconds, raise the limit from 3 to 4.
        asyncio.get_running_loop().call_later(2.0, lambda: safe.change_max(4))
        tasks = [asyncio.create_task(safe.calc(random.randint(500, 1500)))
                 for _ in range(max_tasks)]
        await asyncio.gather(*tasks)


    if __name__ == "__main__":
        asyncio.run(main())
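If you want to reuse the gate around arbitrary code rather than one fixed function, one possible variation is to expose it as an async context manager. The sketch below is only an illustration under my own assumptions: the names Gate, worker, demo and the peak attribute are hypothetical, and change_max is written as a coroutine here, which differs from the class above. The peak attribute exists only to show that the limit is respected.

```python
import asyncio


class Gate:
    """Hypothetical variant: limits how many tasks are inside `async with gate`."""

    def __init__(self, max_count):
        self.max_count = max_count
        self.active = 0
        self.peak = 0  # highest concurrency seen; for demonstration only
        self.ready = asyncio.Condition()

    async def __aenter__(self):
        async with self.ready:
            # Wait for a free slot, then claim it while holding the lock.
            await self.ready.wait_for(lambda: self.active < self.max_count)
            self.active += 1
            self.peak = max(self.peak, self.active)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        async with self.ready:
            # Free the slot and wake waiters so they re-check the limit.
            self.active -= 1
            self.ready.notify_all()

    async def change_max(self, new_max):
        async with self.ready:
            self.max_count = new_max
            self.ready.notify_all()


async def worker(gate, delay):
    async with gate:
        await asyncio.sleep(delay)


async def demo():
    gate = Gate(2)
    tasks = [asyncio.create_task(worker(gate, 0.05)) for _ in range(6)]
    await asyncio.sleep(0.01)      # let the first workers pass the gate
    peak_before = gate.peak
    await gate.change_max(3)       # raise the limit while workers are waiting
    await asyncio.gather(*tasks)
    return peak_before, gate.peak


peak_before, peak_after = asyncio.run(demo())
print(peak_before, peak_after)
```

With this shape, any block of code can be gated with async with gate:, and the slot is released even if the body raises.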