from pyterrier.transformer import Transformer
from . import Transformer
from pyterrier.datasets import IRDSDataset
import more_itertools
from collections import defaultdict
import re
from pyterrier.model import add_ranks
import pandas as pd
import numpy as np
from typing import List, Union
from warnings import warn
def get_text(
        indexlike,
        metadata : Union[str, List[str]] = "body",
        by_query : bool = False,
        verbose : bool = False) -> Transformer:
    """
    A utility transformer for obtaining the text from the text of documents (or other document metadata) from Terrier's MetaIndex
    or an IRDSDataset docstore.

    Arguments:
        indexlike: a Terrier index or IRDSDataset to retrieve the metadata from
        metadata(list(str) or str): a list of strings of the metadata keys to retrieve from the index. Defaults to ["body"]
        by_query(bool): whether the entire dataframe should be progressed at once, rather than one query at a time.
            Defaults to false, which means that all document metadata will be fetched at once.
        verbose(bool): whether to print a tqdm progress bar. Defaults to false. Has no effect when by_query=False

    Example::

        pipe = ( pt.BatchRetrieve(index, wmodel="DPH")
            >> pt.text.get_text(index)
            >> pt.text.scorer(wmodel="DPH") )

    """
    import pyterrier as pt
    JIR = pt.autoclass('org.terrier.querying.IndexRef')
    JI = pt.autoclass('org.terrier.structures.Index')

    # allow a single metadata key to be passed as a bare string
    if isinstance(metadata, str):
        metadata = [metadata]

    if isinstance(indexlike, (str, JIR)):
        # a path or an IndexRef must be resolved into a loaded Index first
        index = pt.IndexFactory.of(indexlike)
        add_text_fn = _add_text_terrier_metaindex(index, metadata)
    elif isinstance(indexlike, JI):
        add_text_fn = _add_text_terrier_metaindex(indexlike, metadata)
    elif isinstance(indexlike, IRDSDataset):
        add_text_fn = _add_text_irds_docstore(indexlike, metadata)
    else:
        raise ValueError("indexlike %s of type %s not supported. Pass a string, an IndexRef, an Index, or an IRDSDataset" %
            (str(indexlike), type(indexlike)))

    if by_query:
        return pt.apply.by_query(add_text_fn, verbose=verbose)
    return pt.apply.generic(add_text_fn)
