import more_itertools
from collections import defaultdict
import re
import pandas as pd
from typing import Any, List, Union, Literal, Protocol, runtime_checkable
from warnings import warn
import types
import pyterrier as pt
@runtime_checkable
class HasTextLoader(Protocol):
    # Structural interface for objects that can build a text-loading transformer.
    # Marked runtime_checkable so that isinstance() checks against it are possible.
    def text_loader(
        self,
        fields: Union[List[str], str, Literal['*']] = '*',
        *,
        verbose: bool = False,
    ) -> pt.Transformer:
        """
        Returns a transformer that loads and populates text columns for each document in the
        provided input frame.

        :param fields: The names of the fields to load. If a list of strings, all of the listed fields are provided.
            If a single string, this single field is provided. If the special value of '*' (default),
            all available fields are provided.
        :param verbose: Show a progress bar.
        """
[docs]
def get_text(
        indexlike: Union[HasTextLoader, str],
        metadata : Union[str,List[str], Literal['*']] = '*',
        by_query : bool = False,
        verbose : bool = False,
        **kwargs: Any) -> pt.Transformer:
    """
    A utility transformer for obtaining the text of documents (or other document metadata), e.g. from
    Terrier's MetaIndex or an IRDSDataset docstore.

    :param indexlike: an object that provides a .text_loader() factory method, such as a Terrier index or IRDSDataset.
        If a ``str`` is provided, it will try to load a Terrier index from the provided path.
    :param metadata: The names of the fields to load. If a list of strings, all of the listed fields are provided.
        If a single string, this single field is provided. If the special value of '*' (default), all
        available fields are provided.
    :param by_query: if True, the loader is applied separately to each query's rows (via ``pt.apply.by_query``).
        Defaults to False, which means the whole input frame is passed to the loader at once.
    :param verbose: whether to print a tqdm progress bar. When by_query=True, prints progress by query. Otherwise,
        the behaviour is defined by the provided ``indexlike``.
    :param kwargs: other arguments to pass through to the text_loader.
    :return: a transformer that loads the text of documents from the provided indexlike.
    :rtype: pt.Transformer
    :raises ValueError: if indexlike does not provide a .text_loader() method.

    Example (Terrier Index)::

        index = pt.IndexFactory.of("./index/")
        pipe = ( pt.terrier.Retriever(index, wmodel="DPH")
            >> pt.text.get_text(index) # load text using a PyTerrier index
            >> pt.text.scorer(wmodel="DPH") )

    Example (IR Datasets)::

        # see https://github.com/terrierteam/pyterrier_t5
        from pyterrier_t5 import MonoT5ReRanker
        bm25 = pt.terrier.Retriever.from_dataset(pt.get_dataset('msmarcov2_passage'), wmodel='BM25')
        # load text using IR Datasets
        loader = pt.text.get_text(pt.get_dataset('irds:msmarco-passage-v2'), ['text'])
        monoT5 = bm25 >> loader >> MonoT5ReRanker()
    """
    if isinstance(indexlike, str):
        # TODO: We'll need to decide how to handle this once terrier is split from core
        # Maybe it should run Artifact.load(indexlike) instead?
        indexlike = pt.IndexFactory.of(indexlike)

    if not isinstance(indexlike, HasTextLoader):
        raise ValueError('indexlike must provide a .text_loader() method.')

    # In by_query mode the progress bar is owned by the by_query wrapper, so the
    # underlying loader is always created silent in that case.
    loader_verbose = verbose and not by_query
    loader: pt.Transformer = indexlike.text_loader(metadata, verbose=loader_verbose, **kwargs)

    if not by_query:
        return loader
    return pt.apply.by_query(loader, verbose=verbose)
