From 651e4e6a307776b3f5201639e10c4b4d0949dcb0 Mon Sep 17 00:00:00 2001 From: Gleb Smirnoff Date: Sun, 30 Nov 2014 13:24:21 +0000 Subject: [PATCH] Merge from projects/sendfile: extend protocols API to support sending not ready data: o Add new flag to pru_send() flags - PRUS_NOTREADY. o Add new protocol method pru_ready(). Sponsored by: Nginx, Inc. Sponsored by: Netflix --- sys/dev/cxgb/ulp/tom/cxgb_cpl_io.c | 2 +- sys/dev/cxgbe/tom/t4_cpl_io.c | 4 ++-- sys/dev/cxgbe/tom/t4_ddp.c | 4 ++-- sys/kern/uipc_domain.c | 1 + sys/kern/uipc_mbuf.c | 6 +++--- sys/kern/uipc_sockbuf.c | 10 +++++----- sys/kern/uipc_socket.c | 7 +++++++ sys/netinet/tcp_input.c | 4 ++-- sys/netinet/tcp_reass.c | 2 +- sys/netinet/tcp_usrreq.c | 4 ++-- sys/ofed/drivers/infiniband/ulp/sdp/sdp_main.c | 2 +- sys/sys/mbuf.h | 2 +- sys/sys/protosw.h | 3 +++ sys/sys/sockbuf.h | 4 ++-- 14 files changed, 33 insertions(+), 22 deletions(-) diff --git a/sys/dev/cxgb/ulp/tom/cxgb_cpl_io.c b/sys/dev/cxgb/ulp/tom/cxgb_cpl_io.c index 81a446a64a4f..a4b67088db93 100644 --- a/sys/dev/cxgb/ulp/tom/cxgb_cpl_io.c +++ b/sys/dev/cxgb/ulp/tom/cxgb_cpl_io.c @@ -1199,7 +1199,7 @@ do_rx_data(struct sge_qset *qs, struct rsp_desc *r, struct mbuf *m) } toep->tp_enqueued += m->m_pkthdr.len; - sbappendstream_locked(so_rcv, m); + sbappendstream_locked(so_rcv, m, 0); sorwakeup_locked(so); SOCKBUF_UNLOCK_ASSERT(so_rcv); diff --git a/sys/dev/cxgbe/tom/t4_cpl_io.c b/sys/dev/cxgbe/tom/t4_cpl_io.c index 29e5fa243be5..5dc843428448 100644 --- a/sys/dev/cxgbe/tom/t4_cpl_io.c +++ b/sys/dev/cxgbe/tom/t4_cpl_io.c @@ -1086,7 +1086,7 @@ do_peer_close(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m) #ifdef USE_DDP_RX_FLOW_CONTROL toep->rx_credits -= m->m_len; /* adjust for F_RX_FC_DDP */ #endif - sbappendstream_locked(sb, m); + sbappendstream_locked(sb, m, 0); toep->sb_cc = sbused(sb); } socantrcvmore_locked(so); /* unlocks the sockbuf */ @@ -1586,7 +1586,7 @@ do_rx_data(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m) ("%s: sb %p has more data (%d) than last time (%d).", __func__, sb, sbused(sb), toep->sb_cc)); toep->rx_credits += toep->sb_cc - sbused(sb); - sbappendstream_locked(sb, m); + sbappendstream_locked(sb, m, 0); toep->sb_cc = sbused(sb); sorwakeup_locked(so); SOCKBUF_UNLOCK_ASSERT(sb); diff --git a/sys/dev/cxgbe/tom/t4_ddp.c b/sys/dev/cxgbe/tom/t4_ddp.c index bf0889963306..17eb071b3cb7 100644 --- a/sys/dev/cxgbe/tom/t4_ddp.c +++ b/sys/dev/cxgbe/tom/t4_ddp.c @@ -231,7 +231,7 @@ insert_ddp_data(struct toepcb *toep, uint32_t n) #ifdef USE_DDP_RX_FLOW_CONTROL toep->rx_credits -= n; /* adjust for F_RX_FC_DDP */ #endif - sbappendstream_locked(sb, m); + sbappendstream_locked(sb, m, 0); toep->sb_cc = sbused(sb); } @@ -466,7 +466,7 @@ handle_ddp_data(struct toepcb *toep, __be32 ddp_report, __be32 rcv_nxt, int len) #ifdef USE_DDP_RX_FLOW_CONTROL toep->rx_credits -= len; /* adjust for F_RX_FC_DDP */ #endif - sbappendstream_locked(sb, m); + sbappendstream_locked(sb, m, 0); toep->sb_cc = sbused(sb); wakeup: KASSERT(toep->ddp_flags & db_flag, diff --git a/sys/kern/uipc_domain.c b/sys/kern/uipc_domain.c index 709cc0eb0569..9eda77c0f2d8 100644 --- a/sys/kern/uipc_domain.c +++ b/sys/kern/uipc_domain.c @@ -152,6 +152,7 @@ protosw_init(struct protosw *pr) DEFAULT(pu->pru_sosend, sosend_generic); DEFAULT(pu->pru_soreceive, soreceive_generic); DEFAULT(pu->pru_sopoll, sopoll_generic); + DEFAULT(pu->pru_ready, pru_ready_notsupp); #undef DEFAULT if (pr->pr_init) (*pr->pr_init)(); diff --git a/sys/kern/uipc_mbuf.c b/sys/kern/uipc_mbuf.c index 323426898d09..3880456bddf7 100644 --- a/sys/kern/uipc_mbuf.c +++ b/sys/kern/uipc_mbuf.c @@ -388,7 +388,7 @@ mb_dupcl(struct mbuf *n, struct mbuf *m) * cleaned too. */ void -m_demote(struct mbuf *m0, int all) +m_demote(struct mbuf *m0, int all, int flags) { struct mbuf *m; @@ -400,7 +400,7 @@ m_demote(struct mbuf *m0, int all) m->m_flags &= ~M_PKTHDR; bzero(&m->m_pkthdr, sizeof(struct pkthdr)); } - m->m_flags = m->m_flags & (M_EXT|M_RDONLY|M_NOFREE); + m->m_flags = m->m_flags & (M_EXT | M_RDONLY | M_NOFREE | flags); } } @@ -997,7 +997,7 @@ m_catpkt(struct mbuf *m, struct mbuf *n) M_ASSERTPKTHDR(n); m->m_pkthdr.len += n->m_pkthdr.len; - m_demote(n, 1); + m_demote(n, 1, 0); m_cat(m, n); } diff --git a/sys/kern/uipc_sockbuf.c b/sys/kern/uipc_sockbuf.c index b5a5185d46ce..537f9c867bf8 100644 --- a/sys/kern/uipc_sockbuf.c +++ b/sys/kern/uipc_sockbuf.c @@ -636,7 +636,7 @@ sbappend(struct sockbuf *sb, struct mbuf *m) * that is, a stream protocol (such as TCP). */ void -sbappendstream_locked(struct sockbuf *sb, struct mbuf *m) +sbappendstream_locked(struct sockbuf *sb, struct mbuf *m, int flags) { SOCKBUF_LOCK_ASSERT(sb); @@ -646,8 +646,8 @@ sbappendstream_locked(struct sockbuf *sb, struct mbuf *m) SBLASTMBUFCHK(sb); /* Remove all packet headers and mbuf tags to get a pure data chain. */ - m_demote(m, 1); - + m_demote(m, 1, flags & PRUS_NOTREADY ? M_NOTREADY : 0); + sbcompress(sb, m, sb->sb_mbtail); sb->sb_lastrecord = sb->sb_mb; @@ -660,11 +660,11 @@ sbappendstream_locked(struct sockbuf *sb, struct mbuf *m) * that is, a stream protocol (such as TCP). */ void -sbappendstream(struct sockbuf *sb, struct mbuf *m) +sbappendstream(struct sockbuf *sb, struct mbuf *m, int flags) { SOCKBUF_LOCK(sb); - sbappendstream_locked(sb, m); + sbappendstream_locked(sb, m, flags); SOCKBUF_UNLOCK(sb); } diff --git a/sys/kern/uipc_socket.c b/sys/kern/uipc_socket.c index 7433b0e2ad1c..b2091ea28417 100644 --- a/sys/kern/uipc_socket.c +++ b/sys/kern/uipc_socket.c @@ -3178,6 +3178,13 @@ pru_send_notsupp(struct socket *so, int flags, struct mbuf *m, return EOPNOTSUPP; } +int +pru_ready_notsupp(struct socket *so, struct mbuf *m, int count) +{ + + return (EOPNOTSUPP); +} + /* * This isn't really a ``null'' operation, but it's the default one and * doesn't do anything destructive. diff --git a/sys/netinet/tcp_input.c b/sys/netinet/tcp_input.c index 468f14210565..005ccd05b3fa 100644 --- a/sys/netinet/tcp_input.c +++ b/sys/netinet/tcp_input.c @@ -1855,7 +1855,7 @@ tcp_do_segment(struct mbuf *m, struct tcphdr *th, struct socket *so, newsize, so, NULL)) so->so_rcv.sb_flags &= ~SB_AUTOSIZE; m_adj(m, drop_hdrlen); /* delayed header drop */ - sbappendstream_locked(&so->so_rcv, m); + sbappendstream_locked(&so->so_rcv, m, 0); } /* NB: sorwakeup_locked() does an implicit unlock. */ sorwakeup_locked(so); @@ -2882,7 +2882,7 @@ dodata: /* XXX */ if (so->so_rcv.sb_state & SBS_CANTRCVMORE) m_freem(m); else - sbappendstream_locked(&so->so_rcv, m); + sbappendstream_locked(&so->so_rcv, m, 0); /* NB: sorwakeup_locked() does an implicit unlock. */ sorwakeup_locked(so); } else { diff --git a/sys/netinet/tcp_reass.c b/sys/netinet/tcp_reass.c index dffee00f9b18..17d9a79eb50d 100644 --- a/sys/netinet/tcp_reass.c +++ b/sys/netinet/tcp_reass.c @@ -262,7 +262,7 @@ present: m_freem(mq); else { mq->m_nextpkt = NULL; - sbappendstream_locked(&so->so_rcv, mq); + sbappendstream_locked(&so->so_rcv, mq, 0); wakeup = 1; } } diff --git a/sys/netinet/tcp_usrreq.c b/sys/netinet/tcp_usrreq.c index 965491a5ca33..8b4048c0a1da 100644 --- a/sys/netinet/tcp_usrreq.c +++ b/sys/netinet/tcp_usrreq.c @@ -843,7 +843,7 @@ tcp_usr_send(struct socket *so, int flags, struct mbuf *m, m_freem(control); /* empty control, just free it */ } if (!(flags & PRUS_OOB)) { - sbappendstream(&so->so_snd, m); + sbappendstream(&so->so_snd, m, flags); if (nam && tp->t_state < TCPS_SYN_SENT) { /* * Do implied connect if not yet connected, @@ -901,7 +901,7 @@ tcp_usr_send(struct socket *so, int flags, struct mbuf *m, * of data past the urgent section. * Otherwise, snd_up should be one lower. */ - sbappendstream_locked(&so->so_snd, m); + sbappendstream_locked(&so->so_snd, m, flags); SOCKBUF_UNLOCK(&so->so_snd); if (nam && tp->t_state < TCPS_SYN_SENT) { /* diff --git a/sys/ofed/drivers/infiniband/ulp/sdp/sdp_main.c b/sys/ofed/drivers/infiniband/ulp/sdp/sdp_main.c index a6eba64e1622..2e91d8529434 100644 --- a/sys/ofed/drivers/infiniband/ulp/sdp/sdp_main.c +++ b/sys/ofed/drivers/infiniband/ulp/sdp/sdp_main.c @@ -889,7 +889,7 @@ sdp_append(struct sdp_sock *ssk, struct sockbuf *sb, struct mbuf *mb, int cnt) m_adj(mb, SDP_HEAD_SIZE); n->m_pkthdr.len += mb->m_pkthdr.len; n->m_flags |= mb->m_flags & (M_PUSH | M_URG); - m_demote(mb, 1); + m_demote(mb, 1, 0); sbcompress(sb, mb, sb->sb_mbtail); return; } diff --git a/sys/sys/mbuf.h b/sys/sys/mbuf.h index 190900c43d22..ac3cfb2d0ced 100644 --- a/sys/sys/mbuf.h +++ b/sys/sys/mbuf.h @@ -950,7 +950,7 @@ struct mbuf *m_copypacket(struct mbuf *, int); void m_copy_pkthdr(struct mbuf *, struct mbuf *); struct mbuf *m_copyup(struct mbuf *, int, int); struct mbuf *m_defrag(struct mbuf *, int); -void m_demote(struct mbuf *, int); +void m_demote(struct mbuf *, int, int); struct mbuf *m_devget(char *, int, int, struct ifnet *, void (*)(char *, caddr_t, u_int)); struct mbuf *m_dup(struct mbuf *, int); diff --git a/sys/sys/protosw.h b/sys/sys/protosw.h index 2d98a4c25edd..55db0e339699 100644 --- a/sys/sys/protosw.h +++ b/sys/sys/protosw.h @@ -208,6 +208,8 @@ struct pr_usrreqs { #define PRUS_OOB 0x1 #define PRUS_EOF 0x2 #define PRUS_MORETOCOME 0x4 +#define PRUS_NOTREADY 0x8 + int (*pru_ready)(struct socket *so, struct mbuf *m, int count); int (*pru_sense)(struct socket *so, struct stat *sb); int (*pru_shutdown)(struct socket *so); int (*pru_flush)(struct socket *so, int direction); @@ -251,6 +253,7 @@ int pru_rcvd_notsupp(struct socket *so, int flags); int pru_rcvoob_notsupp(struct socket *so, struct mbuf *m, int flags); int pru_send_notsupp(struct socket *so, int flags, struct mbuf *m, struct sockaddr *addr, struct mbuf *control, struct thread *td); +int pru_ready_notsupp(struct socket *so, struct mbuf *m, int count); int pru_sense_null(struct socket *so, struct stat *sb); int pru_shutdown_notsupp(struct socket *so); int pru_sockaddr_notsupp(struct socket *so, struct sockaddr **nam); diff --git a/sys/sys/sockbuf.h b/sys/sys/sockbuf.h index ec16b2593dd2..5bd9bb556251 100644 --- a/sys/sys/sockbuf.h +++ b/sys/sys/sockbuf.h @@ -131,8 +131,8 @@ struct sockbuf { void sbappend(struct sockbuf *sb, struct mbuf *m); void sbappend_locked(struct sockbuf *sb, struct mbuf *m); -void sbappendstream(struct sockbuf *sb, struct mbuf *m); -void sbappendstream_locked(struct sockbuf *sb, struct mbuf *m); +void sbappendstream(struct sockbuf *sb, struct mbuf *m, int flags); +void sbappendstream_locked(struct sockbuf *sb, struct mbuf *m, int flags); int sbappendaddr(struct sockbuf *sb, const struct sockaddr *asa, struct mbuf *m0, struct mbuf *control); int sbappendaddr_locked(struct sockbuf *sb, const struct sockaddr *asa,