[zero] add unit test for AgChunk's append, close, access (#1423)

This commit is contained in:
HELSON
2022-08-09 18:03:10 +08:00
committed by GitHub
parent c577ed016e
commit 4fb3c52cf0
2 changed files with 133 additions and 3 deletions

View File

@@ -36,7 +36,7 @@ class AgChunk:
self.utilized_size = 0
# Here, we use torch process group,
# since ColoProcessGroup might get deprecated soon
self.torch_pg = process_group.dp_process_group
self.torch_pg = process_group.dp_process_group()
self.pg_size = dist.get_world_size(self.torch_pg)
self.pg_rank = dist.get_rank(self.torch_pg)
@@ -69,6 +69,8 @@ class AgChunk:
# some chunks can keep gathered all the time
# so their computation patterns are the same as that of the parameters in DDP
self.keep_gathered = keep_gathered
if self.keep_gathered:
pin_memory = False # since this chunk is gathered, it doesn't need to pin
# if pin_memory is True, we allocate a piece of CPU pin-memory
# for it all the time
@@ -134,7 +136,7 @@ class AgChunk:
if new_utilized_size > self.chunk_size:
raise ChunkFullError
self.chunk_temp[self.utilized_size: new_utilized_size].copy_(tensor.flatten())
self.chunk_temp[self.utilized_size: new_utilized_size].copy_(tensor.data.flatten())
assert type(self.chunk_temp) == torch.Tensor, "copy_tensor_to_chunk_slice must use a torch tensor"
tensor.data = self.chunk_temp[self.utilized_size: new_utilized_size].view(tensor.shape)
@@ -145,7 +147,7 @@ class AgChunk:
self.tensors_state_monitor[tensor_state] += 1
self.utilized_size = new_utilized_size
def close_chunk(self, shard_dev: torch.device):
def close_chunk(self, shard_dev: Optional[torch.device] = None):
"""Close the chunk. Any tensor can't be appended to a closed chunk.
"""
# sanity check
@@ -159,6 +161,14 @@ class AgChunk:
self.__scatter()
if self.keep_gathered:
if shard_dev is None:
shard_dev = get_current_device()
else:
assert shard_dev.type == 'cuda'
elif shard_dev is None:
shard_dev = torch.device('cpu')
if self.pin_memory or shard_dev.type == 'cpu':
self.cpu_shard = torch.empty(self.shard_size,
dtype=self.dtype,
@@ -364,3 +374,42 @@ class AgChunk:
for tensor_info in self.tensors_info.values():
if prev_state is None or tensor_info.state == prev_state:
self.__update_one_tensor_info(tensor_info, next_state)
def __repr__(self, detailed: bool = False):
output = [
"AgChunk Information:\n",
"\tchunk size: {}, chunk dtype: {}, process group size: {}\n".format(
self.chunk_size, self.dtype, self.pg_size),
"\t# of tensors: {}, utilized size: {}, utilized percentage: {:.2f}\n".format(
self.num_tensors, self.utilized_size, self.utilized_size / self.chunk_size)
]
def print_tensor(tensor, prefix=''):
output.append("{}shape: {}, dtype: {}, device: {}\n".format(
prefix, tensor.shape, tensor.dtype, tensor.device))
if self.chunk_temp is not None:
output.append("\tchunk temp:\n")
print_tensor(tensor=self.chunk_temp, prefix='\t\t')
if self.chunk_total is not None and self.chunk_total.storage().size() > 0:
output.append("\tchunk total:\n")
print_tensor(tensor=self.chunk_total, prefix='\t\t')
if self.cuda_shard is not None:
output.append("\tcuda shard:\n")
print_tensor(tensor=self.cuda_shard, prefix='\t\t')
if self.cpu_shard is not None:
output.append("\tcpu shard:\n")
print_tensor(tensor=self.cpu_shard, prefix='\t\t')
memory_info = self.memory_usage
output.append("\tmemory usage: cuda {}, cpu {}\n".format(memory_info['cuda'], memory_info['cpu']))
if detailed:
output.append("\ttensor state monitor:\n")
for st in TensorState:
output.append("\t\t# of {}: {}\n".format(st, self.tensors_state_monitor[st]))
return ''.join(output)