bench_executor.executor

This module holds the Executor class which is responsible for executing a case, collecting metrics, and exposing this functionality to the CLI. All features of this tool can be accessed through the Executor class; other classes should not be used directly.

  1#!/usr/bin/env python3
  2
  3"""
  4This module holds the Executor class which is responsible for executing a case,
  5collecting metrics, and exposing this functionality to the CLI.
  6All features of this tool can be accessed through the Executor class, other
  7classes should not be used directly.
  8"""
  9
 10import os
 11import sys
 12import json
 13import jsonschema
 14import importlib
 15import inspect
 16import shutil
 17from glob import glob
 18from datetime import datetime
 19from time import sleep
 20from typing import List, Dict, Any
 21from bench_executor.collector import Collector, METRICS_FILE_NAME
 22from bench_executor.stats import Stats
 23from bench_executor.logger import Logger, LOG_FILE_NAME
 24
 25METADATA_FILE = 'metadata.json'
 26SCHEMA_FILE = 'metadata.schema'
 27CONFIG_DIR = os.path.join(os.path.dirname(__file__), 'config')
 28WAIT_TIME = 15  # seconds
 29CHECKPOINT_FILE_NAME = '.done'
 30
 31
 32# Dummy callback in case no callback was provided
 33def _progress_cb(resource: str, name: str, success: bool):
 34    pass
 35
 36
 37class Executor:
 38    """
 39    Executor class executes a case.
 40    """
 41
 42    def __init__(self, main_directory: str, verbose: bool = False,
 43                 progress_cb=_progress_cb):
 44        """Create an instance of the Executor class.
 45
 46        Parameters
 47        ----------
 48        main_directory : str
 49            The root directory of all the cases to execute.
 50        verbose : bool
 51            Enables verbose logs.
 52        process_cb : function
 53            Callback to call when a step is completed of the case. By default,
 54            a dummy callback is provided if the argument is missing.
 55        """
 56        self._main_directory = os.path.abspath(main_directory)
 57        self._schema = {}
 58        self._resources: List[Dict[str, Any]] = []
 59        self._class_module_mapping: Dict[str, Any] = {}
 60        self._verbose = verbose
 61        self._progress_cb = progress_cb
 62        self._logger = Logger(__name__, self._main_directory, self._verbose)
 63
 64        self._init_resources()
 65
 66        with open(os.path.join(os.path.dirname(__file__), 'data',
 67                               SCHEMA_FILE)) as f:
 68            self._schema = json.load(f)
 69
 70    @property
 71    def main_directory(self) -> str:
 72        """The main directory of all the cases.
 73
 74        Returns
 75        -------
 76        main_directory : str
 77            The path to the main directory of the cases.
 78        """
 79        return self._main_directory
 80
 81    def _init_resources(self) -> None:
 82        """Initialize resources of a case
 83
 84        Resources are discovered automatically by analyzing Python modules.
 85        """
 86
 87        # Discover all modules to import
 88        sys.path.append(os.path.dirname(__file__))
 89        self._modules = list(filter(lambda x: x.endswith('.py')
 90                                    and '__init__' not in x
 91                                    and '__pycache__' not in x,
 92                                    os.listdir(os.path.dirname(__file__))))
 93
 94        # Discover all classes in each module
 95        for m in self._modules:
 96            module_name = os.path.splitext(m)[0]
 97            imported_module = importlib.import_module(module_name)
 98            for name, cls in inspect.getmembers(imported_module,
 99                                                inspect.isclass):
