bench_executor.rmlmapper

The RMLMapper executes RML rules to generate high-quality Linked Data from multiple originally (semi-)structured data sources.

Website: https://rml.io
Repository: https://github.com/RMLio/rmlmapper-java

  1#!/usr/bin/env python3
  2
  3"""
  4The RMLMapper executes RML rules to generate high quality Linked Data
  5from multiple originally (semi-)structured data sources.
  6
  7**Website**: https://rml.io<br>
  8**Repository**: https://github.com/RMLio/rmlmapper-java
  9"""
 10
 11import os
 12import psutil
 13from typing import Optional
 14from timeout_decorator import timeout, TimeoutError  # type: ignore
 15from bench_executor.container import Container
 16from bench_executor.logger import Logger
 17
 18VERSION = '6.0.0'  # Used as the tag of the kgconstruct/rmlmapper image
 19TIMEOUT = 6 * 3600  # 6 hours, expressed in seconds
 20
 21
 22class RMLMapper(Container):
 23    """RMLMapper container for executing R2RML and RML mappings."""
 24
 25    def __init__(self, data_path: str, config_path: str, directory: str,
 26                 verbose: bool):
 27        """Creates an instance of the RMLMapper class.
 28
 29        Parameters
 30        ----------
 31        data_path : str
 32            Path to the data directory of the case.
 33        config_path : str
 34            Path to the config directory of the case.
 35        directory : str
 36            Path to the directory to store logs.
 37        verbose : bool
 38            Enable verbose logs.
 39        """
 40        self._data_path = os.path.abspath(data_path)
 41        self._config_path = os.path.abspath(config_path)
 42        self._logger = Logger(__name__, directory, verbose)
 43        self._verbose = verbose
 44        # Make sure the host directory bound to /data below exists.
 45        os.makedirs(os.path.join(self._data_path, 'rmlmapper'), exist_ok=True)
 46        super().__init__(f'kgconstruct/rmlmapper:v{VERSION}', 'RMLMapper',
 47                         self._logger,
 48                         volumes=[f'{self._data_path}/rmlmapper:/data',
 49                                  f'{self._data_path}/shared:/data/shared'])
 50
 51    @property
 52    def root_mount_directory(self) -> str:
 53        """Subdirectory in the root directory of the case for RMLMapper.
 54
 55        Returns
 56        -------
 57        subdirectory : str
 58            Subdirectory for RMLMapper: this module's lowercased __name__.
 59
 60        """
 61        return __name__.lower()
 62
 63    @timeout(TIMEOUT)
 64    def _execute_with_timeout(self, arguments: list) -> bool:
 65        """Execute a mapping with a provided timeout.
 66
 67        Returns
 68        -------
 69        success : bool
 70            Whether the execution was successful or not.
 71        """
 72        if self._verbose:
 73            arguments.append('-vvvvvvvvvvv')
 74        # NOTE: the append above modifies the caller's list in place.
 75        self._logger.info(f'Executing RMLMapper with arguments '
 76                          f'{" ".join(arguments)}')
 77
 78        # Set Java heap to 1/2 of total memory instead of the default 1/4
 79        max_heap = int(psutil.virtual_memory().total * (1/2))
 80
 81        # Execute command
 82        cmd = f'java -Xmx{max_heap} -Xms{max_heap} ' + \
 83              '-jar rmlmapper/rmlmapper.jar ' + \
 84              f'{" ".join(arguments)}'
 85        return self.run_and_wait_for_exit(cmd)
 86
 87    def execute(self, arguments: list) -> bool:
 88        """Execute RMLMapper with given arguments.
 89
 90        Parameters
 91        ----------
 92        arguments : list
 93            Arguments to supply to RMLMapper.
 94
 95        Returns
 96        -------
 97        success : bool
 98            Whether the execution succeeded; False if the timeout was hit.
 99        """
