bench_executor.collector
This module holds the Collector class which is responsible for collecting metrics during the execution of a case. It also collects hardware information for provenance reasons when comparing results from cases.
The following metrics are collected:
General

index
: Incremental index for each collected sample.

step
: Number of the step of a collected sample.

timestamp
: The time when the sample was collected.

version
: Format version of the collected sample, currently v2.

CPU

cpu_user
: CPU time spent in userspace.

cpu_system
: CPU time spent in kernelspace.

cpu_user_system
: Sum of CPU time spent in userspace and kernelspace.

cpu_idle
: CPU time spent in idle mode.

cpu_iowait
: Time that the CPU has to wait for IO operations to complete.

Memory

memory_ram
: Amount of RAM memory in use.

memory_swap
: Amount of SWAP memory in use.

memory_ram_swap
: Sum of the RAM and SWAP memory in use.

Disk

disk_read_count
: Number of disk reads.

disk_write_count
: Number of disk writes.

disk_read_bytes
: Number of bytes read from disk.

disk_write_bytes
: Number of bytes written to disk.

disk_read_time
: Time spent reading from disk.

disk_write_time
: Time spent writing to disk.

disk_busy_time
: Time that the disk is busy and all actions are pending.

Network

network_received_count
: Number of network packets received.

network_sent_count
: Number of network packets sent.

network_received_bytes
: Number of bytes received over the network.

network_sent_bytes
: Number of bytes sent over the network.

network_received_error
: Number of errors that occurred while receiving over the network.

network_sent_error
: Number of errors that occurred while sending over the network.

network_received_drop
: Number of packets dropped while receiving over the network.

network_sent_drop
: Number of packets dropped while sending over the network.
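Each sample is one CSV row, so the resulting `metrics.csv` can be post-processed with the standard `csv` module. A minimal sketch, using two invented sample rows and a trimmed column set (the real file contains every field listed above):

```python
import csv
import io

# Two invented rows resembling the v2 metrics.csv format (columns trimmed
# for brevity; values are cumulative deltas relative to the first sample).
SAMPLE = """index,step,timestamp,memory_ram,cpu_user_system
1,1,0.0,1048576,0.0
2,1,0.1,2097152,0.05
"""

rows = list(csv.DictReader(io.StringIO(SAMPLE)))
peak_ram = max(int(r['memory_ram']) for r in rows)   # bytes in use at peak
total_cpu = float(rows[-1]['cpu_user_system'])       # CPU seconds since start
```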
```python
#!/usr/bin/env python3
"""
This module holds the Collector class which is responsible for collecting
metrics during the execution of a case. It also collects hardware information
for provenance reasons when comparing results from cases.

The following metrics are collected:

**General**
- `index`: incremental index for each collected sample.
- `step`: Number of the step of a collected sample.
- `timestamp`: The time when the sample was collected.
- `version`: format version of the collected sample, currently v2.

**CPU**
- `cpu_user`: CPU time spent in userspace.
- `cpu_system`: CPU time spent in kernelspace.
- `cpu_user_system`: sum of CPU time spent in userspace and kernelspace.
- `cpu_idle`: CPU time spent in idle mode.
- `cpu_iowait`: Time that the CPU has to wait for IO operations to complete.

**Memory**
- `memory_ram`: Amount of RAM memory in use.
- `memory_swap`: Amount of SWAP memory in use.
- `memory_ram_swap`: Sum of the RAM and SWAP memory in use.

**Disk**
- `disk_read_count`: Number of disk reads.
- `disk_write_count`: Number of disk writes.
- `disk_read_bytes`: Number of bytes read from disk.
- `disk_write_bytes`: Number of bytes written to disk.
- `disk_read_time`: Time spent to read from disk.
- `disk_write_time`: Time spent to write to disk.
- `disk_busy_time`: Time that the disk is busy and all actions are pending.

**Network**
- `network_received_count`: Number of network packets received.
- `network_sent_count`: Number of network packets sent.
- `network_received_bytes`: Number of bytes received over network.
- `network_sent_bytes`: Number of bytes sent over network.
- `network_received_error`: Number of errors that occurred during receiving
  over network.
- `network_sent_error`: Number of errors that occurred during sending over
  network.
- `network_received_drop`: Number of packets dropped during receiving over
  network.
- `network_sent_drop`: Number of packets dropped during sending over network.
"""

import os
import platform
import psutil as ps
from docker import DockerClient  # type: ignore
from csv import DictWriter
from time import time, sleep
from datetime import datetime
from subprocess import run, CalledProcessError
from threading import Thread, Event
from typing import TYPE_CHECKING, Dict, Union, Optional, List
from bench_executor.logger import Logger

# psutil types are platform specific, provide stubs at runtime as checking is
# not done there
if TYPE_CHECKING:
    from psutil._common import sswap, snetio
    from psutil._pslinux import svmem, sdiskio
    from psutil._psaix import scputimes
else:
    from collections import namedtuple
    scputimes = namedtuple('scputimes', [])
    sswap = namedtuple('sswap', [])
    svmem = namedtuple('svmem', [])
    sdiskio = namedtuple('sdiskio', [])
    snetio = namedtuple('snetio', [])

#
# Hardware and case information is logged to 'case-info.txt' on construction.
#
# All data are stored in a CSV as 'metrics.csv'.
# These data are accumulated among all CPU cores, all memory banks, all network
# interfaces, etc. Individual devices are not logged.
#

CASE_INFO_FILE_NAME: str = 'case-info.txt'
METRICS_FILE_NAME: str = 'metrics.csv'
METRICS_VERSION: int = 2
FIELDNAMES: List[str] = [
    'index',
    'step',
    'timestamp',
    'version',
    'cpu_user',
    'cpu_system',
    'cpu_user_system',
    'cpu_idle',
    'cpu_iowait',
    'memory_ram',
    'memory_swap',
    'memory_ram_swap',
    'disk_read_count',
    'disk_write_count',
    'disk_read_bytes',
    'disk_write_bytes',
    'disk_read_time',
    'disk_write_time',
    'disk_busy_time',
    'network_received_count',
    'network_sent_count',
    'network_received_bytes',
    'network_sent_bytes',
    'network_received_error',
    'network_sent_error',
    'network_received_drop',
    'network_sent_drop'
]
ROUND: int = 4

step_id: int = 1


def _collect_metrics(stop_event: Event, metrics_path: str,
                     sample_interval: float, initial_timestamp: float,
                     initial_cpu: scputimes, initial_ram: svmem,
                     initial_swap: sswap, initial_disk_io: Optional[sdiskio],
                     initial_network_io: snetio):
    """Thread function to collect a sample at specific intervals"""
    global step_id
    index = 1
    row: Dict[str, Union[int, float]]

    # Create metrics file
    with open(metrics_path, 'w') as f:
        writer = DictWriter(f, fieldnames=FIELDNAMES)
        writer.writeheader()

        # Initial values
        row = {
            'index': index,
            'step': step_id,
            'timestamp': 0.0,
            'version': METRICS_VERSION,
            'cpu_user': 0.0,
            'cpu_system': 0.0,
            'cpu_user_system': 0.0,
            'cpu_idle': 0.0,
            'cpu_iowait': 0.0,
            'memory_ram': 0,
            'memory_swap': 0,
            'memory_ram_swap': 0,
            'disk_read_count': 0,
            'disk_write_count': 0,
            'disk_read_bytes': 0,
            'disk_write_bytes': 0,
            'disk_read_time': 0,
            'disk_write_time': 0,
            'disk_busy_time': 0,
            'network_received_count': 0,
            'network_sent_count': 0,
            'network_received_bytes': 0,
            'network_sent_bytes': 0,
            'network_received_error': 0,
            'network_sent_error': 0,
            'network_received_drop': 0,
            'network_sent_drop': 0
        }
        writer.writerow(row)
        index += 1
        # Honor the sample interval, subtracting the time already elapsed.
        # Clamp to 0.0 as sleep() raises ValueError on negative delays.
        sleep(max(0.0, sample_interval - (time() - initial_timestamp)))

        while not stop_event.wait(0):
            # Collect metrics
            timestamp = time()
            cpu: scputimes = ps.cpu_times()
            ram: svmem = ps.virtual_memory()
            swap: sswap = ps.swap_memory()
            disk_io: Optional[sdiskio] = ps.disk_io_counters()  # type: ignore
            network_io: snetio = ps.net_io_counters()

            # Write to file
            diff = round(timestamp - initial_timestamp, ROUND)
            cpu_user = round(cpu.user - initial_cpu.user, ROUND)
            cpu_system = round(cpu.system - initial_cpu.system, ROUND)
            cpu_idle = round(cpu.idle - initial_cpu.idle, ROUND)
            cpu_iowait = round(cpu.iowait - initial_cpu.iowait, ROUND)
            network_recv_count = \
                network_io.packets_recv - initial_network_io.packets_recv
            network_sent_count = \
                network_io.packets_sent - initial_network_io.packets_sent
            network_recv_bytes = \
                network_io.bytes_recv - initial_network_io.bytes_recv
            network_sent_bytes = \
                network_io.bytes_sent - initial_network_io.bytes_sent
            network_errin = \
                network_io.errin - initial_network_io.errin
            network_errout = \
                network_io.errout - initial_network_io.errout
            network_dropin = \
                network_io.dropin - initial_network_io.dropin
            network_dropout = \
                network_io.dropout - initial_network_io.dropout

            row = {
                'index': index,
                'step': step_id,
                'timestamp': diff,
                'version': METRICS_VERSION,
                'cpu_user': cpu_user,
                'cpu_system': cpu_system,
                'cpu_user_system': cpu_user + cpu_system,
                'cpu_idle': cpu_idle,
                'cpu_iowait': cpu_iowait,
                'memory_ram': ram.used,
                'memory_swap': swap.used,
                'memory_ram_swap': ram.used + swap.used,
                'network_received_count': network_recv_count,
                'network_sent_count': network_sent_count,
                'network_received_bytes': network_recv_bytes,
                'network_sent_bytes': network_sent_bytes,
                'network_received_error': network_errin,
                'network_sent_error': network_errout,
                'network_received_drop': network_dropin,
                'network_sent_drop': network_dropout
            }

            # Diskless machines will return None for disk IO
            if disk_io is not None and initial_disk_io is not None:
                row['disk_read_count'] = \
                    disk_io.read_count - initial_disk_io.read_count
                row['disk_write_count'] = \
                    disk_io.write_count - initial_disk_io.write_count
                row['disk_read_bytes'] = \
                    disk_io.read_bytes - initial_disk_io.read_bytes
                row['disk_write_bytes'] = \
                    disk_io.write_bytes - initial_disk_io.write_bytes
                row['disk_read_time'] = \
                    disk_io.read_time - initial_disk_io.read_time
                row['disk_write_time'] = \
                    disk_io.write_time - initial_disk_io.write_time
                row['disk_busy_time'] = \
                    disk_io.busy_time - initial_disk_io.busy_time
            writer.writerow(row)
            index += 1

            # Honor sample time, remove metrics logging overhead.
            # Clamp to 0.0 as sleep() raises ValueError on negative delays.
            sleep(max(0.0, sample_interval - (time() - timestamp)))


class Collector():
    """Collect metrics samples at a given interval for a run of a case."""

    def __init__(self, results_run_path: str, sample_interval: float,
                 number_of_steps: int, run_id: int, directory: str,
                 verbose: bool):
        """
        Create an instance of the Collector class.

        Instantiating this class will automatically generate a `case-info.txt`
        file which describes the hardware used during collection of the
        metrics. The file describes:

        - **Case**:
            - Timestamp when started.
            - Directory of the case.
            - Number of the run.
            - Number of steps in a case.
        - **Hardware**:
            - CPU name.
            - Number of CPU cores.
            - Minimum and maximum CPU core frequency.
            - Amount of RAM and SWAP memory.
            - Available disk storage.
            - Available network interfaces and their link speed.
        - **Docker**:
            - Version of the Docker daemon.
            - Docker root directory.
            - Docker storage driver.
            - Docker CgroupFS driver and version.

        Parameters
        ----------
        results_run_path : str
            Path to the results directory of the run currently being executed.
        sample_interval : float
            Sample interval in seconds for collecting metrics.
        number_of_steps : int
            The number of steps of the case that is being executed.
        run_id : int
            The number of the run that is being executed.
        directory : str
            Path to the directory to store logs.
        verbose : bool
            Enable verbose logs.
        """

        self._started: bool = False
        self._data_path: str = os.path.abspath(results_run_path)
        self._number_of_steps: int = number_of_steps
        self._stop_event: Event = Event()
        self._logger = Logger(__name__, directory, verbose)

        # Only Linux is supported
        if platform.system() != 'Linux':
            msg = f'"{platform.system()}" is not supported as OS'
            self._logger.error(msg)
            raise ValueError(msg)

        # Initialize step ID
        global step_id
        step_id = 1

        # System information: OS, kernel, architecture
        system_hostname = 'UNKNOWN'
        system_os_name = 'UNKNOWN'
        system_os_version = 'UNKNOWN'
        try:
            system_os_name = platform.freedesktop_os_release()['NAME']
            system_os_version = platform.freedesktop_os_release()['VERSION']
        except (OSError, KeyError):
            self._logger.warning('Cannot extract Freedesktop OS release data')
        system_hostname = platform.node()
        system_kernel = platform.platform()
        system_architecture = platform.uname().machine

        # CPU information: name, max frequency, core count
        cpu_name = 'UNKNOWN'
        try:
            # check=True is needed for CalledProcessError to be raised
            raw = run(['lscpu'], capture_output=True, check=True)
            for line in raw.stdout.decode('utf-8').split('\n'):
                if 'Model name:' in line:
                    cpu_name = line.split(':')[1].strip()
                    break
        except CalledProcessError as e:
            self._logger.warning('Unable to determine CPU processor name: '
                                 f'{e}')

        cpu_cores = ps.cpu_count()
        cpu_min_freq = ps.cpu_freq().min
        cpu_max_freq = ps.cpu_freq().max

        # Memory information: RAM total, SWAP total
        memory_total = ps.virtual_memory().total
        swap_total = ps.swap_memory().total

        # Disk IO: name
        partitions: Dict[str, int] = {}
        for disk in ps.disk_partitions():
            # Skip Docker's overlayFS
            if disk.fstype and 'docker' not in disk.mountpoint:
                total = ps.disk_usage(disk.mountpoint).total
                partitions[disk.mountpoint] = total

        # Network IO: name, speed, MTU
        network_interfaces = ps.net_if_stats()

        # Docker daemon: version, storage driver, cgroupfs
        client = DockerClient()
        docker_info = client.info()
        client.close()

        # Write machine information to disk
        case_info_file = os.path.join(self._data_path, CASE_INFO_FILE_NAME)
        with open(case_info_file, 'w') as f:
            f.write('===> CASE <===\n')
            f.write(f'Timestamp: {datetime.utcnow().isoformat()}\n')
            f.write(f'Directory: {directory}\n')
            f.write(f'Run: {run_id}\n')
            f.write(f'Number of steps: {self._number_of_steps}\n')
            f.write('\n')
            f.write('===> HARDWARE <===\n')
            f.write('System\n')
            f.write(f'\tHostname: {system_hostname}\n')
            f.write(f'\tOS name: {system_os_name}\n')
            f.write(f'\tOS version: {system_os_version}\n')
            f.write(f'\tKernel: {system_kernel}\n')
            f.write(f'\tArchitecture: {system_architecture}\n')
            f.write('CPU\n')
            f.write(f'\tName: {cpu_name}\n')
            f.write(f'\tCores: {cpu_cores}\n')
            f.write(f'\tMinimum frequency: {int(cpu_min_freq)} Hz\n')
            f.write(f'\tMaximum frequency: {int(cpu_max_freq)} Hz\n')
            f.write('Memory\n')
            f.write(f'\tRAM memory: {int(memory_total / 10 ** 6)} MB\n')
            f.write(f'\tSWAP memory: {int(swap_total / 10 ** 6)} MB\n')
            f.write('Storage\n')
            for name, size in partitions.items():
                f.write(f'\tDisk "{name}": '
                        f'{round(size / 10 ** 9, 2)} GB\n')
            f.write('Network\n')
            for name, stats in network_interfaces.items():
                speed = stats.speed
                if speed == 0:
                    f.write(f'\tInterface "{name}"\n')
                else:
                    f.write(f'\tInterface "{name}": {speed} mbps\n')

            f.write('\n')
            f.write('===> DOCKER <===\n')
            f.write(f'Version: {docker_info["ServerVersion"]}\n')
            f.write(f'Root directory: {docker_info["DockerRootDir"]}\n')
            f.write('Drivers:\n')
            f.write(f'\tStorage: {docker_info["Driver"]}\n')
            f.write(f'\tCgroupFS: {docker_info["CgroupDriver"]} '
                    f'v{docker_info["CgroupVersion"]}\n')

        # Set initial metric values and start collection thread
        metrics_path = os.path.join(results_run_path, METRICS_FILE_NAME)
        initial_timestamp = time()
        initial_cpu = ps.cpu_times()
        initial_ram = ps.virtual_memory().used
        initial_swap = ps.swap_memory().used
        initial_disk_io = ps.disk_io_counters()
        initial_network_io = ps.net_io_counters()
        self._thread: Thread = Thread(target=_collect_metrics,
                                      daemon=True,
                                      args=(self._stop_event,
                                            metrics_path,
                                            sample_interval,
                                            initial_timestamp,
                                            initial_cpu,
                                            initial_ram,
                                            initial_swap,
                                            initial_disk_io,
                                            initial_network_io))
        self._thread.start()

    @property
    def name(self):
        """Name of the class: Collector"""
        return self.__class__.__name__

    def next_step(self):
        """Increment the step number by one.

        The step number must always be equal to or lower than the number of
        steps in the case.
        """
        global step_id
        step_id += 1

        msg = f'Step ({step_id}) is higher than number of steps ' + \
              f'({self._number_of_steps})'
        assert (step_id <= self._number_of_steps), msg

    def stop(self):
        """End metrics collection.

        Signal the metrics collection thread to stop collecting any metrics.
        """
        self._stop_event.set()
```
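The core of `_collect_metrics` is an `Event`-driven sampling loop that subtracts its own per-sample overhead from the sleep interval. The pattern can be sketched in isolation; the function name and the trivial "metric" (a timestamp) below are illustrative only, not part of the module:

```python
from threading import Thread, Event
from time import time, sleep

def sample_loop(stop: Event, interval: float, samples: list) -> None:
    """Collect one sample per interval, compensating for sampling overhead."""
    while not stop.wait(0):
        start = time()
        samples.append(start)  # stand-in for a real metrics snapshot
        overhead = time() - start
        sleep(max(0.0, interval - overhead))  # never pass a negative delay

stop_event = Event()
samples: list = []
t = Thread(target=sample_loop, args=(stop_event, 0.05, samples), daemon=True)
t.start()
sleep(0.3)          # let the loop run for a few intervals
stop_event.set()    # signal the thread to stop, like Collector.stop()
t.join()
```

Using `stop_event.wait(0)` as the loop condition checks the flag without blocking, so `stop()` takes effect at the next iteration boundary.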
class Collector:
Collect metrics samples at a given interval for a run of a case.
Collector(results_run_path: str, sample_interval: float, number_of_steps: int, run_id: int, directory: str, verbose: bool)
Create an instance of the Collector class.
Instantiating this class will automatically generate a case-info.txt
file which describes the hardware used during collection of the
metrics. The file describes:
- Case:
- Timestamp when started.
- Directory of the case.
- Number of the run.
- Number of steps in a case.
- Hardware:
- CPU name.
- Number of CPU cores.
- Minimum and maximum CPU core frequency.
- Amount of RAM and SWAP memory.
- Available disk storage.
- Available network interfaces and their link speed.
- Docker:
- Version of the Docker daemon.
- Docker root directory.
- Docker storage driver.
- Docker CgroupFS driver and version.
Parameters
- results_run_path (str): Path to the results directory of the run currently being executed.
- sample_interval (float): Sample interval in seconds for collecting metrics.
- number_of_steps (int): The number of steps of the case that is being executed.
- run_id (int): The number of the run that is being executed.
- directory (str): Path to the directory to store logs.
- verbose (bool): Enable verbose logs.
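The hardware provenance written to `case-info.txt` is gathered with `platform` and `psutil`. A minimal, runnable sketch of the `platform` part, using the same 'UNKNOWN' fallback as the constructor (the `info` dictionary and its keys are illustrative, not part of the module):

```python
import platform

info = {
    'hostname': platform.node(),
    'kernel': platform.platform(),
    'architecture': platform.uname().machine,
    'os_name': 'UNKNOWN',
}
try:
    # freedesktop_os_release() needs Python >= 3.10 and /etc/os-release;
    # otherwise keep the 'UNKNOWN' fallback, as the Collector does.
    info['os_name'] = platform.freedesktop_os_release()['NAME']
except (AttributeError, OSError, KeyError):
    pass
```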
def next_step(self):
Increment the step number by one.
The step number must always be equal to or lower than the number of steps in the case.
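The step counter is enforced with an assertion against the configured number of steps. A stand-in sketch of that contract (the real module shares a global `step_id` between the Collector and the sampling thread; the names below are illustrative):

```python
number_of_steps = 3   # as passed to Collector.__init__
step_id = 1           # samples start at step 1

def next_step() -> None:
    """Advance to the next step; refuse to go past the configured count."""
    global step_id
    step_id += 1
    assert step_id <= number_of_steps, \
        f'Step ({step_id}) is higher than number of steps ({number_of_steps})'

next_step()  # step 2
next_step()  # step 3; one further call would fail the assertion
```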
class scputimes(builtins.tuple):

scputimes()

Inherited Members
- builtins.tuple
- index
- count

class sswap(builtins.tuple):

sswap()

Inherited Members
- builtins.tuple
- index
- count

class svmem(builtins.tuple):

svmem()

Inherited Members
- builtins.tuple
- index
- count

class sdiskio(builtins.tuple):

sdiskio()

Inherited Members
- builtins.tuple
- index
- count

class snetio(builtins.tuple):

snetio()

Inherited Members
- builtins.tuple
- index
- count
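These empty `namedtuple` classes are runtime stand-ins: the real psutil result types are only imported under `typing.TYPE_CHECKING`, which keeps annotations checkable without importing platform-specific psutil internals at runtime. The pattern in isolation, using `sswap` as the example (the `describe` helper is illustrative):

```python
from collections import namedtuple
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Real type, seen only by the type checker, never imported at runtime.
    from psutil._common import sswap
else:
    # Empty runtime stub so annotations referring to sswap still resolve.
    sswap = namedtuple('sswap', [])

def describe(swap: 'sswap') -> str:
    """Return the type name of a swap-memory record."""
    return type(swap).__name__
```

Because `TYPE_CHECKING` is `False` at runtime, this works even on systems where psutil (or that specific submodule) is absent.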