pyodbc.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
# connectors/pyodbc.py
# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. from __future__ import annotations
  8. import re
  9. import typing
  10. from typing import Any
  11. from typing import Dict
  12. from typing import List
  13. from typing import Optional
  14. from typing import Tuple
  15. from typing import Union
  16. from urllib.parse import unquote_plus
  17. from . import Connector
  18. from .. import ExecutionContext
  19. from .. import pool
  20. from .. import util
  21. from ..engine import ConnectArgsType
  22. from ..engine import Connection
  23. from ..engine import interfaces
  24. from ..engine import URL
  25. from ..sql.type_api import TypeEngine
  26. if typing.TYPE_CHECKING:
  27. from ..engine.interfaces import DBAPIModule
  28. from ..engine.interfaces import IsolationLevel
  29. class PyODBCConnector(Connector):
  30. driver = "pyodbc"
  31. # this is no longer False for pyodbc in general
  32. supports_sane_rowcount_returning = True
  33. supports_sane_multi_rowcount = False
  34. supports_native_decimal = True
  35. default_paramstyle = "named"
  36. fast_executemany = False
  37. # for non-DSN connections, this *may* be used to
  38. # hold the desired driver name
  39. pyodbc_driver_name: Optional[str] = None
  40. def __init__(self, use_setinputsizes: bool = False, **kw: Any):
  41. super().__init__(**kw)
  42. if use_setinputsizes:
  43. self.bind_typing = interfaces.BindTyping.SETINPUTSIZES
  44. @classmethod
  45. def import_dbapi(cls) -> DBAPIModule:
  46. return __import__("pyodbc")
  47. def create_connect_args(self, url: URL) -> ConnectArgsType:
  48. opts = url.translate_connect_args(username="user")
  49. opts.update(url.query)
  50. keys = opts
  51. query = url.query
  52. connect_args: Dict[str, Any] = {}
  53. connectors: List[str]
  54. for param in ("ansi", "unicode_results", "autocommit"):
  55. if param in keys:
  56. connect_args[param] = util.asbool(keys.pop(param))
  57. if "odbc_connect" in keys:
  58. connectors = [unquote_plus(keys.pop("odbc_connect"))]
  59. else:
  60. def check_quote(token: str) -> str:
  61. if ";" in str(token) or str(token).startswith("{"):
  62. token = "{%s}" % token.replace("}", "}}")
  63. return token
  64. keys = {k: check_quote(v) for k, v in keys.items()}
  65. dsn_connection = "dsn" in keys or (
  66. "host" in keys and "database" not in keys
  67. )
  68. if dsn_connection:
  69. connectors = [
  70. "dsn=%s" % (keys.pop("host", "") or keys.pop("dsn", ""))
  71. ]
  72. else:
  73. port = ""
  74. if "port" in keys and "port" not in query:
  75. port = ",%d" % int(keys.pop("port"))
  76. connectors = []
  77. driver = keys.pop("driver", self.pyodbc_driver_name)
  78. if driver is None and keys:
  79. # note if keys is empty, this is a totally blank URL
  80. util.warn(
  81. "No driver name specified; "
  82. "this is expected by PyODBC when using "
  83. "DSN-less connections"
  84. )
  85. else:
  86. connectors.append("DRIVER={%s}" % driver)
  87. connectors.extend(
  88. [
  89. "Server=%s%s" % (keys.pop("host", ""), port),
  90. "Database=%s" % keys.pop("database", ""),
  91. ]
  92. )
  93. user = keys.pop("user", None)
  94. if user:
  95. connectors.append("UID=%s" % user)
  96. pwd = keys.pop("password", "")
  97. if pwd:
  98. connectors.append("PWD=%s" % pwd)
  99. else:
  100. authentication = keys.pop("authentication", None)
  101. if authentication:
  102. connectors.append("Authentication=%s" % authentication)
  103. else:
  104. connectors.append("Trusted_Connection=Yes")
  105. # if set to 'Yes', the ODBC layer will try to automagically
  106. # convert textual data from your database encoding to your
  107. # client encoding. This should obviously be set to 'No' if
  108. # you query a cp1253 encoded database from a latin1 client...
  109. if "odbc_autotranslate" in keys:
  110. connectors.append(
  111. "AutoTranslate=%s" % keys.pop("odbc_autotranslate")
  112. )
  113. connectors.extend(["%s=%s" % (k, v) for k, v in keys.items()])
  114. return ((";".join(connectors),), connect_args)
  115. def is_disconnect(
  116. self,
  117. e: Exception,
  118. connection: Optional[
  119. Union[pool.PoolProxiedConnection, interfaces.DBAPIConnection]
  120. ],
  121. cursor: Optional[interfaces.DBAPICursor],
  122. ) -> bool:
  123. if isinstance(e, self.loaded_dbapi.ProgrammingError):
  124. return "The cursor's connection has been closed." in str(
  125. e
  126. ) or "Attempt to use a closed connection." in str(e)
  127. else:
  128. return False
  129. def _dbapi_version(self) -> interfaces.VersionInfoType:
  130. if not self.dbapi:
  131. return ()
  132. return self._parse_dbapi_version(self.dbapi.version)
  133. def _parse_dbapi_version(self, vers: str) -> interfaces.VersionInfoType:
  134. m = re.match(r"(?:py.*-)?([\d\.]+)(?:-(\w+))?", vers)
  135. if not m:
  136. return ()
  137. vers_tuple: interfaces.VersionInfoType = tuple(
  138. [int(x) for x in m.group(1).split(".")]
  139. )
  140. if m.group(2):
  141. vers_tuple += (m.group(2),)
  142. return vers_tuple
  143. def _get_server_version_info(
  144. self, connection: Connection
  145. ) -> interfaces.VersionInfoType:
  146. # NOTE: this function is not reliable, particularly when
  147. # freetds is in use. Implement database-specific server version
  148. # queries.
  149. dbapi_con = connection.connection.dbapi_connection
  150. version: Tuple[Union[int, str], ...] = ()
  151. r = re.compile(r"[.\-]")
  152. for n in r.split(dbapi_con.getinfo(self.dbapi.SQL_DBMS_VER)): # type: ignore[union-attr] # noqa: E501
  153. try:
  154. version += (int(n),)
  155. except ValueError:
  156. pass
  157. return tuple(version)
  158. def do_set_input_sizes(
  159. self,
  160. cursor: interfaces.DBAPICursor,
  161. list_of_tuples: List[Tuple[str, Any, TypeEngine[Any]]],
  162. context: ExecutionContext,
  163. ) -> None:
  164. # the rules for these types seems a little strange, as you can pass
  165. # non-tuples as well as tuples, however it seems to assume "0"
  166. # for the subsequent values if you don't pass a tuple which fails
  167. # for types such as pyodbc.SQL_WLONGVARCHAR, which is the datatype
  168. # that ticket #5649 is targeting.
  169. # NOTE: as of #6058, this won't be called if the use_setinputsizes
  170. # parameter were not passed to the dialect, or if no types were
  171. # specified in list_of_tuples
  172. # as of #8177 for 2.0 we assume use_setinputsizes=True and only
  173. # omit the setinputsizes calls for .executemany() with
  174. # fast_executemany=True
  175. if (
  176. context.execute_style is interfaces.ExecuteStyle.EXECUTEMANY
  177. and self.fast_executemany
  178. ):
  179. return
  180. cursor.setinputsizes(
  181. [
  182. (
  183. (dbtype, None, None)
  184. if not isinstance(dbtype, tuple)
  185. else dbtype
  186. )
  187. for key, dbtype, sqltype in list_of_tuples
  188. ]
  189. )
  190. def get_isolation_level_values(
  191. self, dbapi_conn: interfaces.DBAPIConnection
  192. ) -> List[IsolationLevel]:
  193. return [*super().get_isolation_level_values(dbapi_conn), "AUTOCOMMIT"]
  194. def set_isolation_level(
  195. self,
  196. dbapi_connection: interfaces.DBAPIConnection,
  197. level: IsolationLevel,
  198. ) -> None:
  199. # adjust for ConnectionFairy being present
  200. # allows attribute set e.g. "connection.autocommit = True"
  201. # to work properly
  202. if level == "AUTOCOMMIT":
  203. dbapi_connection.autocommit = True
  204. else:
  205. dbapi_connection.autocommit = False
  206. super().set_isolation_level(dbapi_connection, level)
  207. def detect_autocommit_setting(
  208. self, dbapi_conn: interfaces.DBAPIConnection
  209. ) -> bool:
  210. return bool(dbapi_conn.autocommit)