Source code for redis.commands.search.commands

import itertools
import time
from typing import Any, Dict, List, Optional, Union

from redis._parsers.helpers import pairs_to_dict
from redis.client import NEVER_DECODE, Pipeline
from redis.commands.search.hybrid_query import (
    CombineResultsMethod,
    HybridCursorQuery,
    HybridPostProcessingConfig,
    HybridQuery,
)
from redis.commands.search.hybrid_result import HybridCursorResult, HybridResult
from redis.utils import (
    check_protocol_version,
    decode_field_value,
    deprecated_function,
    experimental_method,
    str_if_bytes,
)

from ..helpers import get_legacy_responses, get_protocol_version
from .aggregation import (
    AggregateRequest,
    AggregateResult,
    Cursor,
)
from .document import Document
from .field import Field
from .index_definition import IndexDefinition
from .profile_information import ProfileInformation
from .query import Query
from .result import Result
from .suggestion import SuggestionParser

NUMERIC = "NUMERIC"

CREATE_CMD = "FT.CREATE"
ALTER_CMD = "FT.ALTER"
SEARCH_CMD = "FT.SEARCH"
ADD_CMD = "FT.ADD"
ADDHASH_CMD = "FT.ADDHASH"
DROPINDEX_CMD = "FT.DROPINDEX"
EXPLAIN_CMD = "FT.EXPLAIN"
EXPLAINCLI_CMD = "FT.EXPLAINCLI"
DEL_CMD = "FT.DEL"
AGGREGATE_CMD = "FT.AGGREGATE"
PROFILE_CMD = "FT.PROFILE"
CURSOR_CMD = "FT.CURSOR"
SPELLCHECK_CMD = "FT.SPELLCHECK"
DICT_ADD_CMD = "FT.DICTADD"
DICT_DEL_CMD = "FT.DICTDEL"
DICT_DUMP_CMD = "FT.DICTDUMP"
MGET_CMD = "FT.MGET"
CONFIG_CMD = "FT.CONFIG"
TAGVALS_CMD = "FT.TAGVALS"
ALIAS_ADD_CMD = "FT.ALIASADD"
ALIAS_UPDATE_CMD = "FT.ALIASUPDATE"
ALIAS_DEL_CMD = "FT.ALIASDEL"
INFO_CMD = "FT.INFO"
SUGADD_COMMAND = "FT.SUGADD"
SUGDEL_COMMAND = "FT.SUGDEL"
SUGLEN_COMMAND = "FT.SUGLEN"
SUGGET_COMMAND = "FT.SUGGET"
SYNUPDATE_CMD = "FT.SYNUPDATE"
SYNDUMP_CMD = "FT.SYNDUMP"
HYBRID_CMD = "FT.HYBRID"

NOOFFSETS = "NOOFFSETS"
NOFIELDS = "NOFIELDS"
NOHL = "NOHL"
NOFREQS = "NOFREQS"
MAXTEXTFIELDS = "MAXTEXTFIELDS"
TEMPORARY = "TEMPORARY"
STOPWORDS = "STOPWORDS"
SKIPINITIALSCAN = "SKIPINITIALSCAN"
WITHSCORES = "WITHSCORES"
FUZZY = "FUZZY"
WITHPAYLOADS = "WITHPAYLOADS"


