Implement the Marker Protocol. A marker frame is placed on the interface queue

of each port and any further packets are blocked, when the all the marker frames
have been returned to us from the remote network device then we can be sure
that all interface queues are empty.

This is needed when a port is added or removed from the aggregation since it
will affect the hash based distribution, if the queues are not empty then a
packet from an existing connection may be placed on a different interface and
arrive out of order. This was previously achieved by suppressing transmission for
1 second, now that there is an active feedback this timeout as been increased
to 3 seconds and used as a fallback.
This commit is contained in:
Andrew Thompson 2007-05-19 07:47:04 +00:00
parent 72fc5161cf
commit 998971a70f
Notes: svn2git 2020-12-20 02:59:44 +00:00
svn path=/head/; revision=169739
2 changed files with 131 additions and 32 deletions

View File

@ -75,16 +75,20 @@ static const struct tlv_template lacp_info_tlv_template[] = {
typedef void (*lacp_timer_func_t)(struct lacp_port *);
static const struct tlv_template marker_info_tlv_template[] = {
{ MARKER_TYPE_INFO, 16 },
{ MARKER_TYPE_INFO,
sizeof(struct tlvhdr) + sizeof(struct lacp_markerinfo) },
{ 0, 0 },
};
static const struct tlv_template marker_response_tlv_template[] = {
{ MARKER_TYPE_RESPONSE, 16 },
{ MARKER_TYPE_RESPONSE,
sizeof(struct tlvhdr) + sizeof(struct lacp_markerinfo) },
{ 0, 0 },
};
static void lacp_fill_actorinfo(struct lacp_port *, struct lacp_peerinfo *);
static void lacp_fill_markerinfo(struct lacp_port *,
struct lacp_markerinfo *);
static uint64_t lacp_aggregator_bandwidth(struct lacp_aggregator *);
static void lacp_suppress_distributing(struct lacp_softc *,
@ -162,6 +166,7 @@ static void lacp_enable_collecting(struct lacp_port *);
static void lacp_disable_distributing(struct lacp_port *);
static void lacp_enable_distributing(struct lacp_port *);
static int lacp_xmit_lacpdu(struct lacp_port *);
static int lacp_xmit_marker(struct lacp_port *);
#if defined(LACP_DEBUG)
static void lacp_dump_lacpdu(const struct lacpdu *);
@ -350,6 +355,17 @@ lacp_fill_actorinfo(struct lacp_port *lp, struct lacp_peerinfo *info)
info->lip_state = lp->lp_state;
}
static void
lacp_fill_markerinfo(struct lacp_port *lp, struct lacp_markerinfo *info)
{
struct ifnet *ifp = lp->lp_ifp;
/* Fill in the port index and system id (encoded as the MAC) */
info->mi_rq_port = htons(ifp->if_index);
memcpy(&info->mi_rq_system, lp->lp_systemid.lsi_mac, ETHER_ADDR_LEN);
info->mi_rq_xid = htonl(0);
}
static int
lacp_xmit_lacpdu(struct lacp_port *lp)
{
@ -404,6 +420,46 @@ lacp_xmit_lacpdu(struct lacp_port *lp)
return (error);
}
static int
lacp_xmit_marker(struct lacp_port *lp)
{
struct lagg_port *lgp = lp->lp_lagg;
struct mbuf *m;
struct markerdu *mdu;
int error;
LAGG_WLOCK_ASSERT(lgp->lp_lagg);
m = m_gethdr(M_DONTWAIT, MT_DATA);
if (m == NULL) {
return (ENOMEM);
}
m->m_len = m->m_pkthdr.len = sizeof(*mdu);
mdu = mtod(m, struct markerdu *);
memset(mdu, 0, sizeof(*mdu));
memcpy(&mdu->mdu_eh.ether_dhost, ethermulticastaddr_slowprotocols,
ETHER_ADDR_LEN);
memcpy(&mdu->mdu_eh.ether_shost, lgp->lp_lladdr, ETHER_ADDR_LEN);
mdu->mdu_eh.ether_type = htons(ETHERTYPE_SLOW);
mdu->mdu_sph.sph_subtype = SLOWPROTOCOLS_SUBTYPE_MARKER;
mdu->mdu_sph.sph_version = 1;
/* Bump the transaction id and copy over the marker info */
lp->lp_marker.mi_rq_xid = htonl(ntohl(lp->lp_marker.mi_rq_xid) + 1);
TLV_SET(&mdu->mdu_tlv, MARKER_TYPE_INFO, sizeof(mdu->mdu_info));
mdu->mdu_info = lp->lp_marker;
LACP_DPRINTF((lp, "marker transmit, port=%u, sys=%6D, id=%u\n",
ntohs(mdu->mdu_info.mi_rq_port), mdu->mdu_info.mi_rq_system, ":",
ntohl(mdu->mdu_info.mi_rq_xid)));
m->m_flags |= M_MCAST;
error = lagg_enqueue(lp->lp_ifp, m);
return (error);
}
void
lacp_linkstate(struct lagg_port *lgp)
{
@ -516,6 +572,7 @@ lacp_port_create(struct lagg_port *lgp)
LIST_INSERT_HEAD(&lsc->lsc_ports, lp, lp_next);
lacp_fill_actorinfo(lp, &lp->lp_actor);
lacp_fill_markerinfo(lp, &lp->lp_marker);
lp->lp_state =
(active ? LACP_STATE_ACTIVITY : 0) |
(fast ? LACP_STATE_TIMEOUT : 0);
@ -786,13 +843,22 @@ lacp_select_tx_port(struct lagg_softc *lgs, struct mbuf *m)
static void
lacp_suppress_distributing(struct lacp_softc *lsc, struct lacp_aggregator *la)
{
struct lacp_port *lp;
if (lsc->lsc_active_aggregator != la) {
return;
}
LACP_DPRINTF((NULL, "%s\n", __func__));
lsc->lsc_suppress_distributing = TRUE;
/* XXX should consider collector max delay */
/* send a marker frame down each port to verify the queues are empty */
LIST_FOREACH(lp, &lsc->lsc_ports, lp_next) {
lp->lp_flags |= LACP_PORT_MARK;
lacp_xmit_marker(lp);
}
/* set a timeout for the marker frames */
callout_reset(&lsc->lsc_transit_callout,
LACP_TRANSIT_DELAY * hz / 1000, lacp_transit_expire, lsc);
}
@ -1600,8 +1666,11 @@ int
lacp_marker_input(struct lagg_port *lgp, struct mbuf *m)
{
struct lacp_port *lp = LACP_PORT(lgp);
struct lacp_port *lp2;
struct lacp_softc *lsc = lp->lp_lsc;
struct markerdu *mdu;
int error = 0;
int pending = 0;
LAGG_RLOCK_ASSERT(lgp->lp_lagg);
@ -1659,11 +1728,36 @@ lacp_marker_input(struct lagg_port *lgp, struct mbuf *m)
marker_response_tlv_template, TRUE)) {
goto bad;
}
/*
* we are not interested in responses as
* we don't have a marker sender.
*/
/* FALLTHROUGH */
LACP_DPRINTF((lp, "marker response, port=%u, sys=%6D, id=%u\n",
ntohs(mdu->mdu_info.mi_rq_port), mdu->mdu_info.mi_rq_system,
":", ntohl(mdu->mdu_info.mi_rq_xid)));
/* Verify that it is the last marker we sent out */
if (memcmp(&mdu->mdu_info, &lp->lp_marker,
sizeof(struct lacp_markerinfo)))
goto bad;
lp->lp_flags &= ~LACP_PORT_MARK;
if (lsc->lsc_suppress_distributing) {
/* Check if any ports are waiting for a response */
LIST_FOREACH(lp2, &lsc->lsc_ports, lp_next) {
if (lp2->lp_flags & LACP_PORT_MARK) {
pending = 1;
break;
}
}
if (pending == 0) {
/* All interface queues are clear */
LACP_DPRINTF((NULL, "queue flush complete\n"));
lsc->lsc_suppress_distributing = FALSE;
}
}
m_freem(m);
break;
default:
goto bad;
}
@ -1671,6 +1765,7 @@ lacp_marker_input(struct lagg_port *lgp, struct mbuf *m)
return (error);
bad:
LACP_DPRINTF((lp, "bad marker frame\n"));
m_freem(m);
return (EINVAL);
}

