blob: f59b46f1859f528f0900cb9dae7fddae9ab9ba6f [file] [log] [blame]
awlane1cec2332025-04-24 17:24:47 -05001import fcntl, struct, termios, os
2import subprocess
3import select
4import pty
5from io import BufferedWriter
6from threading import Thread
7from typing import TYPE_CHECKING
8
9import msgpack
10
11from minindn.minindn_play.term.cbuf import CircularByteBuffer
12from minindn.minindn_play.consts import WSKeys, WSFunctions
13
14if TYPE_CHECKING:
15 from minindn.minindn_play.term.term import TermExecutor
16
17
18class Pty:
19 id: str
20 name: str
21 master: int
22 slave: int
23 stdin: BufferedWriter
24 buffer: CircularByteBuffer
25 executor: 'TermExecutor'
26 process: subprocess.Popen | None = None
27
28 def __init__(self, executor, pty_id: str, name: str):
29 """
30 Initialize a new pty
31 Creates a new Python PTY instance and stores the slave and master file descriptors
32 """
33 self.master, self.slave = pty.openpty()
34 self.buffer = CircularByteBuffer(16000)
35 self.executor = executor
36 self.id = pty_id
37 self.name = name
38 self.stdin = os.fdopen(self.master, 'wb')
39 executor.pty_list[self.id] = self
40
41 def cleanup(self):
42 """Cleanup the pty"""
43
44 self.executor.socket.send_all(msgpack.dumps({
45 WSKeys.MSG_KEY_FUN: WSFunctions.CLOSE_TERMINAL,
46 WSKeys.MSG_KEY_ID: self.id,
47 }))
48
49 try:
50 os.close(self.master)
51 os.close(self.slave)
52 except OSError:
53 pass
54
55 if self.id in self.executor.pty_list:
56 del self.executor.pty_list[self.id]
57
58class PtyManager:
59 ptys = {}
60 thread: Thread
61 executor: 'TermExecutor'
62 poller = select.poll()
63
64 def __init__(self, executor):
65 """Initialize the pty manager"""
66 self.executor = executor
67 self.thread = Thread(target=self._ui_thread, args=(), daemon=True)
68 self.thread.start()
69
70 def register(self, target_pty: Pty):
71 """Register a new pty with the manager"""
72 self.ptys[target_pty.master] = target_pty
73 self.poller.register(target_pty.master, select.POLLIN)
74
75 def unregister(self, target_pty: Pty):
76 """Unregister a pty from the manager"""
77 self.poller.unregister(target_pty.master)
78 target_pty.cleanup()
79 if target_pty.master in self.ptys:
80 del self.ptys[target_pty.master]
81
82 def _ui_thread(self):
83 """Thread that handles UI output"""
84 while True:
85 self._check_procs()
86 self._poll_fds()
87
88 def _check_procs(self):
89 """Check if any processes have exited and unregister them"""
90 for target_pty in list(self.ptys.values()):
91 if target_pty.process is not None and target_pty.process.poll() is not None:
92 self.unregister(target_pty)
93 continue
94
95 def _poll_fds(self):
96 """Poll all registered file descriptors"""
97 for (fd, status) in self.poller.poll(250):
98 if fd not in self.ptys:
99 self.poller.unregister(fd)
100 continue
101 target_pty = self.ptys[fd]
102
103 # Check if poller is closed
104 if status == select.POLLHUP:
105 self.unregister(target_pty)
106 continue
107
108 # Find the number of bytes available to read
109 bytes_available = fcntl.ioctl(target_pty.master, termios.FIONREAD, struct.pack('I', 0))
110 bytes_available = struct.unpack('I', bytes_available)[0]
111 bytes_to_read = min(bytes_available, 4096)
112
113 # This should never really happen
114 if bytes_to_read == 0:
115 continue
116
117 # Read everything available and send to UI
118 try:
119 out_bytes = os.read(target_pty.master, bytes_to_read)
120 self.executor.send_pty_out(out_bytes, target_pty.id)
121 target_pty.buffer.write(out_bytes)
122 except Exception as e:
123 print(e)
124 self.unregister(target_pty)
125 continue