Mirror of https://github.com/hpcaitech/ColossalAI.git (synced 2026-05-05 12:24:38 +00:00)
[polish] add test_ops directory (#1431)

77 tests/test_ops/test_addmm_tp.py (new file)
@@ -0,0 +1,77 @@
import colossalai
import torch
import pytest
import torch.nn as nn
import torch.multiprocessing as mp
from colossalai.tensor import ColoTensor, ProcessGroup
from colossalai.tensor import ColoTensorSpec
from colossalai.testing import rerun_if_address_is_in_use
from colossalai.utils import free_port
from functools import partial
from tests.test_tensor.common_utils import tensor_shard_equal, tensor_equal, split_param_row_tp1d, split_param_col_tp1d


class Conv1D(nn.Module):
    """
    1D-convolutional layer as defined by Radford et al. for OpenAI GPT (and also used in GPT-2).

    Basically works like a linear layer but the weights are transposed.

    Args:
        nf (`int`): The number of output features.
        nx (`int`): The number of input features.
    """

    def __init__(self, nf, nx):
        super().__init__()
        self.nf = nf
        w = torch.empty(nx, nf)
        nn.init.normal_(w, std=0.02)
        self.weight = nn.Parameter(w)
        self.bias = nn.Parameter(torch.ones(nf))

    def forward(self, x):
        size_out = x.size()[:-1] + (self.nf,)
        x = torch.addmm(self.bias, x.view(-1, x.size(-1)), self.weight)
        x = x.view(size_out)
        return x
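
# Example: Conv1D(4, 16) maps a (2, 16) input to a (2, 4) output, matching
# nn.Linear(16, 4) except that the weight is stored transposed, as (nx, nf).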


def run_with_spec(spec_init_func, split_bias):
    model = Conv1D(4, 16).cuda()
    world_size = torch.distributed.get_world_size()
    pg = ProcessGroup(tp_degree=world_size)

    weight = ColoTensor(torch.nn.Parameter(model.weight.detach()), ColoTensorSpec(pg))
    bias = ColoTensor(torch.nn.Parameter(model.bias.detach()), ColoTensorSpec(pg))

    spec_init_func(weight, pg)
    if split_bias:
        spec_init_func(bias, pg)
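
    # run the same forward/backward on the reference Conv1D and on the sharded
    # ColoTensors; to_replicate() gathers the parallel output before comparing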
    x = torch.rand(2, 16).cuda()
    out = model(x)
    colo_out = torch.addmm(bias, x, weight)
    colo_out = colo_out.to_replicate()
    assert tensor_equal(out, colo_out)
    grad = torch.rand_like(out)
    out.backward(grad)
    colo_out.backward(grad)
    assert tensor_shard_equal(model.weight.grad, weight.grad, pg.tp_local_rank(), pg.tp_world_size())
    assert tensor_shard_equal(model.bias.grad, bias.grad, pg.tp_local_rank(), pg.tp_world_size())


def run_dist(rank, world_size, port):
    colossalai.launch(config={}, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
    run_with_spec(spec_init_func=split_param_row_tp1d, split_bias=False)
    run_with_spec(spec_init_func=split_param_col_tp1d, split_bias=True)


@pytest.mark.dist
@pytest.mark.parametrize('world_size', [1, 4])
@rerun_if_address_is_in_use()
def test_addmm_1d(world_size):
    run_func = partial(run_dist, world_size=world_size, port=free_port())
    mp.spawn(run_func, nprocs=world_size)


if __name__ == '__main__':
    test_addmm_1d(4)

234 tests/test_ops/test_cache_embedding.py (new file)
@@ -0,0 +1,234 @@
import pytest
from functools import partial
import torch
import torch.multiprocessing as mp
import numpy as np
import random

import colossalai
from colossalai.utils import free_port
from colossalai.testing import rerun_if_address_is_in_use
from colossalai.tensor import ColoParameter, ProcessGroup, ShardSpec, ComputePattern, ComputeSpec
from colossalai.nn._ops.cache_embedding import CachedParamMgr, FreqAwareEmbeddingBag, ParallelFreqAwareEmbeddingBag

NUM_EMBED, EMBED_DIM = 10, 8
BATCH_SIZE = 8


def set_seed(seed):
    """
    To achieve reproducible results, it's necessary to fix random seeds.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)


def synthesize_1d_sparse_feature(
    batch_size,
    num_embed,
    device,
):
    indices_in_batch = batch_size * 2
    indices = torch.randint(low=0, high=num_embed, size=(indices_in_batch,), device=device, dtype=torch.long)
    offsets = torch.from_numpy(
        np.array([
            0, *np.sort(np.random.randint(low=0, high=indices_in_batch, size=(indices_in_batch - 1,))), indices_in_batch
        ])).to(device).long()
    return indices, offsets
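
# The (indices, offsets) pair is the CSR-style input nn.EmbeddingBag expects:
# offsets has batch_size * 2 + 1 increasing entries ending at len(indices), so
# with include_last_offset=True it describes batch_size * 2 bags.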


def test_cachemgr():
    model = torch.nn.EmbeddingBag(10000, 128)
    # 10 chunks, 5 of them in CUDA
    mgr = CachedParamMgr(model.weight, 5)
    assert mgr.cuda_row_num == 5

    mgr._admit(1)
    assert not mgr._chunk_in_cuda(2)
    assert mgr._chunk_in_cuda(1)

    # print(mgr.cached_chunk_table)
    mgr._admit(8)

    # now 3 chunks are available
    assert mgr.cuda_available_chunk_num == 3

    mgr._evict()
    assert mgr.cuda_available_chunk_num == 4

    mgr._prepare_rows_on_cuda(torch.tensor([9, 6, 5], dtype=torch.long, device=0))
    mgr._prepare_rows_on_cuda(torch.tensor([3, 4, 5], dtype=torch.long, device=0))
    # print(mgr.cached_chunk_table)
    # mgr.print_comm_stats()

    mgr.flush()
    assert mgr.cuda_available_chunk_num == 5


def test_reorder_with_freq():
    num_embed = 100
    chunk_size = 1
    num_chunk = 5

    idx_map = np.random.randint(10000, size=(num_embed,))
    sorted_idx = np.flipud(np.argsort(idx_map)).tolist()
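    # after reorder, row i should sit at its rank in the frequency-descending
    # order, so the most frequently accessed rows land in the lowest chunks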
    chunkid, offset_in_chunk = [], []
    for i in range(num_embed):
        idx = sorted_idx.index(i)
        chunkid.append(idx // chunk_size)
        offset_in_chunk.append(idx % chunk_size)

    chunkid = torch.tensor(chunkid, dtype=torch.long, device=torch.cuda.current_device())
    offset_in_chunk = torch.tensor(offset_in_chunk, dtype=torch.long, device=torch.cuda.current_device())

    weight = torch.rand(num_embed, 2)
    mgr = CachedParamMgr(weight, num_chunk)

    mgr.reorder(idx_map)

    indices = mgr.idx_map.index_select(0, torch.arange(num_embed, dtype=torch.long, device=torch.cuda.current_device()))
    mgr_chunk_id = torch.div(indices, chunk_size, rounding_mode='floor')
    mgr_offsets = torch.remainder(indices, chunk_size)
    assert torch.allclose(chunkid, mgr_chunk_id), f"chunk id: {chunkid}, mgr: {mgr_chunk_id}"
    assert torch.allclose(offset_in_chunk, mgr_offsets), \
        f"offset in chunk: {offset_in_chunk}, mgr: {mgr_offsets}"


def test_freq_aware_embed():
    device = torch.device('cuda', 0)
    model = FreqAwareEmbeddingBag(
        NUM_EMBED,
        EMBED_DIM,
        mode='mean',
        include_last_offset=True,
    ).to(device)
    model.preprocess(cuda_row_num=BATCH_SIZE * 2, ids_freq_mapping=None)

    assert model.weight.shape[0] == NUM_EMBED
    ref_model = torch.nn.EmbeddingBag.from_pretrained(model.weight.detach().to(device),
                                                      mode='mean',
                                                      include_last_offset=True,
                                                      freeze=False)

    assert torch.allclose(ref_model.weight.detach(), model.weight.detach().to(device))

    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    ref_optimizer = torch.optim.SGD(ref_model.parameters(), lr=1e-3)

    for i in range(5):
        indices, offsets = synthesize_1d_sparse_feature(BATCH_SIZE, NUM_EMBED, device)
        res = model(indices, offsets)
        ref_res = ref_model(indices, offsets)
        assert torch.allclose(res, ref_res), f"model result: {res}, reference: {ref_res}"

        grad = torch.rand_like(res)
        # comparing gradients here is nontrivial
        res.backward(grad)
        ref_res.backward(grad)
        optimizer.step()
        optimizer.zero_grad()

        ref_optimizer.step()
        ref_optimizer.zero_grad()
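
    # flush() evicts every cached row back into the CPU weight so the two
    # full embedding tables can be compared directly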
    model.cache_weight_mgr.flush()
    model_weight = model.weight.detach().to(device)
    ref_weight = ref_model.weight.detach()
    assert torch.allclose(model_weight, ref_weight), \
        f"model weight: {model_weight[10:18, :8]}, reference: {ref_weight[10:18, :8]}"


def gather_tensor(tensor, rank, world_size):
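    # collect `tensor` from every rank onto rank 0; the other ranks pass an
    # empty gather_list and receive nothing back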
    gather_list = []
    if rank == 0:
        gather_list = [torch.empty_like(tensor) for _ in range(world_size)]

    torch.distributed.gather(tensor, gather_list, dst=0)
    return gather_list


def run_parallel_freq_aware_embed(rank, world_size):
    device = torch.device('cuda', torch.cuda.current_device())

    num_embed = 100
    embed_dim = 16
    batch_size = 4

    set_seed(4321)
    weight = torch.rand(num_embed, embed_dim)
    coloweight = ColoParameter(weight.clone().detach().cpu(), requires_grad=False)

    # initialize the tensor spec for the embedding weight parameter,
    # which is a ColoParameter: shard it along the embedding dimension
    coloweight.process_group = ProcessGroup(tp_degree=world_size)
    coloweight.set_tensor_spec(ShardSpec(dims=[-1], num_partitions=[world_size]), ComputeSpec(ComputePattern.TP1D))

    model = ParallelFreqAwareEmbeddingBag.from_pretrained(coloweight,
                                                          include_last_offset=True,
                                                          freeze=False,
                                                          cuda_row_num=batch_size * 2)

    assert model.cache_weight_mgr.cpu_weight.device.type == 'cpu'
    assert model.cache_weight_mgr.cuda_cached_weight.requires_grad
    weight_in_rank = torch.tensor_split(weight, world_size, -1)[rank]
    assert torch.allclose(
        weight_in_rank,
        model.cache_weight_mgr.cpu_weight.detach()), f"{weight_in_rank - model.cache_weight_mgr.cpu_weight}"

    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

    if rank == 0:
        ref_model = torch.nn.EmbeddingBag.from_pretrained(weight.detach().clone(),
                                                          include_last_offset=True,
                                                          freeze=False).to(device)
        ref_optimizer = torch.optim.SGD(ref_model.parameters(), lr=1e-3)

    set_seed(4321)
    for i in range(5):
        indices, offsets = synthesize_1d_sparse_feature(batch_size, num_embed, device)
        res = model(indices, offsets)
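
        # the parallel output comes back sharded along the batch dimension,
        # so each rank applies only its slice of the gradient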
        grad = torch.rand(batch_size * 2, embed_dim, dtype=res.dtype, device=res.device)
        grad_in_rank = torch.tensor_split(grad, world_size, 0)[rank]
        res.backward(grad_in_rank)

        optimizer.step()
        optimizer.zero_grad()

        res_list = gather_tensor(res.detach(), rank, world_size)

        if rank == 0:
            ref_res = ref_model(indices, offsets)
            recover_res = torch.cat(res_list, dim=0)

            assert torch.allclose(ref_res, recover_res)

            ref_res.backward(grad)
            ref_optimizer.step()
            ref_optimizer.zero_grad()

    model.cache_weight_mgr.flush()
    weight_list = gather_tensor(model.cache_weight_mgr.cpu_weight.detach().cuda(), rank, world_size)
    if rank == 0:
        recover_weight = torch.cat(weight_list, dim=1)
        assert torch.allclose(recover_weight, ref_model.weight.detach()), f"{recover_weight - ref_model.weight}"


def run_dist(rank, world_size, port):
    colossalai.launch(config={}, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
    run_parallel_freq_aware_embed(rank, world_size)


@pytest.mark.dist
@pytest.mark.parametrize('world_size', [1, 4])
@rerun_if_address_is_in_use()
def test_parallel_freq_aware_embed(world_size):
    run_func = partial(run_dist, world_size=world_size, port=free_port())
    mp.spawn(run_func, nprocs=world_size)


if __name__ == '__main__':
    # test_freq_aware_embed()
    # test_chunkmgr_admit()
    test_parallel_freq_aware_embed(2)

47 tests/test_ops/test_embedding_bag_tp.py (new file)
@@ -0,0 +1,47 @@
from torch.nn import functional as F
from functools import partial

import colossalai
import pytest
import torch
import torch.multiprocessing as mp
from colossalai.testing import rerun_if_address_is_in_use
from colossalai.utils import free_port
from colossalai.tensor import ColoParameter, ColoTensorSpec, ProcessGroup
from tests.test_tensor.common_utils import tensor_equal, tensor_shard_equal, split_param_col_tp1d


def run_with_spec(spec_init_func):
    pg = ProcessGroup(tp_degree=torch.distributed.get_world_size())
    model = torch.nn.EmbeddingBag(10, 4).cuda()
    weight = ColoParameter(model.weight.clone(), True, ColoTensorSpec(pg))

    spec_init_func(weight, pg)
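    # the col split shards the embedding dimension of the (10, 4) weight;
    # F.embedding_bag on a ColoParameter dispatches to the parallel op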

    inputs = torch.tensor([1, 2, 4, 5, 4, 3, 2, 9]).cuda()
    offsets = torch.tensor([0, 4]).cuda()
    out = model(inputs, offsets=offsets)
    colo_out = F.embedding_bag(inputs, weight, offsets=offsets)
    assert tensor_equal(out, colo_out)
    grad = torch.rand_like(out)
    out.backward(grad)
    colo_out.backward(grad)
    assert tensor_shard_equal(model.weight.grad, weight.grad, pg.tp_local_rank(), pg.tp_world_size())


def run_dist(rank, world_size, port):
    config = dict(parallel=dict(tensor=dict(mode="1d", size=world_size),))
    colossalai.launch(config=config, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
    run_with_spec(split_param_col_tp1d)


@pytest.mark.dist
@pytest.mark.parametrize('world_size', [1, 4])
@rerun_if_address_is_in_use()
def test_embedding_bag_1d(world_size):
    run_func = partial(run_dist, world_size=world_size, port=free_port())
    mp.spawn(run_func, nprocs=world_size)


if __name__ == '__main__':
    test_embedding_bag_1d(4)

48 tests/test_ops/test_embedding_tp.py (new file)
@@ -0,0 +1,48 @@
from torch.nn import functional as F
from functools import partial

import colossalai
import pytest
import torch
import torch.multiprocessing as mp
from colossalai.testing import rerun_if_address_is_in_use
from colossalai.utils import free_port
from colossalai.tensor import ColoTensorSpec, ProcessGroup, ColoTensor
from tests.test_tensor.common_utils import tensor_equal, tensor_shard_equal, split_param_col_tp1d, split_param_row_tp1d


def run_with_spec(spec_init_func, pg: ProcessGroup):
    model = torch.nn.Embedding(12, 32).cuda()
    weight = ColoTensor(torch.nn.Parameter(model.weight.detach()), ColoTensorSpec(pg))

    spec_init_func(weight, pg)
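    # the row split shards the 12-row vocabulary dimension; the col split
    # shards the 32-dim embedding dimension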

    x = torch.tensor((0, 3, 6, 9)).cuda()
    out = model(x)
    colo_out = F.embedding(x, weight)
    assert tensor_equal(out, colo_out)
    grad = torch.rand_like(out)
    out.backward(grad)
    colo_out.backward(grad)
    # compare grad inside a TP group
    assert tensor_shard_equal(model.weight.grad, weight.grad, pg.tp_local_rank(), pg.tp_world_size())


def run_dist(rank, world_size, port):
    # config = dict(parallel=dict(tensor=dict(mode="1d", size=world_size),))
    colossalai.launch(config={}, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
    pg = ProcessGroup(tp_degree=world_size)
    run_with_spec(split_param_row_tp1d, pg)
    run_with_spec(split_param_col_tp1d, pg)


@pytest.mark.dist
@pytest.mark.parametrize('world_size', [1, 4])
@rerun_if_address_is_in_use()
def test_embedding_1d(world_size):
    run_func = partial(run_dist, world_size=world_size, port=free_port())
    mp.spawn(run_func, nprocs=world_size)


if __name__ == '__main__':
    test_embedding_1d(4)

52 tests/test_ops/test_linear_tp.py (new file)
@@ -0,0 +1,52 @@
from functools import partial

import colossalai
import pytest
import torch
import torch.multiprocessing as mp
import torch.nn.functional as F
from colossalai.testing import rerun_if_address_is_in_use
from colossalai.utils import free_port
from colossalai.tensor import ColoTensorSpec, ProcessGroup, ColoTensor
from tests.test_tensor.common_utils import tensor_equal, tensor_shard_equal, split_param_col_tp1d, split_param_row_tp1d


def run_with_spec(spec_init_func, split_bias):
    pg = ProcessGroup(tp_degree=torch.distributed.get_world_size())
    model = torch.nn.Linear(4, 8).cuda()
    weight = ColoTensor(torch.nn.Parameter(model.weight.detach()), ColoTensorSpec(pg))
    bias = ColoTensor(torch.nn.Parameter(model.bias.detach()), ColoTensorSpec(pg))

    spec_init_func(weight, pg)
    if split_bias:
        spec_init_func(bias, pg)

    x = torch.rand(2, 4).cuda()
    out = model(x)
    colo_out = F.linear(x, weight, bias)
    colo_out = colo_out.to_replicate()
    assert tensor_equal(out, colo_out)
    grad = torch.rand_like(out)
    out.backward(grad)
    colo_out.backward(grad)
    assert tensor_shard_equal(model.weight.grad, weight.grad, pg.tp_local_rank(), pg.tp_world_size())
    assert tensor_shard_equal(model.bias.grad, bias.grad, pg.tp_local_rank(), pg.tp_world_size())


def run_dist(rank, world_size, port):
    config = dict(parallel=dict(tensor=dict(mode="1d", size=world_size),))
    colossalai.launch(config=config, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
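    # col split shards the weight's last dim (in_features): a row-parallel
    # linear, so the bias is applied once; row split shards out_features
    # (column-parallel), so the bias is split along with the output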
    run_with_spec(spec_init_func=split_param_col_tp1d, split_bias=False)
    run_with_spec(spec_init_func=split_param_row_tp1d, split_bias=True)


@pytest.mark.dist
@pytest.mark.parametrize('world_size', [1, 4])
@rerun_if_address_is_in_use()
def test_linear_1d(world_size):
    run_func = partial(run_dist, world_size=world_size, port=free_port())
    mp.spawn(run_func, nprocs=world_size)


if __name__ == '__main__':
    test_linear_1d(4)

52 tests/test_ops/test_loss_func.py (new file)
@@ -0,0 +1,52 @@
import torch
import pytest
import colossalai
import torch.nn.functional as F
import torch.multiprocessing as mp
from functools import partial
from colossalai.tensor import ColoTensor, ProcessGroup, ColoTensorSpec
from colossalai.utils import get_current_device
from colossalai.testing import rerun_if_address_is_in_use
from colossalai.utils import free_port
from colossalai.tensor import ShardSpec, ComputeSpec, ComputePattern


def check_cross_entropy():
    input_t = torch.randn(4, 4, device=get_current_device(), requires_grad=True)
    input_ct = torch.randn(4, 4, device=get_current_device(), requires_grad=True)
    with torch.no_grad():
        input_ct.copy_(input_t)

    target = torch.randint(4, (4,), dtype=torch.int64, device=get_current_device())

    world_size = torch.distributed.get_world_size()
    pg = ProcessGroup(tp_degree=world_size)
    input_t_colo = ColoTensor.from_torch_tensor(tensor=input_ct, spec=ColoTensorSpec(pg))
    input_shard = input_t_colo.redistribute(ShardSpec([-1], [pg.tp_world_size()]))
    input_shard.set_tensor_spec(dist_spec=None, compute_spec=ComputeSpec(ComputePattern.TP1D))
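
    # the logits are now sharded along the class dimension; the parallel
    # cross-entropy should still match the loss on the full logits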
    output = F.cross_entropy(input_t, target)
    output_colo = F.cross_entropy(input_shard, target)
    assert torch.allclose(output_colo, output)

    output.backward()
    output_colo.backward()

    assert torch.allclose(input_t.grad, input_ct.grad)


def run_dist(rank, world_size, port):
    colossalai.launch(config={}, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
    check_cross_entropy()


@pytest.mark.dist
@pytest.mark.parametrize('world_size', [1, 2])
@rerun_if_address_is_in_use()
def test_loss_func(world_size):
    run_func = partial(run_dist, world_size=world_size, port=free_port())
    mp.spawn(run_func, nprocs=world_size)


if __name__ == '__main__':
    test_loss_func(1)

91 tests/test_ops/test_op.py (new file)
@@ -0,0 +1,91 @@
import torch
import pytest
import colossalai
import torch.nn.functional as F
import torch.multiprocessing as mp
from functools import partial
from colossalai.tensor import ColoTensor, ProcessGroup, ColoTensorSpec, ShardSpec
from colossalai.utils import get_current_device
from torch.nn import Parameter
from colossalai.testing import rerun_if_address_is_in_use
from colossalai.utils import free_port


def _run_layer_norm():
    ln_op = torch.nn.LayerNorm(2, 3, device=get_current_device())

    input_t = torch.randn(3, 2, device=get_current_device())

    pg = ProcessGroup(tp_degree=torch.distributed.get_world_size())
    input_t_colo = ColoTensor.from_torch_tensor(input_t.clone().detach(), ColoTensorSpec(pg))

    # prepare the Colossal-AI LayerNorm
    weight = ColoTensor(Parameter(ln_op.weight.detach()), ColoTensorSpec(pg))
    bias = ColoTensor(Parameter(ln_op.bias.detach()), ColoTensorSpec(pg))

    output = ln_op(input_t)
    output_colo = F.layer_norm(input_t_colo, ln_op.normalized_shape, weight, bias, ln_op.eps)

    assert torch.allclose(output_colo, output)

    torch.mean(output).backward()
    torch.mean(output_colo).backward()

    assert torch.allclose(ln_op.weight.grad, weight.grad)


def check_spec_eq(tensor, other):
    assert isinstance(tensor, ColoTensor) and isinstance(other, ColoTensor)
    for k in dir(tensor.dist_spec):
        if not k.startswith('__'):
            assert hasattr(other.dist_spec, k), f"{k}"
            assert getattr(tensor.dist_spec, k) == getattr(other.dist_spec, k)


def check_element_wise_ops():
    world_size = torch.distributed.get_world_size()
    pg = ProcessGroup(tp_degree=world_size)
    t = torch.rand(2, 2)
    x = ColoTensor(t, spec=ColoTensorSpec(pg, ShardSpec([0], [pg.tp_world_size()])))
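
    # element-wise ops (and .cuda()) should preserve the shard spec while
    # producing the same values as on the plain tensor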
    check_spec_eq(x, x.cuda())
    assert torch.equal(x.cuda(), t.cuda())
    check_spec_eq(x, torch.abs(x))
    assert torch.equal(torch.abs(x), torch.abs(t))
    check_spec_eq(x, F.sigmoid(x))
    assert torch.equal(F.sigmoid(x), F.sigmoid(t))


def run_dist(rank, world_size, port):
    colossalai.launch(config={}, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
    check_element_wise_ops()
    _run_layer_norm()


@pytest.mark.dist
@pytest.mark.parametrize('world_size', [2])
@rerun_if_address_is_in_use()
def test_element_wise_ops(world_size):
    run_func = partial(run_dist, world_size=world_size, port=free_port())
    mp.spawn(run_func, nprocs=world_size)


def run_dist2(rank, world_size, port):
    colossalai.launch(config={}, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
    _run_layer_norm()


@pytest.mark.dist
@pytest.mark.parametrize('world_size', [1])
@rerun_if_address_is_in_use()
def test_ln(world_size):
    run_func = partial(run_dist2, world_size=world_size, port=free_port())
    mp.spawn(run_func, nprocs=world_size)


def check_all():
    test_element_wise_ops(2)


if __name__ == '__main__':
    check_all()

100 tests/test_ops/test_view.py (new file)
@@ -0,0 +1,100 @@
from functools import partial

import colossalai
import pytest
import torch
import torch.multiprocessing as mp
import torch.distributed as dist
from colossalai.testing import rerun_if_address_is_in_use
from colossalai.utils import free_port, get_current_device
from colossalai.tensor import ColoTensorSpec, ProcessGroup, ColoTensor, ShardSpec
from colossalai.tensor.distspec import DistPlacementPattern
from tests.test_tensor.common_utils import split_param_row_tp1d, split_param_col_tp1d, debug_print


def exam_view_core(pg):
    # the case of replicated ColoTensors
    x = torch.randn(4, 4).cuda()
    x_colo = ColoTensor(x, ColoTensorSpec(pg))

    y = x.view(2, -1, 2)
    y_colo = x_colo.view(2, -1, 2)

    assert torch.all(y == y_colo)
    assert y_colo.dist_spec.placement == DistPlacementPattern.REPLICATE

    # the well-aligned case of col-sliced ColoTensors: the view keeps the shard spec
    split_param_col_tp1d(x_colo, pg)

    z = x.view(torch.Size((2, 1, 2, -1)))
    z_colo = x_colo.view(torch.Size((2, 1, 2, -1)))
    if dist.get_rank() == 0:
        z = z[:, :, :, 0:2]
    else:
        z = z[:, :, :, 2:]
    assert torch.all(z == z_colo)
    assert z_colo.dist_spec == x_colo.dist_spec

    # the well-aligned case of row-sliced ColoTensors
    split_param_row_tp1d(x_colo, pg)

    z = x.view(torch.Size((-1, 2, 2)))
    z_colo = x_colo.view(torch.Size((-1, 2, 2)))
    if dist.get_rank() == 0:
        z = z[0:2, :, :]
    else:
        z = z[2:, :, :]
    assert torch.all(z == z_colo)
    assert z_colo.dist_spec == x_colo.dist_spec

    # the general case of row-sliced ColoTensors: this view cannot keep the
    # shard spec, so the result falls back to a replicated tensor
    z = x.view(-1, 2, 2, 2)
    z_colo = x_colo.view(-1, 2, 2, 2)
    assert torch.all(z == z_colo)
    assert z_colo.dist_spec.placement == DistPlacementPattern.REPLICATE


def exam_view_autograd(pg):
    x = torch.randn(8, 2, device=get_current_device(), requires_grad=True)
    y = torch.randn(8, 2, device=get_current_device(), requires_grad=True)
    with torch.no_grad():
        y.copy_(x)
    y = ColoTensor(y, ColoTensorSpec(pg))
    y_slice = y.redistribute(ShardSpec([-1], [pg.tp_world_size()]))

    xx = x.view(2, 2, -1)
    yy_slice = y_slice.view(2, 2, -1)
    yy = yy_slice.to_replicate()
    grad = torch.randn(2, 2, 4, device=get_current_device())

    xx.backward(grad)
    yy.backward(grad)
    assert torch.all(x.grad == y.grad)


def exam_view_errors(pg):
    x = torch.randn(8, 2, device=get_current_device())
    x = ColoTensor(x, ColoTensorSpec(pg))
    split_param_row_tp1d(x, pg)

    # exercise view()'s argument handling; this helper is left disabled in run_dist
    x.view('a', 'b', 'c')
    x.view(8, -1)
    x.view([-2, -2, -2])
    x.view((-1, -1, -1))


def run_dist(rank, world_size, port):
    colossalai.launch(config=dict(), rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
    pg = ProcessGroup(tp_degree=torch.distributed.get_world_size())
    exam_view_core(pg)
    exam_view_autograd(pg)
    # exam_view_errors(pg)


@pytest.mark.dist
@pytest.mark.parametrize('world_size', [2])
@rerun_if_address_is_in_use()
def test_view(world_size):
    run_func = partial(run_dist, world_size=world_size, port=free_port())
    mp.spawn(run_func, nprocs=world_size)


if __name__ == '__main__':
    test_view(2)