bench_executor.collector

This module holds the Collector class which is responsible for collecting metrics during the execution of a case. It also collects hardware information for provenance reasons when comparing results from cases.

The following metrics are collected:

General

  • index: incremental index for each collected sample.
  • step: Number of the step of a collected sample.
  • timestamp: The time when the sample was collected.
  • version: format version of the collected sample, currently v2.

CPU

  • cpu_user: CPU time spent in userspace.
  • cpu_system: CPU time spent in kernelspace.
  • cpu_user_system: Sum of CPU time spent in userspace and kernelspace.
  • cpu_idle: CPU time spent in idle mode.
  • cpu_iowait: Time that the CPU has to wait for IO operations to complete.

Memory

  • memory_ram: Amount of RAM memory in use.
  • memory_swap: Amount of SWAP memory in use.
  • memory_ram_swap: Sum of the RAM and SWAP memory in use.

Disk

  • disk_read_count: Number of disk reads.
  • disk_write_count: Number of disk writes.
  • disk_read_bytes: Number of bytes read from disk.
  • disk_write_bytes: Number of bytes written to disk.
  • disk_read_time: Time spent reading from disk.
  • disk_write_time: Time spent writing to disk.
  • disk_busy_time: Time during which the disk is busy handling I/O requests.

Network

  • network_received_count: Number of network packets received.
  • network_sent_count: Number of network packets sent.
  • network_received_bytes: Number of bytes received over network.
  • network_sent_bytes: Number of bytes sent over network.
  • network_received_error: Number of errors that occurred while receiving over network.
  • network_sent_error: Number of errors that occurred while sending over network.
  • network_received_drop: Number of packets dropped during receiving over network.
  • network_sent_drop: Number of packets dropped during sending over network.
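
The samples land in `metrics.csv` with one row per sample, using the column names above. As a minimal post-processing sketch (the helper name `peak_memory_per_step` is illustrative and not part of this module; only the column layout above is assumed), the file can be read with the standard library:

```python
import csv
from collections import defaultdict
from typing import Dict


def peak_memory_per_step(metrics_path: str) -> Dict[int, int]:
    """Return the highest memory_ram_swap value observed in each step."""
    peaks: Dict[int, int] = defaultdict(int)
    with open(metrics_path, newline='') as f:
        for row in csv.DictReader(f):
            step = int(row['step'])
            ram_swap = int(row['memory_ram_swap'])
            peaks[step] = max(peaks[step], ram_swap)
    return dict(peaks)
```

Point it at the `metrics.csv` inside a run's results directory to get a per-step memory peak.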
#!/usr/bin/env python3
"""
This module holds the Collector class which is responsible for collecting
metrics during the execution of a case. It also collects hardware information
for provenance reasons when comparing results from cases.

The following metrics are collected:

**General**
- `index`: incremental index for each collected sample.
- `step`: Number of the step of a collected sample.
- `timestamp`: The time when the sample was collected.
- `version`: format version of the collected sample, currently v2.

**CPU**
- `cpu_user`: CPU time spent in userspace.
- `cpu_system`: CPU time spent in kernelspace.
- `cpu_user_system`: Sum of CPU time spent in userspace and kernelspace.
- `cpu_idle`: CPU time spent in idle mode.
- `cpu_iowait`: Time that the CPU has to wait for IO operations to complete.

**Memory**
- `memory_ram`: Amount of RAM memory in use.
- `memory_swap`: Amount of SWAP memory in use.
- `memory_ram_swap`: Sum of the RAM and SWAP memory in use.

**Disk**
- `disk_read_count`: Number of disk reads.
- `disk_write_count`: Number of disk writes.
- `disk_read_bytes`: Number of bytes read from disk.
- `disk_write_bytes`: Number of bytes written to disk.
- `disk_read_time`: Time spent reading from disk.
- `disk_write_time`: Time spent writing to disk.
- `disk_busy_time`: Time during which the disk is busy handling I/O requests.

**Network**
- `network_received_count`: Number of network packets received.
- `network_sent_count`: Number of network packets sent.
- `network_received_bytes`: Number of bytes received over network.
- `network_sent_bytes`: Number of bytes sent over network.
- `network_received_error`: Number of errors that occurred while receiving
over network.
- `network_sent_error`: Number of errors that occurred while sending over
network.
- `network_received_drop`: Number of packets dropped while receiving over
network.
- `network_sent_drop`: Number of packets dropped while sending over network.
"""

import os
import platform
import psutil as ps
from docker import DockerClient  # type: ignore
from csv import DictWriter
from time import time, sleep
from datetime import datetime
from subprocess import run, CalledProcessError
from threading import Thread, Event
from typing import TYPE_CHECKING, Dict, Union, Optional, List
from bench_executor.logger import Logger

# psutil types are platform-specific; provide stubs at runtime as type
# checking is not done there
if TYPE_CHECKING:
    from psutil._common import sswap, snetio
    from psutil._pslinux import svmem, sdiskio
    from psutil._psaix import scputimes
else:
    from collections import namedtuple
    scputimes = namedtuple('scputimes', [])
    sswap = namedtuple('sswap', [])
    svmem = namedtuple('svmem', [])
    sdiskio = namedtuple('sdiskio', [])
    snetio = namedtuple('snetio', [])

#
# Hardware and case information is logged to 'case-info.txt' on construction.
#
# All data are stored as CSV in 'metrics.csv'.
# These data are accumulated over all CPU cores, all memory banks, all
# network interfaces, etc.; individual devices are not logged.
#

CASE_INFO_FILE_NAME: str = 'case-info.txt'
METRICS_FILE_NAME: str = 'metrics.csv'
METRICS_VERSION: int = 2
FIELDNAMES: List[str] = [
    'index',
    'step',
    'timestamp',
    'version',
    'cpu_user',
    'cpu_system',
    'cpu_user_system',
    'cpu_idle',
    'cpu_iowait',
    'memory_ram',
    'memory_swap',
    'memory_ram_swap',
    'disk_read_count',
    'disk_write_count',
    'disk_read_bytes',
    'disk_write_bytes',
    'disk_read_time',
    'disk_write_time',
    'disk_busy_time',
    'network_received_count',
    'network_sent_count',
    'network_received_bytes',
    'network_sent_bytes',
    'network_received_error',
    'network_sent_error',
    'network_received_drop',
    'network_sent_drop'
]
ROUND: int = 4

step_id: int = 1


def _collect_metrics(stop_event: Event, metrics_path: str,
                     sample_interval: float, initial_timestamp: float,
                     initial_cpu: scputimes, initial_ram: svmem,
                     initial_swap: sswap, initial_disk_io: Optional[sdiskio],
                     initial_network_io: snetio):
    """Thread function to collect a sample at specific intervals"""
    global step_id
    index = 1
    row: Dict[str, Union[int, float]]

    # Create metrics file
    with open(metrics_path, 'w') as f:
        writer = DictWriter(f, fieldnames=FIELDNAMES)
        writer.writeheader()

        # Initial values
        row = {
            'index': index,
            'step': step_id,
            'timestamp': 0.0,
            'version': METRICS_VERSION,
            'cpu_user': 0.0,
            'cpu_system': 0.0,
            'cpu_user_system': 0.0,
            'cpu_idle': 0.0,
            'cpu_iowait': 0.0,
            'memory_ram': 0,
            'memory_swap': 0,
            'memory_ram_swap': 0,
            'disk_read_count': 0,
            'disk_write_count': 0,
            'disk_read_bytes': 0,
            'disk_write_bytes': 0,
            'disk_read_time': 0,
            'disk_write_time': 0,
            'disk_busy_time': 0,
            'network_received_count': 0,
            'network_sent_count': 0,
            'network_received_bytes': 0,
            'network_sent_bytes': 0,
            'network_received_error': 0,
            'network_sent_error': 0,
            'network_received_drop': 0,
            'network_sent_drop': 0
        }
        writer.writerow(row)
        index += 1
        # Honor the sample interval, compensating for the time already spent
        # since the initial measurement; never sleep a negative amount
        sleep(max(0.0, sample_interval - (time() - initial_timestamp)))

        while not stop_event.wait(0):
            # Collect metrics
            timestamp = time()
            cpu: scputimes = ps.cpu_times()
            ram: svmem = ps.virtual_memory()
            swap: sswap = ps.swap_memory()
            disk_io: Optional[sdiskio] = ps.disk_io_counters()  # type: ignore
            network_io: snetio = ps.net_io_counters()

            # Write to file
            diff = round(timestamp - initial_timestamp, ROUND)
            cpu_user = round(cpu.user - initial_cpu.user, ROUND)
            cpu_system = round(cpu.system - initial_cpu.system, ROUND)
            cpu_idle = round(cpu.idle - initial_cpu.idle, ROUND)
            cpu_iowait = round(cpu.iowait - initial_cpu.iowait, ROUND)
            network_recv_count = \
                network_io.packets_recv - initial_network_io.packets_recv
            network_sent_count = \
                network_io.packets_sent - initial_network_io.packets_sent
            network_recv_bytes = \
                network_io.bytes_recv - initial_network_io.bytes_recv
            network_sent_bytes = \
                network_io.bytes_sent - initial_network_io.bytes_sent
            network_errin = \
                network_io.errin - initial_network_io.errin
            network_errout = \
                network_io.errout - initial_network_io.errout
            network_dropin = \
                network_io.dropin - initial_network_io.dropin
            network_dropout = \
                network_io.dropout - initial_network_io.dropout

            row = {
                'index': index,
                'step': step_id,
                'timestamp': diff,
                'version': METRICS_VERSION,
                'cpu_user': cpu_user,
                'cpu_system': cpu_system,
                'cpu_user_system': cpu_user + cpu_system,
                'cpu_idle': cpu_idle,
                'cpu_iowait': cpu_iowait,
                'memory_ram': ram.used,
                'memory_swap': swap.used,
                'memory_ram_swap': ram.used + swap.used,
                'network_received_count': network_recv_count,
                'network_sent_count': network_sent_count,
                'network_received_bytes': network_recv_bytes,
                'network_sent_bytes': network_sent_bytes,
                'network_received_error': network_errin,
                'network_sent_error': network_errout,
                'network_received_drop': network_dropin,
                'network_sent_drop': network_dropout
            }

            # Diskless machines will return None for disk IO
            if disk_io is not None and initial_disk_io is not None:
                row['disk_read_count'] = \
                    disk_io.read_count - initial_disk_io.read_count
                row['disk_write_count'] = \
                    disk_io.write_count - initial_disk_io.write_count
                row['disk_read_bytes'] = \
                    disk_io.read_bytes - initial_disk_io.read_bytes
                row['disk_write_bytes'] = \
                    disk_io.write_bytes - initial_disk_io.write_bytes
                row['disk_read_time'] = \
                    disk_io.read_time - initial_disk_io.read_time
                row['disk_write_time'] = \
                    disk_io.write_time - initial_disk_io.write_time
                row['disk_busy_time'] = \
                    disk_io.busy_time - initial_disk_io.busy_time
            writer.writerow(row)
            index += 1

            # Honor the sample interval, subtracting the metrics logging
            # overhead; never sleep a negative amount
            sleep(max(0.0, sample_interval - (time() - timestamp)))


class Collector():
    """Collect metrics samples at a given interval for a run of a case."""

    def __init__(self, results_run_path: str, sample_interval: float,
                 number_of_steps: int, run_id: int, directory: str,
                 verbose: bool):
        """
        Create an instance of the Collector class.

        Instantiating this class will automatically generate a `case-info.txt`
        file which describes the hardware used during collection of the
        metrics. The file describes:

        - **Case**:
            - Timestamp when started.
            - Directory of the case.
            - Number of the run.
            - Number of steps in a case.
        - **Hardware**:
            - CPU name.
            - Number of CPU cores.
            - Minimum and maximum CPU core frequency.
            - Amount of RAM and SWAP memory.
            - Available disk storage.
            - Available network interfaces and their link speed.
        - **Docker**:
            - Version of the Docker daemon.
            - Docker root directory.
            - Docker storage driver.
            - Docker CgroupFS driver and version.

        Parameters
        ----------
        results_run_path : str
            Path to the results directory of the run currently being executed.
        sample_interval : float
            Sample interval in seconds for collecting metrics.
        number_of_steps : int
            The number of steps of the case that is being executed.
        run_id : int
            The number of the run that is being executed.
        directory : str
            Path to the directory to store logs.
        verbose : bool
            Enable verbose logs.
        """

        self._started: bool = False
        self._data_path: str = os.path.abspath(results_run_path)
        self._number_of_steps: int = number_of_steps
        self._stop_event: Event = Event()
        self._logger = Logger(__name__, directory, verbose)

        # Only Linux is supported
        if platform.system() != 'Linux':
            msg = f'"{platform.system()}" is not supported as OS'
            self._logger.error(msg)
            raise ValueError(msg)

        # Initialize step ID
        global step_id
        step_id = 1

        # System information: OS, kernel, architecture
        system_hostname = 'UNKNOWN'
        system_os_name = 'UNKNOWN'
        system_os_version = 'UNKNOWN'
        try:
            system_os_name = platform.freedesktop_os_release()['NAME']
            system_os_version = platform.freedesktop_os_release()['VERSION']
        except (OSError, KeyError):
            self._logger.warning('Cannot extract Freedesktop OS release data')
        system_hostname = platform.node()
        system_kernel = platform.platform()
        system_architecture = platform.uname().machine

        # CPU information: name, max frequency, core count
        cpu_name = 'UNKNOWN'
        try:
            raw = run(['lscpu'], capture_output=True, check=True)
            for line in raw.stdout.decode('utf-8').split('\n'):
                if 'Model name:' in line:
                    cpu_name = line.split(':')[1].strip()
                    break
        except (CalledProcessError, FileNotFoundError) as e:
            self._logger.warning('Unable to determine CPU processor name: '
                                 f'{e}')

        cpu_cores = ps.cpu_count()
        cpu_min_freq = ps.cpu_freq().min
        cpu_max_freq = ps.cpu_freq().max

        # Memory information: RAM total, SWAP total
        memory_total = ps.virtual_memory().total
        swap_total = ps.swap_memory().total

        # Disk IO: name
        partitions: Dict[str, int] = {}
        for disk in ps.disk_partitions():
            # Skip Docker's overlayFS
            if disk.fstype and 'docker' not in disk.mountpoint:
                total = ps.disk_usage(disk.mountpoint).total
                partitions[disk.mountpoint] = total

        # Network IO: name, speed, MTU
        network_interfaces = ps.net_if_stats()

        # Docker daemon: version, storage driver, cgroupfs
        client = DockerClient()
        docker_info = client.info()
        client.close()

        # Write machine information to disk
        case_info_file = os.path.join(self._data_path, CASE_INFO_FILE_NAME)
        with open(case_info_file, 'w') as f:
            f.write('===> CASE <===\n')
            f.write(f'Timestamp: {datetime.utcnow().isoformat()}\n')
            f.write(f'Directory: {directory}\n')
            f.write(f'Run: {run_id}\n')
            f.write(f'Number of steps: {self._number_of_steps}\n')
            f.write('\n')
            f.write('===> HARDWARE <===\n')
            f.write('System\n')
            f.write(f'\tHostname: {system_hostname}\n')
            f.write(f'\tOS name: {system_os_name}\n')
            f.write(f'\tOS version: {system_os_version}\n')
            f.write(f'\tKernel: {system_kernel}\n')
            f.write(f'\tArchitecture: {system_architecture}\n')
            f.write('CPU\n')
            f.write(f'\tName: {cpu_name}\n')
            f.write(f'\tCores: {cpu_cores}\n')
            f.write(f'\tMinimum frequency: {int(cpu_min_freq)} MHz\n')
            f.write(f'\tMaximum frequency: {int(cpu_max_freq)} MHz\n')
            f.write('Memory\n')
            f.write(f'\tRAM memory: {int(memory_total / 10 ** 6)} MB\n')
            f.write(f'\tSWAP memory: {int(swap_total / 10 ** 6)} MB\n')
            f.write('Storage\n')
            for name, size in partitions.items():
                f.write(f'\tDisk "{name}": '
                        f'{round(size / 10 ** 9, 2)} GB\n')
            f.write('Network\n')
            for name, stats in network_interfaces.items():
                speed = stats.speed
                if speed == 0:
                    f.write(f'\tInterface "{name}"\n')
                else:
                    f.write(f'\tInterface "{name}": {speed} Mbps\n')

            f.write('\n')
            f.write('===> DOCKER <===\n')
            f.write(f'Version: {docker_info["ServerVersion"]}\n')
            f.write(f'Root directory: {docker_info["DockerRootDir"]}\n')
            f.write('Drivers:\n')
            f.write(f'\tStorage: {docker_info["Driver"]}\n')
            f.write(f'\tCgroupFS: {docker_info["CgroupDriver"]} '
                    f'v{docker_info["CgroupVersion"]}\n')

        # Set initial metric values and start collection thread.
        # Pass the full psutil structures to match _collect_metrics'
        # type hints.
        metrics_path = os.path.join(results_run_path, METRICS_FILE_NAME)
        initial_timestamp = time()
        initial_cpu = ps.cpu_times()
        initial_ram = ps.virtual_memory()
        initial_swap = ps.swap_memory()
        initial_disk_io = ps.disk_io_counters()
        initial_network_io = ps.net_io_counters()
        self._thread: Thread = Thread(target=_collect_metrics,
                                      daemon=True,
                                      args=(self._stop_event,
                                            metrics_path,
                                            sample_interval,
                                            initial_timestamp,
                                            initial_cpu,
                                            initial_ram,
                                            initial_swap,
                                            initial_disk_io,
                                            initial_network_io))
        self._thread.start()

    @property
    def name(self):
        """Name of the class: Collector"""
        return self.__class__.__name__

    def next_step(self):
        """Increment the step number by one.

        The step number must always be equal to or lower than the number of
        steps in the case.
        """
        global step_id
        step_id += 1

        msg = f'Step ({step_id}) is higher than number of steps ' + \
              f'({self._number_of_steps})'
        assert (step_id <= self._number_of_steps), msg

    def stop(self):
        """End metrics collection.

        Signal the metrics collection thread to stop collecting any metrics.
        """
        self._stop_event.set()
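
The collection thread keeps its cadence by subtracting the time spent taking a sample from the interval before sleeping, clamped at zero so a slow sample never yields a negative sleep. Stripped of the psutil calls and CSV writing, the pattern looks roughly like this (`sampling_loop` and `collect_sample` are illustrative names, not part of this module):

```python
from threading import Event
from time import sleep, time


def sampling_loop(stop_event: Event, sample_interval: float,
                  collect_sample) -> int:
    """Call collect_sample() every sample_interval seconds until stopped.

    Returns the number of samples taken. collect_sample is any callable;
    in this module it reads the psutil counters and writes a CSV row.
    """
    samples = 0
    while not stop_event.wait(0):
        started = time()
        collect_sample()
        samples += 1
        # Subtract the sampling overhead from the interval; clamp at zero
        # so a sample that overruns the interval never raises in sleep()
        sleep(max(0.0, sample_interval - (time() - started)))
    return samples
```

Using `stop_event.wait(0)` as the loop condition mirrors the Collector: it checks the flag without blocking, so `stop()` takes effect after at most one interval.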
class Collector:
248class Collector():
249    """Collect metrics samples at a given interval for a run of a case."""
250
251    def __init__(self, results_run_path: str, sample_interval: float,
252                 number_of_steps: int, run_id: int, directory: str,
253                 verbose: bool):
254        """
255        Create an instance of the Collector class.
256
257        Instantiating this class will automatically generate a `case-info.txt`
258        file which describes the hardware used during collection of the
259        metrics. The file describes:
260
261        - **Case**:
262            - Timestamp when started.
263            - Directory of the case.
264            - Number of the run.
265            - Number of steps in a case.
266        - **Hardware**:
267            - CPU name.
268            - Number of CPU cores.
269            - Minimum and maximum CPU core frequency.
270            - Amount of RAM and SWAP memory
271            - Available disk storage.
272            - Available network interfaces and their link speed.
273        - **Docker**:
274            - Version of the Docker daemon
275            - Docker root directory
276            - Docker storage driver
277            - Docker CgroupFS driver and version
278
279        Parameters
280        ----------
281        results_run_path : str
282            Path to the results directory of the run currently being executed.
283        sample_interval : float
284            Sample interval in seconds for collecting metrics.
285        number_of_steps : int
286            The number of steps of the case that is being executed.
287        run_id : int
288            The number of the run that is being executed.
289        directory : str
290            Path to the directory to store logs.
291        verbose : bool
292            Enable verbose logs.
293        """
294
295        self._started: bool = False
296        self._data_path: str = os.path.abspath(results_run_path)
297        self._number_of_steps: int = number_of_steps
298        self._stop_event: Event = Event()
299        self._logger = Logger(__name__, directory, verbose)
300
301        # Only Linux is supported
302        if platform.system() != 'Linux':
303            msg = f'"{platform.system()} is not supported as OS'
304            self._logger.error(msg)
305            raise ValueError(msg)
306
307        # Initialize step ID
308        global step_id
309        step_id = 1
310
311        # System information: OS, kernel, architecture
312        system_hostname = 'UNKNOWN'
313        system_os_name = 'UNKNOWN'
314        system_os_version = 'UNKNOWN'
315        try:
316            system_os_name = platform.freedesktop_os_release()['NAME']
317            system_os_version = platform.freedesktop_os_release()['VERSION']
318        except (OSError, KeyError):
319            self._logger.warning('Cannot extract Freedesktop OS release data')
320        system_hostname = platform.node()
321        system_kernel = platform.platform()
322        system_architecture = platform.uname().machine
323
324        # CPU information: name, max frequency, core count
325        cpu_name = 'UNKNOWN'
326        try:
327            raw = run(['lscpu'], capture_output=True)
328            for line in raw.stdout.decode('utf-8').split('\n'):
329                if 'Model name:' in line:
330                    cpu_name = line.split(':')[1].strip()
331                    break
332        except CalledProcessError as e:
333            self._logger.warning('Unable to determine CPU processor name: '
334                                 f'{e}')
335
336        cpu_cores = ps.cpu_count()
337        cpu_min_freq = ps.cpu_freq().min
338        cpu_max_freq = ps.cpu_freq().max
339
340        # Memory information: RAM total, SWAP total
341        memory_total = ps.virtual_memory().total
342        swap_total = ps.swap_memory().total
343
344        # Disk IO: name
345        partitions: Dict[str, int] = {}
346        for disk in ps.disk_partitions():
347            # Skip Docker's overlayFS
348            if disk.fstype and 'docker' not in disk.mountpoint:
349                total = ps.disk_usage(disk.mountpoint).total
350                partitions[disk.mountpoint] = total
351
352        # Network IO: name, speed, MTU
353        network_interfaces = ps.net_if_stats()
354
355        # Docker daemon: version, storage driver, cgroupfs
356        client = DockerClient()
357        docker_info = client.info()
358        client.close()
359
360        # Write machine information to disk
361        case_info_file = os.path.join(self._data_path, CASE_INFO_FILE_NAME)
362        with open(case_info_file, 'w') as f:
363            f.write('===> CASE <===\n')
364            f.write(f'Timestamp: {datetime.utcnow().isoformat()}\n')
365            f.write(f'Directory: {directory}\n')
366            f.write(f'Run: {run_id}\n')
367            f.write(f'Number of steps: {self._number_of_steps}\n')
368            f.write('\n')
369            f.write('===> HARDWARE <===\n')
370            f.write('System\n')
371            f.write(f'\tHostname: {system_hostname}\n')
372            f.write(f'\tOS name: {system_os_name}\n')
373            f.write(f'\tOS version: {system_os_version}\n')
374            f.write(f'\tKernel: {system_kernel}\n')
375            f.write(f'\tArchitecture: {system_architecture}\n')
376            f.write('CPU\n')
377            f.write(f'\tName: {cpu_name}\n')
378            f.write(f'\tCores: {cpu_cores}\n')
379            f.write(f'\tMinimum frequency: {int(cpu_min_freq)} Hz\n')
380            f.write(f'\tMaximum frequency: {int(cpu_max_freq)} Hz\n')
381            f.write('Memory\n')
382            f.write(f'\tRAM memory: {int(memory_total / 10 ** 6)} MB\n')
383            f.write(f'\tSWAP memory: {int(swap_total / 10 ** 6)} MB\n')
384            f.write('Storage\n')
385            for name, size in partitions.items():
386                f.write(f'\tDisk "{name}": '
387                        f'{round(size / 10 ** 9, 2)} GB\n')
388            f.write('Network\n')
389            for name, stats in network_interfaces.items():
390                speed = stats.speed
391                if speed == 0:
392                    f.write(f'\tInterface "{name}"\n')
393                else:
394                    f.write(f'\tInterface "{name}": {speed} mbps\n')
395
396            f.write('\n')
397            f.write('===> DOCKER <===\n')
398            f.write(f'Version: {docker_info["ServerVersion"]}\n')
399            f.write(f'Root directory: {docker_info["DockerRootDir"]}\n')
400            f.write('Drivers:\n')
401            f.write(f'\tStorage: {docker_info["Driver"]}\n')
402            f.write(f'\tCgroupFS: {docker_info["CgroupDriver"]} '
403                    f'v{docker_info["CgroupVersion"]}\n')
404
405        # Set initial metric values and start collection thread
406        metrics_path = os.path.join(results_run_path, METRICS_FILE_NAME)
407        initial_timestamp = time()
408        initial_cpu = ps.cpu_times()
409        initial_ram = ps.virtual_memory().used
410        initial_swap = ps.swap_memory().used
411        initial_disk_io = ps.disk_io_counters()
412        initial_network_io = ps.net_io_counters()
413        self._thread: Thread = Thread(target=_collect_metrics,
414                                      daemon=True,
415                                      args=(self._stop_event,
416                                            metrics_path,
417                                            sample_interval,
418                                            initial_timestamp,
419                                            initial_cpu,
420                                            initial_ram,
421                                            initial_swap,
422                                            initial_disk_io,
423                                            initial_network_io))
424        self._thread.start()
425
426    @property
427    def name(self):
428        """Name of the class: Collector"""
429        return self.__class__.__name__
430
431    def next_step(self):
432        """Increment the step number by one.
433
434        The step number must always be equal or lower than the number of steps
435        in the case.
436        """
437        global step_id
438        step_id += 1
439
440        msg = f'Step ({step_id}) is higher than number of steps ' + \
441              f'({self._number_of_steps})'
442        assert (step_id <= self._number_of_steps), msg
443
444    def stop(self):
445        """End metrics collection.
446
447        Signal the metrics collection thread to stop collecting any metrics.
448        """
449        self._stop_event.set()

Collect metrics samples at a given interval for a run of a case.

Collector( results_run_path: str, sample_interval: float, number_of_steps: int, run_id: int, directory: str, verbose: bool)
251    def __init__(self, results_run_path: str, sample_interval: float,
252                 number_of_steps: int, run_id: int, directory: str,
253                 verbose: bool):
254        """
255        Create an instance of the Collector class.
256
257        Instantiating this class will automatically generate a `case-info.txt`
258        file which describes the hardware used during collection of the
259        metrics. The file describes:
260
261        - **Case**:
262            - Timestamp when started.
263            - Directory of the case.
264            - Number of the run.
265            - Number of steps in a case.
266        - **Hardware**:
267            - CPU name.
268            - Number of CPU cores.
269            - Minimum and maximum CPU core frequency.
270            - Amount of RAM and SWAP memory
271            - Available disk storage.
272            - Available network interfaces and their link speed.
273        - **Docker**:
274            - Version of the Docker daemon
275            - Docker root directory
276            - Docker storage driver
277            - Docker CgroupFS driver and version
278
279        Parameters
280        ----------
281        results_run_path : str
282            Path to the results directory of the run currently being executed.
283        sample_interval : float
284            Sample interval in seconds for collecting metrics.
285        number_of_steps : int
286            The number of steps of the case that is being executed.
287        run_id : int
288            The number of the run that is being executed.
289        directory : str
290            Path to the directory to store logs.
291        verbose : bool
292            Enable verbose logs.
293        """
294
295        self._started: bool = False
296        self._data_path: str = os.path.abspath(results_run_path)
297        self._number_of_steps: int = number_of_steps
298        self._stop_event: Event = Event()
299        self._logger = Logger(__name__, directory, verbose)
300
301        # Only Linux is supported
302        if platform.system() != 'Linux':
303            msg = f'"{platform.system()} is not supported as OS'
304            self._logger.error(msg)
305            raise ValueError(msg)
306
307        # Initialize step ID
308        global step_id
309        step_id = 1
310
        # System information: OS, kernel, architecture
        system_hostname = 'UNKNOWN'
        system_os_name = 'UNKNOWN'
        system_os_version = 'UNKNOWN'
        try:
            system_os_name = platform.freedesktop_os_release()['NAME']
            system_os_version = platform.freedesktop_os_release()['VERSION']
        except (OSError, KeyError):
            self._logger.warning('Cannot extract Freedesktop OS release data')
        system_hostname = platform.node()
        system_kernel = platform.platform()
        system_architecture = platform.uname().machine

        # CPU information: name, min/max frequency, core count
        cpu_name = 'UNKNOWN'
        try:
            # check=True is required: without it a non-zero exit status
            # never raises CalledProcessError
            raw = run(['lscpu'], capture_output=True, check=True)
            for line in raw.stdout.decode('utf-8').split('\n'):
                if 'Model name:' in line:
                    cpu_name = line.split(':')[1].strip()
                    break
        except (CalledProcessError, FileNotFoundError) as e:
            self._logger.warning('Unable to determine CPU processor name: '
                                 f'{e}')

        cpu_cores = ps.cpu_count()
        cpu_min_freq = ps.cpu_freq().min
        cpu_max_freq = ps.cpu_freq().max
        # Memory information: RAM total, SWAP total
        memory_total = ps.virtual_memory().total
        swap_total = ps.swap_memory().total

        # Disk IO: name
        partitions: Dict[str, int] = {}
        for disk in ps.disk_partitions():
            # Skip Docker's overlayFS
            if disk.fstype and 'docker' not in disk.mountpoint:
                total = ps.disk_usage(disk.mountpoint).total
                partitions[disk.mountpoint] = total

        # Network IO: name, speed, MTU
        network_interfaces = ps.net_if_stats()

        # Docker daemon: version, storage driver, cgroupfs
        client = DockerClient()
        docker_info = client.info()
        client.close()
        # Write machine information to disk
        case_info_file = os.path.join(self._data_path, CASE_INFO_FILE_NAME)
        with open(case_info_file, 'w') as f:
            f.write('===> CASE <===\n')
            f.write(f'Timestamp: {datetime.utcnow().isoformat()}\n')
            f.write(f'Directory: {directory}\n')
            f.write(f'Run: {run_id}\n')
            f.write(f'Number of steps: {self._number_of_steps}\n')
            f.write('\n')
            f.write('===> HARDWARE <===\n')
            f.write('System\n')
            f.write(f'\tHostname: {system_hostname}\n')
            f.write(f'\tOS name: {system_os_name}\n')
            f.write(f'\tOS version: {system_os_version}\n')
            f.write(f'\tKernel: {system_kernel}\n')
            f.write(f'\tArchitecture: {system_architecture}\n')
            f.write('CPU\n')
            f.write(f'\tName: {cpu_name}\n')
            f.write(f'\tCores: {cpu_cores}\n')
            # psutil reports CPU frequencies in MHz, not Hz
            f.write(f'\tMinimum frequency: {int(cpu_min_freq)} MHz\n')
            f.write(f'\tMaximum frequency: {int(cpu_max_freq)} MHz\n')
            f.write('Memory\n')
            f.write(f'\tRAM memory: {int(memory_total / 10 ** 6)} MB\n')
            f.write(f'\tSWAP memory: {int(swap_total / 10 ** 6)} MB\n')
            f.write('Storage\n')
            for name, size in partitions.items():
                f.write(f'\tDisk "{name}": '
                        f'{round(size / 10 ** 9, 2)} GB\n')
            f.write('Network\n')
            for name, stats in network_interfaces.items():
                # psutil reports NIC link speed in Mbit/s; 0 means unknown
                speed = stats.speed
                if speed == 0:
                    f.write(f'\tInterface "{name}"\n')
                else:
                    f.write(f'\tInterface "{name}": {speed} Mbit/s\n')

            f.write('\n')
            f.write('===> DOCKER <===\n')
            f.write(f'Version: {docker_info["ServerVersion"]}\n')
            f.write(f'Root directory: {docker_info["DockerRootDir"]}\n')
            f.write('Drivers:\n')
            f.write(f'\tStorage: {docker_info["Driver"]}\n')
            f.write(f'\tCgroupFS: {docker_info["CgroupDriver"]} '
                    f'v{docker_info["CgroupVersion"]}\n')
        # Set initial metric values and start collection thread
        metrics_path = os.path.join(results_run_path, METRICS_FILE_NAME)
        initial_timestamp = time()
        initial_cpu = ps.cpu_times()
        initial_ram = ps.virtual_memory().used
        initial_swap = ps.swap_memory().used
        initial_disk_io = ps.disk_io_counters()
        initial_network_io = ps.net_io_counters()
        self._thread: Thread = Thread(target=_collect_metrics,
                                      daemon=True,
                                      args=(self._stop_event,
                                            metrics_path,
                                            sample_interval,
                                            initial_timestamp,
                                            initial_cpu,
                                            initial_ram,
                                            initial_swap,
                                            initial_disk_io,
                                            initial_network_io))
        self._thread.start()
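
The `_collect_metrics` worker itself is not shown here. Assuming it samples counters once per interval and records them until the stop event fires, its core loop can be sketched as follows; `sampling_loop`, `sample`, and `sink` are hypothetical stand-ins for the real function, the psutil calls, and the metrics file:

```python
import time
from threading import Event, Thread

def sampling_loop(stop_event, sample_interval, sample, sink):
    """Record one sample per interval until stop_event is set.

    sample() and sink(index, value) are hypothetical stand-ins for the
    psutil calls and the metrics file used by the real worker.
    """
    index = 1
    # Event.wait() returns True as soon as the event is set, so the loop
    # exits at most one interval after Collector.stop() is called.
    while not stop_event.wait(timeout=sample_interval):
        sink(index, sample())
        index += 1

# Usage: sample the wall clock every 5 ms for ~100 ms, then stop.
stop = Event()
samples = []
worker = Thread(target=sampling_loop,
                args=(stop, 0.005, time.time,
                      lambda i, v: samples.append((i, v))),
                daemon=True)
worker.start()
time.sleep(0.1)
stop.set()
worker.join()
```

Running the thread as a daemon, as the Collector does, ensures the process can still exit even if the stop event is never set.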

Create an instance of the Collector class.

Instantiating this class will automatically generate a case-info.txt file which describes the hardware used during collection of the metrics. The file describes:

  • Case:
    • Timestamp when started.
    • Directory of the case.
    • Number of the run.
    • Number of steps in a case.
  • Hardware:
    • CPU name.
    • Number of CPU cores.
    • Minimum and maximum CPU core frequency.
    • Amount of RAM and SWAP memory.
    • Available disk storage.
    • Available network interfaces and their link speed.
  • Docker:
    • Version of the Docker daemon.
    • Docker root directory.
    • Docker storage driver.
    • Docker CgroupFS driver and version.
Parameters
  • results_run_path (str): Path to the results directory of the run currently being executed.
  • sample_interval (float): Sample interval in seconds for collecting metrics.
  • number_of_steps (int): The number of steps of the case that is being executed.
  • run_id (int): The number of the run that is being executed.
  • directory (str): Path to the directory to store logs.
  • verbose (bool): Enable verbose logs.
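
The CPU name reported in case-info.txt is scraped from `lscpu` output. That extraction can be isolated into a small pure function for testing; `parse_cpu_name` is a hypothetical name and the sample output below is abbreviated:

```python
def parse_cpu_name(lscpu_output: str) -> str:
    """Return the 'Model name' field from lscpu output, or 'UNKNOWN'."""
    for line in lscpu_output.split('\n'):
        if 'Model name:' in line:
            # Everything after the first ':' is the CPU model string
            return line.split(':')[1].strip()
    return 'UNKNOWN'

sample = (
    'Architecture:        x86_64\n'
    'Model name:          Example CPU @ 2.40GHz\n'
    'CPU(s):              8\n'
)
print(parse_cpu_name(sample))  # Example CPU @ 2.40GHz
```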

def next_step(self):
    def next_step(self):
        """Increment the step number by one.

        The step number must always be equal or lower than the number of steps
        in the case.
        """
        global step_id
        step_id += 1

        msg = f'Step ({step_id}) is higher than number of steps ' + \
              f'({self._number_of_steps})'
        assert (step_id <= self._number_of_steps), msg

Increment the step number by one.

The step number must always be equal or lower than the number of steps in the case.
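
The bookkeeping behind `next_step` can be reproduced in isolation. The sketch below uses a hypothetical `StepCounter` class instead of the module-level `step_id` global to show the invariant the assertion enforces:

```python
class StepCounter:
    """Hypothetical stand-in for the module-level step_id bookkeeping."""

    def __init__(self, number_of_steps: int):
        self.step_id = 1  # steps are 1-indexed
        self.number_of_steps = number_of_steps

    def next_step(self) -> None:
        self.step_id += 1
        msg = (f'Step ({self.step_id}) is higher than number of steps '
               f'({self.number_of_steps})')
        assert self.step_id <= self.number_of_steps, msg

counter = StepCounter(number_of_steps=3)
counter.next_step()  # now at step 2
counter.next_step()  # now at step 3
overflow = False
try:
    counter.next_step()  # would be step 4 of 3: the assertion fires
except AssertionError:
    overflow = True
```

Note that, as in the original, the counter is incremented before the check, so the overflowing value is still recorded when the assertion fires.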

def stop(self):
    def stop(self):
        """End metrics collection.

        Signal the metrics collection thread to stop collecting any metrics.
        """
        self._stop_event.set()

End metrics collection.

Signal the metrics collection thread to stop collecting any metrics.
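
`stop()` only sets a `threading.Event`; the sampling thread notices it the next time it checks the event. The shutdown is nonetheless prompt because, once set, `Event.wait()` returns `True` immediately instead of sleeping for its full timeout, as this small sketch shows:

```python
import time
from threading import Event

stop_event = Event()

# Before stop(): wait() blocks for the whole timeout and returns False.
t0 = time.monotonic()
fired = stop_event.wait(timeout=0.05)
blocked = time.monotonic() - t0

# After stop(): wait() returns True without sleeping,
# even with a generous timeout.
stop_event.set()
t1 = time.monotonic()
fired_after = stop_event.wait(timeout=5.0)
returned_in = time.monotonic() - t1
```

This is why the collection thread stops within one sample interval of `stop()` being called.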

class scputimes, sswap, svmem, sdiskio, snetio (builtins.tuple):

psutil result namedtuples used for the initial CPU, memory, SWAP, disk IO, and network IO counters. They define no behaviour of their own and only inherit index and count from builtins.tuple.