100                if name.startswith('_') or name[0].islower():
101                    continue
102
103                # Store class-module mapping for reverse look-up
104                self._class_module_mapping[name] = imported_module
105
106                # Discover all methods and their parameters in each class
107                methods: Dict[str, List[Dict[str, str]]] = {}
108                filt = filter(lambda x: '__init__' not in x,
109                              inspect.getmembers(cls, inspect.isfunction))
110                for method_name, method in filt:
111                    parameters = inspect.signature(method).parameters
112                    methods[method_name] = []
113                    for key in parameters.keys():
114                        if key == 'self':
115                            continue
116                        p = parameters[key]
117                        required = (p.default == inspect.Parameter.empty)
118                        methods[method_name].append({'name': p.name,
119                                                     'required': required})
120
121                if name not in list(filter(lambda x: x['name'],
122                                           self._resources)):
123                    self._resources.append({'name': name, 'commands': methods})
124
125    def _resources_all_names(self) -> list:
126        """Retrieve all resources' name in a case.
127
128        Returns
129        -------
130        names : list
131            List of all resources' name in a case.
132        """
133        names = []
134        for r in self._resources:
135            names.append(r['name'])
136
137        return names
138
139    def _resources_all_commands_by_name(self, name: str) -> list:
140        """Retrieve all resources' commands.
141
142        Parameters
143        ----------
144        name : str
145            The resource's name.
146
147        Returns
148        -------
149        commands : list
150            List of commands for the resource.
151        """
152        commands = []
153        for r in filter(lambda x: x['name'] == name, self._resources):
154            commands += list(r['commands'].keys())  # type: ignore
155
156        return commands
157
158    def _resources_all_parameters_by_command(self, name: str,
159                                             command: str,
160                                             required_only=False) -> list:
161        """Retrieve all parameters of a command of a resource.
162
163        Parameters
164        ----------
165        name : str
166            The resource's name.
167        command : str
168            The command's name.
169        required_only : bool
170            Only return the required parameters of a command. Default all
171            parameters are returned.
172
173        Returns
174        -------
175        parameters : list
176            List of parameters of the resource's command. None if failed.
177
178        Raises
179        ------
180        KeyError : Exception
181            If the command cannot be found for the resource.
182        """
183        parameters = []
184        for r in filter(lambda x: x['name'] == name, self._resources):
185            try:
186                for p in r['commands'][command]:
187                    if required_only:
188                        if p['required']:
189                            parameters.append(p['name'])
190                    else:
191                        parameters.append(p['name'])
192            except KeyError as e:
193                self._logger.error(f'Command "{command}" not found for '
194                                   f'resource "{name}": {e}')
195                raise e
196
197        return parameters
198
199    def _validate_case(self, case: dict, path: str) -> bool:
200        """Validate a case's syntax.
201
202        Verify if a case has a valid syntax or not. Report any errors
203        discovered through logging and return if the validation succeeded or
204        not.
205
206        Parameters
207        ----------
208        case : dict
209            The case to validate.
210        path : str
211            The file path to the case.
212
213        Returns
214        -------
215        success : bool
216            Whether the validation of the case succeeded or not.
217        """
218        try:
219            # Verify schema
220            jsonschema.validate(case, self._schema)
221
222            # Verify values
223            for step in case['steps']:
224                # Check if resource is known
225                names = self._resources_all_names()
226                if step['resource'] not in names:
227                    msg = f'{path}: Unknown resource "{step["resource"]}"'
228                    self._logger.error(msg)
229                    return False
230
231                # Check if command is known
232                r = step['resource']
233                commands = self._resources_all_commands_by_name(r)
234                if commands is None or step['command'] not in commands:
235                    msg = f'{path}: Unknown command "{step["command"]}" ' + \
236                          f'for resource "{step["resource"]}"'
237                    self._logger.error(msg)
238                    return False
239
240                # Check if parameters are known
241                r = step['resource']
242                c = step['command']
243                parameters = self._resources_all_parameters_by_command(r, c)
244                if parameters is None:
245                    return False
246
247                for p in step['parameters'].keys():
248                    if p not in parameters:
249                        msg = f'{path}: Unkown parameter "{p}" for ' + \
250                              f'command "{step["command"]}" of resource ' + \
251                              f'"{step["resource"]}"'
252                        self._logger.error(msg)
253                        return False
254
255                # Check if all required parameters are provided
256                r = step['resource']
257                c = step['command']
258                parameters = \
259                    self._resources_all_parameters_by_command(r, c, True)
260                for p in parameters:
261                    if p not in step['parameters'].keys():
262                        msg = f'{path}: Missing required parameter "{p}" ' + \
263                              f'for command "{step["command"]}" ' + \
264                              f'of resource "{step["resource"]}"'
265                        self._logger.error(msg)
266                        return False
267
268        except jsonschema.ValidationError:
269            msg = f'{path}: JSON schema violation'
270            self._logger.error(msg)
271            return False
272
273        return True
274
275    def stats(self, case: dict) -> bool:
276        """Generate statistics for a case.
277
278        Generate statistics for an executed case. The case must be executed
279        before to succeed.
280
281        Parameters
282        ----------
283        case : dict
284            The case to generate statistics for.
285
286        Returns
287        -------
288        success : bool
289            Whether the statistics are generated with success or not.
290
291        """
292        data = case['data']
293        directory = case['directory']
294        results_path = os.path.join(directory, 'results')
295
296        if not os.path.exists(results_path):
297            msg = f'Results do not exist for case "{data["name"]}"'
298            self._logger.error(msg)
299            return False
300
301        stats = Stats(results_path, len(data['steps']), directory,
302                      self._verbose)
303        stats.aggregate()
304
305        return True
306
307    def clean(self, case: dict) -> bool:
308        """Clean a case.
309
310        Clean up all results and metrics for a case to start it fresh.
311
312        Parameters
313        ----------
314        case : dict
315            The case to clean.
316
317        Returns
318        -------
319        success : bool
320            Whether the cleaning of the case succeeded or not.
321        """
322        # Checkpoints
323        checkpoint_file = os.path.join(case['directory'], CHECKPOINT_FILE_NAME)
324        if os.path.exists(checkpoint_file):
325            os.remove(checkpoint_file)
326
327        # Results: log files, metric measurements, run checkpoints
328        for result_dir in glob(f'{case["directory"]}/results'):
329            shutil.rmtree(result_dir)
330
331        # Data: persistent storage
332        for data_dir in glob(f'{case["directory"]}/data/*'):
333            if not data_dir.endswith('shared'):
334                shutil.rmtree(data_dir)
335
336        return True
337
338    def run(self, case: dict, interval: float,
339            run: int, checkpoint: bool) -> bool:
340        """Execute a case.
341
342        Execute all steps of a case while collecting metrics and logs.
343        The metrics are collected at a given interval and for a specific run of
344        the case to allow multiple executions of the same case. Checkpoints of
345        runs can be enabled to allow the executor to restart where it stopped
346        in case of a failure, electricity blackout, etc.
347
348        Parameters
349        ----------
350        case : dict
351            The case to execute.
352        interval : float
353            The sample interval for the metrics collection.
354        run : int
355            The run number of the case.
356        checkpoint : bool
357            Enable checkpoints after each run to allow restarts.
358
359        Returns
360        -------
361        success : bool
362            Whether the case was executed successfully or not.
363        """
364        success = True
365        data = case['data']
366        directory = case['directory']
367        data_path = os.path.join(directory, 'data')
368        results_run_path = os.path.join(directory, 'results', f'run_{run}')
369        checkpoint_file = os.path.join(directory, CHECKPOINT_FILE_NAME)
370        run_checkpoint_file = os.path.join(results_run_path,
371                                           CHECKPOINT_FILE_NAME)
372        active_resources = []
373
374        # Make sure we start with a clean setup before the first run
375        if run == 1:
376            self.clean(case)
377
378        # create directories
379        os.umask(0)
380        os.makedirs(data_path, exist_ok=True)
381        os.makedirs(results_run_path, exist_ok=True)
382
383        # Initialize resources if needed
384        # Some resources have to perform an initialization step such as
385        # configuring database users, storage, etc. which is only done once
386        for step in data['steps']:
387            module = self._class_module_mapping[step['resource']]
388            resource = getattr(module, step['resource'])(data_path, CONFIG_DIR,
389                                                         directory,
390                                                         self._verbose)
391            if hasattr(resource, 'initialization'):
392                if not resource.initialization():
393                    self._logger.error('Failed to initialize resource '
394                                       f'{step["resource"]}')
395                    return False
396
397                self._logger.debug(f'Resource {step["resource"]} initialized')
398                self._progress_cb('Initializing', step['resource'], success)
399
400        # Launch metrics collection
401        collector = Collector(results_run_path, interval, len(data['steps']),
402                              run, directory, self._verbose)
403
404        # Execute steps
405        for index, step in enumerate(data['steps']):
406            success = True
407            module = self._class_module_mapping[step['resource']]
408            resource = getattr(module, step['resource'])(data_path, CONFIG_DIR,
409                                                         directory,
410                                                         self._verbose)
411            active_resources.append(resource)
412
413            # Containers may need to start up first before executing a command
414            if hasattr(resource, 'wait_until_ready'):
415                if not resource.wait_until_ready():
416                    success = False
417                    self._logger.error('Waiting until resource '
418                                       f'"{step["resource"]} is ready failed')
419                    self._progress_cb(step['resource'], step['name'], success)
420                    break
421                self._logger.debug(f'Resource {step["resource"]} ready')
422
423            # Execute command
424            command = getattr(resource, step['command'])
425            if not command(**step['parameters']):
426                success = False
427                msg = f'Executing command "{step["command"]}" ' + \
428                      f'failed for resource "{step["resource"]}"'
429                # Some steps are non-critical like queries, they may fail but
430                # should not cause a complete case failure. Allow these
431                # failures if the may_fail key is present
432                if step.get('may_fail', False):
433                    self._logger.warning(msg)
434                    self._progress_cb(step['resource'], step['name'], success)
435                    continue
436                else:
437                    self._logger.error(msg)
438                    self._progress_cb(step['resource'], step['name'], success)
439                    break
440            self._logger.debug(f'Command "{step["command"]}" executed on '
441                               f'resource {step["resource"]}')
442
443            # Step complete
444            self._progress_cb(step['resource'], step['name'], success)
445
446            # Step finished, let metric collector know
447            if (index + 1) < len(data['steps']):
448                collector.next_step()
449
450        # Stop metrics collection
451        collector.stop()
452
453        # Stop active containers
454        for resource in active_resources:
455            if resource is not None and hasattr(resource, 'stop'):
456                resource.stop()
457
458        self._logger.debug('Cleaned up all resource')
459        self._progress_cb('Cleaner', 'Clean up resources', True)
460
461        # Mark checkpoint if necessary
462        if checkpoint and success:
463            self._logger.debug('Writing checkpoint...')
464            with open(checkpoint_file, 'w') as f:
465                d = datetime.now().replace(microsecond=0).isoformat()
466                f.write(f'{d}\n')
467
468        # Log file
469        os.makedirs(os.path.join(results_run_path), exist_ok=True)
470        shutil.move(os.path.join(directory, LOG_FILE_NAME),
471                    os.path.join(results_run_path, LOG_FILE_NAME))
472        self._logger.debug('Copied logs to run results path')
473
474        # Metrics measurements
475        for metrics_file in glob(f'{data_path}/*/{METRICS_FILE_NAME}'):
476            subdir = metrics_file.replace(f'{data_path}/', '') \
477                    .replace('/METRICS_FILE_NAME', '')
478            os.makedirs(os.path.join(results_run_path, subdir), exist_ok=True)
479            shutil.move(metrics_file, os.path.join(results_run_path, subdir,
480                                                   METRICS_FILE_NAME))
481        self._logger.debug('Copied metric measurements to run results path')
482
483        # Results: all 'output_file' and 'result_file' values
484        if success:
485            self._logger.debug('Copying generated files for run')
486            for step in data['steps']:
487                subdir = step['resource'].lower().replace('_', '')
488                parameters = step['parameters']
489                os.makedirs(os.path.join(results_run_path, subdir),
490                            exist_ok=True)
491                if parameters.get('results_file', False):
492                    results_file = parameters['results_file']
493                    p1 = os.path.join(directory, 'data/shared', results_file)
494                    p2 = os.path.join(results_run_path, subdir, results_file)
495                    try:
496                        shutil.move(p1, p2)
497                    except FileNotFoundError as e:
498                        msg = f'Cannot find results file "{p1}": {e}'
499                        self._logger.warning(msg)
500
501                if parameters.get('output_file', False) \
502                        and not parameters.get('multiple_files', False):
503                    output_file = step['parameters']['output_file']
504                    p1 = os.path.join(directory, 'data/shared', output_file)
505                    p2 = os.path.join(results_run_path, subdir, output_file)
506                    try:
507                        shutil.move(p1, p2)
508                    except FileNotFoundError as e:
509                        msg = f'Cannot find output file "{p1}": {e}'
510                        self._logger.warning(msg)
511
512            # Run complete, mark it
513            run_checkpoint_file = os.path.join(results_run_path,
514                                               CHECKPOINT_FILE_NAME)
515            self._logger.debug('Writing run checkpoint...')
516            with open(run_checkpoint_file, 'w') as f:
517                d = datetime.now().replace(microsecond=0).isoformat()
518                f.write(f'{d}\n')
519
520        self._logger.debug(f'Cooling down for {WAIT_TIME}s')
521        self._progress_cb('Cooldown', f'Hardware cooldown period {WAIT_TIME}s',
522                          True)
523        sleep(WAIT_TIME)
524
525        return success
526
527    def list(self) -> list:
528        """List all cases in a root directory.
529
530            Retrieve a list of all discovered valid cases in a given directory.
531            Cases which do not pass the validation, are excluded and their
532            validation errors are reported through logging.
533
534            Returns
535            -------
536            cases : list
537                List of discovered cases.
538        """
539        cases = []
540
541        for directory in glob(self._main_directory):
542            for root, dirs, files in os.walk(directory):
543                for file in files:
544                    if os.path.basename(file) == METADATA_FILE:
545                        path = os.path.join(root, file)
546                        with open(path, 'r') as f:
547                            data = json.load(f)
548                            if self._validate_case(data, path):
549                                cases.append({
550                                    'directory': os.path.dirname(path),
551                                    'data': data
552                                })
553
554        return cases
class Executor:
 38class Executor:
 39    """
 40    Executor class executes a case.
 41    """
 42
 43    def __init__(self, main_directory: str, verbose: bool = False,
 44                 progress_cb=_progress_cb):
 45        """Create an instance of the Executor class.
 46
 47        Parameters
 48        ----------
 49        main_directory : str
 50            The root directory of all the cases to execute.
 51        verbose : bool
 52            Enables verbose logs.
 53        process_cb : function
 54            Callback to call when a step is completed of the case. By default,
 55            a dummy callback is provided if the argument is missing.
 56        """
 57        self._main_directory = os.path.abspath(main_directory)
 58        self._schema = {}
 59        self._resources: List[Dict[str, Any]] = []
 60        self._class_module_mapping: Dict[str, Any] = {}
 61        self._verbose = verbose
 62        self._progress_cb = progress_cb
 63        self._logger = Logger(__name__, self._main_directory, self._verbose)
 64
 65        self._init_resources()
 66
 67        with open(os.path.join(os.path.dirname(__file__), 'data',
 68                               SCHEMA_FILE)) as f:
 69            self._schema = json.load(f)
 70
 71    @property
 72    def main_directory(self) -> str:
 73        """The main directory of all the cases.
 74
 75        Returns
 76        -------
 77        main_directory : str
 78            The path to the main directory of the cases.
 79        """
 80        return self._main_directory
 81
 82    def _init_resources(self) -> None:
 83        """Initialize resources of a case
 84
 85        Resources are discovered automatically by analyzing Python modules.
 86        """
 87
 88        # Discover all modules to import
 89        sys.path.append(os.path.dirname(__file__))
 90        self._modules = list(filter(lambda x: x.endswith('.py')
 91                                    and '__init__' not in x
 92                                    and '__pycache__' not in x,
 93                                    os.listdir(os.path.dirname(__file__))))
 94
 95        # Discover all classes in each module
 96        for m in self._modules:
 97            module_name = os.path.splitext(m)[0]
 98            imported_module = importlib.import_module(module_name)
 99            for name, cls in inspect.getmembers(imported_module,
100                                                inspect.isclass):
101                if name.startswith('_') or name[0].islower():
102                    continue
103
104                # Store class-module mapping for reverse look-up
105                self._class_module_mapping[name] = imported_module
106
107                # Discover all methods and their parameters in each class
108                methods: Dict[str, List[Dict[str, str]]] = {}
109                filt = filter(lambda x: '__init__' not in x,
110                              inspect.getmembers(cls, inspect.isfunction))
111                for method_name, method in filt:
112                    parameters = inspect.signature(method).parameters
113                    methods[method_name] = []
114                    for key in parameters.keys():
115                        if key == 'self':
116                            continue
117                        p = parameters[key]
118                        required = (p.default == inspect.Parameter.empty)
119                        methods[method_name].append({'name': p.name,
120                                                     'required': required})
121
122                if name not in list(filter(lambda x: x['name'],
123                                           self._resources)):
124                    self._resources.append({'name': name, 'commands': methods})
125
126    def _resources_all_names(self) -> list:
127        """Retrieve all resources' name in a case.
128
129        Returns
130        -------
131        names : list
132            List of all resources' name in a case.
133        """
134        names = []
135        for r in self._resources:
136            names.append(r['name'])
137
138        return names
139
140    def _resources_all_commands_by_name(self, name: str) -> list:
141        """Retrieve all resources' commands.
142
143        Parameters
144        ----------
145        name : str
146            The resource's name.
147
148        Returns
149        -------
150        commands : list
151            List of commands for the resource.
152        """
153        commands = []
154        for r in filter(lambda x: x['name'] == name, self._resources):
155            commands += list(r['commands'].keys())  # type: ignore
156
157        return commands
158
159    def _resources_all_parameters_by_command(self, name: str,
160                                             command: str,
161                                             required_only=False) -> list:
162        """Retrieve all parameters of a command of a resource.
163
164        Parameters
165        ----------
166        name : str
167            The resource's name.
168        command : str
169            The command's name.
170        required_only : bool
171            Only return the required parameters of a command. Default all
172            parameters are returned.
173
174        Returns
175        -------
176        parameters : list
177            List of parameters of the resource's command. None if failed.
178
179        Raises
180        ------
181        KeyError : Exception
182            If the command cannot be found for the resource.
183        """
184        parameters = []
185        for r in filter(lambda x: x['name'] == name, self._resources):
186            try:
187                for p in r['commands'][command]:
188                    if required_only:
189                        if p['required']:
190                            parameters.append(p['name'])
191                    else:
192                        parameters.append(p['name'])
193            except KeyError as e:
194                self._logger.error(f'Command "{command}" not found for '
195                                   f'resource "{name}": {e}')
196                raise e
197
198        return parameters
199
200    def _validate_case(self, case: dict, path: str) -> bool:
201        """Validate a case's syntax.
202
203        Verify if a case has a valid syntax or not. Report any errors
204        discovered through logging and return if the validation succeeded or
205        not.
206
207        Parameters
208        ----------
209        case : dict
210            The case to validate.
211        path : str
212            The file path to the case.
213
214        Returns
215        -------
216        success : bool
217            Whether the validation of the case succeeded or not.
218        """
219        try:
220            # Verify schema
221            jsonschema.validate(case, self._schema)
222
223            # Verify values
224            for step in case['steps']:
225                # Check if resource is known
226                names = self._resources_all_names()
227                if step['resource'] not in names:
228                    msg = f'{path}: Unknown resource "{step["resource"]}"'
229                    self._logger.error(msg)
230                    return False
231
232                # Check if command is known
233                r = step['resource']
234                commands = self._resources_all_commands_by_name(r)
235                if commands is None or step['command'] not in commands:
236                    msg = f'{path}: Unknown command "{step["command"]}" ' + \
237                          f'for resource "{step["resource"]}"'
238                    self._logger.error(msg)
239                    return False
240
241                # Check if parameters are known
242                r = step['resource']
243                c = step['command']
244                parameters = self._resources_all_parameters_by_command(r, c)
245                if parameters is None:
246                    return False
247
248                for p in step['parameters'].keys():
249                    if p not in parameters:
250                        msg = f'{path}: Unkown parameter "{p}" for ' + \
251                              f'command "{step["command"]}" of resource ' + \
252                              f'"{step["resource"]}"'
253                        self._logger.error(msg)
254                        return False
255
256                # Check if all required parameters are provided
257                r = step['resource']
258                c = step['command']
259                parameters = \
260                    self._resources_all_parameters_by_command(r, c, True)
261                for p in parameters:
262                    if p not in step['parameters'].keys():
263                        msg = f'{path}: Missing required parameter "{p}" ' + \
264                              f'for command "{step["command"]}" ' + \
265                              f'of resource "{step["resource"]}"'
266                        self._logger.error(msg)
267                        return False
268
269        except jsonschema.ValidationError:
270            msg = f'{path}: JSON schema violation'
271            self._logger.error(msg)
272            return False
273
274        return True
275
276    def stats(self, case: dict) -> bool:
277        """Generate statistics for a case.
278
279        Generate statistics for an executed case. The case must be executed
280        before to succeed.
281
282        Parameters
283        ----------
284        case : dict
285            The case to generate statistics for.
286
287        Returns
288        -------
289        success : bool
290            Whether the statistics are generated with success or not.
291
292        """
293        data = case['data']
294        directory = case['directory']
295        results_path = os.path.join(directory, 'results')
296
297        if not os.path.exists(results_path):
298            msg = f'Results do not exist for case "{data["name"]}"'
299            self._logger.error(msg)
300            return False
301
302        stats = Stats(results_path, len(data['steps']), directory,
303                      self._verbose)
304        stats.aggregate()
305
306        return True
307
308    def clean(self, case: dict) -> bool:
309        """Clean a case.
310
311        Clean up all results and metrics for a case to start it fresh.
312
313        Parameters
314        ----------
315        case : dict
316            The case to clean.
317
318        Returns
319        -------
320        success : bool
321            Whether the cleaning of the case succeeded or not.
322        """
323        # Checkpoints
324        checkpoint_file = os.path.join(case['directory'], CHECKPOINT_FILE_NAME)
325        if os.path.exists(checkpoint_file):
326            os.remove(checkpoint_file)
327
328        # Results: log files, metric measurements, run checkpoints
329        for result_dir in glob(f'{case["directory"]}/results'):
330            shutil.rmtree(result_dir)
331
332        # Data: persistent storage
333        for data_dir in glob(f'{case["directory"]}/data/*'):
334            if not data_dir.endswith('shared'):
335                shutil.rmtree(data_dir)
336
337        return True
338
339    def run(self, case: dict, interval: float,
340            run: int, checkpoint: bool) -> bool:
341        """Execute a case.
342
343        Execute all steps of a case while collecting metrics and logs.
344        The metrics are collected at a given interval and for a specific run of
345        the case to allow multiple executions of the same case. Checkpoints of
346        runs can be enabled to allow the executor to restart where it stopped
347        in case of a failure, electricity blackout, etc.
348
349        Parameters
350        ----------
351        case : dict
352            The case to execute.
353        interval : float
354            The sample interval for the metrics collection.
355        run : int
356            The run number of the case.
357        checkpoint : bool
358            Enable checkpoints after each run to allow restarts.
359
360        Returns
361        -------
362        success : bool
363            Whether the case was executed successfully or not.
364        """
365        success = True
366        data = case['data']
367        directory = case['directory']
368        data_path = os.path.join(directory, 'data')
369        results_run_path = os.path.join(directory, 'results', f'run_{run}')
370        checkpoint_file = os.path.join(directory, CHECKPOINT_FILE_NAME)
371        run_checkpoint_file = os.path.join(results_run_path,
372                                           CHECKPOINT_FILE_NAME)
373        active_resources = []
374
375        # Make sure we start with a clean setup before the first run
376        if run == 1:
377            self.clean(case)
378
379        # create directories
380        os.umask(0)
381        os.makedirs(data_path, exist_ok=True)
382        os.makedirs(results_run_path, exist_ok=True)
383
384        # Initialize resources if needed
385        # Some resources have to perform an initialization step such as
386        # configuring database users, storage, etc. which is only done once
387        for step in data['steps']:
388            module = self._class_module_mapping[step['resource']]
389            resource = getattr(module, step['resource'])(data_path, CONFIG_DIR,
390                                                         directory,
391                                                         self._verbose)
392            if hasattr(resource, 'initialization'):
393                if not resource.initialization():
394                    self._logger.error('Failed to initialize resource '
395                                       f'{step["resource"]}')
396                    return False
397
398                self._logger.debug(f'Resource {step["resource"]} initialized')
399                self._progress_cb('Initializing', step['resource'], success)
400
401        # Launch metrics collection
402        collector = Collector(results_run_path, interval, len(data['steps']),
403                              run, directory, self._verbose)
404
405        # Execute steps
406        for index, step in enumerate(data['steps']):
407            success = True
408            module = self._class_module_mapping[step['resource']]
409            resource = getattr(module, step['resource'])(data_path, CONFIG_DIR,
410                                                         directory,
411                                                         self._verbose)
412            active_resources.append(resource)
413
414            # Containers may need to start up first before executing a command
415            if hasattr(resource, 'wait_until_ready'):
416                if not resource.wait_until_ready():
417                    success = False
418                    self._logger.error('Waiting until resource '
419                                       f'"{step["resource"]} is ready failed')
420                    self._progress_cb(step['resource'], step['name'], success)
421                    break
422                self._logger.debug(f'Resource {step["resource"]} ready')
423
424            # Execute command
425            command = getattr(resource, step['command'])
426            if not command(**step['parameters']):
427                success = False
428                msg = f'Executing command "{step["command"]}" ' + \
429                      f'failed for resource "{step["resource"]}"'
430                # Some steps are non-critical like queries, they may fail but
431                # should not cause a complete case failure. Allow these
432                # failures if the may_fail key is present
433                if step.get('may_fail', False):
434                    self._logger.warning(msg)
435                    self._progress_cb(step['resource'], step['name'], success)
436                    continue
437                else:
438                    self._logger.error(msg)
439                    self._progress_cb(step['resource'], step['name'], success)
440                    break
441            self._logger.debug(f'Command "{step["command"]}" executed on '
442                               f'resource {step["resource"]}')
443
444            # Step complete
445            self._progress_cb(step['resource'], step['name'], success)
446
447            # Step finished, let metric collector know
448            if (index + 1) < len(data['steps']):
449                collector.next_step()
450
451        # Stop metrics collection
452        collector.stop()
453
454        # Stop active containers
455        for resource in active_resources:
456            if resource is not None and hasattr(resource, 'stop'):
457                resource.stop()
458
459        self._logger.debug('Cleaned up all resource')
460        self._progress_cb('Cleaner', 'Clean up resources', True)
461
462        # Mark checkpoint if necessary
463        if checkpoint and success:
464            self._logger.debug('Writing checkpoint...')
465            with open(checkpoint_file, 'w') as f:
466                d = datetime.now().replace(microsecond=0).isoformat()
467                f.write(f'{d}\n')
468
469        # Log file
470        os.makedirs(os.path.join(results_run_path), exist_ok=True)
471        shutil.move(os.path.join(directory, LOG_FILE_NAME),
472                    os.path.join(results_run_path, LOG_FILE_NAME))
473        self._logger.debug('Copied logs to run results path')
474
475        # Metrics measurements
476        for metrics_file in glob(f'{data_path}/*/{METRICS_FILE_NAME}'):
477            subdir = metrics_file.replace(f'{data_path}/', '') \
478                    .replace('/METRICS_FILE_NAME', '')
479            os.makedirs(os.path.join(results_run_path, subdir), exist_ok=True)
480            shutil.move(metrics_file, os.path.join(results_run_path, subdir,
481                                                   METRICS_FILE_NAME))
482        self._logger.debug('Copied metric measurements to run results path')
483
484        # Results: all 'output_file' and 'result_file' values
485        if success:
486            self._logger.debug('Copying generated files for run')
487            for step in data['steps']:
488                subdir = step['resource'].lower().replace('_', '')
489                parameters = step['parameters']
490                os.makedirs(os.path.join(results_run_path, subdir),
491                            exist_ok=True)
492                if parameters.get('results_file', False):
493                    results_file = parameters['results_file']
494                    p1 = os.path.join(directory, 'data/shared', results_file)
495                    p2 = os.path.join(results_run_path, subdir, results_file)
496                    try:
497                        shutil.move(p1, p2)
498                    except FileNotFoundError as e:
499                        msg = f'Cannot find results file "{p1}": {e}'
500                        self._logger.warning(msg)
501
502                if parameters.get('output_file', False) \
503                        and not parameters.get('multiple_files', False):
504                    output_file = step['parameters']['output_file']
505                    p1 = os.path.join(directory, 'data/shared', output_file)
506                    p2 = os.path.join(results_run_path, subdir, output_file)
507                    try:
508                        shutil.move(p1, p2)
509                    except FileNotFoundError as e:
510                        msg = f'Cannot find output file "{p1}": {e}'
511                        self._logger.warning(msg)
512
513            # Run complete, mark it
514            run_checkpoint_file = os.path.join(results_run_path,
515                                               CHECKPOINT_FILE_NAME)
516            self._logger.debug('Writing run checkpoint...')
517            with open(run_checkpoint_file, 'w') as f:
518                d = datetime.now().replace(microsecond=0).isoformat()
519                f.write(f'{d}\n')
520
521        self._logger.debug(f'Cooling down for {WAIT_TIME}s')
522        self._progress_cb('Cooldown', f'Hardware cooldown period {WAIT_TIME}s',
523                          True)
524        sleep(WAIT_TIME)
525
526        return success
527
528    def list(self) -> list:
529        """List all cases in a root directory.
530
531            Retrieve a list of all discovered valid cases in a given directory.
532            Cases which do not pass the validation, are excluded and their
533            validation errors are reported through logging.
534
535            Returns
536            -------
537            cases : list
538                List of discovered cases.
539        """
540        cases = []
541
542        for directory in glob(self._main_directory):
543            for root, dirs, files in os.walk(directory):
544                for file in files:
545                    if os.path.basename(file) == METADATA_FILE:
546                        path = os.path.join(root, file)
547                        with open(path, 'r') as f:
548                            data = json.load(f)
549                            if self._validate_case(data, path):
550                                cases.append({
551                                    'directory': os.path.dirname(path),
552                                    'data': data
553                                })
554
555        return cases

