bench_executor.stats

This module holds the Stats class which is responsible for generating staticstics from executed cases. It will automatically aggregate all runs of an executed case to generate an aggregated.csv and summary.csv files which can be used to compare various cases with each other.

  • aggregated.csv: For each run of a case, the median execution time of each step is calculated. For each step, the results of the run with the median execution time is used to assemble the aggregated results.
  • summary.csv: The summary is similar to the previous file, but provides a single result for each step to immediately see how long the step took, how many samples are provided for the step, etc.
  1#!/usr/bin/env python3
  2"""
  3This module holds the Stats class which is responsible for generating
  4staticstics from executed cases. It will automatically aggregate all runs of an
  5executed case to generate an `aggregated.csv` and `summary.csv` files which can
  6be used to compare various cases with each other.
  7
  8- `aggregated.csv`: For each run of a case, the median execution time of each
  9  step is calculated. For each step, the results of the run with the median
 10  execution time is used to assemble the aggregated results.
 11- `summary.csv`: The summary is similar to the previous file, but provides a
 12  single result for each step to immediately see how long the step took, how
 13  many samples are provided for the step, etc.
 14"""
 15
 16import os
 17from glob import glob
 18from statistics import median
 19from csv import DictWriter, DictReader
 20from typing import List
 21from bench_executor.collector import FIELDNAMES, METRICS_FILE_NAME
 22from bench_executor.logger import Logger
 23
 24METRICS_AGGREGATED_FILE_NAME = 'aggregated.csv'
 25METRICS_SUMMARY_FILE_NAME = 'summary.csv'
 26FIELDNAMES_FLOAT = ['timestamp', 'cpu_user', 'cpu_system', 'cpu_idle',
 27                    'cpu_iowait', 'cpu_user_system']
 28FIELDNAMES_INT = ['index', 'step', 'version', 'memory_ram', 'memory_swap',
 29                  'memory_ram_swap', 'disk_read_count', 'disk_write_count',
 30                  'disk_read_bytes', 'disk_write_bytes', 'disk_read_time',
 31                  'disk_write_time', 'disk_busy_time',
 32                  'network_received_count', 'network_sent_count',
 33                  'network_received_bytes', 'network_sent_bytes',
 34                  'network_received_error', 'network_sent_error',
 35                  'network_received_drop', 'network_sent_drop']
 36FIELDNAMES_SUMMARY = [
 37    'number_of_samples',
 38    'step',
 39    'duration',
 40    'version',
 41    'cpu_user_diff',
 42    'cpu_system_diff',
 43    'cpu_user_system_diff',
 44    'cpu_idle_diff',
 45    'cpu_iowait_diff',
 46    'memory_ram_max',
 47    'memory_swap_max',
 48    'memory_ram_swap_max',
 49    'memory_ram_min',
 50    'memory_swap_min',
 51    'memory_ram_swap_min',
 52    'disk_read_count_diff',
 53    'disk_write_count_diff',
 54    'disk_read_bytes_diff',
 55    'disk_write_bytes_diff',
 56    'disk_read_time_diff',
 57    'disk_write_time_diff',
 58    'disk_busy_time_diff',
 59    'network_received_count_diff',
 60    'network_sent_count_diff',
 61    'network_received_bytes_diff',
 62    'network_sent_bytes_diff',
 63    'network_received_error_diff',
 64    'network_sent_error_diff',
 65    'network_received_drop_diff',
 66    'network_sent_drop_diff'
 67]
 68ROUND = 4
 69
 70#
 71# Generate stats from the result runs by aggregating it on
 72# median execution time for each step. Processing is done for each step per
 73# run and unnecessary values are skipped to reduce the memory consumption.
 74#
 75# The median run is available in 'aggregated.csv' while a summarized version
 76# which only reports the diff or max value of each step in 'summary.csv'
 77#
 78
 79
 80class Stats():
 81    """Generate statistics for an executed case."""
 82
 83    def __init__(self, results_path: str, number_of_steps: int,
 84                 directory: str, verbose: bool):
 85        """Create an instance of the Stats class.
 86
 87        Parameters
 88        ----------
 89        results_path : str
 90            The path to the results directory of the case
 91        number_of_steps : int
 92            The number of steps of the case
 93        directory : str
 94            The path to the directory where the logs must be stored.
 95        verbose : bool
 96            Enable verbose logs.
 97        """
 98        self._results_path = os.path.abspath(results_path)
 99        self._number_of_steps = number_of_steps