[docs]
def scorer(*args, **kwargs) -> pt.Transformer:
    """
    This allows scoring of the documents with respect to a query, without creating an index first.
    This is an alias to pt.TextScorer(). Internally, a Terrier memory index is created, before being
    used for scoring.

    :param body_attr: what dataframe input column contains the text of the document. Default is `"body"`.
    :param wmodel: name of the weighting model to use for scoring.
    :param background_index: An optional background index to use for collection statistics. If a weighting
        model such as BM25 or TF_IDF or PL2 is used without setting the background_index, the background statistics
        will be calculated from the dataframe, which is usually not the desired behaviour.
    :param args: other arguments to pass through to the TextScorer.
    :param kwargs: other arguments to pass through to the TextScorer.
    :return: a transformer that scores the documents with respect to a query.
    :rtype: pt.Transformer

    Example::

        df = pd.DataFrame(
            [
                ["q1", "chemical reactions", "d1", "professor protor poured the chemicals"],
                ["q1", "chemical reactions", "d2", "chemical brothers turned up the beats"],
            ], columns=["qid", "query", "docno", "text"])
        textscorerTf = pt.text.scorer(body_attr="text", wmodel="Tf")
        rtr = textscorerTf.transform(df)
        # rtr will have a score for each document for the query "chemical reactions" based on the provided document contents
        # both attain score 1, as, after stemming, they both contain one occurrence of the query term 'chemical'
        # ["q1", "chemical reactions", "d1", "professor protor poured the chemicals", 0, 1]
        # ["q1", "chemical reactions", "d2", "chemical brothers turned up the beats", 0, 1]

    For calculating the scores of documents using any weighting model with the concept of IDF, it is strongly advised to make use of
    an existing Terrier index for background statistics. Without a background index, IDF will be calculated based on the supplied
    dataframe (for models such as BM25, this can lead to negative scores)::

        textscorerTfIdf = pt.text.scorer(body_attr="text", wmodel="TF_IDF", background_index=index)
    """
    # All positional and keyword arguments are forwarded unchanged.
    return pt.terrier.retriever.TextScorer(*args, **kwargs)
