bench_executor.rmlmapper
The RMLMapper executes RML rules to generate high-quality Linked Data from multiple originally (semi-)structured data sources.
Website: https://rml.io
Repository: https://github.com/RMLio/rmlmapper-java
#!/usr/bin/env python3

"""
The RMLMapper executes RML rules to generate high quality Linked Data
from multiple originally (semi-)structured data sources.

**Website**: https://rml.io<br>
**Repository**: https://github.com/RMLio/rmlmapper-java
"""

import os
import psutil
from typing import Optional
from timeout_decorator import timeout, TimeoutError  # type: ignore
from bench_executor.container import Container
from bench_executor.logger import Logger

VERSION = '6.0.0'
TIMEOUT = 6 * 3600  # 6 hours


class RMLMapper(Container):
    """RMLMapper container for executing R2RML and RML mappings."""

    def __init__(self, data_path: str, config_path: str, directory: str,
                 verbose: bool):
        """Creates an instance of the RMLMapper class.

        Parameters
        ----------
        data_path : str
            Path to the data directory of the case.
        config_path : str
            Path to the config directory of the case.
        directory : str
            Path to the directory to store logs.
        verbose : bool
            Enable verbose logs.
        """
        self._data_path = os.path.abspath(data_path)
        self._config_path = os.path.abspath(config_path)
        self._logger = Logger(__name__, directory, verbose)
        self._verbose = verbose

        # Create the tool-specific directory that is passed as /data below
        os.makedirs(os.path.join(self._data_path, 'rmlmapper'), exist_ok=True)
        super().__init__(f'kgconstruct/rmlmapper:v{VERSION}', 'RMLMapper',
                         self._logger,
                         volumes=[f'{self._data_path}/rmlmapper:/data',
                                  f'{self._data_path}/shared:/data/shared'])

    @property
    def root_mount_directory(self) -> str:
        """Subdirectory in the root directory of the case for RMLMapper.

        Returns
        -------
        subdirectory : str
            Subdirectory of the root directory for RMLMapper.

        """
        # NOTE(review): __name__ is the dotted module name, so this yields
        # 'bench_executor.rmlmapper' rather than the 'rmlmapper' directory
        # created in __init__ -- confirm this is what callers expect.
        return __name__.lower()

    @timeout(TIMEOUT)
    def _execute_with_timeout(self, arguments: list) -> bool:
        """Execute a mapping with a provided timeout.

        The call is wrapped by the ``timeout`` decorator with ``TIMEOUT``
        seconds; ``execute`` handles the resulting ``TimeoutError``.

        Parameters
        ----------
        arguments : list
            Command line arguments for RMLMapper. The list is not modified.

        Returns
        -------
        success : bool
            Whether the execution was successful or not.
        """
        # Work on a copy: appending the verbosity flag to the caller's list
        # would make it accumulate when the same list is executed again.
        args = list(arguments)
        if self._verbose:
            args.append('-vvvvvvvvvvv')

        joined_args = ' '.join(args)
        self._logger.info(f'Executing RMLMapper with arguments {joined_args}')

        # Set Java heap to 1/2 of total physical memory instead of the
        # default 1/4
        max_heap = int(psutil.virtual_memory().total * (1/2))

        # Execute command
        cmd = f'java -Xmx{max_heap} -Xms{max_heap} ' + \
              '-jar rmlmapper/rmlmapper.jar ' + \
              joined_args
        return self.run_and_wait_for_exit(cmd)

    def execute(self, arguments: list) -> bool:
        """Execute RMLMapper with given arguments.

        Parameters
        ----------
        arguments : list
            Arguments to supply to RMLMapper.

        Returns
        -------
        success : bool
            Whether the execution succeeded or not. A timeout is logged as
            a warning and reported as a failure.
        """
        try:
            return self._execute_with_timeout(arguments)
        except TimeoutError:
            msg = f'Timeout ({TIMEOUT}s) reached for RMLMapper'
            self._logger.warning(msg)

        return False

    def execute_mapping(self,
                        mapping_file: str,
                        output_file: str,
                        serialization: str,
                        rdb_username: Optional[str] = None,
                        rdb_password: Optional[str] = None,
                        rdb_host: Optional[str] = None,
                        rdb_port: Optional[int] = None,
                        rdb_name: Optional[str] = None,
                        rdb_type: Optional[str] = None) -> bool:
        """Execute a mapping file with RMLMapper.

        N-Quads and N-Triples are currently supported as serialization
        formats for RMLMapper.

        The database options are only passed to RMLMapper when all six
        ``rdb_*`` parameters are provided; otherwise they are ignored.

        Parameters
        ----------
        mapping_file : str
            Path to the mapping file to execute.
        output_file : str
            Name of the output file to store the triples in.
        serialization : str
            Serialization format to use.
        rdb_username : Optional[str]
            Username for the database, required when a database is used as
            source.
        rdb_password : Optional[str]
            Password for the database, required when a database is used as
            source.
        rdb_host : Optional[str]
            Hostname for the database, required when a database is used as
            source.
        rdb_port : Optional[int]
            Port for the database, required when a database is used as source.
        rdb_name : Optional[str]
            Database name for the database, required when a database is used as
            source.
        rdb_type : Optional[str]
            Database type, required when a database is used as source.
            Either 'MySQL' or 'PostgreSQL'.

        Returns
        -------
        success : bool
            Whether the execution was successful or not.

        Raises
        ------
        ValueError
            If all database parameters are given but ``rdb_type`` is neither
            'MySQL' nor 'PostgreSQL'.
        """
        # Mapping and output are resolved against the /data/shared volume
        arguments = ['-m', os.path.join('/data/shared/', mapping_file),
                     '-s', serialization,
                     '-o', os.path.join('/data/shared/', output_file),
                     '-d']  # Enable duplicate removal

        if rdb_username is not None and rdb_password is not None \
                and rdb_host is not None and rdb_port is not None \
                and rdb_name is not None and rdb_type is not None:

            arguments.append('-u')
            arguments.append(rdb_username)
            arguments.append('-p')
            arguments.append(rdb_password)

            parameters = ''
            if rdb_type == 'MySQL':
                protocol = 'jdbc:mysql'
                parameters = '?allowPublicKeyRetrieval=true&useSSL=false'
            elif rdb_type == 'PostgreSQL':
                protocol = 'jdbc:postgresql'
            else:
                raise ValueError(f'Unknown RDB type: "{rdb_type}"')
            rdb_dsn = f'{protocol}://{rdb_host}:{rdb_port}/' + \
                      f'{rdb_name}{parameters}'
            arguments.append('-dsn')
            arguments.append(rdb_dsn)

        return self.execute(arguments)
