Source code for pyterrier_doc2query.stores

import contextlib
from pathlib import Path
import json
import numpy as np
import lz4.block
import pyterrier as pt
from typing import Optional
from npids import Lookup
import pyterrier_alpha as pta
from . import Artefact


class Doc2QueryStore(pt.Indexer, Artefact):
    """A :class:`~pyterrier.Indexer` that caches and loads the queries generated by
    :class:`~pyterrier_doc2query.Doc2Query`.

    The cache directory contains ``pt_meta.json`` (type marker), ``queries.lz4``
    (one LZ4 block of newline-separated queries per document, concatenated),
    ``queries.offsets.u8`` (``uint64`` byte offsets; document ``i`` occupies
    ``offsets[i]:offsets[i+1]`` of ``queries.lz4``) and ``docnos.npids`` (docno lookup).

    .. seealso::
        This cache is deprecated in favor of :class:`pyterrier_caching.IndexerCache`.
    """

    def __init__(self, path: str):
        """
        Args:
            path: The path to the cache.
        """
        self.path = Path(path)
        # Everything below is loaded lazily by payload().
        self._meta = None
        self._queries = None
        self._queries_offsets = None
        self._docnos = None

    def payload(self):
        """Lazily opens the cache files and returns them.

        Returns:
            A ``(queries, queries_offsets, docnos)`` tuple: the open binary handle on
            ``queries.lz4``, the memory-mapped ``uint64`` offsets, and the docno lookup.

        Raises:
            AssertionError: If ``pt_meta.json`` is missing or does not describe a
                ``doc2query_store``.
        """
        if self._meta is None:
            meta_path = self.path/'pt_meta.json'
            assert meta_path.exists()
            with meta_path.open('rt') as fin:
                meta = json.load(fin)
            # Validate before caching so a wrong-typed directory is never marked as loaded.
            assert meta.get('type') == 'doc2query_store'
            self._meta = meta
        if self._queries is None:
            self._queries = (self.path/'queries.lz4').open('rb')
        if self._queries_offsets is None:
            self._queries_offsets = np.memmap(self.path/'queries.offsets.u8', mode='r', dtype=np.uint64)
        if self._docnos is None:
            self._docnos = Lookup(self.path/'docnos.npids')
        return self._queries, self._queries_offsets, self._docnos

    def close(self):
        """Closes the open ``queries.lz4`` handle (if any) and drops the cached payload.

        The next call to :meth:`payload` re-opens the files.
        """
        if self._queries is not None:
            self._queries.close()
        self._meta = None
        self._queries = None
        self._queries_offsets = None
        self._docnos = None

    def generator(self, limit_k: Optional[int] = None, append: bool = False):
        """Returns a transformer that loads the generated docquery text from the cache.

        Args:
            limit_k: The maximum number of queries to return per document.
            append: If True, the queries are appended to the text. Otherwise, the queries are
                stored in a separate attribute.
        """
        return Doc2QueryStoreGenerator(self, limit_k, append=append)

    def transform(self, inp):
        """Applies the default :meth:`generator` (no limit, no appending) to ``inp``."""
        return self.generator()(inp)

    def index(self, inp_it):
        """Builds this cache from the input iterator.

        Args:
            inp_it: An iterable of records, each providing ``'docno'`` and ``'querygen'``
                (a string).

        Returns:
            This store.

        Raises:
            AssertionError: If the cache has already been built (``pt_meta.json`` exists).
        """
        assert not (self.path/'pt_meta.json').exists()
        self.path.mkdir(parents=True, exist_ok=True)
        with contextlib.ExitStack() as stack:
            f_queries = stack.enter_context((self.path/'queries.lz4').open('wb'))
            f_queries_offsets = stack.enter_context((self.path/'queries.offsets.u8').open('wb'))
            f_docnos = stack.enter_context(Lookup.builder(self.path/'docnos.npids'))
            # One leading offset, then one per record: N records produce N+1 offsets.
            f_queries_offsets.write(np.array([f_queries.tell()], dtype=np.uint64).tobytes())
            for record in inp_it:
                f_queries.write(lz4.block.compress(record['querygen'].encode()))
                f_queries_offsets.write(np.array([f_queries.tell()], dtype=np.uint64).tobytes())
                f_docnos.add(record['docno'])
            # The metadata file is written last; its presence marks the cache as built.
            with (self.path/'pt_meta.json').open('wt') as f_meta:
                json.dump({'type': 'doc2query_store'}, f_meta)
        return self

    def lookup(self, docnos, limit_k=None):
        """Looks up the queries generated for the given docnos.

        Args:
            docnos: The docnos to lookup.
            limit_k: The maximum number of queries to return per document.

        Returns:
            ``{'querygen': ...}``, where the value is a single string when ``docnos`` is a
            ``str`` and a list of strings (one per docno) otherwise.

        Raises:
            ValueError: If any of the docnos is not present in the cache.
        """
        single = isinstance(docnos, str)
        if single:
            docnos = np.array([docnos])
        queries, q_offsets, docnos_lookup = self.payload()
        dids = docnos_lookup.inv[docnos]
        missing = dids == -1  # the inverse lookup reports unknown docnos as -1
        if missing.any():
            raise ValueError(f"unknown docno(s) encountered: {docnos[missing]}")
        querygen = [_lz4_read(queries, s, e, limit_k) for s, e in zip(q_offsets[dids], q_offsets[dids+1])]
        if single:
            return {'querygen': querygen[0]}
        return {'querygen': querygen}

    def __iter__(self):
        """Yields ``{'docno', 'querygen'}`` records for every document, in stored order."""
        queries, q_offsets, docnos = self.payload()
        for i, docno in enumerate(docnos):
            yield {'docno': docno, 'querygen': _lz4_read(queries, q_offsets[i], q_offsets[i+1], limit_k=None)}
