| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551 |
- # sql/default_comparator.py
- # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
- # <see AUTHORS file>
- #
- # This module is part of SQLAlchemy and is released under
- # the MIT License: https://www.opensource.org/licenses/mit-license.php
- """Default implementation of SQL comparison operations."""
- from __future__ import annotations
- import typing
- from typing import Any
- from typing import Callable
- from typing import Dict
- from typing import NoReturn
- from typing import Optional
- from typing import Tuple
- from typing import Type
- from typing import Union
- from . import coercions
- from . import operators
- from . import roles
- from . import type_api
- from .elements import and_
- from .elements import BinaryExpression
- from .elements import ClauseElement
- from .elements import CollationClause
- from .elements import CollectionAggregate
- from .elements import ExpressionClauseList
- from .elements import False_
- from .elements import Null
- from .elements import OperatorExpression
- from .elements import or_
- from .elements import True_
- from .elements import UnaryExpression
- from .operators import OperatorType
- from .. import exc
- from .. import util
- _T = typing.TypeVar("_T", bound=Any)
- if typing.TYPE_CHECKING:
- from .elements import ColumnElement
- from .operators import custom_op
- from .type_api import TypeEngine
def _boolean_compare(
    expr: ColumnElement[Any],
    op: OperatorType,
    obj: Any,
    *,
    negate_op: Optional[OperatorType] = None,
    reverse: bool = False,
    _python_is_types: Tuple[Type[Any], ...] = (type(None), bool),
    result_type: Optional[TypeEngine[bool]] = None,
    **kwargs: Any,
) -> OperatorExpression[bool]:
    """Construct a comparison of ``expr`` against ``obj``.

    ``result_type`` falls back to ``type_api.BOOLEANTYPE`` when omitted.

    When ``obj`` is an instance of ``_python_is_types`` or of
    ``Null`` / ``True_`` / ``False_``:

    * ``eq`` / ``ne`` against a boolean value, and ``is_distinct_from`` /
      ``is_not_distinct_from`` against any such constant, are emitted with
      the operator exactly as given;
    * otherwise, if ``expr._is_collection_aggregate`` is set, the constant
      is coerced and compared using the given operator;
    * otherwise ``eq`` / ``is_`` are emitted as ``is_`` and ``ne`` /
      ``is_not`` as ``is_not``; ``negate_op`` and ``kwargs`` are not
      applied in this case.

    Any other ``obj`` is coerced through ``roles.BinaryElementRole``.

    :param reverse: when true, ``obj`` becomes the left operand and
     ``expr`` the right one.
    :param kwargs: passed along as the expression's ``modifiers``.
    :raises ArgumentError: if a constant operand is used with an operator
     other than those listed above.
    """
    if result_type is None:
        result_type = type_api.BOOLEANTYPE

    if not isinstance(obj, _python_is_types + (Null, True_, False_)):
        obj = coercions.expect(
            roles.BinaryElementRole, element=obj, operator=op, expr=expr
        )
    else:
        # x ==/!= True/False is treated as a literal; this comes out to
        # "== / != true/false", or "1/0" where those constants aren't
        # supported.  IS [NOT] DISTINCT FROM keeps its operator as well.
        if (
            op in (operators.eq, operators.ne)
            and isinstance(obj, (bool, True_, False_))
        ) or op in (
            operators.is_distinct_from,
            operators.is_not_distinct_from,
        ):
            return OperatorExpression._construct_for_op(
                expr,
                coercions.expect(roles.ConstExprRole, obj),
                op,
                type_=result_type,
                negate=negate_op,
                modifiers=kwargs,
            )

        if expr._is_collection_aggregate:
            obj = coercions.expect(
                roles.ConstExprRole, element=obj, operator=op, expr=expr
            )
        else:
            # every remaining constant comparison uses IS / IS NOT
            if op in (operators.eq, operators.is_):
                is_operator = operators.is_
                is_negation = operators.is_not
            elif op in (operators.ne, operators.is_not):
                is_operator = operators.is_not
                is_negation = operators.is_
            else:
                raise exc.ArgumentError(
                    "Only '=', '!=', 'is_()', 'is_not()', "
                    "'is_distinct_from()', 'is_not_distinct_from()' "
                    "operators can be used with None/True/False"
                )
            return OperatorExpression._construct_for_op(
                expr,
                coercions.expect(roles.ConstExprRole, obj),
                is_operator,
                negate=is_negation,
                type_=result_type,
            )

    left, right = (obj, expr) if reverse else (expr, obj)
    return OperatorExpression._construct_for_op(
        left,
        right,
        op,
        type_=result_type,
        negate=negate_op,
        modifiers=kwargs,
    )
