# Source code for pyterrier_adaptive.gar
from typing import Optional
import numpy as np
from collections import Counter
import pyterrier as pt
import pandas as pd
import ir_datasets
import pyterrier_adaptive
logger = ir_datasets.log.easy()
# [docs]  (documentation-page link residue; not part of the module code)
class GAR(pt.Transformer):
    """A :class:`~pyterrier.Transformer` that implements Graph-based Adaptive Re-ranking (GAR).

    Required input columns: ``['qid', 'query', 'docno', 'score', 'rank']``

    Output columns: ``['qid', 'query', 'docno', 'score', 'rank', 'iteration']``

    .. note::
        The iteration column defines the batch number that first identified the document in the
        results. Due to the alternating nature of the algorithm, ``even=initial retrieval``, ``odd=corpus graph``,
        and ``-1=backfilled``. When ``enabled=False`` only the initial ranking is consulted, so every
        non-negative iteration value refers to the initial retrieval.

    .. cite.dblp:: conf/cikm/MacAvaneyTM22
    """

    def __init__(self,
                 scorer: pt.Transformer,
                 corpus_graph: 'pyterrier_adaptive.CorpusGraph',
                 num_results: int = 1000,
                 batch_size: Optional[int] = None,
                 backfill: bool = True,
                 enabled: bool = True,
                 verbose: bool = False):
        """
        Args:
            scorer(:class:`~pyterrier.Transformer`): A transformer that scores query-document pairs. It will only be provided with ['qid', 'query', 'docno', 'score'].
            corpus_graph(:class:`~pyterrier_adaptive.CorpusGraph`): A graph of the corpus, enabling quick lookups of nearest neighbours
            num_results(int): The maximum number of documents to score (called "budget" and $c$ in the paper)
            batch_size(int): The number of documents to score at once (called $b$ in the paper). If not provided, will attempt to use the batch size from the scorer
            backfill(bool): If True, always include all documents from the initial stage, even if they were not re-scored
            enabled(bool): If False, perform re-ranking without using the corpus graph
            verbose(bool): If True, print progress information
        """
        self.scorer = scorer
        self.corpus_graph = corpus_graph
        self.num_results = num_results
        if batch_size is None:
            # Prefer the scorer's own batch size; fall back to 16 when the scorer
            # has no such attribute or leaves it unset (None).
            batch_size = getattr(scorer, 'batch_size', None)
            if batch_size is None:
                batch_size = 16
        self.batch_size = batch_size
        self.backfill = backfill
        self.enabled = enabled
        self.verbose = verbose

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Applies Graph-based Adaptive Re-ranking to the provided dataframe. Essentially,
        Algorithm 1 from the paper.

        The ``qid``, ``query``, ``docno`` and ``score`` columns of ``df`` are read. An empty
        input yields an empty frame with the output columns.
        """
        result = {'qid': [], 'query': [], 'docno': [], 'rank': [], 'score': [], 'iteration': []}

        # Group by the bare column name rather than a one-element list: with a
        # list grouper, pandas >= 2.0 yields 1-tuples as keys, and the qid must
        # stay a scalar because it is written back into the batch/result columns.
        by_query = dict(iter(df.groupby(by='qid')))
        qids = by_query.keys()
        if self.verbose:
            qids = logger.pbar(qids, desc='adaptive re-ranking', unit='query')

        for qid in qids:
            query_df = by_query[qid]
            query = query_df['query'].iloc[0]
            scores = {}  # docno -> (re-ranker score, iteration that scored it)
            res_map = [Counter(dict(zip(query_df.docno, query_df.score)))]  # initial results
            if self.enabled:
                res_map.append(Counter())  # frontier
            frontier_data = {'minscore': float('inf')}
            iteration = 0
            while len(scores) < self.num_results and any(res_map):
                # alternate between the initial ranking and frontier
                this_res = res_map[iteration % len(res_map)]
                if not this_res:
                    # if there's nothing available for the one we select, skip this
                    # iteration (i.e., move on to the next one)
                    iteration += 1
                    continue
                # get either the batch size or remaining budget (whichever is smaller)
                size = min(self.batch_size, self.num_results - len(scores))

                # build batch of documents to score in this round
                candidates = this_res.most_common(size)
                batch = pd.DataFrame(candidates, columns=['docno', 'score'])
                batch['qid'] = qid
                batch['query'] = query

                # go score the batch of document with the re-ranker
                batch = self.scorer(batch)
                scores.update({k: (s, iteration) for k, s in zip(batch.docno, batch.score)})

                # Drop everything that was submitted as well as everything that came
                # back. Dropping the submitted docnos guarantees progress even if the
                # scorer omits some rows (they would otherwise be re-submitted forever).
                self._drop_docnos_from_counters([docno for docno, _ in candidates], res_map)
                self._drop_docnos_from_counters(batch.docno, res_map)

                if len(scores) < self.num_results and self.enabled:
                    self._update_frontier(batch, res_map[1], frontier_data, scores)
                iteration += 1

            # Add scored items to results (descending by score)
            ranked = Counter(scores).most_common()
            result['qid'].extend([qid] * len(ranked))
            result['query'].extend([query] * len(ranked))
            result['rank'].extend(range(len(ranked)))
            for docno, (score, first_iteration) in ranked:
                result['docno'].append(docno)
                result['score'].append(score)
                result['iteration'].append(first_iteration)

            # Backfill unscored items
            if self.backfill and len(scores) < self.num_results:
                # Anchor on this query's own lowest re-ranked score so that backfilled
                # documents sort below it (never on a previous query's score).
                last_score = ranked[-1][1][0] if ranked else 0.
                backfilled = res_map[0].most_common(self.num_results - len(scores))
                result['qid'].extend([qid] * len(backfilled))
                result['query'].extend([query] * len(backfilled))
                result['rank'].extend(range(len(scores), len(scores) + len(backfilled)))
                for i, (docno, _) in enumerate(backfilled):
                    result['docno'].append(docno)
                    result['score'].append(last_score - 1 - i)
                    result['iteration'].append(-1)

        return pd.DataFrame(result)

    def _update_frontier(self, scored_batch, frontier, frontier_data, scored_dids):
        """Add unscored graph neighbours of the just-scored documents to ``frontier``.

        Args:
            scored_batch: frame with ``docno`` and ``score`` columns, as returned by the scorer
            frontier: Counter mapping candidate docno to the best score of a document that
                listed it as a neighbour; updated in place
            frontier_data: dict whose ``'minscore'`` entry tracks the lowest source score that
                has contributed to the frontier; updated in place
            scored_dids: container of docnos that have already been scored (membership and
                ``len`` are used)
        """
        remaining_budget = self.num_results - len(scored_dids)
        # Visit the highest-scoring documents first.
        for score, did in sorted(zip(scored_batch.score, scored_batch.docno), reverse=True):
            # Only expand this document if the frontier cannot yet fill the remaining
            # budget, or if it scores at least as well as the weakest contributor so far.
            if len(frontier) >= remaining_budget and score < frontier_data['minscore']:
                continue
            hit = False
            for target_did in self.corpus_graph.neighbours(did):
                if target_did in scored_dids:
                    continue
                if target_did not in frontier or score > frontier[target_did]:
                    frontier[target_did] = score
                    hit = True
            if hit and score < frontier_data['minscore']:
                frontier_data['minscore'] = score

    def _drop_docnos_from_counters(self, docnos, counters):
        """Remove every docno in ``docnos`` from each Counter in ``counters`` (in place)."""
        for docno in docnos:
            for counter in counters:
                # Counter.__delitem__ does not raise for keys that are absent.
                del counter[docno]