class RMLMapper(Container):
    """RMLMapper container for executing R2RML and RML mappings."""

    def __init__(self, data_path: str, config_path: str, directory: str,
                 verbose: bool):
        """Creates an instance of the RMLMapper class.

        Parameters
        ----------
        data_path : str
            Path to the data directory of the case.
        config_path : str
            Path to the config directory of the case.
        directory : str
            Path to the directory to store logs.
        verbose : bool
            Enable verbose logs.
        """
        self._data_path = os.path.abspath(data_path)
        self._config_path = os.path.abspath(config_path)
        self._logger = Logger(__name__, directory, verbose)
        self._verbose = verbose

        # Create the tool-specific directory that is passed as /data below
        os.makedirs(os.path.join(self._data_path, 'rmlmapper'), exist_ok=True)
        super().__init__(f'kgconstruct/rmlmapper:v{VERSION}', 'RMLMapper',
                         self._logger,
                         volumes=[f'{self._data_path}/rmlmapper:/data',
                                  f'{self._data_path}/shared:/data/shared'])

    @property
    def root_mount_directory(self) -> str:
        """Subdirectory in the root directory of the case for RMLMapper.

        Returns
        -------
        subdirectory : str
            Subdirectory of the root directory for RMLMapper.

        """
        # NOTE(review): __name__ is the dotted module name, so this yields
        # 'bench_executor.rmlmapper' rather than the 'rmlmapper' directory
        # created in __init__ -- confirm this is what callers expect.
        return __name__.lower()

    @timeout(TIMEOUT)
    def _execute_with_timeout(self, arguments: list) -> bool:
        """Execute a mapping with a provided timeout.

        The call is wrapped by the ``timeout`` decorator with ``TIMEOUT``
        seconds; ``execute`` handles the resulting ``TimeoutError``.

        Parameters
        ----------
        arguments : list
            Command line arguments for RMLMapper. The list is not modified.

        Returns
        -------
        success : bool
            Whether the execution was successful or not.
        """
        # Work on a copy: appending the verbosity flag to the caller's list
        # would make it accumulate when the same list is executed again.
        args = list(arguments)
        if self._verbose:
            args.append('-vvvvvvvvvvv')

        joined_args = ' '.join(args)
        self._logger.info(f'Executing RMLMapper with arguments {joined_args}')

        # Set Java heap to 1/2 of total physical memory instead of the
        # default 1/4
        max_heap = int(psutil.virtual_memory().total * (1/2))

        # Execute command
        cmd = f'java -Xmx{max_heap} -Xms{max_heap} ' + \
              '-jar rmlmapper/rmlmapper.jar ' + \
              joined_args
        return self.run_and_wait_for_exit(cmd)

    def execute(self, arguments: list) -> bool:
        """Execute RMLMapper with given arguments.

        Parameters
        ----------
        arguments : list
            Arguments to supply to RMLMapper.

        Returns
        -------
        success : bool
            Whether the execution succeeded or not. A timeout is logged as
            a warning and reported as a failure.
        """
        try:
            return self._execute_with_timeout(arguments)
        except TimeoutError:
            msg = f'Timeout ({TIMEOUT}s) reached for RMLMapper'
            self._logger.warning(msg)

        return False

    def execute_mapping(self,
                        mapping_file: str,
                        output_file: str,
                        serialization: str,
                        rdb_username: Optional[str] = None,
                        rdb_password: Optional[str] = None,
                        rdb_host: Optional[str] = None,
                        rdb_port: Optional[int] = None,
                        rdb_name: Optional[str] = None,
                        rdb_type: Optional[str] = None) -> bool:
        """Execute a mapping file with RMLMapper.

        N-Quads and N-Triples are currently supported as serialization
        formats for RMLMapper.

        The database options are only passed to RMLMapper when all six
        ``rdb_*`` parameters are provided; otherwise they are ignored.

        Parameters
        ----------
        mapping_file : str
            Path to the mapping file to execute.
        output_file : str
            Name of the output file to store the triples in.
        serialization : str
            Serialization format to use.
        rdb_username : Optional[str]
            Username for the database, required when a database is used as
            source.
        rdb_password : Optional[str]
            Password for the database, required when a database is used as
            source.
        rdb_host : Optional[str]
            Hostname for the database, required when a database is used as
            source.
        rdb_port : Optional[int]
            Port for the database, required when a database is used as source.
        rdb_name : Optional[str]
            Database name for the database, required when a database is used as
            source.
        rdb_type : Optional[str]
            Database type, required when a database is used as source.
            Either 'MySQL' or 'PostgreSQL'.

        Returns
        -------
        success : bool
            Whether the execution was successful or not.

        Raises
        ------
        ValueError
            If all database parameters are given but ``rdb_type`` is neither
            'MySQL' nor 'PostgreSQL'.
        """
        # Mapping and output are resolved against the /data/shared volume
        arguments = ['-m', os.path.join('/data/shared/', mapping_file),
                     '-s', serialization,
                     '-o', os.path.join('/data/shared/', output_file),
                     '-d']  # Enable duplicate removal

        if rdb_username is not None and rdb_password is not None \
                and rdb_host is not None and rdb_port is not None \
                and rdb_name is not None and rdb_type is not None:

            arguments.append('-u')
            arguments.append(rdb_username)
            arguments.append('-p')
            arguments.append(rdb_password)

            parameters = ''
            if rdb_type == 'MySQL':
                protocol = 'jdbc:mysql'
                parameters = '?allowPublicKeyRetrieval=true&useSSL=false'
            elif rdb_type == 'PostgreSQL':
                protocol = 'jdbc:postgresql'
            else:
                raise ValueError(f'Unknown RDB type: "{rdb_type}"')
            rdb_dsn = f'{protocol}://{rdb_host}:{rdb_port}/' + \
                      f'{rdb_name}{parameters}'
            arguments.append('-dsn')
            arguments.append(rdb_dsn)

        return self.execute(arguments)