100        try:
101            return self._execute_with_timeout(arguments)
102        except TimeoutError:
103            msg = f'Timeout ({TIMEOUT}s) reached for RMLMapper'
104            self._logger.warning(msg)
105        # Only reached after a timeout; a finished run returned above.
106        return False
107
108    def execute_mapping(self,
109                        mapping_file: str,
110                        output_file: str,
111                        serialization: str,
112                        rdb_username: Optional[str] = None,
113                        rdb_password: Optional[str] = None,
114                        rdb_host: Optional[str] = None,
115                        rdb_port: Optional[int] = None,
116                        rdb_name: Optional[str] = None,
117                        rdb_type: Optional[str] = None) -> bool:
118        """Execute a mapping file with RMLMapper.
119
120        N-Quads and N-Triples are currently supported as serialization
121        format for RMLMapper.
122
123        Parameters
124        ----------
125        mapping_file : str
126            Path to the mapping file to execute.
127        output_file : str
128            Name of the output file to store the triples in.
129        serialization : str
130            Serialization format to use.
131        rdb_username : Optional[str]
132            Username for the database, required when a database is used as
133            source.
134        rdb_password : Optional[str]
135            Password for the database, required when a database is used as
136            source.
137        rdb_host : Optional[str]
138            Hostname for the database, required when a database is used as
139            source.
140        rdb_port : Optional[int]
141            Port for the database, required when a database is used as source.
142        rdb_name : Optional[str]
143            Database name for the database, required when a database is used as
144            source.
145        rdb_type : Optional[str]
146            Database type, required when a database is used as source.
147
148        Returns
149        -------
150        success : bool
151            Whether the execution was successful or not.
152        """
153        arguments = ['-m', os.path.join('/data/shared/', mapping_file),
154                     '-s', serialization,
155                     '-o', os.path.join('/data/shared/', output_file),
156                     '-d']  # Enable duplicate removal
157
158        if rdb_username is not None and rdb_password is not None \
159                and rdb_host is not None and rdb_port is not None \
160                and rdb_name is not None and rdb_type is not None:
161            # Only reached when all six database options were given.
162            arguments.append('-u')
163            arguments.append(rdb_username)
164            arguments.append('-p')
165            arguments.append(rdb_password)
166            # Assemble the JDBC DSN for the given database type.
167            parameters = ''
168            if rdb_type == 'MySQL':
169                protocol = 'jdbc:mysql'
170                parameters = '?allowPublicKeyRetrieval=true&useSSL=false'
171            elif rdb_type == 'PostgreSQL':
172                protocol = 'jdbc:postgresql'
173            else:
174                raise ValueError(f'Unknown RDB type: "{rdb_type}"')
175            rdb_dsn = f'{protocol}://{rdb_host}:{rdb_port}/' + \
176                      f'{rdb_name}{parameters}'
177            arguments.append('-dsn')
178            arguments.append(rdb_dsn)
179
180        return self.execute(arguments)
class RMLMapper(bench_executor.container.Container):
 23class RMLMapper(Container):
 24    """RMLMapper container for executing R2RML and RML mappings."""
 25
 26    def __init__(self, data_path: str, config_path: str, directory: str,
 27                 verbose: bool):
 28        """Creates an instance of the RMLMapper class.
 29
 30        Parameters
 31        ----------
 32        data_path : str
 33            Path to the data directory of the case.
 34        config_path : str
 35            Path to the config directory of the case.
 36        directory : str
 37            Path to the directory to store logs.
 38        verbose : bool
 39            Enable verbose logs.
 40        """
 41        self._data_path = os.path.abspath(data_path)
 42        self._config_path = os.path.abspath(config_path)
 43        self._logger = Logger(__name__, directory, verbose)
 44        self._verbose = verbose
 45        # Make sure the host directory bound to /data below exists.
 46        os.makedirs(os.path.join(self._data_path, 'rmlmapper'), exist_ok=True)
 47        super().__init__(f'kgconstruct/rmlmapper:v{VERSION}', 'RMLMapper',
 48                         self._logger,
 49                         volumes=[f'{self._data_path}/rmlmapper:/data',
 50                                  f'{self._data_path}/shared:/data/shared'])
 51
 52    @property
 53    def root_mount_directory(self) -> str:
 54        """Subdirectory in the root directory of the case for RMLMapper.
 55
 56        Returns
 57        -------
 58        subdirectory : str
 59            Subdirectory for RMLMapper: this module's lowercased __name__.
 60
 61        """
 62        return __name__.lower()
 63
 64    @timeout(TIMEOUT)
 65    def _execute_with_timeout(self, arguments: list) -> bool:
 66        """Execute a mapping with a provided timeout.
 67
 68        Returns
 69        -------
 70        success : bool
 71            Whether the execution was successful or not.
 72        """
 73        if self._verbose:
 74            arguments.append('-vvvvvvvvvvv')
 75        # NOTE: the append above modifies the caller's list in place.
 76        self._logger.info(f'Executing RMLMapper with arguments '
 77                          f'{" ".join(arguments)}')
 78
 79        # Set Java heap to 1/2 of total memory instead of the default 1/4
 80        max_heap = int(psutil.virtual_memory().total * (1/2))
 81
 82        # Execute command
 83        cmd = f'java -Xmx{max_heap} -Xms{max_heap} ' + \
 84              '-jar rmlmapper/rmlmapper.jar ' + \
 85              f'{" ".join(arguments)}'
 86        return self.run_and_wait_for_exit(cmd)
 87
 88    def execute(self, arguments: list) -> bool:
 89        """Execute RMLMapper with given arguments.
 90
 91        Parameters
 92        ----------
 93        arguments : list
 94            Arguments to supply to RMLMapper.
 95
 96        Returns
 97        -------
 98        success : bool
 99            Whether the execution succeeded; False if the timeout was hit.
100        """
101        try:
102            return self._execute_with_timeout(arguments)
103        except TimeoutError:
104            msg = f'Timeout ({TIMEOUT}s) reached for RMLMapper'
105            self._logger.warning(msg)
106        # Only reached after a timeout; a finished run returned above.
107        return False
108
109    def execute_mapping(self,
110                        mapping_file: str,
111                        output_file: str,
112                        serialization: str,
113                        rdb_username: Optional[str] = None,
114                        rdb_password: Optional[str] = None,
115                        rdb_host: Optional[str] = None,
116                        rdb_port: Optional[int] = None,
117                        rdb_name: Optional[str] = None,
118                        rdb_type: Optional[str] = None) -> bool:
119        """Execute a mapping file with RMLMapper.
120
121        N-Quads and N-Triples are currently supported as serialization
122        format for RMLMapper.
123
124        Parameters
125        ----------
126        mapping_file : str
127            Path to the mapping file to execute.
128        output_file : str
129            Name of the output file to store the triples in.
130        serialization : str
131            Serialization format to use.
132        rdb_username : Optional[str]
133            Username for the database, required when a database is used as
134            source.
135        rdb_password : Optional[str]
136            Password for the database, required when a database is used as
137            source.
138        rdb_host : Optional[str]
139            Hostname for the database, required when a database is used as
140            source.
141        rdb_port : Optional[int]
142            Port for the database, required when a database is used as source.
143        rdb_name : Optional[str]
144            Database name for the database, required when a database is used as
145            source.
146        rdb_type : Optional[str]
147            Database type, required when a database is used as source.
148
149        Returns
150        -------
151        success : bool
152            Whether the execution was successful or not.
153        """
154        arguments = ['-m', os.path.join('/data/shared/', mapping_file),
155                     '-s', serialization,
156                     '-o', os.path.join('/data/shared/', output_file),
157                     '-d']  # Enable duplicate removal
158
159        if rdb_username is not None and rdb_password is not None \
160                and rdb_host is not None and rdb_port is not None \
161                and rdb_name is not None and rdb_type is not None:
162            # Only reached when all six database options were given.
163            arguments.append('-u')
164            arguments.append(rdb_username)
165            arguments.append('-p')
166            arguments.append(rdb_password)
167            # Assemble the JDBC DSN for the given database type.
168            parameters = ''
169            if rdb_type == 'MySQL':
170                protocol = 'jdbc:mysql'
171                parameters = '?allowPublicKeyRetrieval=true&useSSL=false'
172            elif rdb_type == 'PostgreSQL':
173                protocol = 'jdbc:postgresql'
174            else:
175                raise ValueError(f'Unknown RDB type: "{rdb_type}"')
176            rdb_dsn = f'{protocol}://{rdb_host}:{rdb_port}/' + \
177                      f'{rdb_name}{parameters}'
178            arguments.append('-dsn')
179            arguments.append(rdb_dsn)
180
181        return self.execute(arguments)

