Source code for pyterrier.terrier.rewrite

import pandas as pd
from warnings import warn
from typing import List, Union, Callable
from types import FunctionType
import pyterrier as pt
from pyterrier.terrier.index import TerrierTokeniser

@pt.java.required
def tokenise(tokeniser : Union[str,TerrierTokeniser,FunctionType] = 'english', matchop=False) -> pt.Transformer:
    """
    Applies tokenisation to the query. Until PyTerrier 1.0, queries obtained from
    ``pt.get_dataset().get_topics()`` were generally tokenised.

    Args:
        tokeniser(Union[str,TerrierTokeniser,FunctionType]): Defines what tokeniser should be used - either a
            Java tokeniser name in Terrier, a TerrierTokeniser instance, or a function that takes a str as
            input and returns a list of str.
        matchop(bool): Whether query terms should be wrapped in matchops, to ensure they can be parsed by a
            Terrier Retriever transformer.

    Returns:
        A transformer that replaces the query of each row with its tokenised form, joined by single spaces.

    Example - use default tokeniser::

        pipe = pt.rewrite.tokenise() >> pt.terrier.Retriever()
        pipe.search("Question with 'capitals' and other stuff?")

    Example - roll your own tokeniser::

        poortokenisation = pt.rewrite.tokenise(lambda query: query.split(" ")) >> pt.terrier.Retriever()

    Example - for non-English languages, tokenise on standard UTF non-alphanumeric characters::

        utftokenised = pt.rewrite.tokenise(pt.TerrierTokeniser.utf) >> pt.terrier.Retriever()
        utftokenised = pt.rewrite.tokenise("utf") >> pt.terrier.Retriever()

    Example - tokenising queries using a
    `HuggingFace tokenizer <https://huggingface.co/docs/transformers/fast_tokenizers>`_ ::

        # this assumes the index was created in a pretokenised manner
        br = pt.terrier.Retriever(indexref)
        tok = AutoTokenizer.from_pretrained("bert-base-uncased")
        query_toks = pt.rewrite.tokenise(tok.tokenize, matchop=True)
        retr_pipe = query_toks >> br
    """
    if callable(tokeniser):
        # user-supplied Python tokenisation function
        tokenise_fn: Callable[[str], List[str]] = tokeniser
    else:
        # resolve a name / TerrierTokeniser member to a Java class name
        class_name = TerrierTokeniser._to_class(TerrierTokeniser._to_obj(tokeniser))
        if "." not in class_name:
            # unqualified names live in Terrier's tokenisation package
            class_name = 'org.terrier.indexing.tokenisation.' + class_name
        java_tokeniser = pt.java.autoclass(class_name)()
        tokenise_fn = java_tokeniser.getTokens

    def _plain(tokens : Union[str,List[str]]):
        # a tokeniser may already hand back a single string; pass it through
        return tokens if isinstance(tokens, str) else ' '.join(tokens)

    def _as_matchops(tokens : List[str]):
        assert not isinstance(tokens, str), "Expected a list of strings"
        return ' '.join(pt.terrier.Retriever.matchop(token) for token in tokens)

    joiner = _as_matchops if matchop else _plain
    return pt.apply.query(lambda r: joiner(tokenise_fn(r.query)))
class ResetQuery(pt.Transformer):
    """Transformer that undoes the most recent query rewrite via ``pt.model.pop_queries()``."""

    def transform(self, inp):
        """Returns the result of applying ``pt.model.pop_queries()`` to ``inp``."""
        restored = pt.model.pop_queries(inp)
        return restored
def reset() -> pt.Transformer:
    """
    Undoes a previous query rewriting operation. This results in the query formulation stored in the
    `"query_0"` attribute being moved to the `"query"` attribute, and, if present, the `"query_1"` being
    moved to `"query_0"` and so on. This transformation is useful if you have rewritten the query for the
    purposes of one retrieval stage, but wish a subsequent transformer to be applied on the original
    formulation.

    Internally, this function applies `pt.model.pop_queries()`.

    Example::

        firststage = pt.rewrite.SDM() >> pt.terrier.Retriever(index, wmodel="DPH")
        secondstage = pyterrier_bert.cedr.CEDRPipeline()
        fullranker = firststage >> pt.rewrite.reset() >> secondstage

    """
    resetter = ResetQuery()
    return resetter
