From 1dac2263372df2b85db5d029a45721fa158a5c9d Mon Sep 17 00:00:00 2001 From: xiubuzhe Date: Sun, 8 Oct 2023 20:59:00 +0800 Subject: first add files --- lib/sqlalchemy/connectors/__init__.py | 10 ++ lib/sqlalchemy/connectors/mxodbc.py | 166 +++++++++++++++++++++++++++++ lib/sqlalchemy/connectors/pyodbc.py | 193 ++++++++++++++++++++++++++++++++++ 3 files changed, 369 insertions(+) create mode 100644 lib/sqlalchemy/connectors/__init__.py create mode 100644 lib/sqlalchemy/connectors/mxodbc.py create mode 100644 lib/sqlalchemy/connectors/pyodbc.py (limited to 'lib/sqlalchemy/connectors') diff --git a/lib/sqlalchemy/connectors/__init__.py b/lib/sqlalchemy/connectors/__init__.py new file mode 100644 index 0000000..e738086 --- /dev/null +++ b/lib/sqlalchemy/connectors/__init__.py @@ -0,0 +1,10 @@ +# connectors/__init__.py +# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors +# +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php + + +class Connector(object): + pass diff --git a/lib/sqlalchemy/connectors/mxodbc.py b/lib/sqlalchemy/connectors/mxodbc.py new file mode 100644 index 0000000..89b3484 --- /dev/null +++ b/lib/sqlalchemy/connectors/mxodbc.py @@ -0,0 +1,166 @@ +# connectors/mxodbc.py +# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors +# +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php + +""" +Provide a SQLALchemy connector for the eGenix mxODBC commercial +Python adapter for ODBC. This is not a free product, but eGenix +provides SQLAlchemy with a license for use in continuous integration +testing. + +This has been tested for use with mxODBC 3.1.2 on SQL Server 2005 +and 2008, using the SQL Server Native driver. However, it is +possible for this to be used on other database platforms. + +For more info on mxODBC, see https://www.egenix.com/ + +.. deprecated:: 1.4 The mxODBC DBAPI is deprecated and will be removed + in a future version. Please use one of the supported DBAPIs to + connect to mssql. + +""" + +import re +import sys +import warnings + +from . import Connector +from ..util import warn_deprecated + + +class MxODBCConnector(Connector): + driver = "mxodbc" + + supports_sane_multi_rowcount = False + supports_unicode_statements = True + supports_unicode_binds = True + + supports_native_decimal = True + + @classmethod + def dbapi(cls): + # this classmethod will normally be replaced by an instance + # attribute of the same name, so this is normally only called once. + cls._load_mx_exceptions() + platform = sys.platform + if platform == "win32": + from mx.ODBC import Windows as Module + # this can be the string "linux2", and possibly others + elif "linux" in platform: + from mx.ODBC import unixODBC as Module + elif platform == "darwin": + from mx.ODBC import iODBC as Module + else: + raise ImportError("Unrecognized platform for mxODBC import") + + warn_deprecated( + "The mxODBC DBAPI is deprecated and will be removed" + "in a future version. Please use one of the supported DBAPIs to" + "connect to mssql.", + version="1.4", + ) + return Module + + @classmethod + def _load_mx_exceptions(cls): + """Import mxODBC exception classes into the module namespace, + as if they had been imported normally. This is done here + to avoid requiring all SQLAlchemy users to install mxODBC. + """ + global InterfaceError, ProgrammingError + from mx.ODBC import InterfaceError + from mx.ODBC import ProgrammingError + + def on_connect(self): + def connect(conn): + conn.stringformat = self.dbapi.MIXED_STRINGFORMAT + conn.datetimeformat = self.dbapi.PYDATETIME_DATETIMEFORMAT + conn.decimalformat = self.dbapi.DECIMAL_DECIMALFORMAT + conn.errorhandler = self._error_handler() + + return connect + + def _error_handler(self): + """Return a handler that adjusts mxODBC's raised Warnings to + emit Python standard warnings. + """ + from mx.ODBC.Error import Warning as MxOdbcWarning + + def error_handler(connection, cursor, errorclass, errorvalue): + if issubclass(errorclass, MxOdbcWarning): + errorclass.__bases__ = (Warning,) + warnings.warn( + message=str(errorvalue), category=errorclass, stacklevel=2 + ) + else: + raise errorclass(errorvalue) + + return error_handler + + def create_connect_args(self, url): + r"""Return a tuple of \*args, \**kwargs for creating a connection. + + The mxODBC 3.x connection constructor looks like this: + + connect(dsn, user='', password='', + clear_auto_commit=1, errorhandler=None) + + This method translates the values in the provided URI + into args and kwargs needed to instantiate an mxODBC Connection. + + The arg 'errorhandler' is not used by SQLAlchemy and will + not be populated. + + """ + opts = url.translate_connect_args(username="user") + opts.update(url.query) + args = opts.pop("host") + opts.pop("port", None) + opts.pop("database", None) + return (args,), opts + + def is_disconnect(self, e, connection, cursor): + # TODO: eGenix recommends checking connection.closed here + # Does that detect dropped connections ? + if isinstance(e, self.dbapi.ProgrammingError): + return "connection already closed" in str(e) + elif isinstance(e, self.dbapi.Error): + return "[08S01]" in str(e) + else: + return False + + def _get_server_version_info(self, connection): + # eGenix suggests using conn.dbms_version instead + # of what we're doing here + dbapi_con = connection.connection + version = [] + r = re.compile(r"[.\-]") + # 18 == pyodbc.SQL_DBMS_VER + for n in r.split(dbapi_con.getinfo(18)[1]): + try: + version.append(int(n)) + except ValueError: + version.append(n) + return tuple(version) + + def _get_direct(self, context): + if context: + native_odbc_execute = context.execution_options.get( + "native_odbc_execute", "auto" + ) + # default to direct=True in all cases, is more generally + # compatible especially with SQL Server + return False if native_odbc_execute is True else True + else: + return True + + def do_executemany(self, cursor, statement, parameters, context=None): + cursor.executemany( + statement, parameters, direct=self._get_direct(context) + ) + + def do_execute(self, cursor, statement, parameters, context=None): + cursor.execute(statement, parameters, direct=self._get_direct(context)) diff --git a/lib/sqlalchemy/connectors/pyodbc.py b/lib/sqlalchemy/connectors/pyodbc.py new file mode 100644 index 0000000..9bb67b5 --- /dev/null +++ b/lib/sqlalchemy/connectors/pyodbc.py @@ -0,0 +1,193 @@ +# connectors/pyodbc.py +# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors +# +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php + +import re + +from . import Connector +from .. import util + + +class PyODBCConnector(Connector): + driver = "pyodbc" + + # this is no longer False for pyodbc in general + supports_sane_rowcount_returning = True + supports_sane_multi_rowcount = False + + supports_unicode_statements = True + supports_unicode_binds = True + + supports_native_decimal = True + default_paramstyle = "named" + + use_setinputsizes = False + + # for non-DSN connections, this *may* be used to + # hold the desired driver name + pyodbc_driver_name = None + + def __init__( + self, supports_unicode_binds=None, use_setinputsizes=False, **kw + ): + super(PyODBCConnector, self).__init__(**kw) + if supports_unicode_binds is not None: + self.supports_unicode_binds = supports_unicode_binds + self.use_setinputsizes = use_setinputsizes + + @classmethod + def dbapi(cls): + return __import__("pyodbc") + + def create_connect_args(self, url): + opts = url.translate_connect_args(username="user") + opts.update(url.query) + + keys = opts + + query = url.query + + connect_args = {} + for param in ("ansi", "unicode_results", "autocommit"): + if param in keys: + connect_args[param] = util.asbool(keys.pop(param)) + + if "odbc_connect" in keys: + connectors = [util.unquote_plus(keys.pop("odbc_connect"))] + else: + + def check_quote(token): + if ";" in str(token) or str(token).startswith("{"): + token = "{%s}" % token.replace("}", "}}") + return token + + keys = dict((k, check_quote(v)) for k, v in keys.items()) + + dsn_connection = "dsn" in keys or ( + "host" in keys and "database" not in keys + ) + if dsn_connection: + connectors = [ + "dsn=%s" % (keys.pop("host", "") or keys.pop("dsn", "")) + ] + else: + port = "" + if "port" in keys and "port" not in query: + port = ",%d" % int(keys.pop("port")) + + connectors = [] + driver = keys.pop("driver", self.pyodbc_driver_name) + if driver is None and keys: + # note if keys is empty, this is a totally blank URL + util.warn( + "No driver name specified; " + "this is expected by PyODBC when using " + "DSN-less connections" + ) + else: + connectors.append("DRIVER={%s}" % driver) + + connectors.extend( + [ + "Server=%s%s" % (keys.pop("host", ""), port), + "Database=%s" % keys.pop("database", ""), + ] + ) + + user = keys.pop("user", None) + if user: + connectors.append("UID=%s" % user) + pwd = keys.pop("password", "") + if pwd: + connectors.append("PWD=%s" % pwd) + else: + authentication = keys.pop("authentication", None) + if authentication: + connectors.append("Authentication=%s" % authentication) + else: + connectors.append("Trusted_Connection=Yes") + + # if set to 'Yes', the ODBC layer will try to automagically + # convert textual data from your database encoding to your + # client encoding. This should obviously be set to 'No' if + # you query a cp1253 encoded database from a latin1 client... + if "odbc_autotranslate" in keys: + connectors.append( + "AutoTranslate=%s" % keys.pop("odbc_autotranslate") + ) + + connectors.extend(["%s=%s" % (k, v) for k, v in keys.items()]) + + return [[";".join(connectors)], connect_args] + + def is_disconnect(self, e, connection, cursor): + if isinstance(e, self.dbapi.ProgrammingError): + return "The cursor's connection has been closed." in str( + e + ) or "Attempt to use a closed connection." in str(e) + else: + return False + + def _dbapi_version(self): + if not self.dbapi: + return () + return self._parse_dbapi_version(self.dbapi.version) + + def _parse_dbapi_version(self, vers): + m = re.match(r"(?:py.*-)?([\d\.]+)(?:-(\w+))?", vers) + if not m: + return () + vers = tuple([int(x) for x in m.group(1).split(".")]) + if m.group(2): + vers += (m.group(2),) + return vers + + def _get_server_version_info(self, connection, allow_chars=True): + # NOTE: this function is not reliable, particularly when + # freetds is in use. Implement database-specific server version + # queries. + dbapi_con = connection.connection + version = [] + r = re.compile(r"[.\-]") + for n in r.split(dbapi_con.getinfo(self.dbapi.SQL_DBMS_VER)): + try: + version.append(int(n)) + except ValueError: + if allow_chars: + version.append(n) + return tuple(version) + + def do_set_input_sizes(self, cursor, list_of_tuples, context): + # the rules for these types seems a little strange, as you can pass + # non-tuples as well as tuples, however it seems to assume "0" + # for the subsequent values if you don't pass a tuple which fails + # for types such as pyodbc.SQL_WLONGVARCHAR, which is the datatype + # that ticket #5649 is targeting. + + # NOTE: as of #6058, this won't be called if the use_setinputsizes flag + # is False, or if no types were specified in list_of_tuples + + cursor.setinputsizes( + [ + (dbtype, None, None) + if not isinstance(dbtype, tuple) + else dbtype + for key, dbtype, sqltype in list_of_tuples + ] + ) + + def set_isolation_level(self, connection, level): + # adjust for ConnectionFairy being present + # allows attribute set e.g. "connection.autocommit = True" + # to work properly + if hasattr(connection, "dbapi_connection"): + connection = connection.dbapi_connection + + if level == "AUTOCOMMIT": + connection.autocommit = True + else: + connection.autocommit = False + super(PyODBCConnector, self).set_isolation_level(connection, level) -- cgit v1.2.3