"""
Generates all the import files: Ontologies, Databases and Experiments.
The module is responsible for generating all the csv files that will
be loaded into the Graph database and also updates a stats object
(hdf table) with the number of entities and relationships from each
dataset imported. A new stats object is created the first time a
full import is run.
"""
import os.path
from datetime import datetime
import pandas as pd
from joblib import Parallel, delayed
from uuid import uuid4
import config.ckg_config as ckg_config
from graphdb_builder.ontologies import ontologies_controller as oh
from graphdb_builder.databases import databases_controller as dh
from graphdb_builder.experiments import experiments_controller as eh
from graphdb_builder.users import users_controller as uh
from graphdb_builder import builder_utils
# Logger dedicated to the importer, configured from the CKG logging settings.
log_config = ckg_config.graphdb_builder_log
logger = builder_utils.setup_logging(log_config, key="importer")

# Identifier generated once per module load; generateStatsDataFrame stamps it
# on every stats row it builds.
import_id = uuid4()

try:
    cwd = os.path.abspath(os.path.dirname(__file__))
    # One configuration object per import domain.
    config = builder_utils.setup_config('builder')
    directories = builder_utils.get_full_path_directories()
    oconfig = builder_utils.setup_config('ontologies')
    dbconfig = builder_utils.setup_config('databases')
    econfig = builder_utils.setup_config('experiments')
    uconfig = builder_utils.setup_config('users')
except Exception as err:
    # NOTE(review): the error is only logged, so any name not yet assigned
    # above stays undefined and later use will raise NameError — confirm this
    # best-effort behaviour is intended.
    logger.error("importer - Reading configuration > {}.".format(err))

# Timestamp captured once, when the module is loaded.
START_TIME = datetime.now()
def ontologiesImport(importDirectory, ontologies=None, download=True, import_type="partial"):
    """
    Build the import files for ontologies and record the resulting statistics.

    The ontologies controller generates the entities and relationships inside
    the ontologies sub-directory of ``importDirectory``. If no ontologies list
    is provided, all the ontologies listed in the configuration are imported.
    The statistics returned by the controller are appended to the stats object.

    :param str importDirectory: path of the import directory where files will be created.
    :param list ontologies: a list of ontology names to be imported.
    :param bool download: whether the ontology sources are to be downloaded.
    :param str import_type: type of import ('full' or 'partial').
    """
    target_dir = os.path.join(importDirectory, oconfig["ontologies_importDir"])
    builder_utils.checkDirectory(target_dir)

    collected = oh.generate_graphFiles(target_dir, ontologies, download)
    stats_frame = generateStatsDataFrame(collected)

    # Make sure the stats store exists before appending to it.
    setupStats(import_type=import_type)
    writeStats(stats_frame, import_type)
def databasesImport(importDirectory, databases=None, n_jobs=1, download=True, import_type="partial"):
    """
    Build the import files for databases and record the resulting statistics.

    The databases controller generates the entities and relationships inside
    the databases sub-directory of ``importDirectory``. If no databases list is
    provided, all the databases listed in the configuration are imported.
    The statistics returned by the controller are appended to the stats object.

    :param str importDirectory: path of the import directory where files will be created.
    :param list databases: a list of database names to be imported.
    :param int n_jobs: number of jobs to run in parallel. 1 by default when updating one database.
    :param bool download: whether the databases are to be downloaded.
    :param str import_type: type of import ('full' or 'partial').
    """
    target_dir = os.path.join(importDirectory, dbconfig["databasesImportDir"])
    builder_utils.checkDirectory(target_dir)

    collected = dh.generateGraphFiles(target_dir, databases, download, n_jobs)
    stats_frame = generateStatsDataFrame(collected)

    # Make sure the stats store exists before appending to it.
    setupStats(import_type=import_type)
    writeStats(stats_frame, import_type)
