bench_executor.executor
This module holds the Executor class, which is responsible for executing a case, collecting metrics, and exposing this functionality to the CLI. All features of this tool can be accessed through the Executor class; other classes should not be used directly.
The full module source:

```python
#!/usr/bin/env python3

"""
This module holds the Executor class which is responsible for executing a
case, collecting metrics, and exposing this functionality to the CLI.
All features of this tool can be accessed through the Executor class; other
classes should not be used directly.
"""

import os
import sys
import json
import jsonschema
import importlib
import inspect
import shutil
from glob import glob
from datetime import datetime
from time import sleep
from typing import List, Dict, Any
from bench_executor.collector import Collector, METRICS_FILE_NAME
from bench_executor.stats import Stats
from bench_executor.logger import Logger, LOG_FILE_NAME

METADATA_FILE = 'metadata.json'
SCHEMA_FILE = 'metadata.schema'
CONFIG_DIR = os.path.join(os.path.dirname(__file__), 'config')
WAIT_TIME = 15  # seconds
CHECKPOINT_FILE_NAME = '.done'


# Dummy callback in case no callback was provided
def _progress_cb(resource: str, name: str, success: bool):
    pass


class Executor:
    """
    Executor class executes a case.
    """

    def __init__(self, main_directory: str, verbose: bool = False,
                 progress_cb=_progress_cb):
        """Create an instance of the Executor class.

        Parameters
        ----------
        main_directory : str
            The root directory of all the cases to execute.
        verbose : bool
            Enables verbose logs.
        progress_cb : function
            Callback to call when a step of the case is completed. By
            default, a dummy callback is used if the argument is missing.
        """
        self._main_directory = os.path.abspath(main_directory)
        self._schema = {}
        self._resources: List[Dict[str, Any]] = []
        self._class_module_mapping: Dict[str, Any] = {}
        self._verbose = verbose
        self._progress_cb = progress_cb
        self._logger = Logger(__name__, self._main_directory, self._verbose)

        self._init_resources()

        with open(os.path.join(os.path.dirname(__file__), 'data',
                               SCHEMA_FILE)) as f:
            self._schema = json.load(f)

    @property
    def main_directory(self) -> str:
        """The main directory of all the cases.

        Returns
        -------
        main_directory : str
            The path to the main directory of the cases.
        """
        return self._main_directory

    def _init_resources(self) -> None:
        """Initialize the resources of a case.

        Resources are discovered automatically by analyzing Python modules.
        """

        # Discover all modules to import
        sys.path.append(os.path.dirname(__file__))
        self._modules = list(filter(lambda x: x.endswith('.py')
                                    and '__init__' not in x
                                    and '__pycache__' not in x,
                                    os.listdir(os.path.dirname(__file__))))

        # Discover all classes in each module
        for m in self._modules:
            module_name = os.path.splitext(m)[0]
            imported_module = importlib.import_module(module_name)
            for name, cls in inspect.getmembers(imported_module,
                                                inspect.isclass):
                if name.startswith('_') or name[0].islower():
                    continue

                # Store class-module mapping for reverse look-up
                self._class_module_mapping[name] = imported_module

                # Discover all methods and their parameters in each class
                methods: Dict[str, List[Dict[str, str]]] = {}
                filt = filter(lambda x: '__init__' not in x,
                              inspect.getmembers(cls, inspect.isfunction))
                for method_name, method in filt:
                    parameters = inspect.signature(method).parameters
                    methods[method_name] = []
                    for key in parameters.keys():
                        if key == 'self':
                            continue
                        p = parameters[key]
                        required = (p.default == inspect.Parameter.empty)
                        methods[method_name].append({'name': p.name,
                                                     'required': required})

                # Register the resource once
                if name not in [r['name'] for r in self._resources]:
                    self._resources.append({'name': name,
                                            'commands': methods})

    def _resources_all_names(self) -> list:
        """Retrieve the names of all resources in a case.

        Returns
        -------
        names : list
            List of the names of all resources in a case.
        """
        names = []
        for r in self._resources:
            names.append(r['name'])

        return names

    def _resources_all_commands_by_name(self, name: str) -> list:
        """Retrieve all commands of a resource.

        Parameters
        ----------
        name : str
            The resource's name.

        Returns
        -------
        commands : list
            List of commands for the resource.
        """
        commands = []
        for r in filter(lambda x: x['name'] == name, self._resources):
            commands += list(r['commands'].keys())  # type: ignore

        return commands

    def _resources_all_parameters_by_command(self, name: str,
                                             command: str,
                                             required_only=False) -> list:
        """Retrieve all parameters of a command of a resource.

        Parameters
        ----------
        name : str
            The resource's name.
        command : str
            The command's name.
        required_only : bool
            Only return the required parameters of a command. By default,
            all parameters are returned.

        Returns
        -------
        parameters : list
            List of parameters of the resource's command.

        Raises
        ------
        KeyError : Exception
            If the command cannot be found for the resource.
        """
        parameters = []
        for r in filter(lambda x: x['name'] == name, self._resources):
            try:
                for p in r['commands'][command]:
                    if required_only:
                        if p['required']:
                            parameters.append(p['name'])
                    else:
                        parameters.append(p['name'])
            except KeyError as e:
                self._logger.error(f'Command "{command}" not found for '
                                   f'resource "{name}": {e}')
                raise e

        return parameters

    def _validate_case(self, case: dict, path: str) -> bool:
        """Validate a case's syntax.

        Verify whether a case has valid syntax. Report any errors discovered
        through logging and return whether the validation succeeded.

        Parameters
        ----------
        case : dict
            The case to validate.
        path : str
            The file path to the case.

        Returns
        -------
        success : bool
            Whether the validation of the case succeeded or not.
        """
        try:
            # Verify schema
            jsonschema.validate(case, self._schema)

            # Verify values
            for step in case['steps']:
                # Check if the resource is known
                names = self._resources_all_names()
                if step['resource'] not in names:
                    msg = f'{path}: Unknown resource "{step["resource"]}"'
                    self._logger.error(msg)
                    return False

                # Check if the command is known
                r = step['resource']
                commands = self._resources_all_commands_by_name(r)
                if commands is None or step['command'] not in commands:
                    msg = f'{path}: Unknown command "{step["command"]}" ' + \
                          f'for resource "{step["resource"]}"'
                    self._logger.error(msg)
                    return False

                # Check if the parameters are known
                r = step['resource']
                c = step['command']
                parameters = self._resources_all_parameters_by_command(r, c)
                if parameters is None:
                    return False

                for p in step['parameters'].keys():
                    if p not in parameters:
                        msg = f'{path}: Unknown parameter "{p}" for ' + \
                              f'command "{step["command"]}" of resource ' + \
                              f'"{step["resource"]}"'
                        self._logger.error(msg)
                        return False

                # Check if all required parameters are provided
                r = step['resource']
                c = step['command']
                parameters = \
                    self._resources_all_parameters_by_command(r, c, True)
                for p in parameters:
                    if p not in step['parameters'].keys():
                        msg = f'{path}: Missing required parameter "{p}" ' + \
                              f'for command "{step["command"]}" ' + \
                              f'of resource "{step["resource"]}"'
                        self._logger.error(msg)
                        return False

        except jsonschema.ValidationError:
            msg = f'{path}: JSON schema violation'
            self._logger.error(msg)
            return False

        return True

    def stats(self, case: dict) -> bool:
        """Generate statistics for a case.

        Generate statistics for an executed case. The case must have been
        executed before for this to succeed.

        Parameters
        ----------
        case : dict
            The case to generate statistics for.

        Returns
        -------
        success : bool
            Whether the statistics were generated successfully or not.
        """
        data = case['data']
        directory = case['directory']
        results_path = os.path.join(directory, 'results')

        if not os.path.exists(results_path):
            msg = f'Results do not exist for case "{data["name"]}"'
            self._logger.error(msg)
            return False

        stats = Stats(results_path, len(data['steps']), directory,
                      self._verbose)
        stats.aggregate()

        return True

    def clean(self, case: dict) -> bool:
        """Clean a case.

        Clean up all results and metrics for a case to start it fresh.

        Parameters
        ----------
        case : dict
            The case to clean.

        Returns
        -------
        success : bool
            Whether the cleaning of the case succeeded or not.
        """
        # Checkpoints
        checkpoint_file = os.path.join(case['directory'], CHECKPOINT_FILE_NAME)
        if os.path.exists(checkpoint_file):
            os.remove(checkpoint_file)

        # Results: log files, metric measurements, run checkpoints
        for result_dir in glob(f'{case["directory"]}/results'):
            shutil.rmtree(result_dir)

        # Data: persistent storage
        for data_dir in glob(f'{case["directory"]}/data/*'):
            if not data_dir.endswith('shared'):
                shutil.rmtree(data_dir)

        return True

    def run(self, case: dict, interval: float,
            run: int, checkpoint: bool) -> bool:
        """Execute a case.

        Execute all steps of a case while collecting metrics and logs.
        The metrics are collected at a given interval and for a specific run
        of the case to allow multiple executions of the same case.
        Checkpoints of runs can be enabled to allow the executor to restart
        where it stopped in case of a failure, a power outage, etc.

        Parameters
        ----------
        case : dict
            The case to execute.
        interval : float
            The sample interval for the metrics collection.
        run : int
            The run number of the case.
        checkpoint : bool
            Enable checkpoints after each run to allow restarts.

        Returns
        -------
        success : bool
            Whether the case was executed successfully or not.
        """
        success = True
        data = case['data']
        directory = case['directory']
        data_path = os.path.join(directory, 'data')
        results_run_path = os.path.join(directory, 'results', f'run_{run}')
        checkpoint_file = os.path.join(directory, CHECKPOINT_FILE_NAME)
        run_checkpoint_file = os.path.join(results_run_path,
                                           CHECKPOINT_FILE_NAME)
        active_resources = []

        # Make sure we start with a clean setup before the first run
        if run == 1:
            self.clean(case)

        # Create directories
        os.umask(0)
        os.makedirs(data_path, exist_ok=True)
        os.makedirs(results_run_path, exist_ok=True)

        # Initialize resources if needed.
        # Some resources have to perform an initialization step, such as
        # configuring database users, storage, etc., which is only done once.
        for step in data['steps']:
            module = self._class_module_mapping[step['resource']]
            resource = getattr(module, step['resource'])(data_path,
                                                         CONFIG_DIR,
                                                         directory,
                                                         self._verbose)
            if hasattr(resource, 'initialization'):
                if not resource.initialization():
                    self._logger.error('Failed to initialize resource '
                                       f'{step["resource"]}')
                    return False

                self._logger.debug(f'Resource {step["resource"]} initialized')
                self._progress_cb('Initializing', step['resource'], success)

        # Launch metrics collection
        collector = Collector(results_run_path, interval, len(data['steps']),
                              run, directory, self._verbose)

        # Execute steps
        for index, step in enumerate(data['steps']):
            success = True
            module = self._class_module_mapping[step['resource']]
            resource = getattr(module, step['resource'])(data_path,
                                                         CONFIG_DIR,
                                                         directory,
                                                         self._verbose)
            active_resources.append(resource)

            # Containers may need to start up first before executing a command
            if hasattr(resource, 'wait_until_ready'):
                if not resource.wait_until_ready():
                    success = False
                    self._logger.error('Waiting until resource '
                                       f'"{step["resource"]}" is ready '
                                       'failed')
                    self._progress_cb(step['resource'], step['name'], success)
                    break
                self._logger.debug(f'Resource {step["resource"]} ready')

            # Execute command
            command = getattr(resource, step['command'])
            if not command(**step['parameters']):
                success = False
                msg = f'Executing command "{step["command"]}" ' + \
                      f'failed for resource "{step["resource"]}"'
                # Some steps, like queries, are non-critical: they may fail
                # without causing a complete case failure. Allow these
                # failures if the may_fail key is present.
                if step.get('may_fail', False):
                    self._logger.warning(msg)
                    self._progress_cb(step['resource'], step['name'], success)
                    continue
                else:
                    self._logger.error(msg)
                    self._progress_cb(step['resource'], step['name'], success)
                    break
            self._logger.debug(f'Command "{step["command"]}" executed on '
                               f'resource {step["resource"]}')

            # Step complete
            self._progress_cb(step['resource'], step['name'], success)

            # Step finished, let the metrics collector know
            if (index + 1) < len(data['steps']):
                collector.next_step()

        # Stop metrics collection
        collector.stop()

        # Stop active containers
        for resource in active_resources:
            if resource is not None and hasattr(resource, 'stop'):
                resource.stop()

        self._logger.debug('Cleaned up all resources')
        self._progress_cb('Cleaner', 'Clean up resources', True)

        # Mark checkpoint if necessary
        if checkpoint and success:
            self._logger.debug('Writing checkpoint...')
            with open(checkpoint_file, 'w') as f:
                d = datetime.now().replace(microsecond=0).isoformat()
                f.write(f'{d}\n')

        # Log file
        os.makedirs(results_run_path, exist_ok=True)
        shutil.move(os.path.join(directory, LOG_FILE_NAME),
                    os.path.join(results_run_path, LOG_FILE_NAME))
        self._logger.debug('Moved logs to run results path')

        # Metrics measurements
        for metrics_file in glob(f'{data_path}/*/{METRICS_FILE_NAME}'):
            subdir = metrics_file.replace(f'{data_path}/', '') \
                    .replace(f'/{METRICS_FILE_NAME}', '')
            os.makedirs(os.path.join(results_run_path, subdir), exist_ok=True)
            shutil.move(metrics_file, os.path.join(results_run_path, subdir,
                                                   METRICS_FILE_NAME))
        self._logger.debug('Moved metric measurements to run results path')

        # Results: all 'output_file' and 'results_file' values
        if success:
            self._logger.debug('Copying generated files for run')
            for step in data['steps']:
                subdir = step['resource'].lower().replace('_', '')
                parameters = step['parameters']
                os.makedirs(os.path.join(results_run_path, subdir),
                            exist_ok=True)
                if parameters.get('results_file', False):
                    results_file = parameters['results_file']
                    p1 = os.path.join(directory, 'data/shared', results_file)
                    p2 = os.path.join(results_run_path, subdir, results_file)
                    try:
                        shutil.move(p1, p2)
                    except FileNotFoundError as e:
                        msg = f'Cannot find results file "{p1}": {e}'
                        self._logger.warning(msg)

                if parameters.get('output_file', False) \
                        and not parameters.get('multiple_files', False):
                    output_file = step['parameters']['output_file']
                    p1 = os.path.join(directory, 'data/shared', output_file)
                    p2 = os.path.join(results_run_path, subdir, output_file)
                    try:
                        shutil.move(p1, p2)
                    except FileNotFoundError as e:
                        msg = f'Cannot find output file "{p1}": {e}'
                        self._logger.warning(msg)

        # Run complete, mark it
        run_checkpoint_file = os.path.join(results_run_path,
                                           CHECKPOINT_FILE_NAME)
        self._logger.debug('Writing run checkpoint...')
        with open(run_checkpoint_file, 'w') as f:
            d = datetime.now().replace(microsecond=0).isoformat()
            f.write(f'{d}\n')

        self._logger.debug(f'Cooling down for {WAIT_TIME}s')
        self._progress_cb('Cooldown', f'Hardware cooldown period {WAIT_TIME}s',
                          True)
        sleep(WAIT_TIME)

        return success

    def list(self) -> list:
        """List all cases in a root directory.

        Retrieve a list of all discovered valid cases in a given directory.
        Cases which do not pass validation are excluded, and their validation
        errors are reported through logging.

        Returns
        -------
        cases : list
            List of discovered cases.
        """
        cases = []

        for directory in glob(self._main_directory):
            for root, dirs, files in os.walk(directory):
                for file in files:
                    if os.path.basename(file) == METADATA_FILE:
                        path = os.path.join(root, file)
                        with open(path, 'r') as f:
                            data = json.load(f)
                            if self._validate_case(data, path):
                                cases.append({
                                    'directory': os.path.dirname(path),
                                    'data': data
                                })

        return cases
```
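For illustration, here is a hypothetical `metadata.json` of the shape `_validate_case` accepts. The authoritative definition is the JSON schema in `data/metadata.schema` (not shown here); the keys below are the ones the validation and execution code actually reads. The resource and command names are placeholders, as valid values are the discovered resource classes and their methods.

```python
import json

# Hypothetical case description; in real use this file lives inside a case
# directory under the Executor's main directory.
case_metadata = {
    'name': 'example-case',  # used e.g. in stats() error reporting
    'steps': [
        {
            'name': 'Execute query',     # step name passed to the callback
            'resource': 'SomeResource',  # placeholder for a resource class
            'command': 'some_command',   # placeholder for a method name
            'parameters': {},            # keyword arguments for the command
            'may_fail': True             # optional: tolerate step failure
        }
    ]
}

with open('metadata.json', 'w') as f:
    json.dump(case_metadata, f, indent=4)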
```python
class Executor
```
The Executor class executes a case.
```python
def __init__(self, main_directory: str, verbose: bool = False,
             progress_cb=_progress_cb)
```
Create an instance of the Executor class.
Parameters
- main_directory (str): The root directory of all the cases to execute.
- verbose (bool): Enables verbose logs.
- progress_cb (function): Callback to call when a step of the case is completed. By default, a dummy callback is used if the argument is missing.
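A minimal construction sketch; the `./cases` path and the callback body are illustrative. It also exercises the `main_directory` property described below.

```python
from bench_executor.executor import Executor

# Hypothetical progress callback; the signature matches the dummy callback
# the Executor falls back to when none is given.
def on_step(resource: str, name: str, success: bool):
    status = 'OK' if success else 'FAIL'
    print(f'[{status}] {resource}: {name}')

executor = Executor('./cases', verbose=True, progress_cb=on_step)
print(executor.main_directory)  # absolute path to the cases root
```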
The main directory of all the cases.
Returns
- main_directory (str): The path to the main directory of the cases.
```python
def stats(self, case: dict) -> bool
```
Generate statistics for a case.
Generate statistics for an executed case. The case must have been executed before for this to succeed.
Parameters
- case (dict): The case to generate statistics for.
Returns
- success (bool): Whether the statistics were generated successfully or not.
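A sketch of generating statistics for all previously executed cases; the cases root is illustrative:

```python
executor = Executor('./cases')  # hypothetical cases root

for case in executor.list():
    # stats() logs an error and returns False if the case has no results yet
    if not executor.stats(case):
        print(f'No results yet for {case["directory"]}')
```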
```python
def clean(self, case: dict) -> bool
```
Clean a case.
Clean up all results and metrics for a case to start it fresh.
Parameters
- case (dict): The case to clean.
Returns
- success (bool): Whether the cleaning of the case succeeded or not.
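A sketch of resetting every discovered case, assuming the same hypothetical cases root:

```python
executor = Executor('./cases')  # hypothetical cases root

# Removes the case checkpoint, the results directory, and all data
# directories except 'shared' for each case.
for case in executor.list():
    executor.clean(case)
```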
```python
def run(self, case: dict, interval: float, run: int, checkpoint: bool) -> bool
```
Execute a case.
Execute all steps of a case while collecting metrics and logs. The metrics are collected at a given interval and for a specific run of the case to allow multiple executions of the same case. Checkpoints of runs can be enabled to allow the executor to restart where it stopped in case of a failure, a power outage, etc.
Parameters
- case (dict): The case to execute.
- interval (float): The sample interval for the metrics collection.
- run (int): The run number of the case.
- checkpoint (bool): Enable checkpoints after each run to allow restarts.
Returns
- success (bool): Whether the case was executed successfully or not.
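A sketch of executing each discovered case multiple times; the cases root, the sample interval, and the number of runs are illustrative:

```python
executor = Executor('./cases')  # hypothetical cases root
number_of_runs = 3              # repetitions of each case
interval = 0.1                  # metrics sample interval in seconds

for case in executor.list():
    # Run numbers start at 1; run 1 cleans the case before executing.
    for run_number in range(1, number_of_runs + 1):
        if not executor.run(case, interval, run_number, checkpoint=True):
            print(f'Run {run_number} of "{case["data"]["name"]}" failed')
            break
```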
```python
def list(self) -> list
```
List all cases in a root directory.
Retrieve a list of all discovered valid cases in a given directory. Cases which do not pass validation are excluded, and their validation errors are reported through logging.
Returns
- cases (list): List of discovered cases.
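A short usage sketch; the cases root is illustrative. Each returned case is a dict with the case's directory and its parsed metadata:

```python
executor = Executor('./cases')  # hypothetical cases root

for case in executor.list():
    data = case['data']  # parsed contents of the case's metadata.json
    print(f'{data["name"]}: {len(data["steps"])} step(s) '
          f'in {case["directory"]}')
```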