import torch
import torch.distributed as dist
from torch.distributed.distributed_c10d import _get_default_group
from torch.testing import assert_close

from colossalai import launch
from colossalai.accelerator import get_accelerator
from colossalai.quantization.fp8 import all_to_all_single_fp8
from colossalai.testing import clear_cache_before_run, parameterize, rerun_if_address_is_in_use, spawn


@clear_cache_before_run()
@parameterize("shape", [(4,), (1, 8, 16), (4, 8, 16)])
@parameterize("dtype", [torch.bfloat16, torch.float16])
@parameterize("async_op", [True, False])
def check_all2all(shape, dtype, async_op):
    x = torch.rand(shape, dtype=dtype, device=get_accelerator().get_current_device())
    output = torch.empty_like(x)
    output_fp8 = torch.empty_like(x)
    origin_hanle = dist.all_to_all_single(output, x, group=_get_default_group(), async_op=async_op)
    fp8_handle = all_to_all_single_fp8(output_fp8, x, group=_get_default_group(), async_op=async_op)
    if async_op:
        origin_hanle.wait()
        fp8_handle.wait()
    assert_close(output, output_fp8, rtol=0.1, atol=0.1)


@clear_cache_before_run()
@parameterize("shape", [(8, 8, 16)])
@parameterize("dtype", [torch.bfloat16, torch.float16])
@parameterize("async_op", [True, False])
def check_all2all_uneven(shape, dtype, async_op):
    x = torch.rand(shape, dtype=dtype, device=get_accelerator().get_current_device())
    input_split_sizes = [3, 3, 1, 1]
    if dist.get_rank() in [0, 1]:
        output_split_sizes = [3, 3, 3, 3]
    else:
        output_split_sizes = [1, 1, 1, 1]
    output_shape = list(shape)
    output_shape[0] = sum(output_split_sizes)
    output = torch.empty(output_shape, device=x.device, dtype=x.dtype)
    output_fp8 = torch.empty(output_shape, device=x.device, dtype=x.dtype)
    origin_hanle = dist.all_to_all_single(
        output,
        x,
        output_split_sizes=output_split_sizes,
        input_split_sizes=input_split_sizes,
        group=_get_default_group(),
        async_op=async_op,
    )
    fp8_handle = all_to_all_single_fp8(
        output_fp8,
        x,
        output_split_sizes=output_split_sizes,
        input_split_sizes=input_split_sizes,
        group=_get_default_group(),
        async_op=async_op,
    )
    if async_op:
        origin_hanle.wait()
        fp8_handle.wait()
    assert_close(output, output_fp8, rtol=0.1, atol=0.1)


def run_dist(rank, world_size, port):
    launch(rank=rank, world_size=world_size, port=port, host="localhost")
    check_all2all()
    check_all2all_uneven()


@rerun_if_address_is_in_use()
def test_all_to_all_single():
    spawn(run_dist, 4)


if __name__ == "__main__":
    test_all_to_all_single()