# Source code for pyterrier_ciff._invert

import tempfile
from dataclasses import dataclass, field
from typing import Any, BinaryIO, Dict, Iterable, Iterator, List, NamedTuple, Tuple, Union

import numpy as np
import pyterrier as pt

# Number of postings a term accumulates in memory before the chunk is written to the scratch file.
_BUFFER_SIZE = 4096
# Size in bytes of one on-disk array holding _BUFFER_SIZE uint32 values (used when reading chunks back).
_BUFFER_READ_SIZE = _BUFFER_SIZE * 4 # 4 bytes in uint32


@dataclass
class _PostingBuffer:
    """Accumulates the postings (document id, term frequency) of a single term.

    Postings are collected in two parallel in-memory lists. Once
    ``_BUFFER_SIZE`` postings have been collected they are written to a scratch
    file as one chunk (all document ids as uint32, followed by all term
    frequencies as uint32) and the in-memory lists are emptied. ``load`` stitches
    the flushed chunks and the in-memory remainder back together.
    """
    # Identifier assigned to the term by the caller.
    term_id: int
    # Number of postings currently held in memory (reset to 0 after each flush).
    buffer_idx: int = 0
    # In-memory document ids that have not been flushed yet.
    buffer_did: List[int] = field(default_factory=list)
    # In-memory term frequencies, parallel to ``buffer_did``.
    buffer_tf: List[int] = field(default_factory=list)
    # Byte offset in the scratch file of every chunk flushed so far, in write order.
    file_chunk_offsets: List[int] = field(default_factory=list)

    def add(self, did: int, tf: int, scratch: BinaryIO) -> None:
        """Record one posting, flushing a full chunk to ``scratch`` when needed.

        Args:
            did: The document id of the posting.
            tf: The term frequency of the posting.
            scratch: Seekable binary file used to spill full chunks.
        """
        self.buffer_did.append(did)
        self.buffer_tf.append(tf)
        self.buffer_idx += 1

        # Flush to disk if needed
        if self.buffer_idx == _BUFFER_SIZE:
            # Always append at the end of the file: an earlier load() may have
            # moved the file position into the middle of already-written chunks.
            scratch.seek(0, 2)  # whence=2 is os.SEEK_END
            self.file_chunk_offsets.append(scratch.tell())
            scratch.write(np.array(self.buffer_did, dtype=np.uint32).tobytes())
            scratch.write(np.array(self.buffer_tf, dtype=np.uint32).tobytes())
            self.buffer_did.clear()
            self.buffer_tf.clear()
            self.buffer_idx = 0

    def load(self, scratch: BinaryIO) -> Tuple[np.ndarray, np.ndarray]:
        """Return all postings of this term in the order they were added.

        Args:
            scratch: The same scratch file that was passed to ``add``.

        Returns:
            A ``(dids, tfs)`` pair of parallel uint32 arrays. Both are empty when
            no posting has been added.
        """
        dids = []
        tfs = []
        for offset in self.file_chunk_offsets:
            scratch.seek(offset)
            # Each chunk is the did array immediately followed by the tf array.
            dids.append(np.frombuffer(scratch.read(_BUFFER_READ_SIZE), dtype=np.uint32))
            tfs.append(np.frombuffer(scratch.read(_BUFFER_READ_SIZE), dtype=np.uint32))
        if self.buffer_idx > 0:
            dids.append(np.array(self.buffer_did[:self.buffer_idx], dtype=np.uint32))
            tfs.append(np.array(self.buffer_tf[:self.buffer_idx], dtype=np.uint32))
        if not dids:
            # Nothing flushed and nothing buffered: there is nothing to concatenate.
            return np.empty(0, dtype=np.uint32), np.empty(0, dtype=np.uint32)
        return np.concatenate(dids), np.concatenate(tfs)


