summaryrefslogtreecommitdiffstats
path: root/lib/sqlalchemy/connectors
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/connectors')
-rw-r--r--lib/sqlalchemy/connectors/__init__.py10
-rw-r--r--lib/sqlalchemy/connectors/mxodbc.py166
-rw-r--r--lib/sqlalchemy/connectors/pyodbc.py193
3 files changed, 369 insertions, 0 deletions
diff --git a/lib/sqlalchemy/connectors/__init__.py b/lib/sqlalchemy/connectors/__init__.py
new file mode 100644
index 0000000..e738086
--- /dev/null
+++ b/lib/sqlalchemy/connectors/__init__.py
@@ -0,0 +1,10 @@
+# connectors/__init__.py
+# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+
+
+class Connector(object):
+ pass
diff --git a/lib/sqlalchemy/connectors/mxodbc.py b/lib/sqlalchemy/connectors/mxodbc.py
new file mode 100644
index 0000000..89b3484
--- /dev/null
+++ b/lib/sqlalchemy/connectors/mxodbc.py
@@ -0,0 +1,166 @@
+# connectors/mxodbc.py
+# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+
+"""
+Provide a SQLALchemy connector for the eGenix mxODBC commercial
+Python adapter for ODBC. This is not a free product, but eGenix
+provides SQLAlchemy with a license for use in continuous integration
+testing.
+
+This has been tested for use with mxODBC 3.1.2 on SQL Server 2005
+and 2008, using the SQL Server Native driver. However, it is
+possible for this to be used on other database platforms.
+
+For more info on mxODBC, see https://www.egenix.com/
+
+.. deprecated:: 1.4 The mxODBC DBAPI is deprecated and will be removed
+ in a future version. Please use one of the supported DBAPIs to
+ connect to mssql.
+
+"""
+
+import re
+import sys
+import warnings
+
+from . import Connector
+from ..util import warn_deprecated
+
+
+class MxODBCConnector(Connector):
+ driver = "mxodbc"
+
+ supports_sane_multi_rowcount = False
+ supports_unicode_statements = True
+ supports_unicode_binds = True
+
+ supports_native_decimal = True
+
+ @classmethod
+ def dbapi(cls):
+ # this classmethod will normally be replaced by an instance
+ # attribute of the same name, so this is normally only called once.
+ cls._load_mx_exceptions()
+ platform = sys.platform
+ if platform == "win32":
+ from mx.ODBC import Windows as Module
+ # this can be the string "linux2", and possibly others
+ elif "linux" in platform:
+ from mx.ODBC import unixODBC as Module
+ elif platform == "darwin":
+ from mx.ODBC import iODBC as Module
+ else:
+ raise ImportError("Unrecognized platform for mxODBC import")
+
+ warn_deprecated(
+ "The mxODBC DBAPI is deprecated and will be removed"
+ "in a future version. Please use one of the supported DBAPIs to"
+ "connect to mssql.",
+ version="1.4",
+ )
+ return Module
+
+ @classmethod
+ def _load_mx_exceptions(cls):
+ """Import mxODBC exception classes into the module namespace,
+ as if they had been imported normally. This is done here
+ to avoid requiring all SQLAlchemy users to install mxODBC.
+ """
+ global InterfaceError, ProgrammingError
+ from mx.ODBC import InterfaceError
+ from mx.ODBC import ProgrammingError
+
+ def on_connect(self):
+ def connect(conn):
+ conn.stringformat = self.dbapi.MIXED_STRINGFORMAT
+ conn.datetimeformat = self.dbapi.PYDATETIME_DATETIMEFORMAT
+ conn.decimalformat = self.dbapi.DECIMAL_DECIMALFORMAT
+ conn.errorhandler = self._error_handler()
+
+ return connect
+
+ def _error_handler(self):
+ """Return a handler that adjusts mxODBC's raised Warnings to
+ emit Python standard warnings.
+ """
+ from mx.ODBC.Error import Warning as MxOdbcWarning
+
+ def error_handler(connection, cursor, errorclass, errorvalue):
+ if issubclass(errorclass, MxOdbcWarning):
+ errorclass.__bases__ = (Warning,)
+ warnings.warn(
+ message=str(errorvalue), category=errorclass, stacklevel=2
+ )
+ else:
+ raise errorclass(errorvalue)
+
+ return error_handler
+
+ def create_connect_args(self, url):
+ r"""Return a tuple of \*args, \**kwargs for creating a connection.
+
+ The mxODBC 3.x connection constructor looks like this:
+
+ connect(dsn, user='', password='',
+ clear_auto_commit=1, errorhandler=None)
+
+ This method translates the values in the provided URI
+ into args and kwargs needed to instantiate an mxODBC Connection.
+
+ The arg 'errorhandler' is not used by SQLAlchemy and will
+ not be populated.
+
+ """
+ opts = url.translate_connect_args(username="user")
+ opts.update(url.query)
+ args = opts.pop("host")
+ opts.pop("port", None)
+ opts.pop("database", None)
+ return (args,), opts
+
+ def is_disconnect(self, e, connection, cursor):
+ # TODO: eGenix recommends checking connection.closed here
+ # Does that detect dropped connections ?
+ if isinstance(e, self.dbapi.ProgrammingError):
+ return "connection already closed" in str(e)
+ elif isinstance(e, self.dbapi.Error):
+ return "[08S01]" in str(e)
+ else:
+ return False
+
+ def _get_server_version_info(self, connection):
+ # eGenix suggests using conn.dbms_version instead
+ # of what we're doing here
+ dbapi_con = connection.connection
+ version = []
+ r = re.compile(r"[.\-]")
+ # 18 == pyodbc.SQL_DBMS_VER
+ for n in r.split(dbapi_con.getinfo(18)[1]):
+ try:
+ version.append(int(n))
+ except ValueError:
+ version.append(n)
+ return tuple(version)
+
+ def _get_direct(self, context):
+ if context:
+ native_odbc_execute = context.execution_options.get(
+ "native_odbc_execute", "auto"
+ )
+ # default to direct=True in all cases, is more generally
+ # compatible especially with SQL Server
+ return False if native_odbc_execute is True else True
+ else:
+ return True
+
+ def do_executemany(self, cursor, statement, parameters, context=None):
+ cursor.executemany(
+ statement, parameters, direct=self._get_direct(context)
+ )
+
+ def do_execute(self, cursor, statement, parameters, context=None):
+ cursor.execute(statement, parameters, direct=self._get_direct(context))
diff --git a/lib/sqlalchemy/connectors/pyodbc.py b/lib/sqlalchemy/connectors/pyodbc.py
new file mode 100644
index 0000000..9bb67b5
--- /dev/null
+++ b/lib/sqlalchemy/connectors/pyodbc.py
@@ -0,0 +1,193 @@
+# connectors/pyodbc.py
+# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+
+import re
+
+from . import Connector
+from .. import util
+
+
+class PyODBCConnector(Connector):
+ driver = "pyodbc"
+
+ # this is no longer False for pyodbc in general
+ supports_sane_rowcount_returning = True
+ supports_sane_multi_rowcount = False
+
+ supports_unicode_statements = True
+ supports_unicode_binds = True
+
+ supports_native_decimal = True
+ default_paramstyle = "named"
+
+ use_setinputsizes = False
+
+ # for non-DSN connections, this *may* be used to
+ # hold the desired driver name
+ pyodbc_driver_name = None
+
+ def __init__(
+ self, supports_unicode_binds=None, use_setinputsizes=False, **kw
+ ):
+ super(PyODBCConnector, self).__init__(**kw)
+ if supports_unicode_binds is not None:
+ self.supports_unicode_binds = supports_unicode_binds
+ self.use_setinputsizes = use_setinputsizes
+
+ @classmethod
+ def dbapi(cls):
+ return __import__("pyodbc")
+
+ def create_connect_args(self, url):
+ opts = url.translate_connect_args(username="user")
+ opts.update(url.query)
+
+ keys = opts
+
+ query = url.query
+
+ connect_args = {}
+ for param in ("ansi", "unicode_results", "autocommit"):
+ if param in keys:
+ connect_args[param] = util.asbool(keys.pop(param))
+
+ if "odbc_connect" in keys:
+ connectors = [util.unquote_plus(keys.pop("odbc_connect"))]
+ else:
+
+ def check_quote(token):
+ if ";" in str(token) or str(token).startswith("{"):
+ token = "{%s}" % token.replace("}", "}}")
+ return token
+
+ keys = dict((k, check_quote(v)) for k, v in keys.items())
+
+ dsn_connection = "dsn" in keys or (
+ "host" in keys and "database" not in keys
+ )
+ if dsn_connection:
+ connectors = [
+ "dsn=%s" % (keys.pop("host", "") or keys.pop("dsn", ""))
+ ]
+ else:
+ port = ""
+ if "port" in keys and "port" not in query:
+ port = ",%d" % int(keys.pop("port"))
+
+ connectors = []
+ driver = keys.pop("driver", self.pyodbc_driver_name)
+ if driver is None and keys:
+ # note if keys is empty, this is a totally blank URL
+ util.warn(
+ "No driver name specified; "
+ "this is expected by PyODBC when using "
+ "DSN-less connections"
+ )
+ else:
+ connectors.append("DRIVER={%s}" % driver)
+
+ connectors.extend(
+ [
+ "Server=%s%s" % (keys.pop("host", ""), port),
+ "Database=%s" % keys.pop("database", ""),
+ ]
+ )
+
+ user = keys.pop("user", None)
+ if user:
+ connectors.append("UID=%s" % user)
+ pwd = keys.pop("password", "")
+ if pwd:
+ connectors.append("PWD=%s" % pwd)
+ else:
+ authentication = keys.pop("authentication", None)
+ if authentication:
+ connectors.append("Authentication=%s" % authentication)
+ else:
+ connectors.append("Trusted_Connection=Yes")
+
+ # if set to 'Yes', the ODBC layer will try to automagically
+ # convert textual data from your database encoding to your
+ # client encoding. This should obviously be set to 'No' if
+ # you query a cp1253 encoded database from a latin1 client...
+ if "odbc_autotranslate" in keys:
+ connectors.append(
+ "AutoTranslate=%s" % keys.pop("odbc_autotranslate")
+ )
+
+ connectors.extend(["%s=%s" % (k, v) for k, v in keys.items()])
+
+ return [[";".join(connectors)], connect_args]
+
+ def is_disconnect(self, e, connection, cursor):
+ if isinstance(e, self.dbapi.ProgrammingError):
+ return "The cursor's connection has been closed." in str(
+ e
+ ) or "Attempt to use a closed connection." in str(e)
+ else:
+ return False
+
+ def _dbapi_version(self):
+ if not self.dbapi:
+ return ()
+ return self._parse_dbapi_version(self.dbapi.version)
+
+ def _parse_dbapi_version(self, vers):
+ m = re.match(r"(?:py.*-)?([\d\.]+)(?:-(\w+))?", vers)
+ if not m:
+ return ()
+ vers = tuple([int(x) for x in m.group(1).split(".")])
+ if m.group(2):
+ vers += (m.group(2),)
+ return vers
+
+ def _get_server_version_info(self, connection, allow_chars=True):
+ # NOTE: this function is not reliable, particularly when
+ # freetds is in use. Implement database-specific server version
+ # queries.
+ dbapi_con = connection.connection
+ version = []
+ r = re.compile(r"[.\-]")
+ for n in r.split(dbapi_con.getinfo(self.dbapi.SQL_DBMS_VER)):
+ try:
+ version.append(int(n))
+ except ValueError:
+ if allow_chars:
+ version.append(n)
+ return tuple(version)
+
+ def do_set_input_sizes(self, cursor, list_of_tuples, context):
+ # the rules for these types seems a little strange, as you can pass
+ # non-tuples as well as tuples, however it seems to assume "0"
+ # for the subsequent values if you don't pass a tuple which fails
+ # for types such as pyodbc.SQL_WLONGVARCHAR, which is the datatype
+ # that ticket #5649 is targeting.
+
+ # NOTE: as of #6058, this won't be called if the use_setinputsizes flag
+ # is False, or if no types were specified in list_of_tuples
+
+ cursor.setinputsizes(
+ [
+ (dbtype, None, None)
+ if not isinstance(dbtype, tuple)
+ else dbtype
+ for key, dbtype, sqltype in list_of_tuples
+ ]
+ )
+
+ def set_isolation_level(self, connection, level):
+ # adjust for ConnectionFairy being present
+ # allows attribute set e.g. "connection.autocommit = True"
+ # to work properly
+ if hasattr(connection, "dbapi_connection"):
+ connection = connection.dbapi_connection
+
+ if level == "AUTOCOMMIT":
+ connection.autocommit = True
+ else:
+ connection.autocommit = False
+ super(PyODBCConnector, self).set_isolation_level(connection, level)