bench_executor.container
This module holds the Container and ContainerManager classes. The Container class is responsible for abstracting Docker containers: it allows running containers easily and makes sure they are initialized before they are used. The ContainerManager class allows creating container networks, listing all containers and stopping them.
#!/usr/bin/env python3

"""
This module holds the Container and ContainerManager classes. The Container
class is responsible for abstracting Docker containers: it allows running
containers easily and makes sure they are initialized before they are used.
The ContainerManager class allows creating container networks, listing all
containers and stopping them.
"""

import docker  # type: ignore
from time import time, sleep
from typing import List, Tuple
from bench_executor.logger import Logger

WAIT_TIME = 1  # seconds
TIMEOUT_TIME = 600  # seconds
NETWORK_NAME = 'bench_executor'


class ContainerManager():
    """Manage containers and networks."""

    def __init__(self):
        """Creates an instance of the ContainerManager class."""
        self._client = docker.from_env()

    def __del__(self):
        # __init__ may have failed before the client was created, in which
        # case there is nothing to close.
        client = getattr(self, '_client', None)
        if client is not None:
            client.close()

    def list_all(self):
        """List all available containers.

        Returns
        -------
        containers : list
            Result of the Docker client's container listing with
            ``all=True``.
        """
        return self._client.containers.list(all=True)

    def stop_all(self):
        """Stop and remove all containers.

        Stopping and removing are both best-effort: Docker API errors are
        ignored and the outcome is printed for each container.
        """
        for container in self.list_all():
            # The flags describe this container only, so they are reset on
            # every iteration. Otherwise one success would be reported for
            # all following containers as well.
            stopped = False
            removed = False

            try:
                container.stop()
                stopped = True
            except docker.errors.APIError:
                pass

            try:
                container.remove()
                removed = True
            except docker.errors.APIError:
                pass

            print(f'Container {container.name}: stopped: {stopped} '
                  f'removed: {removed}')

    def create_network(self, name: str):
        """Create a container network if it does not exist yet.

        Parameters
        ----------
        name : str
            Name of the network
        """
        networks = self._client.networks
        try:
            networks.get(name)
        except docker.errors.NotFound:
            networks.create(name)


