部署异步推理服务#
在复杂的模型推理场景中,例如AIGC、视频处理等场景中,模型服务推理耗时较长,存在长连接超时导致请求失败或实例负载不均衡等问题,不适用于实时推理的场景。针对以上问题,PAI提供了异步推理服务,用于支持类似的场景,用户可以在提交预测请求之后,通过轮询或是订阅的方式获取到推理服务的预测结果。
在当前文档中,我们将介绍如何使用PAI Python SDK在PAI上部署和调用异步推理服务。
准备工作#
我们可以通过以下命令安装PAI Python SDK。
!python -m pip install --upgrade alipai
SDK需要配置访问阿里云服务需要的 AccessKey,以及当前使用的工作空间和OSS Bucket。在PAI Python SDK安装之后,通过在 命令行终端 中执行以下命令,按照引导配置密钥,工作空间等信息。
# 以下命令,请在 命令行终端 中执行.
python -m pai.toolkit.config
我们可以通过以下代码验证当前的配置。
import pai
from pai.session import get_default_session
print(pai.__version__)
sess = get_default_session()
assert sess.workspace_name is not None
print(sess.workspace_name)
部署异步推理服务模型#
将模型部署为异步推理服务与部署标准的在线推理服务类似,用户仅需在部署时(Model.deploy
),传递service_type=ServicType.Async
即可。
当前流程中,我们将使用镜像部署的模式,部署一个异步的推理服务。
# 准备异步推理服务的应用代码目录
!mkdir -p serve_src/
通过%%writefile
指令,我们将推理服务代码写入到serve_src/run.py
文件中。
%%writefile serve_src/run.py
import asyncio
from random import random
from fastapi import FastAPI, Request
import uvicorn, json, datetime
# 默认模型加载路径
model_path = "/eas/workspace/model/"
app = FastAPI()
@app.post("/")
async def create_item(request: Request):
print("Make mock prediction starting ...")
# Mock prediction
await asyncio.sleep(15)
print("Prediction finished.")
return [random() for _ in range(10)]
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000, workers=1)
我们将使用PAI提供的PyTorch推理镜像部署以上的模型。
from pai.model import Model, container_serving_spec, ServiceType
from pai.image import retrieve, ImageScope
m = Model(
inference_spec=container_serving_spec(
source_dir="serve_src",
command="python run.py",
image_uri=retrieve(
"PyTorch",
framework_version="1.10",
accelerator_type="gpu",
image_scope=ImageScope.INFERENCE,
),
requirements=[
"fastapi",
"uvicorn",
],
)
# 用户可以通过`model_data`参数,传递一个OSS上的模型。相应的模型会被加载到推理服务的容器中。
# model_data="oss://<YourOssBucket>/path/to/model/"
)
通过设置部署服务的service_type=ServiceType.Async
参数,我们可以将模型部署为异步推理服务。异步推理服务使用分别使用输入队列(source)和输出队列(sink)保存预测请求和预测结果。通过options
参数,可以配置队列使用的资源,队列最大长度,是否开启自动驱逐等高阶参数。异步服务支持的完整的高阶参数,请参考文档:异步服务-参数配置
from pai.predictor import AsyncPredictor
from pai.common.utils import random_str
service_name = f"async_service_example_{random_str(6)}"
p: AsyncPredictor = m.deploy(
service_name=service_name,
instance_type="ecs.c6.large",
# 设置当前部署的服务类型为异步服务
service_type=ServiceType.Async,
# 用户可以通过options字段配置高阶参数
options={
# 异步推理详细参数文档: https://help.aliyun.com/document_detail/476812.html
"queue.cpu": 2, # 队列使用的CPU核数,默认为1
"queue.memory": 2048, # 异步服务使用过的队列内存,单位为MB
},
)
print()
print(p)
print(p.service_name)
print(p.access_token)
调用推理服务#
用户发送调用异步队列服务与请求同步推理服务的方式相同,但是异步推理服务会立即返回本次预测请求的RequestId
,而不是预测结果。用户可以通过轮询获取到推理服务的预测结果。
用户客户端发送推理请求,加入到推理服务的输入队列中,PAI-EAS返回请求的RequestId。
PAI处理输入队列中的请求,转发给到用户的推理服务,推理服务处理完请求后,将结果写入到输出队列中
用户客户端可以通过RequestId轮询,可以获取到用户推理服务的预测结果
PAI Python SDK提供了AsyncPredictor
,支持用户更加简单得调用异步推理服务。
调用异步推理服务#
AsyncPredictor
提供了predict
和raw_predict
方法发送预测请求,它们都会返回一个AsyncTask
,用户可以通过AsyncTask.result()
获取预测结果。
二者的区别在于predict
方法会使用Serializer
对象对输入数据进行序列化,对预测结果进行反序列化,而raw_predict
方法直接将输入数据发送给异步推理服务,返回HTTP响应结果(RawResponse
)。
from pai.predictor import AsyncPredictor, AsyncTask
from pai.serializer import JsonSerializer
p = AsyncPredictor(service_name='test_async_service', serializer=JsonSerializer())
t1: AsyncTask = p.predict(data={"some": "data"})
# result是推理服务的响应结果(Response Body),经过Serialzier.deserialize处理后返回的结果.
result = t1.result()
t2: AsyncTask = p.raw_predict(data=b'{"some": "data"}')
resp: RawResponse = t2.result()
print(resp.status_code, resp.content)
AsyncPredictor
会维护一个线程池,通过一个线程去发送推理请求,并等待请求处理完成。用户可以通过max_workers
参数配置线程池的大小。
p = AsyncPredictor(service_name='test_async_service', max_workers=20)
当用户需要在异步请求完成之后,对于响应的结果进行处理时,可以通过callback
参数传递一个回调函数。回调函数的参数为AsyncTask.result()
,也就实际响应的结果。
以下的示例代码中,我们将使用AsyncPredictor
调用异步推理服务,并通过会回调函数处理预测结果。
from pai.predictor import RawResponse, AsyncTask
import time
# 结果列表
results = []
# 定义回调函数
def callback_fn(resp: RawResponse):
print("Callback: get prediction result ", resp.json())
results.append(resp.json())
# 发送预测请求,使用回调函数处理预测结果。
task: AsyncTask = p.raw_predict(
data=b"test_data",
callback=callback_fn,
)
# result() 方法等待预测完成
resp: RawResponse = task.result()
print(resp.json())
# 等待回调函数执行完成
time.sleep(1)
print(results)
assert len(results) == 1
以下示例中,我们批量发送异步推理请求,然后等待所有的请求完成。
tasks = []
for i in range(10):
task: AsyncTask = p.raw_predict(
data=b"test_data",
callback=lambda x: print("Prediction result: ", x.json()),
)
tasks.append(task)
prediction_results = [t.result().json() for t in tasks]
print(prediction_results)
print(len(prediction_results))
使用异步API调用推理服务#
AsyncPredictor
提供了异步API raw_predict_async
和 predict_async
,支持用户使用Python提供的异步框架(asyncio)调用推理服务。
from pai.predictor import RawResponse
# 使用异步API调用异步推理服务
res: RawResponse = await p.raw_predict_async(data=b"test_data")
print(res.status_code)
print(res.content)
print(res.json())
通过SDK提供的异步API,我们可以不借助于线程池,批量发送异步预测请求。
以下的示例中,我们将使用异步API,批量发送异步预测请求,等待推理完成,并使用回调函数打印预测请求结果。
import asyncio
# 定义回调函数
def task_done_cb(task: asyncio.Task):
if task.exception():
raise task.exception()
else:
print("Prediction result: ", task.result().json())
# 使用异步API批量调用异步推理服务
async def batch_predict():
tasks = []
for _ in range(10):
task = asyncio.create_task(
# raw_predict_async 是一个coroutine
p.raw_predict_async(
data=b"test_data",
)
)
# 调用完成之后,打印调用返回结果
task.add_done_callback(task_done_cb)
tasks.append(task)
# 等待所有任务完成
return await asyncio.gather(*tasks, return_exceptions=True)
batch_results = await batch_predict()
for result in batch_results:
print(result.json())
测试完成之后,可以使用delete_service
方法删除对应服务,释放资源。
p.delete_service()