# run_result_processor_aws.py
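"""
Post-processes the evaluation of a nuPlan challenge submission on AWS: downloads the per-challenge
simulation results from S3, runs the metric aggregator over them, writes the leaderboard entry, and
re-uploads the aggregated metrics, summary and submission logs.

Example invocation (a sketch, not taken from the source; it assumes the `default_run_metric_aggregator`
Hydra config exposes `contestant_id`, `submission_id` and `output_dir` as overridable keys, which the
code below reads from `cfg` but does not define here):

    python run_result_processor_aws.py contestant_id=<contestant> submission_id=<submission> output_dir=/tmp/results
"""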
import logging
import os
import re
import shutil
from pathlib import Path
from typing import List

import hydra
from omegaconf import DictConfig

from nuplan.common.utils.s3_utils import get_s3_client
from nuplan.planning.script.run_metric_aggregator import main as aggregator_main
from nuplan.planning.script.run_simulation import CONFIG_PATH
from nuplan.planning.script.utils import set_default_path
from nuplan.planning.simulation.main_callback.publisher_callback import PublisherCallback
from nuplan.submission.evalai.leaderboard_writer import LeaderBoardWriter
from nuplan.submission.utils.aws_utils import s3_download
from nuplan.submission.utils.utils import get_submission_logger

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
submission_logger = get_submission_logger(__name__)

# If set, use the env. variable to overwrite the default dataset and experiment paths
set_default_path()

CONFIG_NAME = 'default_run_metric_aggregator'
NUM_INSTANCES_PER_CHALLENGE = 8
CHALLENGES = ['open_loop_boxes', 'closed_loop_nonreactive_agents', 'closed_loop_reactive_agents']


def is_submission_successful(challenges: List[str], simulation_results_dir: Path) -> bool:
"""
Checks if evaluation of one submission was successful, by checking that all instances for all challenges
were completed.
:param challenges: The list of challenges.
:param simulation_results_dir: Path were the simulation results are saved locally.
:return: True if the submission was evaluated successfully, False otherwise.
"""
completed = list(simulation_results_dir.rglob('*completed.txt'))
successful = True if len(completed) == len(challenges) * NUM_INSTANCES_PER_CHALLENGE else False
logger.info("Found %s completed simulations" % len(completed))
logger.info("Simulation was successful: %s" % successful)
return successful
def list_subdirs_filtered(root_dir: Path, regex_pattern: re.Pattern[str]) -> List[str]:
"""
Lists the path of files present in a directory. Results are filtered by ending pattern.
:param root_dir: The path to start the search.
:param regex_pattern: Regex based Pattern for which paths to keep.
:return: List of paths under root_dir which wnd with path_end_filter.
"""
paths = [
str(path)
for path in root_dir.rglob(
'**/*',
)
if regex_pattern.search(str(path))
]
return paths
@hydra.main(config_path=CONFIG_PATH, config_name=CONFIG_NAME)
def main(cfg: DictConfig) -> None:
"""
    Downloads evaluation results from S3, runs the metric aggregator, and re-uploads the results.
:param cfg: Hydra config dict.
"""
# copy over the metric results from S3
local_output_dir = Path(cfg.output_dir, cfg.contestant_id, cfg.submission_id)
cfg.challenges = CHALLENGES
Path(cfg.output_dir).mkdir(exist_ok=True, parents=True)
s3_download(
prefix='/'.join([cfg.contestant_id, cfg.submission_id]),
local_path_name=cfg.output_dir,
filters=None,
)
# Check if simulation was successful
simulation_successful = is_submission_successful(cfg.challenges, local_output_dir)
# Set up configuration
cfg.output_dir = str(local_output_dir)
cfg.scenario_metric_paths = list_subdirs_filtered(local_output_dir, re.compile(f'/{cfg.metric_folder_name}$'))
logger.info("Found metric paths %s" % cfg.scenario_metric_paths)
aggregated_metric_save_path = local_output_dir / cfg.aggregated_metric_folder_name
leaderboard_writer = LeaderBoardWriter(cfg, str(local_output_dir))
simulation_results = {}
summary_results = {}
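    # Aggregate and stage results for upload only if every simulation instance completed; the logs are uploaded either way.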
try:
if simulation_successful:
shutil.rmtree(str(aggregated_metric_save_path), ignore_errors=True)
aggregated_metric_save_path.mkdir(parents=True, exist_ok=True)
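            # Run the nuPlan metric aggregator over the downloaded per-scenario metric files.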
aggregator_main(cfg)
# Upload results
simulation_results["aggregated-metrics"] = {
"upload": True,
"save_path": str(aggregated_metric_save_path),
"remote_path": 'aggregated_metrics',
}
simulation_results["metrics"] = {
"upload": True,
"save_path": str(local_output_dir / cfg.metric_folder_name),
"remote_path": 'metrics',
}
summary_results["summary"] = {
"upload": True,
"save_path": str(local_output_dir / 'summary'),
"remote_path": "summary",
}
except Exception as e:
submission_logger.error("Aggregation failed!")
submission_logger.error(e)
simulation_successful = False
finally:
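        # Always stage the submission logs for upload, even if aggregation failed.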
simulation_results["submission_logs"] = {
"upload": True,
"save_path": "/tmp/submission.log",
"remote_path": 'aggregated_metrics',
}
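        # Upload the staged results to S3 under the contestant/submission prefix.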
result_remote_prefix = [str(cfg.contestant_id), str(cfg.submission_id)]
result_s3_client = get_s3_client()
result_publisher_callback = PublisherCallback(
simulation_results,
remote_prefix=result_remote_prefix,
s3_client=result_s3_client,
s3_bucket=os.getenv("NUPLAN_SERVER_S3_ROOT_URL"),
)
result_publisher_callback.on_run_simulation_end()
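        # Upload the summary to the public leaderboard prefix for this submission.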
summary_publisher_callback = PublisherCallback(
summary_results,
remote_prefix=["public/leaderboard/planning/2022", cfg.submission_id],
s3_client=result_s3_client,
s3_bucket=os.getenv("NUPLAN_SERVER_S3_ROOT_URL"),
)
summary_publisher_callback.on_run_simulation_end()
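        # Record the outcome on the leaderboard, whether or not the evaluation succeeded.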
leaderboard_writer.write_to_leaderboard(simulation_successful=simulation_successful)
# Cleanup
shutil.rmtree(local_output_dir)


if __name__ == '__main__':
    main()