def _custom_op_operate(
    expr: ColumnElement[Any],
    op: custom_op[Any],
    obj: Any,
    reverse: bool = False,
    result_type: Optional[TypeEngine[Any]] = None,
    **kw: Any,
) -> ColumnElement[Any]:
    """Apply a custom operator through ``_binary_operate``.

    An explicit ``result_type`` is used as given.  Otherwise the
    operator's own ``return_type`` is used when truthy, else
    ``type_api.BOOLEANTYPE`` when ``op.is_comparison`` is truthy, else
    ``None`` is passed on and ``_binary_operate`` decides.
    """
    if result_type is not None:
        chosen_type = result_type
    elif op.return_type:
        chosen_type = op.return_type
    elif op.is_comparison:
        chosen_type = type_api.BOOLEANTYPE
    else:
        chosen_type = None

    return _binary_operate(
        expr, op, obj, reverse=reverse, result_type=chosen_type, **kw
    )
def _binary_operate(
    expr: ColumnElement[Any],
    op: OperatorType,
    obj: roles.BinaryElementRole[Any],
    *,
    reverse: bool = False,
    result_type: Optional[TypeEngine[_T]] = None,
    **kw: Any,
) -> OperatorExpression[_T]:
    """Build a binary operator expression between ``expr`` and ``obj``.

    ``obj`` is coerced through ``roles.BinaryElementRole``.  With
    ``reverse`` set, the coerced operand goes on the left and ``expr`` on
    the right.  When no ``result_type`` is supplied, the left operand's
    comparator is asked, via ``_adapt_expression``, for the operator and
    result type to use.  ``kw`` is passed as the expression's
    ``modifiers``.
    """
    operand = coercions.expect(
        roles.BinaryElementRole, obj, expr=expr, operator=op
    )
    lhs, rhs = (operand, expr) if reverse else (expr, operand)

    if result_type is None:
        op, result_type = lhs.comparator._adapt_expression(
            op, rhs.comparator
        )

    return OperatorExpression._construct_for_op(
        lhs, rhs, op, type_=result_type, modifiers=kw
    )
def _conjunction_operate(
    expr: ColumnElement[Any], op: OperatorType, other: Any, **kw: Any
) -> ColumnElement[Any]:
    """Join ``expr`` and ``other`` with AND or OR.

    :raises NotImplementedError: if ``op`` is neither ``operators.and_``
     nor ``operators.or_``.
    """
    if op is operators.and_:
        return and_(expr, other)
    if op is operators.or_:
        return or_(expr, other)
    raise NotImplementedError()
def _scalar(
    expr: ColumnElement[Any],
    op: OperatorType,
    fn: Callable[[ColumnElement[Any]], ColumnElement[Any]],
    **kw: Any,
) -> ColumnElement[Any]:
    """Return the result of calling ``fn`` on ``expr``.

    ``op`` and any extra keyword arguments are accepted but not used.
    """
    produced = fn(expr)
    return produced
def _in_impl(
    expr: ColumnElement[Any],
    op: OperatorType,
    seq_or_selectable: ClauseElement,
    negate_op: OperatorType,
    **kw: Any,
) -> ColumnElement[Any]:
    """Build an IN / NOT IN comparison of ``expr``.

    The right-hand side is coerced through ``roles.InElementRole``.  If
    the coerced element carries an ``"in_ops"`` annotation, the operator
    pair stored there replaces ``op`` and ``negate_op``.
    """
    in_target = coercions.expect(
        roles.InElementRole, seq_or_selectable, expr=expr, operator=op
    )

    annotations = in_target._annotations
    if "in_ops" in annotations:
        op, negate_op = annotations["in_ops"]

    return _boolean_compare(expr, op, in_target, negate_op=negate_op, **kw)
def _getitem_impl(
    expr: ColumnElement[Any], op: OperatorType, other: Any, **kw: Any
) -> ColumnElement[Any]:
    """Apply an index operation to ``expr`` when its type allows it.

    The operation is allowed when ``expr.type`` is an
    ``type_api.INDEXABLE``, or is a ``type_api.TypeDecorator`` whose
    ``impl_instance`` is one.  Anything else is handed to
    ``_unsupported_impl``, which raises.
    """
    expr_type = expr.type
    supports_index = isinstance(expr_type, type_api.INDEXABLE) or (
        isinstance(expr_type, type_api.TypeDecorator)
        and isinstance(expr_type.impl_instance, type_api.INDEXABLE)
    )

    if not supports_index:
        _unsupported_impl(expr, op, other, **kw)
    else:
        index = coercions.expect(
            roles.BinaryElementRole, other, expr=expr, operator=op
        )
        return _binary_operate(expr, op, index, **kw)
