summaryrefslogtreecommitdiffstats
path: root/lib/Crypto/SelfTest/Cipher/common.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/Crypto/SelfTest/Cipher/common.py')
-rw-r--r--lib/Crypto/SelfTest/Cipher/common.py510
1 files changed, 510 insertions, 0 deletions
diff --git a/lib/Crypto/SelfTest/Cipher/common.py b/lib/Crypto/SelfTest/Cipher/common.py
new file mode 100644
index 0000000..c5bc755
--- /dev/null
+++ b/lib/Crypto/SelfTest/Cipher/common.py
@@ -0,0 +1,510 @@
+# -*- coding: utf-8 -*-
+#
+# SelfTest/Hash/common.py: Common code for Crypto.SelfTest.Hash
+#
+# Written in 2008 by Dwayne C. Litzenberger <dlitz@dlitz.net>
+#
+# ===================================================================
+# The contents of this file are dedicated to the public domain. To
+# the extent that dedication to the public domain is not available,
+# everyone is granted a worldwide, perpetual, royalty-free,
+# non-exclusive license to exercise all rights associated with the
+# contents of this file for any purpose whatsoever.
+# No rights are reserved.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+# ===================================================================
+
+"""Self-testing for PyCrypto hash modules"""
+
+import unittest
+from binascii import a2b_hex, b2a_hex, hexlify
+
+from Crypto.Util.py3compat import b
+from Crypto.Util.strxor import strxor_c
+
+class _NoDefault: pass # sentinel object
+def _extract(d, k, default=_NoDefault):
+ """Get an item from a dictionary, and remove it from the dictionary."""
+ try:
+ retval = d[k]
+ except KeyError:
+ if default is _NoDefault:
+ raise
+ return default
+ del d[k]
+ return retval
+
+# Generic cipher test case
+class CipherSelfTest(unittest.TestCase):
+
+ def __init__(self, module, params):
+ unittest.TestCase.__init__(self)
+ self.module = module
+
+ # Extract the parameters
+ params = params.copy()
+ self.description = _extract(params, 'description')
+ self.key = b(_extract(params, 'key'))
+ self.plaintext = b(_extract(params, 'plaintext'))
+ self.ciphertext = b(_extract(params, 'ciphertext'))
+ self.module_name = _extract(params, 'module_name', None)
+ self.assoc_data = _extract(params, 'assoc_data', None)
+ self.mac = _extract(params, 'mac', None)
+ if self.assoc_data:
+ self.mac = b(self.mac)
+
+ mode = _extract(params, 'mode', None)
+ self.mode_name = str(mode)
+
+ if mode is not None:
+ # Block cipher
+ self.mode = getattr(self.module, "MODE_" + mode)
+
+ self.iv = _extract(params, 'iv', None)
+ if self.iv is None:
+ self.iv = _extract(params, 'nonce', None)
+ if self.iv is not None:
+ self.iv = b(self.iv)
+
+ else:
+ # Stream cipher
+ self.mode = None
+ self.iv = _extract(params, 'iv', None)
+ if self.iv is not None:
+ self.iv = b(self.iv)
+
+ self.extra_params = params
+
+ def shortDescription(self):
+ return self.description
+
+ def _new(self):
+ params = self.extra_params.copy()
+ key = a2b_hex(self.key)
+
+ old_style = []
+ if self.mode is not None:
+ old_style = [ self.mode ]
+ if self.iv is not None:
+ old_style += [ a2b_hex(self.iv) ]
+
+ return self.module.new(key, *old_style, **params)
+
+ def isMode(self, name):
+ if not hasattr(self.module, "MODE_"+name):
+ return False
+ return self.mode == getattr(self.module, "MODE_"+name)
+
+ def runTest(self):
+ plaintext = a2b_hex(self.plaintext)
+ ciphertext = a2b_hex(self.ciphertext)
+ assoc_data = []
+ if self.assoc_data:
+ assoc_data = [ a2b_hex(b(x)) for x in self.assoc_data]
+
+ ct = None
+ pt = None
+
+ #
+ # Repeat the same encryption or decryption twice and verify
+ # that the result is always the same
+ #
+ for i in range(2):
+ cipher = self._new()
+ decipher = self._new()
+
+ # Only AEAD modes
+ for comp in assoc_data:
+ cipher.update(comp)
+ decipher.update(comp)
+
+ ctX = b2a_hex(cipher.encrypt(plaintext))
+ ptX = b2a_hex(decipher.decrypt(ciphertext))
+
+ if ct:
+ self.assertEqual(ct, ctX)
+ self.assertEqual(pt, ptX)
+ ct, pt = ctX, ptX
+
+ self.assertEqual(self.ciphertext, ct) # encrypt
+ self.assertEqual(self.plaintext, pt) # decrypt
+
+ if self.mac:
+ mac = b2a_hex(cipher.digest())
+ self.assertEqual(self.mac, mac)
+ decipher.verify(a2b_hex(self.mac))
+
+class CipherStreamingSelfTest(CipherSelfTest):
+
+ def shortDescription(self):
+ desc = self.module_name
+ if self.mode is not None:
+ desc += " in %s mode" % (self.mode_name,)
+ return "%s should behave like a stream cipher" % (desc,)
+
+ def runTest(self):
+ plaintext = a2b_hex(self.plaintext)
+ ciphertext = a2b_hex(self.ciphertext)
+
+ # The cipher should work like a stream cipher
+
+ # Test counter mode encryption, 3 bytes at a time
+ ct3 = []
+ cipher = self._new()
+ for i in range(0, len(plaintext), 3):
+ ct3.append(cipher.encrypt(plaintext[i:i+3]))
+ ct3 = b2a_hex(b("").join(ct3))
+ self.assertEqual(self.ciphertext, ct3) # encryption (3 bytes at a time)
+
+ # Test counter mode decryption, 3 bytes at a time
+ pt3 = []
+ cipher = self._new()
+ for i in range(0, len(ciphertext), 3):
+ pt3.append(cipher.encrypt(ciphertext[i:i+3]))
+ # PY3K: This is meant to be text, do not change to bytes (data)
+ pt3 = b2a_hex(b("").join(pt3))
+ self.assertEqual(self.plaintext, pt3) # decryption (3 bytes at a time)
+
+
+class RoundtripTest(unittest.TestCase):
+ def __init__(self, module, params):
+ from Crypto import Random
+ unittest.TestCase.__init__(self)
+ self.module = module
+ self.iv = Random.get_random_bytes(module.block_size)
+ self.key = b(params['key'])
+ self.plaintext = 100 * b(params['plaintext'])
+ self.module_name = params.get('module_name', None)
+
+ def shortDescription(self):
+ return """%s .decrypt() output of .encrypt() should not be garbled""" % (self.module_name,)
+
+ def runTest(self):
+
+ ## ECB mode
+ mode = self.module.MODE_ECB
+ encryption_cipher = self.module.new(a2b_hex(self.key), mode)
+ ciphertext = encryption_cipher.encrypt(self.plaintext)
+ decryption_cipher = self.module.new(a2b_hex(self.key), mode)
+ decrypted_plaintext = decryption_cipher.decrypt(ciphertext)
+ self.assertEqual(self.plaintext, decrypted_plaintext)
+
+
+class IVLengthTest(unittest.TestCase):
+ def __init__(self, module, params):
+ unittest.TestCase.__init__(self)
+ self.module = module
+ self.key = b(params['key'])
+
+ def shortDescription(self):
+ return "Check that all modes except MODE_ECB and MODE_CTR require an IV of the proper length"
+
+ def runTest(self):
+ self.assertRaises(TypeError, self.module.new, a2b_hex(self.key),
+ self.module.MODE_ECB, b(""))
+
+ def _dummy_counter(self):
+ return "\0" * self.module.block_size
+
+
+class NoDefaultECBTest(unittest.TestCase):
+ def __init__(self, module, params):
+ unittest.TestCase.__init__(self)
+ self.module = module
+ self.key = b(params['key'])
+
+ def runTest(self):
+ self.assertRaises(TypeError, self.module.new, a2b_hex(self.key))
+
+
+class BlockSizeTest(unittest.TestCase):
+ def __init__(self, module, params):
+ unittest.TestCase.__init__(self)
+ self.module = module
+ self.key = a2b_hex(b(params['key']))
+
+ def runTest(self):
+ cipher = self.module.new(self.key, self.module.MODE_ECB)
+ self.assertEqual(cipher.block_size, self.module.block_size)
+
+
+class ByteArrayTest(unittest.TestCase):
+ """Verify we can use bytearray's for encrypting and decrypting"""
+
+ def __init__(self, module, params):
+ unittest.TestCase.__init__(self)
+ self.module = module
+
+ # Extract the parameters
+ params = params.copy()
+ self.description = _extract(params, 'description')
+ self.key = b(_extract(params, 'key'))
+ self.plaintext = b(_extract(params, 'plaintext'))
+ self.ciphertext = b(_extract(params, 'ciphertext'))
+ self.module_name = _extract(params, 'module_name', None)
+ self.assoc_data = _extract(params, 'assoc_data', None)
+ self.mac = _extract(params, 'mac', None)
+ if self.assoc_data:
+ self.mac = b(self.mac)
+
+ mode = _extract(params, 'mode', None)
+ self.mode_name = str(mode)
+
+ if mode is not None:
+ # Block cipher
+ self.mode = getattr(self.module, "MODE_" + mode)
+
+ self.iv = _extract(params, 'iv', None)
+ if self.iv is None:
+ self.iv = _extract(params, 'nonce', None)
+ if self.iv is not None:
+ self.iv = b(self.iv)
+ else:
+ # Stream cipher
+ self.mode = None
+ self.iv = _extract(params, 'iv', None)
+ if self.iv is not None:
+ self.iv = b(self.iv)
+
+ self.extra_params = params
+
+ def _new(self):
+ params = self.extra_params.copy()
+ key = a2b_hex(self.key)
+
+ old_style = []
+ if self.mode is not None:
+ old_style = [ self.mode ]
+ if self.iv is not None:
+ old_style += [ a2b_hex(self.iv) ]
+
+ return self.module.new(key, *old_style, **params)
+
+ def runTest(self):
+
+ plaintext = a2b_hex(self.plaintext)
+ ciphertext = a2b_hex(self.ciphertext)
+ assoc_data = []
+ if self.assoc_data:
+ assoc_data = [ bytearray(a2b_hex(b(x))) for x in self.assoc_data]
+
+ cipher = self._new()
+ decipher = self._new()
+
+ # Only AEAD modes
+ for comp in assoc_data:
+ cipher.update(comp)
+ decipher.update(comp)
+
+ ct = b2a_hex(cipher.encrypt(bytearray(plaintext)))
+ pt = b2a_hex(decipher.decrypt(bytearray(ciphertext)))
+
+ self.assertEqual(self.ciphertext, ct) # encrypt
+ self.assertEqual(self.plaintext, pt) # decrypt
+
+ if self.mac:
+ mac = b2a_hex(cipher.digest())
+ self.assertEqual(self.mac, mac)
+ decipher.verify(bytearray(a2b_hex(self.mac)))
+
+
+class MemoryviewTest(unittest.TestCase):
+ """Verify we can use memoryviews for encrypting and decrypting"""
+
+ def __init__(self, module, params):
+ unittest.TestCase.__init__(self)
+ self.module = module
+
+ # Extract the parameters
+ params = params.copy()
+ self.description = _extract(params, 'description')
+ self.key = b(_extract(params, 'key'))
+ self.plaintext = b(_extract(params, 'plaintext'))
+ self.ciphertext = b(_extract(params, 'ciphertext'))
+ self.module_name = _extract(params, 'module_name', None)
+ self.assoc_data = _extract(params, 'assoc_data', None)
+ self.mac = _extract(params, 'mac', None)
+ if self.assoc_data:
+ self.mac = b(self.mac)
+
+ mode = _extract(params, 'mode', None)
+ self.mode_name = str(mode)
+
+ if mode is not None:
+ # Block cipher
+ self.mode = getattr(self.module, "MODE_" + mode)
+
+ self.iv = _extract(params, 'iv', None)
+ if self.iv is None:
+ self.iv = _extract(params, 'nonce', None)
+ if self.iv is not None:
+ self.iv = b(self.iv)
+ else:
+ # Stream cipher
+ self.mode = None
+ self.iv = _extract(params, 'iv', None)
+ if self.iv is not None:
+ self.iv = b(self.iv)
+
+ self.extra_params = params
+
+ def _new(self):
+ params = self.extra_params.copy()
+ key = a2b_hex(self.key)
+
+ old_style = []
+ if self.mode is not None:
+ old_style = [ self.mode ]
+ if self.iv is not None:
+ old_style += [ a2b_hex(self.iv) ]
+
+ return self.module.new(key, *old_style, **params)
+
+ def runTest(self):
+
+ plaintext = a2b_hex(self.plaintext)
+ ciphertext = a2b_hex(self.ciphertext)
+ assoc_data = []
+ if self.assoc_data:
+ assoc_data = [ memoryview(a2b_hex(b(x))) for x in self.assoc_data]
+
+ cipher = self._new()
+ decipher = self._new()
+
+ # Only AEAD modes
+ for comp in assoc_data:
+ cipher.update(comp)
+ decipher.update(comp)
+
+ ct = b2a_hex(cipher.encrypt(memoryview(plaintext)))
+ pt = b2a_hex(decipher.decrypt(memoryview(ciphertext)))
+
+ self.assertEqual(self.ciphertext, ct) # encrypt
+ self.assertEqual(self.plaintext, pt) # decrypt
+
+ if self.mac:
+ mac = b2a_hex(cipher.digest())
+ self.assertEqual(self.mac, mac)
+ decipher.verify(memoryview(a2b_hex(self.mac)))
+
+
+def make_block_tests(module, module_name, test_data, additional_params=dict()):
+ tests = []
+ extra_tests_added = False
+ for i in range(len(test_data)):
+ row = test_data[i]
+
+ # Build the "params" dictionary with
+ # - plaintext
+ # - ciphertext
+ # - key
+ # - mode (default is ECB)
+ # - (optionally) description
+ # - (optionally) any other parameter that this cipher mode requires
+ params = {}
+ if len(row) == 3:
+ (params['plaintext'], params['ciphertext'], params['key']) = row
+ elif len(row) == 4:
+ (params['plaintext'], params['ciphertext'], params['key'], params['description']) = row
+ elif len(row) == 5:
+ (params['plaintext'], params['ciphertext'], params['key'], params['description'], extra_params) = row
+ params.update(extra_params)
+ else:
+ raise AssertionError("Unsupported tuple size %d" % (len(row),))
+
+ if not "mode" in params:
+ params["mode"] = "ECB"
+
+ # Build the display-name for the test
+ p2 = params.copy()
+ p_key = _extract(p2, 'key')
+ p_plaintext = _extract(p2, 'plaintext')
+ p_ciphertext = _extract(p2, 'ciphertext')
+ p_mode = _extract(p2, 'mode')
+ p_description = _extract(p2, 'description', None)
+
+ if p_description is not None:
+ description = p_description
+ elif p_mode == 'ECB' and not p2:
+ description = "p=%s, k=%s" % (p_plaintext, p_key)
+ else:
+ description = "p=%s, k=%s, %r" % (p_plaintext, p_key, p2)
+ name = "%s #%d: %s" % (module_name, i+1, description)
+ params['description'] = name
+ params['module_name'] = module_name
+ params.update(additional_params)
+
+ # Add extra test(s) to the test suite before the current test
+ if not extra_tests_added:
+ tests += [
+ RoundtripTest(module, params),
+ IVLengthTest(module, params),
+ NoDefaultECBTest(module, params),
+ ByteArrayTest(module, params),
+ BlockSizeTest(module, params),
+ ]
+ extra_tests_added = True
+
+ # Add the current test to the test suite
+ tests.append(CipherSelfTest(module, params))
+
+ return tests
+
+def make_stream_tests(module, module_name, test_data):
+ tests = []
+ extra_tests_added = False
+ for i in range(len(test_data)):
+ row = test_data[i]
+
+ # Build the "params" dictionary
+ params = {}
+ if len(row) == 3:
+ (params['plaintext'], params['ciphertext'], params['key']) = row
+ elif len(row) == 4:
+ (params['plaintext'], params['ciphertext'], params['key'], params['description']) = row
+ elif len(row) == 5:
+ (params['plaintext'], params['ciphertext'], params['key'], params['description'], extra_params) = row
+ params.update(extra_params)
+ else:
+ raise AssertionError("Unsupported tuple size %d" % (len(row),))
+
+ # Build the display-name for the test
+ p2 = params.copy()
+ p_key = _extract(p2, 'key')
+ p_plaintext = _extract(p2, 'plaintext')
+ p_ciphertext = _extract(p2, 'ciphertext')
+ p_description = _extract(p2, 'description', None)
+
+ if p_description is not None:
+ description = p_description
+ elif not p2:
+ description = "p=%s, k=%s" % (p_plaintext, p_key)
+ else:
+ description = "p=%s, k=%s, %r" % (p_plaintext, p_key, p2)
+ name = "%s #%d: %s" % (module_name, i+1, description)
+ params['description'] = name
+ params['module_name'] = module_name
+
+ # Add extra test(s) to the test suite before the current test
+ if not extra_tests_added:
+ tests += [
+ ByteArrayTest(module, params),
+ ]
+
+ tests.append(MemoryviewTest(module, params))
+ extra_tests_added = True
+
+ # Add the test to the test suite
+ tests.append(CipherSelfTest(module, params))
+ tests.append(CipherStreamingSelfTest(module, params))
+ return tests
+
+# vim:set ts=4 sw=4 sts=4 expandtab: