Eat Up a YOLOv8 Instance Segmentation Example in 30 Minutes


In this example we use torchkeras to run a custom training loop for the YOLOv8 instance segmentation model from ultralytics, detecting and segmenting balloons.

Since ultralytics already provides a very convenient and consistent training API, reimplementing the training logic with torchkeras may look redundant.

However, the ultralytics source tree is relatively complex, which makes fine-grained control and customization inconvenient for users.

Moreover, torchkeras offers far more elegant training visualization than the native ultralytics training code.

If you don't believe it, compare the two training runs below and see for yourself.


This walkthrough should also help you get familiar with the code structure of the ultralytics library.

😋😋 Reply with the keyword torchkeras to the 算法美食屋 WeChat public account backend to get this article's notebook source code and the download link for the balloon dataset.

〇, Prediction Process


        
          
from ultralytics import YOLO   
model = YOLO('yolov8n-seg.pt')    

      

        
          
import numpy as np   
from PIL import Image  
img_path = 'park.jpg'  
try:  
    img = Image.open(img_path)  
except Exception as err:  
    from torchkeras.data import get_example_image   
    img = get_example_image(img_path)  
    img.save(img_path)  

      

        
          
#save the prediction results and the visualized image  
result = model.predict(source=img_path, save=True,   
                       save_txt=True, conf = 0.3)   

      

        
          
from pathlib import Path   
import ultralytics  
from ultralytics.data import utils   
yaml_path = str(Path(ultralytics.__file__).parent/'cfg/datasets/coco128-seg.yaml')   
class_names = utils.yaml_load(yaml_path)['names']  
  

      

        
          
from torchkeras import plots   
  
boxes = result[0].boxes.data  
masks = result[0].masks.data  
plots.plot_instance_segmentation(img,boxes,masks,class_names)  
  

      


一, Preparing the Data

Training a YOLO instance segmentation model requires organizing the dataset into the YOLO dataset format.


        
          
yolo_dataset  
├── images  
│   ├── train  
│   │   ├── train0.jpg  
│   │   └── train1.jpg  
│   ├── val  
│   │   ├── val0.jpg  
│   │   └── val1.jpg  
│   └── test  
│       ├── test0.jpg  
│       └── test1.jpg  
└── labels  
    ├── train  
    │   ├── train0.txt  
    │   └── train1.txt  
    ├── val  
    │   ├── val0.txt  
    │   └── val1.txt  
    └── test  
        ├── test0.txt  
        └── test1.txt  

      

For an instance segmentation model, a label file (e.g. train0.txt) has the following format:


        
          
class_id x1 y1 x2 y2 x3 y3 ...  
8 0.417781 0.771355 0.440328 0.735397 0.467375 0.658995 0.440328 0.605047 0.387719 0.524159 0.378703 0.443248 0.333625 0.436519 0.371188 0.375841 0.335125 0.364603 0.350156 0.315164 0.320094 0.299439 0.320094 0.256752 0.327609 0.198318 0.357672 0.184836 0.39825 0.155607 0.498937 0.139883 0.470375 0.0724766 0.513953 0.117407 0.553031 0.083715 0.608641 0.115164 0.67175 0.173598 0.704812 0.184836 0.710828 0.211799 0.707828 0.232033 0.718344 0.263481 0.713828 0.308435 0.707828 0.348879 0.691297 0.398318 0.676266 0.416285 0.673266 0.476963 0.641703 0.420794 0.623672 0.510678 0.604125 0.566846 0.560547 0.623037 0.547016 0.676963 0.568063 0.730888 0.607141 0.771355 0.584594 0.811799 0.506453 0.829766 0.411766 0.793832 0.420781 0.769112  

      

Note that class_id starts from 0, and the point coordinates are normalized relative to the image width and height.
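
To make the format concrete, here is a minimal sketch (with a hypothetical image size and triangle) showing how pixel-space polygon points are normalized into one label line:

# a minimal sketch: turn a pixel-space polygon into one YOLO segmentation label line
w, h = 1280, 720                               # hypothetical image size
poly = [(100, 200), (300, 200), (200, 400)]    # hypothetical triangle in pixel coords
class_id = 0
coords = ' '.join(f'{x/w:.6f} {y/h:.6f}' for x, y in poly)
print(f'{class_id} {coords}')
# -> 0 0.078125 0.277778 0.234375 0.277778 0.156250 0.555556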

1, Converting to YOLO format

The code below converts the balloon dataset, originally annotated in json format, into YOLO format.


        
          
import os, json
import numpy as np
from pathlib import Path
from shutil import copyfile
from PIL import Image
from tqdm import tqdm

root_path = './datasets/balloon-seg/'

# 1. build the directory structure
data_root = Path(root_path)
for tp in ('images','labels'):
    for part in ('train','val'):
        (data_root/tp/part).mkdir(parents=True, exist_ok=True)

# 2. copy the image files
train_images = [str(x) for x in Path('balloon/train/').rglob('*.jpg')]
val_images = [str(x) for x in Path('balloon/val/').rglob('*.jpg')]

for src_file in tqdm(train_images):
    name = os.path.basename(src_file)
    dst_file = root_path+'images/train/'+name
    copyfile(src_file,dst_file)

for src_file in tqdm(val_images):
    name = os.path.basename(src_file)
    dst_file = root_path+'images/val/'+name
    copyfile(src_file,dst_file)


# 3. generate the label files
train_dir = "balloon/train/"
val_dir = "balloon/val/"

def get_poly(anno):
    anno = anno["shape_attributes"]
    px = anno["all_points_x"]
    py = anno["all_points_y"]
    poly = [(x + 0.5, y + 0.5) for x, y in zip(px, py)]
    poly = [p for x in poly for p in x]  # flatten [(x,y),...] into [x,y,x,y,...]
    return poly

def convert_yolo(size, poly):
    width,height = size
    dh,dw = 1.0/height,1.0/width
    poly = [dw*x if i%2==0 else dh*x for i,x in enumerate(poly)]
    return poly

def write_yolo_txt(label_path, catids, yolo_polys):
    lines = [f"{cls} {' '.join(f'{x:.6f}' for x in poly)}\n"
             for cls,poly in zip(catids,yolo_polys)]
    with open(label_path, 'w') as fl:
        fl.writelines(lines)

def write_labels(data_dir, part):
    with open(data_dir + "via_region_data.json") as f:
        info = json.load(f)
    info_values = list(info.values())
    for info_value in tqdm(info_values):
        img_path = data_dir + info_value['filename']
        anno_list = list(info_value['regions'].values())
        polys = [get_poly(anno) for anno in anno_list]
        catids = np.array([0 for x in polys])  # single class: balloon
        size = Image.open(img_path).size
        yolo_polys = [convert_yolo(size,poly) for poly in polys]
        txt_path = data_root/'labels'/part/info_value['filename'].replace('.jpg','.txt')
        write_yolo_txt(txt_path,catids,yolo_polys)

write_labels(train_dir,'train')
write_labels(val_dir,'val')
  

      

          
100%|██████████| 63/63 [00:00<00:00, 2057.52it/s]  
100%|██████████| 13/13 [00:00<00:00, 1704.63it/s]  
100%|██████████| 61/61 [00:00<00:00, 2880.77it/s]  
100%|██████████| 13/13 [00:00<00:00, 2820.50it/s]  
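
As a quick sanity check (a small sketch, assuming the directory layout built above), we can confirm every copied image got a matching label file:

from pathlib import Path
data_root = Path('./datasets/balloon-seg/')
for part in ('train', 'val'):
    imgs = {p.stem for p in (data_root/'images'/part).glob('*.jpg')}
    lbls = {p.stem for p in (data_root/'labels'/part).glob('*.txt')}
    print(part, len(imgs), 'images,', len(lbls), 'labels, unlabeled:', imgs - lbls)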

      

2, Visualizing samples


        
          
from PIL import Image,ImageDraw
import os
from pathlib import Path
from shutil import copyfile
from tqdm import tqdm
import numpy as np

def get_labels_polys(img_path, gt_path):
    img = Image.open(img_path)
    w,h = img.size
    with open(gt_path, 'r') as fl:
        lines = [x.rstrip() for x in fl.readlines()]
    str_data = [x.split(' ') for x in lines]
    relative_polys = [[float(x) for x in arr[1:]] for arr in str_data]
    labels = [int(arr[0]) for arr in str_data]
    polys = [[x*w if i%2==0 else x*h for i,x in enumerate(arr)] for arr in relative_polys]
    return labels,polys

def plot_polys(image, polys):
    image_result = image.copy()
    draw = ImageDraw.Draw(image_result)
    for poly in polys:
        draw.polygon(poly, fill="cyan", outline="red")
    return image_result
  

      

        
          
from pathlib import Path  
  
root_path = './datasets/balloon-seg/'  
  
data_root = Path(root_path)  
val_imgs = [str(x) for x in (data_root/'images'/'val').rglob("*.jpg") if 'checkpoint' not in str(x)]  
  
img_path = val_imgs[2]   
gt_path = img_path.replace('images','labels').replace('.jpg','.txt')  
  

      

        
          
labels,polys = get_labels_polys(img_path,gt_path)  
plot_polys(Image.open(img_path),polys)  
  

      


3, Dataset config file

Following the existing yaml dataset configs under ultralytics/cfg/datasets, we build a yaml file for our own dataset.


        
          
import ultralytics   
print(ultralytics.__file__)    
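
To browse the built-in configs you can imitate, here is a small sketch that lists the yaml files under cfg/datasets (the same location used in the prediction section above):

from pathlib import Path
import ultralytics
datasets_dir = Path(ultralytics.__file__).parent/'cfg/datasets'
print(sorted(p.name for p in datasets_dir.glob('*.yaml')))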
  

      

        
          
%%writefile balloon-seg.yaml  
# Train/val/test sets as 1) dir: path/to/imgs, 2) file: path/to/imgs.txt, or 3) list: [path/to/imgs1, path/to/imgs2, ..]  
path: balloon-seg  # dataset root dir  
train: images/train  # train images (relative to 'path') 128 images  
val: images/val  # val images (relative to 'path') 128 images  
test:  # test images (optional)  
  
# Classes  
names:  
  0: balloon  
  
# Download script/URL (optional)  
# download: https://ultralytics.com/assets/coco128.zip  

      

Overwriting balloon-seg.yaml

4, Data pipeline


        
          
import torch  
from torch.utils.data import DataLoader  
from ultralytics.cfg import get_cfg  
from ultralytics.utils import DEFAULT_CFG,yaml_load   
from ultralytics.data.utils import check_cls_dataset, check_det_dataset  
from ultralytics.data import build_yolo_dataset,build_dataloader  
  

      

        
          
overrides = {'task':'segment',  
             'data':'balloon-seg.yaml',  
             'imgsz':640,  
             'workers':4  
            }  
cfg = get_cfg(cfg = DEFAULT_CFG,overrides=overrides)  
data_info = check_det_dataset(cfg.data)  
  
  

      

        
          
ds_train = build_yolo_dataset(cfg,img_path=data_info['train'],batch=cfg.batch,  
                              data = data_info,mode='train',rect=False,stride=32)  
  
ds_val = build_yolo_dataset(cfg,img_path=data_info['val'],batch=cfg.batch,data = data_info,  
    mode='val',rect=False,stride=32)  
  

      

        
          
dl_train = DataLoader(ds_train,batch_size = cfg.batch, num_workers = cfg.workers,  
                      collate_fn = ds_train.collate_fn)  
  
dl_val = DataLoader(ds_val,batch_size = cfg.batch, num_workers = cfg.workers,  
                      collate_fn = ds_val.collate_fn)  
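
Before training, it helps to peek at one batch to see what the model's loss function expects. A small sketch; the exact key set ('img', 'cls', 'bboxes', 'masks', 'batch_idx', ...) produced by the YOLO collate_fn may vary slightly across ultralytics versions:

batch = next(iter(dl_train))
for k, v in batch.items():
    # tensors carry the images, classes, boxes and masks; the rest is metadata
    print(k, v.shape if isinstance(v, torch.Tensor) else type(v))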
  

      

二, Defining the Model

You can pick one of the officially defined architectures such as yolov8n-seg, yolov8s-seg, or yolov8l-seg,

or build a user-defined architecture by modifying the model's yaml config file.


        
          
from ultralytics.nn.tasks import SegmentationModel   
model = SegmentationModel(cfg = 'yolov8n-seg.yaml', ch=3, nc=1)  
#weights = torch.hub.load_state_dict_from_url('https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n-seg.pt')  
weights = torch.load('yolov8n-seg.pt')  
model.load(weights['model'])  
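
A quick sketch to confirm the model was built at the expected scale (yolov8n-seg has roughly 3.4M parameters):

num_params = sum(p.numel() for p in model.parameters())
print(f'{num_params/1e6:.2f}M parameters')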
  

      

        
          
model.args = cfg  
model.nc = data_info['nc']  # attach number of classes to model  
model.names = data_info['names']  

      

三, Training the Model

Below we demonstrate two ways of training the model: the native ultralytics interface and the KerasModel interface from torchkeras.

1, Using the native ultralytics interface


        
          
from ultralytics import YOLO   
model = YOLO('yolov8n-seg.pt')    
results = model.train(data = 'balloon-seg.yaml', epochs = 50, workers=2)  

      

        
        
            

#hyperparameter tuning interface
#yolo_model = YOLO('yolov8n-seg.pt')
#yolo_model.tune(data='balloon-seg.yaml', epochs=10,
#                iterations=300, optimizer='AdamW',
#                plots=True, save=False, val=True)
            

            

        
      


2, The torchkeras KerasModel ("dream furnace") interface


        
          
#test the loss computation process  
for batch in dl_train:  
    break  
      
for key,value in batch.items():  
    if isinstance(value,torch.Tensor):  
        batch[key] = batch[key].cuda()  
      
model = model.cuda()  
model.train();  
batch['img'] = batch['img'].float()/255.0  
loss,_ = model.forward(batch)  
loss   

      

tensor(89.2061)
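
The second value returned by forward holds the detached loss components; in recent ultralytics versions these are the box, segmentation, classification, and DFL terms (a sketch, verify the layout on your version):

loss, loss_items = model.forward(batch)
print(loss_items)   # per-term losses, e.g. box/seg/cls/dfl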


        
          
from torchkeras import KerasModel

#we need to override StepRunner to accommodate the YOLOv8 batch format

class StepRunner:
    def __init__(self, net, loss_fn, accelerator, stage = "train", metrics_dict = None,
                 optimizer = None, lr_scheduler = None
                 ):
        self.net,self.loss_fn,self.metrics_dict,self.stage = net,loss_fn,metrics_dict,stage
        self.optimizer,self.lr_scheduler = optimizer,lr_scheduler
        self.accelerator = accelerator
        self.net.train()


    def __call__(self, batch):

        batch['img'] = batch['img'].float()/255

        #loss
        loss,_ = self.net.forward(batch)

        #backward()
        if self.optimizer is not None and self.stage=="train":
            self.accelerator.backward(loss)
            self.optimizer.step()
            if self.lr_scheduler is not None:
                self.lr_scheduler.step()
            self.optimizer.zero_grad()

        all_loss = self.accelerator.gather(loss).sum()

        #losses
        step_losses = {self.stage+"_loss":all_loss.item()}

        #metrics
        step_metrics = {}

        if self.stage=="train":
            if self.optimizer is not None:
                step_metrics['lr'] = self.optimizer.state_dict()['param_groups'][0]['lr']
            else:
                step_metrics['lr'] = 0.0
        return step_losses,step_metrics

KerasModel.StepRunner = StepRunner
  

      

        
          
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)   
keras_model = KerasModel(net = model,   
                         loss_fn = None,   
                         optimizer = optimizer)  
  

      

        
          
keras_model.fit(train_data=dl_train,  
                val_data=dl_val,  
                epochs = 200,  
                ckpt_path='checkpoint',  
                patience=20,  
                monitor='val_loss',  
                mode='min',  
                mixed_precision='no',  
                plot= True  
               )  

      



        
          
#disable mosaic augmentation and train for another round
#(YOLOv8's native trainer likewise turns mosaic off for the final epochs)
ds_train.close_mosaic(cfg)
keras_model.from_scratch = False  # resume from the saved checkpoint instead of reinitializing

      

        
          
keras_model.fit(train_data=dl_train,  
                val_data=dl_val,  
                epochs = 200,  
                ckpt_path='checkpoint',  
                patience=20,  
                monitor='val_loss',  
                mode='min',  
                mixed_precision='no',  
                plot= True  
               )  

      


四, Evaluating the Model

To conveniently evaluate metrics such as mAP, we save the weights once more and then load and evaluate them with the native ultralytics YOLO interface.


        
          
from ultralytics import YOLO   
keras_model.load_ckpt('checkpoint')  
save_dic = dict(model = keras_model.net, train_args = dict(cfg))  
torch.save(save_dic, 'best_yolo.pt')  
  

      

        
          
from ultralytics import YOLO   
best_model = YOLO(model = 'best_yolo.pt')  
metrics = best_model.val(data = cfg.data )  
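
Besides the per-class table below, the aggregate mAPs can be read straight off the metrics object (attribute names as in recent ultralytics versions; a sketch, verify on yours):

print('box mAP50-95:', metrics.box.map)
print('seg mAP50-95:', metrics.seg.map)
print('seg mAP50:', metrics.seg.map50)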
  

      

        
          
import pandas as pd   
df = pd.DataFrame()  
df['metric'] = metrics.keys  
for i,c in best_model.names.items():  
    df[c] = metrics.class_result(i)  
df   
  

      


五, Using the Model


        
          
from pathlib import Path   
root_path = './datasets/balloon-seg/'  
data_root = Path(root_path)  

      

        
          
train_imgs = [str(x) for x in (data_root/'images'/'train').rglob("*.jpg") if 'checkpoint' not in str(x)]  
img_path = train_imgs[10]   
Image.open(img_path)
      

        
          
result = best_model(img_path,conf=0.1)  

      

        
          
from torchkeras import plots  
masks = result[0].masks.data  
boxes = result[0].boxes.data   
plots.plot_instance_segmentation(Image.open(img_path),boxes,masks,  
    class_names = ['balloon'],min_score=0.0)  
  

      


六, Exporting to ONNX Format


        
          
success = best_model.export(format='onnx',dynamic=True)    
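
Before wiring the exported file into a predictor, here is a small sanity check with onnxruntime (assuming export wrote best_yolo.onnx alongside the weights; with dynamic=True the input dims should show up as symbolic):

import onnxruntime
sess = onnxruntime.InferenceSession('best_yolo.onnx', providers=['CPUExecutionProvider'])
print([(i.name, i.shape) for i in sess.get_inputs()])   # input name and (dynamic) shape
print([(o.name, o.shape) for o in sess.get_outputs()])  # output names and shapes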

      

        
          
model = YOLO('best_yolo.onnx',task='segment')  

      

        
          
img_path = train_imgs[5]  
result = model.predict(img_path,save_txt=True);  

      

        
          
from torchkeras import plots  
masks = result[0].masks.data  
boxes = result[0].boxes.data   
plots.plot_instance_segmentation(Image.open(img_path),boxes,masks,  
    class_names = ['balloon'],min_score=0.0)  

      

        
          
import numpy as np
import torch
from torch import nn
import onnxruntime
from PIL import Image
from ultralytics.models.yolo.segment.predict import SegmentationPredictor
from ultralytics.utils.torch_utils import select_device

class OnnxModel(nn.Module):
    def __init__(self,weights,
                 device=torch.device('cpu'),
                 dnn=False,
                 data=None,
                 fp16=False,
                 fuse=True,
                 verbose=True):

        super().__init__()
        w = weights
        nn_module = False
        onnx = True
        pt, jit, xml, engine, coreml, saved_model, pb, tflite, edgetpu, tfjs, paddle, triton = [False]*12
        nhwc = False
        stride = 32
        model, metadata = None, None
        cuda = torch.cuda.is_available() and device.type != 'cpu'
        names = ['balloon']
        self.__dict__.update(locals())  # assign all local variables to self


        providers = ['CUDAExecutionProvider', 'CPUExecutionProvider'] if cuda else [
            'CPUExecutionProvider']
        self.session = onnxruntime.InferenceSession(w, providers=providers)
        self.output_names = [x.name for x in self.session.get_outputs()]

    def forward(self, im, augment=False, visualize=False):
        im = im.cpu().numpy()
        y = self.session.run(self.output_names, {self.session.get_inputs()[0].name: im})
        if isinstance(y, (list, tuple)):
            return self.from_numpy(y[0]) if len(y) == 1 else [self.from_numpy(x) for x in y]
        else:
            return self.from_numpy(y)

    def from_numpy(self, x):
        return torch.tensor(x).to(self.device) if isinstance(x, np.ndarray) else x

    def warmup(self,imgsz=(1, 3, 640, 640)):
        im = torch.empty(*imgsz, dtype=torch.half if self.fp16 else torch.float,
                         device=self.device)  # dummy input
        for _ in range(2 if self.jit else 1):
            self.forward(im)  # warmup


class Predictor(SegmentationPredictor):
    def setup_model(self, model, verbose=True):
        device = select_device(self.args.device, verbose=verbose)
        model = model or self.args.model
        self.args.half &= device.type != 'cpu'
        self.model = OnnxModel(model,
                                 device=device,
                                 dnn=self.args.dnn,
                                 data=self.args.data,
                                 fp16=self.args.half,
                                 verbose=verbose)
        self.device = device
        self.model.eval()

args = dict(model='best_yolo.onnx')
predictor = Predictor(overrides=args)

      

        
          
result = predictor(source = Image.open(img_path) )  

      

        
          
from torchkeras import plots  
masks = result[0].masks.data  
boxes = result[0].boxes.data   
plots.plot_instance_segmentation(Image.open(img_path),boxes,masks,  
    class_names = ['balloon'],min_score=0.0)  

      


Reply with the keyword torchkeras to the 算法美食屋 WeChat public account backend to get this article's notebook code and the balloon dataset, plus more fun examples~
