bench_executor.container

This module holds the Container and ContainerManager classes. The Container class is responsible for abstracting the Docker containers and allow running containers easily and make sure they are initialized before using them. The Containermanager class allows to create container networks, list all running containers and stop them.

  1#!/usr/bin/env python3
  2
  3"""
  4This module holds the Container and ContainerManager classes. The Container
  5class is responsible for abstracting the Docker containers and allow running
  6containers easily and make sure they are initialized before using them.
  7The Containermanager class allows to create container networks, list all
  8running containers and stop them.
  9"""
 10
 11import docker  # type: ignore
 12from time import time, sleep
 13from typing import List, Tuple
 14from bench_executor.logger import Logger
 15
 16WAIT_TIME = 1  # seconds
 17TIMEOUT_TIME = 600  # seconds
 18NETWORK_NAME = 'bench_executor'
 19
 20
 21class ContainerManager():
 22    """Manage containers and networks."""
 23
 24    def __init__(self):
 25        """Creates an instance of the ContainerManager class."""
 26
 27        self._client = docker.from_env()
 28
 29    def __del__(self):
 30        self._client.close()
 31
 32    def list_all(self):
 33        """List all available containers."""
 34        return self._client.containers.list(all=True)
 35
 36    def stop_all(self):
 37        """Stop all containers."""
 38        stopped = False
 39        removed = False
 40
 41        for container in self.list_all():
 42            try:
 43                container.stop()
 44                stopped = True
 45            except docker.errors.APIError:
 46                pass
 47            try:
 48                container.remove()
 49                removed = True
 50            except docker.errors.APIError:
 51                pass
 52            print(f'Container {container.name}: stopped: {stopped} '
 53                  f'removed: {removed}')
 54
 55    def create_network(self, name: str):
 56        """Create a container network.
 57
 58        Parameters
 59        ----------
 60        name : str
 61            Name of the network
 62        """
 63        try:
 64            self._client.networks.get(name)
 65        except docker.errors.NotFound:
 66            self._client.networks.create(name)
 67
 68
 69class Container():
 70    """Container abstracts a Docker container
 71
 72    Abstract how to run a command in a container, start or stop a container,
 73    or retrieve logs. Also allow to wait for a certain log entry to appear or
 74    exit successfully.
 75    """
 76
 77    def __init__(self, container: str, name: str, logger: Logger,
 78                 ports: dict = {}, environment: dict = {},
 79                 volumes: List[str] = []):
 80        """Creates an instance of the Container class.
 81
 82        Parameters
 83        ----------
 84        container : str
 85            Container ID.
 86        name : str
 87            Pretty name of the container.
 88        logger : Logger
 89            Logger class to use for container logs.
 90        ports : dict
 91            Ports mapping of the container onto the host.
 92        volumes : list
 93            Volumes mapping of the container onto the host.
 94        """
 95        self._manager = ContainerManager()
 96        self._client = docker.from_env()
 97        self._container = None
 98        self._container_name = container
 99        self._name = name
