#!/opt/sunpy3/bin/python3
#coding:utf-8
"""Sync daemon.

Starts a ``multiprocessing`` manager that exposes two queues, loads the
plugin files found under ``/opt/sunhpc/var/plugins/syncdata`` and keeps
putting the pickled plugin bundle on the send queue until the string
``'exit'`` is read from the receive queue.
"""
import os
import sys
import time
import queue
import pickle
import base64
import shutil
import tempfile
import logging
import random
import argparse
import configparser
from multiprocessing.managers import BaseManager


class QueueManager(BaseManager):
    """Manager subclass on which the two shared queues are registered."""
    pass


# Queues served by the manager: s_queue is exposed as 'send_queue',
# r_queue as 'recv_queue' (see Application.register).
s_queue = queue.Queue()
r_queue = queue.Queue()


class Application(object):
    """Holds the daemon settings, the manager connection and the plugin bundle."""

    def __init__(self):
        self.conf = '/etc/sunhpc-plugin.conf'
        self.addr = None
        self.port = None
        self.keys = None
        self.conn = None
        self.send = None
        self.recv = None
        # Filled by load_plugins(); pickled and sent as one object.
        self.modules = {}
        self.logfile = '/opt/sunhpc/logs/syncdaemon.log'

    def config(self):
        """Read address, port and key from the ``[plugins]`` section of self.conf.

        A missing file is ignored.  Missing keys, a non-numeric port or a
        malformed file are logged instead of raised; values read before the
        failure stay assigned.
        """
        if not os.path.exists(self.conf):
            return

        config = configparser.ConfigParser()
        try:
            config.read(self.conf)
            self.addr = config['plugins']['address']
            self.port = int(config['plugins']['port'])
            self.keys = config['plugins']['keys'].encode('utf-8')
        except (KeyError, ValueError, configparser.Error) as e:
            # Report the file that was actually read, not a fixed path.
            self.outputlog('Read "%s" error - %s' % (self.conf, repr(e)))

    def parseArgs(self):
        """Apply command-line overrides on top of the configuration file.

        ``--conf`` selects another configuration file, which is read
        immediately so that ``--addr``/``--port``/``--keys`` given on the
        same command line still take precedence over its contents.
        """
        parser = argparse.ArgumentParser()
        parser.add_argument('--addr', metavar='addr', nargs='?', help='IP address')
        # type=int keeps the port the same type as the one read by config().
        parser.add_argument('--port', metavar='port', nargs='?', type=int,
                            help='IP port')
        parser.add_argument('--keys', metavar='keys', nargs='?', help='Secret key')
        parser.add_argument('--conf', metavar='conf', nargs='?', help='Config file')
        args = parser.parse_args()

        if args.conf:
            self.conf = args.conf
            self.config()
        if args.addr:
            self.addr = args.addr
        if args.port is not None:
            self.port = args.port
        if args.keys:
            self.keys = args.keys.encode('utf-8')

    def register(self):
        """Register both queues on QueueManager and return a new manager.

        The manager is built from self.addr, self.port and self.keys; it is
        not started here.
        """
        QueueManager.register('send_queue', callable=lambda: s_queue)
        QueueManager.register('recv_queue', callable=lambda: r_queue)
        manager = QueueManager(address=(self.addr, self.port),
                               authkey=self.keys)
        return manager

    def connect(self):
        """Start the manager and fetch the send/receive queue proxies."""
        self.conn = self.register()
        self.conn.start()
        self.send = self.conn.send_queue()
        self.recv = self.conn.recv_queue()

    def load_plugins(self):
        """Collect every ``*.py`` plugin into self.modules.

        self.modules receives the keys ``temp`` (path of a temporary
        directory that is removed again before returning), ``path``,
        ``plugins`` (one dict per plugin) and, when ``plugins.py`` exists,
        ``init`` (its base64-encoded source).
        """
        plugin_path = '/opt/sunhpc/var/plugins/syncdata'
        if plugin_path not in sys.path:
            sys.path.append(plugin_path)

        tmpdirs = tempfile.mkdtemp()
        try:
            self.modules['temp'] = tmpdirs
            self.modules['path'] = plugin_path
            self.modules['plugins'] = []

            for plugin_file in os.listdir(plugin_path):
                if not plugin_file.endswith('.py'):
                    continue

                filename = os.path.join(plugin_path, plugin_file)
                if plugin_file == 'plugins.py':
                    # Shipped as encoded source only; never imported here.
                    with open(filename, 'rb') as f:
                        self.modules['init'] = base64.b64encode(f.read())
                    continue

                # splitext strips only the trailing extension, so a name
                # containing '.py' elsewhere still maps to the right module.
                modname = os.path.splitext(plugin_file)[0]
                plugin = __import__(modname).plugin()
                plugin_dict = {'file': plugin_file, 'modname': modname}

                # Names of the functions starting with "src" become keys of
                # the plugin dict; each src function is executed here, on
                # the server side, and its result is stored as the value.
                for fname in plugin.get_srcfuncname(plugin):
                    plugin_dict[fname] = getattr(plugin, fname)()

                # Functions starting with "set" are only listed (value
                # None); they are executed on the client.
                for setname in plugin.get_setfuncname(plugin):
                    plugin_dict[setname] = None

                with open(filename, 'rb') as f:
                    plugin_dict['source'] = base64.b64encode(f.read())

                self.modules['plugins'].append(plugin_dict)
        finally:
            # Remove the temporary directory even when a plugin fails.
            shutil.rmtree(tmpdirs)

    def running(self):
        """Serve the plugin bundle until 'exit' arrives on the receive queue.

        Once per second the pickled bundle is put on the send queue and the
        receive queue is polled for up to three seconds; every received
        value is logged.  The manager is shut down on the way out, also
        when the loop fails.
        """
        print ('addr -- %s' % self.addr)
        print ('port -- %s' % self.port)

        self.outputlog('start the syncdaemon...')
        self.connect()
        try:
            if not self.modules:
                self.load_plugins()

            data = pickle.dumps(self.modules)
            running = True
            while running:
                time.sleep(1)
                try:
                    self.send.put(data)
                    result = self.recv.get(timeout=3)
                    if result == 'exit':
                        running = False
                    self.outputlog(result)
                except queue.Empty:
                    # Nothing received within the timeout; keep serving.
                    pass
        finally:
            self.conn.shutdown()
        self.outputlog('end the syncdaemon...')

    def outputlog(self, s):
        """Write ``s`` to self.logfile at INFO level; empty values are skipped."""
        log_format = "%(asctime)s %(name)s %(levelname)s %(message)s"
        dateformat = '%Y-%m-%d %H:%M:%S %a'
        # basicConfig does nothing once the root logger has handlers, so
        # calling it on every message is harmless.
        logging.basicConfig(
            level=logging.DEBUG,
            format=log_format,
            datefmt=dateformat,
            filename=self.logfile
        )
        if s:
            logging.info(s)


if __name__ == "__main__":
    #
    # Use the createKeys function to create a 16-character secret key.
    # To create a new key, delete the
    # /opt/sunhpc/data/.daemon_keys file;
    # a new key will then be generated.
    #
    # syncDaemon --addr 127.0.0.1 --port 5000
    #
    app = Application()
    app.config()
    app.parseArgs()
    app.running()