• Docs >
  • Module code >
  • mmpose.datasets.datasets.body3d.body3d_mview_direct_panoptic_dataset

Source code for mmpose.datasets.datasets.body3d.body3d_mview_direct_panoptic_dataset

# Copyright (c) OpenMMLab. All rights reserved.
import copy
import glob
import json
import os.path as osp
import pickle
import tempfile
import warnings
from collections import OrderedDict

import mmcv
import numpy as np
from mmcv import Config, deprecated_api_warning

from mmpose.core.camera import SimpleCamera
from mmpose.datasets.builder import DATASETS
from mmpose.datasets.datasets.base import Kpt3dMviewRgbImgDirectDataset

[docs]@DATASETS.register_module() class Body3DMviewDirectPanopticDataset(Kpt3dMviewRgbImgDirectDataset): """Panoptic dataset for direct multi-view human pose estimation. `Panoptic Studio: A Massively Multiview System for Social Motion Capture' ICCV'2015 More details can be found in the `paper <https://openaccess.thecvf.com/content_iccv_2015/papers/ Joo_Panoptic_Studio_A_ICCV_2015_paper.pdf>`__ . The dataset loads both 2D and 3D annotations as well as camera parameters. Panoptic keypoint indexes:: 'neck': 0, 'nose': 1, 'mid-hip': 2, 'l-shoulder': 3, 'l-elbow': 4, 'l-wrist': 5, 'l-hip': 6, 'l-knee': 7, 'l-ankle': 8, 'r-shoulder': 9, 'r-elbow': 10, 'r-wrist': 11, 'r-hip': 12, 'r-knee': 13, 'r-ankle': 14, 'l-eye': 15, 'l-ear': 16, 'r-eye': 17, 'r-ear': 18, Args: ann_file (str): Path to the annotation file. img_prefix (str): Path to a directory where images are held. Default: None. data_cfg (dict): config pipeline (list[dict | callable]): A sequence of data transforms. dataset_info (DatasetInfo): A class containing all dataset info. test_mode (bool): Store True when building test or validation dataset. Default: False. """ ALLOWED_METRICS = {'mpjpe', 'mAP'} def __init__(self, ann_file, img_prefix, data_cfg, pipeline, dataset_info=None, test_mode=False): if dataset_info is None: warnings.warn( 'dataset_info is missing. ' 'Check https://github.com/open-mmlab/mmpose/pull/663 ' 'for details.', DeprecationWarning) cfg = Config.fromfile('configs/_base_/datasets/panoptic_body3d.py') dataset_info = cfg._cfg_dict['dataset_info'] super().__init__( ann_file, img_prefix, data_cfg, pipeline, dataset_info=dataset_info, test_mode=test_mode) self.load_config(data_cfg) self.ann_info['use_different_joint_weights'] = False if ann_file is None: self.db_file = osp.join( img_prefix, f'group_{self.subset}_cam{self.num_cameras}.pkl') else: self.db_file = ann_file if osp.exists(self.db_file): with open(self.db_file, 'rb') as f: info = pickle.load(f) assert info['sequence_list'] == self.seq_list assert info['interval'] == self.seq_frame_interval assert info['cam_list'] == self.cam_list self.db = info['db'] else: self.db = self._get_db() info = { 'sequence_list': self.seq_list, 'interval': self.seq_frame_interval, 'cam_list': self.cam_list, 'db': self.db } with open(self.db_file, 'wb') as f: pickle.dump(info, f) self.db_size = len(self.db) print(f'=> load {len(self.db)} samples')
[docs] def load_config(self, data_cfg): """Initialize dataset attributes according to the config. Override this method to set dataset specific attributes. """ self.num_joints = data_cfg['num_joints'] assert self.num_joints <= 19 self.seq_list = data_cfg['seq_list'] self.cam_list = data_cfg['cam_list'] self.num_cameras = data_cfg['num_cameras'] assert self.num_cameras == len(self.cam_list) self.seq_frame_interval = data_cfg.get('seq_frame_interval', 1) self.subset = data_cfg.get('subset', 'train') self.need_camera_param = True self.root_id = data_cfg.get('root_id', 0) self.max_persons = data_cfg.get('max_num', 10)
def _get_cam(self, seq): """Get camera parameters. Args: seq (str): Sequence name. Returns: Camera parameters. """ cam_file = osp.join(self.img_prefix, seq, 'calibration_{:s}.json'.format(seq)) with open(cam_file) as cfile: calib = json.load(cfile) M = np.array([[1.0, 0.0, 0.0], [0.0, 0.0, -1.0], [0.0, 1.0, 0.0]]) cameras = {} for cam in calib['cameras']: if (cam['panel'], cam['node']) in self.cam_list: sel_cam = {} R_w2c = np.array(cam['R']).dot(M) T_w2c = np.array(cam['t']).reshape((3, 1)) * 10.0 # cm to mm R_c2w = R_w2c.T T_c2w = -R_w2c.T @ T_w2c sel_cam['R'] = R_c2w.tolist() sel_cam['T'] = T_c2w.tolist() sel_cam['K'] = cam['K'][:2] distCoef = cam['distCoef'] sel_cam['k'] = [distCoef[0], distCoef[1], distCoef[4]] sel_cam['p'] = [distCoef[2], distCoef[3]] cameras[(cam['panel'], cam['node'])] = sel_cam return cameras def _get_db(self): """Get dataset base. Returns: dict: the dataset base (2D and 3D information) """ width = 1920 height = 1080 db = [] sample_id = 0 for seq in self.seq_list: cameras = self._get_cam(seq) curr_anno = osp.join(self.img_prefix, seq, 'hdPose3d_stage1_coco19') anno_files = sorted(glob.iglob('{:s}/*.json'.format(curr_anno))) print(f'load sequence: {seq}', flush=True) for i, file in enumerate(anno_files): if i % self.seq_frame_interval == 0: with open(file) as dfile: bodies = json.load(dfile)['bodies'] if len(bodies) == 0: continue for k, cam_param in cameras.items(): single_view_camera = SimpleCamera(cam_param) postfix = osp.basename(file).replace('body3DScene', '') prefix = '{:02d}_{:02d}'.format(k[0], k[1]) image_file = osp.join(seq, 'hdImgs', prefix, prefix + postfix) image_file = image_file.replace('json', 'jpg') all_poses_3d = np.zeros( (self.max_persons, self.num_joints, 3), dtype=np.float32) all_poses_vis_3d = np.zeros( (self.max_persons, self.num_joints, 3), dtype=np.float32) all_roots_3d = np.zeros((self.max_persons, 3), dtype=np.float32) all_poses = np.zeros( (self.max_persons, self.num_joints, 3), dtype=np.float32) cnt = 0 person_ids = -np.ones(self.max_persons, dtype=int) for body in bodies: if cnt >= self.max_persons: break pose3d = np.array(body['joints19']).reshape( (-1, 4)) pose3d = pose3d[:self.num_joints] joints_vis = pose3d[:, -1] > 0.1 if not joints_vis[self.root_id]: continue # Coordinate transformation M = np.array([[1.0, 0.0, 0.0], [0.0, 0.0, -1.0], [0.0, 1.0, 0.0]]) pose3d[:, 0:3] = pose3d[:, 0:3].dot(M) * 10.0 all_poses_3d[cnt] = pose3d[:, :3] all_roots_3d[cnt] = pose3d[self.root_id, :3] all_poses_vis_3d[cnt] = np.repeat( np.reshape(joints_vis, (-1, 1)), 3, axis=1) pose2d = np.zeros((pose3d.shape[0], 3)) # get pose_2d from pose_3d pose2d[:, :2] = single_view_camera.world_to_pixel( pose3d[:, :3]) x_check = np.bitwise_and(pose2d[:, 0] >= 0, pose2d[:, 0] <= width - 1) y_check = np.bitwise_and( pose2d[:, 1] >= 0, pose2d[:, 1] <= height - 1) check = np.bitwise_and(x_check, y_check) joints_vis[np.logical_not(check)] = 0 pose2d[:, -1] = joints_vis all_poses[cnt] = pose2d person_ids[cnt] = body['id'] cnt += 1 if cnt > 0: db.append({ 'image_file': osp.join(self.img_prefix, image_file), 'joints_3d': all_poses_3d, 'person_ids': person_ids, 'joints_3d_visible': all_poses_vis_3d, 'joints': [all_poses], 'roots_3d': all_roots_3d, 'camera': cam_param, 'num_persons': cnt, 'sample_id': sample_id, 'center': np.array((width / 2, height / 2), dtype=np.float32), 'scale': self._get_scale((width, height)) }) sample_id += 1 return db
[docs] @deprecated_api_warning(name_dict=dict(outputs='results')) def evaluate(self, results, res_folder=None, metric='mpjpe', **kwargs): """ Args: results (list[dict]): Testing results containing the following items: - pose_3d (np.ndarray): predicted 3D human pose - sample_id (np.ndarray): sample id of a frame. res_folder (str, optional): The folder to save the testing results. If not specified, a temp folder will be created. Default: None. metric (str | list[str]): Metric to be performed. Defaults: 'mpjpe'. **kwargs: Returns: """ pose_3ds = np.concatenate([result['pose_3d'] for result in results], axis=0) sample_ids = [] for result in results: sample_ids.extend(result['sample_id']) _results = [ dict(sample_id=sample_id, pose_3d=pose_3d) for (sample_id, pose_3d) in zip(sample_ids, pose_3ds) ] _results = self._sort_and_unique_outputs(_results, key='sample_id') metrics = metric if isinstance(metric, list) else [metric] for _metric in metrics: if _metric not in self.ALLOWED_METRICS: raise ValueError( f'Unsupported metric "{_metric}"' f'Supported metrics are {self.ALLOWED_METRICS}') if res_folder is not None: tmp_folder = None res_file = osp.join(res_folder, 'result_keypoints.json') else: tmp_folder = tempfile.TemporaryDirectory() res_file = osp.join(tmp_folder.name, 'result_keypoints.json') mmcv.dump(_results, res_file) eval_list = [] gt_num = self.db_size // self.num_cameras assert len( _results) == gt_num, f'number mismatch: {len(_results)}, {gt_num}' total_gt = 0 for i in range(gt_num): index = self.num_cameras * i db_rec = copy.deepcopy(self.db[index]) joints_3d = db_rec['joints_3d'] joints_3d_vis = db_rec['joints_3d_visible'] if joints_3d_vis.sum() < 1: continue pred = _results[i]['pose_3d'].copy() pred = pred[pred[:, 0, 3] >= 0] for pose in pred: mpjpes = [] for (gt, gt_vis) in zip(joints_3d, joints_3d_vis): vis = gt_vis[:, 0] > 0 if vis.sum() < 1: break mpjpe = np.mean( np.sqrt( np.sum((pose[vis, 0:3] - gt[vis])**2, axis=-1))) mpjpes.append(mpjpe) min_gt = np.argmin(mpjpes) min_mpjpe = np.min(mpjpes) score = pose[0, 4] eval_list.append({ 'mpjpe': float(min_mpjpe), 'score': float(score), 'gt_id': int(total_gt + min_gt) }) total_gt += (joints_3d_vis[:, :, 0].sum(-1) >= 1).sum() mpjpe_threshold = np.arange(25, 155, 25) aps = [] ars = [] for t in mpjpe_threshold: ap, ar = self._eval_list_to_ap(eval_list, total_gt, t) aps.append(ap) ars.append(ar) name_value_tuples = [] for _metric in metrics: if _metric == 'mpjpe': stats_names = ['RECALL 500mm', 'MPJPE 500mm'] info_str = list( zip(stats_names, [ self._eval_list_to_recall(eval_list, total_gt), self._eval_list_to_mpjpe(eval_list) ])) elif _metric == 'mAP': stats_names = [ 'AP 25', 'AP 50', 'AP 75', 'AP 100', 'AP 125', 'AP 150', 'mAP', 'AR 25', 'AR 50', 'AR 75', 'AR 100', 'AR 125', 'AR 150', 'mAR' ] mAP = np.array(aps).mean() mAR = np.array(ars).mean() info_str = list(zip(stats_names, aps + [mAP] + ars + [mAR])) else: raise NotImplementedError name_value_tuples.extend(info_str) if tmp_folder is not None: tmp_folder.cleanup() return OrderedDict(name_value_tuples)
@staticmethod def _eval_list_to_ap(eval_list, total_gt, threshold): """Get Average Precision (AP) and Average Recall at a certain threshold.""" eval_list.sort(key=lambda k: k['score'], reverse=True) total_num = len(eval_list) tp = np.zeros(total_num) fp = np.zeros(total_num) gt_det = [] for i, item in enumerate(eval_list): if item['mpjpe'] < threshold and item['gt_id'] not in gt_det: tp[i] = 1 gt_det.append(item['gt_id']) else: fp[i] = 1 tp = np.cumsum(tp) fp = np.cumsum(fp) recall = tp / (total_gt + 1e-5) precise = tp / (tp + fp + 1e-5) for n in range(total_num - 2, -1, -1): precise[n] = max(precise[n], precise[n + 1]) precise = np.concatenate(([0], precise, [0])) recall = np.concatenate(([0], recall, [1])) index = np.where(recall[1:] != recall[:-1])[0] ap = np.sum((recall[index + 1] - recall[index]) * precise[index + 1]) return ap, recall[-2] @staticmethod def _eval_list_to_mpjpe(eval_list, threshold=500): """Get MPJPE within a certain threshold.""" eval_list.sort(key=lambda k: k['score'], reverse=True) gt_det = [] mpjpes = [] for i, item in enumerate(eval_list): if item['mpjpe'] < threshold and item['gt_id'] not in gt_det: mpjpes.append(item['mpjpe']) gt_det.append(item['gt_id']) return np.mean(mpjpes) if len(mpjpes) > 0 else np.inf @staticmethod def _eval_list_to_recall(eval_list, total_gt, threshold=500): """Get Recall at a certain threshold.""" gt_ids = [e['gt_id'] for e in eval_list if e['mpjpe'] < threshold] return len(np.unique(gt_ids)) / total_gt def __getitem__(self, idx): """Get the sample given index.""" results = {} for c in range(self.num_cameras): result = copy.deepcopy(self.db[self.num_cameras * idx + c]) result['ann_info'] = self.ann_info width = 1920 height = 1080 result['mask'] = [np.ones((height, width), dtype=np.float32)] results[c] = result return self.pipeline(results)
Read the Docs v: 0.x
On Read the Docs
Project Home

Free document hosting provided by Read the Docs.