bench_executor.stats
This module holds the Stats class which is responsible for generating
statistics from executed cases. It will automatically aggregate all runs of an
executed case to generate `aggregated.csv` and `summary.csv` files which can
be used to compare various cases with each other.
- `aggregated.csv`: For each run of a case, the median execution time of each step is calculated. For each step, the results of the run with the median execution time are used to assemble the aggregated results.
- `summary.csv`: The summary is similar to the previous file, but provides a single result for each step to immediately see how long the step took, how many samples are provided for the step, etc.
#!/usr/bin/env python3
"""
This module holds the Stats class which is responsible for generating
statistics from executed cases. It will automatically aggregate all runs of an
executed case to generate an `aggregated.csv` and `summary.csv` files which can
be used to compare various cases with each other.

- `aggregated.csv`: For each run of a case, the median execution time of each
  step is calculated. For each step, the results of the run with the median
  execution time are used to assemble the aggregated results.
- `summary.csv`: The summary is similar to the previous file, but provides a
  single result for each step to immediately see how long the step took, how
  many samples are provided for the step, etc.
"""

import os
from glob import glob
from statistics import median
from csv import DictWriter, DictReader
from typing import List
from bench_executor.collector import FIELDNAMES, METRICS_FILE_NAME
from bench_executor.logger import Logger

METRICS_AGGREGATED_FILE_NAME = 'aggregated.csv'
METRICS_SUMMARY_FILE_NAME = 'summary.csv'
# Metrics CSV fields parsed as floats.
FIELDNAMES_FLOAT = ['timestamp', 'cpu_user', 'cpu_system', 'cpu_idle',
                    'cpu_iowait', 'cpu_user_system']
# Metrics CSV fields parsed as integers.
FIELDNAMES_INT = ['index', 'step', 'version', 'memory_ram', 'memory_swap',
                  'memory_ram_swap', 'disk_read_count', 'disk_write_count',
                  'disk_read_bytes', 'disk_write_bytes', 'disk_read_time',
                  'disk_write_time', 'disk_busy_time',
                  'network_received_count', 'network_sent_count',
                  'network_received_bytes', 'network_sent_bytes',
                  'network_received_error', 'network_sent_error',
                  'network_received_drop', 'network_sent_drop']
FIELDNAMES_SUMMARY = [
    'number_of_samples',
    'step',
    'duration',
    'version',
    'cpu_user_diff',
    'cpu_system_diff',
    'cpu_user_system_diff',
    'cpu_idle_diff',
    'cpu_iowait_diff',
    'memory_ram_max',
    'memory_swap_max',
    'memory_ram_swap_max',
    'memory_ram_min',
    'memory_swap_min',
    'memory_ram_swap_min',
    'disk_read_count_diff',
    'disk_write_count_diff',
    'disk_read_bytes_diff',
    'disk_write_bytes_diff',
    'disk_read_time_diff',
    'disk_write_time_diff',
    'disk_busy_time_diff',
    'network_received_count_diff',
    'network_sent_count_diff',
    'network_received_bytes_diff',
    'network_sent_bytes_diff',
    'network_received_error_diff',
    'network_sent_error_diff',
    'network_received_drop_diff',
    'network_sent_drop_diff'
]
ROUND = 4  # number of decimals kept when rounding summary diffs

#
# Generate stats from the result runs by aggregating it on
# median execution time for each step. Processing is done for each step per
# run and unnecessary values are skipped to reduce the memory consumption.
#
# The median run is available in 'aggregated.csv' while a summarized version
# which only reports the diff or max value of each step in 'summary.csv'
#