class QueryScoreStore(pt.Indexer, Artefact):
    """A :class:`~pyterrier.Indexer` that caches and loads generated query *scores* from
    :class:`~pyterrier_doc2query.QueryScorer`.

    Scores are stored as ``float16`` in ``scores.f2``, with ``uint64`` element offsets in
    ``scores.offsets.u8`` (document ``i`` owns ``scores[offsets[i]:offsets[i+1]]``). The
    query text and docnos are either stored alongside (``queries.lz4``,
    ``queries.offsets.u8``, ``docnos.npids``) or, when ``pt_meta.json`` has a
    ``doc2query_store_repo`` entry, borrowed from that :class:`Doc2QueryStore`.
    """

    def __init__(self, path: str):
        """
        Args:
            path: The path to the cache.
        """
        self.path = Path(path)
        # Everything below is loaded lazily by payload().
        self._meta = None
        self._scores = None
        self._scores_offsets = None
        self._queries = None
        self._queries_offsets = None
        self._docnos = None
        self._doc2query_store = None

    def payload(self):
        """Lazily opens the cache files and returns them.

        Returns:
            A ``(scores, scores_offsets, queries, queries_offsets, docnos)`` tuple.

        Raises:
            AssertionError: If ``pt_meta.json`` is missing or does not describe a
                ``query_score_store``.
        """
        if self._meta is None:
            meta_path = self.path/'pt_meta.json'
            assert meta_path.exists()
            with meta_path.open('rt') as fin:
                meta = json.load(fin)
            # Validate before caching so a wrong-typed directory is never marked as loaded.
            assert meta.get('type') == 'query_score_store'
            self._meta = meta
        if self._scores is None:
            self._scores = np.memmap(self.path/'scores.f2', mode='r', dtype=np.float16)
        if self._scores_offsets is None:
            self._scores_offsets = np.memmap(self.path/'scores.offsets.u8', mode='r', dtype=np.uint64)
        if self._meta.get('doc2query_store_repo'):
            # Query text and docnos live in a separate Doc2QueryStore; reuse its payload.
            if self._doc2query_store is None:
                self._doc2query_store = Doc2QueryStore.from_repo(self._meta['doc2query_store_repo'])
            queries, queries_offsets, docnos = self._doc2query_store.payload()
            if self._queries is None:
                self._queries = queries
            if self._queries_offsets is None:
                self._queries_offsets = queries_offsets
            if self._docnos is None:
                self._docnos = docnos
        else:
            if self._queries is None:
                self._queries = (self.path/'queries.lz4').open('rb')
            if self._queries_offsets is None:
                self._queries_offsets = np.memmap(self.path/'queries.offsets.u8', mode='r', dtype=np.uint64)
            if self._docnos is None:
                self._docnos = Lookup(self.path/'docnos.npids')
        return self._scores, self._scores_offsets, self._queries, self._queries_offsets, self._docnos

    def close(self):
        """Releases the loaded payload; the next :meth:`payload` call reloads it.

        The ``queries.lz4`` handle is closed only when this store opened it itself. A
        handle borrowed from a linked :class:`Doc2QueryStore` is left open, since that
        store owns it; only the reference is dropped here.
        """
        if self._doc2query_store is None and self._queries is not None:
            self._queries.close()
        self._meta = None
        self._scores = None
        self._scores_offsets = None
        self._queries = None
        self._queries_offsets = None
        self._docnos = None
        self._doc2query_store = None

    def query_scorer(self, limit_k=None):
        """Returns a transformer that scores queries with the scores stored in this cache.

        Args:
            limit_k: The maximum number of queries/scores to return per document.
        """
        return QueryScoreStoreScorer(self, limit_k)

    def transform(self, inp):
        """Applies the default :meth:`query_scorer` (no limit) to ``inp``."""
        return self.query_scorer()(inp)

    def index(self, inp_it):
        """Builds this cache from the input iterator.

        Args:
            inp_it: An iterable of records, each providing ``'docno'``, ``'querygen'``
                (a string) and ``'querygen_score'`` (an array; ``len()`` and
                ``.astype()`` are called on it).

        Returns:
            This store.

        Raises:
            AssertionError: If the cache has already been built (``pt_meta.json`` exists).
        """
        assert not (self.path/'pt_meta.json').exists()
        self.path.mkdir(parents=True, exist_ok=True)
        with contextlib.ExitStack() as stack:
            f_scores = stack.enter_context((self.path/'scores.f2').open('wb'))
            f_scores_offsets = stack.enter_context((self.path/'scores.offsets.u8').open('wb'))
            f_queries = stack.enter_context((self.path/'queries.lz4').open('wb'))
            f_queries_offsets = stack.enter_context((self.path/'queries.offsets.u8').open('wb'))
            f_docnos = stack.enter_context(Lookup.builder(self.path/'docnos.npids'))
            # Score offsets count float16 elements; query offsets count bytes.
            scores_count = 0
            f_scores_offsets.write(np.array([scores_count], dtype=np.uint64).tobytes())
            f_queries_offsets.write(np.array([f_queries.tell()], dtype=np.uint64).tobytes())
            for record in inp_it:
                scores_count += len(record['querygen_score'])
                f_scores.write(record['querygen_score'].astype(np.float16).tobytes())
                f_scores_offsets.write(np.array([scores_count], dtype=np.uint64).tobytes())
                f_queries.write(lz4.block.compress(record['querygen'].encode()))
                f_queries_offsets.write(np.array([f_queries.tell()], dtype=np.uint64).tobytes())
                f_docnos.add(record['docno'])
            # The metadata file is written last; its presence marks the cache as built.
            with (self.path/'pt_meta.json').open('wt') as f_meta:
                json.dump({'type': 'query_score_store'}, f_meta)
        return self

    def percentile(self, p: float) -> float:
        """Returns the ``p``-th percentile of the scores stored in this cache.

        Args:
            p: The percentile, as accepted by :func:`numpy.percentile`.
        """
        scores = self.payload()[0]
        return np.percentile(scores, p)

    def lookup(self, docnos, limit_k=None):
        """Looks up the scores generated for the given docnos.

        Args:
            docnos: The docnos to lookup.
            limit_k: The maximum number of scores to return per document.

        Returns:
            ``{'querygen': ..., 'querygen_score': ...}``. For a single ``str`` docno the
            values are one string and one score array; otherwise they are lists with one
            entry per docno.

        Raises:
            ValueError: If any of the docnos is not present in the cache.
        """
        single = isinstance(docnos, str)
        if single:
            docnos = np.array([docnos])
        scores, s_offsets, queries, q_offsets, docnos_lookup = self.payload()
        dids = docnos_lookup.inv[docnos]
        missing = dids == -1  # the inverse lookup reports unknown docnos as -1
        if missing.any():
            raise ValueError(f"unknown docno(s) encountered: {docnos[missing]}")
        querygen = [_lz4_read(queries, s, e, limit_k) for s, e in zip(q_offsets[dids], q_offsets[dids+1])]
        # np.array(...) copies the slice out of the memory map.
        querygen_score = [np.array(scores[s:e][:limit_k]) for s, e in zip(s_offsets[dids], s_offsets[dids+1])]
        if single:
            return {'querygen': querygen[0], 'querygen_score': querygen_score[0]}
        return {'querygen': querygen, 'querygen_score': querygen_score}

    def __iter__(self):
        """Yields ``{'docno', 'querygen', 'querygen_score'}`` records in stored order."""
        scores, s_offsets, queries, q_offsets, docnos = self.payload()
        for i, docno in enumerate(docnos):
            yield {
                'docno': docno,
                'querygen': _lz4_read(queries, q_offsets[i], q_offsets[i+1], limit_k=None),
                'querygen_score': np.array(scores[s_offsets[i]:s_offsets[i+1]]),
            }