100        self._logger = Logger(__name__, directory, verbose)
101
102        if not os.path.exists(results_path):
103            msg = f'Results do not exist: {results_path}'
104            self._logger.error(msg)
105            raise ValueError(msg)
106
107    def _parse_field(self, field, value):
108        """Parse the field of the metrics field in a Python data type."""
109        try:
110            if field in FIELDNAMES_FLOAT:
111                return float(value)
112            elif field in FIELDNAMES_INT:
113                return int(value)
114            else:
115                msg = f'Field "{field}" type is unknown'
116                self._logger.error(msg)
117                raise ValueError(msg)
118        except TypeError:
119            return -1
120
121    def _parse_v2(self, run_path, fields=FIELDNAMES, step=None):
122        """Parse the CSV metrics file in v2 format."""
123        data = []
124
125        metrics_file = os.path.join(run_path, METRICS_FILE_NAME)
126        if not os.path.exists(metrics_file):
127            self._logger.error(f'Metrics file "{metrics_file}" does not exist')
128            return []
129
130        # Filter the fields we want from above, this way we don't load all
131        # the data in memory during processing.
132        with open(metrics_file, 'r') as f:
133            reader = DictReader(f)
134            for line in reader:
135                corrupt: bool = False
136
137                # Skip steps we don't want to parse
138                if step is not None and \
139                   step != self._parse_field('step', line['step']):
140                    continue
141
142                # Filter on field names
143                filtered = {key: line[key] for key in fields}
144                entry = {}
145                for key, value in filtered.items():
146                    v = self._parse_field(key, value)
147                    if v == -1:
148                        corrupt = True
149                        msg = f'Corrupt entry {key} with value {value} in ' + \
150                              f'{metrics_file}, skipped'
151                        self._logger.info(msg)
152                        break
153
154                    entry[key] = v
155
156                if not corrupt:
157                    data.append(entry)
158
159        return data
160
161    def aggregate(self) -> bool:
162        """Aggregate the metrics of the different runs of a case.
163
164        Find the median execution time of each step across all runs and extract
165        the step from the run which has this median execution time to assemble
166        an aggregated version and summary version of the case's metrics.
167
168        Returns
169        -------
170        success : bool
171            Whether the aggregation was successfully or not.
172        """
173        # Find each median step of all runs before extracting more data for
174        # memory consumption reasons
175        runs = []
176        for run_path in glob(f'{self._results_path}/run_*/'):
177            # Extract run number
178            try:
179                run_folder: str = os.path.split(os.path.dirname(run_path))[-1]
180                run_id: int = int(run_folder.replace('run_', ''))
181            except ValueError:
182                self._logger.error(f'Run "{run_id}" is not a number')
183                return False
184
185            # Extract steps and timestamps of this run
186            data = self._parse_v2(run_path, fields=['step', 'timestamp'])
187
188            # Calculate timestamp diff for each step
189            step = 1
190            timestamps = []
191            step_end = 0.0
192            step_begin = 0.0
193            for entry in data:
194                entry_step = entry['step']
195                assert (entry_step >= step), 'Entry step decreased over time'
196
197                # Next step
198                if entry_step > step:
199                    # Calculate diff of current step if at least 2 entries
200                    # for the step exist, if not the diff is 0.0 and we fall
201                    # back to the step_begin timestamp which will make sure we
202                    # use the run with the timestamp that is the median of all
203                    # runs. For example: [4.5, 5.0, 6.5] will return run 2 as
204                    # 5.0 is the median.
205                    diff = step_end - step_begin
206                    if diff == 0.0:
207                        self._logger.warning(f'Only 1 entry for step {step} '
208                                             f'found, falling back to median '
209                                             f'timestamp instead of diff')
210                        diff = step_begin
211
212                    timestamps.append(diff)
213
214                    # Reset for next step
215                    step = entry_step
216                    step_begin = entry['timestamp']
217                    step_end = entry['timestamp']
218                # step_end keeps increasing until the step changes
219                else:
220                    step_end = entry['timestamp']
221            # Final step does not cause an increment, add manually
222            timestamps.append(step_end - step_begin)
223            runs.append((run_id, timestamps))
224
225        # Statistics rely on uneven number of runs
226        assert (len(runs) % 2 != 0), 'Number of runs should never be even'
227
228        # Runs are unsorted as glob does not have a fixed order, sort them
229        # based on run number in tuple
230        runs.sort(key=lambda element: element[0])
231
232        # Find median for each step across runs
233        timestamps_by_step: List[List[float]] = []
234        for step_index in range(self._number_of_steps):
235            timestamps_by_step.append([])
236
237        for run in runs:
238            run_id = run[0]
239            timestamps = run[1]
240
241            # Do not process incomplete runs
242            msg = f'Number of steps ({self._number_of_steps}) does not ' + \
243                  'match with extracted steps of ' + \
244                  f'run ({len(timestamps)}). Skipping run {run_id}'
245            assert (len(timestamps) == self._number_of_steps), msg
246
247            # Create list of timestamps for each step from all runs
248            for step_index in range(self._number_of_steps):
249                timestamps_by_step[step_index].append(timestamps[step_index])
250
251        # Create a list of our steps with the run_id which has the median value
252        # for that step
253        aggregated_entries = []
254        summary_entries = []
255        index_number = 1
256        for step_index, step_timestamps in enumerate(timestamps_by_step):
257            # We ensure that the number of runs is always uneven so the median
258            # is always a measured data point instead of the average of 2 data
259            # points with even number of runs
260            median_run_id = timestamps_by_step[step_index] \
261                            .index(median(step_timestamps)) + 1
262            median_run_path = os.path.join(self._results_path,
263                                           f'run_{median_run_id}')
264            median_step_data = self._parse_v2(median_run_path,
265                                              step=step_index + 1)
266
267            # Rewrite indexes to match new number of samples
268            for entry in median_step_data:
269                entry['index'] = index_number
270
271                aggregated_entries.append(entry)
272                index_number += 1
273
274        # Summary data of a step: diff per step
275        for step_index, step_timestamps in enumerate(timestamps_by_step):
276            summary = {}
277            median_run_id = timestamps_by_step[step_index] \
278                .index(median(step_timestamps)) + 1
279            median_run_path = os.path.join(self._results_path,
280                                           f'run_{median_run_id}')
281            median_step_data = self._parse_v2(median_run_path,
282                                              step=step_index + 1)
283            for field in FIELDNAMES:
284                # Report max memory peak for this step
285                if 'memory' in field:
286                    values = []
287                    for data in median_step_data:
288                        values.append(data[field])
289                    summary[f'{field}_min'] = min(values)
290                    summary[f'{field}_max'] = max(values)
291                # Leave some fields like they are
292                elif field in ['version', 'step']:
293                    summary[field] = median_step_data[0][field]
294                # All other fields are accumulated data values for which we
295                # report the diff for the step
296                else:
297                    first = median_step_data[0][field]
298                    last = median_step_data[-1][field]
299                    diff = round(last - first, ROUND)
300                    if field == 'index':
301                        # diff will be 0 for 1 sample, but we have this sample,
302                        # so include it
303                        summary['number_of_samples'] = diff + 1
304                    elif field == 'timestamp':
305                        summary['duration'] = diff
306                    else:
307                        summary[f'{field}_diff'] = diff
308            summary_entries.append(summary)
309
310        aggregated_file = os.path.join(self._results_path,
311                                       METRICS_AGGREGATED_FILE_NAME)
312        summary_file = os.path.join(self._results_path,
313                                    METRICS_SUMMARY_FILE_NAME)
314
315        # Store aggregated data
316        with open(aggregated_file, 'w') as f:
317            writer = DictWriter(f, fieldnames=FIELDNAMES)
318            writer.writeheader()
319            for entry in aggregated_entries:
320                writer.writerow(entry)
321
322        # Store summary data
323        with open(summary_file, 'w') as f:
324            writer = DictWriter(f, fieldnames=FIELDNAMES_SUMMARY)
325            writer.writeheader()
326            for entry in summary_entries:
327                writer.writerow(entry)
328
329        return True
class Stats:
 81class Stats():
 82    """Generate statistics for an executed case."""
 83
 84    def __init__(self, results_path: str, number_of_steps: int,
 85                 directory: str, verbose: bool):
 86        """Create an instance of the Stats class.
 87
 88        Parameters
 89        ----------
 90        results_path : str
 91            The path to the results directory of the case
 92        number_of_steps : int
 93            The number of steps of the case
 94        directory : str
 95            The path to the directory where the logs must be stored.
 96        verbose : bool
 97            Enable verbose logs.
 98        """
 99        self._results_path = os.path.abspath(results_path)
