from pyterrier import Transformer
from typing import Dict, List, Literal, Optional, Tuple, Union, overload
from ._execution import _run_and_evaluate
from ._utils import _restore_state, _save_state
from . import MEASURE_TYPE, MEASURES_TYPE
from ir_measures import Measure
import pandas as pd
import pyterrier as pt
# Types of value that can be assigned to a transformer parameter during a grid scan.
# (The previous definition listed ``str`` twice; the duplicate member was redundant.)
TRANSFORMER_PARAMETER_VALUE_TYPE = Union[str, float, int]
# A single parameter assignment: (transformer, parameter name, value to set).
GRID_SCAN_PARAM_SETTING = Tuple[Transformer, str, TRANSFORMER_PARAMETER_VALUE_TYPE]

# GridSearch(..., return_type="best_setting"): (best metric value, the settings that achieved it).
GRID_SEARCH_RETURN_TYPE_SETTING = Tuple[float, List[GRID_SCAN_PARAM_SETTING]]

# GridSearch(..., return_type="both"): (tuned pipeline, best metric value, the settings that achieved it).
GRID_SEARCH_RETURN_TYPE_BOTH = Tuple[Transformer, float, List[GRID_SCAN_PARAM_SETTING]]
# Typing overload: return_type="opt_pipeline" (also the implementation's default) yields the
# tuned pipeline. The ``= ...`` markers mirror the defaults of the implementation, so that a
# call which omits the optional arguments still resolves to this overload.
@overload
def GridSearch(
    pipeline: Transformer,
    params: Dict[Transformer, Dict[str, List[TRANSFORMER_PARAMETER_VALUE_TYPE]]],
    topics: pd.DataFrame,
    qrels: pd.DataFrame,
    metric: MEASURE_TYPE = ...,
    jobs: int = ...,
    backend: str = ...,
    verbose: bool = ...,
    batch_size: Optional[int] = ...,
    return_type: Literal['opt_pipeline'] = ...,
) -> Transformer: ...
# Typing overloads: return_type="best_setting" yields (best metric value, best settings).
# Positional form -- every argument is supplied and return_type is passed positionally.
@overload
def GridSearch(
    pipeline: Transformer,
    params: Dict[Transformer, Dict[str, List[TRANSFORMER_PARAMETER_VALUE_TYPE]]],
    topics: pd.DataFrame,
    qrels: pd.DataFrame,
    metric: MEASURE_TYPE,
    jobs: int,
    backend: str,
    verbose: bool,
    batch_size: Optional[int],
    return_type: Literal['best_setting'],
) -> GRID_SEARCH_RETURN_TYPE_SETTING: ...
# Keyword form -- the optional arguments may be omitted (they have defaults in the
# implementation) as long as return_type is passed by keyword.
@overload
def GridSearch(
    pipeline: Transformer,
    params: Dict[Transformer, Dict[str, List[TRANSFORMER_PARAMETER_VALUE_TYPE]]],
    topics: pd.DataFrame,
    qrels: pd.DataFrame,
    metric: MEASURE_TYPE = ...,
    jobs: int = ...,
    backend: str = ...,
    verbose: bool = ...,
    batch_size: Optional[int] = ...,
    *,
    return_type: Literal['best_setting'],
) -> GRID_SEARCH_RETURN_TYPE_SETTING: ...
# Typing overloads: return_type="both" yields (tuned pipeline, best metric value, best settings).
# Positional form -- every argument is supplied and return_type is passed positionally.
@overload
def GridSearch(
    pipeline: Transformer,
    params: Dict[Transformer, Dict[str, List[TRANSFORMER_PARAMETER_VALUE_TYPE]]],
    topics: pd.DataFrame,
    qrels: pd.DataFrame,
    metric: MEASURE_TYPE,
    jobs: int,
    backend: str,
    verbose: bool,
    batch_size: Optional[int],
    return_type: Literal['both'],
) -> GRID_SEARCH_RETURN_TYPE_BOTH: ...
# Keyword form -- the optional arguments may be omitted (they have defaults in the
# implementation) as long as return_type is passed by keyword.
@overload
def GridSearch(
    pipeline: Transformer,
    params: Dict[Transformer, Dict[str, List[TRANSFORMER_PARAMETER_VALUE_TYPE]]],
    topics: pd.DataFrame,
    qrels: pd.DataFrame,
    metric: MEASURE_TYPE = ...,
    jobs: int = ...,
    backend: str = ...,
    verbose: bool = ...,
    batch_size: Optional[int] = ...,
    *,
    return_type: Literal['both'],
) -> GRID_SEARCH_RETURN_TYPE_BOTH: ...
[docs]
def GridSearch(
    pipeline: Transformer,
    params: Dict[Transformer, Dict[str, List[TRANSFORMER_PARAMETER_VALUE_TYPE]]],
    topics: pd.DataFrame,
    qrels: pd.DataFrame,
    metric: MEASURE_TYPE = "map",
    jobs: int = 1,
    backend: str = 'joblib',
    verbose: bool = False,
    batch_size: Optional[int] = None,
    return_type: Literal['opt_pipeline', 'best_setting', 'both'] = "opt_pipeline"
) -> Union[Transformer, GRID_SEARCH_RETURN_TYPE_SETTING, GRID_SEARCH_RETURN_TYPE_BOTH]:
    """
    GridSearch is essentially an argmax over GridScan(), i.e. it evaluates every combination of the
    candidate values in params on the specified topics and qrels, and selects the combination that
    obtains the highest value of the specified metric. If several combinations tie, the first one
    evaluated is selected.

    :param pipeline: a transformer or pipeline to tune
    :param params: a two-level dictionary, mapping transformer to param name to a list of values
    :param topics: topics to tune upon
    :param qrels: qrels to tune upon
    :param metric: the single metric on which to determine the most effective setting. Defaults to "map".
    :param batch_size: If not None, evaluation is conducted in batches of batch_size topics. Default=None, which evaluates all topics at once.
        Applying a batch_size is useful if you have large numbers of topics, and/or if your pipeline requires large amounts of temporary memory
        during a run.
    :param jobs: Number of parallel jobs to run. Default is 1, which means sequentially.
    :param backend: Parallelisation backend to use. Defaults to "joblib".
    :param verbose: whether to display progress bars or not
    :param return_type: what to return:

        - ``"opt_pipeline"`` (default): the best setting is applied to the transformers, and pipeline is returned;
        - ``"best_setting"``: the transformers are restored to their initial state, and a tuple of
          (best metric value, best setting) is returned;
        - ``"both"``: the best setting is applied to the transformers, and a tuple of
          (pipeline, best metric value, best setting) is returned.

    :return: see return_type
    :raises ValueError: if return_type is not one of the options above
    :raises KeyError: if metric is a list rather than a single metric
    """
    # Validate the cheap arguments first: previously an unknown return_type was only reported
    # after the complete (and potentially very expensive) grid scan had already been executed.
    if return_type not in ("opt_pipeline", "best_setting", "both"):
        raise ValueError("Unknown return_type option %s" % (return_type,))
    if isinstance(metric, list):
        raise KeyError("GridSearch can only maximise ONE metric, but you passed a list (%s)." % str(metric))

    # save state, so that "best_setting" can hand the transformers back as they were found
    initial_state = _save_state(params)

    grid_outcomes = GridScan(
        pipeline,
        params,
        topics,
        qrels,
        [metric],
        jobs,
        backend,
        verbose,
        batch_size,
        dataframe=False)
    assert not isinstance(grid_outcomes, pd.DataFrame)
    assert len(grid_outcomes) > 0, "GridScan returned 0 rows"

    # each outcome is a (setting, measures) pair; max() keeps the first of any tied outcomes
    max_setting, best_measures = max(grid_outcomes, key=lambda outcome: outcome[1][metric])
    max_measure = best_measures[metric]
    print("Best %s is %f" % (metric, max_measure))
    print("Best setting is %s" % str(["%s %s=%s" % (str(t), k, v) for t, k, v in max_setting]))

    if return_type == "best_setting":
        _restore_state(initial_state)
        return max_measure, max_setting

    # "opt_pipeline" and "both" leave the transformers configured with the winning setting
    for tran, param, value in max_setting:
        tran.set_parameter(param, value)
    if return_type == "both":
        return (pipeline, max_measure, max_setting)
    return pipeline