class Container():
    """Container abstracts a Docker container

    Abstracts how to run a command in a container, start or stop a container,
    or retrieve logs. Also allows waiting for a certain log entry to appear
    or for the container to exit successfully.
    """

    def __init__(self, container: str, name: str, logger: Logger,
                 ports: dict = {}, environment: dict = {},
                 volumes: List[str] = []):
        """Creates an instance of the Container class.

        Parameters
        ----------
        container : str
            Container image reference; it is passed as the first argument of
            the Docker client's ``containers.run`` when the container is run.
        name : str
            Pretty name of the container.
        logger : Logger
            Logger class to use for container logs.
        ports : dict
            Ports mapping of the container onto the host.
        environment : dict
            Environment variables handed to the container when it is run.
        volumes : list
            Volumes mapping of the container onto the host.
        """
        self._manager = ContainerManager()
        self._client = docker.from_env()
        self._container = None
        self._container_name = container
        self._name = name
        # The signature's default objects are created once and would be
        # shared by every instance; fall back to fresh objects instead of
        # storing them.
        self._ports = ports or {}
        self._volumes = volumes or []
        self._environment = environment or {}
        self._proc_pid = None
        self._long_id = None
        self._cgroups_mode = None
        self._cgroups_dir = None
        self._started = False
        self._logger = logger

        # create network if not exist
        self._manager.create_network(NETWORK_NAME)

    def __del__(self):
        # __init__ may have failed before the client was created, in which
        # case there is nothing to close.
        client = getattr(self, '_client', None)
        if client is not None:
            client.close()

    @property
    def started(self) -> bool:
        """Indicates if the container is already started"""
        return self._started

    @property
    def name(self) -> str:
        """The pretty name of the container"""
        return self._name

    def run(self, command: str = '', detach=True) -> bool:
        """Run the container.

        This is used for containers which are long running to provide
        services such as a database or endpoint.

        Parameters
        ----------
        command : str
            The command to execute in the container, optional and defaults
            to no command.
        detach : bool
            If the container may run in the background, default True.

        Returns
        -------
        success : bool
            Whether running the container was successful or not.
        """
        try:
            container = self._client.containers.run(
                self._container_name,
                command,
                name=self._name,
                detach=detach,
                ports=self._ports,
                network=NETWORK_NAME,
                environment=self._environment,
                volumes=self._volumes)
        except docker.errors.APIError as error:
            self._logger.error(f'Failed to start container: {error}')
            self._logger.error(f'Starting container "{self._name}" failed!')
            return False

        self._container = container
        self._started = container is not None
        return True

    def exec(self, command: str) -> Tuple[bool, List[str]]:
        """Execute a command in the container.

        Parameters
        ----------
        command : str
            The command to execute in the container.

        Returns
        -------
        success : bool
            Whether the command was executed successfully or not.
        logs : list
            The logs of the container for executing the command.
            NOTE(review): once the command has produced output this is the
            decoded output as a single string, not a list of lines.
        """
        logs: List[str] = []

        if self._container is None:
            self._logger.error('Container is not initialized yet')
            return False, []

        try:
            exit_code, output = self._container.exec_run(command)
            logs = output.decode()
            for entry in logs.split('\n'):
                self._logger.debug(entry.strip())
        except docker.errors.APIError as error:
            self._logger.error(f'Failed to execute command: {error}')
            return False, logs

        return exit_code == 0, logs

    def run_and_wait_for_log(self, log_line: str, command: str = '') -> bool:
        """Run the container and wait for a log line to appear.

        This blocks until the container's log contains the `log_line`, the
        log stream ends, or a log entry arrives after `TIMEOUT_TIME` seconds.
        The timeout is only evaluated when a log entry arrives.

        Parameters
        ----------
        log_line : str
            The log line to wait for in the logs.
        command : str
            The command to execute in the container, optional and defaults
            to no command.

        Returns
        -------
        success : bool
            Whether the container was started and the log line was found.
        """
        if not self.run(command):
            self._logger.error(f'Command "{command}" failed')
            return False

        if self._container is None:
            self._logger.error('Container is not initialized yet')
            return False

        start = time()
        found_line = False
        lines = []
        logs = self._container.logs(stream=True, follow=True)
        if logs is not None:
            try:
                for raw_line in logs:
                    line = raw_line.decode().strip()
                    lines.append(line)
                    self._logger.debug(line)

                    if time() - start > TIMEOUT_TIME:
                        msg = f'Starting container "{self._name}" timed out!'
                        self._logger.error(msg)
                        break

                    if log_line in line:
                        found_line = True
                        break
            finally:
                # Release the stream even when decoding or logging raises.
                logs.close()

        if found_line:
            sleep(WAIT_TIME)
            return True

        # Logs are collected on success, log them on failure
        self._logger.error(f'Waiting for container "{self._name}" failed!')
        for line in lines:
            self._logger.error(line)
        return False

    def run_and_wait_for_exit(self, command: str = '') -> bool:
        """Run the container and wait for exit

        This blocks until the container exits and gives a status code.

        Parameters
        ----------
        command : str
            The command to execute in the container, optional and defaults
            to no command.

        Returns
        -------
        success : bool
            Whether the container exited with status code 0 or not.
        """
        if not self.run(command):
            return False

        if self._container is None:
            self._logger.error('Container is not initialized yet')
            return False

        status_code = self._container.wait()['StatusCode']
        if status_code == 0:
            # On success, logs are collected when the container is stopped,
            # so there is no need to stream them here.
            return True

        logs = self._container.logs(stream=True, follow=True)
        if logs is not None:
            try:
                for raw_line in logs:
                    self._logger.error(raw_line.decode().strip())
            finally:
                # Release the stream even when decoding or logging raises.
                logs.close()

        self._logger.error('Command failed while waiting for exit with status '
                           f'code: {status_code}')
        return False

    def stop(self) -> bool:
        """Stop a running container

        Writes the container's logs to the debug log, then stops the
        container and removes it, including its volumes.

        Returns
        -------
        success : bool
            Always True; Docker API errors are ignored.
        """
        container = self._container
        if container is None:
            return True

        try:
            raw_logs = container.logs()
            if raw_logs is not None:
                for line in raw_logs.decode().split('\n'):
                    self._logger.debug(line)

            container.stop()
            container.remove(v=True)
        except docker.errors.APIError:
            # Containers which are already stopped will raise an error which
            # we can ignore
            pass

        return True
