summaryrefslogtreecommitdiffstats
path: root/lib/Crypto/PublicKey
diff options
context:
space:
mode:
Diffstat (limited to 'lib/Crypto/PublicKey')
-rw-r--r--lib/Crypto/PublicKey/DSA.py682
-rw-r--r--lib/Crypto/PublicKey/DSA.pyi31
-rw-r--r--lib/Crypto/PublicKey/ECC.py1794
-rw-r--r--lib/Crypto/PublicKey/ECC.pyi66
-rw-r--r--lib/Crypto/PublicKey/ElGamal.py286
-rw-r--r--lib/Crypto/PublicKey/ElGamal.pyi18
-rw-r--r--lib/Crypto/PublicKey/RSA.py802
-rw-r--r--lib/Crypto/PublicKey/RSA.pyi51
-rw-r--r--lib/Crypto/PublicKey/__init__.py94
-rw-r--r--lib/Crypto/PublicKey/__init__.pyi0
-rwxr-xr-xlib/Crypto/PublicKey/_ec_ws.abi3.sobin0 -> 1068008 bytes
-rwxr-xr-xlib/Crypto/PublicKey/_ed25519.abi3.sobin0 -> 578280 bytes
-rwxr-xr-xlib/Crypto/PublicKey/_ed448.abi3.sobin0 -> 329424 bytes
-rw-r--r--lib/Crypto/PublicKey/_openssh.py135
-rw-r--r--lib/Crypto/PublicKey/_openssh.pyi7
-rwxr-xr-xlib/Crypto/PublicKey/_x25519.abi3.sobin0 -> 124632 bytes
16 files changed, 3966 insertions, 0 deletions
diff --git a/lib/Crypto/PublicKey/DSA.py b/lib/Crypto/PublicKey/DSA.py
new file mode 100644
index 0000000..4c7f47b
--- /dev/null
+++ b/lib/Crypto/PublicKey/DSA.py
@@ -0,0 +1,682 @@
+# -*- coding: utf-8 -*-
+#
+# PublicKey/DSA.py : DSA signature primitive
+#
+# Written in 2008 by Dwayne C. Litzenberger <dlitz@dlitz.net>
+#
+# ===================================================================
+# The contents of this file are dedicated to the public domain. To
+# the extent that dedication to the public domain is not available,
+# everyone is granted a worldwide, perpetual, royalty-free,
+# non-exclusive license to exercise all rights associated with the
+# contents of this file for any purpose whatsoever.
+# No rights are reserved.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+# ===================================================================
+
+__all__ = ['generate', 'construct', 'DsaKey', 'import_key' ]
+
+import binascii
+import struct
+import itertools
+
+from Crypto.Util.py3compat import bchr, bord, tobytes, tostr, iter_range
+
+from Crypto import Random
+from Crypto.IO import PKCS8, PEM
+from Crypto.Hash import SHA256
+from Crypto.Util.asn1 import (
+ DerObject, DerSequence,
+ DerInteger, DerObjectId,
+ DerBitString,
+ )
+
+from Crypto.Math.Numbers import Integer
+from Crypto.Math.Primality import (test_probable_prime, COMPOSITE,
+ PROBABLY_PRIME)
+
+from Crypto.PublicKey import (_expand_subject_public_key_info,
+ _create_subject_public_key_info,
+ _extract_subject_public_key_info)
+
+# ; The following ASN.1 types are relevant for DSA
+#
+# SubjectPublicKeyInfo ::= SEQUENCE {
+# algorithm AlgorithmIdentifier,
+# subjectPublicKey BIT STRING
+# }
+#
+# id-dsa ID ::= { iso(1) member-body(2) us(840) x9-57(10040) x9cm(4) 1 }
+#
+# ; See RFC3279
+# Dss-Parms ::= SEQUENCE {
+# p INTEGER,
+# q INTEGER,
+# g INTEGER
+# }
+#
+# DSAPublicKey ::= INTEGER
+#
+# DSSPrivatKey_OpenSSL ::= SEQUENCE
+# version INTEGER,
+# p INTEGER,
+# q INTEGER,
+# g INTEGER,
+# y INTEGER,
+# x INTEGER
+# }
+#
+
+class DsaKey(object):
+ r"""Class defining an actual DSA key.
+ Do not instantiate directly.
+ Use :func:`generate`, :func:`construct` or :func:`import_key` instead.
+
+ :ivar p: DSA modulus
+ :vartype p: integer
+
+ :ivar q: Order of the subgroup
+ :vartype q: integer
+
+ :ivar g: Generator
+ :vartype g: integer
+
+ :ivar y: Public key
+ :vartype y: integer
+
+ :ivar x: Private key
+ :vartype x: integer
+
+ :undocumented: exportKey, publickey
+ """
+
+ _keydata = ['y', 'g', 'p', 'q', 'x']
+
+ def __init__(self, key_dict):
+ input_set = set(key_dict.keys())
+ public_set = set(('y' , 'g', 'p', 'q'))
+ if not public_set.issubset(input_set):
+ raise ValueError("Some DSA components are missing = %s" %
+ str(public_set - input_set))
+ extra_set = input_set - public_set
+ if extra_set and extra_set != set(('x',)):
+ raise ValueError("Unknown DSA components = %s" %
+ str(extra_set - set(('x',))))
+ self._key = dict(key_dict)
+
+ def _sign(self, m, k):
+ if not self.has_private():
+ raise TypeError("DSA public key cannot be used for signing")
+ if not (1 < k < self.q):
+ raise ValueError("k is not between 2 and q-1")
+
+ x, q, p, g = [self._key[comp] for comp in ['x', 'q', 'p', 'g']]
+
+ blind_factor = Integer.random_range(min_inclusive=1,
+ max_exclusive=q)
+ inv_blind_k = (blind_factor * k).inverse(q)
+ blind_x = x * blind_factor
+
+ r = pow(g, k, p) % q # r = (g**k mod p) mod q
+ s = (inv_blind_k * (blind_factor * m + blind_x * r)) % q
+ return map(int, (r, s))
+
+ def _verify(self, m, sig):
+ r, s = sig
+ y, q, p, g = [self._key[comp] for comp in ['y', 'q', 'p', 'g']]
+ if not (0 < r < q) or not (0 < s < q):
+ return False
+ w = Integer(s).inverse(q)
+ u1 = (w * m) % q
+ u2 = (w * r) % q
+ v = (pow(g, u1, p) * pow(y, u2, p) % p) % q
+ return v == r
+
+ def has_private(self):
+ """Whether this is a DSA private key"""
+
+ return 'x' in self._key
+
+ def can_encrypt(self): # legacy
+ return False
+
+ def can_sign(self): # legacy
+ return True
+
+ def public_key(self):
+ """A matching DSA public key.
+
+ Returns:
+ a new :class:`DsaKey` object
+ """
+
+ public_components = dict((k, self._key[k]) for k in ('y', 'g', 'p', 'q'))
+ return DsaKey(public_components)
+
+ def __eq__(self, other):
+ if bool(self.has_private()) != bool(other.has_private()):
+ return False
+
+ result = True
+ for comp in self._keydata:
+ result = result and (getattr(self._key, comp, None) ==
+ getattr(other._key, comp, None))
+ return result
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ def __getstate__(self):
+ # DSA key is not pickable
+ from pickle import PicklingError
+ raise PicklingError
+
+ def domain(self):
+ """The DSA domain parameters.
+
+ Returns
+ tuple : (p,q,g)
+ """
+
+ return [int(self._key[comp]) for comp in ('p', 'q', 'g')]
+
+ def __repr__(self):
+ attrs = []
+ for k in self._keydata:
+ if k == 'p':
+ bits = Integer(self.p).size_in_bits()
+ attrs.append("p(%d)" % (bits,))
+ elif hasattr(self, k):
+ attrs.append(k)
+ if self.has_private():
+ attrs.append("private")
+ # PY3K: This is meant to be text, do not change to bytes (data)
+ return "<%s @0x%x %s>" % (self.__class__.__name__, id(self), ",".join(attrs))
+
+ def __getattr__(self, item):
+ try:
+ return int(self._key[item])
+ except KeyError:
+ raise AttributeError(item)
+
+ def export_key(self, format='PEM', pkcs8=None, passphrase=None,
+ protection=None, randfunc=None):
+ """Export this DSA key.
+
+ Args:
+ format (string):
+ The encoding for the output:
+
+ - *'PEM'* (default). ASCII as per `RFC1421`_/ `RFC1423`_.
+ - *'DER'*. Binary ASN.1 encoding.
+ - *'OpenSSH'*. ASCII one-liner as per `RFC4253`_.
+ Only suitable for public keys, not for private keys.
+
+ passphrase (string):
+ *Private keys only*. The pass phrase to protect the output.
+
+ pkcs8 (boolean):
+ *Private keys only*. If ``True`` (default), the key is encoded
+ with `PKCS#8`_. If ``False``, it is encoded in the custom
+ OpenSSL/OpenSSH container.
+
+ protection (string):
+ *Only in combination with a pass phrase*.
+ The encryption scheme to use to protect the output.
+
+ If :data:`pkcs8` takes value ``True``, this is the PKCS#8
+ algorithm to use for deriving the secret and encrypting
+ the private DSA key.
+ For a complete list of algorithms, see :mod:`Crypto.IO.PKCS8`.
+ The default is *PBKDF2WithHMAC-SHA1AndDES-EDE3-CBC*.
+
+ If :data:`pkcs8` is ``False``, the obsolete PEM encryption scheme is
+ used. It is based on MD5 for key derivation, and Triple DES for
+ encryption. Parameter :data:`protection` is then ignored.
+
+ The combination ``format='DER'`` and ``pkcs8=False`` is not allowed
+ if a passphrase is present.
+
+ randfunc (callable):
+ A function that returns random bytes.
+ By default it is :func:`Crypto.Random.get_random_bytes`.
+
+ Returns:
+ byte string : the encoded key
+
+ Raises:
+ ValueError : when the format is unknown or when you try to encrypt a private
+ key with *DER* format and OpenSSL/OpenSSH.
+
+ .. warning::
+ If you don't provide a pass phrase, the private key will be
+ exported in the clear!
+
+ .. _RFC1421: http://www.ietf.org/rfc/rfc1421.txt
+ .. _RFC1423: http://www.ietf.org/rfc/rfc1423.txt
+ .. _RFC4253: http://www.ietf.org/rfc/rfc4253.txt
+ .. _`PKCS#8`: http://www.ietf.org/rfc/rfc5208.txt
+ """
+
+ if passphrase is not None:
+ passphrase = tobytes(passphrase)
+
+ if randfunc is None:
+ randfunc = Random.get_random_bytes
+
+ if format == 'OpenSSH':
+ tup1 = [self._key[x].to_bytes() for x in ('p', 'q', 'g', 'y')]
+
+ def func(x):
+ if (bord(x[0]) & 0x80):
+ return bchr(0) + x
+ else:
+ return x
+
+ tup2 = [func(x) for x in tup1]
+ keyparts = [b'ssh-dss'] + tup2
+ keystring = b''.join(
+ [struct.pack(">I", len(kp)) + kp for kp in keyparts]
+ )
+ return b'ssh-dss ' + binascii.b2a_base64(keystring)[:-1]
+
+ # DER format is always used, even in case of PEM, which simply
+ # encodes it into BASE64.
+ params = DerSequence([self.p, self.q, self.g])
+ if self.has_private():
+ if pkcs8 is None:
+ pkcs8 = True
+ if pkcs8:
+ if not protection:
+ protection = 'PBKDF2WithHMAC-SHA1AndDES-EDE3-CBC'
+ private_key = DerInteger(self.x).encode()
+ binary_key = PKCS8.wrap(
+ private_key, oid, passphrase,
+ protection, key_params=params,
+ randfunc=randfunc
+ )
+ if passphrase:
+ key_type = 'ENCRYPTED PRIVATE'
+ else:
+ key_type = 'PRIVATE'
+ passphrase = None
+ else:
+ if format != 'PEM' and passphrase:
+ raise ValueError("DSA private key cannot be encrypted")
+ ints = [0, self.p, self.q, self.g, self.y, self.x]
+ binary_key = DerSequence(ints).encode()
+ key_type = "DSA PRIVATE"
+ else:
+ if pkcs8:
+ raise ValueError("PKCS#8 is only meaningful for private keys")
+
+ binary_key = _create_subject_public_key_info(oid,
+ DerInteger(self.y), params)
+ key_type = "PUBLIC"
+
+ if format == 'DER':
+ return binary_key
+ if format == 'PEM':
+ pem_str = PEM.encode(
+ binary_key, key_type + " KEY",
+ passphrase, randfunc
+ )
+ return tobytes(pem_str)
+ raise ValueError("Unknown key format '%s'. Cannot export the DSA key." % format)
+
+ # Backward-compatibility
+ exportKey = export_key
+ publickey = public_key
+
+ # Methods defined in PyCrypto that we don't support anymore
+
+ def sign(self, M, K):
+ raise NotImplementedError("Use module Crypto.Signature.DSS instead")
+
+ def verify(self, M, signature):
+ raise NotImplementedError("Use module Crypto.Signature.DSS instead")
+
+ def encrypt(self, plaintext, K):
+ raise NotImplementedError
+
+ def decrypt(self, ciphertext):
+ raise NotImplementedError
+
+ def blind(self, M, B):
+ raise NotImplementedError
+
+ def unblind(self, M, B):
+ raise NotImplementedError
+
+ def size(self):
+ raise NotImplementedError
+
+
+def _generate_domain(L, randfunc):
+ """Generate a new set of DSA domain parameters"""
+
+ N = { 1024:160, 2048:224, 3072:256 }.get(L)
+ if N is None:
+ raise ValueError("Invalid modulus length (%d)" % L)
+
+ outlen = SHA256.digest_size * 8
+ n = (L + outlen - 1) // outlen - 1 # ceil(L/outlen) -1
+ b_ = L - 1 - (n * outlen)
+
+ # Generate q (A.1.1.2)
+ q = Integer(4)
+ upper_bit = 1 << (N - 1)
+ while test_probable_prime(q, randfunc) != PROBABLY_PRIME:
+ seed = randfunc(64)
+ U = Integer.from_bytes(SHA256.new(seed).digest()) & (upper_bit - 1)
+ q = U | upper_bit | 1
+
+ assert(q.size_in_bits() == N)
+
+ # Generate p (A.1.1.2)
+ offset = 1
+ upper_bit = 1 << (L - 1)
+ while True:
+ V = [ SHA256.new(seed + Integer(offset + j).to_bytes()).digest()
+ for j in iter_range(n + 1) ]
+ V = [ Integer.from_bytes(v) for v in V ]
+ W = sum([V[i] * (1 << (i * outlen)) for i in iter_range(n)],
+ (V[n] & ((1 << b_) - 1)) * (1 << (n * outlen)))
+
+ X = Integer(W + upper_bit) # 2^{L-1} < X < 2^{L}
+ assert(X.size_in_bits() == L)
+
+ c = X % (q * 2)
+ p = X - (c - 1) # 2q divides (p-1)
+ if p.size_in_bits() == L and \
+ test_probable_prime(p, randfunc) == PROBABLY_PRIME:
+ break
+ offset += n + 1
+
+ # Generate g (A.2.3, index=1)
+ e = (p - 1) // q
+ for count in itertools.count(1):
+ U = seed + b"ggen" + bchr(1) + Integer(count).to_bytes()
+ W = Integer.from_bytes(SHA256.new(U).digest())
+ g = pow(W, e, p)
+ if g != 1:
+ break
+
+ return (p, q, g, seed)
+
+
+def generate(bits, randfunc=None, domain=None):
+ """Generate a new DSA key pair.
+
+ The algorithm follows Appendix A.1/A.2 and B.1 of `FIPS 186-4`_,
+ respectively for domain generation and key pair generation.
+
+ Args:
+ bits (integer):
+ Key length, or size (in bits) of the DSA modulus *p*.
+ It must be 1024, 2048 or 3072.
+
+ randfunc (callable):
+ Random number generation function; it accepts a single integer N
+ and return a string of random data N bytes long.
+ If not specified, :func:`Crypto.Random.get_random_bytes` is used.
+
+ domain (tuple):
+ The DSA domain parameters *p*, *q* and *g* as a list of 3
+ integers. Size of *p* and *q* must comply to `FIPS 186-4`_.
+ If not specified, the parameters are created anew.
+
+ Returns:
+ :class:`DsaKey` : a new DSA key object
+
+ Raises:
+ ValueError : when **bits** is too little, too big, or not a multiple of 64.
+
+ .. _FIPS 186-4: http://nvlpubs.nist.gov/nistpubs/FIPS/NIST.FIPS.186-4.pdf
+ """
+
+ if randfunc is None:
+ randfunc = Random.get_random_bytes
+
+ if domain:
+ p, q, g = map(Integer, domain)
+
+ ## Perform consistency check on domain parameters
+ # P and Q must be prime
+ fmt_error = test_probable_prime(p) == COMPOSITE
+ fmt_error |= test_probable_prime(q) == COMPOSITE
+ # Verify Lagrange's theorem for sub-group
+ fmt_error |= ((p - 1) % q) != 0
+ fmt_error |= g <= 1 or g >= p
+ fmt_error |= pow(g, q, p) != 1
+ if fmt_error:
+ raise ValueError("Invalid DSA domain parameters")
+ else:
+ p, q, g, _ = _generate_domain(bits, randfunc)
+
+ L = p.size_in_bits()
+ N = q.size_in_bits()
+
+ if L != bits:
+ raise ValueError("Mismatch between size of modulus (%d)"
+ " and 'bits' parameter (%d)" % (L, bits))
+
+ if (L, N) not in [(1024, 160), (2048, 224),
+ (2048, 256), (3072, 256)]:
+ raise ValueError("Lengths of p and q (%d, %d) are not compatible"
+ "to FIPS 186-3" % (L, N))
+
+ if not 1 < g < p:
+ raise ValueError("Incorrent DSA generator")
+
+ # B.1.1
+ c = Integer.random(exact_bits=N + 64, randfunc=randfunc)
+ x = c % (q - 1) + 1 # 1 <= x <= q-1
+ y = pow(g, x, p)
+
+ key_dict = { 'y':y, 'g':g, 'p':p, 'q':q, 'x':x }
+ return DsaKey(key_dict)
+
+
+def construct(tup, consistency_check=True):
+ """Construct a DSA key from a tuple of valid DSA components.
+
+ Args:
+ tup (tuple):
+ A tuple of long integers, with 4 or 5 items
+ in the following order:
+
+ 1. Public key (*y*).
+ 2. Sub-group generator (*g*).
+ 3. Modulus, finite field order (*p*).
+ 4. Sub-group order (*q*).
+ 5. Private key (*x*). Optional.
+
+ consistency_check (boolean):
+ If ``True``, the library will verify that the provided components
+ fulfil the main DSA properties.
+
+ Raises:
+ ValueError: when the key being imported fails the most basic DSA validity checks.
+
+ Returns:
+ :class:`DsaKey` : a DSA key object
+ """
+
+ key_dict = dict(zip(('y', 'g', 'p', 'q', 'x'), map(Integer, tup)))
+ key = DsaKey(key_dict)
+
+ fmt_error = False
+ if consistency_check:
+ # P and Q must be prime
+ fmt_error = test_probable_prime(key.p) == COMPOSITE
+ fmt_error |= test_probable_prime(key.q) == COMPOSITE
+ # Verify Lagrange's theorem for sub-group
+ fmt_error |= ((key.p - 1) % key.q) != 0
+ fmt_error |= key.g <= 1 or key.g >= key.p
+ fmt_error |= pow(key.g, key.q, key.p) != 1
+ # Public key
+ fmt_error |= key.y <= 0 or key.y >= key.p
+ if hasattr(key, 'x'):
+ fmt_error |= key.x <= 0 or key.x >= key.q
+ fmt_error |= pow(key.g, key.x, key.p) != key.y
+
+ if fmt_error:
+ raise ValueError("Invalid DSA key components")
+
+ return key
+
+
+# Dss-Parms ::= SEQUENCE {
+# p OCTET STRING,
+# q OCTET STRING,
+# g OCTET STRING
+# }
+# DSAPublicKey ::= INTEGER -- public key, y
+
+def _import_openssl_private(encoded, passphrase, params):
+ if params:
+ raise ValueError("DSA private key already comes with parameters")
+ der = DerSequence().decode(encoded, nr_elements=6, only_ints_expected=True)
+ if der[0] != 0:
+ raise ValueError("No version found")
+ tup = [der[comp] for comp in (4, 3, 1, 2, 5)]
+ return construct(tup)
+
+
+def _import_subjectPublicKeyInfo(encoded, passphrase, params):
+
+ algoid, encoded_key, emb_params = _expand_subject_public_key_info(encoded)
+ if algoid != oid:
+ raise ValueError("No DSA subjectPublicKeyInfo")
+ if params and emb_params:
+ raise ValueError("Too many DSA parameters")
+
+ y = DerInteger().decode(encoded_key).value
+ p, q, g = list(DerSequence().decode(params or emb_params))
+ tup = (y, g, p, q)
+ return construct(tup)
+
+
+def _import_x509_cert(encoded, passphrase, params):
+
+ sp_info = _extract_subject_public_key_info(encoded)
+ return _import_subjectPublicKeyInfo(sp_info, None, params)
+
+
+def _import_pkcs8(encoded, passphrase, params):
+ if params:
+ raise ValueError("PKCS#8 already includes parameters")
+ k = PKCS8.unwrap(encoded, passphrase)
+ if k[0] != oid:
+ raise ValueError("No PKCS#8 encoded DSA key")
+ x = DerInteger().decode(k[1]).value
+ p, q, g = list(DerSequence().decode(k[2]))
+ tup = (pow(g, x, p), g, p, q, x)
+ return construct(tup)
+
+
+def _import_key_der(key_data, passphrase, params):
+ """Import a DSA key (public or private half), encoded in DER form."""
+
+ decodings = (_import_openssl_private,
+ _import_subjectPublicKeyInfo,
+ _import_x509_cert,
+ _import_pkcs8)
+
+ for decoding in decodings:
+ try:
+ return decoding(key_data, passphrase, params)
+ except ValueError:
+ pass
+
+ raise ValueError("DSA key format is not supported")
+
+
+def import_key(extern_key, passphrase=None):
+ """Import a DSA key.
+
+ Args:
+ extern_key (string or byte string):
+ The DSA key to import.
+
+ The following formats are supported for a DSA **public** key:
+
+ - X.509 certificate (binary DER or PEM)
+ - X.509 ``subjectPublicKeyInfo`` (binary DER or PEM)
+ - OpenSSH (ASCII one-liner, see `RFC4253`_)
+
+ The following formats are supported for a DSA **private** key:
+
+ - `PKCS#8`_ ``PrivateKeyInfo`` or ``EncryptedPrivateKeyInfo``
+ DER SEQUENCE (binary or PEM)
+ - OpenSSL/OpenSSH custom format (binary or PEM)
+
+ For details about the PEM encoding, see `RFC1421`_/`RFC1423`_.
+
+ passphrase (string):
+ In case of an encrypted private key, this is the pass phrase
+ from which the decryption key is derived.
+
+ Encryption may be applied either at the `PKCS#8`_ or at the PEM level.
+
+ Returns:
+ :class:`DsaKey` : a DSA key object
+
+ Raises:
+ ValueError : when the given key cannot be parsed (possibly because
+ the pass phrase is wrong).
+
+ .. _RFC1421: http://www.ietf.org/rfc/rfc1421.txt
+ .. _RFC1423: http://www.ietf.org/rfc/rfc1423.txt
+ .. _RFC4253: http://www.ietf.org/rfc/rfc4253.txt
+ .. _PKCS#8: http://www.ietf.org/rfc/rfc5208.txt
+ """
+
+ extern_key = tobytes(extern_key)
+ if passphrase is not None:
+ passphrase = tobytes(passphrase)
+
+ if extern_key.startswith(b'-----'):
+ # This is probably a PEM encoded key
+ (der, marker, enc_flag) = PEM.decode(tostr(extern_key), passphrase)
+ if enc_flag:
+ passphrase = None
+ return _import_key_der(der, passphrase, None)
+
+ if extern_key.startswith(b'ssh-dss '):
+ # This is probably a public OpenSSH key
+ keystring = binascii.a2b_base64(extern_key.split(b' ')[1])
+ keyparts = []
+ while len(keystring) > 4:
+ length = struct.unpack(">I", keystring[:4])[0]
+ keyparts.append(keystring[4:4 + length])
+ keystring = keystring[4 + length:]
+ if keyparts[0] == b"ssh-dss":
+ tup = [Integer.from_bytes(keyparts[x]) for x in (4, 3, 1, 2)]
+ return construct(tup)
+
+ if len(extern_key) > 0 and bord(extern_key[0]) == 0x30:
+ # This is probably a DER encoded key
+ return _import_key_der(extern_key, passphrase, None)
+
+ raise ValueError("DSA key format is not supported")
+
+
+# Backward compatibility
+importKey = import_key
+
+#: `Object ID`_ for a DSA key.
+#:
+#: id-dsa ID ::= { iso(1) member-body(2) us(840) x9-57(10040) x9cm(4) 1 }
+#:
+#: .. _`Object ID`: http://www.alvestrand.no/objectid/1.2.840.10040.4.1.html
+oid = "1.2.840.10040.4.1"
diff --git a/lib/Crypto/PublicKey/DSA.pyi b/lib/Crypto/PublicKey/DSA.pyi
new file mode 100644
index 0000000..354ac1f
--- /dev/null
+++ b/lib/Crypto/PublicKey/DSA.pyi
@@ -0,0 +1,31 @@
+from typing import Dict, Tuple, Callable, Union, Optional
+
+__all__ = ['generate', 'construct', 'DsaKey', 'import_key' ]
+
+RNG = Callable[[int], bytes]
+
+class DsaKey(object):
+ def __init__(self, key_dict: Dict[str, int]) -> None: ...
+ def has_private(self) -> bool: ...
+ def can_encrypt(self) -> bool: ... # legacy
+ def can_sign(self) -> bool: ... # legacy
+ def public_key(self) -> DsaKey: ...
+ def __eq__(self, other: object) -> bool: ...
+ def __ne__(self, other: object) -> bool: ...
+ def __getstate__(self) -> None: ...
+ def domain(self) -> Tuple[int, int, int]: ...
+ def __repr__(self) -> str: ...
+ def __getattr__(self, item: str) -> int: ...
+ def export_key(self, format: Optional[str]="PEM", pkcs8: Optional[bool]=None, passphrase: Optional[str]=None,
+ protection: Optional[str]=None, randfunc: Optional[RNG]=None) -> bytes: ...
+ # Backward-compatibility
+ exportKey = export_key
+ publickey = public_key
+
+def generate(bits: int, randfunc: Optional[RNG]=None, domain: Optional[Tuple[int, int, int]]=None) -> DsaKey: ...
+def construct(tup: Union[Tuple[int, int, int, int], Tuple[int, int, int, int, int]], consistency_check: Optional[bool]=True) -> DsaKey: ...
+def import_key(extern_key: Union[str, bytes], passphrase: Optional[str]=None) -> DsaKey: ...
+# Backward compatibility
+importKey = import_key
+
+oid: str
diff --git a/lib/Crypto/PublicKey/ECC.py b/lib/Crypto/PublicKey/ECC.py
new file mode 100644
index 0000000..0b605c4
--- /dev/null
+++ b/lib/Crypto/PublicKey/ECC.py
@@ -0,0 +1,1794 @@
+# ===================================================================
+#
+# Copyright (c) 2015, Legrandin <helderijs@gmail.com>
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+# ===================================================================
+
+from __future__ import print_function
+
+import re
+import struct
+import binascii
+from collections import namedtuple
+
+from Crypto.Util.py3compat import bord, tobytes, tostr, bchr, is_string
+from Crypto.Util.number import bytes_to_long, long_to_bytes
+
+from Crypto.Math.Numbers import Integer
+from Crypto.Util.asn1 import (DerObjectId, DerOctetString, DerSequence,
+ DerBitString)
+
+from Crypto.Util._raw_api import (load_pycryptodome_raw_lib, VoidPointer,
+ SmartPointer, c_size_t, c_uint8_ptr,
+ c_ulonglong, null_pointer)
+
+from Crypto.PublicKey import (_expand_subject_public_key_info,
+ _create_subject_public_key_info,
+ _extract_subject_public_key_info)
+
+from Crypto.Hash import SHA512, SHAKE256
+
+from Crypto.Random import get_random_bytes
+from Crypto.Random.random import getrandbits
+
+
+_ec_lib = load_pycryptodome_raw_lib("Crypto.PublicKey._ec_ws", """
+typedef void EcContext;
+typedef void EcPoint;
+int ec_ws_new_context(EcContext **pec_ctx,
+ const uint8_t *modulus,
+ const uint8_t *b,
+ const uint8_t *order,
+ size_t len,
+ uint64_t seed);
+void ec_free_context(EcContext *ec_ctx);
+int ec_ws_new_point(EcPoint **pecp,
+ const uint8_t *x,
+ const uint8_t *y,
+ size_t len,
+ const EcContext *ec_ctx);
+void ec_ws_free_point(EcPoint *ecp);
+int ec_ws_get_xy(uint8_t *x,
+ uint8_t *y,
+ size_t len,
+ const EcPoint *ecp);
+int ec_ws_double(EcPoint *p);
+int ec_ws_add(EcPoint *ecpa, EcPoint *ecpb);
+int ec_ws_scalar(EcPoint *ecp,
+ const uint8_t *k,
+ size_t len,
+ uint64_t seed);
+int ec_ws_clone(EcPoint **pecp2, const EcPoint *ecp);
+int ec_ws_cmp(const EcPoint *ecp1, const EcPoint *ecp2);
+int ec_ws_neg(EcPoint *p);
+""")
+
+_ed25519_lib = load_pycryptodome_raw_lib("Crypto.PublicKey._ed25519", """
+typedef void Point;
+int ed25519_new_point(Point **out,
+ const uint8_t x[32],
+ const uint8_t y[32],
+ size_t modsize,
+ const void *context);
+int ed25519_clone(Point **P, const Point *Q);
+void ed25519_free_point(Point *p);
+int ed25519_cmp(const Point *p1, const Point *p2);
+int ed25519_neg(Point *p);
+int ed25519_get_xy(uint8_t *xb, uint8_t *yb, size_t modsize, Point *p);
+int ed25519_double(Point *p);
+int ed25519_add(Point *P1, const Point *P2);
+int ed25519_scalar(Point *P, uint8_t *scalar, size_t scalar_len, uint64_t seed);
+""")
+
+_ed448_lib = load_pycryptodome_raw_lib("Crypto.PublicKey._ed448", """
+typedef void EcContext;
+typedef void PointEd448;
+int ed448_new_context(EcContext **pec_ctx);
+void ed448_context(EcContext *ec_ctx);
+void ed448_free_context(EcContext *ec_ctx);
+int ed448_new_point(PointEd448 **out,
+ const uint8_t x[56],
+ const uint8_t y[56],
+ size_t len,
+ const EcContext *context);
+int ed448_clone(PointEd448 **P, const PointEd448 *Q);
+void ed448_free_point(PointEd448 *p);
+int ed448_cmp(const PointEd448 *p1, const PointEd448 *p2);
+int ed448_neg(PointEd448 *p);
+int ed448_get_xy(uint8_t *xb, uint8_t *yb, size_t len, const PointEd448 *p);
+int ed448_double(PointEd448 *p);
+int ed448_add(PointEd448 *P1, const PointEd448 *P2);
+int ed448_scalar(PointEd448 *P, const uint8_t *scalar, size_t scalar_len, uint64_t seed);
+""")
+
+
+def lib_func(ecc_obj, func_name):
+ if ecc_obj._curve.desc == "Ed25519":
+ result = getattr(_ed25519_lib, "ed25519_" + func_name)
+ elif ecc_obj._curve.desc == "Ed448":
+ result = getattr(_ed448_lib, "ed448_" + func_name)
+ else:
+ result = getattr(_ec_lib, "ec_ws_" + func_name)
+ return result
+
+#
+# _curves is a database of curve parameters. Items are indexed by their
+# human-friendly name, suchas "P-256". Each item has the following fields:
+# - p: the prime number that defines the finite field for all modulo operations
+# - b: the constant in the Short Weierstrass curve equation
+# - order: the number of elements in the group with the generator below
+# - Gx the affine coordinate X of the generator point
+# - Gy the affine coordinate Y of the generator point
+# - G the generator, as an EccPoint object
+# - modulus_bits the minimum number of bits for encoding the modulus p
+# - oid an ASCII string with the registered ASN.1 Object ID
+# - context a raw pointer to memory holding a context for all curve operations (can be NULL)
+# - desc an ASCII string describing the curve
+# - openssh the ASCII string used in OpenSSH id files for public keys on this curve
+# - name the ASCII string which is also a valid key in _curves
+
+
+_Curve = namedtuple("_Curve", "p b order Gx Gy G modulus_bits oid context desc openssh name")
+_curves = {}
+
+
+p192_names = ["p192", "NIST P-192", "P-192", "prime192v1", "secp192r1",
+ "nistp192"]
+
+
+def init_p192():
+ p = 0xfffffffffffffffffffffffffffffffeffffffffffffffff
+ b = 0x64210519e59c80e70fa7e9ab72243049feb8deecc146b9b1
+ order = 0xffffffffffffffffffffffff99def836146bc9b1b4d22831
+ Gx = 0x188da80eb03090f67cbf20eb43a18800f4ff0afd82ff1012
+ Gy = 0x07192b95ffc8da78631011ed6b24cdd573f977a11e794811
+
+ p192_modulus = long_to_bytes(p, 24)
+ p192_b = long_to_bytes(b, 24)
+ p192_order = long_to_bytes(order, 24)
+
+ ec_p192_context = VoidPointer()
+ result = _ec_lib.ec_ws_new_context(ec_p192_context.address_of(),
+ c_uint8_ptr(p192_modulus),
+ c_uint8_ptr(p192_b),
+ c_uint8_ptr(p192_order),
+ c_size_t(len(p192_modulus)),
+ c_ulonglong(getrandbits(64))
+ )
+ if result:
+ raise ImportError("Error %d initializing P-192 context" % result)
+
+ context = SmartPointer(ec_p192_context.get(), _ec_lib.ec_free_context)
+ p192 = _Curve(Integer(p),
+ Integer(b),
+ Integer(order),
+ Integer(Gx),
+ Integer(Gy),
+ None,
+ 192,
+ "1.2.840.10045.3.1.1", # ANSI X9.62 / SEC2
+ context,
+ "NIST P-192",
+ "ecdsa-sha2-nistp192",
+ "p192")
+ global p192_names
+ _curves.update(dict.fromkeys(p192_names, p192))
+
+
+init_p192()
+del init_p192
+
+
+p224_names = ["p224", "NIST P-224", "P-224", "prime224v1", "secp224r1",
+ "nistp224"]
+
+
+def init_p224():
+ p = 0xffffffffffffffffffffffffffffffff000000000000000000000001
+ b = 0xb4050a850c04b3abf54132565044b0b7d7bfd8ba270b39432355ffb4
+ order = 0xffffffffffffffffffffffffffff16a2e0b8f03e13dd29455c5c2a3d
+ Gx = 0xb70e0cbd6bb4bf7f321390b94a03c1d356c21122343280d6115c1d21
+ Gy = 0xbd376388b5f723fb4c22dfe6cd4375a05a07476444d5819985007e34
+
+ p224_modulus = long_to_bytes(p, 28)
+ p224_b = long_to_bytes(b, 28)
+ p224_order = long_to_bytes(order, 28)
+
+ ec_p224_context = VoidPointer()
+ result = _ec_lib.ec_ws_new_context(ec_p224_context.address_of(),
+ c_uint8_ptr(p224_modulus),
+ c_uint8_ptr(p224_b),
+ c_uint8_ptr(p224_order),
+ c_size_t(len(p224_modulus)),
+ c_ulonglong(getrandbits(64))
+ )
+ if result:
+ raise ImportError("Error %d initializing P-224 context" % result)
+
+ context = SmartPointer(ec_p224_context.get(), _ec_lib.ec_free_context)
+ p224 = _Curve(Integer(p),
+ Integer(b),
+ Integer(order),
+ Integer(Gx),
+ Integer(Gy),
+ None,
+ 224,
+ "1.3.132.0.33", # SEC 2
+ context,
+ "NIST P-224",
+ "ecdsa-sha2-nistp224",
+ "p224")
+ global p224_names
+ _curves.update(dict.fromkeys(p224_names, p224))
+
+
+init_p224()
+del init_p224
+
+
+p256_names = ["p256", "NIST P-256", "P-256", "prime256v1", "secp256r1",
+ "nistp256"]
+
+
+def init_p256():
+ p = 0xffffffff00000001000000000000000000000000ffffffffffffffffffffffff
+ b = 0x5ac635d8aa3a93e7b3ebbd55769886bc651d06b0cc53b0f63bce3c3e27d2604b
+ order = 0xffffffff00000000ffffffffffffffffbce6faada7179e84f3b9cac2fc632551
+ Gx = 0x6b17d1f2e12c4247f8bce6e563a440f277037d812deb33a0f4a13945d898c296
+ Gy = 0x4fe342e2fe1a7f9b8ee7eb4a7c0f9e162bce33576b315ececbb6406837bf51f5
+
+ p256_modulus = long_to_bytes(p, 32)
+ p256_b = long_to_bytes(b, 32)
+ p256_order = long_to_bytes(order, 32)
+
+ ec_p256_context = VoidPointer()
+ result = _ec_lib.ec_ws_new_context(ec_p256_context.address_of(),
+ c_uint8_ptr(p256_modulus),
+ c_uint8_ptr(p256_b),
+ c_uint8_ptr(p256_order),
+ c_size_t(len(p256_modulus)),
+ c_ulonglong(getrandbits(64))
+ )
+ if result:
+ raise ImportError("Error %d initializing P-256 context" % result)
+
+ context = SmartPointer(ec_p256_context.get(), _ec_lib.ec_free_context)
+ p256 = _Curve(Integer(p),
+ Integer(b),
+ Integer(order),
+ Integer(Gx),
+ Integer(Gy),
+ None,
+ 256,
+ "1.2.840.10045.3.1.7", # ANSI X9.62 / SEC2
+ context,
+ "NIST P-256",
+ "ecdsa-sha2-nistp256",
+ "p256")
+ global p256_names
+ _curves.update(dict.fromkeys(p256_names, p256))
+
+
+init_p256()
+del init_p256
+
+
+p384_names = ["p384", "NIST P-384", "P-384", "prime384v1", "secp384r1",
+ "nistp384"]
+
+
+def init_p384():
+ p = 0xfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffeffffffff0000000000000000ffffffff
+ b = 0xb3312fa7e23ee7e4988e056be3f82d19181d9c6efe8141120314088f5013875ac656398d8a2ed19d2a85c8edd3ec2aef
+ order = 0xffffffffffffffffffffffffffffffffffffffffffffffffc7634d81f4372ddf581a0db248b0a77aecec196accc52973
+ Gx = 0xaa87ca22be8b05378eb1c71ef320ad746e1d3b628ba79b9859f741e082542a385502f25dbf55296c3a545e3872760aB7
+ Gy = 0x3617de4a96262c6f5d9e98bf9292dc29f8f41dbd289a147ce9da3113b5f0b8c00a60b1ce1d7e819d7a431d7c90ea0e5F
+
+ p384_modulus = long_to_bytes(p, 48)
+ p384_b = long_to_bytes(b, 48)
+ p384_order = long_to_bytes(order, 48)
+
+ ec_p384_context = VoidPointer()
+ result = _ec_lib.ec_ws_new_context(ec_p384_context.address_of(),
+ c_uint8_ptr(p384_modulus),
+ c_uint8_ptr(p384_b),
+ c_uint8_ptr(p384_order),
+ c_size_t(len(p384_modulus)),
+ c_ulonglong(getrandbits(64))
+ )
+ if result:
+ raise ImportError("Error %d initializing P-384 context" % result)
+
+ context = SmartPointer(ec_p384_context.get(), _ec_lib.ec_free_context)
+ p384 = _Curve(Integer(p),
+ Integer(b),
+ Integer(order),
+ Integer(Gx),
+ Integer(Gy),
+ None,
+ 384,
+ "1.3.132.0.34", # SEC 2
+ context,
+ "NIST P-384",
+ "ecdsa-sha2-nistp384",
+ "p384")
+ global p384_names
+ _curves.update(dict.fromkeys(p384_names, p384))
+
+
+init_p384()
+del init_p384
+
+
+p521_names = ["p521", "NIST P-521", "P-521", "prime521v1", "secp521r1",
+ "nistp521"]
+
+
+def init_p521():
+ p = 0x000001ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff
+ b = 0x00000051953eb9618e1c9a1f929a21a0b68540eea2da725b99b315f3b8b489918ef109e156193951ec7e937b1652c0bd3bb1bf073573df883d2c34f1ef451fd46b503f00
+ order = 0x000001fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffa51868783bf2f966b7fcc0148f709a5d03bb5c9b8899c47aebb6fb71e91386409
+ Gx = 0x000000c6858e06b70404e9cd9e3ecb662395b4429c648139053fb521f828af606b4d3dbaa14b5e77efe75928fe1dc127a2ffa8de3348b3c1856a429bf97e7e31c2e5bd66
+ Gy = 0x0000011839296a789a3bc0045c8a5fb42c7d1bd998f54449579b446817afbd17273e662c97ee72995ef42640c550b9013fad0761353c7086a272c24088be94769fd16650
+
+ p521_modulus = long_to_bytes(p, 66)
+ p521_b = long_to_bytes(b, 66)
+ p521_order = long_to_bytes(order, 66)
+
+ ec_p521_context = VoidPointer()
+ result = _ec_lib.ec_ws_new_context(ec_p521_context.address_of(),
+ c_uint8_ptr(p521_modulus),
+ c_uint8_ptr(p521_b),
+ c_uint8_ptr(p521_order),
+ c_size_t(len(p521_modulus)),
+ c_ulonglong(getrandbits(64))
+ )
+ if result:
+ raise ImportError("Error %d initializing P-521 context" % result)
+
+ context = SmartPointer(ec_p521_context.get(), _ec_lib.ec_free_context)
+ p521 = _Curve(Integer(p),
+ Integer(b),
+ Integer(order),
+ Integer(Gx),
+ Integer(Gy),
+ None,
+ 521,
+ "1.3.132.0.35", # SEC 2
+ context,
+ "NIST P-521",
+ "ecdsa-sha2-nistp521",
+ "p521")
+ global p521_names
+ _curves.update(dict.fromkeys(p521_names, p521))
+
+
+init_p521()
+del init_p521
+
+
+ed25519_names = ["ed25519", "Ed25519"]
+
+
+def init_ed25519():
+ p = 0x7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffed # 2**255 - 19
+ order = 0x1000000000000000000000000000000014def9dea2f79cd65812631a5cf5d3ed
+ Gx = 0x216936d3cd6e53fec0a4e231fdd6dc5c692cc7609525a7b2c9562d608f25d51a
+ Gy = 0x6666666666666666666666666666666666666666666666666666666666666658
+
+ ed25519 = _Curve(Integer(p),
+ None,
+ Integer(order),
+ Integer(Gx),
+ Integer(Gy),
+ None,
+ 255,
+ "1.3.101.112", # RFC8410
+ None,
+ "Ed25519", # Used throughout; do not change
+ "ssh-ed25519",
+ "ed25519")
+ global ed25519_names
+ _curves.update(dict.fromkeys(ed25519_names, ed25519))
+
+
+init_ed25519()
+del init_ed25519
+
+
+ed448_names = ["ed448", "Ed448"]
+
+
+def init_ed448():
+ p = 0xfffffffffffffffffffffffffffffffffffffffffffffffffffffffeffffffffffffffffffffffffffffffffffffffffffffffffffffffff # 2**448 - 2**224 - 1
+ order = 0x3fffffffffffffffffffffffffffffffffffffffffffffffffffffff7cca23e9c44edb49aed63690216cc2728dc58f552378c292ab5844f3
+ Gx = 0x4f1970c66bed0ded221d15a622bf36da9e146570470f1767ea6de324a3d3a46412ae1af72ab66511433b80e18b00938e2626a82bc70cc05e
+ Gy = 0x693f46716eb6bc248876203756c9c7624bea73736ca3984087789c1e05a0c2d73ad3ff1ce67c39c4fdbd132c4ed7c8ad9808795bf230fa14
+
+ ed448_context = VoidPointer()
+ result = _ed448_lib.ed448_new_context(ed448_context.address_of())
+ if result:
+ raise ImportError("Error %d initializing Ed448 context" % result)
+
+ context = SmartPointer(ed448_context.get(), _ed448_lib.ed448_free_context)
+
+ ed448 = _Curve(Integer(p),
+ None,
+ Integer(order),
+ Integer(Gx),
+ Integer(Gy),
+ None,
+ 448,
+ "1.3.101.113", # RFC8410
+ context,
+ "Ed448", # Used throughout; do not change
+ None,
+ "ed448")
+ global ed448_names
+ _curves.update(dict.fromkeys(ed448_names, ed448))
+
+
+init_ed448()
+del init_ed448
+
+
+class UnsupportedEccFeature(ValueError):
+ pass
+
+
+class EccPoint(object):
+ """A class to model a point on an Elliptic Curve.
+
+ The class supports operators for:
+
+ * Adding two points: ``R = S + T``
+ * In-place addition: ``S += T``
+ * Negating a point: ``R = -T``
+ * Comparing two points: ``if S == T: ...`` or ``if S != T: ...``
+ * Multiplying a point by a scalar: ``R = S*k``
+ * In-place multiplication by a scalar: ``T *= k``
+
+ :ivar x: The affine X-coordinate of the ECC point
+ :vartype x: integer
+
+ :ivar y: The affine Y-coordinate of the ECC point
+ :vartype y: integer
+
+ :ivar xy: The tuple with affine X- and Y- coordinates
+ """
+
+ def __init__(self, x, y, curve="p256"):
+
+ try:
+ self._curve = _curves[curve]
+ except KeyError:
+ raise ValueError("Unknown curve name %s" % str(curve))
+ self._curve_name = curve
+
+ modulus_bytes = self.size_in_bytes()
+
+ xb = long_to_bytes(x, modulus_bytes)
+ yb = long_to_bytes(y, modulus_bytes)
+ if len(xb) != modulus_bytes or len(yb) != modulus_bytes:
+ raise ValueError("Incorrect coordinate length")
+
+ new_point = lib_func(self, "new_point")
+ free_func = lib_func(self, "free_point")
+
+ self._point = VoidPointer()
+ try:
+ context = self._curve.context.get()
+ except AttributeError:
+ context = null_pointer
+ result = new_point(self._point.address_of(),
+ c_uint8_ptr(xb),
+ c_uint8_ptr(yb),
+ c_size_t(modulus_bytes),
+ context)
+
+ if result:
+ if result == 15:
+ raise ValueError("The EC point does not belong to the curve")
+ raise ValueError("Error %d while instantiating an EC point" % result)
+
+ # Ensure that object disposal of this Python object will (eventually)
+ # free the memory allocated by the raw library for the EC point
+ self._point = SmartPointer(self._point.get(), free_func)
+
+ def set(self, point):
+ clone = lib_func(self, "clone")
+ free_func = lib_func(self, "free_point")
+
+ self._point = VoidPointer()
+ result = clone(self._point.address_of(),
+ point._point.get())
+
+ if result:
+ raise ValueError("Error %d while cloning an EC point" % result)
+
+ self._point = SmartPointer(self._point.get(), free_func)
+ return self
+
+ def __eq__(self, point):
+ cmp_func = lib_func(self, "cmp")
+ return 0 == cmp_func(self._point.get(), point._point.get())
+
+ # Only needed for Python 2
+ def __ne__(self, point):
+ return not self == point
+
+ def __neg__(self):
+ neg_func = lib_func(self, "neg")
+ np = self.copy()
+ result = neg_func(np._point.get())
+ if result:
+ raise ValueError("Error %d while inverting an EC point" % result)
+ return np
+
+ def copy(self):
+ """Return a copy of this point."""
+ x, y = self.xy
+ np = EccPoint(x, y, self._curve_name)
+ return np
+
+ def _is_eddsa(self):
+ return self._curve.name in ("ed25519", "ed448")
+
+ def is_point_at_infinity(self):
+ """``True`` if this is the *point-at-infinity*."""
+
+ if self._is_eddsa():
+ return self.x == 0
+ else:
+ return self.xy == (0, 0)
+
+ def point_at_infinity(self):
+ """Return the *point-at-infinity* for the curve."""
+
+ if self._is_eddsa():
+ return EccPoint(0, 1, self._curve_name)
+ else:
+ return EccPoint(0, 0, self._curve_name)
+
+ @property
+ def x(self):
+ return self.xy[0]
+
+ @property
+ def y(self):
+ return self.xy[1]
+
+ @property
+ def xy(self):
+ modulus_bytes = self.size_in_bytes()
+ xb = bytearray(modulus_bytes)
+ yb = bytearray(modulus_bytes)
+ get_xy = lib_func(self, "get_xy")
+ result = get_xy(c_uint8_ptr(xb),
+ c_uint8_ptr(yb),
+ c_size_t(modulus_bytes),
+ self._point.get())
+ if result:
+ raise ValueError("Error %d while encoding an EC point" % result)
+
+ return (Integer(bytes_to_long(xb)), Integer(bytes_to_long(yb)))
+
+ def size_in_bytes(self):
+ """Size of each coordinate, in bytes."""
+ return (self.size_in_bits() + 7) // 8
+
+ def size_in_bits(self):
+ """Size of each coordinate, in bits."""
+ return self._curve.modulus_bits
+
+ def double(self):
+ """Double this point (in-place operation).
+
+ Returns:
+ This same object (to enable chaining).
+ """
+
+ double_func = lib_func(self, "double")
+ result = double_func(self._point.get())
+ if result:
+ raise ValueError("Error %d while doubling an EC point" % result)
+ return self
+
+ def __iadd__(self, point):
+ """Add a second point to this one"""
+
+ add_func = lib_func(self, "add")
+ result = add_func(self._point.get(), point._point.get())
+ if result:
+ if result == 16:
+ raise ValueError("EC points are not on the same curve")
+ raise ValueError("Error %d while adding two EC points" % result)
+ return self
+
+ def __add__(self, point):
+ """Return a new point, the addition of this one and another"""
+
+ np = self.copy()
+ np += point
+ return np
+
+ def __imul__(self, scalar):
+ """Multiply this point by a scalar"""
+
+ scalar_func = lib_func(self, "scalar")
+ if scalar < 0:
+ raise ValueError("Scalar multiplication is only defined for non-negative integers")
+ sb = long_to_bytes(scalar)
+ result = scalar_func(self._point.get(),
+ c_uint8_ptr(sb),
+ c_size_t(len(sb)),
+ c_ulonglong(getrandbits(64)))
+ if result:
+ raise ValueError("Error %d during scalar multiplication" % result)
+ return self
+
+ def __mul__(self, scalar):
+ """Return a new point, the scalar product of this one"""
+
+ np = self.copy()
+ np *= scalar
+ return np
+
+ def __rmul__(self, left_hand):
+ return self.__mul__(left_hand)
+
+
+# Last piece of initialization
+p192_G = EccPoint(_curves['p192'].Gx, _curves['p192'].Gy, "p192")
+p192 = _curves['p192']._replace(G=p192_G)
+_curves.update(dict.fromkeys(p192_names, p192))
+del p192_G, p192, p192_names
+
+p224_G = EccPoint(_curves['p224'].Gx, _curves['p224'].Gy, "p224")
+p224 = _curves['p224']._replace(G=p224_G)
+_curves.update(dict.fromkeys(p224_names, p224))
+del p224_G, p224, p224_names
+
+p256_G = EccPoint(_curves['p256'].Gx, _curves['p256'].Gy, "p256")
+p256 = _curves['p256']._replace(G=p256_G)
+_curves.update(dict.fromkeys(p256_names, p256))
+del p256_G, p256, p256_names
+
+p384_G = EccPoint(_curves['p384'].Gx, _curves['p384'].Gy, "p384")
+p384 = _curves['p384']._replace(G=p384_G)
+_curves.update(dict.fromkeys(p384_names, p384))
+del p384_G, p384, p384_names
+
+p521_G = EccPoint(_curves['p521'].Gx, _curves['p521'].Gy, "p521")
+p521 = _curves['p521']._replace(G=p521_G)
+_curves.update(dict.fromkeys(p521_names, p521))
+del p521_G, p521, p521_names
+
+ed25519_G = EccPoint(_curves['Ed25519'].Gx, _curves['Ed25519'].Gy, "Ed25519")
+ed25519 = _curves['Ed25519']._replace(G=ed25519_G)
+_curves.update(dict.fromkeys(ed25519_names, ed25519))
+del ed25519_G, ed25519, ed25519_names
+
+ed448_G = EccPoint(_curves['Ed448'].Gx, _curves['Ed448'].Gy, "Ed448")
+ed448 = _curves['Ed448']._replace(G=ed448_G)
+_curves.update(dict.fromkeys(ed448_names, ed448))
+del ed448_G, ed448, ed448_names
+
+
+class EccKey(object):
+ r"""Class defining an ECC key.
+ Do not instantiate directly.
+ Use :func:`generate`, :func:`construct` or :func:`import_key` instead.
+
+ :ivar curve: The name of the curve as defined in the `ECC table`_.
+ :vartype curve: string
+
+ :ivar pointQ: an ECC point representating the public component.
+ :vartype pointQ: :class:`EccPoint`
+
+ :ivar d: A scalar that represents the private component
+ in NIST P curves. It is smaller than the
+ order of the generator point.
+ :vartype d: integer
+
+ :ivar seed: A seed that representats the private component
+ in EdDSA curves
+ (Ed25519, 32 bytes; Ed448, 57 bytes).
+ :vartype seed: bytes
+ """
+
+ def __init__(self, **kwargs):
+ """Create a new ECC key
+
+ Keywords:
+ curve : string
+ The name of the curve.
+ d : integer
+ Mandatory for a private key one NIST P curves.
+ It must be in the range ``[1..order-1]``.
+ seed : bytes
+ Mandatory for a private key on the Ed25519 (32 bytes)
+ or Ed448 (57 bytes) curve.
+ point : EccPoint
+ Mandatory for a public key. If provided for a private key,
+ the implementation will NOT check whether it matches ``d``.
+
+ Only one parameter among ``d``, ``seed`` or ``point`` may be used.
+ """
+
+ kwargs_ = dict(kwargs)
+ curve_name = kwargs_.pop("curve", None)
+ self._d = kwargs_.pop("d", None)
+ self._seed = kwargs_.pop("seed", None)
+ self._point = kwargs_.pop("point", None)
+ if curve_name is None and self._point:
+ curve_name = self._point._curve_name
+ if kwargs_:
+ raise TypeError("Unknown parameters: " + str(kwargs_))
+
+ if curve_name not in _curves:
+ raise ValueError("Unsupported curve (%s)" % curve_name)
+ self._curve = _curves[curve_name]
+ self.curve = curve_name
+
+ count = int(self._d is not None) + int(self._seed is not None)
+
+ if count == 0:
+ if self._point is None:
+ raise ValueError("At lest one between parameters 'point', 'd' or 'seed' must be specified")
+ return
+
+ if count == 2:
+ raise ValueError("Parameters d and seed are mutually exclusive")
+
+ # NIST P curves work with d, EdDSA works with seed
+
+ if not self._is_eddsa():
+ if self._seed is not None:
+ raise ValueError("Parameter 'seed' can only be used with Ed25519 or Ed448")
+ self._d = Integer(self._d)
+ if not 1 <= self._d < self._curve.order:
+ raise ValueError("Parameter d must be an integer smaller than the curve order")
+ else:
+ if self._d is not None:
+ raise ValueError("Parameter d can only be used with NIST P curves")
+ # RFC 8032, 5.1.5
+ if self._curve.name == "ed25519":
+ if len(self._seed) != 32:
+ raise ValueError("Parameter seed must be 32 bytes long for Ed25519")
+ seed_hash = SHA512.new(self._seed).digest() # h
+ self._prefix = seed_hash[32:]
+ tmp = bytearray(seed_hash[:32])
+ tmp[0] &= 0xF8
+ tmp[31] = (tmp[31] & 0x7F) | 0x40
+ # RFC 8032, 5.2.5
+ elif self._curve.name == "ed448":
+ if len(self._seed) != 57:
+ raise ValueError("Parameter seed must be 57 bytes long for Ed448")
+ seed_hash = SHAKE256.new(self._seed).read(114) # h
+ self._prefix = seed_hash[57:]
+ tmp = bytearray(seed_hash[:57])
+ tmp[0] &= 0xFC
+ tmp[55] |= 0x80
+ tmp[56] = 0
+ self._d = Integer.from_bytes(tmp, byteorder='little')
+
+ def _is_eddsa(self):
+ return self._curve.desc in ("Ed25519", "Ed448")
+
+ def __eq__(self, other):
+ if other.has_private() != self.has_private():
+ return False
+
+ return other.pointQ == self.pointQ
+
+ def __repr__(self):
+ if self.has_private():
+ if self._is_eddsa():
+ extra = ", seed=%s" % self._seed.hex()
+ else:
+ extra = ", d=%d" % int(self._d)
+ else:
+ extra = ""
+ x, y = self.pointQ.xy
+ return "EccKey(curve='%s', point_x=%d, point_y=%d%s)" % (self._curve.desc, x, y, extra)
+
+ def has_private(self):
+ """``True`` if this key can be used for making signatures or decrypting data."""
+
+ return self._d is not None
+
+ # ECDSA
+ def _sign(self, z, k):
+ assert 0 < k < self._curve.order
+
+ order = self._curve.order
+ blind = Integer.random_range(min_inclusive=1,
+ max_exclusive=order)
+
+ blind_d = self._d * blind
+ inv_blind_k = (blind * k).inverse(order)
+
+ r = (self._curve.G * k).x % order
+ s = inv_blind_k * (blind * z + blind_d * r) % order
+ return (r, s)
+
+ # ECDSA
+ def _verify(self, z, rs):
+ order = self._curve.order
+ sinv = rs[1].inverse(order)
+ point1 = self._curve.G * ((sinv * z) % order)
+ point2 = self.pointQ * ((sinv * rs[0]) % order)
+ return (point1 + point2).x == rs[0]
+
+ @property
+ def d(self):
+ if not self.has_private():
+ raise ValueError("This is not a private ECC key")
+ return self._d
+
+ @property
+ def seed(self):
+ if not self.has_private():
+ raise ValueError("This is not a private ECC key")
+ return self._seed
+
+ @property
+ def pointQ(self):
+ if self._point is None:
+ self._point = self._curve.G * self._d
+ return self._point
+
+ def public_key(self):
+ """A matching ECC public key.
+
+ Returns:
+ a new :class:`EccKey` object
+ """
+
+ return EccKey(curve=self._curve.desc, point=self.pointQ)
+
+ def _export_SEC1(self, compress):
+ if self._is_eddsa():
+ raise ValueError("SEC1 format is unsupported for EdDSA curves")
+
+ # See 2.2 in RFC5480 and 2.3.3 in SEC1
+ #
+ # The first byte is:
+ # - 0x02: compressed, only X-coordinate, Y-coordinate is even
+ # - 0x03: compressed, only X-coordinate, Y-coordinate is odd
+ # - 0x04: uncompressed, X-coordinate is followed by Y-coordinate
+ #
+ # PAI is in theory encoded as 0x00.
+
+ modulus_bytes = self.pointQ.size_in_bytes()
+
+ if compress:
+ if self.pointQ.y.is_odd():
+ first_byte = b'\x03'
+ else:
+ first_byte = b'\x02'
+ public_key = (first_byte +
+ self.pointQ.x.to_bytes(modulus_bytes))
+ else:
+ public_key = (b'\x04' +
+ self.pointQ.x.to_bytes(modulus_bytes) +
+ self.pointQ.y.to_bytes(modulus_bytes))
+ return public_key
+
+ def _export_eddsa(self):
+ x, y = self.pointQ.xy
+ if self._curve.name == "ed25519":
+ result = bytearray(y.to_bytes(32, byteorder='little'))
+ result[31] = ((x & 1) << 7) | result[31]
+ elif self._curve.name == "ed448":
+ result = bytearray(y.to_bytes(57, byteorder='little'))
+ result[56] = (x & 1) << 7
+ else:
+ raise ValueError("Not an EdDSA key to export")
+ return bytes(result)
+
+ def _export_subjectPublicKeyInfo(self, compress):
+ if self._is_eddsa():
+ oid = self._curve.oid
+ public_key = self._export_eddsa()
+ params = None
+ else:
+ oid = "1.2.840.10045.2.1" # unrestricted
+ public_key = self._export_SEC1(compress)
+ params = DerObjectId(self._curve.oid)
+
+ return _create_subject_public_key_info(oid,
+ public_key,
+ params)
+
+ def _export_rfc5915_private_der(self, include_ec_params=True):
+
+ assert self.has_private()
+
+ # ECPrivateKey ::= SEQUENCE {
+ # version INTEGER { ecPrivkeyVer1(1) } (ecPrivkeyVer1),
+ # privateKey OCTET STRING,
+ # parameters [0] ECParameters {{ NamedCurve }} OPTIONAL,
+ # publicKey [1] BIT STRING OPTIONAL
+ # }
+
+ # Public key - uncompressed form
+ modulus_bytes = self.pointQ.size_in_bytes()
+ public_key = (b'\x04' +
+ self.pointQ.x.to_bytes(modulus_bytes) +
+ self.pointQ.y.to_bytes(modulus_bytes))
+
+ seq = [1,
+ DerOctetString(self.d.to_bytes(modulus_bytes)),
+ DerObjectId(self._curve.oid, explicit=0),
+ DerBitString(public_key, explicit=1)]
+
+ if not include_ec_params:
+ del seq[2]
+
+ return DerSequence(seq).encode()
+
+ def _export_pkcs8(self, **kwargs):
+ from Crypto.IO import PKCS8
+
+ if kwargs.get('passphrase', None) is not None and 'protection' not in kwargs:
+ raise ValueError("At least the 'protection' parameter should be present")
+
+ if self._is_eddsa():
+ oid = self._curve.oid
+ private_key = DerOctetString(self._seed).encode()
+ params = None
+ else:
+ oid = "1.2.840.10045.2.1" # unrestricted
+ private_key = self._export_rfc5915_private_der(include_ec_params=False)
+ params = DerObjectId(self._curve.oid)
+
+ result = PKCS8.wrap(private_key,
+ oid,
+ key_params=params,
+ **kwargs)
+ return result
+
+ def _export_public_pem(self, compress):
+ from Crypto.IO import PEM
+
+ encoded_der = self._export_subjectPublicKeyInfo(compress)
+ return PEM.encode(encoded_der, "PUBLIC KEY")
+
+ def _export_private_pem(self, passphrase, **kwargs):
+ from Crypto.IO import PEM
+
+ encoded_der = self._export_rfc5915_private_der()
+ return PEM.encode(encoded_der, "EC PRIVATE KEY", passphrase, **kwargs)
+
+ def _export_private_clear_pkcs8_in_clear_pem(self):
+ from Crypto.IO import PEM
+
+ encoded_der = self._export_pkcs8()
+ return PEM.encode(encoded_der, "PRIVATE KEY")
+
+ def _export_private_encrypted_pkcs8_in_clear_pem(self, passphrase, **kwargs):
+ from Crypto.IO import PEM
+
+ assert passphrase
+ if 'protection' not in kwargs:
+ raise ValueError("At least the 'protection' parameter should be present")
+ encoded_der = self._export_pkcs8(passphrase=passphrase, **kwargs)
+ return PEM.encode(encoded_der, "ENCRYPTED PRIVATE KEY")
+
+ def _export_openssh(self, compress):
+ if self.has_private():
+ raise ValueError("Cannot export OpenSSH private keys")
+
+ desc = self._curve.openssh
+
+ if desc is None:
+ raise ValueError("Cannot export %s keys as OpenSSH" % self._curve.name)
+ elif desc == "ssh-ed25519":
+ public_key = self._export_eddsa()
+ comps = (tobytes(desc), tobytes(public_key))
+ else:
+ modulus_bytes = self.pointQ.size_in_bytes()
+
+ if compress:
+ first_byte = 2 + self.pointQ.y.is_odd()
+ public_key = (bchr(first_byte) +
+ self.pointQ.x.to_bytes(modulus_bytes))
+ else:
+ public_key = (b'\x04' +
+ self.pointQ.x.to_bytes(modulus_bytes) +
+ self.pointQ.y.to_bytes(modulus_bytes))
+
+ middle = desc.split("-")[2]
+ comps = (tobytes(desc), tobytes(middle), public_key)
+
+ blob = b"".join([struct.pack(">I", len(x)) + x for x in comps])
+ return desc + " " + tostr(binascii.b2a_base64(blob))
+
+ def export_key(self, **kwargs):
+ """Export this ECC key.
+
+ Args:
+ format (string):
+ The format to use for encoding the key:
+
+ - ``'DER'``. The key will be encoded in ASN.1 DER format (binary).
+ For a public key, the ASN.1 ``subjectPublicKeyInfo`` structure
+ defined in `RFC5480`_ will be used.
+ For a private key, the ASN.1 ``ECPrivateKey`` structure defined
+ in `RFC5915`_ is used instead (possibly within a PKCS#8 envelope,
+ see the ``use_pkcs8`` flag below).
+ - ``'PEM'``. The key will be encoded in a PEM_ envelope (ASCII).
+ - ``'OpenSSH'``. The key will be encoded in the OpenSSH_ format
+ (ASCII, public keys only).
+ - ``'SEC1'``. The public key (i.e., the EC point) will be encoded
+ into ``bytes`` according to Section 2.3.3 of `SEC1`_
+ (which is a subset of the older X9.62 ITU standard).
+ Only for NIST P-curves.
+ - ``'raw'``. The public key will be encoded as ``bytes``,
+ without any metadata.
+
+ * For NIST P-curves: equivalent to ``'SEC1'``.
+ * For EdDSA curves: ``bytes`` in the format defined in `RFC8032`_.
+
+ passphrase (byte string or string):
+ The passphrase to use for protecting the private key.
+
+ use_pkcs8 (boolean):
+ Only relevant for private keys.
+
+ If ``True`` (default and recommended), the `PKCS#8`_ representation
+ will be used. It must be ``True`` for EdDSA curves.
+
+ protection (string):
+ When a private key is exported with password-protection
+ and PKCS#8 (both ``DER`` and ``PEM`` formats), this parameter MUST be
+ present and be a valid algorithm supported by :mod:`Crypto.IO.PKCS8`.
+ It is recommended to use ``PBKDF2WithHMAC-SHA1AndAES128-CBC``.
+
+ compress (boolean):
+ If ``True``, the method returns a more compact representation
+ of the public key, with the X-coordinate only.
+
+ If ``False`` (default), the method returns the full public key.
+
+ This parameter is ignored for EdDSA curves, as compression is
+ mandatory.
+
+ .. warning::
+ If you don't provide a passphrase, the private key will be
+ exported in the clear!
+
+ .. note::
+ When exporting a private key with password-protection and `PKCS#8`_
+ (both ``DER`` and ``PEM`` formats), any extra parameters
+ to ``export_key()`` will be passed to :mod:`Crypto.IO.PKCS8`.
+
+ .. _PEM: http://www.ietf.org/rfc/rfc1421.txt
+ .. _`PEM encryption`: http://www.ietf.org/rfc/rfc1423.txt
+ .. _OpenSSH: http://www.openssh.com/txt/rfc5656.txt
+ .. _RFC5480: https://tools.ietf.org/html/rfc5480
+ .. _SEC1: https://www.secg.org/sec1-v2.pdf
+
+ Returns:
+ A multi-line string (for ``'PEM'`` and ``'OpenSSH'``) or
+ ``bytes`` (for ``'DER'``, ``'SEC1'``, and ``'raw'``) with the encoded key.
+ """
+
+ args = kwargs.copy()
+ ext_format = args.pop("format")
+ if ext_format not in ("PEM", "DER", "OpenSSH", "SEC1", "raw"):
+ raise ValueError("Unknown format '%s'" % ext_format)
+
+ compress = args.pop("compress", False)
+
+ if self.has_private():
+ passphrase = args.pop("passphrase", None)
+ if is_string(passphrase):
+ passphrase = tobytes(passphrase)
+ if not passphrase:
+ raise ValueError("Empty passphrase")
+ use_pkcs8 = args.pop("use_pkcs8", True)
+
+ if not use_pkcs8 and self._is_eddsa():
+ raise ValueError("'pkcs8' must be True for EdDSA curves")
+
+ if ext_format == "PEM":
+ if use_pkcs8:
+ if passphrase:
+ return self._export_private_encrypted_pkcs8_in_clear_pem(passphrase, **args)
+ else:
+ return self._export_private_clear_pkcs8_in_clear_pem()
+ else:
+ return self._export_private_pem(passphrase, **args)
+ elif ext_format == "DER":
+ # DER
+ if passphrase and not use_pkcs8:
+ raise ValueError("Private keys can only be encrpyted with DER using PKCS#8")
+ if use_pkcs8:
+ return self._export_pkcs8(passphrase=passphrase, **args)
+ else:
+ return self._export_rfc5915_private_der()
+ else:
+ raise ValueError("Private keys cannot be exported "
+ "in the '%s' format" % ext_format)
+ else: # Public key
+ if args:
+ raise ValueError("Unexpected parameters: '%s'" % args)
+ if ext_format == "PEM":
+ return self._export_public_pem(compress)
+ elif ext_format == "DER":
+ return self._export_subjectPublicKeyInfo(compress)
+ elif ext_format == "SEC1":
+ return self._export_SEC1(compress)
+ elif ext_format == "raw":
+ if self._curve.name in ('ed25519', 'ed448'):
+ return self._export_eddsa()
+ else:
+ return self._export_SEC1(compress)
+ else:
+ return self._export_openssh(compress)
+
+
+def generate(**kwargs):
+ """Generate a new private key on the given curve.
+
+ Args:
+
+ curve (string):
+ Mandatory. It must be a curve name defined in the `ECC table`_.
+
+ randfunc (callable):
+ Optional. The RNG to read randomness from.
+ If ``None``, :func:`Crypto.Random.get_random_bytes` is used.
+ """
+
+ curve_name = kwargs.pop("curve")
+ curve = _curves[curve_name]
+ randfunc = kwargs.pop("randfunc", get_random_bytes)
+ if kwargs:
+ raise TypeError("Unknown parameters: " + str(kwargs))
+
+ if _curves[curve_name].name == "ed25519":
+ seed = randfunc(32)
+ new_key = EccKey(curve=curve_name, seed=seed)
+ elif _curves[curve_name].name == "ed448":
+ seed = randfunc(57)
+ new_key = EccKey(curve=curve_name, seed=seed)
+ else:
+ d = Integer.random_range(min_inclusive=1,
+ max_exclusive=curve.order,
+ randfunc=randfunc)
+ new_key = EccKey(curve=curve_name, d=d)
+
+ return new_key
+
+
+def construct(**kwargs):
+ """Build a new ECC key (private or public) starting
+ from some base components.
+
+ In most cases, you will already have an existing key
+ which you can read in with :func:`import_key` instead
+ of this function.
+
+ Args:
+ curve (string):
+ Mandatory. The name of the elliptic curve, as defined in the `ECC table`_.
+
+ d (integer):
+ Mandatory for a private key and a NIST P-curve (e.g., P-256):
+ the integer in the range ``[1..order-1]`` that represents the key.
+
+ seed (bytes):
+ Mandatory for a private key and an EdDSA curve.
+ It must be 32 bytes for Ed25519, and 57 bytes for Ed448.
+
+ point_x (integer):
+ Mandatory for a public key: the X coordinate (affine) of the ECC point.
+
+ point_y (integer):
+ Mandatory for a public key: the Y coordinate (affine) of the ECC point.
+
+ Returns:
+ :class:`EccKey` : a new ECC key object
+ """
+
+ curve_name = kwargs["curve"]
+ curve = _curves[curve_name]
+ point_x = kwargs.pop("point_x", None)
+ point_y = kwargs.pop("point_y", None)
+
+ if "point" in kwargs:
+ raise TypeError("Unknown keyword: point")
+
+ if None not in (point_x, point_y):
+ # ValueError is raised if the point is not on the curve
+ kwargs["point"] = EccPoint(point_x, point_y, curve_name)
+
+ new_key = EccKey(**kwargs)
+
+ # Validate that the private key matches the public one
+ # because EccKey will not do that automatically
+ if new_key.has_private() and 'point' in kwargs:
+ pub_key = curve.G * new_key.d
+ if pub_key.xy != (point_x, point_y):
+ raise ValueError("Private and public ECC keys do not match")
+
+ return new_key
+
+
+def _import_public_der(ec_point, curve_oid=None, curve_name=None):
+ """Convert an encoded EC point into an EccKey object
+
+ ec_point: byte string with the EC point (SEC1-encoded)
+ curve_oid: string with the name the curve
+ curve_name: string with the OID of the curve
+
+ Either curve_id or curve_name must be specified
+
+ """
+
+ for _curve_name, curve in _curves.items():
+ if curve_oid and curve.oid == curve_oid:
+ break
+ if curve_name == _curve_name:
+ break
+ else:
+ if curve_oid:
+ raise UnsupportedEccFeature("Unsupported ECC curve (OID: %s)" % curve_oid)
+ else:
+ raise UnsupportedEccFeature("Unsupported ECC curve (%s)" % curve_name)
+
+ # See 2.2 in RFC5480 and 2.3.3 in SEC1
+ # The first byte is:
+ # - 0x02: compressed, only X-coordinate, Y-coordinate is even
+ # - 0x03: compressed, only X-coordinate, Y-coordinate is odd
+ # - 0x04: uncompressed, X-coordinate is followed by Y-coordinate
+ #
+ # PAI is in theory encoded as 0x00.
+
+ modulus_bytes = curve.p.size_in_bytes()
+ point_type = bord(ec_point[0])
+
+ # Uncompressed point
+ if point_type == 0x04:
+ if len(ec_point) != (1 + 2 * modulus_bytes):
+ raise ValueError("Incorrect EC point length")
+ x = Integer.from_bytes(ec_point[1:modulus_bytes+1])
+ y = Integer.from_bytes(ec_point[modulus_bytes+1:])
+ # Compressed point
+ elif point_type in (0x02, 0x03):
+ if len(ec_point) != (1 + modulus_bytes):
+ raise ValueError("Incorrect EC point length")
+ x = Integer.from_bytes(ec_point[1:])
+ # Right now, we only support Short Weierstrass curves
+ y = (x**3 - x*3 + curve.b).sqrt(curve.p)
+ if point_type == 0x02 and y.is_odd():
+ y = curve.p - y
+ if point_type == 0x03 and y.is_even():
+ y = curve.p - y
+ else:
+ raise ValueError("Incorrect EC point encoding")
+
+ return construct(curve=_curve_name, point_x=x, point_y=y)
+
+
+def _import_subjectPublicKeyInfo(encoded, *kwargs):
+ """Convert a subjectPublicKeyInfo into an EccKey object"""
+
+ # See RFC5480
+
+ # Parse the generic subjectPublicKeyInfo structure
+ oid, ec_point, params = _expand_subject_public_key_info(encoded)
+
+ nist_p_oids = (
+ "1.2.840.10045.2.1", # id-ecPublicKey (unrestricted)
+ "1.3.132.1.12", # id-ecDH
+ "1.3.132.1.13" # id-ecMQV
+ )
+ eddsa_oids = {
+ "1.3.101.112": ("Ed25519", _import_ed25519_public_key), # id-Ed25519
+ "1.3.101.113": ("Ed448", _import_ed448_public_key) # id-Ed448
+ }
+
+ if oid in nist_p_oids:
+ # See RFC5480
+
+ # Parameters are mandatory and encoded as ECParameters
+ # ECParameters ::= CHOICE {
+ # namedCurve OBJECT IDENTIFIER
+ # -- implicitCurve NULL
+ # -- specifiedCurve SpecifiedECDomain
+ # }
+ # implicitCurve and specifiedCurve are not supported (as per RFC)
+ if not params:
+ raise ValueError("Missing ECC parameters for ECC OID %s" % oid)
+ try:
+ curve_oid = DerObjectId().decode(params).value
+ except ValueError:
+ raise ValueError("Error decoding namedCurve")
+
+ # ECPoint ::= OCTET STRING
+ return _import_public_der(ec_point, curve_oid=curve_oid)
+
+ elif oid in eddsa_oids:
+ # See RFC8410
+ curve_name, import_eddsa_public_key = eddsa_oids[oid]
+
+ # Parameters must be absent
+ if params:
+ raise ValueError("Unexpected ECC parameters for ECC OID %s" % oid)
+
+ x, y = import_eddsa_public_key(ec_point)
+ return construct(point_x=x, point_y=y, curve=curve_name)
+ else:
+ raise UnsupportedEccFeature("Unsupported ECC OID: %s" % oid)
+
+
+def _import_rfc5915_der(encoded, passphrase, curve_oid=None):
+
+ # See RFC5915 https://tools.ietf.org/html/rfc5915
+ #
+ # ECPrivateKey ::= SEQUENCE {
+ # version INTEGER { ecPrivkeyVer1(1) } (ecPrivkeyVer1),
+ # privateKey OCTET STRING,
+ # parameters [0] ECParameters {{ NamedCurve }} OPTIONAL,
+ # publicKey [1] BIT STRING OPTIONAL
+ # }
+
+ private_key = DerSequence().decode(encoded, nr_elements=(3, 4))
+ if private_key[0] != 1:
+ raise ValueError("Incorrect ECC private key version")
+
+ try:
+ parameters = DerObjectId(explicit=0).decode(private_key[2]).value
+ if curve_oid is not None and parameters != curve_oid:
+ raise ValueError("Curve mismatch")
+ curve_oid = parameters
+ except ValueError:
+ pass
+
+ if curve_oid is None:
+ raise ValueError("No curve found")
+
+ for curve_name, curve in _curves.items():
+ if curve.oid == curve_oid:
+ break
+ else:
+ raise UnsupportedEccFeature("Unsupported ECC curve (OID: %s)" % curve_oid)
+
+ scalar_bytes = DerOctetString().decode(private_key[1]).payload
+ modulus_bytes = curve.p.size_in_bytes()
+ if len(scalar_bytes) != modulus_bytes:
+ raise ValueError("Private key is too small")
+ d = Integer.from_bytes(scalar_bytes)
+
+ # Decode public key (if any)
+ if len(private_key) == 4:
+ public_key_enc = DerBitString(explicit=1).decode(private_key[3]).value
+ public_key = _import_public_der(public_key_enc, curve_oid=curve_oid)
+ point_x = public_key.pointQ.x
+ point_y = public_key.pointQ.y
+ else:
+ point_x = point_y = None
+
+ return construct(curve=curve_name, d=d, point_x=point_x, point_y=point_y)
+
+
+def _import_pkcs8(encoded, passphrase):
+ from Crypto.IO import PKCS8
+
+ algo_oid, private_key, params = PKCS8.unwrap(encoded, passphrase)
+
+ nist_p_oids = (
+ "1.2.840.10045.2.1", # id-ecPublicKey (unrestricted)
+ "1.3.132.1.12", # id-ecDH
+ "1.3.132.1.13" # id-ecMQV
+ )
+ eddsa_oids = {
+ "1.3.101.112": "Ed25519", # id-Ed25519
+ "1.3.101.113": "Ed448", # id-Ed448
+ }
+
+ if algo_oid in nist_p_oids:
+ curve_oid = DerObjectId().decode(params).value
+ return _import_rfc5915_der(private_key, passphrase, curve_oid)
+ elif algo_oid in eddsa_oids:
+ if params is not None:
+ raise ValueError("EdDSA ECC private key must not have parameters")
+ curve_oid = None
+ seed = DerOctetString().decode(private_key).payload
+ return construct(curve=eddsa_oids[algo_oid], seed=seed)
+ else:
+ raise UnsupportedEccFeature("Unsupported ECC purpose (OID: %s)" % algo_oid)
+
+
+def _import_x509_cert(encoded, *kwargs):
+
+ sp_info = _extract_subject_public_key_info(encoded)
+ return _import_subjectPublicKeyInfo(sp_info)
+
+
+def _import_der(encoded, passphrase):
+
+ try:
+ return _import_subjectPublicKeyInfo(encoded, passphrase)
+ except UnsupportedEccFeature as err:
+ raise err
+ except (ValueError, TypeError, IndexError):
+ pass
+
+ try:
+ return _import_x509_cert(encoded, passphrase)
+ except UnsupportedEccFeature as err:
+ raise err
+ except (ValueError, TypeError, IndexError):
+ pass
+
+ try:
+ return _import_rfc5915_der(encoded, passphrase)
+ except UnsupportedEccFeature as err:
+ raise err
+ except (ValueError, TypeError, IndexError):
+ pass
+
+ try:
+ return _import_pkcs8(encoded, passphrase)
+ except UnsupportedEccFeature as err:
+ raise err
+ except (ValueError, TypeError, IndexError):
+ pass
+
+ raise ValueError("Not an ECC DER key")
+
+
+def _import_openssh_public(encoded):
+ parts = encoded.split(b' ')
+ if len(parts) not in (2, 3):
+ raise ValueError("Not an openssh public key")
+
+ try:
+ keystring = binascii.a2b_base64(parts[1])
+
+ keyparts = []
+ while len(keystring) > 4:
+ lk = struct.unpack(">I", keystring[:4])[0]
+ keyparts.append(keystring[4:4 + lk])
+ keystring = keystring[4 + lk:]
+
+ if parts[0] != keyparts[0]:
+ raise ValueError("Mismatch in openssh public key")
+
+ # NIST P curves
+ if parts[0].startswith(b"ecdsa-sha2-"):
+
+ for curve_name, curve in _curves.items():
+ if curve.openssh is None:
+ continue
+ if not curve.openssh.startswith("ecdsa-sha2"):
+ continue
+ middle = tobytes(curve.openssh.split("-")[2])
+ if keyparts[1] == middle:
+ break
+ else:
+ raise ValueError("Unsupported ECC curve: " + middle)
+
+ ecc_key = _import_public_der(keyparts[2], curve_oid=curve.oid)
+
+ # EdDSA
+ elif parts[0] == b"ssh-ed25519":
+ x, y = _import_ed25519_public_key(keyparts[1])
+ ecc_key = construct(curve="Ed25519", point_x=x, point_y=y)
+ else:
+ raise ValueError("Unsupported SSH key type: " + parts[0])
+
+ except (IndexError, TypeError, binascii.Error):
+ raise ValueError("Error parsing SSH key type: " + parts[0])
+
+ return ecc_key
+
+
+def _import_openssh_private_ecc(data, password):
+
+ from ._openssh import (import_openssh_private_generic,
+ read_bytes, read_string, check_padding)
+
+ key_type, decrypted = import_openssh_private_generic(data, password)
+
+ eddsa_keys = {
+ "ssh-ed25519": ("Ed25519", _import_ed25519_public_key, 32),
+ }
+
+ # https://datatracker.ietf.org/doc/html/draft-miller-ssh-agent-04
+ if key_type.startswith("ecdsa-sha2"):
+
+ ecdsa_curve_name, decrypted = read_string(decrypted)
+ if ecdsa_curve_name not in _curves:
+ raise UnsupportedEccFeature("Unsupported ECC curve %s" % ecdsa_curve_name)
+ curve = _curves[ecdsa_curve_name]
+ modulus_bytes = (curve.modulus_bits + 7) // 8
+
+ public_key, decrypted = read_bytes(decrypted)
+
+ if bord(public_key[0]) != 4:
+ raise ValueError("Only uncompressed OpenSSH EC keys are supported")
+ if len(public_key) != 2 * modulus_bytes + 1:
+ raise ValueError("Incorrect public key length")
+
+ point_x = Integer.from_bytes(public_key[1:1+modulus_bytes])
+ point_y = Integer.from_bytes(public_key[1+modulus_bytes:])
+
+ private_key, decrypted = read_bytes(decrypted)
+ d = Integer.from_bytes(private_key)
+
+ params = {'d': d, 'curve': ecdsa_curve_name}
+
+ elif key_type in eddsa_keys:
+
+ curve_name, import_eddsa_public_key, seed_len = eddsa_keys[key_type]
+
+ public_key, decrypted = read_bytes(decrypted)
+ point_x, point_y = import_eddsa_public_key(public_key)
+
+ private_public_key, decrypted = read_bytes(decrypted)
+ seed = private_public_key[:seed_len]
+
+ params = {'seed': seed, 'curve': curve_name}
+ else:
+ raise ValueError("Unsupport SSH agent key type:" + key_type)
+
+ _, padded = read_string(decrypted) # Comment
+ check_padding(padded)
+
+ return construct(point_x=point_x, point_y=point_y, **params)
+
+
+def _import_ed25519_public_key(encoded):
+ """Import an Ed25519 ECC public key, encoded as raw bytes as described
+ in RFC8032_.
+
+ Args:
+ encoded (bytes):
+ The Ed25519 public key to import. It must be 32 bytes long.
+
+ Returns:
+ :class:`EccKey` : a new ECC key object
+
+ Raises:
+ ValueError: when the given key cannot be parsed.
+
+ .. _RFC8032: https://datatracker.ietf.org/doc/html/rfc8032
+ """
+
+ if len(encoded) != 32:
+ raise ValueError("Incorrect length. Only Ed25519 public keys are supported.")
+
+ p = Integer(0x7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffed) # 2**255 - 19
+ d = 37095705934669439343138083508754565189542113879843219016388785533085940283555
+
+ y = bytearray(encoded)
+ x_lsb = y[31] >> 7
+ y[31] &= 0x7F
+ point_y = Integer.from_bytes(y, byteorder='little')
+ if point_y >= p:
+ raise ValueError("Invalid Ed25519 key (y)")
+ if point_y == 1:
+ return 0, 1
+
+ u = (point_y**2 - 1) % p
+ v = ((point_y**2 % p) * d + 1) % p
+ try:
+ v_inv = v.inverse(p)
+ x2 = (u * v_inv) % p
+ point_x = Integer._tonelli_shanks(x2, p)
+ if (point_x & 1) != x_lsb:
+ point_x = p - point_x
+ except ValueError:
+ raise ValueError("Invalid Ed25519 public key")
+ return point_x, point_y
+
+
+def _import_ed448_public_key(encoded):
+ """Import an Ed448 ECC public key, encoded as raw bytes as described
+ in RFC8032_.
+
+ Args:
+ encoded (bytes):
+ The Ed448 public key to import. It must be 57 bytes long.
+
+ Returns:
+ :class:`EccKey` : a new ECC key object
+
+ Raises:
+ ValueError: when the given key cannot be parsed.
+
+ .. _RFC8032: https://datatracker.ietf.org/doc/html/rfc8032
+ """
+
+ if len(encoded) != 57:
+ raise ValueError("Incorrect length. Only Ed448 public keys are supported.")
+
+ p = Integer(0xfffffffffffffffffffffffffffffffffffffffffffffffffffffffeffffffffffffffffffffffffffffffffffffffffffffffffffffffff) # 2**448 - 2**224 - 1
+ d = 0xfffffffffffffffffffffffffffffffffffffffffffffffffffffffeffffffffffffffffffffffffffffffffffffffffffffffffffff6756
+
+ y = encoded[:56]
+ x_lsb = bord(encoded[56]) >> 7
+ point_y = Integer.from_bytes(y, byteorder='little')
+ if point_y >= p:
+ raise ValueError("Invalid Ed448 key (y)")
+ if point_y == 1:
+ return 0, 1
+
+ u = (point_y**2 - 1) % p
+ v = ((point_y**2 % p) * d - 1) % p
+ try:
+ v_inv = v.inverse(p)
+ x2 = (u * v_inv) % p
+ point_x = Integer._tonelli_shanks(x2, p)
+ if (point_x & 1) != x_lsb:
+ point_x = p - point_x
+ except ValueError:
+ raise ValueError("Invalid Ed448 public key")
+ return point_x, point_y
+
+
+def import_key(encoded, passphrase=None, curve_name=None):
+ """Import an ECC key (public or private).
+
+ Args:
+ encoded (bytes or multi-line string):
+ The ECC key to import.
+ The function will try to automatically detect the right format.
+
+ Supported formats for an ECC **public** key:
+
+ * X.509 certificate: binary (DER) or ASCII (PEM).
+ * X.509 ``subjectPublicKeyInfo``: binary (DER) or ASCII (PEM).
+ * SEC1_ (or X9.62), as ``bytes``. NIST P curves only.
+ You must also provide the ``curve_name`` (with a value from the `ECC table`_)
+ * OpenSSH line, defined in RFC5656_ and RFC8709_ (ASCII).
+ This is normally the content of files like ``~/.ssh/id_ecdsa.pub``.
+
+ Supported formats for an ECC **private** key:
+
+ * A binary ``ECPrivateKey`` structure, as defined in `RFC5915`_ (DER).
+ NIST P curves only.
+ * A `PKCS#8`_ structure (or the more recent Asymmetric Key Package, RFC5958_): binary (DER) or ASCII (PEM).
+ * `OpenSSH 6.5`_ and newer versions (ASCII).
+
+ Private keys can be in the clear or password-protected.
+
+ For details about the PEM encoding, see `RFC1421`_/`RFC1423`_.
+
+ passphrase (byte string):
+ The passphrase to use for decrypting a private key.
+ Encryption may be applied protected at the PEM level (not recommended)
+ or at the PKCS#8 level (recommended).
+ This parameter is ignored if the key in input is not encrypted.
+
+ curve_name (string):
+ For a SEC1 encoding only. This is the name of the curve,
+ as defined in the `ECC table`_.
+
+ .. note::
+
+ To import EdDSA private and public keys, when encoded as raw ``bytes``, use:
+
+ * :func:`Crypto.Signature.eddsa.import_public_key`, or
+ * :func:`Crypto.Signature.eddsa.import_private_key`.
+
+ Returns:
+ :class:`EccKey` : a new ECC key object
+
+ Raises:
+ ValueError: when the given key cannot be parsed (possibly because
+ the pass phrase is wrong).
+
+ .. _RFC1421: https://datatracker.ietf.org/doc/html/rfc1421
+ .. _RFC1423: https://datatracker.ietf.org/doc/html/rfc1423
+ .. _RFC5915: https://datatracker.ietf.org/doc/html/rfc5915
+ .. _RFC5656: https://datatracker.ietf.org/doc/html/rfc5656
+ .. _RFC8709: https://datatracker.ietf.org/doc/html/rfc8709
+ .. _RFC5958: https://datatracker.ietf.org/doc/html/rfc5958
+ .. _`PKCS#8`: https://datatracker.ietf.org/doc/html/rfc5208
+ .. _`OpenSSH 6.5`: https://flak.tedunangst.com/post/new-openssh-key-format-and-bcrypt-pbkdf
+ .. _SEC1: https://www.secg.org/sec1-v2.pdf
+ """
+
+ from Crypto.IO import PEM
+
+ encoded = tobytes(encoded)
+ if passphrase is not None:
+ passphrase = tobytes(passphrase)
+
+ # PEM
+ if encoded.startswith(b'-----BEGIN OPENSSH PRIVATE KEY'):
+ text_encoded = tostr(encoded)
+ openssh_encoded, marker, enc_flag = PEM.decode(text_encoded, passphrase)
+ result = _import_openssh_private_ecc(openssh_encoded, passphrase)
+ return result
+
+ elif encoded.startswith(b'-----'):
+
+ text_encoded = tostr(encoded)
+
+ # Remove any EC PARAMETERS section
+ # Ignore its content because the curve type must be already given in the key
+ ecparams_start = "-----BEGIN EC PARAMETERS-----"
+ ecparams_end = "-----END EC PARAMETERS-----"
+ text_encoded = re.sub(ecparams_start + ".*?" + ecparams_end, "",
+ text_encoded,
+ flags=re.DOTALL)
+
+ der_encoded, marker, enc_flag = PEM.decode(text_encoded, passphrase)
+ if enc_flag:
+ passphrase = None
+ try:
+ result = _import_der(der_encoded, passphrase)
+ except UnsupportedEccFeature as uef:
+ raise uef
+ except ValueError:
+ raise ValueError("Invalid DER encoding inside the PEM file")
+ return result
+
+ # OpenSSH
+ if encoded.startswith((b'ecdsa-sha2-', b'ssh-ed25519')):
+ return _import_openssh_public(encoded)
+
+ # DER
+ if len(encoded) > 0 and bord(encoded[0]) == 0x30:
+ return _import_der(encoded, passphrase)
+
+ # SEC1
+ if len(encoded) > 0 and bord(encoded[0]) in b'\x02\x03\x04':
+ if curve_name is None:
+ raise ValueError("No curve name was provided")
+ return _import_public_der(encoded, curve_name=curve_name)
+
+ raise ValueError("ECC key format is not supported")
+
+
+if __name__ == "__main__":
+
+ import time
+
+ d = 0xc51e4753afdec1e6b6c6a5b992f43f8dd0c7a8933072708b6522468b2ffb06fd
+
+ point = _curves['p256'].G.copy()
+ count = 3000
+
+ start = time.time()
+ for x in range(count):
+ pointX = point * d
+ print("(P-256 G)", (time.time() - start) / count * 1000, "ms")
+
+ start = time.time()
+ for x in range(count):
+ pointX = pointX * d
+ print("(P-256 arbitrary point)", (time.time() - start) / count * 1000, "ms")
diff --git a/lib/Crypto/PublicKey/ECC.pyi b/lib/Crypto/PublicKey/ECC.pyi
new file mode 100644
index 0000000..89f5a13
--- /dev/null
+++ b/lib/Crypto/PublicKey/ECC.pyi
@@ -0,0 +1,66 @@
+from typing import Union, Callable, Optional, NamedTuple, List, Tuple, Dict, NamedTuple, Any
+
+from Crypto.Math.Numbers import Integer
+
+RNG = Callable[[int], bytes]
+
+class UnsupportedEccFeature(ValueError): ...
+class EccPoint(object):
+ def __init__(self, x: Union[int, Integer], y: Union[int, Integer], curve: Optional[str] = ...) -> None: ...
+ def set(self, point: EccPoint) -> EccPoint: ...
+ def __eq__(self, point: object) -> bool: ...
+ def __neg__(self) -> EccPoint: ...
+ def copy(self) -> EccPoint: ...
+ def is_point_at_infinity(self) -> bool: ...
+ def point_at_infinity(self) -> EccPoint: ...
+ @property
+ def x(self) -> int: ...
+ @property
+ def y(self) -> int: ...
+ @property
+ def xy(self) -> Tuple[int, int]: ...
+ def size_in_bytes(self) -> int: ...
+ def size_in_bits(self) -> int: ...
+ def double(self) -> EccPoint: ...
+ def __iadd__(self, point: EccPoint) -> EccPoint: ...
+ def __add__(self, point: EccPoint) -> EccPoint: ...
+ def __imul__(self, scalar: int) -> EccPoint: ...
+ def __mul__(self, scalar: int) -> EccPoint: ...
+
+class EccKey(object):
+ curve: str
+ def __init__(self, *, curve: str = ..., d: int = ..., point: EccPoint = ...) -> None: ...
+ def __eq__(self, other: object) -> bool: ...
+ def __repr__(self) -> str: ...
+ def has_private(self) -> bool: ...
+ @property
+ def d(self) -> int: ...
+ @property
+ def pointQ(self) -> EccPoint: ...
+ def public_key(self) -> EccKey: ...
+ def export_key(self, **kwargs: Union[str, bytes, bool]) -> Union[str,bytes]: ...
+
+
+_Curve = NamedTuple("_Curve", [('p', Integer),
+ ('order', Integer),
+ ('b', Integer),
+ ('Gx', Integer),
+ ('Gy', Integer),
+ ('G', EccPoint),
+ ('modulus_bits', int),
+ ('oid', str),
+ ('context', Any),
+ ('desc', str),
+ ('openssh', Union[str, None]),
+ ])
+
+_curves : Dict[str, _Curve]
+
+
+def generate(**kwargs: Union[str, RNG]) -> EccKey: ...
+def construct(**kwargs: Union[str, int]) -> EccKey: ...
+def import_key(encoded: Union[bytes, str],
+ passphrase: Optional[str]=None,
+ curve_name:Optional[str]=None) -> EccKey: ...
+def _import_ed25519_public_key(encoded: bytes) -> EccKey: ...
+def _import_ed448_public_key(encoded: bytes) -> EccKey: ...
diff --git a/lib/Crypto/PublicKey/ElGamal.py b/lib/Crypto/PublicKey/ElGamal.py
new file mode 100644
index 0000000..3b10840
--- /dev/null
+++ b/lib/Crypto/PublicKey/ElGamal.py
@@ -0,0 +1,286 @@
+#
+# ElGamal.py : ElGamal encryption/decryption and signatures
+#
+# Part of the Python Cryptography Toolkit
+#
+# Originally written by: A.M. Kuchling
+#
+# ===================================================================
+# The contents of this file are dedicated to the public domain. To
+# the extent that dedication to the public domain is not available,
+# everyone is granted a worldwide, perpetual, royalty-free,
+# non-exclusive license to exercise all rights associated with the
+# contents of this file for any purpose whatsoever.
+# No rights are reserved.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+# ===================================================================
+
+__all__ = ['generate', 'construct', 'ElGamalKey']
+
+from Crypto import Random
+from Crypto.Math.Primality import ( generate_probable_safe_prime,
+ test_probable_prime, COMPOSITE )
+from Crypto.Math.Numbers import Integer
+
+# Generate an ElGamal key with N bits
+def generate(bits, randfunc):
+ """Randomly generate a fresh, new ElGamal key.
+
+ The key will be safe for use for both encryption and signature
+ (although it should be used for **only one** purpose).
+
+ Args:
+ bits (int):
+ Key length, or size (in bits) of the modulus *p*.
+ The recommended value is 2048.
+ randfunc (callable):
+ Random number generation function; it should accept
+ a single integer *N* and return a string of random
+ *N* random bytes.
+
+ Return:
+ an :class:`ElGamalKey` object
+ """
+
+ obj=ElGamalKey()
+
+ # Generate a safe prime p
+ # See Algorithm 4.86 in Handbook of Applied Cryptography
+ obj.p = generate_probable_safe_prime(exact_bits=bits, randfunc=randfunc)
+ q = (obj.p - 1) >> 1
+
+ # Generate generator g
+ while 1:
+ # Choose a square residue; it will generate a cyclic group of order q.
+ obj.g = pow(Integer.random_range(min_inclusive=2,
+ max_exclusive=obj.p,
+ randfunc=randfunc), 2, obj.p)
+
+ # We must avoid g=2 because of Bleichenbacher's attack described
+ # in "Generating ElGamal signatures without knowning the secret key",
+ # 1996
+ if obj.g in (1, 2):
+ continue
+
+ # Discard g if it divides p-1 because of the attack described
+ # in Note 11.67 (iii) in HAC
+ if (obj.p - 1) % obj.g == 0:
+ continue
+
+ # g^{-1} must not divide p-1 because of Khadir's attack
+ # described in "Conditions of the generator for forging ElGamal
+ # signature", 2011
+ ginv = obj.g.inverse(obj.p)
+ if (obj.p - 1) % ginv == 0:
+ continue
+
+ # Found
+ break
+
+ # Generate private key x
+ obj.x = Integer.random_range(min_inclusive=2,
+ max_exclusive=obj.p-1,
+ randfunc=randfunc)
+ # Generate public key y
+ obj.y = pow(obj.g, obj.x, obj.p)
+ return obj
+
+def construct(tup):
+ r"""Construct an ElGamal key from a tuple of valid ElGamal components.
+
+ The modulus *p* must be a prime.
+ The following conditions must apply:
+
+ .. math::
+
+ \begin{align}
+ &1 < g < p-1 \\
+ &g^{p-1} = 1 \text{ mod } 1 \\
+ &1 < x < p-1 \\
+ &g^x = y \text{ mod } p
+ \end{align}
+
+ Args:
+ tup (tuple):
+ A tuple with either 3 or 4 integers,
+ in the following order:
+
+ 1. Modulus (*p*).
+ 2. Generator (*g*).
+ 3. Public key (*y*).
+ 4. Private key (*x*). Optional.
+
+ Raises:
+ ValueError: when the key being imported fails the most basic ElGamal validity checks.
+
+ Returns:
+ an :class:`ElGamalKey` object
+ """
+
+ obj=ElGamalKey()
+ if len(tup) not in [3,4]:
+ raise ValueError('argument for construct() wrong length')
+ for i in range(len(tup)):
+ field = obj._keydata[i]
+ setattr(obj, field, Integer(tup[i]))
+
+ fmt_error = test_probable_prime(obj.p) == COMPOSITE
+ fmt_error |= obj.g<=1 or obj.g>=obj.p
+ fmt_error |= pow(obj.g, obj.p-1, obj.p)!=1
+ fmt_error |= obj.y<1 or obj.y>=obj.p
+ if len(tup)==4:
+ fmt_error |= obj.x<=1 or obj.x>=obj.p
+ fmt_error |= pow(obj.g, obj.x, obj.p)!=obj.y
+
+ if fmt_error:
+ raise ValueError("Invalid ElGamal key components")
+
+ return obj
+
+class ElGamalKey(object):
+ r"""Class defining an ElGamal key.
+ Do not instantiate directly.
+ Use :func:`generate` or :func:`construct` instead.
+
+ :ivar p: Modulus
+ :vartype d: integer
+
+ :ivar g: Generator
+ :vartype e: integer
+
+ :ivar y: Public key component
+ :vartype y: integer
+
+ :ivar x: Private key component
+ :vartype x: integer
+ """
+
+ #: Dictionary of ElGamal parameters.
+ #:
+ #: A public key will only have the following entries:
+ #:
+ #: - **y**, the public key.
+ #: - **g**, the generator.
+ #: - **p**, the modulus.
+ #:
+ #: A private key will also have:
+ #:
+ #: - **x**, the private key.
+ _keydata=['p', 'g', 'y', 'x']
+
+ def __init__(self, randfunc=None):
+ if randfunc is None:
+ randfunc = Random.new().read
+ self._randfunc = randfunc
+
+ def _encrypt(self, M, K):
+ a=pow(self.g, K, self.p)
+ b=( pow(self.y, K, self.p)*M ) % self.p
+ return [int(a), int(b)]
+
+ def _decrypt(self, M):
+ if (not hasattr(self, 'x')):
+ raise TypeError('Private key not available in this object')
+ r = Integer.random_range(min_inclusive=2,
+ max_exclusive=self.p-1,
+ randfunc=self._randfunc)
+ a_blind = (pow(self.g, r, self.p) * M[0]) % self.p
+ ax=pow(a_blind, self.x, self.p)
+ plaintext_blind = (ax.inverse(self.p) * M[1] ) % self.p
+ plaintext = (plaintext_blind * pow(self.y, r, self.p)) % self.p
+ return int(plaintext)
+
+ def _sign(self, M, K):
+ if (not hasattr(self, 'x')):
+ raise TypeError('Private key not available in this object')
+ p1=self.p-1
+ K = Integer(K)
+ if (K.gcd(p1)!=1):
+ raise ValueError('Bad K value: GCD(K,p-1)!=1')
+ a=pow(self.g, K, self.p)
+ t=(Integer(M)-self.x*a) % p1
+ while t<0: t=t+p1
+ b=(t*K.inverse(p1)) % p1
+ return [int(a), int(b)]
+
+ def _verify(self, M, sig):
+ sig = [Integer(x) for x in sig]
+ if sig[0]<1 or sig[0]>self.p-1:
+ return 0
+ v1=pow(self.y, sig[0], self.p)
+ v1=(v1*pow(sig[0], sig[1], self.p)) % self.p
+ v2=pow(self.g, M, self.p)
+ if v1==v2:
+ return 1
+ return 0
+
+ def has_private(self):
+ """Whether this is an ElGamal private key"""
+
+ if hasattr(self, 'x'):
+ return 1
+ else:
+ return 0
+
+ def can_encrypt(self):
+ return True
+
+ def can_sign(self):
+ return True
+
+ def publickey(self):
+ """A matching ElGamal public key.
+
+ Returns:
+ a new :class:`ElGamalKey` object
+ """
+ return construct((self.p, self.g, self.y))
+
+ def __eq__(self, other):
+ if bool(self.has_private()) != bool(other.has_private()):
+ return False
+
+ result = True
+ for comp in self._keydata:
+ result = result and (getattr(self.key, comp, None) ==
+ getattr(other.key, comp, None))
+ return result
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ def __getstate__(self):
+ # ElGamal key is not pickable
+ from pickle import PicklingError
+ raise PicklingError
+
+ # Methods defined in PyCrypto that we don't support anymore
+
+ def sign(self, M, K):
+ raise NotImplementedError
+
+ def verify(self, M, signature):
+ raise NotImplementedError
+
+ def encrypt(self, plaintext, K):
+ raise NotImplementedError
+
+ def decrypt(self, ciphertext):
+ raise NotImplementedError
+
+ def blind(self, M, B):
+ raise NotImplementedError
+
+ def unblind(self, M, B):
+ raise NotImplementedError
+
+ def size(self):
+ raise NotImplementedError
diff --git a/lib/Crypto/PublicKey/ElGamal.pyi b/lib/Crypto/PublicKey/ElGamal.pyi
new file mode 100644
index 0000000..9048531
--- /dev/null
+++ b/lib/Crypto/PublicKey/ElGamal.pyi
@@ -0,0 +1,18 @@
+from typing import Callable, Union, Tuple, Optional
+
+__all__ = ['generate', 'construct', 'ElGamalKey']
+
+RNG = Callable[[int], bytes]
+
+def generate(bits: int, randfunc: RNG) -> ElGamalKey: ...
+def construct(tup: Union[Tuple[int, int, int], Tuple[int, int, int, int]]) -> ElGamalKey: ...
+
+class ElGamalKey(object):
+ def __init__(self, randfunc: Optional[RNG]=None) -> None: ...
+ def has_private(self) -> bool: ...
+ def can_encrypt(self) -> bool: ...
+ def can_sign(self) -> bool: ...
+ def publickey(self) -> ElGamalKey: ...
+ def __eq__(self, other: object) -> bool: ...
+ def __ne__(self, other: object) -> bool: ...
+ def __getstate__(self) -> None: ...
diff --git a/lib/Crypto/PublicKey/RSA.py b/lib/Crypto/PublicKey/RSA.py
new file mode 100644
index 0000000..0f5e589
--- /dev/null
+++ b/lib/Crypto/PublicKey/RSA.py
@@ -0,0 +1,802 @@
+# -*- coding: utf-8 -*-
+# ===================================================================
+#
+# Copyright (c) 2016, Legrandin <helderijs@gmail.com>
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+# ===================================================================
+
+__all__ = ['generate', 'construct', 'import_key',
+ 'RsaKey', 'oid']
+
+import binascii
+import struct
+
+from Crypto import Random
+from Crypto.Util.py3compat import tobytes, bord, tostr
+from Crypto.Util.asn1 import DerSequence, DerNull
+
+from Crypto.Math.Numbers import Integer
+from Crypto.Math.Primality import (test_probable_prime,
+ generate_probable_prime, COMPOSITE)
+
+from Crypto.PublicKey import (_expand_subject_public_key_info,
+ _create_subject_public_key_info,
+ _extract_subject_public_key_info)
+
+
+class RsaKey(object):
+ r"""Class defining an actual RSA key.
+ Do not instantiate directly.
+ Use :func:`generate`, :func:`construct` or :func:`import_key` instead.
+
+ :ivar n: RSA modulus
+ :vartype n: integer
+
+ :ivar e: RSA public exponent
+ :vartype e: integer
+
+ :ivar d: RSA private exponent
+ :vartype d: integer
+
+ :ivar p: First factor of the RSA modulus
+ :vartype p: integer
+
+ :ivar q: Second factor of the RSA modulus
+ :vartype q: integer
+
+ :ivar u: Chinese remainder component (:math:`p^{-1} \text{mod } q`)
+ :vartype u: integer
+
+ :undocumented: exportKey, publickey
+ """
+
+ def __init__(self, **kwargs):
+ """Build an RSA key.
+
+ :Keywords:
+ n : integer
+ The modulus.
+ e : integer
+ The public exponent.
+ d : integer
+ The private exponent. Only required for private keys.
+ p : integer
+ The first factor of the modulus. Only required for private keys.
+ q : integer
+ The second factor of the modulus. Only required for private keys.
+ u : integer
+ The CRT coefficient (inverse of p modulo q). Only required for
+ private keys.
+ """
+
+ input_set = set(kwargs.keys())
+ public_set = set(('n', 'e'))
+ private_set = public_set | set(('p', 'q', 'd', 'u'))
+ if input_set not in (private_set, public_set):
+ raise ValueError("Some RSA components are missing")
+ for component, value in kwargs.items():
+ setattr(self, "_" + component, value)
+ if input_set == private_set:
+ self._dp = self._d % (self._p - 1) # = (e⁻¹) mod (p-1)
+ self._dq = self._d % (self._q - 1) # = (e⁻¹) mod (q-1)
+
+ @property
+ def n(self):
+ return int(self._n)
+
+ @property
+ def e(self):
+ return int(self._e)
+
+ @property
+ def d(self):
+ if not self.has_private():
+ raise AttributeError("No private exponent available for public keys")
+ return int(self._d)
+
+ @property
+ def p(self):
+ if not self.has_private():
+ raise AttributeError("No CRT component 'p' available for public keys")
+ return int(self._p)
+
+ @property
+ def q(self):
+ if not self.has_private():
+ raise AttributeError("No CRT component 'q' available for public keys")
+ return int(self._q)
+
+ @property
+ def u(self):
+ if not self.has_private():
+ raise AttributeError("No CRT component 'u' available for public keys")
+ return int(self._u)
+
+ def size_in_bits(self):
+ """Size of the RSA modulus in bits"""
+ return self._n.size_in_bits()
+
+ def size_in_bytes(self):
+ """The minimal amount of bytes that can hold the RSA modulus"""
+ return (self._n.size_in_bits() - 1) // 8 + 1
+
+ def _encrypt(self, plaintext):
+ if not 0 <= plaintext < self._n:
+ raise ValueError("Plaintext too large")
+ return int(pow(Integer(plaintext), self._e, self._n))
+
+ def _decrypt(self, ciphertext):
+ if not 0 <= ciphertext < self._n:
+ raise ValueError("Ciphertext too large")
+ if not self.has_private():
+ raise TypeError("This is not a private key")
+
+ # Blinded RSA decryption (to prevent timing attacks):
+ # Step 1: Generate random secret blinding factor r,
+ # such that 0 < r < n-1
+ r = Integer.random_range(min_inclusive=1, max_exclusive=self._n)
+ # Step 2: Compute c' = c * r**e mod n
+ cp = Integer(ciphertext) * pow(r, self._e, self._n) % self._n
+ # Step 3: Compute m' = c'**d mod n (normal RSA decryption)
+ m1 = pow(cp, self._dp, self._p)
+ m2 = pow(cp, self._dq, self._q)
+ h = ((m2 - m1) * self._u) % self._q
+ mp = h * self._p + m1
+ # Step 4: Compute m = m' * (r**(-1)) mod n
+ result = (r.inverse(self._n) * mp) % self._n
+ # Verify no faults occurred
+ if ciphertext != pow(result, self._e, self._n):
+ raise ValueError("Fault detected in RSA decryption")
+ return result
+
+ def has_private(self):
+ """Whether this is an RSA private key"""
+
+ return hasattr(self, "_d")
+
+ def can_encrypt(self): # legacy
+ return True
+
+ def can_sign(self): # legacy
+ return True
+
+ def public_key(self):
+ """A matching RSA public key.
+
+ Returns:
+ a new :class:`RsaKey` object
+ """
+ return RsaKey(n=self._n, e=self._e)
+
+ def __eq__(self, other):
+ if self.has_private() != other.has_private():
+ return False
+ if self.n != other.n or self.e != other.e:
+ return False
+ if not self.has_private():
+ return True
+ return (self.d == other.d)
+
+ def __ne__(self, other):
+ return not (self == other)
+
+ def __getstate__(self):
+ # RSA key is not pickable
+ from pickle import PicklingError
+ raise PicklingError
+
+ def __repr__(self):
+ if self.has_private():
+ extra = ", d=%d, p=%d, q=%d, u=%d" % (int(self._d), int(self._p),
+ int(self._q), int(self._u))
+ else:
+ extra = ""
+ return "RsaKey(n=%d, e=%d%s)" % (int(self._n), int(self._e), extra)
+
+ def __str__(self):
+ if self.has_private():
+ key_type = "Private"
+ else:
+ key_type = "Public"
+ return "%s RSA key at 0x%X" % (key_type, id(self))
+
+ def export_key(self, format='PEM', passphrase=None, pkcs=1,
+ protection=None, randfunc=None):
+ """Export this RSA key.
+
+ Args:
+ format (string):
+ The format to use for wrapping the key:
+
+ - *'PEM'*. (*Default*) Text encoding, done according to `RFC1421`_/`RFC1423`_.
+ - *'DER'*. Binary encoding.
+ - *'OpenSSH'*. Textual encoding, done according to OpenSSH specification.
+ Only suitable for public keys (not private keys).
+
+ passphrase (string):
+ (*For private keys only*) The pass phrase used for protecting the output.
+
+ pkcs (integer):
+ (*For private keys only*) The ASN.1 structure to use for
+ serializing the key. Note that even in case of PEM
+ encoding, there is an inner ASN.1 DER structure.
+
+ With ``pkcs=1`` (*default*), the private key is encoded in a
+ simple `PKCS#1`_ structure (``RSAPrivateKey``).
+
+ With ``pkcs=8``, the private key is encoded in a `PKCS#8`_ structure
+ (``PrivateKeyInfo``).
+
+ .. note::
+ This parameter is ignored for a public key.
+ For DER and PEM, an ASN.1 DER ``SubjectPublicKeyInfo``
+ structure is always used.
+
+ protection (string):
+ (*For private keys only*)
+ The encryption scheme to use for protecting the private key.
+
+ If ``None`` (default), the behavior depends on :attr:`format`:
+
+ - For *'DER'*, the *PBKDF2WithHMAC-SHA1AndDES-EDE3-CBC*
+ scheme is used. The following operations are performed:
+
+ 1. A 16 byte Triple DES key is derived from the passphrase
+ using :func:`Crypto.Protocol.KDF.PBKDF2` with 8 bytes salt,
+ and 1 000 iterations of :mod:`Crypto.Hash.HMAC`.
+ 2. The private key is encrypted using CBC.
+ 3. The encrypted key is encoded according to PKCS#8.
+
+ - For *'PEM'*, the obsolete PEM encryption scheme is used.
+ It is based on MD5 for key derivation, and Triple DES for encryption.
+
+ Specifying a value for :attr:`protection` is only meaningful for PKCS#8
+ (that is, ``pkcs=8``) and only if a pass phrase is present too.
+
+ The supported schemes for PKCS#8 are listed in the
+ :mod:`Crypto.IO.PKCS8` module (see :attr:`wrap_algo` parameter).
+
+ randfunc (callable):
+ A function that provides random bytes. Only used for PEM encoding.
+ The default is :func:`Crypto.Random.get_random_bytes`.
+
+ Returns:
+ byte string: the encoded key
+
+ Raises:
+ ValueError:when the format is unknown or when you try to encrypt a private
+ key with *DER* format and PKCS#1.
+
+ .. warning::
+ If you don't provide a pass phrase, the private key will be
+ exported in the clear!
+
+ .. _RFC1421: http://www.ietf.org/rfc/rfc1421.txt
+ .. _RFC1423: http://www.ietf.org/rfc/rfc1423.txt
+ .. _`PKCS#1`: http://www.ietf.org/rfc/rfc3447.txt
+ .. _`PKCS#8`: http://www.ietf.org/rfc/rfc5208.txt
+ """
+
+ if passphrase is not None:
+ passphrase = tobytes(passphrase)
+
+ if randfunc is None:
+ randfunc = Random.get_random_bytes
+
+ if format == 'OpenSSH':
+ e_bytes, n_bytes = [x.to_bytes() for x in (self._e, self._n)]
+ if bord(e_bytes[0]) & 0x80:
+ e_bytes = b'\x00' + e_bytes
+ if bord(n_bytes[0]) & 0x80:
+ n_bytes = b'\x00' + n_bytes
+ keyparts = [b'ssh-rsa', e_bytes, n_bytes]
+ keystring = b''.join([struct.pack(">I", len(kp)) + kp for kp in keyparts])
+ return b'ssh-rsa ' + binascii.b2a_base64(keystring)[:-1]
+
+ # DER format is always used, even in case of PEM, which simply
+ # encodes it into BASE64.
+ if self.has_private():
+ binary_key = DerSequence([0,
+ self.n,
+ self.e,
+ self.d,
+ self.p,
+ self.q,
+ self.d % (self.p-1),
+ self.d % (self.q-1),
+ Integer(self.q).inverse(self.p)
+ ]).encode()
+ if pkcs == 1:
+ key_type = 'RSA PRIVATE KEY'
+ if format == 'DER' and passphrase:
+ raise ValueError("PKCS#1 private key cannot be encrypted")
+ else: # PKCS#8
+ from Crypto.IO import PKCS8
+
+ if format == 'PEM' and protection is None:
+ key_type = 'PRIVATE KEY'
+ binary_key = PKCS8.wrap(binary_key, oid, None,
+ key_params=DerNull())
+ else:
+ key_type = 'ENCRYPTED PRIVATE KEY'
+ if not protection:
+ protection = 'PBKDF2WithHMAC-SHA1AndDES-EDE3-CBC'
+ binary_key = PKCS8.wrap(binary_key, oid,
+ passphrase, protection,
+ key_params=DerNull())
+ passphrase = None
+ else:
+ key_type = "PUBLIC KEY"
+ binary_key = _create_subject_public_key_info(oid,
+ DerSequence([self.n,
+ self.e]),
+ DerNull()
+ )
+
+ if format == 'DER':
+ return binary_key
+ if format == 'PEM':
+ from Crypto.IO import PEM
+
+ pem_str = PEM.encode(binary_key, key_type, passphrase, randfunc)
+ return tobytes(pem_str)
+
+ raise ValueError("Unknown key format '%s'. Cannot export the RSA key." % format)
+
+ # Backward compatibility
+ exportKey = export_key
+ publickey = public_key
+
+ # Methods defined in PyCrypto that we don't support anymore
+ def sign(self, M, K):
+ raise NotImplementedError("Use module Crypto.Signature.pkcs1_15 instead")
+
+ def verify(self, M, signature):
+ raise NotImplementedError("Use module Crypto.Signature.pkcs1_15 instead")
+
+ def encrypt(self, plaintext, K):
+ raise NotImplementedError("Use module Crypto.Cipher.PKCS1_OAEP instead")
+
+ def decrypt(self, ciphertext):
+ raise NotImplementedError("Use module Crypto.Cipher.PKCS1_OAEP instead")
+
+ def blind(self, M, B):
+ raise NotImplementedError
+
+ def unblind(self, M, B):
+ raise NotImplementedError
+
+ def size(self):
+ raise NotImplementedError
+
+
+def generate(bits, randfunc=None, e=65537):
+ """Create a new RSA key pair.
+
+ The algorithm closely follows NIST `FIPS 186-4`_ in its
+ sections B.3.1 and B.3.3. The modulus is the product of
+ two non-strong probable primes.
+ Each prime passes a suitable number of Miller-Rabin tests
+ with random bases and a single Lucas test.
+
+ Args:
+ bits (integer):
+ Key length, or size (in bits) of the RSA modulus.
+ It must be at least 1024, but **2048 is recommended.**
+ The FIPS standard only defines 1024, 2048 and 3072.
+ randfunc (callable):
+ Function that returns random bytes.
+ The default is :func:`Crypto.Random.get_random_bytes`.
+ e (integer):
+ Public RSA exponent. It must be an odd positive integer.
+ It is typically a small number with very few ones in its
+ binary representation.
+ The FIPS standard requires the public exponent to be
+ at least 65537 (the default).
+
+ Returns: an RSA key object (:class:`RsaKey`, with private key).
+
+ .. _FIPS 186-4: http://nvlpubs.nist.gov/nistpubs/FIPS/NIST.FIPS.186-4.pdf
+ """
+
+ if bits < 1024:
+ raise ValueError("RSA modulus length must be >= 1024")
+ if e % 2 == 0 or e < 3:
+ raise ValueError("RSA public exponent must be a positive, odd integer larger than 2.")
+
+ if randfunc is None:
+ randfunc = Random.get_random_bytes
+
+ d = n = Integer(1)
+ e = Integer(e)
+
+ while n.size_in_bits() != bits and d < (1 << (bits // 2)):
+ # Generate the prime factors of n: p and q.
+ # By construciton, their product is always
+ # 2^{bits-1} < p*q < 2^bits.
+ size_q = bits // 2
+ size_p = bits - size_q
+
+ min_p = min_q = (Integer(1) << (2 * size_q - 1)).sqrt()
+ if size_q != size_p:
+ min_p = (Integer(1) << (2 * size_p - 1)).sqrt()
+
+ def filter_p(candidate):
+ return candidate > min_p and (candidate - 1).gcd(e) == 1
+
+ p = generate_probable_prime(exact_bits=size_p,
+ randfunc=randfunc,
+ prime_filter=filter_p)
+
+ min_distance = Integer(1) << (bits // 2 - 100)
+
+ def filter_q(candidate):
+ return (candidate > min_q and
+ (candidate - 1).gcd(e) == 1 and
+ abs(candidate - p) > min_distance)
+
+ q = generate_probable_prime(exact_bits=size_q,
+ randfunc=randfunc,
+ prime_filter=filter_q)
+
+ n = p * q
+ lcm = (p - 1).lcm(q - 1)
+ d = e.inverse(lcm)
+
+ if p > q:
+ p, q = q, p
+
+ u = p.inverse(q)
+
+ return RsaKey(n=n, e=e, d=d, p=p, q=q, u=u)
+
+
+def construct(rsa_components, consistency_check=True):
+ r"""Construct an RSA key from a tuple of valid RSA components.
+
+ The modulus **n** must be the product of two primes.
+ The public exponent **e** must be odd and larger than 1.
+
+ In case of a private key, the following equations must apply:
+
+ .. math::
+
+ \begin{align}
+ p*q &= n \\
+ e*d &\equiv 1 ( \text{mod lcm} [(p-1)(q-1)]) \\
+ p*u &\equiv 1 ( \text{mod } q)
+ \end{align}
+
+ Args:
+ rsa_components (tuple):
+ A tuple of integers, with at least 2 and no
+ more than 6 items. The items come in the following order:
+
+ 1. RSA modulus *n*.
+ 2. Public exponent *e*.
+ 3. Private exponent *d*.
+ Only required if the key is private.
+ 4. First factor of *n* (*p*).
+ Optional, but the other factor *q* must also be present.
+ 5. Second factor of *n* (*q*). Optional.
+ 6. CRT coefficient *q*, that is :math:`p^{-1} \text{mod }q`. Optional.
+
+ consistency_check (boolean):
+ If ``True``, the library will verify that the provided components
+ fulfil the main RSA properties.
+
+ Raises:
+ ValueError: when the key being imported fails the most basic RSA validity checks.
+
+ Returns: An RSA key object (:class:`RsaKey`).
+ """
+
+ class InputComps(object):
+ pass
+
+ input_comps = InputComps()
+ for (comp, value) in zip(('n', 'e', 'd', 'p', 'q', 'u'), rsa_components):
+ setattr(input_comps, comp, Integer(value))
+
+ n = input_comps.n
+ e = input_comps.e
+ if not hasattr(input_comps, 'd'):
+ key = RsaKey(n=n, e=e)
+ else:
+ d = input_comps.d
+ if hasattr(input_comps, 'q'):
+ p = input_comps.p
+ q = input_comps.q
+ else:
+ # Compute factors p and q from the private exponent d.
+ # We assume that n has no more than two factors.
+ # See 8.2.2(i) in Handbook of Applied Cryptography.
+ ktot = d * e - 1
+ # The quantity d*e-1 is a multiple of phi(n), even,
+ # and can be represented as t*2^s.
+ t = ktot
+ while t % 2 == 0:
+ t //= 2
+ # Cycle through all multiplicative inverses in Zn.
+ # The algorithm is non-deterministic, but there is a 50% chance
+ # any candidate a leads to successful factoring.
+ # See "Digitalized Signatures and Public Key Functions as Intractable
+ # as Factorization", M. Rabin, 1979
+ spotted = False
+ a = Integer(2)
+ while not spotted and a < 100:
+ k = Integer(t)
+ # Cycle through all values a^{t*2^i}=a^k
+ while k < ktot:
+ cand = pow(a, k, n)
+ # Check if a^k is a non-trivial root of unity (mod n)
+ if cand != 1 and cand != (n - 1) and pow(cand, 2, n) == 1:
+ # We have found a number such that (cand-1)(cand+1)=0 (mod n).
+ # Either of the terms divides n.
+ p = Integer(n).gcd(cand + 1)
+ spotted = True
+ break
+ k *= 2
+ # This value was not any good... let's try another!
+ a += 2
+ if not spotted:
+ raise ValueError("Unable to compute factors p and q from exponent d.")
+ # Found !
+ assert ((n % p) == 0)
+ q = n // p
+
+ if hasattr(input_comps, 'u'):
+ u = input_comps.u
+ else:
+ u = p.inverse(q)
+
+ # Build key object
+ key = RsaKey(n=n, e=e, d=d, p=p, q=q, u=u)
+
+ # Verify consistency of the key
+ if consistency_check:
+
+ # Modulus and public exponent must be coprime
+ if e <= 1 or e >= n:
+ raise ValueError("Invalid RSA public exponent")
+ if Integer(n).gcd(e) != 1:
+ raise ValueError("RSA public exponent is not coprime to modulus")
+
+ # For RSA, modulus must be odd
+ if not n & 1:
+ raise ValueError("RSA modulus is not odd")
+
+ if key.has_private():
+ # Modulus and private exponent must be coprime
+ if d <= 1 or d >= n:
+ raise ValueError("Invalid RSA private exponent")
+ if Integer(n).gcd(d) != 1:
+ raise ValueError("RSA private exponent is not coprime to modulus")
+ # Modulus must be product of 2 primes
+ if p * q != n:
+ raise ValueError("RSA factors do not match modulus")
+ if test_probable_prime(p) == COMPOSITE:
+ raise ValueError("RSA factor p is composite")
+ if test_probable_prime(q) == COMPOSITE:
+ raise ValueError("RSA factor q is composite")
+ # See Carmichael theorem
+ phi = (p - 1) * (q - 1)
+ lcm = phi // (p - 1).gcd(q - 1)
+ if (e * d % int(lcm)) != 1:
+ raise ValueError("Invalid RSA condition")
+ if hasattr(key, 'u'):
+ # CRT coefficient
+ if u <= 1 or u >= q:
+ raise ValueError("Invalid RSA component u")
+ if (p * u % q) != 1:
+ raise ValueError("Invalid RSA component u with p")
+
+ return key
+
+
+def _import_pkcs1_private(encoded, *kwargs):
+ # RSAPrivateKey ::= SEQUENCE {
+ # version Version,
+ # modulus INTEGER, -- n
+ # publicExponent INTEGER, -- e
+ # privateExponent INTEGER, -- d
+ # prime1 INTEGER, -- p
+ # prime2 INTEGER, -- q
+ # exponent1 INTEGER, -- d mod (p-1)
+ # exponent2 INTEGER, -- d mod (q-1)
+ # coefficient INTEGER -- (inverse of q) mod p
+ # }
+ #
+ # Version ::= INTEGER
+ der = DerSequence().decode(encoded, nr_elements=9, only_ints_expected=True)
+ if der[0] != 0:
+ raise ValueError("No PKCS#1 encoding of an RSA private key")
+ return construct(der[1:6] + [Integer(der[4]).inverse(der[5])])
+
+
+def _import_pkcs1_public(encoded, *kwargs):
+ # RSAPublicKey ::= SEQUENCE {
+ # modulus INTEGER, -- n
+ # publicExponent INTEGER -- e
+ # }
+ der = DerSequence().decode(encoded, nr_elements=2, only_ints_expected=True)
+ return construct(der)
+
+
+def _import_subjectPublicKeyInfo(encoded, *kwargs):
+
+ algoid, encoded_key, params = _expand_subject_public_key_info(encoded)
+ if algoid != oid or params is not None:
+ raise ValueError("No RSA subjectPublicKeyInfo")
+ return _import_pkcs1_public(encoded_key)
+
+
+def _import_x509_cert(encoded, *kwargs):
+
+ sp_info = _extract_subject_public_key_info(encoded)
+ return _import_subjectPublicKeyInfo(sp_info)
+
+
+def _import_pkcs8(encoded, passphrase):
+ from Crypto.IO import PKCS8
+
+ k = PKCS8.unwrap(encoded, passphrase)
+ if k[0] != oid:
+ raise ValueError("No PKCS#8 encoded RSA key")
+ return _import_keyDER(k[1], passphrase)
+
+
+def _import_keyDER(extern_key, passphrase):
+ """Import an RSA key (public or private half), encoded in DER form."""
+
+ decodings = (_import_pkcs1_private,
+ _import_pkcs1_public,
+ _import_subjectPublicKeyInfo,
+ _import_x509_cert,
+ _import_pkcs8)
+
+ for decoding in decodings:
+ try:
+ return decoding(extern_key, passphrase)
+ except ValueError:
+ pass
+
+ raise ValueError("RSA key format is not supported")
+
+
+def _import_openssh_private_rsa(data, password):
+
+ from ._openssh import (import_openssh_private_generic,
+ read_bytes, read_string, check_padding)
+
+ ssh_name, decrypted = import_openssh_private_generic(data, password)
+
+ if ssh_name != "ssh-rsa":
+ raise ValueError("This SSH key is not RSA")
+
+ n, decrypted = read_bytes(decrypted)
+ e, decrypted = read_bytes(decrypted)
+ d, decrypted = read_bytes(decrypted)
+ iqmp, decrypted = read_bytes(decrypted)
+ p, decrypted = read_bytes(decrypted)
+ q, decrypted = read_bytes(decrypted)
+
+ _, padded = read_string(decrypted) # Comment
+ check_padding(padded)
+
+ build = [Integer.from_bytes(x) for x in (n, e, d, q, p, iqmp)]
+ return construct(build)
+
+
+def import_key(extern_key, passphrase=None):
+ """Import an RSA key (public or private).
+
+ Args:
+ extern_key (string or byte string):
+ The RSA key to import.
+
+ The following formats are supported for an RSA **public key**:
+
+ - X.509 certificate (binary or PEM format)
+ - X.509 ``subjectPublicKeyInfo`` DER SEQUENCE (binary or PEM
+ encoding)
+ - `PKCS#1`_ ``RSAPublicKey`` DER SEQUENCE (binary or PEM encoding)
+ - An OpenSSH line (e.g. the content of ``~/.ssh/id_ecdsa``, ASCII)
+
+ The following formats are supported for an RSA **private key**:
+
+ - PKCS#1 ``RSAPrivateKey`` DER SEQUENCE (binary or PEM encoding)
+ - `PKCS#8`_ ``PrivateKeyInfo`` or ``EncryptedPrivateKeyInfo``
+ DER SEQUENCE (binary or PEM encoding)
+ - OpenSSH (text format, introduced in `OpenSSH 6.5`_)
+
+ For details about the PEM encoding, see `RFC1421`_/`RFC1423`_.
+
+ passphrase (string or byte string):
+ For private keys only, the pass phrase that encrypts the key.
+
+ Returns: An RSA key object (:class:`RsaKey`).
+
+ Raises:
+ ValueError/IndexError/TypeError:
+ When the given key cannot be parsed (possibly because the pass
+ phrase is wrong).
+
+ .. _RFC1421: http://www.ietf.org/rfc/rfc1421.txt
+ .. _RFC1423: http://www.ietf.org/rfc/rfc1423.txt
+ .. _`PKCS#1`: http://www.ietf.org/rfc/rfc3447.txt
+ .. _`PKCS#8`: http://www.ietf.org/rfc/rfc5208.txt
+ .. _`OpenSSH 6.5`: https://flak.tedunangst.com/post/new-openssh-key-format-and-bcrypt-pbkdf
+ """
+
+ from Crypto.IO import PEM
+
+ extern_key = tobytes(extern_key)
+ if passphrase is not None:
+ passphrase = tobytes(passphrase)
+
+ if extern_key.startswith(b'-----BEGIN OPENSSH PRIVATE KEY'):
+ text_encoded = tostr(extern_key)
+ openssh_encoded, marker, enc_flag = PEM.decode(text_encoded, passphrase)
+ result = _import_openssh_private_rsa(openssh_encoded, passphrase)
+ return result
+
+ if extern_key.startswith(b'-----'):
+ # This is probably a PEM encoded key.
+ (der, marker, enc_flag) = PEM.decode(tostr(extern_key), passphrase)
+ if enc_flag:
+ passphrase = None
+ return _import_keyDER(der, passphrase)
+
+ if extern_key.startswith(b'ssh-rsa '):
+ # This is probably an OpenSSH key
+ keystring = binascii.a2b_base64(extern_key.split(b' ')[1])
+ keyparts = []
+ while len(keystring) > 4:
+ length = struct.unpack(">I", keystring[:4])[0]
+ keyparts.append(keystring[4:4 + length])
+ keystring = keystring[4 + length:]
+ e = Integer.from_bytes(keyparts[1])
+ n = Integer.from_bytes(keyparts[2])
+ return construct([n, e])
+
+ if len(extern_key) > 0 and bord(extern_key[0]) == 0x30:
+ # This is probably a DER encoded key
+ return _import_keyDER(extern_key, passphrase)
+
+ raise ValueError("RSA key format is not supported")
+
+
+# Backward compatibility
+importKey = import_key
+
+#: `Object ID`_ for the RSA encryption algorithm. This OID often indicates
+#: a generic RSA key, even when such key will be actually used for digital
+#: signatures.
+#:
+#: .. _`Object ID`: http://www.alvestrand.no/objectid/1.2.840.113549.1.1.1.html
+oid = "1.2.840.113549.1.1.1"
diff --git a/lib/Crypto/PublicKey/RSA.pyi b/lib/Crypto/PublicKey/RSA.pyi
new file mode 100644
index 0000000..d436acf
--- /dev/null
+++ b/lib/Crypto/PublicKey/RSA.pyi
@@ -0,0 +1,51 @@
+from typing import Callable, Union, Tuple, Optional
+
+__all__ = ['generate', 'construct', 'import_key',
+ 'RsaKey', 'oid']
+
+RNG = Callable[[int], bytes]
+
+class RsaKey(object):
+ def __init__(self, **kwargs: int) -> None: ...
+ @property
+ def n(self) -> int: ...
+ @property
+ def e(self) -> int: ...
+ @property
+ def d(self) -> int: ...
+ @property
+ def p(self) -> int: ...
+ @property
+ def q(self) -> int: ...
+ @property
+ def u(self) -> int: ...
+ def size_in_bits(self) -> int: ...
+ def size_in_bytes(self) -> int: ...
+ def has_private(self) -> bool: ...
+ def can_encrypt(self) -> bool: ... # legacy
+ def can_sign(self) -> bool:... # legacy
+ def public_key(self) -> RsaKey: ...
+ def __eq__(self, other: object) -> bool: ...
+ def __ne__(self, other: object) -> bool: ...
+ def __getstate__(self) -> None: ...
+ def __repr__(self) -> str: ...
+ def __str__(self) -> str: ...
+ def export_key(self, format: Optional[str]="PEM", passphrase: Optional[str]=None, pkcs: Optional[int]=1,
+ protection: Optional[str]=None, randfunc: Optional[RNG]=None) -> bytes: ...
+
+ # Backward compatibility
+ exportKey = export_key
+ publickey = public_key
+
+def generate(bits: int, randfunc: Optional[RNG]=None, e: Optional[int]=65537) -> RsaKey: ...
+def construct(rsa_components: Union[Tuple[int, int], # n, e
+ Tuple[int, int, int], # n, e, d
+ Tuple[int, int, int, int, int], # n, e, d, p, q
+ Tuple[int, int, int, int, int, int]], # n, e, d, p, q, crt_q
+ consistency_check: Optional[bool]=True) -> RsaKey: ...
+def import_key(extern_key: Union[str, bytes], passphrase: Optional[str]=None) -> RsaKey: ...
+
+# Backward compatibility
+importKey = import_key
+
+oid: str
diff --git a/lib/Crypto/PublicKey/__init__.py b/lib/Crypto/PublicKey/__init__.py
new file mode 100644
index 0000000..cf3a238
--- /dev/null
+++ b/lib/Crypto/PublicKey/__init__.py
@@ -0,0 +1,94 @@
+# -*- coding: utf-8 -*-
+#
+# ===================================================================
+# The contents of this file are dedicated to the public domain. To
+# the extent that dedication to the public domain is not available,
+# everyone is granted a worldwide, perpetual, royalty-free,
+# non-exclusive license to exercise all rights associated with the
+# contents of this file for any purpose whatsoever.
+# No rights are reserved.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+# ===================================================================
+
+from Crypto.Util.asn1 import (DerSequence, DerInteger, DerBitString,
+ DerObjectId, DerNull)
+
+
+def _expand_subject_public_key_info(encoded):
+ """Parse a SubjectPublicKeyInfo structure.
+
+ It returns a triple with:
+ * OID (string)
+ * encoded public key (bytes)
+ * Algorithm parameters (bytes or None)
+ """
+
+ #
+ # SubjectPublicKeyInfo ::= SEQUENCE {
+ # algorithm AlgorithmIdentifier,
+ # subjectPublicKey BIT STRING
+ # }
+ #
+ # AlgorithmIdentifier ::= SEQUENCE {
+ # algorithm OBJECT IDENTIFIER,
+ # parameters ANY DEFINED BY algorithm OPTIONAL
+ # }
+ #
+
+ spki = DerSequence().decode(encoded, nr_elements=2)
+ algo = DerSequence().decode(spki[0], nr_elements=(1,2))
+ algo_oid = DerObjectId().decode(algo[0])
+ spk = DerBitString().decode(spki[1]).value
+
+ if len(algo) == 1:
+ algo_params = None
+ else:
+ try:
+ DerNull().decode(algo[1])
+ algo_params = None
+ except:
+ algo_params = algo[1]
+
+ return algo_oid.value, spk, algo_params
+
+
+def _create_subject_public_key_info(algo_oid, public_key, params):
+
+ if params is None:
+ algorithm = DerSequence([DerObjectId(algo_oid)])
+ else:
+ algorithm = DerSequence([DerObjectId(algo_oid), params])
+
+ spki = DerSequence([algorithm,
+ DerBitString(public_key)
+ ])
+ return spki.encode()
+
+
+def _extract_subject_public_key_info(x509_certificate):
+ """Extract subjectPublicKeyInfo from a DER X.509 certificate."""
+
+ certificate = DerSequence().decode(x509_certificate, nr_elements=3)
+ tbs_certificate = DerSequence().decode(certificate[0],
+ nr_elements=range(6, 11))
+
+ index = 5
+ try:
+ tbs_certificate[0] + 1
+ # Version not present
+ version = 1
+ except TypeError:
+ version = DerInteger(explicit=0).decode(tbs_certificate[0]).value
+ if version not in (2, 3):
+ raise ValueError("Incorrect X.509 certificate version")
+ index = 6
+
+ return tbs_certificate[index]
diff --git a/lib/Crypto/PublicKey/__init__.pyi b/lib/Crypto/PublicKey/__init__.pyi
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/lib/Crypto/PublicKey/__init__.pyi
diff --git a/lib/Crypto/PublicKey/_ec_ws.abi3.so b/lib/Crypto/PublicKey/_ec_ws.abi3.so
new file mode 100755
index 0000000..46ec795
--- /dev/null
+++ b/lib/Crypto/PublicKey/_ec_ws.abi3.so
Binary files differ
diff --git a/lib/Crypto/PublicKey/_ed25519.abi3.so b/lib/Crypto/PublicKey/_ed25519.abi3.so
new file mode 100755
index 0000000..891ec44
--- /dev/null
+++ b/lib/Crypto/PublicKey/_ed25519.abi3.so
Binary files differ
diff --git a/lib/Crypto/PublicKey/_ed448.abi3.so b/lib/Crypto/PublicKey/_ed448.abi3.so
new file mode 100755
index 0000000..a3ddd4b
--- /dev/null
+++ b/lib/Crypto/PublicKey/_ed448.abi3.so
Binary files differ
diff --git a/lib/Crypto/PublicKey/_openssh.py b/lib/Crypto/PublicKey/_openssh.py
new file mode 100644
index 0000000..88dacfc
--- /dev/null
+++ b/lib/Crypto/PublicKey/_openssh.py
@@ -0,0 +1,135 @@
+# ===================================================================
+#
+# Copyright (c) 2019, Helder Eijs <helderijs@gmail.com>
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+# ===================================================================
+
+import struct
+
+from Crypto.Cipher import AES
+from Crypto.Hash import SHA512
+from Crypto.Protocol.KDF import _bcrypt_hash
+from Crypto.Util.strxor import strxor
+from Crypto.Util.py3compat import tostr, bchr, bord
+
+
+def read_int4(data):
+ if len(data) < 4:
+ raise ValueError("Insufficient data")
+ value = struct.unpack(">I", data[:4])[0]
+ return value, data[4:]
+
+
+def read_bytes(data):
+ size, data = read_int4(data)
+ if len(data) < size:
+ raise ValueError("Insufficient data (V)")
+ return data[:size], data[size:]
+
+
+def read_string(data):
+ s, d = read_bytes(data)
+ return tostr(s), d
+
+
+def check_padding(pad):
+ for v, x in enumerate(pad):
+ if bord(x) != ((v + 1) & 0xFF):
+ raise ValueError("Incorrect padding")
+
+
+def import_openssh_private_generic(data, password):
+ # https://cvsweb.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/PROTOCOL.key?annotate=HEAD
+ # https://github.com/openssh/openssh-portable/blob/master/sshkey.c
+ # https://coolaj86.com/articles/the-openssh-private-key-format/
+ # https://coolaj86.com/articles/the-ssh-public-key-format/
+
+ if not data.startswith(b'openssh-key-v1\x00'):
+ raise ValueError("Incorrect magic value")
+ data = data[15:]
+
+ ciphername, data = read_string(data)
+ kdfname, data = read_string(data)
+ kdfoptions, data = read_bytes(data)
+ number_of_keys, data = read_int4(data)
+
+ if number_of_keys != 1:
+ raise ValueError("We only handle 1 key at a time")
+
+ _, data = read_string(data) # Public key
+ encrypted, data = read_bytes(data)
+ if data:
+ raise ValueError("Too much data")
+
+ if len(encrypted) % 8 != 0:
+ raise ValueError("Incorrect payload length")
+
+ # Decrypt if necessary
+ if ciphername == 'none':
+ decrypted = encrypted
+ else:
+ if (ciphername, kdfname) != ('aes256-ctr', 'bcrypt'):
+ raise ValueError("Unsupported encryption scheme %s/%s" % (ciphername, kdfname))
+
+ salt, kdfoptions = read_bytes(kdfoptions)
+ iterations, kdfoptions = read_int4(kdfoptions)
+
+ if len(salt) != 16:
+ raise ValueError("Incorrect salt length")
+ if kdfoptions:
+ raise ValueError("Too much data in kdfoptions")
+
+ pwd_sha512 = SHA512.new(password).digest()
+ # We need 32+16 = 48 bytes, therefore 2 bcrypt outputs are sufficient
+ stripes = []
+ constant = b"OxychromaticBlowfishSwatDynamite"
+ for count in range(1, 3):
+ salt_sha512 = SHA512.new(salt + struct.pack(">I", count)).digest()
+ out_le = _bcrypt_hash(pwd_sha512, 6, salt_sha512, constant, False)
+ out = struct.pack("<IIIIIIII", *struct.unpack(">IIIIIIII", out_le))
+ acc = bytearray(out)
+ for _ in range(1, iterations):
+ out_le = _bcrypt_hash(pwd_sha512, 6, SHA512.new(out).digest(), constant, False)
+ out = struct.pack("<IIIIIIII", *struct.unpack(">IIIIIIII", out_le))
+ strxor(acc, out, output=acc)
+ stripes.append(acc[:24])
+
+ result = b"".join([bchr(a)+bchr(b) for (a, b) in zip(*stripes)])
+
+ cipher = AES.new(result[:32],
+ AES.MODE_CTR,
+ nonce=b"",
+ initial_value=result[32:32+16])
+ decrypted = cipher.decrypt(encrypted)
+
+ checkint1, decrypted = read_int4(decrypted)
+ checkint2, decrypted = read_int4(decrypted)
+ if checkint1 != checkint2:
+ raise ValueError("Incorrect checksum")
+ ssh_name, decrypted = read_string(decrypted)
+
+ return ssh_name, decrypted
diff --git a/lib/Crypto/PublicKey/_openssh.pyi b/lib/Crypto/PublicKey/_openssh.pyi
new file mode 100644
index 0000000..15f3677
--- /dev/null
+++ b/lib/Crypto/PublicKey/_openssh.pyi
@@ -0,0 +1,7 @@
+from typing import Tuple
+
+def read_int4(data: bytes) -> Tuple[int, bytes]: ...
+def read_bytes(data: bytes) -> Tuple[bytes, bytes]: ...
+def read_string(data: bytes) -> Tuple[str, bytes]: ...
+def check_padding(pad: bytes) -> None: ...
+def import_openssh_private_generic(data: bytes, password: bytes) -> Tuple[str, bytes]: ...
diff --git a/lib/Crypto/PublicKey/_x25519.abi3.so b/lib/Crypto/PublicKey/_x25519.abi3.so
new file mode 100755
index 0000000..afd3ee4
--- /dev/null
+++ b/lib/Crypto/PublicKey/_x25519.abi3.so
Binary files differ