from typing import Optional, Union
from pathlib import Path
from tempfile import TemporaryDirectory
import hashlib
import pickle
import numpy as np
import pandas as pd
import pyterrier as pt
import shutil
import json
import sqlite3
from collections import defaultdict
from contextlib import closing, contextmanager
from more_itertools import chunked
from npids import Lookup
from pyterrier_caching import meta_file_compat
import pyterrier_alpha as pta
[docs]
class Hdf5ScorerCache(pta.Artifact, pt.Transformer):
"""A cache for storing and retrieving scores for documents, backed by an HDF5 file.
This is a *dense* scorer cache, meaning that space for all documents is allocated ahead of time.
Dense caches are more suitable than sparse ones when a large proportion of the corpus (or
the entire corpus) is expected to be scored. If only a small proportion of the corpus is expected
to be scored, a sparse cache (e.g., :class:`~pyterrier_caching.Sqlite3ScorerCache`) may be more appropriate.
"""
ARTIFACT_TYPE = 'scorer_cache'
ARTIFACT_FORMAT = 'hdf5'
def __init__(self,
path: Optional[Union[str, Path]] = None,
scorer: Optional[pt.Transformer] = None,
*,
verbose: bool = False,
):
""" Creates a new Hdf5ScorerCache instance.
Args:
path: The path to the directory where the cache should be stored, or None to create a temporary cache.
scorer: The scorer to use to score documents that are missing from the cache.
verbose: Whether to print verbose output.
"""
if path is None:
self._tmpdir = TemporaryDirectory()
path = Path(self._tmpdir.name) / 'cache'
else:
self._tmpdir = None
super().__init__(path)
meta_file_compat(path)
self.scorer = scorer
self.verbose = verbose
self.meta = None
self.file = None
self.docnos = None
self.dataset_cache = {}
def transform(self, inp):
return self.cached_scorer()(inp)
[docs]
def built(self) -> bool:
"""Returns whether this cache has been built."""
return (Path(self.path)/'pt_meta.json').exists()
[docs]
def build(self, corpus_iter=None, docnos_file=None):
"""Builds this cache."""
assert not self.built(), "this cache is alrady built"
assert corpus_iter is not None or docnos_file is not None
import h5py
with pta.ArtifactBuilder(self) as builder:
with h5py.File(str(builder.path/'data.h5'), 'a'):
pass # just create the data file
if docnos_file:
shutil.copy(docnos_file, builder.path/'docnos.npids')
builder.metadata['doc_count'] = len(Lookup(builder.path/'docnos.npids'))
else:
builder.metadata['doc_count'] = 0
with Lookup.builder(builder.path/'docnos.npids') as docno_lookup:
for record in corpus_iter:
docno_lookup.add(record['docno'])
builder.metadata['doc_count'] += 1
def _ensure_built(self):
import h5py
assert self.built(), "you must .build(...) this cache before it can be used"
if self.file is None:
self.file = h5py.File(self.path/'data.h5', 'r')
if self.meta is None:
with (self.path/'pt_meta.json').open('rt') as fin:
self.meta = json.load(fin)
if self.docnos is None:
self.docnos = Lookup(self.path/'docnos.npids')
def _get_dataset(self, qid):
if qid not in self.dataset_cache:
if qid not in self.file:
return None
self.dataset_cache[qid] = self.file[qid][:]
return self.dataset_cache[qid]
[docs]
def corpus_count(self) -> int:
"""Returns the number of documents in the corpus that this cache was built from."""
self._ensure_built()
return self.meta['doc_count']
def __repr__(self):
return f'Hdf5ScorerCache({repr(str(self.path))}, {self.scorer})'
[docs]
def cached_scorer(self) -> pt.Transformer:
"""Returns a scorer that uses this cache to store and retrieve scores."""
return Hdf5ScorerCacheScorer(self)
[docs]
def cached_retriever(self, num_results: int = 1000) -> pt.Transformer:
"""Returns a retriever that uses this cache to store and retrieve scores for every document in the corpus.
This transformer will raie an error if the entire corpus is not scored (e.g., from :meth:`score_all`).
"""
return Hdf5ScorerCacheRetriever(self, num_results)
[docs]
def close(self, delete_tmp: bool = True):
""" Closes this cache, releasing the file pointer that it holds and writing any new results to disk. """
if self.file is not None:
self.dataset_cache = {} # reset the dataset cache
self.file.close()
self.file = None
if self.meta is not None:
self.meta = None
if self._tmpdir is not None and delete_tmp:
self._tmpdir.cleanup()
self._tmpdir = None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
[docs]
def merge_from(self, other: 'Hdf5ScorerCache'):
""" Merges the cached values from another Hdf5ScorerCache instance into this one.
Any records that appear in both ``self`` and ``other`` will be replaced with the value from ``other``.
"""
count = 0
other._ensure_built()
self._ensure_built()
self._ensure_write_mode()
assert self.meta['doc_count'] == other.meta['doc_count'], f'{self} and {other} are incompatible'
for key in other.file.keys():
data = other[key][:]
if key in self.file:
# copy over the non-missing values
mask = ~np.isnan(data)
self.file[key][mask] = data[mask]
if self.verbose:
count += mask.sum()
else:
# easy: just copy it all over
self.file.create_dataset(key, data=data)
if self.verbose:
count += (~np.isnan(data)).sum()
if self.verbose:
print(f"merged {count} records from {other} into {self}")
[docs]
def score_all(self, dataset, *, batch_size: int = 1024):
"""Scores all topics for the entire corpus, storing the results in this cache."""
if not self.built():
self.build(dataset.get_corpus_iter())
topics = dataset.get_topics()
scorer = self.cached_scorer()
for doc_batch in chunked(dataset.get_corpus_iter(), batch_size):
doc_batch = pd.DataFrame(doc_batch)
inp = pd.merge(topics, doc_batch, how='cross')
scorer(inp)
class Hdf5ScorerCacheScorer(pt.Transformer):
def __init__(self, cache: Hdf5ScorerCache):
self.cache = cache
def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
import h5py
self.cache._ensure_built()
to_score_idxs = []
to_score_map = defaultdict(list)
inp = inp.reset_index(drop=True)
values = pd.Series(index=inp.index, dtype=float)
# First pass: load what we can from cache
for query, group in inp.groupby('query'):
query_hash = hashlib.sha256(query.encode()).hexdigest()
ds = self.cache._get_dataset(query_hash)
if ds is None:
for idx in group.index:
to_score_idxs.append(idx)
to_score_map[query_hash, group.docno[idx]].append(idx)
else:
dids = self.cache.docnos.inv[np.array(group.docno)]
assert not (dids == -1).any(), 'unknown docno encountered'
scores = ds[dids]
for idx, score, is_miss in zip(group.index, scores, np.isnan(scores)):
if is_miss:
to_score_idxs.append(idx)
to_score_map[query_hash, group.docno[idx]].append(idx)
else:
values[idx] = score
# Second pass: score the missing ones and add to cache
if to_score_idxs:
if self.cache.scorer is None:
raise LookupError('values missing from cache, but no scorer provided')
scored = self.cache.scorer(inp.loc[to_score_idxs])
self.cache.close(delete_tmp=False)
with h5py.File(self.cache.path/'data.h5', 'a') as fout:
records = scored[['query', 'docno', 'score']]
for query, group in records.groupby('query'):
query_hash = hashlib.sha256(query.encode()).hexdigest()
if query_hash not in fout:
fout.create_dataset(query_hash, shape=(self.cache.corpus_count(),), dtype=np.float32, fillvalue=float('nan'))
dids = self.cache.docnos.inv[np.array(group.docno)]
assert not (dids == -1).any(), 'unknown docno encountered'
dids_sorted, dids_sorted_idx = np.unique(dids, return_index=True)
fout[query_hash][dids_sorted] = group.score.iloc[dids_sorted_idx]
for _, docno, score in group.itertuples(index=False):
for idx in to_score_map[query_hash, docno]:
values[idx] = score
results = inp.assign(score=values)
pt.model.add_ranks(results)
if self.cache.verbose:
print(f"{self}: {len(inp)-len(to_score_idxs)} hit(s), {len(to_score_idxs)} miss(es)")
return results
class Hdf5ScorerCacheRetriever(pt.Transformer):
def __init__(self, cache: Hdf5ScorerCache, num_results: int = 1000):
self.cache = cache
self.num_results = num_results
def transform(self, inp: pd.DataFrame) -> pd.DataFrame:
self.cache._ensure_built()
pta.validate.query_frame(inp, extra_columns=['query'])
inp = inp.reset_index(drop=True)
builder = pta.DataFrameBuilder(['_index', 'docno', 'score', 'rank'])
for i, query in enumerate(inp['query']):
query_hash = hashlib.sha256(query.encode()).hexdigest()
ds = self.cache._get_dataset(query_hash)
if ds is None:
raise RuntimeError(f'retriever only works if corpus is scored completely; '
f'no cached documents found for query {query!r}.')
ds = ds[:]
nans = np.isnan(ds)
if nans.any():
raise RuntimeError(f'retriever only works if corpus is scored completely; '
f'{nans.sum()} uncached documents found for query {query!r}.')
k = min(len(ds), self.num_results)
docids = ds.argpartition(-k)[-k:]
scores = ds[docids]
idxs = scores.argsort()[::-1]
builder.extend({
'_index': i,
'docno': self.cache.docnos.fwd[docids[idxs]],
'score': scores[idxs].astype(np.float64),
'rank': np.arange(scores.shape[0]),
})
return builder.to_df(merge_on_index=inp)
[docs]
class Sqlite3ScorerCache(pta.Artifact, pt.Transformer):
"""A cache for storing and retrieving scores for documents, backed by a SQLite3 database.
This is a *sparse* scorer cache, meaning that space is only allocated for documents that have been scored.
If a large proportion of the corpus is expected to be scored, a dense cache (e.g., :class:`~pyterrier_caching.Hdf5ScorerCache`)
may be more appropriate.
"""
ARTIFACT_TYPE = 'scorer_cache'
ARTIFACT_FORMAT = 'sqlite3'
def __init__(
self,
path: Optional[Union[str, Path]] = None,
scorer: pt.Transformer = None,
*,
group: Optional[str] = None,
key: Optional[str] = None,
value: Optional[str] = None,
pickle : Optional[bool] = None,
verbose: bool = False,
):
"""
Args:
path: The path to the directory where the cache should be stored, or None to create a temporary cache.
scorer: The scorer to use to score documents that are missing from the cache.
group: The name of the column in the input DataFrame that contains the group identifier (default: ``query``)
key: The name of the column in the input DataFrame that contains the document identifier (default: ``docno``)
value: The name of the column in the input DataFrame that contains the value to cache (default: ``score``)
pickle: Whether to pickle the value before storing it in the cache (default: False)
verbose: Whether to print verbose output when scoring documents.
If a cache does not yet exist at the provided ``path``, a new one is created.
.. versionchanged:: 0.3.0 added ``pickle`` option to support caching non-numeric values
"""
if path is None:
self._tmpdir = TemporaryDirectory()
path = Path(self._tmpdir.name) / 'cache'
else:
self._tmpdir = None
super().__init__(path)
meta_file_compat(path)
self.scorer = scorer
self.verbose = verbose
self.meta = None
if not (Path(self.path)/'pt_meta.json').exists():
if group is None:
group = 'query'
if key is None:
key = 'docno'
if value is None:
value = 'score'
with pta.ArtifactBuilder(self) as builder:
builder.metadata['group'] = group
builder.metadata['key'] = key
builder.metadata['value'] = value
builder.metadata['pickle'] = pickle
self.db = sqlite3.connect(builder.path/'db.sqlite3')
value_type = "BLOB" if pickle else "NUMERIC"
with closing(self.db.cursor()) as cursor:
cursor.execute(f"""
CREATE TABLE IF NOT EXISTS cache (
[group] TEXT NOT NULL,
key TEXT NOT NULL,
value {value_type} NOT NULL,
PRIMARY KEY ([group], key)
)
""")
else:
self.db = sqlite3.connect(self.path/'db.sqlite3')
with (Path(self.path)/'pt_meta.json').open('rt') as fin:
self.meta = json.load(fin)
self.meta.setdefault('pickle', False)
if group is not None:
assert group == self.meta['group'], f'group={group!r} provided, but index created with group={self.meta["group"]!r}'
self.group = self.meta['group']
if key is not None:
assert key == self.meta['key'], f'key={key!r} provided, but index created with key={self.meta["key"]!r}'
self.key = self.meta['key']
if value is not None:
assert value == self.meta['value'], f'value={value!r} provided, but index created with value={self.meta["value"]!r}'
self.value = self.meta['value']
if pickle is not None:
assert pickle == self.meta['pickle'], f'pickle={pickle!r} provided, but index created with pickle={self.meta["pickle"]!r}'
self.pickle = self.meta['pickle']
[docs]
def close(self):
""" Closes this cache, releasing the sqlite connection that it holds. """
if self.db is not None:
self.db.close()
self.db = None
if self._tmpdir is not None:
self._tmpdir.cleanup()
self._tmpdir = None
@contextmanager
def _cursor(self):
assert self.db is not None, "cache is closed"
with closing(self.db.cursor()) as cursor:
yield cursor
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
[docs]
def merge_from(self, other: 'Sqlite3ScorerCache'):
""" Merges the cached values from another Sqlite3ScorerCache instance into this one.
Any keys that appear in both ``self`` and ``other`` will be replaced with the value from ``other``.
"""
count = 0
with self._cursor() as insert_cursor, other._cursor() as select_cursor:
select_cursor.execute('SELECT [group], key, value FROM cache')
while batch := select_cursor.fetchmany(10_000):
count += len(batch)
insert_cursor.executemany('INSERT OR REPLACE INTO cache ([group], key, value) VALUES (?, ?, ?)', batch)
self.db.commit()
if self.verbose:
print(f"merged {count} records from {other} into {self}")
def __repr__(self):
return f'Sqlite3ScorerCache({str(self.path)!r}, {self.scorer!r}, group={self.group!r}, key={self.key!r})'
# Default implementations
ScorerCache = Sqlite3ScorerCache
DenseScorerCache = Hdf5ScorerCache
SparseScorerCache = Sqlite3ScorerCache