class Stats():
    """Generate statistics for an executed case."""

    def __init__(self, results_path: str, number_of_steps: int,
                 directory: str, verbose: bool):
        """Create an instance of the Stats class.

        Parameters
        ----------
        results_path : str
            The path to the results directory of the case
        number_of_steps : int
            The number of steps of the case
        directory : str
            The path to the directory where the logs must be stored.
        verbose : bool
            Enable verbose logs.

        Raises
        ------
        ValueError
            When the results directory does not exist.
        """
        self._results_path = os.path.abspath(results_path)
        self._number_of_steps = number_of_steps
        self._logger = Logger(__name__, directory, verbose)

        if not os.path.exists(results_path):
            msg = f'Results do not exist: {results_path}'
            self._logger.error(msg)
            raise ValueError(msg)

    def _parse_field(self, field, value):
        """Parse a raw metrics CSV value into a Python data type.

        Returns -1 as a corruption sentinel when the value is missing or
        malformed; callers treat -1 as "skip this entry".

        Raises
        ------
        ValueError
            When the field name is not a known metrics field.
        """
        if field in FIELDNAMES_FLOAT:
            try:
                return float(value)
            except (TypeError, ValueError):
                # BUGFIX: the original caught only TypeError, so a malformed
                # string (e.g. 'abc') crashed with an unhandled ValueError
                # instead of being flagged as corrupt.
                return -1
        elif field in FIELDNAMES_INT:
            try:
                return int(value)
            except (TypeError, ValueError):
                return -1
        else:
            # Unknown field: keep the original behavior of raising.
            msg = f'Field "{field}" type is unknown'
            self._logger.error(msg)
            raise ValueError(msg)

    def _parse_v2(self, run_path, fields=FIELDNAMES, step=None):
        """Parse the CSV metrics file in v2 format.

        Parameters
        ----------
        run_path : str
            Path to the run directory containing the metrics file.
        fields : list
            The metric fields to extract, defaults to all fields.
            NOTE: the mutable default is safe here because it is never
            mutated by this method.
        step : int
            Only parse entries of this step, defaults to all steps.

        Returns
        -------
        data : list
            Parsed entries as dicts, empty when the metrics file is missing.
        """
        data = []

        metrics_file = os.path.join(run_path, METRICS_FILE_NAME)
        if not os.path.exists(metrics_file):
            self._logger.error(f'Metrics file "{metrics_file}" does not exist')
            return []

        # Filter the fields we want from above, this way we don't load all
        # the data in memory during processing.
        with open(metrics_file, 'r') as f:
            reader = DictReader(f)
            for line in reader:
                corrupt: bool = False

                # Skip steps we don't want to parse
                if step is not None and \
                        step != self._parse_field('step', line['step']):
                    continue

                # Filter on field names
                filtered = {key: line[key] for key in fields}
                entry = {}
                for key, value in filtered.items():
                    v = self._parse_field(key, value)
                    if v == -1:
                        # -1 is the corruption sentinel of _parse_field
                        corrupt = True
                        msg = f'Corrupt entry {key} with value {value} in ' + \
                              f'{metrics_file}, skipped'
                        self._logger.info(msg)
                        break

                    entry[key] = v

                if not corrupt:
                    data.append(entry)

        return data

    def aggregate(self) -> bool:
        """Aggregate the metrics of the different runs of a case.

        Find the median execution time of each step across all runs and
        extract the step from the run which has this median execution time to
        assemble an aggregated version and summary version of the case's
        metrics.

        Returns
        -------
        success : bool
            Whether the aggregation was successful or not.
        """
        # Find each median step of all runs before extracting more data for
        # memory consumption reasons
        runs = []
        for run_path in glob(f'{self._results_path}/run_*/'):
            # Extract run number
            run_folder: str = os.path.split(os.path.dirname(run_path))[-1]
            try:
                run_id: int = int(run_folder.replace('run_', ''))
            except ValueError:
                # BUGFIX: the original logged `run_id` here, which is unbound
                # when int() is what failed; log the folder name instead.
                self._logger.error(f'Run "{run_folder}" is not a number')
                return False

            # Extract steps and timestamps of this run
            data = self._parse_v2(run_path, fields=['step', 'timestamp'])

            # Calculate timestamp diff for each step
            step = 1
            timestamps = []
            step_end = 0.0
            step_begin = 0.0
            for entry in data:
                entry_step = entry['step']
                assert (entry_step >= step), 'Entry step decreased over time'

                # Next step
                if entry_step > step:
                    # Calculate diff of current step if at least 2 entries
                    # for the step exist, if not the diff is 0.0 and we fall
                    # back to the step_begin timestamp which will make sure we
                    # use the run with the timestamp that is the median of all
                    # runs. For example: [4.5, 5.0, 6.5] will return run 2 as
                    # 5.0 is the median.
                    diff = step_end - step_begin
                    if diff == 0.0:
                        self._logger.warning(f'Only 1 entry for step {step} '
                                             f'found, falling back to median '
                                             f'timestamp instead of diff')
                        diff = step_begin

                    timestamps.append(diff)

                    # Reset for next step
                    step = entry_step
                    step_begin = entry['timestamp']
                    step_end = entry['timestamp']
                # step_end keeps increasing until the step changes
                else:
                    step_end = entry['timestamp']
            # Final step does not cause an increment, add manually
            timestamps.append(step_end - step_begin)
            runs.append((run_id, timestamps))

        # Statistics rely on uneven number of runs so the median is always a
        # measured data point. NOTE: asserts are stripped under `python -O`.
        assert (len(runs) % 2 != 0), 'Number of runs should never be even'

        # Runs are unsorted as glob does not have a fixed order, sort them
        # based on run number in tuple
        runs.sort(key=lambda element: element[0])

        # Find median for each step across runs
        timestamps_by_step: List[List[float]] = []
        for step_index in range(self._number_of_steps):
            timestamps_by_step.append([])

        for run in runs:
            run_id = run[0]
            timestamps = run[1]

            # Do not process incomplete runs
            msg = f'Number of steps ({self._number_of_steps}) does not ' + \
                  'match with extracted steps of ' + \
                  f'run ({len(timestamps)}). Skipping run {run_id}'
            assert (len(timestamps) == self._number_of_steps), msg

            # Create list of timestamps for each step from all runs
            for step_index in range(self._number_of_steps):
                timestamps_by_step[step_index].append(timestamps[step_index])

        # Create a list of our steps with the run_id which has the median
        # value for that step
        aggregated_entries = []
        summary_entries = []
        index_number = 1
        for step_index, step_timestamps in enumerate(timestamps_by_step):
            # We ensure that the number of runs is always uneven so the median
            # is always a measured data point instead of the average of 2 data
            # points with even number of runs. Runs are sorted by run number,
            # so list position + 1 equals the run ID.
            median_run_id = timestamps_by_step[step_index] \
                .index(median(step_timestamps)) + 1
            median_run_path = os.path.join(self._results_path,
                                           f'run_{median_run_id}')
            median_step_data = self._parse_v2(median_run_path,
                                              step=step_index + 1)

            # Rewrite indexes to match new number of samples
            for entry in median_step_data:
                entry['index'] = index_number

                aggregated_entries.append(entry)
                index_number += 1

        # Summary data of a step: diff per step
        for step_index, step_timestamps in enumerate(timestamps_by_step):
            summary = {}
            median_run_id = timestamps_by_step[step_index] \
                .index(median(step_timestamps)) + 1
            median_run_path = os.path.join(self._results_path,
                                           f'run_{median_run_id}')
            median_step_data = self._parse_v2(median_run_path,
                                              step=step_index + 1)
            for field in FIELDNAMES:
                # Report min/max memory peak for this step
                if 'memory' in field:
                    values = []
                    for data in median_step_data:
                        values.append(data[field])
                    summary[f'{field}_min'] = min(values)
                    summary[f'{field}_max'] = max(values)
                # Leave some fields like they are
                elif field in ['version', 'step']:
                    summary[field] = median_step_data[0][field]
                # All other fields are accumulated data values for which we
                # report the diff for the step
                else:
                    first = median_step_data[0][field]
                    last = median_step_data[-1][field]
                    diff = round(last - first, ROUND)
                    if field == 'index':
                        # diff will be 0 for 1 sample, but we have this
                        # sample, so include it
                        summary['number_of_samples'] = diff + 1
                    elif field == 'timestamp':
                        summary['duration'] = diff
                    else:
                        summary[f'{field}_diff'] = diff
            summary_entries.append(summary)

        aggregated_file = os.path.join(self._results_path,
                                       METRICS_AGGREGATED_FILE_NAME)
        summary_file = os.path.join(self._results_path,
                                    METRICS_SUMMARY_FILE_NAME)

        # Store aggregated data
        with open(aggregated_file, 'w') as f:
            writer = DictWriter(f, fieldnames=FIELDNAMES)
            writer.writeheader()
            for entry in aggregated_entries:
                writer.writerow(entry)

        # Store summary data
        with open(summary_file, 'w') as f:
            writer = DictWriter(f, fieldnames=FIELDNAMES_SUMMARY)
            writer.writeheader()
            for entry in summary_entries:
                writer.writerow(entry)

        return True