def _lz4_read(f, start, end, limit_k):
    """Reads and decompresses one LZ4 block of newline-separated queries.

    Args:
        f: A seekable binary file object.
        start: Byte offset at which the block begins.
        end: Byte offset just past the end of the block.
        limit_k: If not None, only the first ``limit_k`` newline-separated queries are kept.

    Returns:
        The decoded queries as a single newline-joined string.
    """
    # Skip the seek when the handle is already at the start of the block.
    if f.tell() != start:
        f.seek(start)
    buffer = f.read(end-start)
    queries = lz4.block.decompress(buffer).decode()
    if limit_k is not None:
        queries = '\n'.join(queries.split('\n')[:limit_k])
    return queries


class QueryScoreStoreScorer(pt.Transformer):
    """Transformer that assigns the result of ``scorer_store.lookup`` to a document frame."""

    def __init__(self, scorer_store, limit_k=None):
        """
        Args:
            scorer_store: The store to look results up in (its ``lookup`` method is used).
            limit_k: The maximum number of entries to return per document.
        """
        self.scorer_store = scorer_store
        self.limit_k = limit_k

    def transform(self, inp):
        """Validates ``inp`` and assigns the columns returned by the store's ``lookup``."""
        pta.validate.document_frame(inp, extra_columns=['text', 'querygen'])
        return inp.assign(**self.scorer_store.lookup(inp.docno, self.limit_k))


class Doc2QueryStoreGenerator(pt.Transformer):
    """Transformer that loads cached generated queries for each document in a frame."""

    def __init__(self, d2q_store, limit_k=None, append=False):
        """
        Args:
            d2q_store: The store to look queries up in (its ``lookup`` method is used).
            limit_k: The maximum number of queries to return per document.
            append: If True, the queries are appended to the ``text`` column. Otherwise
                the looked-up columns are assigned as-is.
        """
        self.d2q_store = d2q_store
        self.limit_k = limit_k
        self.append = append

    def transform(self, inp):
        """Looks up the queries for ``inp.docno`` and adds them to ``inp``.

        Returns:
            A copy of ``inp`` with either ``text`` extended by a newline and the queries
            (``append=True``) or with the looked-up columns assigned.
        """
        if self.append:
            pta.validate.document_frame(inp, extra_columns=['text'])
        else:
            pta.validate.document_frame(inp)
        # Look up once and reuse the result in both branches; each lookup reads from
        # disk and decompresses every requested block.
        res = self.d2q_store.lookup(inp.docno, self.limit_k)
        if self.append:
            return inp.assign(text=inp['text'] + '\n' + res['querygen'])
        return inp.assign(**res)