使用 PAI Python SDK 训练和部署 XGBoost 模型#

XGBoost 是基于决策树的梯度提升算法(Gradient Boosting)的高效工程实现,是一个流行的机器学习库,它能够处理大的数据集合,并且做了许多训练性能优化工作。

在这个教程示例中,我们将使用PAI Python SDK,在PAI上完成XGBoost模型的训练,然后将输出的模型部署为在线推理服务,并进行调用测试。

Step1: 准备工作#

我们需要首先安装 PAI Python SDK 以运行本示例。

!python -m pip install --upgrade alipai

SDK 需要配置访问阿里云服务需要的 AccessKey,以及当前使用的工作空间和OSS Bucket。在 PAI SDK 安装之后,通过在 命令行终端 中执行以下命令,按照引导配置密钥,工作空间等信息。

# 以下命令,请在 命令行终端 中执行.

python -m pai.toolkit.config

我们可以通过以下代码验证当前的配置。

# 验证安装
import pai
from pai.session import get_default_session

print(pai.__version__)

sess = get_default_session()

assert sess.workspace_name is not None
print(sess.workspace_name)

Step2: 准备数据集#

我们将使用Breast Cancer数据集,训练和测试XGBoost模型。准备数据集的步骤如下:

  1. 通过 scikit-learn 下载和拆分 Breast Cancer 数据集,使用 csv 格式保存到本地。

  2. 将本地数据集上传到OSS Bucket上,获得数据集的OSS URI,供云上执行的训练作业使用。

使用SKLearn下载和拆分数据集。

import sys

# 安装 sklearn, 用于数据集下载和切分
!{sys.executable} -m pip install --quiet  scikit-learn

# 创建数据集目录
!mkdir -p ./train_data
!mkdir -p ./test_data

from sklearn import datasets
from sklearn.model_selection import train_test_split

df = datasets.load_breast_cancer(as_frame=True)

train, test = train_test_split(df.frame, test_size=0.3)

train_data_local = "./train_data/train.csv"
test_data_local = "./test_data/train.csv"

train.to_csv(train_data_local, index=False)
test.to_csv(test_data_local, index=False)

print(f"train data local path: {train_data_local}")
print(f"test data local path: {test_data_local}")

上传数据集到OSS Bucket。

# 上传数据集到OSS Bucket
from pai.common.oss_utils import upload


# 上传训练数据到OSS
train_data = upload(
    train_data_local,
    "pai/xgboost-example/train_data/",
    sess.oss_bucket,
)


test_data = upload(
    test_data_local,
    "pai/xgboost-example/test_data/",
    sess.oss_bucket,
)

print(f"train data: {train_data}")
print(f"test data: {test_data}")

Step3: 提交训练作业#

通过PAI Python SDK提供Estimator,用户可以将训练脚本,提交到PAI创建一个训练作业,获得输出模型,主要流程包括:

  1. 用户编写训练作业脚本

训练脚本负责模型代码的编写,它需要遵循PAI训练作业的规则获取作业超参,读取输入数据,并且将需要保存模型到指定的输出目录。

  1. 构建Estimator对象

通过Estimator API,用户配置训练作业使用的脚本,镜像,超参,以及机器实例类型等信息。 本地的脚本会有Estimator上传到OSS Bucket,然后加载到训练作业内。

  1. 调用Estimator.fitAPI提交作业

通过.fit提交一个训练作业,默认.fit方法会等到作业停止之后,才会退出,作业结束后,用户可以通过estimator.model_data()获得输出模型OSS URI路径。

更加完整的介绍请参考 文档: 提交训练作业

我们通过XGboost提供的SKlearn API,构建了一个XGBoost的训练脚本:

  • 训练作业默认接收两个输入Channel: train 和 test,训练脚本会从 /ml/input/data/{channel_name} 中读取训练数据。

  • 训练结束之后,训练脚本需要将模型写出到到 /ml/output/model 目录下。

!mkdir -p xgb_src/
%%writefile xgb_src/train.py


import argparse
import logging
import os

import pandas as pd
from xgboost import XGBClassifier

logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO)

TRAINING_BASE_DIR = "/ml/"
TRAINING_OUTPUT_MODEL_DIR = os.path.join(TRAINING_BASE_DIR, "output/model/")


def load_dataset(channel_name):
    path = os.path.join(TRAINING_BASE_DIR, "input/data", channel_name)
    if not os.path.exists(path):
        return None, None

    # use first file in the channel dir.
    file_name = next(
        iter([f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]),
        None,
    )
    if not file_name:
        logging.warning(f"Not found input file in channel path: {path}")
        return None, None

    file_path = os.path.join(path, file_name)
    df = pd.read_csv(
        filepath_or_buffer=file_path,
        sep=",",
    )

    train_y = df["target"]
    train_x = df.drop(["target"], axis=1)
    return train_x, train_y


