From de97e3936714ccb5c436b3eb620bf2004cf39d43 Mon Sep 17 00:00:00 2001 From: Mamta Singh <168400541+quic-mamta@users.noreply.github.com> Date: Thu, 9 Jan 2025 18:33:01 +0530 Subject: [PATCH] [QEff. Finetune] : Sort dataset based on length for DDP (#211) Sorting in dataset, sample based on length for DDP fix for run_validation = False --------- Signed-off-by: Mamta Singh --- QEfficient/cloud/finetune.py | 9 ++-- QEfficient/finetune/configs/training.py | 1 + QEfficient/finetune/data/sampler.py | 62 +++++++++++++++++++++++ QEfficient/finetune/utils/config_utils.py | 56 ++++++++++---------- docs/source/finetune.md | 14 ++++- 5 files changed, 110 insertions(+), 32 deletions(-) create mode 100644 QEfficient/finetune/data/sampler.py diff --git a/QEfficient/cloud/finetune.py b/QEfficient/cloud/finetune.py index b1f7cbc21..0a57d9724 100644 --- a/QEfficient/cloud/finetune.py +++ b/QEfficient/cloud/finetune.py @@ -173,9 +173,12 @@ def main(**kwargs): else: print(f"--> Num of Validation Set Batches loaded = {len(eval_dataloader)}") - longest_seq_length, longest_seq_ix = get_longest_seq_length( - torch.utils.data.ConcatDataset([train_dataloader.dataset, eval_dataloader.dataset]) - ) + longest_seq_length, _ = get_longest_seq_length( + torch.utils.data.ConcatDataset([train_dataloader.dataset, eval_dataloader.dataset]) + ) + else: + longest_seq_length, _ = get_longest_seq_length(train_dataloader.dataset) + print( f"The longest sequence length in the train data is {longest_seq_length}, " f"passed context length is {train_config.context_length} and overall model's context length is " diff --git a/QEfficient/finetune/configs/training.py b/QEfficient/finetune/configs/training.py index b714259e4..ac750ebe4 100644 --- a/QEfficient/finetune/configs/training.py +++ b/QEfficient/finetune/configs/training.py @@ -38,6 +38,7 @@ class train_config: save_metrics: bool = True # saves training metrics to a json file for later plotting intermediate_step_save: int = 1000 batching_strategy: str = "packing" + enable_sorting_for_ddp: bool = "True" # TODO: vbaddi: Uncomment post adding qaic to Pytorch Profiler # flop_counter: bool = False # Enable flop counter to measure model throughput, can not be used with pytorch profiler at the same time. diff --git a/QEfficient/finetune/data/sampler.py b/QEfficient/finetune/data/sampler.py new file mode 100644 index 000000000..6050f2e92 --- /dev/null +++ b/QEfficient/finetune/data/sampler.py @@ -0,0 +1,62 @@ +# ----------------------------------------------------------------------------- +# +# Copyright (c) 2024 Qualcomm Innovation Center, Inc. All rights reserved. +# SPDX-License-Identifier: BSD-3-Clause +# +# ----------------------------------------------------------------------------- + +import random +from itertools import islice + +import numpy as np +import torch + + +class LengthBasedBatchSampler(torch.utils.data.BatchSampler): + def __init__(self, data_source, batch_size: int, drop_last: bool, shuffle: bool = True) -> None: + if isinstance(next(iter(data_source)), dict): + first_key = next(iter(next(iter(data_source)).keys())) + self.lengths = [len(d[first_key]) for d in data_source] + else: + self.lengths = [len(d) for d in data_source] + self.batch_size = batch_size + self.drop_last = drop_last + self.shuffle = shuffle + + def __iter__(self): + ids = np.argsort(self.lengths, kind="mergesort") + if self.drop_last: + ids = ids[: len(ids) // self.batch_size * self.batch_size] + + batches = [ids[i : i + self.batch_size] for i in range(0, len(ids), self.batch_size)] + + if self.shuffle: + random.shuffle(batches) + + for b in batches: + yield b + + def __len__(self): + if self.drop_last: + return len(self.lengths) // self.batch_size + else: + return len(self.lengths) // self.batch_size + (len(self.lengths) % self.batch_size > 0) + + +class DistributedLengthBasedBatchSampler(torch.utils.data.BatchSampler): + def __init__( + self, data_source, batch_size: int, num_replicas: int, rank: int, shuffle: bool = True, seed: int = 0 + ) -> None: + random.seed(seed) + self.batch_sampler = LengthBasedBatchSampler( + data_source, batch_size=batch_size, drop_last=True, shuffle=shuffle + ) + self.num_replicas = num_replicas + self.rank = rank + + def __iter__(self): + max_length = len(self.batch_sampler) // self.num_replicas * self.num_replicas + return islice(self.batch_sampler, self.rank, max_length, self.num_replicas) + + def __len__(self): + return len(self.batch_sampler) // self.num_replicas diff --git a/QEfficient/finetune/utils/config_utils.py b/QEfficient/finetune/utils/config_utils.py index d2f71c76b..58344b190 100644 --- a/QEfficient/finetune/utils/config_utils.py +++ b/QEfficient/finetune/utils/config_utils.py @@ -16,10 +16,12 @@ PrefixTuningConfig, ) from transformers import default_data_collator +from transformers.data import DataCollatorForSeq2Seq import QEfficient.finetune.configs.dataset_config as datasets from QEfficient.finetune.configs.peft_config import lora_config, prefix_config from QEfficient.finetune.configs.training import train_config +from QEfficient.finetune.data.sampler import DistributedLengthBasedBatchSampler from QEfficient.finetune.dataset.dataset_config import DATASET_PREPROC @@ -63,41 +65,39 @@ def generate_peft_config(train_config, kwargs): def generate_dataset_config(train_config, kwargs): names = tuple(DATASET_PREPROC.keys()) - assert train_config.dataset in names, f"Unknown dataset: {train_config.dataset}" - dataset_config = {k: v for k, v in inspect.getmembers(datasets)}[train_config.dataset]() - update_config(dataset_config, **kwargs) - return dataset_config -# def get_dataloader_kwargs(train_config, dataset, dataset_processer, mode): -# kwargs = {} -# batch_size = ( -# train_config.batch_size_training -# if mode == "train" -# else train_config.val_batch_size -# ) -# if train_config.batching_strategy == "padding": -# kwargs["batch_sampler"] = LengthBasedBatchSampler( -# dataset, batch_size, drop_last=True, shuffle=mode == "train" -# ) -# kwargs["collate_fn"] = DataCollatorForSeq2Seq(dataset_processer) -# # kwargs["collate_fn"] = default_data_collator -# return kwargs - - -def get_dataloader_kwargs(train_config: train_config, dataset, dataset_processer, mode): +def get_dataloader_kwargs(train_config, dataset, dataset_processer, mode): kwargs = {} batch_size = train_config.batch_size_training if mode == "train" else train_config.val_batch_size - kwargs["batch_size"] = batch_size - kwargs["drop_last"] = True - kwargs["collate_fn"] = default_data_collator - # use a distributed sampler to split data between devices if train_config.enable_ddp: - kwargs["sampler"] = data_utils.DistributedSampler( - dataset, num_replicas=dist.get_world_size(), rank=dist.get_rank(), shuffle=True - ) + if train_config.enable_sorting_for_ddp: + if train_config.context_length: + raise ValueError( + "Sorting cannot be done with padding, Please disable sorting or pass context_length as None to disable padding" + ) + else: + kwargs["batch_sampler"] = DistributedLengthBasedBatchSampler( + dataset, + batch_size=batch_size, + rank=dist.get_rank(), + num_replicas=dist.get_world_size(), + shuffle=False, + ) + kwargs["collate_fn"] = DataCollatorForSeq2Seq(dataset_processer) + else: + kwargs["sampler"] = data_utils.DistributedSampler( + dataset, num_replicas=dist.get_world_size(), rank=dist.get_rank(), shuffle=True + ) + kwargs["batch_size"] = batch_size + kwargs["drop_last"] = True + kwargs["collate_fn"] = default_data_collator + else: + kwargs["batch_size"] = batch_size + kwargs["drop_last"] = True + kwargs["collate_fn"] = default_data_collator return kwargs diff --git a/docs/source/finetune.md b/docs/source/finetune.md index c42a53f31..2b585a0ba 100644 --- a/docs/source/finetune.md +++ b/docs/source/finetune.md @@ -6,6 +6,9 @@ Same CLI can be used to run Finetuning on gpu by setting the device flag.(for fi ## Installation Same as QEfficient along with QAIC PyTorch Eager mode. + +For QEfficient Library : https://github.com/quic/efficient-transformers + For torch_qaic, assuming QEfficient is already installed, ```bash pip install /opt/qti-aic/integrations/torch_qaic/py310/torch_qaic-0.1.0-cp310-cp310-linux_x86_64.whl @@ -51,4 +54,13 @@ python -m QEfficient.cloud.finetune --device qaic:0 --use-peft --output_dir ./me ```python QAIC_VISIBLE_DEVICES=0,1,2,3 torchrun --nproc-per-node 4 -m QEfficient.cloud.finetune --device qaic --enable_ddp --dist_backend qccl --num_epochs 2 --model_name "meta-llama/Llama-3.2-1B" ``` -**nproc-per-node is number of workers(gpus) running locally. \ No newline at end of file +**nproc-per-node is number of workers(QAIC devices) running locally. + +## Visualization + +Tensorboard logs are generated inside runs/ directory with date and time stamp. +to visualise the data, + +```python +tensorboard --logdir runs/ --bind_all +``` \ No newline at end of file