Source code for pyterrier_adaptive.corpus_graph

import pickle
import tempfile
from lz4.frame import LZ4FrameFile
import json
import numpy as np
import pandas as pd
from pathlib import Path
import more_itertools
from typing import Union, Tuple, List
import ir_datasets
from npids import Lookup
import pyterrier_alpha as pta

try:
  from functools import cached_property
except ImportError:
  # polyfill for python<3.8, adapted from <https://github.com/pydanny/cached-property>
  class cached_property(object):
    """Descriptor that computes a value once per instance and caches it in the instance ``__dict__``."""

    def __init__(self, fn):
      self.fn = fn
      self.__doc__ = getattr(fn, '__doc__', None)

    def __get__(self, obj, cls):
      # Accessed on the class itself (e.g. by help() or introspection): return the descriptor,
      # as functools.cached_property does, instead of failing on None.__dict__.
      if obj is None:
        return self
      # Storing the result under the function's name shadows this descriptor on later lookups,
      # since it defines no __set__ (non-data descriptor), so fn runs only once per instance.
      value = obj.__dict__[self.fn.__name__] = self.fn(obj)
      return value
# Module-level logger from ir_datasets; its ``pbar`` helper provides the progress bars used
# while building corpus graphs.
logger = ir_datasets.log.easy()