class Stats:
class Stats():
    """Generate statistics for an executed case."""

    def __init__(self, results_path: str, number_of_steps: int,
                 directory: str, verbose: bool):
        """Create an instance of the Stats class.

        Parameters
        ----------
        results_path : str
            The path to the results directory of the case
        number_of_steps : int
            The number of steps of the case
        directory : str
            The path to the directory where the logs must be stored.
        verbose : bool
            Enable verbose logs.

        Raises
        ------
        ValueError
            When the results directory does not exist.
        """
        self._results_path = os.path.abspath(results_path)
        self._number_of_steps = number_of_steps
        self._logger = Logger(__name__, directory, verbose)

        if not os.path.exists(results_path):
            msg = f'Results do not exist: {results_path}'
            self._logger.error(msg)
            raise ValueError(msg)

    def _parse_field(self, field, value):
        """Parse a raw metrics CSV value into a Python data type.

        Returns -1 as a corruption sentinel when the value is missing or
        malformed; callers treat -1 as "skip this entry".

        Raises
        ------
        ValueError
            When the field name is not a known metrics field.
        """
        if field in FIELDNAMES_FLOAT:
            try:
                return float(value)
            except (TypeError, ValueError):
                # BUGFIX: the original caught only TypeError, so a malformed
                # string (e.g. 'abc') crashed with an unhandled ValueError
                # instead of being flagged as corrupt.
                return -1
        elif field in FIELDNAMES_INT:
            try:
                return int(value)
            except (TypeError, ValueError):
                return -1
        else:
            # Unknown field: keep the original behavior of raising.
            msg = f'Field "{field}" type is unknown'
            self._logger.error(msg)
            raise ValueError(msg)

    def _parse_v2(self, run_path, fields=FIELDNAMES, step=None):
        """Parse the CSV metrics file in v2 format.

        Parameters
        ----------
        run_path : str
            Path to the run directory containing the metrics file.
        fields : list
            The metric fields to extract, defaults to all fields.
            NOTE: the mutable default is safe here because it is never
            mutated by this method.
        step : int
            Only parse entries of this step, defaults to all steps.

        Returns
        -------
        data : list
            Parsed entries as dicts, empty when the metrics file is missing.
        """
        data = []

        metrics_file = os.path.join(run_path, METRICS_FILE_NAME)
        if not os.path.exists(metrics_file):
            self._logger.error(f'Metrics file "{metrics_file}" does not exist')
            return []

        # Filter the fields we want from above, this way we don't load all
        # the data in memory during processing.
        with open(metrics_file, 'r') as f:
            reader = DictReader(f)
            for line in reader:
                corrupt: bool = False

                # Skip steps we don't want to parse
                if step is not None and \
                        step != self._parse_field('step', line['step']):
                    continue

                # Filter on field names
                filtered = {key: line[key] for key in fields}
                entry = {}
                for key, value in filtered.items():
                    v = self._parse_field(key, value)
                    if v == -1:
                        # -1 is the corruption sentinel of _parse_field
                        corrupt = True
                        msg = f'Corrupt entry {key} with value {value} in ' + \
                              f'{metrics_file}, skipped'
                        self._logger.info(msg)
                        break

                    entry[key] = v

                if not corrupt:
                    data.append(entry)

        return data

    def aggregate(self) -> bool:
        """Aggregate the metrics of the different runs of a case.

        Find the median execution time of each step across all runs and
        extract the step from the run which has this median execution time to
        assemble an aggregated version and summary version of the case's
        metrics.

        Returns
        -------
        success : bool
            Whether the aggregation was successful or not.
        """
        # Find each median step of all runs before extracting more data for
        # memory consumption reasons
        runs = []
        for run_path in glob(f'{self._results_path}/run_*/'):
            # Extract run number
            run_folder: str = os.path.split(os.path.dirname(run_path))[-1]
            try:
                run_id: int = int(run_folder.replace('run_', ''))
            except ValueError:
                # BUGFIX: the original logged `run_id` here, which is unbound
                # when int() is what failed; log the folder name instead.
                self._logger.error(f'Run "{run_folder}" is not a number')
                return False

            # Extract steps and timestamps of this run
            data = self._parse_v2(run_path, fields=['step', 'timestamp'])

            # Calculate timestamp diff for each step
            step = 1
            timestamps = []
            step_end = 0.0
            step_begin = 0.0
            for entry in data:
                entry_step = entry['step']
                assert (entry_step >= step), 'Entry step decreased over time'

                # Next step
                if entry_step > step:
                    # Calculate diff of current step if at least 2 entries
                    # for the step exist, if not the diff is 0.0 and we fall
                    # back to the step_begin timestamp which will make sure we
                    # use the run with the timestamp that is the median of all
                    # runs. For example: [4.5, 5.0, 6.5] will return run 2 as
                    # 5.0 is the median.
                    diff = step_end - step_begin
                    if diff == 0.0:
                        self._logger.warning(f'Only 1 entry for step {step} '
                                             f'found, falling back to median '
                                             f'timestamp instead of diff')
                        diff = step_begin

                    timestamps.append(diff)

                    # Reset for next step
                    step = entry_step
                    step_begin = entry['timestamp']
                    step_end = entry['timestamp']
                # step_end keeps increasing until the step changes
                else:
                    step_end = entry['timestamp']
            # Final step does not cause an increment, add manually
            timestamps.append(step_end - step_begin)
            runs.append((run_id, timestamps))

        # Statistics rely on uneven number of runs so the median is always a
        # measured data point. NOTE: asserts are stripped under `python -O`.
        assert (len(runs) % 2 != 0), 'Number of runs should never be even'

        # Runs are unsorted as glob does not have a fixed order, sort them
        # based on run number in tuple
        runs.sort(key=lambda element: element[0])

        # Find median for each step across runs
        timestamps_by_step: List[List[float]] = []
        for step_index in range(self._number_of_steps):
            timestamps_by_step.append([])

        for run in runs:
            run_id = run[0]
            timestamps = run[1]

            # Do not process incomplete runs
            msg = f'Number of steps ({self._number_of_steps}) does not ' + \
                  'match with extracted steps of ' + \
                  f'run ({len(timestamps)}). Skipping run {run_id}'
            assert (len(timestamps) == self._number_of_steps), msg

            # Create list of timestamps for each step from all runs
            for step_index in range(self._number_of_steps):
                timestamps_by_step[step_index].append(timestamps[step_index])

        # Create a list of our steps with the run_id which has the median
        # value for that step
        aggregated_entries = []
        summary_entries = []
        index_number = 1
        for step_index, step_timestamps in enumerate(timestamps_by_step):
            # We ensure that the number of runs is always uneven so the median
            # is always a measured data point instead of the average of 2 data
            # points with even number of runs. Runs are sorted by run number,
            # so list position + 1 equals the run ID.
            median_run_id = timestamps_by_step[step_index] \
                .index(median(step_timestamps)) + 1
            median_run_path = os.path.join(self._results_path,
                                           f'run_{median_run_id}')
            median_step_data = self._parse_v2(median_run_path,
                                              step=step_index + 1)

            # Rewrite indexes to match new number of samples
            for entry in median_step_data:
                entry['index'] = index_number

                aggregated_entries.append(entry)
                index_number += 1

        # Summary data of a step: diff per step
        for step_index, step_timestamps in enumerate(timestamps_by_step):
            summary = {}
            median_run_id = timestamps_by_step[step_index] \
                .index(median(step_timestamps)) + 1
            median_run_path = os.path.join(self._results_path,
                                           f'run_{median_run_id}')
            median_step_data = self._parse_v2(median_run_path,
                                              step=step_index + 1)
            for field in FIELDNAMES:
                # Report min/max memory peak for this step
                if 'memory' in field:
                    values = []
                    for data in median_step_data:
                        values.append(data[field])
                    summary[f'{field}_min'] = min(values)
                    summary[f'{field}_max'] = max(values)
                # Leave some fields like they are
                elif field in ['version', 'step']:
                    summary[field] = median_step_data[0][field]
                # All other fields are accumulated data values for which we
                # report the diff for the step
                else:
                    first = median_step_data[0][field]
                    last = median_step_data[-1][field]
                    diff = round(last - first, ROUND)
                    if field == 'index':
                        # diff will be 0 for 1 sample, but we have this
                        # sample, so include it
                        summary['number_of_samples'] = diff + 1
                    elif field == 'timestamp':
                        summary['duration'] = diff
                    else:
                        summary[f'{field}_diff'] = diff
            summary_entries.append(summary)

        aggregated_file = os.path.join(self._results_path,
                                       METRICS_AGGREGATED_FILE_NAME)
        summary_file = os.path.join(self._results_path,
                                    METRICS_SUMMARY_FILE_NAME)

        # Store aggregated data
        with open(aggregated_file, 'w') as f:
            writer = DictWriter(f, fieldnames=FIELDNAMES)
            writer.writeheader()
            for entry in aggregated_entries:
                writer.writerow(entry)

        # Store summary data
        with open(summary_file, 'w') as f:
            writer = DictWriter(f, fieldnames=FIELDNAMES_SUMMARY)
            writer.writeheader()
            for entry in summary_entries:
                writer.writerow(entry)

        return True
