summaryrefslogtreecommitdiffstats
path: root/bin/syncDaemon
diff options
context:
space:
mode:
authorxiubuzhe <xiubuzhe@sina.com>2023-10-08 20:59:00 +0800
committerxiubuzhe <xiubuzhe@sina.com>2023-10-08 20:59:00 +0800
commit1dac2263372df2b85db5d029a45721fa158a5c9d (patch)
tree0365f9c57df04178a726d7584ca6a6b955a7ce6a /bin/syncDaemon
parentb494be364bb39e1de128ada7dc576a729d99907e (diff)
downloadsunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.tar.gz
sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.tar.bz2
sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.zip
first add files
Diffstat (limited to 'bin/syncDaemon')
-rwxr-xr-xbin/syncDaemon167
1 files changed, 167 insertions, 0 deletions
diff --git a/bin/syncDaemon b/bin/syncDaemon
new file mode 100755
index 0000000..30aa123
--- /dev/null
+++ b/bin/syncDaemon
@@ -0,0 +1,167 @@
+#!/opt/sunpy3/bin/python3
+#coding:utf-8
+import os, sys, time, queue
+import pickle, base64, shutil, tempfile
+import logging, random, argparse, configparser
+from multiprocessing.managers import BaseManager
+class QueueManager(BaseManager):
+ pass
+s_queue = queue.Queue()
+r_queue = queue.Queue()
+
+class Application(object):
+
+ def __init__(self):
+
+ self.conf = '/etc/sunhpc-plugin.conf'
+ self.addr = None
+ self.port = None
+ self.keys = None
+ self.conn = None
+ self.send = None
+ self.recv = None
+
+ self.modules = {}
+ self.logfile = '/opt/sunhpc/logs/syncdaemon.log'
+
+ def config(self):
+ config = configparser.ConfigParser()
+ if os.path.exists(self.conf):
+ try:
+ config.read(self.conf)
+ self.addr = config['plugins']['address']
+ self.port = int(config['plugins']['port'])
+ self.keys = config['plugins']['keys'].encode('utf-8')
+ except KeyError as e:
+ self.exelog('Read "/etc/sunhpc-plugin.conf error - %s' % repr(e))
+
+ def parseArgs(self):
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--addr', metavar='addr', nargs='?', help='IP address')
+ parser.add_argument('--port', metavar='port', nargs='?', help='IP port')
+ parser.add_argument('--keys', metavar='keys', nargs='?', help='Secret key')
+ parser.add_argument('--conf', metavar='conf', nargs='?', help='Config file')
+
+ args = parser.parse_args()
+ if args.addr:
+ self.addr = args.addr
+ if args.port:
+ self.port = args.port
+ if args.keys:
+ self.keys = args.keys.encode('utf-8')
+ if args.conf:
+ self.conf = args.conf
+
+ def register(self):
+ QueueManager.register('send_queue', callable=lambda: s_queue)
+ QueueManager.register('recv_queue', callable=lambda: r_queue)
+ manager = QueueManager(address=(self.addr, self.port), authkey=self.keys)
+ return manager
+
+ def connect(self):
+ self.conn = self.register()
+ self.conn.start()
+ self.send = self.conn.send_queue()
+ self.recv = self.conn.recv_queue()
+
+ def load_plugins(self):
+ plugin_path = '/opt/sunhpc/var/plugins/syncdata'
+ sys.path.append(plugin_path)
+ tmpdirs = tempfile.mkdtemp()
+ self.modules['temp'] = tmpdirs
+ self.modules['path'] = plugin_path
+ self.modules['plugins'] = []
+
+ for plugin_file in os.listdir(plugin_path):
+ plugin_dict = {}
+ if not plugin_file.endswith('.py'):
+ continue
+
+ if plugin_file in ['plugins.py']:
+ fn = os.path.join(plugin_path, plugin_file)
+ with open(fn, 'rb') as f:
+ self.modules['init'] = base64.b64encode(f.read())
+ continue
+
+ p = plugin_file.split('.py')[0]
+ plugin = __import__(p).plugin()
+ plugin_dict['file'] = plugin_file
+ plugin_dict['modname'] = p
+ # 获取src开头函数名称作为模块字典中 key.
+ # src函数在服务器端执行后放入字典中 value.
+ for fname in plugin.get_srcfuncname(plugin):
+ plugin_n = getattr(plugin, fname)
+ plugin_dict[fname] = plugin_n()
+
+ # 只获取set开头函数并且放入模块字典中.
+ # set函数在客户端中执行.
+ for setname in plugin.get_setfuncname(plugin):
+ plugin_dict[setname] = None
+
+ filename = os.path.join(plugin_path, plugin_file)
+ with open(filename, 'rb') as f:
+ content = base64.b64encode(f.read())
+ plugin_dict['source'] = content
+
+ self.modules['plugins'].append(plugin_dict)
+
+ shutil.rmtree(tmpdirs)
+
+ def running(self):
+
+ print ('addr -- %s' % self.addr)
+ print ('port -- %s' % self.port)
+ self.outputlog('start the syncdaemon...')
+ self.connect()
+ if not self.modules:
+ self.load_plugins()
+
+ data = pickle.dumps(self.modules)
+ running = 1
+ while running:
+ time.sleep(1)
+ try:
+ self.send.put(data)
+ result = self.recv.get(timeout=3)
+ if result == 'exit':
+ running = 0
+
+ self.outputlog(result)
+ except queue.Empty:
+ pass
+
+ self.conn.shutdown()
+ self.outputlog('end the syncdaemon...')
+
+ def outputlog(self, s):
+ log_format = "%(asctime)s %(name)s %(levelname)s %(message)s"
+ dateformat = '%Y-%m-%d %H:%M:%S %a'
+ logging.basicConfig(
+ level = logging.DEBUG,
+ format = log_format,
+ datefmt = dateformat,
+ filename = self.logfile
+ )
+ if s:
+ logging.info(s)
+
+if __name__ == "__main__":
+
+ #
+ # 使用createKeys函数创建一个16位秘钥.
+ # 如果想创建一个新的秘钥,那么删除
+ # /opt/sunhpc/data/.daemon_keys 文件.
+ # 将会重新创建一个新的秘钥.
+ #
+ # syncDaemon --addr 127.0.0.1 --port 5000
+ #
+ app = Application()
+ app.config()
+ app.parseArgs()
+ app.running()
+
+
+
+
+
+