class ContainerManager():
    """Manage containers and networks."""

    def __init__(self):
        """Creates an instance of the ContainerManager class."""
        self._client = docker.from_env()

    def __del__(self):
        # __init__ may have failed before the client was created, in which
        # case there is nothing to close.
        client = getattr(self, '_client', None)
        if client is not None:
            client.close()

    def list_all(self):
        """List all available containers.

        Returns
        -------
        containers : list
            Result of the Docker client's container listing with
            ``all=True``.
        """
        return self._client.containers.list(all=True)

    def stop_all(self):
        """Stop and remove all containers.

        Stopping and removing are both best-effort: Docker API errors are
        ignored and the outcome is printed for each container.
        """
        for container in self.list_all():
            # The flags describe this container only, so they are reset on
            # every iteration. Otherwise one success would be reported for
            # all following containers as well.
            stopped = False
            removed = False

            try:
                container.stop()
                stopped = True
            except docker.errors.APIError:
                pass

            try:
                container.remove()
                removed = True
            except docker.errors.APIError:
                pass

            print(f'Container {container.name}: stopped: {stopped} '
                  f'removed: {removed}')

    def create_network(self, name: str):
        """Create a container network if it does not exist yet.

        Parameters
        ----------
        name : str
            Name of the network
        """
        networks = self._client.networks
        try:
            networks.get(name)
        except docker.errors.NotFound:
            networks.create(name)
Manage containers and networks.
def __init__(self):
    """Creates an instance of the ContainerManager class.

    The Docker client is created with ``docker.from_env()`` and kept on the
    instance as ``_client``.
    """
    client = docker.from_env()
    self._client = client
Creates an instance of the ContainerManager class.
def list_all(self):
    """List all available containers.

    Returns
    -------
    containers : list
        Result of the Docker client's container listing with ``all=True``.
    """
    container_api = self._client.containers
    return container_api.list(all=True)
List all available containers.
def stop_all(self):
    """Stop and remove all containers.

    Stopping and removing are both best-effort: Docker API errors are
    ignored and the outcome is printed for each container.
    """
    for container in self.list_all():
        # The flags describe this container only, so they are reset on every
        # iteration. Otherwise one success would be reported for all
        # following containers as well.
        stopped = False
        removed = False

        try:
            container.stop()
            stopped = True
        except docker.errors.APIError:
            pass

        try:
            container.remove()
            removed = True
        except docker.errors.APIError:
            pass

        print(f'Container {container.name}: stopped: {stopped} '
              f'removed: {removed}')
Stop all containers.
def create_network(self, name: str):
    """Create a container network if it does not exist yet.

    The network is looked up by name first; it is only created when the
    lookup raises ``docker.errors.NotFound``.

    Parameters
    ----------
    name : str
        Name of the network
    """
    networks = self._client.networks
    try:
        networks.get(name)
    except docker.errors.NotFound:
        networks.create(name)
