model.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. from __future__ import annotations
  2. import re
  3. import typing as t
  4. import sqlalchemy as sa
  5. import sqlalchemy.orm as sa_orm
  6. from .query import Query
  7. if t.TYPE_CHECKING:
  8. from .extension import SQLAlchemy
  9. class _QueryProperty:
  10. """A class property that creates a query object for a model.
  11. :meta private:
  12. """
  13. def __get__(self, obj: Model | None, cls: type[Model]) -> Query:
  14. return cls.query_class(
  15. cls, session=cls.__fsa__.session() # type: ignore[arg-type]
  16. )
  17. class Model:
  18. """The base class of the :attr:`.SQLAlchemy.Model` declarative model class.
  19. To define models, subclass :attr:`db.Model <.SQLAlchemy.Model>`, not this. To
  20. customize ``db.Model``, subclass this and pass it as ``model_class`` to
  21. :class:`.SQLAlchemy`. To customize ``db.Model`` at the metaclass level, pass an
  22. already created declarative model class as ``model_class``.
  23. """
  24. __fsa__: t.ClassVar[SQLAlchemy]
  25. """Internal reference to the extension object.
  26. :meta private:
  27. """
  28. query_class: t.ClassVar[type[Query]] = Query
  29. """Query class used by :attr:`query`. Defaults to :attr:`.SQLAlchemy.Query`, which
  30. defaults to :class:`.Query`.
  31. """
  32. query: t.ClassVar[Query] = _QueryProperty() # type: ignore[assignment]
  33. """A SQLAlchemy query for a model. Equivalent to ``db.session.query(Model)``. Can be
  34. customized per-model by overriding :attr:`query_class`.
  35. .. warning::
  36. The query interface is considered legacy in SQLAlchemy. Prefer using
  37. ``session.execute(select())`` instead.
  38. """
  39. def __repr__(self) -> str:
  40. state = sa.inspect(self)
  41. assert state is not None
  42. if state.transient:
  43. pk = f"(transient {id(self)})"
  44. elif state.pending:
  45. pk = f"(pending {id(self)})"
  46. else:
  47. pk = ", ".join(map(str, state.identity))
  48. return f"<{type(self).__name__} {pk}>"
  49. class BindMetaMixin(type):
  50. """Metaclass mixin that sets a model's ``metadata`` based on its ``__bind_key__``.
  51. If the model sets ``metadata`` or ``__table__`` directly, ``__bind_key__`` is
  52. ignored. If the ``metadata`` is the same as the parent model, it will not be set
  53. directly on the child model.
  54. """
  55. __fsa__: SQLAlchemy
  56. metadata: sa.MetaData
  57. def __init__(
  58. cls, name: str, bases: tuple[type, ...], d: dict[str, t.Any], **kwargs: t.Any
  59. ) -> None:
  60. if not ("metadata" in cls.__dict__ or "__table__" in cls.__dict__):
  61. bind_key = getattr(cls, "__bind_key__", None)
  62. parent_metadata = getattr(cls, "metadata", None)
  63. metadata = cls.__fsa__._make_metadata(bind_key)
  64. if metadata is not parent_metadata:
  65. cls.metadata = metadata
  66. super().__init__(name, bases, d, **kwargs)
  67. class BindMixin:
  68. """DeclarativeBase mixin to set a model's ``metadata`` based on ``__bind_key__``.
  69. If no ``__bind_key__`` is specified, the model will use the default metadata
  70. provided by ``DeclarativeBase`` or ``DeclarativeBaseNoMeta``.
  71. If the model doesn't set ``metadata`` or ``__table__`` directly
  72. and does set ``__bind_key__``, the model will use the metadata
  73. for the specified bind key.
  74. If the ``metadata`` is the same as the parent model, it will not be set
  75. directly on the child model.
  76. .. versionchanged:: 3.1.0
  77. """
  78. __fsa__: SQLAlchemy
  79. metadata: sa.MetaData
  80. @classmethod
  81. def __init_subclass__(cls: t.Type[BindMixin], **kwargs: t.Dict[str, t.Any]) -> None:
  82. if not ("metadata" in cls.__dict__ or "__table__" in cls.__dict__) and hasattr(
  83. cls, "__bind_key__"
  84. ):
  85. bind_key = getattr(cls, "__bind_key__", None)
  86. parent_metadata = getattr(cls, "metadata", None)
  87. metadata = cls.__fsa__._make_metadata(bind_key)
  88. if metadata is not parent_metadata:
  89. cls.metadata = metadata
  90. super().__init_subclass__(**kwargs)
  91. class NameMetaMixin(type):
  92. """Metaclass mixin that sets a model's ``__tablename__`` by converting the
  93. ``CamelCase`` class name to ``snake_case``. A name is set for non-abstract models
  94. that do not otherwise define ``__tablename__``. If a model does not define a primary
  95. key, it will not generate a name or ``__table__``, for single-table inheritance.
  96. """
  97. metadata: sa.MetaData
  98. __tablename__: str
  99. __table__: sa.Table
  100. def __init__(
  101. cls, name: str, bases: tuple[type, ...], d: dict[str, t.Any], **kwargs: t.Any
  102. ) -> None:
  103. if should_set_tablename(cls):
  104. cls.__tablename__ = camel_to_snake_case(cls.__name__)
  105. super().__init__(name, bases, d, **kwargs)
  106. # __table_cls__ has run. If no table was created, use the parent table.
  107. if (
  108. "__tablename__" not in cls.__dict__
  109. and "__table__" in cls.__dict__
  110. and cls.__dict__["__table__"] is None
  111. ):
  112. del cls.__table__
  113. def __table_cls__(cls, *args: t.Any, **kwargs: t.Any) -> sa.Table | None:
  114. """This is called by SQLAlchemy during mapper setup. It determines the final
  115. table object that the model will use.
  116. If no primary key is found, that indicates single-table inheritance, so no table
  117. will be created and ``__tablename__`` will be unset.
  118. """
  119. schema = kwargs.get("schema")
  120. if schema is None:
  121. key = args[0]
  122. else:
  123. key = f"{schema}.{args[0]}"
  124. # Check if a table with this name already exists. Allows reflected tables to be
  125. # applied to models by name.
  126. if key in cls.metadata.tables:
  127. return sa.Table(*args, **kwargs)
  128. # If a primary key is found, create a table for joined-table inheritance.
  129. for arg in args:
  130. if (isinstance(arg, sa.Column) and arg.primary_key) or isinstance(
  131. arg, sa.PrimaryKeyConstraint
  132. ):
  133. return sa.Table(*args, **kwargs)
  134. # If no base classes define a table, return one that's missing a primary key
  135. # so SQLAlchemy shows the correct error.
  136. for base in cls.__mro__[1:-1]:
  137. if "__table__" in base.__dict__:
  138. break
  139. else:
  140. return sa.Table(*args, **kwargs)
  141. # Single-table inheritance, use the parent table name. __init__ will unset
  142. # __table__ based on this.
  143. if "__tablename__" in cls.__dict__:
  144. del cls.__tablename__
  145. return None
  146. class NameMixin:
  147. """DeclarativeBase mixin that sets a model's ``__tablename__`` by converting the
  148. ``CamelCase`` class name to ``snake_case``. A name is set for non-abstract models
  149. that do not otherwise define ``__tablename__``. If a model does not define a primary
  150. key, it will not generate a name or ``__table__``, for single-table inheritance.
  151. .. versionchanged:: 3.1.0
  152. """
  153. metadata: sa.MetaData
  154. __tablename__: str
  155. __table__: sa.Table
  156. @classmethod
  157. def __init_subclass__(cls: t.Type[NameMixin], **kwargs: t.Dict[str, t.Any]) -> None:
  158. if should_set_tablename(cls):
  159. cls.__tablename__ = camel_to_snake_case(cls.__name__)
  160. super().__init_subclass__(**kwargs)
  161. # __table_cls__ has run. If no table was created, use the parent table.
  162. if (
  163. "__tablename__" not in cls.__dict__
  164. and "__table__" in cls.__dict__
  165. and cls.__dict__["__table__"] is None
  166. ):
  167. del cls.__table__
  168. @classmethod
  169. def __table_cls__(cls, *args: t.Any, **kwargs: t.Any) -> sa.Table | None:
  170. """This is called by SQLAlchemy during mapper setup. It determines the final
  171. table object that the model will use.
  172. If no primary key is found, that indicates single-table inheritance, so no table
  173. will be created and ``__tablename__`` will be unset.
  174. """
  175. schema = kwargs.get("schema")
  176. if schema is None:
  177. key = args[0]
  178. else:
  179. key = f"{schema}.{args[0]}"
  180. # Check if a table with this name already exists. Allows reflected tables to be
  181. # applied to models by name.
  182. if key in cls.metadata.tables:
  183. return sa.Table(*args, **kwargs)
  184. # If a primary key is found, create a table for joined-table inheritance.
  185. for arg in args:
  186. if (isinstance(arg, sa.Column) and arg.primary_key) or isinstance(
  187. arg, sa.PrimaryKeyConstraint
  188. ):
  189. return sa.Table(*args, **kwargs)
  190. # If no base classes define a table, return one that's missing a primary key
  191. # so SQLAlchemy shows the correct error.
  192. for base in cls.__mro__[1:-1]:
  193. if "__table__" in base.__dict__:
  194. break
  195. else:
  196. return sa.Table(*args, **kwargs)
  197. # Single-table inheritance, use the parent table name. __init__ will unset
  198. # __table__ based on this.
  199. if "__tablename__" in cls.__dict__:
  200. del cls.__tablename__
  201. return None
  202. def should_set_tablename(cls: type) -> bool:
  203. """Determine whether ``__tablename__`` should be generated for a model.
  204. - If no class in the MRO sets a name, one should be generated.
  205. - If a declared attr is found, it should be used instead.
  206. - If a name is found, it should be used if the class is a mixin, otherwise one
  207. should be generated.
  208. - Abstract models should not have one generated.
  209. Later, ``__table_cls__`` will determine if the model looks like single or
  210. joined-table inheritance. If no primary key is found, the name will be unset.
  211. """
  212. if (
  213. cls.__dict__.get("__abstract__", False)
  214. or (
  215. not issubclass(cls, (sa_orm.DeclarativeBase, sa_orm.DeclarativeBaseNoMeta))
  216. and not any(isinstance(b, sa_orm.DeclarativeMeta) for b in cls.__mro__[1:])
  217. )
  218. or any(
  219. (b is sa_orm.DeclarativeBase or b is sa_orm.DeclarativeBaseNoMeta)
  220. for b in cls.__bases__
  221. )
  222. ):
  223. return False
  224. for base in cls.__mro__:
  225. if "__tablename__" not in base.__dict__:
  226. continue
  227. if isinstance(base.__dict__["__tablename__"], sa_orm.declared_attr):
  228. return False
  229. return not (
  230. base is cls
  231. or base.__dict__.get("__abstract__", False)
  232. or not (
  233. # SQLAlchemy 1.x
  234. isinstance(base, sa_orm.DeclarativeMeta)
  235. # 2.x: DeclarativeBas uses this as metaclass
  236. or isinstance(base, sa_orm.decl_api.DeclarativeAttributeIntercept)
  237. # 2.x: DeclarativeBaseNoMeta doesn't use a metaclass
  238. or issubclass(base, sa_orm.DeclarativeBaseNoMeta)
  239. )
  240. )
  241. return True
  242. def camel_to_snake_case(name: str) -> str:
  243. """Convert a ``CamelCase`` name to ``snake_case``."""
  244. name = re.sub(r"((?<=[a-z0-9])[A-Z]|(?!^)[A-Z](?=[a-z]))", r"_\1", name)
  245. return name.lower().lstrip("_")
  246. class DefaultMeta(BindMetaMixin, NameMetaMixin, sa_orm.DeclarativeMeta):
  247. """SQLAlchemy declarative metaclass that provides ``__bind_key__`` and
  248. ``__tablename__`` support.
  249. """
  250. class DefaultMetaNoName(BindMetaMixin, sa_orm.DeclarativeMeta):
  251. """SQLAlchemy declarative metaclass that provides ``__bind_key__`` and
  252. ``__tablename__`` support.
  253. """