-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathport.py
58 lines (49 loc) · 1.91 KB
/
port.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import netaddr
import logging
from ryu.topology import switches
from ryu.topology.switches import Port as Port_type
from ryu.lib.dpid import dpid_to_str
from ryu.lib.port_no import port_no_to_str
from ryu.ofproto.ofproto_v1_0_parser import OFPPhyPort
logger = logging.getLogger(__name__)
class Port(switches.Port):
def __init__(self, port, neighbor=None, datapath=None):
self.neighbor_switch_dpid = None
self.neighbor_port_no = None
self.gateway = None
if isinstance(port, Port_type): # port to neighbor switch
self.dpid = port.dpid
self._ofproto = port._ofproto
self._config = port._config
self._state = port._state
self.port_no = port.port_no
self.hw_addr = netaddr.EUI(port.hw_addr)
self.name = port.name
if neighbor:
self.neighbor_switch_dpid = neighbor.dpid
self.neighbor_port_no = neighbor.port_no
elif isinstance(port, OFPPhyPort): # ofp_phy_port
self.dpid = datapath.id
self._ofproto = datapath.ofproto
self._config = port.config
self._state = port.state
self.port_no = port.port_no
self.hw_addr = netaddr.EUI(port.hw_addr)
self.name = port.name
else:
raise AttributeError
def to_dict(self):
d = {'port_no': port_no_to_str(self.port_no),
'hw_addr': str(self.hw_addr),
'name': self.name.rstrip('\0')}
if self.gateway:
d['gateway'] = self.gateway.to_dict()
else:
d['gateway'] = {}
if self.neighbor_switch_dpid:
d['neighbor_switch_dpid'] = dpid_to_str(self.neighbor_switch_dpid)
d['neighbor_port_no'] = port_no_to_str(self.neighbor_port_no)
else:
d['neighbor_switch_dpid'] = ''
d['neighbor_port_no'] = ''
return d