From bb0a668feeae647c247a509ed69e5f6c926a045c Mon Sep 17 00:00:00 2001 From: Wenhao Chen Date: Mon, 25 Mar 2024 12:31:09 +0800 Subject: [PATCH] [hotfix] set return_outputs=False in examples and polish code (#5404) * fix: simplify merge_batch * fix: use return_outputs=False to eliminate extra memory consumption * feat: add return_outputs warning * style: remove `return_outputs=False` as it is the default value --- applications/ColossalMoE/train.py | 1 - colossalai/booster/plugin/hybrid_parallel_plugin.py | 3 +++ colossalai/pipeline/schedule/one_f_one_b.py | 10 +++------- .../train_gpt_using_hybrid_parallelism.md | 2 +- .../train_vit_with_hybrid_parallelism.md | 2 +- docs/source/en/features/pipeline_parallel.md | 3 +-- docs/source/en/features/shardformer.md | 2 +- .../train_gpt_using_hybrid_parallelism.md | 2 +- .../train_vit_with_hybrid_parallelism.md | 2 +- docs/source/zh-Hans/features/pipeline_parallel.md | 3 +-- docs/source/zh-Hans/features/shardformer.md | 2 +- examples/images/vit/vit_benchmark.py | 2 +- examples/language/bert/finetune.py | 2 +- examples/language/gpt/hybridparallelism/finetune.py | 2 +- examples/language/llama2/finetune.py | 2 +- examples/language/llama2/pretrain.py | 4 ++-- examples/language/openmoe/benchmark/benchmark_cai.py | 1 - examples/language/openmoe/train.py | 1 - examples/language/opt/opt_train_demo.py | 2 +- tests/test_booster/test_plugin/test_3d_plugin.py | 2 +- .../test_hybrid_parallel_plugin_checkpoint_io.py | 5 ++--- tests/test_moe/test_moe_checkpoint.py | 1 - tests/test_pipeline/test_schedule/test_interleaved.py | 4 ++-- tests/test_pipeline/test_schedule/test_oneF_oneB.py | 4 ++-- 24 files changed, 28 insertions(+), 36 deletions(-) diff --git a/applications/ColossalMoE/train.py b/applications/ColossalMoE/train.py index c567038ec..99603282b 100644 --- a/applications/ColossalMoE/train.py +++ b/applications/ColossalMoE/train.py @@ -238,7 +238,6 @@ def main(): lambda x, y: x.loss, optimizer, return_loss=True, - return_outputs=True, ) # Backward and optimize if is_pp_last_stage: diff --git a/colossalai/booster/plugin/hybrid_parallel_plugin.py b/colossalai/booster/plugin/hybrid_parallel_plugin.py index c37a6b4df..f51cb060c 100644 --- a/colossalai/booster/plugin/hybrid_parallel_plugin.py +++ b/colossalai/booster/plugin/hybrid_parallel_plugin.py @@ -1183,6 +1183,9 @@ class HybridParallelPlugin(PipelinePluginBase): ) -> dict: assert self.enable_pipeline_parallelism, "pipeline parallelism is not enabled" + if return_outputs: + warnings.warn("return_outputs may lead to significant extra memory consumption.") + # Create a context for gradient synchronization based on the optimizer type. # If it's a HybridParallelZeroOptimizer, use optimizer.no_sync(); otherwise, use model.no_sync(). 
# This is to avoid redundant gradient reduction in pipeline parallelism (multiple microbatch values should be reduced once), diff --git a/colossalai/pipeline/schedule/one_f_one_b.py b/colossalai/pipeline/schedule/one_f_one_b.py index bf2f01b10..58008b98f 100644 --- a/colossalai/pipeline/schedule/one_f_one_b.py +++ b/colossalai/pipeline/schedule/one_f_one_b.py @@ -7,7 +7,7 @@ from torch.nn import Module from torch.utils._pytree import tree_map from colossalai.accelerator import get_accelerator -from colossalai.interface import ModelWrapper, OptimizerWrapper +from colossalai.interface import OptimizerWrapper from colossalai.pipeline.p2p import PipelineP2PCommunication, create_send_metadata from colossalai.pipeline.stage_manager import PipelineStageManager from colossalai.utils import get_current_device @@ -327,9 +327,7 @@ class OneForwardOneBackwardSchedule(PipelineSchedule): self.send_forward(output_obj) if outputs is not None: - if isinstance(model, ModelWrapper): - model = model.unwrap() - outputs = merge_batch(outputs, getattr(model, "batch_size_dim", 0)) + outputs = merge_batch(outputs) return {"loss": accum_loss, "outputs": outputs} def run_forward_backward( @@ -412,9 +410,7 @@ class OneForwardOneBackwardSchedule(PipelineSchedule): assert all(len(v) == 0 for v in input_objs) and all(len(v) == 0 for v in output_objs) if outputs is not None: - if isinstance(model, ModelWrapper): - model = model.unwrap() - outputs = merge_batch(outputs, getattr(model, "batch_size_dim", 0)) + outputs = merge_batch(outputs) return {"loss": accum_loss, "outputs": outputs} def forward_backward_step( diff --git a/docs/source/en/advanced_tutorials/train_gpt_using_hybrid_parallelism.md b/docs/source/en/advanced_tutorials/train_gpt_using_hybrid_parallelism.md index e87eafb6e..0133dfd86 100644 --- a/docs/source/en/advanced_tutorials/train_gpt_using_hybrid_parallelism.md +++ b/docs/source/en/advanced_tutorials/train_gpt_using_hybrid_parallelism.md @@ -178,7 +178,7 @@ def train_epoch( for _ in pbar: if use_pipeline: outputs = booster.execute_pipeline( - train_dataloader_iter, model, _criterion, optimizer, return_loss=True, return_outputs=True + train_dataloader_iter, model, _criterion, optimizer, return_loss=True ) # Backward and optimize if is_pp_last_stage: diff --git a/docs/source/en/advanced_tutorials/train_vit_with_hybrid_parallelism.md b/docs/source/en/advanced_tutorials/train_vit_with_hybrid_parallelism.md index 93fed61c3..dfc2cd596 100644 --- a/docs/source/en/advanced_tutorials/train_vit_with_hybrid_parallelism.md +++ b/docs/source/en/advanced_tutorials/train_vit_with_hybrid_parallelism.md @@ -231,7 +231,7 @@ def run_forward_backward( if isinstance(booster.plugin, HybridParallelPlugin) and booster.plugin.pp_size > 1: # run pipeline forward backward when enabling pp in hybrid parallel plugin output_dict = booster.execute_pipeline( - data_iter, model, criterion, optimizer, return_loss=True, return_outputs=True + data_iter, model, criterion, optimizer, return_loss=True ) loss, outputs = output_dict["loss"], output_dict["outputs"] else: diff --git a/docs/source/en/features/pipeline_parallel.md b/docs/source/en/features/pipeline_parallel.md index 31b20335e..d6f3cdfaf 100644 --- a/docs/source/en/features/pipeline_parallel.md +++ b/docs/source/en/features/pipeline_parallel.md @@ -198,8 +198,7 @@ def train_epoch(epoch: int, model: nn.Module, optimizer: Optimizer, _criterion: model, _criterion, optimizer, - return_loss=True, - return_outputs=True) + return_loss=True) # Backward and optimize if is_pp_last_stage: loss 
= outputs['loss'] diff --git a/docs/source/en/features/shardformer.md b/docs/source/en/features/shardformer.md index 1e633ebc0..672945ea2 100644 --- a/docs/source/en/features/shardformer.md +++ b/docs/source/en/features/shardformer.md @@ -271,7 +271,7 @@ However, if pipeline parallel is enabled, there are several usages different fro 3. Do forward and backward passing through calling `Booster.execute_pipeline` method: ```python outputs = booster.execute_pipeline( - train_dataloader_iter, model, _criterion, optimizer, return_loss=True, return_outputs=True + train_dataloader_iter, model, _criterion, optimizer, return_loss=True ) ``` Backward passing has been completed by this method, so there is no need to call `loss.backward()` after executing this method. diff --git a/docs/source/zh-Hans/advanced_tutorials/train_gpt_using_hybrid_parallelism.md b/docs/source/zh-Hans/advanced_tutorials/train_gpt_using_hybrid_parallelism.md index ae941b489..cf7d19172 100644 --- a/docs/source/zh-Hans/advanced_tutorials/train_gpt_using_hybrid_parallelism.md +++ b/docs/source/zh-Hans/advanced_tutorials/train_gpt_using_hybrid_parallelism.md @@ -175,7 +175,7 @@ def train_epoch( for _ in pbar: if use_pipeline: outputs = booster.execute_pipeline( - train_dataloader_iter, model, _criterion, optimizer, return_loss=True, return_outputs=True + train_dataloader_iter, model, _criterion, optimizer, return_loss=True ) # Backward and optimize if is_pp_last_stage: diff --git a/docs/source/zh-Hans/advanced_tutorials/train_vit_with_hybrid_parallelism.md b/docs/source/zh-Hans/advanced_tutorials/train_vit_with_hybrid_parallelism.md index 3de41601a..92775bafb 100644 --- a/docs/source/zh-Hans/advanced_tutorials/train_vit_with_hybrid_parallelism.md +++ b/docs/source/zh-Hans/advanced_tutorials/train_vit_with_hybrid_parallelism.md @@ -234,7 +234,7 @@ def run_forward_backward( if isinstance(booster.plugin, HybridParallelPlugin) and booster.plugin.pp_size > 1: # run pipeline forward backward when enabling pp in hybrid parallel plugin output_dict = booster.execute_pipeline( - data_iter, model, criterion, optimizer, return_loss=True, return_outputs=True + data_iter, model, criterion, optimizer, return_loss=True ) loss, outputs = output_dict["loss"], output_dict["outputs"] else: diff --git a/docs/source/zh-Hans/features/pipeline_parallel.md b/docs/source/zh-Hans/features/pipeline_parallel.md index e68802055..38e1fbfc5 100644 --- a/docs/source/zh-Hans/features/pipeline_parallel.md +++ b/docs/source/zh-Hans/features/pipeline_parallel.md @@ -193,8 +193,7 @@ def train_epoch(epoch: int, model: nn.Module, optimizer: Optimizer, _criterion: model, _criterion, optimizer, - return_loss=True, - return_outputs=True) + return_loss=True) # Backward and optimize if is_pp_last_stage: loss = outputs['loss'] diff --git a/docs/source/zh-Hans/features/shardformer.md b/docs/source/zh-Hans/features/shardformer.md index 972c48b0c..a7bcbd9f2 100644 --- a/docs/source/zh-Hans/features/shardformer.md +++ b/docs/source/zh-Hans/features/shardformer.md @@ -264,7 +264,7 @@ elif args.plugin == "hybrid_parallel": 3. 
通过调用`Booster.execute_pipeline` 方法来执行前向和后向传递: ```python outputs = booster.execute_pipeline( - train_dataloader_iter, model, _criterion, optimizer, return_loss=True, return_outputs=True + train_dataloader_iter, model, _criterion, optimizer, return_loss=True ) ``` 该方法会自动执行后向传递,所以在执行该方法后不需要再调用 `loss.backward()`方法。 diff --git a/examples/images/vit/vit_benchmark.py b/examples/images/vit/vit_benchmark.py index 078017324..32b1ec803 100644 --- a/examples/images/vit/vit_benchmark.py +++ b/examples/images/vit/vit_benchmark.py @@ -120,7 +120,7 @@ def main(): # run pipeline forward backward batch = iter([batch]) outputs = booster.execute_pipeline( - batch, model, criterion, optimizer, return_loss=True, return_outputs=True + batch, model, criterion, optimizer, return_loss=True ) else: outputs = model(**batch) diff --git a/examples/language/bert/finetune.py b/examples/language/bert/finetune.py index 0b1e77fff..bd6c393a7 100644 --- a/examples/language/bert/finetune.py +++ b/examples/language/bert/finetune.py @@ -148,7 +148,7 @@ def train_epoch( for _ in pbar: if use_pipeline: outputs = booster.execute_pipeline( - train_dataloader_iter, model, _criterion, optimizer, return_loss=True, return_outputs=True + train_dataloader_iter, model, _criterion, optimizer, return_loss=True ) # Backward and optimize if is_pp_last_device: diff --git a/examples/language/gpt/hybridparallelism/finetune.py b/examples/language/gpt/hybridparallelism/finetune.py index eb56ee530..888f47aaa 100644 --- a/examples/language/gpt/hybridparallelism/finetune.py +++ b/examples/language/gpt/hybridparallelism/finetune.py @@ -145,7 +145,7 @@ def train_epoch( for _ in pbar: if use_pipeline: outputs = booster.execute_pipeline( - train_dataloader_iter, model, _criterion, optimizer, return_loss=True, return_outputs=True + train_dataloader_iter, model, _criterion, optimizer, return_loss=True ) # Backward and optimize if is_pp_last_stage: diff --git a/examples/language/llama2/finetune.py b/examples/language/llama2/finetune.py index 3dbd0cf35..122186c30 100644 --- a/examples/language/llama2/finetune.py +++ b/examples/language/llama2/finetune.py @@ -271,7 +271,7 @@ def main(): for step in pbar: if use_pipeline: outputs = booster.execute_pipeline( - dataloader_iter, model, _criterion, optimizer, return_loss=True, return_outputs=True + dataloader_iter, model, _criterion, optimizer, return_loss=True ) loss = outputs["loss"] else: diff --git a/examples/language/llama2/pretrain.py b/examples/language/llama2/pretrain.py index fe7d95830..7b5805b80 100644 --- a/examples/language/llama2/pretrain.py +++ b/examples/language/llama2/pretrain.py @@ -185,7 +185,7 @@ def main(): microbatch_size=1, enable_jit_fused=False, zero_stage=0, - precision="fp32", + precision=args.mixed_precision, initial_scale=1, ) else: @@ -286,7 +286,7 @@ def main(): for step in pbar: if use_pipeline: outputs = booster.execute_pipeline( - dataloader_iter, model, _criterion, optimizer, return_loss=True, return_outputs=True + dataloader_iter, model, _criterion, optimizer, return_loss=True ) loss = outputs["loss"] else: diff --git a/examples/language/openmoe/benchmark/benchmark_cai.py b/examples/language/openmoe/benchmark/benchmark_cai.py index 770c500d8..a6d5f8bf2 100644 --- a/examples/language/openmoe/benchmark/benchmark_cai.py +++ b/examples/language/openmoe/benchmark/benchmark_cai.py @@ -270,7 +270,6 @@ def main(): lambda x, y: x.loss, optimizer, return_loss=True, - return_outputs=True, ) # Backward and optimize if is_pp_last_stage: diff --git a/examples/language/openmoe/train.py 
b/examples/language/openmoe/train.py index 89c4d5420..f3267b7c6 100644 --- a/examples/language/openmoe/train.py +++ b/examples/language/openmoe/train.py @@ -340,7 +340,6 @@ def main(): lambda x, y: x.loss, optimizer, return_loss=True, - return_outputs=True, ) # Backward and optimize if is_pp_last_stage: diff --git a/examples/language/opt/opt_train_demo.py b/examples/language/opt/opt_train_demo.py index fddbc1b40..82dff1920 100644 --- a/examples/language/opt/opt_train_demo.py +++ b/examples/language/opt/opt_train_demo.py @@ -42,7 +42,7 @@ def train_epoch(epoch, model, optimizer, _criterion, lr_scheduler, dataloader, b for _ in pbar: if use_pipeline: outputs = booster.execute_pipeline( - dataloader, model, _criterion, optimizer, return_loss=True, return_outputs=True + dataloader, model, _criterion, optimizer, return_loss=True ) # Backward and optimize if is_pp_last_stage: diff --git a/tests/test_booster/test_plugin/test_3d_plugin.py b/tests/test_booster/test_plugin/test_3d_plugin.py index 38361d803..61558c003 100644 --- a/tests/test_booster/test_plugin/test_3d_plugin.py +++ b/tests/test_booster/test_plugin/test_3d_plugin.py @@ -74,7 +74,7 @@ def run_fn(init_method, model_fn, data_gen_fn, output_transform_fn) -> Optional[ loss = criterion(outputs[output_key]) return loss - booster.execute_pipeline(data_iter, model, _criterion, optimizer, return_loss=True, return_outputs=False) + booster.execute_pipeline(data_iter, model, _criterion, optimizer, return_loss=True) optimizer.step() except Exception as e: diff --git a/tests/test_checkpoint_io/test_hybrid_parallel_plugin_checkpoint_io.py b/tests/test_checkpoint_io/test_hybrid_parallel_plugin_checkpoint_io.py index b5cb31715..557666a80 100644 --- a/tests/test_checkpoint_io/test_hybrid_parallel_plugin_checkpoint_io.py +++ b/tests/test_checkpoint_io/test_hybrid_parallel_plugin_checkpoint_io.py @@ -75,7 +75,7 @@ def exam_state_dict(shard: bool, model_name: str, size_per_shard: int, test_conf model.train() if booster.plugin.stage_manager is not None: booster.execute_pipeline( - _preprocess_data(data), model, _criterion, optimizer, return_loss=True, return_outputs=False + _preprocess_data(data), model, _criterion, optimizer, return_loss=True ) else: output = model(**_preprocess_data(data)) @@ -109,7 +109,7 @@ def exam_state_dict(shard: bool, model_name: str, size_per_shard: int, test_conf data_for_origin = data_gen_fn() if booster.plugin.stage_manager is not None: booster.execute_pipeline( - _preprocess_data(data_for_shard), model, _criterion, optimizer, return_loss=True, return_outputs=False + _preprocess_data(data_for_shard), model, _criterion, optimizer, return_loss=True ) booster.execute_pipeline( _preprocess_data(data_for_origin), @@ -117,7 +117,6 @@ def exam_state_dict(shard: bool, model_name: str, size_per_shard: int, test_conf _criterion, new_optimizer, return_loss=True, - return_outputs=False, ) else: old_model_loss = criterion(model(**_preprocess_data(data_for_shard))) diff --git a/tests/test_moe/test_moe_checkpoint.py b/tests/test_moe/test_moe_checkpoint.py index d6dad2d7f..10e63592a 100644 --- a/tests/test_moe/test_moe_checkpoint.py +++ b/tests/test_moe/test_moe_checkpoint.py @@ -49,7 +49,6 @@ def run_fwd_bwd( lambda x, y: x.loss, optimizer, return_loss=True, - return_outputs=True, ) # Backward and optimize if is_pp_last_stage: diff --git a/tests/test_pipeline/test_schedule/test_interleaved.py b/tests/test_pipeline/test_schedule/test_interleaved.py index 0e81818eb..7aa464055 100644 --- a/tests/test_pipeline/test_schedule/test_interleaved.py 
+++ b/tests/test_pipeline/test_schedule/test_interleaved.py @@ -104,7 +104,7 @@ def run_pp( torch_loss.backward() pp_ret = schedule.forward_backward_step( - sharded_model, iter(input_list), criterion, pp_optimizer, return_loss=True, return_outputs=True + sharded_model, iter(input_list), criterion, pp_optimizer, return_loss=True ) # check loss @@ -134,7 +134,7 @@ def run_pp( torch_loss = criterion(torch_output) pp_ret = schedule.forward_backward_step( - sharded_model, iter(input_list), criterion, pp_optimizer, return_loss=True, return_outputs=True + sharded_model, iter(input_list), criterion, pp_optimizer, return_loss=True ) if stage_manager.is_last_stage(ignore_chunk=True): assert torch.allclose(torch_loss, pp_ret["loss"]) diff --git a/tests/test_pipeline/test_schedule/test_oneF_oneB.py b/tests/test_pipeline/test_schedule/test_oneF_oneB.py index a08dc6d27..e1a679890 100644 --- a/tests/test_pipeline/test_schedule/test_oneF_oneB.py +++ b/tests/test_pipeline/test_schedule/test_oneF_oneB.py @@ -100,7 +100,7 @@ def examine_pp(num_microbatch: int, batch_size: int): torch_loss = criterion(torch_output) torch_loss.backward() pp_ret = schedule.forward_backward_step( - sharded_model, iter(input_list), criterion, pp_optimizer, return_loss=True, return_outputs=True + sharded_model, iter(input_list), criterion, pp_optimizer, return_loss=True ) # check loss @@ -130,7 +130,7 @@ def examine_pp(num_microbatch: int, batch_size: int): torch_loss = criterion(torch_output) pp_ret = schedule.forward_backward_step( - sharded_model, iter(input_list), criterion, pp_optimizer, return_loss=True, return_outputs=True + sharded_model, iter(input_list), criterion, pp_optimizer, return_loss=True ) if stage_manager.is_last_stage(): assert torch.allclose(torch_loss, pp_ret["loss"])
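
Note for readers of this patch: the calling convention that the examples converge on can be summarized with a minimal sketch (not part of the patch itself). `return_outputs` is left at its default of `False`, the loss is read from the returned dict on the last pipeline stage, and the backward pass has already run inside `execute_pipeline`, so no `loss.backward()` follows. The `train_step` helper and its arguments are illustrative names; they assume a booster built with `HybridParallelPlugin` and a model/optimizer prepared via `booster.boost(...)`, as in the examples touched above.

```python
# Sketch only; not part of the patch. Assumes `booster` was created with a
# HybridParallelPlugin (pp_size > 1) and that `model`, `optimizer`, and
# `_criterion` were prepared via `booster.boost(...)`, as in the examples.
def train_step(booster, model, _criterion, optimizer, dataloader_iter):
    outputs = booster.execute_pipeline(
        dataloader_iter,
        model,
        _criterion,
        optimizer,
        return_loss=True,
        # return_outputs is left at its default (False); passing True now emits a
        # warning because keeping every microbatch's outputs costs extra memory.
    )
    # Only the last pipeline stage holds the accumulated loss.
    loss = outputs["loss"] if booster.plugin.stage_manager.is_last_stage() else None
    # execute_pipeline already performed the backward pass, so step directly.
    optimizer.step()
    optimizer.zero_grad()
    return loss
```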