[docs]
def GridScan(
    pipeline: Transformer,
    params: Dict[Transformer, Dict[str, List[TRANSFORMER_PARAMETER_VALUE_TYPE]]],
    topics: pd.DataFrame,
    qrels: pd.DataFrame,
    metrics: Union[MEASURE_TYPE, MEASURES_TYPE] = ["map"],
    jobs: int = 1,
    backend: str = 'joblib',
    verbose: bool = False,
    batch_size: Optional[int] = None,
    dataframe: bool = True,
) -> Union[pd.DataFrame, List[Tuple[List[GRID_SCAN_PARAM_SETTING], Dict[Union[str, Measure], float]]]]:
    """
    GridScan applies a set of named parameters on a given pipeline and evaluates the outcome. The topics and qrels
    must be specified. The trec_eval measure names can be optionally specified.
    The transformers being tuned, and their respective parameters are named in params. The parameter being
    varied must be changeable using the :func:`set_parameter()` method. This means instance variables,
    as well as controls in the case of Retriever.

    Every combination of the candidate values is evaluated. GridScan does not itself restore the
    parameters of the transformers once the scan has finished.

    :param pipeline: a transformer or pipeline
    :param params: a two-level dictionary, mapping transformer to param name to a list of values
    :param topics: topics to tune upon
    :param qrels: qrels to tune upon
    :param metrics: the metric, or list of metrics, to report for each setting. Defaults to ["map"].
    :param batch_size: If not None, evaluation is conducted in batches of batch_size topics. Default=None, which evaluates all topics at once.
        Applying a batch_size is useful if you have large numbers of topics, and/or if your pipeline requires large amounts of temporary memory
        during a run.
    :param jobs: Number of parallel jobs to run. Default is 1, which means sequentially.
    :param backend: Parallelisation backend to use. Defaults to "joblib".
    :param verbose: whether to display progress bars or not (only available when jobs=1)
    :param dataframe: return a dataframe or a list
    :return: A dataframe showing the effectiveness of all evaluated settings, if dataframe=True.
        A list of settings and resulting evaluation measures, if dataframe=False.

    Raises:
        ValueError: if no parameters are specified, if a specified transformer does not have such a
            parameter, or if jobs is less than 1
        ImportError: if jobs > 1 and pyterrier-alpha[parallel] is not installed

    Example::

        # graph how PL2's c parameter affects MAP
        pl2 = pt.terrier.Retriever(index, wmodel="PL2", controls={'c' : 1})
        rtr = pt.GridScan(
            pl2,
            {pl2 : {'c' : [0.1, 1, 5, 10, 20, 100]}},
            topics,
            qrels,
            ["map"]
        )
        import matplotlib.pyplot as plt
        plt.plot(rtr["tran_0_c"], rtr["map"])
        plt.xlabel("PL2's c value")
        plt.ylabel("MAP")
        plt.show()
    """
    import itertools
    import math

    if jobs < 1:
        # previously surfaced as an obscure ZeroDivisionError / islice error in the parallel branch
        raise ValueError("jobs must be a positive integer, but was %s" % str(jobs))

    if verbose and jobs > 1:
        from warnings import warn
        warn("Cannot provide progress on parallel job")

    # Store all the parameter names and candidate values into a dictionary, keyed by a tuple of the transformer and the parameter name
    # such as {(Retriever, 'wmodel'): ['BM25', 'PL2'], (Retriever, 'c'): [0.1, 0.2, 0.3], (Bla, 'lr'): [0.001, 0.01, 0.1]}
    candi_dict: Dict[Tuple[Transformer, str], List[TRANSFORMER_PARAMETER_VALUE_TYPE]] = {
        (tran, param_name): values
        for tran, param_set in params.items()
        for param_name, values in param_set.items()
    }
    if len(candi_dict) == 0:
        raise ValueError("No parameters specified to optimise")

    # check up front that every named parameter exists, rather than failing part-way through the scan
    for tran, param in candi_dict:
        try:
            tran.get_parameter(param)
        except Exception as e:
            # chain the original error so the underlying reason is not lost
            raise ValueError("Transformer %s does not expose a parameter named %s" % (str(tran), param)) from e

    # a lone metric -- whether a name or a Measure object -- is wrapped so that a list is always passed on
    if isinstance(metrics, (str, Measure)):
        metrics = [metrics]

    keys, vals = zip(*candi_dict.items())
    combinations = list(itertools.product(*vals))
    assert len(combinations) > 0, "No combinations selected"

    def _evaluate_one_setting(keys, values):
        # keys and values are aligned: values[i] is the candidate value for the (transformer, name) in keys[i]
        parameter_list = []
        for (tran, param_name), value in zip(keys, values):
            # Set the parameter value in the corresponding transformer of the pipeline
            tran.set_parameter(param_name, value)
            # such as (Retriever, 'wmodel', 'BM25')
            parameter_list.append((tran, param_name, value))
        # the first element of the returned pair is not needed here
        _, eval_scores = _run_and_evaluate(pipeline, topics, qrels, metrics, perquery=False, batch_size=batch_size)
        return parameter_list, eval_scores

    def _evaluate_several_settings(inputs: List[Tuple]):
        return [_evaluate_one_setting(k, v) for k, v in inputs]

    # for each combination of parameter values
    if jobs == 1:
        if verbose:
            to_scan = pt.tqdm(combinations, total=len(combinations), desc="GridScan", mininterval=0.3)
        else:
            to_scan = combinations
        eval_list = [_evaluate_one_setting(keys, v) for v in to_scan]
    else:
        import more_itertools
        try:
            from pyterrier_alpha.parallel import parallel_lambda # type: ignore
        except ImportError as ie:
            raise ImportError("pyterrier-alpha[parallel] must be installed for jobs>1") from ie
        all_inputs = [(keys, values) for values in combinations]
        # Number of settings per batch, such that at most `jobs` batches are created. Previously,
        # having fewer combinations than jobs resulted in ONE batch containing every combination,
        # i.e. nothing was actually run in parallel.
        chunk_size = max(1, math.ceil(len(all_inputs) / jobs))
        # build the batches to distribute
        batched_inputs = list(more_itertools.chunked(all_inputs, chunk_size))
        assert len(batched_inputs) > 0, "No inputs identified for parallel_lambda"
        # NOTE(review): depending on the backend, the transformers inside the returned settings may be
        # copies made for the workers rather than the caller's own objects -- confirm against parallel_lambda.
        eval_list = parallel_lambda(_evaluate_several_settings, batched_inputs, jobs, backend=backend)
        eval_list = list(itertools.chain.from_iterable(eval_list))
        assert len(eval_list) > 0, "parallel_lambda returned 0 rows"

    # resulting eval_list has the form [
    #    ( [(BR, 'wmodel', 'BM25'), (BR, 'c', 0.2)] ,  {"map" : 0.2654} )
    # ]
    # ie, a list of possible settings, combined with measure values
    if not dataframe:
        return eval_list

    rtr = []
    for setting, measures in eval_list:
        row = {}
        for i, (tran, param, value) in enumerate(setting):
            row["tran_%d" % i] = tran
            row["tran_%d_%s" % (i, param)] = value
        row.update(measures)
        rtr.append(row)
    # resulting dataframe looks like:
    #    tran_0  tran_0_c       map
    #0  BR(PL2)       0.1  0.104820
    #1  BR(PL2)       1.0  0.189274
    #2  BR(PL2)       5.0  0.230838
    return pd.DataFrame(rtr)