Executor class executes a case.

Executor( main_directory: str, verbose: bool = False, progress_cb=<function _progress_cb>)
43    def __init__(self, main_directory: str, verbose: bool = False,
44                 progress_cb=_progress_cb):
45        """Create an instance of the Executor class.
46
47        Parameters
48        ----------
49        main_directory : str
50            The root directory of all the cases to execute.
51        verbose : bool
52            Enables verbose logs.
 53        progress_cb : function
 54            Callback to call when a step of the case is completed. By default,
 55            a dummy callback is provided if the argument is missing.
56        """
57        self._main_directory = os.path.abspath(main_directory)
58        self._schema = {}
59        self._resources: List[Dict[str, Any]] = []
60        self._class_module_mapping: Dict[str, Any] = {}
61        self._verbose = verbose
62        self._progress_cb = progress_cb
63        self._logger = Logger(__name__, self._main_directory, self._verbose)
64
65        self._init_resources()
66
67        with open(os.path.join(os.path.dirname(__file__), 'data',
68                               SCHEMA_FILE)) as f:
69            self._schema = json.load(f)

Create an instance of the Executor class.

Parameters
  • main_directory (str): The root directory of all the cases to execute.
  • verbose (bool): Enables verbose logs.
  • progress_cb (function): Callback to call when a step of the case is completed. By default, a dummy callback is provided if the argument is missing.