[docs]class SearchCommands: """Search commands.""" # Commands whose parsers require a ``query`` kwarg. When invoked as a # pipeline response-callback the kwarg is carried inside the options # dict that ``execute_command`` stored earlier. If the key is absent # (e.g. a raw ``execute_command("FT.SEARCH", ...)`` call) return the # response unparsed so we don't crash. _QUERY_REQUIRED_CMDS = frozenset( {SEARCH_CMD, AGGREGATE_CMD, CURSOR_CMD, HYBRID_CMD, PROFILE_CMD} ) def _init_module_callbacks(self): """Build the per-protocol module callback maps. Called from ``Search.__init__``, ``Pipeline.__init__`` and ``AsyncPipeline.__init__`` so the mapping lives in a single place rather than being duplicated across all three classes. """ # ``protocol=2`` + ``legacy_responses=True``: original RESP2 wire # parsers preserving the v5 Python shapes exactly. self._RESP2_MODULE_CALLBACKS = { INFO_CMD: self._parse_info, SEARCH_CMD: self._parse_search, HYBRID_CMD: self._parse_hybrid_search, AGGREGATE_CMD: self._parse_aggregate, PROFILE_CMD: self._parse_profile, SPELLCHECK_CMD: self._parse_spellcheck, CONFIG_CMD: self._parse_config_get, SYNDUMP_CMD: self._parse_syndump, } # Explicit ``protocol=3`` + ``legacy_responses=True`` keeps the # pre-existing native RESP3 surface. The only registered callback # is for experimental HYBRID, which normalizes the native shape. # FT.PROFILE stays on the old direct ``_parse_results`` special # case and is not registered as a response callback. self._RESP3_MODULE_CALLBACKS = { HYBRID_CMD: self._parse_hybrid_search_resp3_native, } # ``protocol=None`` + ``legacy_responses=True`` (the v8 default): # the wire is RESP3 but the Python surface mirrors RESP2 legacy # objects (``Result``, ``AggregateResult``, ``(result, profile)`` # tuple, ...). self._RESP3_TO_RESP2_LEGACY_MODULE_CALLBACKS = { INFO_CMD: self._parse_info_resp3_to_legacy, SEARCH_CMD: self._parse_search_resp3, HYBRID_CMD: self._parse_hybrid_search_resp3, AGGREGATE_CMD: self._parse_aggregate_resp3, PROFILE_CMD: self._parse_profile_resp3, SPELLCHECK_CMD: self._parse_spellcheck_resp3, CONFIG_CMD: self._parse_config_get_resp3_to_legacy, SYNDUMP_CMD: self._parse_syndump_resp3, } # Search pipelines historically returned raw wire responses in # legacy mode. The default connection now uses RESP3 on the wire, # so these callbacks adapt only the default legacy pipeline case # back to the raw RESP2 pipeline shapes users saw prior v8.0. self._RESP3_TO_RESP2_LEGACY_PIPELINE_CALLBACKS = { SEARCH_CMD: self._pipeline_parse_search_resp3_to_legacy, HYBRID_CMD: self._pipeline_parse_hybrid_search_resp3_to_legacy, } # ``legacy_responses=False`` + RESP2 wire: enhanced RESP2 parsers # producing the unified shape (``attributes`` as list of dicts, # command-specific value normalisation where the approved shape # requires it). self._RESP2_UNIFIED_MODULE_CALLBACKS = { INFO_CMD: self._parse_info_unified, SEARCH_CMD: self._parse_search, HYBRID_CMD: self._parse_hybrid_search_unified, AGGREGATE_CMD: self._parse_aggregate, PROFILE_CMD: self._parse_profile_unified, SPELLCHECK_CMD: self._parse_spellcheck, CONFIG_CMD: self._parse_config_get_unified, SYNDUMP_CMD: self._parse_syndump_unified, } # ``legacy_responses=False`` + RESP3 wire: keeps the native RESP3 # shape for commands whose unified shape diverges from the # RESP3-to-RESP2-legacy adapter (``FT.INFO`` keeps the native # nested dict, ``FT.PROFILE`` keeps profile data as a dict). self._RESP3_UNIFIED_MODULE_CALLBACKS = dict( self._RESP3_TO_RESP2_LEGACY_MODULE_CALLBACKS ) self._RESP3_UNIFIED_MODULE_CALLBACKS[INFO_CMD] = self._parse_info_resp3 self._RESP3_UNIFIED_MODULE_CALLBACKS[CONFIG_CMD] = self._parse_config_get_resp3 self._RESP3_UNIFIED_MODULE_CALLBACKS[PROFILE_CMD] = ( self._parse_profile_resp3_unified ) self._RESP3_UNIFIED_MODULE_CALLBACKS[HYBRID_CMD] = ( self._parse_hybrid_search_resp3_unified ) def _parse_results(self, cmd, res, **kwargs): if cmd in self._QUERY_REQUIRED_CMDS and "query" not in kwargs: return res protocol = get_protocol_version(self.client) legacy = get_legacy_responses(self.client) if legacy: if protocol in (3, "3"): if cmd == PROFILE_CMD: return ProfileInformation(res) cb = self._RESP3_MODULE_CALLBACKS.get(cmd) elif check_protocol_version(protocol, 3): cb = self._RESP3_TO_RESP2_LEGACY_MODULE_CALLBACKS.get(cmd) else: cb = self._RESP2_MODULE_CALLBACKS.get(cmd) else: if check_protocol_version(protocol, 3): cb = self._RESP3_UNIFIED_MODULE_CALLBACKS.get(cmd) else: cb = self._RESP2_UNIFIED_MODULE_CALLBACKS.get(cmd) if cb is None: return res return cb(res, **kwargs) @staticmethod def _resp3_get(mapping, key, default=None): if not isinstance(mapping, dict): return default return mapping.get(key, mapping.get(key.encode(), default)) @staticmethod def _flatten_resp3_mapping(mapping): if not isinstance(mapping, dict): return mapping flat = [] for key, value in mapping.items(): flat.append(str_if_bytes(key)) flat.append(value) return flat def _pipeline_parse_search_resp3_to_legacy(self, res, **kwargs): """Convert RESP3 FT.SEARCH pipeline output to raw RESP2 pipeline shape.""" query = kwargs.get("query") if query is None or not isinstance(res, dict): return res output = [self._resp3_get(res, "total_results", 0)] for item in self._resp3_get(res, "results", []): output.append(self._resp3_get(item, "id")) if query._with_scores: output.append(self._resp3_get(item, "score")) if query._with_payloads: output.append(self._resp3_get(item, "payload")) if not query._no_content: output.append( self._flatten_resp3_mapping( self._resp3_get(item, "extra_attributes", {}) ) ) return output def _pipeline_parse_hybrid_search_resp3_to_legacy(self, res, **kwargs): """Convert RESP3 FT.HYBRID pipeline output to raw RESP2 pipeline shape.""" if not isinstance(res, dict): return res res = {str_if_bytes(key): value for key, value in res.items()} if "cursor" in kwargs: return ["SEARCH", res.get("SEARCH"), "VSIM", res.get("VSIM")] results = [ self._flatten_resp3_mapping(item) if isinstance(item, dict) else item for item in res.get("results", []) ] return [ "total_results", res.get("total_results", 0), "results", results, "warnings", res.get("warnings", []), "execution_time", res.get("execution_time", 0), ] # ---- RESP2 legacy parsers ---- def _parse_info(self, res, **kwargs): it = map(str_if_bytes, res) return dict(zip(it, it)) def _parse_search(self, res, **kwargs): return Result( res, not kwargs["query"]._no_content, duration=kwargs["duration"], has_payload=kwargs["query"]._with_payloads, with_scores=kwargs["query"]._with_scores, field_encodings=kwargs["query"]._return_fields_decode_as, ) def _parse_hybrid_search(self, res, **kwargs): res_dict = pairs_to_dict(res, decode_keys=True) if "cursor" in kwargs: return HybridCursorResult( search_cursor_id=int(res_dict["SEARCH"]), vsim_cursor_id=int(res_dict["VSIM"]), ) results: List[Dict[str, Any]] = [] # the original results are a list of lists # we convert them to a list of dicts for res_item in res_dict["results"]: item_dict = pairs_to_dict(res_item, decode_keys=True) results.append(item_dict) return HybridResult( total_results=int(res_dict["total_results"]), results=results, warnings=res_dict["warnings"], execution_time=float(res_dict["execution_time"]), ) def _parse_aggregate(self, res, **kwargs): return self._get_aggregate_result(res, kwargs["query"], kwargs["has_cursor"]) def _parse_profile(self, res, **kwargs): query = kwargs["query"] if isinstance(query, AggregateRequest): result = self._get_aggregate_result(res[0], query, query._cursor) else: result = Result( res[0], not query._no_content, duration=kwargs["duration"], has_payload=query._with_payloads, with_scores=query._with_scores, ) return result, ProfileInformation(res[1]) def _parse_spellcheck(self, res, **kwargs): corrections = {} if res == 0: return corrections for _correction in res: if isinstance(_correction, int) and _correction == 0: continue if len(_correction) != 3: continue if not _correction[2]: continue if not _correction[2][0]: continue # For spellcheck output # 1) 1) "TERM" # 2) "{term1}" # 3) 1) 1) "{score1}" # 2) "{suggestion1}" # 2) 1) "{score2}" # 2) "{suggestion2}" # # Following dictionary will be made # corrections = { # '{term1}': [ # {'score': '{score1}', 'suggestion': '{suggestion1}'}, # {'score': '{score2}', 'suggestion': '{suggestion2}'} # ] # } corrections[_correction[1]] = [ {"score": _item[0], "suggestion": _item[1]} for _item in _correction[2] ] return corrections def _parse_config_get(self, res, **kwargs): return {kvs[0]: kvs[1] for kvs in res} if res else {} def _parse_syndump(self, res, **kwargs): return {res[i]: res[i + 1] for i in range(0, len(res), 2)} # ---- RESP2 unified parsers (legacy_responses=False) ---- # Known FT.INFO attribute keys that are followed by a value # (key-value pairs in the RESP2 flat list). _INFO_ATTR_PAIR_KEYS = frozenset( {"identifier", "attribute", "type", "WEIGHT", "SEPARATOR", "PHONETIC"} ) @staticmethod def _normalize_info_attribute(attr_list): """Convert a RESP2 flat attribute list into a RESP3-style dict. RESP2 format: ``[identifier, name, attribute, alias, type, TEXT, WEIGHT, 1, SORTABLE, NOSTEM]``. RESP3 format: ``{"identifier": name, "attribute": alias, "type": "TEXT", "WEIGHT": "1", "flags": ["SORTABLE", "NOSTEM"]}``. """ result = {} flags = [] pair_keys = SearchCommands._INFO_ATTR_PAIR_KEYS i = 0 while i < len(attr_list): key = str_if_bytes(attr_list[i]) if key in pair_keys and i + 1 < len(attr_list): result[key] = str_if_bytes(attr_list[i + 1]) i += 2 else: flags.append(key) i += 1 result["flags"] = flags return result def _parse_info_unified(self, res, **kwargs): """Parse FT.INFO into the unified shape with ``attributes`` as a list of dicts so RESP2 output matches RESP3 output. """ it = map(str_if_bytes, res) info = dict(zip(it, it)) if "attributes" in info and isinstance(info["attributes"], list): info["attributes"] = [ self._normalize_info_attribute(attr) if isinstance(attr, list) else attr for attr in info["attributes"] ] return info def _parse_hybrid_search_unified(self, res, **kwargs): res_dict = pairs_to_dict(res, decode_keys=True) if "cursor" in kwargs: return HybridCursorResult( search_cursor_id=int(res_dict["SEARCH"]), vsim_cursor_id=int(res_dict["VSIM"]), ) field_encodings = self._hybrid_field_encodings(**kwargs) results: List[Dict[str, Any]] = [] for res_item in res_dict["results"]: item_dict = pairs_to_dict(res_item, decode_keys=True) results.append( { key: self._decode_hybrid_field_value(value, key, field_encodings) for key, value in item_dict.items() } ) return HybridResult( total_results=int(res_dict["total_results"]), results=results, warnings=res_dict["warnings"], execution_time=float(res_dict["execution_time"]), ) def _parse_profile_unified(self, res, **kwargs): """Parse FT.PROFILE into ``(result, ProfileInformation)`` with the profile_data normalised to a dict on >= 7.9.0 servers. """ query = kwargs["query"] if isinstance(query, AggregateRequest): result = self._get_aggregate_result(res[0], query, query._cursor) else: result = Result( res[0], not query._no_content, duration=kwargs["duration"], has_payload=query._with_payloads, with_scores=query._with_scores, ) profile_data = res[1] # >= 7.9.0 servers return a flat ``[key, value, ...]`` list at the # top level; convert to dict to match the RESP3 profile shape. # < 7.9.0 servers return a list-of-pairs whose first element is # itself a list — leave as-is. if ( isinstance(profile_data, list) and profile_data and isinstance(profile_data[0], (str, bytes)) ): profile_data = pairs_to_dict(profile_data, decode_keys=True) return result, ProfileInformation(profile_data) def _parse_config_get_unified(self, res, **kwargs): if not res: return {} return {str_if_bytes(kvs[0]): str_if_bytes(kvs[1]) for kvs in res} def _parse_syndump_unified(self, res, **kwargs): if not res: return {} return { str_if_bytes(res[i]): [str_if_bytes(s) for s in res[i + 1]] if isinstance(res[i + 1], list) else str_if_bytes(res[i + 1]) for i in range(0, len(res), 2) } # ---- RESP3 shared result parsers ---- def _parse_search_resp3(self, res, **kwargs): """Parse RESP3 FT.SEARCH response into a Result object.""" query = kwargs.get("query") return Result.from_resp3( res, duration=kwargs.get("duration", 0), with_scores=getattr(query, "_with_scores", False), field_encodings=getattr(query, "_return_fields_decode_as", None), ) def _parse_aggregate_resp3(self, res, **kwargs): """Parse RESP3 FT.AGGREGATE response into an AggregateResult object.""" query = kwargs.get("query") has_cursor = kwargs.get("has_cursor", False) # When has_cursor is True, RESP3 returns [data_dict, cursor_id]. cursor_id = 0 if has_cursor and isinstance(res, list): data = res[0] cursor_id = res[1] if len(res) > 1 else 0 else: data = res warnings = [str_if_bytes(w) for w in data.get("warning", [])] total = data.get("total_results", 0) rows = [] for result_item in data.get("results", []): extra_attrs = result_item.get("extra_attributes", {}) # Convert dict to flat list [key, value, key, value, ...] # to match RESP2 row format consumers expect. flat = [] for k, v in extra_attrs.items(): flat.append(k) flat.append(v) rows.append(flat) cursor = None if has_cursor: if isinstance(query, Cursor): query.cid = cursor_id cursor = query else: cursor = Cursor(cursor_id) return AggregateResult(rows, cursor, None, total=total, warnings=warnings) # ---- RESP3 HYBRID parsers ---- def _parse_hybrid_search_resp3(self, res, **kwargs): """Parse RESP3 FT.HYBRID response into HybridResult/HybridCursorResult. Top-level keys are normalised to strings. Values are preserved as delivered by the wire (bytes when ``NEVER_DECODE`` is set, strings otherwise) so byte/str semantics match the RESP2 legacy parser. """ res = {str_if_bytes(k): v for k, v in res.items()} if "cursor" in kwargs: return HybridCursorResult( search_cursor_id=int(res["SEARCH"]), vsim_cursor_id=int(res["VSIM"]), ) results: List[Dict[str, Any]] = [] for res_item in res.get("results", []): if isinstance(res_item, dict): results.append({str_if_bytes(k): v for k, v in res_item.items()}) else: results.append(pairs_to_dict(res_item, decode_keys=True)) return HybridResult( total_results=int(res.get("total_results", 0)), results=results, warnings=res.get("warnings", []), execution_time=float(res.get("execution_time", 0)), ) def _parse_hybrid_search_resp3_unified(self, res, **kwargs): """Parse RESP3 FT.HYBRID into the approved unified HybridResult.""" res = {str_if_bytes(k): v for k, v in res.items()} if "cursor" in kwargs: return HybridCursorResult( search_cursor_id=int(res["SEARCH"]), vsim_cursor_id=int(res["VSIM"]), ) field_encodings = self._hybrid_field_encodings(**kwargs) results: List[Dict[str, Any]] = [] for res_item in res.get("results", []): if isinstance(res_item, dict): results.append( { str_if_bytes(key): self._decode_hybrid_field_value( value, str_if_bytes(key), field_encodings ) for key, value in res_item.items() } ) else: item_dict = pairs_to_dict(res_item, decode_keys=True) results.append( { key: self._decode_hybrid_field_value( value, key, field_encodings ) for key, value in item_dict.items() } ) return HybridResult( total_results=int(res.get("total_results", 0)), results=results, warnings=res.get("warnings", []), execution_time=float(res.get("execution_time", 0)), ) @staticmethod def _hybrid_field_encodings(**kwargs): encodings = {} for source_name in ("query", "post_processing"): source = kwargs.get(source_name) source_encodings = getattr(source, "_return_fields_decode_as", None) if source_encodings: encodings.update(source_encodings) return encodings or None @staticmethod def _decode_hybrid_field_value(value, key, field_encodings): if not field_encodings or key not in field_encodings: return value return decode_field_value(value, key, field_encodings) def _parse_hybrid_search_resp3_native(self, res, **kwargs): """Normalise RESP3 FT.HYBRID map keys while preserving native shape. ``protocol=3`` + ``legacy_responses=True`` keeps the RESP3 dict surface, but HYBRID uses ``NEVER_DECODE`` so result values mirror legacy RESP2 bytes. Decode only structural keys so callers can use the same native RESP3 key names as before. """ res = {str_if_bytes(k): v for k, v in res.items()} if "cursor" in kwargs: return res if "results" in res: res["results"] = [ {str_if_bytes(k): v for k, v in item.items()} if isinstance(item, dict) else pairs_to_dict(item, decode_keys=True) for item in res["results"] ] if "warnings" in res: res["warnings"] = [str_if_bytes(w) for w in res["warnings"]] return res # ---- RESP3 spellcheck parser ---- def _parse_spellcheck_resp3(self, res, **kwargs): """Parse RESP3 FT.SPELLCHECK response into unified format. RESP3 format: {"results": {"term": [{"suggestion": score}, ...], ...}} Unified format (matches RESP2 parsed output): {"term": [{"score": score_str, "suggestion": suggestion}, ...], ...} """ if not isinstance(res, dict): return self._parse_spellcheck(res, **kwargs) corrections = {} results = res.get("results", {}) for term, suggestions in results.items(): if not suggestions: continue term_corrections = [] for suggestion_dict in suggestions: for suggestion, score in suggestion_dict.items(): # Normalize score to match RESP2's string form: RESP3 # returns a float (e.g. ``0.0``) but RESP2 returns the # string ``"0"``. score_str = str(score) if score_str.endswith(".0"): score_str = score_str[:-2] term_corrections.append( {"score": score_str, "suggestion": str(suggestion)} ) if term_corrections: corrections[term] = term_corrections return corrections # ---- RESP3 profile parsers ---- def _extract_resp3_profile_parts(self, res, **kwargs): """Extract ``(result, profile_data_dict)`` from a RESP3 FT.PROFILE response. ``profile_data_dict`` has its keys/values normalised to strings but is otherwise left as the native RESP3 dict. """ query = kwargs["query"] # RESP3 returns a dict with "Results" and "Profile" keys. Handle # both decoded (str) and raw (bytes) keys. Use ``is not None`` to # avoid dropping falsy values such as empty dicts/lists. results_data = res.get("Results") if results_data is None: results_data = res.get(b"Results") if results_data is None: results_data = res.get("results") if results_data is None: results_data = res.get(b"results") if results_data is None: results_data = res.get(0) profile_data = res.get("Profile") if profile_data is None: profile_data = res.get(b"Profile") if profile_data is None: profile_data = res.get("profile") if profile_data is None: profile_data = res.get(b"profile") if profile_data is None: profile_data = res.get(1) # On older servers (pre MOD-6816, e.g. Redis 7.2/7.4) the "Results" # value is a bare list of result-item dicts, not the wrapper dict # ``{"total_results": N, "results": [...], "warning": [...]}``. # Wrap the list so downstream parsers receive the expected format. if isinstance(results_data, list): results_data = { "total_results": len(results_data), "results": results_data, } if isinstance(query, AggregateRequest): result = self._parse_aggregate_resp3( results_data, query=query, has_cursor=bool(query._cursor) ) else: result = Result.from_resp3( results_data, duration=kwargs.get("duration", 0), with_scores=getattr(query, "_with_scores", False), ) profile_data = self._to_string_recursive(profile_data) return result, profile_data def _parse_profile_resp3(self, res, **kwargs): """Parse RESP3 FT.PROFILE response into ``(result, ProfileInformation)``. RESP3 format (aligned, RediSearch >= MOD-6816): {"Results": {search/aggregate result dict}, "Profile": {profile information dict}} Older RediSearch versions may return a list (same as RESP2) even when the connection uses RESP3. In that case we delegate to the RESP2 ``_parse_profile`` parser. """ if isinstance(res, list): return self._parse_profile(res, **kwargs) result, profile_data = self._extract_resp3_profile_parts(res, **kwargs) # Convert the RESP3 profile dict to the RESP2 list shape so # consumers see the same structure as the RESP2 wire path. # Post-7.9.0 servers return a top-level ``{"Shards": ..., # "Coordinator": ...}`` dict which RESP2 wires as a flat # alternating ``[key, value, key, value]`` list. Pre-7.9.0 # servers return a nested list-of-pairs. if isinstance(profile_data, dict): flat_top = "Shards" in profile_data or "Coordinator" in profile_data profile_data = self._resp3_profile_dict_to_list( profile_data, flat_top=flat_top ) return result, ProfileInformation(profile_data) def _parse_profile_resp3_unified(self, res, **kwargs): """Parse RESP3 FT.PROFILE for the unified shape. Redis < 7.9.0 returns RESP2 profile data as nested list-of-pairs, while RESP3 returns the same data as a dict. Convert that pre-7.9 RESP3 dict back to the RESP2 list shape so the unified surface is protocol-independent. Redis >= 7.9.0 coordinator profiles contain ``Shards``/``Coordinator`` keys and stay as dicts, matching the RESP2 unified parser. """ if isinstance(res, list): return self._parse_profile_unified(res, **kwargs) result, profile_data = self._extract_resp3_profile_parts(res, **kwargs) if isinstance(profile_data, dict) and not ( "Shards" in profile_data or "Coordinator" in profile_data ): profile_data = self._resp3_profile_dict_to_list(profile_data) return result, ProfileInformation(profile_data) @staticmethod def _resp3_profile_dict_to_list(data, flat_top=False): """Convert a RESP3 profile dict into the RESP2 wire list shape. Pre-7.9.0 servers serialise the RESP2 profile as a list of ``[key, value]`` pairs at the top level, with nested structures as flat alternating key-value lists. Post-7.9.0 servers serialise the top level itself as a flat ``[key, value, key, value]`` list (with ``"Shards"``/``"Coordinator"`` keys). ``flat_top`` selects between these two top-level shapes; nested dicts always use the flat alternating form. Key structural difference: when a dict value is a list of dicts (e.g. ``"Child iterators": [{...}, {...}]``), RESP2 expands each dict as a separate sibling element in the parent flat list rather than keeping them nested inside a single value. """ def _is_list_of_dicts(obj): return isinstance(obj, list) and obj and isinstance(obj[0], dict) def _convert(obj, top_level=False): if isinstance(obj, dict): if top_level and not flat_top: result = [] for k, v in obj.items(): entry = [k] if _is_list_of_dicts(v): for item in v: entry.append(_convert(item)) else: entry.append(_convert(v)) result.append(entry) return result else: result = [] for k, v in obj.items(): result.append(k) if _is_list_of_dicts(v): for item in v: result.append(_convert(item)) else: result.append(_convert(v)) return result elif isinstance(obj, list): return [_convert(item) for item in obj] return obj return _convert(data, top_level=True) # ---- RESP3 structural parsers ---- @staticmethod def _to_string_recursive(obj): """Recursively convert bytes keys/values to strings in nested structures. Non-bytes scalars (int, float, None, bool) pass through unchanged. """ if isinstance(obj, bytes): return str_if_bytes(obj) if isinstance(obj, dict): return { str_if_bytes(k) if isinstance(k, bytes) else k: ( SearchCommands._to_string_recursive(v) ) for k, v in obj.items() } if isinstance(obj, list): return [SearchCommands._to_string_recursive(item) for item in obj] return obj def _parse_info_resp3(self, res, **kwargs): """Parse RESP3 FT.INFO response, normalising bytes to strings.""" return self._to_string_recursive(res) @staticmethod def _flatten_info_attribute(attr_dict): """Convert a RESP3-style attribute dict back into the RESP2 flat list (key-value pairs followed by bare flag tokens). """ flat = [] flags = attr_dict.get("flags") or [] for k, v in attr_dict.items(): if k == "flags": continue flat.append(k) flat.append(v) flat.extend(flags) return flat def _parse_info_resp3_to_legacy(self, res, **kwargs): """Parse RESP3 FT.INFO into the legacy RESP2 flat shape so ``legacy_responses=True`` on a RESP3 wire matches RESP2 output. """ info = self._to_string_recursive(res) attrs = info.get("attributes") if isinstance(attrs, list): info["attributes"] = [ self._flatten_info_attribute(attr) if isinstance(attr, dict) else attr for attr in attrs ] return info def _parse_config_get_resp3(self, res, **kwargs): """Parse RESP3 FT.CONFIG GET response, normalising bytes to strings.""" if not res: return {} return {str_if_bytes(k): str_if_bytes(v) for k, v in res.items()} def _parse_config_get_resp3_to_legacy(self, res, **kwargs): """Parse RESP3 FT.CONFIG GET back to the RESP2 legacy dict shape.""" return dict(res) if res else {} def _parse_syndump_resp3(self, res, **kwargs): """Parse RESP3 FT.SYNDUMP response, normalising bytes to strings.""" if not res: return {} return self._to_string_recursive(res)
[docs] def batch_indexer(self, chunk_size=100): """ Create a new batch indexer from the client with a given chunk size """ return self.BatchIndexer(self, chunk_size=chunk_size)
[docs] def create_index( self, fields: List[Field], no_term_offsets: bool = False, no_field_flags: bool = False, stopwords: Optional[List[str]] = None, definition: Optional[IndexDefinition] = None, max_text_fields=False, temporary=None, no_highlight: bool = False, no_term_frequencies: bool = False, skip_initial_scan: bool = False, ): """ Creates the search index. The index must not already exist. For more information, see https://redis.io/commands/ft.create/ Args: fields: A list of Field objects. no_term_offsets: If `true`, term offsets will not be saved in the index. no_field_flags: If true, field flags that allow searching in specific fields will not be saved. stopwords: If provided, the index will be created with this custom stopword list. The list can be empty. definition: If provided, the index will be created with this custom index definition. max_text_fields: If true, indexes will be encoded as if there were more than 32 text fields, allowing for additional fields beyond 32. temporary: Creates a lightweight temporary index which will expire after the specified period of inactivity. The internal idle timer is reset whenever the index is searched or added to. no_highlight: If true, disables highlighting support. Also implied by `no_term_offsets`. no_term_frequencies: If true, term frequencies will not be saved in the index. skip_initial_scan: If true, the initial scan and indexing will be skipped. """ args = [CREATE_CMD, self.index_name] if definition is not None: args += definition.args if max_text_fields: args.append(MAXTEXTFIELDS) if temporary is not None and isinstance(temporary, int): args.append(TEMPORARY) args.append(temporary) if no_term_offsets: args.append(NOOFFSETS) if no_highlight: args.append(NOHL) if no_field_flags: args.append(NOFIELDS) if no_term_frequencies: args.append(NOFREQS) if skip_initial_scan: args.append(SKIPINITIALSCAN) if stopwords is not None and isinstance(stopwords, (list, tuple, set)): args += [STOPWORDS, len(stopwords)] if len(stopwords) > 0: args += list(stopwords) args.append("SCHEMA") try: args += list(itertools.chain(*(f.redis_args() for f in fields))) except TypeError: args += fields.redis_args() return self.execute_command(*args)
[docs] def alter_schema_add(self, fields: Union[Field, List[Field]]): """ Alter the existing search index by adding new fields. The index must already exist. ### Parameters: - **fields**: a list of Field objects to add for the index For more information see `FT.ALTER <https://redis.io/commands/ft.alter>`_. """ # noqa args = [ALTER_CMD, self.index_name, "SCHEMA", "ADD"] try: args += list(itertools.chain(*(f.redis_args() for f in fields))) except TypeError: args += fields.redis_args() return self.execute_command(*args)
[docs] def dropindex(self, delete_documents: bool = False): """ Drop the index if it exists. Replaced `drop_index` in RediSearch 2.0. Default behavior was changed to not delete the indexed documents. ### Parameters: - **delete_documents**: If `True`, all documents will be deleted. For more information see `FT.DROPINDEX <https://redis.io/commands/ft.dropindex>`_. """ # noqa args = [DROPINDEX_CMD, self.index_name] delete_str = ( "DD" if isinstance(delete_documents, bool) and delete_documents is True else "" ) if delete_str: args.append(delete_str) return self.execute_command(*args)
def _add_document( self, doc_id, conn=None, nosave=False, score=1.0, payload=None, replace=False, partial=False, language=None, no_create=False, **fields, ): """ Internal add_document used for both batch and single doc indexing """ if partial or no_create: replace = True args = [ADD_CMD, self.index_name, doc_id, score] if nosave: args.append("NOSAVE") if payload is not None: args.append("PAYLOAD") args.append(payload) if replace: args.append("REPLACE") if partial: args.append("PARTIAL") if no_create: args.append("NOCREATE") if language: args += ["LANGUAGE", language] args.append("FIELDS") args += list(itertools.chain(*fields.items())) if conn is not None: return conn.execute_command(*args) return self.execute_command(*args) def _add_document_hash( self, doc_id, conn=None, score=1.0, language=None, replace=False ): """ Internal add_document_hash used for both batch and single doc indexing """ args = [ADDHASH_CMD, self.index_name, doc_id, score] if replace: args.append("REPLACE") if language: args += ["LANGUAGE", language] if conn is not None: return conn.execute_command(*args) return self.execute_command(*args)
[docs] @deprecated_function( version="2.0.0", reason="deprecated since redisearch 2.0, call hset instead" ) def add_document( self, doc_id: str, nosave: bool = False, score: float = 1.0, payload: Optional[bool] = None, replace: bool = False, partial: bool = False, language: Optional[str] = None, no_create: bool = False, **fields: List[str], ): """ Add a single document to the index. Args: doc_id: the id of the saved document. nosave: if set to true, we just index the document, and don't save a copy of it. This means that searches will just return ids. score: the document ranking, between 0.0 and 1.0 payload: optional inner-index payload we can save for fast access in scoring functions replace: if True, and the document already is in the index, we perform an update and reindex the document partial: if True, the fields specified will be added to the existing document. This has the added benefit that any fields specified with `no_index` will not be reindexed again. Implies `replace` language: Specify the language used for document tokenization. no_create: if True, the document is only updated and reindexed if it already exists. If the document does not exist, an error will be returned. Implies `replace` fields: kwargs dictionary of the document fields to be saved and/or indexed. NOTE: Geo points shoule be encoded as strings of "lon,lat" """ # noqa return self._add_document( doc_id, conn=None, nosave=nosave, score=score, payload=payload, replace=replace, partial=partial, language=language, no_create=no_create, **fields, )
[docs] @deprecated_function( version="2.0.0", reason="deprecated since redisearch 2.0, call hset instead" ) def add_document_hash(self, doc_id, score=1.0, language=None, replace=False): """ Add a hash document to the index. ### Parameters - **doc_id**: the document's id. This has to be an existing HASH key in Redis that will hold the fields the index needs. - **score**: the document ranking, between 0.0 and 1.0 - **replace**: if True, and the document already is in the index, we perform an update and reindex the document - **language**: Specify the language used for document tokenization. """ # noqa return self._add_document_hash( doc_id, conn=None, score=score, language=language, replace=replace )
[docs] @deprecated_function(version="2.0.0", reason="deprecated since redisearch 2.0") def delete_document(self, doc_id, conn=None, delete_actual_document=False): """ Delete a document from index Returns 1 if the document was deleted, 0 if not ### Parameters - **delete_actual_document**: if set to True, RediSearch also delete the actual document if it is in the index """ # noqa args = [DEL_CMD, self.index_name, doc_id] if delete_actual_document: args.append("DD") if conn is not None: return conn.execute_command(*args) return self.execute_command(*args)
[docs] def load_document(self, id, field_encodings: Optional[Dict[str, Any]] = None): """ Load a single document by id - **field_encodings**: optional dict mapping field names to encodings. If a field's encoding is ``None`` the raw bytes value is preserved (useful for binary data such as vectors). """ fields = self.client.hgetall(id) fields = { str_if_bytes(k): decode_field_value(v, str_if_bytes(k), field_encodings) for k, v in fields.items() } try: del fields["id"] except KeyError: pass return Document(id=id, **fields)
[docs] @deprecated_function(version="2.0.0", reason="deprecated since redisearch 2.0") def get(self, *ids): """ Returns the full contents of multiple documents. ### Parameters - **ids**: the ids of the saved documents. """ return self.execute_command(MGET_CMD, self.index_name, *ids)
[docs] def info(self): """ Get info an stats about the the current index, including the number of documents, memory consumption, etc For more information see `FT.INFO <https://redis.io/commands/ft.info>`_. """ res = self.execute_command(INFO_CMD, self.index_name) return self._parse_results(INFO_CMD, res)
def get_params_args( self, query_params: Optional[Dict[str, Union[str, int, float, bytes]]] ): if query_params is None: return [] args = [] if len(query_params) > 0: args.append("PARAMS") args.append(len(query_params) * 2) for key, value in query_params.items(): args.append(key) args.append(value) return args def _mk_query_args( self, query, query_params: Optional[Dict[str, Union[str, int, float, bytes]]] ): args = [self.index_name] if isinstance(query, str): # convert the query from a text to a query object query = Query(query) if not isinstance(query, Query): raise ValueError(f"Bad query type {type(query)}") args += query.get_args() args += self.get_params_args(query_params) return args, query
[docs] def search( self, query: Union[str, Query], query_params: Union[Dict[str, Union[str, int, float, bytes]], None] = None, ): """ Search the index for a given query, and return a result of documents ### Parameters - **query**: the search query. Either a text for simple queries with default parameters, or a Query object for complex queries. See RediSearch's documentation on query format For more information see `FT.SEARCH <https://redis.io/commands/ft.search>`_. """ # noqa args, query = self._mk_query_args(query, query_params=query_params) st = time.monotonic() options = {} if not check_protocol_version(get_protocol_version(self.client), 3): options[NEVER_DECODE] = True if isinstance(self, Pipeline): options["query"] = query options["duration"] = 0 res = self.execute_command(SEARCH_CMD, *args, **options) if isinstance(res, Pipeline): return res return self._parse_results( SEARCH_CMD, res, query=query, duration=(time.monotonic() - st) * 1000.0 )
[docs] def explain( self, query: Union[str, Query], query_params: Optional[Dict[str, Union[str, int, float, bytes]]] = None, ): """Returns the execution plan for a complex query. For more information see `FT.EXPLAIN <https://redis.io/commands/ft.explain>`_. """ # noqa args, query_text = self._mk_query_args(query, query_params=query_params) return self.execute_command(EXPLAIN_CMD, *args)
def explain_cli(self, query: Union[str, Query]): # noqa raise NotImplementedError("EXPLAINCLI will not be implemented.")
[docs] def aggregate( self, query: Union[AggregateRequest, Cursor], query_params: Optional[Dict[str, Union[str, int, float, bytes]]] = None, ): """ Issue an aggregation query. ### Parameters **query**: This can be either an `AggregateRequest`, or a `Cursor` An `AggregateResult` object is returned. You can access the rows from its `rows` property, which will always yield the rows of the result. For more information see `FT.AGGREGATE <https://redis.io/commands/ft.aggregate>`_. """ # noqa if isinstance(query, AggregateRequest): has_cursor = bool(query._cursor) cmd = [AGGREGATE_CMD, self.index_name] + query.build_args() elif isinstance(query, Cursor): has_cursor = True cmd = [CURSOR_CMD, "READ", self.index_name] + query.build_args() else: raise ValueError("Bad query", query) cmd += self.get_params_args(query_params) raw = self.execute_command(*cmd) return self._parse_results( AGGREGATE_CMD, raw, query=query, has_cursor=has_cursor )
def _get_aggregate_result( self, raw: List, query: Union[AggregateRequest, Cursor], has_cursor: bool ): if has_cursor: if isinstance(query, Cursor): query.cid = raw[1] cursor = query else: cursor = Cursor(raw[1]) raw = raw[0] else: cursor = None if isinstance(query, AggregateRequest) and query._with_schema: schema = raw[0] rows = raw[2:] else: schema = None rows = raw[1:] return AggregateResult(rows, cursor, schema)
[docs] def profile( self, query: Union[Query, AggregateRequest], limited: bool = False, query_params: Optional[Dict[str, Union[str, int, float, bytes]]] = None, ) -> Union[ tuple[Union[Result, AggregateResult], ProfileInformation], ProfileInformation, ]: """ Performs a search or aggregate command and collects performance information. ### Parameters **query**: This can be either an `AggregateRequest` or `Query`. **limited**: If set to True, removes details of reader iterator. **query_params**: Define one or more value parameters. Each parameter has a name and a value. """ st = time.monotonic() cmd = [PROFILE_CMD, self.index_name, ""] if limited: cmd.append("LIMITED") cmd.append("QUERY") if isinstance(query, AggregateRequest): cmd[2] = "AGGREGATE" cmd += query.build_args() elif isinstance(query, Query): cmd[2] = "SEARCH" cmd += query.get_args() cmd += self.get_params_args(query_params) else: raise ValueError("Must provide AggregateRequest object or Query object.") res = self.execute_command(*cmd) return self._parse_results( PROFILE_CMD, res, query=query, duration=(time.monotonic() - st) * 1000.0 )
[docs] def spellcheck(self, query, distance=None, include=None, exclude=None): """ Issue a spellcheck query Args: query: search query. distance: the maximal Levenshtein distance for spelling suggestions (default: 1, max: 4). include: specifies an inclusion custom dictionary. exclude: specifies an exclusion custom dictionary. For more information see `FT.SPELLCHECK <https://redis.io/commands/ft.spellcheck>`_. """ # noqa cmd = [SPELLCHECK_CMD, self.index_name, query] if distance: cmd.extend(["DISTANCE", distance]) if include: cmd.extend(["TERMS", "INCLUDE", include]) if exclude: cmd.extend(["TERMS", "EXCLUDE", exclude]) res = self.execute_command(*cmd) return self._parse_results(SPELLCHECK_CMD, res)
[docs] def dict_add(self, name: str, *terms: List[str]): """Adds terms to a dictionary. ### Parameters - **name**: Dictionary name. - **terms**: List of items for adding to the dictionary. For more information see `FT.DICTADD <https://redis.io/commands/ft.dictadd>`_. """ # noqa cmd = [DICT_ADD_CMD, name] cmd.extend(terms) return self.execute_command(*cmd)
[docs] def dict_del(self, name: str, *terms: List[str]): """Deletes terms from a dictionary. ### Parameters - **name**: Dictionary name. - **terms**: List of items for removing from the dictionary. For more information see `FT.DICTDEL <https://redis.io/commands/ft.dictdel>`_. """ # noqa cmd = [DICT_DEL_CMD, name] cmd.extend(terms) return self.execute_command(*cmd)
[docs] def dict_dump(self, name: str): """Dumps all terms in the given dictionary. ### Parameters - **name**: Dictionary name. For more information see `FT.DICTDUMP <https://redis.io/commands/ft.dictdump>`_. """ # noqa cmd = [DICT_DUMP_CMD, name] return self.execute_command(*cmd)
[docs] @deprecated_function( version="8.0.0", reason="deprecated since Redis 8.0, call config_set from core module instead", ) def config_set(self, option: str, value: str) -> bool: """Set runtime configuration option. ### Parameters - **option**: the name of the configuration option. - **value**: a value for the configuration option. For more information see `FT.CONFIG SET <https://redis.io/commands/ft.config-set>`_. """ # noqa cmd = [CONFIG_CMD, "SET", option, value] raw = self.execute_command(*cmd) return raw == "OK"
[docs] @deprecated_function( version="8.0.0", reason="deprecated since Redis 8.0, call config_get from core module instead", ) def config_get(self, option: str) -> str: """Get runtime configuration option value. ### Parameters - **option**: the name of the configuration option. For more information see `FT.CONFIG GET <https://redis.io/commands/ft.config-get>`_. """ # noqa cmd = [CONFIG_CMD, "GET", option] res = self.execute_command(*cmd) return self._parse_results(CONFIG_CMD, res)
[docs] def tagvals(self, tagfield: str): """ Return a list of all possible tag values ### Parameters - **tagfield**: Tag field name For more information see `FT.TAGVALS <https://redis.io/commands/ft.tagvals>`_. """ # noqa return self.execute_command(TAGVALS_CMD, self.index_name, tagfield)
[docs] def aliasadd(self, alias: str): """ Alias a search index - will fail if alias already exists ### Parameters - **alias**: Name of the alias to create For more information see `FT.ALIASADD <https://redis.io/commands/ft.aliasadd>`_. """ # noqa return self.execute_command(ALIAS_ADD_CMD, alias, self.index_name)
[docs] def aliasupdate(self, alias: str): """ Updates an alias - will fail if alias does not already exist ### Parameters - **alias**: Name of the alias to create For more information see `FT.ALIASUPDATE <https://redis.io/commands/ft.aliasupdate>`_. """ # noqa return self.execute_command(ALIAS_UPDATE_CMD, alias, self.index_name)
[docs] def aliasdel(self, alias: str): """ Removes an alias to a search index ### Parameters - **alias**: Name of the alias to delete For more information see `FT.ALIASDEL <https://redis.io/commands/ft.aliasdel>`_. """ # noqa return self.execute_command(ALIAS_DEL_CMD, alias)
[docs] def sugadd(self, key, *suggestions, **kwargs): """ Add suggestion terms to the AutoCompleter engine. Each suggestion has a score and string. If kwargs["increment"] is true and the terms are already in the server's dictionary, we increment their scores. For more information see `FT.SUGADD <https://redis.io/commands/ft.sugadd/>`_. """ # noqa # If Transaction is not False it will MULTI/EXEC which will error pipe = self.pipeline(transaction=False) for sug in suggestions: args = [SUGADD_COMMAND, key, sug.string, sug.score] if kwargs.get("increment"): args.append("INCR") if sug.payload: args.append("PAYLOAD") args.append(sug.payload) pipe.execute_command(*args) return pipe.execute()[-1]
[docs] def suglen(self, key: str) -> int: """ Return the number of entries in the AutoCompleter index. For more information see `FT.SUGLEN <https://redis.io/commands/ft.suglen>`_. """ # noqa return self.execute_command(SUGLEN_COMMAND, key)
[docs] def sugdel(self, key: str, string: str) -> int: """ Delete a string from the AutoCompleter index. Returns 1 if the string was found and deleted, 0 otherwise. For more information see `FT.SUGDEL <https://redis.io/commands/ft.sugdel>`_. """ # noqa return self.execute_command(SUGDEL_COMMAND, key, string)
[docs] def sugget( self, key: str, prefix: str, fuzzy: bool = False, num: int = 10, with_scores: bool = False, with_payloads: bool = False, ) -> List[SuggestionParser]: """ Get a list of suggestions from the AutoCompleter, for a given prefix. Parameters: prefix : str The prefix we are searching. **Must be valid ascii or utf-8** fuzzy : bool If set to true, the prefix search is done in fuzzy mode. **NOTE**: Running fuzzy searches on short (<3 letters) prefixes can be very slow, and even scan the entire index. with_scores : bool If set to true, we also return the (refactored) score of each suggestion. This is normally not needed, and is NOT the original score inserted into the index. with_payloads : bool Return suggestion payloads num : int The maximum number of results we return. Note that we might return less. The algorithm trims irrelevant suggestions. Returns: list: A list of Suggestion objects. If with_scores was False, the score of all suggestions is 1. For more information see `FT.SUGGET <https://redis.io/commands/ft.sugget>`_. """ # noqa args = [SUGGET_COMMAND, key, prefix, "MAX", num] if fuzzy: args.append(FUZZY) if with_scores: args.append(WITHSCORES) if with_payloads: args.append(WITHPAYLOADS) res = self.execute_command(*args) results = [] if not res: return results parser = SuggestionParser(with_scores, with_payloads, res) return [s for s in parser]
[docs] def synupdate(self, groupid: str, skipinitial: bool = False, *terms: List[str]): """ Updates a synonym group. The command is used to create or update a synonym group with additional terms. Only documents which were indexed after the update will be affected. Parameters: groupid : Synonym group id. skipinitial : bool If set to true, we do not scan and index. terms : The terms. For more information see `FT.SYNUPDATE <https://redis.io/commands/ft.synupdate>`_. """ # noqa cmd = [SYNUPDATE_CMD, self.index_name, groupid] if skipinitial: cmd.extend(["SKIPINITIALSCAN"]) cmd.extend(terms) return self.execute_command(*cmd)
[docs] def syndump(self): """ Dumps the contents of a synonym group. The command is used to dump the synonyms data structure. Returns a list of synonym terms and their synonym group ids. For more information see `FT.SYNDUMP <https://redis.io/commands/ft.syndump>`_. """ # noqa res = self.execute_command(SYNDUMP_CMD, self.index_name) return self._parse_results(SYNDUMP_CMD, res)
class AsyncSearchCommands(SearchCommands): async def info(self): """ Get info an stats about the the current index, including the number of documents, memory consumption, etc For more information see `FT.INFO <https://redis.io/commands/ft.info>`_. """ res = await self.execute_command(INFO_CMD, self.index_name) return self._parse_results(INFO_CMD, res) async def search( self, query: Union[str, Query], query_params: Optional[Dict[str, Union[str, int, float, bytes]]] = None, ): """ Search the index for a given query, and return a result of documents ### Parameters - **query**: the search query. Either a text for simple queries with default parameters, or a Query object for complex queries. See RediSearch's documentation on query format For more information see `FT.SEARCH <https://redis.io/commands/ft.search>`_. """ # noqa args, query = self._mk_query_args(query, query_params=query_params) st = time.monotonic() options = {} if not check_protocol_version(get_protocol_version(self.client), 3): options[NEVER_DECODE] = True if isinstance(self, Pipeline): options["query"] = query options["duration"] = 0 res = await self.execute_command(SEARCH_CMD, *args, **options) if isinstance(res, Pipeline): return res return self._parse_results( SEARCH_CMD, res, query=query, duration=(time.monotonic() - st) * 1000.0 ) @experimental_method() async def hybrid_search( self, query: HybridQuery, combine_method: Optional[CombineResultsMethod] = None, post_processing: Optional[HybridPostProcessingConfig] = None, params_substitution: Optional[Dict[str, Union[str, int, float, bytes]]] = None, timeout: Optional[int] = None, cursor: Optional[HybridCursorQuery] = None, ) -> Union[HybridResult, HybridCursorResult, Pipeline]: """ Execute a hybrid search using both text and vector queries Args: - **query**: HybridQuery object Contains the text and vector queries - **combine_method**: CombineResultsMethod object Contains the combine method and parameters - **post_processing**: HybridPostProcessingConfig object Contains the post processing configuration - **params_substitution**: Dict[str, Union[str, int, float, bytes]] Contains the parameters substitution - **timeout**: int - contains the timeout in milliseconds - **cursor**: HybridCursorQuery object - contains the cursor configuration For more information see `FT.SEARCH <https://redis.io/commands/ft.hybrid>`. """ index = self.index_name options = {} pieces = [HYBRID_CMD, index] pieces.extend(query.get_args()) if combine_method: pieces.extend(combine_method.get_args()) if post_processing: pieces.extend(post_processing.build_args()) options["post_processing"] = post_processing if params_substitution: pieces.extend(self.get_params_args(params_substitution)) if timeout: pieces.extend(("TIMEOUT", timeout)) if cursor: options["cursor"] = True pieces.extend(cursor.build_args()) # Preserve HYBRID result values as bytes by default, matching the # legacy RESP2 Search surface; selected LOAD fields can opt into # decoding through HybridPostProcessingConfig.load(..., decode_field=True). options[NEVER_DECODE] = True options["query"] = query res = await self.execute_command(*pieces, **options) if isinstance(res, Pipeline): return res return self._parse_results(HYBRID_CMD, res, **options) async def aggregate( self, query: Union[AggregateResult, Cursor], query_params: Optional[Dict[str, Union[str, int, float, bytes]]] = None, ): """ Issue an aggregation query. ### Parameters **query**: This can be either an `AggregateRequest`, or a `Cursor` An `AggregateResult` object is returned. You can access the rows from its `rows` property, which will always yield the rows of the result. For more information see `FT.AGGREGATE <https://redis.io/commands/ft.aggregate>`_. """ # noqa if isinstance(query, AggregateRequest): has_cursor = bool(query._cursor) cmd = [AGGREGATE_CMD, self.index_name] + query.build_args() elif isinstance(query, Cursor): has_cursor = True cmd = [CURSOR_CMD, "READ", self.index_name] + query.build_args() else: raise ValueError("Bad query", query) cmd += self.get_params_args(query_params) raw = await self.execute_command(*cmd) return self._parse_results( AGGREGATE_CMD, raw, query=query, has_cursor=has_cursor ) async def spellcheck(self, query, distance=None, include=None, exclude=None): """ Issue a spellcheck query ### Parameters **query**: search query. **distance***: the maximal Levenshtein distance for spelling suggestions (default: 1, max: 4). **include**: specifies an inclusion custom dictionary. **exclude**: specifies an exclusion custom dictionary. For more information see `FT.SPELLCHECK <https://redis.io/commands/ft.spellcheck>`_. """ # noqa cmd = [SPELLCHECK_CMD, self.index_name, query] if distance: cmd.extend(["DISTANCE", distance]) if include: cmd.extend(["TERMS", "INCLUDE", include]) if exclude: cmd.extend(["TERMS", "EXCLUDE", exclude]) res = await self.execute_command(*cmd) return self._parse_results(SPELLCHECK_CMD, res) @deprecated_function( version="8.0.0", reason="deprecated since Redis 8.0, call config_set from core module instead", ) async def config_set(self, option: str, value: str) -> bool: """Set runtime configuration option. ### Parameters - **option**: the name of the configuration option. - **value**: a value for the configuration option. For more information see `FT.CONFIG SET <https://redis.io/commands/ft.config-set>`_. """ # noqa cmd = [CONFIG_CMD, "SET", option, value] raw = await self.execute_command(*cmd) return raw == "OK" @deprecated_function( version="8.0.0", reason="deprecated since Redis 8.0, call config_get from core module instead", ) async def config_get(self, option: str) -> str: """Get runtime configuration option value. ### Parameters - **option**: the name of the configuration option. For more information see `FT.CONFIG GET <https://redis.io/commands/ft.config-get>`_. """ # noqa cmd = [CONFIG_CMD, "GET", option] res = {} res = await self.execute_command(*cmd) return self._parse_results(CONFIG_CMD, res) async def load_document(self, id, field_encodings: Optional[Dict[str, Any]] = None): """ Load a single document by id - **field_encodings**: optional dict mapping field names to encodings. If a field's encoding is ``None`` the raw bytes value is preserved (useful for binary data such as vectors). """ fields = await self.client.hgetall(id) fields = { str_if_bytes(k): decode_field_value(v, str_if_bytes(k), field_encodings) for k, v in fields.items() } try: del fields["id"] except KeyError: pass return Document(id=id, **fields) async def sugadd(self, key, *suggestions, **kwargs): """ Add suggestion terms to the AutoCompleter engine. Each suggestion has a score and string. If kwargs["increment"] is true and the terms are already in the server's dictionary, we increment their scores. For more information see `FT.SUGADD <https://redis.io/commands/ft.sugadd>`_. """ # noqa # If Transaction is not False it will MULTI/EXEC which will error pipe = self.pipeline(transaction=False) for sug in suggestions: args = [SUGADD_COMMAND, key, sug.string, sug.score] if kwargs.get("increment"): args.append("INCR") if sug.payload: args.append("PAYLOAD") args.append(sug.payload) pipe.execute_command(*args) return (await pipe.execute())[-1] async def sugget( self, key: str, prefix: str, fuzzy: bool = False, num: int = 10, with_scores: bool = False, with_payloads: bool = False, ) -> List[SuggestionParser]: """ Get a list of suggestions from the AutoCompleter, for a given prefix. Parameters: prefix : str The prefix we are searching. **Must be valid ascii or utf-8** fuzzy : bool If set to true, the prefix search is done in fuzzy mode. **NOTE**: Running fuzzy searches on short (<3 letters) prefixes can be very slow, and even scan the entire index. with_scores : bool If set to true, we also return the (refactored) score of each suggestion. This is normally not needed, and is NOT the original score inserted into the index. with_payloads : bool Return suggestion payloads num : int The maximum number of results we return. Note that we might return less. The algorithm trims irrelevant suggestions. Returns: list: A list of Suggestion objects. If with_scores was False, the score of all suggestions is 1. For more information see `FT.SUGGET <https://redis.io/commands/ft.sugget>`_. """ # noqa args = [SUGGET_COMMAND, key, prefix, "MAX", num] if fuzzy: args.append(FUZZY) if with_scores: args.append(WITHSCORES) if with_payloads: args.append(WITHPAYLOADS) ret = await self.execute_command(*args) results = [] if not ret: return results parser = SuggestionParser(with_scores, with_payloads, ret) return [s for s in parser]