
Let's say we have 12 tasks, and we need to run all of them with one condition: we can have only 3 tasks running simultaneously. So we can start only 3 tasks at the beginning, then wait until one of them finishes and launch another one. I am using asyncio with a semaphore for this purpose in the simple code below.

import asyncio
import random

max_tasks = 12
sem = asyncio.Semaphore(3)  # allow at most 3 tasks inside counter() at once


async def counter(n):
    print(f'counter with argument {n} has been launched')
    for i in range(n):
        for j in range(n):
            for k in range(n):
                pass
    await asyncio.sleep(1)

    print(f'counter with argument {n} has FINISHED')


async def safe_calc(n):
    async with sem:
        await counter(n)


async def main():
    tasks = [asyncio.ensure_future(safe_calc(random.randint(100, 600))) for _ in range(max_tasks)]
    await asyncio.gather(*tasks)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()

But what if the variable max_tasks is dynamic, e.g. another function or coroutine returns the number of tasks we have to do, and during the main loop run this number changes, so from that point on we need to calculate more tasks in the loop?

And could you please explain what exactly this line does: "loop.run_until_complete(loop.shutdown_asyncgens())"?

  • Instead of designing a task pool yourself, have you considered using a library for this? E.g. [asyncio-taskpool](https://asyncio-taskpool.readthedocs.io/en/latest/pages/pool.html#the-simpletaskpool-class) seems to provide everything you need, including dynamic task pool sizing. *From that point on, all we need is the .start() and .stop() methods to adjust the number of concurrently running tasks.* Alternative: [AioPool](https://github.com/gistart/asyncio-pool), and perhaps more exist. – gertvdijk Nov 09 '22 at 14:20
  • The gory details of async generators, and the reason they require finalization, is explained here: https://peps.python.org/pep-0525/. You don't have any of them in your program, so it's not necessary to shut them down in this case. If you start the event loop with a call to `asyncio.run`, it will handle all the shutdown details for you. That is highly recommended. – Paul Cornelius Nov 09 '22 at 20:44
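
To see what finalization means in practice, here is a minimal sketch (not from the thread) of an async generator that is abandoned mid-iteration. Its finally block runs only because the event loop finalizes it, which is what loop.shutdown_asyncgens() guarantees before the loop closes:

import asyncio

async def ticker():
    # async generator: its cleanup only runs when someone finalizes it
    try:
        n = 0
        while True:
            yield n
            n += 1
    finally:
        print('ticker cleaned up')

async def main():
    async for value in ticker():
        if value >= 2:
            break  # abandon the generator mid-iteration

loop = asyncio.new_event_loop()
loop.run_until_complete(main())
# without the next line, 'ticker cleaned up' might never be printed;
# asyncio.run() would perform this shutdown step for you automatically
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()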

2 Answers


You need to code around a producer-consumer approach. This question has more explanation of it: Using asyncio.Queue for producer-consumer flow. Basically, you make a queue and have workers pull items from it. In this answer I created 3 workers and have them pull coroutines from the queue until it is empty.

import asyncio
import random

max_tasks = 12
tasks_concurrent = 3

async def counter(n):
    print(f'counter with argument {n} has been launched')
    for i in range(n):
        for j in range(n):
            for k in range(n):
                pass
    await asyncio.sleep(1)

    print(f'counter with argument {n} has FINISHED')


async def safe_calc(n):
    await counter(n)
    return n

async def process_queue(queue: asyncio.Queue):
    # worker: keep pulling coroutines off the queue until it is empty, then exit
    while True:
        try:
            task = queue.get_nowait()
            await task
        except asyncio.QueueEmpty:
            return

async def main():
    coroutines_to_do = asyncio.Queue()  # add tasks here , await tasks_to_do.put(task)
    for _ in range(max_tasks):
        await coroutines_to_do.put(safe_calc(random.randint(100, 600)))

    workers = [asyncio.create_task(process_queue(coroutines_to_do)) for _ in range(tasks_concurrent)]

    # put logic here to add extra things to tasks_to_do queue

    return await asyncio.gather(*workers)


loop = asyncio.get_event_loop()

loop.run_until_complete(main())
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
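
For the dynamic part, the comment placeholder above could be replaced by a feeder coroutine that is gathered alongside the workers (a sketch; the feeder name and timing are my own). One caveat of this design: the workers exit as soon as the queue is momentarily empty, so extra items must arrive while the queue still has work in it.

async def feeder(queue: asyncio.Queue):
    # hypothetical: 2 seconds in, we learn there are 3 more tasks to do
    await asyncio.sleep(2)
    for _ in range(3):
        await queue.put(safe_calc(random.randint(100, 600)))

async def main():
    coroutines_to_do = asyncio.Queue()
    for _ in range(max_tasks):
        await coroutines_to_do.put(safe_calc(random.randint(100, 600)))

    workers = [asyncio.create_task(process_queue(coroutines_to_do)) for _ in range(tasks_concurrent)]

    # run the feeder next to the workers so items can be added while they run
    return await asyncio.gather(feeder(coroutines_to_do), *workers)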
– Ahmed AEK

As you pointed out yourself, asyncio.Semaphore is the wrong tool to use if you want to vary the number of active tasks. I would recommend encapsulating the desired behavior in a small class.

I want to make a couple of remarks first. You say that you want to limit the number of simultaneous tasks, but I don't think that's exactly what you mean, and I can't think of a reason why you would ever want to do that. What your example program does is limit the number of tasks that are simultaneously executing a particular function: counter. That's not quite the same thing, and it is a common requirement; for example, you might want to limit the number of parallel requests to a server.

This chunk of code:

for i in range(n):
    for j in range(n):
        for k in range(n):
            pass

. . . isn't a good thing to have in an asyncio program. It's number crunching without any await expression. So once a task enters that piece of code, it won't allow any other task to run until it's finished. If this CPU-intensive code is really a good model for your application, you should consider using multiprocessing in order to bring multiple CPU cores into play. As part of your example, it makes it difficult to observe the structure of the asyncio program. In my code, I replaced this with a variable asyncio.sleep delay.
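
If the number crunching has to stay, one way to keep the event loop responsive is to push it into a process pool (a minimal sketch, not part of the original answer; the crunch helper is my own name for the triple loop):

import asyncio
from concurrent.futures import ProcessPoolExecutor

# defaults to one worker per CPU core; on Windows, create the pool
# under an if __name__ == '__main__' guard
pool = ProcessPoolExecutor()

def crunch(n: int) -> None:
    # the CPU-bound triple loop from the question, unchanged
    for i in range(n):
        for j in range(n):
            for k in range(n):
                pass

async def counter(n: int):
    print(f'counter with argument {n} has been launched')
    # hand the crunching to another process; other tasks keep running meanwhile
    await asyncio.get_running_loop().run_in_executor(pool, crunch, n)
    await asyncio.sleep(1)
    print(f'counter with argument {n} has FINISHED')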

Here is a class that does what you want. You set the maximum count in the constructor. No more than max_count tasks will simultaneously enter the function calc. I use an asyncio.Condition and a simple counter to gate the number of parallel tasks.

The class also contains a method change_max that allows dynamic adjustment of max_count. When you call change_max, it signals the Condition object in case there are gated tasks that are now allowed to proceed.

To illustrate how this works, I schedule a delayed call to change_max with loop.call_later. I start the event loop with asyncio.run.

import asyncio
import random

class Safe:
    def __init__(self, max_count: int):
        self.max_count = max_count
        self.task_count = 0
        self.ready = asyncio.Condition()
        
    def change_max(self, new_max: int):
        self.max_count = new_max
        # wake any gated tasks, since the new limit may let them proceed
        asyncio.create_task(self.check_count())
        print("Max count is now", new_max)

    async def calc(self, n: int):
        # gate: proceed only while fewer than max_count tasks are in counter()
        async with self.ready:
            await self.ready.wait_for(lambda: self.task_count < self.max_count)
        try:
            self.task_count += 1
            await self.counter(n)
        finally:
            self.task_count -= 1
            await self.check_count()

    async def check_count(self):
        # notify all waiters so they re-test the gate condition
        async with self.ready:
            self.ready.notify_all()

    async def counter(self, n: int):
        print(f'counter with argument {n} has been launched')
        await asyncio.sleep(1.0 + n / 1000)
        print(f'counter with argument {n} has FINISHED')
    
max_tasks = 12

async def main():
    safe = Safe(3)
    asyncio.get_running_loop().call_later(2.0, lambda: safe.change_max(4))
    tasks = [asyncio.create_task(safe.calc(random.randint(500, 1500)))
             for _ in range(max_tasks)]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())
– Paul Cornelius