RMLMapper container for executing R2RML and RML mappings.

RMLMapper(data_path: str, config_path: str, directory: str, verbose: bool)
26    def __init__(self, data_path: str, config_path: str, directory: str,
27                 verbose: bool):
28        """Creates an instance of the RMLMapper class.
29
30        Parameters
31        ----------
32        data_path : str
33            Path to the data directory of the case.
34        config_path : str
35            Path to the config directory of the case.
36        directory : str
37            Path to the directory to store logs.
38        verbose : bool
39            Enable verbose logs.
40        """
41        self._data_path = os.path.abspath(data_path)
42        self._config_path = os.path.abspath(config_path)
43        self._logger = Logger(__name__, directory, verbose)
44        self._verbose = verbose
45        # Make sure the host directory bound to /data below exists.
46        os.makedirs(os.path.join(self._data_path, 'rmlmapper'), exist_ok=True)
47        super().__init__(f'kgconstruct/rmlmapper:v{VERSION}', 'RMLMapper',
48                         self._logger,
49                         volumes=[f'{self._data_path}/rmlmapper:/data',
50                                  f'{self._data_path}/shared:/data/shared'])

Creates an instance of the RMLMapper class.

Parameters
  • data_path (str): Path to the data directory of the case.
  • config_path (str): Path to the config directory of the case.
  • directory (str): Path to the directory to store logs.
  • verbose (bool): Enable verbose logs.
root_mount_directory: str

Subdirectory in the root directory of the case for RMLMapper.