main_directory: str

The main directory of all the cases.

Returns
  • main_directory (str): The path to the main directory of the cases.
def stats(self, case: dict) -> bool:
276    def stats(self, case: dict) -> bool:
277        """Generate statistics for a case.
278
279        Generate statistics for an executed case. The case must be executed
280        before to succeed.
281
282        Parameters
283        ----------
284        case : dict
285            The case to generate statistics for.
286
287        Returns
288        -------
289        success : bool
290            Whether the statistics are generated with success or not.
291
292        """
293        data = case['data']
294        directory = case['directory']
295        results_path = os.path.join(directory, 'results')
296
297        if not os.path.exists(results_path):
298            msg = f'Results do not exist for case "{data["name"]}"'
299            self._logger.error(msg)
300            return False
301
302        stats = Stats(results_path, len(data['steps']), directory,
303                      self._verbose)
304        stats.aggregate()
305
306        return True

Generate statistics for a case.

Generate statistics for an executed case. The case must be executed before to succeed.

Parameters
  • case (dict): The case to generate statistics for.
Returns
  • success (bool): Whether the statistics are generated with success or not.
def clean(self, case: dict) -> bool:
308    def clean(self, case: dict) -> bool:
309        """Clean a case.
310
311        Clean up all results and metrics for a case to start it fresh.
312
313        Parameters
314        ----------
315        case : dict
316            The case to clean.
317
318        Returns
319        -------
320        success : bool
321            Whether the cleaning of the case succeeded or not.
322        """
323        # Checkpoints
324        checkpoint_file = os.path.join(case['directory'], CHECKPOINT_FILE_NAME)
325        if os.path.exists(checkpoint_file):
326            os.remove(checkpoint_file)
327
328        # Results: log files, metric measurements, run checkpoints
329        for result_dir in glob(f'{case["directory"]}/results'):
330            shutil.rmtree(result_dir)
331
332        # Data: persistent storage
333        for data_dir in glob(f'{case["directory"]}/data/*'):
334            if not data_dir.endswith('shared'):
335                shutil.rmtree(data_dir)
336
337        return True