class CorpusGraph(pta.Artifact):
  """Base class for corpus-graph artifacts, which map each document to a set of neighbouring documents.

  Subclasses implement :meth:`neighbours`. :meth:`load` opens a graph stored on disk, and
  :meth:`from_retriever` builds a new one by using each document as a query.
  """

  def neighbours(self, docid: Union[int, str], weights: bool = False) -> Union[np.ndarray, List[str], Tuple[np.ndarray, np.ndarray], Tuple[List[str], np.ndarray]]:
    """Return the neighbours of ``docid`` (and their edge weights when ``weights`` is True).

    Args:
      docid: an integer document index or a string docno.
      weights: when True, return a ``(neighbours, weights)`` tuple instead of just the neighbours.

    Raises:
      NotImplementedError: always; concrete subclasses must override this method.
    """
    raise NotImplementedError()

  @staticmethod
  def load(path, **kwargs):
    """Open the corpus graph stored in ``path``, dispatching on ``format`` in its ``pt_meta.json``.

    Args:
      path: directory containing ``pt_meta.json`` and the graph's data files.
      **kwargs: forwarded to the concrete graph class (e.g. ``k`` for :class:`NpTopKCorpusGraph`).

    Raises:
      AssertionError: if the metadata ``type`` is not ``'corpus_graph'``.
      ValueError: if the metadata ``format`` is not recognised.
    """
    with (Path(path)/'pt_meta.json').open('rt') as fin:
      meta = json.load(fin)
    assert meta.get('type') == 'corpus_graph'
    fmt = meta.get('format')
    # 'numpy_kmax' is accepted as an alias: both names are opened by NpTopKCorpusGraph.
    if fmt in ('np_topk', 'numpy_kmax'):
      return NpTopKCorpusGraph(path, **kwargs)
    raise ValueError(f'Unknown corpus graph format: {fmt}')

  # TODO: rework & verify
  # NOTE(review): this disabled code references torch, shutil and NumpyKMaxCorpusGraph, none of
  # which appear in this module's visible imports/definitions -- update before re-enabling.
  # @staticmethod
  # def from_np_dense_index(np_index, out_dir: Union[str, Path], k: int = 16, batch_size: int = 50000) -> 'CorpusGraph':
  #   out_dir = Path(out_dir)
  #   if out_dir.exists():
  #     raise FileExistsError(out_dir)
  #   out_dir.mkdir(parents=True)
  #   edges_path = out_dir/'edges.u32.np'
  #   weights_path = out_dir/'weights.f16.np'
  #   shutil.copyfile(np_index.index_path/'docnos.np', out_dir/'docnos.np')
  #   S = batch_size
  #   docnos, vectors = np_index.docnos_and_data()
  #   doc_count = 0
  #   with ir_datasets.util.finialized_file(str(edges_path), 'wb') as fe, ir_datasets.util.finialized_file(str(weights_path), 'wb') as fw:
  #     for i in logger.pbar(range(0, vectors.shape[0], S)):
  #       top_idxs = []
  #       top_scores = []
  #       left = torch.from_numpy(vectors[i:i+S]).cuda()
  #       left /= left.norm(dim=1, keepdim=True)
  #       for j in logger.pbar(range(0, vectors.shape[0], S)):
  #         right = torch.from_numpy(vectors[j:j+S]).cuda()
  #         right /= right.norm(dim=1, keepdim=True)
  #         top = (left @ right.T).topk(k+1, sorted=True, dim=1)
  #         top_idxs.append((top.indices + j).cpu())
  #         top_scores.append(top.values.cpu())
  #       top_idxs = torch.cat(top_idxs, dim=1)
  #       top_scores = torch.cat(top_scores, dim=1).cuda()
  #       top = top_scores.topk(k+1, sorted=True, dim=1)
  #       idxs = torch.gather(top_idxs, 1, top.indices.cpu())[:, 1:]
  #       sims = top.values.cpu()[:, 1:]
  #       fe.write(idxs.cpu().numpy().astype(np.uint32).tobytes())
  #       fw.write(sims.numpy().astype(np.float16).tobytes())
  #       doc_count += left.shape[0]
  #   with (out_dir/'pt_meta.json').open('wt') as fout:
  #     json.dump({
  #       'type': 'corpus_graph',
  #       'format': 'np_topk',
  #       'doc_count': doc_count,
  #       'k': k,
  #     }, fout)
  #   return NumpyKMaxCorpusGraph(out_dir)

  @staticmethod
  def from_retriever(retriever, docs_it, out_dir: Union[str, Path], k: int = 16, batch_size: int = 1024) -> 'CorpusGraph':
    """Build a top-``k`` corpus graph by issuing every document as a query to ``retriever``.

    For each document, the retrieved documents (excluding the document itself) fill its ``k``
    neighbour slots, with the retrieval scores as edge weights. If fewer than ``k`` neighbours
    are found, the remaining slots point back to the document itself with weight ``0``.

    Args:
      retriever: callable taking a DataFrame with ``qid``/``query`` columns (plus any other
        document fields) and returning a DataFrame with ``qid``, ``docno`` and ``score`` columns.
        NOTE(review): results per ``qid`` are assumed to be ordered best-first, since only the
        first ``k`` rows are kept.
      docs_it: iterable of dicts; each needs a ``docno`` key, and its ``text`` field (if any) is
        renamed to ``query`` before being passed to ``retriever``.
      out_dir: directory to create for the graph; it must not already exist.
      k: number of neighbours stored per document.
      batch_size: number of documents sent to ``retriever`` per call.

    Returns:
      An :class:`NpTopKCorpusGraph` opened over the written files.

    Raises:
      FileExistsError: if ``out_dir`` already exists.
    """
    out_dir = Path(out_dir)
    if out_dir.exists():
      raise FileExistsError(out_dir)
    out_dir.mkdir(parents=True)
    edges_path = out_dir/'edges.u32.np'
    weights_path = out_dir/'weights.f16.np'

    # First step: we need a docno <-> index mapping for this to work. Do a pass over the iterator
    # to build a docno lookup file, while also writing the contents to a temporary file that we'll
    # read in the next loop. (This avoids loading the entire iterator into memory.)
    with tempfile.TemporaryDirectory() as dout:
      with LZ4FrameFile(f'{dout}/docs.pkl.lz4', 'wb') as fout, Lookup.builder(out_dir/'docnos.npids') as docno_builder:
        for doc in logger.pbar(docs_it, miniters=1, smoothing=0, desc='first pass'):
          pickle.dump(doc, fout)
          docno_builder.add(doc['docno'])
      docnos = Lookup(out_dir/'docnos.npids')

      # Now read out everything in the file and retrieve using each one. Do this in batches for
      # efficiency. (The pickle file was written by this function above, so loading it is safe.)
      with ir_datasets.util.finialized_file(str(edges_path), 'wb') as fe, \
           ir_datasets.util.finialized_file(str(weights_path), 'wb') as fw, \
           LZ4FrameFile(f'{dout}/docs.pkl.lz4', 'rb') as fin:
        for chunk in more_itertools.chunked(logger.pbar(range(len(docnos)), miniters=1, smoothing=0, desc='searching', total=len(docnos)), batch_size):
          chunk = [pickle.load(fin) for _ in chunk]
          res = retriever(pd.DataFrame(chunk).rename(columns={'docno': 'qid', 'text': 'query'}))
          res_by_qid = dict(iter(res.groupby('qid')))
          for docno in [c['docno'] for c in chunk]:
            did_res = res_by_qid.get(docno)
            dids, scores = [], []
            if did_res is not None:
              # Drop the document itself from its own neighbour list, then keep the first k.
              did_res = did_res[did_res.docno != docno].iloc[:k]
              if len(did_res) > 0:
                # Materialise as a Python list: Lookup.inv may return an ndarray here, and `+=`
                # on an ndarray would broadcast-add the padding below instead of appending it.
                dids = list(docnos.inv[list(did_res.docno)])
                scores = list(did_res.score)
            if len(dids) < k:
              # If we didn't get as many as we expect, loop the document back to itself.
              dids += [docnos.inv[docno]] * (k - len(dids))
              scores += [0.] * (k - len(scores))
            fe.write(np.array(dids, dtype=np.uint32).tobytes())
            fw.write(np.array(scores, dtype=np.float16).tobytes())

    # Finally, keep track of metadata about this artefact.
    with (out_dir/'pt_meta.json').open('wt') as fout:
      json.dump({
        'type': 'corpus_graph',
        'format': 'np_topk',
        'package_hint': 'pyterrier-adaptive',
        'doc_count': len(docnos),
        'k': k,
      }, fout)
    return NpTopKCorpusGraph(out_dir)
