diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/sunhpc/commands/__init__.py | 12 | ||||
-rw-r--r-- | lib/sunhpc/commands/build/init/__init__.py | 84 | ||||
-rw-r--r-- | lib/sunhpc/core/interpreter.py | 371 | ||||
-rw-r--r-- | lib/sunhpc/core/utils.py | 8 |
4 files changed, 468 insertions, 7 deletions
diff --git a/lib/sunhpc/commands/__init__.py b/lib/sunhpc/commands/__init__.py index 75b7692..a8c6c5b 100644 --- a/lib/sunhpc/commands/__init__.py +++ b/lib/sunhpc/commands/__init__.py @@ -658,7 +658,7 @@ class Command(object): ender = '\033[95m%s\033[0m' % end msger = '\033[31m%s\033[0m' % msg result = '%s %s %s' % (header, msger, ender) - return result + print (result) def abort(self, msg): @@ -977,17 +977,17 @@ class Command(object): def getText(self): return self.text - def dictOutput(self, strDict, sep='\t'): + def dictOutput(self, strDict, sep=' '): maxlen = max(map(len, strDict.keys())) for k in strDict: strings = str(strDict[k]) if strings.strip().lower() in ['ok', 'on', 'yes']: - value = '|\033[1;32m %s \033[0m' % strings.upper() + value = '\033[1;32m %s \033[0m' % strings.upper() elif strings.strip().lower() in ['fail', 'off', 'no', 'error', 'failed']: - value = '|\033[1;31m %s \033[0m' % strings.upper() + value = '\033[1;31m %s \033[0m' % strings.upper() else: - value = '| %s' % strings - print ('\t\033[1;95m%s\033[0m %s \033[1;96m%s\033[0m' % (k.ljust(maxlen), sep, value)) + value = '%s' % strings + print ('\t\033[1;95m%s\033[0m %s = \033[1;96m%s\033[0m' % (k.ljust(maxlen), sep, value)) def beginFmtOutput(self, header=[]): self.fmtOutput = prettytable.PrettyTable(header) diff --git a/lib/sunhpc/commands/build/init/__init__.py b/lib/sunhpc/commands/build/init/__init__.py new file mode 100644 index 0000000..5a43ff2 --- /dev/null +++ b/lib/sunhpc/commands/build/init/__init__.py @@ -0,0 +1,84 @@ +# +#coding:utf-8 +# +#Author : QCSun +#Email : qcsun@sunhpc.com +#Times : 2023-04-14 05:21:02 +#WebSite : https://www.sunhpc.com + +import os +import re +import sys +import sunhpc +import readline +from sunhpc.core.interpreter import SunhpcInterpreter + +class Struct(object): + pass + +class command(sunhpc.commands.build.command): + pass + +class Command(command, SunhpcInterpreter): +#class Command(command): + """ + Initialzed the sunhpc database. + """ + def run(self, params, args): + SunhpcInterpreter.__init__(self) + + self.results = {} + self.results['country'] = 'CN' + self.results['state'] = 'LiaoNing' + self.results['city'] = 'Dalian' + self.results['url'] = 'https://www.sunhpc.com' + + self.current_keys = list(self.results.keys()) + self.current_values = self.results + + self.start() + + def preceding(self): + print(' 下列是数据库的初始化配置的基础选项.') + print(' 可以使用命令 set key=value 进行修改:\n') + self.dictOutput(self.results) + print(' ---------------------------------------------\n') + + def _SunhpcInterpreter__show_all(self, root=''): + + self.dictOutput(self.current_values) + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/lib/sunhpc/core/interpreter.py b/lib/sunhpc/core/interpreter.py new file mode 100644 index 0000000..2983096 --- /dev/null +++ b/lib/sunhpc/core/interpreter.py @@ -0,0 +1,371 @@ +# +#coding:utf-8 +# +#Author : QCSun +#Email : qcsun@sunhpc.com +#Times : 2023-04-14 05:21:02 +#WebSite : https://www.sunhpc.com + +import os +import re +import sys +import queue +import atexit +import textwrap +import readline +import itertools + +from sunhpc.core.utils import SunhpcCliException + +GLOBAL_OPTS = {} + +def is_libedit(): + return "libedit" in readline.__doc__ + +class BaseInterpreter(object): + history_file = os.path.expanduser("~/.history") + history_length = 100 + global_help = "" + + def __init__(self): + self.setup() + self.banner = "" + + def preceding(self): + """占位函数.在命令模块中覆盖此函数.可以预处理一些事情""" + pass + + def setup(self): + """ Initialization of third-party libraries + + Setting interpreter history. + Setting appropriate completer function. + + :return: + """ + if not os.path.exists(self.history_file): + with open(self.history_file, "a+") as history: + if is_libedit(): + history.write("_HiStOrY_V2_\n\n") + + readline.read_history_file(self.history_file) + readline.set_history_length(self.history_length) + atexit.register(readline.write_history_file, self.history_file) + + readline.parse_and_bind("set enable-keypad on") + + readline.set_completer(self.complete) + readline.set_completer_delims(" \t\n;") + if is_libedit(): + readline.parse_and_bind("bind ^I rl_complete") + else: + readline.parse_and_bind("tab: complete") + + def parse_line(self, line): + """ Split line into command and argument. + + :param line: line to parse + :return: (command, argument, named_arguments) + """ + kwargs = dict() + command, _, arg = line.strip().partition(" ") + args = arg.strip().split() + for word in args: + if '=' in word: + (key, value) = word.split('=', 1) + kwargs[key.lower()] = value + arg = arg.replace(word, '') + return command, ' '.join(arg.split()), kwargs + + @property + def prompt(self): + """ Returns prompt string """ + return ">>>" + + def get_command_handler(self, command): + """ Parsing command and returning appropriate handler. + + :param command: command + :return: command_handler + """ + try: + command_handler = getattr(self, "command_{}".format(command)) + except AttributeError: + raise SunhpcCliException("Unknown command: '{}'".format(command)) + + return command_handler + + def start(self): + """ Routersploit main entry point. Starting interpreter loop. """ + + print(self.banner) + self.preceding() + while True: + try: + command, args, kwargs = self.parse_line(input(self.prompt)) + if not command: + continue + command_handler = self.get_command_handler(command) + command_handler(args, **kwargs) + except (EOFError, KeyboardInterrupt, SystemExit): + print("Sunhpc cli stopped") + break + + def complete(self, text, state): + """Return the next possible completion for 'text'. + + If a command has not been entered, then complete against command list. + Otherwise try to call complete_<command> to get list of completions. + """ + if state == 0: + original_line = readline.get_line_buffer() + line = original_line.lstrip() + stripped = len(original_line) - len(line) + start_index = readline.get_begidx() - stripped + end_index = readline.get_endidx() - stripped + + if start_index > 0: + cmd, args, _ = self.parse_line(line) + if cmd == "": + complete_function = self.default_completer + else: + try: + complete_function = getattr(self, "complete_" + cmd) + except AttributeError: + complete_function = self.default_completer + else: + complete_function = self.raw_command_completer + + self.completion_matches = complete_function(text, line, start_index, end_index) + + try: + return self.completion_matches[state] + except IndexError: + return None + + def commands(self, *ignored): + """ Returns full list of interpreter commands. + + :param ignored: + :return: full list of interpreter commands + """ + return [command.rsplit("_").pop() for command in dir(self) if command.startswith("command_")] + + def raw_command_completer(self, text, line, start_index, end_index): + """ Complete command w/o any argument """ + return [command for command in self.suggested_commands() if command.startswith(text)] + + def default_completer(self, *ignored): + return [] + + def suggested_commands(self): + """ Entry point for intelligent tab completion. + + Overwrite this method to suggest suitable commands. + + :return: list of suitable commands + """ + return self.commands() + +class SunhpcInterpreter(BaseInterpreter): + history_file = os.path.expanduser("~/.rsf_history") + global_help = """Global commands: + help Print this help menu + use <module> Select a module for usage + exec <shell command> <args> Execute a command in a shell + search <search term> Search for appropriate module + exit Exit RouterSploit""" + + module_help = """Module commands: + run Run the selected module with the given options + back De-select the current module + set <option name> <option value> Set an option for the selected module + setg <option name> <option value> Set an option for all of the modules + unsetg <option name> Unset option that was set globally + show [info|options|devices] Print information, options, or target devices for a module + check Check if a given target is vulnerable to a selected module's exploit""" + + def __init__(self, global_commands=[], sub_commands=[]): + super(SunhpcInterpreter, self).__init__() + + self.current_keys = None + self.current_values = None + + + self.current_module = None + self.prompt_hostname = "sunhpc" + self.raw_prompt_template = None + self.module_prompt_template = None + + self.base_global_commands = ['help', 'exit'] + self.global_commands = ['show ', 'set ', 'check '] + if global_commands: + self.global_commands.extend(global_commands) + self.global_commands.extend(self.base_global_commands) + sorted(self.global_commands) + + + self.base_sub_commands = ['all', 'info', 'options'] + self.sub_commands = ['wordlist', 'advanced'] + if sub_commands: + self.sub_commands.extend(sub_commands) + self.sub_commands.extend(self.base_sub_commands) + sorted(self.sub_commands) + + + + self.__parse_prompt() + self.banner = r""" + _____ _ + / ____| | | + ======= | (___ _ _ _ __ | |__ _ __ ___ + \ // \___ \| | | | '_ \| '_ \| '_ \ / __| + \ // ___ ) | |_| | | | | | | | |_) | (__ + \// |_____/ \__,_|_| |_|_| |_| .__/ \___| + | | + Powered by HengPU Technology |_| By kylins + + PlatName : HengPu High Performance Computing Platform + WeChat : xiubuzhe + Version : 1.0.0 + Email : xiubuzhe@sina.com + GitPage : https://git.sunhpc.com - @kylins + BBSPage : https://bbs.sunhpc.com - @kylins + DocPage : https://doc.sunhpc.com - @kylins + Homepage : https://www.sunhpc.com - @kylins + + Kelvin SunHPC Beta Program - https://www.sunhpc.com + """ + + def __parse_prompt(self): + raw_prompt_default_template = "\001\033[4m\002{host}\001\033[0m\002 > " + raw_prompt_template = os.getenv("RSF_RAW_PROMPT", raw_prompt_default_template).replace('\\033', '\033') + self.raw_prompt_template = raw_prompt_template if '{host}' in raw_prompt_template else raw_prompt_default_template + + module_prompt_default_template = "\001\033[4m\002{host}\001\033[0m\002 (\001\033[91m\002{module}\001\033[0m\002) > " + module_prompt_template = os.getenv("RSF_MODULE_PROMPT", module_prompt_default_template).replace('\\033', '\033') + self.module_prompt_template = module_prompt_template if all(map(lambda x: x in module_prompt_template, \ + ['{host}', "{module}"])) else module_prompt_default_template + + @property + def prompt(self): + """ Returns prompt string based on current_module attribute. + Adding module prefix (module.name) if current_module attribute is set. + + :return: prompt string with appropriate module prefix. + """ + if self.current_module: + try: + return self.module_prompt_template.format(host=self.prompt_hostname, module=self.module_metadata['name']) + except (AttributeError, KeyError): + return self.module_prompt_template.format(host=self.prompt_hostname, module="UnnamedModule") + else: + return self.raw_prompt_template.format(host=self.prompt_hostname) + + def suggested_commands(self): + """ Entry point for intelligent tab completion. + + Based on state of interpreter this method will return intelligent suggestions. + + :return: list of most accurate command suggestions + """ + if self.current_keys and self.current_values: + # 将unsetg和current_keys列表组合后重新生成一个列表.(迭代器) + return sorted(itertools.chain(self.global_commands, ('unset ',))) + + return self.global_commands + + + # + # show command functions + # + def command_show(self, *args, **kwargs): + sub_command = args[0] + try: + getattr(self, "_show_{}".format(sub_command))(*args, **kwargs) + except AttributeError: + # sunhpc debug + #raise + #self.msg("Unknown 'show' sub-command choices are:{}".format(self.sub_commands), 'w') + self.message('[-]', "Unknown 'show' sub-command choices are:", self.sub_commands) + + def complete_show(self, text, *args, **kwargs): + if text: + return [command for command in self.sub_commands if command.startswith(text)] + else: + return self.sub_commands + + def _show_all(self, *args, **kwargs): + self.__show_all() + + + # + # set command functions + # + def command_set(self, *args, **kwargs): + key, _, value = args[0].partition(" ") + + if key in self.current_keys: + if kwargs.get('glob', False): + self.current_values[key] = value + self.dictOutput({key:value}) + + elif kwargs: + for k, v in kwargs.items(): + self.current_values[k] = v + self.dictOutput(kwargs) + + else: + print("You can't set option '{}'.Available options: {}".format(key, self.current_keys)) + + + def complete_set(self, text, *args, **kwargs): + if text: + return ["=".join((keys, "")) for keys in self.current_keys if keys.startswith(text)] + else: + return self.current_keys + + def command_unset(self, *args, **kwargs): + key, _, value = args[0].partition(' ') + try: + del self.current_values[key] + except KeyError: + self.msg("You can't unset option '{}'.Available options: {}".format(key, list(self.current_values.keys())), 'w') + else: + self.dictOutput({key: value}) + + def complete_unset(self, text, *args, **kwargs): + if text: + return [''.join((attr, "")) for attr in self.current_values.keys() if attr.startswith(text)] + else: + return list(self.current_values.keys()) + + + + + + def command_exit(self, *args, **kwargs): + raise EOFError + + + + + + + + + + + + + + + + + + + + + + + diff --git a/lib/sunhpc/core/utils.py b/lib/sunhpc/core/utils.py index 97ec7c3..f8d544c 100644 --- a/lib/sunhpc/core/utils.py +++ b/lib/sunhpc/core/utils.py @@ -29,6 +29,12 @@ class CommandError(SunhpcException): something goes awry""" pass +class SunhpcCliException(Exception): + def __init__(self, msg: str = ""): + super(SunhpcCliException, self).__init__(msg) +class OptionValidationError(SunhpcCliException): + pass + class KickstartError(SunhpcException): pass @@ -191,7 +197,7 @@ def startSpinner(cmd): w, r ,e = (p.stdin, p.stdout, p.stderr) currLength = 0 prevLength = 0 - spinChars = '-\|/' + spinChars = r'-\|/' spinIndex = 0 while 1: line = e.readline() |