100        self._ports = ports
101        self._volumes = volumes
102        self._environment = environment
103        self._proc_pid = None
104        self._long_id = None
105        self._cgroups_mode = None
106        self._cgroups_dir = None
107        self._started = False
108        self._logger = logger
109
110        # create network if not exist
111        self._manager.create_network(NETWORK_NAME)
112
113    def __del__(self):
114        self._client.close()
115
116    @property
117    def started(self) -> bool:
118        """Indicates if the container is already started"""
119        return self._started
120
121    @property
122    def name(self) -> str:
123        """The pretty name of the container"""
124        return self._name
125
126    def run(self, command: str = '', detach=True) -> bool:
127        """Run the container.
128
129        This is used for containers which are long running to provide services
130        such as a database or endpoint.
131
132        Parameters
133        ----------
134        command : str
135            The command to execute in the container, optionally and defaults to
136            no command.
137        detach : bool
138            If the container may run in the background, default True.
139
140        Returns
141        -------
142        success : bool
143            Whether running the container was successfull or not.
144        """
145        try:
146            e = self._environment
147            v = self._volumes
148            self._container = self._client.containers.run(self._container_name,
149                                                          command,
150                                                          name=self._name,
151                                                          detach=detach,
152                                                          ports=self._ports,
153                                                          network=NETWORK_NAME,
154                                                          environment=e,
155                                                          volumes=v)
156            self._started = (self._container is not None)
157            return True
158        except docker.errors.APIError as e:
159            self._logger.error(f'Failed to start container: {e}')
160
161        self._logger.error(f'Starting container "{self._name}" failed!')
162        return False
163
164    def exec(self, command: str) -> Tuple[bool, List[str]]:
165        """Execute a command in the container.
166
167        Parameters
168        ----------
169        command : str
170            The command to execute in the container.
171
172        Returns
173        -------
174        success : bool
175            Whether the command was executed successfully or not.
176        logs : list
177            The logs of the container for executing the command.
178        """
179        logs: List[str] = []
180
181        try:
182            if self._container is None:
183                self._logger.error('Container is not initialized yet')
184                return False, []
185            exit_code, output = self._container.exec_run(command)
186            logs = output.decode()
187            for line in logs.split('\n'):
188                self._logger.debug(line.strip())
189            if exit_code == 0:
190                return True, logs
191        except docker.errors.APIError as e:
192            self._logger.error(f'Failed to execute command: {e}')
193
194        return False, logs
195
196    def run_and_wait_for_log(self, log_line: str, command: str = '') -> bool:
197        """Run the container and wait for a log line to appear.
198
199        This blocks until the container's log contains the `log_line`.
200
201        Parameters
202        ----------
203        log_line : str
204            The log line to wait for in the logs.
205        command : str
206            The command to execute in the container, optionally and defaults to
207            no command.
208
209        Returns
210        -------
211        success : bool
212            Whether the container exited with status code 0 or not.
213        """
214        if not self.run(command):
215            self._logger.error(f'Command "{command}" failed')
216            return False
217
218        if self._container is None:
219            self._logger.error('Container is not initialized yet')
220            return False
221
222        start = time()
223        found_line = False
224        logs = self._container.logs(stream=True, follow=True)
225        lines = []
226        if logs is not None:
227            for line in logs:
228                line = line.decode().strip()
229                lines.append(line)
230                self._logger.debug(line)
231
232                if time() - start > TIMEOUT_TIME:
233                    msg = f'Starting container "{self._name}" timed out!'
234                    self._logger.error(msg)
235                    break
236
237                if log_line in line:
238                    found_line = True
239                    break
240
241            logs.close()
242            if found_line:
243                sleep(WAIT_TIME)
244                return True
245
246        # Logs are collected on success, log them on failure
247        self._logger.error(f'Waiting for container "{self._name}" failed!')
248        for line in lines:
249            self._logger.error(line)
250        return False
251
252    def run_and_wait_for_exit(self, command: str = '') -> bool:
253        """Run the container and wait for exit
254
255        This blocks until the container exit and gives a status code.
256
257        Parameters
258        ----------
259        command : str
260            The command to execute in the container, optionally and defaults to
261            no command.
262
263        Returns
264        -------
265       success : bool
266            Whether the container exited with status code 0 or not.
267        """
268        if not self.run(command):
269            return False
270
271        if self._container is None:
272            self._logger.error('Container is not initialized yet')
273            return False
274
275        status_code = self._container.wait()['StatusCode']
276        logs = self._container.logs(stream=True, follow=True)
277        if logs is not None:
278            for line in logs:
279                line = line.decode().strip()
280                # On success, logs are collected when the container is stopped.
281                if status_code != 0:
282                    self._logger.error(line)
283            logs.close()
284
285        if status_code == 0:
286            return True
287
288        self._logger.error('Command failed while waiting for exit with status '
289                           f'code: {status_code}')
290        return False
291
292    def stop(self) -> bool:
293        """Stop a running container
294
295        Stops the container and removes it, including its volumes.
296
297        Returns
298        -------
299        success : bool
300            Whether stopping the container was successfull or not.
301        """
302        try:
303            if self._container is not None:
304                logs = self._container.logs()
305                if logs is not None:
306                    logs = logs.decode()
307                    for line in logs.split('\n'):
308                        self._logger.debug(line)
309
310                self._container.stop()
311                self._container.remove(v=True)
312            return True
313        # Containers which are already stopped will raise an error which we can
314        # ignore
315        except docker.errors.APIError:
316            pass
317
318        return True
class ContainerManager:
22class ContainerManager():
23    """Manage containers and networks."""
24
25    def __init__(self):
26        """Creates an instance of the ContainerManager class."""
27
28        self._client = docker.from_env()
29
30    def __del__(self):
31        self._client.close()
32
33    def list_all(self):
34        """List all available containers."""
35        return self._client.containers.list(all=True)
36
37    def stop_all(self):
38        """Stop all containers."""
39        stopped = False
40        removed = False
41
42        for container in self.list_all():
43            try:
44                container.stop()
45                stopped = True
46            except docker.errors.APIError:
47                pass
48            try:
49                container.remove()
50                removed = True
51            except docker.errors.APIError:
52                pass
53            print(f'Container {container.name}: stopped: {stopped} '
54                  f'removed: {removed}')
55
56    def create_network(self, name: str):
57        """Create a container network.
58
59        Parameters
60        ----------
61        name : str
62            Name of the network
63        """
64        try:
65            self._client.networks.get(name)
66        except docker.errors.NotFound:
67            self._client.networks.create(name)