100        self._number_of_steps = number_of_steps
101        self._logger = Logger(__name__, directory, verbose)
102
103        if not os.path.exists(results_path):
104            msg = f'Results do not exist: {results_path}'
105            self._logger.error(msg)
106            raise ValueError(msg)
107
108    def _parse_field(self, field, value):
109        """Parse the field of the metrics field in a Python data type."""
110        try:
111            if field in FIELDNAMES_FLOAT:
112                return float(value)
113            elif field in FIELDNAMES_INT:
114                return int(value)
115            else:
116                msg = f'Field "{field}" type is unknown'
117                self._logger.error(msg)
118                raise ValueError(msg)
119        except TypeError:
120            return -1
121
122    def _parse_v2(self, run_path, fields=FIELDNAMES, step=None):
123        """Parse the CSV metrics file in v2 format."""
124        data = []
125
126        metrics_file = os.path.join(run_path, METRICS_FILE_NAME)
127        if not os.path.exists(metrics_file):
128            self._logger.error(f'Metrics file "{metrics_file}" does not exist')
129            return []
130
131        # Filter the fields we want from above, this way we don't load all
132        # the data in memory during processing.
133        with open(metrics_file, 'r') as f:
134            reader = DictReader(f)
135            for line in reader:
136                corrupt: bool = False
137
138                # Skip steps we don't want to parse
139                if step is not None and \
140                   step != self._parse_field('step', line['step']):
141                    continue
142
143                # Filter on field names
144                filtered = {key: line[key] for key in fields}
145                entry = {}
146                for key, value in filtered.items():
147                    v = self._parse_field(key, value)
148                    if v == -1:
149                        corrupt = True
150                        msg = f'Corrupt entry {key} with value {value} in ' + \
151                              f'{metrics_file}, skipped'
152                        self._logger.info(msg)
153                        break
154
155                    entry[key] = v
156
157                if not corrupt:
158                    data.append(entry)
159
160        return data
161
162    def aggregate(self) -> bool:
163        """Aggregate the metrics of the different runs of a case.
164
165        Find the median execution time of each step across all runs and extract
166        the step from the run which has this median execution time to assemble
167        an aggregated version and summary version of the case's metrics.
168
169        Returns
170        -------
171        success : bool
172            Whether the aggregation was successfully or not.
173        """
174        # Find each median step of all runs before extracting more data for
175        # memory consumption reasons
176        runs = []
177        for run_path in glob(f'{self._results_path}/run_*/'):
178            # Extract run number
179            try:
180                run_folder: str = os.path.split(os.path.dirname(run_path))[-1]
181                run_id: int = int(run_folder.replace('run_', ''))
182            except ValueError:
183                self._logger.error(f'Run "{run_id}" is not a number')
184                return False
185
186            # Extract steps and timestamps of this run
187            data = self._parse_v2(run_path, fields=['step', 'timestamp'])
188
189            # Calculate timestamp diff for each step
190            step = 1
191            timestamps = []
192            step_end = 0.0
193            step_begin = 0.0
194            for entry in data:
195                entry_step = entry['step']
196                assert (entry_step >= step), 'Entry step decreased over time'
197
198                # Next step
199                if entry_step > step:
200                    # Calculate diff of current step if at least 2 entries
201                    # for the step exist, if not the diff is 0.0 and we fall
202                    # back to the step_begin timestamp which will make sure we
203                    # use the run with the timestamp that is the median of all
204                    # runs. For example: [4.5, 5.0, 6.5] will return run 2 as
205                    # 5.0 is the median.
206                    diff = step_end - step_begin
207                    if diff == 0.0:
208                        self._logger.warning(f'Only 1 entry for step {step} '
209                                             f'found, falling back to median '
210                                             f'timestamp instead of diff')
211                        diff = step_begin
212
213                    timestamps.append(diff)
214
215                    # Reset for next step
216                    step = entry_step
217                    step_begin = entry['timestamp']
218                    step_end = entry['timestamp']
219                # step_end keeps increasing until the step changes
220                else:
221                    step_end = entry['timestamp']
222            # Final step does not cause an increment, add manually
223            timestamps.append(step_end - step_begin)
224            runs.append((run_id, timestamps))
225
226        # Statistics rely on uneven number of runs
227        assert (len(runs) % 2 != 0), 'Number of runs should never be even'
228
229        # Runs are unsorted as glob does not have a fixed order, sort them
230        # based on run number in tuple
231        runs.sort(key=lambda element: element[0])
232
233        # Find median for each step across runs
234        timestamps_by_step: List[List[float]] = []
235        for step_index in range(self._number_of_steps):
236            timestamps_by_step.append([])
237
238        for run in runs:
239            run_id = run[0]
240            timestamps = run[1]
241
242            # Do not process incomplete runs
243            msg = f'Number of steps ({self._number_of_steps}) does not ' + \
244                  'match with extracted steps of ' + \
245                  f'run ({len(timestamps)}). Skipping run {run_id}'
246            assert (len(timestamps) == self._number_of_steps), msg
247
248            # Create list of timestamps for each step from all runs
249            for step_index in range(self._number_of_steps):
250                timestamps_by_step[step_index].append(timestamps[step_index])
251
252        # Create a list of our steps with the run_id which has the median value
253        # for that step
254        aggregated_entries = []
255        summary_entries = []
256        index_number = 1
257        for step_index, step_timestamps in enumerate(timestamps_by_step):
258            # We ensure that the number of runs is always uneven so the median
259            # is always a measured data point instead of the average of 2 data
260            # points with even number of runs
261            median_run_id = timestamps_by_step[step_index] \
262                            .index(median(step_timestamps)) + 1
263            median_run_path = os.path.join(self._results_path,
264                                           f'run_{median_run_id}')
265            median_step_data = self._parse_v2(median_run_path,
266                                              step=step_index + 1)
267
268            # Rewrite indexes to match new number of samples
269            for entry in median_step_data:
270                entry['index'] = index_number
271
272                aggregated_entries.append(entry)
273                index_number += 1
274
275        # Summary data of a step: diff per step
276        for step_index, step_timestamps in enumerate(timestamps_by_step):
277            summary = {}
278            median_run_id = timestamps_by_step[step_index] \
279                .index(median(step_timestamps)) + 1
280            median_run_path = os.path.join(self._results_path,
281                                           f'run_{median_run_id}')
282            median_step_data = self._parse_v2(median_run_path,
283                                              step=step_index + 1)
284            for field in FIELDNAMES:
285                # Report max memory peak for this step
286                if 'memory' in field:
287                    values = []
288                    for data in median_step_data:
289                        values.append(data[field])
290                    summary[f'{field}_min'] = min(values)
291                    summary[f'{field}_max'] = max(values)
292                # Leave some fields like they are
293                elif field in ['version', 'step']:
294                    summary[field] = median_step_data[0][field]
295                # All other fields are accumulated data values for which we
296                # report the diff for the step
297                else:
298                    first = median_step_data[0][field]
299                    last = median_step_data[-1][field]
300                    diff = round(last - first, ROUND)
301                    if field == 'index':
302                        # diff will be 0 for 1 sample, but we have this sample,
303                        # so include it
304                        summary['number_of_samples'] = diff + 1
305                    elif field == 'timestamp':
306                        summary['duration'] = diff
307                    else:
308                        summary[f'{field}_diff'] = diff
309            summary_entries.append(summary)
310
311        aggregated_file = os.path.join(self._results_path,
312                                       METRICS_AGGREGATED_FILE_NAME)
313        summary_file = os.path.join(self._results_path,
314                                    METRICS_SUMMARY_FILE_NAME)
315
316        # Store aggregated data
317        with open(aggregated_file, 'w') as f:
318            writer = DictWriter(f, fieldnames=FIELDNAMES)
319            writer.writeheader()
320            for entry in aggregated_entries:
321                writer.writerow(entry)
322
323        # Store summary data
324        with open(summary_file, 'w') as f:
325            writer = DictWriter(f, fieldnames=FIELDNAMES_SUMMARY)
326            writer.writeheader()
327            for entry in summary_entries:
328                writer.writerow(entry)
329
330        return True