@pt.java.required
class SDM(pt.Transformer):
    '''
    Implements the sequential dependence model, which Terrier supports using its
    Indri/Galago compatible matchop query language. The rewritten query is derived using
    the Terrier class DependenceModelPreProcess.

    This transformer changes the query. It must be followed by a Terrier Retriever transformer.
    The original query is saved in the `"query_0"` column, which can be restored using `pt.rewrite.reset()`.

    .. cite.dblp:: conf/sigir/MetzlerC05
    '''

    def __init__(self, verbose = 0, remove_stopwords = True, prox_model = None, tokeniser : Union[str,TerrierTokeniser] = TerrierTokeniser.english, **kwargs):
        """
        Args:
            verbose: if truthy, a progress bar is shown while queries are rewritten.
            remove_stopwords(bool): whether the Stopwords term pipeline is applied before rewriting.
            prox_model: optional proximity model passed to DependenceModelPreProcess.
            tokeniser(Union[str,TerrierTokeniser]): tokeniser applied to queries that need tokenising.
        """
        super().__init__(**kwargs)
        # honour the caller's setting (previously hard-coded to 0, so the
        # progress bar could never be enabled)
        self.verbose = verbose
        self.prox_model = prox_model
        self.remove_stopwords = remove_stopwords
        assert pt.terrier.check_version("5.3")
        self.ApplyTermPipeline_stopsonly = pt.terrier.J.ApplyTermPipeline('Stopwords')
        self.tokeniser = TerrierTokeniser.java_tokeniser(TerrierTokeniser._to_obj(tokeniser))

    schematic = {'label': 'SDM'}

    def __repr__(self):
        return "SDM()"

    def transform(self, topics_and_res):
        """
        Rewrites the query of every distinct qid into its sequential dependence form.

        Args:
            topics_and_res: dataframe with at least `qid` and `query` columns.

        Returns:
            The input with the previous query pushed down (see ``pt.model.push_queries``) and the
            rewritten matchop query in the `"query"` column.
        """
        from .retriever import _query_needs_tokenised
        results = []
        queries = pt.model.ranked_documents_to_queries(topics_and_res)

        # instantiate the DependenceModelPreProcess, specifying a proximity model if specified
        if self.prox_model is None:
            sdm = pt.terrier.J.DependenceModelPreProcess()
        else:
            sdm = pt.terrier.J.DependenceModelPreProcess(self.prox_model)

        if self.verbose:
            rows = pt.tqdm(queries.itertuples(), desc=self.name, total=queries.shape[0], unit="q")
        else:
            rows = queries.itertuples()

        for row in rows:
            qid = row.qid
            query = row.query
            if _query_needs_tokenised(query):
                query = ' '.join(self.tokeniser.getTokens(query))

            # parse the query into a MQT
            rq = pt.terrier.J.Request()
            rq.setQueryID(qid)
            rq.setOriginalQuery(query)
            pt.terrier.J.TerrierQLParser().process(None, rq)
            pt.terrier.J.TerrierQLToMatchingQueryTerms().process(None, rq)
            if self.remove_stopwords:
                self.ApplyTermPipeline_stopsonly.process(None, rq)

            # rewrite the query
            sdm.expandQuery(rq.getMatchingQueryTerms(), rq)

            # put the query back into a matchopql form that Terrier can parse later
            terms = []
            for me in rq.getMatchingQueryTerms():
                term = me.getKey().toString()
                w = me.getValue().getWeight()
                term_models = me.getValue().termModels
                if w != 1.0 or term_models.size() > 0:
                    # only weighted terms, or terms with their own model, need a #combine wrapper
                    prefix = "#combine"
                    if w != 1:
                        prefix += ":0=" + str(w)
                    if term_models.size() == 1:
                        prefix += ":wmodel=" + term_models[0].getClass().getName()
                    term = prefix + "(" + term + ")"
                terms.append(term)
            results.append([qid, " ".join(terms)])

        new_queries = pd.DataFrame(results, columns=["qid", "query"])
        # restore any other columns, e.g. put back docs if we are re-ranking
        return new_queries.merge(pt.model.push_queries(topics_and_res, inplace=True), on="qid")
