diff options
Diffstat (limited to 'lib/sqlalchemy/testing/plugin')
-rw-r--r-- | lib/sqlalchemy/testing/plugin/__init__.py | 0 | ||||
-rw-r--r-- | lib/sqlalchemy/testing/plugin/bootstrap.py | 54 | ||||
-rw-r--r-- | lib/sqlalchemy/testing/plugin/plugin_base.py | 789 | ||||
-rw-r--r-- | lib/sqlalchemy/testing/plugin/pytestplugin.py | 820 | ||||
-rw-r--r-- | lib/sqlalchemy/testing/plugin/reinvent_fixtures_py2k.py | 112 |
5 files changed, 1775 insertions, 0 deletions
diff --git a/lib/sqlalchemy/testing/plugin/__init__.py b/lib/sqlalchemy/testing/plugin/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/lib/sqlalchemy/testing/plugin/__init__.py diff --git a/lib/sqlalchemy/testing/plugin/bootstrap.py b/lib/sqlalchemy/testing/plugin/bootstrap.py new file mode 100644 index 0000000..6721f48 --- /dev/null +++ b/lib/sqlalchemy/testing/plugin/bootstrap.py @@ -0,0 +1,54 @@ +""" +Bootstrapper for test framework plugins. + +The entire rationale for this system is to get the modules in plugin/ +imported without importing all of the supporting library, so that we can +set up things for testing before coverage starts. + +The rationale for all of plugin/ being *in* the supporting library in the +first place is so that the testing and plugin suite is available to other +libraries, mainly external SQLAlchemy and Alembic dialects, to make use +of the same test environment and standard suites available to +SQLAlchemy/Alembic themselves without the need to ship/install a separate +package outside of SQLAlchemy. + + +""" + +import os +import sys + + +bootstrap_file = locals()["bootstrap_file"] +to_bootstrap = locals()["to_bootstrap"] + + +def load_file_as_module(name): + path = os.path.join(os.path.dirname(bootstrap_file), "%s.py" % name) + + if sys.version_info >= (3, 5): + import importlib.util + + spec = importlib.util.spec_from_file_location(name, path) + assert spec is not None + assert spec.loader is not None + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + else: + import imp + + mod = imp.load_source(name, path) + + return mod + + +if to_bootstrap == "pytest": + sys.modules["sqla_plugin_base"] = load_file_as_module("plugin_base") + sys.modules["sqla_plugin_base"].bootstrapped_as_sqlalchemy = True + if sys.version_info < (3, 0): + sys.modules["sqla_reinvent_fixtures"] = load_file_as_module( + "reinvent_fixtures_py2k" + ) + sys.modules["sqla_pytestplugin"] = load_file_as_module("pytestplugin") +else: + raise Exception("unknown bootstrap: %s" % to_bootstrap) # noqa diff --git a/lib/sqlalchemy/testing/plugin/plugin_base.py b/lib/sqlalchemy/testing/plugin/plugin_base.py new file mode 100644 index 0000000..d59564e --- /dev/null +++ b/lib/sqlalchemy/testing/plugin/plugin_base.py @@ -0,0 +1,789 @@ +# plugin/plugin_base.py +# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php + +"""Testing extensions. + +this module is designed to work as a testing-framework-agnostic library, +created so that multiple test frameworks can be supported at once +(mostly so that we can migrate to new ones). The current target +is pytest. + +""" + +from __future__ import absolute_import + +import abc +import logging +import re +import sys + +# flag which indicates we are in the SQLAlchemy testing suite, +# and not that of Alembic or a third party dialect. +bootstrapped_as_sqlalchemy = False + +log = logging.getLogger("sqlalchemy.testing.plugin_base") + + +py3k = sys.version_info >= (3, 0) + +if py3k: + import configparser + + ABC = abc.ABC +else: + import ConfigParser as configparser + import collections as collections_abc # noqa + + class ABC(object): + __metaclass__ = abc.ABCMeta + + +# late imports +fixtures = None +engines = None +exclusions = None +warnings = None +profiling = None +provision = None +assertions = None +requirements = None +config = None +testing = None +util = None +file_config = None + +logging = None +include_tags = set() +exclude_tags = set() +options = None + + +def setup_options(make_option): + make_option( + "--log-info", + action="callback", + type=str, + callback=_log, + help="turn on info logging for <LOG> (multiple OK)", + ) + make_option( + "--log-debug", + action="callback", + type=str, + callback=_log, + help="turn on debug logging for <LOG> (multiple OK)", + ) + make_option( + "--db", + action="append", + type=str, + dest="db", + help="Use prefab database uri. Multiple OK, " + "first one is run by default.", + ) + make_option( + "--dbs", + action="callback", + zeroarg_callback=_list_dbs, + help="List available prefab dbs", + ) + make_option( + "--dburi", + action="append", + type=str, + dest="dburi", + help="Database uri. Multiple OK, " "first one is run by default.", + ) + make_option( + "--dbdriver", + action="append", + type=str, + dest="dbdriver", + help="Additional database drivers to include in tests. " + "These are linked to the existing database URLs by the " + "provisioning system.", + ) + make_option( + "--dropfirst", + action="store_true", + dest="dropfirst", + help="Drop all tables in the target database first", + ) + make_option( + "--disable-asyncio", + action="store_true", + help="disable test / fixtures / provisoning running in asyncio", + ) + make_option( + "--backend-only", + action="store_true", + dest="backend_only", + help="Run only tests marked with __backend__ or __sparse_backend__", + ) + make_option( + "--nomemory", + action="store_true", + dest="nomemory", + help="Don't run memory profiling tests", + ) + make_option( + "--notimingintensive", + action="store_true", + dest="notimingintensive", + help="Don't run timing intensive tests", + ) + make_option( + "--profile-sort", + type=str, + default="cumulative", + dest="profilesort", + help="Type of sort for profiling standard output", + ) + make_option( + "--profile-dump", + type=str, + dest="profiledump", + help="Filename where a single profile run will be dumped", + ) + make_option( + "--postgresql-templatedb", + type=str, + help="name of template database to use for PostgreSQL " + "CREATE DATABASE (defaults to current database)", + ) + make_option( + "--low-connections", + action="store_true", + dest="low_connections", + help="Use a low number of distinct connections - " + "i.e. for Oracle TNS", + ) + make_option( + "--write-idents", + type=str, + dest="write_idents", + help="write out generated follower idents to <file>, " + "when -n<num> is used", + ) + make_option( + "--reversetop", + action="store_true", + dest="reversetop", + default=False, + help="Use a random-ordering set implementation in the ORM " + "(helps reveal dependency issues)", + ) + make_option( + "--requirements", + action="callback", + type=str, + callback=_requirements_opt, + help="requirements class for testing, overrides setup.cfg", + ) + make_option( + "--with-cdecimal", + action="store_true", + dest="cdecimal", + default=False, + help="Monkeypatch the cdecimal library into Python 'decimal' " + "for all tests", + ) + make_option( + "--include-tag", + action="callback", + callback=_include_tag, + type=str, + help="Include tests with tag <tag>", + ) + make_option( + "--exclude-tag", + action="callback", + callback=_exclude_tag, + type=str, + help="Exclude tests with tag <tag>", + ) + make_option( + "--write-profiles", + action="store_true", + dest="write_profiles", + default=False, + help="Write/update failing profiling data.", + ) + make_option( + "--force-write-profiles", + action="store_true", + dest="force_write_profiles", + default=False, + help="Unconditionally write/update profiling data.", + ) + make_option( + "--dump-pyannotate", + type=str, + dest="dump_pyannotate", + help="Run pyannotate and dump json info to given file", + ) + make_option( + "--mypy-extra-test-path", + type=str, + action="append", + default=[], + dest="mypy_extra_test_paths", + help="Additional test directories to add to the mypy tests. " + "This is used only when running mypy tests. Multiple OK", + ) + + +def configure_follower(follower_ident): + """Configure required state for a follower. + + This invokes in the parent process and typically includes + database creation. + + """ + from sqlalchemy.testing import provision + + provision.FOLLOWER_IDENT = follower_ident + + +def memoize_important_follower_config(dict_): + """Store important configuration we will need to send to a follower. + + This invokes in the parent process after normal config is set up. + + This is necessary as pytest seems to not be using forking, so we + start with nothing in memory, *but* it isn't running our argparse + callables, so we have to just copy all of that over. + + """ + dict_["memoized_config"] = { + "include_tags": include_tags, + "exclude_tags": exclude_tags, + } + + +def restore_important_follower_config(dict_): + """Restore important configuration needed by a follower. + + This invokes in the follower process. + + """ + global include_tags, exclude_tags + include_tags.update(dict_["memoized_config"]["include_tags"]) + exclude_tags.update(dict_["memoized_config"]["exclude_tags"]) + + +def read_config(): + global file_config + file_config = configparser.ConfigParser() + file_config.read(["setup.cfg", "test.cfg"]) + + +def pre_begin(opt): + """things to set up early, before coverage might be setup.""" + global options + options = opt + for fn in pre_configure: + fn(options, file_config) + + +def set_coverage_flag(value): + options.has_coverage = value + + +def post_begin(): + """things to set up later, once we know coverage is running.""" + # Lazy setup of other options (post coverage) + for fn in post_configure: + fn(options, file_config) + + # late imports, has to happen after config. + global util, fixtures, engines, exclusions, assertions, provision + global warnings, profiling, config, testing + from sqlalchemy import testing # noqa + from sqlalchemy.testing import fixtures, engines, exclusions # noqa + from sqlalchemy.testing import assertions, warnings, profiling # noqa + from sqlalchemy.testing import config, provision # noqa + from sqlalchemy import util # noqa + + warnings.setup_filters() + + +def _log(opt_str, value, parser): + global logging + if not logging: + import logging + + logging.basicConfig() + + if opt_str.endswith("-info"): + logging.getLogger(value).setLevel(logging.INFO) + elif opt_str.endswith("-debug"): + logging.getLogger(value).setLevel(logging.DEBUG) + + +def _list_dbs(*args): + print("Available --db options (use --dburi to override)") + for macro in sorted(file_config.options("db")): + print("%20s\t%s" % (macro, file_config.get("db", macro))) + sys.exit(0) + + +def _requirements_opt(opt_str, value, parser): + _setup_requirements(value) + + +def _exclude_tag(opt_str, value, parser): + exclude_tags.add(value.replace("-", "_")) + + +def _include_tag(opt_str, value, parser): + include_tags.add(value.replace("-", "_")) + + +pre_configure = [] +post_configure = [] + + +def pre(fn): + pre_configure.append(fn) + return fn + + +def post(fn): + post_configure.append(fn) + return fn + + +@pre +def _setup_options(opt, file_config): + global options + options = opt + + +@pre +def _set_nomemory(opt, file_config): + if opt.nomemory: + exclude_tags.add("memory_intensive") + + +@pre +def _set_notimingintensive(opt, file_config): + if opt.notimingintensive: + exclude_tags.add("timing_intensive") + + +@pre +def _monkeypatch_cdecimal(options, file_config): + if options.cdecimal: + import cdecimal + + sys.modules["decimal"] = cdecimal + + +@post +def _init_symbols(options, file_config): + from sqlalchemy.testing import config + + config._fixture_functions = _fixture_fn_class() + + +@post +def _set_disable_asyncio(opt, file_config): + if opt.disable_asyncio or not py3k: + from sqlalchemy.testing import asyncio + + asyncio.ENABLE_ASYNCIO = False + + +@post +def _engine_uri(options, file_config): + + from sqlalchemy import testing + from sqlalchemy.testing import config + from sqlalchemy.testing import provision + + if options.dburi: + db_urls = list(options.dburi) + else: + db_urls = [] + + extra_drivers = options.dbdriver or [] + + if options.db: + for db_token in options.db: + for db in re.split(r"[,\s]+", db_token): + if db not in file_config.options("db"): + raise RuntimeError( + "Unknown URI specifier '%s'. " + "Specify --dbs for known uris." % db + ) + else: + db_urls.append(file_config.get("db", db)) + + if not db_urls: + db_urls.append(file_config.get("db", "default")) + + config._current = None + + expanded_urls = list(provision.generate_db_urls(db_urls, extra_drivers)) + + for db_url in expanded_urls: + log.info("Adding database URL: %s", db_url) + + if options.write_idents and provision.FOLLOWER_IDENT: + with open(options.write_idents, "a") as file_: + file_.write(provision.FOLLOWER_IDENT + " " + db_url + "\n") + + cfg = provision.setup_config( + db_url, options, file_config, provision.FOLLOWER_IDENT + ) + if not config._current: + cfg.set_as_current(cfg, testing) + + +@post +def _requirements(options, file_config): + + requirement_cls = file_config.get("sqla_testing", "requirement_cls") + _setup_requirements(requirement_cls) + + +def _setup_requirements(argument): + from sqlalchemy.testing import config + from sqlalchemy import testing + + if config.requirements is not None: + return + + modname, clsname = argument.split(":") + + # importlib.import_module() only introduced in 2.7, a little + # late + mod = __import__(modname) + for component in modname.split(".")[1:]: + mod = getattr(mod, component) + req_cls = getattr(mod, clsname) + + config.requirements = testing.requires = req_cls() + + config.bootstrapped_as_sqlalchemy = bootstrapped_as_sqlalchemy + + +@post +def _prep_testing_database(options, file_config): + from sqlalchemy.testing import config + + if options.dropfirst: + from sqlalchemy.testing import provision + + for cfg in config.Config.all_configs(): + provision.drop_all_schema_objects(cfg, cfg.db) + + +@post +def _reverse_topological(options, file_config): + if options.reversetop: + from sqlalchemy.orm.util import randomize_unitofwork + + randomize_unitofwork() + + +@post +def _post_setup_options(opt, file_config): + from sqlalchemy.testing import config + + config.options = options + config.file_config = file_config + + +@post +def _setup_profiling(options, file_config): + from sqlalchemy.testing import profiling + + profiling._profile_stats = profiling.ProfileStatsFile( + file_config.get("sqla_testing", "profile_file"), + sort=options.profilesort, + dump=options.profiledump, + ) + + +def want_class(name, cls): + if not issubclass(cls, fixtures.TestBase): + return False + elif name.startswith("_"): + return False + elif ( + config.options.backend_only + and not getattr(cls, "__backend__", False) + and not getattr(cls, "__sparse_backend__", False) + and not getattr(cls, "__only_on__", False) + ): + return False + else: + return True + + +def want_method(cls, fn): + if not fn.__name__.startswith("test_"): + return False + elif fn.__module__ is None: + return False + elif include_tags: + return ( + hasattr(cls, "__tags__") + and exclusions.tags(cls.__tags__).include_test( + include_tags, exclude_tags + ) + ) or ( + hasattr(fn, "_sa_exclusion_extend") + and fn._sa_exclusion_extend.include_test( + include_tags, exclude_tags + ) + ) + elif exclude_tags and hasattr(cls, "__tags__"): + return exclusions.tags(cls.__tags__).include_test( + include_tags, exclude_tags + ) + elif exclude_tags and hasattr(fn, "_sa_exclusion_extend"): + return fn._sa_exclusion_extend.include_test(include_tags, exclude_tags) + else: + return True + + +def generate_sub_tests(cls, module): + if getattr(cls, "__backend__", False) or getattr( + cls, "__sparse_backend__", False + ): + sparse = getattr(cls, "__sparse_backend__", False) + for cfg in _possible_configs_for_cls(cls, sparse=sparse): + orig_name = cls.__name__ + + # we can have special chars in these names except for the + # pytest junit plugin, which is tripped up by the brackets + # and periods, so sanitize + + alpha_name = re.sub(r"[_\[\]\.]+", "_", cfg.name) + alpha_name = re.sub(r"_+$", "", alpha_name) + name = "%s_%s" % (cls.__name__, alpha_name) + subcls = type( + name, + (cls,), + {"_sa_orig_cls_name": orig_name, "__only_on_config__": cfg}, + ) + setattr(module, name, subcls) + yield subcls + else: + yield cls + + +def start_test_class_outside_fixtures(cls): + _do_skips(cls) + _setup_engine(cls) + + +def stop_test_class(cls): + # close sessions, immediate connections, etc. + fixtures.stop_test_class_inside_fixtures(cls) + + # close outstanding connection pool connections, dispose of + # additional engines + engines.testing_reaper.stop_test_class_inside_fixtures() + + +def stop_test_class_outside_fixtures(cls): + engines.testing_reaper.stop_test_class_outside_fixtures() + provision.stop_test_class_outside_fixtures(config, config.db, cls) + try: + if not options.low_connections: + assertions.global_cleanup_assertions() + finally: + _restore_engine() + + +def _restore_engine(): + if config._current: + config._current.reset(testing) + + +def final_process_cleanup(): + engines.testing_reaper.final_cleanup() + assertions.global_cleanup_assertions() + _restore_engine() + + +def _setup_engine(cls): + if getattr(cls, "__engine_options__", None): + opts = dict(cls.__engine_options__) + opts["scope"] = "class" + eng = engines.testing_engine(options=opts) + config._current.push_engine(eng, testing) + + +def before_test(test, test_module_name, test_class, test_name): + + # format looks like: + # "test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause" + + name = getattr(test_class, "_sa_orig_cls_name", test_class.__name__) + + id_ = "%s.%s.%s" % (test_module_name, name, test_name) + + profiling._start_current_test(id_) + + +def after_test(test): + fixtures.after_test() + engines.testing_reaper.after_test() + + +def after_test_fixtures(test): + engines.testing_reaper.after_test_outside_fixtures(test) + + +def _possible_configs_for_cls(cls, reasons=None, sparse=False): + all_configs = set(config.Config.all_configs()) + + if cls.__unsupported_on__: + spec = exclusions.db_spec(*cls.__unsupported_on__) + for config_obj in list(all_configs): + if spec(config_obj): + all_configs.remove(config_obj) + + if getattr(cls, "__only_on__", None): + spec = exclusions.db_spec(*util.to_list(cls.__only_on__)) + for config_obj in list(all_configs): + if not spec(config_obj): + all_configs.remove(config_obj) + + if getattr(cls, "__only_on_config__", None): + all_configs.intersection_update([cls.__only_on_config__]) + + if hasattr(cls, "__requires__"): + requirements = config.requirements + for config_obj in list(all_configs): + for requirement in cls.__requires__: + check = getattr(requirements, requirement) + + skip_reasons = check.matching_config_reasons(config_obj) + if skip_reasons: + all_configs.remove(config_obj) + if reasons is not None: + reasons.extend(skip_reasons) + break + + if hasattr(cls, "__prefer_requires__"): + non_preferred = set() + requirements = config.requirements + for config_obj in list(all_configs): + for requirement in cls.__prefer_requires__: + check = getattr(requirements, requirement) + + if not check.enabled_for_config(config_obj): + non_preferred.add(config_obj) + if all_configs.difference(non_preferred): + all_configs.difference_update(non_preferred) + + if sparse: + # pick only one config from each base dialect + # sorted so we get the same backend each time selecting the highest + # server version info. + per_dialect = {} + for cfg in reversed( + sorted( + all_configs, + key=lambda cfg: ( + cfg.db.name, + cfg.db.driver, + cfg.db.dialect.server_version_info, + ), + ) + ): + db = cfg.db.name + if db not in per_dialect: + per_dialect[db] = cfg + return per_dialect.values() + + return all_configs + + +def _do_skips(cls): + reasons = [] + all_configs = _possible_configs_for_cls(cls, reasons) + + if getattr(cls, "__skip_if__", False): + for c in getattr(cls, "__skip_if__"): + if c(): + config.skip_test( + "'%s' skipped by %s" % (cls.__name__, c.__name__) + ) + + if not all_configs: + msg = "'%s' unsupported on any DB implementation %s%s" % ( + cls.__name__, + ", ".join( + "'%s(%s)+%s'" + % ( + config_obj.db.name, + ".".join( + str(dig) + for dig in exclusions._server_version(config_obj.db) + ), + config_obj.db.driver, + ) + for config_obj in config.Config.all_configs() + ), + ", ".join(reasons), + ) + config.skip_test(msg) + elif hasattr(cls, "__prefer_backends__"): + non_preferred = set() + spec = exclusions.db_spec(*util.to_list(cls.__prefer_backends__)) + for config_obj in all_configs: + if not spec(config_obj): + non_preferred.add(config_obj) + if all_configs.difference(non_preferred): + all_configs.difference_update(non_preferred) + + if config._current not in all_configs: + _setup_config(all_configs.pop(), cls) + + +def _setup_config(config_obj, ctx): + config._current.push(config_obj, testing) + + +class FixtureFunctions(ABC): + @abc.abstractmethod + def skip_test_exception(self, *arg, **kw): + raise NotImplementedError() + + @abc.abstractmethod + def combinations(self, *args, **kw): + raise NotImplementedError() + + @abc.abstractmethod + def param_ident(self, *args, **kw): + raise NotImplementedError() + + @abc.abstractmethod + def fixture(self, *arg, **kw): + raise NotImplementedError() + + def get_current_test_name(self): + raise NotImplementedError() + + @abc.abstractmethod + def mark_base_test_class(self): + raise NotImplementedError() + + +_fixture_fn_class = None + + +def set_fixture_functions(fixture_fn_class): + global _fixture_fn_class + _fixture_fn_class = fixture_fn_class diff --git a/lib/sqlalchemy/testing/plugin/pytestplugin.py b/lib/sqlalchemy/testing/plugin/pytestplugin.py new file mode 100644 index 0000000..5a51582 --- /dev/null +++ b/lib/sqlalchemy/testing/plugin/pytestplugin.py @@ -0,0 +1,820 @@ +try: + # installed by bootstrap.py + import sqla_plugin_base as plugin_base +except ImportError: + # assume we're a package, use traditional import + from . import plugin_base + +import argparse +import collections +from functools import update_wrapper +import inspect +import itertools +import operator +import os +import re +import sys +import uuid + +import pytest + + +py2k = sys.version_info < (3, 0) +if py2k: + try: + import sqla_reinvent_fixtures as reinvent_fixtures_py2k + except ImportError: + from . import reinvent_fixtures_py2k + + +def pytest_addoption(parser): + group = parser.getgroup("sqlalchemy") + + def make_option(name, **kw): + callback_ = kw.pop("callback", None) + if callback_: + + class CallableAction(argparse.Action): + def __call__( + self, parser, namespace, values, option_string=None + ): + callback_(option_string, values, parser) + + kw["action"] = CallableAction + + zeroarg_callback = kw.pop("zeroarg_callback", None) + if zeroarg_callback: + + class CallableAction(argparse.Action): + def __init__( + self, + option_strings, + dest, + default=False, + required=False, + help=None, # noqa + ): + super(CallableAction, self).__init__( + option_strings=option_strings, + dest=dest, + nargs=0, + const=True, + default=default, + required=required, + help=help, + ) + + def __call__( + self, parser, namespace, values, option_string=None + ): + zeroarg_callback(option_string, values, parser) + + kw["action"] = CallableAction + + group.addoption(name, **kw) + + plugin_base.setup_options(make_option) + plugin_base.read_config() + + +def pytest_configure(config): + if config.pluginmanager.hasplugin("xdist"): + config.pluginmanager.register(XDistHooks()) + + if hasattr(config, "workerinput"): + plugin_base.restore_important_follower_config(config.workerinput) + plugin_base.configure_follower(config.workerinput["follower_ident"]) + else: + if config.option.write_idents and os.path.exists( + config.option.write_idents + ): + os.remove(config.option.write_idents) + + plugin_base.pre_begin(config.option) + + plugin_base.set_coverage_flag( + bool(getattr(config.option, "cov_source", False)) + ) + + plugin_base.set_fixture_functions(PytestFixtureFunctions) + + if config.option.dump_pyannotate: + global DUMP_PYANNOTATE + DUMP_PYANNOTATE = True + + +DUMP_PYANNOTATE = False + + +@pytest.fixture(autouse=True) +def collect_types_fixture(): + if DUMP_PYANNOTATE: + from pyannotate_runtime import collect_types + + collect_types.start() + yield + if DUMP_PYANNOTATE: + collect_types.stop() + + +def pytest_sessionstart(session): + from sqlalchemy.testing import asyncio + + asyncio._assume_async(plugin_base.post_begin) + + +def pytest_sessionfinish(session): + from sqlalchemy.testing import asyncio + + asyncio._maybe_async_provisioning(plugin_base.final_process_cleanup) + + if session.config.option.dump_pyannotate: + from pyannotate_runtime import collect_types + + collect_types.dump_stats(session.config.option.dump_pyannotate) + + +def pytest_collection_finish(session): + if session.config.option.dump_pyannotate: + from pyannotate_runtime import collect_types + + lib_sqlalchemy = os.path.abspath("lib/sqlalchemy") + + def _filter(filename): + filename = os.path.normpath(os.path.abspath(filename)) + if "lib/sqlalchemy" not in os.path.commonpath( + [filename, lib_sqlalchemy] + ): + return None + if "testing" in filename: + return None + + return filename + + collect_types.init_types_collection(filter_filename=_filter) + + +class XDistHooks(object): + def pytest_configure_node(self, node): + from sqlalchemy.testing import provision + from sqlalchemy.testing import asyncio + + # the master for each node fills workerinput dictionary + # which pytest-xdist will transfer to the subprocess + + plugin_base.memoize_important_follower_config(node.workerinput) + + node.workerinput["follower_ident"] = "test_%s" % uuid.uuid4().hex[0:12] + + asyncio._maybe_async_provisioning( + provision.create_follower_db, node.workerinput["follower_ident"] + ) + + def pytest_testnodedown(self, node, error): + from sqlalchemy.testing import provision + from sqlalchemy.testing import asyncio + + asyncio._maybe_async_provisioning( + provision.drop_follower_db, node.workerinput["follower_ident"] + ) + + +def pytest_collection_modifyitems(session, config, items): + + # look for all those classes that specify __backend__ and + # expand them out into per-database test cases. + + # this is much easier to do within pytest_pycollect_makeitem, however + # pytest is iterating through cls.__dict__ as makeitem is + # called which causes a "dictionary changed size" error on py3k. + # I'd submit a pullreq for them to turn it into a list first, but + # it's to suit the rather odd use case here which is that we are adding + # new classes to a module on the fly. + + from sqlalchemy.testing import asyncio + + rebuilt_items = collections.defaultdict( + lambda: collections.defaultdict(list) + ) + + items[:] = [ + item + for item in items + if item.getparent(pytest.Class) is not None + and not item.getparent(pytest.Class).name.startswith("_") + ] + + test_classes = set(item.getparent(pytest.Class) for item in items) + + def collect(element): + for inst_or_fn in element.collect(): + if isinstance(inst_or_fn, pytest.Collector): + # no yield from in 2.7 + for el in collect(inst_or_fn): + yield el + else: + yield inst_or_fn + + def setup_test_classes(): + for test_class in test_classes: + for sub_cls in plugin_base.generate_sub_tests( + test_class.cls, test_class.module + ): + if sub_cls is not test_class.cls: + per_cls_dict = rebuilt_items[test_class.cls] + + # support pytest 5.4.0 and above pytest.Class.from_parent + ctor = getattr(pytest.Class, "from_parent", pytest.Class) + module = test_class.getparent(pytest.Module) + for fn in collect( + ctor(name=sub_cls.__name__, parent=module) + ): + per_cls_dict[fn.name].append(fn) + + # class requirements will sometimes need to access the DB to check + # capabilities, so need to do this for async + asyncio._maybe_async_provisioning(setup_test_classes) + + newitems = [] + for item in items: + cls_ = item.cls + if cls_ in rebuilt_items: + newitems.extend(rebuilt_items[cls_][item.name]) + else: + newitems.append(item) + + if py2k: + for item in newitems: + reinvent_fixtures_py2k.scan_for_fixtures_to_use_for_class(item) + + # seems like the functions attached to a test class aren't sorted already? + # is that true and why's that? (when using unittest, they're sorted) + items[:] = sorted( + newitems, + key=lambda item: ( + item.getparent(pytest.Module).name, + item.getparent(pytest.Class).name, + item.name, + ), + ) + + +def pytest_pycollect_makeitem(collector, name, obj): + if inspect.isclass(obj) and plugin_base.want_class(name, obj): + from sqlalchemy.testing import config + + if config.any_async: + obj = _apply_maybe_async(obj) + + ctor = getattr(pytest.Class, "from_parent", pytest.Class) + return [ + ctor(name=parametrize_cls.__name__, parent=collector) + for parametrize_cls in _parametrize_cls(collector.module, obj) + ] + elif ( + inspect.isfunction(obj) + and collector.cls is not None + and plugin_base.want_method(collector.cls, obj) + ): + # None means, fall back to default logic, which includes + # method-level parametrize + return None + else: + # empty list means skip this item + return [] + + +def _is_wrapped_coroutine_function(fn): + while hasattr(fn, "__wrapped__"): + fn = fn.__wrapped__ + + return inspect.iscoroutinefunction(fn) + + +def _apply_maybe_async(obj, recurse=True): + from sqlalchemy.testing import asyncio + + for name, value in vars(obj).items(): + if ( + (callable(value) or isinstance(value, classmethod)) + and not getattr(value, "_maybe_async_applied", False) + and (name.startswith("test_")) + and not _is_wrapped_coroutine_function(value) + ): + is_classmethod = False + if isinstance(value, classmethod): + value = value.__func__ + is_classmethod = True + + @_pytest_fn_decorator + def make_async(fn, *args, **kwargs): + return asyncio._maybe_async(fn, *args, **kwargs) + + do_async = make_async(value) + if is_classmethod: + do_async = classmethod(do_async) + do_async._maybe_async_applied = True + + setattr(obj, name, do_async) + if recurse: + for cls in obj.mro()[1:]: + if cls != object: + _apply_maybe_async(cls, False) + return obj + + +def _parametrize_cls(module, cls): + """implement a class-based version of pytest parametrize.""" + + if "_sa_parametrize" not in cls.__dict__: + return [cls] + + _sa_parametrize = cls._sa_parametrize + classes = [] + for full_param_set in itertools.product( + *[params for argname, params in _sa_parametrize] + ): + cls_variables = {} + + for argname, param in zip( + [_sa_param[0] for _sa_param in _sa_parametrize], full_param_set + ): + if not argname: + raise TypeError("need argnames for class-based combinations") + argname_split = re.split(r",\s*", argname) + for arg, val in zip(argname_split, param.values): + cls_variables[arg] = val + parametrized_name = "_".join( + # token is a string, but in py2k pytest is giving us a unicode, + # so call str() on it. + str(re.sub(r"\W", "", token)) + for param in full_param_set + for token in param.id.split("-") + ) + name = "%s_%s" % (cls.__name__, parametrized_name) + newcls = type.__new__(type, name, (cls,), cls_variables) + setattr(module, name, newcls) + classes.append(newcls) + return classes + + +_current_class = None + + +def pytest_runtest_setup(item): + from sqlalchemy.testing import asyncio + + # pytest_runtest_setup runs *before* pytest fixtures with scope="class". + # plugin_base.start_test_class_outside_fixtures may opt to raise SkipTest + # for the whole class and has to run things that are across all current + # databases, so we run this outside of the pytest fixture system altogether + # and ensure asyncio greenlet if any engines are async + + global _current_class + + if isinstance(item, pytest.Function) and _current_class is None: + asyncio._maybe_async_provisioning( + plugin_base.start_test_class_outside_fixtures, + item.cls, + ) + _current_class = item.getparent(pytest.Class) + + +@pytest.hookimpl(hookwrapper=True) +def pytest_runtest_teardown(item, nextitem): + # runs inside of pytest function fixture scope + # after test function runs + from sqlalchemy.testing import asyncio + from sqlalchemy.util import string_types + + asyncio._maybe_async(plugin_base.after_test, item) + + yield + # this is now after all the fixture teardown have run, the class can be + # finalized. Since pytest v7 this finalizer can no longer be added in + # pytest_runtest_setup since the class has not yet been setup at that + # time. + # See https://github.com/pytest-dev/pytest/issues/9343 + global _current_class, _current_report + + if _current_class is not None and ( + # last test or a new class + nextitem is None + or nextitem.getparent(pytest.Class) is not _current_class + ): + _current_class = None + + try: + asyncio._maybe_async_provisioning( + plugin_base.stop_test_class_outside_fixtures, item.cls + ) + except Exception as e: + # in case of an exception during teardown attach the original + # error to the exception message, otherwise it will get lost + if _current_report.failed: + if not e.args: + e.args = ( + "__Original test failure__:\n" + + _current_report.longreprtext, + ) + elif e.args[-1] and isinstance(e.args[-1], string_types): + args = list(e.args) + args[-1] += ( + "\n__Original test failure__:\n" + + _current_report.longreprtext + ) + e.args = tuple(args) + else: + e.args += ( + "__Original test failure__", + _current_report.longreprtext, + ) + raise + finally: + _current_report = None + + +def pytest_runtest_call(item): + # runs inside of pytest function fixture scope + # before test function runs + + from sqlalchemy.testing import asyncio + + asyncio._maybe_async( + plugin_base.before_test, + item, + item.module.__name__, + item.cls, + item.name, + ) + + +_current_report = None + + +def pytest_runtest_logreport(report): + global _current_report + if report.when == "call": + _current_report = report + + +@pytest.fixture(scope="class") +def setup_class_methods(request): + from sqlalchemy.testing import asyncio + + cls = request.cls + + if hasattr(cls, "setup_test_class"): + asyncio._maybe_async(cls.setup_test_class) + + if py2k: + reinvent_fixtures_py2k.run_class_fixture_setup(request) + + yield + + if py2k: + reinvent_fixtures_py2k.run_class_fixture_teardown(request) + + if hasattr(cls, "teardown_test_class"): + asyncio._maybe_async(cls.teardown_test_class) + + asyncio._maybe_async(plugin_base.stop_test_class, cls) + + +@pytest.fixture(scope="function") +def setup_test_methods(request): + from sqlalchemy.testing import asyncio + + # called for each test + + self = request.instance + + # before this fixture runs: + + # 1. function level "autouse" fixtures under py3k (examples: TablesTest + # define tables / data, MappedTest define tables / mappers / data) + + # 2. run homegrown function level "autouse" fixtures under py2k + if py2k: + reinvent_fixtures_py2k.run_fn_fixture_setup(request) + + # 3. run outer xdist-style setup + if hasattr(self, "setup_test"): + asyncio._maybe_async(self.setup_test) + + # alembic test suite is using setUp and tearDown + # xdist methods; support these in the test suite + # for the near term + if hasattr(self, "setUp"): + asyncio._maybe_async(self.setUp) + + # inside the yield: + # 4. function level fixtures defined on test functions themselves, + # e.g. "connection", "metadata" run next + + # 5. pytest hook pytest_runtest_call then runs + + # 6. test itself runs + + yield + + # yield finishes: + + # 7. function level fixtures defined on test functions + # themselves, e.g. "connection" rolls back the transaction, "metadata" + # emits drop all + + # 8. pytest hook pytest_runtest_teardown hook runs, this is associated + # with fixtures close all sessions, provisioning.stop_test_class(), + # engines.testing_reaper -> ensure all connection pool connections + # are returned, engines created by testing_engine that aren't the + # config engine are disposed + + asyncio._maybe_async(plugin_base.after_test_fixtures, self) + + # 10. run xdist-style teardown + if hasattr(self, "tearDown"): + asyncio._maybe_async(self.tearDown) + + if hasattr(self, "teardown_test"): + asyncio._maybe_async(self.teardown_test) + + # 11. run homegrown function-level "autouse" fixtures under py2k + if py2k: + reinvent_fixtures_py2k.run_fn_fixture_teardown(request) + + # 12. function level "autouse" fixtures under py3k (examples: TablesTest / + # MappedTest delete table data, possibly drop tables and clear mappers + # depending on the flags defined by the test class) + + +def getargspec(fn): + if sys.version_info.major == 3: + return inspect.getfullargspec(fn) + else: + return inspect.getargspec(fn) + + +def _pytest_fn_decorator(target): + """Port of langhelpers.decorator with pytest-specific tricks.""" + + from sqlalchemy.util.langhelpers import format_argspec_plus + from sqlalchemy.util.compat import inspect_getfullargspec + + def _exec_code_in_env(code, env, fn_name): + exec(code, env) + return env[fn_name] + + def decorate(fn, add_positional_parameters=()): + + spec = inspect_getfullargspec(fn) + if add_positional_parameters: + spec.args.extend(add_positional_parameters) + + metadata = dict( + __target_fn="__target_fn", __orig_fn="__orig_fn", name=fn.__name__ + ) + metadata.update(format_argspec_plus(spec, grouped=False)) + code = ( + """\ +def %(name)s(%(args)s): + return %(__target_fn)s(%(__orig_fn)s, %(apply_kw)s) +""" + % metadata + ) + decorated = _exec_code_in_env( + code, {"__target_fn": target, "__orig_fn": fn}, fn.__name__ + ) + if not add_positional_parameters: + decorated.__defaults__ = getattr(fn, "__func__", fn).__defaults__ + decorated.__wrapped__ = fn + return update_wrapper(decorated, fn) + else: + # this is the pytest hacky part. don't do a full update wrapper + # because pytest is really being sneaky about finding the args + # for the wrapped function + decorated.__module__ = fn.__module__ + decorated.__name__ = fn.__name__ + if hasattr(fn, "pytestmark"): + decorated.pytestmark = fn.pytestmark + return decorated + + return decorate + + +class PytestFixtureFunctions(plugin_base.FixtureFunctions): + def skip_test_exception(self, *arg, **kw): + return pytest.skip.Exception(*arg, **kw) + + def mark_base_test_class(self): + return pytest.mark.usefixtures( + "setup_class_methods", "setup_test_methods" + ) + + _combination_id_fns = { + "i": lambda obj: obj, + "r": repr, + "s": str, + "n": lambda obj: obj.__name__ + if hasattr(obj, "__name__") + else type(obj).__name__, + } + + def combinations(self, *arg_sets, **kw): + """Facade for pytest.mark.parametrize. + + Automatically derives argument names from the callable which in our + case is always a method on a class with positional arguments. + + ids for parameter sets are derived using an optional template. + + """ + from sqlalchemy.testing import exclusions + + if sys.version_info.major == 3: + if len(arg_sets) == 1 and hasattr(arg_sets[0], "__next__"): + arg_sets = list(arg_sets[0]) + else: + if len(arg_sets) == 1 and hasattr(arg_sets[0], "next"): + arg_sets = list(arg_sets[0]) + + argnames = kw.pop("argnames", None) + + def _filter_exclusions(args): + result = [] + gathered_exclusions = [] + for a in args: + if isinstance(a, exclusions.compound): + gathered_exclusions.append(a) + else: + result.append(a) + + return result, gathered_exclusions + + id_ = kw.pop("id_", None) + + tobuild_pytest_params = [] + has_exclusions = False + if id_: + _combination_id_fns = self._combination_id_fns + + # because itemgetter is not consistent for one argument vs. + # multiple, make it multiple in all cases and use a slice + # to omit the first argument + _arg_getter = operator.itemgetter( + 0, + *[ + idx + for idx, char in enumerate(id_) + if char in ("n", "r", "s", "a") + ] + ) + fns = [ + (operator.itemgetter(idx), _combination_id_fns[char]) + for idx, char in enumerate(id_) + if char in _combination_id_fns + ] + + for arg in arg_sets: + if not isinstance(arg, tuple): + arg = (arg,) + + fn_params, param_exclusions = _filter_exclusions(arg) + + parameters = _arg_getter(fn_params)[1:] + + if param_exclusions: + has_exclusions = True + + tobuild_pytest_params.append( + ( + parameters, + param_exclusions, + "-".join( + comb_fn(getter(arg)) for getter, comb_fn in fns + ), + ) + ) + + else: + + for arg in arg_sets: + if not isinstance(arg, tuple): + arg = (arg,) + + fn_params, param_exclusions = _filter_exclusions(arg) + + if param_exclusions: + has_exclusions = True + + tobuild_pytest_params.append( + (fn_params, param_exclusions, None) + ) + + pytest_params = [] + for parameters, param_exclusions, id_ in tobuild_pytest_params: + if has_exclusions: + parameters += (param_exclusions,) + + param = pytest.param(*parameters, id=id_) + pytest_params.append(param) + + def decorate(fn): + if inspect.isclass(fn): + if has_exclusions: + raise NotImplementedError( + "exclusions not supported for class level combinations" + ) + if "_sa_parametrize" not in fn.__dict__: + fn._sa_parametrize = [] + fn._sa_parametrize.append((argnames, pytest_params)) + return fn + else: + if argnames is None: + _argnames = getargspec(fn).args[1:] + else: + _argnames = re.split(r", *", argnames) + + if has_exclusions: + _argnames += ["_exclusions"] + + @_pytest_fn_decorator + def check_exclusions(fn, *args, **kw): + _exclusions = args[-1] + if _exclusions: + exlu = exclusions.compound().add(*_exclusions) + fn = exlu(fn) + return fn(*args[0:-1], **kw) + + def process_metadata(spec): + spec.args.append("_exclusions") + + fn = check_exclusions( + fn, add_positional_parameters=("_exclusions",) + ) + + return pytest.mark.parametrize(_argnames, pytest_params)(fn) + + return decorate + + def param_ident(self, *parameters): + ident = parameters[0] + return pytest.param(*parameters[1:], id=ident) + + def fixture(self, *arg, **kw): + from sqlalchemy.testing import config + from sqlalchemy.testing import asyncio + + # wrapping pytest.fixture function. determine if + # decorator was called as @fixture or @fixture(). + if len(arg) > 0 and callable(arg[0]): + # was called as @fixture(), we have the function to wrap. + fn = arg[0] + arg = arg[1:] + else: + # was called as @fixture, don't have the function yet. + fn = None + + # create a pytest.fixture marker. because the fn is not being + # passed, this is always a pytest.FixtureFunctionMarker() + # object (or whatever pytest is calling it when you read this) + # that is waiting for a function. + fixture = pytest.fixture(*arg, **kw) + + # now apply wrappers to the function, including fixture itself + + def wrap(fn): + if config.any_async: + fn = asyncio._maybe_async_wrapper(fn) + # other wrappers may be added here + + if py2k and "autouse" in kw: + # py2k workaround for too-slow collection of autouse fixtures + # in pytest 4.6.11. See notes in reinvent_fixtures_py2k for + # rationale. + + # comment this condition out in order to disable the + # py2k workaround entirely. + reinvent_fixtures_py2k.add_fixture(fn, fixture) + else: + # now apply FixtureFunctionMarker + fn = fixture(fn) + + return fn + + if fn: + return wrap(fn) + else: + return wrap + + def get_current_test_name(self): + return os.environ.get("PYTEST_CURRENT_TEST") + + def async_test(self, fn): + from sqlalchemy.testing import asyncio + + @_pytest_fn_decorator + def decorate(fn, *args, **kwargs): + asyncio._run_coroutine_function(fn, *args, **kwargs) + + return decorate(fn) diff --git a/lib/sqlalchemy/testing/plugin/reinvent_fixtures_py2k.py b/lib/sqlalchemy/testing/plugin/reinvent_fixtures_py2k.py new file mode 100644 index 0000000..36b6841 --- /dev/null +++ b/lib/sqlalchemy/testing/plugin/reinvent_fixtures_py2k.py @@ -0,0 +1,112 @@ +""" +invent a quick version of pytest autouse fixtures as pytest's unacceptably slow +collection/high memory use in pytest 4.6.11, which is the highest version that +works in py2k. + +by "too-slow" we mean the test suite can't even manage to be collected for a +single process in less than 70 seconds or so and memory use seems to be very +high as well. for two or four workers the job just times out after ten +minutes. + +so instead we have invented a very limited form of these fixtures, as our +current use of "autouse" fixtures are limited to those in fixtures.py. + +assumptions for these fixtures: + +1. we are only using "function" or "class" scope + +2. the functions must be associated with a test class + +3. the fixture functions cannot themselves use pytest fixtures + +4. the fixture functions must use yield, not return + +When py2k support is removed and we can stay on a modern pytest version, this +can all be removed. + + +""" +import collections + + +_py2k_fixture_fn_names = collections.defaultdict(set) +_py2k_class_fixtures = collections.defaultdict( + lambda: collections.defaultdict(set) +) +_py2k_function_fixtures = collections.defaultdict( + lambda: collections.defaultdict(set) +) + +_py2k_cls_fixture_stack = [] +_py2k_fn_fixture_stack = [] + + +def add_fixture(fn, fixture): + assert fixture.scope in ("class", "function") + _py2k_fixture_fn_names[fn.__name__].add((fn, fixture.scope)) + + +def scan_for_fixtures_to_use_for_class(item): + test_class = item.parent.parent.obj + + for name in _py2k_fixture_fn_names: + for fixture_fn, scope in _py2k_fixture_fn_names[name]: + meth = getattr(test_class, name, None) + if meth and meth.im_func is fixture_fn: + for sup in test_class.__mro__: + if name in sup.__dict__: + if scope == "class": + _py2k_class_fixtures[test_class][sup].add(meth) + elif scope == "function": + _py2k_function_fixtures[test_class][sup].add(meth) + break + break + + +def run_class_fixture_setup(request): + + cls = request.cls + self = cls.__new__(cls) + + fixtures_for_this_class = _py2k_class_fixtures.get(cls) + + if fixtures_for_this_class: + for sup_ in cls.__mro__: + for fn in fixtures_for_this_class.get(sup_, ()): + iter_ = fn(self) + next(iter_) + + _py2k_cls_fixture_stack.append(iter_) + + +def run_class_fixture_teardown(request): + while _py2k_cls_fixture_stack: + iter_ = _py2k_cls_fixture_stack.pop(-1) + try: + next(iter_) + except StopIteration: + pass + + +def run_fn_fixture_setup(request): + cls = request.cls + self = request.instance + + fixtures_for_this_class = _py2k_function_fixtures.get(cls) + + if fixtures_for_this_class: + for sup_ in reversed(cls.__mro__): + for fn in fixtures_for_this_class.get(sup_, ()): + iter_ = fn(self) + next(iter_) + + _py2k_fn_fixture_stack.append(iter_) + + +def run_fn_fixture_teardown(request): + while _py2k_fn_fixture_stack: + iter_ = _py2k_fn_fixture_stack.pop(-1) + try: + next(iter_) + except StopIteration: + pass |