[docs]
def sliding( text_attr='body', length=150, stride=75, join=' ', prepend_attr='title', tokenizer=None, **kwargs) -> pt.Transformer:
    r"""
    A useful transformer for splitting long documents into smaller passages within a pipeline. This applies a *sliding* window over the
    text, where each passage is the given number of tokens long. Passages can overlap, if the stride is set smaller than the length. In
    applying this transformer, docnos are altered by adding '%p' and a passage number. The original scores for each document can be recovered
    by aggregation functions, such as ``max_passage()``.

    For the purposes of obtaining passages of a given length, the tokenisation can be controlled. By default, tokenisation takes place by splitting
    on whitespace, i.e. based on the Python regular expression ``re.compile(r'\s+')``. However, more fine-grained tokenisation can be applied by passing
    an object matching the HuggingFace Transformers `Tokenizer API <https://huggingface.co/docs/transformers/main/en/main_classes/tokenizer#transformers.PreTrainedTokenizer>`_
    as the `tokenizer` kwarg argument. In short, the `tokenizer` object must have a ``.tokenize(str) -> list[str]`` method and
    ``.convert_tokens_to_string(list[str]) -> str`` for detokenisation.

    :param text_attr: what is the name of the dataframe attribute containing the main text of the document to be split into passages.
        Default is 'body'.
    :param length: how many tokens in each passage. Default is 150.
    :param stride: how many tokens to advance each passage by. Default is 75.
    :param join: join string handed to the SlidingWindowPassager, which uses it as the separator between the prepended attribute
        and the passage text. Default is ' '.
    :param prepend_attr: name of another document attribute, such as the title of the document, to prepend to each passage, following [Dai2019].
        Use None to prepend nothing. Defaults to 'title'.
    :param tokenizer: which model to use for tokenizing. The object must have a ``.tokenize(str) -> list[str]`` method for tokenization and ``.convert_tokens_to_string(list[str]) -> str`` for detokenization.
        Default is None. Tokenisation is then performed by splitting on one-or-more whitespace characters, i.e. based on the Python regular expression ``re.compile(r'\s+')``
    :param kwargs: other arguments to pass through to the SlidingWindowPassager. The deprecated names ``passage_length``,
        ``passage_stride``, ``prepend_title`` and ``title_attr`` are still accepted here and emit a FutureWarning.
    :return: a transformer that splits the documents into passages.
    :rtype: pt.Transformer
    :raises KeyError: if the text_attr or prepend_attr columns are not found in the input dataframe.

    Example::

        pipe = ( pt.terrier.Retriever(index, wmodel="DPH", metadata=["docno", "body"])
            >> pt.text.sliding(length=128, stride=64, prepend_attr=None)
            >> pt.text.scorer(wmodel="DPH")
            >> pt.text.max_passage() )

        # tokenizer model
        from transformers import AutoTokenizer
        tok = AutoTokenizer.from_pretrained("bert-base-uncased")
        pipe = (pt.terrier.Retriever(index, wmodel="DPH", metadata=["docno", "body"])
            >> pt.text.sliding(length=128, stride=64, prepend_attr=None, tokenizer=tok)
            >> pt.text.scorer(wmodel="DPH")
            >> pt.text.max_passage() )
    """
    # deal with older names for attributes
    if 'passage_length' in kwargs:
        length = kwargs.pop('passage_length')
        warn(
            "passage_length should be length.", FutureWarning, 2)
    if 'passage_stride' in kwargs:
        stride = kwargs.pop('passage_stride')
        warn(
            "passage_stride should be stride.", FutureWarning, 2)

    # title_attr must always be removed from kwargs: it is passed explicitly below,
    # so leaving it in **kwargs would raise "multiple values for keyword argument".
    _missing = object()
    legacy_title_attr = kwargs.pop('title_attr', _missing)
    if 'prepend_title' in kwargs:
        warn(
            "prepend_title and title_attr should be replaced with prepend_attr.", FutureWarning, 2)
        if kwargs.pop('prepend_title'):
            if legacy_title_attr is not _missing:
                prepend_attr = legacy_title_attr
            elif prepend_attr is None:
                # prepend_title=True without any attribute name: use the legacy default column
                prepend_attr = 'title'
        else:
            prepend_attr = None
    elif legacy_title_attr is not _missing:
        warn(
            "prepend_title and title_attr should be replaced with prepend_attr.", FutureWarning, 2)
        prepend_attr = legacy_title_attr

    return SlidingWindowPassager(
        text_attr=text_attr,
        passage_length=length,
        passage_stride=stride,
        prepend_title=prepend_attr is not None,
        title_attr=prepend_attr,
        join=join,  # honour the caller's join string rather than a hard-coded ' '
        tokenizer=tokenizer,
        **kwargs
    )
[docs]
def max_passage() -> pt.Transformer:
    """
    Scores each document based on the maximum score of any constituent passage. Applied after a sliding window transformation
    has been scored.

    :return: a new ``MaxPassage`` transformer.
    :rtype: pt.Transformer
    """
    return MaxPassage()
[docs]
def mean_passage() -> pt.Transformer:
    """
    Scores each document based on the mean score of all constituent passages. Applied after a sliding window transformation
    has been scored.

    :return: a new ``MeanPassage`` transformer.
    :rtype: pt.Transformer
    """
    return MeanPassage()
[docs]
def first_passage() -> pt.Transformer:
    """
    Scores each document based on the score of the first passage of that document. Note that this transformer is rarely used in conjunction with
    the sliding window transformer, as all passages would be required to be scored, only for the first one to be used.

    :return: a new ``FirstPassage`` transformer.
    :rtype: pt.Transformer
    """
    return FirstPassage()
[docs]
def kmaxavg_passage(k : int) -> pt.Transformer:
    """
    Scores each document based on the average score of the top scoring k passages. Generalises combination of mean_passage()
    and max_passage(). Proposed in [Chen2020].

    :param k: The number of top-scored passages for each document to use when scoring
    :return: a new ``KMaxAvgPassage`` transformer configured with ``k``.
    :rtype: pt.Transformer
    """
    return KMaxAvgPassage(k)
