diff options
author | xiubuzhe <xiubuzhe@sina.com> | 2023-10-08 20:59:00 +0800 |
---|---|---|
committer | xiubuzhe <xiubuzhe@sina.com> | 2023-10-08 20:59:00 +0800 |
commit | 1dac2263372df2b85db5d029a45721fa158a5c9d (patch) | |
tree | 0365f9c57df04178a726d7584ca6a6b955a7ce6a /lib/pexpect/expect.py | |
parent | b494be364bb39e1de128ada7dc576a729d99907e (diff) | |
download | sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.tar.gz sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.tar.bz2 sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.zip |
first add files
Diffstat (limited to 'lib/pexpect/expect.py')
-rw-r--r-- | lib/pexpect/expect.py | 371 |
1 files changed, 371 insertions, 0 deletions
diff --git a/lib/pexpect/expect.py b/lib/pexpect/expect.py new file mode 100644 index 0000000..d3409db --- /dev/null +++ b/lib/pexpect/expect.py @@ -0,0 +1,371 @@ +import time + +from .exceptions import EOF, TIMEOUT + +class Expecter(object): + def __init__(self, spawn, searcher, searchwindowsize=-1): + self.spawn = spawn + self.searcher = searcher + # A value of -1 means to use the figure from spawn, which should + # be None or a positive number. + if searchwindowsize == -1: + searchwindowsize = spawn.searchwindowsize + self.searchwindowsize = searchwindowsize + self.lookback = None + if hasattr(searcher, 'longest_string'): + self.lookback = searcher.longest_string + + def do_search(self, window, freshlen): + spawn = self.spawn + searcher = self.searcher + if freshlen > len(window): + freshlen = len(window) + index = searcher.search(window, freshlen, self.searchwindowsize) + if index >= 0: + spawn._buffer = spawn.buffer_type() + spawn._buffer.write(window[searcher.end:]) + spawn.before = spawn._before.getvalue()[ + 0:-(len(window) - searcher.start)] + spawn._before = spawn.buffer_type() + spawn._before.write(window[searcher.end:]) + spawn.after = window[searcher.start:searcher.end] + spawn.match = searcher.match + spawn.match_index = index + # Found a match + return index + elif self.searchwindowsize or self.lookback: + maintain = self.searchwindowsize or self.lookback + if spawn._buffer.tell() > maintain: + spawn._buffer = spawn.buffer_type() + spawn._buffer.write(window[-maintain:]) + + def existing_data(self): + # First call from a new call to expect_loop or expect_async. + # self.searchwindowsize may have changed. + # Treat all data as fresh. + spawn = self.spawn + before_len = spawn._before.tell() + buf_len = spawn._buffer.tell() + freshlen = before_len + if before_len > buf_len: + if not self.searchwindowsize: + spawn._buffer = spawn.buffer_type() + window = spawn._before.getvalue() + spawn._buffer.write(window) + elif buf_len < self.searchwindowsize: + spawn._buffer = spawn.buffer_type() + spawn._before.seek( + max(0, before_len - self.searchwindowsize)) + window = spawn._before.read() + spawn._buffer.write(window) + else: + spawn._buffer.seek(max(0, buf_len - self.searchwindowsize)) + window = spawn._buffer.read() + else: + if self.searchwindowsize: + spawn._buffer.seek(max(0, buf_len - self.searchwindowsize)) + window = spawn._buffer.read() + else: + window = spawn._buffer.getvalue() + return self.do_search(window, freshlen) + + def new_data(self, data): + # A subsequent call, after a call to existing_data. + spawn = self.spawn + freshlen = len(data) + spawn._before.write(data) + if not self.searchwindowsize: + if self.lookback: + # search lookback + new data. + old_len = spawn._buffer.tell() + spawn._buffer.write(data) + spawn._buffer.seek(max(0, old_len - self.lookback)) + window = spawn._buffer.read() + else: + # copy the whole buffer (really slow for large datasets). + spawn._buffer.write(data) + window = spawn.buffer + else: + if len(data) >= self.searchwindowsize or not spawn._buffer.tell(): + window = data[-self.searchwindowsize:] + spawn._buffer = spawn.buffer_type() + spawn._buffer.write(window[-self.searchwindowsize:]) + else: + spawn._buffer.write(data) + new_len = spawn._buffer.tell() + spawn._buffer.seek(max(0, new_len - self.searchwindowsize)) + window = spawn._buffer.read() + return self.do_search(window, freshlen) + + def eof(self, err=None): + spawn = self.spawn + + spawn.before = spawn._before.getvalue() + spawn._buffer = spawn.buffer_type() + spawn._before = spawn.buffer_type() + spawn.after = EOF + index = self.searcher.eof_index + if index >= 0: + spawn.match = EOF + spawn.match_index = index + return index + else: + spawn.match = None + spawn.match_index = None + msg = str(spawn) + msg += '\nsearcher: %s' % self.searcher + if err is not None: + msg = str(err) + '\n' + msg + + exc = EOF(msg) + exc.__cause__ = None # in Python 3.x we can use "raise exc from None" + raise exc + + def timeout(self, err=None): + spawn = self.spawn + + spawn.before = spawn._before.getvalue() + spawn.after = TIMEOUT + index = self.searcher.timeout_index + if index >= 0: + spawn.match = TIMEOUT + spawn.match_index = index + return index + else: + spawn.match = None + spawn.match_index = None + msg = str(spawn) + msg += '\nsearcher: %s' % self.searcher + if err is not None: + msg = str(err) + '\n' + msg + + exc = TIMEOUT(msg) + exc.__cause__ = None # in Python 3.x we can use "raise exc from None" + raise exc + + def errored(self): + spawn = self.spawn + spawn.before = spawn._before.getvalue() + spawn.after = None + spawn.match = None + spawn.match_index = None + + def expect_loop(self, timeout=-1): + """Blocking expect""" + spawn = self.spawn + + if timeout is not None: + end_time = time.time() + timeout + + try: + idx = self.existing_data() + if idx is not None: + return idx + while True: + # No match at this point + if (timeout is not None) and (timeout < 0): + return self.timeout() + # Still have time left, so read more data + incoming = spawn.read_nonblocking(spawn.maxread, timeout) + if self.spawn.delayafterread is not None: + time.sleep(self.spawn.delayafterread) + idx = self.new_data(incoming) + # Keep reading until exception or return. + if idx is not None: + return idx + if timeout is not None: + timeout = end_time - time.time() + except EOF as e: + return self.eof(e) + except TIMEOUT as e: + return self.timeout(e) + except: + self.errored() + raise + + +class searcher_string(object): + '''This is a plain string search helper for the spawn.expect_any() method. + This helper class is for speed. For more powerful regex patterns + see the helper class, searcher_re. + + Attributes: + + eof_index - index of EOF, or -1 + timeout_index - index of TIMEOUT, or -1 + + After a successful match by the search() method the following attributes + are available: + + start - index into the buffer, first byte of match + end - index into the buffer, first byte after match + match - the matching string itself + + ''' + + def __init__(self, strings): + '''This creates an instance of searcher_string. This argument 'strings' + may be a list; a sequence of strings; or the EOF or TIMEOUT types. ''' + + self.eof_index = -1 + self.timeout_index = -1 + self._strings = [] + self.longest_string = 0 + for n, s in enumerate(strings): + if s is EOF: + self.eof_index = n + continue + if s is TIMEOUT: + self.timeout_index = n + continue + self._strings.append((n, s)) + if len(s) > self.longest_string: + self.longest_string = len(s) + + def __str__(self): + '''This returns a human-readable string that represents the state of + the object.''' + + ss = [(ns[0], ' %d: %r' % ns) for ns in self._strings] + ss.append((-1, 'searcher_string:')) + if self.eof_index >= 0: + ss.append((self.eof_index, ' %d: EOF' % self.eof_index)) + if self.timeout_index >= 0: + ss.append((self.timeout_index, + ' %d: TIMEOUT' % self.timeout_index)) + ss.sort() + ss = list(zip(*ss))[1] + return '\n'.join(ss) + + def search(self, buffer, freshlen, searchwindowsize=None): + '''This searches 'buffer' for the first occurrence of one of the search + strings. 'freshlen' must indicate the number of bytes at the end of + 'buffer' which have not been searched before. It helps to avoid + searching the same, possibly big, buffer over and over again. + + See class spawn for the 'searchwindowsize' argument. + + If there is a match this returns the index of that string, and sets + 'start', 'end' and 'match'. Otherwise, this returns -1. ''' + + first_match = None + + # 'freshlen' helps a lot here. Further optimizations could + # possibly include: + # + # using something like the Boyer-Moore Fast String Searching + # Algorithm; pre-compiling the search through a list of + # strings into something that can scan the input once to + # search for all N strings; realize that if we search for + # ['bar', 'baz'] and the input is '...foo' we need not bother + # rescanning until we've read three more bytes. + # + # Sadly, I don't know enough about this interesting topic. /grahn + + for index, s in self._strings: + if searchwindowsize is None: + # the match, if any, can only be in the fresh data, + # or at the very end of the old data + offset = -(freshlen + len(s)) + else: + # better obey searchwindowsize + offset = -searchwindowsize + n = buffer.find(s, offset) + if n >= 0 and (first_match is None or n < first_match): + first_match = n + best_index, best_match = index, s + if first_match is None: + return -1 + self.match = best_match + self.start = first_match + self.end = self.start + len(self.match) + return best_index + + +class searcher_re(object): + '''This is regular expression string search helper for the + spawn.expect_any() method. This helper class is for powerful + pattern matching. For speed, see the helper class, searcher_string. + + Attributes: + + eof_index - index of EOF, or -1 + timeout_index - index of TIMEOUT, or -1 + + After a successful match by the search() method the following attributes + are available: + + start - index into the buffer, first byte of match + end - index into the buffer, first byte after match + match - the re.match object returned by a successful re.search + + ''' + + def __init__(self, patterns): + '''This creates an instance that searches for 'patterns' Where + 'patterns' may be a list or other sequence of compiled regular + expressions, or the EOF or TIMEOUT types.''' + + self.eof_index = -1 + self.timeout_index = -1 + self._searches = [] + for n, s in enumerate(patterns): + if s is EOF: + self.eof_index = n + continue + if s is TIMEOUT: + self.timeout_index = n + continue + self._searches.append((n, s)) + + def __str__(self): + '''This returns a human-readable string that represents the state of + the object.''' + + #ss = [(n, ' %d: re.compile("%s")' % + # (n, repr(s.pattern))) for n, s in self._searches] + ss = list() + for n, s in self._searches: + ss.append((n, ' %d: re.compile(%r)' % (n, s.pattern))) + ss.append((-1, 'searcher_re:')) + if self.eof_index >= 0: + ss.append((self.eof_index, ' %d: EOF' % self.eof_index)) + if self.timeout_index >= 0: + ss.append((self.timeout_index, ' %d: TIMEOUT' % + self.timeout_index)) + ss.sort() + ss = list(zip(*ss))[1] + return '\n'.join(ss) + + def search(self, buffer, freshlen, searchwindowsize=None): + '''This searches 'buffer' for the first occurrence of one of the regular + expressions. 'freshlen' must indicate the number of bytes at the end of + 'buffer' which have not been searched before. + + See class spawn for the 'searchwindowsize' argument. + + If there is a match this returns the index of that string, and sets + 'start', 'end' and 'match'. Otherwise, returns -1.''' + + first_match = None + # 'freshlen' doesn't help here -- we cannot predict the + # length of a match, and the re module provides no help. + if searchwindowsize is None: + searchstart = 0 + else: + searchstart = max(0, len(buffer) - searchwindowsize) + for index, s in self._searches: + match = s.search(buffer, searchstart) + if match is None: + continue + n = match.start() + if first_match is None or n < first_match: + first_match = n + the_match = match + best_index = index + if first_match is None: + return -1 + self.start = first_match + self.match = the_match + self.end = self.match.end() + return best_index |