summaryrefslogtreecommitdiffstats
path: root/lib/pexpect/expect.py
diff options
context:
space:
mode:
authorxiubuzhe <xiubuzhe@sina.com>2023-10-08 20:59:00 +0800
committerxiubuzhe <xiubuzhe@sina.com>2023-10-08 20:59:00 +0800
commit1dac2263372df2b85db5d029a45721fa158a5c9d (patch)
tree0365f9c57df04178a726d7584ca6a6b955a7ce6a /lib/pexpect/expect.py
parentb494be364bb39e1de128ada7dc576a729d99907e (diff)
downloadsunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.tar.gz
sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.tar.bz2
sunhpc-1dac2263372df2b85db5d029a45721fa158a5c9d.zip
first add files
Diffstat (limited to 'lib/pexpect/expect.py')
-rw-r--r--lib/pexpect/expect.py371
1 files changed, 371 insertions, 0 deletions
diff --git a/lib/pexpect/expect.py b/lib/pexpect/expect.py
new file mode 100644
index 0000000..d3409db
--- /dev/null
+++ b/lib/pexpect/expect.py
@@ -0,0 +1,371 @@
+import time
+
+from .exceptions import EOF, TIMEOUT
+
+class Expecter(object):
+ def __init__(self, spawn, searcher, searchwindowsize=-1):
+ self.spawn = spawn
+ self.searcher = searcher
+ # A value of -1 means to use the figure from spawn, which should
+ # be None or a positive number.
+ if searchwindowsize == -1:
+ searchwindowsize = spawn.searchwindowsize
+ self.searchwindowsize = searchwindowsize
+ self.lookback = None
+ if hasattr(searcher, 'longest_string'):
+ self.lookback = searcher.longest_string
+
+ def do_search(self, window, freshlen):
+ spawn = self.spawn
+ searcher = self.searcher
+ if freshlen > len(window):
+ freshlen = len(window)
+ index = searcher.search(window, freshlen, self.searchwindowsize)
+ if index >= 0:
+ spawn._buffer = spawn.buffer_type()
+ spawn._buffer.write(window[searcher.end:])
+ spawn.before = spawn._before.getvalue()[
+ 0:-(len(window) - searcher.start)]
+ spawn._before = spawn.buffer_type()
+ spawn._before.write(window[searcher.end:])
+ spawn.after = window[searcher.start:searcher.end]
+ spawn.match = searcher.match
+ spawn.match_index = index
+ # Found a match
+ return index
+ elif self.searchwindowsize or self.lookback:
+ maintain = self.searchwindowsize or self.lookback
+ if spawn._buffer.tell() > maintain:
+ spawn._buffer = spawn.buffer_type()
+ spawn._buffer.write(window[-maintain:])
+
+ def existing_data(self):
+ # First call from a new call to expect_loop or expect_async.
+ # self.searchwindowsize may have changed.
+ # Treat all data as fresh.
+ spawn = self.spawn
+ before_len = spawn._before.tell()
+ buf_len = spawn._buffer.tell()
+ freshlen = before_len
+ if before_len > buf_len:
+ if not self.searchwindowsize:
+ spawn._buffer = spawn.buffer_type()
+ window = spawn._before.getvalue()
+ spawn._buffer.write(window)
+ elif buf_len < self.searchwindowsize:
+ spawn._buffer = spawn.buffer_type()
+ spawn._before.seek(
+ max(0, before_len - self.searchwindowsize))
+ window = spawn._before.read()
+ spawn._buffer.write(window)
+ else:
+ spawn._buffer.seek(max(0, buf_len - self.searchwindowsize))
+ window = spawn._buffer.read()
+ else:
+ if self.searchwindowsize:
+ spawn._buffer.seek(max(0, buf_len - self.searchwindowsize))
+ window = spawn._buffer.read()
+ else:
+ window = spawn._buffer.getvalue()
+ return self.do_search(window, freshlen)
+
+ def new_data(self, data):
+ # A subsequent call, after a call to existing_data.
+ spawn = self.spawn
+ freshlen = len(data)
+ spawn._before.write(data)
+ if not self.searchwindowsize:
+ if self.lookback:
+ # search lookback + new data.
+ old_len = spawn._buffer.tell()
+ spawn._buffer.write(data)
+ spawn._buffer.seek(max(0, old_len - self.lookback))
+ window = spawn._buffer.read()
+ else:
+ # copy the whole buffer (really slow for large datasets).
+ spawn._buffer.write(data)
+ window = spawn.buffer
+ else:
+ if len(data) >= self.searchwindowsize or not spawn._buffer.tell():
+ window = data[-self.searchwindowsize:]
+ spawn._buffer = spawn.buffer_type()
+ spawn._buffer.write(window[-self.searchwindowsize:])
+ else:
+ spawn._buffer.write(data)
+ new_len = spawn._buffer.tell()
+ spawn._buffer.seek(max(0, new_len - self.searchwindowsize))
+ window = spawn._buffer.read()
+ return self.do_search(window, freshlen)
+
+ def eof(self, err=None):
+ spawn = self.spawn
+
+ spawn.before = spawn._before.getvalue()
+ spawn._buffer = spawn.buffer_type()
+ spawn._before = spawn.buffer_type()
+ spawn.after = EOF
+ index = self.searcher.eof_index
+ if index >= 0:
+ spawn.match = EOF
+ spawn.match_index = index
+ return index
+ else:
+ spawn.match = None
+ spawn.match_index = None
+ msg = str(spawn)
+ msg += '\nsearcher: %s' % self.searcher
+ if err is not None:
+ msg = str(err) + '\n' + msg
+
+ exc = EOF(msg)
+ exc.__cause__ = None # in Python 3.x we can use "raise exc from None"
+ raise exc
+
+ def timeout(self, err=None):
+ spawn = self.spawn
+
+ spawn.before = spawn._before.getvalue()
+ spawn.after = TIMEOUT
+ index = self.searcher.timeout_index
+ if index >= 0:
+ spawn.match = TIMEOUT
+ spawn.match_index = index
+ return index
+ else:
+ spawn.match = None
+ spawn.match_index = None
+ msg = str(spawn)
+ msg += '\nsearcher: %s' % self.searcher
+ if err is not None:
+ msg = str(err) + '\n' + msg
+
+ exc = TIMEOUT(msg)
+ exc.__cause__ = None # in Python 3.x we can use "raise exc from None"
+ raise exc
+
+ def errored(self):
+ spawn = self.spawn
+ spawn.before = spawn._before.getvalue()
+ spawn.after = None
+ spawn.match = None
+ spawn.match_index = None
+
+ def expect_loop(self, timeout=-1):
+ """Blocking expect"""
+ spawn = self.spawn
+
+ if timeout is not None:
+ end_time = time.time() + timeout
+
+ try:
+ idx = self.existing_data()
+ if idx is not None:
+ return idx
+ while True:
+ # No match at this point
+ if (timeout is not None) and (timeout < 0):
+ return self.timeout()
+ # Still have time left, so read more data
+ incoming = spawn.read_nonblocking(spawn.maxread, timeout)
+ if self.spawn.delayafterread is not None:
+ time.sleep(self.spawn.delayafterread)
+ idx = self.new_data(incoming)
+ # Keep reading until exception or return.
+ if idx is not None:
+ return idx
+ if timeout is not None:
+ timeout = end_time - time.time()
+ except EOF as e:
+ return self.eof(e)
+ except TIMEOUT as e:
+ return self.timeout(e)
+ except:
+ self.errored()
+ raise
+
+
+class searcher_string(object):
+ '''This is a plain string search helper for the spawn.expect_any() method.
+ This helper class is for speed. For more powerful regex patterns
+ see the helper class, searcher_re.
+
+ Attributes:
+
+ eof_index - index of EOF, or -1
+ timeout_index - index of TIMEOUT, or -1
+
+ After a successful match by the search() method the following attributes
+ are available:
+
+ start - index into the buffer, first byte of match
+ end - index into the buffer, first byte after match
+ match - the matching string itself
+
+ '''
+
+ def __init__(self, strings):
+ '''This creates an instance of searcher_string. This argument 'strings'
+ may be a list; a sequence of strings; or the EOF or TIMEOUT types. '''
+
+ self.eof_index = -1
+ self.timeout_index = -1
+ self._strings = []
+ self.longest_string = 0
+ for n, s in enumerate(strings):
+ if s is EOF:
+ self.eof_index = n
+ continue
+ if s is TIMEOUT:
+ self.timeout_index = n
+ continue
+ self._strings.append((n, s))
+ if len(s) > self.longest_string:
+ self.longest_string = len(s)
+
+ def __str__(self):
+ '''This returns a human-readable string that represents the state of
+ the object.'''
+
+ ss = [(ns[0], ' %d: %r' % ns) for ns in self._strings]
+ ss.append((-1, 'searcher_string:'))
+ if self.eof_index >= 0:
+ ss.append((self.eof_index, ' %d: EOF' % self.eof_index))
+ if self.timeout_index >= 0:
+ ss.append((self.timeout_index,
+ ' %d: TIMEOUT' % self.timeout_index))
+ ss.sort()
+ ss = list(zip(*ss))[1]
+ return '\n'.join(ss)
+
+ def search(self, buffer, freshlen, searchwindowsize=None):
+ '''This searches 'buffer' for the first occurrence of one of the search
+ strings. 'freshlen' must indicate the number of bytes at the end of
+ 'buffer' which have not been searched before. It helps to avoid
+ searching the same, possibly big, buffer over and over again.
+
+ See class spawn for the 'searchwindowsize' argument.
+
+ If there is a match this returns the index of that string, and sets
+ 'start', 'end' and 'match'. Otherwise, this returns -1. '''
+
+ first_match = None
+
+ # 'freshlen' helps a lot here. Further optimizations could
+ # possibly include:
+ #
+ # using something like the Boyer-Moore Fast String Searching
+ # Algorithm; pre-compiling the search through a list of
+ # strings into something that can scan the input once to
+ # search for all N strings; realize that if we search for
+ # ['bar', 'baz'] and the input is '...foo' we need not bother
+ # rescanning until we've read three more bytes.
+ #
+ # Sadly, I don't know enough about this interesting topic. /grahn
+
+ for index, s in self._strings:
+ if searchwindowsize is None:
+ # the match, if any, can only be in the fresh data,
+ # or at the very end of the old data
+ offset = -(freshlen + len(s))
+ else:
+ # better obey searchwindowsize
+ offset = -searchwindowsize
+ n = buffer.find(s, offset)
+ if n >= 0 and (first_match is None or n < first_match):
+ first_match = n
+ best_index, best_match = index, s
+ if first_match is None:
+ return -1
+ self.match = best_match
+ self.start = first_match
+ self.end = self.start + len(self.match)
+ return best_index
+
+
+class searcher_re(object):
+ '''This is regular expression string search helper for the
+ spawn.expect_any() method. This helper class is for powerful
+ pattern matching. For speed, see the helper class, searcher_string.
+
+ Attributes:
+
+ eof_index - index of EOF, or -1
+ timeout_index - index of TIMEOUT, or -1
+
+ After a successful match by the search() method the following attributes
+ are available:
+
+ start - index into the buffer, first byte of match
+ end - index into the buffer, first byte after match
+ match - the re.match object returned by a successful re.search
+
+ '''
+
+ def __init__(self, patterns):
+ '''This creates an instance that searches for 'patterns' Where
+ 'patterns' may be a list or other sequence of compiled regular
+ expressions, or the EOF or TIMEOUT types.'''
+
+ self.eof_index = -1
+ self.timeout_index = -1
+ self._searches = []
+ for n, s in enumerate(patterns):
+ if s is EOF:
+ self.eof_index = n
+ continue
+ if s is TIMEOUT:
+ self.timeout_index = n
+ continue
+ self._searches.append((n, s))
+
+ def __str__(self):
+ '''This returns a human-readable string that represents the state of
+ the object.'''
+
+ #ss = [(n, ' %d: re.compile("%s")' %
+ # (n, repr(s.pattern))) for n, s in self._searches]
+ ss = list()
+ for n, s in self._searches:
+ ss.append((n, ' %d: re.compile(%r)' % (n, s.pattern)))
+ ss.append((-1, 'searcher_re:'))
+ if self.eof_index >= 0:
+ ss.append((self.eof_index, ' %d: EOF' % self.eof_index))
+ if self.timeout_index >= 0:
+ ss.append((self.timeout_index, ' %d: TIMEOUT' %
+ self.timeout_index))
+ ss.sort()
+ ss = list(zip(*ss))[1]
+ return '\n'.join(ss)
+
+ def search(self, buffer, freshlen, searchwindowsize=None):
+ '''This searches 'buffer' for the first occurrence of one of the regular
+ expressions. 'freshlen' must indicate the number of bytes at the end of
+ 'buffer' which have not been searched before.
+
+ See class spawn for the 'searchwindowsize' argument.
+
+ If there is a match this returns the index of that string, and sets
+ 'start', 'end' and 'match'. Otherwise, returns -1.'''
+
+ first_match = None
+ # 'freshlen' doesn't help here -- we cannot predict the
+ # length of a match, and the re module provides no help.
+ if searchwindowsize is None:
+ searchstart = 0
+ else:
+ searchstart = max(0, len(buffer) - searchwindowsize)
+ for index, s in self._searches:
+ match = s.search(buffer, searchstart)
+ if match is None:
+ continue
+ n = match.start()
+ if first_match is None or n < first_match:
+ first_match = n
+ the_match = match
+ best_index = index
+ if first_match is None:
+ return -1
+ self.start = first_match
+ self.match = the_match
+ self.end = self.match.end()
+ return best_index