Manage containers and networks.

ContainerManager()
25    def __init__(self):
26        """Creates an instance of the ContainerManager class."""
27
28        self._client = docker.from_env()

Creates an instance of the ContainerManager class.

def list_all(self):
33    def list_all(self):
34        """List all available containers."""
35        return self._client.containers.list(all=True)

List all available containers.

def stop_all(self):
37    def stop_all(self):
38        """Stop all containers."""
39        stopped = False
40        removed = False
41
42        for container in self.list_all():
43            try:
44                container.stop()
45                stopped = True
46            except docker.errors.APIError:
47                pass
48            try:
49                container.remove()
50                removed = True
51            except docker.errors.APIError:
52                pass
53            print(f'Container {container.name}: stopped: {stopped} '
54                  f'removed: {removed}')

Stop all containers.

def create_network(self, name: str):
56    def create_network(self, name: str):
57        """Create a container network.
58
59        Parameters
60        ----------
61        name : str
62            Name of the network
63        """
64        try:
65            self._client.networks.get(name)
66        except docker.errors.NotFound:
67            self._client.networks.create(name)

Create a container network.

Parameters
  • name (str): Name of the network
class Container:
 70class Container():
 71    """Container abstracts a Docker container
 72
 73    Abstract how to run a command in a container, start or stop a container,
 74    or retrieve logs. Also allow to wait for a certain log entry to appear or
 75    exit successfully.
 76    """
 77
 78    def __init__(self, container: str, name: str, logger: Logger,
 79                 ports: dict = {}, environment: dict = {},
 80                 volumes: List[str] = []):
 81        """Creates an instance of the Container class.
 82
 83        Parameters
 84        ----------
 85        container : str
 86            Container ID.
 87        name : str
 88            Pretty name of the container.
 89        logger : Logger
 90            Logger class to use for container logs.
 91        ports : dict
 92            Ports mapping of the container onto the host.
 93        volumes : list
 94            Volumes mapping of the container onto the host.
 95        """
 96        self._manager = ContainerManager()
 97        self._client = docker.from_env()
 98        self._container = None
 99        self._container_name = container
