import sys import os ROOT_DIR = os.path.dirname(os.path.dirname(__file__)) sys.path.append(ROOT_DIR) os.chdir(ROOT_DIR) import cv2 import json import time import numpy as np from diffusion_policy.real_world.multi_realsense import MultiRealsense from diffusion_policy.real_world.video_recorder import VideoRecorder def test(): config = json.load(open('/home/cchi/dev/diffusion_policy/diffusion_policy/real_world/realsense_config/415_high_accuracy_mode.json', 'r')) def transform(data): color = data['color'] h,w,_ = color.shape factor = 4 color = cv2.resize(color, (w//factor,h//factor), interpolation=cv2.INTER_AREA) # color = color[:,140:500] data['color'] = color return data from diffusion_policy.common.cv2_util import get_image_transform color_transform = get_image_transform( input_res=(1280,720), output_res=(640,480), bgr_to_rgb=False) def transform(data): data['color'] = color_transform(data['color']) return data # one thread per camera video_recorder = VideoRecorder.create_h264( fps=30, codec='h264', thread_type='FRAME' ) with MultiRealsense( resolution=(1280,720), capture_fps=30, record_fps=15, enable_color=True, # advanced_mode_config=config, transform=transform, # recording_transform=transform, # video_recorder=video_recorder, verbose=True ) as realsense: realsense.set_exposure(exposure=150, gain=5) intr = realsense.get_intrinsics() print(intr) video_path = 'data_local/test' rec_start_time = time.time() + 1 realsense.start_recording(video_path, start_time=rec_start_time) realsense.restart_put(rec_start_time) out = None vis_img = None while True: out = realsense.get(out=out) # bgr = out['color'] # print(bgr.shape) # vis_img = np.concatenate(list(bgr), axis=0, out=vis_img) # cv2.imshow('default', vis_img) # key = cv2.pollKey() # if key == ord('q'): # break time.sleep(1/60) if time.time() > (rec_start_time + 20.0): break if __name__ == "__main__": test()