def _add_text_terrier_metaindex(index, metadata):
metaindex = index.getMetaIndex()
if metaindex is None:
raise ValueError("Index %s does not have a metaindex" % str(index))
for k in metadata:
if not k in metaindex.getKeys():
raise ValueError("Index from %s did not have requested metaindex key %s. Keys present in metaindex are %s" %
(str(index), k, str( metaindex.getKeys()) ))
def add_docids(res):
res = res.copy()
res["docid"] = res.apply(lambda row: metaindex.getDocument("docno", row.docno), axis=1)
return res
def add_text_function_docids(res):
res = res.copy()
if len(res) == 0:
for k in metadata:
res[k] = pd.Series(dtype='object')
return res
docids = res.docid.values.tolist()
# indexed by docid then keys
allmeta = metaindex.getItems(metadata, docids)
import numpy as np
# get transpose to make easier for insertion back into dataframe?
allmeta = np.array(allmeta).T
for i, k in enumerate(metadata):
res[k] = allmeta[i]
return res
def add_text_generic(res):
if not "docid" in res.columns:
assert "docno" in res.columns, "Neither docid nor docno are in the input dataframe, found %s" % (str(res.columns))
res = add_docids(res)
return add_text_function_docids(res)
return add_text_generic
def _add_text_irds_docstore(irds_dataset, metadata):
irds = irds_dataset.irds_ref()
assert irds.has_docs(), f"dataset {irds_dataset} doesn't provide docs"
docs_cls = irds.docs_cls()
for k in metadata:
if not k in docs_cls._fields:
raise ValueError(f"{irds_dataset} did not have requested field {k}. Keys present are {docs_cls._fields} (from {docs_cls})")
field_idx = [(f, docs_cls._fields.index(f)) for f in metadata]
docstore = irds.docs_store()
def add_text_function_docids(res):
assert 'docno' in res, "requires docno column"
res = res.copy()
docids = res.docno.values.tolist()
did2idxs = defaultdict(list)
for i, did in enumerate(docids):
did2idxs[did].append(i)
new_columns = {f: [None] * len(docids) for f in metadata}
for doc in docstore.get_many_iter(docids):
for didx in did2idxs[doc.doc_id]:
for f, fidx in field_idx:
new_columns[f][didx] = doc[fidx]
for k, v in new_columns.items():
res[k] = v
return res
return add_text_function_docids
def scorer(*args, **kwargs) -> Transformer:
    """
    This allows scoring of the documents with respect to a query, without creating an index first.
    This is an alias to pt.TextScorer(). Internally, a Terrier memory index is created, before being
    used for scoring.

    Arguments:
        body_attr(str): what dataframe input column contains the text of the document. Default is `"body"`.
        wmodel(str): name of the weighting model to use for scoring.
        background_index(index_like): An optional background index to use for collection statistics. If a weighting
            model such as BM25 or TF_IDF or PL2 is used without setting the background_index, the background statistics
            will be calculated from the dataframe, which is ususally not the desired behaviour.

    Example::

        df = pd.DataFrame(
            [
                ["q1", "chemical reactions", "d1", "professor protor poured the chemicals"],
                ["q1", "chemical reactions", "d2", "chemical brothers turned up the beats"],
            ], columns=["qid", "query", "docno", "text"])
        textscorerTf = pt.text.scorer(body_attr="text", wmodel="Tf")
        rtr = textscorerTf.transform(df)
        # rtr will have a score for each document for the query "chemical reactions" based on the provided document contents
        # both attain score 1, as, after stemming, they both contain one occurrence of the query term 'chemical'
        # ["q1", "chemical reactions", "d1", "professor protor poured the chemicals", 0, 1]
        # ["q1", "chemical reactions", "d2", "chemical brothers turned up the beats", 0, 1]

    For calculating the scores of documents using any weighting model with the concept of IDF, it is strongly advised to make use of
    an existing Terrier index for background statistics. Without a background index, IDF will be calculated based on the supplied
    dataframe (for models such as BM25, this can lead to negative scores)::

        textscorerTfIdf = pt.text.scorer(body_attr="text", wmodel="TF_IDF", background_index=index)

    """
    import pyterrier as pt
    # thin alias: all arguments are forwarded unchanged to TextScorer
    return pt.batchretrieve.TextScorer(*args, **kwargs)
