diff options
author | xiubuzhe <xiubuzhe@sina.com> | 2023-10-08 20:59:00 +0800 |
---|---|---|
committer | xiubuzhe <xiubuzhe@sina.com> | 2023-10-08 20:59:00 +0800 |
commit | 1dac2263372df2b85db5d029a45721fa158a5c9d (patch) | |
tree | 0365f9c57df04178a726d7584ca6a6b955a7ce6a /bin/syncDaemon | |
parent | b494be364bb39e1de128ada7dc576a729d99907e (diff) | |
download | sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.tar.gz sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.tar.bz2 sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.zip |
first add files
Diffstat (limited to 'bin/syncDaemon')
-rwxr-xr-x | bin/syncDaemon | 167 |
1 files changed, 167 insertions, 0 deletions
diff --git a/bin/syncDaemon b/bin/syncDaemon new file mode 100755 index 0000000..30aa123 --- /dev/null +++ b/bin/syncDaemon @@ -0,0 +1,167 @@ +#!/opt/sunpy3/bin/python3 +#coding:utf-8 +import os, sys, time, queue +import pickle, base64, shutil, tempfile +import logging, random, argparse, configparser +from multiprocessing.managers import BaseManager +class QueueManager(BaseManager): + pass +s_queue = queue.Queue() +r_queue = queue.Queue() + +class Application(object): + + def __init__(self): + + self.conf = '/etc/sunhpc-plugin.conf' + self.addr = None + self.port = None + self.keys = None + self.conn = None + self.send = None + self.recv = None + + self.modules = {} + self.logfile = '/opt/sunhpc/logs/syncdaemon.log' + + def config(self): + config = configparser.ConfigParser() + if os.path.exists(self.conf): + try: + config.read(self.conf) + self.addr = config['plugins']['address'] + self.port = int(config['plugins']['port']) + self.keys = config['plugins']['keys'].encode('utf-8') + except KeyError as e: + self.exelog('Read "/etc/sunhpc-plugin.conf error - %s' % repr(e)) + + def parseArgs(self): + parser = argparse.ArgumentParser() + parser.add_argument('--addr', metavar='addr', nargs='?', help='IP address') + parser.add_argument('--port', metavar='port', nargs='?', help='IP port') + parser.add_argument('--keys', metavar='keys', nargs='?', help='Secret key') + parser.add_argument('--conf', metavar='conf', nargs='?', help='Config file') + + args = parser.parse_args() + if args.addr: + self.addr = args.addr + if args.port: + self.port = args.port + if args.keys: + self.keys = args.keys.encode('utf-8') + if args.conf: + self.conf = args.conf + + def register(self): + QueueManager.register('send_queue', callable=lambda: s_queue) + QueueManager.register('recv_queue', callable=lambda: r_queue) + manager = QueueManager(address=(self.addr, self.port), authkey=self.keys) + return manager + + def connect(self): + self.conn = self.register() + self.conn.start() + self.send = self.conn.send_queue() + self.recv = self.conn.recv_queue() + + def load_plugins(self): + plugin_path = '/opt/sunhpc/var/plugins/syncdata' + sys.path.append(plugin_path) + tmpdirs = tempfile.mkdtemp() + self.modules['temp'] = tmpdirs + self.modules['path'] = plugin_path + self.modules['plugins'] = [] + + for plugin_file in os.listdir(plugin_path): + plugin_dict = {} + if not plugin_file.endswith('.py'): + continue + + if plugin_file in ['plugins.py']: + fn = os.path.join(plugin_path, plugin_file) + with open(fn, 'rb') as f: + self.modules['init'] = base64.b64encode(f.read()) + continue + + p = plugin_file.split('.py')[0] + plugin = __import__(p).plugin() + plugin_dict['file'] = plugin_file + plugin_dict['modname'] = p + # 获取src开头函数名称作为模块字典中 key. + # src函数在服务器端执行后放入字典中 value. + for fname in plugin.get_srcfuncname(plugin): + plugin_n = getattr(plugin, fname) + plugin_dict[fname] = plugin_n() + + # 只获取set开头函数并且放入模块字典中. + # set函数在客户端中执行. + for setname in plugin.get_setfuncname(plugin): + plugin_dict[setname] = None + + filename = os.path.join(plugin_path, plugin_file) + with open(filename, 'rb') as f: + content = base64.b64encode(f.read()) + plugin_dict['source'] = content + + self.modules['plugins'].append(plugin_dict) + + shutil.rmtree(tmpdirs) + + def running(self): + + print ('addr -- %s' % self.addr) + print ('port -- %s' % self.port) + self.outputlog('start the syncdaemon...') + self.connect() + if not self.modules: + self.load_plugins() + + data = pickle.dumps(self.modules) + running = 1 + while running: + time.sleep(1) + try: + self.send.put(data) + result = self.recv.get(timeout=3) + if result == 'exit': + running = 0 + + self.outputlog(result) + except queue.Empty: + pass + + self.conn.shutdown() + self.outputlog('end the syncdaemon...') + + def outputlog(self, s): + log_format = "%(asctime)s %(name)s %(levelname)s %(message)s" + dateformat = '%Y-%m-%d %H:%M:%S %a' + logging.basicConfig( + level = logging.DEBUG, + format = log_format, + datefmt = dateformat, + filename = self.logfile + ) + if s: + logging.info(s) + +if __name__ == "__main__": + + # + # 使用createKeys函数创建一个16位秘钥. + # 如果想创建一个新的秘钥,那么删除 + # /opt/sunhpc/data/.daemon_keys 文件. + # 将会重新创建一个新的秘钥. + # + # syncDaemon --addr 127.0.0.1 --port 5000 + # + app = Application() + app.config() + app.parseArgs() + app.running() + + + + + + |