[docs]
def KFoldGridSearch(
    pipeline: Transformer,
    params: Dict[Transformer, Dict[str, List[TRANSFORMER_PARAMETER_VALUE_TYPE]]],
    topics_list: List[pd.DataFrame],
    qrels: Union[pd.DataFrame, List[pd.DataFrame]],
    metric: MEASURE_TYPE = "map",
    jobs: int = 1,
    backend: str = 'joblib',
    verbose: bool = False,
    batch_size: Optional[int] = None) -> Tuple[pd.DataFrame, List[List[GRID_SCAN_PARAM_SETTING]]]:
    """
    Applies a GridSearch using different folds. It returns the *results* of the
    tuned transformer pipeline on the test topics. The number of topics dataframes passed
    to topics_list defines the number of folds. For each fold, all but one of the dataframes
    is used as training, and the remainder used for testing.

    The state of the transformers in the pipeline is restored after the KFoldGridSearch has
    been executed, including when tuning fails with an exception.

    :param pipeline: a transformer or pipeline to tune
    :param params: a two-level dictionary, mapping transformer to param name to a list of values
    :param topics_list: a *list* of topics dataframes to tune upon; at least two are required
    :param qrels: qrels to tune upon. A single dataframe, or a list for each fold.
    :param metric: the single metric on which to determine the most effective setting. Defaults to "map".
    :param batch_size: If not None, evaluation is conducted in batches of batch_size topics. Default=None, which evaluates all topics at once.
        Applying a batch_size is useful if you have large numbers of topics, and/or if your pipeline requires large amounts of temporary memory
        during a run.
    :param jobs: Number of parallel jobs to run. Default is 1, which means sequentially.
    :param backend: Parallelisation backend to use. Defaults to "joblib".
    :param verbose: whether to display progress bars or not
    :return: A tuple containing, firstly, the results of pipeline on the test topics after tuning, and secondly, a list of the best parameter settings for each fold.
    :raises ValueError: if fewer than two folds are provided

    Consider tuning a terrier.Retriever PL2 where the folds of queries are pre-determined::

        pl2 = pt.terrier.Retriever(index, wmodel="PL2", controls={'c' : 1})
        tuned_pl2, _ = pt.KFoldGridSearch(
            pl2,
            {pl2 : {'c' : [0.1, 1, 5, 10, 20, 100]}},
            [topicsf1, topicsf2],
            qrels,
            "map"
        )
        pt.Experiment([pl2, tuned_pl2], all_topics, qrels, ["map"])

    As 2 splits are defined, PL2 is first tuned on topicsf1 and tested on topicsf2, then
    trained on topicsf2 and tested on topicsf1. The results dataframe of PL2 after tuning of the c
    parameter are returned by the KFoldGridSearch, and can be used directly in a pt.Experiment().
    """
    num_folds = len(topics_list)
    if num_folds < 2:
        # with 0 or 1 folds there is nothing to train on; previously this surfaced as an
        # opaque "No objects to concatenate" ValueError raised by pandas
        raise ValueError(
            "KFoldGridSearch requires at least 2 folds in topics_list, but %d were given" % num_folds)

    # a single qrels dataframe is shared by every fold
    if isinstance(qrels, pd.DataFrame):
        qrels = [qrels] * num_folds

    FOLDS = list(range(0, num_folds))
    results: List[pd.DataFrame] = []
    settings: List[List[GRID_SCAN_PARAM_SETTING]] = []

    # save state
    initial_state = _save_state(params)
    try:
        for fold in FOLDS:
            print("Fold %d" % (fold+1))

            # train on every fold except the current one; test on the current one
            train_indx = [offset for offset in FOLDS if offset != fold]
            train_topics = pd.concat([topics_list[offset] for offset in train_indx])
            train_qrels = pd.concat([qrels[offset] for offset in train_indx])
            test_topics = topics_list[fold]
            # the test qrels are not needed: only the results on the test topics are returned

            # safety - give the GridSearch a stable initial setting
            _restore_state(initial_state)

            # return_type="both" leaves the pipeline configured with the best setting for this fold
            optPipe, _max_measure, max_setting = GridSearch(
                pipeline,
                params,
                train_topics,
                train_qrels,
                metric,
                jobs=jobs,
                backend=backend,
                verbose=verbose,
                batch_size=batch_size,
                return_type="both")
            results.append(optPipe.transform(test_topics))
            settings.append(max_setting)
    finally:
        # restore state, whether or not tuning succeeded
        _restore_state(initial_state)

    return (pd.concat(results), settings)