Files
sunhpc/lib/sunhpc/core/security.py
2023-12-19 15:50:36 +08:00

201 lines
6.6 KiB
Python

#coding:utf-8
import os
import re
import stat
import sunhpc
import base64
import xml.dom.minidom
from Crypto import Random
from Crypto.Hash import SHA
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5
from Crypto.Signature import PKCS1_v1_5 as Sig_pk
from sunhpc.core.utils import SafeError
class Security(object):
    """Sign-and-encrypt helper built on two RSA key pairs.

    Sending side (``encrypt``/``sign``): the master private key signs and
    the shared public key encrypts.  Receiving side (``decrypt``/``verify``):
    the shared private key decrypts and the master public key verifies.
    Key material is read lazily from the files under ``/etc/safe-security``.
    """

    # Armor lines that addHeader() wraps around every message.
    _BEGIN_MARK = '-----BEGIN SECURITY MESSAGE-----'
    _END_MARK = '-----END SECURITY MESSAGE-----'
    # Plaintext bytes per RSA block.  PKCS#1 v1.5 allows at most
    # (key bytes - 11); 100 fits even the smallest accepted key (1024 bit).
    _PLAIN_CHUNK = 100
    # Column width used when wrapping base64 text for display.
    _LINE_WIDTH = 100

    def __init__(self, keybit=2048):
        """Set up key file paths and empty key caches.

        :param keybit: RSA modulus size used by makeRsaKeyPair(); must be
            1024, 2048 or 4096.
        :raises SafeError: if ``keybit`` is not one of the accepted sizes.
        """
        if keybit not in (1024, 2048, 4096):
            raise SafeError('Supply the rsa bit too big or non-standard.')

        self.master_key = '/etc/safe-security/master.key'
        self.master_pub = '/etc/safe-security/master.pub'
        self.shared_key = '/etc/safe-security/shared.key'
        self.shared_pub = '/etc/safe-security/shared.pub'
        self.rsa_keybit = keybit

        # Cached key file contents; filled by readEncKeyPair/readDecKeyPair.
        self.mkey = None
        self.mpub = None
        self.skey = None
        self.spub = None

        self.master = None
        self.conn = None
        self.masters = []

        # A regex for our header search (not referenced by the methods below).
        pattern = r"\n*(?P<comment>.*?)\$110id\$"
        self.header_pattern = re.compile(pattern)

        pattern = r"<a href=.+>(?P<filename>.+)</a> +(?P<date>\d+.*) +(?P<size>\d+.*)"
        # Make the pattern matching engine case-insensitive.
        self.dir_pattern = re.compile(pattern, re.I)

    def _wrapLines(self, text):
        """Return ``text`` split into newline-terminated lines of fixed width."""
        width = self._LINE_WIDTH
        return ''.join(
            text[i:i + width] + '\n' for i in range(0, len(text), width)
        )

    def addHeader(self, msg):
        """Wrap ``msg`` between the BEGIN/END SECURITY MESSAGE armor lines."""
        return '%s\n%s%s\n' % (self._BEGIN_MARK, msg, self._END_MARK)

    def removeHeader(self, msg):
        """Strip the armor added by addHeader() and return the inner text.

        decrypt() relies on this method.  Accepts ``str`` or ``bytes``.  A
        missing BEGIN or END line is tolerated; surrounding whitespace is
        removed and CRLF line endings are normalised to LF so the
        blank-line separator between signature and ciphertext is found.
        """
        if isinstance(msg, bytes):
            msg = msg.decode()
        body = msg.replace('\r\n', '\n').strip()
        if body.startswith(self._BEGIN_MARK):
            body = body[len(self._BEGIN_MARK):]
        if body.endswith(self._END_MARK):
            body = body[:-len(self._END_MARK)]
        return body.strip()

    def makeRsaKeyPair(self):
        """Generate fresh master and shared RSA key pairs.

        :returns: dict with the exported keys under ``master.key``,
            ``master.pub``, ``shared.key`` and ``shared.pub``.
        """
        master_rsa = RSA.generate(self.rsa_keybit, Random.new().read)
        shared_rsa = RSA.generate(self.rsa_keybit, Random.new().read)
        return {
            'master.key': master_rsa.exportKey(),
            'master.pub': master_rsa.publickey().exportKey(),
            'shared.key': shared_rsa.exportKey(),
            'shared.pub': shared_rsa.publickey().exportKey(),
        }

    def readEncKeyPair(self):
        """Load the keys for the sending side (used on the frontend node).

        Reads the master private key (signing) and the shared public key
        (encryption) into ``self.mkey`` / ``self.spub``.

        :raises SafeError: if either key file does not exist.
        """
        if not os.path.exists(self.master_key):
            raise SafeError("Master key is not exists.")
        if not os.path.exists(self.shared_pub):
            raise SafeError("Shared pub is not exists.")

        # Master private key: signs the ciphertext.
        with open(self.master_key, 'r') as f:
            self.mkey = f.read()
        # Shared public key: encrypts the content.
        with open(self.shared_pub, 'r') as f:
            self.spub = f.read()

    def readDecKeyPair(self):
        """Load the keys for the receiving side.

        Reads the shared private key (decryption) and the master public key
        (signature verification) into ``self.skey`` / ``self.mpub``.

        :raises SafeError: if either key file does not exist.
        """
        if not os.path.exists(self.shared_key):
            raise SafeError("Shared key is not exists.")
        if not os.path.exists(self.master_pub):
            raise SafeError("Master pub is not exists.")

        # Shared private key: decrypts the content.
        with open(self.shared_key, 'r') as f:
            self.skey = f.read()
        # Master public key: verifies the signature.
        with open(self.master_pub, 'r') as f:
            self.mpub = f.read()

    def encrypt(self, msg, type110=1):
        """Encrypt ``msg`` with the shared public key (frontend node side).

        :param msg: ``str`` (encoded as UTF-8) or ``bytes`` to protect.
        :param type110: when truthy, a signature made with the master
            private key is placed before the ciphertext, separated from it
            by a blank line.
        :returns: armored text: optional signature, then the base64
            ciphertext wrapped at 100 columns.
        """
        if not self.spub:
            self.readEncKeyPair()

        # Work on bytes: the RSA block limit is in bytes, so slicing the
        # text by characters could overflow a block for multi-byte UTF-8.
        data = msg.encode() if isinstance(msg, str) else msg

        cipher = PKCS1_v1_5.new(RSA.importKey(self.spub))
        size = self._PLAIN_CHUNK
        blocks = [
            cipher.encrypt(data[i:i + size])
            for i in range(0, len(data), size)
        ]
        encoded = base64.b64encode(b''.join(blocks)).decode()

        parts = []
        if type110:
            parts.append(self.sign(encoded))
        # Wrap the ciphertext so it is readable in a terminal or file.
        parts.append(self._wrapLines(encoded))
        return self.addHeader(''.join(parts))

    def sign(self, msg):
        """Sign base64-encoded ``msg`` with the master private key.

        The signature covers the SHA digest of the *decoded* bytes.

        :returns: base64 signature wrapped at 100 columns, followed by one
            blank line (the separator decrypt() splits on).
        """
        if not self.mkey:
            self.readEncKeyPair()
        master_key = RSA.importKey(self.mkey)

        # NOTE(review): Crypto.Hash.SHA is SHA-1; it is kept because
        # verify() and existing signed messages use the same digest.
        digest = SHA.new(base64.b64decode(msg))
        signature = Sig_pk.new(master_key).sign(digest)

        encoded = base64.b64encode(signature).decode()
        return self._wrapLines(encoded) + '\n'

    def verify(self, msg, key):
        """Check signature ``key`` over ``msg`` with the master public key.

        :param msg: raw ciphertext bytes that were signed.
        :param key: raw signature bytes.
        :returns: truthy when the signature is valid, falsy otherwise.
        """
        if not self.mpub:
            self.readDecKeyPair()
        master_public = RSA.importKey(self.mpub)

        digest = SHA.new(msg)
        return Sig_pk.new(master_public).verify(digest, key)

    def decrypt(self, msg, type110=1):
        """Verify and decrypt an armored message (compute node side).

        :param msg: text produced by encrypt().
        :param type110: when truthy, the message is expected to start with
            a signature block, which is verified with the master public key
            before any decryption happens.
        :returns: the decrypted plaintext as ``str``.
        :raises SafeError: if the signature does not verify, the ciphertext
            length does not match the key size, or a block fails to decrypt.
        """
        if not self.skey:
            self.readDecKeyPair()
        # The shared *private* key performs the decryption.
        shared_private = RSA.importKey(self.skey)

        body = self.removeHeader(msg)
        if type110:
            # Signature and ciphertext are separated by one blank line.
            sig_text, _, cipher_text = body.partition('\n\n')
            sig_cip = base64.b64decode(''.join(sig_text.split()))
            rsa_cip = base64.b64decode(''.join(cipher_text.split()))
            if not self.verify(rsa_cip, sig_cip):
                raise SafeError("Signature does not verify.")
        else:
            rsa_cip = base64.b64decode(''.join(body.split()))

        # Each RSA block is as long as the key modulus
        # (1024 bit -> 128, 2048 -> 256, 4096 -> 512).  Take the size from
        # the loaded key so it is right even when the key on disk differs
        # from the keybit given to the constructor.
        step = (shared_private.n.bit_length() + 7) // 8
        if len(rsa_cip) % step:
            raise SafeError("Ciphertext length does not match the key size.")

        cipher = PKCS1_v1_5.new(shared_private)
        # A unique sentinel lets a failed block be told apart from real data.
        failed = object()
        chunks = []
        for i in range(0, len(rsa_cip), step):
            plain = cipher.decrypt(rsa_cip[i:i + step], failed)
            if plain is failed:
                raise SafeError("Unable to decrypt message.")
            chunks.append(plain)
        return b''.join(chunks).decode()
if __name__ == "__main__":
    # Manual round-trip check; needs the key files under /etc/safe-security.
    s = 'Aaab' * 20
    a = Security()
    # Encrypt; the message is signed by default.
    e = a.encrypt(s)
    # Decrypt the ciphertext held in `e` (the name `c` used before was never defined).
    d = a.decrypt(e)