pt.apply - Custom Transformers
PyTerrier pipelines are easily extensible through the use of apply functions. These are inspired by the Pandas apply() method, which allows a function to be applied to each row of a dataframe. In PyTerrier, by contrast, the apply methods construct pipeline transformers that address common use cases, using custom functions (including Python lambda functions) to transform inputs.
The table below lists the main classes of transformation in the PyTerrier data
model, as well as the appropriate apply method to use in each case. In general,
if there is a one-to-one mapping between the input and the output, then the specific
pt.apply methods should be used (i.e. query(), doc_score(), doc_features()).
If the cardinality of the dataframe changes through applying the transformer,
then generic() or by_query() must be applied.
In particular, through the use of pt.apply.doc_score(), any reranking method that can be expressed
as a function of the text of the query and the text of the document can be used as a reranker
in a PyTerrier pipeline.
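As a minimal sketch of such a scoring function, the example below computes the fraction of query terms that also occur in the document text. It assumes each row carries a "text" column (a hypothetical column name here; it would have to be supplied by an earlier pipeline stage), and it uses a plain dict to stand in for one row of a ranked-documents dataframe.

```python
def overlap_score(row):
    """Fraction of distinct query terms that also occur in the document text."""
    query_terms = set(row["query"].lower().split())
    doc_terms = set(row["text"].lower().split())
    if not query_terms:
        return 0.0
    return len(query_terms & doc_terms) / len(query_terms)


# A plain dict stands in for one row of a ranked-documents dataframe.
# The "text" column is an assumption of this sketch.
example_row = {
    "qid": "q1",
    "query": "chemical reactions",
    "docno": "d1",
    "text": "Chemical bonds form during reactions",
}
example_score = overlap_score(example_row)
```

Because the function only uses row["key"] access, it should work whether the row arrives as a dict or a pd.Series, and could then be wrapped as pt.apply.doc_score(overlap_score).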
Each apply method takes as input a function (e.g. a function name, or a lambda expression). Objects that are passed to the function vary in terms of the type of the input dataframe (queries or ranked documents), whether they represent one row (pd.Series or dictionary) or many rows (pd.DataFrame or list of dictionaries), and also vary in terms of what should be returned by the function.
Hint
It is usually a good idea to validate the inputs to make sure they contain the values you expect. See Input Validation for more details.
| Input | Output | Cardinality | Example | Example apply | Function Input type | Function Return type |
|---|---|---|---|---|---|---|
| Q | Q | 1 to 1 | Query rewriting | pt.apply.query() | row of one query (pd.Series/dict) | str |
| Q x D | Q x D | 1 to 1 | Re-ranking | pt.apply.doc_score() | row of one document (pd.Series/dict) | float |
| Q x D | Q x Df | 1 to 1 | Feature scoring | pt.apply.doc_features() | row of one document (pd.Series/dict) | numpy array |
| Q x D | Q | N to 1 | Query expansion | pt.apply.generic() | entire (dataframe/iter-dict) | entire dataframe / iterable of dict |
| Q x D | Q | N to 1 | Query expansion | pt.apply.by_query() | (dataframe/iter-dict) for 1 query | dataframe / iterable of dict for 1 query |
| Q | Q x D | 1 to N | Retrieval | pt.apply.generic() | entire (dataframe/iter-dict) | entire dataframe |
| Q | Q x D | 1 to N | Retrieval | pt.apply.by_query() | (dataframe/iter-dict) for 1 query | dataframe / iterable of dict for 1 query |
| D | None | N to 0 | Indexing | pt.apply.indexer() | iterable dictionary | anything |
In each case, the result from calling a pyterrier.apply method is another PyTerrier transformer
(i.e. extends pt.Transformer), which can be used for experimentation or combined with other
PyTerrier transformers through the standard PyTerrier operators.
If verbose=True is passed to any pyterrier apply method (except generic()), then a TQDM progress bar will be shown as the transformer is applied.
Example
In the following, we create a document re-ranking transformer that increases the score of documents by 10% if their url attribute contains “https:”
>>> df = pd.DataFrame([["q1", "d1", "https://www.example.com", 1.0, 1]], columns=["qid", "docno", "url", "score", "rank"])
>>> df
qid docno url score rank
0 q1 d1 https://www.example.com 1.0 1
>>>
>>> http_boost = pt.apply.doc_score(lambda row: row["score"] * 1.1 if "https:" in row["url"] else row["score"])
>>> http_boost(df)
qid docno url score rank
0 q1 d1 https://www.example.com 1.1 0
We can combine this pt.apply.doc_score() transformer into a re-ranking pipeline using the >> operator:
pipeline = bm25 >> http_boost
Further examples are shown for each apply method below.
Apply Methods
- pyterrier.apply.query(fn, *args, required_columns=['qid', 'query'], label=None, **kwargs)
Create a transformer that takes as input a query, and applies a supplied function to compute a new query formulation.
The supplied function is called once for each query, and must return a string containing the new query formulation. Each time it is called, the function is supplied with a Pandas Series or dict representing the attributes of the query. The particular type of the input is not controlled by the implementor, so the function should be written to support both, e.g. using row["key"] notation and not the row.key notation that is only supported by a Series.
The previous query formulation is saved in the "query_0" column. If a later pipeline stage should be executed on the previous query formulation, a pt.rewrite.reset() transformer can be applied.
- Parameters:
fn (Callable[[Series|Dict[str,Any]],str]) – the function to apply to each row. It must return a string containing the new query formulation.
required_columns (List[str] |None) – The list of columns that must be present in the input dataframe. Defaults to ['qid', 'query'].
verbose – if set to True, a TQDM progress bar will be displayed
label (str|None) – Optional label for the schematic representation of this transformer
- Return type:
pt.Transformer
Examples:
# this will remove pre-defined stopwords from the query
stops = set(["and", "the"])

# a naive function to remove stopwords - takes as input a Pandas Series or dict, and returns a string
def _remove_stops(q):
    terms = q["query"].split(" ")
    terms = [t for t in terms if t not in stops]
    return " ".join(terms)

# a query rewriting transformer that applies _remove_stops to each row of an input dataframe
p1 = pt.apply.query(_remove_stops) >> pt.terrier.Retriever(index, wmodel="DPH")

# an equivalent query rewriting transformer using an anonymous lambda function
p2 = pt.apply.query(
    lambda q: " ".join([t for t in q["query"].split(" ") if t not in stops])
) >> pt.terrier.Retriever(index, wmodel="DPH")
In both of the example pipelines above (p1 and p2), the exact topics are not known until the pipeline is invoked, e.g. by using p1.transform(topics) on a topics dataframe, or within a pt.Experiment(). When the pipeline is invoked, the specified function (_remove_stops in the case of p1) is called for each row of the input dataframe (becoming the q function argument).
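Another rewriting function might normalise the query text before retrieval. The sketch below lowercases the query and replaces punctuation with spaces; the function name normalise_query and the sample query are illustrative only, and a dict stands in for the query row.

```python
import re


def normalise_query(q):
    """Lowercase the query and replace punctuation with single spaces."""
    text = re.sub(r"[^\w\s]", " ", q["query"].lower())
    # collapse runs of whitespace left behind by the substitution
    return " ".join(text.split())


# A dict stands in for one row of a topics dataframe.
rewritten = normalise_query({"qid": "q1", "query": "What's   the BM25 formula?"})
# rewritten is "what s the bm25 formula"
```

Such a function could be passed to pt.apply.query() in the same way as _remove_stops above.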
- pyterrier.apply.doc_score(fn, *args, required_columns=['qid', 'query', 'docno'], batch_size=None, label=None, **kwargs)
Create a transformer that takes as input a ranked documents dataframe, and applies a supplied function to compute a new score. Ranks are automatically computed. doc_score() can operate row-wise, or batch-wise, depending on whether batch_size is set.
In row-wise mode, the supplied function is called once for each document, and must return a float containing the new score for that document. Each time it is called, the function is supplied with a Pandas Series or dict representing the attributes of the query and document.
- Parameters:
fn (Callable[[Series|Dict[str,Any]],float] |Callable[[DataFrame],Sequence[float]]) – the function to apply to each row
batch_size – How many documents to operate on at once (batch-wise). If None, operates row-wise
required_columns (List[str] |None) – If provided, should be a list of columns that must be present in the input dataframe. Defaults to ['qid', 'query', 'docno'].
verbose – if set to True, a TQDM progress bar will be displayed
label (str|None) – Optional label for the schematic representation of this transformer
- Return type:
pt.Transformer
Example (Row-wise):
# this transformer will subtract 5 from the score of each document
# doc["score"] works for both a dict and Series
p = pt.terrier.Retriever(index, wmodel="DPH") >> pt.apply.doc_score(lambda doc: doc["score"] - 5)
Can be used in a batch-wise manner, which is particularly useful for applying neural models. In this case, the scoring function receives a dataframe, rather than a single row:
def _doclen(df):
    # returns series of lengths
    return df.text.str.len()

pipe = pt.terrier.Retriever(index) >> pt.apply.doc_score(_doclen, batch_size=128)
Can also be used to create individual features that are combined using the ** feature-union operator:
pipeline = bm25 >> ( some_features ** pt.apply.doc_score(_doclen) )
- pyterrier.apply.doc_features(fn, *args, required_columns=['qid', 'query', 'docno'], label=None, **kwargs)
Create a transformer that takes as input a ranked documents dataframe, and applies the supplied function to each document to compute feature scores.
The supplied function is called once for each document, and must return a 1D numpy array each time. Each time it is called, the function is supplied with a Pandas Series, or a dictionary, representing the attributes of the query and document. The particular type of the input is not controlled by the implementor, so the function should be written to support both, e.g. using row["key"] notation and not the row.key notation that is only supported by a Series.
- Parameters:
fn (Callable[[Series|Dict[str,Any]],ndarray[Any,dtype[Any]]]) – the function to apply to each row. It must return a 1D numpy array
required_columns (List[str] |None) – The list of columns that must be present in the input dataframe.
verbose – if set to True, a TQDM progress bar will be displayed
label (str|None) – Optional label for the schematic representation of this transformer
- Return type:
pt.Transformer
Example:
# this transformer will compute the number of characters and the number of words in each retrieved document,
# using the contents of the document obtained from the MetaIndex
import numpy as np

def _features(row):
    docid = row["docid"]
    content = index.getMetaIndex().getItem("text", docid)
    f1 = len(content)
    f2 = len(content.split(" "))
    return np.array([f1, f2])

p = pt.terrier.Retriever(index, wmodel="BM25") >> pt.apply.doc_features(_features)
NB: If you only want to calculate a single feature to add to existing features, it is better to use pt.apply.doc_score() and the ** feature union operator:
pipeline = bm25 >> ( some_features ** pt.apply.doc_score(one_feature) )
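When the document text is already available in the row, a feature function does not need to consult the index. The following sketch assumes a "text" column is present (an assumption of this example, not a requirement of doc_features()) and returns three features as a 1D numpy array.

```python
import numpy as np


def text_features(row):
    """Return [character count, word count, query-term matches] as a 1D array."""
    text = row["text"]
    query_terms = set(row["query"].lower().split())
    doc_terms = text.lower().split()
    matches = sum(1 for t in doc_terms if t in query_terms)
    return np.array([len(text), len(doc_terms), matches], dtype=float)


# A dict stands in for one row; the "text" column is assumed for this sketch.
features = text_features({
    "qid": "q1",
    "query": "terrier search",
    "docno": "d1",
    "text": "Terrier is a search engine",
})
```

A function of this shape could be passed directly to pt.apply.doc_features(), since it uses only row["key"] access and always returns an array of the same length.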
- pyterrier.apply.indexer(fn, required_columns=None, *, label=None, **kwargs)
Create an instance of pt.Indexer using a function that takes as input an iterable of dictionaries (iter-dict).
The supplied function is called once. It may optionally return something (typically a reference to the “index”).
- Parameters:
fn (Callable[[Iterable[Dict[str,Any]]],Any]) – the function that consumes documents as IterDicts.
required_columns (List[str] |None) – If provided, should be a list of columns that must be present in the input IterDicts.
label (str|None) – Optional label for the schematic representation of this transformer
- Return type:
pt.Indexer
Example:
# make a pt.Indexer that returns the number of documents consumed
def _counter(iter_dict):
    count = 0
    for d in iter_dict:
        count += 1
    return count

indexer = pt.apply.indexer(_counter)
rtr = indexer.index([{'docno': 'd1'}, {'docno': 'd2'}])
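An indexing function can also build and return a data structure. The sketch below builds a toy in-memory inverted index from an iter-dict; the "text" field and the function name build_inverted_index are assumptions made for illustration, and the result is in no way a Terrier index.

```python
from collections import defaultdict


def build_inverted_index(iter_dict):
    """Consume documents and map each term to the set of docnos containing it."""
    postings = defaultdict(set)
    for doc in iter_dict:
        for term in doc["text"].lower().split():
            postings[term].add(doc["docno"])
    return dict(postings)


# Hypothetical documents; the "text" field is assumed for this sketch.
docs = [
    {"docno": "d1", "text": "dogs chase cats"},
    {"docno": "d2", "text": "cats sleep"},
]
toy_index = build_inverted_index(iter(docs))
```

Since the supplied function may return something, wrapping it as pt.apply.indexer(build_inverted_index) would be expected to hand this dictionary back from index(), in the same way that _counter returns a count above.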
- pyterrier.apply.rename(columns, *, errors='raise', label=None)
Creates a transformer that renames columns in a dataframe.
- Parameters:
columns (Dict[str,str]) – A dictionary mapping from old column name to new column name
errors (Literal['raise','ignore']) – Maps to the df.rename() errors kwarg - defaults to 'raise', alternatively can be 'ignore'
label (str|None) – Optional label for the schematic representation of this transformer
- Return type:
pt.Transformer
Example:
pipe = pt.terrier.Retriever(index, metadata=["docno", "body"]) >> pt.apply.rename({'body':'text'})
- pyterrier.apply.generic(fn, *args, batch_size=None, iter=False, transform_outputs=None, label=None, **kwargs)
Create a transformer that changes the input dataframe to another dataframe in an unspecified way.
The supplied function is called once for an entire result set as a dataframe or iter-dict (which may contain one or more queries and one or more documents). Each time it should return a new dataframe. The returned dataframe (or yielded row) should abide by the general PyTerrier Data Model, for instance updating the rank column if the scores are amended.
- Parameters:
fn (Callable[[DataFrame],DataFrame] |Callable[[Iterable[Dict[str,Any]]],Iterable[Dict[str,Any]]]) – the function to apply to each result set
batch_size (int|None) – whether to apply fn on batches of rows or all that are received
required_columns – If provided, should be a list of columns that must be present in the input dataframe.
verbose – Whether to display a progress bar over batches (only used if batch_size is set, and iter is not set).
iter (bool) – Whether to use the iter-dict API - if so, then fn receives an iterable, and returns an iterable.
transform_outputs (Callable[[List[str]],List[str]] |None) – Used to support inspection. If provided, should be a function (e.g. lambda) that takes the input columns as an argument, and returns the list of columns that the transformer will output. This need only be set if you need inspectability and iter=True, or your transformer doesn't respond well to being inspected by empty dataframes.
label (str|None) – Optional label for the schematic representation of this transformer
- Return type:
pt.Transformer
Example (dataframe):
# this pipeline would remove all but the first two documents from a result set
pipe = pt.terrier.Retriever(index) >> pt.apply.generic(lambda res: res[res["rank"] < 2])
Example (iter-dict):
# this pipeline would similarly remove all but the first two documents from a result set
def _fn(iterdict):
    for result in iterdict:
        if result["rank"] < 2:
            yield result

pipe1 = pt.terrier.Retriever(index) >> pt.apply.generic(_fn, iter=True)

# transform_iter() can also return an iterable, so returning a list is also permissible
pipe2 = pt.terrier.Retriever(index) >> pt.apply.generic(lambda res: [row for row in res if row["rank"] < 2], iter=True)
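The iter-dict form also suits transformations that increase cardinality, such as the 1 to N retrieval case in the table above. The following is a sketch of a toy retriever over a hypothetical in-memory corpus (CORPUS and toy_retrieve are illustrative names): it consumes query rows and yields one result row per matching document, with scores and zero-based ranks filled in so that the output follows the PyTerrier data model.

```python
# Hypothetical in-memory corpus, used only for this sketch.
CORPUS = {
    "d1": "dogs chase cats",
    "d2": "cats sleep all day",
    "d3": "birds sing",
}


def toy_retrieve(queries):
    """Yield one result row per (query, matching document) pair."""
    for q in queries:
        q_terms = set(q["query"].lower().split())
        scored = []
        for docno, text in CORPUS.items():
            score = len(q_terms & set(text.split()))
            if score > 0:
                scored.append((docno, float(score)))
        # highest score first; ties broken by docno for determinism
        scored.sort(key=lambda pair: (-pair[1], pair[0]))
        for rank, (docno, score) in enumerate(scored):
            yield {"qid": q["qid"], "query": q["query"],
                   "docno": docno, "score": score, "rank": rank}


results = list(toy_retrieve([{"qid": "q1", "query": "cats sleep"}]))
```

Wrapped as pt.apply.generic(toy_retrieve, iter=True), a function like this would take the place of a retriever at the start of a pipeline.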
- pyterrier.apply.by_query(fn, *args, batch_size=None, iter=False, verbose=False, transform_outputs=None, label=None, **kwargs)
As pt.apply.generic() except that fn receives a dataframe (or iter-dict) for one query at a time, rather than all results at once. If batch_size is set, fn will receive no more than batch_size documents for any query. The verbose kwarg controls whether to display a progress bar over queries.
- Parameters:
fn (Callable[[DataFrame],DataFrame] |Callable[[Iterable[Dict[str,Any]]],Iterable[Dict[str,Any]]]) – the function to apply to the results of each query. When iter=True, it should return an iterable, such as a generator
batch_size (int|None) – whether to apply fn on batches of rows or all that are received.
required_columns – If provided, should be a list of columns that must be present in the input dataframe.
verbose (bool) – Whether to display a progress bar over batches (only used if batch_size is set, and iter is not set).
iter (bool) – Whether to use the iter-dict API - if so, then fn receives an iterable, and must return an iterable.
transform_outputs (Callable[[List[str]],List[str]] |None) – Used to support inspection. If provided, should be a function (e.g. lambda) that takes the input columns as an argument, and returns the list of columns that the transformer will output. This need only be set if you need inspectability and iter=True, or your transformer doesn't respond well to being inspected by empty dataframes.
label (str|None) – Optional label for the schematic representation of this transformer
- Return type:
pt.Transformer
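Per-query processing is useful when a computation depends on the other results for the same query. As a sketch, the function below applies min-max normalisation to the scores of one query's results in iter-dict form; the name minmax_normalise is illustrative. It returns new dictionaries rather than mutating its input, and leaves ranks untouched because the rescaling preserves the score ordering.

```python
def minmax_normalise(results):
    """Rescale the scores of one query's results into the range [0, 1]."""
    rows = list(results)
    if not rows:
        return []
    scores = [r["score"] for r in rows]
    lo, hi = min(scores), max(scores)
    span = hi - lo
    # if all scores are equal there is nothing to rescale, so use 0.0
    return [dict(r, score=(r["score"] - lo) / span if span else 0.0) for r in rows]


# Results for a single query, as by_query would supply them in iter-dict form.
one_query = [
    {"qid": "q1", "docno": "d1", "score": 10.0, "rank": 0},
    {"qid": "q1", "docno": "d2", "score": 6.0, "rank": 1},
    {"qid": "q1", "docno": "d3", "score": 2.0, "rank": 2},
]
normalised = minmax_normalise(iter(one_query))
```

This could be wrapped as pt.apply.by_query(minmax_normalise, iter=True); with pt.apply.generic() the minimum and maximum would instead be taken over the results of all queries at once.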
Making New Columns and Dropping Columns
It's also possible to construct a transformer that makes a new column on a row-wise basis by directly naming the new column in pt.apply.
For instance, if the column you are creating is called rank_2, it might be created as follows:
pipe = pt.terrier.Retriever(index) >> pt.apply.rank_2(lambda row: row["rank"] * 2)
To create a transformer that drops a column, you can instead pass drop=True as a kwarg:
pipe = pt.terrier.Retriever(index, metadata=["docno", "text"]) >> pt.text.scorer() >> pt.apply.text(drop=True)