# (removed page-scrape residue: "481 lines / 18 KiB / Python", duplicated)
from typing import Optional, Callable, Dict
|
|
import os
|
|
import enum
|
|
import time
|
|
import json
|
|
import numpy as np
|
|
import pyrealsense2 as rs
|
|
import multiprocessing as mp
|
|
import cv2
|
|
from threadpoolctl import threadpool_limits
|
|
from multiprocessing.managers import SharedMemoryManager
|
|
from diffusion_policy.common.timestamp_accumulator import get_accumulate_timestamp_idxs
|
|
from diffusion_policy.shared_memory.shared_ndarray import SharedNDArray
|
|
from diffusion_policy.shared_memory.shared_memory_ring_buffer import SharedMemoryRingBuffer
|
|
from diffusion_policy.shared_memory.shared_memory_queue import SharedMemoryQueue, Full, Empty
|
|
from diffusion_policy.real_world.video_recorder import VideoRecorder
|
|
|
|
class Command(enum.Enum):
    """Commands sent to the worker process through the shared-memory queue.

    The integer *values* are what actually travel through
    ``command_queue`` (see ``__init__``); the worker loop in ``run()``
    dispatches on ``Command.X.value``.
    """
    SET_COLOR_OPTION = 0   # set an rs.option on the first color sensor
    SET_DEPTH_OPTION = 1   # set an rs.option on the first depth sensor
    START_RECORDING = 2    # start the video recorder on a given path
    STOP_RECORDING = 3     # flush and close the video recorder
    RESTART_PUT = 4        # reset put-index bookkeeping with a new start time
|
|
|
|
class SingleRealsense(mp.Process):
    """A single Intel RealSense camera driven from a dedicated child process.

    Frames are exchanged with the parent via shared-memory ring buffers;
    control messages travel the other way through a shared-memory queue.
    """
    MAX_PATH_LENGTH = 4096 # linux path has a limit of 4096 bytes
