blob: 69e054a0453512c79d38161e55ef82c3270767b7 [file] [log] [blame]
import os
import asyncio
import urllib
import msgpack
import secrets
import time
import webbrowser
from threading import Thread
import websockets
from mininet.log import error
from minindn.minindn_play.consts import Config, WSKeys
class PlaySocket:
conn_list: dict = {}
executors: list = []
AUTH_TOKEN: str | None = None
def __init__(self):
"""Initialize the PlaySocket.
This starts the background loop and creates the websocket server.
Calls to UI async functions are made from this class.
"""
self._set_auth_token()
self.loop = asyncio.new_event_loop()
Thread(target=self.loop.run_forever, args=(), daemon=True).start()
self.loop.call_soon_threadsafe(self.loop.create_task, self._run())
def add_executor(self, executor):
self.executors.append(executor)
def send(self, websocket, msg):
"""Send message to UI threadsafe"""
if websocket.state == websockets.protocol.State.OPEN:
self.loop.call_soon_threadsafe(self.loop.create_task, websocket.send(msg))
def send_all(self, msg):
"""Send message to all UI threadsafe"""
for websocket in self.conn_list.copy():
try:
self.send(websocket, msg)
except Exception as err:
error(f'Failed to send to {websocket.remote_address} with error {err}\n')
del self.conn_list[websocket]
def _set_auth_token(self):
"""Create auth token if it doesn't exist."""
# Perist auth token so you don't need to refresh every time
# Check if AUTH_FILE was modified less than a day ago
if os.path.exists(Config.AUTH_FILE) and time.time() - os.path.getmtime(Config.AUTH_FILE) < 24 * 60 * 60:
with open(Config.AUTH_FILE, 'r') as f:
self.AUTH_TOKEN = f.read().strip()
if not self.AUTH_TOKEN or len(self.AUTH_TOKEN) < 10:
self.AUTH_TOKEN = secrets.token_hex(16)
with open(Config.AUTH_FILE, 'w') as f:
f.write(self.AUTH_TOKEN)
async def _run(self) -> None:
"""Runs in separate thread from main"""
# Show the URL to the user
ws_url = f"ws://{Config.SERVER_HOST_URL}:{Config.SERVER_PORT}"
ws_url_q = urllib.parse.quote(ws_url.encode('utf8'))
full_url = f"{Config.PLAY_URL}/?minindn={ws_url_q}&auth={self.AUTH_TOKEN}"
print(f"Opened NDN Play GUI at {full_url}")
webbrowser.open(full_url, 1)
# Start server
asyncio.set_event_loop(self.loop)
server = await websockets.serve(self._serve, Config.SERVER_HOST, Config.SERVER_PORT)
await server.serve_forever()
async def _serve(self, websocket):
"""Handle websocket connection"""
try:
path = websocket.request.path
auth = urllib.parse.parse_qs(urllib.parse.urlparse(path).query)['auth'][0]
if auth != self.AUTH_TOKEN:
raise Exception("Invalid auth token")
except Exception:
print(f"Rejected connection from {websocket.remote_address}")
await websocket.close()
return
print(f"Accepted connection from {websocket.remote_address}")
self.conn_list[websocket] = 1
while True:
try:
fcall = msgpack.loads(await websocket.recv())
loop = asyncio.get_event_loop()
loop.create_task(self._call_fun(websocket, fcall))
except websockets.exceptions.ConnectionClosedOK:
print(f"Closed connection gracefully from {websocket.remote_address}")
break
except websockets.exceptions.ConnectionClosedError:
print(f"Closed connection with error from {websocket.remote_address}")
break
del self.conn_list[websocket]
async def _call_fun(self, websocket, fcall):
"""Call function and return result to UI asynchronously"""
# Get function from any executor
fun = None
for executor in self.executors:
fun = getattr(executor, fcall[WSKeys.MSG_KEY_FUN], None)
if fun is not None:
break
# Function not found
if fun is None:
error(f"Function {fcall[WSKeys.MSG_KEY_FUN]} not found\n")
return # function not found
# Call function
res = await fun(*fcall[WSKeys.MSG_KEY_ARGS])
if res is not None:
pack = msgpack.dumps({
WSKeys.MSG_KEY_FUN: fcall[WSKeys.MSG_KEY_FUN],
WSKeys.MSG_KEY_RESULT: res,
})
await websocket.send(pack)