class NpTopKCorpusGraph(CorpusGraph):
  """Corpus graph backed by dense on-disk top-k matrices.

  The directory ``path`` holds ``pt_meta.json``, ``edges.u32.np`` (uint32 neighbour indices),
  ``weights.f16.np`` (float16 edge weights) and ``docnos.npids`` (the docno lookup). Both
  matrices are memory-mapped with ``meta['k']`` columns. Passing a smaller ``k`` exposes only
  the first ``k`` columns of each row.
  """

  def __init__(self, path, k=None):
    super().__init__(path)
    meta_file = self.path/'pt_meta.json'
    with meta_file.open('rt') as fh:
      self.meta = json.load(fh)
    assert self.meta.get('type') == 'corpus_graph' and self.meta.get('format') in ('numpy_kmax', 'np_topk')
    stored_k = self.meta['k']
    # Number of columns physically stored per row; the exposed width (_k) may be smaller.
    self._data_k = stored_k
    if k is not None:
      assert k <= stored_k
    self._k = stored_k if k is None else k
    self._edges_path = self.path/'edges.u32.np'
    self._weights_path = self.path/'weights.f16.np'
    self._docnos = Lookup(self.path/'docnos.npids')

  def __repr__(self):
    return f'NpTopKCorpusGraph({repr(str(self.path))}, k={self._k})'

  def _open_matrix(self, file_path, dtype):
    """Memory-map ``file_path`` read-only as rows of ``_data_k`` values, keeping the first ``_k`` columns."""
    matrix = np.memmap(file_path, mode='r', dtype=dtype).reshape(-1, self._data_k)
    if self._k == self._data_k:
      return matrix
    return matrix[:, :self._k]

  @cached_property
  def edges_data(self):
    """Neighbour-index matrix (uint32), opened lazily and cached on first access."""
    return self._open_matrix(self._edges_path, np.uint32)

  @cached_property
  def weights_data(self):
    """Edge-weight matrix (float16), opened lazily and cached on first access."""
    return self._open_matrix(self._weights_path, np.float16)

  def neighbours(self, docid, weights=False):
    """Return the neighbours of ``docid``, optionally with their edge weights.

    Args:
      docid: an integer row index, or a string docno that is translated through the docno lookup.
      weights: when True, return ``(neighbours, weights)``.

    Returns:
      For an integer ``docid``, the row of ``edges_data``. For a string ``docid``, that row
      mapped back through the lookup's ``fwd`` side (presumably docno strings). With
      ``weights=True``, the matching row of ``weights_data`` is returned alongside.
    """
    by_docno = isinstance(docid, str)
    row = self._docnos.inv[docid] if by_docno else docid
    found = self.edges_data[row]
    if by_docno:
      found = self._docnos.fwd[found]
    if not weights:
      return found
    return found, self.weights_data[row]

  def to_limit_k(self, k: int) -> 'NpTopKCorpusGraph':
    """Return a new graph over the same files that exposes at most ``k`` edges per node.

    ``k`` must be less than or equal to the number of edges per node stored on disk
    (``meta['k']``); the constructor asserts this.
    """
    return NpTopKCorpusGraph(self.path, k)