Clean a case.

Clean up all results and metrics for a case to start it fresh.

Parameters
  • case (dict): The case to clean.
Returns
  • success (bool): Whether the cleaning of the case succeeded or not.
def run(self, case: dict, interval: float, run: int, checkpoint: bool) -> bool:
339    def run(self, case: dict, interval: float,
340            run: int, checkpoint: bool) -> bool:
341        """Execute a case.
342
343        Execute all steps of a case while collecting metrics and logs.
344        The metrics are collected at a given interval and for a specific run of
345        the case to allow multiple executions of the same case. Checkpoints of
346        runs can be enabled to allow the executor to restart where it stopped
347        in case of a failure, electricity blackout, etc.
348
349        Parameters
350        ----------
351        case : dict
352            The case to execute.
353        interval : float
354            The sample interval for the metrics collection.
355        run : int
356            The run number of the case.
357        checkpoint : bool
358            Enable checkpoints after each run to allow restarts.
359
360        Returns
361        -------
362        success : bool
363            Whether the case was executed successfully or not.
364        """
365        success = True
366        data = case['data']
367        directory = case['directory']
368        data_path = os.path.join(directory, 'data')
369        results_run_path = os.path.join(directory, 'results', f'run_{run}')
370        checkpoint_file = os.path.join(directory, CHECKPOINT_FILE_NAME)
371        run_checkpoint_file = os.path.join(results_run_path,
372                                           CHECKPOINT_FILE_NAME)
373        active_resources = []
374
375        # Make sure we start with a clean setup before the first run
376        if run == 1:
377            self.clean(case)
378
379        # create directories
380        os.umask(0)
381        os.makedirs(data_path, exist_ok=True)
382        os.makedirs(results_run_path, exist_ok=True)
383
384        # Initialize resources if needed
385        # Some resources have to perform an initialization step such as
386        # configuring database users, storage, etc. which is only done once
387        for step in data['steps']:
388            module = self._class_module_mapping[step['resource']]
389            resource = getattr(module, step['resource'])(data_path, CONFIG_DIR,
390                                                         directory,
391                                                         self._verbose)
392            if hasattr(resource, 'initialization'):
393                if not resource.initialization():
394                    self._logger.error('Failed to initialize resource '
395                                       f'{step["resource"]}')
396                    return False
397
398                self._logger.debug(f'Resource {step["resource"]} initialized')
399                self._progress_cb('Initializing', step['resource'], success)
400
401        # Launch metrics collection
402        collector = Collector(results_run_path, interval, len(data['steps']),
403                              run, directory, self._verbose)
404
405        # Execute steps
406        for index, step in enumerate(data['steps']):
407            success = True
408            module = self._class_module_mapping[step['resource']]
409            resource = getattr(module, step['resource'])(data_path, CONFIG_DIR,
410                                                         directory,
411                                                         self._verbose)
412            active_resources.append(resource)
413
414            # Containers may need to start up first before executing a command
415            if hasattr(resource, 'wait_until_ready'):
416                if not resource.wait_until_ready():
417                    success = False
418                    self._logger.error('Waiting until resource '
419                                       f'"{step["resource"]} is ready failed')
420                    self._progress_cb(step['resource'], step['name'], success)
421                    break
422                self._logger.debug(f'Resource {step["resource"]} ready')
423
424            # Execute command
425            command = getattr(resource, step['command'])
426            if not command(**step['parameters']):
427                success = False
428                msg = f'Executing command "{step["command"]}" ' + \
429                      f'failed for resource "{step["resource"]}"'
430                # Some steps are non-critical like queries, they may fail but
431                # should not cause a complete case failure. Allow these
432                # failures if the may_fail key is present
433                if step.get('may_fail', False):
434                    self._logger.warning(msg)
435                    self._progress_cb(step['resource'], step['name'], success)
436                    continue
437                else:
438                    self._logger.error(msg)
439                    self._progress_cb(step['resource'], step['name'], success)
440                    break
441            self._logger.debug(f'Command "{step["command"]}" executed on '
442                               f'resource {step["resource"]}')
443
444            # Step complete
445            self._progress_cb(step['resource'], step['name'], success)
446
447            # Step finished, let metric collector know
448            if (index + 1) < len(data['steps']):
449                collector.next_step()
450
451        # Stop metrics collection
452        collector.stop()
453
454        # Stop active containers
455        for resource in active_resources:
456            if resource is not None and hasattr(resource, 'stop'):
457                resource.stop()
458
459        self._logger.debug('Cleaned up all resource')
460        self._progress_cb('Cleaner', 'Clean up resources', True)
461
462        # Mark checkpoint if necessary
463        if checkpoint and success:
464            self._logger.debug('Writing checkpoint...')
465            with open(checkpoint_file, 'w') as f:
466                d = datetime.now().replace(microsecond=0).isoformat()
467                f.write(f'{d}\n')
468
469        # Log file
470        os.makedirs(os.path.join(results_run_path), exist_ok=True)
471        shutil.move(os.path.join(directory, LOG_FILE_NAME),
472                    os.path.join(results_run_path, LOG_FILE_NAME))
473        self._logger.debug('Copied logs to run results path')
474
475        # Metrics measurements
476        for metrics_file in glob(f'{data_path}/*/{METRICS_FILE_NAME}'):
477            subdir = metrics_file.replace(f'{data_path}/', '') \
478                    .replace('/METRICS_FILE_NAME', '')
479            os.makedirs(os.path.join(results_run_path, subdir), exist_ok=True)
480            shutil.move(metrics_file, os.path.join(results_run_path, subdir,
481                                                   METRICS_FILE_NAME))
482        self._logger.debug('Copied metric measurements to run results path')
483
484        # Results: all 'output_file' and 'result_file' values
485        if success:
486            self._logger.debug('Copying generated files for run')
487            for step in data['steps']:
488                subdir = step['resource'].lower().replace('_', '')
489                parameters = step['parameters']
490                os.makedirs(os.path.join(results_run_path, subdir),
491                            exist_ok=True)
492                if parameters.get('results_file', False):
493                    results_file = parameters['results_file']
494                    p1 = os.path.join(directory, 'data/shared', results_file)
495                    p2 = os.path.join(results_run_path, subdir, results_file)
496                    try:
497                        shutil.move(p1, p2)
498                    except FileNotFoundError as e:
499                        msg = f'Cannot find results file "{p1}": {e}'
500                        self._logger.warning(msg)
501
502                if parameters.get('output_file', False) \
503                        and not parameters.get('multiple_files', False):
504                    output_file = step['parameters']['output_file']
505                    p1 = os.path.join(directory, 'data/shared', output_file)
506                    p2 = os.path.join(results_run_path, subdir, output_file)
507                    try:
508                        shutil.move(p1, p2)
509                    except FileNotFoundError as e:
510                        msg = f'Cannot find output file "{p1}": {e}'
511                        self._logger.warning(msg)
512
513            # Run complete, mark it
514            run_checkpoint_file = os.path.join(results_run_path,
515                                               CHECKPOINT_FILE_NAME)
516            self._logger.debug('Writing run checkpoint...')
517            with open(run_checkpoint_file, 'w') as f:
518                d = datetime.now().replace(microsecond=0).isoformat()
519                f.write(f'{d}\n')
520
521        self._logger.debug(f'Cooling down for {WAIT_TIME}s')
522        self._progress_cb('Cooldown', f'Hardware cooldown period {WAIT_TIME}s',
523                          True)
524        sleep(WAIT_TIME)
525
526        return success