def sliding(text_attr='body', length=150, stride=75, join=' ', prepend_attr='title', tokenizer=None, **kwargs) -> Transformer:
    r"""
    A useful transformer for splitting long documents into smaller passages within a pipeline. This applies a *sliding* window over the
    text, where each passage is the give number of tokens long. Passages can overlap, if the stride is set smaller than the length. In
    applying this transformer, docnos are altered by adding '%p' and a passage number. The original scores for each document can be recovered
    by aggregation functions, such as `max_passage()`.

    For the puposes of obtaining passages of a given length, the tokenisation can be controlled. By default, tokenisation takes place by splitting
    on space, i.e. based on the Python regular expression ``re.compile(r'\s+')``. However, more fine-grained tokenisation can applied by passing
    an object matching the HuggingFace Transformers `Tokenizer API <https://huggingface.co/docs/transformers/main/en/main_classes/tokenizer#transformers.PreTrainedTokenizer>`_
    as the `tokenizer` kwarg argument. In short, the `tokenizer` object must have a `.tokenize(str) -> list[str]` method and
    `.convert_tokens_to_string(list[str]) -> str` for detokenisation.

    Parameters:
        text_attr(str): what is the name of the dataframe attribute containing the main text of the document to be split into passages.
            Default is 'body'.
        length(int): how many tokens in each passage. Default is 150.
        stride(int): how many tokens to advance each passage by. Default is 75.
        join(str): how to join the prepended attribute (e.g. title) to the passage text. Default is a single space.
        prepend_attr(str): whether another document attribute, such as the title of the document, to each passage, following [Dai2019]. Defaults to 'title'.
        title_attr(str): what is the name of the dataframe attribute containing the title the document to be split into passages.
            Default is 'title'. Only used if prepend_title is set to True.
        tokenizer(obj): which model to use for tokenizing. The object must have a `.tokenize(str) -> list[str]` method for tokenization and `.convert_tokens_to_string(list[str]) -> str` for detokenization.
            Default is None. Tokenisation is perfomed by splitting on one-or-more spaces, i.e. based on the Python regular expression ``re.compile(r'\s+')``

    Example::

        pipe = ( pt.BatchRetrieve(index, wmodel="DPH", metadata=["docno", "body"])
            >> pt.text.sliding(length=128, stride=64, prepend_attr=None)
            >> pt.text.scorer(wmodel="DPH")
            >> pt.text.max_passage() )

        # tokenizer model
        from transformers import AutoTokenizer
        tok = AutoTokenizer.from_pretrained("bert-base-uncased")
        pipe = (pt.BatchRetrieve(index, wmodel="DPH", metadata=["docno", "body"])
            >> pt.text.sliding(length=128, stride=64, prepend_attr=None, tokenizer=tok)
            >> pt.text.scorer(wmodel="DPH")
            >> pt.text.max_passage() )

    """
    # deal with older (deprecated) names for attributes
    if 'passage_length' in kwargs:
        length = kwargs.pop('passage_length')
        warn("passage_length should be length.", FutureWarning, 2)
    if 'passage_stride' in kwargs:
        stride = kwargs.pop('passage_stride')
        warn("passage_stride should be stride.", FutureWarning, 2)
    if 'prepend_title' in kwargs:
        warn("prepend_title and title_attr should be replaced with prepend_attr.", FutureWarning, 2)
        if kwargs['prepend_title']:
            prepend_attr = kwargs.pop('title_attr')
        else:
            prepend_attr = None
        del kwargs['prepend_title']

    return SlidingWindowPassager(
        text_attr=text_attr,
        passage_length=length,
        passage_stride=stride,
        prepend_title=prepend_attr is not None,
        title_attr=prepend_attr,
        join=join,  # BUG FIX: was hard-coded to ' ', silently ignoring the join argument
        tokenizer=tokenizer,
        **kwargs
    )
def max_passage() -> Transformer:
    """
    Scores each document based on the maximum score of any constituent passage. Applied after a sliding window transformation
    has been scored.
    """
    return MaxPassage()
def mean_passage() -> Transformer:
    """
    Scores each document based on the mean score of all constituent passages. Applied after a sliding window transformation
    has been scored.
    """
    return MeanPassage()
def first_passage() -> Transformer:
    """
    Scores each document based on score of the first passage of that document. Note that this transformer is rarely used in conjunction with
    the sliding window transformer, as all passages would required to be scored, only for the first one to be used.
    """
    return FirstPassage()
def kmaxavg_passage(k : int) -> Transformer:
    """
    Scores each document based on the average score of the top scoring k passages. Generalises combination of mean_passage()
    and max_passage(). Proposed in [Chen2020].

    Parameters:
        k(int): The number of top-scored passages for each document to use when scoring
    """
    return KMaxAvgPassage(k)
def slidingWindow(sequence : list, winSize : int, step : int) -> list:
    """
    For the specified sequence, break into sliding windows of size winSize,
    stepping forward by the specified amount each time.

    Only complete windows are returned: a trailing partial window (or a
    sequence shorter than winSize) yields no window, matching the previous
    more_itertools.windowed-based implementation, which padded the final
    window with None and filtered it out.
    """
    if len(sequence) < winSize:
        return []
    # start positions 0, step, 2*step, ... as long as a full window fits
    return [tuple(sequence[i:i + winSize]) for i in range(0, len(sequence) - winSize + 1, step)]
