mirror of
https://github.com/hpcaitech/ColossalAI.git
synced 2025-09-09 13:00:52 +00:00
[example] add diffusion inference (#1986)
This commit is contained in:
152
examples/images/diffusion/ldm/data/teyvat.py
Normal file
152
examples/images/diffusion/ldm/data/teyvat.py
Normal file
@@ -0,0 +1,152 @@
|
||||
from typing import Dict
|
||||
import numpy as np
|
||||
from omegaconf import DictConfig, ListConfig
|
||||
import torch
|
||||
from torch.utils.data import Dataset
|
||||
from pathlib import Path
|
||||
import json
|
||||
from PIL import Image
|
||||
from torchvision import transforms
|
||||
from einops import rearrange
|
||||
from ldm.util import instantiate_from_config
|
||||
from datasets import load_dataset
|
||||
|
||||
def make_multi_folder_data(paths, caption_files=None, **kwargs):
    """Build a single concatenated dataset from several image folders.

    Args:
        paths: Either an iterable of folder paths, or a mapping (``dict`` or
            ``DictConfig``) of ``folder -> n_repeats``. With a mapping, each
            folder is included ``n_repeats`` times in the result.
        caption_files: Optional iterable of caption files, matched to
            ``paths`` by position. Not supported when ``paths`` is a mapping.
        **kwargs: Forwarded unchanged to every ``FolderData`` constructor.

    Returns:
        A ``torch.utils.data.ConcatDataset`` holding one ``FolderData`` per
        (possibly repeated) folder.

    Raises:
        AssertionError: If ``caption_files`` is given together with a mapping.
        ValueError: If ``paths`` and ``caption_files`` differ in length.
    """
    if isinstance(paths, (Dict, DictConfig)):
        assert caption_files is None, \
            "Caption files not yet supported for repeats"
        # Expand {folder: n} into a flat list with ``folder`` repeated n times.
        paths = [
            folder_path
            for folder_path, repeats in paths.items()
            for _ in range(repeats)
        ]

    if caption_files is None:
        datasets = [FolderData(p, **kwargs) for p in paths]
    else:
        paths = list(paths)
        caption_files = list(caption_files)
        # zip() would silently stop at the shorter input and drop folders or
        # captions, so reject mismatched lengths explicitly.
        if len(paths) != len(caption_files):
            raise ValueError(
                f"Got {len(paths)} paths but {len(caption_files)} caption files; "
                "expected one caption file per path"
            )
        datasets = [
            FolderData(p, caption_file=c, **kwargs)
            for p, c in zip(paths, caption_files)
        ]
    return torch.utils.data.ConcatDataset(datasets)
|
||||
|
||||
class FolderData(Dataset):
    """Dataset of images stored under a folder, optionally paired with captions.

    Each item is a dict with the keys ``"image"`` (the transformed image) and
    ``"txt"`` (its caption), plus ``"path"`` when ``return_paths`` is set.
    """

    def __init__(self,
                 root_dir,
                 caption_file=None,
                 image_transforms=None,
                 ext="jpg",
                 default_caption="",
                 postprocess=None,
                 return_paths=False,
                 ) -> None:
        """Create a dataset from a folder of images.

        Args:
            root_dir: Folder holding the images. Without a caption file it is
                searched recursively for files ending in ``ext``.
            caption_file: Optional ``.json`` or ``.jsonl`` file. When given,
                its keys (file names relative to ``root_dir``) define the
                items of the dataset and ``ext`` is not used.
            image_transforms: Optional list of transforms applied before the
                built-in ``ToTensor`` and rescale/rearrange steps. A
                ``ListConfig`` is instantiated entry by entry with
                ``instantiate_from_config``. ``None`` means no extra
                transforms. The passed list is never modified.
            ext: File extension, or a list/tuple of extensions, to search for.
            default_caption: Caption used when an image has none.
            postprocess: Optional callable applied to each item dict; a
                ``DictConfig`` is instantiated with ``instantiate_from_config``.
            return_paths: If true, each item also carries its file path.

        Raises:
            ValueError: If ``caption_file`` is neither ``.json`` nor ``.jsonl``.
        """
        self.root_dir = Path(root_dir)
        self.default_caption = default_caption
        self.return_paths = return_paths
        if isinstance(postprocess, DictConfig):
            postprocess = instantiate_from_config(postprocess)
        self.postprocess = postprocess

        if caption_file is not None:
            self.captions = self._load_captions(caption_file)
            # Cache the key order once; rebuilding the key list on every
            # __getitem__ call made a full pass over the data quadratic.
            self._caption_keys = list(self.captions)
        else:
            self.captions = None
            self._caption_keys = None

        if not isinstance(ext, (tuple, list, ListConfig)):
            ext = [ext]

        # Only used if there is no caption file, so the recursive directory
        # walk is skipped entirely when captions define the items.
        self.paths = []
        if self.captions is None:
            for e in ext:
                self.paths.extend(self.root_dir.rglob(f"*.{e}"))

        if isinstance(image_transforms, ListConfig):
            tforms = [instantiate_from_config(tt) for tt in image_transforms]
        elif image_transforms is None:
            tforms = []
        else:
            # Work on a copy: extending the caller's list in place would stack
            # duplicate ToTensor/Lambda steps whenever the list is reused.
            tforms = list(image_transforms)
        tforms.extend([
            transforms.ToTensor(),
            transforms.Lambda(lambda x: rearrange(x * 2. - 1., 'c h w -> h w c')),
        ])
        self.tform = transforms.Compose(tforms)

    @staticmethod
    def _load_captions(caption_file):
        """Read a ``.json`` or ``.jsonl`` caption file into a mapping.

        A ``.json`` file is returned as parsed. A ``.jsonl`` file must hold one
        object per line with ``"file_name"`` and ``"text"`` fields; blank lines
        are skipped and newlines are stripped from both ends of each caption.

        Raises:
            ValueError: If the file suffix is neither ``.json`` nor ``.jsonl``.
        """
        suffix = Path(caption_file).suffix.lower()
        # JSON is UTF-8 by specification; do not rely on the platform default.
        with open(caption_file, "rt", encoding="utf-8") as f:
            if suffix == ".json":
                return json.load(f)
            if suffix == ".jsonl":
                records = [json.loads(line) for line in f if line.strip()]
                return {r["file_name"]: r["text"].strip("\n") for r in records}
        raise ValueError(f"Unrecognised format: {suffix}")

    def __len__(self):
        """Return the number of captions if a caption file was given, else the number of image paths."""
        if self.captions is not None:
            return len(self.captions)
        return len(self.paths)

    def __getitem__(self, index):
        """Load, transform and return the item at ``index`` as a dict."""
        data = {}
        if self.captions is not None:
            chosen = self._caption_keys[index]
            caption = self.captions.get(chosen, None)
            if caption is None:
                caption = self.default_caption
            filename = self.root_dir / chosen
        else:
            caption = self.default_caption
            filename = self.paths[index]

        if self.return_paths:
            data["path"] = str(filename)

        # Close the underlying file as soon as the pixels have been processed
        # instead of waiting for garbage collection.
        with Image.open(filename) as im:
            data["image"] = self.process_im(im)

        data["txt"] = caption

        if self.postprocess is not None:
            data = self.postprocess(data)

        return data

    def process_im(self, im):
        """Convert ``im`` to RGB and run it through the transform pipeline."""
        im = im.convert("RGB")
        return self.tform(im)
|
||||
|
||||
def hf_dataset(
    path="Fazzie/Teyvat",
    image_transforms=None,
    image_column="image",
    text_column="text",
    image_key='image',
    caption_key='txt',
    image_size=(256, 256),
):
    """Make a Hugging Face dataset with the appropriate list of transforms applied.

    Args:
        path: Dataset identifier passed to ``datasets.load_dataset``.
        image_transforms: Optional iterable of configs, each instantiated with
            ``instantiate_from_config`` and applied before the built-in
            resize / ``ToTensor`` / rescale steps. ``None`` means none.
        image_column: Name of the dataset column holding the images.
        text_column: Name of the dataset column holding the captions.
        image_key: Key under which transformed images are returned.
        caption_key: Key under which captions are returned.
        image_size: Target size handed to ``transforms.Resize``. Defaults to
            ``(256, 256)``, the size that was previously hard-coded.

    Returns:
        The ``"train"`` split of the loaded dataset, with ``pre_process``
        registered through ``set_transform``.

    Raises:
        AssertionError: If ``image_column`` or ``text_column`` is not a column
            of the dataset.
    """
    ds = load_dataset(path, name="train")
    ds = ds["train"]

    # Hand Resize a plain int or tuple even when the size came from a config
    # container.
    if not isinstance(image_size, int):
        image_size = tuple(image_size)

    tform_steps = [instantiate_from_config(tt) for tt in (image_transforms or [])]
    tform_steps.extend([
        transforms.Resize(image_size),
        transforms.ToTensor(),
        # Rescale with x * 2 - 1 and move the channel axis last.
        transforms.Lambda(lambda x: rearrange(x * 2. - 1., 'c h w -> h w c')),
    ])
    tform = transforms.Compose(tform_steps)

    assert image_column in ds.column_names, f"Didn't find column {image_column} in {ds.column_names}"
    assert text_column in ds.column_names, f"Didn't find column {text_column} in {ds.column_names}"

    def pre_process(examples):
        """Transform a batch of examples into the ``image_key`` / ``caption_key`` layout."""
        return {
            image_key: [tform(im) for im in examples[image_column]],
            caption_key: examples[text_column],
        }

    ds.set_transform(pre_process)
    return ds
|
Reference in New Issue
Block a user