From 1dac2263372df2b85db5d029a45721fa158a5c9d Mon Sep 17 00:00:00 2001 From: xiubuzhe Date: Sun, 8 Oct 2023 20:59:00 +0800 Subject: first add files --- lib/sqlalchemy/testing/suite/test_ddl.py | 381 +++++++++++++++++++++++++++++++ 1 file changed, 381 insertions(+) create mode 100644 lib/sqlalchemy/testing/suite/test_ddl.py (limited to 'lib/sqlalchemy/testing/suite/test_ddl.py') diff --git a/lib/sqlalchemy/testing/suite/test_ddl.py b/lib/sqlalchemy/testing/suite/test_ddl.py new file mode 100644 index 0000000..b3fee55 --- /dev/null +++ b/lib/sqlalchemy/testing/suite/test_ddl.py @@ -0,0 +1,381 @@ +import random + +from . import testing +from .. import config +from .. import fixtures +from .. import util +from ..assertions import eq_ +from ..assertions import is_false +from ..assertions import is_true +from ..config import requirements +from ..schema import Table +from ... import CheckConstraint +from ... import Column +from ... import ForeignKeyConstraint +from ... import Index +from ... import inspect +from ... import Integer +from ... import schema +from ... import String +from ... import UniqueConstraint + + +class TableDDLTest(fixtures.TestBase): + __backend__ = True + + def _simple_fixture(self, schema=None): + return Table( + "test_table", + self.metadata, + Column("id", Integer, primary_key=True, autoincrement=False), + Column("data", String(50)), + schema=schema, + ) + + def _underscore_fixture(self): + return Table( + "_test_table", + self.metadata, + Column("id", Integer, primary_key=True, autoincrement=False), + Column("_data", String(50)), + ) + + def _table_index_fixture(self, schema=None): + table = self._simple_fixture(schema=schema) + idx = Index("test_index", table.c.data) + return table, idx + + def _simple_roundtrip(self, table): + with config.db.begin() as conn: + conn.execute(table.insert().values((1, "some data"))) + result = conn.execute(table.select()) + eq_(result.first(), (1, "some data")) + + @requirements.create_table + @util.provide_metadata + def test_create_table(self): + table = self._simple_fixture() + table.create(config.db, checkfirst=False) + self._simple_roundtrip(table) + + @requirements.create_table + @requirements.schemas + @util.provide_metadata + def test_create_table_schema(self): + table = self._simple_fixture(schema=config.test_schema) + table.create(config.db, checkfirst=False) + self._simple_roundtrip(table) + + @requirements.drop_table + @util.provide_metadata + def test_drop_table(self): + table = self._simple_fixture() + table.create(config.db, checkfirst=False) + table.drop(config.db, checkfirst=False) + + @requirements.create_table + @util.provide_metadata + def test_underscore_names(self): + table = self._underscore_fixture() + table.create(config.db, checkfirst=False) + self._simple_roundtrip(table) + + @requirements.comment_reflection + @util.provide_metadata + def test_add_table_comment(self, connection): + table = self._simple_fixture() + table.create(connection, checkfirst=False) + table.comment = "a comment" + connection.execute(schema.SetTableComment(table)) + eq_( + inspect(connection).get_table_comment("test_table"), + {"text": "a comment"}, + ) + + @requirements.comment_reflection + @util.provide_metadata + def test_drop_table_comment(self, connection): + table = self._simple_fixture() + table.create(connection, checkfirst=False) + table.comment = "a comment" + connection.execute(schema.SetTableComment(table)) + connection.execute(schema.DropTableComment(table)) + eq_( + inspect(connection).get_table_comment("test_table"), {"text": None} + ) + + @requirements.table_ddl_if_exists + @util.provide_metadata + def test_create_table_if_not_exists(self, connection): + table = self._simple_fixture() + + connection.execute(schema.CreateTable(table, if_not_exists=True)) + + is_true(inspect(connection).has_table("test_table")) + connection.execute(schema.CreateTable(table, if_not_exists=True)) + + @requirements.index_ddl_if_exists + @util.provide_metadata + def test_create_index_if_not_exists(self, connection): + table, idx = self._table_index_fixture() + + connection.execute(schema.CreateTable(table, if_not_exists=True)) + is_true(inspect(connection).has_table("test_table")) + is_false( + "test_index" + in [ + ix["name"] + for ix in inspect(connection).get_indexes("test_table") + ] + ) + + connection.execute(schema.CreateIndex(idx, if_not_exists=True)) + + is_true( + "test_index" + in [ + ix["name"] + for ix in inspect(connection).get_indexes("test_table") + ] + ) + + connection.execute(schema.CreateIndex(idx, if_not_exists=True)) + + @requirements.table_ddl_if_exists + @util.provide_metadata + def test_drop_table_if_exists(self, connection): + table = self._simple_fixture() + + table.create(connection) + + is_true(inspect(connection).has_table("test_table")) + + connection.execute(schema.DropTable(table, if_exists=True)) + + is_false(inspect(connection).has_table("test_table")) + + connection.execute(schema.DropTable(table, if_exists=True)) + + @requirements.index_ddl_if_exists + @util.provide_metadata + def test_drop_index_if_exists(self, connection): + table, idx = self._table_index_fixture() + + table.create(connection) + + is_true( + "test_index" + in [ + ix["name"] + for ix in inspect(connection).get_indexes("test_table") + ] + ) + + connection.execute(schema.DropIndex(idx, if_exists=True)) + + is_false( + "test_index" + in [ + ix["name"] + for ix in inspect(connection).get_indexes("test_table") + ] + ) + + connection.execute(schema.DropIndex(idx, if_exists=True)) + + +class FutureTableDDLTest(fixtures.FutureEngineMixin, TableDDLTest): + pass + + +class LongNameBlowoutTest(fixtures.TestBase): + """test the creation of a variety of DDL structures and ensure + label length limits pass on backends + + """ + + __backend__ = True + + def fk(self, metadata, connection): + convention = { + "fk": "foreign_key_%(table_name)s_" + "%(column_0_N_name)s_" + "%(referred_table_name)s_" + + ( + "_".join( + "".join(random.choice("abcdef") for j in range(20)) + for i in range(10) + ) + ), + } + metadata.naming_convention = convention + + Table( + "a_things_with_stuff", + metadata, + Column("id_long_column_name", Integer, primary_key=True), + test_needs_fk=True, + ) + + cons = ForeignKeyConstraint( + ["aid"], ["a_things_with_stuff.id_long_column_name"] + ) + Table( + "b_related_things_of_value", + metadata, + Column( + "aid", + ), + cons, + test_needs_fk=True, + ) + actual_name = cons.name + + metadata.create_all(connection) + + if testing.requires.foreign_key_constraint_name_reflection.enabled: + insp = inspect(connection) + fks = insp.get_foreign_keys("b_related_things_of_value") + reflected_name = fks[0]["name"] + + return actual_name, reflected_name + else: + return actual_name, None + + def pk(self, metadata, connection): + convention = { + "pk": "primary_key_%(table_name)s_" + "%(column_0_N_name)s" + + ( + "_".join( + "".join(random.choice("abcdef") for j in range(30)) + for i in range(10) + ) + ), + } + metadata.naming_convention = convention + + a = Table( + "a_things_with_stuff", + metadata, + Column("id_long_column_name", Integer, primary_key=True), + Column("id_another_long_name", Integer, primary_key=True), + ) + cons = a.primary_key + actual_name = cons.name + + metadata.create_all(connection) + insp = inspect(connection) + pk = insp.get_pk_constraint("a_things_with_stuff") + reflected_name = pk["name"] + return actual_name, reflected_name + + def ix(self, metadata, connection): + convention = { + "ix": "index_%(table_name)s_" + "%(column_0_N_name)s" + + ( + "_".join( + "".join(random.choice("abcdef") for j in range(30)) + for i in range(10) + ) + ), + } + metadata.naming_convention = convention + + a = Table( + "a_things_with_stuff", + metadata, + Column("id_long_column_name", Integer, primary_key=True), + Column("id_another_long_name", Integer), + ) + cons = Index(None, a.c.id_long_column_name, a.c.id_another_long_name) + actual_name = cons.name + + metadata.create_all(connection) + insp = inspect(connection) + ix = insp.get_indexes("a_things_with_stuff") + reflected_name = ix[0]["name"] + return actual_name, reflected_name + + def uq(self, metadata, connection): + convention = { + "uq": "unique_constraint_%(table_name)s_" + "%(column_0_N_name)s" + + ( + "_".join( + "".join(random.choice("abcdef") for j in range(30)) + for i in range(10) + ) + ), + } + metadata.naming_convention = convention + + cons = UniqueConstraint("id_long_column_name", "id_another_long_name") + Table( + "a_things_with_stuff", + metadata, + Column("id_long_column_name", Integer, primary_key=True), + Column("id_another_long_name", Integer), + cons, + ) + actual_name = cons.name + + metadata.create_all(connection) + insp = inspect(connection) + uq = insp.get_unique_constraints("a_things_with_stuff") + reflected_name = uq[0]["name"] + return actual_name, reflected_name + + def ck(self, metadata, connection): + convention = { + "ck": "check_constraint_%(table_name)s" + + ( + "_".join( + "".join(random.choice("abcdef") for j in range(30)) + for i in range(10) + ) + ), + } + metadata.naming_convention = convention + + cons = CheckConstraint("some_long_column_name > 5") + Table( + "a_things_with_stuff", + metadata, + Column("id_long_column_name", Integer, primary_key=True), + Column("some_long_column_name", Integer), + cons, + ) + actual_name = cons.name + + metadata.create_all(connection) + insp = inspect(connection) + ck = insp.get_check_constraints("a_things_with_stuff") + reflected_name = ck[0]["name"] + return actual_name, reflected_name + + @testing.combinations( + ("fk",), + ("pk",), + ("ix",), + ("ck", testing.requires.check_constraint_reflection.as_skips()), + ("uq", testing.requires.unique_constraint_reflection.as_skips()), + argnames="type_", + ) + def test_long_convention_name(self, type_, metadata, connection): + actual_name, reflected_name = getattr(self, type_)( + metadata, connection + ) + + assert len(actual_name) > 255 + + if reflected_name is not None: + overlap = actual_name[0 : len(reflected_name)] + if len(overlap) < len(actual_name): + eq_(overlap[0:-5], reflected_name[0 : len(overlap) - 5]) + else: + eq_(overlap, reflected_name) + + +__all__ = ("TableDDLTest", "FutureTableDDLTest", "LongNameBlowoutTest") -- cgit v1.2.3