[test] skip tests when not enough GPUs are detected (#1090)

* [test] skip tests when not enough GPUs are detected

* polish code

* polish code
Frank Lee 2022-06-09 17:19:13 +08:00 committed by GitHub
parent 3a7571b1d7
commit 50ec3a7e06
8 changed files with 43 additions and 17 deletions
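
In short, the diffs below replace hard-coded @pytest.mark.skip(...) markers with a new skip_if_not_enough_gpus decorator that checks torch.cuda.device_count() at run time and only executes the test body when enough GPUs are present. A minimal usage sketch (the test name and body here are hypothetical placeholders; only skip_if_not_enough_gpus comes from this commit):

import torch
from colossalai.testing import skip_if_not_enough_gpus


@skip_if_not_enough_gpus(min_gpus=8)
def test_needs_eight_gpus():
    # Hypothetical placeholder test: the decorator calls this body only when
    # torch.cuda.device_count() >= 8; otherwise it returns without running it.
    assert torch.cuda.device_count() >= 8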

View File

@@ -1,7 +1,7 @@
 from .comparison import assert_equal, assert_not_equal, assert_close, assert_close_loose, assert_equal_in_group
-from .utils import parameterize, rerun_on_exception, rerun_if_address_is_in_use
+from .utils import parameterize, rerun_on_exception, rerun_if_address_is_in_use, skip_if_not_enough_gpus
 
 __all__ = [
     'assert_equal', 'assert_not_equal', 'assert_close', 'assert_close_loose', 'assert_equal_in_group', 'parameterize',
-    'rerun_on_exception', 'rerun_if_address_is_in_use'
+    'rerun_on_exception', 'rerun_if_address_is_in_use', 'skip_if_not_enough_gpus'
 ]

View File

@@ -172,3 +172,32 @@ def rerun_if_address_is_in_use():
     func_wrapper = rerun_on_exception(exception_type=exception, pattern=".*Address already in use.*")
     return func_wrapper
+
+
+def skip_if_not_enough_gpus(min_gpus: int):
+    """
+    This decorator checks the number of GPUs available on the system and
+    automatically skips test cases that require more GPUs than are present.
+
+    Note:
+        The wrapped function must have `world_size` in its keyword arguments.
+
+    Usage:
+        @skip_if_not_enough_gpus(min_gpus=8)
+        def test_something():
+            # will be skipped if there are fewer than 8 GPUs available
+            do_something()
+
+    Args:
+        min_gpus (int): the minimum number of GPUs required to run this test.
+    """
+
+    def _wrap_func(f):
+        def _execute_by_gpu_num(*args, **kwargs):
+            num_avail_gpu = torch.cuda.device_count()
+            if num_avail_gpu >= min_gpus:
+                f(*args, **kwargs)
+
+        return _execute_by_gpu_num
+
+    return _wrap_func
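
Note that, as written above, the wrapper simply returns without calling the test when fewer than min_gpus devices are available, so pytest reports such a test as passed rather than skipped. If explicit skip reporting were preferred, an alternative sketch (not what this commit does; the helper name skip_if_not_enough_gpus_alt is hypothetical) could raise pytest's skip exception instead:

import pytest
import torch


def skip_if_not_enough_gpus_alt(min_gpus: int):
    """Alternative sketch: mark the test as skipped instead of silently passing."""

    def _wrap_func(f):

        def _execute_by_gpu_num(*args, **kwargs):
            num_avail_gpu = torch.cuda.device_count()
            if num_avail_gpu < min_gpus:
                # pytest.skip() raises Skipped, so the test shows up as skipped in the report
                pytest.skip(f"requires {min_gpus} GPUs, only {num_avail_gpu} detected")
            return f(*args, **kwargs)

        return _execute_by_gpu_num

    return _wrap_func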

View File

@@ -10,7 +10,7 @@ import torch.multiprocessing as mp
 from colossalai.amp import AMP_TYPE
 from colossalai.trainer import Trainer, hooks
 from colossalai.context import ParallelMode
-from colossalai.testing import rerun_if_address_is_in_use
+from colossalai.testing import rerun_if_address_is_in_use, skip_if_not_enough_gpus
 from colossalai.utils import free_port
 from colossalai.core import global_context as gpc
 from colossalai.logging import get_dist_logger
@@ -83,7 +83,7 @@ def run_trainer(rank, world_size, port):
 
 
 @pytest.mark.dist
-@pytest.mark.skip("This test requires 8 GPUs to execute")
+@skip_if_not_enough_gpus(min_gpus=8)
 @rerun_if_address_is_in_use()
 def test_hybrid_parallel():
     world_size = 8

View File

@@ -9,7 +9,7 @@ from colossalai.core import global_context as gpc
 from colossalai.initialize import launch
 from colossalai.logging import disable_existing_loggers
 from colossalai.utils import free_port
-from colossalai.testing import rerun_if_address_is_in_use
+from colossalai.testing import rerun_if_address_is_in_use, skip_if_not_enough_gpus
 from checks_3d.check_layer_3d import (check_classifier_given_embed_weight, check_classifier_no_given_weight,
                                       check_embed, check_layernorm, check_linear, check_loss, check_patch_embed,
                                       check_vocab_parallel_classifier_given_embed_weight,
@@ -38,7 +38,6 @@ def check_layer():
     check_loss()
     check_vocab_parallel_loss()
 
-
 def check_layer_and_operation(rank, world_size, port):
     disable_existing_loggers()
     launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
@@ -51,7 +50,7 @@ def check_layer_and_operation(rank, world_size, port):
 
 
 @pytest.mark.dist
-@pytest.mark.skip("This test requires 8 GPUs to execute")
+@skip_if_not_enough_gpus(min_gpus=8)
 @rerun_if_address_is_in_use()
 def test_3d():
     world_size = 8

View File

@@ -15,7 +15,7 @@ from colossalai.initialize import launch
 from colossalai.logging import disable_existing_loggers
 from colossalai.utils import free_port, is_using_pp
 from colossalai.utils.checkpointing import gather_pipeline_parallel_state_dict, load_checkpoint, save_checkpoint
-from colossalai.testing import rerun_on_exception
+from colossalai.testing import rerun_on_exception, skip_if_not_enough_gpus
 
 
 def build_pipeline(model):
@@ -67,7 +67,7 @@ def check_checkpoint_1d(rank, world_size, port):
 
 
 @pytest.mark.dist
-@pytest.mark.skip("This test should be invoked with 8 GPUs")
+@skip_if_not_enough_gpus(min_gpus=8)
 @rerun_on_exception(exception_type=mp.ProcessRaisedException, pattern=".*Address already in use.*")
 def test_checkpoint_1d():
     world_size = 8

View File

@@ -15,7 +15,7 @@ from colossalai.initialize import launch
 from colossalai.logging import disable_existing_loggers
 from colossalai.utils import free_port, get_current_device, is_using_pp
 from colossalai.utils.checkpointing import gather_pipeline_parallel_state_dict, load_checkpoint, save_checkpoint
-from colossalai.testing import rerun_on_exception
+from colossalai.testing import rerun_on_exception, skip_if_not_enough_gpus
 
 
 def build_pipeline(model):
@@ -67,7 +67,7 @@ def check_checkpoint_2d(rank, world_size, port):
 
 
 @pytest.mark.dist
-@pytest.mark.skip("This test should be invoked with 8 GPUs")
+@skip_if_not_enough_gpus(min_gpus=8)
 @rerun_on_exception(exception_type=mp.ProcessRaisedException, pattern=".*Address already in use.*")
 def test_checkpoint_2d():
     world_size = 8

View File

@@ -15,7 +15,7 @@ from colossalai.initialize import launch
 from colossalai.logging import disable_existing_loggers
 from colossalai.utils import free_port, get_current_device, is_using_pp
 from colossalai.utils.checkpointing import gather_pipeline_parallel_state_dict, load_checkpoint, save_checkpoint
-from colossalai.testing import rerun_on_exception
+from colossalai.testing import rerun_on_exception, skip_if_not_enough_gpus
 
 
 def build_pipeline(model):
@@ -37,7 +37,6 @@ def build_pipeline(model):
 
 def check_equal(A, B):
     assert torch.allclose(A, B, rtol=1e-3, atol=1e-2)
 
-
 def check_checkpoint_2p5d(rank, world_size, port):
     config = dict(parallel=dict(pipeline=dict(size=2), tensor=dict(size=4, depth=1, mode="2.5d")),)
@@ -67,7 +66,7 @@ def check_checkpoint_2p5d(rank, world_size, port):
 
 
 @pytest.mark.dist
-@pytest.mark.skip("This test should be invoked with 8 GPUs")
+@skip_if_not_enough_gpus(min_gpus=8)
 @rerun_on_exception(exception_type=mp.ProcessRaisedException, pattern=".*Address already in use.*")
 def test_checkpoint_2p5d():
     world_size = 8

View File

@@ -15,7 +15,7 @@ from colossalai.initialize import launch
 from colossalai.logging import disable_existing_loggers
 from colossalai.utils import free_port, get_current_device, is_using_pp
 from colossalai.utils.checkpointing import gather_pipeline_parallel_state_dict, load_checkpoint, save_checkpoint
-from colossalai.testing import rerun_on_exception
+from colossalai.testing import rerun_on_exception, skip_if_not_enough_gpus
 
 
 def build_pipeline(model):
@@ -37,7 +37,6 @@ def build_pipeline(model):
 
 def check_equal(A, B):
     assert torch.allclose(A, B, rtol=1e-3, atol=1e-2)
 
-
 def check_checkpoint_3d(rank, world_size, port):
     config = dict(parallel=dict(pipeline=dict(size=1), tensor=dict(size=8, mode="3d")),)
@@ -67,7 +66,7 @@ def check_checkpoint_3d(rank, world_size, port):
 
 
 @pytest.mark.dist
-@pytest.mark.skip("This test requires 8 GPUs to execute")
+@skip_if_not_enough_gpus(min_gpus=8)
 @rerun_on_exception(exception_type=mp.ProcessRaisedException, pattern=".*Address already in use.*")
 def test_checkpoint_3d():
     world_size = 8