Returns
  • subdirectory (str): Subdirectory of the root directory for RMLMapper.
def execute(self, arguments: list) -> bool:
 88    def execute(self, arguments: list) -> bool:
 89        """Execute RMLMapper with given arguments.
 90
 91        Parameters
 92        ----------
 93        arguments : list
 94            Arguments to supply to RMLMapper.
 95
 96        Returns
 97        -------
 98        success : bool
 99            Whether the execution succeeded; False if the timeout was hit.
100        """
101        try:
102            return self._execute_with_timeout(arguments)
103        except TimeoutError:
104            msg = f'Timeout ({TIMEOUT}s) reached for RMLMapper'
105            self._logger.warning(msg)
106        # Only reached after a timeout; a finished run returned above.
107        return False

Execute RMLMapper with given arguments.

Parameters
  • arguments (list): Arguments to supply to RMLMapper.
Returns
  • success (bool): Whether the execution succeeded or not.
def execute_mapping( self, mapping_file: str, output_file: str, serialization: str, rdb_username: Optional[str] = None, rdb_password: Optional[str] = None, rdb_host: Optional[str] = None, rdb_port: Optional[int] = None, rdb_name: Optional[str] = None, rdb_type: Optional[str] = None) -> bool:
109    def execute_mapping(self,
110                        mapping_file: str,
111                        output_file: str,
112                        serialization: str,
113                        rdb_username: Optional[str] = None,
114                        rdb_password: Optional[str] = None,
115                        rdb_host: Optional[str] = None,
116                        rdb_port: Optional[int] = None,
117                        rdb_name: Optional[str] = None,
118                        rdb_type: Optional[str] = None) -> bool:
119        """Execute a mapping file with RMLMapper.
120
121        N-Quads and N-Triples are currently supported as serialization
122        format for RMLMapper.
123
124        Parameters
125        ----------
126        mapping_file : str
127            Path to the mapping file to execute.
128        output_file : str
129            Name of the output file to store the triples in.
130        serialization : str
131            Serialization format to use.
132        rdb_username : Optional[str]
133            Username for the database, required when a database is used as
134            source.
135        rdb_password : Optional[str]
136            Password for the database, required when a database is used as
137            source.
138        rdb_host : Optional[str]
139            Hostname for the database, required when a database is used as
140            source.
141        rdb_port : Optional[int]
142            Port for the database, required when a database is used as source.
143        rdb_name : Optional[str]
144            Database name for the database, required when a database is used as
145            source.
146        rdb_type : Optional[str]
147            Database type, required when a database is used as source.
148
149        Returns
150        -------
151        success : bool
152            Whether the execution was successful or not.
153        """
154        arguments = ['-m', os.path.join('/data/shared/', mapping_file),
155                     '-s', serialization,
156                     '-o', os.path.join('/data/shared/', output_file),
157                     '-d']  # Enable duplicate removal
158
159        if rdb_username is not None and rdb_password is not None \
160                and rdb_host is not None and rdb_port is not None \
161                and rdb_name is not None and rdb_type is not None:
162            # Only reached when all six database options were given.
163            arguments.append('-u')
164            arguments.append(rdb_username)
165            arguments.append('-p')
166            arguments.append(rdb_password)
167            # Assemble the JDBC DSN for the given database type.
168            parameters = ''
169            if rdb_type == 'MySQL':
170                protocol = 'jdbc:mysql'
171                parameters = '?allowPublicKeyRetrieval=true&useSSL=false'
172            elif rdb_type == 'PostgreSQL':
173                protocol = 'jdbc:postgresql'
174            else:
175                raise ValueError(f'Unknown RDB type: "{rdb_type}"')
176            rdb_dsn = f'{protocol}://{rdb_host}:{rdb_port}/' + \
177                      f'{rdb_name}{parameters}'
178            arguments.append('-dsn')
179            arguments.append(rdb_dsn)
180
181        return self.execute(arguments)

Execute a mapping file with RMLMapper.

N-Quads and N-Triples are currently supported as serialization format for RMLMapper.

Parameters
  • mapping_file (str): Path to the mapping file to execute.
  • output_file (str): Name of the output file to store the triples in.
  • serialization (str): Serialization format to use.
  • rdb_username (Optional[str]): Username for the database, required when a database is used as source.
  • rdb_password (Optional[str]): Password for the database, required when a database is used as source.
  • rdb_host (Optional[str]): Hostname for the database, required when a database is used as source.
  • rdb_port (Optional[int]): Port for the database, required when a database is used as source.
  • rdb_name (Optional[str]): Database name for the database, required when a database is used as source.
  • rdb_type (Optional[str]): Database type, required when a database is used as source.
Returns
  • success (bool): Whether the execution was successful or not.