# Source code for pyterrier.apply

from typing import Callable, Any, Dict, Union, Sequence
from .apply_base import ApplyDocumentScoringTransformer, ApplyQueryTransformer, ApplyDocFeatureTransformer, ApplyForEachQuery, ApplyGenericTransformer, Transformer
from nptyping import NDArray
import pandas as pd

def _bind(instance, func, as_name=None):
    """
    Bind the function *func* to *instance*, with either provided name *as_name*
    or the existing name of *func*. The provided *func* should accept the 
    instance as the first argument, i.e. "self".
    """
    if as_name is None:
        as_name = func.__name__
    bound_method = func.__get__(instance, instance.__class__)
    setattr(instance, as_name, bound_method)
    return bound_method

def query(fn : Callable[[pd.Series], str], *args, **kwargs) -> Transformer:
    """
    Create a transformer that takes as input a query, and applies a supplied function
    to compute a new query formulation.

    The supplied function is called once for each query, and must return a string
    containing the new query formulation. Each time it is called, the function is
    supplied with a Panda Series representing the attributes of the query.

    The previous query formulation is saved in the "query_0" column. If a later
    pipeline stage is intended to resort to be executed on the previous query
    formulation, a `pt.rewrite.reset()` transformer can be applied.

    Arguments:
        fn(Callable): the function to apply to each row. It must return a string containing the new query formulation.
        verbose(bool): if set to True, a TQDM progress bar will be displayed

    Examples::

        # this will remove pre-defined stopwords from the query
        stops=set(["and", "the"])

        # a naive function to remove stopwords - takes as input a Pandas Series, and returns a string
        def _remove_stops(q):
            terms = q["query"].split(" ")
            terms = [t for t in terms if not t in stops ]
            return " ".join(terms)

        # a query rewriting transformer that applies the _remove_stops to each row of an input dataframe
        p1 = pt.apply.query(_remove_stops) >> pt.BatchRetrieve(index, wmodel="DPH")

        # an equivalent query rewriting transformer using an anonymous lambda function
        p2 = pt.apply.query(
            lambda q : " ".join([t for t in q["query"].split(" ") if t not in stops ])
        ) >> pt.BatchRetrieve(index, wmodel="DPH")

    In both of the example pipelines above (`p1` and `p2`), the exact topics are not
    known until the pipeline is invoked, e.g. by using `p1.transform(topics)` on a
    topics dataframe, or within a `pt.Experiment()`. When the pipeline is invoked,
    the specified function (`_remove_stops` in the case of `p1`) is called for
    **each** row of the input dataframe (becoming the `q` function argument).
    """
    return ApplyQueryTransformer(fn, *args, **kwargs)
def doc_score(fn : Union[Callable[[pd.Series], float], Callable[[pd.DataFrame], Sequence[float]]], *args, batch_size=None, **kwargs) -> Transformer:
    """
    Create a transformer that takes as input a ranked documents dataframe, and applies
    a supplied function to compute a new score. Ranks are automatically computed.

    doc_score() can operate row-wise, or batch-wise, depending on whether batch_size
    is set. The supplied function is called once for each document, and must return a
    float containing the new score for that document. Each time it is called, the
    function is supplied with a Panda Series representing the attributes of the query
    and document.

    Arguments:
        fn(Callable): the function to apply to each row
        batch_size(int or None): How many documents to operate on at once (batch-wise). If None, operates row-wise
        verbose(bool): if set to True, a TQDM progress bar will be displayed

    Example (Row-wise)::

        # this transformer will subtract 5 from the score of each document
        p = pt.BatchRetrieve(index, wmodel="DPH") >> pt.apply.doc_score(lambda doc : doc["score"] -5)

    Can be used in batch-wise manner, which is particularly useful for applying neural
    models. In this case, the scoring function receives a dataframe, rather than a
    single row::

        def _doclen(df):
            # returns series of lengths
            return df.text.str.len()

        pipe = pt.BatchRetrieve(index) >> pt.apply.doc_score(_doclen, batch_size=128)
    """
    return ApplyDocumentScoringTransformer(fn, *args, batch_size=batch_size, **kwargs)
def doc_features(fn : Callable[[pd.Series], NDArray[Any]], *args, **kwargs) -> Transformer:
    """
    Create a transformer that takes as input a ranked documents dataframe, and applies
    the supplied function to each document to compute feature scores.

    The supplied function is called once for each document, must each time return a 1D
    numpy array. Each time it is called, the function is supplied with a Panda Series
    representing the attributes of the query and document.

    Arguments:
        fn(Callable): the function to apply to each row
        verbose(bool): if set to True, a TQDM progress bar will be displayed

    Example::

        # this transformer will compute the character and number of word in each document retrieved
        # using the contents of the document obtained from the MetaIndex

        def _features(row):
            docid = row["docid"]
            content = index.getMetaIndex().getItem("text", docid)
            f1 = len(content)
            f2 = len(content.split(" "))
            return np.array([f1, f2])

        p = pt.BatchRetrieve(index, wmodel="BM25") >> pt.apply.doc_features(_features )
    """
    return ApplyDocFeatureTransformer(fn, *args, **kwargs)
