bench_executor.query
Query executes SPARQL queries by posting them over HTTP to a SPARQL endpoint. It automatically applies a timeout to each query and checks whether the results are empty.
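Conceptually, the class is a thin wrapper around a single HTTP POST of the query to the endpoint, with a one-hour timeout and an empty-result check on top. The sketch below shows roughly the request it sends; the endpoint URL, the query, and the `Accept` header are placeholder assumptions rather than values fixed by this module.

```python
import requests

# Placeholder endpoint and query; the module takes these as arguments.
sparql_endpoint = 'http://localhost:8890/sparql'
query = 'CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o } LIMIT 10'

# The query is posted as a form field; 'maxrows' lifts Virtuoso's default
# result limit, and the caller-supplied headers select the response format.
r = requests.post(sparql_endpoint,
                  data={'query': query, 'maxrows': '3000000'},
                  headers={'Accept': 'application/n-triples'})
r.raise_for_status()
print(r.text)
```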
Module source:

```python
#!/usr/bin/env python3

"""
Query executes SPARQL queries on endpoints by posting the SPARQL query over
HTTP onto the endpoint. It applies timeouts to these queries automatically and
checks if the results are empty or not.
"""

import os
import requests
from typing import Optional
from timeout_decorator import timeout, TimeoutError  # type: ignore
from bench_executor.logger import Logger

TIMEOUT = 1 * 3600  # 1 hour


class Query():
    """Execute a query on a SPARQL endpoint."""

    def __init__(self, data_path: str, config_path: str, directory: str,
                 verbose: bool):
        """Creates an instance of the Query class.

        Parameters
        ----------
        data_path : str
            Path to the data directory of the case.
        config_path : str
            Path to the config directory of the case.
        directory : str
            Path to the directory to store logs.
        verbose : bool
            Enable verbose logs.
        """
        self._data_path = os.path.abspath(data_path)
        self._config_path = os.path.abspath(config_path)
        self._logger = Logger(__name__, directory, verbose)

        os.umask(0)
        os.makedirs(os.path.join(self._data_path, 'query'), exist_ok=True)

    @property
    def name(self):
        """Name of the class: Query"""
        return __name__

    @property
    def root_mount_directory(self) -> str:
        """Subdirectory in the root directory of the case for Query.

        Returns
        -------
        subdirectory : str
            Subdirectory of the root directory for Query.
        """
        return __name__.lower()

    @timeout(TIMEOUT)
    def _execute_with_timeout(self, query: str, sparql_endpoint: str,
                              headers: dict = {}) -> str:
        """Execute a query with a provided timeout.

        Parameters
        ----------
        query : str
            The query to execute.
        sparql_endpoint : str
            The URL of the SPARQL endpoint.
        headers : dict
            HTTP headers to supply when posting the query.

        Returns
        -------
        results : str
            The HTTP response body of the SPARQL endpoint as a string.
        """
        self._logger.info(f'Executing query "{query}" on endpoint '
                          f'"{sparql_endpoint}"')
        data = {
            'query': query,
            'maxrows': '3000000'  # Overwrite Virtuoso SPARQL limit
        }
        # Hardcoded to N-Triples
        r = requests.post(sparql_endpoint, data=data, headers=headers)
        if r.status_code != 200:
            msg = f'Query failed: {r.text} (HTTP {r.status_code})'
            self._logger.error(msg)
        r.raise_for_status()
        return r.text

    def _execute(self, query: str, sparql_endpoint: str, expect_empty: bool,
                 headers: dict = {}) -> Optional[str]:
        """Execute a query on a SPARQL endpoint.

        Parameters
        ----------
        query : str
            The query to execute.
        sparql_endpoint : str
            The URL of the SPARQL endpoint.
        expect_empty : bool
            Whether the expected results are empty or not.
        headers : dict
            HTTP headers to supply when posting the query.

        Returns
        -------
        results : str
            The HTTP response of the SPARQL endpoint as a string, unless it
            has no results.
        """
        results = None
        try:
            results = self._execute_with_timeout(query,
                                                 sparql_endpoint,
                                                 headers)
        except TimeoutError:
            msg = f'Timeout ({TIMEOUT}s) reached for Query: "{query}"'
            self._logger.warning(msg)

        # Check results output
        if results is None or not results or 'Empty' in results:
            if expect_empty:
                self._logger.info('No results found, but was expected!')
                return None

            self._logger.error('No results found!')
            return None

        return results

    def execute_and_save(self, query: str, sparql_endpoint: str,
                         results_file: str, expect_empty: bool = False,
                         headers: dict = {}) -> bool:
        """Executes a SPARQL query and saves the results.

        The results are saved to the `results_file` path.

        Parameters
        ----------
        query : str
            The query to execute.
        sparql_endpoint : str
            The URL of the SPARQL endpoint.
        results_file : str
            Path to the file where the results may be stored.
        expect_empty : bool
            Whether the expected results are empty or not.
        headers : dict
            HTTP headers to supply when posting the query.

        Returns
        -------
        success : bool
            Whether the execution succeeded or not.
        """
        try:
            results = self._execute(query, sparql_endpoint, expect_empty,
                                    headers)
        except Exception as e:
            msg = f'Failed to execute query "{query}" on endpoint ' + \
                  f'"{sparql_endpoint}": {e}'
            self._logger.error(msg)
            return False

        path = os.path.join(self._data_path, 'shared')
        os.umask(0)
        os.makedirs(path, exist_ok=True)

        if results is not None:
            results_file = os.path.join(path, results_file)
            with open(results_file, 'w') as f:
                f.write(results)

            self._logger.debug(f'Wrote query results to "{results_file}"')
            self._logger.debug('Query results:')
            self._logger.debug(results)
            return True

        return False

    def _read_query_file(self, query_file: str) -> str:
        """Read the query file.

        Returns
        -------
        content : str
            The content of the query file.

        Raises
        ------
        FileNotFoundError : Exception
            If the query file cannot be found.
        """
        path = os.path.join(self._data_path, 'shared', query_file)
        if not os.path.exists(path):
            msg = f'Query file "{path}" does not exist'
            self._logger.error(msg)
            raise FileNotFoundError(msg)

        with open(path, 'r') as f:
            query = f.read()

        return query

    def execute_from_file(self, query_file: str, sparql_endpoint: str,
                          expect_empty: bool = False,
                          headers: dict = {}) -> str:
        """Executes a SPARQL query read from a file.

        The query is read from the `query_file` path and the results are
        returned as a string.

        Parameters
        ----------
        query_file : str
            Path to the file containing the query.
        sparql_endpoint : str
            The URL of the SPARQL endpoint.
        expect_empty : bool
            Whether the expected results are empty or not.
        headers : dict
            HTTP headers to supply when posting the query.

        Returns
        -------
        results : str
            The HTTP response of the SPARQL endpoint as a string, or an
            empty string if there are no results.

        Raises
        ------
        Exception : Exception
            Passes through exceptions from Python's requests module
            regarding HTTP status codes.
        """
        query = self._read_query_file(query_file)
        try:
            results = self._execute(query, sparql_endpoint, expect_empty,
                                    headers)
        except Exception as e:
            msg = f'Failed to execute query "{query}" on endpoint ' + \
                  f'"{sparql_endpoint}": {e}'
            self._logger.error(msg)
            raise e

        if results is not None:
            return results

        return ''

    def execute_from_file_and_save(self, query_file: str,
                                   sparql_endpoint: str,
                                   results_file: str,
                                   expect_empty: bool = False,
                                   headers: dict = {}) -> bool:
        """Executes a SPARQL query read from a file and saves the results.

        The results are saved to the `results_file` path.

        Parameters
        ----------
        query_file : str
            Path to the file containing the query.
        sparql_endpoint : str
            The URL of the SPARQL endpoint.
        results_file : str
            Path to the file where the results may be stored.
        expect_empty : bool
            Whether the expected results are empty or not.
        headers : dict
            HTTP headers to supply when posting the query.

        Returns
        -------
        success : bool
            Whether the execution succeeded or not.

        Raises
        ------
        FileNotFoundError : Exception
            If the query file cannot be found.
        """
        query = self._read_query_file(query_file)
        # execute_and_save() already returns a success boolean.
        return self.execute_and_save(query, sparql_endpoint, results_file,
                                     expect_empty, headers)
```
class Query
Execute a query on a SPARQL endpoint.
Query(data_path: str, config_path: str, directory: str, verbose: bool)
Creates an instance of the Query class.
Parameters
- data_path (str): Path to the data directory of the case.
- config_path (str): Path to the config directory of the case.
- directory (str): Path to the directory to store logs.
- verbose (bool): Enable verbose logs.
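A minimal construction sketch, assuming a hypothetical case layout under `/tmp/case`; the constructor resolves the paths to absolute paths and creates a `query` subdirectory inside the data directory.

```python
from bench_executor.query import Query

query_runner = Query(data_path='/tmp/case/data',      # hypothetical case data
                     config_path='/tmp/case/config',  # hypothetical case config
                     directory='/tmp/case/logs',      # where logs are written
                     verbose=True)
```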
root_mount_directory: str
Subdirectory in the root directory of the case for Query.
Returns
- subdirectory (str): Subdirectory of the root directory for Query.
def execute_and_save(self, query: str, sparql_endpoint: str, results_file: str, expect_empty: bool = False, headers: dict = {}) -> bool
Executes a SPARQL query and saves the results to the `results_file` path.
Parameters
- query (str): The query to execute.
- sparql_endpoint (str): The URL of the SPARQL endpoint.
- results_file (str): Path to the file where the results may be stored.
- expect_empty (bool): Whether the expected results are empty or not.
- headers (dict): HTTP headers to supply when posting the query.
Returns
- success (bool): Whether the execution succeeded or not.
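A usage sketch, reusing the hypothetical `query_runner` instance from the constructor example; the endpoint URL and file name are placeholders, and the results end up in `<data_path>/shared/results.nt`.

```python
ok = query_runner.execute_and_save(
    'CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }',
    sparql_endpoint='http://localhost:8890/sparql',  # placeholder endpoint
    results_file='results.nt',                       # saved under <data_path>/shared/
    headers={'Accept': 'application/n-triples'},     # assumed response format
)
if not ok:
    print('Query failed or returned no results')
```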
def execute_from_file(self, query_file: str, sparql_endpoint: str, expect_empty: bool = False, headers: dict = {}) -> str
Executes a SPARQL query read from `query_file`. The results are returned as a string rather than saved to a file.
Parameters
- query_file (str): Path to the file containing the query.
- sparql_endpoint (str): The URL of the SPARQL endpoint.
- expect_empty (bool): Whether the expected results are empty or not.
- headers (dict): HTTP headers to supply when posting the query.
Returns
- results (str): The HTTP response of the SPARQL endpoint as a string, or an empty string if there are no results.
Raises
- Exception (Exception): Passes through exceptions from Python's requests module regarding HTTP status codes.
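A sketch assuming the query is stored as `<data_path>/shared/query.sparql` (a hypothetical file name): an empty return value signals an empty result set, while HTTP errors are re-raised.

```python
try:
    results = query_runner.execute_from_file(
        'query.sparql',                                  # read from <data_path>/shared/
        sparql_endpoint='http://localhost:8890/sparql',  # placeholder endpoint
    )
except Exception as e:
    print(f'Endpoint returned an HTTP error: {e}')
else:
    print(results if results else 'No results returned')
```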
def execute_from_file_and_save(self, query_file: str, sparql_endpoint: str, results_file: str, expect_empty: bool = False, headers: dict = {}) -> bool
Executes a SPARQL query read from a file and saves the results to the `results_file` path.
Parameters
- query_file (str): Path to the file containing the query.
- sparql_endpoint (str): The URL of the SPARQL endpoint.
- results_file (str): Path to the file where the results may be stored.
- expect_empty (bool): Whether the expected results are empty or not.
- headers (dict): HTTP headers to supply when posting the query.
Returns
- success (bool): Whether the execution succeeded or not.
Raises
- FileNotFoundError (Exception): If the query file cannot be found.
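A sketch combining the two behaviours, with the same hypothetical file names and endpoint as above: the query is read from the case's `shared` directory and the results are written back to it.

```python
ok = query_runner.execute_from_file_and_save(
    'query.sparql',                                  # read from <data_path>/shared/
    sparql_endpoint='http://localhost:8890/sparql',  # placeholder endpoint
    results_file='results.nt',                       # saved under <data_path>/shared/
)
print('Results saved' if ok else 'Query failed')
```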