type hint some files

This commit is contained in:
Huy Pham 2020-01-10 15:32:16 -08:00
parent 0e74212c43
commit a8a0255624
13 changed files with 643 additions and 62 deletions

View file

@ -1,7 +1,9 @@
from tkinter import messagebox
import grpc
def show_grpc_error(e):
def show_grpc_error(e: grpc.RpcError):
title = [x.capitalize() for x in e.code().name.lower().split("_")]
title = " ".join(title)
title = f"GRPC {title}"

View file

@ -1,6 +1,7 @@
import logging
import tkinter as tk
from tkinter.font import Font
from typing import Tuple
from core.gui import themes
from core.gui.dialogs.linkconfig import LinkConfigurationDialog
@ -13,7 +14,7 @@ EDGE_COLOR = "#ff0000"
class CanvasWirelessEdge:
def __init__(self, token, position, src, dst, canvas):
def __init__(self, token: Tuple[int, int], position, src: int, dst: int, canvas):
self.token = token
self.src = src
self.dst = dst
@ -31,7 +32,7 @@ class CanvasEdge:
Canvas edge class
"""
def __init__(self, x1, y1, x2, y2, src, canvas):
def __init__(self, x1: int, y1: int, x2: int, y2: int, src: int, canvas):
"""
Create an instance of canvas edge object
:param int x1: source x-coord

View file

@ -9,7 +9,7 @@ class Images:
images = {}
@classmethod
def create(cls, file_path, width, height=None):
def create(cls, file_path: str, width: int, height: int = None):
if height is None:
height = width
image = Image.open(file_path)
@ -22,12 +22,12 @@ class Images:
cls.images[image.stem] = str(image)
@classmethod
def get(cls, image_enum, width, height=None):
def get(cls, image_enum, width: int, height: int = None):
file_path = cls.images[image_enum.value]
return cls.create(file_path, width, height)
@classmethod
def get_custom(cls, name, width, height=None):
def get_custom(cls, name: str, width: int, height: int = None):
file_path = cls.images[name]
return cls.create(file_path, width, height)

View file

