RYU,Chapter 2. 流量监控(Traffic Monitor)

Created at 2018-07-16 Updated at 2018-07-18 Category Study Tag SDN / RYU

在交换机的基础上,加入流量监控的功能

流量监控为维护网络的安全和业务的正常工作提供了一个基础条件。此章节讲述了如果利用RYU实现流量监控功能。我们仍然对RYU提供的源代码进行初步的解读。

先贴上全部的源码:

from operator import attrgetter

from ryu.app import simple_switch_13
from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.lib import hub


class SimpleMonitor13(simple_switch_13.SimpleSwitch13):

def __init__(self, *args, **kwargs):
super(SimpleMonitor13, self).__init__(*args, **kwargs)
self.datapaths = {}
self.monitor_thread = hub.spawn(self._monitor)

@set_ev_cls(ofp_event.EventOFPStateChange,
[MAIN_DISPATCHER, DEAD_DISPATCHER])
def _state_change_handler(self, ev):
datapath = ev.datapath
if ev.state == MAIN_DISPATCHER:
if datapath.id not in self.datapaths:
self.logger.debug('register datapath: %016x', datapath.id)
self.datapaths[datapath.id] = datapath
elif ev.state == DEAD_DISPATCHER:
if datapath.id in self.datapaths:
self.logger.debug('unregister datapath: %016x', datapath.id)
del self.datapaths[datapath.id]

def _monitor(self):
while True:
for dp in self.datapaths.values():
self._request_stats(dp)
hub.sleep(10)

def _request_stats(self, datapath):
self.logger.debug('send stats request: %016x', datapath.id)
ofproto = datapath.ofproto
parser = datapath.ofproto_parser

req = parser.OFPFlowStatsRequest(datapath)
datapath.send_msg(req)

req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY)
datapath.send_msg(req)

@set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
def _flow_stats_reply_handler(self, ev):
body = ev.msg.body

self.logger.info('datapath '
'in-port eth-dst '
'out-port packets bytes')
self.logger.info('---------------- '
'-------- ----------------- '
'-------- -------- --------')
for stat in sorted([flow for flow in body if flow.priority == 1],
key=lambda flow: (flow.match['in_port'],
flow.match['eth_dst'])):
self.logger.info('%016x %8x %17s %8x %8d %8d',
ev.msg.datapath.id,
stat.match['in_port'], stat.match['eth_dst'],
stat.instructions[0].actions[0].port,
stat.packet_count, stat.byte_count)

@set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)
def _port_stats_reply_handler(self, ev):
body = ev.msg.body

self.logger.info('datapath port '
'rx-pkts rx-bytes rx-error '
'tx-pkts tx-bytes tx-error')
self.logger.info('---------------- -------- '
'-------- -------- -------- '
'-------- -------- --------')
for stat in sorted(body, key=attrgetter('port_no')):
self.logger.info('%016x %8x %8d %8d %8d %8d %8d %8d',
ev.msg.datapath.id, stat.port_no,
stat.rx_packets, stat.rx_bytes, stat.rx_errors,
stat.tx_packets, stat.tx_bytes, stat.tx_errors)

由于流量监控类继承了SimpleSwitch13类,所以不需要在进行数据报转发的代码编写。


逐步解析:

from operator import attrgetter

from ryu.app import simple_switch_13
from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.lib import hub


class SimpleMonitor13(simple_switch_13.SimpleSwitch13):

def __init__(self, *args, **kwargs):
super(SimpleMonitor13, self).__init__(*args, **kwargs)
self.datapaths = {}
self.monitor_thread = hub.spawn(self._monitor)

定义了一个SimpleMonitor13类,该类继承了simple_switch_13这个模块中的SimpleSwitch13类,也就是我们上一次所解析的。
然后在init下初始化类:

  • 有一个交换机的字典集
  • self.monitor_thread = hub.spawn(self._monitor) 这句话暂时没能够理解

@set_ev_cls(ofp_event.EventOFPStateChange,[MAIN_DISPATCHER, DEAD_DISPATCHER])
def _state_change_handler(self, ev):
datapath = ev.datapath
if ev.state == MAIN_DISPATCHER:
if datapath.id not in self.datapaths:
self.logger.debug('register datapath: %016x', datapath.id)
self.datapaths[datapath.id] = datapath
elif ev.state == DEAD_DISPATCHER:
if datapath.id in self.datapaths:
self.logger.debug('unregister datapath: %016x', datapath.id)
del self.datapaths[datapath.id]

ofp_event.EventOFPStateChange:用于协商阶段更改通知的事件类。这个类的一个实例在更改协商阶段后被发送给观察者
当出现改变时,执行_state_change_handler:

  • 如果在连接状态发现一个新的交换机,则注册该交换机并记录、监控;
  • 如果连接断开,则注销该交换机,记录并释放内存。
def _monitor(self):
while True:
for dp in self.datapaths.values():
self._request_stats(dp)
hub.sleep(10)

def _request_stats(self, datapath):
self.logger.debug('send stats request: %016x', datapath.id)
ofproto = datapath.ofproto
parser = datapath.ofproto_parser

req = parser.OFPFlowStatsRequest(datapath)
datapath.send_msg(req)

req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY)
datapath.send_msg(req)

在_monitor中每10秒执行一次_request_stats
_request_stats的内容是向注册的交换机发送要求,从而统计讯息。
OFPFlowStatsRequest:单个流统计请求消息,控制器使用此消息来查询各个流统计信息。

  • 暂时了解有这么多可以匹配的项目

OFPPortStatsRequest:端口统计请求消息,控制器使用此消息来查询有关端口统计信息的信息

  • 标志位代表什么暂时没去深究
  • OFPP_ANY代表读取所有端口的信息。

问题:当交换机收到这些请求的时候,回复了什么?


@set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
def _flow_stats_reply_handler(self, ev):
body = ev.msg.body

self.logger.info('datapath '
'in-port eth-dst '
'out-port packets bytes')
self.logger.info('---------------- '
'-------- ----------------- '
'-------- -------- --------')
for stat in sorted([flow for flow in body if flow.priority == 1],
key=lambda flow: (flow.match['in_port'],
flow.match['eth_dst'])):
self.logger.info('%016x %8x %17s %8x %8d %8d',
ev.msg.datapath.id,
stat.match['in_port'], stat.match['eth_dst'],
stat.instructions[0].actions[0].port,
stat.packet_count, stat.byte_count)

@set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)
def _port_stats_reply_handler(self, ev):
body = ev.msg.body

self.logger.info('datapath port '
'rx-pkts rx-bytes rx-error '
'tx-pkts tx-bytes tx-error')
self.logger.info('---------------- -------- '
'-------- -------- -------- '
'-------- -------- --------')
for stat in sorted(body, key=attrgetter('port_no')):
self.logger.info('%016x %8x %8d %8d %8d %8d %8d %8d',
ev.msg.datapath.id, stat.port_no,
stat.rx_packets, stat.rx_bytes, stat.rx_errors,
stat.tx_packets, stat.tx_bytes, stat.tx_errors)
愿你是你所期待的样子

愿你还是你所期待的样子

Hide