
I have 2 functions.

The 1st function stores the data received in a list, and the 2nd function writes that data to a CSV file.

I'm using Flask. Whenever the web service is called, it should store the data and send a response; as soon as the response is sent, it should trigger the 2nd function.

My Code:

from flask import Flask, flash, request, redirect, url_for, session
import json
import pandas as pd

app = Flask(__name__)

arr = []

@app.route("/test", methods=['GET','POST'])
def check():
    arr.append(request.form['a'])
    arr.append(request.form['b'])
    res = {'Status': True}
    return json.dumps(res)

def trigger():
    df = pd.DataFrame({'x': arr})
    df.to_csv("docs/xyz.csv", index=False)
    return

Obviously the 2nd function is not called.

Is there a way to achieve this?

P.S.: My real-life problem is different: the trigger function is time-consuming, and I don't want the user to wait for it to finish executing.

  • I had a similar problem in the past; I used Celery to push the function to a task queue and returned success. You can also check the async implementation with aiohttp: https://stackoverflow.com/questions/53430465/creating-non-blocking-restful-service-using-aiohttp – Sach Nov 30 '18 at 11:55
  • What was the end solution? – iHaag Dec 08 '21 at 12:49

5 Answers


I'm actually working on another interesting case on my side where I pass the work off to a Python worker that sends the job to a Redis queue. There are some great blogs about using Redis with Flask; you basically need to ensure Redis is running (able to connect on port 6379).

The worker would look something like this:

import os

import redis
from rq import Worker, Queue, Connection

# Queues this worker listens on
listen = ['default']

# Fall back to a local Redis instance if REDISTOGO_URL isn't set
redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')

conn = redis.from_url(redis_url)

if __name__ == '__main__':
    with Connection(conn):
        worker = Worker(list(map(Queue, listen)))
        worker.work()

In my example I have a function that queries a database for usage, and since it might be a lengthy process I pass it off to the worker (running as a separate script):

def post(self):

    data = Task.parser.parse_args()

    # Enqueue the long-running job instead of executing it inline;
    # note that args must be a tuple, hence the trailing comma.
    job = q.enqueue_call(
        func=migrate_usage, args=(my_args,),
        result_ttl=5000
    )
    print("Job ID is: {}".format(job.get_id()))
    job_key = job.get_id()

    print(str(Job.fetch(job_key, connection=conn).result))

    if job:
        return {"message": "Job : {} added to queue".format(job_key)}, 201
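The snippet assumes that q, conn, Job, and migrate_usage already exist. A minimal sketch of that setup (my assumption, not part of the original answer), reusing the connection settings from the worker script:

import os

import redis
from rq import Queue
from rq.job import Job  # Job.fetch(...) in the route above needs this import

# Same connection setup as the worker script
redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')
conn = redis.from_url(redis_url)
q = Queue(connection=conn)

def migrate_usage(my_args):
    # Stand-in for the long-running usage query; RQ imports this by name,
    # so in practice it should live in an importable module.
    ...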

Credit due to the following article:

https://realpython.com/flask-by-example-implementing-a-redis-task-queue/#install-requirements


One solution would be to have a background thread that watches a queue. You put your CSV data in the queue and the background thread consumes it. You can start such a thread before the first request:

import threading
import time
from queue import Queue  # queue.Queue has task_done()/join(); multiprocessing.Queue does not

import pandas as pd

class CSVWriterThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        self.input_queue = Queue()

    def send(self, item):
        self.input_queue.put(item)

    def close(self):
        self.input_queue.put(None)
        self.input_queue.join()

    def run(self):
        while True:
            csv_array = self.input_queue.get()
            if csv_array is None:
                break

            # Do something here ...
            df = pd.DataFrame({'x': csv_array})
            df.to_csv("docs/xyz.csv", index=False)

            self.input_queue.task_done()
            time.sleep(1)
        # Done: mark the sentinel as processed so close() can return
        self.input_queue.task_done()
        return

@app.before_first_request
def activate_job_monitor():
    thread = CSVWriterThread()
    app.csvwriter = thread
    thread.start()

And in your code put the message in the queue before returning:

@app.route("/test", methods=['GET','POST'])
def check():
    arr.append(request.form['a'])
    arr.append(request.form['b'])
    res = {'Status': True}
    app.csvwriter.send(arr)
    return json.dumps(res)
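The close() method is never called in the snippets above; presumably you would invoke it at shutdown so the thread drains the queue and exits cleanly. A hedged sketch using atexit (my addition, not part of the original answer):

import atexit

@atexit.register
def stop_csv_writer():
    # Drain the queue and stop the writer thread when the process exits.
    if hasattr(app, 'csvwriter'):
        app.csvwriter.close()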
  • And `Queue` is in the `multiprocessing` module now. – Sociopath Nov 30 '18 at 12:52
  • One more thing: how can I pass multiple params to `app.csvwriter.send(arr)`? Suppose I have to send one more array called `j = [1,2]`. How do I read it in the `run` function? – Sociopath Nov 30 '18 at 13:14
  • 1
    Pass and get them as a tuple: ```app.csvwriter.send((first_arg, second_arg))``` and then ```first_arg, second_arg, = self.input_queue.get()``` – mehdix Nov 30 '18 at 13:26

P.S.: My real-life problem is different: the trigger function is time-consuming, and I don't want the user to wait for it to finish executing.

Consider using Celery, which is made for the very problem you're trying to solve. From the docs:

Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.

I recommend you integrate Celery with your Flask app as described here. Your trigger method would then become a straightforward Celery task that you can execute without having to worry about long response times.
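A minimal sketch of what that could look like (the Redis broker URL and the task body are my assumptions, not part of the original answer):

from celery import Celery
import pandas as pd

# Assumed broker: a local Redis instance; swap in whatever broker you run.
celery = Celery(__name__, broker='redis://localhost:6379/0')

@celery.task
def trigger(arr):
    # The slow work now runs in a Celery worker process, not in the request.
    df = pd.DataFrame({'x': arr})
    df.to_csv("docs/xyz.csv", index=False)

The route would then call trigger.delay(arr) and return immediately, while a separate celery worker process picks up the task.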


You can try using streaming. See the next example:

import time
from flask import Flask, Response

app = Flask(__name__)

@app.route('/')
def main():
    return '''<div>start</div>
    <script>
        var xhr = new XMLHttpRequest();
        xhr.open('GET', '/test', true);
        xhr.onreadystatechange = function(e) {
            var div = document.createElement('div');
            div.innerHTML = '' + this.readyState + ':' + this.responseText;
            document.body.appendChild(div);
        };
        xhr.send();
    </script>
    '''

@app.route('/test')
def test():
    def generate():
        app.logger.info('request started')
        for i in range(5):
            time.sleep(1)
            yield str(i)
        app.logger.info('request finished')
        yield ''
    return Response(generate(), mimetype='text/plain')

if __name__ == '__main__':
    app.run('0.0.0.0', 8080, True)

All the magic in this example is in the generator: you can start sending response data, then do some other work, and finally yield empty data to end your stream.

For details look at http://flask.pocoo.org/docs/patterns/streaming/.
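Applied to the question's route, a hedged sketch (the pandas import and the docs/xyz.csv path are carried over from the question):

import json
import pandas as pd
from flask import Flask, Response, request

app = Flask(__name__)

@app.route("/test", methods=['POST'])
def check():
    # Read the form before the generator runs; the request context may be
    # gone by the time later chunks are produced.
    arr = [request.form['a'], request.form['b']]

    def generate():
        yield json.dumps({'Status': True})  # the client gets the response body first
        df = pd.DataFrame({'x': arr})       # the slow work runs while the stream is still open
        df.to_csv("docs/xyz.csv", index=False)
        yield ''                            # empty chunk ends the stream

    return Response(generate(), mimetype='application/json')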


You can defer route-specific actions with limited context by combining after_this_request and response.call_on_close. Note that the request and response contexts won't be available, but the route function's context remains available, so you'll need to copy any request/response data you need into local variables for deferred access.

I moved your array to a local var to show how the function context is preserved. You could change your CSV write to an append so you're not pushing data endlessly into memory; a sketch of that follows the code.

from flask import Flask, flash, request, redirect, url_for, session, after_this_request
import json
import pandas as pd

app = Flask(__name__)

@app.route("/test", methods=['GET','POST'])
def check():
    arr = []
    arr.append(request.form['a'])
    arr.append(request.form['b'])
    res = {'Status': True}

    @after_this_request
    def add_close_action(response):
        @response.call_on_close
        def process_after_request():
            # Runs after the response has been sent to the client
            df = pd.DataFrame({'x': arr})
            df.to_csv("docs/xyz.csv", index=False)
        return response
    return json.dumps(res)
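As suggested above, a sketch of the append variant (the header handling is my assumption):

import os
import pandas as pd

def append_to_csv(arr, path="docs/xyz.csv"):
    # Append new rows instead of rewriting the file on every request;
    # write the header only when the file doesn't exist yet.
    df = pd.DataFrame({'x': arr})
    df.to_csv(path, mode='a', header=not os.path.exists(path), index=False)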