diff options
author | xiubuzhe <xiubuzhe@sina.com> | 2023-10-08 20:59:00 +0800 |
---|---|---|
committer | xiubuzhe <xiubuzhe@sina.com> | 2023-10-08 20:59:00 +0800 |
commit | 1dac2263372df2b85db5d029a45721fa158a5c9d (patch) | |
tree | 0365f9c57df04178a726d7584ca6a6b955a7ce6a /lib/Crypto/Util/_raw_api.py | |
parent | b494be364bb39e1de128ada7dc576a729d99907e (diff) | |
download | sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.tar.gz sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.tar.bz2 sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.zip |
first add files
Diffstat (limited to 'lib/Crypto/Util/_raw_api.py')
-rw-r--r-- | lib/Crypto/Util/_raw_api.py | 319 |
1 files changed, 319 insertions, 0 deletions
diff --git a/lib/Crypto/Util/_raw_api.py b/lib/Crypto/Util/_raw_api.py new file mode 100644 index 0000000..f026e8f --- /dev/null +++ b/lib/Crypto/Util/_raw_api.py @@ -0,0 +1,319 @@ +# =================================================================== +# +# Copyright (c) 2014, Legrandin <helderijs@gmail.com> +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in +# the documentation and/or other materials provided with the +# distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# =================================================================== + +import os +import abc +import sys +from Crypto.Util.py3compat import byte_string +from Crypto.Util._file_system import pycryptodome_filename + +# +# List of file suffixes for Python extensions +# +if sys.version_info[0] < 3: + + import imp + extension_suffixes = [] + for ext, mod, typ in imp.get_suffixes(): + if typ == imp.C_EXTENSION: + extension_suffixes.append(ext) + +else: + + from importlib import machinery + extension_suffixes = machinery.EXTENSION_SUFFIXES + +# Which types with buffer interface we support (apart from byte strings) +_buffer_type = (bytearray, memoryview) + + +class _VoidPointer(object): + @abc.abstractmethod + def get(self): + """Return the memory location we point to""" + return + + @abc.abstractmethod + def address_of(self): + """Return a raw pointer to this pointer""" + return + + +try: + # Starting from v2.18, pycparser (used by cffi for in-line ABI mode) + # stops working correctly when PYOPTIMIZE==2 or the parameter -OO is + # passed. In that case, we fall back to ctypes. + # Note that PyPy ships with an old version of pycparser so we can keep + # using cffi there. + # See https://github.com/Legrandin/pycryptodome/issues/228 + if '__pypy__' not in sys.builtin_module_names and sys.flags.optimize == 2: + raise ImportError("CFFI with optimize=2 fails due to pycparser bug.") + + from cffi import FFI + + ffi = FFI() + null_pointer = ffi.NULL + uint8_t_type = ffi.typeof(ffi.new("const uint8_t*")) + + _Array = ffi.new("uint8_t[1]").__class__.__bases__ + + def load_lib(name, cdecl): + """Load a shared library and return a handle to it. + + @name, either an absolute path or the name of a library + in the system search path. + + @cdecl, the C function declarations. + """ + + if hasattr(ffi, "RTLD_DEEPBIND") and not os.getenv('PYCRYPTODOME_DISABLE_DEEPBIND'): + lib = ffi.dlopen(name, ffi.RTLD_DEEPBIND) + else: + lib = ffi.dlopen(name) + ffi.cdef(cdecl) + return lib + + def c_ulong(x): + """Convert a Python integer to unsigned long""" + return x + + c_ulonglong = c_ulong + c_uint = c_ulong + c_ubyte = c_ulong + + def c_size_t(x): + """Convert a Python integer to size_t""" + return x + + def create_string_buffer(init_or_size, size=None): + """Allocate the given amount of bytes (initially set to 0)""" + + if isinstance(init_or_size, bytes): + size = max(len(init_or_size) + 1, size) + result = ffi.new("uint8_t[]", size) + result[:] = init_or_size + else: + if size: + raise ValueError("Size must be specified once only") + result = ffi.new("uint8_t[]", init_or_size) + return result + + def get_c_string(c_string): + """Convert a C string into a Python byte sequence""" + return ffi.string(c_string) + + def get_raw_buffer(buf): + """Convert a C buffer into a Python byte sequence""" + return ffi.buffer(buf)[:] + + def c_uint8_ptr(data): + if isinstance(data, _buffer_type): + # This only works for cffi >= 1.7 + return ffi.cast(uint8_t_type, ffi.from_buffer(data)) + elif byte_string(data) or isinstance(data, _Array): + return data + else: + raise TypeError("Object type %s cannot be passed to C code" % type(data)) + + class VoidPointer_cffi(_VoidPointer): + """Model a newly allocated pointer to void""" + + def __init__(self): + self._pp = ffi.new("void *[1]") + + def get(self): + return self._pp[0] + + def address_of(self): + return self._pp + + def VoidPointer(): + return VoidPointer_cffi() + + backend = "cffi" + +except ImportError: + + import ctypes + from ctypes import (CDLL, c_void_p, byref, c_ulong, c_ulonglong, c_size_t, + create_string_buffer, c_ubyte, c_uint) + from ctypes.util import find_library + from ctypes import Array as _Array + + null_pointer = None + cached_architecture = [] + + def c_ubyte(c): + if not (0 <= c < 256): + raise OverflowError() + return ctypes.c_ubyte(c) + + def load_lib(name, cdecl): + if not cached_architecture: + # platform.architecture() creates a subprocess, so caching the + # result makes successive imports faster. + import platform + cached_architecture[:] = platform.architecture() + bits, linkage = cached_architecture + if "." not in name and not linkage.startswith("Win"): + full_name = find_library(name) + if full_name is None: + raise OSError("Cannot load library '%s'" % name) + name = full_name + return CDLL(name) + + def get_c_string(c_string): + return c_string.value + + def get_raw_buffer(buf): + return buf.raw + + # ---- Get raw pointer --- + + _c_ssize_t = ctypes.c_ssize_t + + _PyBUF_SIMPLE = 0 + _PyObject_GetBuffer = ctypes.pythonapi.PyObject_GetBuffer + _PyBuffer_Release = ctypes.pythonapi.PyBuffer_Release + _py_object = ctypes.py_object + _c_ssize_p = ctypes.POINTER(_c_ssize_t) + + # See Include/object.h for CPython + # and https://github.com/pallets/click/blob/master/src/click/_winconsole.py + class _Py_buffer(ctypes.Structure): + _fields_ = [ + ('buf', c_void_p), + ('obj', ctypes.py_object), + ('len', _c_ssize_t), + ('itemsize', _c_ssize_t), + ('readonly', ctypes.c_int), + ('ndim', ctypes.c_int), + ('format', ctypes.c_char_p), + ('shape', _c_ssize_p), + ('strides', _c_ssize_p), + ('suboffsets', _c_ssize_p), + ('internal', c_void_p) + ] + + # Extra field for CPython 2.6/2.7 + if sys.version_info[0] == 2: + _fields_.insert(-1, ('smalltable', _c_ssize_t * 2)) + + def c_uint8_ptr(data): + if byte_string(data) or isinstance(data, _Array): + return data + elif isinstance(data, _buffer_type): + obj = _py_object(data) + buf = _Py_buffer() + _PyObject_GetBuffer(obj, byref(buf), _PyBUF_SIMPLE) + try: + buffer_type = ctypes.c_ubyte * buf.len + return buffer_type.from_address(buf.buf) + finally: + _PyBuffer_Release(byref(buf)) + else: + raise TypeError("Object type %s cannot be passed to C code" % type(data)) + + # --- + + class VoidPointer_ctypes(_VoidPointer): + """Model a newly allocated pointer to void""" + + def __init__(self): + self._p = c_void_p() + + def get(self): + return self._p + + def address_of(self): + return byref(self._p) + + def VoidPointer(): + return VoidPointer_ctypes() + + backend = "ctypes" + + +class SmartPointer(object): + """Class to hold a non-managed piece of memory""" + + def __init__(self, raw_pointer, destructor): + self._raw_pointer = raw_pointer + self._destructor = destructor + + def get(self): + return self._raw_pointer + + def release(self): + rp, self._raw_pointer = self._raw_pointer, None + return rp + + def __del__(self): + try: + if self._raw_pointer is not None: + self._destructor(self._raw_pointer) + self._raw_pointer = None + except AttributeError: + pass + + +def load_pycryptodome_raw_lib(name, cdecl): + """Load a shared library and return a handle to it. + + @name, the name of the library expressed as a PyCryptodome module, + for instance Crypto.Cipher._raw_cbc. + + @cdecl, the C function declarations. + """ + + split = name.split(".") + dir_comps, basename = split[:-1], split[-1] + attempts = [] + for ext in extension_suffixes: + try: + filename = basename + ext + full_name = pycryptodome_filename(dir_comps, filename) + if not os.path.isfile(full_name): + attempts.append("Not found '%s'" % filename) + continue + return load_lib(full_name, cdecl) + except OSError as exp: + attempts.append("Cannot load '%s': %s" % (filename, str(exp))) + raise OSError("Cannot load native module '%s': %s" % (name, ", ".join(attempts))) + + +def is_buffer(x): + """Return True if object x supports the buffer interface""" + return isinstance(x, (bytes, bytearray, memoryview)) + + +def is_writeable_buffer(x): + return (isinstance(x, bytearray) or + (isinstance(x, memoryview) and not x.readonly)) |