使用 PAI Python SDK 训练和部署 XGBoost 模型#
XGBoost 是基于决策树的梯度提升算法(Gradient Boosting)的高效工程实现,是一个流行的机器学习库,它能够处理大的数据集合,并且做了许多训练性能优化工作。
在这个教程示例中,我们将使用PAI Python SDK,在PAI上完成XGBoost模型的训练,然后将输出的模型部署为在线推理服务,并进行调用测试。
Step1: 准备工作#
我们需要首先安装 PAI Python SDK 以运行本示例。
!python -m pip install --upgrade alipai
SDK 需要配置访问阿里云服务需要的 AccessKey,以及当前使用的工作空间和OSS Bucket。在 PAI SDK 安装之后,通过在 命令行终端 中执行以下命令,按照引导配置密钥,工作空间等信息。
# 以下命令,请在 命令行终端 中执行.
python -m pai.toolkit.config
我们可以通过以下代码验证当前的配置。
# 验证安装
import pai
from pai.session import get_default_session
print(pai.__version__)
sess = get_default_session()
assert sess.workspace_name is not None
print(sess.workspace_name)
Step2: 准备数据集#
我们将使用Breast Cancer数据集,训练和测试XGBoost模型。准备数据集的步骤如下:
通过
scikit-learn
下载和拆分 Breast Cancer 数据集,使用csv
格式保存到本地。将本地数据集上传到OSS Bucket上,获得数据集的OSS URI,供云上执行的训练作业使用。
使用SKLearn下载和拆分数据集。
import sys
# 安装 sklearn, 用于数据集下载和切分
!{sys.executable} -m pip install --quiet scikit-learn
# 创建数据集目录
!mkdir -p ./train_data
!mkdir -p ./test_data
from sklearn import datasets
from sklearn.model_selection import train_test_split
df = datasets.load_breast_cancer(as_frame=True)
train, test = train_test_split(df.frame, test_size=0.3)
train_data_local = "./train_data/train.csv"
test_data_local = "./test_data/train.csv"
train.to_csv(train_data_local, index=False)
test.to_csv(test_data_local, index=False)
print(f"train data local path: {train_data_local}")
print(f"test data local path: {test_data_local}")
上传数据集到OSS Bucket。
# 上传数据集到OSS Bucket
from pai.common.oss_utils import upload
# 上传训练数据到OSS
train_data = upload(
train_data_local,
"pai/xgboost-example/train_data/",
sess.oss_bucket,
)
test_data = upload(
test_data_local,
"pai/xgboost-example/test_data/",
sess.oss_bucket,
)
print(f"train data: {train_data}")
print(f"test data: {test_data}")
Step3: 提交训练作业#
通过PAI Python SDK提供Estimator
,用户可以将训练脚本,提交到PAI创建一个训练作业,获得输出模型,主要流程包括:
用户编写训练作业脚本
训练脚本负责模型代码的编写,它需要遵循PAI训练作业的规则获取作业超参,读取输入数据,并且将需要保存模型到指定的输出目录。
构建
Estimator
对象
通过Estimator
API,用户配置训练作业使用的脚本,镜像,超参,以及机器实例类型等信息。
本地的脚本会有Estimator上传到OSS Bucket,然后加载到训练作业内。
调用
Estimator.fit
API提交作业
通过.fit
提交一个训练作业,默认.fit
方法会等到作业停止之后,才会退出,作业结束后,用户可以通过estimator.model_data()
获得输出模型OSS URI路径。
更加完整的介绍请参考 文档: 提交训练作业
我们通过XGboost提供的SKlearn API,构建了一个XGBoost的训练脚本:
训练作业默认接收两个输入Channel: train 和 test,训练脚本会从
/ml/input/data/{channel_name}
中读取训练数据。训练结束之后,训练脚本需要将模型写出到到
/ml/output/model
目录下。
!mkdir -p xgb_src/
%%writefile xgb_src/train.py
import argparse
import logging
import os
import pandas as pd
from xgboost import XGBClassifier
logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO)
TRAINING_BASE_DIR = "/ml/"
TRAINING_OUTPUT_MODEL_DIR = os.path.join(TRAINING_BASE_DIR, "output/model/")
def load_dataset(channel_name):
path = os.path.join(TRAINING_BASE_DIR, "input/data", channel_name)
if not os.path.exists(path):
return None, None
# use first file in the channel dir.
file_name = next(
iter([f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]),
None,
)
if not file_name:
logging.warning(f"Not found input file in channel path: {path}")
return None, None
file_path = os.path.join(path, file_name)
df = pd.read_csv(
filepath_or_buffer=file_path,
sep=",",
)
train_y = df["target"]
train_x = df.drop(["target"], axis=1)
return train_x, train_y
def main():
parser = argparse.ArgumentParser(description="XGBoost train arguments")
# 用户指定的任务参数
parser.add_argument(
"--n_estimators", type=int, default=500, help="The number of base model."
)
parser.add_argument(
"--objective", type=str, help="Objective function used by XGBoost"
)
parser.add_argument(
"--max_depth", type=int, default=3, help="The maximum depth of the tree."
)
parser.add_argument(
"--eta",
type=float,
default=0.2,
help="Step size shrinkage used in update to prevents overfitting.",
)
parser.add_argument(
"--eval_metric",
type=str,
default=None,
help="Evaluation metrics for validation data"
)
args, _ = parser.parse_known_args()
# 加载数据集
train_x, train_y = load_dataset("train")
print("Train dataset: train_shape={}".format(train_x.shape))
test_x, test_y = load_dataset("test")
if test_x is None or test_y is None:
print("Test dataset not found")
eval_set = [(train_x, train_y)]
else:
eval_set = [(train_x, train_y), (test_x, test_y)]
clf = XGBClassifier(
max_depth=args.max_depth,
eta=args.eta,
n_estimators=args.n_estimators,
objective=args.objective,
)
clf.fit(train_x, train_y, eval_set=eval_set, eval_metric=args.eval_metric)
model_path = os.environ.get("PAI_OUTPUT_MODEL")
os.makedirs(model_path, exist_ok=True)
clf.save_model(os.path.join(model_path, "model.json"))
print(f"Save model succeed: model_path={model_path}/model.json")
if __name__ == "__main__":
main()
使用Estimator提交训练作业#
通过 Estimator, 我们将以上构建的训练脚本 (xgb_src/train.py) 上传到 OSS上,通过fit
提交一个在云端执行XGBoost训练作业。 fit API接收的inputs分别是之前上传的训练和测试的数据,会被挂载到作业容器中(分别挂载到 /ml/input/data/{channel_name}/
),供训练脚本读取输入数据。
提交之后,SDK 会打印作业的详情URL,并且打印作业日志,直到作业退出(成功,失败,或是停止)。用户可以点击作业URL查看任务详情,执行日志,模型的Metric,机器资源使用率等信息。
from pai.estimator import Estimator
from pai.image import retrieve
# 获取PAI提供的XGBoost训练镜像
image_uri = retrieve("xgboost", framework_version="latest").image_uri
print(image_uri)
# 构建一个Estimator实例
est = Estimator(
# 作业启动脚本
command="python train.py $PAI_USER_ARGS",
# 作业脚本的本地文件夹路径,会被打包上传到OSS
source_dir="./xgb_src/",
image_uri=image_uri,
# 作业超参: 会通过Command arguments的方式传递给到作业脚本
hyperparameters={
"n_estimator": 100,
"criterion": "gini",
"max_depth": 5,
"eval_metric": "auc",
},
# 作业使用的机器实例
instance_type="ecs.c6.large",
)
# 使用上传到OSS的训练数据作为作业的数据
est.fit(
inputs={
"train": train_data, # train_data 将被挂载到`/ml/input/data/train`目录
"test": test_data, # test_data 将被挂载到`/ml/input/data/test`目录
},
)
print(est.model_data())
Step4: 部署模型#
以上训练获得模型,我们将使用预置XGBoost Processor部署为一个在线服务。主要流程包括:
通过构建一个InferenceSpec
InferenceSpec负责描述模型如何部署为一个在线服务,例如模型使用镜像部署,还是使用processor部署等。
构建Model对象
Model对象可以直接部署服务,也可以通过.register
注册到PAI的模型仓库。
使用
Model.deploy
部署在线服务。
通过指定服务名称,机器实例类型,部署一个新的在线推理服务。
from pai.model import Model, InferenceSpec
from pai.predictor import Predictor
from pai.common.utils import random_str
import os
# 使用模型文件地址以及 InferenceSpec 构建一个Model对象
m = Model(
# `est.model_data()`返回的是模型文件所在的OSS目录的URI,XGBoost processor需要传递具体的模型文件。
model_data=os.path.join(est.model_data(), "model.json"),
inference_spec=InferenceSpec(processor="xgboost"),
)
# 部署服务
p: Predictor = m.deploy(
service_name="example_xgb_{}".format(random_str(6)),
instance_type="ecs.c6.xlarge",
# 启动的服务实例个数。
instance_count=1,
# 按照 每一个服务的资源使用量,而不是机器类型创建服务。
# instance_resource_config=ResourceConfig(
# cpu=2,
# memory=4000,
# )
)
Step5: 测试在线服务#
Model.deploy
方法返回一个 Predictor
对象,Predictor.predict
方法支持向创建的推理服务发送推理请求,拿到预测结果。
print(p.service_name)
test_x = test.drop(["target"], axis=1)
p.predict(test_x.to_numpy())
在测试结束后,删除服务。
p.delete_service()