def _unsupported_impl(
    expr: ColumnElement[Any], op: OperatorType, *arg: Any, **kw: Any
) -> NoReturn:
    """Always raise ``NotImplementedError`` naming the operator ``op``."""
    message = (
        "Operator '%s' is not supported on this expression" % op.__name__
    )
    raise NotImplementedError(message)
def _inv_impl(
    expr: ColumnElement[Any], op: OperatorType, **kw: Any
) -> ColumnElement[Any]:
    """See :meth:`.ColumnOperators.__inv__`.

    Returns ``expr.negation_clause`` when that attribute exists, else the
    result of ``expr._negate()``.
    """
    if not hasattr(expr, "negation_clause"):
        return expr._negate()

    # undocumented element currently used by the ORM for
    # relationship.contains()
    return expr.negation_clause
def _neg_impl(
    expr: ColumnElement[Any], op: OperatorType, **kw: Any
) -> ColumnElement[Any]:
    """See :meth:`.ColumnOperators.__neg__`.

    Wraps ``expr`` in a ``UnaryExpression`` using ``operators.neg`` and
    the type of ``expr``.
    """
    negated = UnaryExpression(expr, operator=operators.neg, type_=expr.type)
    return negated
def _bitwise_not_impl(
    expr: ColumnElement[Any], op: OperatorType, **kw: Any
) -> ColumnElement[Any]:
    """See :meth:`.ColumnOperators.bitwise_not`.

    Wraps ``expr`` in a ``UnaryExpression`` using
    ``operators.bitwise_not_op`` and the type of ``expr``.
    """
    inverted = UnaryExpression(
        expr, operator=operators.bitwise_not_op, type_=expr.type
    )
    return inverted
def _match_impl(
    expr: ColumnElement[Any], op: OperatorType, other: Any, **kw: Any
) -> ColumnElement[Any]:
    """See :meth:`.ColumnOperators.match`.

    The comparison is always built with ``operators.match_op`` and
    ``type_api.MATCHTYPE``.  The incoming ``op`` only selects the
    negation operator: ``not_match_op`` when ``op`` is ``match_op``,
    otherwise ``match_op``.
    """
    target = coercions.expect(
        roles.BinaryElementRole,
        other,
        expr=expr,
        operator=operators.match_op,
    )

    if op is operators.match_op:
        negation = operators.not_match_op
    else:
        negation = operators.match_op

    return _boolean_compare(
        expr,
        operators.match_op,
        target,
        result_type=type_api.MATCHTYPE,
        negate_op=negation,
        **kw,
    )
def _distinct_impl(
    expr: ColumnElement[Any], op: OperatorType, **kw: Any
) -> ColumnElement[Any]:
    """See :meth:`.ColumnOperators.distinct`.

    Wraps ``expr`` in a ``UnaryExpression`` using
    ``operators.distinct_op`` and the type of ``expr``.
    """
    distinct_expr = UnaryExpression(
        expr, operator=operators.distinct_op, type_=expr.type
    )
    return distinct_expr
def _between_impl(
    expr: ColumnElement[Any],
    op: OperatorType,
    cleft: Any,
    cright: Any,
    **kw: Any,
) -> ColumnElement[Any]:
    """See :meth:`.ColumnOperators.between`.

    Both bounds are coerced through ``roles.BinaryElementRole`` and joined
    into an ungrouped ``operators.and_`` clause list, which becomes the
    right side of the returned ``BinaryExpression``.  The negation
    operator is ``not_between_op`` when ``op`` is ``between_op``,
    otherwise ``between_op``.  ``kw`` is passed as ``modifiers``.
    """

    def _coerce_bound(bound: Any) -> Any:
        return coercions.expect(
            roles.BinaryElementRole,
            bound,
            expr=expr,
            operator=operators.and_,
        )

    bounds = ExpressionClauseList._construct_for_list(
        operators.and_,
        type_api.NULLTYPE,
        _coerce_bound(cleft),
        _coerce_bound(cright),
        group=False,
    )

    if op is operators.between_op:
        negation = operators.not_between_op
    else:
        negation = operators.between_op

    return BinaryExpression(
        expr,
        bounds,
        op,
        negate=negation,
        modifiers=kw,
    )
def _collate_impl(
    expr: ColumnElement[str], op: OperatorType, collation: str, **kw: Any
) -> ColumnElement[str]:
    """Return a collation expression for ``expr`` and ``collation``.

    Delegates to ``CollationClause._create_collation_expression``; ``op``
    and extra keyword arguments are not used.
    """
    collated = CollationClause._create_collation_expression(expr, collation)
    return collated