Create a container network.
Parameters
- name (str): Name of the network
class Container():
    """Container abstracts a Docker container

    Abstracts how to run a command in a container, start or stop a container,
    or retrieve logs. Also allows waiting for a certain log entry to appear
    or for the container to exit successfully.
    """

    def __init__(self, container: str, name: str, logger: Logger,
                 ports: dict = {}, environment: dict = {},
                 volumes: List[str] = []):
        """Creates an instance of the Container class.

        Parameters
        ----------
        container : str
            Container image reference; it is passed as the first argument of
            the Docker client's ``containers.run`` when the container is run.
        name : str
            Pretty name of the container.
        logger : Logger
            Logger class to use for container logs.
        ports : dict
            Ports mapping of the container onto the host.
        environment : dict
            Environment variables handed to the container when it is run.
        volumes : list
            Volumes mapping of the container onto the host.
        """
        self._manager = ContainerManager()
        self._client = docker.from_env()
        self._container = None
        self._container_name = container
        self._name = name
        # The signature's default objects are created once and would be
        # shared by every instance; fall back to fresh objects instead of
        # storing them.
        self._ports = ports or {}
        self._volumes = volumes or []
        self._environment = environment or {}
        self._proc_pid = None
        self._long_id = None
        self._cgroups_mode = None
        self._cgroups_dir = None
        self._started = False
        self._logger = logger

        # create network if not exist
        self._manager.create_network(NETWORK_NAME)

    def __del__(self):
        # __init__ may have failed before the client was created, in which
        # case there is nothing to close.
        client = getattr(self, '_client', None)
        if client is not None:
            client.close()

    @property
    def started(self) -> bool:
        """Indicates if the container is already started"""
        return self._started

    @property
    def name(self) -> str:
        """The pretty name of the container"""
        return self._name

    def run(self, command: str = '', detach=True) -> bool:
        """Run the container.

        This is used for containers which are long running to provide
        services such as a database or endpoint.

        Parameters
        ----------
        command : str
            The command to execute in the container, optional and defaults
            to no command.
        detach : bool
            If the container may run in the background, default True.

        Returns
        -------
        success : bool
            Whether running the container was successful or not.
        """
        try:
            container = self._client.containers.run(
                self._container_name,
                command,
                name=self._name,
                detach=detach,
                ports=self._ports,
                network=NETWORK_NAME,
                environment=self._environment,
                volumes=self._volumes)
        except docker.errors.APIError as error:
            self._logger.error(f'Failed to start container: {error}')
            self._logger.error(f'Starting container "{self._name}" failed!')
            return False

        self._container = container
        self._started = container is not None
        return True

    def exec(self, command: str) -> Tuple[bool, List[str]]:
        """Execute a command in the container.

        Parameters
        ----------
        command : str
            The command to execute in the container.

        Returns
        -------
        success : bool
            Whether the command was executed successfully or not.
        logs : list
            The logs of the container for executing the command.
            NOTE(review): once the command has produced output this is the
            decoded output as a single string, not a list of lines.
        """
        logs: List[str] = []

        if self._container is None:
            self._logger.error('Container is not initialized yet')
            return False, []

        try:
            exit_code, output = self._container.exec_run(command)
            logs = output.decode()
            for entry in logs.split('\n'):
                self._logger.debug(entry.strip())
        except docker.errors.APIError as error:
            self._logger.error(f'Failed to execute command: {error}')
            return False, logs

        return exit_code == 0, logs

    def run_and_wait_for_log(self, log_line: str, command: str = '') -> bool:
        """Run the container and wait for a log line to appear.

        This blocks until the container's log contains the `log_line`, the
        log stream ends, or a log entry arrives after `TIMEOUT_TIME` seconds.
        The timeout is only evaluated when a log entry arrives.

        Parameters
        ----------
        log_line : str
            The log line to wait for in the logs.
        command : str
            The command to execute in the container, optional and defaults
            to no command.

        Returns
        -------
        success : bool
            Whether the container was started and the log line was found.
        """
        if not self.run(command):
            self._logger.error(f'Command "{command}" failed')
            return False

        if self._container is None:
            self._logger.error('Container is not initialized yet')
            return False

        start = time()
        found_line = False
        lines = []
        logs = self._container.logs(stream=True, follow=True)
        if logs is not None:
            try:
                for raw_line in logs:
                    line = raw_line.decode().strip()
                    lines.append(line)
                    self._logger.debug(line)

                    if time() - start > TIMEOUT_TIME:
                        msg = f'Starting container "{self._name}" timed out!'
                        self._logger.error(msg)
                        break

                    if log_line in line:
                        found_line = True
                        break
            finally:
                # Release the stream even when decoding or logging raises.
                logs.close()

        if found_line:
            sleep(WAIT_TIME)
            return True

        # Logs are collected on success, log them on failure
        self._logger.error(f'Waiting for container "{self._name}" failed!')
        for line in lines:
            self._logger.error(line)
        return False

    def run_and_wait_for_exit(self, command: str = '') -> bool:
        """Run the container and wait for exit

        This blocks until the container exits and gives a status code.

        Parameters
        ----------
        command : str
            The command to execute in the container, optional and defaults
            to no command.

        Returns
        -------
        success : bool
            Whether the container exited with status code 0 or not.
        """
        if not self.run(command):
            return False

        if self._container is None:
            self._logger.error('Container is not initialized yet')
            return False

        status_code = self._container.wait()['StatusCode']
        if status_code == 0:
            # On success, logs are collected when the container is stopped,
            # so there is no need to stream them here.
            return True

        logs = self._container.logs(stream=True, follow=True)
        if logs is not None:
            try:
                for raw_line in logs:
                    self._logger.error(raw_line.decode().strip())
            finally:
                # Release the stream even when decoding or logging raises.
                logs.close()

        self._logger.error('Command failed while waiting for exit with status '
                           f'code: {status_code}')
        return False

    def stop(self) -> bool:
        """Stop a running container

        Writes the container's logs to the debug log, then stops the
        container and removes it, including its volumes.

        Returns
        -------
        success : bool
            Always True; Docker API errors are ignored.
        """
        container = self._container
        if container is None:
            return True

        try:
            raw_logs = container.logs()
            if raw_logs is not None:
                for line in raw_logs.decode().split('\n'):
                    self._logger.debug(line)

            container.stop()
            container.remove(v=True)
        except docker.errors.APIError:
            # Containers which are already stopped will raise an error which
            # we can ignore
            pass

        return True