class SequentialDependence(SDM):
    '''
    Implements the sequential dependence model, which Terrier supports using its
    Indri/Galago compatible matchop query language. The rewritten query is derived using
    the Terrier class DependenceModelPreProcess.

    This transformer changes the query. It must be followed by a Terrier Retriever transformer.
    The original query is saved in the `"query_0"` column, which can be restored using `pt.rewrite.reset()`.
    '''
    pass


@pt.java.required
class QueryExpansion(pt.Transformer):
    '''
    A base class for applying different types of query expansion using Terrier's classes.

    This transformer changes the query. It must be followed by a Terrier Retriever transformer.
    The original query is saved in the `"query_0"` column, which can be restored using `pt.rewrite.reset()`.

    Instance Attributes:
     - fb_terms(int): number of feedback terms. Defaults to 10
     - fb_docs(int): number of feedback documents. Defaults to 3
    '''

    def __init__(self, index_like, fb_terms=10, fb_docs=3, qeclass="org.terrier.querying.QueryExpansion", verbose=0, properties=None, requires_scores=False, tokeniser : Union[str,TerrierTokeniser] = TerrierTokeniser.english, **kwargs):
        """
        Args:
            index_like: the Terrier index to use.
            fb_terms(int): number of feedback terms.
            fb_docs(int): number of feedback documents.
            qeclass: name of the Java query expansion class, or an already-created instance of one.
            verbose: if truthy, a progress bar is shown while queries are expanded.
            properties(dict): Terrier properties, applied through ApplicationSetup. Defaults to none.
            requires_scores(bool): whether the input `score` column is passed on to the expansion model.
            tokeniser(Union[str,TerrierTokeniser]): tokeniser applied to queries that need tokenising.
        """
        super().__init__(**kwargs)
        self.verbose = verbose
        if isinstance(qeclass, str):
            self.qe = pt.java.autoclass(qeclass)()
        else:
            self.qe = qeclass
        self.indexref = pt.terrier.retriever._parse_index_like(index_like)
        # a fresh dict per instance: __setstate__ mutates self.properties, so a
        # shared default dict would leak state between instances
        self.properties = {} if properties is None else properties
        for k, v in self.properties.items():
            pt.terrier.J.ApplicationSetup.setProperty(k, str(v))
        self.applytp = pt.terrier.J.ApplyTermPipeline()
        self.fb_terms = fb_terms
        self.fb_docs = fb_docs
        self.manager = pt.terrier.J.ManagerFactory._from_(self.indexref)
        self.requires_scores = requires_scores
        self.tokeniser = TerrierTokeniser.java_tokeniser(TerrierTokeniser._to_obj(tokeniser))

    def compile(self) -> pt.Transformer:
        """Returns this transformer preceded by a rank cutoff at ``fb_docs``."""
        return pt.RankCutoff(self.fb_docs) >> self

    def __reduce__(self):
        # rebuild from the index reference alone; remaining state is restored by __setstate__
        return (
            self.__class__,
            (self.indexref,),
            self.__getstate__()
        )

    def __getstate__(self):
        if isinstance(self.qe, str):
            qe = self.qe
        else:
            qe = self.qe.getClass().getName()
        return {
            'fb_terms' : self.fb_terms,
            'fb_docs' : self.fb_docs,
            'qeclass' : qe,
            'properties' : self.properties
        }

    def __setstate__(self, d):
        self.fb_terms = d["fb_terms"]
        self.fb_docs = d["fb_docs"]
        self.qe = pt.java.autoclass(d['qeclass'])()
        self.properties.update(d["properties"])
        for key, value in d["properties"].items():
            # same call as __init__ uses (this class defines no `appSetup` attribute)
            pt.terrier.J.ApplicationSetup.setProperty(key, str(value))
        self.manager = pt.terrier.J.ManagerFactory._from_(self.indexref)

    def _populate_resultset(self, topics_and_res, qid, index):
        """
        Builds a Terrier QueryResultSet from the rows of ``topics_and_res`` belonging to ``qid``.

        Uses the `docid` column when present, otherwise resolves the `docno` column through
        the meta index of ``index``. Scores are taken from the `score` column only when
        ``requires_scores`` is set; otherwise they are all 0.

        Raises:
            ValueError: if the input has neither a `docid` nor a `docno` column.
        """
        docids = None
        scores = None
        occurrences = None
        if "docid" in topics_and_res.columns:
            # we need .tolist() as jnius cannot convert numpy arrays
            topics_and_res_for_qid = topics_and_res[topics_and_res["qid"] == qid]
            docids = topics_and_res_for_qid["docid"].values.tolist()
            scores = [0.0] * len(docids)
            if self.requires_scores:
                scores = topics_and_res_for_qid["score"].values.tolist()
            occurrences = [0] * len(docids)

        elif "docno" in topics_and_res.columns:
            topics_and_res_for_qid = topics_and_res[topics_and_res["qid"] == qid]
            docnos = topics_and_res_for_qid["docno"].values
            docids = []
            scores = []
            # one placeholder score per docno, so the zip below visits every
            # document even when scores are not required
            _scores = [0.0] * len(docnos)
            if self.requires_scores:
                _scores = topics_and_res_for_qid["score"].values.tolist()
            occurrences = []
            metaindex = index.getMetaIndex()
            skipped = 0
            for docno, docscore in zip(docnos, _scores):
                docid = metaindex.getDocument("docno", docno)
                if docid == -1:
                    skipped += 1
                # NOTE(review): while asserts are enabled this fires on the first unknown
                # docno, so the warnings below are only reached when running under -O
                assert docid != -1, "could not match docno %s to a docid for query %s" % (docno, qid)
                docids.append(docid)
                occurrences.append(0)
                scores.append(docscore)
            if skipped > 0:
                if skipped == len(docnos):
                    warn("*ALL* %d feedback docnos for qid %s could not be found in the index" % (skipped, qid))
                else:
                    warn("%d feedback docnos for qid %s could not be found in the index" % (skipped, qid))
        else:
            raise ValueError("Input resultset has neither docid nor docno")
        return pt.terrier.J.QueryResultSet(docids, scores, occurrences)

    def __repr__(self):
        return "QueryExpansion(" + ",".join([
            self.indexref.toString(),
            str(self.fb_docs),
            str(self.fb_terms),
            str(self.qe)
            ]) + ")"

    def _configure_request(self, rq):
        """Sets the feedback controls on the request; subclasses extend this."""
        rq.setControl("qe_fb_docs", str(self.fb_docs))
        rq.setControl("qe_fb_terms", str(self.fb_terms))

    def transform(self, topics_and_res):
        """
        Expands the query of every distinct qid using the feedback documents in the input.

        Args:
            topics_and_res: dataframe with `qid`, `query` and either `docno` or `docid` columns.

        Returns:
            One row per query, with the previous query pushed down (see ``pt.model.push_queries``)
            and the expanded, weighted query in the `"query"` column.
        """
        with pt.validate.any(topics_and_res) as v:
            v.columns(includes=['qid', 'docno', 'query'])
            v.columns(includes=['qid', 'docid', 'query'])

        from .retriever import _query_needs_tokenised
        results = []

        queries = pt.model.ranked_documents_to_queries(topics_and_res)

        if self.verbose:
            rows = pt.tqdm(queries.itertuples(), desc=self.name, total=queries.shape[0], unit="q")
        else:
            rows = queries.itertuples()

        for row in rows:
            qid = row.qid
            query = row.query
            if _query_needs_tokenised(query):
                query = ' '.join(self.tokeniser.getTokens(query))

            srq = self.manager.newSearchRequest(qid, query)
            rq = pt.java.cast("org.terrier.querying.Request", srq)
            self.qe.configureIndex(rq.getIndex())
            self._configure_request(rq)

            # generate the result set from the input
            rq.setResultSet(self._populate_resultset(topics_and_res, qid, rq.getIndex()))

            pt.terrier.J.TerrierQLParser().process(None, rq)
            pt.terrier.J.TerrierQLToMatchingQueryTerms().process(None, rq)
            # how to make sure this happens/doesnt happen when appropriate.
            self.applytp.process(None, rq)
            # to ensure weights are identical to Terrier
            rq.getMatchingQueryTerms().normaliseTermWeights()
            self.qe.expandQuery(rq.getMatchingQueryTerms(), rq)

            # this control for Terrier stops it re-stemming the expanded terms
            parts = ["applypipeline:off"]
            for me in rq.getMatchingQueryTerms():
                parts.append(me.getKey().toString() + ("^%.9f" % me.getValue().getWeight()))
            results.append([qid, " ".join(parts)])

        new_queries = pd.DataFrame(results, columns=["qid", "query"])
        return pt.model.push_queries(queries, inplace=True).merge(new_queries, on="qid")