RMLMapper container for executing R2RML and RML mappings.
RMLMapper(data_path: str, config_path: str, directory: str, verbose: bool)
def __init__(self, data_path: str, config_path: str, directory: str,
             verbose: bool):
    """Set up the RMLMapper container for a case.

    Parameters
    ----------
    data_path : str
        Path to the data directory of the case.
    config_path : str
        Path to the config directory of the case.
    directory : str
        Path to the directory to store logs.
    verbose : bool
        Enable verbose logs.
    """
    self._data_path = os.path.abspath(data_path)
    self._config_path = os.path.abspath(config_path)
    self._logger = Logger(__name__, directory, verbose)
    self._verbose = verbose

    # Create the tool-specific directory that is passed as /data below
    tool_directory = os.path.join(self._data_path, 'rmlmapper')
    os.makedirs(tool_directory, exist_ok=True)

    image = f'kgconstruct/rmlmapper:v{VERSION}'
    mounts = [f'{self._data_path}/rmlmapper:/data',
              f'{self._data_path}/shared:/data/shared']
    super().__init__(image, 'RMLMapper', self._logger, volumes=mounts)
Creates an instance of the RMLMapper class.
Parameters
- data_path (str): Path to the data directory of the case.
- config_path (str): Path to the config directory of the case.
- directory (str): Path to the directory to store logs.
- verbose (bool): Enable verbose logs.
root_mount_directory: str
Subdirectory in the root directory of the case for RMLMapper.
Returns
- subdirectory (str): Subdirectory of the root directory for RMLMapper.
def
execute(self, arguments: list) -> bool:
def execute(self, arguments: list) -> bool:
    """Run RMLMapper with the supplied arguments, bounded by the timeout.

    Parameters
    ----------
    arguments : list
        Arguments to supply to RMLMapper.

    Returns
    -------
    success : bool
        Whether the execution succeeded or not. A timeout is logged as a
        warning and reported as a failure.
    """
    try:
        succeeded = self._execute_with_timeout(arguments)
    except TimeoutError:
        self._logger.warning(f'Timeout ({TIMEOUT}s) reached for RMLMapper')
        succeeded = False

    return succeeded