def experimentsImport(projects=None, n_jobs=1, import_type="partial"):
    """
    Build the import files for a set of projects, one ``experimentImport`` call per project.

    If no projects list is provided, every folder found in the experiments
    directory is treated as a project. Nothing is run when the resulting list
    is empty.

    :param list projects: list of project identifiers to be imported.
    :param int n_jobs: number of jobs to run in parallel. 1 by default when updating one project.
    :param str import_type: type of import ('full' or 'partial'); accepted but not used by this function.
    """
    import_root = os.path.join(directories['importDirectory'], econfig["import_directory"])
    builder_utils.checkDirectory(import_root)
    data_root = os.path.join(directories['dataDirectory'], econfig["experiments_directory"])

    if projects is None:
        projects = builder_utils.listDirectoryFolders(data_root)

    if len(projects) == 0:
        return

    # Lazily build one delayed task per project and hand them to joblib.
    tasks = (delayed(experimentImport)(import_root, data_root, name) for name in projects)
    Parallel(n_jobs=n_jobs)(tasks)
def experimentImport(importDirectory, experimentsDirectory, project):
    """
    Build the import files for every dataset of a single project.

    Called from ``experimentsImport``. Each dataset folder of the project,
    except ``experimental_design``, gets its own output directory and is passed
    to the experiments controller.

    :param str importDirectory: path to the directory where all the import files are generated.
    :param str experimentsDirectory: path to the directory where all the experiments are located.
    :param str project: identifier of the project to be imported.
    """
    project_out = os.path.join(importDirectory, project)
    builder_utils.checkDirectory(project_out)
    project_in = os.path.join(experimentsDirectory, project)

    for name in builder_utils.listDirectoryFolders(project_in):
        if name == "experimental_design":
            continue
        dataset_out = os.path.join(project_out, name)
        builder_utils.checkDirectory(dataset_out)
        eh.generate_dataset_imports(project, name, dataset_out)
def usersImport(importDirectory, import_type='partial'):
    """
    Generate the User import files through the users controller.

    Per the original documentation, users are read from an excel file, new
    users are granted access to the database and the relevant information is
    written to a tab-delimited file in the import directory. The controller is
    invoked with ``expiration=365`` (presumably days — confirm in the users
    controller).

    :param str importDirectory: path to the directory where all the import files are generated.
    :param str import_type: type of import ('full' or 'partial'); accepted but not used by this function.
    """
    target_dir = os.path.join(importDirectory, uconfig['usersImportDirectory'])
    builder_utils.checkDirectory(target_dir)
    uh.parseUsersFile(target_dir, expiration=365)
def fullImport(download=True, n_jobs=4):
    """
    Run every importer in sequence: Ontologies, Databases, Experiments and Users.

    The stats object is set up first (created if it does not exist yet). Each
    step is then announced in the log and its own duration is logged when it
    finishes. Any exception raised along the way is logged and aborts the
    remaining steps; it is not re-raised.

    :param bool download: whether ontologies and databases are to be downloaded.
    :param int n_jobs: number of jobs to run in parallel for databases and experiments.
    """
    try:
        importDirectory = directories["importDirectory"]
        builder_utils.checkDirectory(importDirectory)
        setupStats(import_type='full')

        steps = (
            ("Ontologies",
             lambda: ontologiesImport(importDirectory, download=download, import_type='full')),
            ("Databases",
             lambda: databasesImport(importDirectory, n_jobs=n_jobs, download=download, import_type='full')),
            ("Experiments",
             lambda: experimentsImport(n_jobs=n_jobs, import_type='full')),
            ("Users",
             lambda: usersImport(importDirectory, import_type='full')),
        )
        for label, run_step in steps:
            logger.info("Full import: importing all {}".format(label))
            # Time each step on its own so that "took" reports the duration of
            # this step rather than the time elapsed since the module was loaded.
            step_start = datetime.now()
            run_step()
            logger.info("Full import: {} import took {}".format(label, datetime.now() - step_start))
    except Exception as err:
        # A single handler is enough: FileNotFoundError, EOFError, IOError,
        # IndexError, KeyError and MemoryError are all Exception subclasses and
        # were handled identically.
        logger.error("Full import > {}.".format(err))