Container abstracts a Docker container
Abstracts how to run a command in a container, start or stop a container, or retrieve logs. Also allows waiting for a certain log entry to appear or for the container to exit successfully.
def __init__(self, container: str, name: str, logger: Logger,
             ports: dict = {}, environment: dict = {},
             volumes: List[str] = []):
    """Creates an instance of the Container class.

    Parameters
    ----------
    container : str
        Container image reference; it is passed as the first argument of
        the Docker client's ``containers.run`` when the container is run.
    name : str
        Pretty name of the container.
    logger : Logger
        Logger class to use for container logs.
    ports : dict
        Ports mapping of the container onto the host.
    environment : dict
        Environment variables handed to the container when it is run.
    volumes : list
        Volumes mapping of the container onto the host.
    """
    self._manager = ContainerManager()
    self._client = docker.from_env()
    self._container = None
    self._container_name = container
    self._name = name
    # The signature's default objects are created once and would be shared
    # by every instance; fall back to fresh objects instead of storing them.
    self._ports = ports or {}
    self._volumes = volumes or []
    self._environment = environment or {}
    self._proc_pid = None
    self._long_id = None
    self._cgroups_mode = None
    self._cgroups_dir = None
    self._started = False
    self._logger = logger

    # create network if not exist
    self._manager.create_network(NETWORK_NAME)
Creates an instance of the Container class.
Parameters
- container (str): Container ID.
- name (str): Pretty name of the container.
- logger (Logger): Logger class to use for container logs.
- ports (dict): Ports mapping of the container onto the host.
- volumes (list): Volumes mapping of the container onto the host.
def run(self, command: str = '', detach=True) -> bool:
    """Run the container.

    This is used for containers which are long running to provide services
    such as a database or endpoint. The container is attached to the
    `NETWORK_NAME` network.

    Parameters
    ----------
    command : str
        The command to execute in the container, optional and defaults to
        no command.
    detach : bool
        If the container may run in the background, default True.

    Returns
    -------
    success : bool
        Whether running the container was successful or not.
    """
    try:
        container = self._client.containers.run(
            self._container_name,
            command,
            name=self._name,
            detach=detach,
            ports=self._ports,
            network=NETWORK_NAME,
            environment=self._environment,
            volumes=self._volumes)
    except docker.errors.APIError as error:
        self._logger.error(f'Failed to start container: {error}')
        self._logger.error(f'Starting container "{self._name}" failed!')
        return False

    self._container = container
    self._started = container is not None
    return True
Run the container.
This is used for containers which are long running to provide services such as a database or endpoint.
Parameters
- command (str): The command to execute in the container, optionally and defaults to no command.
- detach (bool): If the container may run in the background, default True.
Returns
- success (bool): Whether running the container was successful or not.
def exec(self, command: str) -> Tuple[bool, List[str]]:
    """Execute a command in the container.

    Each line of the command's output is written to the debug log.

    Parameters
    ----------
    command : str
        The command to execute in the container.

    Returns
    -------
    success : bool
        Whether the command was executed successfully or not.
    logs : list
        The logs of the container for executing the command.
        NOTE(review): once the command has produced output this is the
        decoded output as a single string, not a list of lines.
    """
    logs: List[str] = []

    if self._container is None:
        self._logger.error('Container is not initialized yet')
        return False, []

    try:
        exit_code, output = self._container.exec_run(command)
        logs = output.decode()
        for entry in logs.split('\n'):
            self._logger.debug(entry.strip())
    except docker.errors.APIError as error:
        self._logger.error(f'Failed to execute command: {error}')
        return False, logs

    return exit_code == 0, logs
