#!/usr/bin/env python3

import argparse
import re
from collections import defaultdict

import numpy as np
from tabulate import tabulate

from tools import csv2numpy, find_all_files, group_files


def numerical_analysis(root_dir, xlim, norm=False):
    """Tabulate per-file reward statistics and per-group averages.

    Scans root_dir for "test_reward_<N>seeds.csv" files, computes the final
    reward, maximum reward, and reward integration (each with its std) up to
    env_step == xlim, and prints one table with a row per file plus one table
    with a row per output group. With norm=True, rewards are baseline-subtracted
    and each metric is rescaled by its per-environment maximum.
    """
    file_pattern = re.compile(r".*/test_reward_\d+seeds.csv$")
    norm_group_pattern = re.compile(r"(/|^)\w+?-v(\d|$)")
    output_group_pattern = re.compile(r".*?(?=(/|^)\w+?-v\d)")
    csv_files = find_all_files(root_dir, file_pattern)
    norm_group = group_files(csv_files, norm_group_pattern)
    output_group = group_files(csv_files, output_group_pattern)

    # Calculate the numerical outcome for each csv file:
    # y/std integration, max_y, and final_y.
    results = defaultdict(list)
    for f in csv_files:
        result = csv2numpy(f)
        # Rows: env_step, reward (baseline-subtracted if norm), reward std.
        if norm:
            result = np.stack([
                result['env_step'],
                result['reward'] - result['reward'][0],
                result['reward:shaded'],
            ])
        else:
            result = np.stack([
                result['env_step'],
                result['reward'],
                result['reward:shaded'],
            ])

        # Skip runs that never reached xlim env steps.
        if result[0, -1] < xlim:
            continue

        # Interpolate the reward and std exactly at xlim, then clip to xlim.
        final_rew = np.interp(xlim, result[0], result[1])
        final_rew_std = np.interp(xlim, result[0], result[2])
        result = result[:, result[0] <= xlim]
        if result.shape[1] == 0:  # no data points at or below xlim
            continue
        if result[0, -1] < xlim:
            # Append the interpolated point so the curve ends exactly at xlim.
            last_line = np.array([xlim, final_rew, final_rew_std]).reshape(3, 1)
            result = np.concatenate([result, last_line], axis=-1)

        max_id = np.argmax(result[1])
        results['name'].append(f)
        results['final_reward'].append(result[1, -1])
        results['final_reward_std'].append(result[2, -1])
        results['max_reward'].append(result[1, max_id])
        results['max_std'].append(result[2, max_id])
        results['reward_integration'].append(np.trapz(result[1], x=result[0]))
        results['reward_std_integration'].append(np.trapz(result[2], x=result[0]))
    results = {k: np.array(v) for k, v in results.items()}
    print(tabulate(results, headers="keys"))

    if norm:
        # Rescale each metric by its maximum within each environment group.
        for _, fs in norm_group.items():
            mask = np.isin(results['name'], fs)
            for k, v in results.items():
                if k == 'name':
                    continue
                v[mask] = v[mask] / v[mask].max()

    # Average the numerical results within each output group.
    group_results = defaultdict(list)
    for g, fs in output_group.items():
        group_results['name'].append(g)
        mask = np.isin(results['name'], fs)
        group_results['num'].append(sum(mask))
        for k in results.keys():
            if k == 'name':
                continue
            group_results[k + ':norm'].append(results[k][mask].mean())

    # Print the aggregated table for each output group.
    print()
    print(tabulate(group_results, headers="keys"))


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--xlim',
        type=int,
        default=1000000,
        help='x-axis limitation (default: 1000000)',
    )
    parser.add_argument(
        '--root-dir', type=str, help='root directory containing the csv files'
    )
    parser.add_argument(
        '--norm',
        action="store_true",
        help="Normalize all results according to environment.",
    )
    args = parser.parse_args()
    numerical_analysis(args.root_dir, args.xlim, norm=args.norm)
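
# Example invocation. The directory layout and the script filename below are
# hypothetical; any tree whose leaf files match "test_reward_<N>seeds.csv" and
# whose paths contain an environment name like "<Env>-v<digit>" is grouped the
# same way (here, both files fall into the Ant-v3 normalization group and into
# their respective ./results/ppo/ and ./results/sac/ output groups):
#
#   ./results/ppo/Ant-v3/test_reward_5seeds.csv
#   ./results/sac/Ant-v3/test_reward_5seeds.csv
#
#   $ python3 analysis.py --root-dir ./results --xlim 1000000 --norm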