def slidingWindow(sequence : list, winSize : int, step : int) -> list:
    """
    For the specified sequence, break into sliding windows of size winSize,
    stepping forward by the specified amount each time.

    Only complete windows are returned: trailing elements that do not fill a whole
    window are not emitted, and a sequence shorter than winSize yields no windows.
    Windows are selected purely by position, so elements that are ``None`` are
    handled like any other value.

    :param sequence: the sequence to break into sliding windows
    :param winSize: the size of the sliding window; must be at least 1
    :param step: how much to step forward each time; must be at least 1
    :return: a list of sliding windows, where each window is a tuple of the elements in that window
    :rtype: list
    :raises ValueError: if winSize or step is smaller than 1
    """
    if winSize < 1:
        raise ValueError("winSize must be >= 1")
    if step < 1:
        raise ValueError("step must be >= 1")
    items = list(sequence)
    # the last admissible start offset is the one whose window ends exactly at the end of the input
    last_start = len(items) - winSize
    return [tuple(items[start:start + winSize]) for start in range(0, last_start + 1, step)]
[docs]
def snippets(
        text_scorer_pipe : pt.Transformer,
        text_attr : str = "text",
        summary_attr : str = "summary",
        num_psgs : int = 5,
        joinstr : str ='...') -> pt.Transformer:
    """
    Applies query-biased summarisation (snippet), by applying the specified text scoring pipeline.

    Takes a dataframe with the columns ['qid', 'query', 'docno', text_attr], and returns a dataframe
    with the columns ['qid', 'query', 'docno', text_attr, summary_attr]. The summary_attr column contains the
    query-biased summary for that document, up to num_psgs passages, joined together with the specified joinstr.

    :param text_scorer_pipe: the pipeline for scoring passages in response to the query. Normally this applies passaging.
        The pipeline should take a dataframe with the columns ['qid', 'query', 'docno', text_attr] and return a dataframe
        with the columns ['qid', 'query', 'docno', text_attr, 'score', 'rank'], where these are smaller passages than the input df.
        The returned docnos are expected to end in '%p' followed by a passage identifier.
    :param text_attr: what is the name of the attribute that contains the text of the document
    :param summary_attr: what is the name of the attribute that should contain the query-biased summary for that document
    :param num_psgs: how many passages to select for the summary of each document
    :param joinstr: how to join passages for a given document together
    :return: a transformer that adds the summary_attr column to its input.
    :rtype: pt.Transformer

    Example::

        # retrieve documents with text
        br = pt.terrier.Retriever(index, metadata=['docno', 'text'])

        # use Tf as a passage scorer on sliding window passages
        psg_scorer = (
            pt.text.sliding(text_attr='text', length=15, prepend_attr=None)
            >> pt.text.scorer(body_attr="text", wmodel='Tf', takes='docs')
        )

        # use psg_scorer for performing query-biased summarisation on docs retrieved by br
        retr_pipe = br >> pt.text.snippets(psg_scorer)
    """
    # Each (query, document) pair is given its own temporary qid so that the rank cutoff
    # (% num_psgs) selects the top passages per document rather than per query.
    tsp = (
        pt.apply.rename({'qid' : 'oldqid'})
        >> pt.apply.qid(lambda row: row['oldqid'] + '-' + row['docno'])
        >> ( text_scorer_pipe % num_psgs )
        >> pt.apply.qid(drop=True)
        >> pt.apply.rename({'oldqid' : 'qid'})
    )

    def _qbsjoin(docres):
        if len(docres) == 0:
            # dont assign new columns to the caller's df
            docres = docres.copy()
            docres[summary_attr] = pd.Series(dtype='str')
            return docres

        psgres = tsp(docres)
        if len(psgres) == 0:
            warn('no passages found in %d documents for query %s' % (len(docres), docres.iloc[0]['query']))
            docres = docres.copy()
            docres[summary_attr] = ""
            return docres

        # dont assign new columns to an existing df
        psgres = psgres.copy()
        # split on the last '%p' only, so docnos that themselves contain '%p' still yield two columns
        psgres[["olddocno", "pid"]] = psgres.docno.str.rsplit("%p", n=1, expand=True)

        newdf = (
            psgres.groupby(['qid', 'olddocno'])[text_attr]
            .agg(joinstr.join)
            .reset_index()
            .rename(columns={text_attr : summary_attr, 'olddocno' : 'docno'})
        )

        # left merge: documents for which no passage was returned keep their row
        return docres.merge(newdf, on=['qid', 'docno'], how='left')

    rtr = pt.apply.generic(_qbsjoin, required_columns=['qid', 'query', 'docno', text_attr])
    # expose the inner passage-scoring pipeline through a subtransformers() method on the returned object
    rtr.subtransformers = types.MethodType(lambda self: {'tsp' : tsp}, rtr) # type: ignore[attr-defined]
    return rtr
