
I'm trying to build a Server-Sent Events endpoint with FastAPI but I'm unsure if what I'm trying to accomplish is possible or how I would go about doing it.

Introduction to the problem

Basically let's say I have a run_task(limit, task) async function that sends an async request, makes a transaction, or something similar. Let's say that for each task run_task can return some JSON data.

I'd like to run multiple tasks (multiple calls to run_task(limit, task)) concurrently. To do so, I'm using trio and nurseries like so:

async with trio.open_nursery() as nursery:
    limit = trio.CapacityLimiter(10)
    for task in tasks:
        nursery.start_soon(run_task, limit, task)

Finally, I want to return the results of each task via a FastAPI endpoint.

At first, I simply created an object containing a list and passed that object (by reference) to each run_task. When a task finished, it would append its JSON data to the list as a dictionary, and the endpoint would return the whole object once all the tasks were finished.

This works, but I find it inefficient: the client sending the request has to wait for all the tasks to finish before it can display anything. Some tasks can be quite slow, so the data already fetched by the faster tasks just sits there unused in the meantime.

What I would like to accomplish

Whenever a task is finished, I'd want the API to directly return the data of said task (that I would've previously added to the object) so that the client can display said data in real-time.

That's when I discovered Server-Sent Events and WebSockets. Server-Sent Events seemed like the appropriate solution to my problem, as I don't need bidirectional communication.

Since FastAPI is built on Starlette, I decided to use sse-starlette to build an endpoint with Server-Sent Events. To do so, I need to build an endpoint like so:

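from fastapi import APIRouter, Request
from sse_starlette.sse import EventSourceResponse

router = APIRouter()

@router.get('/stream')
async def runTasks(
        param1: str,
        request: Request
):
    # status_event_generator is the async generator I still need to write
    event_generator = status_event_generator(request, param1)
    return EventSourceResponse(event_generator)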

The actual problem

As the name status_event_generator implies, EventSourceResponse from sse-starlette needs to be given an event generator, and that's where I'm stuck. I'd like the generator to yield each task's data as soon as that task finishes (so that the client receives it in real time). However, the tasks run inside the trio nursery, so I'm unsure how to proceed.

As per "Is yielding from inside a nursery in an asynchronous generator function bad?", it seems (if I understand correctly) that I can't just put a yield in run_task(limit, task) and expect it to work.

Tom
  • Since yielding is only a problem across a nursery boundary, would an ambient nursery work? – user3840170 Jan 15 '22 at 11:17
  • @user3840170 possibly? I must admit I don't quite know what an ambient nursery is, or how I'd implement it – Tom Jan 15 '22 at 18:38
  • It's a term I made up. Basically, open a nursery somewhere in a wider scope that will contain the loop that goes over the generator, and use that nursery in the generator itself to spawn background tasks. – user3840170 Jan 15 '22 at 18:45
  • Ah right, but if the Stack Overflow question I had linked is correct, I unfortunately believe that the yield operator isn't accessible anywhere in the nursery, neither in the tasks nor the nursery itself, meaning opening a nursery in a wider scope and putting the generator in it wouldn't work either – Tom Jan 15 '22 at 21:05
  • _We don’t actually want to forbid every `yield` that happens *inside* our `with` block; we only want to forbid `yield`s that *temporarily exit the with block*. It’s fine if the code inside the `with` block iterates over a generator that has some internal `yield`._ Written by the same person who wrote the answer under that question, who is also the author of the trio library. I’ll give him the benefit of the doubt that he knows what he is talking about. – user3840170 Jan 15 '22 at 21:15
  • That's really interesting, I'll have to try and experiment. Do you believe if I use `for task in tasks` as the generator loop, and add a yield within the `run_task`, it might work? – Tom Jan 16 '22 at 12:27

1 Answer

Solution with websockets

I ultimately decided to go with websockets rather than SSE, as I realised I needed to pass an object as data to my endpoint. While an SSE endpoint can accept query params, dealing with objects as query parameters was too much of a hassle.

WebSocket support in FastAPI comes from Starlette and is pretty easy to use. Applied to the problem above, it can look like this:

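import trio
from fastapi import APIRouter, WebSocket

router = APIRouter()

@router.websocket('/stream')
async def runTasks(
        websocket: WebSocket
):
    # Initialise websocket
    await websocket.accept()

    # Receive data (the list of tasks sent by the client)
    tasks = await websocket.receive_json()

    # The nursery block exits only once every task has finished
    async with trio.open_nursery() as nursery:
        limit = trio.CapacityLimiter(10)
        for task in tasks:
            nursery.start_soon(run_task, limit, task, websocket)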

To return data, we can then simply call await websocket.send_json() in run_task. (This is a simplified example; you'd preferably want to handle websocket closures and edge cases with your nursery.)

Solution with SSE

To answer the original problem, thanks to @user3840170 and https://discuss.python.org/t/preventing-yield-inside-certain-context-managers/1091, we should be able to solve the problem by opening a nursery somewhere in a wider scope that will contain the loop that goes over the generator, and use that nursery in the generator itself to spawn background tasks.

Tom