Source code for pyterrier_dr.indexes

# Deprecated module
import torch
import itertools
import math
import json
from pathlib import Path
from os.path import commonprefix
import numpy as np
import shutil
import more_itertools
import pandas as pd
import pyterrier as pt
from pyterrier.model import add_ranks
import ir_datasets

_logger = ir_datasets.log.easy()


[docs] class DocnoFile: """Represents a document ID lookup file. .. deprecated:: 0.6.0 This class was replaced with ``npids.Lookup`` """ def __init__(self, path): with open(path, 'rb') as f: metadata_size = int.from_bytes(f.read(2), byteorder='little') metadata = json.loads(f.read(metadata_size)) self.prefix = metadata['prefix'] self.max_length = metadata['max_length'] self.count = metadata['count'] if 'start' in metadata: self.start = metadata['start'] self.step = metadata['step'] self.fmt_string = metadata['fmt_string'] self.mmap = None else: self.mmap = np.memmap(path, f'S{self.max_length}', offset=metadata_size+2) def __getitem__(self, idx): if isinstance(idx, (int, np.int64, np.int32)): if self.mmap is not None: return self.prefix + self.mmap[idx].decode() else: assert 0 <= idx < self.count return self._fmt_idx(idx) elif isinstance(idx, str): if self.mmap is not None: raise RuntimeError("reverse lookup not supported") else: if not idx.startswith(self.prefix): return None idx = idx[len(self.prefix):] if not idx.isnumeric(): return None idx = int(idx) if idx % self.step != 0: return None idx = (idx // self.step) - self.start if idx < 0 or idx >= self.count: return None return idx else: if self.mmap is not None: docnos = self.mmap[idx] return np.array([self.prefix + d.decode() for d in docnos], dtype=object) else: assert ((0 <= idx) & (idx < self.count)).all() docnos = idx * self.step + self.start return np.array([self.prefix + format(d, self.fmt_string) for d in docnos], dtype=object) def __len__(self): return self.count def __iter__(self): if self.mmap is not None: for item in self.mmap: yield self.prefix + item.decode() else: for idx in range(self.count): yield self._fmt_idx(idx) def __del__(self): del self.mmap self.mmap = None def _fmt_idx(self, idx): return self.prefix + format(idx * self.step + self.start, self.fmt_string) @staticmethod def build(docnos, path): assert len(docnos) > 0 prefix = commonprefix(docnos) # reduce the size by removing common docno prefixes docnos = [d[len(prefix):].encode() for d in docnos] # remove prefix max_length = max(len(d) for d in docnos) metadata = {'count': len(docnos), 'prefix': prefix, 'max_length': max_length} if all(d.decode().isnumeric() for d in docnos) and len(docnos) >= 2: # Special case: are the docnos (1) all numeric, (2) incrementing at a constant step? start = int(docnos[0]) step = int(docnos[1]) - start fmt_string = f'0{max_length}d' if any(d!=b'0' and d.startswith(b'0') for d in docnos) else 'd' for d, expected in zip(docnos, itertools.count(start, step)): if d.decode() != format(expected, fmt_string): break # pattern broken else: # everything was numeric and incremental metadata['start'] = start metadata['step'] = step metadata['fmt_string'] = fmt_string with open(path, 'wb') as f: b_metadata = json.dumps(metadata).encode() f.write(len(b_metadata).to_bytes(2, byteorder='little')) f.write(b_metadata) if 'start' not in metadata: f.write(np.array(docnos, dtype=f'S{max_length}').tobytes())
[docs] class NilIndex(pt.Indexer): """This class is an indexer that does nothing. It is meant to be used for testing. .. deprecated:: 0.6.0 """ def transform(self, res): return res def index(self, it): for _ in it: pass
[docs] class NumpyIndex(pt.Indexer): """This class implements a disk-based dense vector index using numpy memory maps. .. deprecated:: 0.6.0 This class has been replaced with :class:`pyterrier_dr.FlexIndex`. """ def __init__(self, index_path=None, num_results=1000, score_fn='dot', overwrite=False, batch_size=4096, verbose=False, dtype='f4', drop_query_vec=True, inmem=False, cuda=False): self.index_path = Path(index_path) self.num_results = num_results self.score_fn = score_fn self.overwrite = overwrite self.verbose = verbose self.batch_size = batch_size self.dtype = dtype self.drop_query_vec = drop_query_vec self._docnos = None self._data = None self.inmem = inmem self.cuda = cuda def docnos_and_data(self): if self._data is None: path = self.index_path with open(path/'meta.json', 'rt') as f_meta: meta = json.load(f_meta) if self.inmem: with open(path/'index.npy', 'rb') as fin: self._data = np.frombuffer(fin.read(), dtype=self.dtype).reshape(meta['count'], meta['vec_size']) if self.cuda: self._data = torch.tensor(self._data, dtype=torch.float16, device='cuda') else: self._data = np.memmap(path/'index.npy', mode='r', dtype=self.dtype, shape=(meta['count'], meta['vec_size'])) self._docnos = DocnoFile(path/'docnos.npy') return self._docnos, self._data def __len__(self): with open(self.index_path/'meta.json', 'rt') as f_meta: meta = json.load(f_meta) return meta['count'] def transform(self, inp): columns = set(inp.columns) if all(c in columns for c in ('qid', 'query_vec', 'docno')): return self.transform_R(inp) if all(c in columns for c in ('qid', 'query_vec')): return self.transform_Q(inp) raise RuntimeError('NumpyIndex expects input columns of either:\n' ' - ["qid", "query_vec", "docno", ...]: Re-ranking\n' ' - ["qid", "query_vec", ...]: Retrieval\n' '(Are you encoding the query?)') def transform_Q(self, inp): query_vecs = np.stack(inp['query_vec']) if self.score_fn == 'cos': query_vecs = query_vecs / np.linalg.norm(query_vecs, axis=1, keepdims=True) if self.cuda: query_vecs = torch.from_numpy(query_vecs).cuda() num_q = query_vecs.shape[0] res = [] docnos, data = self.docnos_and_data() ranked_lists = RankedLists(self.num_results, num_q) if self.cuda and self.inmem: scores = query_vecs.half() @ data.T scores, dids = torch.topk(scores, k=self.num_results, dim=1) scores = scores.cpu().float().numpy() dids = dids.cpu().numpy() ranked_lists.update(scores, dids) else: batch_it = range(0, data.shape[0], self.batch_size) if self.verbose: batch_it = pt.tqdm(batch_it) for idx_start in batch_it: doc_batch = data[idx_start:idx_start+self.batch_size].T if self.score_fn == 'cos': doc_batch = doc_batch / np.linalg.norm(doc_batch, axis=0, keepdims=True) if self.cuda: doc_batch = torch.from_numpy(doc_batch).cuda() scores = query_vecs @ doc_batch if self.cuda: scores = scores.cpu().numpy() dids = np.arange(idx_start, idx_start+doc_batch.shape[1], dtype='i4').reshape(1, -1).repeat(num_q, axis=0) ranked_lists.update(scores, dids) result_scores, result_dids = ranked_lists.results() unique_dids, unique_inverse = np.unique(result_dids, return_inverse=True) unique_docnos = docnos[unique_dids] result_docnos = unique_docnos[unique_inverse].reshape(num_q, -1) for query, scores, docnos in zip(inp.itertuples(index=False), result_scores, result_docnos): if self.drop_query_vec: query = query._replace(query_vec=None) for score, docno in zip(scores, docnos): res.append((*query, docno, score)) res = pd.DataFrame(res, columns=list(query._fields) + ['docno', 'score']) if self.drop_query_vec: res = res.drop(columns=['query_vec']) res = res[~res.score.isna()] res = add_ranks(res) return res def transform_R(self, inp): assert not self.cuda, "cuda not yet supported for re-ranking" docnos, data = self.docnos_and_data() all_scores = [] it = range(0, len(inp), self.batch_size) if self.verbose: it = pt.tqdm(it, desc='re-ranking from index', unit='batch') for start_idx in it: end_idx = start_idx + self.batch_size query_vecs = np.stack(inp.iloc[start_idx:end_idx]['query_vec']) dids = np.array([docnos[str(d)] for d in inp.iloc[start_idx:end_idx]['docno']]) # reverse lookup doc_vecs = data[dids] if self.score_fn == 'cos': query_vecs = query_vecs / np.linalg.norm(query_vecs, axis=1, keepdims=True) doc_vecs = doc_vecs / np.linalg.norm(doc_vecs, axis=1, keepdims=True) all_scores.append((query_vecs * doc_vecs).sum(axis=1)) all_scores = np.concatenate(all_scores, axis=0) res = inp.assign(score=all_scores) if self.drop_query_vec: res = res.drop(columns=['query_vec']) res = res[~res.score.isna()] res = add_ranks(res) return res def index(self, inp): if isinstance(inp, pd.DataFrame): inp = inp.to_dict(orient="records") inp = more_itertools.peekable(inp) path = Path(self.index_path) if path.exists(): if self.overwrite: shutil.rmtree(path) else: raise RuntimeError(f'Index already exists at {self.index_path}. If you want to delete and re-create an existing index, you can pass overwrite=True') path.mkdir(parents=True, exist_ok=True) docnos = [] vec_size = None count = 0 with open(path/'index.npy', 'wb') as fout: for docs in more_itertools.chunked(inp, self.batch_size): doc_vecs = np.stack([d['doc_vec'] for d in docs]) if vec_size is None: vec_size = doc_vecs.shape[1] elif vec_size != doc_vecs.shape[1]: raise ValueError('Inconsistent vector shapes detected') doc_vecs = doc_vecs.astype(self.dtype) fout.write(doc_vecs.tobytes()) docnos.extend([d['docno'] for d in docs]) count += len(docs) DocnoFile.build(docnos, path/'docnos.npy') with open(path/'meta.json', 'wt') as f_meta: json.dump({'dtype': self.dtype, 'vec_size': vec_size, 'count': count}, f_meta) def get_corpus_iter(self, start_idx=None, stop_idx=None, verbose=True): docnos, data = self.docnos_and_data() docno_iter = iter(docnos) if start_idx is not None or stop_idx is not None: docno_iter = itertools.islice(docno_iter, start_idx, stop_idx) data = data[start_idx:stop_idx] with pt.tqdm(total=data.shape[0], desc=f'{str(self.index_path)} document vectors') as pbar: for start_idx in range(0, data.shape[0], self.batch_size): vec_batch = data[start_idx:start_idx+self.batch_size] for vec_idx in range(vec_batch.shape[0]): pbar.update() yield {'docno': next(docno_iter), 'doc_vec': vec_batch[vec_idx]}
[docs] class MemIndex(pt.Indexer): """This class implements an in-memory dense vector index using numpy arrays. .. deprecated:: 0.6.0 This class has been replaced with :class:`pyterrier_dr.FlexIndex`. """ def __init__(self, num_results=1000, score_fn='dot', batch_size=4096, verbose=True, dtype='f4', drop_query_vec=True): # check we havent been passed a destination index, as per disk-based indexers assert isinstance(num_results, int) self.num_results = num_results self.score_fn = score_fn self.verbose = verbose self.batch_size = batch_size self.dtype = dtype self.drop_query_vec = drop_query_vec self._docnos = None self._data = None def docnos_and_data(self): return self._docnos, self._data def clear(self): self._docnos = None self._data = None def __len__(self): return self._data.shape[0] def transform(self, inp): columns = set(inp.columns) assert all(f in columns for f in ['qid', 'query_vec']), "NumpyIndex expects columns: ['qid', 'query_vec'] when used in a pipeline" query_vecs = np.stack(inp['query_vec']) if self.score_fn == 'cos': query_vecs = query_vecs / np.linalg.norm(query_vecs, axis=1, keepdims=True) num_q = query_vecs.shape[0] res = [] docnos, data = self.docnos_and_data() data = data.T assert docnos is not None or data is not None, "You must first call index()" ranked_lists = RankedLists(self.num_results, num_q) if self.score_fn == 'cos': data = data / np.linalg.norm(data, axis=0, keepdims=True) scores = query_vecs @ data dids = np.arange(data.shape[1], dtype='i4').reshape(1, -1).repeat(num_q, axis=0) ranked_lists.update(scores, dids) result_scores, result_dids = ranked_lists.results() for query, scores, dids in zip(inp.itertuples(index=False), result_scores, result_dids): if self.drop_query_vec: query = query._replace(query_vec=None) for score, did in zip(scores, dids): res.append((*query, docnos[int(did)], score)) res = pd.DataFrame(res, columns=list(query._fields) + ['docno', 'score']) if self.drop_query_vec: res = res.drop(columns=['query_vec']) res = res[~res.score.isna()] res = add_ranks(res) return res def index(self, inp): if isinstance(inp, pd.DataFrame): inp = inp.to_dict(orient="records") inp = more_itertools.peekable(inp) if self.verbose: inp = pt.tqdm(inp, desc='indexing', unit='doc') docnos = [] vec_size = None count = 0 data = [] for docs in more_itertools.chunked(inp, self.batch_size): doc_vecs = np.stack([d['doc_vec'] for d in docs]) if vec_size is None: vec_size = doc_vecs.shape[1] doc_vecs = doc_vecs.astype(self.dtype) data.append(doc_vecs) docnos.extend([d['docno'] for d in docs]) count += len(docs) self._data = np.concatenate(data, axis=0) self._docnos = docnos def get_corpus_iter(self, start_idx=None, stop_idx=None, verbose=True): docnos, data = self.docnos_and_data() docno_iter = iter(docnos) if start_idx is not None or stop_idx is not None: docno_iter = iter(docnos[start_idx:stop_idx]) data = data[start_idx:stop_idx] with pt.tqdm(total=data.shape[0], desc=f'{str(self.index_path)} document vectors') as pbar: for start_idx in range(0, data.shape[0], self.batch_size): vec_batch = data[start_idx:start_idx+self.batch_size] for vec_idx in range(vec_batch.shape[0]): pbar.update() yield {'docno': next(docno_iter), 'doc_vec': vec_batch[vec_idx]}
class RankedLists: def __init__(self, num_results : int, num_queries : int): self.num_results = num_results self.num_queries = num_queries self.scores = np.empty((num_queries, 0), dtype='f4') self.docids = np.empty((num_queries, 0), dtype='i4') def update(self, scores, docids): assert self.num_queries == scores.shape[0] self.scores = np.concatenate([self.scores, -scores], axis=1) self.docids = np.concatenate([self.docids, docids], axis=1) if self.scores.shape[1] > self.num_results: partition_idxs = np.argpartition(self.scores, self.num_results, axis=1)[:, :self.num_results] self.scores = np.take_along_axis(self.scores, partition_idxs, axis=1) self.docids = np.take_along_axis(self.docids, partition_idxs, axis=1) def results(self): sort_idxs = np.argsort(self.scores, axis=1) return ( np.take_along_axis(-self.scores, sort_idxs, axis=1), np.take_along_axis( self.docids, sort_idxs, axis=1), ) class TorchRankedLists: def __init__(self, num_results, num_queries): self.num_results = num_results self.num_queries = num_queries self.scores = None self.docids = None def update(self, scores, docids): if self.scores is not None: self.scores, idxs = torch.cat([self.scores, scores], dim=1).topk(self.num_results, dim=1) self.docids = torch.cat([self.docids, docids], dim=1).take_along_dim(idxs, dim=1) else: self.scores = scores self.docids = docids #self.scores.append(scores) #self.docids.append(docids) def results(self): return (self.scores.cpu().numpy(), self.docids.cpu().numpy()) #scores = torch.cat(self.scores, dim=0) #docids = torch.cat(self.docids, dim=0) #top_scores, top_idxs = scores.topk(self.num_results, dim=1) #return ( # top_scores.cpu().numpy(), # docids[top_idxs].cpu().numpy() #)
[docs] class FaissFlat(pt.Indexer): """This class implements a disk-based dense vector index using Faiss Flat indexes. .. deprecated:: 0.6.0 This class has been replaced with :class:`pyterrier_dr.FlexIndex`. """ def __init__(self, index_path=None, num_results=1000, shard_size=500_000, score_fn='cos', overwrite=False, batch_size=4096, verbose=False, drop_query_vec=True, inmem=False, cuda=False): self.index_path = Path(index_path) self.num_results = num_results self.shard_size = shard_size self.score_fn = score_fn self.overwrite = overwrite self.verbose = verbose self.batch_size = batch_size self.drop_query_vec = drop_query_vec self.cuda = cuda self._shards = None self._shards = list(self.iter_shards()) def iter_shards(self): import faiss if self.cuda: res = faiss.StandardGpuResources() if self._shards is None: for shardid in itertools.count(): if not (self.index_path/f'{shardid}.faiss').exists(): break index = faiss.read_index(str(self.index_path/f'{shardid}.faiss')) if self.cuda: index = faiss.index_cpu_to_gpu(res, 0, index) yield index else: yield from self._shards # if self._shards is None: # shards = [] # for shardid in itertools.count(): # if not (self.index_path/f'{shardid}.faiss').exists(): # break # index = faiss.read_index(str(self.index_path/f'{shardid}.faiss')) # shards.append(index) # self._shards = shards # if shardid == 0: # raise RuntimeError(f'Index not found at {self.index_path}') # return iter(self._shards) def transform(self, inp): columns = set(inp.columns) assert all(f in columns for f in ['qid', 'query_vec']), "Faiss expects columns: ['qid', 'query_vec'] when used in a pipeline" query_vecs = np.stack(inp['query_vec']) if self.score_fn == 'cos': query_vecs = query_vecs / np.linalg.norm(query_vecs, axis=1, keepdims=True) query_vecs = query_vecs.copy() res = [] docnos = DocnoFile(self.index_path/'docnos.npy') num_q = query_vecs.shape[0] ranked_lists = RankedLists(self.num_results, num_q) dids_offset = 0 it = enumerate(self.iter_shards()) if self.verbose: it = pt.tqdm(it, unit='shard', desc='scanning shards') for shardidx, index in it: scores, dids = [], [] QBATCH = 16 for qidx in range(0, num_q, QBATCH): s, d = index.search(query_vecs[qidx:qidx+QBATCH], self.num_results) scores.append(s) dids.append(d) ranked_lists.update(np.concatenate(scores, axis=0), np.concatenate(dids, axis=0)+dids_offset) dids_offset += index.ntotal result_scores, result_dids = ranked_lists.results() unique_dids, unique_inverse = np.unique(result_dids, return_inverse=True) unique_docnos = docnos[unique_dids] result_docnos = unique_docnos[unique_inverse].reshape(num_q, -1) for query, scores, docnos in zip(inp.itertuples(), result_scores, result_docnos): if self.drop_query_vec: query = query._replace(query_vec=None) for score, docno in zip(scores, docnos): res.append((*query, docno, score)) res = pd.DataFrame(res, columns=list(query._fields) + ['docno', 'score']) if self.drop_query_vec: res = res.drop(columns=['query_vec']) res = add_ranks(res) return res def index(self, inp): import faiss if isinstance(inp, pd.DataFrame): inp = inp.to_dict(orient="records") path = Path(self.index_path) if path.exists(): if self.overwrite: shutil.rmtree(path) else: raise RuntimeError(f'Index already exists at {self.index_path}. If you want to delete and re-create an existing index, you can pass overwrite=True') path.mkdir(parents=True, exist_ok=True) docnos = [] for shardid, shard in enumerate(more_itertools.ichunked(inp, self.shard_size)): shard = more_itertools.peekable(shard) d = shard.peek()['doc_vec'].shape[0] index = faiss.index_factory(d, 'Flat', faiss.METRIC_INNER_PRODUCT) shard_it = more_itertools.chunked(shard, self.batch_size) if self.verbose: shard_it = pt.tqdm(shard_it, total=math.ceil(self.shard_size/self.batch_size), unit='batch', desc=f'building shard {shardid}') for batch in shard_it: doc_vecs = np.stack(d['doc_vec'] for d in batch) if self.score_fn == 'cos': doc_vecs = doc_vecs / np.linalg.norm(doc_vecs, axis=1, keepdims=True) index.add(doc_vecs) docnos.extend(d['docno'] for d in batch) faiss.write_index(index, str(path/f'{shardid}.faiss')) DocnoFile.build(docnos, path/'docnos.npy')
[docs] class FaissHnsw(pt.Indexer): """This class implements a disk-based dense vector index using Faiss HNSW for approximate nearest neighbor retrieval. .. deprecated:: 0.6.0 This class has been replaced with :class:`pyterrier_dr.FlexIndex`. """ def __init__(self, index_path, num_links=32, num_results=1000, shard_size=500_000, score_fn='cos', overwrite=False, batch_size=4096, verbose=False, drop_query_vec=True, qbatch=16): self.index_path = Path(index_path) self.num_links = num_links self.num_results = num_results self.shard_size = shard_size self.score_fn = score_fn self.overwrite = overwrite self.verbose = verbose self.batch_size = batch_size self.drop_query_vec = drop_query_vec self.qbatch = qbatch self._shards = None def iter_shards(self, cache=True): if not cache: return self._iter_shards() else: if self._shards is None: self._shards = list(self._iter_shards()) return iter(self._shards) def _iter_shards(self): import faiss for shardid in itertools.count(): if not (self.index_path/f'{shardid}.faiss').exists(): break yield faiss.read_index(str(self.index_path/f'{shardid}.faiss')) if shardid == 0: raise RuntimeError(f'Index not found at {self.index_path}') def transform(self, inp): columns = set(inp.columns) assert all(f in columns for f in ['qid', 'query_vec']), "Faiss expects columns: ['qid', 'query_vec'] when used in a pipeline" query_vecs = np.stack(inp['query_vec']) if self.score_fn == 'cos': query_vecs = query_vecs / np.linalg.norm(query_vecs, axis=1, keepdims=True) query_vecs = query_vecs.copy() res = [] docnos = DocnoFile(self.index_path/'docnos.npy') num_q = query_vecs.shape[0] ranked_lists = RankedLists(self.num_results, num_q) dids_offset = 0 it = enumerate(self.iter_shards()) if self.verbose: it = pt.tqdm(it, unit='shard', desc='scanning shards') for shardidx, index in it: scores, dids = [], [] for qidx in range(0, num_q, self.qbatch): s, d = index.search(query_vecs[qidx:qidx+self.qbatch], self.num_results) scores.append(s) dids.append(d) ranked_lists.update(np.concatenate(scores, axis=0), np.concatenate(dids, axis=0)+dids_offset) dids_offset += index.ntotal result_scores, result_dids = ranked_lists.results() unique_dids, unique_inverse = np.unique(result_dids, return_inverse=True) unique_docnos = docnos[unique_dids] result_docnos = unique_docnos[unique_inverse].reshape(num_q, -1) for query, scores, docnos in zip(inp.itertuples(), result_scores, result_docnos): if self.drop_query_vec: query = query._replace(query_vec=None) for score, docno in zip(scores, docnos): res.append((*query, docno, score)) res = pd.DataFrame(res, columns=list(query._fields) + ['docno', 'score']) if self.drop_query_vec: res = res.drop(columns=['query_vec']) res = add_ranks(res) return res def index(self, inp): import faiss if isinstance(inp, pd.DataFrame): inp = inp.to_dict(orient="records") path = Path(self.index_path) if path.exists(): if self.overwrite: shutil.rmtree(path) else: raise RuntimeError(f'Index already exists at {self.index_path}. If you want to delete and re-create an existing index, you can pass overwrite=True') path.mkdir(parents=True, exist_ok=True) docnos = [] if self.shard_size == 0: it = [(0, inp)] else: it = enumerate(more_itertools.ichunked(inp, self.shard_size)) for shardid, shard in it: shard = more_itertools.peekable(shard) d = shard.peek()['doc_vec'].shape[0] index = faiss.index_factory(d, f'HNSW{self.num_links}', faiss.METRIC_INNER_PRODUCT) shard_it = more_itertools.chunked(shard, self.batch_size) if self.verbose: shard_it = pt.tqdm(shard_it, total=math.ceil(self.shard_size/self.batch_size), unit='batch', desc=f'building shard {shardid}') for batch in shard_it: doc_vecs = np.stack(d['doc_vec'] for d in batch) if self.score_fn == 'cos': doc_vecs = doc_vecs / np.linalg.norm(doc_vecs, axis=1, keepdims=True) index.add(doc_vecs) docnos.extend(d['docno'] for d in batch) faiss.write_index(index, str(path/f'{shardid}.faiss')) DocnoFile.build(docnos, path/'docnos.npy')
[docs] class TorchIndex(NumpyIndex): """This class implements a disk-based dense vector index using PyTorch for GPU-accelerated retrieval. .. deprecated:: 0.6.0 This class has been replaced with :class:`pyterrier_dr.FlexIndex`. """ def __init__(self, index_path=None, num_results=1000, score_fn='dot', overwrite=False, batch_size=4096, verbose=False, dtype='f4', drop_query_vec=True, half=False, idx_mem=500000000): super().__init__(index_path, num_results, score_fn, overwrite, batch_size, verbose, dtype, drop_query_vec, inmem=False) self.half = half self.idx_mem = idx_mem self._meta = None self._cpu_data = None self._cuda_data = None self._docnos = None self._did_start = None self._cuda_slice = None def docnos_and_data(self): if self._meta is None: with (self.index_path/'meta.json').open('rt') as f_meta: self._meta = json.load(f_meta) meta = self._meta if self._cpu_data is None: SType, TType, CTType, SIZE = { 'f4': (torch.FloatStorage, torch.FloatTensor, torch.cuda.FloatTensor, 4), 'f2': (torch.HalfStorage, torch.HalfTensor, torch.cuda.HalfTensor, 2), }[self.dtype] self._cpu_data = TType(SType.from_file(str(self.index_path/'index.npy'), size=meta['count'] * meta['vec_size'])).reshape(meta['count'], meta['vec_size']) doc_batch_size = self.idx_mem//SIZE//meta['vec_size'] if self.half: self._cuda_data = torch.cuda.HalfTensor(size=(doc_batch_size, meta['vec_size']), device='cuda') else: self._cuda_data = CTType(size=(doc_batch_size, meta['vec_size']), device='cuda') if self.verbose and doc_batch_size > self.num_results: _logger.info(f'TorchIndex using document batches of size {doc_batch_size}') if self.num_results >= doc_batch_size: _logger.warn(f'TorchIndex using document batches of size {doc_batch_size} but this is less than num_rsults={self.num_results}. Consider using a larger idx_mem (currently {self.idx_mem})') self._docnos = DocnoFile(self.index_path/'docnos.npy') def transform(self, inp): columns = set(inp.columns) assert all(f in columns for f in ['qid', 'query_vec']), "TrochIndex expects columns ['qid', 'query_vec'] when used in a pipeline" query_vecs = np.stack(inp['query_vec']) query_vecs = torch.from_numpy(query_vecs).cuda() # TODO: can this go directly to CUDA? device='cuda' doesn't work if self.half: query_vecs = query_vecs.half() self.docnos_and_data() step = self._cuda_data.shape[0] it = range(0, self._cpu_data.shape[0], step) if self.verbose: it = pt.tqdm(it, desc='TorchIndex scoring', unit='docbatch') ranked_lists = RankedLists(self.num_results, query_vecs.shape[0]) for start_idx in it: end_idx = start_idx + step batch = self._cpu_data[start_idx:end_idx] bsize = batch.shape[0] if self.half: self._cuda_data[:bsize] = batch.half() else: self._cuda_data[:bsize] = batch scores = query_vecs @ self._cuda_data[:bsize].T if self.half: scores = scores.float() if scores.shape[0] > self.num_results: scores, dids = torch.topk(scores, k=self.num_results, dim=1) else: scores, dids = torch.sort(scores, dim=1, descending=False) scores = scores.cpu().float().numpy() dids = (dids + start_idx).cpu().numpy() ranked_lists.update(scores, dids) result_scores, result_dids = ranked_lists.results() unique_dids, unique_inverse = np.unique(result_dids, return_inverse=True) unique_docnos = self._docnos[unique_dids] result_docnos = unique_docnos[unique_inverse].reshape(query_vecs.shape[0], -1) res = [] for query, scores, docnos in zip(inp.itertuples(index=False), result_scores, result_docnos): if self.drop_query_vec: query = query._replace(query_vec=None) for score, docno in zip(scores, docnos): res.append((*query, docno, score)) res = pd.DataFrame(res, columns=list(query._fields) + ['docno', 'score']) if self.drop_query_vec: res = res.drop(columns=['query_vec']) res = res[~res.score.isna()] res = add_ranks(res) return res