class DePassager(pt.Transformer):
    """
    Aggregates scored passages back into documents.

    Passage docnos are expected to have the form ``<docno>%p<passage id>``. The passages of each
    (qid, docno) pair are combined according to ``agg``:

    - ``'max'``: keep the row of the highest-scoring passage;
    - ``'first'``: keep the row of the passage with the smallest passage id;
    - ``'mean'``: mean of the passage scores;
    - ``'kmaxavg'``: mean of the ``self.K`` highest passage scores (``self.K`` must be set, as
      ``KMaxAvgPassage`` does).
    """

    _KNOWN_AGGS = ('max', 'first', 'mean', 'kmaxavg')

    def __init__(self, agg="max", **kwargs):
        """
        :param agg: the aggregation to apply; one of 'max', 'first', 'mean' or 'kmaxavg'.
        :param kwargs: passed through to ``pt.Transformer``.
        """
        super().__init__(**kwargs)
        self.agg = agg

    def transform(self, topics_and_res):
        """
        Collapse passage-level rows into one row per (qid, docno).

        :param topics_and_res: frame with at least 'qid' and 'docno' ('score' too, unless agg is 'first').
        :return: the aggregated frame, without any 'docid' column, after ``pt.model.add_ranks``.
        :raises ValueError: if ``self.agg`` is not a known aggregation.
        """
        if self.agg not in self._KNOWN_AGGS:
            raise ValueError("Unknown agg %r; expected one of %s" % (self.agg, ', '.join(self._KNOWN_AGGS)))

        pt.validate.columns(topics_and_res, includes=['qid', 'docno'] + (['score'] if self.agg != 'first' else []))
        topics_and_res = topics_and_res.copy()
        # split on the last '%p' only, so docnos that themselves contain '%p' still yield two columns
        topics_and_res[["olddocno", "pid"]] = topics_and_res.docno.str.rsplit("%p", n=1, expand=True) if len(topics_and_res) > 0 else pd.DataFrame(columns=["olddocno", "pid"])

        if self.agg == 'max':
            groups = topics_and_res.groupby(['qid', 'olddocno'])
            group_max_idx = groups['score'].idxmax()
            rtr = topics_and_res.loc[group_max_idx, :]
            rtr = rtr.drop(columns=['docno', 'pid']).rename(columns={"olddocno" : "docno"})
        elif self.agg == 'first':
            # compare passage ids numerically rather than as strings
            topics_and_res.pid = topics_and_res.pid.astype(int)
            groups = topics_and_res.groupby(['qid', 'olddocno'])
            group_first_idx = groups['pid'].idxmin()
            rtr = topics_and_res.loc[group_first_idx, ]
            rtr = rtr.drop(columns=['docno', 'pid']).rename(columns={"olddocno" : "docno"})
        elif self.agg == 'mean':
            rtr = topics_and_res.groupby(['qid', 'olddocno'])['score'].mean().reset_index().rename(columns={'olddocno' : 'docno'})
            #add query columns back
            rtr = rtr.merge(topics_and_res[pt.model.query_columns(topics_and_res)].drop_duplicates(), on='qid')
        else:  # 'kmaxavg'
            rtr = topics_and_res.groupby(['qid', 'olddocno'])['score'].apply(lambda ser: ser.nlargest(self.K).mean()).reset_index().rename(columns={'olddocno' : 'docno'})
            #add query columns back
            rtr = rtr.merge(topics_and_res[pt.model.query_columns(topics_and_res)].drop_duplicates(), on='qid')

        if "docid" in rtr.columns:
            rtr = rtr.drop(columns=['docid'])
        rtr = pt.model.add_ranks(rtr)
        return rtr