|
|
|
|
def __init__(
        self,
        shm_manager: SharedMemoryManager,
        serial_number,
        resolution=(1280,720),
        capture_fps=30,
        put_fps=None,
        put_downsample=True,
        record_fps=None,
        enable_color=True,
        enable_depth=False,
        enable_infrared=False,
        get_max_k=30,
        advanced_mode_config=None,
        transform: Optional[Callable[[Dict], Dict]] = None,
        vis_transform: Optional[Callable[[Dict], Dict]] = None,
        recording_transform: Optional[Callable[[Dict], Dict]] = None,
        video_recorder: Optional[VideoRecorder] = None,
        verbose=False
    ):
    """Allocate all shared-memory structures and store configuration.

    Parameters
    ----------
    shm_manager : SharedMemoryManager
        Allocator for every shared-memory buffer created here.
    serial_number :
        Device serial to bind to (see ``get_connected_devices_serial``).
    resolution : (width, height)
        Resolution requested for every enabled stream.
    capture_fps : int
        Frame rate requested from the device.
    put_fps : Optional[int]
        Rate at which samples enter the main ring buffer; defaults to
        ``capture_fps``.
    put_downsample : bool
        If True, the worker regulates puts with
        ``get_accumulate_timestamp_idxs``; otherwise every frame is put.
    record_fps : Optional[int]
        FPS passed to the default VideoRecorder; defaults to
        ``capture_fps``.
    enable_color / enable_depth / enable_infrared : bool
        Stream toggles; they also decide which arrays the ring buffers
        carry.
    get_max_k : int
        Capacity (number of samples) of the main ring buffer.
    advanced_mode_config : Optional[dict]
        If given, loaded as JSON through rs400_advanced_mode in ``run()``.
    transform / vis_transform / recording_transform :
        Optional dict->dict callables applied to each sample before it
        is put / visualized / recorded.
    video_recorder : Optional[VideoRecorder]
        Recorder instance; a default h264 recorder is built when None.
    verbose : bool
        Print per-iteration FPS and lifecycle messages from the worker.
    """
    super().__init__()

    # default the derived rates to the capture rate
    if put_fps is None:
        put_fps = capture_fps
    if record_fps is None:
        record_fps = capture_fps

    # create ring buffer
    resolution = tuple(resolution)
    # numpy image arrays are (height, width[, channels])
    shape = resolution[::-1]
    examples = dict()
    if enable_color:
        examples['color'] = np.empty(
            shape=shape+(3,), dtype=np.uint8)
    if enable_depth:
        examples['depth'] = np.empty(
            shape=shape, dtype=np.uint16)
    if enable_infrared:
        examples['infrared'] = np.empty(
            shape=shape, dtype=np.uint8)
    examples['camera_capture_timestamp'] = 0.0
    examples['camera_receive_timestamp'] = 0.0
    examples['timestamp'] = 0.0
    examples['step_idx'] = 0

    # tiny (k=1) buffer dedicated to visualization: latest frame only,
    # optionally pre-transformed by vis_transform
    vis_ring_buffer = SharedMemoryRingBuffer.create_from_examples(
        shm_manager=shm_manager,
        examples=examples if vis_transform is None
            else vis_transform(dict(examples)),
        get_max_k=1,
        get_time_budget=0.2,
        put_desired_frequency=capture_fps
    )

    # main consumer-facing buffer, paced at put_fps
    ring_buffer = SharedMemoryRingBuffer.create_from_examples(
        shm_manager=shm_manager,
        examples=examples if transform is None
            else transform(dict(examples)),
        get_max_k=get_max_k,
        get_time_budget=0.2,
        put_desired_frequency=put_fps
    )

    # create command queue; the example dict fixes each slot's layout,
    # so video_path is padded to MAX_PATH_LENGTH characters
    examples = {
        'cmd': Command.SET_COLOR_OPTION.value,
        'option_enum': rs.option.exposure.value,
        'option_value': 0.0,
        'video_path': np.array('a'*self.MAX_PATH_LENGTH),
        'recording_start_time': 0.0,
        'put_start_time': 0.0
    }

    command_queue = SharedMemoryQueue.create_from_examples(
        shm_manager=shm_manager,
        examples=examples,
        buffer_size=128
    )

    # create shared array for intrinsics
    # layout (written by run()): fx, fy, ppx, ppy, height, width, depth_scale
    intrinsics_array = SharedNDArray.create_from_shape(
        mem_mgr=shm_manager,
        shape=(7,),
        dtype=np.float64)
    intrinsics_array.get()[:] = 0

    # create video recorder
    if video_recorder is None:
        # realsense uses bgr24 pixel format
        # default thread_type to FRAME
        # i.e. each frame uses one core
        # instead of all cores working on all frames.
        # this prevents CPU over-subscription and
        # improves performance significantly
        video_recorder = VideoRecorder.create_h264(
            fps=record_fps,
            codec='h264',
            input_pix_fmt='bgr24',
            crf=18,
            thread_type='FRAME',
            thread_count=1)

    # copied variables
    self.serial_number = serial_number
    self.resolution = resolution
    self.capture_fps = capture_fps
    self.put_fps = put_fps
    self.put_downsample = put_downsample
    self.record_fps = record_fps
    self.enable_color = enable_color
    self.enable_depth = enable_depth
    self.enable_infrared = enable_infrared
    self.advanced_mode_config = advanced_mode_config
    self.transform = transform
    self.vis_transform = vis_transform
    self.recording_transform = recording_transform
    self.video_recorder = video_recorder
    self.verbose = verbose
    # set by start(); read once by the worker in run()
    self.put_start_time = None

    # shared variables (crossed into the worker process)
    self.stop_event = mp.Event()
    self.ready_event = mp.Event()
    self.ring_buffer = ring_buffer
    self.vis_ring_buffer = vis_ring_buffer
    self.command_queue = command_queue
    self.intrinsics_array = intrinsics_array
|
|
|
|
@staticmethod
def get_connected_devices_serial():
    """Return the sorted serial numbers of all connected D400-series cameras."""
    found = []
    for dev in rs.context().devices:
        if dev.get_info(rs.camera_info.name).lower() == 'platform camera':
            continue
        # only works with D400 series
        if dev.get_info(rs.camera_info.product_line) != 'D400':
            continue
        found.append(dev.get_info(rs.camera_info.serial_number))
    return sorted(found)
|
|
|
|
# ========= context manager ===========
|
|
def __enter__(self):
    """Context-manager entry: launch the worker process and return self."""
    self.start()
    return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
    """Context-manager exit: stop the worker; exceptions propagate unchanged."""
    self.stop()