def rename(columns : Dict[str,str], *args, errors='raise', **kwargs) -> Transformer:
    """
    Creates a transformer that renames columns in a dataframe.

    Args:
        columns(dict): A dictionary mapping from old column name to new column name
        errors(str): Maps to df.rename() errors kwarg - default to 'raise', alternatively can be 'ignore'

    Example::

        pipe = pt.BatchRetrieve(index, metadata=["docno", "body"]) >> pt.apply.rename({'body':'text'})
    """
    return ApplyGenericTransformer(lambda df: df.rename(columns=columns, errors=errors), *args, **kwargs)
def generic(fn : Callable[[pd.DataFrame], pd.DataFrame], *args, batch_size=None, **kwargs) -> Transformer:
    """
    Create a transformer that changes the input dataframe to another dataframe in an
    unspecified way.

    The supplied function is called once for an entire result set as a dataframe
    (which may contain one or more queries). Each time it should return a new
    dataframe. The returned dataframe should abide by the general PyTerrier Data
    Model, for instance updating the rank column if the scores are amended.

    Arguments:
        fn(Callable): the function to apply to each row
        batch_size(int or None): whether to apply fn on batches of rows or all that are received
        verbose(bool): Whether to display a progress bar over batches (only used if batch_size is set).

    Example::

        # this transformer will remove all documents at rank greater than 2.

        # this pipeline would remove all but the first two documents from a result set
        pipe = pt.BatchRetrieve(index) >> pt.apply.generic(lambda res : res[res["rank"] < 2])
    """
    return ApplyGenericTransformer(fn, *args, batch_size=batch_size, **kwargs)
def by_query(fn : Callable[[pd.DataFrame], pd.DataFrame], *args, batch_size=None, **kwargs) -> Transformer:
    """
    As `pt.apply.generic()` except that fn receives a dataframe for one query at a
    time, rather than all results at once. If batch_size is set, fn will receive no
    more than batch_size documents for any query. The verbose kwargs controls whether
    to display a progress bar over queries.
    """
    return ApplyForEachQuery(fn, *args, batch_size=batch_size, **kwargs)
class _apply:
    """
    Namespace object exposing the pt.apply.* factory functions as bound methods,
    and turning any other attribute access (e.g. ``pt.apply.somecol``) into a
    column-manipulating transformer via :func:`generic_apply`.
    """

    def __init__(self):
        # bind each module-level factory as an instance method so that
        # pt.apply.query(...), pt.apply.doc_score(...) etc. all work
        _bind(self, lambda self, fn, *args, **kwargs : query(fn, *args, **kwargs), as_name='query')
        _bind(self, lambda self, fn, *args, **kwargs : doc_score(fn, *args, **kwargs), as_name='doc_score')
        _bind(self, lambda self, fn, *args, **kwargs : doc_features(fn, *args, **kwargs), as_name='doc_features')
        _bind(self, lambda self, fn, *args, **kwargs : rename(fn, *args, **kwargs), as_name='rename')
        _bind(self, lambda self, fn, *args, **kwargs : by_query(fn, *args, **kwargs), as_name='by_query')
        _bind(self, lambda self, fn, *args, **kwargs : generic(fn, *args, **kwargs), as_name='generic')

    def __getattr__(self, item):
        # only reached for names NOT bound in __init__; the attribute name
        # becomes the column name operated on by generic_apply
        from functools import partial
        return partial(generic_apply, item)

def generic_apply(name, *args, drop=False, **kwargs) -> Transformer:
    """
    Create a transformer that adds or drops a dataframe column called *name*.

    When ``drop=True``, the returned transformer removes the *name* column.
    Otherwise, the first positional argument must be a function (e.g. a lambda)
    that is applied row-wise to compute the new *name* column.

    Raises:
        ValueError: if ``drop`` is False and no function was supplied.
    """
    if drop:
        return ApplyGenericTransformer(lambda df : df.drop(name, axis=1), *args, **kwargs)

    if len(args) == 0:
        raise ValueError("Must specify a fn, e.g. a lambda")

    fn = args[0]
    # the remaining positional args are intentionally not forwarded
    args = []

    def _new_column(df):
        # result_type='reduce' keeps the per-row results as a single Series
        df[name] = df.apply(fn, axis=1, result_type='reduce')
        return df

    return ApplyGenericTransformer(_new_column, *args, **kwargs)