100        self._name = name
101        self._ports = ports
102        self._volumes = volumes
103        self._environment = environment
104        self._proc_pid = None
105        self._long_id = None
106        self._cgroups_mode = None
107        self._cgroups_dir = None
108        self._started = False
109        self._logger = logger
110
111        # create network if not exist
112        self._manager.create_network(NETWORK_NAME)
113
114    def __del__(self):
115        self._client.close()
116
117    @property
118    def started(self) -> bool:
119        """Indicates if the container is already started"""
120        return self._started
121
122    @property
123    def name(self) -> str:
124        """The pretty name of the container"""
125        return self._name
126
127    def run(self, command: str = '', detach=True) -> bool:
128        """Run the container.
129
130        This is used for containers which are long running to provide services
131        such as a database or endpoint.
132
133        Parameters
134        ----------
135        command : str
136            The command to execute in the container, optionally and defaults to
137            no command.
138        detach : bool
139            If the container may run in the background, default True.
140
141        Returns
142        -------
143        success : bool
144            Whether running the container was successfull or not.
145        """
146        try:
147            e = self._environment
148            v = self._volumes
149            self._container = self._client.containers.run(self._container_name,
150                                                          command,
151                                                          name=self._name,
152                                                          detach=detach,
153                                                          ports=self._ports,
154                                                          network=NETWORK_NAME,
155                                                          environment=e,
156                                                          volumes=v)
157            self._started = (self._container is not None)
158            return True
159        except docker.errors.APIError as e:
160            self._logger.error(f'Failed to start container: {e}')
161
162        self._logger.error(f'Starting container "{self._name}" failed!')
163        return False
164
165    def exec(self, command: str) -> Tuple[bool, List[str]]:
166        """Execute a command in the container.
167
168        Parameters
169        ----------
170        command : str
171            The command to execute in the container.
172
173        Returns
174        -------
175        success : bool
176            Whether the command was executed successfully or not.
177        logs : list
178            The logs of the container for executing the command.
179        """
180        logs: List[str] = []
181
182        try:
183            if self._container is None:
184                self._logger.error('Container is not initialized yet')
185                return False, []
186            exit_code, output = self._container.exec_run(command)
187            logs = output.decode()
188            for line in logs.split('\n'):
189                self._logger.debug(line.strip())
190            if exit_code == 0:
191                return True, logs
192        except docker.errors.APIError as e:
193            self._logger.error(f'Failed to execute command: {e}')
194
195        return False, logs
196
197    def run_and_wait_for_log(self, log_line: str, command: str = '') -> bool:
198        """Run the container and wait for a log line to appear.
199
200        This blocks until the container's log contains the `log_line`.
201
202        Parameters
203        ----------
204        log_line : str
205            The log line to wait for in the logs.
206        command : str
207            The command to execute in the container, optionally and defaults to
208            no command.
209
210        Returns
211        -------
212        success : bool
213            Whether the container exited with status code 0 or not.
214        """
215        if not self.run(command):
216            self._logger.error(f'Command "{command}" failed')
217            return False
218
219        if self._container is None:
220            self._logger.error('Container is not initialized yet')
221            return False
222
223        start = time()
224        found_line = False
225        logs = self._container.logs(stream=True, follow=True)
226        lines = []
227        if logs is not None:
228            for line in logs:
229                line = line.decode().strip()
230                lines.append(line)
231                self._logger.debug(line)
232
233                if time() - start > TIMEOUT_TIME:
234                    msg = f'Starting container "{self._name}" timed out!'
235                    self._logger.error(msg)
236                    break
237
238                if log_line in line:
239                    found_line = True
240                    break
241
242            logs.close()
243            if found_line:
244                sleep(WAIT_TIME)
245                return True
246
247        # Logs are collected on success, log them on failure
248        self._logger.error(f'Waiting for container "{self._name}" failed!')
249        for line in lines:
250            self._logger.error(line)
251        return False
252
253    def run_and_wait_for_exit(self, command: str = '') -> bool:
254        """Run the container and wait for exit
255
256        This blocks until the container exit and gives a status code.
257
258        Parameters
259        ----------
260        command : str
261            The command to execute in the container, optionally and defaults to
262            no command.
263
264        Returns
265        -------
266       success : bool
267            Whether the container exited with status code 0 or not.
268        """
269        if not self.run(command):
270            return False
271
272        if self._container is None:
273            self._logger.error('Container is not initialized yet')
274            return False
275
276        status_code = self._container.wait()['StatusCode']
277        logs = self._container.logs(stream=True, follow=True)
278        if logs is not None:
279            for line in logs:
280                line = line.decode().strip()
281                # On success, logs are collected when the container is stopped.
282                if status_code != 0:
283                    self._logger.error(line)
284            logs.close()
285
286        if status_code == 0:
287            return True
288
289        self._logger.error('Command failed while waiting for exit with status '
290                           f'code: {status_code}')
291        return False
292
293    def stop(self) -> bool:
294        """Stop a running container
295
296        Stops the container and removes it, including its volumes.
297
298        Returns
299        -------
300        success : bool
301            Whether stopping the container was successfull or not.
302        """
303        try:
304            if self._container is not None:
305                logs = self._container.logs()
306                if logs is not None:
307                    logs = logs.decode()
308                    for line in logs.split('\n'):
309                        self._logger.debug(line)
310
311                self._container.stop()
312                self._container.remove(v=True)
313            return True
314        # Containers which are already stopped will raise an error which we can
315        # ignore
316        except docker.errors.APIError:
317            pass
318
319        return True