Execute RMLMapper with given arguments.
Parameters
- arguments (list): Arguments to supply to RMLMapper.
Returns
- success (bool): Whether the execution succeeded or not.
def
execute_mapping( self, mapping_file: str, output_file: str, serialization: str, rdb_username: Optional[str] = None, rdb_password: Optional[str] = None, rdb_host: Optional[str] = None, rdb_port: Optional[int] = None, rdb_name: Optional[str] = None, rdb_type: Optional[str] = None) -> bool:
def execute_mapping(self,
                    mapping_file: str,
                    output_file: str,
                    serialization: str,
                    rdb_username: Optional[str] = None,
                    rdb_password: Optional[str] = None,
                    rdb_host: Optional[str] = None,
                    rdb_port: Optional[int] = None,
                    rdb_name: Optional[str] = None,
                    rdb_type: Optional[str] = None) -> bool:
    """Run a single mapping file through RMLMapper.

    N-Quads and N-Triples are currently supported as serialization
    formats for RMLMapper.

    The database options are only passed to RMLMapper when all six
    ``rdb_*`` parameters are provided; otherwise they are ignored.

    Parameters
    ----------
    mapping_file : str
        Path to the mapping file to execute.
    output_file : str
        Name of the output file to store the triples in.
    serialization : str
        Serialization format to use.
    rdb_username : Optional[str]
        Username for the database, required when a database is used as
        source.
    rdb_password : Optional[str]
        Password for the database, required when a database is used as
        source.
    rdb_host : Optional[str]
        Hostname for the database, required when a database is used as
        source.
    rdb_port : Optional[int]
        Port for the database, required when a database is used as source.
    rdb_name : Optional[str]
        Database name for the database, required when a database is used as
        source.
    rdb_type : Optional[str]
        Database type, required when a database is used as source.
        Either 'MySQL' or 'PostgreSQL'.

    Returns
    -------
    success : bool
        Whether the execution was successful or not.

    Raises
    ------
    ValueError
        If all database parameters are given but ``rdb_type`` is neither
        'MySQL' nor 'PostgreSQL'.
    """
    shared_directory = '/data/shared/'
    arguments = ['-m', os.path.join(shared_directory, mapping_file),
                 '-s', serialization,
                 '-o', os.path.join(shared_directory, output_file),
                 '-d']  # Enable duplicate removal

    # Without a complete database configuration, run on the files only
    database_settings = (rdb_username, rdb_password, rdb_host,
                         rdb_port, rdb_name, rdb_type)
    if any(setting is None for setting in database_settings):
        return self.execute(arguments)

    if rdb_type == 'MySQL':
        protocol = 'jdbc:mysql'
        parameters = '?allowPublicKeyRetrieval=true&useSSL=false'
    elif rdb_type == 'PostgreSQL':
        protocol = 'jdbc:postgresql'
        parameters = ''
    else:
        raise ValueError(f'Unknown RDB type: "{rdb_type}"')

    rdb_dsn = f'{protocol}://{rdb_host}:{rdb_port}/{rdb_name}{parameters}'
    arguments.extend(['-u', rdb_username,
                      '-p', rdb_password,
                      '-dsn', rdb_dsn])

    return self.execute(arguments)
Execute a mapping file with RMLMapper.
N-Quads and N-Triples are currently supported as serialization formats for RMLMapper.
Parameters
- mapping_file (str): Path to the mapping file to execute.
- output_file (str): Name of the output file to store the triples in.
- serialization (str): Serialization format to use.
- rdb_username (Optional[str]): Username for the database, required when a database is used as source.
- rdb_password (Optional[str]): Password for the database, required when a database is used as source.
- rdb_host (Optional[str]): Hostname for the database, required when a database is used as source.
- rdb_port (Optional[int]): Port for the database, required when a database is used as source.
- rdb_name (Optional[str]): Database name for the database, required when a database is used as source.
- rdb_type (Optional[str]): Database type, required when a database is used as source.
Returns
- success (bool): Whether the execution was successful or not.