bestsource

플라스크에서 비동기 태스크 만들기

bestsource 2023. 7. 18. 21:51
반응형

플라스크에서 비동기 태스크 만들기

저는 플라스크에서 신청서를 쓰고 있는데, 그것을 제외하고는 정말 잘 작동합니다.WSGI동기화 및 차단 중입니다.특히 타사 API를 호출하는 작업이 하나 있는데, 이 작업을 완료하는 데 몇 분이 걸릴 수 있습니다.제어권이 플라스크로 반환되는 동안 해당 호출(사실은 일련의 호출)을 실행하고 싶습니다.

내 견해는 다음과 같습니다.

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    # do stuff
    return Response(
        mimetype='application/json',
        status=200
    )

자, 제가 하고 싶은 것은 그 대사를

final_file = audio_class.render_audio()

를 실행하고 메서드가 반환될 때 실행할 콜백을 제공하는 한편 플라스크는 요청을 계속 처리할 수 있습니다.플라스크를 비동기식으로 실행해야 하는 유일한 작업인데, 이를 어떻게 구현할 것인지에 대한 조언을 듣고 싶습니다.

트위스트와 클라인을 살펴보았지만, 스레딩으로 충분할 수도 있기 때문에 그들이 과잉 살상을 하는지 확신할 수 없습니다.아니면 셀러리가 여기에 적합한 선택인가요?

셀러리를 사용하여 비동기 작업을 처리하겠습니다.작업 대기열로 사용할 브로커를 설치해야 합니다(RabbitMQ 및 Redis 권장).

app.py:

from flask import Flask
from celery import Celery

broker_url = 'amqp://guest@localhost'          # Broker URL for RabbitMQ task queue

app = Flask(__name__)    
celery = Celery(app.name, broker=broker_url)
celery.config_from_object('celeryconfig')      # Your celery configurations in a celeryconfig.py

@celery.task(bind=True)
def some_long_task(self, x, y):
    # Do some long task
    ...

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    some_long_task.delay(x, y)                 # Call your async task and pass whatever necessary variables
    return Response(
        mimetype='application/json',
        status=200
    )

플라스크 앱을 실행하고 셀러리 워커를 실행하기 위한 다른 프로세스를 시작합니다.

$ celery worker -A app.celery --loglevel=debug

나는 또한 플라스크와 함께 셀러리를 사용하는 것에 대한 더 자세한 가이드를 위해 미겔 그링버그의 을 참조할 것입니다.

스레드화는 또 다른 가능한 해결책입니다.셀러리 기반 솔루션이 규모에 맞는 애플리케이션에 더 적합하지만, 해당 엔드포인트에서 너무 많은 트래픽이 발생하지 않을 경우 스레드화를 사용할 수 있습니다.

이 솔루션은 Miguel Grinberg의 PyCon 2016 Flask at Scale 프레젠테이션, 특히 슬라이드 데크의 슬라이드 41을 기반으로 합니다.그의 코드는 원래 소스에 관심이 있는 사람들을 위해 github에서도 이용할 수 있습니다.

사용자 관점에서 코드는 다음과 같이 작동합니다.

  1. 장시간 실행 작업을 수행하는 엔드포인트로 호출합니다.
  2. 이 끝점은 작업 상태를 확인할 수 있는 링크와 함께 202 수락을 반환합니다.
  3. 상태 링크에 대한 호출은 TAK가 실행 중인 동안 202를 반환하고 작업이 완료되면 200(및 결과)을 반환합니다.

api 호출을 백그라운드 작업으로 변환하려면 @async_api 데코레이터를 추가하기만 하면 됩니다.

다음은 완전히 포함된 예입니다.

from flask import Flask, g, abort, current_app, request, url_for
from werkzeug.exceptions import HTTPException, InternalServerError
from flask_restful import Resource, Api
from datetime import datetime
from functools import wraps
import threading
import time
import uuid

tasks = {}

app = Flask(__name__)
api = Api(app)


@app.before_first_request
def before_first_request():
    """Start a background thread that cleans up old tasks."""
    def clean_old_tasks():
        """
        This function cleans up old tasks from our in-memory data structure.
        """
        global tasks
        while True:
            # Only keep tasks that are running or that finished less than 5
            # minutes ago.
            five_min_ago = datetime.timestamp(datetime.utcnow()) - 5 * 60
            tasks = {task_id: task for task_id, task in tasks.items()
                     if 'completion_timestamp' not in task or task['completion_timestamp'] > five_min_ago}
            time.sleep(60)

    if not current_app.config['TESTING']:
        thread = threading.Thread(target=clean_old_tasks)
        thread.start()