def snippets(
        text_scorer_pipe : Transformer,
        text_attr : str = "text",
        summary_attr : str = "summary",
        num_psgs : int = 5,
        joinstr : str = '...') -> Transformer:
    """
    Applies query-biased summarisation (snippet), by applying the specified text scoring pipeline.

    Parameters:
        text_scorer_pipe(Transformer): the pipeline for scoring passages in response to the query. Normally this applies passaging.
        text_attr(str): what is the name of the attribute that contains the text of the document
        summary_attr(str): what is the name of the attribute that should contain the query-biased summary for that document
        num_psgs(int): how many passages to select for the summary of each document
        joinstr(str): how to join passages for a given document together

    Example::

        # retrieve documents with text
        br = pt.BatchRetrieve(index, metadata=['docno', 'text'])

        # use Tf as a passage scorer on sliding window passages
        psg_scorer = (
            pt.text.sliding(text_attr='text', length=15, prepend_attr=None)
            >> pt.text.scorer(body_attr="text", wmodel='Tf', takes='docs')
        )

        # use psg_scorer for performing query-biased summarisation on docs retrieved by br
        retr_pipe = br >> pt.text.snippets(psg_scorer)

    """
    import pyterrier as pt
    # score passages per (query, document) pair: temporarily fuse qid and docno
    # into a pseudo-qid so that the % cut-off applies per document
    tsp = (
        pt.apply.rename({'qid': 'oldqid'})
        >> pt.apply.qid(lambda row: row['oldqid'] + '-' + row['docno'])
        >> (text_scorer_pipe % num_psgs)
        >> pt.apply.qid(drop=True)
        >> pt.apply.rename({'oldqid': 'qid'})
    )

    def _qbsjoin(docres):
        if len(docres) == 0:
            docres[summary_attr] = pd.Series(dtype='str')
            return docres

        psgres = tsp(docres)
        if len(psgres) == 0:
            # best-effort: no passages could be scored, emit empty summaries
            print('no passages found in %d documents for query %s' % (len(docres), docres.iloc[0].query))
            docres = docres.copy()
            docres[summary_attr] = ""
            return docres

        # recover the original docno from the "docno%pN" passage ids, then join
        # the selected passages for each document into a single summary string
        psgres[["olddocno", "pid"]] = psgres.docno.str.split("%p", expand=True)
        newdf = psgres.groupby(['qid', 'olddocno'])[text_attr].agg(joinstr.join).reset_index() \
            .rename(columns={text_attr: summary_attr, 'olddocno': 'docno'})

        return docres.merge(newdf, on=['qid', 'docno'], how='left')

    return pt.apply.generic(_qbsjoin)
class DePassager(Transformer):
    """
    Base class for transformers that fold passage-level scores back into
    document-level scores. Input docnos are expected in "docno%pN" form (as
    produced by SlidingWindowPassager); the aggregation strategy is selected
    by the ``agg`` constructor argument: 'max', 'first', 'mean' or 'kmaxavg'
    (the latter reads self.K, which is set by the KMaxAvgPassage subclass).
    """

    def __init__(self, agg="max", **kwargs):
        super().__init__(**kwargs)
        self.agg = agg

    def transform(self, topics_and_res):
        topics_and_res = topics_and_res.copy()
        # recover the original docno and the passage id from "docno%pN"
        topics_and_res[["olddocno", "pid"]] = topics_and_res.docno.str.split("%p", expand=True)
        if self.agg == 'max':
            groups = topics_and_res.groupby(['qid', 'olddocno'])
            group_max_idx = groups['score'].idxmax()
            rtr = topics_and_res.loc[group_max_idx, :]
            rtr = rtr.drop(columns=['docno', 'pid']).rename(columns={"olddocno": "docno"})
        if self.agg == 'first':
            # pids must be compared numerically, not lexicographically ('10' < '2')
            topics_and_res.pid = topics_and_res.pid.astype(int)
            groups = topics_and_res.groupby(['qid', 'olddocno'])
            group_first_idx = groups['pid'].idxmin()
            rtr = topics_and_res.loc[group_first_idx, ]
            rtr = rtr.drop(columns=['docno', 'pid']).rename(columns={"olddocno": "docno"})
        if self.agg == 'mean':
            rtr = topics_and_res.groupby(['qid', 'olddocno'])['score'].mean().reset_index().rename(columns={'olddocno': 'docno'})
            from .model import query_columns
            # the groupby dropped the query columns; merge them back in
            rtr = rtr.merge(topics_and_res[query_columns(topics_and_res)].drop_duplicates(), on='qid')
        if self.agg == 'kmaxavg':
            # mean of the top-K passage scores per document (K set by subclass)
            rtr = topics_and_res.groupby(['qid', 'olddocno'])['score'].apply(lambda ser: ser.nlargest(self.K).mean()).reset_index().rename(columns={'olddocno': 'docno'})
            from .model import query_columns
            # the groupby dropped the query columns; merge them back in
            rtr = rtr.merge(topics_and_res[query_columns(topics_and_res)].drop_duplicates(), on='qid')
        if "docid" in rtr.columns:
            # passage-level docids are meaningless at document level
            rtr = rtr.drop(columns=['docid'])
        rtr = add_ranks(rtr)
        return rtr