@ -3,8 +3,10 @@ The actions taken when each menubar option is clicked
"""
import logging
import tkinter as tk
import webbrowser
from tkinter import filedialog, messagebox
from typing import Optional
from core.gui.appconfig import XMLS_PATH
from core.gui.dialogs.about import AboutDialog
@ -30,14 +32,14 @@ class MenuAction:
self.app = app
self.canvas = app.canvas
def cleanup_old_session(self, quitapp=False):
def cleanup_old_session(self, quitapp: bool = False):
logging.info("cleaning up old session")
self.app.core.stop_session()
self.app.core.delete_session()
# if quitapp:
# self.app.quit()
def prompt_save_running_session(self, quitapp=False):
def prompt_save_running_session(self, quitapp: bool = False):
"""
Prompt use to stop running session before application is closed
@ -56,7 +58,7 @@ class MenuAction:
elif quitapp:
self.app.quit()
def on_quit(self, event=None):
def on_quit(self, event: Optional[tk.Event] = None):
"""
Prompt user whether so save running session, and then close the application
@ -64,7 +66,7 @@ class MenuAction:
"""
self.prompt_save_running_session(quitapp=True)
def file_save_as_xml(self, event=None):
def file_save_as_xml(self, event: Optional[tk.Event] = None):
logging.info("menuaction.py file_save_as_xml()")
file_path = filedialog.asksaveasfilename(
initialdir=str(XMLS_PATH),
@ -75,7 +77,7 @@ class MenuAction:
if file_path:
self.app.core.save_xml(file_path)
def file_open_xml(self, event=None):
def file_open_xml(self, event: Optional[tk.Event] = None):
logging.info("menuaction.py file_open_xml()")
file_path = filedialog.askopenfilename(
initialdir=str(XMLS_PATH),
@ -141,11 +143,11 @@ class MenuAction:
else:
self.app.core.cancel_throughputs()
def copy(self, event=None):
def copy(self, event: Optional[tk.Event] = None):
logging.debug("copy")
self.app.canvas.copy()
def paste(self, event=None):
def paste(self, event: Optional[tk.Event] = None):
logging.debug("paste")
self.app.canvas.paste()

View file

@ -149,7 +149,7 @@ class Menubar(tk.Menu):
view_menu.add_command(label="Zoom out", accelerator="-", state=tk.DISABLED)
self.add_cascade(label="View", menu=view_menu)
def create_show_menu(self, view_menu):
def create_show_menu(self, view_menu: tk.Menu):
"""
Create the menu items in View/Show
@ -169,7 +169,7 @@ class Menubar(tk.Menu):
menu.add_command(label="API Messages", state=tk.DISABLED)
view_menu.add_cascade(label="Show", menu=menu)
def create_experimental_menu(self, tools_menu):
def create_experimental_menu(self, tools_menu: tk.Menu):
"""
Create experimental menu item and the sub menu items inside
@ -182,7 +182,7 @@ class Menubar(tk.Menu):
menu.add_command(label="Topology partitioning...", state=tk.DISABLED)
tools_menu.add_cascade(label="Experimental", menu=menu)
def create_random_menu(self, topology_generator_menu):
def create_random_menu(self, topology_generator_menu: tk.Menu):
"""
Create random menu item and the sub menu items inside
@ -197,7 +197,7 @@ class Menubar(tk.Menu):
menu.add_command(label=label, state=tk.DISABLED)
topology_generator_menu.add_cascade(label="Random", menu=menu)
def create_grid_menu(self, topology_generator_menu):
def create_grid_menu(self, topology_generator_menu: tk.Menu):
"""
Create grid menu item and the sub menu items inside
@ -212,7 +212,7 @@ class Menubar(tk.Menu):
menu.add_command(label=label, state=tk.DISABLED)
topology_generator_menu.add_cascade(label="Grid", menu=menu)
def create_connected_grid_menu(self, topology_generator_menu):
def create_connected_grid_menu(self, topology_generator_menu: tk.Menu):
"""
Create connected grid menu items and the sub menu items inside
@ -229,7 +229,7 @@ class Menubar(tk.Menu):
menu.add_cascade(label=label, menu=submenu)
topology_generator_menu.add_cascade(label="Connected Grid", menu=menu)
def create_chain_menu(self, topology_generator_menu):
def create_chain_menu(self, topology_generator_menu: tk.Menu):
"""
Create chain menu item and the sub menu items inside
@ -244,7 +244,7 @@ class Menubar(tk.Menu):
menu.add_command(label=label, state=tk.DISABLED)
topology_generator_menu.add_cascade(label="Chain", menu=menu)
def create_star_menu(self, topology_generator_menu):
def create_star_menu(self, topology_generator_menu: tk.Menu):
"""
Create star menu item and the sub menu items inside
@ -257,7 +257,7 @@ class Menubar(tk.Menu):
menu.add_command(label=label, state=tk.DISABLED)
topology_generator_menu.add_cascade(label="Star", menu=menu)
def create_cycle_menu(self, topology_generator_menu):
def create_cycle_menu(self, topology_generator_menu: tk.Menu):
"""
Create cycle menu item and the sub items inside
@ -270,7 +270,7 @@ class Menubar(tk.Menu):
menu.add_command(label=label, state=tk.DISABLED)
topology_generator_menu.add_cascade(label="Cycle", menu=menu)
def create_wheel_menu(self, topology_generator_menu):
def create_wheel_menu(self, topology_generator_menu: tk.Menu):
"""
Create wheel menu item and the sub menu items inside
@ -283,7 +283,7 @@ class Menubar(tk.Menu):
menu.add_command(label=label, state=tk.DISABLED)
topology_generator_menu.add_cascade(label="Wheel", menu=menu)
def create_cube_menu(self, topology_generator_menu):
def create_cube_menu(self, topology_generator_menu: tk.Menu):
"""
Create cube menu item and the sub menu items inside
@ -296,7 +296,7 @@ class Menubar(tk.Menu):
menu.add_command(label=label, state=tk.DISABLED)
topology_generator_menu.add_cascade(label="Cube", menu=menu)
def create_clique_menu(self, topology_generator_menu):
def create_clique_menu(self, topology_generator_menu: tk.Menu):
"""
Create clique menu item and the sub menu items inside
@ -309,7 +309,7 @@ class Menubar(tk.Menu):
menu.add_command(label=label, state=tk.DISABLED)
topology_generator_menu.add_cascade(label="Clique", menu=menu)
def create_bipartite_menu(self, topology_generator_menu):
def create_bipartite_menu(self, topology_generator_menu: tk.Menu):
"""
Create bipartite menu item and the sub menu items inside
@ -328,7 +328,7 @@ class Menubar(tk.Menu):
temp = temp - 1
topology_generator_menu.add_cascade(label="Bipartite", menu=menu)
def create_topology_generator_menu(self, tools_menu):
def create_topology_generator_menu(self, tools_menu: tk.Menu):
"""
Create topology menu item and its sub menu items
@ -371,7 +371,7 @@ class Menubar(tk.Menu):
menu.add_command(label="Debugger...", state=tk.DISABLED)
self.add_cascade(label="Tools", menu=menu)
def create_observer_widgets_menu(self, widget_menu):
def create_observer_widgets_menu(self, widget_menu: tk.Menu):
"""
Create observer widget menu item and create the sub menu items inside
@ -409,7 +409,7 @@ class Menubar(tk.Menu):
)
widget_menu.add_cascade(label="Observer Widgets", menu=menu)
def create_adjacency_menu(self, widget_menu):
def create_adjacency_menu(self, widget_menu: tk.Menu):
"""
Create adjacency menu item and the sub menu items inside

View file

@ -68,9 +68,5 @@ class StatusBar(ttk.Frame):
dialog = AlertsDialog(self.app, self.app)
dialog.show()
def set_status(self, message):
def set_status(self, message: str):
self.statusvar.set(message)
def stop_session_callback(self, cleanup_time):
self.progress_bar.stop()
self.statusvar.set(f"Stopped in {cleanup_time:.3f} seconds")

View file

@ -33,7 +33,7 @@ class Colors:
listboxbg = "#f2f1f0"
def load(style):
def load(style: ttk.Style):
style.theme_create(
THEME_DARK,
"clam",
@ -141,13 +141,13 @@ def load(style):
)
def theme_change_menu(event):
def theme_change_menu(event: tk.Event):
if not isinstance(event.widget, tk.Menu):
return
style_menu(event.widget)
def style_menu(widget):
def style_menu(widget: ttk.Widget):
style = ttk.Style()
bg = style.lookup(".", "background")
fg = style.lookup(".", "foreground")
@ -159,7 +159,7 @@ def style_menu(widget):
)
def style_listbox(widget):
def style_listbox(widget: ttk.Widget):
style = ttk.Style()
bg = style.lookup(".", "background")
fg = style.lookup(".", "foreground")
@ -176,7 +176,7 @@ def style_listbox(widget):
)
def theme_change(event):
def theme_change(event: tk.Event):
style = ttk.Style()
style.configure(Styles.picker_button, font=("TkDefaultFont", 8, "normal"))
style.configure(

View file

@ -5,6 +5,7 @@ from functools import partial
from tkinter import messagebox, ttk
from tkinter.font import Font
from core.api.grpc import core_pb2
from core.gui.dialogs.customnodes import CustomNodesDialog
from core.gui.dialogs.marker import MarkerDialog
from core.gui.graph.enums import GraphMode
@ -100,7 +101,7 @@ class Toolbar(ttk.Frame):
self.create_network_button()
self.create_annotation_button()
def design_select(self, button):
def design_select(self, button: ttk.Button):
logging.info("selecting design button: %s", button)
self.select_button.state(["!pressed"])
self.link_button.state(["!pressed"])
@ -109,7 +110,7 @@ class Toolbar(ttk.Frame):
self.annotation_button.state(["!pressed"])
button.state(["pressed"])
def runtime_select(self, button):
def runtime_select(self, button: ttk.Button):
logging.info("selecting runtime button: %s", button)
self.runtime_select_button.state(["!pressed"])
self.stop_button.state(["!pressed"])
@ -185,7 +186,7 @@ class Toolbar(ttk.Frame):
0, lambda: self.show_picker(self.node_button, self.node_picker)
)
def show_picker(self, button, picker):
def show_picker(self, button: ttk.Button, picker: ttk.Frame):
x = self.winfo_width() + 1
y = button.winfo_rooty() - picker.master.winfo_rooty() - 1
picker.place(x=x, y=y)
@ -195,7 +196,7 @@ class Toolbar(ttk.Frame):
self.wait_window(picker)
self.app.unbind_all("<ButtonRelease-1>")
def create_picker_button(self, image, func, frame, label):
def create_picker_button(self, image, func, frame: ttk.Frame, label: str):
"""
Create button and put it on the frame
@ -212,7 +213,7 @@ class Toolbar(ttk.Frame):
button.bind("<ButtonRelease-1>", lambda e: func())
button.grid(pady=1)
def create_button(self, frame, image, func, tooltip):
def create_button(self, frame: ttk.Frame, image, func, tooltip: str):
button = ttk.Button(frame, image=image, command=func)
button.image = image
button.grid(sticky="ew")
@ -243,7 +244,7 @@ class Toolbar(ttk.Frame):
task = BackgroundTask(self, self.app.core.start_session, self.start_callback)
task.start()
def start_callback(self, response):
def start_callback(self, response: core_pb2.StartSessionResponse):
self.app.statusbar.progress_bar.stop()
total = time.perf_counter() - self.time
message = f"Start ran for {total:.3f} seconds"
@ -275,7 +276,7 @@ class Toolbar(ttk.Frame):
dialog = CustomNodesDialog(self.app, self.app)
dialog.show()
def update_button(self, button, image, node_draw):
def update_button(self, button: ttk.Button, image, node_draw):
logging.info("update button(%s): %s", button, node_draw)
self.hide_pickers()
button.configure(image=image)
@ -426,7 +427,7 @@ class Toolbar(ttk.Frame):
task = BackgroundTask(self, self.app.core.stop_session, self.stop_callback)
task.start()
def stop_callback(self, response):
def stop_callback(self, response: core_pb2.StopSessionResponse):
self.app.statusbar.progress_bar.stop()
self.set_design()
total = time.perf_counter() - self.time
@ -436,7 +437,7 @@ class Toolbar(ttk.Frame):
if not response.result:
messagebox.showerror("Stop Error", "Errors stopping session")
def update_annotation(self, image, shape_type):
def update_annotation(self, image, shape_type: str):
logging.info("clicked annotation: ")
self.hide_pickers()
self.annotation_button.configure(image=image)

View file

@ -1,5 +1,6 @@
import tkinter as tk
from tkinter import ttk
from typing import Optional
from core.gui.themes import Styles
@ -9,7 +10,7 @@ class Tooltip(object):
Create tool tip for a given widget
"""
def __init__(self, widget, text="widget info"):
def __init__(self, widget: tk.Widget, text: str = "widget info"):
self.widget = widget
self.text = text
self.widget.bind("<Enter>", self.on_enter)
@ -18,10 +19,10 @@ class Tooltip(object):
self.id = None
self.tw = None
def on_enter(self, event=None):
def on_enter(self, event: Optional[tk.Event] = None):
self.schedule()
def on_leave(self, event=None):
def on_leave(self, event: Optional[tk.Event] = None):
self.unschedule()
self.close(event)
@ -35,7 +36,7 @@ class Tooltip(object):
if id_:
self.widget.after_cancel(id_)
def enter(self, event=None):
def enter(self, event: Optional[tk.Event] = None):
x, y, cx, cy = self.widget.bbox("insert")
x += self.widget.winfo_rootx()
y += self.widget.winfo_rooty() + 32
@ -50,6 +51,6 @@ class Tooltip(object):
label = ttk.Label(frame, text=self.text, style=Styles.tooltip)
label.grid()
def close(self, event=None):
def close(self, event: Optional[tk.Event] = None):
if self.tw:
self.tw.destroy()

View file

@ -27,7 +27,7 @@ class InputValidation:
self.rgb = self.master.register(self.check_rbg)
self.hex = self.master.register(self.check_hex)
def ip_focus_out(self, event):
def ip_focus_out(self, event: tk.Event):
value = event.widget.get()
try:
IPNetwork(value)
@ -35,12 +35,12 @@ class InputValidation:
event.widget.delete(0, tk.END)
event.widget.insert(tk.END, "invalid")
def focus_out(self, event, default):
def focus_out(self, event: tk.Event, default: str):
value = event.widget.get()
if value == "":
event.widget.insert(tk.END, default)
def check_positive_int(self, s):
def check_positive_int(self, s: str):
if len(s) == 0:
return True
try:
@ -51,7 +51,7 @@ class InputValidation:
except ValueError:
return False
def check_positive_float(self, s):
def check_positive_float(self, s: str):
if len(s) == 0:
return True
try:
@ -62,7 +62,7 @@ class InputValidation:
except ValueError:
return False
def check_node_name(self, s):
def check_node_name(self, s: str):
if len(s) < 0:
return False
if len(s) == 0:
@ -72,7 +72,7 @@ class InputValidation:
return False
return True
def check_canvas_int(sefl, s):
def check_canvas_int(self, s: str):
if len(s) == 0:
return True
try:
@ -83,7 +83,7 @@ class InputValidation:
except ValueError:
return False
def check_canvas_float(self, s):
def check_canvas_float(self, s: str):
if not s:
return True
try:
@ -94,7 +94,7 @@ class InputValidation:
except ValueError:
return False
def check_ip4(self, s):
def check_ip4(self, s: str):
if not s:
return True
pat = re.compile("^([0-9]+[.])*[0-9]*$")
@ -113,7 +113,7 @@ class InputValidation:
else:
return False
def check_rbg(self, s):
def check_rbg(self, s: str):
if not s:
return True
if s.startswith("0") and len(s) >= 2:
@ -127,7 +127,7 @@ class InputValidation:
except ValueError:
return False
def check_hex(self, s):
def check_hex(self, s: str):
if not s:
return True
pat = re.compile("^([#]([0-9]|[a-f])+)$|^[#]$")

View file

@ -0,0 +1,9 @@
"""
corens3
Python package containing CORE components for use
with the ns-3 simulator.
See http://code.google.com/p/coreemu/
for more information on CORE.
"""

View file

@ -0,0 +1,550 @@
"""
ns3.py: defines classes for running emulations with ns-3 simulated networks.
"""
import logging
import subprocess
import threading
import time
import ns.core
import ns.internet
import ns.lte
import ns.mobility
import ns.network
import ns.tap_bridge
import ns.wifi
import ns.wimax
from core import constants
from core.emulator.enumerations import EventTypes
from core.emulator.enumerations import LinkTypes
from core.emulator.enumerations import NodeTypes
from core.utils import make_tuple
from core.location.mobility import WayPointMobility
from core.nodes.base import CoreNode, CoreNetworkBase
from core.emulator.session import Session
ns.core.GlobalValue.Bind(
"SimulatorImplementationType",
ns.core.StringValue("ns3::RealtimeSimulatorImpl")
)
ns.core.GlobalValue.Bind(
"ChecksumEnabled",
ns.core.BooleanValue("true")
)
class CoreNs3Node(CoreNode, ns.network.Node):
"""
The CoreNs3Node is both a CoreNode backed by a network namespace and
an ns-3 Node simulator object. When linked to simulated networks, the TunTap
device will be used.
"""
def __init__(self, *args, **kwds):
ns.network.Node.__init__(self)
# ns-3 ID starts at 0, CORE uses 1
_id = self.GetId() + 1
if '_id' not in kwds:
kwds['_id'] = _id
CoreNode.__init__(self, *args, **kwds)
def newnetif(self, net=None, addrlist=None, hwaddr=None, ifindex=None, ifname=None):
"""
Add a network interface. If we are attaching to a CoreNs3Net, this
will be a TunTap. Otherwise dispatch to CoreNode.newnetif().
"""
if not addrlist:
addrlist = []
if not isinstance(net, CoreNs3Net):
return CoreNode.newnetif(self, net, addrlist, hwaddr, ifindex, ifname)
ifindex = self.newtuntap(ifindex, ifname)
self.attachnet(ifindex, net)
netif = self.netif(ifindex)
netif.sethwaddr(hwaddr)
for addr in make_tuple(addrlist):
netif.addaddr(addr)
addrstr = netif.addrlist[0]
addr, mask = addrstr.split('/')
tap = net._tapdevs[netif]
tap.SetAttribute(
"IpAddress",
ns.network.Ipv4AddressValue(ns.network.Ipv4Address(addr))
)
tap.SetAttribute(
"Netmask",
ns.network.Ipv4MaskValue(ns.network.Ipv4Mask(f"/{mask}"))
)
ns.core.Simulator.Schedule(ns.core.Time("0"), netif.install)
return ifindex
def getns3position(self):
"""
Return the ns-3 (x, y, z) position of a node.
"""
try:
mm = self.GetObject(ns.mobility.MobilityModel.GetTypeId())
pos = mm.GetPosition()
return pos.x, pos.y, pos.z
except AttributeError:
self.warn("ns-3 mobility model not found")
return 0, 0, 0
def setns3position(self, x, y, z):
"""
Set the ns-3 (x, y, z) position of a node.
"""
try:
mm = self.GetObject(ns.mobility.MobilityModel.GetTypeId())
if z is None:
z = 0.0
mm.SetPosition(ns.core.Vector(x, y, z))
except AttributeError:
self.warn("ns-3 mobility model not found, not setting position")
class CoreNs3Net(CoreNetworkBase):
"""
The CoreNs3Net is a helper PyCoreNet object. Networks are represented
entirely in simulation with the TunTap device bridging the emulated and
simulated worlds.
"""
apitype = NodeTypes.WIRELESS_LAN.value
linktype = LinkTypes.WIRELESS.value
# icon used
type = "wlan"
def __init__(
self, session, _id=None, name=None, start=True, server=None
):
CoreNetworkBase.__init__(self, session, _id, name, start, server)
self.tapbridge = ns.tap_bridge.TapBridgeHelper()
self._ns3devs = {}
self._tapdevs = {}
def attach(self, netif):
"""
Invoked from netif.attach(). Create a TAP device using the TapBridge
object. Call getns3dev() to get model-specific device.
"""
self._netif[netif] = netif
self._linked[netif] = {}
ns3dev = self.getns3dev(netif.node)
tap = self.tapbridge.Install(netif.node, ns3dev)
tap.SetMode(ns.tap_bridge.TapBridge.CONFIGURE_LOCAL)
tap.SetAttribute(
"DeviceName",
ns.core.StringValue(netif.localname)
)
self._ns3devs[netif] = ns3dev
self._tapdevs[netif] = tap
def getns3dev(self, node):
"""
Implement depending on network helper. Install this network onto
the given node and return the device. Register the ns3 device into
self._ns3devs
"""
raise NotImplementedError
def findns3dev(self, node):
"""
Given a node, return the interface and ns3 device associated with
this network.
"""
for netif in node.netifs():
if netif in self._ns3devs:
return netif, self._ns3devs[netif]
return None, None
def shutdown(self):
"""
Session.shutdown() will invoke this.
"""
pass
def usecorepositions(self):
"""
Set position callbacks for interfaces on this net so the CORE GUI
can update the ns-3 node position when moved with the mouse.
"""
for netif in self.netifs():
netif.poshook = self.setns3position
def setns3position(self, netif, x, y, z):
logging.info("setns3position: %s (%s, %s, %s)", netif.node.name, x, y, z)
netif.node.setns3position(x, y, z)
class Ns3LteNet(CoreNs3Net):
def __init__(self, *args, **kwds):
"""
Uses a LteHelper to create an ns-3 based LTE network.
"""
CoreNs3Net.__init__(self, *args, **kwds)
self.lte = ns.lte.LteHelper()
# enhanced NodeB node list
self.enbnodes = []
self.dlsubchannels = None
self.ulsubchannels = None
def setsubchannels(self, downlink, uplink):
"""
Set the downlink/uplink subchannels, which are a list of ints.
These should be set prior to using CoreNs3Node.newnetif().
"""
self.dlsubchannels = downlink
self.ulsubchannels = uplink
def setnodeb(self, node):
"""
Mark the given node as a nodeb (base transceiver station)
"""
self.enbnodes.append(node)
def linknodeb(self, node, nodeb, mob, mobb):
"""
Register user equipment with a nodeb.
Optionally install mobility model while we have the ns-3 devs handy.
"""
_tmp, nodebdev = self.findns3dev(nodeb)
_tmp, dev = self.findns3dev(node)
if nodebdev is None or dev is None:
raise KeyError("ns-3 device for node not found")
self.lte.RegisterUeToTheEnb(dev, nodebdev)
if mob:
self.lte.AddMobility(dev.GetPhy(), mob)
if mobb:
self.lte.AddDownlinkChannelRealization(mobb, mob, dev.GetPhy())
def getns3dev(self, node):
"""
Get the ns3 NetDevice using the LteHelper.
"""
if node in self.enbnodes:
devtype = ns.lte.LteHelper.DEVICE_TYPE_ENODEB
else:
devtype = ns.lte.LteHelper.DEVICE_TYPE_USER_EQUIPMENT
nodes = ns.network.NodeContainer(node)
devs = self.lte.Install(nodes, devtype)
devs.Get(0).GetPhy().SetDownlinkSubChannels(self.dlsubchannels)
devs.Get(0).GetPhy().SetUplinkSubChannels(self.ulsubchannels)
return devs.Get(0)
def attach(self, netif):
"""
Invoked from netif.attach(). Create a TAP device using the TapBridge
object. Call getns3dev() to get model-specific device.
"""
self._netif[netif] = netif
self._linked[netif] = {}
ns3dev = self.getns3dev(netif.node)
self.tapbridge.SetAttribute("Mode", ns.core.StringValue("UseLocal"))
# self.tapbridge.SetAttribute("Mode",
# ns.core.IntegerValue(ns.tap_bridge.TapBridge.USE_LOCAL))
tap = self.tapbridge.Install(netif.node, ns3dev)
# tap.SetMode(ns.tap_bridge.TapBridge.USE_LOCAL)
logging.info("using TAP device %s for %s/%s", netif.localname, netif.node.name, netif.name)
subprocess.check_call(['tunctl', '-t', netif.localname, '-n'])
# check_call([IP_BIN, 'link', 'set', 'dev', netif.localname, \
# 'address', '%s' % netif.hwaddr])
subprocess.check_call([constants.IP_BIN, 'link', 'set', netif.localname, 'up'])
tap.SetAttribute("DeviceName", ns.core.StringValue(netif.localname))
self._ns3devs[netif] = ns3dev
self._tapdevs[netif] = tap
class Ns3WifiNet(CoreNs3Net):
def __init__(self, *args, **kwds):
"""
Uses a WifiHelper to create an ns-3 based Wifi network.
"""
rate = kwds.pop('rate', 'OfdmRate54Mbps')
CoreNs3Net.__init__(self, *args, **kwds)
self.wifi = ns.wifi.WifiHelper().Default()
self.wifi.SetStandard(ns.wifi.WIFI_PHY_STANDARD_80211a)
self.wifi.SetRemoteStationManager(
"ns3::ConstantRateWifiManager",
"DataMode",
ns.core.StringValue(rate),
"NonUnicastMode",
ns.core.StringValue(rate)
)
self.mac = ns.wifi.NqosWifiMacHelper.Default()
self.mac.SetType("ns3::AdhocWifiMac")
channel = ns.wifi.YansWifiChannelHelper.Default()
self.phy = ns.wifi.YansWifiPhyHelper.Default()
self.phy.SetChannel(channel.Create())
def getns3dev(self, node):
"""
Get the ns3 NetDevice using the WifiHelper.
"""
devs = self.wifi.Install(self.phy, self.mac, node)
return devs.Get(0)
class Ns3WimaxNet(CoreNs3Net):
def __init__(self, *args, **kwds):
CoreNs3Net.__init__(self, *args, **kwds)
self.wimax = ns.wimax.WimaxHelper()
self.scheduler = ns.wimax.WimaxHelper.SCHED_TYPE_SIMPLE
self.phy = ns.wimax.WimaxHelper.SIMPLE_PHY_TYPE_OFDM
# base station node list
self.bsnodes = []
def setbasestation(self, node):
self.bsnodes.append(node)
def getns3dev(self, node):
if node in self.bsnodes:
devtype = ns.wimax.WimaxHelper.DEVICE_TYPE_BASE_STATION
else:
devtype = ns.wimax.WimaxHelper.DEVICE_TYPE_SUBSCRIBER_STATION
nodes = ns.network.NodeContainer(node)
devs = self.wimax.Install(nodes, devtype, self.phy, self.scheduler)
if node not in self.bsnodes:
devs.Get(0).SetModulationType(ns.wimax.WimaxPhy.MODULATION_TYPE_QAM16_12)
# debug
self.wimax.EnableAscii("wimax-device-%s" % node.name, devs)
return devs.Get(0)
@staticmethod
def ipv4netifaddr(netif):
for addr in netif.addrlist:
if ':' in addr:
# skip ipv6
continue
ip = ns.network.Ipv4Address(addr.split('/')[0])
mask = ns.network.Ipv4Mask('/' + addr.split('/')[1])
return ip, mask
return None, None
def addflow(self, node1, node2, upclass, downclass):
"""
Add a Wimax service flow between two nodes.
"""
netif1, ns3dev1 = self.findns3dev(node1)
netif2, ns3dev2 = self.findns3dev(node2)
if not netif1 or not netif2:
raise ValueError("interface not found")
addr1, mask1 = self.ipv4netifaddr(netif1)
addr2, mask2 = self.ipv4netifaddr(netif2)
clargs1 = (addr1, mask1, addr2, mask2) + downclass
clargs2 = (addr2, mask2, addr1, mask1) + upclass
clrec1 = ns.wimax.IpcsClassifierRecord(*clargs1)
clrec2 = ns.wimax.IpcsClassifierRecord(*clargs2)
ns3dev1.AddServiceFlow(self.wimax.CreateServiceFlow(
ns.wimax.ServiceFlow.SF_DIRECTION_DOWN,
ns.wimax.ServiceFlow.SF_TYPE_RTPS, clrec1)
)
ns3dev1.AddServiceFlow(self.wimax.CreateServiceFlow(
ns.wimax.ServiceFlow.SF_DIRECTION_UP,
ns.wimax.ServiceFlow.SF_TYPE_RTPS, clrec2)
)
ns3dev2.AddServiceFlow(self.wimax.CreateServiceFlow(
ns.wimax.ServiceFlow.SF_DIRECTION_DOWN,
ns.wimax.ServiceFlow.SF_TYPE_RTPS, clrec2)
)
ns3dev2.AddServiceFlow(self.wimax.CreateServiceFlow(
ns.wimax.ServiceFlow.SF_DIRECTION_UP,
ns.wimax.ServiceFlow.SF_TYPE_RTPS, clrec1)
)
class Ns3Session(Session):
"""
A Session that starts an ns-3 simulation thread.
"""
def __init__(self, _id, persistent=False, duration=600):
self.duration = duration
self.nodes = ns.network.NodeContainer()
self.mobhelper = ns.mobility.MobilityHelper()
Session.__init__(self, _id)
def run(self, vis=False):
"""
Run the ns-3 simulation and return the simulator thread.
"""
def runthread():
ns.core.Simulator.Stop(ns.core.Seconds(self.duration))
logging.info("running ns-3 simulation for %d seconds", self.duration)
if vis:
try:
import visualizer
except ImportError:
logging.exception("visualizer is not available")
ns.core.Simulator.Run()
else:
visualizer.start()
else:
ns.core.Simulator.Run()
# self.evq.run() # event queue may have WayPointMobility events
self.set_state(EventTypes.RUNTIME_STATE, send_event=True)
t = threading.Thread(target=runthread)
t.daemon = True
t.start()
return t
def shutdown(self):
# TODO: the following line tends to segfault ns-3 (and therefore core-daemon)
ns.core.Simulator.Destroy()
Session.shutdown(self)
def addnode(self, name):
"""
A convenience helper for Session.addobj(), for adding CoreNs3Nodes
to this session. Keeps a NodeContainer for later use.
"""
n = self.create_node(cls=CoreNs3Node, name=name)
self.nodes.Add(n)
return n
def setupconstantmobility(self):
"""
Install a ConstantPositionMobilityModel.
"""
palloc = ns.mobility.ListPositionAllocator()
for i in xrange(self.nodes.GetN()):
(x, y, z) = ((100.0 * i) + 50, 200.0, 0.0)
palloc.Add(ns.core.Vector(x, y, z))
node = self.nodes.Get(i)
node.position.set(x, y, z)
self.mobhelper.SetPositionAllocator(palloc)
self.mobhelper.SetMobilityModel("ns3::ConstantPositionMobilityModel")
self.mobhelper.Install(self.nodes)
def setuprandomwalkmobility(self, bounds, time=10, speed=25.0):
"""
Set up the random walk mobility model within a bounding box.
- bounds is the max (x, y, z) boundary
- time is the number of seconds to maintain the current speed
and direction
- speed is the maximum speed, with node speed randomly chosen
from [0, speed]
"""
x, y, z = map(float, bounds)
self.mobhelper.SetPositionAllocator(
"ns3::RandomBoxPositionAllocator",
"X",
ns.core.StringValue("ns3::UniformRandomVariable[Min=0|Max=%s]" % x),
"Y",
ns.core.StringValue("ns3::UniformRandomVariable[Min=0|Max=%s]" % y),
"Z",
ns.core.StringValue("ns3::UniformRandomVariable[Min=0|Max=%s]" % z)
)
self.mobhelper.SetMobilityModel(
"ns3::RandomWalk2dMobilityModel",
"Mode", ns.core.StringValue("Time"),
"Time", ns.core.StringValue("%ss" % time),
"Speed",
ns.core.StringValue("ns3::UniformRandomVariable[Min=0|Max=%s]" % speed),
"Bounds", ns.core.StringValue("0|%s|0|%s" % (x, y))
)
self.mobhelper.Install(self.nodes)
def startns3mobility(self, refresh_ms=300):
"""
Start a thread that updates CORE nodes based on their ns-3
positions.
"""
self.set_state(EventTypes.INSTANTIATION_STATE)
self.mobilitythread = threading.Thread(
target=self.ns3mobilitythread,
args=(refresh_ms,))
self.mobilitythread.daemon = True
self.mobilitythread.start()
def ns3mobilitythread(self, refresh_ms):
"""
Thread target that updates CORE nodes every refresh_ms based on
their ns-3 positions.
"""
valid_states = (
EventTypes.RUNTIME_STATE.value,
EventTypes.INSTANTIATION_STATE.value
)
while self.state in valid_states:
for i in xrange(self.nodes.GetN()):
node = self.nodes.Get(i)
x, y, z = node.getns3position()
if (x, y, z) == node.position.get():
continue
# from WayPointMobility.setnodeposition(node, x, y, z)
node.position.set(x, y, z)
node_data = node.data(0)
self.broadcast_node(node_data)
self.sdt.updatenode(node.id, flags=0, x=x, y=y, z=z)
time.sleep(0.001 * refresh_ms)
def setupmobilitytracing(self, net, filename, nodes):
"""
Start a tracing thread using the ASCII output from the ns3
mobility helper.
"""
net.mobility = WayPointMobility(session=self, _id=net.id)
net.mobility.setendtime()
net.mobility.refresh_ms = 300
net.mobility.empty_queue_stop = False
of = ns.network.OutputStreamWrapper(filename, filemode=0o777)
self.mobhelper.EnableAsciiAll(of)
self.mobilitytracethread = threading.Thread(
target=self.mobilitytrace,
args=(net, filename, nodes)
)
self.mobilitytracethread.daemon = True
self.mobilitytracethread.start()
def mobilitytrace(self, net, filename, nodes, verbose):
nodemap = {}
# move nodes to initial positions
for node in nodes:
x, y, z = node.getns3position()
net.mobility.setnodeposition(node, x, y, z)
nodemap[node.GetId()] = node
logging.info("mobilitytrace opening '%s'", filename)
f = None
try:
f = open(filename)
f.seek(0, 2)
sleep = 0.001
kickstart = True
while True:
if self.state != EventTypes.RUNTIME_STATE.value:
break
line = f.readline()
if not line:
time.sleep(sleep)
if sleep < 1.0:
sleep += 0.001
continue
sleep = 0.001
items = dict(x.split("=") for x in line.split())
logging.info("trace: %s %s %s", items['node'], items['pos'], items['vel'])
x, y, z = map(float, items['pos'].split(':'))
vel = map(float, items['vel'].split(':'))
node = nodemap[int(items['node'])]
net.mobility.addwaypoint(time=0, nodenum=node.id, x=x, y=y, z=z, speed=vel)
if kickstart:
kickstart = False
self.event_loop.add_event(0, net.mobility.start)
self.event_loop.run()
else:
if net.mobility.state != net.mobility.STATE_RUNNING:
net.mobility.state = net.mobility.STATE_RUNNING
self.event_loop.add_event(0, net.mobility.runround)
except IOError:
logging.exception("mobilitytrace error opening: %s", filename)
finally:
if f:
f.close()

19
ns3/setup.py Normal file
View file

@ -0,0 +1,19 @@
import glob
from setuptools import setup
_EXAMPLES_DIR = "share/corens3/examples"
setup(
name="core-ns3",
version="5.5.2",
packages=[
"corens3",
],
data_files=[(_EXAMPLES_DIR, glob.glob("examples/*"))],
description="Python ns-3 components of CORE",
url="https://github.com/coreemu/core",
author="Boeing Research & Technology",
license="GPLv2",
long_description="Python scripts and modules for building virtual simulated networks."
)