0
0
mirror of https://github.com/python/cpython.git synced 2024-11-30 18:51:15 +01:00
cpython/Lib/_pyrepl/unix_console.py

787 lines
24 KiB
Python
Raw Normal View History

# Copyright 2000-2010 Michael Hudson-Doyle <micahel@gmail.com>
# Antonio Cuni
# Armin Rigo
#
# All Rights Reserved
#
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose is hereby granted without fee,
# provided that the above copyright notice appear in all copies and
# that both that copyright notice and this permission notice appear in
# supporting documentation.
#
# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from __future__ import annotations
import errno
import os
import re
import select
import signal
import struct
import sys
import termios
import time
from fcntl import ioctl
from . import curses
from .console import Console, Event
from .fancy_termios import tcgetattr, tcsetattr
from .trace import trace
from .unix_eventqueue import EventQueue
from .utils import wlen
TYPE_CHECKING = False
# types
if TYPE_CHECKING:
from typing import IO, Literal, overload
else:
overload = lambda func: None
class InvalidTerminal(RuntimeError):
pass
_error = (termios.error, curses.error, InvalidTerminal)
SIGWINCH_EVENT = "repaint"
FIONREAD = getattr(termios, "FIONREAD", None)
TIOCGWINSZ = getattr(termios, "TIOCGWINSZ", None)
# ------------ start of baudrate definitions ------------
# Add (possibly) missing baudrates (check termios man page) to termios
def add_baudrate_if_supported(dictionary: dict[int, int], rate: int) -> None:
baudrate_name = "B%d" % rate
if hasattr(termios, baudrate_name):
dictionary[getattr(termios, baudrate_name)] = rate
# Check the termios man page (Line speed) to know where these
# values come from.
potential_baudrates = [
0,
110,
115200,
1200,
134,
150,
1800,
19200,
200,
230400,
2400,
300,
38400,
460800,
4800,
50,
57600,
600,
75,
9600,
]
ratedict: dict[int, int] = {}
for rate in potential_baudrates:
add_baudrate_if_supported(ratedict, rate)
# Clean up variables to avoid unintended usage
del rate, add_baudrate_if_supported
# ------------ end of baudrate definitions ------------
delayprog = re.compile(b"\\$<([0-9]+)((?:/|\\*){0,2})>")
try:
poll: type[select.poll] = select.poll
except AttributeError:
# this is exactly the minumum necessary to support what we
# do with poll objects
class MinimalPoll:
def __init__(self):
pass
def register(self, fd, flag):
self.fd = fd
# note: The 'timeout' argument is received as *milliseconds*
def poll(self, timeout: float | None = None) -> list[int]:
if timeout is None:
r, w, e = select.select([self.fd], [], [])
else:
r, w, e = select.select([self.fd], [], [], timeout/1000)
return r
poll = MinimalPoll # type: ignore[assignment]
class UnixConsole(Console):
def __init__(
self,
f_in: IO[bytes] | int = 0,
f_out: IO[bytes] | int = 1,
term: str = "",
encoding: str = "",
):
"""
Initialize the UnixConsole.
Parameters:
- f_in (int or file-like object): Input file descriptor or object.
- f_out (int or file-like object): Output file descriptor or object.
- term (str): Terminal name.
- encoding (str): Encoding to use for I/O operations.
"""
super().__init__(f_in, f_out, term, encoding)
self.pollob = poll()
self.pollob.register(self.input_fd, select.POLLIN)
gh-119517: Fixes for pasting in pyrepl (#120253) * Remove pyrepl's optimization for self-insert This will be replaced by a less specialized optimization. * Use line-buffering when pyrepl echoes pastes Previously echoing was totally suppressed until the entire command had been pasted and the terminal ended paste mode, but this gives the user no feedback to indicate that an operation is in progress. Drawing something to the screen once per line strikes a balance between perceived responsiveness and performance. * Remove dead code from pyrepl `msg_at_bottom` is always true. * Speed up pyrepl's screen rendering computation The Reader in pyrepl doesn't hold a complete representation of the screen area being drawn as persistent state. Instead, it recomputes it, on each keypress. This is fast enough for a few hundred bytes, but incredibly slow as the input buffer grows into the kilobytes (likely because of pasting). Rather than making some expensive and expansive changes to the repl's internal representation of the screen, add some caching: remember some data from one refresh to the next about what was drawn to the screen and, if we don't find anything that has invalidated the results that were computed last time around, reuse them. To keep this caching as simple as possible, all we'll do is look for lines in the buffer that were above the cursor the last time we were asked to update the screen, and that are still above the cursor now. We assume that nothing can affect a line that comes before both the old and new cursor location without us being informed. Based on this assumption, we can reuse old lines, which drastically speeds up the overwhelmingly common case where the user is typing near the end of the buffer. * Speed up pyrepl prompt drawing Cache the `can_colorize()` call rather than repeatedly recomputing it. This call looks up an environment variable, and is called once per character typed at the REPL. The environment variable lookup shows up as a hot spot when profiling, and we don't expect this to change while the REPL is running. * Speed up pasting multiple lines into the REPL Previously, we were checking whether the command should be accepted each time a line break was encountered, but that's not the expected behavior. In bracketed paste mode, we expect everything pasted to be part of a single block of code, and encountering a newline shouldn't behave like a user pressing <Enter> to execute a command. The user should always have a chance to review the pasted command before running it. * Use a read buffer for input in pyrepl Previously we were reading one byte at a time, which causes much slower IO than necessary. Instead, read in chunks, processing previously read data before asking for more. * Optimize finding width of a single character `wlen` finds the width of a multi-character string by adding up the width of each character, and then subtracting the width of any escape sequences. It's often called for single character strings, however, which can't possibly contain escape sequences. Optimize for that case. * Optimize disp_str for ASCII characters Since every ASCII character is known to display as single width, we can avoid not only the Unicode data lookup in `disp_str` but also the one hidden in `str_width` for them. * Speed up cursor movements in long pyrepl commands When the current pyrepl command buffer contains many lines, scrolling up becomes slow. We have optimizations in place to reuse lines above the cursor position from one refresh to the next, but don't currently try to reuse lines below the cursor position in the same way, so we wind up with quadratic behavior where all lines of the buffer below the cursor are recomputed each time the cursor moves up another line. Optimize this by only computing one screen's worth of lines beyond the cursor position. Any lines beyond that can't possibly be shown by the console, and bounding this makes scrolling up have linear time complexity instead. --------- Signed-off-by: Matt Wozniski <mwozniski@bloomberg.net> Co-authored-by: Pablo Galindo <pablogsal@gmail.com>
2024-06-11 18:42:10 +02:00
self.input_buffer = b""
self.input_buffer_pos = 0
curses.setupterm(term or None, self.output_fd)
self.term = term
@overload
def _my_getstr(cap: str, optional: Literal[False] = False) -> bytes: ...
@overload
def _my_getstr(cap: str, optional: bool) -> bytes | None: ...
def _my_getstr(cap: str, optional: bool = False) -> bytes | None:
r = curses.tigetstr(cap)
if not optional and r is None:
raise InvalidTerminal(
f"terminal doesn't have the required {cap} capability"
)
return r
self._bel = _my_getstr("bel")
self._civis = _my_getstr("civis", optional=True)
self._clear = _my_getstr("clear")
self._cnorm = _my_getstr("cnorm", optional=True)
self._cub = _my_getstr("cub", optional=True)
self._cub1 = _my_getstr("cub1", optional=True)
self._cud = _my_getstr("cud", optional=True)
self._cud1 = _my_getstr("cud1", optional=True)
self._cuf = _my_getstr("cuf", optional=True)
self._cuf1 = _my_getstr("cuf1", optional=True)
self._cup = _my_getstr("cup")
self._cuu = _my_getstr("cuu", optional=True)
self._cuu1 = _my_getstr("cuu1", optional=True)
self._dch1 = _my_getstr("dch1", optional=True)
self._dch = _my_getstr("dch", optional=True)
self._el = _my_getstr("el")
self._hpa = _my_getstr("hpa", optional=True)
self._ich = _my_getstr("ich", optional=True)
self._ich1 = _my_getstr("ich1", optional=True)
self._ind = _my_getstr("ind", optional=True)
self._pad = _my_getstr("pad", optional=True)
self._ri = _my_getstr("ri", optional=True)
self._rmkx = _my_getstr("rmkx", optional=True)
self._smkx = _my_getstr("smkx", optional=True)
self.__setup_movement()
self.event_queue = EventQueue(self.input_fd, self.encoding)
self.cursor_visible = 1
gh-119517: Fixes for pasting in pyrepl (#120253) * Remove pyrepl's optimization for self-insert This will be replaced by a less specialized optimization. * Use line-buffering when pyrepl echoes pastes Previously echoing was totally suppressed until the entire command had been pasted and the terminal ended paste mode, but this gives the user no feedback to indicate that an operation is in progress. Drawing something to the screen once per line strikes a balance between perceived responsiveness and performance. * Remove dead code from pyrepl `msg_at_bottom` is always true. * Speed up pyrepl's screen rendering computation The Reader in pyrepl doesn't hold a complete representation of the screen area being drawn as persistent state. Instead, it recomputes it, on each keypress. This is fast enough for a few hundred bytes, but incredibly slow as the input buffer grows into the kilobytes (likely because of pasting). Rather than making some expensive and expansive changes to the repl's internal representation of the screen, add some caching: remember some data from one refresh to the next about what was drawn to the screen and, if we don't find anything that has invalidated the results that were computed last time around, reuse them. To keep this caching as simple as possible, all we'll do is look for lines in the buffer that were above the cursor the last time we were asked to update the screen, and that are still above the cursor now. We assume that nothing can affect a line that comes before both the old and new cursor location without us being informed. Based on this assumption, we can reuse old lines, which drastically speeds up the overwhelmingly common case where the user is typing near the end of the buffer. * Speed up pyrepl prompt drawing Cache the `can_colorize()` call rather than repeatedly recomputing it. This call looks up an environment variable, and is called once per character typed at the REPL. The environment variable lookup shows up as a hot spot when profiling, and we don't expect this to change while the REPL is running. * Speed up pasting multiple lines into the REPL Previously, we were checking whether the command should be accepted each time a line break was encountered, but that's not the expected behavior. In bracketed paste mode, we expect everything pasted to be part of a single block of code, and encountering a newline shouldn't behave like a user pressing <Enter> to execute a command. The user should always have a chance to review the pasted command before running it. * Use a read buffer for input in pyrepl Previously we were reading one byte at a time, which causes much slower IO than necessary. Instead, read in chunks, processing previously read data before asking for more. * Optimize finding width of a single character `wlen` finds the width of a multi-character string by adding up the width of each character, and then subtracting the width of any escape sequences. It's often called for single character strings, however, which can't possibly contain escape sequences. Optimize for that case. * Optimize disp_str for ASCII characters Since every ASCII character is known to display as single width, we can avoid not only the Unicode data lookup in `disp_str` but also the one hidden in `str_width` for them. * Speed up cursor movements in long pyrepl commands When the current pyrepl command buffer contains many lines, scrolling up becomes slow. We have optimizations in place to reuse lines above the cursor position from one refresh to the next, but don't currently try to reuse lines below the cursor position in the same way, so we wind up with quadratic behavior where all lines of the buffer below the cursor are recomputed each time the cursor moves up another line. Optimize this by only computing one screen's worth of lines beyond the cursor position. Any lines beyond that can't possibly be shown by the console, and bounding this makes scrolling up have linear time complexity instead. --------- Signed-off-by: Matt Wozniski <mwozniski@bloomberg.net> Co-authored-by: Pablo Galindo <pablogsal@gmail.com>
2024-06-11 18:42:10 +02:00
def __read(self, n: int) -> bytes:
if not self.input_buffer or self.input_buffer_pos >= len(self.input_buffer):
self.input_buffer = os.read(self.input_fd, 10000)
ret = self.input_buffer[self.input_buffer_pos : self.input_buffer_pos + n]
self.input_buffer_pos += len(ret)
if self.input_buffer_pos >= len(self.input_buffer):
self.input_buffer = b""
self.input_buffer_pos = 0
return ret
def change_encoding(self, encoding: str) -> None:
"""
Change the encoding used for I/O operations.
Parameters:
- encoding (str): New encoding to use.
"""
self.encoding = encoding
def refresh(self, screen, c_xy):
"""
Refresh the console screen.
Parameters:
- screen (list): List of strings representing the screen contents.
- c_xy (tuple): Cursor position (x, y) on the screen.
"""
cx, cy = c_xy
if not self.__gone_tall:
while len(self.screen) < min(len(screen), self.height):
self.__hide_cursor()
self.__move(0, len(self.screen) - 1)
self.__write("\n")
self.__posxy = 0, len(self.screen)
self.screen.append("")
else:
while len(self.screen) < len(screen):
self.screen.append("")
if len(screen) > self.height:
self.__gone_tall = 1
self.__move = self.__move_tall
px, py = self.__posxy
old_offset = offset = self.__offset
height = self.height
# we make sure the cursor is on the screen, and that we're
# using all of the screen if we can
if cy < offset:
offset = cy
elif cy >= offset + height:
offset = cy - height + 1
elif offset > 0 and len(screen) < offset + height:
offset = max(len(screen) - height, 0)
screen.append("")
oldscr = self.screen[old_offset : old_offset + height]
newscr = screen[offset : offset + height]
# use hardware scrolling if we have it.
if old_offset > offset and self._ri:
self.__hide_cursor()
self.__write_code(self._cup, 0, 0)
self.__posxy = 0, old_offset
for i in range(old_offset - offset):
self.__write_code(self._ri)
oldscr.pop(-1)
oldscr.insert(0, "")
elif old_offset < offset and self._ind:
self.__hide_cursor()
self.__write_code(self._cup, self.height - 1, 0)
self.__posxy = 0, old_offset + self.height - 1
for i in range(offset - old_offset):
self.__write_code(self._ind)
oldscr.pop(0)
oldscr.append("")
self.__offset = offset
for (
y,
oldline,
newline,
) in zip(range(offset, offset + height), oldscr, newscr):
if oldline != newline:
self.__write_changed_line(y, oldline, newline, px)
y = len(newscr)
while y < len(oldscr):
self.__hide_cursor()
self.__move(0, y)
self.__posxy = 0, y
self.__write_code(self._el)
y += 1
self.__show_cursor()
self.screen = screen.copy()
self.move_cursor(cx, cy)
self.flushoutput()
def move_cursor(self, x, y):
"""
Move the cursor to the specified position on the screen.
Parameters:
- x (int): X coordinate.
- y (int): Y coordinate.
"""
if y < self.__offset or y >= self.__offset + self.height:
self.event_queue.insert(Event("scroll", None))
else:
self.__move(x, y)
self.__posxy = x, y
self.flushoutput()
def prepare(self):
"""
Prepare the console for input/output operations.
"""
self.__svtermstate = tcgetattr(self.input_fd)
raw = self.__svtermstate.copy()
raw.iflag &= ~(termios.BRKINT | termios.INPCK | termios.ISTRIP | termios.IXON)
raw.oflag &= ~(termios.OPOST)
raw.cflag &= ~(termios.CSIZE | termios.PARENB)
raw.cflag |= termios.CS8
raw.lflag &= ~(
termios.ICANON | termios.ECHO | termios.IEXTEN | (termios.ISIG * 1)
)
raw.cc[termios.VMIN] = 1
raw.cc[termios.VTIME] = 0
tcsetattr(self.input_fd, termios.TCSADRAIN, raw)
self.screen = []
self.height, self.width = self.getheightwidth()
self.__buffer = []
self.__posxy = 0, 0
self.__gone_tall = 0
self.__move = self.__move_short
self.__offset = 0
self.__maybe_write_code(self._smkx)
try:
self.old_sigwinch = signal.signal(signal.SIGWINCH, self.__sigwinch)
except ValueError:
pass
self.__enable_bracketed_paste()
def restore(self):
"""
Restore the console to the default state
"""
self.__disable_bracketed_paste()
self.__maybe_write_code(self._rmkx)
self.flushoutput()
tcsetattr(self.input_fd, termios.TCSADRAIN, self.__svtermstate)
if hasattr(self, "old_sigwinch"):
signal.signal(signal.SIGWINCH, self.old_sigwinch)
del self.old_sigwinch
def push_char(self, char: int | bytes) -> None:
"""
Push a character to the console event queue.
"""
trace("push char {char!r}", char=char)
self.event_queue.push(char)
def get_event(self, block: bool = True) -> Event | None:
"""
Get an event from the console event queue.
Parameters:
- block (bool): Whether to block until an event is available.
Returns:
- Event: Event object from the event queue.
"""
while self.event_queue.empty():
while True:
try:
gh-119517: Fixes for pasting in pyrepl (#120253) * Remove pyrepl's optimization for self-insert This will be replaced by a less specialized optimization. * Use line-buffering when pyrepl echoes pastes Previously echoing was totally suppressed until the entire command had been pasted and the terminal ended paste mode, but this gives the user no feedback to indicate that an operation is in progress. Drawing something to the screen once per line strikes a balance between perceived responsiveness and performance. * Remove dead code from pyrepl `msg_at_bottom` is always true. * Speed up pyrepl's screen rendering computation The Reader in pyrepl doesn't hold a complete representation of the screen area being drawn as persistent state. Instead, it recomputes it, on each keypress. This is fast enough for a few hundred bytes, but incredibly slow as the input buffer grows into the kilobytes (likely because of pasting). Rather than making some expensive and expansive changes to the repl's internal representation of the screen, add some caching: remember some data from one refresh to the next about what was drawn to the screen and, if we don't find anything that has invalidated the results that were computed last time around, reuse them. To keep this caching as simple as possible, all we'll do is look for lines in the buffer that were above the cursor the last time we were asked to update the screen, and that are still above the cursor now. We assume that nothing can affect a line that comes before both the old and new cursor location without us being informed. Based on this assumption, we can reuse old lines, which drastically speeds up the overwhelmingly common case where the user is typing near the end of the buffer. * Speed up pyrepl prompt drawing Cache the `can_colorize()` call rather than repeatedly recomputing it. This call looks up an environment variable, and is called once per character typed at the REPL. The environment variable lookup shows up as a hot spot when profiling, and we don't expect this to change while the REPL is running. * Speed up pasting multiple lines into the REPL Previously, we were checking whether the command should be accepted each time a line break was encountered, but that's not the expected behavior. In bracketed paste mode, we expect everything pasted to be part of a single block of code, and encountering a newline shouldn't behave like a user pressing <Enter> to execute a command. The user should always have a chance to review the pasted command before running it. * Use a read buffer for input in pyrepl Previously we were reading one byte at a time, which causes much slower IO than necessary. Instead, read in chunks, processing previously read data before asking for more. * Optimize finding width of a single character `wlen` finds the width of a multi-character string by adding up the width of each character, and then subtracting the width of any escape sequences. It's often called for single character strings, however, which can't possibly contain escape sequences. Optimize for that case. * Optimize disp_str for ASCII characters Since every ASCII character is known to display as single width, we can avoid not only the Unicode data lookup in `disp_str` but also the one hidden in `str_width` for them. * Speed up cursor movements in long pyrepl commands When the current pyrepl command buffer contains many lines, scrolling up becomes slow. We have optimizations in place to reuse lines above the cursor position from one refresh to the next, but don't currently try to reuse lines below the cursor position in the same way, so we wind up with quadratic behavior where all lines of the buffer below the cursor are recomputed each time the cursor moves up another line. Optimize this by only computing one screen's worth of lines beyond the cursor position. Any lines beyond that can't possibly be shown by the console, and bounding this makes scrolling up have linear time complexity instead. --------- Signed-off-by: Matt Wozniski <mwozniski@bloomberg.net> Co-authored-by: Pablo Galindo <pablogsal@gmail.com>
2024-06-11 18:42:10 +02:00
self.push_char(self.__read(1))
except OSError as err:
if err.errno == errno.EINTR:
if not self.event_queue.empty():
return self.event_queue.get()
else:
continue
else:
raise
else:
break
if not block:
break
return self.event_queue.get()
def wait(self, timeout: float | None = None) -> bool:
"""
Wait for events on the console.
"""
return bool(self.pollob.poll(timeout))
def set_cursor_vis(self, visible):
"""
Set the visibility of the cursor.
Parameters:
- visible (bool): Visibility flag.
"""
if visible:
self.__show_cursor()
else:
self.__hide_cursor()
if TIOCGWINSZ:
def getheightwidth(self):
"""
Get the height and width of the console.
Returns:
- tuple: Height and width of the console.
"""
try:
return int(os.environ["LINES"]), int(os.environ["COLUMNS"])
except KeyError:
height, width = struct.unpack(
"hhhh", ioctl(self.input_fd, TIOCGWINSZ, b"\000" * 8)
)[0:2]
if not height:
return 25, 80
return height, width
else:
def getheightwidth(self):
"""
Get the height and width of the console.
Returns:
- tuple: Height and width of the console.
"""
try:
return int(os.environ["LINES"]), int(os.environ["COLUMNS"])
except KeyError:
return 25, 80
def forgetinput(self):
"""
Discard any pending input on the console.
"""
termios.tcflush(self.input_fd, termios.TCIFLUSH)
def flushoutput(self):
"""
Flush the output buffer.
"""
for text, iscode in self.__buffer:
if iscode:
self.__tputs(text)
else:
os.write(self.output_fd, text.encode(self.encoding, "replace"))
del self.__buffer[:]
def finish(self):
"""
Finish console operations and flush the output buffer.
"""
y = len(self.screen) - 1
while y >= 0 and not self.screen[y]:
y -= 1
self.__move(0, min(y, self.height + self.__offset - 1))
self.__write("\n\r")
self.flushoutput()
def beep(self):
"""
Emit a beep sound.
"""
self.__maybe_write_code(self._bel)
self.flushoutput()
if FIONREAD:
def getpending(self):
"""
Get pending events from the console event queue.
Returns:
- Event: Pending event from the event queue.
"""
e = Event("key", "", b"")
while not self.event_queue.empty():
e2 = self.event_queue.get()
e.data += e2.data
e.raw += e.raw
amount = struct.unpack("i", ioctl(self.input_fd, FIONREAD, b"\0\0\0\0"))[0]
gh-119517: Fixes for pasting in pyrepl (#120253) * Remove pyrepl's optimization for self-insert This will be replaced by a less specialized optimization. * Use line-buffering when pyrepl echoes pastes Previously echoing was totally suppressed until the entire command had been pasted and the terminal ended paste mode, but this gives the user no feedback to indicate that an operation is in progress. Drawing something to the screen once per line strikes a balance between perceived responsiveness and performance. * Remove dead code from pyrepl `msg_at_bottom` is always true. * Speed up pyrepl's screen rendering computation The Reader in pyrepl doesn't hold a complete representation of the screen area being drawn as persistent state. Instead, it recomputes it, on each keypress. This is fast enough for a few hundred bytes, but incredibly slow as the input buffer grows into the kilobytes (likely because of pasting). Rather than making some expensive and expansive changes to the repl's internal representation of the screen, add some caching: remember some data from one refresh to the next about what was drawn to the screen and, if we don't find anything that has invalidated the results that were computed last time around, reuse them. To keep this caching as simple as possible, all we'll do is look for lines in the buffer that were above the cursor the last time we were asked to update the screen, and that are still above the cursor now. We assume that nothing can affect a line that comes before both the old and new cursor location without us being informed. Based on this assumption, we can reuse old lines, which drastically speeds up the overwhelmingly common case where the user is typing near the end of the buffer. * Speed up pyrepl prompt drawing Cache the `can_colorize()` call rather than repeatedly recomputing it. This call looks up an environment variable, and is called once per character typed at the REPL. The environment variable lookup shows up as a hot spot when profiling, and we don't expect this to change while the REPL is running. * Speed up pasting multiple lines into the REPL Previously, we were checking whether the command should be accepted each time a line break was encountered, but that's not the expected behavior. In bracketed paste mode, we expect everything pasted to be part of a single block of code, and encountering a newline shouldn't behave like a user pressing <Enter> to execute a command. The user should always have a chance to review the pasted command before running it. * Use a read buffer for input in pyrepl Previously we were reading one byte at a time, which causes much slower IO than necessary. Instead, read in chunks, processing previously read data before asking for more. * Optimize finding width of a single character `wlen` finds the width of a multi-character string by adding up the width of each character, and then subtracting the width of any escape sequences. It's often called for single character strings, however, which can't possibly contain escape sequences. Optimize for that case. * Optimize disp_str for ASCII characters Since every ASCII character is known to display as single width, we can avoid not only the Unicode data lookup in `disp_str` but also the one hidden in `str_width` for them. * Speed up cursor movements in long pyrepl commands When the current pyrepl command buffer contains many lines, scrolling up becomes slow. We have optimizations in place to reuse lines above the cursor position from one refresh to the next, but don't currently try to reuse lines below the cursor position in the same way, so we wind up with quadratic behavior where all lines of the buffer below the cursor are recomputed each time the cursor moves up another line. Optimize this by only computing one screen's worth of lines beyond the cursor position. Any lines beyond that can't possibly be shown by the console, and bounding this makes scrolling up have linear time complexity instead. --------- Signed-off-by: Matt Wozniski <mwozniski@bloomberg.net> Co-authored-by: Pablo Galindo <pablogsal@gmail.com>
2024-06-11 18:42:10 +02:00
raw = self.__read(amount)
data = str(raw, self.encoding, "replace")
e.data += data
e.raw += raw
return e
else:
def getpending(self):
"""
Get pending events from the console event queue.
Returns:
- Event: Pending event from the event queue.
"""
e = Event("key", "", b"")
while not self.event_queue.empty():
e2 = self.event_queue.get()
e.data += e2.data
e.raw += e.raw
amount = 10000
gh-119517: Fixes for pasting in pyrepl (#120253) * Remove pyrepl's optimization for self-insert This will be replaced by a less specialized optimization. * Use line-buffering when pyrepl echoes pastes Previously echoing was totally suppressed until the entire command had been pasted and the terminal ended paste mode, but this gives the user no feedback to indicate that an operation is in progress. Drawing something to the screen once per line strikes a balance between perceived responsiveness and performance. * Remove dead code from pyrepl `msg_at_bottom` is always true. * Speed up pyrepl's screen rendering computation The Reader in pyrepl doesn't hold a complete representation of the screen area being drawn as persistent state. Instead, it recomputes it, on each keypress. This is fast enough for a few hundred bytes, but incredibly slow as the input buffer grows into the kilobytes (likely because of pasting). Rather than making some expensive and expansive changes to the repl's internal representation of the screen, add some caching: remember some data from one refresh to the next about what was drawn to the screen and, if we don't find anything that has invalidated the results that were computed last time around, reuse them. To keep this caching as simple as possible, all we'll do is look for lines in the buffer that were above the cursor the last time we were asked to update the screen, and that are still above the cursor now. We assume that nothing can affect a line that comes before both the old and new cursor location without us being informed. Based on this assumption, we can reuse old lines, which drastically speeds up the overwhelmingly common case where the user is typing near the end of the buffer. * Speed up pyrepl prompt drawing Cache the `can_colorize()` call rather than repeatedly recomputing it. This call looks up an environment variable, and is called once per character typed at the REPL. The environment variable lookup shows up as a hot spot when profiling, and we don't expect this to change while the REPL is running. * Speed up pasting multiple lines into the REPL Previously, we were checking whether the command should be accepted each time a line break was encountered, but that's not the expected behavior. In bracketed paste mode, we expect everything pasted to be part of a single block of code, and encountering a newline shouldn't behave like a user pressing <Enter> to execute a command. The user should always have a chance to review the pasted command before running it. * Use a read buffer for input in pyrepl Previously we were reading one byte at a time, which causes much slower IO than necessary. Instead, read in chunks, processing previously read data before asking for more. * Optimize finding width of a single character `wlen` finds the width of a multi-character string by adding up the width of each character, and then subtracting the width of any escape sequences. It's often called for single character strings, however, which can't possibly contain escape sequences. Optimize for that case. * Optimize disp_str for ASCII characters Since every ASCII character is known to display as single width, we can avoid not only the Unicode data lookup in `disp_str` but also the one hidden in `str_width` for them. * Speed up cursor movements in long pyrepl commands When the current pyrepl command buffer contains many lines, scrolling up becomes slow. We have optimizations in place to reuse lines above the cursor position from one refresh to the next, but don't currently try to reuse lines below the cursor position in the same way, so we wind up with quadratic behavior where all lines of the buffer below the cursor are recomputed each time the cursor moves up another line. Optimize this by only computing one screen's worth of lines beyond the cursor position. Any lines beyond that can't possibly be shown by the console, and bounding this makes scrolling up have linear time complexity instead. --------- Signed-off-by: Matt Wozniski <mwozniski@bloomberg.net> Co-authored-by: Pablo Galindo <pablogsal@gmail.com>
2024-06-11 18:42:10 +02:00
raw = self.__read(amount)
data = str(raw, self.encoding, "replace")
e.data += data
e.raw += raw
return e
def clear(self):
"""
Clear the console screen.
"""
self.__write_code(self._clear)
self.__gone_tall = 1
self.__move = self.__move_tall
self.__posxy = 0, 0
self.screen = []
@property
def input_hook(self):
try:
import posix
except ImportError:
return None
if posix._is_inputhook_installed():
return posix._inputhook
def __enable_bracketed_paste(self) -> None:
os.write(self.output_fd, b"\x1b[?2004h")
def __disable_bracketed_paste(self) -> None:
os.write(self.output_fd, b"\x1b[?2004l")
def __setup_movement(self):
"""
Set up the movement functions based on the terminal capabilities.
"""
if 0 and self._hpa: # hpa don't work in windows telnet :-(
self.__move_x = self.__move_x_hpa
elif self._cub and self._cuf:
self.__move_x = self.__move_x_cub_cuf
elif self._cub1 and self._cuf1:
self.__move_x = self.__move_x_cub1_cuf1
else:
raise RuntimeError("insufficient terminal (horizontal)")
if self._cuu and self._cud:
self.__move_y = self.__move_y_cuu_cud
elif self._cuu1 and self._cud1:
self.__move_y = self.__move_y_cuu1_cud1
else:
raise RuntimeError("insufficient terminal (vertical)")
if self._dch1:
self.dch1 = self._dch1
elif self._dch:
self.dch1 = curses.tparm(self._dch, 1)
else:
self.dch1 = None
if self._ich1:
self.ich1 = self._ich1
elif self._ich:
self.ich1 = curses.tparm(self._ich, 1)
else:
self.ich1 = None
self.__move = self.__move_short
def __write_changed_line(self, y, oldline, newline, px_coord):
# this is frustrating; there's no reason to test (say)
# self.dch1 inside the loop -- but alternative ways of
# structuring this function are equally painful (I'm trying to
# avoid writing code generators these days...)
minlen = min(wlen(oldline), wlen(newline))
x_pos = 0
x_coord = 0
px_pos = 0
j = 0
for c in oldline:
if j >= px_coord:
break
j += wlen(c)
px_pos += 1
# reuse the oldline as much as possible, but stop as soon as we
# encounter an ESCAPE, because it might be the start of an escape
# sequene
while (
x_coord < minlen
and oldline[x_pos] == newline[x_pos]
and newline[x_pos] != "\x1b"
):
x_coord += wlen(newline[x_pos])
x_pos += 1
# if we need to insert a single character right after the first detected change
if oldline[x_pos:] == newline[x_pos + 1 :] and self.ich1:
if (
y == self.__posxy[1]
and x_coord > self.__posxy[0]
and oldline[px_pos:x_pos] == newline[px_pos + 1 : x_pos + 1]
):
x_pos = px_pos
x_coord = px_coord
character_width = wlen(newline[x_pos])
self.__move(x_coord, y)
self.__write_code(self.ich1)
self.__write(newline[x_pos])
self.__posxy = x_coord + character_width, y
# if it's a single character change in the middle of the line
elif (
x_coord < minlen
and oldline[x_pos + 1 :] == newline[x_pos + 1 :]
and wlen(oldline[x_pos]) == wlen(newline[x_pos])
):
character_width = wlen(newline[x_pos])
self.__move(x_coord, y)
self.__write(newline[x_pos])
self.__posxy = x_coord + character_width, y
# if this is the last character to fit in the line and we edit in the middle of the line
elif (
self.dch1
and self.ich1
and wlen(newline) == self.width
and x_coord < wlen(newline) - 2
and newline[x_pos + 1 : -1] == oldline[x_pos:-2]
):
self.__hide_cursor()
self.__move(self.width - 2, y)
self.__posxy = self.width - 2, y
self.__write_code(self.dch1)
character_width = wlen(newline[x_pos])
self.__move(x_coord, y)
self.__write_code(self.ich1)
self.__write(newline[x_pos])
self.__posxy = character_width + 1, y
else:
self.__hide_cursor()
self.__move(x_coord, y)
if wlen(oldline) > wlen(newline):
self.__write_code(self._el)
self.__write(newline[x_pos:])
self.__posxy = wlen(newline), y
if "\x1b" in newline:
# ANSI escape characters are present, so we can't assume
# anything about the position of the cursor. Moving the cursor
# to the left margin should work to get to a known position.
self.move_cursor(0, y)
def __write(self, text):
self.__buffer.append((text, 0))
def __write_code(self, fmt, *args):
self.__buffer.append((curses.tparm(fmt, *args), 1))
def __maybe_write_code(self, fmt, *args):
if fmt:
self.__write_code(fmt, *args)
def __move_y_cuu1_cud1(self, y):
dy = y - self.__posxy[1]
if dy > 0:
self.__write_code(dy * self._cud1)
elif dy < 0:
self.__write_code((-dy) * self._cuu1)
def __move_y_cuu_cud(self, y):
dy = y - self.__posxy[1]
if dy > 0:
self.__write_code(self._cud, dy)
elif dy < 0:
self.__write_code(self._cuu, -dy)
def __move_x_hpa(self, x: int) -> None:
if x != self.__posxy[0]:
self.__write_code(self._hpa, x)
def __move_x_cub1_cuf1(self, x: int) -> None:
dx = x - self.__posxy[0]
if dx > 0:
self.__write_code(self._cuf1 * dx)
elif dx < 0:
self.__write_code(self._cub1 * (-dx))
def __move_x_cub_cuf(self, x: int) -> None:
dx = x - self.__posxy[0]
if dx > 0:
self.__write_code(self._cuf, dx)
elif dx < 0:
self.__write_code(self._cub, -dx)
def __move_short(self, x, y):
self.__move_x(x)
self.__move_y(y)
def __move_tall(self, x, y):
assert 0 <= y - self.__offset < self.height, y - self.__offset
self.__write_code(self._cup, y - self.__offset, x)
def __sigwinch(self, signum, frame):
self.height, self.width = self.getheightwidth()
self.event_queue.insert(Event("resize", None))
def __hide_cursor(self):
if self.cursor_visible:
self.__maybe_write_code(self._civis)
self.cursor_visible = 0
def __show_cursor(self):
if not self.cursor_visible:
self.__maybe_write_code(self._cnorm)
self.cursor_visible = 1
def repaint(self):
if not self.__gone_tall:
self.__posxy = 0, self.__posxy[1]
self.__write("\r")
ns = len(self.screen) * ["\000" * self.width]
self.screen = ns
else:
self.__posxy = 0, self.__offset
self.__move(0, self.__offset)
ns = self.height * ["\000" * self.width]
self.screen = ns
def __tputs(self, fmt, prog=delayprog):
"""A Python implementation of the curses tputs function; the
curses one can't really be wrapped in a sane manner.
I have the strong suspicion that this is complexity that
will never do anyone any good."""
# using .get() means that things will blow up
# only if the bps is actually needed (which I'm
# betting is pretty unlkely)
bps = ratedict.get(self.__svtermstate.ospeed)
while 1:
m = prog.search(fmt)
if not m:
os.write(self.output_fd, fmt)
break
x, y = m.span()
os.write(self.output_fd, fmt[:x])
fmt = fmt[y:]
delay = int(m.group(1))
if b"*" in m.group(2):
delay *= self.height
if self._pad and bps is not None:
nchars = (bps * delay) / 1000
os.write(self.output_fd, self._pad * nchars)
else:
time.sleep(float(delay) / 1000.0)