# Copyright (c) OpenMMLab. All rights reserved.
from queue import Full, Queue
from threading import Thread
from typing import List, Union

import cv2

from ..node import Node
from ..registry import NODES

[docs]@NODES.register_module() class RecorderNode(Node): """Record the video frames into a local file. :class:`RecorderNode` uses OpenCV backend to record the video. Recording is performed in a separate thread to avoid blocking the data stream. A buffer queue is used to cached the arrived frame images. Args: name (str): The node name (also thread name) input_buffer (str): The name of the input buffer output_buffer (str|list): The name(s) of the output buffer(s) out_video_file (str): The path of the output video file out_video_fps (int): The frame rate of the output video. Default: 30 out_video_codec (str): The codec of the output video. Default: 'mp4v' buffer_size (int): Size of the buffer queue that caches the arrived frame images. enable (bool): Default enable/disable status. Default: ``True``. Example:: >>> cfg = dict( ... type='RecorderNode', ... name='recorder', ... out_video_file='webcam_demo.mp4', ... input_buffer='display', ... output_buffer='_display_' ... # `_display_` is an executor-reserved buffer ... ) >>> from import NODES >>> node = """ def __init__( self, name: str, input_buffer: str, output_buffer: Union[str, List[str]], out_video_file: str, out_video_fps: int = 30, out_video_codec: str = 'mp4v', buffer_size: int = 30, enable: bool = True, ): super().__init__(name=name, enable_key=None, enable=enable) self.queue = Queue(maxsize=buffer_size) self.out_video_file = out_video_file self.out_video_fps = out_video_fps self.out_video_codec = out_video_codec self.vwriter = None # Register buffers self.register_input_buffer(input_buffer, 'input', trigger=True) self.register_output_buffer(output_buffer) # Start a new thread to write frame self.t_record = Thread(target=self._record, args=(), daemon=True) self.t_record.start()
[docs] def process(self, input_msgs): input_msg = input_msgs['input'] img = input_msg.get_image() if input_msg is not None else None img_queued = False while not img_queued: try: self.queue.put(img, timeout=1) img_queued = True'Recorder received one frame.') except Full: self.logger.warn('Recorder jamed!') return input_msg
def _record(self): """This method is used to create a thread to get frame images from the buffer queue and write them into the file.""" while True: img = self.queue.get() if img is None: break if self.vwriter is None: fourcc = cv2.VideoWriter_fourcc(*self.out_video_codec) fps = self.out_video_fps frame_size = (img.shape[1], img.shape[0]) self.vwriter = cv2.VideoWriter(self.out_video_file, fourcc, fps, frame_size) assert self.vwriter.isOpened() self.vwriter.write(img)'Recorder released.') if self.vwriter is not None: self.vwriter.release()
[docs] def on_exit(self): try: # Try putting a None into the output queue so the self.vwriter will # be released after all queue frames have been written to file. self.queue.put(None, timeout=1) self.t_record.join(timeout=1) except Full: pass if self.t_record.is_alive(): # Force to release self.vwriter self.logger.warn('Recorder forced release!') if self.vwriter is not None: self.vwriter.release()