Container abstracts a Docker container

Abstract how to run a command in a container, start or stop a container, or retrieve logs. Also allow to wait for a certain log entry to appear or exit successfully.

Container( container: str, name: str, logger: bench_executor.logger.Logger, ports: dict = {}, environment: dict = {}, volumes: List[str] = [])
 78    def __init__(self, container: str, name: str, logger: Logger,
 79                 ports: dict = {}, environment: dict = {},
 80                 volumes: List[str] = []):
 81        """Creates an instance of the Container class.
 82
 83        Parameters
 84        ----------
 85        container : str
 86            Container ID.
 87        name : str
 88            Pretty name of the container.
 89        logger : Logger
 90            Logger class to use for container logs.
 91        ports : dict
 92            Ports mapping of the container onto the host.
 93        volumes : list
 94            Volumes mapping of the container onto the host.
 95        """
 96        self._manager = ContainerManager()
 97        self._client = docker.from_env()
 98        self._container = None
 99        self._container_name = container
100        self._name = name
101        self._ports = ports
102        self._volumes = volumes
103        self._environment = environment
104        self._proc_pid = None
105        self._long_id = None
106        self._cgroups_mode = None
107        self._cgroups_dir = None
108        self._started = False
109        self._logger = logger
110
111        # create network if not exist
112        self._manager.create_network(NETWORK_NAME)

Creates an instance of the Container class.

Parameters
  • container (str): Container ID.
  • name (str): Pretty name of the container.
  • logger (Logger): Logger class to use for container logs.
  • ports (dict): Ports mapping of the container onto the host.
  • volumes (list): Volumes mapping of the container onto the host.
started: bool

Indicates if the container is already started

name: str

The pretty name of the container

def run(self, command: str = '', detach=True) -> bool:
127    def run(self, command: str = '', detach=True) -> bool:
128        """Run the container.
129
130        This is used for containers which are long running to provide services
131        such as a database or endpoint.
132
133        Parameters
134        ----------
135        command : str
136            The command to execute in the container, optionally and defaults to
137            no command.
138        detach : bool
139            If the container may run in the background, default True.
140
141        Returns
142        -------
143        success : bool
144            Whether running the container was successfull or not.
145        """
146        try:
147            e = self._environment
148            v = self._volumes
149            self._container = self._client.containers.run(self._container_name,
150                                                          command,
151                                                          name=self._name,
152                                                          detach=detach,
153                                                          ports=self._ports,
154                                                          network=NETWORK_NAME,
155                                                          environment=e,
156                                                          volumes=v)
157            self._started = (self._container is not None)
158            return True
159        except docker.errors.APIError as e:
160            self._logger.error(f'Failed to start container: {e}')
161
162        self._logger.error(f'Starting container "{self._name}" failed!')
163        return False

Run the container.

This is used for containers which are long running to provide services such as a database or endpoint.

Parameters
  • command (str): The command to execute in the container, optionally and defaults to no command.
  • detach (bool): If the container may run in the background, default True.
Returns
  • success (bool): Whether running the container was successfull or not.
def exec(self, command: str) -> Tuple[bool, List[str]]:
165    def exec(self, command: str) -> Tuple[bool, List[str]]:
166        """Execute a command in the container.
167
168        Parameters
169        ----------
170        command : str
171            The command to execute in the container.
172
173        Returns
174        -------
175        success : bool
176            Whether the command was executed successfully or not.
177        logs : list
178            The logs of the container for executing the command.
179        """
180        logs: List[str] = []
181
182        try:
183            if self._container is None:
184                self._logger.error('Container is not initialized yet')
185                return False, []
186            exit_code, output = self._container.exec_run(command)
187            logs = output.decode()
188            for line in logs.split('\n'):
189                self._logger.debug(line.strip())
190            if exit_code == 0:
191                return True, logs
192        except docker.errors.APIError as e:
193            self._logger.error(f'Failed to execute command: {e}')
194
195        return False, logs