def main():
    parser = argparse.ArgumentParser(description="XGBoost train arguments")
    # 用户指定的任务参数
    parser.add_argument(
        "--n_estimators", type=int, default=500, help="The number of base model."
    )
    parser.add_argument(
        "--objective", type=str, help="Objective function used by XGBoost"
    )

    parser.add_argument(
        "--max_depth", type=int, default=3, help="The maximum depth of the tree."
    )

    parser.add_argument(
        "--eta",
        type=float,
        default=0.2,
        help="Step size shrinkage used in update to prevents overfitting.",
    )
    parser.add_argument(
        "--eval_metric",
        type=str,
        default=None,
        help="Evaluation metrics for validation data"
    )

    args, _ = parser.parse_known_args()

    # 加载数据集
    train_x, train_y = load_dataset("train")
    print("Train dataset: train_shape={}".format(train_x.shape))
    test_x, test_y = load_dataset("test")
    if test_x is None or test_y is None:
        print("Test dataset not found")
        eval_set = [(train_x, train_y)]
    else:
        eval_set = [(train_x, train_y), (test_x, test_y)]

    clf = XGBClassifier(
        max_depth=args.max_depth,
        eta=args.eta,
        n_estimators=args.n_estimators,
        objective=args.objective,
    )
    clf.fit(train_x, train_y, eval_set=eval_set, eval_metric=args.eval_metric)

    model_path = os.environ.get("PAI_OUTPUT_MODEL")
    os.makedirs(model_path, exist_ok=True)
    clf.save_model(os.path.join(model_path, "model.json"))
    print(f"Save model succeed: model_path={model_path}/model.json")


if __name__ == "__main__":
    main()

使用Estimator提交训练作业#

通过 Estimator, 我们将以上构建的训练脚本 (xgb_src/train.py) 上传到 OSS上,通过fit 提交一个在云端执行XGBoost训练作业。 fit API接收的inputs分别是之前上传的训练和测试的数据,会被挂载到作业容器中(分别挂载到 /ml/input/data/{channel_name}/),供训练脚本读取输入数据。

提交之后,SDK 会打印作业的详情URL,并且打印作业日志,直到作业退出(成功,失败,或是停止)。用户可以点击作业URL查看任务详情,执行日志,模型的Metric,机器资源使用率等信息。

from pai.estimator import Estimator
from pai.image import retrieve


# 获取PAI提供的XGBoost训练镜像
image_uri = retrieve("xgboost", framework_version="latest").image_uri
print(image_uri)

# 构建一个Estimator实例
est = Estimator(
    # 作业启动脚本
    command="python train.py $PAI_USER_ARGS",
    # 作业脚本的本地文件夹路径,会被打包上传到OSS
    source_dir="./xgb_src/",
    image_uri=image_uri,
    # 作业超参: 会通过Command arguments的方式传递给到作业脚本
    hyperparameters={
        "n_estimator": 100,
        "criterion": "gini",
        "max_depth": 5,
        "eval_metric": "auc",
    },
    # 作业使用的机器实例
    instance_type="ecs.c6.large",
)

# 使用上传到OSS的训练数据作为作业的数据
est.fit(
    inputs={
        "train": train_data,  # train_data 将被挂载到`/ml/input/data/train`目录
        "test": test_data,  # test_data 将被挂载到`/ml/input/data/test`目录
    },
)
print(est.model_data())

Step4: 部署模型#

以上训练获得模型,我们将使用预置XGBoost Processor部署为一个在线服务。主要流程包括:

  1. 通过构建一个InferenceSpec

InferenceSpec负责描述模型如何部署为一个在线服务,例如模型使用镜像部署,还是使用processor部署等。

  1. 构建Model对象

Model对象可以直接部署服务,也可以通过.register注册到PAI的模型仓库。

  1. 使用Model.deploy部署在线服务。

通过指定服务名称,机器实例类型,部署一个新的在线推理服务。

from pai.model import Model, InferenceSpec
from pai.predictor import Predictor

from pai.common.utils import random_str
import os


# 使用模型文件地址以及 InferenceSpec 构建一个Model对象
m = Model(
    # `est.model_data()`返回的是模型文件所在的OSS目录的URI,XGBoost processor需要传递具体的模型文件。
    model_data=os.path.join(est.model_data(), "model.json"),
    inference_spec=InferenceSpec(processor="xgboost"),
)


# 部署服务
p: Predictor = m.deploy(
    service_name="example_xgb_{}".format(random_str(6)),
    instance_type="ecs.c6.xlarge",
    # 启动的服务实例个数。
    instance_count=1,
    # 按照 每一个服务的资源使用量,而不是机器类型创建服务。
    # instance_resource_config=ResourceConfig(
    #     cpu=2,
    #     memory=4000,
    # )
)

Step5: 测试在线服务#

Model.deploy方法返回一个 Predictor 对象,Predictor.predict方法支持向创建的推理服务发送推理请求,拿到预测结果。

print(p.service_name)

test_x = test.drop(["target"], axis=1)

p.predict(test_x.to_numpy())

在测试结束后,删除服务。

p.delete_service()