class KMaxAvgPassage(DePassager):
    """
    De-passager that uses the 'kmaxavg' aggregation with the given ``K``.

    .. cite.dblp:: conf/trec/ChenHSCH020

    Usage:
        X >> SlidingWindowPassager() >> Y >> KMaxAvgPassage(2)

    where X is some kind of model for obtaining the text of documents and Y is a text scorer, such as BERT or ColBERT
    """
    def __init__(self, K, **kwargs):
        """
        :param K: stored as ``self.K``; the number of top passages used by the 'kmaxavg' aggregation.
        :param kwargs: passed through to ``DePassager``; any supplied ``agg`` is overridden.
        """
        self.K = K
        init_args = dict(kwargs, agg="kmaxavg")
        super().__init__(**init_args)
class MaxPassage(DePassager):
    """De-passager fixed to the 'max' aggregation."""
    def __init__(self, **kwargs):
        """
        :param kwargs: passed through to ``DePassager``; any supplied ``agg`` is overridden.
        """
        init_args = dict(kwargs, agg="max")
        super().__init__(**init_args)
class FirstPassage(DePassager):
    """De-passager fixed to the 'first' aggregation."""
    def __init__(self, **kwargs):
        """
        :param kwargs: passed through to ``DePassager``; any supplied ``agg`` is overridden.
        """
        init_args = dict(kwargs, agg="first")
        super().__init__(**init_args)
class MeanPassage(DePassager):
    """De-passager fixed to the 'mean' aggregation."""
    def __init__(self, **kwargs):
        """
        :param kwargs: passed through to ``DePassager``; any supplied ``agg`` is overridden.
        """
        init_args = dict(kwargs, agg="mean")
        super().__init__(**init_args)
