first add files
This commit is contained in:
13
lib/sqlalchemy/testing/suite/__init__.py
Normal file
13
lib/sqlalchemy/testing/suite/__init__.py
Normal file
@@ -0,0 +1,13 @@
|
||||
from .test_cte import * # noqa
|
||||
from .test_ddl import * # noqa
|
||||
from .test_deprecations import * # noqa
|
||||
from .test_dialect import * # noqa
|
||||
from .test_insert import * # noqa
|
||||
from .test_reflection import * # noqa
|
||||
from .test_results import * # noqa
|
||||
from .test_rowcount import * # noqa
|
||||
from .test_select import * # noqa
|
||||
from .test_sequence import * # noqa
|
||||
from .test_types import * # noqa
|
||||
from .test_unicode_ddl import * # noqa
|
||||
from .test_update_delete import * # noqa
|
||||
204
lib/sqlalchemy/testing/suite/test_cte.py
Normal file
204
lib/sqlalchemy/testing/suite/test_cte.py
Normal file
@@ -0,0 +1,204 @@
|
||||
from .. import fixtures
|
||||
from ..assertions import eq_
|
||||
from ..schema import Column
|
||||
from ..schema import Table
|
||||
from ... import ForeignKey
|
||||
from ... import Integer
|
||||
from ... import select
|
||||
from ... import String
|
||||
from ... import testing
|
||||
|
||||
|
||||
class CTETest(fixtures.TablesTest):
|
||||
__backend__ = True
|
||||
__requires__ = ("ctes",)
|
||||
|
||||
run_inserts = "each"
|
||||
run_deletes = "each"
|
||||
|
||||
@classmethod
|
||||
def define_tables(cls, metadata):
|
||||
Table(
|
||||
"some_table",
|
||||
metadata,
|
||||
Column("id", Integer, primary_key=True),
|
||||
Column("data", String(50)),
|
||||
Column("parent_id", ForeignKey("some_table.id")),
|
||||
)
|
||||
|
||||
Table(
|
||||
"some_other_table",
|
||||
metadata,
|
||||
Column("id", Integer, primary_key=True),
|
||||
Column("data", String(50)),
|
||||
Column("parent_id", Integer),
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def insert_data(cls, connection):
|
||||
connection.execute(
|
||||
cls.tables.some_table.insert(),
|
||||
[
|
||||
{"id": 1, "data": "d1", "parent_id": None},
|
||||
{"id": 2, "data": "d2", "parent_id": 1},
|
||||
{"id": 3, "data": "d3", "parent_id": 1},
|
||||
{"id": 4, "data": "d4", "parent_id": 3},
|
||||
{"id": 5, "data": "d5", "parent_id": 3},
|
||||
],
|
||||
)
|
||||
|
||||
def test_select_nonrecursive_round_trip(self, connection):
|
||||
some_table = self.tables.some_table
|
||||
|
||||
cte = (
|
||||
select(some_table)
|
||||
.where(some_table.c.data.in_(["d2", "d3", "d4"]))
|
||||
.cte("some_cte")
|
||||
)
|
||||
result = connection.execute(
|
||||
select(cte.c.data).where(cte.c.data.in_(["d4", "d5"]))
|
||||
)
|
||||
eq_(result.fetchall(), [("d4",)])
|
||||
|
||||
def test_select_recursive_round_trip(self, connection):
|
||||
some_table = self.tables.some_table
|
||||
|
||||
cte = (
|
||||
select(some_table)
|
||||
.where(some_table.c.data.in_(["d2", "d3", "d4"]))
|
||||
.cte("some_cte", recursive=True)
|
||||
)
|
||||
|
||||
cte_alias = cte.alias("c1")
|
||||
st1 = some_table.alias()
|
||||
# note that SQL Server requires this to be UNION ALL,
|
||||
# can't be UNION
|
||||
cte = cte.union_all(
|
||||
select(st1).where(st1.c.id == cte_alias.c.parent_id)
|
||||
)
|
||||
result = connection.execute(
|
||||
select(cte.c.data)
|
||||
.where(cte.c.data != "d2")
|
||||
.order_by(cte.c.data.desc())
|
||||
)
|
||||
eq_(
|
||||
result.fetchall(),
|
||||
[("d4",), ("d3",), ("d3",), ("d1",), ("d1",), ("d1",)],
|
||||
)
|
||||
|
||||
def test_insert_from_select_round_trip(self, connection):
|
||||
some_table = self.tables.some_table
|
||||
some_other_table = self.tables.some_other_table
|
||||
|
||||
cte = (
|
||||
select(some_table)
|
||||
.where(some_table.c.data.in_(["d2", "d3", "d4"]))
|
||||
.cte("some_cte")
|
||||
)
|
||||
connection.execute(
|
||||
some_other_table.insert().from_select(
|
||||
["id", "data", "parent_id"], select(cte)
|
||||
)
|
||||
)
|
||||
eq_(
|
||||
connection.execute(
|
||||
select(some_other_table).order_by(some_other_table.c.id)
|
||||
).fetchall(),
|
||||
[(2, "d2", 1), (3, "d3", 1), (4, "d4", 3)],
|
||||
)
|
||||
|
||||
@testing.requires.ctes_with_update_delete
|
||||
@testing.requires.update_from
|
||||
def test_update_from_round_trip(self, connection):
|
||||
some_table = self.tables.some_table
|
||||
some_other_table = self.tables.some_other_table
|
||||
|
||||
connection.execute(
|
||||
some_other_table.insert().from_select(
|
||||
["id", "data", "parent_id"], select(some_table)
|
||||
)
|
||||
)
|
||||
|
||||
cte = (
|
||||
select(some_table)
|
||||
.where(some_table.c.data.in_(["d2", "d3", "d4"]))
|
||||
.cte("some_cte")
|
||||
)
|
||||
connection.execute(
|
||||
some_other_table.update()
|
||||
.values(parent_id=5)
|
||||
.where(some_other_table.c.data == cte.c.data)
|
||||
)
|
||||
eq_(
|
||||
connection.execute(
|
||||
select(some_other_table).order_by(some_other_table.c.id)
|
||||
).fetchall(),
|
||||
[
|
||||
(1, "d1", None),
|
||||
(2, "d2", 5),
|
||||
(3, "d3", 5),
|
||||
(4, "d4", 5),
|
||||
(5, "d5", 3),
|
||||
],
|
||||
)
|
||||
|
||||
@testing.requires.ctes_with_update_delete
|
||||
@testing.requires.delete_from
|
||||
def test_delete_from_round_trip(self, connection):
|
||||
some_table = self.tables.some_table
|
||||
some_other_table = self.tables.some_other_table
|
||||
|
||||
connection.execute(
|
||||
some_other_table.insert().from_select(
|
||||
["id", "data", "parent_id"], select(some_table)
|
||||
)
|
||||
)
|
||||
|
||||
cte = (
|
||||
select(some_table)
|
||||
.where(some_table.c.data.in_(["d2", "d3", "d4"]))
|
||||
.cte("some_cte")
|
||||
)
|
||||
connection.execute(
|
||||
some_other_table.delete().where(
|
||||
some_other_table.c.data == cte.c.data
|
||||
)
|
||||
)
|
||||
eq_(
|
||||
connection.execute(
|
||||
select(some_other_table).order_by(some_other_table.c.id)
|
||||
).fetchall(),
|
||||
[(1, "d1", None), (5, "d5", 3)],
|
||||
)
|
||||
|
||||
@testing.requires.ctes_with_update_delete
|
||||
def test_delete_scalar_subq_round_trip(self, connection):
|
||||
|
||||
some_table = self.tables.some_table
|
||||
some_other_table = self.tables.some_other_table
|
||||
|
||||
connection.execute(
|
||||
some_other_table.insert().from_select(
|
||||
["id", "data", "parent_id"], select(some_table)
|
||||
)
|
||||
)
|
||||
|
||||
cte = (
|
||||
select(some_table)
|
||||
.where(some_table.c.data.in_(["d2", "d3", "d4"]))
|
||||
.cte("some_cte")
|
||||
)
|
||||
connection.execute(
|
||||
some_other_table.delete().where(
|
||||
some_other_table.c.data
|
||||
== select(cte.c.data)
|
||||
.where(cte.c.id == some_other_table.c.id)
|
||||
.scalar_subquery()
|
||||
)
|
||||
)
|
||||
eq_(
|
||||
connection.execute(
|
||||
select(some_other_table).order_by(some_other_table.c.id)
|
||||
).fetchall(),
|
||||
[(1, "d1", None), (5, "d5", 3)],
|
||||
)
|
||||
381
lib/sqlalchemy/testing/suite/test_ddl.py
Normal file
381
lib/sqlalchemy/testing/suite/test_ddl.py
Normal file
@@ -0,0 +1,381 @@
|
||||
import random
|
||||
|
||||
from . import testing
|
||||
from .. import config
|
||||
from .. import fixtures
|
||||
from .. import util
|
||||
from ..assertions import eq_
|
||||
from ..assertions import is_false
|
||||
from ..assertions import is_true
|
||||
from ..config import requirements
|
||||
from ..schema import Table
|
||||
from ... import CheckConstraint
|
||||
from ... import Column
|
||||
from ... import ForeignKeyConstraint
|
||||
from ... import Index
|
||||
from ... import inspect
|
||||
from ... import Integer
|
||||
from ... import schema
|
||||
from ... import String
|
||||
from ... import UniqueConstraint
|
||||
|
||||
|
||||
class TableDDLTest(fixtures.TestBase):
|
||||
__backend__ = True
|
||||
|
||||
def _simple_fixture(self, schema=None):
|
||||
return Table(
|
||||
"test_table",
|
||||
self.metadata,
|
||||
Column("id", Integer, primary_key=True, autoincrement=False),
|
||||
Column("data", String(50)),
|
||||
schema=schema,
|
||||
)
|
||||
|
||||
def _underscore_fixture(self):
|
||||
return Table(
|
||||
"_test_table",
|
||||
self.metadata,
|
||||
Column("id", Integer, primary_key=True, autoincrement=False),
|
||||
Column("_data", String(50)),
|
||||
)
|
||||
|
||||
def _table_index_fixture(self, schema=None):
|
||||
table = self._simple_fixture(schema=schema)
|
||||
idx = Index("test_index", table.c.data)
|
||||
return table, idx
|
||||
|
||||
def _simple_roundtrip(self, table):
|
||||
with config.db.begin() as conn:
|
||||
conn.execute(table.insert().values((1, "some data")))
|
||||
result = conn.execute(table.select())
|
||||
eq_(result.first(), (1, "some data"))
|
||||
|
||||
@requirements.create_table
|
||||
@util.provide_metadata
|
||||
def test_create_table(self):
|
||||
table = self._simple_fixture()
|
||||
table.create(config.db, checkfirst=False)
|
||||
self._simple_roundtrip(table)
|
||||
|
||||
@requirements.create_table
|
||||
@requirements.schemas
|
||||
@util.provide_metadata
|
||||
def test_create_table_schema(self):
|
||||
table = self._simple_fixture(schema=config.test_schema)
|
||||
table.create(config.db, checkfirst=False)
|
||||
self._simple_roundtrip(table)
|
||||
|
||||
@requirements.drop_table
|
||||
@util.provide_metadata
|
||||
def test_drop_table(self):
|
||||
table = self._simple_fixture()
|
||||
table.create(config.db, checkfirst=False)
|
||||
table.drop(config.db, checkfirst=False)
|
||||
|
||||
@requirements.create_table
|
||||
@util.provide_metadata
|
||||
def test_underscore_names(self):
|
||||
table = self._underscore_fixture()
|
||||
table.create(config.db, checkfirst=False)
|
||||
self._simple_roundtrip(table)
|
||||
|
||||
@requirements.comment_reflection
|
||||
@util.provide_metadata
|
||||
def test_add_table_comment(self, connection):
|
||||
table = self._simple_fixture()
|
||||
table.create(connection, checkfirst=False)
|
||||
table.comment = "a comment"
|
||||
connection.execute(schema.SetTableComment(table))
|
||||
eq_(
|
||||
inspect(connection).get_table_comment("test_table"),
|
||||
{"text": "a comment"},
|
||||
)
|
||||
|
||||
@requirements.comment_reflection
|
||||
@util.provide_metadata
|
||||
def test_drop_table_comment(self, connection):
|
||||
table = self._simple_fixture()
|
||||
table.create(connection, checkfirst=False)
|
||||
table.comment = "a comment"
|
||||
connection.execute(schema.SetTableComment(table))
|
||||
connection.execute(schema.DropTableComment(table))
|
||||
eq_(
|
||||
inspect(connection).get_table_comment("test_table"), {"text": None}
|
||||
)
|
||||
|
||||
@requirements.table_ddl_if_exists
|
||||
@util.provide_metadata
|
||||
def test_create_table_if_not_exists(self, connection):
|
||||
table = self._simple_fixture()
|
||||
|
||||
connection.execute(schema.CreateTable(table, if_not_exists=True))
|
||||
|
||||
is_true(inspect(connection).has_table("test_table"))
|
||||
connection.execute(schema.CreateTable(table, if_not_exists=True))
|
||||
|
||||
@requirements.index_ddl_if_exists
|
||||
@util.provide_metadata
|
||||
def test_create_index_if_not_exists(self, connection):
|
||||
table, idx = self._table_index_fixture()
|
||||
|
||||
connection.execute(schema.CreateTable(table, if_not_exists=True))
|
||||
is_true(inspect(connection).has_table("test_table"))
|
||||
is_false(
|
||||
"test_index"
|
||||
in [
|
||||
ix["name"]
|
||||
for ix in inspect(connection).get_indexes("test_table")
|
||||
]
|
||||
)
|
||||
|
||||
connection.execute(schema.CreateIndex(idx, if_not_exists=True))
|
||||
|
||||
is_true(
|
||||
"test_index"
|
||||
in [
|
||||
ix["name"]
|
||||
for ix in inspect(connection).get_indexes("test_table")
|
||||
]
|
||||
)
|
||||
|
||||
connection.execute(schema.CreateIndex(idx, if_not_exists=True))
|
||||
|
||||
@requirements.table_ddl_if_exists
|
||||
@util.provide_metadata
|
||||
def test_drop_table_if_exists(self, connection):
|
||||
table = self._simple_fixture()
|
||||
|
||||
table.create(connection)
|
||||
|
||||
is_true(inspect(connection).has_table("test_table"))
|
||||
|
||||
connection.execute(schema.DropTable(table, if_exists=True))
|
||||
|
||||
is_false(inspect(connection).has_table("test_table"))
|
||||
|
||||
connection.execute(schema.DropTable(table, if_exists=True))
|
||||
|
||||
@requirements.index_ddl_if_exists
|
||||
@util.provide_metadata
|
||||
def test_drop_index_if_exists(self, connection):
|
||||
table, idx = self._table_index_fixture()
|
||||
|
||||
table.create(connection)
|
||||
|
||||
is_true(
|
||||
"test_index"
|
||||
in [
|
||||
ix["name"]
|
||||
for ix in inspect(connection).get_indexes("test_table")
|
||||
]
|
||||
)
|
||||
|
||||
connection.execute(schema.DropIndex(idx, if_exists=True))
|
||||
|
||||
is_false(
|
||||
"test_index"
|
||||
in [
|
||||
ix["name"]
|
||||
for ix in inspect(connection).get_indexes("test_table")
|
||||
]
|
||||
)
|
||||
|
||||
connection.execute(schema.DropIndex(idx, if_exists=True))
|
||||
|
||||
|
||||
class FutureTableDDLTest(fixtures.FutureEngineMixin, TableDDLTest):
|
||||
pass
|
||||
|
||||
|
||||
class LongNameBlowoutTest(fixtures.TestBase):
|
||||
"""test the creation of a variety of DDL structures and ensure
|
||||
label length limits pass on backends
|
||||
|
||||
"""
|
||||
|
||||
__backend__ = True
|
||||
|
||||
def fk(self, metadata, connection):
|
||||
convention = {
|
||||
"fk": "foreign_key_%(table_name)s_"
|
||||
"%(column_0_N_name)s_"
|
||||
"%(referred_table_name)s_"
|
||||
+ (
|
||||
"_".join(
|
||||
"".join(random.choice("abcdef") for j in range(20))
|
||||
for i in range(10)
|
||||
)
|
||||
),
|
||||
}
|
||||
metadata.naming_convention = convention
|
||||
|
||||
Table(
|
||||
"a_things_with_stuff",
|
||||
metadata,
|
||||
Column("id_long_column_name", Integer, primary_key=True),
|
||||
test_needs_fk=True,
|
||||
)
|
||||
|
||||
cons = ForeignKeyConstraint(
|
||||
["aid"], ["a_things_with_stuff.id_long_column_name"]
|
||||
)
|
||||
Table(
|
||||
"b_related_things_of_value",
|
||||
metadata,
|
||||
Column(
|
||||
"aid",
|
||||
),
|
||||
cons,
|
||||
test_needs_fk=True,
|
||||
)
|
||||
actual_name = cons.name
|
||||
|
||||
metadata.create_all(connection)
|
||||
|
||||
if testing.requires.foreign_key_constraint_name_reflection.enabled:
|
||||
insp = inspect(connection)
|
||||
fks = insp.get_foreign_keys("b_related_things_of_value")
|
||||
reflected_name = fks[0]["name"]
|
||||
|
||||
return actual_name, reflected_name
|
||||
else:
|
||||
return actual_name, None
|
||||
|
||||
def pk(self, metadata, connection):
|
||||
convention = {
|
||||
"pk": "primary_key_%(table_name)s_"
|
||||
"%(column_0_N_name)s"
|
||||
+ (
|
||||
"_".join(
|
||||
"".join(random.choice("abcdef") for j in range(30))
|
||||
for i in range(10)
|
||||
)
|
||||
),
|
||||
}
|
||||
metadata.naming_convention = convention
|
||||
|
||||
a = Table(
|
||||
"a_things_with_stuff",
|
||||
metadata,
|
||||
Column("id_long_column_name", Integer, primary_key=True),
|
||||
Column("id_another_long_name", Integer, primary_key=True),
|
||||
)
|
||||
cons = a.primary_key
|
||||
actual_name = cons.name
|
||||
|
||||
metadata.create_all(connection)
|
||||
insp = inspect(connection)
|
||||
pk = insp.get_pk_constraint("a_things_with_stuff")
|
||||
reflected_name = pk["name"]
|
||||
return actual_name, reflected_name
|
||||
|
||||
def ix(self, metadata, connection):
|
||||
convention = {
|
||||
"ix": "index_%(table_name)s_"
|
||||
"%(column_0_N_name)s"
|
||||
+ (
|
||||
"_".join(
|
||||
"".join(random.choice("abcdef") for j in range(30))
|
||||
for i in range(10)
|
||||
)
|
||||
),
|
||||
}
|
||||
metadata.naming_convention = convention
|
||||
|
||||
a = Table(
|
||||
"a_things_with_stuff",
|
||||
metadata,
|
||||
Column("id_long_column_name", Integer, primary_key=True),
|
||||
Column("id_another_long_name", Integer),
|
||||
)
|
||||
cons = Index(None, a.c.id_long_column_name, a.c.id_another_long_name)
|
||||
actual_name = cons.name
|
||||
|
||||
metadata.create_all(connection)
|
||||
insp = inspect(connection)
|
||||
ix = insp.get_indexes("a_things_with_stuff")
|
||||
reflected_name = ix[0]["name"]
|
||||
return actual_name, reflected_name
|
||||
|
||||
def uq(self, metadata, connection):
|
||||
convention = {
|
||||
"uq": "unique_constraint_%(table_name)s_"
|
||||
"%(column_0_N_name)s"
|
||||
+ (
|
||||
"_".join(
|
||||
"".join(random.choice("abcdef") for j in range(30))
|
||||
for i in range(10)
|
||||
)
|
||||
),
|
||||
}
|
||||
metadata.naming_convention = convention
|
||||
|
||||
cons = UniqueConstraint("id_long_column_name", "id_another_long_name")
|
||||
Table(
|
||||
"a_things_with_stuff",
|
||||
metadata,
|
||||
Column("id_long_column_name", Integer, primary_key=True),
|
||||
Column("id_another_long_name", Integer),
|
||||
cons,
|
||||
)
|
||||
actual_name = cons.name
|
||||
|
||||
metadata.create_all(connection)
|
||||
insp = inspect(connection)
|
||||
uq = insp.get_unique_constraints("a_things_with_stuff")
|
||||
reflected_name = uq[0]["name"]
|
||||
return actual_name, reflected_name
|
||||
|
||||
def ck(self, metadata, connection):
|
||||
convention = {
|
||||
"ck": "check_constraint_%(table_name)s"
|
||||
+ (
|
||||
"_".join(
|
||||
"".join(random.choice("abcdef") for j in range(30))
|
||||
for i in range(10)
|
||||
)
|
||||
),
|
||||
}
|
||||
metadata.naming_convention = convention
|
||||
|
||||
cons = CheckConstraint("some_long_column_name > 5")
|
||||
Table(
|
||||
"a_things_with_stuff",
|
||||
metadata,
|
||||
Column("id_long_column_name", Integer, primary_key=True),
|
||||
Column("some_long_column_name", Integer),
|
||||
cons,
|
||||
)
|
||||
actual_name = cons.name
|
||||
|
||||
metadata.create_all(connection)
|
||||
insp = inspect(connection)
|
||||
ck = insp.get_check_constraints("a_things_with_stuff")
|
||||
reflected_name = ck[0]["name"]
|
||||
return actual_name, reflected_name
|
||||
|
||||
@testing.combinations(
|
||||
("fk",),
|
||||
("pk",),
|
||||
("ix",),
|
||||
("ck", testing.requires.check_constraint_reflection.as_skips()),
|
||||
("uq", testing.requires.unique_constraint_reflection.as_skips()),
|
||||
argnames="type_",
|
||||
)
|
||||
def test_long_convention_name(self, type_, metadata, connection):
|
||||
actual_name, reflected_name = getattr(self, type_)(
|
||||
metadata, connection
|
||||
)
|
||||
|
||||
assert len(actual_name) > 255
|
||||
|
||||
if reflected_name is not None:
|
||||
overlap = actual_name[0 : len(reflected_name)]
|
||||
if len(overlap) < len(actual_name):
|
||||
eq_(overlap[0:-5], reflected_name[0 : len(overlap) - 5])
|
||||
else:
|
||||
eq_(overlap, reflected_name)
|
||||
|
||||
|
||||
__all__ = ("TableDDLTest", "FutureTableDDLTest", "LongNameBlowoutTest")
|
||||
145
lib/sqlalchemy/testing/suite/test_deprecations.py
Normal file
145
lib/sqlalchemy/testing/suite/test_deprecations.py
Normal file
@@ -0,0 +1,145 @@
|
||||
from .. import fixtures
|
||||
from ..assertions import eq_
|
||||
from ..schema import Column
|
||||
from ..schema import Table
|
||||
from ... import Integer
|
||||
from ... import select
|
||||
from ... import testing
|
||||
from ... import union
|
||||
|
||||
|
||||
class DeprecatedCompoundSelectTest(fixtures.TablesTest):
|
||||
__backend__ = True
|
||||
|
||||
@classmethod
|
||||
def define_tables(cls, metadata):
|
||||
Table(
|
||||
"some_table",
|
||||
metadata,
|
||||
Column("id", Integer, primary_key=True),
|
||||
Column("x", Integer),
|
||||
Column("y", Integer),
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def insert_data(cls, connection):
|
||||
connection.execute(
|
||||
cls.tables.some_table.insert(),
|
||||
[
|
||||
{"id": 1, "x": 1, "y": 2},
|
||||
{"id": 2, "x": 2, "y": 3},
|
||||
{"id": 3, "x": 3, "y": 4},
|
||||
{"id": 4, "x": 4, "y": 5},
|
||||
],
|
||||
)
|
||||
|
||||
def _assert_result(self, conn, select, result, params=()):
|
||||
eq_(conn.execute(select, params).fetchall(), result)
|
||||
|
||||
def test_plain_union(self, connection):
|
||||
table = self.tables.some_table
|
||||
s1 = select(table).where(table.c.id == 2)
|
||||
s2 = select(table).where(table.c.id == 3)
|
||||
|
||||
u1 = union(s1, s2)
|
||||
with testing.expect_deprecated(
|
||||
"The SelectBase.c and SelectBase.columns "
|
||||
"attributes are deprecated"
|
||||
):
|
||||
self._assert_result(
|
||||
connection, u1.order_by(u1.c.id), [(2, 2, 3), (3, 3, 4)]
|
||||
)
|
||||
|
||||
# note we've had to remove one use case entirely, which is this
|
||||
# one. the Select gets its FROMS from the WHERE clause and the
|
||||
# columns clause, but not the ORDER BY, which means the old ".c" system
|
||||
# allowed you to "order_by(s.c.foo)" to get an unnamed column in the
|
||||
# ORDER BY without adding the SELECT into the FROM and breaking the
|
||||
# query. Users will have to adjust for this use case if they were doing
|
||||
# it before.
|
||||
def _dont_test_select_from_plain_union(self, connection):
|
||||
table = self.tables.some_table
|
||||
s1 = select(table).where(table.c.id == 2)
|
||||
s2 = select(table).where(table.c.id == 3)
|
||||
|
||||
u1 = union(s1, s2).alias().select()
|
||||
with testing.expect_deprecated(
|
||||
"The SelectBase.c and SelectBase.columns "
|
||||
"attributes are deprecated"
|
||||
):
|
||||
self._assert_result(
|
||||
connection, u1.order_by(u1.c.id), [(2, 2, 3), (3, 3, 4)]
|
||||
)
|
||||
|
||||
@testing.requires.order_by_col_from_union
|
||||
@testing.requires.parens_in_union_contained_select_w_limit_offset
|
||||
def test_limit_offset_selectable_in_unions(self, connection):
|
||||
table = self.tables.some_table
|
||||
s1 = select(table).where(table.c.id == 2).limit(1).order_by(table.c.id)
|
||||
s2 = select(table).where(table.c.id == 3).limit(1).order_by(table.c.id)
|
||||
|
||||
u1 = union(s1, s2).limit(2)
|
||||
with testing.expect_deprecated(
|
||||
"The SelectBase.c and SelectBase.columns "
|
||||
"attributes are deprecated"
|
||||
):
|
||||
self._assert_result(
|
||||
connection, u1.order_by(u1.c.id), [(2, 2, 3), (3, 3, 4)]
|
||||
)
|
||||
|
||||
@testing.requires.parens_in_union_contained_select_wo_limit_offset
|
||||
def test_order_by_selectable_in_unions(self, connection):
|
||||
table = self.tables.some_table
|
||||
s1 = select(table).where(table.c.id == 2).order_by(table.c.id)
|
||||
s2 = select(table).where(table.c.id == 3).order_by(table.c.id)
|
||||
|
||||
u1 = union(s1, s2).limit(2)
|
||||
with testing.expect_deprecated(
|
||||
"The SelectBase.c and SelectBase.columns "
|
||||
"attributes are deprecated"
|
||||
):
|
||||
self._assert_result(
|
||||
connection, u1.order_by(u1.c.id), [(2, 2, 3), (3, 3, 4)]
|
||||
)
|
||||
|
||||
def test_distinct_selectable_in_unions(self, connection):
|
||||
table = self.tables.some_table
|
||||
s1 = select(table).where(table.c.id == 2).distinct()
|
||||
s2 = select(table).where(table.c.id == 3).distinct()
|
||||
|
||||
u1 = union(s1, s2).limit(2)
|
||||
with testing.expect_deprecated(
|
||||
"The SelectBase.c and SelectBase.columns "
|
||||
"attributes are deprecated"
|
||||
):
|
||||
self._assert_result(
|
||||
connection, u1.order_by(u1.c.id), [(2, 2, 3), (3, 3, 4)]
|
||||
)
|
||||
|
||||
def test_limit_offset_aliased_selectable_in_unions(self, connection):
|
||||
table = self.tables.some_table
|
||||
s1 = (
|
||||
select(table)
|
||||
.where(table.c.id == 2)
|
||||
.limit(1)
|
||||
.order_by(table.c.id)
|
||||
.alias()
|
||||
.select()
|
||||
)
|
||||
s2 = (
|
||||
select(table)
|
||||
.where(table.c.id == 3)
|
||||
.limit(1)
|
||||
.order_by(table.c.id)
|
||||
.alias()
|
||||
.select()
|
||||
)
|
||||
|
||||
u1 = union(s1, s2).limit(2)
|
||||
with testing.expect_deprecated(
|
||||
"The SelectBase.c and SelectBase.columns "
|
||||
"attributes are deprecated"
|
||||
):
|
||||
self._assert_result(
|
||||
connection, u1.order_by(u1.c.id), [(2, 2, 3), (3, 3, 4)]
|
||||
)
|
||||
361
lib/sqlalchemy/testing/suite/test_dialect.py
Normal file
361
lib/sqlalchemy/testing/suite/test_dialect.py
Normal file
@@ -0,0 +1,361 @@
|
||||
#! coding: utf-8
|
||||
|
||||
from . import testing
|
||||
from .. import assert_raises
|
||||
from .. import config
|
||||
from .. import engines
|
||||
from .. import eq_
|
||||
from .. import fixtures
|
||||
from .. import ne_
|
||||
from .. import provide_metadata
|
||||
from ..config import requirements
|
||||
from ..provision import set_default_schema_on_connection
|
||||
from ..schema import Column
|
||||
from ..schema import Table
|
||||
from ... import bindparam
|
||||
from ... import event
|
||||
from ... import exc
|
||||
from ... import Integer
|
||||
from ... import literal_column
|
||||
from ... import select
|
||||
from ... import String
|
||||
from ...util import compat
|
||||
|
||||
|
||||
class ExceptionTest(fixtures.TablesTest):
|
||||
"""Test basic exception wrapping.
|
||||
|
||||
DBAPIs vary a lot in exception behavior so to actually anticipate
|
||||
specific exceptions from real round trips, we need to be conservative.
|
||||
|
||||
"""
|
||||
|
||||
run_deletes = "each"
|
||||
|
||||
__backend__ = True
|
||||
|
||||
@classmethod
|
||||
def define_tables(cls, metadata):
|
||||
Table(
|
||||
"manual_pk",
|
||||
metadata,
|
||||
Column("id", Integer, primary_key=True, autoincrement=False),
|
||||
Column("data", String(50)),
|
||||
)
|
||||
|
||||
@requirements.duplicate_key_raises_integrity_error
|
||||
def test_integrity_error(self):
|
||||
|
||||
with config.db.connect() as conn:
|
||||
|
||||
trans = conn.begin()
|
||||
conn.execute(
|
||||
self.tables.manual_pk.insert(), {"id": 1, "data": "d1"}
|
||||
)
|
||||
|
||||
assert_raises(
|
||||
exc.IntegrityError,
|
||||
conn.execute,
|
||||
self.tables.manual_pk.insert(),
|
||||
{"id": 1, "data": "d1"},
|
||||
)
|
||||
|
||||
trans.rollback()
|
||||
|
||||
def test_exception_with_non_ascii(self):
|
||||
with config.db.connect() as conn:
|
||||
try:
|
||||
# try to create an error message that likely has non-ascii
|
||||
# characters in the DBAPI's message string. unfortunately
|
||||
# there's no way to make this happen with some drivers like
|
||||
# mysqlclient, pymysql. this at least does produce a non-
|
||||
# ascii error message for cx_oracle, psycopg2
|
||||
conn.execute(select(literal_column(u"méil")))
|
||||
assert False
|
||||
except exc.DBAPIError as err:
|
||||
err_str = str(err)
|
||||
|
||||
assert str(err.orig) in str(err)
|
||||
|
||||
# test that we are actually getting string on Py2k, unicode
|
||||
# on Py3k.
|
||||
if compat.py2k:
|
||||
assert isinstance(err_str, str)
|
||||
else:
|
||||
assert isinstance(err_str, str)
|
||||
|
||||
|
||||
class IsolationLevelTest(fixtures.TestBase):
|
||||
__backend__ = True
|
||||
|
||||
__requires__ = ("isolation_level",)
|
||||
|
||||
def _get_non_default_isolation_level(self):
|
||||
levels = requirements.get_isolation_levels(config)
|
||||
|
||||
default = levels["default"]
|
||||
supported = levels["supported"]
|
||||
|
||||
s = set(supported).difference(["AUTOCOMMIT", default])
|
||||
if s:
|
||||
return s.pop()
|
||||
else:
|
||||
config.skip_test("no non-default isolation level available")
|
||||
|
||||
def test_default_isolation_level(self):
|
||||
eq_(
|
||||
config.db.dialect.default_isolation_level,
|
||||
requirements.get_isolation_levels(config)["default"],
|
||||
)
|
||||
|
||||
def test_non_default_isolation_level(self):
|
||||
non_default = self._get_non_default_isolation_level()
|
||||
|
||||
with config.db.connect() as conn:
|
||||
existing = conn.get_isolation_level()
|
||||
|
||||
ne_(existing, non_default)
|
||||
|
||||
conn.execution_options(isolation_level=non_default)
|
||||
|
||||
eq_(conn.get_isolation_level(), non_default)
|
||||
|
||||
conn.dialect.reset_isolation_level(conn.connection)
|
||||
|
||||
eq_(conn.get_isolation_level(), existing)
|
||||
|
||||
def test_all_levels(self):
|
||||
levels = requirements.get_isolation_levels(config)
|
||||
|
||||
all_levels = levels["supported"]
|
||||
|
||||
for level in set(all_levels).difference(["AUTOCOMMIT"]):
|
||||
with config.db.connect() as conn:
|
||||
conn.execution_options(isolation_level=level)
|
||||
|
||||
eq_(conn.get_isolation_level(), level)
|
||||
|
||||
trans = conn.begin()
|
||||
trans.rollback()
|
||||
|
||||
eq_(conn.get_isolation_level(), level)
|
||||
|
||||
with config.db.connect() as conn:
|
||||
eq_(
|
||||
conn.get_isolation_level(),
|
||||
levels["default"],
|
||||
)
|
||||
|
||||
|
||||
class AutocommitIsolationTest(fixtures.TablesTest):
|
||||
|
||||
run_deletes = "each"
|
||||
|
||||
__requires__ = ("autocommit",)
|
||||
|
||||
__backend__ = True
|
||||
|
||||
@classmethod
|
||||
def define_tables(cls, metadata):
|
||||
Table(
|
||||
"some_table",
|
||||
metadata,
|
||||
Column("id", Integer, primary_key=True, autoincrement=False),
|
||||
Column("data", String(50)),
|
||||
test_needs_acid=True,
|
||||
)
|
||||
|
||||
def _test_conn_autocommits(self, conn, autocommit):
|
||||
trans = conn.begin()
|
||||
conn.execute(
|
||||
self.tables.some_table.insert(), {"id": 1, "data": "some data"}
|
||||
)
|
||||
trans.rollback()
|
||||
|
||||
eq_(
|
||||
conn.scalar(select(self.tables.some_table.c.id)),
|
||||
1 if autocommit else None,
|
||||
)
|
||||
|
||||
with conn.begin():
|
||||
conn.execute(self.tables.some_table.delete())
|
||||
|
||||
def test_autocommit_on(self, connection_no_trans):
|
||||
conn = connection_no_trans
|
||||
c2 = conn.execution_options(isolation_level="AUTOCOMMIT")
|
||||
self._test_conn_autocommits(c2, True)
|
||||
|
||||
c2.dialect.reset_isolation_level(c2.connection)
|
||||
|
||||
self._test_conn_autocommits(conn, False)
|
||||
|
||||
def test_autocommit_off(self, connection_no_trans):
|
||||
conn = connection_no_trans
|
||||
self._test_conn_autocommits(conn, False)
|
||||
|
||||
def test_turn_autocommit_off_via_default_iso_level(
|
||||
self, connection_no_trans
|
||||
):
|
||||
conn = connection_no_trans
|
||||
conn = conn.execution_options(isolation_level="AUTOCOMMIT")
|
||||
self._test_conn_autocommits(conn, True)
|
||||
|
||||
conn.execution_options(
|
||||
isolation_level=requirements.get_isolation_levels(config)[
|
||||
"default"
|
||||
]
|
||||
)
|
||||
self._test_conn_autocommits(conn, False)
|
||||
|
||||
|
||||
class EscapingTest(fixtures.TestBase):
|
||||
@provide_metadata
|
||||
def test_percent_sign_round_trip(self):
|
||||
"""test that the DBAPI accommodates for escaped / nonescaped
|
||||
percent signs in a way that matches the compiler
|
||||
|
||||
"""
|
||||
m = self.metadata
|
||||
t = Table("t", m, Column("data", String(50)))
|
||||
t.create(config.db)
|
||||
with config.db.begin() as conn:
|
||||
conn.execute(t.insert(), dict(data="some % value"))
|
||||
conn.execute(t.insert(), dict(data="some %% other value"))
|
||||
|
||||
eq_(
|
||||
conn.scalar(
|
||||
select(t.c.data).where(
|
||||
t.c.data == literal_column("'some % value'")
|
||||
)
|
||||
),
|
||||
"some % value",
|
||||
)
|
||||
|
||||
eq_(
|
||||
conn.scalar(
|
||||
select(t.c.data).where(
|
||||
t.c.data == literal_column("'some %% other value'")
|
||||
)
|
||||
),
|
||||
"some %% other value",
|
||||
)
|
||||
|
||||
|
||||
class WeCanSetDefaultSchemaWEventsTest(fixtures.TestBase):
|
||||
__backend__ = True
|
||||
|
||||
__requires__ = ("default_schema_name_switch",)
|
||||
|
||||
def test_control_case(self):
|
||||
default_schema_name = config.db.dialect.default_schema_name
|
||||
|
||||
eng = engines.testing_engine()
|
||||
with eng.connect():
|
||||
pass
|
||||
|
||||
eq_(eng.dialect.default_schema_name, default_schema_name)
|
||||
|
||||
def test_wont_work_wo_insert(self):
|
||||
default_schema_name = config.db.dialect.default_schema_name
|
||||
|
||||
eng = engines.testing_engine()
|
||||
|
||||
@event.listens_for(eng, "connect")
|
||||
def on_connect(dbapi_connection, connection_record):
|
||||
set_default_schema_on_connection(
|
||||
config, dbapi_connection, config.test_schema
|
||||
)
|
||||
|
||||
with eng.connect() as conn:
|
||||
what_it_should_be = eng.dialect._get_default_schema_name(conn)
|
||||
eq_(what_it_should_be, config.test_schema)
|
||||
|
||||
eq_(eng.dialect.default_schema_name, default_schema_name)
|
||||
|
||||
def test_schema_change_on_connect(self):
|
||||
eng = engines.testing_engine()
|
||||
|
||||
@event.listens_for(eng, "connect", insert=True)
|
||||
def on_connect(dbapi_connection, connection_record):
|
||||
set_default_schema_on_connection(
|
||||
config, dbapi_connection, config.test_schema
|
||||
)
|
||||
|
||||
with eng.connect() as conn:
|
||||
what_it_should_be = eng.dialect._get_default_schema_name(conn)
|
||||
eq_(what_it_should_be, config.test_schema)
|
||||
|
||||
eq_(eng.dialect.default_schema_name, config.test_schema)
|
||||
|
||||
def test_schema_change_works_w_transactions(self):
|
||||
eng = engines.testing_engine()
|
||||
|
||||
@event.listens_for(eng, "connect", insert=True)
|
||||
def on_connect(dbapi_connection, *arg):
|
||||
set_default_schema_on_connection(
|
||||
config, dbapi_connection, config.test_schema
|
||||
)
|
||||
|
||||
with eng.connect() as conn:
|
||||
trans = conn.begin()
|
||||
what_it_should_be = eng.dialect._get_default_schema_name(conn)
|
||||
eq_(what_it_should_be, config.test_schema)
|
||||
trans.rollback()
|
||||
|
||||
what_it_should_be = eng.dialect._get_default_schema_name(conn)
|
||||
eq_(what_it_should_be, config.test_schema)
|
||||
|
||||
eq_(eng.dialect.default_schema_name, config.test_schema)
|
||||
|
||||
|
||||
class FutureWeCanSetDefaultSchemaWEventsTest(
|
||||
fixtures.FutureEngineMixin, WeCanSetDefaultSchemaWEventsTest
|
||||
):
|
||||
pass
|
||||
|
||||
|
||||
class DifficultParametersTest(fixtures.TestBase):
|
||||
__backend__ = True
|
||||
|
||||
@testing.combinations(
|
||||
("boring",),
|
||||
("per cent",),
|
||||
("per % cent",),
|
||||
("%percent",),
|
||||
("par(ens)",),
|
||||
("percent%(ens)yah",),
|
||||
("col:ons",),
|
||||
("more :: %colons%",),
|
||||
("/slashes/",),
|
||||
("more/slashes",),
|
||||
("q?marks",),
|
||||
("1param",),
|
||||
("1col:on",),
|
||||
argnames="name",
|
||||
)
|
||||
def test_round_trip(self, name, connection, metadata):
|
||||
t = Table(
|
||||
"t",
|
||||
metadata,
|
||||
Column("id", Integer, primary_key=True),
|
||||
Column(name, String(50), nullable=False),
|
||||
)
|
||||
|
||||
# table is created
|
||||
t.create(connection)
|
||||
|
||||
# automatic param generated by insert
|
||||
connection.execute(t.insert().values({"id": 1, name: "some name"}))
|
||||
|
||||
# automatic param generated by criteria, plus selecting the column
|
||||
stmt = select(t.c[name]).where(t.c[name] == "some name")
|
||||
|
||||
eq_(connection.scalar(stmt), "some name")
|
||||
|
||||
# use the name in a param explicitly
|
||||
stmt = select(t.c[name]).where(t.c[name] == bindparam(name))
|
||||
|
||||
row = connection.execute(stmt, {name: "some name"}).first()
|
||||
|
||||
# name works as the key from cursor.description
|
||||
eq_(row._mapping[name], "some name")
|
||||
367
lib/sqlalchemy/testing/suite/test_insert.py
Normal file
367
lib/sqlalchemy/testing/suite/test_insert.py
Normal file
@@ -0,0 +1,367 @@
|
||||
from .. import config
|
||||
from .. import engines
|
||||
from .. import fixtures
|
||||
from ..assertions import eq_
|
||||
from ..config import requirements
|
||||
from ..schema import Column
|
||||
from ..schema import Table
|
||||
from ... import Integer
|
||||
from ... import literal
|
||||
from ... import literal_column
|
||||
from ... import select
|
||||
from ... import String
|
||||
|
||||
|
||||
class LastrowidTest(fixtures.TablesTest):
|
||||
run_deletes = "each"
|
||||
|
||||
__backend__ = True
|
||||
|
||||
__requires__ = "implements_get_lastrowid", "autoincrement_insert"
|
||||
|
||||
__engine_options__ = {"implicit_returning": False}
|
||||
|
||||
@classmethod
|
||||
def define_tables(cls, metadata):
|
||||
Table(
|
||||
"autoinc_pk",
|
||||
metadata,
|
||||
Column(
|
||||
"id", Integer, primary_key=True, test_needs_autoincrement=True
|
||||
),
|
||||
Column("data", String(50)),
|
||||
)
|
||||
|
||||
Table(
|
||||
"manual_pk",
|
||||
metadata,
|
||||
Column("id", Integer, primary_key=True, autoincrement=False),
|
||||
Column("data", String(50)),
|
||||
)
|
||||
|
||||
def _assert_round_trip(self, table, conn):
|
||||
row = conn.execute(table.select()).first()
|
||||
eq_(
|
||||
row,
|
||||
(
|
||||
conn.dialect.default_sequence_base,
|
||||
"some data",
|
||||
),
|
||||
)
|
||||
|
||||
def test_autoincrement_on_insert(self, connection):
|
||||
|
||||
connection.execute(
|
||||
self.tables.autoinc_pk.insert(), dict(data="some data")
|
||||
)
|
||||
self._assert_round_trip(self.tables.autoinc_pk, connection)
|
||||
|
||||
def test_last_inserted_id(self, connection):
|
||||
|
||||
r = connection.execute(
|
||||
self.tables.autoinc_pk.insert(), dict(data="some data")
|
||||
)
|
||||
pk = connection.scalar(select(self.tables.autoinc_pk.c.id))
|
||||
eq_(r.inserted_primary_key, (pk,))
|
||||
|
||||
@requirements.dbapi_lastrowid
|
||||
def test_native_lastrowid_autoinc(self, connection):
|
||||
r = connection.execute(
|
||||
self.tables.autoinc_pk.insert(), dict(data="some data")
|
||||
)
|
||||
lastrowid = r.lastrowid
|
||||
pk = connection.scalar(select(self.tables.autoinc_pk.c.id))
|
||||
eq_(lastrowid, pk)
|
||||
|
||||
|
||||
class InsertBehaviorTest(fixtures.TablesTest):
|
||||
run_deletes = "each"
|
||||
__backend__ = True
|
||||
|
||||
@classmethod
|
||||
def define_tables(cls, metadata):
|
||||
Table(
|
||||
"autoinc_pk",
|
||||
metadata,
|
||||
Column(
|
||||
"id", Integer, primary_key=True, test_needs_autoincrement=True
|
||||
),
|
||||
Column("data", String(50)),
|
||||
)
|
||||
Table(
|
||||
"manual_pk",
|
||||
metadata,
|
||||
Column("id", Integer, primary_key=True, autoincrement=False),
|
||||
Column("data", String(50)),
|
||||
)
|
||||
Table(
|
||||
"includes_defaults",
|
||||
metadata,
|
||||
Column(
|
||||
"id", Integer, primary_key=True, test_needs_autoincrement=True
|
||||
),
|
||||
Column("data", String(50)),
|
||||
Column("x", Integer, default=5),
|
||||
Column(
|
||||
"y",
|
||||
Integer,
|
||||
default=literal_column("2", type_=Integer) + literal(2),
|
||||
),
|
||||
)
|
||||
|
||||
@requirements.autoincrement_insert
|
||||
def test_autoclose_on_insert(self):
|
||||
if requirements.returning.enabled:
|
||||
engine = engines.testing_engine(
|
||||
options={"implicit_returning": False}
|
||||
)
|
||||
else:
|
||||
engine = config.db
|
||||
|
||||
with engine.begin() as conn:
|
||||
r = conn.execute(
|
||||
self.tables.autoinc_pk.insert(), dict(data="some data")
|
||||
)
|
||||
assert r._soft_closed
|
||||
assert not r.closed
|
||||
assert r.is_insert
|
||||
|
||||
# new as of I8091919d45421e3f53029b8660427f844fee0228; for the moment
|
||||
# an insert where the PK was taken from a row that the dialect
|
||||
# selected, as is the case for mssql/pyodbc, will still report
|
||||
# returns_rows as true because there's a cursor description. in that
|
||||
# case, the row had to have been consumed at least.
|
||||
assert not r.returns_rows or r.fetchone() is None
|
||||
|
||||
@requirements.returning
|
||||
def test_autoclose_on_insert_implicit_returning(self, connection):
|
||||
r = connection.execute(
|
||||
self.tables.autoinc_pk.insert(), dict(data="some data")
|
||||
)
|
||||
assert r._soft_closed
|
||||
assert not r.closed
|
||||
assert r.is_insert
|
||||
|
||||
# note we are experimenting with having this be True
|
||||
# as of I8091919d45421e3f53029b8660427f844fee0228 .
|
||||
# implicit returning has fetched the row, but it still is a
|
||||
# "returns rows"
|
||||
assert r.returns_rows
|
||||
|
||||
# and we should be able to fetchone() on it, we just get no row
|
||||
eq_(r.fetchone(), None)
|
||||
|
||||
# and the keys, etc.
|
||||
eq_(r.keys(), ["id"])
|
||||
|
||||
# but the dialect took in the row already. not really sure
|
||||
# what the best behavior is.
|
||||
|
||||
@requirements.empty_inserts
|
||||
def test_empty_insert(self, connection):
|
||||
r = connection.execute(self.tables.autoinc_pk.insert())
|
||||
assert r._soft_closed
|
||||
assert not r.closed
|
||||
|
||||
r = connection.execute(
|
||||
self.tables.autoinc_pk.select().where(
|
||||
self.tables.autoinc_pk.c.id != None
|
||||
)
|
||||
)
|
||||
eq_(len(r.all()), 1)
|
||||
|
||||
@requirements.empty_inserts_executemany
|
||||
def test_empty_insert_multiple(self, connection):
|
||||
r = connection.execute(self.tables.autoinc_pk.insert(), [{}, {}, {}])
|
||||
assert r._soft_closed
|
||||
assert not r.closed
|
||||
|
||||
r = connection.execute(
|
||||
self.tables.autoinc_pk.select().where(
|
||||
self.tables.autoinc_pk.c.id != None
|
||||
)
|
||||
)
|
||||
|
||||
eq_(len(r.all()), 3)
|
||||
|
||||
@requirements.insert_from_select
|
||||
def test_insert_from_select_autoinc(self, connection):
|
||||
src_table = self.tables.manual_pk
|
||||
dest_table = self.tables.autoinc_pk
|
||||
connection.execute(
|
||||
src_table.insert(),
|
||||
[
|
||||
dict(id=1, data="data1"),
|
||||
dict(id=2, data="data2"),
|
||||
dict(id=3, data="data3"),
|
||||
],
|
||||
)
|
||||
|
||||
result = connection.execute(
|
||||
dest_table.insert().from_select(
|
||||
("data",),
|
||||
select(src_table.c.data).where(
|
||||
src_table.c.data.in_(["data2", "data3"])
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
eq_(result.inserted_primary_key, (None,))
|
||||
|
||||
result = connection.execute(
|
||||
select(dest_table.c.data).order_by(dest_table.c.data)
|
||||
)
|
||||
eq_(result.fetchall(), [("data2",), ("data3",)])
|
||||
|
||||
@requirements.insert_from_select
|
||||
def test_insert_from_select_autoinc_no_rows(self, connection):
|
||||
src_table = self.tables.manual_pk
|
||||
dest_table = self.tables.autoinc_pk
|
||||
|
||||
result = connection.execute(
|
||||
dest_table.insert().from_select(
|
||||
("data",),
|
||||
select(src_table.c.data).where(
|
||||
src_table.c.data.in_(["data2", "data3"])
|
||||
),
|
||||
)
|
||||
)
|
||||
eq_(result.inserted_primary_key, (None,))
|
||||
|
||||
result = connection.execute(
|
||||
select(dest_table.c.data).order_by(dest_table.c.data)
|
||||
)
|
||||
|
||||
eq_(result.fetchall(), [])
|
||||
|
||||
@requirements.insert_from_select
|
||||
def test_insert_from_select(self, connection):
|
||||
table = self.tables.manual_pk
|
||||
connection.execute(
|
||||
table.insert(),
|
||||
[
|
||||
dict(id=1, data="data1"),
|
||||
dict(id=2, data="data2"),
|
||||
dict(id=3, data="data3"),
|
||||
],
|
||||
)
|
||||
|
||||
connection.execute(
|
||||
table.insert()
|
||||
.inline()
|
||||
.from_select(
|
||||
("id", "data"),
|
||||
select(table.c.id + 5, table.c.data).where(
|
||||
table.c.data.in_(["data2", "data3"])
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
eq_(
|
||||
connection.execute(
|
||||
select(table.c.data).order_by(table.c.data)
|
||||
).fetchall(),
|
||||
[("data1",), ("data2",), ("data2",), ("data3",), ("data3",)],
|
||||
)
|
||||
|
||||
@requirements.insert_from_select
|
||||
def test_insert_from_select_with_defaults(self, connection):
|
||||
table = self.tables.includes_defaults
|
||||
connection.execute(
|
||||
table.insert(),
|
||||
[
|
||||
dict(id=1, data="data1"),
|
||||
dict(id=2, data="data2"),
|
||||
dict(id=3, data="data3"),
|
||||
],
|
||||
)
|
||||
|
||||
connection.execute(
|
||||
table.insert()
|
||||
.inline()
|
||||
.from_select(
|
||||
("id", "data"),
|
||||
select(table.c.id + 5, table.c.data).where(
|
||||
table.c.data.in_(["data2", "data3"])
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
eq_(
|
||||
connection.execute(
|
||||
select(table).order_by(table.c.data, table.c.id)
|
||||
).fetchall(),
|
||||
[
|
||||
(1, "data1", 5, 4),
|
||||
(2, "data2", 5, 4),
|
||||
(7, "data2", 5, 4),
|
||||
(3, "data3", 5, 4),
|
||||
(8, "data3", 5, 4),
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
class ReturningTest(fixtures.TablesTest):
|
||||
run_create_tables = "each"
|
||||
__requires__ = "returning", "autoincrement_insert"
|
||||
__backend__ = True
|
||||
|
||||
__engine_options__ = {"implicit_returning": True}
|
||||
|
||||
def _assert_round_trip(self, table, conn):
|
||||
row = conn.execute(table.select()).first()
|
||||
eq_(
|
||||
row,
|
||||
(
|
||||
conn.dialect.default_sequence_base,
|
||||
"some data",
|
||||
),
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def define_tables(cls, metadata):
|
||||
Table(
|
||||
"autoinc_pk",
|
||||
metadata,
|
||||
Column(
|
||||
"id", Integer, primary_key=True, test_needs_autoincrement=True
|
||||
),
|
||||
Column("data", String(50)),
|
||||
)
|
||||
|
||||
@requirements.fetch_rows_post_commit
|
||||
def test_explicit_returning_pk_autocommit(self, connection):
|
||||
table = self.tables.autoinc_pk
|
||||
r = connection.execute(
|
||||
table.insert().returning(table.c.id), dict(data="some data")
|
||||
)
|
||||
pk = r.first()[0]
|
||||
fetched_pk = connection.scalar(select(table.c.id))
|
||||
eq_(fetched_pk, pk)
|
||||
|
||||
def test_explicit_returning_pk_no_autocommit(self, connection):
|
||||
table = self.tables.autoinc_pk
|
||||
r = connection.execute(
|
||||
table.insert().returning(table.c.id), dict(data="some data")
|
||||
)
|
||||
pk = r.first()[0]
|
||||
fetched_pk = connection.scalar(select(table.c.id))
|
||||
eq_(fetched_pk, pk)
|
||||
|
||||
def test_autoincrement_on_insert_implicit_returning(self, connection):
|
||||
|
||||
connection.execute(
|
||||
self.tables.autoinc_pk.insert(), dict(data="some data")
|
||||
)
|
||||
self._assert_round_trip(self.tables.autoinc_pk, connection)
|
||||
|
||||
def test_last_inserted_id_implicit_returning(self, connection):
|
||||
|
||||
r = connection.execute(
|
||||
self.tables.autoinc_pk.insert(), dict(data="some data")
|
||||
)
|
||||
pk = connection.scalar(select(self.tables.autoinc_pk.c.id))
|
||||
eq_(r.inserted_primary_key, (pk,))
|
||||
|
||||
|
||||
__all__ = ("LastrowidTest", "InsertBehaviorTest", "ReturningTest")
|
||||
1738
lib/sqlalchemy/testing/suite/test_reflection.py
Normal file
1738
lib/sqlalchemy/testing/suite/test_reflection.py
Normal file
File diff suppressed because it is too large
Load Diff
426
lib/sqlalchemy/testing/suite/test_results.py
Normal file
426
lib/sqlalchemy/testing/suite/test_results.py
Normal file
@@ -0,0 +1,426 @@
|
||||
import datetime
|
||||
|
||||
from .. import engines
|
||||
from .. import fixtures
|
||||
from ..assertions import eq_
|
||||
from ..config import requirements
|
||||
from ..schema import Column
|
||||
from ..schema import Table
|
||||
from ... import DateTime
|
||||
from ... import func
|
||||
from ... import Integer
|
||||
from ... import select
|
||||
from ... import sql
|
||||
from ... import String
|
||||
from ... import testing
|
||||
from ... import text
|
||||
from ... import util
|
||||
|
||||
|
||||
class RowFetchTest(fixtures.TablesTest):
|
||||
__backend__ = True
|
||||
|
||||
@classmethod
|
||||
def define_tables(cls, metadata):
|
||||
Table(
|
||||
"plain_pk",
|
||||
metadata,
|
||||
Column("id", Integer, primary_key=True),
|
||||
Column("data", String(50)),
|
||||
)
|
||||
Table(
|
||||
"has_dates",
|
||||
metadata,
|
||||
Column("id", Integer, primary_key=True),
|
||||
Column("today", DateTime),
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def insert_data(cls, connection):
|
||||
connection.execute(
|
||||
cls.tables.plain_pk.insert(),
|
||||
[
|
||||
{"id": 1, "data": "d1"},
|
||||
{"id": 2, "data": "d2"},
|
||||
{"id": 3, "data": "d3"},
|
||||
],
|
||||
)
|
||||
|
||||
connection.execute(
|
||||
cls.tables.has_dates.insert(),
|
||||
[{"id": 1, "today": datetime.datetime(2006, 5, 12, 12, 0, 0)}],
|
||||
)
|
||||
|
||||
def test_via_attr(self, connection):
|
||||
row = connection.execute(
|
||||
self.tables.plain_pk.select().order_by(self.tables.plain_pk.c.id)
|
||||
).first()
|
||||
|
||||
eq_(row.id, 1)
|
||||
eq_(row.data, "d1")
|
||||
|
||||
def test_via_string(self, connection):
|
||||
row = connection.execute(
|
||||
self.tables.plain_pk.select().order_by(self.tables.plain_pk.c.id)
|
||||
).first()
|
||||
|
||||
eq_(row._mapping["id"], 1)
|
||||
eq_(row._mapping["data"], "d1")
|
||||
|
||||
def test_via_int(self, connection):
|
||||
row = connection.execute(
|
||||
self.tables.plain_pk.select().order_by(self.tables.plain_pk.c.id)
|
||||
).first()
|
||||
|
||||
eq_(row[0], 1)
|
||||
eq_(row[1], "d1")
|
||||
|
||||
def test_via_col_object(self, connection):
|
||||
row = connection.execute(
|
||||
self.tables.plain_pk.select().order_by(self.tables.plain_pk.c.id)
|
||||
).first()
|
||||
|
||||
eq_(row._mapping[self.tables.plain_pk.c.id], 1)
|
||||
eq_(row._mapping[self.tables.plain_pk.c.data], "d1")
|
||||
|
||||
@requirements.duplicate_names_in_cursor_description
|
||||
def test_row_with_dupe_names(self, connection):
|
||||
result = connection.execute(
|
||||
select(
|
||||
self.tables.plain_pk.c.data,
|
||||
self.tables.plain_pk.c.data.label("data"),
|
||||
).order_by(self.tables.plain_pk.c.id)
|
||||
)
|
||||
row = result.first()
|
||||
eq_(result.keys(), ["data", "data"])
|
||||
eq_(row, ("d1", "d1"))
|
||||
|
||||
def test_row_w_scalar_select(self, connection):
|
||||
"""test that a scalar select as a column is returned as such
|
||||
and that type conversion works OK.
|
||||
|
||||
(this is half a SQLAlchemy Core test and half to catch database
|
||||
backends that may have unusual behavior with scalar selects.)
|
||||
|
||||
"""
|
||||
datetable = self.tables.has_dates
|
||||
s = select(datetable.alias("x").c.today).scalar_subquery()
|
||||
s2 = select(datetable.c.id, s.label("somelabel"))
|
||||
row = connection.execute(s2).first()
|
||||
|
||||
eq_(row.somelabel, datetime.datetime(2006, 5, 12, 12, 0, 0))
|
||||
|
||||
|
||||
class PercentSchemaNamesTest(fixtures.TablesTest):
|
||||
"""tests using percent signs, spaces in table and column names.
|
||||
|
||||
This didn't work for PostgreSQL / MySQL drivers for a long time
|
||||
but is now supported.
|
||||
|
||||
"""
|
||||
|
||||
__requires__ = ("percent_schema_names",)
|
||||
|
||||
__backend__ = True
|
||||
|
||||
@classmethod
|
||||
def define_tables(cls, metadata):
|
||||
cls.tables.percent_table = Table(
|
||||
"percent%table",
|
||||
metadata,
|
||||
Column("percent%", Integer),
|
||||
Column("spaces % more spaces", Integer),
|
||||
)
|
||||
cls.tables.lightweight_percent_table = sql.table(
|
||||
"percent%table",
|
||||
sql.column("percent%"),
|
||||
sql.column("spaces % more spaces"),
|
||||
)
|
||||
|
||||
def test_single_roundtrip(self, connection):
|
||||
percent_table = self.tables.percent_table
|
||||
for params in [
|
||||
{"percent%": 5, "spaces % more spaces": 12},
|
||||
{"percent%": 7, "spaces % more spaces": 11},
|
||||
{"percent%": 9, "spaces % more spaces": 10},
|
||||
{"percent%": 11, "spaces % more spaces": 9},
|
||||
]:
|
||||
connection.execute(percent_table.insert(), params)
|
||||
self._assert_table(connection)
|
||||
|
||||
def test_executemany_roundtrip(self, connection):
|
||||
percent_table = self.tables.percent_table
|
||||
connection.execute(
|
||||
percent_table.insert(), {"percent%": 5, "spaces % more spaces": 12}
|
||||
)
|
||||
connection.execute(
|
||||
percent_table.insert(),
|
||||
[
|
||||
{"percent%": 7, "spaces % more spaces": 11},
|
||||
{"percent%": 9, "spaces % more spaces": 10},
|
||||
{"percent%": 11, "spaces % more spaces": 9},
|
||||
],
|
||||
)
|
||||
self._assert_table(connection)
|
||||
|
||||
def _assert_table(self, conn):
|
||||
percent_table = self.tables.percent_table
|
||||
lightweight_percent_table = self.tables.lightweight_percent_table
|
||||
|
||||
for table in (
|
||||
percent_table,
|
||||
percent_table.alias(),
|
||||
lightweight_percent_table,
|
||||
lightweight_percent_table.alias(),
|
||||
):
|
||||
eq_(
|
||||
list(
|
||||
conn.execute(table.select().order_by(table.c["percent%"]))
|
||||
),
|
||||
[(5, 12), (7, 11), (9, 10), (11, 9)],
|
||||
)
|
||||
|
||||
eq_(
|
||||
list(
|
||||
conn.execute(
|
||||
table.select()
|
||||
.where(table.c["spaces % more spaces"].in_([9, 10]))
|
||||
.order_by(table.c["percent%"])
|
||||
)
|
||||
),
|
||||
[(9, 10), (11, 9)],
|
||||
)
|
||||
|
||||
row = conn.execute(
|
||||
table.select().order_by(table.c["percent%"])
|
||||
).first()
|
||||
eq_(row._mapping["percent%"], 5)
|
||||
eq_(row._mapping["spaces % more spaces"], 12)
|
||||
|
||||
eq_(row._mapping[table.c["percent%"]], 5)
|
||||
eq_(row._mapping[table.c["spaces % more spaces"]], 12)
|
||||
|
||||
conn.execute(
|
||||
percent_table.update().values(
|
||||
{percent_table.c["spaces % more spaces"]: 15}
|
||||
)
|
||||
)
|
||||
|
||||
eq_(
|
||||
list(
|
||||
conn.execute(
|
||||
percent_table.select().order_by(
|
||||
percent_table.c["percent%"]
|
||||
)
|
||||
)
|
||||
),
|
||||
[(5, 15), (7, 15), (9, 15), (11, 15)],
|
||||
)
|
||||
|
||||
|
||||
class ServerSideCursorsTest(
|
||||
fixtures.TestBase, testing.AssertsExecutionResults
|
||||
):
|
||||
|
||||
__requires__ = ("server_side_cursors",)
|
||||
|
||||
__backend__ = True
|
||||
|
||||
def _is_server_side(self, cursor):
|
||||
# TODO: this is a huge issue as it prevents these tests from being
|
||||
# usable by third party dialects.
|
||||
if self.engine.dialect.driver == "psycopg2":
|
||||
return bool(cursor.name)
|
||||
elif self.engine.dialect.driver == "pymysql":
|
||||
sscursor = __import__("pymysql.cursors").cursors.SSCursor
|
||||
return isinstance(cursor, sscursor)
|
||||
elif self.engine.dialect.driver in ("aiomysql", "asyncmy"):
|
||||
return cursor.server_side
|
||||
elif self.engine.dialect.driver == "mysqldb":
|
||||
sscursor = __import__("MySQLdb.cursors").cursors.SSCursor
|
||||
return isinstance(cursor, sscursor)
|
||||
elif self.engine.dialect.driver == "mariadbconnector":
|
||||
return not cursor.buffered
|
||||
elif self.engine.dialect.driver in ("asyncpg", "aiosqlite"):
|
||||
return cursor.server_side
|
||||
elif self.engine.dialect.driver == "pg8000":
|
||||
return getattr(cursor, "server_side", False)
|
||||
else:
|
||||
return False
|
||||
|
||||
def _fixture(self, server_side_cursors):
|
||||
if server_side_cursors:
|
||||
with testing.expect_deprecated(
|
||||
"The create_engine.server_side_cursors parameter is "
|
||||
"deprecated and will be removed in a future release. "
|
||||
"Please use the Connection.execution_options.stream_results "
|
||||
"parameter."
|
||||
):
|
||||
self.engine = engines.testing_engine(
|
||||
options={"server_side_cursors": server_side_cursors}
|
||||
)
|
||||
else:
|
||||
self.engine = engines.testing_engine(
|
||||
options={"server_side_cursors": server_side_cursors}
|
||||
)
|
||||
return self.engine
|
||||
|
||||
@testing.combinations(
|
||||
("global_string", True, "select 1", True),
|
||||
("global_text", True, text("select 1"), True),
|
||||
("global_expr", True, select(1), True),
|
||||
("global_off_explicit", False, text("select 1"), False),
|
||||
(
|
||||
"stmt_option",
|
||||
False,
|
||||
select(1).execution_options(stream_results=True),
|
||||
True,
|
||||
),
|
||||
(
|
||||
"stmt_option_disabled",
|
||||
True,
|
||||
select(1).execution_options(stream_results=False),
|
||||
False,
|
||||
),
|
||||
("for_update_expr", True, select(1).with_for_update(), True),
|
||||
# TODO: need a real requirement for this, or dont use this test
|
||||
(
|
||||
"for_update_string",
|
||||
True,
|
||||
"SELECT 1 FOR UPDATE",
|
||||
True,
|
||||
testing.skip_if("sqlite"),
|
||||
),
|
||||
("text_no_ss", False, text("select 42"), False),
|
||||
(
|
||||
"text_ss_option",
|
||||
False,
|
||||
text("select 42").execution_options(stream_results=True),
|
||||
True,
|
||||
),
|
||||
id_="iaaa",
|
||||
argnames="engine_ss_arg, statement, cursor_ss_status",
|
||||
)
|
||||
def test_ss_cursor_status(
|
||||
self, engine_ss_arg, statement, cursor_ss_status
|
||||
):
|
||||
engine = self._fixture(engine_ss_arg)
|
||||
with engine.begin() as conn:
|
||||
if isinstance(statement, util.string_types):
|
||||
result = conn.exec_driver_sql(statement)
|
||||
else:
|
||||
result = conn.execute(statement)
|
||||
eq_(self._is_server_side(result.cursor), cursor_ss_status)
|
||||
result.close()
|
||||
|
||||
def test_conn_option(self):
|
||||
engine = self._fixture(False)
|
||||
|
||||
with engine.connect() as conn:
|
||||
# should be enabled for this one
|
||||
result = conn.execution_options(
|
||||
stream_results=True
|
||||
).exec_driver_sql("select 1")
|
||||
assert self._is_server_side(result.cursor)
|
||||
|
||||
def test_stmt_enabled_conn_option_disabled(self):
|
||||
engine = self._fixture(False)
|
||||
|
||||
s = select(1).execution_options(stream_results=True)
|
||||
|
||||
with engine.connect() as conn:
|
||||
# not this one
|
||||
result = conn.execution_options(stream_results=False).execute(s)
|
||||
assert not self._is_server_side(result.cursor)
|
||||
|
||||
def test_aliases_and_ss(self):
|
||||
engine = self._fixture(False)
|
||||
s1 = (
|
||||
select(sql.literal_column("1").label("x"))
|
||||
.execution_options(stream_results=True)
|
||||
.subquery()
|
||||
)
|
||||
|
||||
# options don't propagate out when subquery is used as a FROM clause
|
||||
with engine.begin() as conn:
|
||||
result = conn.execute(s1.select())
|
||||
assert not self._is_server_side(result.cursor)
|
||||
result.close()
|
||||
|
||||
s2 = select(1).select_from(s1)
|
||||
with engine.begin() as conn:
|
||||
result = conn.execute(s2)
|
||||
assert not self._is_server_side(result.cursor)
|
||||
result.close()
|
||||
|
||||
def test_roundtrip_fetchall(self, metadata):
|
||||
md = self.metadata
|
||||
|
||||
engine = self._fixture(True)
|
||||
test_table = Table(
|
||||
"test_table",
|
||||
md,
|
||||
Column("id", Integer, primary_key=True),
|
||||
Column("data", String(50)),
|
||||
)
|
||||
|
||||
with engine.begin() as connection:
|
||||
test_table.create(connection, checkfirst=True)
|
||||
connection.execute(test_table.insert(), dict(data="data1"))
|
||||
connection.execute(test_table.insert(), dict(data="data2"))
|
||||
eq_(
|
||||
connection.execute(
|
||||
test_table.select().order_by(test_table.c.id)
|
||||
).fetchall(),
|
||||
[(1, "data1"), (2, "data2")],
|
||||
)
|
||||
connection.execute(
|
||||
test_table.update()
|
||||
.where(test_table.c.id == 2)
|
||||
.values(data=test_table.c.data + " updated")
|
||||
)
|
||||
eq_(
|
||||
connection.execute(
|
||||
test_table.select().order_by(test_table.c.id)
|
||||
).fetchall(),
|
||||
[(1, "data1"), (2, "data2 updated")],
|
||||
)
|
||||
connection.execute(test_table.delete())
|
||||
eq_(
|
||||
connection.scalar(
|
||||
select(func.count("*")).select_from(test_table)
|
||||
),
|
||||
0,
|
||||
)
|
||||
|
||||
def test_roundtrip_fetchmany(self, metadata):
|
||||
md = self.metadata
|
||||
|
||||
engine = self._fixture(True)
|
||||
test_table = Table(
|
||||
"test_table",
|
||||
md,
|
||||
Column("id", Integer, primary_key=True),
|
||||
Column("data", String(50)),
|
||||
)
|
||||
|
||||
with engine.begin() as connection:
|
||||
test_table.create(connection, checkfirst=True)
|
||||
connection.execute(
|
||||
test_table.insert(),
|
||||
[dict(data="data%d" % i) for i in range(1, 20)],
|
||||
)
|
||||
|
||||
result = connection.execute(
|
||||
test_table.select().order_by(test_table.c.id)
|
||||
)
|
||||
|
||||
eq_(
|
||||
result.fetchmany(5),
|
||||
[(i, "data%d" % i) for i in range(1, 6)],
|
||||
)
|
||||
eq_(
|
||||
result.fetchmany(10),
|
||||
[(i, "data%d" % i) for i in range(6, 16)],
|
||||
)
|
||||
eq_(result.fetchall(), [(i, "data%d" % i) for i in range(16, 20)])
|
||||
165
lib/sqlalchemy/testing/suite/test_rowcount.py
Normal file
165
lib/sqlalchemy/testing/suite/test_rowcount.py
Normal file
@@ -0,0 +1,165 @@
|
||||
from sqlalchemy import bindparam
|
||||
from sqlalchemy import Column
|
||||
from sqlalchemy import Integer
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy import String
|
||||
from sqlalchemy import Table
|
||||
from sqlalchemy import testing
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.testing import eq_
|
||||
from sqlalchemy.testing import fixtures
|
||||
|
||||
|
||||
class RowCountTest(fixtures.TablesTest):
|
||||
"""test rowcount functionality"""
|
||||
|
||||
__requires__ = ("sane_rowcount",)
|
||||
__backend__ = True
|
||||
|
||||
@classmethod
|
||||
def define_tables(cls, metadata):
|
||||
Table(
|
||||
"employees",
|
||||
metadata,
|
||||
Column(
|
||||
"employee_id",
|
||||
Integer,
|
||||
autoincrement=False,
|
||||
primary_key=True,
|
||||
),
|
||||
Column("name", String(50)),
|
||||
Column("department", String(1)),
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def insert_data(cls, connection):
|
||||
cls.data = data = [
|
||||
("Angela", "A"),
|
||||
("Andrew", "A"),
|
||||
("Anand", "A"),
|
||||
("Bob", "B"),
|
||||
("Bobette", "B"),
|
||||
("Buffy", "B"),
|
||||
("Charlie", "C"),
|
||||
("Cynthia", "C"),
|
||||
("Chris", "C"),
|
||||
]
|
||||
|
||||
employees_table = cls.tables.employees
|
||||
connection.execute(
|
||||
employees_table.insert(),
|
||||
[
|
||||
{"employee_id": i, "name": n, "department": d}
|
||||
for i, (n, d) in enumerate(data)
|
||||
],
|
||||
)
|
||||
|
||||
def test_basic(self, connection):
|
||||
employees_table = self.tables.employees
|
||||
s = select(
|
||||
employees_table.c.name, employees_table.c.department
|
||||
).order_by(employees_table.c.employee_id)
|
||||
rows = connection.execute(s).fetchall()
|
||||
|
||||
eq_(rows, self.data)
|
||||
|
||||
def test_update_rowcount1(self, connection):
|
||||
employees_table = self.tables.employees
|
||||
|
||||
# WHERE matches 3, 3 rows changed
|
||||
department = employees_table.c.department
|
||||
r = connection.execute(
|
||||
employees_table.update().where(department == "C"),
|
||||
{"department": "Z"},
|
||||
)
|
||||
assert r.rowcount == 3
|
||||
|
||||
def test_update_rowcount2(self, connection):
|
||||
employees_table = self.tables.employees
|
||||
|
||||
# WHERE matches 3, 0 rows changed
|
||||
department = employees_table.c.department
|
||||
|
||||
r = connection.execute(
|
||||
employees_table.update().where(department == "C"),
|
||||
{"department": "C"},
|
||||
)
|
||||
eq_(r.rowcount, 3)
|
||||
|
||||
@testing.requires.sane_rowcount_w_returning
|
||||
def test_update_rowcount_return_defaults(self, connection):
|
||||
employees_table = self.tables.employees
|
||||
|
||||
department = employees_table.c.department
|
||||
stmt = (
|
||||
employees_table.update()
|
||||
.where(department == "C")
|
||||
.values(name=employees_table.c.department + "Z")
|
||||
.return_defaults()
|
||||
)
|
||||
|
||||
r = connection.execute(stmt)
|
||||
eq_(r.rowcount, 3)
|
||||
|
||||
def test_raw_sql_rowcount(self, connection):
|
||||
# test issue #3622, make sure eager rowcount is called for text
|
||||
result = connection.exec_driver_sql(
|
||||
"update employees set department='Z' where department='C'"
|
||||
)
|
||||
eq_(result.rowcount, 3)
|
||||
|
||||
def test_text_rowcount(self, connection):
|
||||
# test issue #3622, make sure eager rowcount is called for text
|
||||
result = connection.execute(
|
||||
text("update employees set department='Z' " "where department='C'")
|
||||
)
|
||||
eq_(result.rowcount, 3)
|
||||
|
||||
def test_delete_rowcount(self, connection):
|
||||
employees_table = self.tables.employees
|
||||
|
||||
# WHERE matches 3, 3 rows deleted
|
||||
department = employees_table.c.department
|
||||
r = connection.execute(
|
||||
employees_table.delete().where(department == "C")
|
||||
)
|
||||
eq_(r.rowcount, 3)
|
||||
|
||||
@testing.requires.sane_multi_rowcount
|
||||
def test_multi_update_rowcount(self, connection):
|
||||
employees_table = self.tables.employees
|
||||
stmt = (
|
||||
employees_table.update()
|
||||
.where(employees_table.c.name == bindparam("emp_name"))
|
||||
.values(department="C")
|
||||
)
|
||||
|
||||
r = connection.execute(
|
||||
stmt,
|
||||
[
|
||||
{"emp_name": "Bob"},
|
||||
{"emp_name": "Cynthia"},
|
||||
{"emp_name": "nonexistent"},
|
||||
],
|
||||
)
|
||||
|
||||
eq_(r.rowcount, 2)
|
||||
|
||||
@testing.requires.sane_multi_rowcount
|
||||
def test_multi_delete_rowcount(self, connection):
|
||||
employees_table = self.tables.employees
|
||||
|
||||
stmt = employees_table.delete().where(
|
||||
employees_table.c.name == bindparam("emp_name")
|
||||
)
|
||||
|
||||
r = connection.execute(
|
||||
stmt,
|
||||
[
|
||||
{"emp_name": "Bob"},
|
||||
{"emp_name": "Cynthia"},
|
||||
{"emp_name": "nonexistent"},
|
||||
],
|
||||
)
|
||||
|
||||
eq_(r.rowcount, 2)
|
||||
1783
lib/sqlalchemy/testing/suite/test_select.py
Normal file
1783
lib/sqlalchemy/testing/suite/test_select.py
Normal file
File diff suppressed because it is too large
Load Diff
282
lib/sqlalchemy/testing/suite/test_sequence.py
Normal file
282
lib/sqlalchemy/testing/suite/test_sequence.py
Normal file
@@ -0,0 +1,282 @@
|
||||
from .. import config
|
||||
from .. import fixtures
|
||||
from ..assertions import eq_
|
||||
from ..assertions import is_true
|
||||
from ..config import requirements
|
||||
from ..schema import Column
|
||||
from ..schema import Table
|
||||
from ... import inspect
|
||||
from ... import Integer
|
||||
from ... import MetaData
|
||||
from ... import Sequence
|
||||
from ... import String
|
||||
from ... import testing
|
||||
|
||||
|
||||
class SequenceTest(fixtures.TablesTest):
|
||||
__requires__ = ("sequences",)
|
||||
__backend__ = True
|
||||
|
||||
run_create_tables = "each"
|
||||
|
||||
@classmethod
|
||||
def define_tables(cls, metadata):
|
||||
Table(
|
||||
"seq_pk",
|
||||
metadata,
|
||||
Column(
|
||||
"id",
|
||||
Integer,
|
||||
Sequence("tab_id_seq"),
|
||||
primary_key=True,
|
||||
),
|
||||
Column("data", String(50)),
|
||||
)
|
||||
|
||||
Table(
|
||||
"seq_opt_pk",
|
||||
metadata,
|
||||
Column(
|
||||
"id",
|
||||
Integer,
|
||||
Sequence("tab_id_seq", data_type=Integer, optional=True),
|
||||
primary_key=True,
|
||||
),
|
||||
Column("data", String(50)),
|
||||
)
|
||||
|
||||
Table(
|
||||
"seq_no_returning",
|
||||
metadata,
|
||||
Column(
|
||||
"id",
|
||||
Integer,
|
||||
Sequence("noret_id_seq"),
|
||||
primary_key=True,
|
||||
),
|
||||
Column("data", String(50)),
|
||||
implicit_returning=False,
|
||||
)
|
||||
|
||||
if testing.requires.schemas.enabled:
|
||||
Table(
|
||||
"seq_no_returning_sch",
|
||||
metadata,
|
||||
Column(
|
||||
"id",
|
||||
Integer,
|
||||
Sequence("noret_sch_id_seq", schema=config.test_schema),
|
||||
primary_key=True,
|
||||
),
|
||||
Column("data", String(50)),
|
||||
implicit_returning=False,
|
||||
schema=config.test_schema,
|
||||
)
|
||||
|
||||
def test_insert_roundtrip(self, connection):
|
||||
connection.execute(self.tables.seq_pk.insert(), dict(data="some data"))
|
||||
self._assert_round_trip(self.tables.seq_pk, connection)
|
||||
|
||||
def test_insert_lastrowid(self, connection):
|
||||
r = connection.execute(
|
||||
self.tables.seq_pk.insert(), dict(data="some data")
|
||||
)
|
||||
eq_(
|
||||
r.inserted_primary_key, (testing.db.dialect.default_sequence_base,)
|
||||
)
|
||||
|
||||
def test_nextval_direct(self, connection):
|
||||
r = connection.execute(self.tables.seq_pk.c.id.default)
|
||||
eq_(r, testing.db.dialect.default_sequence_base)
|
||||
|
||||
@requirements.sequences_optional
|
||||
def test_optional_seq(self, connection):
|
||||
r = connection.execute(
|
||||
self.tables.seq_opt_pk.insert(), dict(data="some data")
|
||||
)
|
||||
eq_(r.inserted_primary_key, (1,))
|
||||
|
||||
def _assert_round_trip(self, table, conn):
|
||||
row = conn.execute(table.select()).first()
|
||||
eq_(row, (testing.db.dialect.default_sequence_base, "some data"))
|
||||
|
||||
def test_insert_roundtrip_no_implicit_returning(self, connection):
|
||||
connection.execute(
|
||||
self.tables.seq_no_returning.insert(), dict(data="some data")
|
||||
)
|
||||
self._assert_round_trip(self.tables.seq_no_returning, connection)
|
||||
|
||||
@testing.combinations((True,), (False,), argnames="implicit_returning")
|
||||
@testing.requires.schemas
|
||||
def test_insert_roundtrip_translate(self, connection, implicit_returning):
|
||||
|
||||
seq_no_returning = Table(
|
||||
"seq_no_returning_sch",
|
||||
MetaData(),
|
||||
Column(
|
||||
"id",
|
||||
Integer,
|
||||
Sequence("noret_sch_id_seq", schema="alt_schema"),
|
||||
primary_key=True,
|
||||
),
|
||||
Column("data", String(50)),
|
||||
implicit_returning=implicit_returning,
|
||||
schema="alt_schema",
|
||||
)
|
||||
|
||||
connection = connection.execution_options(
|
||||
schema_translate_map={"alt_schema": config.test_schema}
|
||||
)
|
||||
connection.execute(seq_no_returning.insert(), dict(data="some data"))
|
||||
self._assert_round_trip(seq_no_returning, connection)
|
||||
|
||||
@testing.requires.schemas
|
||||
def test_nextval_direct_schema_translate(self, connection):
|
||||
seq = Sequence("noret_sch_id_seq", schema="alt_schema")
|
||||
connection = connection.execution_options(
|
||||
schema_translate_map={"alt_schema": config.test_schema}
|
||||
)
|
||||
|
||||
r = connection.execute(seq)
|
||||
eq_(r, testing.db.dialect.default_sequence_base)
|
||||
|
||||
|
||||
class SequenceCompilerTest(testing.AssertsCompiledSQL, fixtures.TestBase):
|
||||
__requires__ = ("sequences",)
|
||||
__backend__ = True
|
||||
|
||||
def test_literal_binds_inline_compile(self, connection):
|
||||
table = Table(
|
||||
"x",
|
||||
MetaData(),
|
||||
Column("y", Integer, Sequence("y_seq")),
|
||||
Column("q", Integer),
|
||||
)
|
||||
|
||||
stmt = table.insert().values(q=5)
|
||||
|
||||
seq_nextval = connection.dialect.statement_compiler(
|
||||
statement=None, dialect=connection.dialect
|
||||
).visit_sequence(Sequence("y_seq"))
|
||||
self.assert_compile(
|
||||
stmt,
|
||||
"INSERT INTO x (y, q) VALUES (%s, 5)" % (seq_nextval,),
|
||||
literal_binds=True,
|
||||
dialect=connection.dialect,
|
||||
)
|
||||
|
||||
|
||||
class HasSequenceTest(fixtures.TablesTest):
|
||||
run_deletes = None
|
||||
|
||||
__requires__ = ("sequences",)
|
||||
__backend__ = True
|
||||
|
||||
@classmethod
|
||||
def define_tables(cls, metadata):
|
||||
Sequence("user_id_seq", metadata=metadata)
|
||||
Sequence(
|
||||
"other_seq", metadata=metadata, nomaxvalue=True, nominvalue=True
|
||||
)
|
||||
if testing.requires.schemas.enabled:
|
||||
Sequence(
|
||||
"user_id_seq", schema=config.test_schema, metadata=metadata
|
||||
)
|
||||
Sequence(
|
||||
"schema_seq", schema=config.test_schema, metadata=metadata
|
||||
)
|
||||
Table(
|
||||
"user_id_table",
|
||||
metadata,
|
||||
Column("id", Integer, primary_key=True),
|
||||
)
|
||||
|
||||
def test_has_sequence(self, connection):
|
||||
eq_(
|
||||
inspect(connection).has_sequence("user_id_seq"),
|
||||
True,
|
||||
)
|
||||
|
||||
def test_has_sequence_other_object(self, connection):
|
||||
eq_(
|
||||
inspect(connection).has_sequence("user_id_table"),
|
||||
False,
|
||||
)
|
||||
|
||||
@testing.requires.schemas
|
||||
def test_has_sequence_schema(self, connection):
|
||||
eq_(
|
||||
inspect(connection).has_sequence(
|
||||
"user_id_seq", schema=config.test_schema
|
||||
),
|
||||
True,
|
||||
)
|
||||
|
||||
def test_has_sequence_neg(self, connection):
|
||||
eq_(
|
||||
inspect(connection).has_sequence("some_sequence"),
|
||||
False,
|
||||
)
|
||||
|
||||
@testing.requires.schemas
|
||||
def test_has_sequence_schemas_neg(self, connection):
|
||||
eq_(
|
||||
inspect(connection).has_sequence(
|
||||
"some_sequence", schema=config.test_schema
|
||||
),
|
||||
False,
|
||||
)
|
||||
|
||||
@testing.requires.schemas
|
||||
def test_has_sequence_default_not_in_remote(self, connection):
|
||||
eq_(
|
||||
inspect(connection).has_sequence(
|
||||
"other_sequence", schema=config.test_schema
|
||||
),
|
||||
False,
|
||||
)
|
||||
|
||||
@testing.requires.schemas
|
||||
def test_has_sequence_remote_not_in_default(self, connection):
|
||||
eq_(
|
||||
inspect(connection).has_sequence("schema_seq"),
|
||||
False,
|
||||
)
|
||||
|
||||
def test_get_sequence_names(self, connection):
|
||||
exp = {"other_seq", "user_id_seq"}
|
||||
|
||||
res = set(inspect(connection).get_sequence_names())
|
||||
is_true(res.intersection(exp) == exp)
|
||||
is_true("schema_seq" not in res)
|
||||
|
||||
@testing.requires.schemas
|
||||
def test_get_sequence_names_no_sequence_schema(self, connection):
|
||||
eq_(
|
||||
inspect(connection).get_sequence_names(
|
||||
schema=config.test_schema_2
|
||||
),
|
||||
[],
|
||||
)
|
||||
|
||||
@testing.requires.schemas
|
||||
def test_get_sequence_names_sequences_schema(self, connection):
|
||||
eq_(
|
||||
sorted(
|
||||
inspect(connection).get_sequence_names(
|
||||
schema=config.test_schema
|
||||
)
|
||||
),
|
||||
["schema_seq", "user_id_seq"],
|
||||
)
|
||||
|
||||
|
||||
class HasSequenceTestEmpty(fixtures.TestBase):
|
||||
__requires__ = ("sequences",)
|
||||
__backend__ = True
|
||||
|
||||
def test_get_sequence_names_no_sequence(self, connection):
|
||||
eq_(
|
||||
inspect(connection).get_sequence_names(),
|
||||
[],
|
||||
)
|
||||
1508
lib/sqlalchemy/testing/suite/test_types.py
Normal file
1508
lib/sqlalchemy/testing/suite/test_types.py
Normal file
File diff suppressed because it is too large
Load Diff
206
lib/sqlalchemy/testing/suite/test_unicode_ddl.py
Normal file
206
lib/sqlalchemy/testing/suite/test_unicode_ddl.py
Normal file
@@ -0,0 +1,206 @@
|
||||
# coding: utf-8
|
||||
"""verrrrry basic unicode column name testing"""
|
||||
|
||||
from sqlalchemy import desc
|
||||
from sqlalchemy import ForeignKey
|
||||
from sqlalchemy import Integer
|
||||
from sqlalchemy import MetaData
|
||||
from sqlalchemy import testing
|
||||
from sqlalchemy import util
|
||||
from sqlalchemy.testing import eq_
|
||||
from sqlalchemy.testing import fixtures
|
||||
from sqlalchemy.testing.schema import Column
|
||||
from sqlalchemy.testing.schema import Table
|
||||
from sqlalchemy.util import u
|
||||
from sqlalchemy.util import ue
|
||||
|
||||
|
||||
class UnicodeSchemaTest(fixtures.TablesTest):
|
||||
__requires__ = ("unicode_ddl",)
|
||||
__backend__ = True
|
||||
|
||||
@classmethod
|
||||
def define_tables(cls, metadata):
|
||||
global t1, t2, t3
|
||||
|
||||
t1 = Table(
|
||||
u("unitable1"),
|
||||
metadata,
|
||||
Column(u("méil"), Integer, primary_key=True),
|
||||
Column(ue("\u6e2c\u8a66"), Integer),
|
||||
test_needs_fk=True,
|
||||
)
|
||||
t2 = Table(
|
||||
u("Unitéble2"),
|
||||
metadata,
|
||||
Column(u("méil"), Integer, primary_key=True, key="a"),
|
||||
Column(
|
||||
ue("\u6e2c\u8a66"),
|
||||
Integer,
|
||||
ForeignKey(u("unitable1.méil")),
|
||||
key="b",
|
||||
),
|
||||
test_needs_fk=True,
|
||||
)
|
||||
|
||||
# Few DBs support Unicode foreign keys
|
||||
if testing.against("sqlite"):
|
||||
t3 = Table(
|
||||
ue("\u6e2c\u8a66"),
|
||||
metadata,
|
||||
Column(
|
||||
ue("\u6e2c\u8a66_id"),
|
||||
Integer,
|
||||
primary_key=True,
|
||||
autoincrement=False,
|
||||
),
|
||||
Column(
|
||||
ue("unitable1_\u6e2c\u8a66"),
|
||||
Integer,
|
||||
ForeignKey(ue("unitable1.\u6e2c\u8a66")),
|
||||
),
|
||||
Column(
|
||||
u("Unitéble2_b"), Integer, ForeignKey(u("Unitéble2.b"))
|
||||
),
|
||||
Column(
|
||||
ue("\u6e2c\u8a66_self"),
|
||||
Integer,
|
||||
ForeignKey(ue("\u6e2c\u8a66.\u6e2c\u8a66_id")),
|
||||
),
|
||||
test_needs_fk=True,
|
||||
)
|
||||
else:
|
||||
t3 = Table(
|
||||
ue("\u6e2c\u8a66"),
|
||||
metadata,
|
||||
Column(
|
||||
ue("\u6e2c\u8a66_id"),
|
||||
Integer,
|
||||
primary_key=True,
|
||||
autoincrement=False,
|
||||
),
|
||||
Column(ue("unitable1_\u6e2c\u8a66"), Integer),
|
||||
Column(u("Unitéble2_b"), Integer),
|
||||
Column(ue("\u6e2c\u8a66_self"), Integer),
|
||||
test_needs_fk=True,
|
||||
)
|
||||
|
||||
def test_insert(self, connection):
|
||||
connection.execute(t1.insert(), {u("méil"): 1, ue("\u6e2c\u8a66"): 5})
|
||||
connection.execute(t2.insert(), {u("a"): 1, u("b"): 1})
|
||||
connection.execute(
|
||||
t3.insert(),
|
||||
{
|
||||
ue("\u6e2c\u8a66_id"): 1,
|
||||
ue("unitable1_\u6e2c\u8a66"): 5,
|
||||
u("Unitéble2_b"): 1,
|
||||
ue("\u6e2c\u8a66_self"): 1,
|
||||
},
|
||||
)
|
||||
|
||||
eq_(connection.execute(t1.select()).fetchall(), [(1, 5)])
|
||||
eq_(connection.execute(t2.select()).fetchall(), [(1, 1)])
|
||||
eq_(connection.execute(t3.select()).fetchall(), [(1, 5, 1, 1)])
|
||||
|
||||
def test_col_targeting(self, connection):
|
||||
connection.execute(t1.insert(), {u("méil"): 1, ue("\u6e2c\u8a66"): 5})
|
||||
connection.execute(t2.insert(), {u("a"): 1, u("b"): 1})
|
||||
connection.execute(
|
||||
t3.insert(),
|
||||
{
|
||||
ue("\u6e2c\u8a66_id"): 1,
|
||||
ue("unitable1_\u6e2c\u8a66"): 5,
|
||||
u("Unitéble2_b"): 1,
|
||||
ue("\u6e2c\u8a66_self"): 1,
|
||||
},
|
||||
)
|
||||
|
||||
row = connection.execute(t1.select()).first()
|
||||
eq_(row._mapping[t1.c[u("méil")]], 1)
|
||||
eq_(row._mapping[t1.c[ue("\u6e2c\u8a66")]], 5)
|
||||
|
||||
row = connection.execute(t2.select()).first()
|
||||
eq_(row._mapping[t2.c[u("a")]], 1)
|
||||
eq_(row._mapping[t2.c[u("b")]], 1)
|
||||
|
||||
row = connection.execute(t3.select()).first()
|
||||
eq_(row._mapping[t3.c[ue("\u6e2c\u8a66_id")]], 1)
|
||||
eq_(row._mapping[t3.c[ue("unitable1_\u6e2c\u8a66")]], 5)
|
||||
eq_(row._mapping[t3.c[u("Unitéble2_b")]], 1)
|
||||
eq_(row._mapping[t3.c[ue("\u6e2c\u8a66_self")]], 1)
|
||||
|
||||
def test_reflect(self, connection):
|
||||
connection.execute(t1.insert(), {u("méil"): 2, ue("\u6e2c\u8a66"): 7})
|
||||
connection.execute(t2.insert(), {u("a"): 2, u("b"): 2})
|
||||
connection.execute(
|
||||
t3.insert(),
|
||||
{
|
||||
ue("\u6e2c\u8a66_id"): 2,
|
||||
ue("unitable1_\u6e2c\u8a66"): 7,
|
||||
u("Unitéble2_b"): 2,
|
||||
ue("\u6e2c\u8a66_self"): 2,
|
||||
},
|
||||
)
|
||||
|
||||
meta = MetaData()
|
||||
tt1 = Table(t1.name, meta, autoload_with=connection)
|
||||
tt2 = Table(t2.name, meta, autoload_with=connection)
|
||||
tt3 = Table(t3.name, meta, autoload_with=connection)
|
||||
|
||||
connection.execute(tt1.insert(), {u("méil"): 1, ue("\u6e2c\u8a66"): 5})
|
||||
connection.execute(tt2.insert(), {u("méil"): 1, ue("\u6e2c\u8a66"): 1})
|
||||
connection.execute(
|
||||
tt3.insert(),
|
||||
{
|
||||
ue("\u6e2c\u8a66_id"): 1,
|
||||
ue("unitable1_\u6e2c\u8a66"): 5,
|
||||
u("Unitéble2_b"): 1,
|
||||
ue("\u6e2c\u8a66_self"): 1,
|
||||
},
|
||||
)
|
||||
|
||||
eq_(
|
||||
connection.execute(
|
||||
tt1.select().order_by(desc(u("méil")))
|
||||
).fetchall(),
|
||||
[(2, 7), (1, 5)],
|
||||
)
|
||||
eq_(
|
||||
connection.execute(
|
||||
tt2.select().order_by(desc(u("méil")))
|
||||
).fetchall(),
|
||||
[(2, 2), (1, 1)],
|
||||
)
|
||||
eq_(
|
||||
connection.execute(
|
||||
tt3.select().order_by(desc(ue("\u6e2c\u8a66_id")))
|
||||
).fetchall(),
|
||||
[(2, 7, 2, 2), (1, 5, 1, 1)],
|
||||
)
|
||||
|
||||
def test_repr(self):
|
||||
meta = MetaData()
|
||||
t = Table(
|
||||
ue("\u6e2c\u8a66"), meta, Column(ue("\u6e2c\u8a66_id"), Integer)
|
||||
)
|
||||
|
||||
if util.py2k:
|
||||
eq_(
|
||||
repr(t),
|
||||
(
|
||||
"Table('\\u6e2c\\u8a66', MetaData(), "
|
||||
"Column('\\u6e2c\\u8a66_id', Integer(), "
|
||||
"table=<\u6e2c\u8a66>), "
|
||||
"schema=None)"
|
||||
),
|
||||
)
|
||||
else:
|
||||
eq_(
|
||||
repr(t),
|
||||
(
|
||||
"Table('測試', MetaData(), "
|
||||
"Column('測試_id', Integer(), "
|
||||
"table=<測試>), "
|
||||
"schema=None)"
|
||||
),
|
||||
)
|
||||
60
lib/sqlalchemy/testing/suite/test_update_delete.py
Normal file
60
lib/sqlalchemy/testing/suite/test_update_delete.py
Normal file
@@ -0,0 +1,60 @@
|
||||
from .. import fixtures
|
||||
from ..assertions import eq_
|
||||
from ..schema import Column
|
||||
from ..schema import Table
|
||||
from ... import Integer
|
||||
from ... import String
|
||||
|
||||
|
||||
class SimpleUpdateDeleteTest(fixtures.TablesTest):
|
||||
run_deletes = "each"
|
||||
__requires__ = ("sane_rowcount",)
|
||||
__backend__ = True
|
||||
|
||||
@classmethod
|
||||
def define_tables(cls, metadata):
|
||||
Table(
|
||||
"plain_pk",
|
||||
metadata,
|
||||
Column("id", Integer, primary_key=True),
|
||||
Column("data", String(50)),
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def insert_data(cls, connection):
|
||||
connection.execute(
|
||||
cls.tables.plain_pk.insert(),
|
||||
[
|
||||
{"id": 1, "data": "d1"},
|
||||
{"id": 2, "data": "d2"},
|
||||
{"id": 3, "data": "d3"},
|
||||
],
|
||||
)
|
||||
|
||||
def test_update(self, connection):
|
||||
t = self.tables.plain_pk
|
||||
r = connection.execute(
|
||||
t.update().where(t.c.id == 2), dict(data="d2_new")
|
||||
)
|
||||
assert not r.is_insert
|
||||
assert not r.returns_rows
|
||||
assert r.rowcount == 1
|
||||
|
||||
eq_(
|
||||
connection.execute(t.select().order_by(t.c.id)).fetchall(),
|
||||
[(1, "d1"), (2, "d2_new"), (3, "d3")],
|
||||
)
|
||||
|
||||
def test_delete(self, connection):
|
||||
t = self.tables.plain_pk
|
||||
r = connection.execute(t.delete().where(t.c.id == 2))
|
||||
assert not r.is_insert
|
||||
assert not r.returns_rows
|
||||
assert r.rowcount == 1
|
||||
eq_(
|
||||
connection.execute(t.select().order_by(t.c.id)).fetchall(),
|
||||
[(1, "d1"), (3, "d3")],
|
||||
)
|
||||
|
||||
|
||||
__all__ = ("SimpleUpdateDeleteTest",)
|
||||
Reference in New Issue
Block a user