bench_executor.postgresql

PostgreSQL is an open-source relational database developed by The PostgreSQL Global Development Group.

Website: https://www.postgresql.org/
Repository: https://git.postgresql.org/gitweb/?p=postgresql.git

  1#!/usr/bin/env python3
  2
  3"""
  4PostgreSQL is an open-source relational database developed by The PostgreSQL
  5Global Development Group.
  6
  7**Website**: https://www.postgresql.org/<br>
  8**Repository**: https://git.postgresql.org/gitweb/?p=postgresql.git
  9"""
 10
 11import os
 12import psycopg2
 13import tempfile
 14from csv import reader
 15from time import sleep
 16from typing import List, Tuple
 17from timeout_decorator import timeout, TimeoutError  # type: ignore
 18from bench_executor.container import Container
 19from bench_executor.logger import Logger
 20
 21VERSION = '14.5'
 22HOST = 'localhost'
 23USER = 'root'
 24PASSWORD = 'root'
 25DB = 'db'
 26PORT = '5432'
 27WAIT_TIME = 3
 28CLEAR_TABLES_TIMEOUT = 5 * 60  # 5 minutes
 29
 30
 31class PostgreSQL(Container):
 32    """PostgreSQL container for executing SQL queries"""
 33    def __init__(self, data_path: str, config_path: str, directory: str,
 34                 verbose: bool):
 35        """Creates an instance of the PostgreSQL class.
 36
 37        Parameters
 38        ----------
 39        data_path : str
 40            Path to the data directory of the case.
 41        config_path : str
 42            Path to the config directory of the case.
 43        directory : str
 44            Path to the directory to store logs.
 45        verbose : bool
 46            Enable verbose logs.
 47        """
 48        self._data_path = os.path.abspath(data_path)
 49        self._config_path = os.path.abspath(config_path)
 50        self._logger = Logger(__name__, directory, verbose)
 51
 52        tmp_dir = os.path.join(tempfile.gettempdir(), 'postgresql')
 53        os.umask(0)
 54        os.makedirs(tmp_dir, exist_ok=True)
 55        os.makedirs(os.path.join(self._data_path, 'postgresql'), exist_ok=True)
 56        self._tables: List[str] = []
 57
 58        super().__init__(f'blindreviewing/postgresql:v{VERSION}', 'PostgreSQL',
 59                         self._logger,
 60                         ports={PORT: PORT},
 61                         environment={'POSTGRES_PASSWORD': PASSWORD,
 62                                      'POSTGRES_USER': USER,
 63                                      'POSTGRES_DB': DB,
 64                                      'PGPASSWORD': PASSWORD,
 65                                      'POSTGRES_HOST_AUTH_METHOD': 'trust'},
 66                         volumes=[f'{self._data_path}/shared:/data/shared',
 67                                  f'{tmp_dir}:/var/lib/postgresql/data'])
 68
 69    def initialization(self) -> bool:
 70        """Initialize PostgreSQL's database.
 71
 72        Returns
 73        -------
 74        success : bool
 75            Whether the initialization was successful or not.
 76        """
 77        # PostgreSQL should start with an initialized database, so start it
 78        # here if it is not initialized yet to avoid paying that cost during
 79        # the benchmark run itself.
 80        success = self.wait_until_ready()
 81        if not success:
 82            self._logger.error(f'Failed to initialize {__name__}')
 83            return False
 84        success = self.stop()
 85
 86        return success
 87
 88    @property
 89    def root_mount_directory(self) -> str:
 90        """Subdirectory in the root directory of the case for PostgreSQL.
 91
 92        Returns
 93        -------
 94        subdirectory : str
 95            Subdirectory of the root directory for PostgreSQL.
 96        """
 97        return __name__.lower()
 98
 99    def wait_until_ready(self, command: str = '') -> bool:
100        """Wait until PostgreSQL is ready to execute SQL queries.
101
102        Parameters
103        ----------
104        command : str
105            Optional command to execute in the PostgreSQL container;
106            defaults to no command.
107
108        Returns
109        -------
110        success : bool
111            Whether PostgreSQL became ready or not.
112        """
113        success = self.run_and_wait_for_log(f'port {PORT}', command=command)
114        if success:
115            sleep(WAIT_TIME)
116        else:
117            msg = f'Failed to wait for {__name__} to become ready'
118            self._logger.error(msg)
119
120        return success
121
122    def load(self, csv_file: str, table: str) -> bool:
123        """Load a single CSV file into PostgreSQL.
124
125        Parameters
126        ----------
127        csv_file : str
128            Name of the CSV file.
129        table : str
130            Name of the table.
131
132        Returns
133        -------
134        success : bool
135            Whether the execution was successful or not.
136        """
137        return self._load_csv(csv_file, table, True)
138
139    def load_multiple(self, csv_files: List[dict]) -> bool:
140        """Load multiple CSV files into PostgreSQL.
141
142        Parameters
143        ----------
144        csv_files : list
145            List of CSV files to load. Each entry consists of a `file` and
146            `table` key.
147
148        Returns
149        -------
150        success : bool
151            Whether the execution was successful or not.
152        """
153        for entry in csv_files:
154            if not self._load_csv(entry['file'], entry['table'], True):
155                return False
156        return True
161
162    def load_sql_schema(self, schema_file: str,
163                        csv_files: List[Tuple[str, str]]) -> bool:
164        """Execute SQL schema with PostgreSQL.
165
166        Executes a .sql file with PostgreSQL.
167        If the .sql file only provides the schema and does not load the data
168        itself, a list of CSV files can be provided to load the data as
169        well.
170
171        Parameters
172        ----------
173        schema_file : str
174            Name of the .sql file.
175        csv_files : List[Tuple[str, str]]
176            List of CSV file names to load into the tables created with the
177            .sql file; may also be an empty list. Each entry is a tuple of
178            the CSV file name and the table name.
179
180        Returns
181        -------
182        success : bool
183            Whether the execution was successful or not.
184        """
185        success = True
186
187        # Load SQL schema
188        success, output = self.exec(f'psql -h {HOST} -p {PORT} -U {USER} '
189                                    f'-d {DB} -f /data/shared/{schema_file}')
190        if not success:
191            self._logger.error(f'Failed to load SQL schema "{schema_file}"')
192            return success
193
194        # Load CSVs
195        for csv_file, table in csv_files:
196            success = self._load_csv(csv_file, table, False)
197            if not success:
198                self._logger.error(f'Failed to load CSV "{csv_file}" in '
199                                   f'table "{table}"')
200                break
201
202        return success
203
204    def _load_csv(self, csv_file: str, table: str, create: bool) -> bool:
205        """Load a single CSV file into PostgreSQL.
206
207        Parameters
208        ----------
209        csv_file : str
210            Name of the CSV file.
211        table : str
212            Name of the table to store the data in.
213        create : bool
214            Whether to drop and create the table or re-use it.
215
216        Returns
217        -------
218        success : bool
219            Whether the execution was successful or not.
220        """
221        success = True
222        columns = None
223        table = table.lower()
224        path = os.path.join(self._data_path, 'shared', csv_file)
225
226        self._tables.append(table)
227
228        # Analyze and move CSV for loading
229        if not os.path.exists(path):
230            self._logger.error(f'CSV file "{path}" does not exist')
231            return False
232
233        with open(path, 'r') as f:
234            csv_reader = reader(f)
235            columns = next(csv_reader)
236            columns = [x.lower() for x in columns]
237
238        # Load CSV
239        connection = psycopg2.connect(host=HOST, database=DB,
240                                      user=USER, password=PASSWORD)
241        try:
242            cursor = connection.cursor()
243
244            if create:
245                cursor.execute(f'DROP TABLE IF EXISTS {table};')
246                c = ' VARCHAR , '.join(columns) + ' VARCHAR'
247                cursor.execute(f'CREATE TABLE {table} (KEY SERIAL, {c}, '
248                               'PRIMARY KEY(KEY))')
249
250            c = ','.join(columns)
251            cursor.execute(f'COPY {table} ({c}) FROM '
252                           f'\'/data/shared/{csv_file}\' '
253                           'DELIMITER \',\' NULL \'NULL\' CSV HEADER;')
254            cursor.execute('COMMIT;')
255
256            header = '| ID | ' + ' | '.join(columns) + ' |'
257            self._logger.debug(header)
258            self._logger.debug('-' * len(header))
259
260            cursor.execute(f'SELECT * FROM {table};')
261            number_of_records = 0
262            for record in cursor:
263                number_of_records += 1
264                self._logger.debug(record)
265            if number_of_records == 0:
266                self._logger.error('No records loaded after loading CSV')
267                success = False
268        except Exception as e:
269            self._logger.error(f'Failed to load CSV: "{e}"')
270            success = False
271        finally:
272            connection.close()
273
274        return success
275
276    @timeout(CLEAR_TABLES_TIMEOUT)
277    def _clear_tables(self):
278        """Clears all tables with a provided timeout."""
279        connection = psycopg2.connect(host=HOST, database=DB,
280                                      user=USER, password=PASSWORD)
281        cursor = connection.cursor()
282        for table in self._tables:
283            cursor.execute(f'DROP TABLE IF EXISTS {table};')
284            cursor.execute('COMMIT;')
285        self._tables = []
286        connection.close()
287
288    def stop(self) -> bool:
289        """Stop PostgreSQL.
290        Clears all tables and stops the PostgreSQL container.
291
292        Returns
293        -------
294        success : bool
295            Whether the execution was successful or not.
296        """
297        try:
298            self._clear_tables()
299        except TimeoutError:
300            self._logger.warning(f'Clearing {__name__} tables timed out after '
301                                 f'{CLEAR_TABLES_TIMEOUT}s!')
302        except Exception as e:
303            self._logger.error(f'Clearing {__name__} tables failed: "{e}"')
304
305        return super().stop()
306
307
308if __name__ == '__main__':
309    print(f'ℹ️  Starting up PostgreSQL v{VERSION}...')
310    p = PostgreSQL('data', 'config', 'log', True)
311    p.wait_until_ready()
312    input('ℹ️  Press Enter to stop')
313    p.stop()
314    print('ℹ️  Stopped')
class PostgreSQL(bench_executor.container.Container):

PostgreSQL container for executing SQL queries

PostgreSQL(data_path: str, config_path: str, directory: str, verbose: bool)

Creates an instance of the PostgreSQL class.

Parameters
  • data_path (str): Path to the data directory of the case.
  • config_path (str): Path to the config directory of the case.
  • directory (str): Path to the directory to store logs.
  • verbose (bool): Enable verbose logs.
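
A minimal construction sketch; the 'data', 'config', and 'log' paths below are placeholders for the case's own directories, not values required by the class:

    from bench_executor.postgresql import PostgreSQL

    # Placeholder paths: point these at the case's data, config and log dirs.
    postgresql = PostgreSQL(data_path='data', config_path='config',
                            directory='log', verbose=True)
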
def initialization(self) -> bool:

Initialize PostgreSQL's database.

Returns
  • success (bool): Whether the initialization was successful or not.
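
A usage sketch, continuing the constructor example above; the error handling is illustrative only:

    # Initialize the database once before any measured benchmark run.
    if not postgresql.initialization():
        raise RuntimeError('PostgreSQL initialization failed')
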
root_mount_directory: str

Subdirectory in the root directory of the case for PostgreSQL.

Returns
  • subdirectory (str): Subdirectory of the root directory for PostgreSQL.
def wait_until_ready(self, command: str = '') -> bool:

Wait until PostgreSQL is ready to execute SQL queries.

Parameters
  • command (str): Optional command to execute in the PostgreSQL container; defaults to no command.
Returns
  • success (bool): Whether PostgreSQL became ready or not.
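
A usage sketch; the return value should be checked, since readiness is not guaranteed:

    # Start the container (no extra command) and block until it accepts
    # SQL queries; on failure an error is logged and False is returned.
    if postgresql.wait_until_ready():
        print('PostgreSQL is ready to execute SQL queries')
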
def load(self, csv_file: str, table: str) -> bool:

Load a single CSV file into PostgreSQL.

Parameters
  • csv_file (str): Name of the CSV file.
  • table (str): Name of the table.
Returns
  • success (bool): Whether the execution was successful or not.
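
A sketch with a hypothetical file and table name; the CSV file is expected to be in the case's shared data directory:

    # 'student.csv' is a placeholder; the 'student' table is (re)created.
    success = postgresql.load(csv_file='student.csv', table='student')
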
def load_multiple(self, csv_files: List[dict]) -> bool:

Load multiple CSV files into PostgreSQL.

Parameters
  • csv_files (list): List of CSV files to load. Each entry consists of a file and table key.
Returns
  • success (bool): Whether the execution was successful or not.
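
A sketch showing the expected dict keys; the file and table names are made up:

    success = postgresql.load_multiple([
        {'file': 'student.csv', 'table': 'student'},
        {'file': 'course.csv', 'table': 'course'},
    ])
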
def load_sql_schema(self, schema_file: str, csv_files: List[Tuple[str, str]]) -> bool:

Execute SQL schema with PostgreSQL.

Executes a .sql file with PostgreSQL. If the .sql file only provides the schema and does not load the data itself, a list of CSV files can be provided to load the data as well.

Parameters
  • schema_file (str): Name of the .sql file.
  • csv_files (List[Tuple[str, str]]): List of CSV file names to load into the tables created with the .sql file; may also be an empty list. Each entry is a tuple of the CSV file name and the table name.
Returns
  • success (bool): Whether the execution was successful or not.
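
A sketch assuming a hypothetical schema.sql in the shared directory that creates the tables but does not insert any data itself:

    success = postgresql.load_sql_schema(
        schema_file='schema.sql',
        csv_files=[('student.csv', 'student'), ('course.csv', 'course')],
    )
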
def stop(self) -> bool:

Stop PostgreSQL. Clears all tables and stops the PostgreSQL container.

Returns
  • success (bool): Whether the execution was successful or not.
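
Putting the pieces together, a session might be wrapped up like this; the file and table names remain placeholders:

    try:
        postgresql.wait_until_ready()
        postgresql.load('student.csv', 'student')
    finally:
        # Drops all tables loaded through this instance, then stops the
        # container.
        postgresql.stop()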