class SlidingWindowPassager(pt.Transformer):
    """
    Splits each input document into fixed-length, possibly overlapping, passages.

    Every output row is a copy of its input row in which the docno gains a ``%p<n>`` suffix
    (``n`` counting from 0 within the document) and the text column holds the passage text.
    Documents with fewer than ``passage_length`` tokens produce a single ``%p0`` passage.
    When ``prepend_title`` is set, the title column is prepended to every passage (separated
    by ``join``) and removed from the output.
    """
    schematic = {'label': 'SlidingWindow'}

    def __init__(self, text_attr='body', title_attr='title', passage_length=150, passage_stride=75, join=' ', prepend_title=True, tokenizer=None, **kwargs):
        r"""
        :param text_attr: name of the column holding the text to split.
        :param title_attr: name of the column to prepend when ``prepend_title`` is true.
        :param passage_length: number of tokens per passage.
        :param passage_stride: number of tokens to advance between consecutive passages.
        :param join: separator placed between the title and the passage text.
        :param prepend_title: whether to prepend ``title_attr`` to each passage.
        :param tokenizer: optional object with ``.tokenize(str)`` and ``.convert_tokens_to_string(tokens)``
            methods. If None, text is split with the regular expression ``\s+`` and re-joined with single spaces.
        :param kwargs: passed through to ``pt.Transformer``.
        """
        super().__init__(**kwargs)
        self.text_attr=text_attr
        self.title_attr=title_attr
        self.passage_length = passage_length
        self.passage_stride= passage_stride
        self.join = join
        self.prepend_title = prepend_title
        self.tokenizer = tokenizer

        # a custom tokenizer must provide both methods; a missing one raises AttributeError here
        if self.tokenizer is not None:
            self.tokenize = self.tokenizer.tokenize
            self.detokenize = self.tokenizer.convert_tokens_to_string
        else:
            self.tokenize = re.compile(r"\s+").split
            self.detokenize = ' '.join

    def transform(self, topics_and_res):
        """
        Split the documents of the input frame into passages.

        :param topics_and_res: a result frame or a document frame containing the text column
            (and the title column when ``prepend_title`` is set).
        :return: a frame with one row per passage.
        """
        with pt.validate.any(topics_and_res) as v:
            cols = [self.text_attr] + ([self.title_attr] if self.prepend_title else [])
            v.result_frame(cols)
            v.document_frame(cols)

        # now apply the passaging
        if "qid" in topics_and_res.columns:
            return self.applyPassaging(topics_and_res, labels="label" in topics_and_res.columns)
        return self.applyPassaging_no_qid(topics_and_res)

    @staticmethod
    def _iter_row_dicts(df):
        """Yield each row of ``df`` as a dict keyed by the original column names."""
        columns = list(df.columns)
        # name=None gives plain tuples, so column names that are not valid identifiers are not renamed
        for values in df.itertuples(index=False, name=None):
            yield dict(zip(columns, values))

    def _passages_for_row(self, row):
        """Return the list of passage rows (dicts) generated from one input row dict."""
        toks = self.tokenize(row[self.text_attr])
        if len(toks) < self.passage_length:
            # short document: emitted whole, as passage 0
            windows = [toks]
        else:
            windows = slidingWindow(toks, self.passage_length, self.passage_stride)

        passages = []
        for i, window in enumerate(windows):
            new_row = dict(row)
            new_row['docno'] = row['docno'] + "%p" + str(i)
            text = self.detokenize(window)
            if self.prepend_title:
                text = str(row[self.title_attr]) + self.join + text
                del new_row[self.title_attr]
            new_row[self.text_attr] = text
            passages.append(new_row)
        return passages

    def applyPassaging_no_qid(self, df):
        """
        Passage a frame that has no 'qid' column.

        :param df: input document frame.
        :return: frame of passages with the input's columns (minus the title column when it is prepended).
        """
        result_columns = list(df.columns)
        if self.prepend_title:
            result_columns = [c for c in result_columns if c != self.title_attr]
        rows = []
        for row in self._iter_row_dicts(df):
            rows.extend(self._passages_for_row(row))
        return pd.DataFrame(rows, columns=result_columns)

    def applyPassaging(self, df, labels=True):
        """
        Passage a frame that has a 'qid' column, showing a progress bar.

        :param df: input frame.
        :param labels: accepted for backward compatibility; it does not affect the output.
        :return: frame of passages with a fresh 0..n-1 index; missing values in the 'query', text and
            'qid' columns are replaced by ''.
        """
        if len(df) == 0:
            return pd.DataFrame(columns=['qid', 'query', 'docno', self.text_attr, 'score', 'rank'])

        new_rows = []
        with pt.tqdm(total=len(df), desc='passaging', leave=False) as pbar:
            for row in self._iter_row_dicts(df):
                new_rows.extend(self._passages_for_row(row))
                pbar.update(1)

        new_df = pd.DataFrame(new_rows)
        # only touch columns that exist: a frame with qid but no query column is still valid input
        for col in ('query', self.text_attr, 'qid'):
            if col in new_df.columns:
                new_df[col] = new_df[col].fillna('')
        new_df.reset_index(inplace=True,drop=True)
        return new_df