class DFRQueryExpansion(QueryExpansion):
    """Query expansion that additionally passes a `qemodel` control to Terrier."""

    def __init__(self, *args, qemodel="Bo1", **kwargs):
        """
        Args:
            qemodel(str): value of the `qemodel` control set on each request. Defaults to "Bo1".
        """
        super().__init__(*args, **kwargs)
        self.qemodel = qemodel

    def _configure_request(self, rq):
        super()._configure_request(rq)
        rq.setControl("qemodel", self.qemodel)
class Bo1QueryExpansion(DFRQueryExpansion):
    '''
    Applies the Bo1 query expansion model from the Divergence from Randomness Framework, as provided by Terrier.
    It must be followed by a terrier.Retriever() transformer.
    The original query is saved in the `"query_0"` column, which can be restored using `pt.rewrite.reset()`.

    Instance Attributes:
     - fb_terms(int): number of feedback terms. Defaults to 10
     - fb_docs(int): number of feedback documents. Defaults to 3

    .. cite.dblp:: journals/tois/AmatiR02
    '''

    schematic = {'label': 'Bo1'}

    def __init__(self, *args, **kwargs):
        """
        Args:
            index_like: the Terrier index to use.
            fb_terms(int): number of terms to add to the query. Terrier's default setting is 10 expansion terms.
            fb_docs(int): number of feedback documents to consider. Terrier's default setting is 3 feedback documents.
        """
        # the expansion model is fixed for this class, overriding any caller-supplied value
        kwargs.update(qemodel="Bo1")
        super().__init__(*args, **kwargs)