Execute a case.

Execute all steps of a case while collecting metrics and logs. The metrics are collected at a given interval and for a specific run of the case to allow multiple executions of the same case. Checkpoints of runs can be enabled to allow the executor to restart where it stopped in case of a failure, electricity blackout, etc.

Parameters
  • case (dict): The case to execute.
  • interval (float): The sample interval for the metrics collection.
  • run (int): The run number of the case.
  • checkpoint (bool): Enable checkpoints after each run to allow restarts.
Returns
  • success (bool): Whether the case was executed successfully or not.
def list(self) -> list:
528    def list(self) -> list:
529        """List all cases in a root directory.
530
531            Retrieve a list of all discovered valid cases in a given directory.
532            Cases which do not pass the validation, are excluded and their
533            validation errors are reported through logging.
534
535            Returns
536            -------
537            cases : list
538                List of discovered cases.
539        """
540        cases = []
541
542        for directory in glob(self._main_directory):
543            for root, dirs, files in os.walk(directory):
544                for file in files:
545                    if os.path.basename(file) == METADATA_FILE:
546                        path = os.path.join(root, file)
547                        with open(path, 'r') as f:
548                            data = json.load(f)
549                            if self._validate_case(data, path):
550                                cases.append({
551                                    'directory': os.path.dirname(path),
552                                    'data': data
553                                })
554
555        return cases

List all cases in a root directory.

Retrieve a list of all discovered valid cases in a given directory. Cases which do not pass the validation, are excluded and their validation errors are reported through logging.

Returns
  • cases (list): List of discovered cases.