项目实践 | 行人跟踪与摔倒检测报警(文末获取完整源码)

1、简介

本项目的目的是为了给大家提供跟多的实战思路,抛砖引玉为大家提供一个案例,也希望读者可以根据该方法实现更多的思想与想法,也希望读者可以改进该项目种提到的方法,比如改进其中的行人检测器、跟踪方法、行为识别算法等等。

本项目主要检测识别的行为有7类: Standing, Walking, Sitting, Lying Down, Stand up, Sit down, Fall Down

2、项目方法简介

本文涉及的方法与算法包括:YOLO V3 Tiny、Deepsort、ST-GCN方法,其中YOLO V3 Tiny用于行人检测、DeepSort用于跟踪、而ST-GCN则是用于行为检测。

这里由于YOLO与DeepSort大家都已经比较了解,因此这里只简单说明一下ST-GCN 的流程,这里ST-GCN 的方法结构图如下:

picture.image

给出一个动作视频的骨架序列信息,首先构造出表示该骨架序列信息的图结构,ST-GCN的输入就是图节点上的关节坐标向量,然后是一系列时空图卷积操作来提取高层的特征,最后用SofMax分类器得到对应的动作分类。整个过程实现了端到端的训练。

GCN 帮助我们学习了到空间中相邻关节的局部特征。在此基础上,我们需要学习时间中关节变化的局部特征。如何为 Graph 叠加时序特征,是图卷积网络面临的问题之一。这方面的研究主要有两个思路:时间卷积(TCN)和序列模型(LSTM)。

ST-GCN 使用的是 TCN,由于形状固定,可以使用传统的卷积层完成时间卷积操作。为了便于理解,可以类比图像的卷积操作。st-gcn 的 feature map 最后三个维度的形状为(C,V,T),与图像 feature map 的形状(C,W,H)相对应。

  • 图像的通道数C对应关节的特征数C。
  • 图像的宽W对应关键帧数V。
  • 图像的高H对应关节数T。

在图像卷积中,卷积核的大小为『w』×『1』,则每次完成w行像素,1列像素的卷积。『stride』为s,则每次移动s像素,完成1行后进行下1行像素的卷积。

在时间卷积中,卷积核的大小为『temporal_kernel_size』×『1』,则每次完成1个节点,temporal_kernel_size 个关键帧的卷积。『stride』为1,则每次移动1帧,完成1个节点后进行下1个节点的卷积。

训练如下:

picture.image

输入的数据首先进行batch normalization,然后在经过9个ST-GCN单元,接着是一个global pooling得到每个序列的256维特征向量,最后用SoftMax函数进行分类,得到最后的标签。

每一个ST-GCN采用Resnet的结构,前三层的输出有64个通道,中间三层有128个通道,最后三层有256个通道,在每次经过ST-CGN结构后,以0.5的概率随机将特征dropout,第4和第7个时域卷积层的strides设置为2。用SGD训练,学习率为0.01,每10个epochs学习率下降0.1。

ST-GCN 最末卷积层的响应可视化结果图如下:

picture.image

本文项目主函数代码如下:


        
        
            

          
        
        import os  
import cv2  
import time  
import torch  
import argparse  
import numpy as np  
  
from Detection.Utils import ResizePadding  
from CameraLoader import CamLoader, CamLoader_Q  
from DetectorLoader import TinyYOLOv3_onecls  
  
from PoseEstimateLoader import SPPE_FastPose  
from fn import draw_single  
  
from Track.Tracker import Detection, Tracker  
from ActionsEstLoader import TSSTG  
  
# source = '../Data/test\_video/test7.mp4'  
# source = '../Data/falldata/Home/Videos/video (2).avi' # hard detect  
source = './output/test3.mp4'  
# source = 2  
def preproc(image):  
    """preprocess function for CameraLoader.  
 """  
 image = resize_fn(image)  
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  
    return image  
  
  
def kpt2bbox(kpt, ex=20):  
    """Get bbox that hold on all of the keypoints (x,y)  
 kpt: array of shape `(N, 2)`,  
 ex: (int) expand bounding box,  
 """  
 return np.array((kpt[:, 0].min() - ex, kpt[:, 1].min() - ex,  
 kpt[:, 0].max() + ex, kpt[:, 1].max() + ex))  
  
  