class KLQueryExpansion(DFRQueryExpansion):
    '''
    Applies the KL query expansion model from the Divergence from Randomness Framework, as provided by Terrier.
    This transformer must be followed by a terrier.Retriever() transformer.
    The original query is saved in the `"query_0"` column, which can be restored using `pt.rewrite.reset()`.

    Instance Attributes:
     - fb_terms(int): number of feedback terms. Defaults to 10
     - fb_docs(int): number of feedback documents. Defaults to 3
    '''

    schematic = {'label': 'KL'}

    def __init__(self, *args, **kwargs):
        """
        Args:
            index_like: the Terrier index to use
            fb_terms(int): number of terms to add to the query. Terrier's default setting is 10 expansion terms.
            fb_docs(int): number of feedback documents to consider. Terrier's default setting is 3 feedback documents.
        """
        # the expansion model is fixed for this class, overriding any caller-supplied value
        kwargs.update(qemodel="KL")
        super().__init__(*args, **kwargs)
@pt.java.required
class RM3(QueryExpansion):
    '''
    Performs query expansion using RM3 relevance models.

    This transformer must be followed by a terrier.Retriever() transformer.
    The original query is saved in the `"query_0"` column, which can be restored using `pt.rewrite.reset()`.

    Instance Attributes:
     - fb_terms(int): number of feedback terms. Defaults to 10
     - fb_docs(int): number of feedback documents. Defaults to 3
     - fb_lambda(float): lambda in RM3, i.e. importance of relevance model viz feedback model. Defaults to 0.6.

    Example::

        bm25 = pt.terrier.Retriever(index, wmodel="BM25")
        rm3_pipe = bm25 >> pt.rewrite.RM3(index) >> bm25
        pt.Experiment([bm25, rm3_pipe],
                    dataset.get_topics(),
                    dataset.get_qrels(),
                    ["map"]
                    )

    .. cite.dblp:: conf/trec/JaleelACDLLSW04
    '''

    def __init__(self, *args, fb_terms=10, fb_docs=3, fb_lambda=0.6, **kwargs):
        """
        Args:
            index_like: the Terrier index to use
            fb_terms(int): number of terms to add to the query. Terrier's default setting is 10 expansion terms.
            fb_docs(int): number of feedback documents to consider. Terrier's default setting is 3 feedback documents.
            fb_lambda(float): lambda in RM3, i.e. importance of relevance model viz feedback model. Defaults to 0.6.
        """
        assert pt.terrier.check_version("5.10"), "Terrier 5.10 required"
        # hand the base class a ready-made RM3 instance instead of a class name
        kwargs["qeclass"] = pt.terrier.J.RM3()
        self.fb_lambda = fb_lambda
        super().__init__(
            *args,
            fb_terms=fb_terms,
            fb_docs=fb_docs,
            requires_scores=True,
            **kwargs)

    def __getstate__(self):
        # base-class state plus the RM3-specific lambda
        return {**super().__getstate__(), 'fb_lambda': self.fb_lambda}

    def __setstate__(self, d):
        super().__setstate__(d)
        self.fb_lambda = d["fb_lambda"]

    def _configure_request(self, rq):
        super()._configure_request(rq)
        rq.setControl("rm3.lambda", str(self.fb_lambda))

    def transform(self, queries_and_docs):
        # copy the current feedback settings onto the RM3 object before expanding
        rm3 = self.qe
        rm3.fbTerms = self.fb_terms
        rm3.fbDocs = self.fb_docs
        return super().transform(queries_and_docs)