def _regexp_match_impl(
    expr: ColumnElement[str],
    op: OperatorType,
    pattern: Any,
    flags: Optional[str],
    **kw: Any,
) -> ColumnElement[Any]:
    """Build the binary expression for a regular-expression match.

    This function is registered for both ``regexp_match_op`` and
    ``not_regexp_match_op``, so the negation operator is derived from
    ``op`` rather than fixed; otherwise an expression built with
    ``not_regexp_match_op`` would be its own negation.

    :param expr: the left-hand expression being matched.
    :param op: the operator placed in the resulting expression.
    :param pattern: the pattern operand; coerced through
     ``roles.BinaryElementRole``.
    :param flags: stored in the expression's modifiers under ``"flags"``.
    :param kw: accepted but not used.
    :return: a ``BinaryExpression`` whose ``negate`` operator is
     ``not_regexp_match_op`` when ``op`` is ``regexp_match_op``, and
     ``regexp_match_op`` otherwise.
    """
    # mirror _match_impl / _between_impl: negate is the opposite of op
    negate_op = (
        operators.not_regexp_match_op
        if op is operators.regexp_match_op
        else operators.regexp_match_op
    )
    return BinaryExpression(
        expr,
        coercions.expect(
            roles.BinaryElementRole,
            pattern,
            expr=expr,
            operator=operators.comma_op,
        ),
        op,
        negate=negate_op,
        modifiers={"flags": flags},
    )
def _regexp_replace_impl(
    expr: ColumnElement[Any],
    op: OperatorType,
    pattern: Any,
    replacement: Any,
    flags: Optional[str],
    **kw: Any,
) -> ColumnElement[Any]:
    """Build the binary expression for a regular-expression replace.

    ``pattern`` and ``replacement`` are each coerced through
    ``roles.BinaryElementRole`` and combined into an ungrouped
    ``operators.comma_op`` clause list that forms the right side of the
    returned ``BinaryExpression``.  ``flags`` is stored in the modifiers
    under ``"flags"``; ``kw`` is accepted but not used.
    """

    def _coerce_arg(value: Any) -> Any:
        return coercions.expect(
            roles.BinaryElementRole,
            value,
            expr=expr,
            operator=operators.comma_op,
        )

    arguments = ExpressionClauseList._construct_for_list(
        operators.comma_op,
        type_api.NULLTYPE,
        _coerce_arg(pattern),
        _coerce_arg(replacement),
        group=False,
    )
    return BinaryExpression(
        expr,
        arguments,
        op,
        modifiers={"flags": flags},
    )
