1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
|
#!/opt/sunpy3/bin/python3
#coding:utf-8
import os, sys, time, queue
import pickle, base64, shutil, tempfile
import logging, random, argparse, configparser
from multiprocessing.managers import BaseManager
class QueueManager(BaseManager):
pass
s_queue = queue.Queue()
r_queue = queue.Queue()
class Application(object):
def __init__(self):
self.conf = '/etc/sunhpc-plugin.conf'
self.addr = None
self.port = None
self.keys = None
self.conn = None
self.send = None
self.recv = None
self.modules = {}
self.logfile = '/opt/sunhpc/logs/syncdaemon.log'
def config(self):
config = configparser.ConfigParser()
if os.path.exists(self.conf):
try:
config.read(self.conf)
self.addr = config['plugins']['address']
self.port = int(config['plugins']['port'])
self.keys = config['plugins']['keys'].encode('utf-8')
except KeyError as e:
self.exelog('Read "/etc/sunhpc-plugin.conf error - %s' % repr(e))
def parseArgs(self):
parser = argparse.ArgumentParser()
parser.add_argument('--addr', metavar='addr', nargs='?', help='IP address')
parser.add_argument('--port', metavar='port', nargs='?', help='IP port')
parser.add_argument('--keys', metavar='keys', nargs='?', help='Secret key')
parser.add_argument('--conf', metavar='conf', nargs='?', help='Config file')
args = parser.parse_args()
if args.addr:
self.addr = args.addr
if args.port:
self.port = args.port
if args.keys:
self.keys = args.keys.encode('utf-8')
if args.conf:
self.conf = args.conf
def register(self):
QueueManager.register('send_queue', callable=lambda: s_queue)
QueueManager.register('recv_queue', callable=lambda: r_queue)
manager = QueueManager(address=(self.addr, self.port), authkey=self.keys)
return manager
def connect(self):
self.conn = self.register()
self.conn.start()
self.send = self.conn.send_queue()
self.recv = self.conn.recv_queue()
def load_plugins(self):
plugin_path = '/opt/sunhpc/var/plugins/syncdata'
sys.path.append(plugin_path)
tmpdirs = tempfile.mkdtemp()
self.modules['temp'] = tmpdirs
self.modules['path'] = plugin_path
self.modules['plugins'] = []
for plugin_file in os.listdir(plugin_path):
plugin_dict = {}
if not plugin_file.endswith('.py'):
continue
if plugin_file in ['plugins.py']:
fn = os.path.join(plugin_path, plugin_file)
with open(fn, 'rb') as f:
self.modules['init'] = base64.b64encode(f.read())
continue
p = plugin_file.split('.py')[0]
plugin = __import__(p).plugin()
plugin_dict['file'] = plugin_file
plugin_dict['modname'] = p
# 获取src开头函数名称作为模块字典中 key.
# src函数在服务器端执行后放入字典中 value.
for fname in plugin.get_srcfuncname(plugin):
plugin_n = getattr(plugin, fname)
plugin_dict[fname] = plugin_n()
# 只获取set开头函数并且放入模块字典中.
# set函数在客户端中执行.
for setname in plugin.get_setfuncname(plugin):
plugin_dict[setname] = None
filename = os.path.join(plugin_path, plugin_file)
with open(filename, 'rb') as f:
content = base64.b64encode(f.read())
plugin_dict['source'] = content
self.modules['plugins'].append(plugin_dict)
shutil.rmtree(tmpdirs)
def running(self):
print ('addr -- %s' % self.addr)
print ('port -- %s' % self.port)
self.outputlog('start the syncdaemon...')
self.connect()
if not self.modules:
self.load_plugins()
data = pickle.dumps(self.modules)
running = 1
while running:
time.sleep(1)
try:
self.send.put(data)
result = self.recv.get(timeout=3)
if result == 'exit':
running = 0
self.outputlog(result)
except queue.Empty:
pass
self.conn.shutdown()
self.outputlog('end the syncdaemon...')
def outputlog(self, s):
log_format = "%(asctime)s %(name)s %(levelname)s %(message)s"
dateformat = '%Y-%m-%d %H:%M:%S %a'
logging.basicConfig(
level = logging.DEBUG,
format = log_format,
datefmt = dateformat,
filename = self.logfile
)
if s:
logging.info(s)
if __name__ == "__main__":
#
# 使用createKeys函数创建一个16位秘钥.
# 如果想创建一个新的秘钥,那么删除
# /opt/sunhpc/data/.daemon_keys 文件.
# 将会重新创建一个新的秘钥.
#
# syncDaemon --addr 127.0.0.1 --port 5000
#
app = Application()
app.config()
app.parseArgs()
app.running()
|