Execute a command in the container.

Parameters
  • command (str): The command to execute in the container.
Returns
  • success (bool): Whether the command was executed successfully or not.
  • logs (list): The logs of the container for executing the command.
def run_and_wait_for_log(self, log_line: str, command: str = '') -> bool:
197    def run_and_wait_for_log(self, log_line: str, command: str = '') -> bool:
198        """Run the container and wait for a log line to appear.
199
200        This blocks until the container's log contains the `log_line`.
201
202        Parameters
203        ----------
204        log_line : str
205            The log line to wait for in the logs.
206        command : str
207            The command to execute in the container, optionally and defaults to
208            no command.
209
210        Returns
211        -------
212        success : bool
213            Whether the container exited with status code 0 or not.
214        """
215        if not self.run(command):
216            self._logger.error(f'Command "{command}" failed')
217            return False
218
219        if self._container is None:
220            self._logger.error('Container is not initialized yet')
221            return False
222
223        start = time()
224        found_line = False
225        logs = self._container.logs(stream=True, follow=True)
226        lines = []
227        if logs is not None:
228            for line in logs:
229                line = line.decode().strip()
230                lines.append(line)
231                self._logger.debug(line)
232
233                if time() - start > TIMEOUT_TIME:
234                    msg = f'Starting container "{self._name}" timed out!'
235                    self._logger.error(msg)
236                    break
237
238                if log_line in line:
239                    found_line = True
240                    break
241
242            logs.close()
243            if found_line:
244                sleep(WAIT_TIME)
245                return True
246
247        # Logs are collected on success, log them on failure
248        self._logger.error(f'Waiting for container "{self._name}" failed!')
249        for line in lines:
250            self._logger.error(line)
251        return False

Run the container and wait for a log line to appear.

This blocks until the container's log contains the log_line.

Parameters
  • log_line (str): The log line to wait for in the logs.
  • command (str): The command to execute in the container, optionally and defaults to no command.
Returns
  • success (bool): Whether the container exited with status code 0 or not.
def run_and_wait_for_exit(self, command: str = '') -> bool:
253    def run_and_wait_for_exit(self, command: str = '') -> bool:
254        """Run the container and wait for exit
255
256        This blocks until the container exit and gives a status code.
257
258        Parameters
259        ----------
260        command : str
261            The command to execute in the container, optionally and defaults to
262            no command.
263
264        Returns
265        -------
266       success : bool
267            Whether the container exited with status code 0 or not.
268        """
269        if not self.run(command):
270            return False
271
272        if self._container is None:
273            self._logger.error('Container is not initialized yet')
274            return False
275
276        status_code = self._container.wait()['StatusCode']
277        logs = self._container.logs(stream=True, follow=True)
278        if logs is not None:
279            for line in logs:
280                line = line.decode().strip()
281                # On success, logs are collected when the container is stopped.
282                if status_code != 0:
283                    self._logger.error(line)
284            logs.close()
285
286        if status_code == 0:
287            return True
288
289        self._logger.error('Command failed while waiting for exit with status '
290                           f'code: {status_code}')
291        return False

Run the container and wait for exit

This blocks until the container exit and gives a status code.

Parameters


command : str The command to execute in the container, optionally and defaults to no command.

Returns


success : bool Whether the container exited with status code 0 or not.

def stop(self) -> bool:
293    def stop(self) -> bool:
294        """Stop a running container
295
296        Stops the container and removes it, including its volumes.
297
298        Returns
299        -------
300        success : bool
301            Whether stopping the container was successfull or not.
302        """
303        try:
304            if self._container is not None:
305                logs = self._container.logs()
306                if logs is not None:
307                    logs = logs.decode()
308                    for line in logs.split('\n'):
309                        self._logger.debug(line)
310
311                self._container.stop()
312                self._container.remove(v=True)
313            return True
314        # Containers which are already stopped will raise an error which we can
315        # ignore
316        except docker.errors.APIError:
317            pass
318
319        return True

Stop a running container

Stops the container and removes it, including its volumes.

Returns
  • success (bool): Whether stopping the container was successfull or not.