部署异步推理服务#

在复杂的模型推理场景中,例如AIGC、视频处理等场景中,模型服务推理耗时较长,存在长连接超时导致请求失败或实例负载不均衡等问题,不适用于实时推理的场景。针对以上问题,PAI提供了异步推理服务,用于支持类似的场景,用户可以在提交预测请求之后,通过轮询或是订阅的方式获取到推理服务的预测结果。

在当前文档中,我们将介绍如何使用PAI Python SDK在PAI上部署和调用异步推理服务。

准备工作#

我们可以通过以下命令安装PAI Python SDK。

!python -m pip install --upgrade alipai

SDK需要配置访问阿里云服务需要的 AccessKey,以及当前使用的工作空间和OSS Bucket。在PAI Python SDK安装之后,通过在 命令行终端 中执行以下命令,按照引导配置密钥,工作空间等信息。

# 以下命令,请在 命令行终端 中执行.

python -m pai.toolkit.config

我们可以通过以下代码验证当前的配置。

import pai
from pai.session import get_default_session

print(pai.__version__)
sess = get_default_session()

assert sess.workspace_name is not None
print(sess.workspace_name)

部署异步推理服务模型#

将模型部署为异步推理服务与部署标准的在线推理服务类似,用户仅需在部署时(Model.deploy),传递service_type=ServicType.Async即可。

当前流程中,我们将使用镜像部署的模式,部署一个异步的推理服务。

# 准备异步推理服务的应用代码目录
!mkdir -p serve_src/

通过%%writefile指令,我们将推理服务代码写入到serve_src/run.py文件中。

%%writefile serve_src/run.py
import asyncio
from random import random

from fastapi import FastAPI, Request
import uvicorn, json, datetime

# 默认模型加载路径
model_path = "/eas/workspace/model/"

app = FastAPI()


@app.post("/")
async def create_item(request: Request):
    print("Make mock prediction starting ...")
    # Mock prediction
    await asyncio.sleep(15)
    print("Prediction finished.")
    return [random() for _ in range(10)]


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000, workers=1)

我们将使用PAI提供的PyTorch推理镜像部署以上的模型。

from pai.model import Model, container_serving_spec, ServiceType
from pai.image import retrieve, ImageScope

m = Model(
    inference_spec=container_serving_spec(
        source_dir="serve_src",
        command="python run.py",
        image_uri=retrieve(
            "PyTorch",
            framework_version="1.10",
            accelerator_type="gpu",
            image_scope=ImageScope.INFERENCE,
        ),
        requirements=[
            "fastapi",
            "uvicorn",
        ],
    )
    # 用户可以通过`model_data`参数,传递一个OSS上的模型。相应的模型会被加载到推理服务的容器中。
    # model_data="oss://<YourOssBucket>/path/to/model/"
)

通过设置部署服务的service_type=ServiceType.Async参数,我们可以将模型部署为异步推理服务。异步推理服务使用分别使用输入队列(source)和输出队列(sink)保存预测请求和预测结果。通过options参数,可以配置队列使用的资源,队列最大长度,是否开启自动驱逐等高阶参数。异步服务支持的完整的高阶参数,请参考文档:异步服务-参数配置

from pai.predictor import AsyncPredictor
from pai.common.utils import random_str


service_name = f"async_service_example_{random_str(6)}"

p: AsyncPredictor = m.deploy(
    service_name=service_name,
    instance_type="ecs.c6.large",
    # 设置当前部署的服务类型为异步服务
    service_type=ServiceType.Async,
    # 用户可以通过options字段配置高阶参数
    options={
        # 异步推理详细参数文档: https://help.aliyun.com/document_detail/476812.html
        "queue.cpu": 2,  # 队列使用的CPU核数,默认为1
        "queue.memory": 2048,  # 异步服务使用过的队列内存,单位为MB
    },
)

print()

print(p)
print(p.service_name)
print(p.access_token)

调用推理服务#

用户发送调用异步队列服务与请求同步推理服务的方式相同,但是异步推理服务会立即返回本次预测请求的RequestId,而不是预测结果。用户可以通过轮询获取到推理服务的预测结果。

  • 用户客户端发送推理请求,加入到推理服务的输入队列中,PAI-EAS返回请求的RequestId。

  • PAI处理输入队列中的请求,转发给到用户的推理服务,推理服务处理完请求后,将结果写入到输出队列中

  • 用户客户端可以通过RequestId轮询,可以获取到用户推理服务的预测结果