|
|
|
|
# ========= user API ===========
|
|
def start(self, wait=True, put_start_time=None):
    """Spawn the capture process.

    put_start_time is stored on self before the process starts so the
    worker can align its step_idx/timestamp bookkeeping to it. When
    wait is True, block until the first frame has been captured.
    """
    self.put_start_time = put_start_time
    super().start()
    if not wait:
        return
    self.start_wait()
|
|
|
|
def stop(self, wait=True):
    """Request the worker loop to exit; optionally join the process."""
    self.stop_event.set()
    if not wait:
        return
    self.end_wait()
|
|
|
|
def start_wait(self):
    """Block until the worker signals readiness (first frame, or exit)."""
    self.ready_event.wait()
|
|
|
|
def end_wait(self):
    """Join the worker process (blocks until it has exited)."""
    self.join()
|
|
|
|
@property
def is_ready(self):
    """Whether the worker has set ready_event (first frame put, or shutdown)."""
    return self.ready_event.is_set()
|
|
|
|
def get(self, k=None, out=None):
    """Fetch from the main ring buffer.

    k is None -> latest sample; otherwise the last k samples.
    out, when given, is forwarded as the preallocated output.
    """
    if k is None:
        return self.ring_buffer.get(out=out)
    return self.ring_buffer.get_last_k(k, out=out)
|
|
|
|
def get_vis(self, out=None):
    """Fetch the latest sample from the k=1 visualization ring buffer."""
    return self.vis_ring_buffer.get(out=out)
|
|
|
|
# ========= user API ===========
|
|
def set_color_option(self, option: rs.option, value: float):
    """Queue a SET_COLOR_OPTION command for the worker to apply."""
    message = {
        'cmd': Command.SET_COLOR_OPTION.value,
        'option_enum': option.value,
        'option_value': value
    }
    self.command_queue.put(message)
|
|
|
|
def set_exposure(self, exposure=None, gain=None):
    """
    exposure: (1, 10000) 100us unit. (0.1 ms, 1/10000s)
    gain: (0, 128)
    Both None -> re-enable auto exposure.
    """
    if exposure is None and gain is None:
        # auto exposure
        self.set_color_option(rs.option.enable_auto_exposure, 1.0)
        return
    # manual exposure
    self.set_color_option(rs.option.enable_auto_exposure, 0.0)
    if exposure is not None:
        self.set_color_option(rs.option.exposure, exposure)
    if gain is not None:
        self.set_color_option(rs.option.gain, gain)
|
|
|
|
def set_white_balance(self, white_balance=None):
    """Enable auto white balance (None) or set a fixed manual value."""
    if white_balance is None:
        self.set_color_option(rs.option.enable_auto_white_balance, 1.0)
        return
    self.set_color_option(rs.option.enable_auto_white_balance, 0.0)
    self.set_color_option(rs.option.white_balance, white_balance)
|
|
|
|
def get_intrinsics(self):
    """Return the 3x3 pinhole intrinsics matrix K of the color stream.

    Reads fx, fy, ppx, ppy from the shared intrinsics array populated
    by the worker; only valid once the worker is ready.
    """
    assert self.ready_event.is_set()
    fx, fy, ppx, ppy = self.intrinsics_array.get()[:4]
    return np.array([
        [fx, 0.0, ppx],
        [0.0, fy, ppy],
        [0.0, 0.0, 1.0],
    ])
|
|
|
|
def get_depth_scale(self):
    """Return the depth scale stored in the last slot of the shared array."""
    assert self.ready_event.is_set()
    return self.intrinsics_array.get()[-1]
|
|
|
|
def start_recording(self, video_path: str, start_time: float=-1):
    """Queue a START_RECORDING command.

    A negative start_time is translated to "start immediately" by the
    worker. Raises RuntimeError when the UTF-8 encoded path exceeds
    the fixed-size command-queue slot.
    """
    assert self.enable_color

    if len(video_path.encode('utf-8')) > self.MAX_PATH_LENGTH:
        raise RuntimeError('video_path too long.')
    message = {
        'cmd': Command.START_RECORDING.value,
        'video_path': video_path,
        'recording_start_time': start_time
    }
    self.command_queue.put(message)
|
|
|
|
def stop_recording(self):
    """Queue a STOP_RECORDING command (worker flushes and closes the file)."""
    message = {'cmd': Command.STOP_RECORDING.value}
    self.command_queue.put(message)