View File

@ -62,6 +62,7 @@
#define LACP_STATE_EXPIRED (1<<7)
#define LACP_PORT_NTT 0x00000001
#define LACP_PORT_MARK 0x00000002
#define LACP_PORT_PROMISC 0x00000004
#define LACP_PORT_LADDRCHANGED 0x00000008
#define LACP_PORT_ATTACHED 0x00000010
@ -157,7 +158,30 @@ struct lacpdu {
uint8_t ldu_resv[50];
} __packed;
#define LACP_TRANSIT_DELAY 1000 /* in msec */
/*
* IEEE802.3ad marker protocol
*
* protocol (on-wire) definitions.
*/
struct lacp_markerinfo {
uint16_t mi_rq_port;
uint8_t mi_rq_system[ETHER_ADDR_LEN];
uint32_t mi_rq_xid;
uint8_t mi_pad[2];
} __packed;
struct markerdu {
struct ether_header mdu_eh;
struct slowprothdr mdu_sph;
struct tlvhdr mdu_tlv;
struct lacp_markerinfo mdu_info;
struct tlvhdr mdu_tlv_term;
uint8_t mdu_resv[90];
} __packed;
#define MARKER_TYPE_INFO 0x01
#define MARKER_TYPE_RESPONSE 0x02
enum lacp_selected {
LACP_UNSELECTED,
@ -181,8 +205,10 @@ struct lacp_port {
struct ifnet *lp_ifp;
struct lacp_peerinfo lp_partner;
struct lacp_peerinfo lp_actor;
struct lacp_markerinfo lp_marker;
#define lp_state lp_actor.lip_state
#define lp_key lp_actor.lip_key
#define lp_systemid lp_actor.lip_systemid
struct timeval lp_last_lacpdu;
int lp_lacpdu_sent;
enum lacp_mux_state lp_mux_state;
@ -229,35 +255,13 @@ struct lacp_softc {
#define LACP_LONG_TIMEOUT_TIME (3 * LACP_SLOW_PERIODIC_TIME)
#define LACP_CHURN_DETECTION_TIME (60)
#define LACP_AGGREGATE_WAIT_TIME (2)
#define LACP_TRANSIT_DELAY 3000 /* in msec */
/*
int tlv_check(const void *, size_t, const struct tlvhdr *,
const struct tlv_template *, boolean_t);
*/
/*
* IEEE802.3ad marker protocol
*
* protocol (on-wire) definitions.
*/
struct markerdu {
struct ether_header mdu_eh;
struct slowprothdr mdu_sph;
struct tlvhdr mdu_tlv;
uint16_t mdu_rq_port;
uint8_t mdu_rq_system[6];
uint8_t mdu_rq_xid[4];
uint8_t mdu_pad[2];
struct tlvhdr mdu_tlv_term;
uint8_t mdu_resv[90];
} __packed;
#define MARKER_TYPE_INFO 1
#define MARKER_TYPE_RESPONSE 2
#define LACP_STATE_EQ(s1, s2, mask) \
((((s1) ^ (s2)) & (mask)) == 0)