Execute a command in the container.
Parameters
- command (str): The command to execute in the container.
Returns
- success (bool): Whether the command was executed successfully or not.
- logs (list): The logs of the container for executing the command.
def run_and_wait_for_log(self, log_line: str, command: str = '') -> bool:
    """Run the container and wait for a log line to appear.

    This blocks until the container's log contains the `log_line`, the log
    stream ends, or a log entry arrives after `TIMEOUT_TIME` seconds. The
    timeout is only evaluated when a log entry arrives.

    Parameters
    ----------
    log_line : str
        The log line to wait for in the logs.
    command : str
        The command to execute in the container, optional and defaults to
        no command.

    Returns
    -------
    success : bool
        Whether the container was started and the log line was found.
    """
    if not self.run(command):
        self._logger.error(f'Command "{command}" failed')
        return False

    if self._container is None:
        self._logger.error('Container is not initialized yet')
        return False

    start = time()
    found_line = False
    lines = []
    logs = self._container.logs(stream=True, follow=True)
    if logs is not None:
        try:
            for raw_line in logs:
                line = raw_line.decode().strip()
                lines.append(line)
                self._logger.debug(line)

                if time() - start > TIMEOUT_TIME:
                    msg = f'Starting container "{self._name}" timed out!'
                    self._logger.error(msg)
                    break

                if log_line in line:
                    found_line = True
                    break
        finally:
            # Release the stream even when decoding or logging raises.
            logs.close()

    if found_line:
        # Pause for WAIT_TIME seconds before reporting success.
        sleep(WAIT_TIME)
        return True

    # Logs are collected on success, log them on failure
    self._logger.error(f'Waiting for container "{self._name}" failed!')
    for line in lines:
        self._logger.error(line)
    return False
Run the container and wait for a log line to appear.
This blocks until the container's log contains the `log_line`.
Parameters
- log_line (str): The log line to wait for in the logs.
- command (str): The command to execute in the container, optionally and defaults to no command.
Returns
- success (bool): Whether the container was started and the log line was found in its logs.
def run_and_wait_for_exit(self, command: str = '') -> bool:
    """Run the container and wait for exit

    This blocks until the container exits and gives a status code. When the
    status code is not 0, the container's logs are written to the error log.

    Parameters
    ----------
    command : str
        The command to execute in the container, optional and defaults to
        no command.

    Returns
    -------
    success : bool
        Whether the container exited with status code 0 or not.
    """
    if not self.run(command):
        return False

    if self._container is None:
        self._logger.error('Container is not initialized yet')
        return False

    status_code = self._container.wait()['StatusCode']
    if status_code == 0:
        # On success, logs are collected when the container is stopped, so
        # there is no need to stream them here.
        return True

    logs = self._container.logs(stream=True, follow=True)
    if logs is not None:
        try:
            for raw_line in logs:
                self._logger.error(raw_line.decode().strip())
        finally:
            # Release the stream even when decoding or logging raises.
            logs.close()

    self._logger.error('Command failed while waiting for exit with status '
                       f'code: {status_code}')
    return False
Run the container and wait for exit
This blocks until the container exits and gives a status code.
Parameters
- command (str): The command to execute in the container, optionally and defaults to no command.
Returns
- success (bool): Whether the container exited with status code 0 or not.
def stop(self) -> bool:
    """Stop a running container

    Writes the container's logs to the debug log, then stops the container
    and removes it, including its volumes. Nothing happens when no container
    has been run yet.

    Returns
    -------
    success : bool
        Always True; Docker API errors are ignored.
    """
    container = self._container
    if container is None:
        return True

    try:
        raw_logs = container.logs()
        if raw_logs is not None:
            for line in raw_logs.decode().split('\n'):
                self._logger.debug(line)

        container.stop()
        container.remove(v=True)
    except docker.errors.APIError:
        # Containers which are already stopped will raise an error which we
        # can ignore
        pass

    return True
Stop a running container
Stops the container and removes it, including its volumes.
Returns
- success (bool): Whether stopping the container was successful or not.