# testing/suite/test_insert.py
# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: ignore-errors
  8. from decimal import Decimal
  9. import uuid
  10. from . import testing
  11. from .. import fixtures
  12. from ..assertions import eq_
  13. from ..config import requirements
  14. from ..schema import Column
  15. from ..schema import Table
  16. from ... import Double
  17. from ... import Float
  18. from ... import Identity
  19. from ... import Integer
  20. from ... import literal
  21. from ... import literal_column
  22. from ... import Numeric
  23. from ... import select
  24. from ... import String
  25. from ...types import LargeBinary
  26. from ...types import UUID
  27. from ...types import Uuid
  28. class LastrowidTest(fixtures.TablesTest):
  29. run_deletes = "each"
  30. __backend__ = True
  31. __requires__ = "implements_get_lastrowid", "autoincrement_insert"
  32. @classmethod
  33. def define_tables(cls, metadata):
  34. Table(
  35. "autoinc_pk",
  36. metadata,
  37. Column(
  38. "id", Integer, primary_key=True, test_needs_autoincrement=True
  39. ),
  40. Column("data", String(50)),
  41. implicit_returning=False,
  42. )
  43. Table(
  44. "manual_pk",
  45. metadata,
  46. Column("id", Integer, primary_key=True, autoincrement=False),
  47. Column("data", String(50)),
  48. implicit_returning=False,
  49. )
  50. def _assert_round_trip(self, table, conn):
  51. row = conn.execute(table.select()).first()
  52. eq_(
  53. row,
  54. (
  55. conn.dialect.default_sequence_base,
  56. "some data",
  57. ),
  58. )
  59. def test_autoincrement_on_insert(self, connection):
  60. connection.execute(
  61. self.tables.autoinc_pk.insert(), dict(data="some data")
  62. )
  63. self._assert_round_trip(self.tables.autoinc_pk, connection)
  64. def test_last_inserted_id(self, connection):
  65. r = connection.execute(
  66. self.tables.autoinc_pk.insert(), dict(data="some data")
  67. )
  68. pk = connection.scalar(select(self.tables.autoinc_pk.c.id))
  69. eq_(r.inserted_primary_key, (pk,))
  70. @requirements.dbapi_lastrowid
  71. def test_native_lastrowid_autoinc(self, connection):
  72. r = connection.execute(
  73. self.tables.autoinc_pk.insert(), dict(data="some data")
  74. )
  75. lastrowid = r.lastrowid
  76. pk = connection.scalar(select(self.tables.autoinc_pk.c.id))
  77. eq_(lastrowid, pk)
  78. class InsertBehaviorTest(fixtures.TablesTest):
  79. run_deletes = "each"
  80. __backend__ = True
  81. @classmethod
  82. def define_tables(cls, metadata):
  83. Table(
  84. "autoinc_pk",
  85. metadata,
  86. Column(
  87. "id", Integer, primary_key=True, test_needs_autoincrement=True
  88. ),
  89. Column("data", String(50)),
  90. )
  91. Table(
  92. "manual_pk",
  93. metadata,
  94. Column("id", Integer, primary_key=True, autoincrement=False),
  95. Column("data", String(50)),
  96. )
  97. Table(
  98. "no_implicit_returning",
  99. metadata,
  100. Column(
  101. "id", Integer, primary_key=True, test_needs_autoincrement=True
  102. ),
  103. Column("data", String(50)),
  104. implicit_returning=False,
  105. )
  106. Table(
  107. "includes_defaults",
  108. metadata,
  109. Column(
  110. "id", Integer, primary_key=True, test_needs_autoincrement=True
  111. ),
  112. Column("data", String(50)),
  113. Column("x", Integer, default=5),
  114. Column(
  115. "y",
  116. Integer,
  117. default=literal_column("2", type_=Integer) + literal(2),
  118. ),
  119. )
  120. @testing.variation("style", ["plain", "return_defaults"])
  121. @testing.variation("executemany", [True, False])
  122. def test_no_results_for_non_returning_insert(
  123. self, connection, style, executemany
  124. ):
  125. """test another INSERT issue found during #10453"""
  126. table = self.tables.no_implicit_returning
  127. stmt = table.insert()
  128. if style.return_defaults:
  129. stmt = stmt.return_defaults()
  130. if executemany:
  131. data = [
  132. {"data": "d1"},
  133. {"data": "d2"},
  134. {"data": "d3"},
  135. {"data": "d4"},
  136. {"data": "d5"},
  137. ]
  138. else:
  139. data = {"data": "d1"}
  140. r = connection.execute(stmt, data)
  141. assert not r.returns_rows
  142. @requirements.autoincrement_insert
  143. def test_autoclose_on_insert(self, connection):
  144. r = connection.execute(
  145. self.tables.autoinc_pk.insert(), dict(data="some data")
  146. )
  147. assert r._soft_closed
  148. assert not r.closed
  149. assert r.is_insert
  150. # new as of I8091919d45421e3f53029b8660427f844fee0228; for the moment
  151. # an insert where the PK was taken from a row that the dialect
  152. # selected, as is the case for mssql/pyodbc, will still report
  153. # returns_rows as true because there's a cursor description. in that
  154. # case, the row had to have been consumed at least.
  155. assert not r.returns_rows or r.fetchone() is None
  156. @requirements.insert_returning
  157. def test_autoclose_on_insert_implicit_returning(self, connection):
  158. r = connection.execute(
  159. # return_defaults() ensures RETURNING will be used,
  160. # new in 2.0 as sqlite/mariadb offer both RETURNING and
  161. # cursor.lastrowid
  162. self.tables.autoinc_pk.insert().return_defaults(),
  163. dict(data="some data"),
  164. )
  165. assert r._soft_closed
  166. assert not r.closed
  167. assert r.is_insert
  168. # note we are experimenting with having this be True
  169. # as of I8091919d45421e3f53029b8660427f844fee0228 .
  170. # implicit returning has fetched the row, but it still is a
  171. # "returns rows"
  172. assert r.returns_rows
  173. # and we should be able to fetchone() on it, we just get no row
  174. eq_(r.fetchone(), None)
  175. # and the keys, etc.
  176. eq_(r.keys(), ["id"])
  177. # but the dialect took in the row already. not really sure
  178. # what the best behavior is.
  179. @requirements.empty_inserts
  180. def test_empty_insert(self, connection):
  181. r = connection.execute(self.tables.autoinc_pk.insert())
  182. assert r._soft_closed
  183. assert not r.closed
  184. r = connection.execute(
  185. self.tables.autoinc_pk.select().where(
  186. self.tables.autoinc_pk.c.id != None
  187. )
  188. )
  189. eq_(len(r.all()), 1)
  190. @requirements.empty_inserts_executemany
  191. def test_empty_insert_multiple(self, connection):
  192. r = connection.execute(self.tables.autoinc_pk.insert(), [{}, {}, {}])
  193. assert r._soft_closed
  194. assert not r.closed
  195. r = connection.execute(
  196. self.tables.autoinc_pk.select().where(
  197. self.tables.autoinc_pk.c.id != None
  198. )
  199. )
  200. eq_(len(r.all()), 3)
  201. @requirements.insert_from_select
  202. def test_insert_from_select_autoinc(self, connection):
  203. src_table = self.tables.manual_pk
  204. dest_table = self.tables.autoinc_pk
  205. connection.execute(
  206. src_table.insert(),
  207. [
  208. dict(id=1, data="data1"),
  209. dict(id=2, data="data2"),
  210. dict(id=3, data="data3"),
  211. ],
  212. )
  213. result = connection.execute(
  214. dest_table.insert().from_select(
  215. ("data",),
  216. select(src_table.c.data).where(
  217. src_table.c.data.in_(["data2", "data3"])
  218. ),
  219. )
  220. )
  221. eq_(result.inserted_primary_key, (None,))
  222. result = connection.execute(
  223. select(dest_table.c.data).order_by(dest_table.c.data)
  224. )
  225. eq_(result.fetchall(), [("data2",), ("data3",)])
  226. @requirements.insert_from_select
  227. def test_insert_from_select_autoinc_no_rows(self, connection):
  228. src_table = self.tables.manual_pk
  229. dest_table = self.tables.autoinc_pk
  230. result = connection.execute(
  231. dest_table.insert().from_select(
  232. ("data",),
  233. select(src_table.c.data).where(
  234. src_table.c.data.in_(["data2", "data3"])
  235. ),
  236. )
  237. )
  238. eq_(result.inserted_primary_key, (None,))
  239. result = connection.execute(
  240. select(dest_table.c.data).order_by(dest_table.c.data)
  241. )
  242. eq_(result.fetchall(), [])
  243. @requirements.insert_from_select
  244. def test_insert_from_select(self, connection):
  245. table = self.tables.manual_pk
  246. connection.execute(
  247. table.insert(),
  248. [
  249. dict(id=1, data="data1"),
  250. dict(id=2, data="data2"),
  251. dict(id=3, data="data3"),
  252. ],
  253. )
  254. connection.execute(
  255. table.insert()
  256. .inline()
  257. .from_select(
  258. ("id", "data"),
  259. select(table.c.id + 5, table.c.data).where(
  260. table.c.data.in_(["data2", "data3"])
  261. ),
  262. )
  263. )
  264. eq_(
  265. connection.execute(
  266. select(table.c.data).order_by(table.c.data)
  267. ).fetchall(),
  268. [("data1",), ("data2",), ("data2",), ("data3",), ("data3",)],
  269. )
  270. @requirements.insert_from_select
  271. def test_insert_from_select_with_defaults(self, connection):
  272. table = self.tables.includes_defaults
  273. connection.execute(
  274. table.insert(),
  275. [
  276. dict(id=1, data="data1"),
  277. dict(id=2, data="data2"),
  278. dict(id=3, data="data3"),
  279. ],
  280. )
  281. connection.execute(
  282. table.insert()
  283. .inline()
  284. .from_select(
  285. ("id", "data"),
  286. select(table.c.id + 5, table.c.data).where(
  287. table.c.data.in_(["data2", "data3"])
  288. ),
  289. )
  290. )
  291. eq_(
  292. connection.execute(
  293. select(table).order_by(table.c.data, table.c.id)
  294. ).fetchall(),
  295. [
  296. (1, "data1", 5, 4),
  297. (2, "data2", 5, 4),
  298. (7, "data2", 5, 4),
  299. (3, "data3", 5, 4),
  300. (8, "data3", 5, 4),
  301. ],
  302. )
  303. class ReturningTest(fixtures.TablesTest):
  304. run_create_tables = "each"
  305. __requires__ = "insert_returning", "autoincrement_insert"
  306. __backend__ = True
  307. def _assert_round_trip(self, table, conn):
  308. row = conn.execute(table.select()).first()
  309. eq_(
  310. row,
  311. (
  312. conn.dialect.default_sequence_base,
  313. "some data",
  314. ),
  315. )
  316. @classmethod
  317. def define_tables(cls, metadata):
  318. Table(
  319. "autoinc_pk",
  320. metadata,
  321. Column(
  322. "id", Integer, primary_key=True, test_needs_autoincrement=True
  323. ),
  324. Column("data", String(50)),
  325. )
  326. @requirements.fetch_rows_post_commit
  327. def test_explicit_returning_pk_autocommit(self, connection):
  328. table = self.tables.autoinc_pk
  329. r = connection.execute(
  330. table.insert().returning(table.c.id), dict(data="some data")
  331. )
  332. pk = r.first()[0]
  333. fetched_pk = connection.scalar(select(table.c.id))
  334. eq_(fetched_pk, pk)
  335. def test_explicit_returning_pk_no_autocommit(self, connection):
  336. table = self.tables.autoinc_pk
  337. r = connection.execute(
  338. table.insert().returning(table.c.id), dict(data="some data")
  339. )
  340. pk = r.first()[0]
  341. fetched_pk = connection.scalar(select(table.c.id))
  342. eq_(fetched_pk, pk)
  343. def test_autoincrement_on_insert_implicit_returning(self, connection):
  344. connection.execute(
  345. self.tables.autoinc_pk.insert(), dict(data="some data")
  346. )
  347. self._assert_round_trip(self.tables.autoinc_pk, connection)
  348. def test_last_inserted_id_implicit_returning(self, connection):
  349. r = connection.execute(
  350. self.tables.autoinc_pk.insert(), dict(data="some data")
  351. )
  352. pk = connection.scalar(select(self.tables.autoinc_pk.c.id))
  353. eq_(r.inserted_primary_key, (pk,))
  354. @requirements.insert_executemany_returning
  355. def test_insertmanyvalues_returning(self, connection):
  356. r = connection.execute(
  357. self.tables.autoinc_pk.insert().returning(
  358. self.tables.autoinc_pk.c.id
  359. ),
  360. [
  361. {"data": "d1"},
  362. {"data": "d2"},
  363. {"data": "d3"},
  364. {"data": "d4"},
  365. {"data": "d5"},
  366. ],
  367. )
  368. rall = r.all()
  369. pks = connection.execute(select(self.tables.autoinc_pk.c.id))
  370. eq_(rall, pks.all())
  371. @testing.combinations(
  372. (Double(), 8.5514716, True),
  373. (
  374. Double(53),
  375. 8.5514716,
  376. True,
  377. testing.requires.float_or_double_precision_behaves_generically,
  378. ),
  379. (Float(), 8.5514, True),
  380. (
  381. Float(8),
  382. 8.5514,
  383. True,
  384. testing.requires.float_or_double_precision_behaves_generically,
  385. ),
  386. (
  387. Numeric(precision=15, scale=12, asdecimal=False),
  388. 8.5514716,
  389. True,
  390. testing.requires.literal_float_coercion,
  391. ),
  392. (
  393. Numeric(precision=15, scale=12, asdecimal=True),
  394. Decimal("8.5514716"),
  395. False,
  396. ),
  397. argnames="type_,value,do_rounding",
  398. )
  399. @testing.variation("sort_by_parameter_order", [True, False])
  400. @testing.variation("multiple_rows", [True, False])
  401. def test_insert_w_floats(
  402. self,
  403. connection,
  404. metadata,
  405. sort_by_parameter_order,
  406. type_,
  407. value,
  408. do_rounding,
  409. multiple_rows,
  410. ):
  411. """test #9701.
  412. this tests insertmanyvalues as well as decimal / floating point
  413. RETURNING types
  414. """
  415. t = Table(
  416. # Oracle backends seems to be getting confused if
  417. # this table is named the same as the one
  418. # in test_imv_returning_datatypes. use a different name
  419. "f_t",
  420. metadata,
  421. Column("id", Integer, Identity(), primary_key=True),
  422. Column("value", type_),
  423. )
  424. t.create(connection)
  425. result = connection.execute(
  426. t.insert().returning(
  427. t.c.id,
  428. t.c.value,
  429. sort_by_parameter_order=bool(sort_by_parameter_order),
  430. ),
  431. (
  432. [{"value": value} for i in range(10)]
  433. if multiple_rows
  434. else {"value": value}
  435. ),
  436. )
  437. if multiple_rows:
  438. i_range = range(1, 11)
  439. else:
  440. i_range = range(1, 2)
  441. # we want to test only that we are getting floating points back
  442. # with some degree of the original value maintained, that it is not
  443. # being truncated to an integer. there's too much variation in how
  444. # drivers return floats, which should not be relied upon to be
  445. # exact, for us to just compare as is (works for PG drivers but not
  446. # others) so we use rounding here. There's precedent for this
  447. # in suite/test_types.py::NumericTest as well
  448. if do_rounding:
  449. eq_(
  450. {(id_, round(val_, 5)) for id_, val_ in result},
  451. {(id_, round(value, 5)) for id_ in i_range},
  452. )
  453. eq_(
  454. {
  455. round(val_, 5)
  456. for val_ in connection.scalars(select(t.c.value))
  457. },
  458. {round(value, 5)},
  459. )
  460. else:
  461. eq_(
  462. set(result),
  463. {(id_, value) for id_ in i_range},
  464. )
  465. eq_(
  466. set(connection.scalars(select(t.c.value))),
  467. {value},
  468. )
  469. @testing.combinations(
  470. (
  471. "non_native_uuid",
  472. Uuid(native_uuid=False),
  473. uuid.uuid4(),
  474. ),
  475. (
  476. "non_native_uuid_str",
  477. Uuid(as_uuid=False, native_uuid=False),
  478. str(uuid.uuid4()),
  479. ),
  480. (
  481. "generic_native_uuid",
  482. Uuid(native_uuid=True),
  483. uuid.uuid4(),
  484. testing.requires.uuid_data_type,
  485. ),
  486. (
  487. "generic_native_uuid_str",
  488. Uuid(as_uuid=False, native_uuid=True),
  489. str(uuid.uuid4()),
  490. testing.requires.uuid_data_type,
  491. ),
  492. ("UUID", UUID(), uuid.uuid4(), testing.requires.uuid_data_type),
  493. (
  494. "LargeBinary1",
  495. LargeBinary(),
  496. b"this is binary",
  497. ),
  498. ("LargeBinary2", LargeBinary(), b"7\xe7\x9f"),
  499. argnames="type_,value",
  500. id_="iaa",
  501. )
  502. @testing.variation("sort_by_parameter_order", [True, False])
  503. @testing.variation("multiple_rows", [True, False])
  504. @testing.requires.insert_returning
  505. def test_imv_returning_datatypes(
  506. self,
  507. connection,
  508. metadata,
  509. sort_by_parameter_order,
  510. type_,
  511. value,
  512. multiple_rows,
  513. ):
  514. """test #9739, #9808 (similar to #9701).
  515. this tests insertmanyvalues in conjunction with various datatypes.
  516. These tests are particularly for the asyncpg driver which needs
  517. most types to be explicitly cast for the new IMV format
  518. """
  519. t = Table(
  520. "d_t",
  521. metadata,
  522. Column("id", Integer, Identity(), primary_key=True),
  523. Column("value", type_),
  524. )
  525. t.create(connection)
  526. result = connection.execute(
  527. t.insert().returning(
  528. t.c.id,
  529. t.c.value,
  530. sort_by_parameter_order=bool(sort_by_parameter_order),
  531. ),
  532. (
  533. [{"value": value} for i in range(10)]
  534. if multiple_rows
  535. else {"value": value}
  536. ),
  537. )
  538. if multiple_rows:
  539. i_range = range(1, 11)
  540. else:
  541. i_range = range(1, 2)
  542. eq_(
  543. set(result),
  544. {(id_, value) for id_ in i_range},
  545. )
  546. eq_(
  547. set(connection.scalars(select(t.c.value))),
  548. {value},
  549. )
  550. __all__ = ("LastrowidTest", "InsertBehaviorTest", "ReturningTest")