# A mapping of operator names to the implementation function that handles
# them, along with additional keyword arguments to be passed to that
# function.  Comparison operators carry a "negate_op" naming the operator
# that expresses their negation; the _scalar entries carry the "fn" that
# is applied to the expression.
operator_lookup: Dict[
    str,
    Tuple[
        Callable[..., ColumnElement[Any]],
        util.immutabledict[
            str, Union[OperatorType, Callable[..., ColumnElement[Any]]]
        ],
    ],
] = {
    "and_": (_conjunction_operate, util.EMPTY_DICT),
    "or_": (_conjunction_operate, util.EMPTY_DICT),
    "inv": (_inv_impl, util.EMPTY_DICT),
    "add": (_binary_operate, util.EMPTY_DICT),
    "mul": (_binary_operate, util.EMPTY_DICT),
    "sub": (_binary_operate, util.EMPTY_DICT),
    "div": (_binary_operate, util.EMPTY_DICT),
    "mod": (_binary_operate, util.EMPTY_DICT),
    "bitwise_xor_op": (_binary_operate, util.EMPTY_DICT),
    "bitwise_or_op": (_binary_operate, util.EMPTY_DICT),
    "bitwise_and_op": (_binary_operate, util.EMPTY_DICT),
    "bitwise_not_op": (_bitwise_not_impl, util.EMPTY_DICT),
    "bitwise_lshift_op": (_binary_operate, util.EMPTY_DICT),
    "bitwise_rshift_op": (_binary_operate, util.EMPTY_DICT),
    "truediv": (_binary_operate, util.EMPTY_DICT),
    "floordiv": (_binary_operate, util.EMPTY_DICT),
    "custom_op": (_custom_op_operate, util.EMPTY_DICT),
    "json_path_getitem_op": (_binary_operate, util.EMPTY_DICT),
    "json_getitem_op": (_binary_operate, util.EMPTY_DICT),
    "concat_op": (_binary_operate, util.EMPTY_DICT),
    "any_op": (
        _scalar,
        util.immutabledict({"fn": CollectionAggregate._create_any}),
    ),
    "all_op": (
        _scalar,
        util.immutabledict({"fn": CollectionAggregate._create_all}),
    ),
    "lt": (_boolean_compare, util.immutabledict({"negate_op": operators.ge})),
    "le": (_boolean_compare, util.immutabledict({"negate_op": operators.gt})),
    "ne": (_boolean_compare, util.immutabledict({"negate_op": operators.eq})),
    "gt": (_boolean_compare, util.immutabledict({"negate_op": operators.le})),
    "ge": (_boolean_compare, util.immutabledict({"negate_op": operators.lt})),
    "eq": (_boolean_compare, util.immutabledict({"negate_op": operators.ne})),
    "is_distinct_from": (
        _boolean_compare,
        util.immutabledict({"negate_op": operators.is_not_distinct_from}),
    ),
    "is_not_distinct_from": (
        _boolean_compare,
        util.immutabledict({"negate_op": operators.is_distinct_from}),
    ),
    "like_op": (
        _boolean_compare,
        util.immutabledict({"negate_op": operators.not_like_op}),
    ),
    "ilike_op": (
        _boolean_compare,
        util.immutabledict({"negate_op": operators.not_ilike_op}),
    ),
    "not_like_op": (
        _boolean_compare,
        util.immutabledict({"negate_op": operators.like_op}),
    ),
    "not_ilike_op": (
        _boolean_compare,
        util.immutabledict({"negate_op": operators.ilike_op}),
    ),
    "contains_op": (
        _boolean_compare,
        util.immutabledict({"negate_op": operators.not_contains_op}),
    ),
    "icontains_op": (
        _boolean_compare,
        util.immutabledict({"negate_op": operators.not_icontains_op}),
    ),
    "startswith_op": (
        _boolean_compare,
        util.immutabledict({"negate_op": operators.not_startswith_op}),
    ),
    "istartswith_op": (
        _boolean_compare,
        util.immutabledict({"negate_op": operators.not_istartswith_op}),
    ),
    "endswith_op": (
        _boolean_compare,
        util.immutabledict({"negate_op": operators.not_endswith_op}),
    ),
    "iendswith_op": (
        _boolean_compare,
        util.immutabledict({"negate_op": operators.not_iendswith_op}),
    ),
    "desc_op": (
        _scalar,
        util.immutabledict({"fn": UnaryExpression._create_desc}),
    ),
    "asc_op": (
        _scalar,
        util.immutabledict({"fn": UnaryExpression._create_asc}),
    ),
    "nulls_first_op": (
        _scalar,
        util.immutabledict({"fn": UnaryExpression._create_nulls_first}),
    ),
    "nulls_last_op": (
        _scalar,
        util.immutabledict({"fn": UnaryExpression._create_nulls_last}),
    ),
    "in_op": (
        _in_impl,
        util.immutabledict({"negate_op": operators.not_in_op}),
    ),
    "not_in_op": (
        _in_impl,
        util.immutabledict({"negate_op": operators.in_op}),
    ),
    # IS negates to IS NOT and vice versa, the same pairing that
    # _boolean_compare applies for None/True/False operands; registering
    # an operator as its own negation would make negating the expression
    # a no-op for non-constant operands.
    "is_": (
        _boolean_compare,
        util.immutabledict({"negate_op": operators.is_not}),
    ),
    "is_not": (
        _boolean_compare,
        util.immutabledict({"negate_op": operators.is_}),
    ),
    "collate": (_collate_impl, util.EMPTY_DICT),
    "match_op": (_match_impl, util.EMPTY_DICT),
    "not_match_op": (_match_impl, util.EMPTY_DICT),
    "distinct_op": (_distinct_impl, util.EMPTY_DICT),
    "between_op": (_between_impl, util.EMPTY_DICT),
    "not_between_op": (_between_impl, util.EMPTY_DICT),
    "neg": (_neg_impl, util.EMPTY_DICT),
    "getitem": (_getitem_impl, util.EMPTY_DICT),
    "lshift": (_unsupported_impl, util.EMPTY_DICT),
    "rshift": (_unsupported_impl, util.EMPTY_DICT),
    "contains": (_unsupported_impl, util.EMPTY_DICT),
    "regexp_match_op": (_regexp_match_impl, util.EMPTY_DICT),
    "not_regexp_match_op": (_regexp_match_impl, util.EMPTY_DICT),
    "regexp_replace_op": (_regexp_replace_impl, util.EMPTY_DICT),
}
|