class InvertDoc(NamedTuple):
    """A document record: the document's identifiers and the terms indexed for it."""
    # Numeric document id.
    did: int
    # External document identifier.
    docno: str
    # Ids of the terms indexed for this document.
    tids: np.ndarray
    # Term frequencies, parallel to ``tids``.
    tfs: np.ndarray


class InvertTerm(NamedTuple):
    """A term record: the term's identifiers and its posting list."""
    # Numeric term id.
    tid: int
    # The term string.
    term: str
    # Ids of the documents that contain the term.
    dids: np.ndarray
    # Term frequencies, parallel to ``dids``.
    tfs: np.ndarray


class InvertRecord(NamedTuple):
    """A tagged record that wraps either a document or a term payload."""
    # Tag naming the kind of payload; ``invert`` uses 'doc' and 'term'.
    type: str
    # The payload itself: an InvertDoc or an InvertTerm.
    data: Union[InvertDoc, InvertTerm]


def invert(inp: Iterable[Dict[str, Any]], *, scale: float = 100., verbose: bool = False) -> Iterator[InvertRecord]:
    """Invert a stream of documents into "doc" records followed by "term" records.

    Every document is consumed once. Its token weights are multiplied by
    ``scale`` and truncated to integers; tokens whose resulting frequency is
    zero or negative are not indexed. A "doc" record is yielded for each
    document as soon as it has been processed. After the input is exhausted, one
    "term" record is yielded per indexed term, in order of term id.

    Document ids and term ids both start at 0 and increase by 1; a term id is
    assigned the first time the term is indexed.

    .. code-block:: python
        :caption: Invert a stream of documents using :meth:`~pyterrier_ciff.invert`

        >>> from pyterrier_ciff import invert
        >>> docs = [
        ...     {"docno": "100", "toks": {"a": 0.02, "b": 1.41}},
        ...     {"docno": "101", "toks": {"b": 2.15, "c": -3.83, "d": 4.65, "e": 0.42}},
        ... ]
        >>> for kind, data in invert(docs):
        ...     print(kind, data[0], data[1], data[2].tolist(), data[3].tolist())
        doc 0 100 [0, 1] [2, 141]
        doc 1 101 [1, 2, 3] [215, 465, 42]
        term 0 a [0] [2]
        term 1 b [0, 1] [141, 215]
        term 2 d [1] [465]
        term 3 e [1] [42]

    Args:
        inp: An iterable with ``docno`` and ``toks`` fields.
        scale: The scaling factor for term frequencies. Defaults to 100.
        verbose: Whether to show a progress bar. Defaults to False.
    """
    with tempfile.TemporaryFile() as spill:
        # term string -> posting buffer; insertion order equals term-id order
        postings = {}

        documents = pt.tqdm(inp, unit='doc', desc='inverting') if verbose else inp
        for doc_id, doc in enumerate(documents):
            term_ids = []
            freqs = []
            for token, weight in doc['toks'].items():
                freq = int(weight * scale)
                if freq > 0:  # anything else is not indexed
                    entry = postings.get(token)
                    if entry is None:
                        # First sighting of this term: it gets the next free term id
                        entry = postings[token] = _PostingBuffer(term_id=len(postings))
                    term_ids.append(entry.term_id)
                    freqs.append(freq)
                    # Record the posting; the buffer spills to disk by itself when full
                    entry.add(doc_id, freq, spill)
            doc_record = InvertDoc(
                doc_id,
                doc['docno'],
                np.array(term_ids, dtype=np.uint32),
                np.array(freqs, dtype=np.uint32),
            )
            yield InvertRecord('doc', doc_record)

        term_entries = postings.items()
        if verbose:
            term_entries = pt.tqdm(term_entries, unit='term', desc='posting')
        for token, entry in term_entries:
            posting_dids, posting_tfs = entry.load(spill)
            term_record = InvertTerm(entry.term_id, token, posting_dids, posting_tfs)
            yield InvertRecord('term', term_record)