def async_api(wrapped_function):
    @wraps(wrapped_function)
    def new_function(*args, **kwargs):
        def task_call(flask_app, environ):
            # Create a request context similar to that of the original request
            # so that the task can have access to flask.g, flask.request, etc.
            with flask_app.request_context(environ):
                try:
                    tasks[task_id]['return_value'] = wrapped_function(*args, **kwargs)
                except HTTPException as e:
                    tasks[task_id]['return_value'] = current_app.handle_http_exception(e)
                except Exception as e:
                    # The function raised an exception, so we set a 500 error
                    tasks[task_id]['return_value'] = InternalServerError()
                    if current_app.debug:
                        # We want to find out if something happened so reraise
                        raise
                finally:
                    # We record the time of the response, to help in garbage
                    # collecting old tasks
                    tasks[task_id]['completion_timestamp'] = datetime.timestamp(datetime.utcnow())

                    # close the database session (if any)

        # Assign an id to the asynchronous task
        task_id = uuid.uuid4().hex

        # Record the task, and then launch it
        tasks[task_id] = {'task_thread': threading.Thread(
            target=task_call, args=(current_app._get_current_object(),
                               request.environ))}
        tasks[task_id]['task_thread'].start()

        # Return a 202 response, with a link that the client can use to
        # obtain task status
        print(url_for('gettaskstatus', task_id=task_id))
        return 'accepted', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
    return new_function


class GetTaskStatus(Resource):
    def get(self, task_id):
        """
        Return status about an asynchronous task. If this request returns a 202
        status code, it means that task hasn't finished yet. Else, the response
        from the task is returned.
        """
        task = tasks.get(task_id)
        if task is None:
            abort(404)
        if 'return_value' not in task:
            return '', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
        return task['return_value']


class CatchAll(Resource):
    @async_api
    def get(self, path=''):
        # perform some intensive processing
        print("starting processing task, path: '%s'" % path)
        time.sleep(10)
        print("completed processing task, path: '%s'" % path)
        return f'The answer is: {path}'


api.add_resource(CatchAll, '/<path:path>', '/')
api.add_resource(GetTaskStatus, '/status/<task_id>')


if __name__ == '__main__':
    app.run(debug=True)

사용해 볼 수도 있습니다.daemon=True그자리의 process.start()메소드는 차단되지 않으며 값비싼 함수가 백그라운드에서 실행되는 동안 호출자에게 즉시 응답/상태를 반환할 수 있습니다.

저는 falcon framework와 사용하면서 비슷한 문제를 경험했습니다.daemon프로세스가 도움이 되었습니다.

다음 작업을 수행해야 합니다.

from multiprocessing import Process

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    heavy_process = Process(  # Create a daemonic process with heavy "my_func"
        target=my_func,
        daemon=True
    )
    heavy_process.start()
    return Response(
        mimetype='application/json',
        status=200
    )

# Define some heavy function
def my_func():
    time.sleep(10)
    print("Process finished")

즉시 응답이 표시되고 10초 후에 콘솔에 인쇄된 메시지가 표시됩니다.

참고: 다음 사항을 기억하십시오.daemonic프로세스는 하위 프로세스를 생성할 수 없습니다.

플라스크 2.0

플라스크 2.0은 이제 비동기 경로를 지원합니다.httpx 라이브러리를 사용하고 비동기 코루틴을 사용할 수 있습니다.아래와 같이 코드를 조금 변경할 수 있습니다.

@app.route('/render/<id>', methods=['POST'])
async def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file =  await asyncio.gather(
        audio_class.render_audio(data=text_list),
        do_other_stuff_function()
    )
    # Just make sure that the coroutine should not  having any blocking calls inside it. 
    return Response(
        mimetype='application/json',
        status=200
    )

위의 것은 단지 의사 코드일 뿐이지만, 당신은 플라스크 2.0에서 비동기식이 어떻게 작동하는지 확인할 수 있고 HTTP 통화의 경우 httpx를 사용할 수 있습니다.또한 코루틴이 일부 I/O 작업만 수행하는지 확인합니다.

사중인경우를 .redis사용할 수 있습니다.Pubsub백그라운드 작업을 처리하는 이벤트입니다.더 보기: https://redis.com/ebook/part-2-core-concepts/chapter-3-commands-in-redis/3-6-publishsubscribe/

언급URL : https://stackoverflow.com/questions/31866796/making-an-asynchronous-task-in-flask

반응형