import logging
import os
import pathlib
from abc import abstractmethod
from dataclasses import dataclass
from typing import (
TYPE_CHECKING,
Any,
Dict,
Generic,
Literal,
Mapping,
Optional,
Sequence,
Tuple,
TypeVar,
Union,
cast,
overload,
)
import dask.dataframe as dd
import pandas as pd
import pystow
from pystow.utils import read_zipfile_csv
from slugify import slugify
from .dask import read_dask_df_archive_csv
from .typing import BACKEND_LITERAL, COLUMNS, EA_SIDES
from .utils import fix_dataclass_init_docs
BASE_DATASET_KEY = "sylloge"
BASE_DATASET_MODULE = pystow.module(BASE_DATASET_KEY)
DataFrameType = TypeVar("DataFrameType", pd.DataFrame, dd.DataFrame)
if TYPE_CHECKING:
import dask.dataframe as dd
logger = logging.getLogger(__name__)
[docs]@fix_dataclass_init_docs
@dataclass
class TrainTestValSplit(Generic[DataFrameType]):
"""Dataclass holding split of gold standard entity links."""
#: entity links for training
train: DataFrameType
#: entity links for testing
test: DataFrameType
#: entity links for validation
val: DataFrameType
[docs]@fix_dataclass_init_docs
class EADataset(Generic[DataFrameType]):
"""Dataset class holding information of the alignment class."""
rel_triples_left: DataFrameType
rel_triples_right: DataFrameType
attr_triples_left: DataFrameType
attr_triples_right: DataFrameType
ent_links: DataFrameType
dataset_names: Tuple[str, str]
folds: Optional[Sequence[TrainTestValSplit[DataFrameType]]] = None
_REL_TRIPLES_LEFT_PATH: str = "rel_triples_left_parquet"
_REL_TRIPLES_RIGHT_PATH: str = "rel_triples_right_parquet"
_ATTR_TRIPLES_LEFT_PATH: str = "attr_triples_left_parquet"
_ATTR_TRIPLES_RIGHT_PATH: str = "attr_triples_right_parquet"
_ENT_LINKS_PATH: str = "ent_links_parquet"
_FOLD_DIR: str = "folds"
_TRAIN_LINKS_PATH: str = "train_parquet"
_TEST_LINKS_PATH: str = "test_parquet"
_VAL_LINKS_PATH: str = "val_parquet"
_DATASET_NAMES_PATH: str = "dataset_names.txt"
def __init__(
self,
*,
rel_triples_left: DataFrameType,
rel_triples_right: DataFrameType,
attr_triples_left: DataFrameType,
attr_triples_right: DataFrameType,
ent_links: DataFrameType,
dataset_names: Tuple[str, str],
folds: Optional[Sequence[TrainTestValSplit[DataFrameType]]] = None,
backend: BACKEND_LITERAL = "pandas",
npartitions: int = 1,
) -> None:
"""Create an entity aligment dataclass.
:param rel_triples_left: relation triples of left knowledge graph
:param rel_triples_right: relation triples of right knowledge graph
:param attr_triples_left: attribute triples of left knowledge graph
:param attr_triples_right: attribute triples of right knowledge graph
:param dataset_names: tuple of dataset names
:param ent_links: gold standard entity links of alignment
:param folds: optional pre-split folds of the gold standard
:param backend: which backend is used of either 'pandas' or 'dask'
:param npartitions: how many partitions to use for each frame, when using dask
"""
self.rel_triples_left = rel_triples_left
self.rel_triples_right = rel_triples_right
self.attr_triples_left = attr_triples_left
self.attr_triples_right = attr_triples_right
self.ent_links = ent_links
self.dataset_names = dataset_names
self.folds = folds
self.npartitions: int = npartitions
self._backend: BACKEND_LITERAL = backend
# trigger possible transformation
self.backend = backend
@property
def _canonical_name(self) -> str:
raise NotImplementedError
@property
def canonical_name(self) -> str:
"""A canonical name for this dataset instance.
This includes all the necessary information
to distinguish this specific dataset as string.
This can be used e.g. to create folders with this
dataset name to store results.
:return: concise string representation for this dataset instance
"""
name = self._canonical_name
assert isinstance(name, str) # for mypy
return slugify(name, separator="_")
@property
def _param_repr(self) -> str:
raise NotImplementedError
@property
def _statistics(self) -> str:
if isinstance(self.rel_triples_left, pd.DataFrame):
return f"rel_triples_left={len(self.rel_triples_left)}, rel_triples_right={len(self.rel_triples_right)}, attr_triples_left={len(self.attr_triples_left)}, attr_triples_right={len(self.attr_triples_right)}, ent_links={len(self.ent_links)}, folds={len(self.folds) if self.folds else None}" # type: ignore
else:
unknown = "unknown_len"
return f"rel_triples_left={unknown}, rel_triples_right={unknown}, attr_triples_left={unknown}, attr_triples_right={unknown}, ent_links={unknown}, folds={unknown if self.folds else None}"
def __repr__(self) -> str:
return f"{self.__class__.__name__}(backend={self.backend}, {self._param_repr}{self._statistics})"
def _additional_backend_handling(self, backend: BACKEND_LITERAL):
pass
@property
def backend(self) -> BACKEND_LITERAL:
return self._backend
@backend.setter
def backend(self, backend: BACKEND_LITERAL):
"""Set backend and transform data if needed"""
if backend == "pandas":
self._backend = "pandas"
if isinstance(self.rel_triples_left, pd.DataFrame):
return
else:
self.rel_triples_left = self.rel_triples_left.compute()
self.rel_triples_right = self.rel_triples_right.compute()
self.attr_triples_left = self.attr_triples_left.compute()
self.attr_triples_right = self.attr_triples_right.compute()
self.ent_links = self.ent_links.compute()
if self.folds:
for fold in self.folds:
fold.train = fold.train.compute()
fold.test = fold.test.compute()
fold.val = fold.val.compute()
elif backend == "dask":
self._backend = "dask"
if isinstance(self.rel_triples_left, dd.DataFrame):
if self.rel_triples_left.npartitions != self.npartitions:
self.rel_triples_left = self.rel_triples_left.repartition(
npartitions=self.npartitions
)
self.rel_triples_right = self.rel_triples_right.repartition(
npartitions=self.npartitions
)
self.attr_triples_left = self.attr_triples_left.repartition(
npartitions=self.npartitions
)
self.attr_triples_right = self.attr_triples_right.repartition(
npartitions=self.npartitions
)
self.ent_links = self.ent_links.repartition(
npartitions=self.npartitions
)
if self.folds:
for fold in self.folds:
fold.train = fold.train.repartition(
npartitions=self.npartitions
)
fold.test = fold.test.repartition(
npartitions=self.npartitions
)
fold.val = fold.val.repartition(
npartitions=self.npartitions
)
else:
return
else:
self.rel_triples_left = dd.from_pandas(
self.rel_triples_left, npartitions=self.npartitions
)
self.rel_triples_right = dd.from_pandas(
self.rel_triples_right, npartitions=self.npartitions
)
self.attr_triples_left = dd.from_pandas(
self.attr_triples_left, npartitions=self.npartitions
)
self.attr_triples_right = dd.from_pandas(
self.attr_triples_right, npartitions=self.npartitions
)
self.ent_links = dd.from_pandas(
self.ent_links, npartitions=self.npartitions
)
if self.folds:
for fold in self.folds:
fold.train = dd.from_pandas(
fold.train, npartitions=self.npartitions
)
fold.test = dd.from_pandas(
fold.test, npartitions=self.npartitions
)
fold.val = dd.from_pandas(
fold.val, npartitions=self.npartitions
)
else:
raise ValueError(f"Unknown backend {backend}")
self._additional_backend_handling(backend)
[docs] def to_parquet(self, path: Union[str, pathlib.Path], **kwargs):
"""Write dataset to path as several parquet files.
:param path: directory where dataset will be stored. Will be created if necessary.
:param kwargs: will be handed through to `to_parquet` functions
.. seealso:: :func:`read_parquet`
"""
if not os.path.exists(path):
os.makedirs(path)
if not isinstance(path, pathlib.Path):
path = pathlib.Path(path)
# write dataset names
with open(path.joinpath(self.__class__._DATASET_NAMES_PATH), "w") as fh:
for side, name in zip(EA_SIDES, self.dataset_names):
fh.write(f"{side}:{name}\n")
# write tables
for table, table_path in zip(
[
self.rel_triples_left,
self.rel_triples_right,
self.attr_triples_left,
self.attr_triples_right,
self.ent_links,
],
[
self.__class__._REL_TRIPLES_LEFT_PATH,
self.__class__._REL_TRIPLES_RIGHT_PATH,
self.__class__._ATTR_TRIPLES_LEFT_PATH,
self.__class__._ATTR_TRIPLES_RIGHT_PATH,
self.__class__._ENT_LINKS_PATH,
],
):
table.to_parquet(path.joinpath(table_path), **kwargs)
# write folds
if self.folds:
fold_path = path.joinpath(self.__class__._FOLD_DIR)
for fold_number, fold in enumerate(self.folds, start=1):
fold_dir = fold_path.joinpath(str(fold_number))
os.makedirs(fold_dir)
for links, link_path in zip(
[fold.train, fold.test, fold.val],
[
self.__class__._TRAIN_LINKS_PATH,
self.__class__._TEST_LINKS_PATH,
self.__class__._VAL_LINKS_PATH,
],
):
table.to_parquet(fold_dir.joinpath(link_path), **kwargs)
@classmethod
def _read_parquet_values(
cls,
path: Union[str, pathlib.Path],
backend: BACKEND_LITERAL = "pandas",
**kwargs,
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
if not isinstance(path, pathlib.Path):
path = pathlib.Path(path)
read_parquet_fn = pd.read_parquet if backend == "pandas" else dd.read_parquet
# read dataset names
with open(path.joinpath(cls._DATASET_NAMES_PATH), "r") as fh:
dataset_names = tuple(line.strip().split(":")[1] for line in fh)
# for mypy
dataset_names = cast(Tuple[str, str], dataset_names)
tables = {}
# read tables
for table, table_path in zip(
[
"rel_triples_left",
"rel_triples_right",
"attr_triples_left",
"attr_triples_right",
"ent_links",
],
[
cls._REL_TRIPLES_LEFT_PATH,
cls._REL_TRIPLES_RIGHT_PATH,
cls._ATTR_TRIPLES_LEFT_PATH,
cls._ATTR_TRIPLES_RIGHT_PATH,
cls._ENT_LINKS_PATH,
],
):
tables[table] = read_parquet_fn(path.joinpath(table_path), **kwargs)
# read folds
fold_path = path.joinpath(cls._FOLD_DIR)
folds = None
if os.path.exists(fold_path):
folds = []
for tmp_fold_dir in sorted(sub_dir for sub_dir in os.listdir(fold_path)):
fold_dir = fold_path.joinpath(tmp_fold_dir)
train_test_val = {}
for links, link_path in zip(
["train", "test", "val"],
[
cls._TRAIN_LINKS_PATH,
cls._TEST_LINKS_PATH,
cls._VAL_LINKS_PATH,
],
):
train_test_val[links] = read_parquet_fn(
fold_dir.joinpath(link_path), **kwargs
)
folds.append(TrainTestValSplit(**train_test_val))
npartitions = 1
if backend == "dask":
npartitions = tables["rel_triples_left"].npartitions
return (
dict(
dataset_names=dataset_names,
folds=folds,
backend=backend,
npartitions=npartitions,
**tables,
),
{},
)
[docs] @classmethod
def read_parquet(
cls,
path: Union[str, pathlib.Path],
backend: BACKEND_LITERAL = "pandas",
**kwargs,
) -> "EADataset":
"""Read dataset from parquet files in given `path`.
This function expects the left/right attribute/relation triples and entity links as well as a `dataset_names.txt`
Optionally folds are read from a `folds` directory, with numbered fold subdirectories containing train/test/val links.
:param path: Directory with files
:param backend: Whether to use pandas or dask for reading
:param kwargs: passed on to the respective read function
:return: EADataset read from parquet
.. seealso:: :func:`to_parquet`
"""
init_kwargs, additional_kwargs = cls._read_parquet_values(
path=path, backend=backend, **kwargs
)
instance = cls(**init_kwargs)
instance.__dict__.update(additional_kwargs)
return instance
[docs]class CacheableEADataset(EADataset[DataFrameType]):
def __init__(
self,
*,
cache_path: pathlib.Path,
use_cache: bool = True,
parquet_load_options: Optional[Mapping] = None,
parquet_store_options: Optional[Mapping] = None,
**init_kwargs,
):
"""EADataset that uses caching after initial read.
:param cache_path: Path where cache will be stored/loaded
:param use_cache: whether to use cache
:param parquet_load_options: handed through to parquet loading function
:param parquet_store_options: handed through to parquet writing function
:param init_kwargs: other arguments for creating the EADataset instance
"""
self.cache_path = cache_path
self.parquet_load_options = parquet_load_options or {}
self.parquet_store_options = parquet_store_options or {}
backend = init_kwargs["backend"]
specific_npartitions = init_kwargs["npartitions"]
update_cache = False
additional_kwargs: Dict[str, Any] = {}
if use_cache:
if self.cache_path.exists():
logger.info(f"Loading from cache at {self.cache_path}")
ea_ds_kwargs, new_additional_kwargs = self.load_from_cache(
backend=backend
)
init_kwargs.update(ea_ds_kwargs)
additional_kwargs.update(new_additional_kwargs)
else:
init_kwargs.update(self.initial_read(backend=backend))
update_cache = True
else:
init_kwargs.update(self.initial_read(backend=backend))
if specific_npartitions != 1:
init_kwargs["npartitions"] = specific_npartitions
self.__dict__.update(additional_kwargs)
super().__init__(**init_kwargs)
if update_cache:
logger.info(f"Caching dataset at {self.cache_path}")
self.store_cache()
[docs] def create_cache_path(
self,
pystow_module: pystow.Module,
inner_cache_path: str,
cache_path: Optional[pathlib.Path] = None,
) -> pathlib.Path:
"""Uses either pystow module or cache_path to create cache path.
:param pystow_module: module where data is stored
:param inner_cache_path: path relative to pystow/cache path
:param cache_path: alternative to pystow module
:return: cache path as `pathlib.Path`
"""
if cache_path is None:
return pystow_module.join("cached", inner_cache_path, ensure_exists=False)
else:
return cache_path.joinpath(inner_cache_path)
def load_from_cache(
self, backend: BACKEND_LITERAL = "pandas"
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
return self.__class__._read_parquet_values(
path=self.cache_path, backend=backend
)
[docs] @abstractmethod
def initial_read(self, backend: BACKEND_LITERAL) -> Dict[str, Any]:
"""Read data for initialising EADataset."""
def store_cache(self):
self.to_parquet(self.cache_path, **self.parquet_store_options)
[docs]class ZipEADataset(CacheableEADataset):
"""Dataset created from zip file which is downloaded."""
def __init__(
self,
*,
cache_path: pathlib.Path,
zip_path: str,
inner_path: pathlib.PurePosixPath,
dataset_names: Tuple[str, str],
file_name_rel_triples_left: str = "rel_triples_1",
file_name_rel_triples_right: str = "rel_triples_2",
file_name_attr_triples_left: str = "attr_triples_1",
file_name_attr_triples_right: str = "attr_triples_2",
file_name_ent_links: str = "ent_links",
backend: BACKEND_LITERAL = "pandas",
npartitions: int = 1,
use_cache: bool = True,
):
"""Initialize ZipEADataset.
:param cache_path: Path where cache will be stored/loaded
:param zip_path: path to zip archive containing data
:param inner_path: base path inside zip archive
:param dataset_names: tuple of dataset names
:param file_name_rel_triples_left: file name of left relation triples
:param file_name_rel_triples_right: file name of right relation triples
:param file_name_attr_triples_left: file name of left attribute triples
:param file_name_attr_triples_right: file name of right attribute triples
:param file_name_ent_links: file name gold standard containing all entity links
:param backend: Whether to use "pandas" or "dask"
:param npartitions: how many partitions to use for each frame, when using dask
:param use_cache: whether to use cache or not
"""
self.zip_path = zip_path
self.inner_path = inner_path
self.file_name_rel_triples_left = file_name_rel_triples_left
self.file_name_rel_triples_right = file_name_rel_triples_right
self.file_name_ent_links = file_name_ent_links
self.file_name_attr_triples_left = file_name_attr_triples_left
self.file_name_attr_triples_right = file_name_attr_triples_right
super().__init__(
dataset_names=dataset_names,
cache_path=cache_path,
backend=backend,
npartitions=npartitions,
use_cache=use_cache,
)
[docs] def initial_read(self, backend: BACKEND_LITERAL) -> Dict[str, Any]:
return dict(
rel_triples_left=self._read_triples(
file_name=self.file_name_rel_triples_left, backend=backend
),
rel_triples_right=self._read_triples(
file_name=self.file_name_rel_triples_right, backend=backend
),
attr_triples_left=self._read_triples(
file_name=self.file_name_attr_triples_left, backend=backend
),
attr_triples_right=self._read_triples(
file_name=self.file_name_attr_triples_right, backend=backend
),
ent_links=self._read_triples(
file_name=self.file_name_ent_links, is_links=True, backend=backend
),
)
@overload
def _read_triples(
self,
file_name: Union[str, pathlib.Path],
backend: Literal["pandas"],
is_links: bool = False,
) -> pd.DataFrame:
...
@overload
def _read_triples(
self,
file_name: Union[str, pathlib.Path],
backend: Literal["dask"],
is_links: bool = False,
) -> "dd.DataFrame":
...
def _read_triples(
self,
file_name: Union[str, pathlib.Path],
backend: BACKEND_LITERAL,
is_links: bool = False,
) -> Union[pd.DataFrame, "dd.DataFrame"]:
columns = list(EA_SIDES) if is_links else COLUMNS
read_csv_kwargs = dict(
header=None,
names=columns,
sep="\t",
encoding="utf8",
dtype=str,
)
if backend == "pandas":
return read_zipfile_csv(
path=self.zip_path,
inner_path=str(self.inner_path.joinpath(file_name)),
**read_csv_kwargs,
)
else:
return read_dask_df_archive_csv(
path=self.zip_path,
inner_path=str(self.inner_path.joinpath(file_name)),
protocol="zip",
**read_csv_kwargs,
)
[docs]class ZipEADatasetWithPreSplitFolds(ZipEADataset):
"""Dataset with pre-split folds created from zip file which is downloaded."""
def __init__(
self,
*,
cache_path: pathlib.Path,
zip_path: str,
inner_path: pathlib.PurePosixPath,
dataset_names: Tuple[str, str],
file_name_rel_triples_left: str = "rel_triples_1",
file_name_rel_triples_right: str = "rel_triples_2",
file_name_ent_links: str = "ent_links",
file_name_attr_triples_left: str = "attr_triples_1",
file_name_attr_triples_right: str = "attr_triples_2",
backend: BACKEND_LITERAL = "pandas",
npartitions: int = 1,
directory_name_folds: str = "721_5fold",
directory_names_individual_folds: Sequence[str] = ("1", "2", "3", "4", "5"),
file_name_test_links: str = "test_links",
file_name_train_links: str = "train_links",
file_name_valid_links: str = "valid_links",
use_cache: bool = True,
):
"""Initialize ZipEADatasetWithPreSplitFolds.
:param cache_path: Path where cache will be stored/loaded
:param zip_path: path to zip archive containing data
:param inner_path: base path inside zip archive
:param dataset_names: tuple of dataset names
:param file_name_rel_triples_left: file name of left relation triples
:param file_name_rel_triples_right: file name of right relation triples
:param file_name_attr_triples_left: file name of left attribute triples
:param file_name_attr_triples_right: file name of right attribute triples
:param file_name_ent_links: file name gold standard containing all entity links
:param backend: Whether to use "pandas" or "dask"
:param npartitions: how many partitions to use for each frame, when using dask
:param directory_name_folds: name of the folds directory
:param directory_names_individual_folds: name of individual folds
:param file_name_test_links: name of test link file
:param file_name_train_links: name of train link file
:param file_name_valid_links: name of valid link file
:param use_cache: whether to use cache or not
"""
self.zip_path = zip_path
self.inner_path = inner_path
self.directory_names_individual_folds = directory_names_individual_folds
self.directory_name_folds = directory_name_folds
self.file_name_train_links = file_name_train_links
self.file_name_test_links = file_name_test_links
self.file_name_valid_links = file_name_valid_links
super().__init__(
dataset_names=dataset_names,
zip_path=zip_path,
inner_path=inner_path,
cache_path=cache_path,
backend=backend,
npartitions=npartitions,
use_cache=use_cache,
file_name_rel_triples_left=file_name_rel_triples_left,
file_name_rel_triples_right=file_name_rel_triples_right,
file_name_ent_links=file_name_ent_links,
file_name_attr_triples_left=file_name_attr_triples_left,
file_name_attr_triples_right=file_name_attr_triples_right,
)
[docs] def initial_read(self, backend: BACKEND_LITERAL):
folds = []
for fold in self.directory_names_individual_folds:
fold_folder = pathlib.Path(self.directory_name_folds).joinpath(fold)
train = self._read_triples(
fold_folder.joinpath(self.file_name_train_links),
is_links=True,
backend=backend,
)
test = self._read_triples(
fold_folder.joinpath(self.file_name_test_links),
is_links=True,
backend=backend,
)
val = self._read_triples(
fold_folder.joinpath(self.file_name_valid_links),
is_links=True,
backend=backend,
)
folds.append(TrainTestValSplit(train=train, test=test, val=val))
return {**super().initial_read(backend=backend), **dict(folds=folds)}