if __name__ == '\_\_main\_\_':  
    par = argparse.ArgumentParser(description='Human Fall Detection Demo.')  
    par.add_argument('-C', '--camera', default=source, # required=True, # default=2,  
 help='Source of camera or video file path.')  
    par.add_argument('--detection\_input\_size', type=int, default=384,  
 help='Size of input in detection model in square must be divisible by 32 (int).')  
    par.add_argument('--pose\_input\_size', type=str, default='224x160',  
 help='Size of input in pose model must be divisible by 32 (h, w)')  
    par.add_argument('--pose\_backbone', type=str, default='resnet50', help='Backbone model for SPPE FastPose model.')  
    par.add_argument('--show\_detected', default=False, action='store\_true', help='Show all bounding box from detection.')  
    par.add_argument('--show\_skeleton', default=True, action='store\_true', help='Show skeleton pose.')  
    par.add_argument('--save\_out', type=str, default='./output/output3.mp4', help='Save display to video file.')  
    par.add_argument('--device', type=str, default='cuda', help='Device to run model on cpu or cuda.')  
    args = par.parse_args()  
  
    device = args.device  
  
    # DETECTION MODEL.  
 inp_dets = args.detection_input_size  
    detect_model = TinyYOLOv3_onecls(inp_dets, device=device)  
  
    # POSE MODEL.  
 inp_pose = args.pose_input_size.split('x')  
    inp_pose = (int(inp_pose[0]), int(inp_pose[1]))  
    pose_model = SPPE_FastPose(args.pose_backbone, inp_pose[0], inp_pose[1], device=device)  
  
    # Tracker.  
 max_age = 30  
 tracker = Tracker(max\_age=max_age, n\_init=3)  
  
    # Actions Estimate.  
 action_model = TSSTG()  
  
    resize_fn = ResizePadding(inp_dets, inp_dets)  
  
    cam_source = args.camera  
    if type(cam_source) is str and os.path.isfile(cam_source):  
        # Use loader thread with Q for video file.  
 cam = CamLoader_Q(cam_source, queue\_size=1000, preprocess=preproc).start()  
    else:  
        # Use normal thread loader for webcam.  
 cam = CamLoader(int(cam_source) if cam_source.isdigit() else cam_source,  
 preprocess=preproc).start()  
  
    # frame\_size = cam.frame\_size  
 # scf = torch.min(inp\_size / torch.FloatTensor([frame\_size]), 1)[0]  
 outvid = False  
 if args.save_out != '':  
        outvid = True  
 codec = cv2.VideoWriter_fourcc(*'mp4v')  
        print((inp_dets * 2, inp_dets * 2))  
        writer = cv2.VideoWriter(args.save_out, codec, 25, (inp_dets * 2, inp_dets * 2))  
  
    fps_time = 0  
 f = 0  
 while cam.grabbed():  
        f += 1  
 frame = cam.getitem()  
        image = frame.copy()  
  
        # Detect humans bbox in the frame with detector model.  
 detected = detect_model.detect(frame, need\_resize=False, expand\_bb=10)  
  
        # Predict each tracks bbox of current frame from previous frames information with Kalman filter.  
 tracker.predict()  
        # Merge two source of predicted bbox together.  
 for track in tracker.tracks:  
            det = torch.tensor([track.to_tlbr().tolist() + [0.5, 1.0, 0.0]], dtype=torch.float32)  
            detected = torch.cat([detected, det], dim=0) if detected is not None else det  
  
        detections = []  # List of Detections object for tracking.  
 if detected is not None:  
            # detected = non\_max\_suppression(detected[None, :], 0.45, 0.2)[0]  
 # Predict skeleton pose of each bboxs.  
 poses = pose_model.predict(frame, detected[:, 0:4], detected[:, 4])  
  
            # Create Detections object.  
 detections = [Detection(kpt2bbox(ps['keypoints'].numpy()),  
 np.concatenate((ps['keypoints'].numpy(),  
 ps['kp\_score'].numpy()), axis=1),  
 ps['kp\_score'].mean().numpy()) for ps in poses]  
  
            # VISUALIZE.  
 if args.show_detected:  
                for bb in detected[:, 0:5]:  
                    frame = cv2.rectangle(frame, (bb[0], bb[1]), (bb[2], bb[3]), (0, 0, 255), 1)  
  
        # Update tracks by matching each track information of current and previous frame or  
 # create a new track if no matched.  
 tracker.update(detections)  
  
        # Predict Actions of each track.  
 for i, track in enumerate(tracker.tracks):  
            if not track.is_confirmed():  
                continue  
 track_id = track.track_id  
            bbox = track.to_tlbr().astype(int)  
            center = track.get_center().astype(int)  
  
            action = 'pending..'  
 clr = (0, 255, 0)  
            # Use 30 frames time-steps to prediction.  
 if len(track.keypoints_list) == 30:  
                pts = np.array(track.keypoints_list, dtype=np.float32)  
                out = action_model.predict(pts, frame.shape[:2])  
                action_name = action_model.class_names[out[0].argmax()]  
                action = '{}: {:.2f}%'.format(action_name, out[0].max() * 100)  
                if action_name == 'Fall Down':  
                    clr = (255, 0, 0)  
                elif action_name == 'Lying Down':  
                    clr = (255, 200, 0)  
  
            # VISUALIZE.  
 if track.time_since_update == 0:  
                if args.show_skeleton:  
                    frame = draw_single(frame, track.keypoints_list[-1])  
                frame = cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (0, 255, 0), 1)  
                frame = cv2.putText(frame, str(track_id), (center[0], center[1]), cv2.FONT_HERSHEY_COMPLEX, 0.4, (255, 0, 0), 2)  
                frame = cv2.putText(frame, action, (bbox[0] + 5, bbox[1] + 15), cv2.FONT_HERSHEY_COMPLEX, 0.4, clr, 1)  
  
        # Show Frame.  
 frame = cv2.resize(frame, (0, 0), fx=2., fy=2.)  
        frame = cv2.putText(frame, '%d, FPS: %f' % (f, 1.0 / (time.time() - fps_time)), (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)  
        frame = frame[:, :, ::-1]  
        fps_time = time.time()  
  
        if outvid:  
            writer.write(frame)  
  
        cv2.imshow('frame', frame)  
        if cv2.waitKey(1) & 0xFF == ord('q'):  
            break  
 # Clear resource.  
 cam.stop()  
    if outvid:  
        writer.release()  
    cv2.destroyAllWindows()  

      

参考

[1].https://arxiv.org/abs/1801.07455

[2].https://blog.csdn.net/haha0825/article/details/107192773/

[3].https://github.com/yysijie/st-gcn

原文获取方式,扫描下方二维码

回复【

行人摔倒检测与跟踪

】即可获取论文与源码

picture.image

声明:转载请说明出处

扫描下方二维码关注【AI人工智能初学者】公众号,获取更多实践项目源码和论文解读,非常期待你我的相遇,让我们以梦为马,砥砺前行!!!

picture.image 点“在看”给我一朵小黄花呗 picture.image

0
0
0
0
评论
未登录
暂无评论