def stash_results(clear=True) -> pt.Transformer:
    """
    Stashes (saves) the current retrieved documents for each query into the column `"stashed_results_0"`.
    This means that they can be restored later by using `pt.rewrite.reset_results()`.

    Args:
        clear(bool): whether to drop the document and retrieved document related columns, thereby
            converting a retrieved documents dataframe into one of queries. Defaults to True.
    """
    stasher = _StashResults(clear)
    return stasher
def reset_results() -> pt.Transformer:
    """
    Applies a transformer that undoes a `pt.rewrite.stash_results()` transformer, thereby restoring the
    ranked documents.
    """
    restorer = _ResetResults()
    return restorer
class _StashResults(pt.Transformer):
    """Saves the document columns of each query's results into a `"stashed_results_0"` column."""

    def __init__(self, clear, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # when set, only the query columns (plus the stash) are kept in the output
        self.clear = clear

    def transform(self, topics_and_res: pd.DataFrame) -> pd.DataFrame:
        """
        Stashes the document columns of ``topics_and_res``, grouped by qid.

        Raises:
            ValueError: if the input already has a `"stashed_results_0"` column.
        """
        if "stashed_results_0" in topics_and_res.columns:
            raise ValueError("Cannot apply pt.rewrite.stash_results() more than once")
        doc_cols = pt.model.document_columns(topics_and_res)

        if not self.clear:
            # keep every row, attaching the per-query list of documents to each of them
            frames = []
            for _, per_query in topics_and_res.groupby("qid"):
                per_query = per_query.reset_index().copy()
                stashed = per_query[doc_cols].to_dict(orient='records')
                per_query["stashed_results_0"] = None
                # index is 0..n-1 after reset_index(), so labels equal positions
                for label in per_query.index:
                    per_query.at[label, "stashed_results_0"] = stashed
                frames.append(per_query)
            return pd.concat(frames)

        # one output row per query: its query columns plus the stashed documents
        query_cols = pt.model.query_columns(topics_and_res)
        records = []
        for _, per_query in topics_and_res.groupby("qid"):
            record = per_query[query_cols].iloc[0].to_dict()
            record["stashed_results_0"] = per_query[doc_cols].to_dict(orient='records')
            records.append(record)
        return pd.DataFrame(records)

    def __repr__(self):
        return "pt.rewrite.stash_results()"


class _ResetResults(pt.Transformer):
    """Expands a `"stashed_results_0"` column back into one row per stashed document."""

    def transform(self, topics_with_saved_docs : pd.DataFrame) -> pd.DataFrame:
        """
        Joins each input row with the documents stored in its `"stashed_results_0"` value.

        Raises:
            ValueError: if the input has no `"stashed_results_0"` column.
        """
        if "stashed_results_0" not in topics_with_saved_docs.columns:
            raise ValueError("Cannot apply pt.rewrite.reset_results() without pt.rewrite.stash_results() - column stashed_results_0 not found")

        restored = []
        for row in topics_with_saved_docs.itertuples():
            docs = pd.DataFrame.from_records(row.stashed_results_0)
            docs["qid"] = row.qid
            # single-row frame for this query, without the stash itself
            query = pd.DataFrame(data=[row]).drop(columns=["stashed_results_0"])
            restored.append(query.merge(docs, on="qid"))
        return pd.concat(restored)

    def __repr__(self):
        return "pt.rewrite.reset_results()"
def linear(weightCurrent : float, weightPrevious : float, format="terrierql", **kwargs) -> pt.Transformer:
    """
    Applied to make a linear combination of the current and previous query formulation. The implementation
    is tied to the underlying query language used by the retrieval/re-ranker transformers. Two of Terrier's
    query language formats are supported by the `format` kwarg, namely `"terrierql"` and `"matchopql"`.
    Their exact respective formats are `detailed in the Terrier documentation
    <https://github.com/terrier-org/terrier-core/blob/5.x/doc/querylanguage.md>`_.

    Args:
        weightCurrent(float): weight to apply to the current query formulation.
        weightPrevious(float): weight to apply to the previous query formulation.
        format(str): which query language to use to rewrite the queries, one of "terrierql" or "matchopql".

    Example::

        pipeTQL = pt.apply.query(lambda row: "az") >> pt.rewrite.linear(0.75, 0.25, format="terrierql")
        pipeMQL = pt.apply.query(lambda row: "az") >> pt.rewrite.linear(0.75, 0.25, format="matchopql")
        pipeTQL.search("a")
        pipeMQL.search("a")

    Example outputs of `pipeTQL` and `pipeMQL` corresponding to the query "a" above:

    - Terrier QL output: `"(az)^0.750000 (a)^0.250000"`
    - MatchOp QL output: `"#combine:0:0.750000:1:0.250000(#combine(az) #combine(a))"`

    """
    # weights are ordered [current, previous]
    weights = [weightCurrent, weightPrevious]
    return _LinearRewriteMix(weights, format, **kwargs)
class _LinearRewriteMix(pt.Transformer):
    """Rewrites the query as a weighted mix of the `query_0` and `query_1` formulations."""

    def __init__(self, weights : List[float], format : str = 'terrierql', **kwargs):
        """
        Args:
            weights(List[float]): weights applied to `query_0` and `query_1`, in that order.
            format(str): one of "terrierql" or "matchopql".

        Raises:
            ValueError: if ``format`` is not one of the supported query languages.
        """
        super().__init__(**kwargs)
        self.weights = weights
        self.format = format
        supported = ["terrierql", "matchopql"]
        if format not in supported:
            raise ValueError("Format must be one of 'terrierql', 'matchopql'")

    def _terrierql(self, row):
        template = "(%s)^%f (%s)^%f"
        values = (row["query_0"], self.weights[0], row["query_1"], self.weights[1])
        return template % values

    def _matchopql(self, row):
        template = "#combine:0:%f:1:%f(#combine(%s) #combine(%s))"
        values = (self.weights[0], self.weights[1], row["query_0"], row["query_1"])
        return template % values

    def transform(self, topics_and_res):
        """Pushes the current query down, then writes the mixed formulation into `query`."""
        formatters = {
            "terrierql": self._terrierql,
            "matchopql": self._matchopql,
        }
        fn = formatters.get(self.format)

        newDF = pt.model.push_queries(topics_and_res)
        newDF["query"] = newDF.apply(fn, axis=1)
        return newDF

    def __repr__(self):
        return "pt.rewrite.linear()"