PAI Python SDK提供了AsyncPredictor,支持用户更加简单得调用异步推理服务。

调用异步推理服务#

AsyncPredictor提供了predictraw_predict方法发送预测请求,它们都会返回一个AsyncTask,用户可以通过AsyncTask.result()获取预测结果。

二者的区别在于predict方法会使用Serializer对象对输入数据进行序列化,对预测结果进行反序列化,而raw_predict方法直接将输入数据发送给异步推理服务,返回HTTP响应结果(RawResponse)。


from pai.predictor import AsyncPredictor, AsyncTask
from pai.serializer import JsonSerializer

p = AsyncPredictor(service_name='test_async_service', serializer=JsonSerializer())

t1: AsyncTask = p.predict(data={"some": "data"})
# result是推理服务的响应结果(Response Body),经过Serialzier.deserialize处理后返回的结果.
result = t1.result()


t2: AsyncTask = p.raw_predict(data=b'{"some": "data"}')
resp: RawResponse = t2.result()
print(resp.status_code, resp.content)

AsyncPredictor会维护一个线程池,通过一个线程去发送推理请求,并等待请求处理完成。用户可以通过max_workers参数配置线程池的大小。


p = AsyncPredictor(service_name='test_async_service', max_workers=20)

当用户需要在异步请求完成之后,对于响应的结果进行处理时,可以通过callback参数传递一个回调函数。回调函数的参数为AsyncTask.result(),也就实际响应的结果。

以下的示例代码中,我们将使用AsyncPredictor调用异步推理服务,并通过会回调函数处理预测结果。

from pai.predictor import RawResponse, AsyncTask
import time

# 结果列表
results = []


# 定义回调函数
def callback_fn(resp: RawResponse):
    print("Callback: get prediction result ", resp.json())
    results.append(resp.json())


# 发送预测请求,使用回调函数处理预测结果。
task: AsyncTask = p.raw_predict(
    data=b"test_data",
    callback=callback_fn,
)

# result() 方法等待预测完成
resp: RawResponse = task.result()
print(resp.json())

# 等待回调函数执行完成
time.sleep(1)

print(results)
assert len(results) == 1

以下示例中,我们批量发送异步推理请求,然后等待所有的请求完成。

tasks = []

for i in range(10):
    task: AsyncTask = p.raw_predict(
        data=b"test_data",
        callback=lambda x: print("Prediction result: ", x.json()),
    )
    tasks.append(task)

prediction_results = [t.result().json() for t in tasks]

print(prediction_results)
print(len(prediction_results))

使用异步API调用推理服务#

AsyncPredictor 提供了异步API raw_predict_asyncpredict_async,支持用户使用Python提供的异步框架(asyncio)调用推理服务。

from pai.predictor import RawResponse

# 使用异步API调用异步推理服务
res: RawResponse = await p.raw_predict_async(data=b"test_data")

print(res.status_code)
print(res.content)
print(res.json())

通过SDK提供的异步API,我们可以不借助于线程池,批量发送异步预测请求。

以下的示例中,我们将使用异步API,批量发送异步预测请求,等待推理完成,并使用回调函数打印预测请求结果。

import asyncio


# 定义回调函数
def task_done_cb(task: asyncio.Task):
    if task.exception():
        raise task.exception()
    else:
        print("Prediction result: ", task.result().json())


# 使用异步API批量调用异步推理服务
async def batch_predict():
    tasks = []
    for _ in range(10):
        task = asyncio.create_task(
            # raw_predict_async 是一个coroutine
            p.raw_predict_async(
                data=b"test_data",
            )
        )
        # 调用完成之后,打印调用返回结果
        task.add_done_callback(task_done_cb)

        tasks.append(task)
    # 等待所有任务完成
    return await asyncio.gather(*tasks, return_exceptions=True)


batch_results = await batch_predict()


for result in batch_results:
    print(result.json())

测试完成之后,可以使用delete_service方法删除对应服务,释放资源。

p.delete_service()