class KMaxAvgPassage(DePassager):
    """
    Scores each document as the average of its top-K scoring passages.

    See ICIP at TREC-2020 Deep Learning Track, X.Chen et al. Proc. TREC 2020.

    Usage:
        X >> SlidingWindowPassager() >> Y >> KMaxAvgPassage(2)
    where X is some kind of model for obtaining the text of documents and Y is a text scorer, such as BERT or ColBERT
    """

    def __init__(self, K, **kwargs):
        kwargs["agg"] = "kmaxavg"
        # K is read by DePassager.transform() when aggregating
        self.K = K
        super().__init__(**kwargs)
class MaxPassage(DePassager):
    """Scores each document by the maximum score of its passages."""

    def __init__(self, **kwargs):
        kwargs["agg"] = "max"
        super().__init__(**kwargs)
class FirstPassage(DePassager):
    """Scores each document by the score of its first passage."""

    def __init__(self, **kwargs):
        kwargs["agg"] = "first"
        super().__init__(**kwargs)
class MeanPassage(DePassager):
    """Scores each document by the mean score of its passages."""

    def __init__(self, **kwargs):
        kwargs["agg"] = "mean"
        super().__init__(**kwargs)
class SlidingWindowPassager(Transformer):
    """
    Splits each document's text into (possibly overlapping) passages using a
    sliding window of ``passage_length`` tokens advanced by ``passage_stride``.
    One output row is produced per passage, with the docno suffixed by '%p<N>'.
    Optionally prepends another attribute (e.g. the title) to each passage.
    """

    def __init__(self, text_attr='body', title_attr='title', passage_length=150, passage_stride=75, join=' ', prepend_title=True, tokenizer=None, **kwargs):
        super().__init__(**kwargs)
        self.text_attr = text_attr
        self.title_attr = title_attr
        self.passage_length = passage_length
        self.passage_stride = passage_stride
        self.join = join
        self.prepend_title = prepend_title
        self.tokenizer = tokenizer
        # a custom tokenizer must provide .tokenize() and .convert_tokens_to_string();
        # otherwise fall back to whitespace splitting/joining
        if self.tokenizer is not None:
            self.tokenize = self.tokenizer.tokenize
            self.detokenize = self.tokenizer.convert_tokens_to_string
        else:
            self.tokenize = re.compile(r"\s+").split
            self.detokenize = ' '.join

    def _check_columns(self, topics_and_res):
        # fail fast with a helpful message if required input columns are absent
        if self.text_attr not in topics_and_res.columns:
            raise KeyError("%s is a required input column, but not found in input dataframe. Found %s" % (self.text_attr, str(list(topics_and_res.columns))))
        if self.prepend_title and self.title_attr not in topics_and_res.columns:
            raise KeyError("%s is a required input column, but not found in input dataframe. Set prepend_title=False to disable its use. Found %s" % (self.title_attr, str(list(topics_and_res.columns))))
        if "docno" not in topics_and_res.columns:
            raise KeyError("%s is a required input column, but not found in input dataframe. Found %s" % ("docno", str(list(topics_and_res.columns))))

    def transform(self, topics_and_res):
        # validate input columns
        self._check_columns(topics_and_res)
        # ranked results (with qid) and plain document frames are passaged differently
        if "qid" in topics_and_res.columns:
            return self.applyPassaging(topics_and_res, labels="label" in topics_and_res.columns)
        return self.applyPassaging_no_qid(topics_and_res)

    def applyPassaging_no_qid(self, df):
        """Passage a docno/text dataframe that has no query (qid) context."""
        rows = []
        for row in df.itertuples():
            row = row._asdict()
            toks = self.tokenize(row[self.text_attr])
            if len(toks) < self.passage_length:
                # short document: emit a single passage "%p0"
                row['docno'] = row['docno'] + "%p0"
                row[self.text_attr] = self.detokenize(toks)
                if self.prepend_title:
                    row[self.text_attr] = str(row[self.title_attr]) + self.join + row[self.text_attr]
                    del row[self.title_attr]
                rows.append(row)
            else:
                for i, passage in enumerate(slidingWindow(toks, self.passage_length, self.passage_stride)):
                    newRow = row.copy()
                    newRow['docno'] = row['docno'] + "%p" + str(i)
                    newRow[self.text_attr] = self.detokenize(passage)
                    if self.prepend_title:
                        newRow[self.text_attr] = str(row[self.title_attr]) + self.join + newRow[self.text_attr]
                        del newRow[self.title_attr]
                    rows.append(newRow)
        return pd.DataFrame(rows)

    def applyPassaging(self, df, labels=True):
        """
        Passage a results dataframe, preserving qid/query and copying any
        score/rank columns through to each passage row. The ``labels`` argument
        is retained for backward compatibility but is no longer used.
        """
        if len(df) == 0:
            return pd.DataFrame(columns=['qid', 'query', 'docno', self.text_attr, 'score', 'rank'])
        # score/rank are copied through from the source document row when present
        copy_columns = [col for col in ("score", "rank") if col in df.columns]
        newRows = []
        from pyterrier import tqdm
        # NB: previously the string 'passsaging' (sic) was passed as tqdm's
        # iterable argument; only total/desc are intended here
        with tqdm(total=len(df), desc='passaging', leave=False) as pbar:
            for index, row in df.iterrows():
                pbar.update(1)
                toks = self.tokenize(row[self.text_attr])
                if len(toks) < self.passage_length:
                    # short document: emit a single passage "%p0"
                    newRow = row.copy()
                    newRow['docno'] = row['docno'] + "%p0"
                    newRow[self.text_attr] = self.detokenize(toks)
                    if self.prepend_title:
                        # drop the title column, then prepend its text to the passage
                        newRow.drop(labels=[self.title_attr], inplace=True)
                        newRow[self.text_attr] = str(row[self.title_attr]) + self.join + newRow[self.text_attr]
                    for col in copy_columns:
                        newRow[col] = row[col]
                    newRows.append(newRow)
                else:
                    for i, passage in enumerate(slidingWindow(toks, self.passage_length, self.passage_stride)):
                        newRow = row.copy()
                        newRow['docno'] = row['docno'] + "%p" + str(i)
                        newRow[self.text_attr] = self.detokenize(passage)
                        if self.prepend_title:
                            newRow.drop(labels=[self.title_attr], inplace=True)
                            newRow[self.text_attr] = str(row[self.title_attr]) + self.join + newRow[self.text_attr]
                        for col in copy_columns:
                            newRow[col] = row[col]
                        newRows.append(newRow)
        newDF = pd.DataFrame(newRows)
        # normalise missing values in the key textual columns to empty strings
        # (assignment form avoids pandas' deprecated chained inplace fillna)
        newDF['query'] = newDF['query'].fillna('')
        newDF[self.text_attr] = newDF[self.text_attr].fillna('')
        newDF['qid'] = newDF['qid'].fillna('')
        newDF.reset_index(inplace=True, drop=True)
        return newDF