Commit
fix: save postpretrain
danielhjz committed Feb 1, 2024
1 parent 07bdc4d commit b2e1ec1
Showing 8 changed files with 743 additions and 236 deletions.
12 changes: 6 additions & 6 deletions src/qianfan/model/model.py
@@ -50,17 +50,17 @@ class Model(
"""model name"""
service: Optional["Service"] = None
"""model service"""
task_id: Optional[int]
task_id: Optional[str]
"""train tkas id"""
job_id: Optional[int]
job_id: Optional[str]
"""train job id"""

def __init__(
self,
id: Optional[str] = None,
version_id: Optional[str] = None,
task_id: Optional[int] = None,
job_id: Optional[int] = None,
task_id: Optional[str] = None,
job_id: Optional[str] = None,
name: Optional[str] = None,
**kwargs: Any,
):
@@ -215,11 +215,11 @@ def publish(self, name: str = "", **kwargs: Any) -> "Model":
self._wait_for_publish(**kwargs)

# publish the model
self.model_name = name if name != "" else f"m_{self.task_id}_{self.job_id}"
self.model_name = name if name != "" else f"m_{self.job_id}_{self.task_id}"
model_publish_resp = ResourceModel.publish(
is_new=True,
model_name=self.model_name,
version_meta={"taskId": self.task_id, "iterationId": self.job_id},
version_meta={"taskId": self.job_id, "iterationId": self.task_id},
**kwargs,
)
log_info(
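The id type change above ripples into the default model name and the publish payload. A minimal sketch of the new usage, assuming `Model` is importable from `qianfan.model` and that the v2 train APIs hand back string ids (the ids below are hypothetical placeholders):

```python
from qianfan.model import Model

# task_id/job_id are now v2-style string ids, not the v1 integers
# (hypothetical values shown).
m = Model(task_id="task-abc123", job_id="job-def456")

# With no explicit name, publish() derives "m_{job_id}_{task_id}" and sends
# version_meta={"taskId": job_id, "iterationId": task_id}, per the swap above.
m.publish()
```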
8 changes: 4 additions & 4 deletions src/qianfan/resources/console/consts.py
@@ -132,13 +132,13 @@ class ServiceStatus(str, Enum):


class TrainStatus(str, Enum):
Finish = "FINISH"
Finish = "Done"
"""训练完成"""
Running = "RUNNING"
Running = "Running"
"""训练进行中"""
Fail = "FAIL"
Fail = "Fail"
"""训练失败"""
Stop = "STOP"
Stop = "Stopped"
"""训练停止"""


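Since `TrainStatus` subclasses `str`, its members compare equal to the raw strings the API returns, so the rename only works if the literals match the v2 payload exactly. A quick sketch (the response dict is illustrative):

```python
from qianfan.resources.console.consts import TrainStatus

result = {"runStatus": "Done"}  # hypothetical task_detail result payload

# A str Enum member compares equal to a plain string, so the old "FINISH"
# literal would never match the v2 API's "Done" without this rename.
if result["runStatus"] == TrainStatus.Finish:
    print("training finished")
```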
5 changes: 4 additions & 1 deletion src/qianfan/resources/llm/base.py
@@ -505,7 +505,10 @@ def _generate_body(
f"The required key `{key}` is not provided."
)
kwargs["stream"] = stream
kwargs["extra_parameters"] = {"request_source": f"qianfan_py_sdk_v{VERSION}"}
if "extra_parameters" not in kwargs:
kwargs["extra_parameters"] = {}
else:
kwargs["extra_parameters"]["request_source"] = f"qianfan_py_sdk_v{VERSION}"
return kwargs

def _data_postprocess(self, data: QfResponse) -> QfResponse:
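The net effect is that caller-supplied `extra_parameters` are merged with, rather than clobbered by, the SDK's `request_source` tag. A self-contained sketch of that merge, with a stand-in for the SDK's `VERSION` constant:

```python
VERSION = "0.3.0"  # stand-in for qianfan's VERSION constant

def merge_request_source(kwargs: dict) -> dict:
    # Keep any user-supplied extra_parameters and add the SDK version tag.
    if "extra_parameters" not in kwargs:
        kwargs["extra_parameters"] = {"request_source": f"qianfan_py_sdk_v{VERSION}"}
    else:
        kwargs["extra_parameters"]["request_source"] = f"qianfan_py_sdk_v{VERSION}"
    return kwargs

print(merge_request_source({"extra_parameters": {"my_flag": True}}))
# {'extra_parameters': {'my_flag': True, 'request_source': 'qianfan_py_sdk_v0.3.0'}}
```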
177 changes: 95 additions & 82 deletions src/qianfan/trainer/actions.py
@@ -30,9 +30,11 @@
)
from qianfan.trainer.configs import (
DefaultTrainConfigMapping,
DefaultPostPretrainTrainConfigMapping,
ModelInfoMapping,
TrainConfig,
TrainLimit,
PeftType,
)
from qianfan.utils import (
bos_uploader,
@@ -42,6 +44,7 @@
log_warn,
utils,
)
from qianfan.utils.bos_uploader import is_invalid_bos_path


class LoadDataSetAction(BaseAction[Dict[str, Any], Dict[str, Any]]):
@@ -65,27 +68,46 @@ class LoadDataSetAction(BaseAction[Dict[str, Any], Dict[str, Any]]):
from qianfan.dataset.dataset import Dataset

dataset: Optional[Dataset] = None
bos_path: Optional[str] = None

def __init__(
self,
dataset: Optional[Dataset] = None,
bos_path: Optional[str] = None,
**kwargs: Any,
) -> None:
super().__init__(**kwargs)
self.dataset = dataset
if dataset is not None:
return
elif not is_invalid_bos_path(bos_path):
    self.bos_path = bos_path

@with_event
def exec(self, input: Dict[str, Any] = {}, **kwargs: Dict) -> Dict[str, Any]:
return self._exec(input, **kwargs)

def _exec(self, input: Dict[str, Any] = {}, **kwargs: Dict) -> Dict[str, Any]:
from qianfan.dataset.data_source import BosDataSource, QianfanDataSource

"""
Load dataset implementation; may be called by exec and resume.
"""
if self.bos_path is not None:
if not self.bos_path.endswith("/"):
bos_path = bos_uploader.generate_bos_file_parent_path(self.bos_path)
log_warn(f"input bos_path {self.bos_path} is a file, auto_convert to dir: {bos_path}")
else:
bos_path = self.bos_path
return {
"datasets": [
{
"bosPath": bos_path,
"type": console_consts.TrainDatasetType.PrivateBos.value,
}
]
}
from qianfan.dataset.data_source import BosDataSource, QianfanDataSource
if self.dataset is None:
raise InvalidArgumentError("dataset must be set")
raise InvalidArgumentError("dataset or bos_path must be set")
if self.dataset.inner_data_source_cache is None:
raise InvalidArgumentError("invalid dataset")
if isinstance(self.dataset.inner_data_source_cache, QianfanDataSource):
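With this hunk, a training pipeline can be fed directly from a BOS path instead of a constructed `Dataset`. A minimal sketch, assuming the bucket and object path below exist (both are hypothetical):

```python
from qianfan.trainer.actions import LoadDataSetAction

action = LoadDataSetAction(bos_path="bos:/my-bucket/train-data/")
out = action.exec()
# out == {"datasets": [{"bosPath": "bos:/my-bucket/train-data/",
#                       "type": TrainDatasetType.PrivateBos.value}]}
```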
@@ -159,30 +181,22 @@ class TrainAction(
Input:
```
{'datasets':[{'type': 1, 'id': 111}]}
{'datasets':[{'type': 1, 'id': "ds-xxx"}]}
```
Output:
```
{'task_id': 47923, 'job_id': 33512}
{'task_id': "task-ddd", 'job_id': "job-xxxx"}
Sample code:
```
"""

task_id: Optional[int] = None
task_id: Optional[str] = None
"""train task id"""
job_id: Optional[int] = None
job_id: Optional[str] = None
"""train job id"""
# With the new API, the task/job ids are swapped: task_id now corresponds
# to the old job_str_id, and job_id to the old task_str_id
task_str_id: Optional[str] = None
"""train task str id"""
job_str_id: Optional[str] = None
"""job task str id"""
train_type: Optional[str] = ""
"""train_type"""
base_model: Optional[str] = None
"""base train type like 'ERNIE-Bot-turbo'"""
is_incr: bool = False
"""if it's incremental train or not"""
train_config: Optional[TrainConfig] = None
@@ -248,29 +262,24 @@ def __init__(
raise InvalidArgumentError("train_type must be specified")
# train from base model
self.train_type = train_type
if base_model is None:
model_info = ModelInfoMapping.get(self.train_type)
if model_info is None:
raise InvalidArgumentError(
"base_model_type must be specified caused train_type:"
f" {self.train_type} is not found"
)
self.base_model = model_info.base_model_type
else:
self.base_model = base_model
model_info = ModelInfoMapping.get(self.train_type)
if model_info is None:
log_warn(
    f"unknown train type: {self.train_type}, model info not found"
)
self.train_config = (
train_config
if train_config is not None
else self.get_default_train_config(train_type)
else self.get_default_train_config(train_type, train_mode, PeftType.ALL)
)
self.validateTrainConfig()
# self.validateTrainConfig()
if train_mode is not None:
self.train_mode = train_mode
self.task_name = self._generate_task_name(task_name, self.train_type)
self.task_name = self._generate_job_name(task_name, self.train_type)
self.task_description = task_description
self.job_description = job_description

def _generate_task_name(
def _generate_job_name(
self, task_name: Optional[str], train_type: Optional[str]
) -> str:
if task_name is not None:
@@ -401,52 +410,55 @@ def _exec(self, input: Dict[str, Any] = {}, **kwargs: Dict) -> Dict[str, Any]:

# request for create model train task
assert self.train_type is not None
assert self.base_model is not None
resp = api.FineTune.create_task(
resp = api.FineTune.V2.create_job(
name=self.task_name,
description=self.task_description,
train_type=self.train_type,
base_train_type=self.base_model,
model=self.train_type,
train_mode=self.train_mode,
**kwargs,
)
self.task_id = cast(int, resp["result"]["id"])
self.job_str_id = resp["result"]["uuid"]
log_debug(f"[train_action] create fine-tune task: {self.task_id}")
self.job_id = cast(str, resp["result"]["jobId"])
log_debug(f"[train_action] create {self.train_mode} train job: {self.job_id}")

assert self.train_config is not None
req_job = {
"taskId": self.task_id,
"description": self.job_description,
"baseTrainType": self.base_model,
"trainType": self.train_type,
"trainMode": self.train_mode.value,
"peftType": self.train_config.peft_type,
"trainConfig": {
"epoch": self.train_config.epoch,
"learningRate": self.train_config.learning_rate,
"batchSize": self.train_config.batch_size,
"maxSeqLen": self.train_config.max_seq_len,
"loggingSteps": self.train_config.logging_steps,
"warmupRatio": self.train_config.warmup_ratio,
"weightDecay": self.train_config.weight_decay,
"loraRank": self.train_config.lora_rank,
"loraAllLinear": self.train_config.lora_all_linear,
"loraAlpha": self.train_config.lora_alpha,
"loraDropout": self.train_config.lora_dropout,
"schedulerName": self.train_config.scheduler_name,
**self.train_config.extras,
},
"trainset": train_sets,
"trainsetRate": self.train_config.trainset_rate,
hyper_params_dict = {
"epoch": self.train_config.epoch,
"learningRate": self.train_config.learning_rate,
"batchSize": self.train_config.batch_size,
"maxSeqLen": self.train_config.max_seq_len,
"loggingSteps": self.train_config.logging_steps,
"warmupRatio": self.train_config.warmup_ratio,
"weightDecay": self.train_config.weight_decay,
"loraRank": self.train_config.lora_rank,
"loraAllLinear": self.train_config.lora_all_linear,
"loraAlpha": self.train_config.lora_alpha,
"loraDropout": self.train_config.lora_dropout,
"schedulerName": self.train_config.scheduler_name,
**self.train_config.extras,
}
hyper_params_dict = {
key: value for key, value in hyper_params_dict.items() if value is not None
}
tc_dict = cast(dict, req_job["trainConfig"])
req_job["trainConfig"] = {
key: value for key, value in tc_dict.items() if value is not None
ds_config = {
    "sourceType": "Platform",
    "datasets": [{"datasetId": input["datasets"][0]["id"]}],
    "splitRatio": 20,
}
create_job_resp = api.FineTune.create_job(req_job, **kwargs)
self.job_id = cast(int, create_job_resp["result"]["id"])


create_job_resp = api.FineTune.V2.create_task(
    job_id=self.job_id,
    params_scale=self.train_config.peft_type.value,
    hyper_params=hyper_params_dict,
    dataset_config=ds_config,
    **kwargs,
)
self.task_id = cast(str, create_job_resp["result"]["taskId"])
self.task_str_id = create_job_resp["result"]["uuid"]
log_debug(f"[train_action] create fine-tune job_id: {self.job_id}")
log_debug(f"[train_action] create {self.train_mode} train task: {self.task_id}")

# poll the job status to see whether training has finished
self._wait_model_trained(**kwargs)
@@ -455,21 +467,20 @@ def _exec(self, input: Dict[str, Any] = {}, **kwargs: Dict) -> Dict[str, Any]:
return self.result

def _wait_model_trained(self, **kwargs: Dict) -> None:
if self.task_id is None or self.job_id is None:
raise InvalidArgumentError("task_id and job_id must not be None")
if self.task_id is None:
raise InvalidArgumentError("task_id must not be None")
while True:
job_status_resp = api.FineTune.get_job(
job_status_resp = api.FineTune.V2.task_detail(
task_id=self.task_id,
job_id=self.job_id,
**kwargs,
)
job_status = job_status_resp["result"]["trainStatus"]
job_progress = job_status_resp["result"]["progress"]
job_status = job_status_resp["result"]["runStatus"]
job_progress = int(job_status_resp["result"]["runProgress"][:-1])
log_info(
"[train_action] fine-tune running..."
f" task_name:{self.task_name} current status: {job_status},"
f" {job_progress}% check train task log in"
f" https://console.bce.baidu.com/qianfan/train/sft/{self.job_str_id}/{self.task_str_id}/detail/traininglog"
f" https://console.bce.baidu.com/qianfan/train/sft/{self.job_id}/{self.task_id}/detail/traininglog"
)
if job_progress >= 50:
log_info(f" check vdl report in {job_status_resp['result']['vdlLink']}")
@@ -482,18 +493,18 @@ def _wait_model_trained(self, **kwargs: Dict) -> None:
]:
log_error(
"[train_action] fine-tune job"
f" {self.job_str_id}/{self.task_str_id} has ended,"
f" {self.job_id}/{self.task_id} has ended,"
f" {job_status_resp}"
)
raise InternalError(
f"fine-tune job {self.job_str_id}/{self.task_str_id} has ended with"
f"fine-tune job {self.job_id}/{self.task_id} has ended with"
f" status: {job_status}"
)
else:
time.sleep(get_config().TRAIN_STATUS_POLLING_INTERVAL)
log_info(
"[train_action] fine-tune job has ended:"
f" {self.job_str_id}/{self.task_str_id} with status: {job_status}"
f" {self.job_id}/{self.task_id} with status: {job_status}"
)
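A standalone version of the loop above; note `runProgress` arrives as a string like `"42%"`, hence the `[:-1]` slice before `int()`. The polling interval is an assumed default:

```python
import time

from qianfan.resources import FineTune
from qianfan.resources.console.consts import TrainStatus

def wait_for_task(task_id: str, interval: float = 30.0) -> str:
    """Poll the v2 task_detail endpoint until the run leaves Running."""
    while True:
        result = FineTune.V2.task_detail(task_id=task_id)["result"]
        status = result["runStatus"]
        progress = int(result["runProgress"][:-1])  # "42%" -> 42
        print(f"status={status} progress={progress}%")
        if status != TrainStatus.Running:
            return status
        time.sleep(interval)
```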

@with_event
@@ -536,11 +547,13 @@ def stop(self, **kwargs: Dict) -> None:
api.FineTune.stop_job(self.task_id, self.job_id)
log_debug(f"train job {self.task_id}/{self.job_id} stopped")

def get_default_train_config(self, model_type: str) -> TrainConfig:
def get_default_train_config(
    self, model_type: str, train_mode: console_consts.TrainMode, peft_mode: PeftType
) -> TrainConfig:
    if train_mode == console_consts.TrainMode.PostPretrain:
        return DefaultPostPretrainTrainConfigMapping.get(model_type)[peft_mode]
    return DefaultTrainConfigMapping.get(
        model_type,
        DefaultTrainConfigMapping[get_config().DEFAULT_FINE_TUNE_TRAIN_TYPE],
    )[peft_mode]
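The lookup is now two-level: model type first, then PEFT type, with post-pretrain runs routed to their own mapping. A sketch of the resolution logic, assuming `PeftType.ALL` denotes full-parameter tuning:

```python
from qianfan.resources.console import consts as console_consts
from qianfan.trainer.configs import (
    DefaultPostPretrainTrainConfigMapping,
    DefaultTrainConfigMapping,
    PeftType,
    TrainConfig,
)

def pick_default_config(
    model_type: str, train_mode: console_consts.TrainMode, peft_mode: PeftType
) -> TrainConfig:
    # Post-pretrain defaults live in a separate mapping.
    if train_mode == console_consts.TrainMode.PostPretrain:
        return DefaultPostPretrainTrainConfigMapping.get(model_type)[peft_mode]
    return DefaultTrainConfigMapping.get(model_type)[peft_mode]
```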


class ModelPublishAction(BaseAction[Dict[str, Any], Dict[str, Any]]):
Expand All @@ -560,9 +573,9 @@ class ModelPublishAction(BaseAction[Dict[str, Any], Dict[str, Any]]):
```
"""

task_id: Optional[int] = None
task_id: Optional[str] = None
"""task id"""
job_id: Optional[int] = None
job_id: Optional[str] = None
"""job id"""
result: Optional[Dict[str, Any]] = None
"""result of model publish action"""
Expand All @@ -573,8 +586,8 @@ class ModelPublishAction(BaseAction[Dict[str, Any], Dict[str, Any]]):
def exec(self, input: Dict[str, Any] = {}, **kwargs: Dict) -> Dict[str, Any]:
if self.task_id == "" or self.job_id == "":
raise InvalidArgumentError("task_id or job_id must be set")
self.task_id = int(input.get("task_id", ""))
self.job_id = int(input.get("job_id", ""))
self.task_id = input.get("task_id", "")
self.job_id = input.get("job_id", "")
self.model = Model(task_id=self.task_id, job_id=self.job_id)
return self._exec(input, **kwargs)
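Downstream, the publish action now passes the string ids through unchanged. A minimal sketch of feeding it the output of a finished train action (ids are hypothetical):

```python
from qianfan.trainer.actions import ModelPublishAction

publish = ModelPublishAction()
result = publish.exec({"task_id": "task-abc123", "job_id": "job-def456"})
```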