Generate statistics for an executed case.
Stats(results_path: str, number_of_steps: int, directory: str, verbose: bool)
84 def __init__(self, results_path: str, number_of_steps: int, 85 directory: str, verbose: bool): 86 """Create an instance of the Stats class. 87 88 Parameters 89 ---------- 90 results_path : str 91 The path to the results directory of the case 92 number_of_steps : int 93 The number of steps of the case 94 directory : str 95 The path to the directory where the logs must be stored. 96 verbose : bool 97 Enable verbose logs. 98 """ 99 self._results_path = os.path.abspath(results_path) 100 self._number_of_steps = number_of_steps 101 self._logger = Logger(__name__, directory, verbose) 102 103 if not os.path.exists(results_path): 104 msg = f'Results do not exist: {results_path}' 105 self._logger.error(msg) 106 raise ValueError(msg)
Create an instance of the Stats class.
Parameters
- results_path (str): The path to the results directory of the case
- number_of_steps (int): The number of steps of the case
- directory (str): The path to the directory where the logs must be stored.
- verbose (bool): Enable verbose logs.
def aggregate(self) -> bool:
def aggregate(self) -> bool:
    """Aggregate the metrics of the different runs of a case.

    Find the median execution time of each step across all runs and extract
    the step from the run which has this median execution time to assemble
    an aggregated version and summary version of the case's metrics.

    Returns
    -------
    success : bool
        Whether the aggregation was successful or not.
    """
    # Find each median step of all runs before extracting more data for
    # memory consumption reasons
    runs = []
    for run_path in glob(f'{self._results_path}/run_*/'):
        # Extract run number
        run_folder: str = os.path.split(os.path.dirname(run_path))[-1]
        try:
            run_id: int = int(run_folder.replace('run_', ''))
        except ValueError:
            # BUGFIX: the original logged `run_id` here, which is unbound
            # when int() is what failed; log the folder name instead.
            self._logger.error(f'Run "{run_folder}" is not a number')
            return False

        # Extract steps and timestamps of this run
        data = self._parse_v2(run_path, fields=['step', 'timestamp'])

        # Calculate timestamp diff for each step
        step = 1
        timestamps = []
        step_end = 0.0
        step_begin = 0.0
        for entry in data:
            entry_step = entry['step']
            assert (entry_step >= step), 'Entry step decreased over time'

            # Next step
            if entry_step > step:
                # Calculate diff of current step if at least 2 entries
                # for the step exist, if not the diff is 0.0 and we fall
                # back to the step_begin timestamp which will make sure we
                # use the run with the timestamp that is the median of all
                # runs. For example: [4.5, 5.0, 6.5] will return run 2 as
                # 5.0 is the median.
                diff = step_end - step_begin
                if diff == 0.0:
                    self._logger.warning(f'Only 1 entry for step {step} '
                                         f'found, falling back to median '
                                         f'timestamp instead of diff')
                    diff = step_begin

                timestamps.append(diff)

                # Reset for next step
                step = entry_step
                step_begin = entry['timestamp']
                step_end = entry['timestamp']
            # step_end keeps increasing until the step changes
            else:
                step_end = entry['timestamp']
        # Final step does not cause an increment, add manually
        timestamps.append(step_end - step_begin)
        runs.append((run_id, timestamps))

    # Statistics rely on uneven number of runs so the median is always a
    # measured data point. NOTE: asserts are stripped under `python -O`.
    assert (len(runs) % 2 != 0), 'Number of runs should never be even'

    # Runs are unsorted as glob does not have a fixed order, sort them
    # based on run number in tuple
    runs.sort(key=lambda element: element[0])

    # Find median for each step across runs
    timestamps_by_step: List[List[float]] = []
    for step_index in range(self._number_of_steps):
        timestamps_by_step.append([])

    for run in runs:
        run_id = run[0]
        timestamps = run[1]

        # Do not process incomplete runs
        msg = f'Number of steps ({self._number_of_steps}) does not ' + \
              'match with extracted steps of ' + \
              f'run ({len(timestamps)}). Skipping run {run_id}'
        assert (len(timestamps) == self._number_of_steps), msg

        # Create list of timestamps for each step from all runs
        for step_index in range(self._number_of_steps):
            timestamps_by_step[step_index].append(timestamps[step_index])

    # Create a list of our steps with the run_id which has the median value
    # for that step
    aggregated_entries = []
    summary_entries = []
    index_number = 1
    for step_index, step_timestamps in enumerate(timestamps_by_step):
        # We ensure that the number of runs is always uneven so the median
        # is always a measured data point instead of the average of 2 data
        # points with even number of runs. Runs are sorted by run number,
        # so list position + 1 equals the run ID.
        median_run_id = timestamps_by_step[step_index] \
            .index(median(step_timestamps)) + 1
        median_run_path = os.path.join(self._results_path,
                                       f'run_{median_run_id}')
        median_step_data = self._parse_v2(median_run_path,
                                          step=step_index + 1)

        # Rewrite indexes to match new number of samples
        for entry in median_step_data:
            entry['index'] = index_number

            aggregated_entries.append(entry)
            index_number += 1

    # Summary data of a step: diff per step
    for step_index, step_timestamps in enumerate(timestamps_by_step):
        summary = {}
        median_run_id = timestamps_by_step[step_index] \
            .index(median(step_timestamps)) + 1
        median_run_path = os.path.join(self._results_path,
                                       f'run_{median_run_id}')
        median_step_data = self._parse_v2(median_run_path,
                                          step=step_index + 1)
        for field in FIELDNAMES:
            # Report min/max memory peak for this step
            if 'memory' in field:
                values = []
                for data in median_step_data:
                    values.append(data[field])
                summary[f'{field}_min'] = min(values)
                summary[f'{field}_max'] = max(values)
            # Leave some fields like they are
            elif field in ['version', 'step']:
                summary[field] = median_step_data[0][field]
            # All other fields are accumulated data values for which we
            # report the diff for the step
            else:
                first = median_step_data[0][field]
                last = median_step_data[-1][field]
                diff = round(last - first, ROUND)
                if field == 'index':
                    # diff will be 0 for 1 sample, but we have this
                    # sample, so include it
                    summary['number_of_samples'] = diff + 1
                elif field == 'timestamp':
                    summary['duration'] = diff
                else:
                    summary[f'{field}_diff'] = diff
        summary_entries.append(summary)

    aggregated_file = os.path.join(self._results_path,
                                   METRICS_AGGREGATED_FILE_NAME)
    summary_file = os.path.join(self._results_path,
                                METRICS_SUMMARY_FILE_NAME)

    # Store aggregated data
    with open(aggregated_file, 'w') as f:
        writer = DictWriter(f, fieldnames=FIELDNAMES)
        writer.writeheader()
        for entry in aggregated_entries:
            writer.writerow(entry)

    # Store summary data
    with open(summary_file, 'w') as f:
        writer = DictWriter(f, fieldnames=FIELDNAMES_SUMMARY)
        writer.writeheader()
        for entry in summary_entries:
            writer.writerow(entry)

    return True
Aggregate the metrics of the different runs of a case.
Find the median execution time of each step across all runs and extract the step from the run which has this median execution time to assemble an aggregated version and summary version of the case's metrics.
Returns
- success (bool): Whether the aggregation was successful or not.