|
|
|
|
def restart_put(self, start_time):
    """Queue a RESTART_PUT command to re-align put bookkeeping to start_time."""
    message = {
        'cmd': Command.RESTART_PUT.value,
        'put_start_time': start_time
    }
    self.command_queue.put(message)
|
|
|
|
# ========= interval API ===========
|
|
def run(self):
    """Worker-process entry point.

    Configures the device, then loops: wait for a frameset, build a
    sample dict, put it into the main and visualization ring buffers,
    optionally write a video frame, and drain/execute queued commands.
    On exit (stop_event or error) the recorder and streams are shut down
    and ready_event is set so a waiting parent never blocks forever.
    """
    # limit threads: this child process must not oversubscribe CPU cores
    threadpool_limits(1)
    cv2.setNumThreads(1)

    w, h = self.resolution
    fps = self.capture_fps
    align = rs.align(rs.stream.color)
    # Enable the streams from all the intel realsense devices
    rs_config = rs.config()
    if self.enable_color:
        rs_config.enable_stream(rs.stream.color,
            w, h, rs.format.bgr8, fps)
    if self.enable_depth:
        rs_config.enable_stream(rs.stream.depth,
            w, h, rs.format.z16, fps)
    if self.enable_infrared:
        rs_config.enable_stream(rs.stream.infrared,
            w, h, rs.format.y8, fps)

    try:
        rs_config.enable_device(self.serial_number)

        # start pipeline
        pipeline = rs.pipeline()
        pipeline_profile = pipeline.start(rs_config)

        # report global time
        # https://github.com/IntelRealSense/librealsense/pull/3909
        # NOTE(review): unconditionally grabs the color sensor — looks
        # like this would fail when enable_color is False; confirm.
        d = pipeline_profile.get_device().first_color_sensor()
        d.set_option(rs.option.global_time_enabled, 1)

        # setup advanced mode
        if self.advanced_mode_config is not None:
            json_text = json.dumps(self.advanced_mode_config)
            device = pipeline_profile.get_device()
            advanced_mode = rs.rs400_advanced_mode(device)
            advanced_mode.load_json(json_text)

        # publish color intrinsics to the shared 7-slot array
        color_stream = pipeline_profile.get_stream(rs.stream.color)
        intr = color_stream.as_video_stream_profile().get_intrinsics()
        order = ['fx', 'fy', 'ppx', 'ppy', 'height', 'width']
        for i, name in enumerate(order):
            self.intrinsics_array.get()[i] = getattr(intr, name)

        if self.enable_depth:
            depth_sensor = pipeline_profile.get_device().first_depth_sensor()
            depth_scale = depth_sensor.get_depth_scale()
            # depth scale lives in the last slot of the shared array
            self.intrinsics_array.get()[-1] = depth_scale

        # one-time setup (intrinsics etc, ignore for now)
        if self.verbose:
            print(f'[SingleRealsense {self.serial_number}] Main loop started.')

        # put frequency regulation
        put_idx = None
        put_start_time = self.put_start_time
        if put_start_time is None:
            put_start_time = time.time()

        iter_idx = 0
        t_start = time.time()
        while not self.stop_event.is_set():
            # wait for frames to come in
            frameset = pipeline.wait_for_frames()
            receive_time = time.time()
            # align frames to color
            frameset = align.process(frameset)

            # grab data
            data = dict()
            data['camera_receive_timestamp'] = receive_time
            # realsense report in ms
            data['camera_capture_timestamp'] = frameset.get_timestamp() / 1000
            if self.enable_color:
                color_frame = frameset.get_color_frame()
                data['color'] = np.asarray(color_frame.get_data())
                # prefer the color frame's own timestamp when available
                t = color_frame.get_timestamp() / 1000
                data['camera_capture_timestamp'] = t
                # print('device', time.time() - t)
                # print(color_frame.get_frame_timestamp_domain())
            if self.enable_depth:
                data['depth'] = np.asarray(
                    frameset.get_depth_frame().get_data())
            if self.enable_infrared:
                data['infrared'] = np.asarray(
                    frameset.get_infrared_frame().get_data())

            # apply transform
            put_data = data
            if self.transform is not None:
                put_data = self.transform(dict(data))

            if self.put_downsample:
                # put frequency regulation
                local_idxs, global_idxs, put_idx \
                    = get_accumulate_timestamp_idxs(
                        timestamps=[receive_time],
                        start_time=put_start_time,
                        dt=1/self.put_fps,
                        # this is None in the first iteration
                        # and then replaced with a concrete number
                        next_global_idx=put_idx,
                        # continue to pump frames even if not started.
                        # start_time is simply used to align timestamps.
                        allow_negative=True
                    )

                # one put per accumulated global step index
                for step_idx in global_idxs:
                    put_data['step_idx'] = step_idx
                    # put_data['timestamp'] = put_start_time + step_idx / self.put_fps
                    put_data['timestamp'] = receive_time
                    # print(step_idx, data['timestamp'])
                    self.ring_buffer.put(put_data, wait=False)
            else:
                # no downsampling: derive step_idx directly from wall time
                step_idx = int((receive_time - put_start_time) * self.put_fps)
                put_data['step_idx'] = step_idx
                put_data['timestamp'] = receive_time
                self.ring_buffer.put(put_data, wait=False)

            # signal ready after the first frame has been put
            if iter_idx == 0:
                self.ready_event.set()

            # put to vis; reuse put_data when vis uses the same transform
            vis_data = data
            if self.vis_transform == self.transform:
                vis_data = put_data
            elif self.vis_transform is not None:
                vis_data = self.vis_transform(dict(data))
            self.vis_ring_buffer.put(vis_data, wait=False)

            # record frame; same reuse logic as the vis path
            rec_data = data
            if self.recording_transform == self.transform:
                rec_data = put_data
            elif self.recording_transform is not None:
                rec_data = self.recording_transform(dict(data))

            if self.video_recorder.is_ready():
                self.video_recorder.write_frame(rec_data['color'],
                    frame_time=receive_time)

            # perf: instantaneous FPS over the last iteration
            t_end = time.time()
            duration = t_end - t_start
            frequency = np.round(1 / duration, 1)
            t_start = t_end
            if self.verbose:
                print(f'[SingleRealsense {self.serial_number}] FPS {frequency}')

            # fetch command from queue (non-blocking; Empty means none pending)
            try:
                commands = self.command_queue.get_all()
                n_cmd = len(commands['cmd'])
            except Empty:
                n_cmd = 0

            # execute commands; get_all() returns column-major dict of arrays,
            # so re-assemble one dict per command before dispatching
            for i in range(n_cmd):
                command = dict()
                for key, value in commands.items():
                    command[key] = value[i]
                cmd = command['cmd']
                if cmd == Command.SET_COLOR_OPTION.value:
                    sensor = pipeline_profile.get_device().first_color_sensor()
                    option = rs.option(command['option_enum'])
                    value = float(command['option_value'])
                    sensor.set_option(option, value)
                    # print('auto', sensor.get_option(rs.option.enable_auto_exposure))
                    # print('exposure', sensor.get_option(rs.option.exposure))
                    # print('gain', sensor.get_option(rs.option.gain))
                elif cmd == Command.SET_DEPTH_OPTION.value:
                    sensor = pipeline_profile.get_device().first_depth_sensor()
                    option = rs.option(command['option_enum'])
                    value = float(command['option_value'])
                    sensor.set_option(option, value)
                elif cmd == Command.START_RECORDING.value:
                    video_path = str(command['video_path'])
                    start_time = command['recording_start_time']
                    # negative sentinel from start_recording() means "now"
                    if start_time < 0:
                        start_time = None
                    self.video_recorder.start(video_path, start_time=start_time)
                elif cmd == Command.STOP_RECORDING.value:
                    self.video_recorder.stop()
                    # stop need to flush all in-flight frames to disk, which might take longer than dt.
                    # soft-reset put to drop frames to prevent ring buffer overflow.
                    put_idx = None
                elif cmd == Command.RESTART_PUT.value:
                    put_idx = None
                    put_start_time = command['put_start_time']
                    # self.ring_buffer.clear()

            iter_idx += 1
    finally:
        # always release the recorder/streams and unblock a waiting parent
        self.video_recorder.stop()
        rs_config.disable_all_streams()
        self.ready_event.set()

    if self.verbose:
        print(f'[SingleRealsense {self.serial_number}] Exiting worker process.')