Add NDN Play integration to Mini-NDN code base
Integrate the minindn_play project by Varun Patil into
the base Mini-NDN codebase; this will allow for the
use of the NDN-Play browser UI with minimal additional
dependencies or setup.
Refs #5359
Change-Id: I4fedfa885b07d7fe946a18c6d9b5016d291b3582
diff --git a/minindn/minindn_play/__init__.py b/minindn/minindn_play/__init__.py
new file mode 100644
index 0000000..52f7088
--- /dev/null
+++ b/minindn/minindn_play/__init__.py
@@ -0,0 +1 @@
+'''Helper for NDN-Play GUI (based on Minindn_Play by Varun Patil)'''
diff --git a/minindn/minindn_play/consts.py b/minindn/minindn_play/consts.py
new file mode 100644
index 0000000..60e80e7
--- /dev/null
+++ b/minindn/minindn_play/consts.py
@@ -0,0 +1,34 @@
+from enum import Enum
+
+class Config:
+    """Static settings shared by the NDN-Play helper modules."""
+    # File in which the websocket auth token is persisted between runs
+    AUTH_FILE = "/tmp/ndnplay-auth"
+    # Base URL of the NDN-Play web UI that is opened in the browser
+    PLAY_URL = "https://play.ndn.today"
+    # Address the websocket server binds to
+    SERVER_HOST = "0.0.0.0"
+    # Host placed in the websocket URL that is handed to the browser
+    SERVER_HOST_URL = "127.0.0.1"
+    # Port used both for binding and in the advertised websocket URL
+    SERVER_PORT = 8765
+    # Number of packets collected before a PCAP chunk is sent to the UI
+    PCAP_CHUNK_SIZE = 512
+
+# MessagePack Keys
+class WSKeys(str, Enum):
+    """Keys of the MessagePack dictionaries exchanged with the UI."""
+    # Name of the function being called / answered
+    MSG_KEY_FUN = 'F'
+    # Identifier the message refers to (used for pty ids)
+    MSG_KEY_ID = 'I'
+    # Result payload sent to the UI
+    MSG_KEY_RESULT = 'R'
+    # Positional arguments of a function call received from the UI
+    MSG_KEY_ARGS = 'A'
+
+class WSFunctions(str, Enum):
+    """Function names carried under WSKeys.MSG_KEY_FUN.
+
+    Incoming calls are resolved by looking up an executor method with the
+    same name, so the values must match the executor method names.
+    """
+    # Topology
+    GET_TOPO = 'get_topo'
+    OPEN_ALL_PTYS = 'open_all_ptys'
+    DEL_LINK = 'del_link'
+    ADD_LINK = 'add_link'
+    UPD_LINK = 'upd_link'
+    DEL_NODE = 'del_node'
+    ADD_NODE = 'add_node'
+    # Node state and packet captures
+    GET_FIB = 'get_fib'
+    GET_PCAP = 'get_pcap'
+    GET_PCAP_WIRE = 'get_pcap_wire'
+    # Terminals
+    PTY_IN = 'pty_in'
+    PTY_OUT = 'pty_out'
+    PTY_RESIZE = 'pty_resize'
+    OPEN_TERMINAL = 'open_term'
+    CLOSE_TERMINAL = 'close_term'
+    # Log monitors
+    MONITOR_COUNTS = 'monitor_counts'
\ No newline at end of file
diff --git a/minindn/minindn_play/monitor.py b/minindn/minindn_play/monitor.py
new file mode 100644
index 0000000..e8fed92
--- /dev/null
+++ b/minindn/minindn_play/monitor.py
@@ -0,0 +1,62 @@
+import re
+from time import sleep
+from io import TextIOWrapper
+from threading import Thread
+
+import msgpack
+
+from mininet.node import Node
+from minindn.util import host_home
+from minindn.minindn_play.socket import PlaySocket
+from minindn.minindn_play.consts import WSKeys, WSFunctions
+
+
+class LogMonitor:
+    """Count matching log lines per node and push the counts to the UI.
+
+    For every node the file ``<host home>/<log_file>`` is opened once and then
+    polled every ``interval`` seconds. Each poll counts the lines read since
+    the previous poll that match ``regex_filter`` and sends the per-node
+    counts as a MONITOR_COUNTS message to all connected UIs.
+    """
+    nodes: list[Node]
+    log_file: str
+    interval: float
+    socket: PlaySocket
+    regex_filter: re.Pattern
+    quit: bool = False
+
+    def __init__(self, nodes: list, log_file: str, interval: float = 0.5, regex_filter: str = ''):
+        """
+        nodes: nodes whose log file is monitored
+        log_file: path of the log file relative to each node's home directory
+        interval: seconds to sleep between two polls
+        regex_filter: pattern applied with re.match() to every line read
+        """
+        self.nodes = nodes
+        self.log_file = log_file
+        self.interval = interval
+        self.regex_filter = re.compile(regex_filter)
+        self.quit = False
+
+    def start(self, socket: PlaySocket):
+        """Start polling in a background thread, reporting through `socket`"""
+        self.socket = socket
+        Thread(target=self._start).start()
+
+    def stop(self):
+        """Ask the polling thread to exit after its current iteration"""
+        self.quit = True
+
+    def _start(self):
+        """Thread body: open all log files and poll them until stop() is called"""
+        files: list[TextIOWrapper] = []
+        counts: dict[str, int] = {}
+
+        # try/finally so already-opened files are closed even when a later
+        # open() fails or the poll loop raises
+        try:
+            for node in self.nodes:
+                path = f"{host_home(node)}/{self.log_file}"
+                files.append(open(path, 'r'))
+                counts[node.name] = 0
+
+            while not self.quit:
+                for node, file in zip(self.nodes, files):
+                    # Counts are per interval, not cumulative
+                    counts[node.name] = 0
+                    while line := file.readline():
+                        if self.regex_filter.match(line):
+                            counts[node.name] += 1
+
+                self._send(counts)
+                sleep(self.interval)
+        finally:
+            for file in files:
+                file.close()
+
+    def _send(self, counts: dict[str, int]):
+        """Broadcast the current counts (node name -> matches) to all UIs"""
+        self.socket.send_all(msgpack.dumps({
+            WSKeys.MSG_KEY_FUN: WSFunctions.MONITOR_COUNTS,
+            WSKeys.MSG_KEY_RESULT: counts,
+        }))
diff --git a/minindn/minindn_play/net/__init__.py b/minindn/minindn_play/net/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/minindn/minindn_play/net/__init__.py
diff --git a/minindn/minindn_play/net/state.py b/minindn/minindn_play/net/state.py
new file mode 100644
index 0000000..e325edb
--- /dev/null
+++ b/minindn/minindn_play/net/state.py
@@ -0,0 +1,23 @@
+from mininet.net import Mininet
+from minindn.util import is_valid_hostid, run_popen
+
+class StateExecutor:
+    """Executor answering UI queries about the runtime state of a node"""
+
+    def __init__(self, net: Mininet):
+        self.net = net
+
+    async def get_fib(self, node_id):
+        """UI Function: Get the NFDC status report and ifconfig as the fib"""
+        if is_valid_hostid(self.net, node_id):
+            host = self.net[node_id]
+            # Run both commands on the node and join their decoded output
+            sections = [
+                run_popen(host, command.split()).decode("utf-8")
+                for command in ("nfdc status report", "ifconfig")
+            ]
+            return {
+                "id": node_id,
+                "fib": "\n".join(sections),
+            }
+
+        # Unknown ids produce no response at all
+        if node_id not in self.net:
+            return None
+
+        # Known node that is not a host (e.g. a switch)
+        kind = self.net[node_id].__class__.__name__
+        return { "id": node_id, "fib": f"Node is not a host ({kind})" }
diff --git a/minindn/minindn_play/net/topo.py b/minindn/minindn_play/net/topo.py
new file mode 100644
index 0000000..fe039e6
--- /dev/null
+++ b/minindn/minindn_play/net/topo.py
@@ -0,0 +1,156 @@
+from mininet.net import Mininet
+from mininet.log import info, error
+from mininet.link import Link
+
+class TopoExecutor:
+    """Executor exposing topology queries and edits to the UI"""
+
+    def __init__(self, net: Mininet):
+        self.net = net
+
+    async def get_topo(self):
+        """UI Function: Get topology
+
+        Returns a dict with "nodes" (hosts, switches and, when present,
+        stations not already listed as hosts) and "links".
+        """
+        nodes = []
+        node_ids = set()
+        links = []
+
+        for host in self.net.hosts:
+            nodes.append(self._node_dict(host))
+            node_ids.add(host.name)
+
+        for switch in self.net.switches:
+            nodes.append(self._node_dict(switch, switch=True))
+
+        # "stations" only exists on some network objects, hence getattr
+        for station in getattr(self.net, "stations", []):
+            if station.name not in node_ids:
+                nodes.append(self._node_dict(station))
+
+        for link in self.net.links:
+            if obj := self._link_dict(link):
+                links.append(obj)
+
+        return {
+            "nodes": nodes,
+            "links": links,
+        }
+
+    async def add_link(self, a, b, link_id, opts):
+        """UI Function: Add link between nodes `a` and `b`; echoes `opts` back"""
+        link = self.net.addLink(self.net[a], self.net[b], **self._conv_link_opts(opts))
+        info(f"Added link {link}\n")
+        return {
+            "id": link_id,
+            "mnId": str(link),
+            **opts,
+        }
+
+    async def del_link(self, a, b, mn_id):
+        """UI Function: Delete link; returns whether a link was removed"""
+        link = self._get_link(a, b, mn_id)
+        if link:
+            self.net.delLink(link)
+            return True
+
+        error(f"No link found to remove for {mn_id}\n")
+        return False
+
+    async def upd_link(self, a, b, mn_id, opts):
+        """UI Function: Update link; returns whether a link was configured"""
+        link = self._get_link(a, b, mn_id)
+        if link:
+            params = self._conv_link_opts(opts)
+            link.intf1.config(**params)
+            link.intf2.config(**params)
+            # Mirror the new values into params so _link_dict reports them
+            for p in params:
+                link.intf1.params[p] = params[p]
+                link.intf2.params[p] = params[p]
+            return True
+
+        info(f"No link to configure for {mn_id}\n")
+        return False
+
+    async def add_node(self, node_id, label):
+        """UI Function: Add node (host is added, named after `label`)"""
+        self.net.addHost(label)
+        return {
+            "id": node_id,
+            "label": label,
+        }
+
+    async def del_node(self, node_id):
+        """UI Function: Delete node"""
+        self.net.delNode(self.net[node_id])
+        info(f"Removed node {node_id}\n")
+        return True
+
+    async def set_node_pos(self, node_id, x, y):
+        """UI Function: Set node position; False for unknown or position-less nodes"""
+        # Check membership first: indexing the network with an unknown id
+        # raises instead of returning a falsy value
+        if node_id not in self.net:
+            return False
+        node = self.net[node_id]
+        if not hasattr(node, "position"):
+            return False
+        # Inverse of the scaling applied in _node_dict
+        x, y = x / 10, y / 10
+        z = node.position[2] if len(node.position) > 2 else 0
+        node.setPosition(f"{int(x)},{int(y)},{int(z)}")
+        info(f"Set position of {node_id} to {x},{y}\n")
+        return True
+
+    def _node_dict(self, node, switch=False):
+        """Helper: convert a node to the dict representation sent to the UI"""
+        val = {
+            "id": node.name,
+            "label": node.name,
+        }
+
+        if switch:
+            val["isSwitch"] = True
+
+        if hasattr(node, "position"):
+            # Mininet positions are in m, NDN-Play are much smaller
+            val["x"] = node.position[0] * 10
+            val["y"] = node.position[1] * 10
+
+        if hasattr(node, "params") and "params" in node.params:
+            p = node.params["params"]
+            if "color" in p:
+                val["color"] = p["color"]
+
+        return val
+
+    def _link_dict(self, link):
+        """Helper: convert a link to the UI dict, or None if it is not shown"""
+        if isinstance(link.intf2, str):
+            # String endpoints (e.g. "wifiAdhoc") have no second interface
+            # TODO: visualize adhoc links
+            return None
+
+        obj = {
+            "mnId": str(link),
+            "from": link.intf1.node.name,
+            "to": link.intf2.node.name,
+        }
+
+        params1 = link.intf1.params
+        params2 = link.intf2.params
+
+        # The second interface may lack a parameter the first one has;
+        # fall back to the first interface's value instead of raising
+        if "delay" in params1:
+            delay1 = params1["delay"]
+            delay2 = params2.get("delay", delay1)
+            d1 = int(delay1[:-len("ms")])
+            d2 = int(delay2[:-len("ms")])
+            obj["latency"] = (d1 + d2) / 2
+
+        if "loss" in params1:
+            l1 = params1["loss"]
+            l2 = params2.get("loss", l1)
+            obj["loss"] = (l1 + l2) / 2
+
+        return obj
+
+    def _get_link(self, a, b, mn_id) -> Link | None:
+        """Helper: get link between two nodes by name"""
+        for link in self.net.linksBetween(self.net[a], self.net[b]):
+            if str(link) == mn_id:
+                return link
+
+        return None
+
+    def _conv_link_opts(self, opts: dict):
+        """Helper: convert UI link options to Mininet link parameters
+
+        "latency" becomes "delay" (string with "ms" suffix) and "loss" is
+        passed as float; missing, None or negative values are dropped.
+        """
+        params = {}
+        latency = opts.get("latency")
+        if latency is not None and int(latency) >= 0:
+            params["delay"] = str(int(latency)) + "ms"
+        loss = opts.get("loss")
+        if loss is not None and float(loss) >= 0:
+            params["loss"] = float(loss)
+        return params
diff --git a/minindn/minindn_play/server.py b/minindn/minindn_play/server.py
new file mode 100644
index 0000000..8a4bb85
--- /dev/null
+++ b/minindn/minindn_play/server.py
@@ -0,0 +1,63 @@
+import signal
+from threading import Thread
+
+from mininet.net import Mininet
+from minindn.minindn_play.monitor import LogMonitor
+from minindn.minindn_play.socket import PlaySocket
+from minindn.minindn_play.net.topo import TopoExecutor
+from minindn.minindn_play.net.state import StateExecutor
+from minindn.minindn_play.term.term import TermExecutor
+from minindn.minindn_play.shark.shark import SharkExecutor
+
+class PlayServer:
+    """Entry point that wires the websocket, the executors and the log monitors"""
+    net: Mininet
+    repl: bool
+    cli: bool
+    monitors: list[LogMonitor]
+
+    def __init__(self, net: Mininet, **kwargs) -> None:
+        """
+        Create the NDN Play GUI server.
+
+        Keyword arguments:
+        repl: start a Python REPL terminal on start() (default False)
+        cli: if True (default), start() will block for the Mininet CLI
+        """
+
+        self.net = net
+        self.repl = kwargs.get('repl', False)
+        self.cli = kwargs.get('cli', True)
+        # Per-instance list; a class-level list would be shared by all servers
+        self.monitors = []
+
+        self.socket = PlaySocket()
+        self.socket.add_executor(TopoExecutor(net))
+        self.socket.add_executor(StateExecutor(net))
+
+        self.shark_executor = SharkExecutor(net, self.socket)
+        self.socket.add_executor(self.shark_executor)
+
+        self.pty_executor = TermExecutor(net, self.socket)
+        self.socket.add_executor(self.pty_executor)
+
+    def start(self):
+        """Start the REPL (if enabled), the monitors and the CLI (if enabled)"""
+        if self.repl:
+            Thread(target=self.pty_executor.start_repl).start()
+
+        # Start all monitors
+        for monitor in self.monitors:
+            monitor.start(self.socket)
+
+        try:
+            # Blocks until Mininet CLI is closed
+            if self.cli:
+                self.hook_sigint()
+                self.pty_executor.start_cli()
+        finally:
+            # Stop all monitors, also when the CLI exits with an exception
+            # NOTE(review): with cli=False this stops the monitors right
+            # after starting them - confirm that this is intended
+            for monitor in self.monitors:
+                monitor.stop()
+
+    def hook_sigint(self):
+        """Install a SIGINT handler that stops Mininet and exits with 127"""
+        def signal_handler(sig, frame):
+            print('SIGINT received, stopping Mininet...')
+            self.net.stop()
+            # exit() is injected by the site module and may be missing;
+            # raising SystemExit is the equivalent that is always available
+            raise SystemExit(127)
+        signal.signal(signal.SIGINT, signal_handler)
+
+    def add_monitor(self, monitor: LogMonitor):
+        """Register a monitor to be started by start()"""
+        self.monitors.append(monitor)
diff --git a/minindn/minindn_play/shark/__init__.py b/minindn/minindn_play/shark/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/minindn/minindn_play/shark/__init__.py
diff --git a/minindn/minindn_play/shark/shark.py b/minindn/minindn_play/shark/shark.py
new file mode 100644
index 0000000..3db0170
--- /dev/null
+++ b/minindn/minindn_play/shark/shark.py
@@ -0,0 +1,199 @@
+import ipaddress
+from pathlib import Path
+from threading import Thread
+
+import msgpack
+
+from mininet.net import Mininet
+from mininet.log import error, info
+from minindn.minindn_play.socket import PlaySocket
+from minindn.minindn_play.consts import Config, WSFunctions, WSKeys
+from minindn.util import is_valid_hostid, run_popen, run_popen_readline
+
+# TShark fields
+# The order matters: the packet parser indexes the tab-separated output
+# columns by position (0 = frame.number ... 8 = ipv6.dst).
+SHARK_FIELDS = [
+    "frame.number",
+    "frame.time_epoch",
+    "ndn.len",
+    "ndn.type",
+    "ndn.name",
+    "ip.src",
+    "ip.dst",
+    "ipv6.src",
+    "ipv6.dst",
+    # "ndn.bin", # binary data
+]
+# Command line fragment selecting the fields above; the trailing
+# "-Y ndn.len" is passed to tshark as its display filter
+SHARK_FIELDS_STR = " -Tfields -e " + " -e ".join(SHARK_FIELDS) + " -Y ndn.len"
+
+class SharkExecutor:
+    """Executor serving packet captures (via editcap and tshark) to the UI"""
+    # Cache: IP address -> host name, built on first use
+    _ip_map: dict = None
+    # Cache: "lua_script:<path>" argument for the NDN dissector
+    _lua_script: str = None
+
+    def __init__(self, net: Mininet, socket: PlaySocket):
+        self.net = net
+        self.socket = socket
+
+    def _get_pcap_file(self, name):
+        """Path of the capture file of node `name` (relative to the node's cwd)"""
+        return f"./{name}-interfaces.pcap"
+
+    def _get_lua(self):
+        """Return the cached "lua_script:<path>" option for the NDN dissector
+
+        Looks next to this package first, then in the system-wide location.
+        Raises RuntimeError if the dissector is found in neither place.
+        """
+        if self._lua_script is not None:
+            return self._lua_script
+
+        lua_path = Path(__file__).parent.parent.absolute() / "ndn.lua"
+        if lua_path.exists():
+            luafile = str(lua_path)
+            self._lua_script = 'lua_script:' + luafile
+            return self._lua_script
+
+        luafile = '/usr/local/share/ndn-dissect-wireshark/ndn.lua'
+        if Path(luafile).exists():
+            self._lua_script = 'lua_script:' + luafile
+            return self._lua_script
+
+        raise RuntimeError('NDN Wireshark dissector not found (ndn-tools/ndn.lua)')
+
+    def _convert_to_full_ip_address(self, ip_address: str):
+        """Normalize IPv6 addresses via ipaddress; return anything else unchanged"""
+        try:
+            ip_obj = ipaddress.ip_address(ip_address)
+        except ValueError:
+            return ip_address
+
+        if isinstance(ip_obj, ipaddress.IPv6Address):
+            return str(ip_obj)
+        return ip_address
+
+    def _get_hostname_from_ip(self, ip):
+        #TODO: This is probably overcomplicated given the Mininet API
+        """
+        Get the hostname of a node given its IP address.
+        Returns `ip` itself when no node owns the address.
+        This function runs once and caches the result, since we need to visit
+        each node to get its list of IP addresses.
+        """
+        if self._ip_map is None:
+            # Map of IP address to hostname
+            ip_map = {}
+
+            # Extract all addresses including localhost
+            cmd = "ip addr show | grep -E 'inet' | awk '{print $2}' | cut -d '/' -f1"
+
+            # Copy first: "+=" on net.hosts itself would append the stations
+            # to Mininet's own host list
+            hosts = list(self.net.hosts)
+            hosts += getattr(self.net, "stations", []) # mininet-wifi
+            for host in hosts:
+                for host_ip in host.cmd(cmd).splitlines():
+                    if full_ip := self._convert_to_full_ip_address(host_ip):
+                        ip_map[full_ip] = host.name
+            self._ip_map = ip_map
+            info(f"Created IP map for PCAP (will be cached): {self._ip_map}\n")
+
+        if full_ip := self._convert_to_full_ip_address(ip):
+            return self._ip_map.get(full_ip, ip)
+        return ip
+
+    def _send_pcap_chunks(self, node_id: str, known_frame: int, include_wire: bool):
+        """
+        Get, process and send chunks of pcap to UI
+        Blocking; should run in its own thread.
+
+        node_id: node whose capture file is read
+        known_frame: last frame number the UI already has
+        include_wire: also send the raw packet bytes (ndn.bin)
+        """
+
+        node = self.net[node_id]
+        file = self._get_pcap_file(node_id)
+
+        # We don't want to load and process the entire pcap file
+        # every time the user wants to recheck. Instead, use editcap
+        # to cut the part the user knows
+
+        # Look back by up to 12 frames in case the last packet was fragmented.
+        # int() because the value comes from the UI and ends up in a shell command
+        known_frame = max(1, int(known_frame) - 12)
+
+        # Get everything after known frame
+        editcap_cmd = f"editcap -r {file} /dev/stdout {known_frame}-0"
+
+        # Shark using NDN dissector
+        extra_fields = "-e ndn.bin " if include_wire else ""
+        list_cmd = f"tshark {SHARK_FIELDS_STR} {extra_fields} -r /dev/stdin -X '{self._get_lua()}'"
+
+        # Pipe editcap to tshark
+        piped_cmd = ["bash", "-c", f"{editcap_cmd} | {list_cmd}"]
+
+        # Collected packets (one chunk)
+        packets = []
+
+        def _send_packets(last=False):
+            """Send the current chunk to the UI (including empty)"""
+            res = {
+                "id": node_id,
+                "packets": packets,
+            }
+            if last:
+                res["last"] = True
+
+            self.socket.send_all(msgpack.dumps({
+                WSKeys.MSG_KEY_FUN: WSFunctions.GET_PCAP,
+                WSKeys.MSG_KEY_RESULT: res,
+            }))
+
+        # One column per requested field, plus ndn.bin when requested;
+        # every index used below must exist
+        expected_fields = len(SHARK_FIELDS) + (1 if include_wire else 0)
+
+        # Iterate each line of output
+        for line in run_popen_readline(node, piped_cmd):
+            parts: list[str] = line.decode("utf-8").strip("\n").split("\t")
+
+            if len(parts) < expected_fields:
+                error(f"Invalid line in pcap: {parts}\n")
+                continue
+
+            is_ipv6 = parts[7] != "" and parts[8] != ""
+            from_ip = parts[7] if is_ipv6 else parts[5]
+            to_ip = parts[8] if is_ipv6 else parts[6]
+
+            try:
+                packets.append([
+                    int(parts[0]) + known_frame - 1, # frame number
+                    float(parts[1]) * 1000, # timestamp
+                    int(parts[2]), # length
+                    str(parts[3]), # type
+                    str(parts[4]), # NDN name
+                    str(self._get_hostname_from_ip(from_ip)), # from
+                    str(self._get_hostname_from_ip(to_ip)), # to
+                    bytes.fromhex(parts[9]) if include_wire else 0, # packet content
+                ])
+            except ValueError:
+                # A malformed number or hex string must not kill the thread
+                error(f"Invalid line in pcap: {parts}\n")
+                continue
+
+            if len(packets) >= Config.PCAP_CHUNK_SIZE:
+                _send_packets()
+                packets = []
+
+        # Send the last chunk
+        _send_packets(last=True)
+
+    async def get_pcap(self, node_id: str, known_frame: int, include_wire=False):
+        """UI Function: Get list of packets for one node
+
+        Returns nothing; the packets are pushed as GET_PCAP messages.
+        """
+        if not is_valid_hostid(self.net, node_id):
+            return
+
+        # Run processing in separate thread
+        t = Thread(target=self._send_pcap_chunks, args=(node_id, known_frame, include_wire), daemon=True)
+        t.start()
+
+    async def get_pcap_wire(self, node_id, frame):
+        """UI Function: Get wire of one packet
+
+        Returns the packet bytes, or None for an invalid node or on error.
+        """
+        if not is_valid_hostid(self.net, node_id):
+            return
+        file = self._get_pcap_file(node_id)
+
+        try:
+            # The value comes from the UI and ends up in a shell command
+            frame = int(frame)
+
+            # chop the file to the frame
+            # include the last 12 frames in case of fragmentation
+            start_frame = max(1, frame - 12)
+            new_frame = frame - start_frame + 1
+
+            # Get last 12 frames
+            editcap_cmd = f"editcap -r {file} /dev/stdout {start_frame}-{frame}"
+
+            # Filter for this packet only (script path quoted as in _send_pcap_chunks)
+            wire_cmd = f"tshark -r - -e ndn.bin -Tfields -X '{self._get_lua()}' frame.number == {new_frame}"
+
+            # Pipe editcap to tshark
+            piped_cmd = ["bash", "-c", f"{editcap_cmd} | {wire_cmd}"]
+            hex_output = run_popen(self.net[node_id], piped_cmd).decode("utf-8").strip()
+            return bytes.fromhex(hex_output)
+        except Exception as err:
+            error(f"Error getting pcap wire for {node_id}: {err}\n")
+            return None
diff --git a/minindn/minindn_play/socket.py b/minindn/minindn_play/socket.py
new file mode 100644
index 0000000..69e054a
--- /dev/null
+++ b/minindn/minindn_play/socket.py
@@ -0,0 +1,126 @@
+import asyncio
+import os
+import secrets
+import time
+import urllib
+import urllib.parse
+import webbrowser
+from threading import Thread
+
+import msgpack
+import websockets
+
+from mininet.log import error
+from minindn.minindn_play.consts import Config, WSKeys
+
+class PlaySocket:
+    """Websocket server connecting the NDN-Play UI to the registered executors.
+
+    Incoming messages name a function; the first executor that provides an
+    async method with that name handles the call.
+    """
+    # Currently authenticated connections (websocket -> 1)
+    conn_list: dict
+    # Objects whose async methods are callable from the UI
+    executors: list
+    AUTH_TOKEN: str | None = None
+
+    def __init__(self):
+        """Initialize the PlaySocket.
+        This starts the background loop and creates the websocket server.
+        Calls to UI async functions are made from this class.
+        """
+        # Per-instance containers; class-level literals would be shared
+        self.conn_list = {}
+        self.executors = []
+        self._set_auth_token()
+        self.loop = asyncio.new_event_loop()
+        Thread(target=self.loop.run_forever, args=(), daemon=True).start()
+        self.loop.call_soon_threadsafe(self.loop.create_task, self._run())
+
+    def add_executor(self, executor):
+        """Register an object whose async methods can be called by the UI"""
+        self.executors.append(executor)
+
+    def send(self, websocket, msg):
+        """Send message to UI threadsafe"""
+        if websocket.state == websockets.protocol.State.OPEN:
+            self.loop.call_soon_threadsafe(self.loop.create_task, websocket.send(msg))
+
+    def send_all(self, msg):
+        """Send message to all UI threadsafe"""
+        # Iterate over a snapshot: connections come and go on the loop thread
+        for websocket in list(self.conn_list):
+            try:
+                self.send(websocket, msg)
+            except Exception as err:
+                error(f'Failed to send to {websocket.remote_address} with error {err}\n')
+                # pop(): the handler may have removed the entry already
+                self.conn_list.pop(websocket, None)
+
+    def _set_auth_token(self):
+        """Create auth token if it doesn't exist."""
+        # Persist auth token so you don't need to refresh every time
+        # Check if AUTH_FILE was modified less than a day ago
+        if os.path.exists(Config.AUTH_FILE) and time.time() - os.path.getmtime(Config.AUTH_FILE) < 24 * 60 * 60:
+            with open(Config.AUTH_FILE, 'r') as f:
+                self.AUTH_TOKEN = f.read().strip()
+
+        if not self.AUTH_TOKEN or len(self.AUTH_TOKEN) < 10:
+            self.AUTH_TOKEN = secrets.token_hex(16)
+            # The token grants control over the emulation: create the file
+            # readable by its owner only (the mode applies when it is created)
+            fd = os.open(Config.AUTH_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
+            with os.fdopen(fd, 'w') as f:
+                f.write(self.AUTH_TOKEN)
+
+    async def _run(self) -> None:
+        """Runs in separate thread from main"""
+        # Show the URL to the user
+        ws_url = f"ws://{Config.SERVER_HOST_URL}:{Config.SERVER_PORT}"
+        ws_url_q = urllib.parse.quote(ws_url.encode('utf8'))
+        full_url = f"{Config.PLAY_URL}/?minindn={ws_url_q}&auth={self.AUTH_TOKEN}"
+        print(f"Opened NDN Play GUI at {full_url}")
+        webbrowser.open(full_url, 1)
+
+        # Start server
+        asyncio.set_event_loop(self.loop)
+
+        server = await websockets.serve(self._serve, Config.SERVER_HOST, Config.SERVER_PORT)
+        await server.serve_forever()
+
+    async def _serve(self, websocket):
+        """Handle websocket connection"""
+
+        try:
+            path = websocket.request.path
+            auth = urllib.parse.parse_qs(urllib.parse.urlparse(path).query)['auth'][0]
+            # Constant-time comparison; a missing or malformed token raises
+            # and is rejected below as well
+            if not secrets.compare_digest(auth, self.AUTH_TOKEN):
+                raise Exception("Invalid auth token")
+        except Exception:
+            print(f"Rejected connection from {websocket.remote_address}")
+            await websocket.close()
+            return
+
+        print(f"Accepted connection from {websocket.remote_address}")
+        self.conn_list[websocket] = 1
+        try:
+            while True:
+                try:
+                    fcall = msgpack.loads(await websocket.recv())
+                    loop = asyncio.get_event_loop()
+                    loop.create_task(self._call_fun(websocket, fcall))
+                except websockets.exceptions.ConnectionClosedOK:
+                    print(f"Closed connection gracefully from {websocket.remote_address}")
+                    break
+                except websockets.exceptions.ConnectionClosedError:
+                    print(f"Closed connection with error from {websocket.remote_address}")
+                    break
+        finally:
+            # Always forget the connection, also on unexpected errors;
+            # pop() because send_all may have dropped it already
+            self.conn_list.pop(websocket, None)
+
+    async def _call_fun(self, websocket, fcall):
+        """Call function and return result to UI asynchronously"""
+
+        name = fcall[WSKeys.MSG_KEY_FUN]
+        args = fcall.get(WSKeys.MSG_KEY_ARGS) or []
+
+        # Get function from any executor. The name comes from the network,
+        # so only public coroutine methods may be called.
+        fun = None
+        if isinstance(name, str) and not name.startswith('_'):
+            for executor in self.executors:
+                candidate = getattr(executor, name, None)
+                if candidate is not None and asyncio.iscoroutinefunction(candidate):
+                    fun = candidate
+                    break
+
+        # Function not found
+        if fun is None:
+            error(f"Function {name} not found\n")
+            return # function not found
+
+        # Call function; report failures instead of leaving them in a
+        # task nobody awaits
+        try:
+            res = await fun(*args)
+        except Exception as err:
+            error(f"Function {name} failed with error {err}\n")
+            return
+
+        if res is not None:
+            pack = msgpack.dumps({
+                WSKeys.MSG_KEY_FUN: name,
+                WSKeys.MSG_KEY_RESULT: res,
+            })
+            await websocket.send(pack)
diff --git a/minindn/minindn_play/term/__init__.py b/minindn/minindn_play/term/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/minindn/minindn_play/term/__init__.py
diff --git a/minindn/minindn_play/term/cbuf.py b/minindn/minindn_play/term/cbuf.py
new file mode 100644
index 0000000..c9a0ddc
--- /dev/null
+++ b/minindn/minindn_play/term/cbuf.py
@@ -0,0 +1,27 @@
+class CircularByteBuffer:
+    """Byte buffer handmade for terminal output
+
+    Keeps the most recent `size` bytes written; older bytes are overwritten.
+    """
+    # Number of valid bytes stored (at most the capacity)
+    size: int = 0
+    # Index of the oldest byte once the buffer has wrapped around
+    start: int = 0
+    buffer: bytearray
+
+    def __init__(self, size: int):
+        """Create a buffer holding at most `size` bytes"""
+        self.buffer = bytearray(size)
+
+    def writeByte(self, byte: int):
+        """Append one byte, overwriting the oldest one when full"""
+        capacity = len(self.buffer)
+        if capacity == 0:
+            # Nothing can be stored; avoids a modulo by zero below
+            return
+
+        if self.size < capacity:
+            self.buffer[self.size] = byte
+            self.size += 1
+            return
+
+        self.buffer[self.start] = byte
+        self.start = (self.start + 1) % capacity
+
+    def write(self, data: bytes):
+        """Append all bytes of `data`"""
+        for byte in data:
+            self.writeByte(byte)
+
+    def read(self):
+        """Return the stored bytes, oldest first, as a new bytearray"""
+        if self.size < len(self.buffer):
+            # Not wrapped yet: the data starts at index 0
+            return self.buffer[:self.size]
+
+        # Full: the oldest byte is at `start`, the newest right before it
+        return self.buffer[self.start:] + self.buffer[:self.start]
\ No newline at end of file
diff --git a/minindn/minindn_play/term/pty_manager.py b/minindn/minindn_play/term/pty_manager.py
new file mode 100644
index 0000000..f59b46f
--- /dev/null
+++ b/minindn/minindn_play/term/pty_manager.py
@@ -0,0 +1,125 @@
+import fcntl, struct, termios, os
+import subprocess
+import select
+import pty
+from io import BufferedWriter
+from threading import Thread
+from typing import TYPE_CHECKING
+
+import msgpack
+
+from minindn.minindn_play.term.cbuf import CircularByteBuffer
+from minindn.minindn_play.consts import WSKeys, WSFunctions
+
+if TYPE_CHECKING:
+ from minindn.minindn_play.term.term import TermExecutor
+
+
+class Pty:
+    """One pseudo-terminal shown as a terminal tab in the UI"""
+    id: str
+    name: str
+    master: int
+    slave: int
+    # Binary writer on the master fd; input from the UI is written here
+    stdin: BufferedWriter
+    # Recent output, replayed when a UI (re)opens the terminal
+    buffer: CircularByteBuffer
+    executor: 'TermExecutor'
+    # Process attached to the slave side, if any
+    process: subprocess.Popen | None = None
+
+    def __init__(self, executor, pty_id: str, name: str):
+        """
+        Initialize a new pty
+        Creates a new Python PTY instance and stores the slave and master file descriptors
+        The pty adds itself to executor.pty_list under `pty_id`.
+        """
+        self.master, self.slave = pty.openpty()
+        self.buffer = CircularByteBuffer(16000)
+        self.executor = executor
+        self.id = pty_id
+        self.name = name
+        self.stdin = os.fdopen(self.master, 'wb')
+        executor.pty_list[self.id] = self
+
+    def cleanup(self):
+        """Cleanup the pty: notify the UI, close both ends, forget the pty"""
+
+        self.executor.socket.send_all(msgpack.dumps({
+            WSKeys.MSG_KEY_FUN: WSFunctions.CLOSE_TERMINAL,
+            WSKeys.MSG_KEY_ID: self.id,
+        }))
+
+        # self.stdin owns the master fd: closing the wrapper closes the fd
+        # exactly once, whereas os.close(master) would leave the wrapper to
+        # close the same descriptor number a second time later
+        try:
+            self.stdin.close()
+        except OSError:
+            pass
+
+        # Closed separately so a failure above cannot leak the slave fd;
+        # it may already have been closed by whoever wrapped it
+        try:
+            os.close(self.slave)
+        except OSError:
+            pass
+
+        self.executor.pty_list.pop(self.id, None)
+
+class PtyManager:
+    """Polls the master side of all registered ptys and forwards output to the UI"""
+    # Registered ptys by master file descriptor
+    ptys: dict
+    thread: Thread
+    executor: 'TermExecutor'
+
+    def __init__(self, executor):
+        """Initialize the pty manager"""
+        self.executor = executor
+        # Per-instance state, created before the thread that uses it starts;
+        # class-level objects would be shared between managers
+        self.ptys = {}
+        self.poller = select.poll()
+        self.thread = Thread(target=self._ui_thread, args=(), daemon=True)
+        self.thread.start()
+
+    def register(self, target_pty: Pty):
+        """Register a new pty with the manager"""
+        self.ptys[target_pty.master] = target_pty
+        self.poller.register(target_pty.master, select.POLLIN)
+
+    def unregister(self, target_pty: Pty):
+        """Unregister a pty from the manager"""
+        try:
+            self.poller.unregister(target_pty.master)
+        except (KeyError, OSError, ValueError):
+            # Already unregistered or descriptor already closed
+            pass
+        target_pty.cleanup()
+        self.ptys.pop(target_pty.master, None)
+
+    def _ui_thread(self):
+        """Thread that handles UI output"""
+        while True:
+            self._check_procs()
+            self._poll_fds()
+
+    def _check_procs(self):
+        """Check if any processes have exited and unregister them"""
+        for target_pty in list(self.ptys.values()):
+            if target_pty.process is not None and target_pty.process.poll() is not None:
+                self.unregister(target_pty)
+
+    def _poll_fds(self):
+        """Poll all registered file descriptors"""
+        for (fd, status) in self.poller.poll(250):
+            target_pty = self.ptys.get(fd)
+            if target_pty is None:
+                self.poller.unregister(fd)
+                continue
+
+            # The status is a bitmask. Without POLLIN there is nothing to
+            # read, so the event is a hangup or an error (POLLHUP, POLLERR,
+            # POLLNVAL); a hangup that still has data is drained first.
+            if not status & select.POLLIN:
+                self.unregister(target_pty)
+                continue
+
+            # Find the number of bytes available to read
+            bytes_available = fcntl.ioctl(target_pty.master, termios.FIONREAD, struct.pack('I', 0))
+            bytes_available = struct.unpack('I', bytes_available)[0]
+            bytes_to_read = min(bytes_available, 4096)
+
+            # This should never really happen
+            if bytes_to_read == 0:
+                continue
+
+            # Read everything available and send to UI
+            try:
+                out_bytes = os.read(target_pty.master, bytes_to_read)
+                self.executor.send_pty_out(out_bytes, target_pty.id)
+                target_pty.buffer.write(out_bytes)
+            except Exception as e:
+                print(e)
+                self.unregister(target_pty)
+                continue
diff --git a/minindn/minindn_play/term/term.py b/minindn/minindn_play/term/term.py
new file mode 100644
index 0000000..9faad8c
--- /dev/null
+++ b/minindn/minindn_play/term/term.py
@@ -0,0 +1,148 @@
+import os
+import logging
+import msgpack
+import fcntl
+import struct
+import termios
+import random
+import shutil
+
+from contextlib import redirect_stdout, redirect_stderr
+from code import InteractiveConsole
+
+from mininet.net import Mininet
+from mininet.cli import CLI
+from minindn.util import is_valid_hostid, host_home, getPopen
+from minindn.minindn_play.consts import WSKeys, WSFunctions
+from minindn.minindn_play.socket import PlaySocket
+from minindn.minindn_play.term.pty_manager import Pty, PtyManager
+
+class TermExecutor:
+    """Executor managing the terminals (Mininet CLI, Python REPL, bash) of the UI"""
+    # Active ptys by id ("cli", "repl" or "<node><number>")
+    pty_list: dict[str, Pty]
+    pty_manager: PtyManager
+
+    def __init__(self, net: Mininet, socket: PlaySocket):
+        self.net = net
+        self.socket = socket
+        # Per-instance dict, created before the manager thread starts;
+        # a class-level dict would be shared between executors
+        self.pty_list = {}
+        self.pty_manager = PtyManager(self)
+
+    def start_cli(self):
+        """UI Function: Start CLI (blocks until the CLI exits)"""
+        # Send logs to UI
+        class WsCliHandler():
+            def __init__(self, parent: TermExecutor):
+                self.parent = parent
+
+            def write(self, msg: str):
+                if "cli" not in self.parent.pty_list:
+                    return
+                mb = msg.encode("utf-8")
+                self.parent.send_pty_out(mb, "cli")
+                self.parent.pty_list["cli"].buffer.write(mb)
+
+        lg = logging.getLogger("mininet")
+        handler = logging.StreamHandler(WsCliHandler(self))
+        handler.terminator = ""
+        lg.addHandler(handler)
+
+        try:
+            # Create pty for cli
+            cpty = Pty(self, "cli", "Mininet CLI")
+            self.pty_manager.register(cpty)
+
+            # Start cli
+            CLI.use_rawinput = False
+            CLI(self.net, stdin=os.fdopen(cpty.slave, "r"), stdout=os.fdopen(cpty.slave, "w"))
+        finally:
+            # Do not keep forwarding logs to a CLI terminal that is gone
+            lg.removeHandler(handler)
+
+    def start_repl(self):
+        """UI Function: Start REPL (blocks until the REPL exits)"""
+
+        cpty = Pty(self, "repl", "Python REPL")
+        self.pty_manager.register(cpty)
+
+        try:
+            # Both streams share one descriptor: only the writer owns it, so
+            # it is closed once, after the writer has been flushed
+            with os.fdopen(cpty.slave, "w") as fout, \
+                    os.fdopen(cpty.slave, "r", closefd=False) as fin, \
+                    redirect_stdout(fout), redirect_stderr(fout):
+                def raw_input(prompt="") -> str:
+                    print(prompt, end="", flush=True)
+                    return fin.readline()
+                repl = InteractiveConsole({
+                    "net": self.net,
+                })
+                repl.raw_input = raw_input
+                repl.interact(None, None)
+        except OSError:
+            pass
+
+    async def open_all_ptys(self):
+        """UI Function: Open all ptys currently active"""
+        # Snapshot: the pty manager thread may add or remove entries meanwhile
+        for cpty in list(self.pty_list.values()):
+            self.socket.send_all(msgpack.dumps({
+                WSKeys.MSG_KEY_FUN: WSFunctions.OPEN_TERMINAL,
+                WSKeys.MSG_KEY_RESULT: self._open_term_response(cpty)
+            }))
+
+    async def open_term(self, nodeId: str):
+        """UI Function: Open new bash terminal"""
+        if not is_valid_hostid(self.net, nodeId):
+            return
+
+        # Get directory of node
+        node_home = host_home(self.net[nodeId])
+
+        # Copy .bashrc to node
+        if node_home is not None:
+            path = os.path.expanduser("~/.bashrc")
+            if os.path.isfile(path):
+                # Do this copy every time to make sure the file is up to date
+                target = node_home + "/.bashrc"
+                shutil.copy(path, target)
+
+                # Append extra commands
+                with open(target, "a") as f:
+                    # Shell prompt
+                    f.write("\nexport PS1='\\[\\033[01;32m\\]\\u@{}\\[\\033[00m\\]:\\[\\033[01;34m\\]\\w\\[\\033[00m\\]\\$ '\n".format(nodeId))
+
+        # Create pty; retry in the unlikely case the random id is taken,
+        # which would silently replace the existing terminal
+        pty_id = nodeId + str(int(random.random() * 100000))
+        while pty_id in self.pty_list:
+            pty_id = nodeId + str(int(random.random() * 100000))
+        pty_name = f"bash [{nodeId}]"
+        cpty = Pty(self, pty_id, pty_name)
+        self.pty_manager.register(cpty)
+
+        # Start bash
+        cpty.process = getPopen(self.net[nodeId], "bash --noprofile -i", stdin=cpty.slave, stdout=cpty.slave, stderr=cpty.slave)
+
+        return self._open_term_response(cpty)
+
+    async def pty_in(self, pty_id: str, msg: msgpack.ExtType):
+        """UI Function: Send input to pty"""
+        if pty_id not in self.pty_list:
+            return
+
+        if pty_id == "cli" and msg.data == b"\x03":
+            # interrupt
+            for node in self.net.hosts:
+                if node.waiting:
+                    node.sendInt()
+
+        self.pty_list[pty_id].stdin.write(msg.data)
+        self.pty_list[pty_id].stdin.flush()
+
+    async def pty_resize(self, pty_id, rows, cols):
+        """UI Function: Resize pty"""
+        if pty_id not in self.pty_list:
+            return
+
+        winsize = struct.pack("HHHH", rows, cols, 0, 0)
+        fcntl.ioctl(self.pty_list[pty_id].master, termios.TIOCSWINSZ, winsize)
+
+    def send_pty_out(self, msg: bytes, pty_id: str):
+        """Send output to UI"""
+        self.socket.send_all(msgpack.dumps({
+            WSKeys.MSG_KEY_FUN: WSFunctions.PTY_OUT,
+            WSKeys.MSG_KEY_ID: pty_id,
+            WSKeys.MSG_KEY_RESULT: msg,
+        }))
+
+    def _open_term_response(self, cpty: Pty):
+        """Return response for open terminal (id, name and buffered output)"""
+        return { "id": cpty.id, "name": cpty.name, "buf": cpty.buffer.read() }
diff --git a/minindn/util.py b/minindn/util.py
index 0b90a6d..8f80e2a 100644
--- a/minindn/util.py
+++ b/minindn/util.py
@@ -21,16 +21,18 @@
# along with Mini-NDN, e.g., in COPYING.md file.
# If not, see <http://www.gnu.org/licenses/>.
+import re
import sys
from os.path import isfile
-from subprocess import call
+from subprocess import call, PIPE
+
from six.moves.urllib.parse import quote
from mininet.cli import CLI
-
from mininet.log import error
+from mininet.node import Host
+from mininet.net import Mininet
-import re
sshbase = ['ssh', '-q', '-t', '-i/home/mininet/.ssh/id_rsa']
scpbase = ['scp', '-i', '/home/mininet/.ssh/id_rsa']
@@ -57,19 +59,20 @@
rcmd = scpbase + tmp
call(rcmd, stdout=devnull, stderr=devnull)
-def copyExistentFile(node, fileList, destination):
+def copyExistentFile(host, fileList, destination):
for f in fileList:
if isfile(f):
- node.cmd('cp {} {}'.format(f, destination))
+ host.cmd('cp {} {}'.format(f, destination))
break
if not isfile(destination):
fileName = destination.split('/')[-1]
raise IOError('{} not found in expected directory.'.format(fileName))
-def popenGetEnv(node, envDict=None):
+def popenGetEnv(host, envDict=None):
+ '''Helper method to set environment variables for Popen on nodes'''
env = {}
- homeDir = node.params['params']['homeDir']
- printenv = node.popen('printenv'.split(), cwd=homeDir).communicate()[0].decode('utf-8')
+ homeDir = host.params['params']['homeDir']
+ printenv = host.popen('printenv'.split(), cwd=homeDir).communicate()[0].decode('utf-8')
for var in printenv.split('\n'):
if var == '':
break
@@ -84,6 +87,7 @@
return env
def getPopen(host, cmd, envDict=None, **params):
+ '''Return Popen object for process on node with correctly set environmental variables'''
return host.popen(cmd, cwd=host.params['params']['homeDir'],
env=popenGetEnv(host, envDict), **params)
@@ -105,13 +109,46 @@
try:
from mn_wifi.cli import CLI as CLI_wifi
-
+ from mn_wifi.node import Station as mn_wifi_station
+ HAS_WIFI = True
class MiniNDNWifiCLI(CLI_wifi):
prompt = 'mini-ndn-wifi> '
def __init__(self, mininet, stdin=sys.stdin, script=None):
CLI_wifi.__init__(self, mininet, stdin, script)
except ImportError:
+ HAS_WIFI = False
class MiniNDNWifiCLI:
def __init__(self):
raise ImportError('Mininet-WiFi is not installed')
+
+def is_valid_hostid(net: Mininet, host_id: str):
+    """Check if a hostId is a host (or, with Mininet-WiFi, a station)"""
+    if host_id not in net:
+        return False
+
+    node = net[host_id]
+    if isinstance(node, Host):
+        return True
+
+    # Without Mininet-WiFi every non-Host node (e.g. a switch) is invalid;
+    # mn_wifi_station is only defined when HAS_WIFI is True
+    return HAS_WIFI and isinstance(node, mn_wifi_station)
+
+def run_popen(host, cmd):
+    """Run `cmd` on the node and return its complete stdout as bytes (blocks until it exits)"""
+    stdout, _ = getPopen(host, cmd, stdout=PIPE).communicate()
+    return stdout
+
+def run_popen_readline(host, cmd):
+    """Run `cmd` on the node and yield its stdout line by line as bytes (blocking)
+
+    The pipe is closed and the process reaped when the output ends or the
+    generator is closed early, so no zombie process is left behind.
+    """
+    process = getPopen(host, cmd, stdout=PIPE)
+    try:
+        # readline() returns b'' at end of output
+        yield from iter(process.stdout.readline, b'')
+    finally:
+        process.stdout.close()
+        process.wait()
+
+def host_home(host) -> str | None:
+    """Return the home directory configured for the host, or None if it has none"""
+    if 'params' in host.params:
+        node_params = host.params['params']
+        if 'homeDir' in node_params:
+            return node_params['homeDir']
+    return None