summaryrefslogtreecommitdiff
path: root/net/xdp
diff options
context:
space:
mode:
Diffstat (limited to 'net/xdp')
-rw-r--r--net/xdp/xsk.c116
-rw-r--r--net/xdp/xsk_queue.h30
-rw-r--r--net/xdp/xskmap.c17
3 files changed, 126 insertions, 37 deletions
diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index 4faabd1ecfd1..cd62d4ba87a9 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -30,7 +30,7 @@
#include "xdp_umem.h"
#include "xsk.h"
-#define TX_BATCH_SIZE 16
+#define TX_BATCH_SIZE 32
static DEFINE_PER_CPU(struct list_head, xskmap_flush_list);
@@ -445,6 +445,97 @@ static void xsk_destruct_skb(struct sk_buff *skb)
sock_wfree(skb);
}
+static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs,
+ struct xdp_desc *desc)
+{
+ struct xsk_buff_pool *pool = xs->pool;
+ u32 hr, len, ts, offset, copy, copied;
+ struct sk_buff *skb;
+ struct page *page;
+ void *buffer;
+ int err, i;
+ u64 addr;
+
+ hr = max(NET_SKB_PAD, L1_CACHE_ALIGN(xs->dev->needed_headroom));
+
+ skb = sock_alloc_send_skb(&xs->sk, hr, 1, &err);
+ if (unlikely(!skb))
+ return ERR_PTR(err);
+
+ skb_reserve(skb, hr);
+
+ addr = desc->addr;
+ len = desc->len;
+ ts = pool->unaligned ? len : pool->chunk_size;
+
+ buffer = xsk_buff_raw_get_data(pool, addr);
+ offset = offset_in_page(buffer);
+ addr = buffer - pool->addrs;
+
+ for (copied = 0, i = 0; copied < len; i++) {
+ page = pool->umem->pgs[addr >> PAGE_SHIFT];
+ get_page(page);
+
+ copy = min_t(u32, PAGE_SIZE - offset, len - copied);
+ skb_fill_page_desc(skb, i, page, offset, copy);
+
+ copied += copy;
+ addr += copy;
+ offset = 0;
+ }
+
+ skb->len += len;
+ skb->data_len += len;
+ skb->truesize += ts;
+
+ refcount_add(ts, &xs->sk.sk_wmem_alloc);
+
+ return skb;
+}
+
+static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
+ struct xdp_desc *desc)
+{
+ struct net_device *dev = xs->dev;
+ struct sk_buff *skb;
+
+ if (dev->priv_flags & IFF_TX_SKB_NO_LINEAR) {
+ skb = xsk_build_skb_zerocopy(xs, desc);
+ if (IS_ERR(skb))
+ return skb;
+ } else {
+ u32 hr, tr, len;
+ void *buffer;
+ int err;
+
+ hr = max(NET_SKB_PAD, L1_CACHE_ALIGN(dev->needed_headroom));
+ tr = dev->needed_tailroom;
+ len = desc->len;
+
+ skb = sock_alloc_send_skb(&xs->sk, hr + len + tr, 1, &err);
+ if (unlikely(!skb))
+ return ERR_PTR(err);
+
+ skb_reserve(skb, hr);
+ skb_put(skb, len);
+
+ buffer = xsk_buff_raw_get_data(xs->pool, desc->addr);
+ err = skb_store_bits(skb, 0, buffer, len);
+ if (unlikely(err)) {
+ kfree_skb(skb);
+ return ERR_PTR(err);
+ }
+ }
+
+ skb->dev = dev;
+ skb->priority = xs->sk.sk_priority;
+ skb->mark = xs->sk.sk_mark;
+ skb_shinfo(skb)->destructor_arg = (void *)(long)desc->addr;
+ skb->destructor = xsk_destruct_skb;
+
+ return skb;
+}
+
static int xsk_generic_xmit(struct sock *sk)
{
struct xdp_sock *xs = xdp_sk(sk);
@@ -461,43 +552,30 @@ static int xsk_generic_xmit(struct sock *sk)
goto out;
while (xskq_cons_peek_desc(xs->tx, &desc, xs->pool)) {
- char *buffer;
- u64 addr;
- u32 len;
-
if (max_batch-- == 0) {
err = -EAGAIN;
goto out;
}
- len = desc.len;
- skb = sock_alloc_send_skb(sk, len, 1, &err);
- if (unlikely(!skb))
+ skb = xsk_build_skb(xs, &desc);
+ if (IS_ERR(skb)) {
+ err = PTR_ERR(skb);
goto out;
+ }
- skb_put(skb, len);
- addr = desc.addr;
- buffer = xsk_buff_raw_get_data(xs->pool, addr);
- err = skb_store_bits(skb, 0, buffer, len);
/* This is the backpressure mechanism for the Tx path.
* Reserve space in the completion queue and only proceed
* if there is space in it. This avoids having to implement
* any buffering in the Tx path.
*/
spin_lock_irqsave(&xs->pool->cq_lock, flags);
- if (unlikely(err) || xskq_prod_reserve(xs->pool->cq)) {
+ if (xskq_prod_reserve(xs->pool->cq)) {
spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
kfree_skb(skb);
goto out;
}
spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
- skb->dev = xs->dev;
- skb->priority = sk->sk_priority;
- skb->mark = sk->sk_mark;
- skb_shinfo(skb)->destructor_arg = (void *)(long)desc.addr;
- skb->destructor = xsk_destruct_skb;
-
err = __dev_direct_xmit(skb, xs->queue_id);
if (err == NETDEV_TX_BUSY) {
/* Tell user-space to retry the send */
diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index 2823b7c3302d..2ac3802c2cd7 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -47,19 +47,18 @@ struct xsk_queue {
u64 queue_empty_descs;
};
-/* The structure of the shared state of the rings are the same as the
- * ring buffer in kernel/events/ring_buffer.c. For the Rx and completion
- * ring, the kernel is the producer and user space is the consumer. For
- * the Tx and fill rings, the kernel is the consumer and user space is
- * the producer.
+/* The structure of the shared state of the rings are a simple
+ * circular buffer, as outlined in
+ * Documentation/core-api/circular-buffers.rst. For the Rx and
+ * completion ring, the kernel is the producer and user space is the
+ * consumer. For the Tx and fill rings, the kernel is the consumer and
+ * user space is the producer.
*
* producer consumer
*
- * if (LOAD ->consumer) { LOAD ->producer
- * (A) smp_rmb() (C)
+ * if (LOAD ->consumer) { (A) LOAD.acq ->producer (C)
* STORE $data LOAD $data
- * smp_wmb() (B) smp_mb() (D)
- * STORE ->producer STORE ->consumer
+ * STORE.rel ->producer (B) STORE.rel ->consumer (D)
* }
*
* (A) pairs with (D), and (B) pairs with (C).
@@ -78,7 +77,8 @@ struct xsk_queue {
*
* (A) is a control dependency that separates the load of ->consumer
* from the stores of $data. In case ->consumer indicates there is no
- * room in the buffer to store $data we do not. So no barrier is needed.
+ * room in the buffer to store $data we do not. The dependency will
+ * order both of the stores after the loads. So no barrier is needed.
*
* (D) protects the load of the data to be observed to happen after the
* store of the consumer pointer. If we did not have this memory
@@ -227,15 +227,13 @@ static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q,
static inline void __xskq_cons_release(struct xsk_queue *q)
{
- smp_mb(); /* D, matches A */
- WRITE_ONCE(q->ring->consumer, q->cached_cons);
+ smp_store_release(&q->ring->consumer, q->cached_cons); /* D, matchees A */
}
static inline void __xskq_cons_peek(struct xsk_queue *q)
{
/* Refresh the local pointer */
- q->cached_prod = READ_ONCE(q->ring->producer);
- smp_rmb(); /* C, matches B */
+ q->cached_prod = smp_load_acquire(&q->ring->producer); /* C, matches B */
}
static inline void xskq_cons_get_entries(struct xsk_queue *q)
@@ -397,9 +395,7 @@ static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx)
{
- smp_wmb(); /* B, matches C */
-
- WRITE_ONCE(q->ring->producer, idx);
+ smp_store_release(&q->ring->producer, idx); /* B, matches C */
}
static inline void xskq_prod_submit(struct xsk_queue *q)
diff --git a/net/xdp/xskmap.c b/net/xdp/xskmap.c
index 113fd9017203..67b4ce504852 100644
--- a/net/xdp/xskmap.c
+++ b/net/xdp/xskmap.c
@@ -87,7 +87,6 @@ static void xsk_map_free(struct bpf_map *map)
{
struct xsk_map *m = container_of(map, struct xsk_map, map);
- bpf_clear_redirect_map(map);
synchronize_net();
bpf_map_area_free(m);
}
@@ -125,6 +124,16 @@ static int xsk_map_gen_lookup(struct bpf_map *map, struct bpf_insn *insn_buf)
return insn - insn_buf;
}
+static void *__xsk_map_lookup_elem(struct bpf_map *map, u32 key)
+{
+ struct xsk_map *m = container_of(map, struct xsk_map, map);
+
+ if (key >= map->max_entries)
+ return NULL;
+
+ return READ_ONCE(m->xsk_map[key]);
+}
+
static void *xsk_map_lookup_elem(struct bpf_map *map, void *key)
{
WARN_ON_ONCE(!rcu_read_lock_held());
@@ -215,6 +224,11 @@ static int xsk_map_delete_elem(struct bpf_map *map, void *key)
return 0;
}
+static int xsk_map_redirect(struct bpf_map *map, u32 ifindex, u64 flags)
+{
+ return __bpf_xdp_redirect_map(map, ifindex, flags, __xsk_map_lookup_elem);
+}
+
void xsk_map_try_sock_delete(struct xsk_map *map, struct xdp_sock *xs,
struct xdp_sock **map_entry)
{
@@ -247,4 +261,5 @@ const struct bpf_map_ops xsk_map_ops = {
.map_check_btf = map_check_no_btf,
.map_btf_name = "xsk_map",
.map_btf_id = &xsk_map_btf_id,
+ .map_redirect = xsk_map_redirect,
};