Generate statistics for an executed case.

Stats( results_path: str, number_of_steps: int, directory: str, verbose: bool)
 84    def __init__(self, results_path: str, number_of_steps: int,
 85                 directory: str, verbose: bool):
 86        """Create an instance of the Stats class.
 87
 88        Parameters
 89        ----------
 90        results_path : str
 91            The path to the results directory of the case
 92        number_of_steps : int
 93            The number of steps of the case
 94        directory : str
 95            The path to the directory where the logs must be stored.
 96        verbose : bool
 97            Enable verbose logs.
 98        """
 99        self._results_path = os.path.abspath(results_path)
100        self._number_of_steps = number_of_steps
101        self._logger = Logger(__name__, directory, verbose)
102
103        if not os.path.exists(results_path):
104            msg = f'Results do not exist: {results_path}'
105            self._logger.error(msg)
106            raise ValueError(msg)

Create an instance of the Stats class.

Parameters
  • results_path (str): The path to the results directory of the case
  • number_of_steps (int): The number of steps of the case
  • directory (str): The path to the directory where the logs must be stored.
  • verbose (bool): Enable verbose logs.
def aggregate(self) -> bool:
162    def aggregate(self) -> bool:
163        """Aggregate the metrics of the different runs of a case.
164
165        Find the median execution time of each step across all runs and extract
166        the step from the run which has this median execution time to assemble
167        an aggregated version and summary version of the case's metrics.
168
169        Returns
170        -------
171        success : bool
172            Whether the aggregation was successfully or not.
173        """
174        # Find each median step of all runs before extracting more data for
175        # memory consumption reasons
176        runs = []
177        for run_path in glob(f'{self._results_path}/run_*/'):
178            # Extract run number
179            try:
180                run_folder: str = os.path.split(os.path.dirname(run_path))[-1]
181                run_id: int = int(run_folder.replace('run_', ''))
182            except ValueError:
183                self._logger.error(f'Run "{run_id}" is not a number')
184                return False
185
186            # Extract steps and timestamps of this run
187            data = self._parse_v2(run_path, fields=['step', 'timestamp'])
188
189            # Calculate timestamp diff for each step
190            step = 1
191            timestamps = []
192            step_end = 0.0
193            step_begin = 0.0
194            for entry in data:
195                entry_step = entry['step']
196                assert (entry_step >= step), 'Entry step decreased over time'
197
198                # Next step
199                if entry_step > step:
200                    # Calculate diff of current step if at least 2 entries
201                    # for the step exist, if not the diff is 0.0 and we fall
202                    # back to the step_begin timestamp which will make sure we
203                    # use the run with the timestamp that is the median of all
204                    # runs. For example: [4.5, 5.0, 6.5] will return run 2 as
205                    # 5.0 is the median.
206                    diff = step_end - step_begin
207                    if diff == 0.0:
208                        self._logger.warning(f'Only 1 entry for step {step} '
209                                             f'found, falling back to median '
210                                             f'timestamp instead of diff')
211                        diff = step_begin
212
213                    timestamps.append(diff)
214
215                    # Reset for next step
216                    step = entry_step
217                    step_begin = entry['timestamp']
218                    step_end = entry['timestamp']
219                # step_end keeps increasing until the step changes
220                else:
221                    step_end = entry['timestamp']
222            # Final step does not cause an increment, add manually
223            timestamps.append(step_end - step_begin)
224            runs.append((run_id, timestamps))
225
226        # Statistics rely on uneven number of runs
227        assert (len(runs) % 2 != 0), 'Number of runs should never be even'
228
229        # Runs are unsorted as glob does not have a fixed order, sort them
230        # based on run number in tuple
231        runs.sort(key=lambda element: element[0])
232
233        # Find median for each step across runs
234        timestamps_by_step: List[List[float]] = []
235        for step_index in range(self._number_of_steps):
236            timestamps_by_step.append([])
237
238        for run in runs:
239            run_id = run[0]
240            timestamps = run[1]
241
242            # Do not process incomplete runs
243            msg = f'Number of steps ({self._number_of_steps}) does not ' + \
244                  'match with extracted steps of ' + \
245                  f'run ({len(timestamps)}). Skipping run {run_id}'
246            assert (len(timestamps) == self._number_of_steps), msg
247
248            # Create list of timestamps for each step from all runs
249            for step_index in range(self._number_of_steps):
250                timestamps_by_step[step_index].append(timestamps[step_index])
251
252        # Create a list of our steps with the run_id which has the median value
253        # for that step
254        aggregated_entries = []
255        summary_entries = []
256        index_number = 1
257        for step_index, step_timestamps in enumerate(timestamps_by_step):
258            # We ensure that the number of runs is always uneven so the median
259            # is always a measured data point instead of the average of 2 data
260            # points with even number of runs
261            median_run_id = timestamps_by_step[step_index] \
262                            .index(median(step_timestamps)) + 1
263            median_run_path = os.path.join(self._results_path,
264                                           f'run_{median_run_id}')
265            median_step_data = self._parse_v2(median_run_path,
266                                              step=step_index + 1)
267
268            # Rewrite indexes to match new number of samples
269            for entry in median_step_data:
270                entry['index'] = index_number
271
272                aggregated_entries.append(entry)
273                index_number += 1
274
275        # Summary data of a step: diff per step
276        for step_index, step_timestamps in enumerate(timestamps_by_step):
277            summary = {}
278            median_run_id = timestamps_by_step[step_index] \
279                .index(median(step_timestamps)) + 1
280            median_run_path = os.path.join(self._results_path,
281                                           f'run_{median_run_id}')
282            median_step_data = self._parse_v2(median_run_path,
283                                              step=step_index + 1)
284            for field in FIELDNAMES:
285                # Report max memory peak for this step
286                if 'memory' in field:
287                    values = []
288                    for data in median_step_data:
289                        values.append(data[field])
290                    summary[f'{field}_min'] = min(values)
291                    summary[f'{field}_max'] = max(values)
292                # Leave some fields like they are
293                elif field in ['version', 'step']:
294                    summary[field] = median_step_data[0][field]
295                # All other fields are accumulated data values for which we
296                # report the diff for the step
297                else:
298                    first = median_step_data[0][field]
299                    last = median_step_data[-1][field]
300                    diff = round(last - first, ROUND)
301                    if field == 'index':
302                        # diff will be 0 for 1 sample, but we have this sample,
303                        # so include it
304                        summary['number_of_samples'] = diff + 1
305                    elif field == 'timestamp':
306                        summary['duration'] = diff
307                    else:
308                        summary[f'{field}_diff'] = diff
309            summary_entries.append(summary)
310
311        aggregated_file = os.path.join(self._results_path,
312                                       METRICS_AGGREGATED_FILE_NAME)
313        summary_file = os.path.join(self._results_path,
314                                    METRICS_SUMMARY_FILE_NAME)
315
316        # Store aggregated data
317        with open(aggregated_file, 'w') as f:
318            writer = DictWriter(f, fieldnames=FIELDNAMES)
319            writer.writeheader()
320            for entry in aggregated_entries:
321                writer.writerow(entry)
322
323        # Store summary data
324        with open(summary_file, 'w') as f:
325            writer = DictWriter(f, fieldnames=FIELDNAMES_SUMMARY)
326            writer.writeheader()
327            for entry in summary_entries:
328                writer.writerow(entry)
329
330        return True

Aggregate the metrics of the different runs of a case.

Find the median execution time of each step across all runs and extract the step from the run which has this median execution time to assemble an aggregated version and summary version of the case's metrics.

Returns
  • success (bool): Whether the aggregation was successfully or not.