def generateStatsDataFrame(stats):
    """
    Turn the statistics collected by an importer into a dataframe.

    The columns are taken from the ``statsCols`` configuration entry and an
    extra ``import_id`` column, holding the module-level import identifier as
    a string, is added to every row.

    :param list stats: a list with statistics collected from each importer function.
    :return: Pandas dataframe with the collected statistics.
    """
    records = list(stats)
    frame = pd.DataFrame.from_records(records, columns=config["statsCols"])
    # Stamp the rows with the import identifier, then cast that column to string.
    return frame.assign(import_id=import_id).astype({'import_id': 'str'})
def setupStats(import_type):
    """
    Make sure the stats file exists, creating an empty stats object if needed.

    Nothing is done when both the stats directory and the stats file are
    already present. Otherwise the directory is created if missing and an
    empty stats object is stored under the name derived from ``import_type``.
    Errors are logged, not raised.

    :param str import_type: type of import ('full' or 'partial').
    """
    stats_dir = directories["statsDirectory"]
    stats_path = os.path.join(stats_dir, config["statsFile"])
    columns = config["statsCols"]
    key = getStatsName(import_type)
    try:
        dir_exists = os.path.exists(stats_dir)
        if dir_exists and os.path.isfile(stats_path):
            return
        if not dir_exists:
            os.makedirs(stats_dir)
        createEmptyStats(columns, stats_path, key)
    except Exception as err:
        logger.error("Setting up Stats object {} in file:{} > {}.".format(key, stats_path, err))
def createEmptyStats(statsCols, statsFile, statsName):
    """
    Store an empty dataframe with the stats columns in a HDFStore file.

    Errors are logged, not raised.

    :param list statsCols: a list of columns with the fields collected from the import statistics.
    :param str statsFile: path where the object should be stored.
    :param str statsName: key under which the stats object is stored in the file.
    """
    try:
        statsDf = pd.DataFrame(columns=statsCols)
        # The context manager closes the store on exit; no explicit close needed.
        with pd.HDFStore(statsFile) as hdf:
            hdf.put(statsName, statsDf, format='table', data_columns=True)
    except Exception as err:
        logger.error("Creating empty Stats object {} in file:{} > {}.".format(statsName, statsFile, err))
# def loadStats(statsFile):
# """
# Loads the statistics object.
# :param str statsFile: file path where the stats object is stored.
# :returns: HDFStore object with the collected statistics. \
# stats can be accessed using a key (i.e stats_ version).
# """
# try:
# hdf = None
# if os.path.isfile(statsFile):
# hdf = pd.HDFStore(statsFile)
# except Exception as err:
# logger.error("Loading Stats file:{} > {}.".format(statsFile, err))
# return hdf
def writeStats(statsDf, import_type, stats_name=None):
    """
    Append newly collected statistics to the stats object.

    Errors are logged, not raised.

    :param statsDf: a pandas dataframe with the new statistics from the importing.
    :param str import_type: type of import ('full' or 'partial'); used to derive the key when ``stats_name`` is not given.
    :param str stats_name: key to store the statistics under, if a specific one should be used.
    """
    store_path = os.path.join(directories["statsDirectory"], config["statsFile"])
    try:
        # Rebind the parameter itself so the error message below always has a
        # value to report, even when deriving the name fails.
        stats_name = getStatsName(import_type) if stats_name is None else stats_name
        with pd.HDFStore(store_path) as store:
            store.append(stats_name, statsDf, data_columns=True, min_itemsize={'time': 8})
    except Exception as err:
        logger.error("Writing Stats object {} in file:{} > {}.".format(stats_name, store_path, err))
def getStatsName(import_type):
    """
    Build the key under which import statistics are stored.

    The key has the form ``<import_type>_stats_<version>``, where the version
    is the CKG version from the configuration with dots replaced by underscores.

    :param str import_type: type of import ('full' or 'partial').
    :return: statsName: key used to store in the stats object.
    :rtype: str
    """
    version_tag = str(ckg_config.version).replace('.', '_')
    return "".join((import_type, '_stats_', version_tag))
# Script entry point: running this module directly performs a full import
# with fullImport's default arguments.
if __name__ == "__main__":
    fullImport()