Commit de52d4b0 authored by Leonardo Lai

implemented IP fragmentation for outgoing packets

parent 4e3da39f
......@@ -6,6 +6,9 @@
#ifndef UDPDK_CONSTANTS_H
#define UDPDK_CONSTANTS_H
#define MAX(a,b) ((a) > (b) ? (a) : (b))
#define MIN(a,b) ((a) < (b) ? (a) : (b))
#define NUM_SOCKETS_MAX 1024
/* DPDK ports */
......@@ -16,18 +19,25 @@
#define NUM_RX_DESC_DEFAULT 1024
#define NUM_TX_DESC_DEFAULT 1024
#define MBUF_CACHE_SIZE 512
#define PKTMBUF_POOL_RX_NAME "UDPDK_mbuf_pool_RX"
#define PKTMBUF_POOL_TX_NAME "UDPDK_mbuf_pool_TX"
#define PKTMBUF_POOL_DIRECT_TX_NAME "UDPDK_mbuf_pool_direct_TX"
#define PKTMBUF_POOL_INDIRECT_TX_NAME "UDPDK_mbuf_pool_indir_TX"
/* IPv4 Fragmentation */
#define NUM_FLOWS_DEF 0x1000
#define NUM_FLOWS_MIN 1
#define NUM_FLOWS_MAX UINT16_MAX
#define MAX_FLOW_TTL MS_PER_S
#define IP_FRAG_TBL_BUCKET_ENTRIES 16
#define IPV4_MTU_DEFAULT RTE_ETHER_MTU
#define MAX_PACKET_FRAG RTE_LIBRTE_IP_FRAG_MAX_FRAG
/* Packet poller */
#define BURST_SIZE 32
#define RX_MBUF_TABLE_SIZE BURST_SIZE
#define TX_MBUF_TABLE_SIZE (2 * MAX(BURST_SIZE, MAX_PACKET_FRAG))
#define PREFETCH_OFFSET 4
/* Exchange memzone */
#define EXCH_MEMZONE_NAME "UDPDK_exchange_desc"
......
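A note on the sizing above: the poller flushes the TX table once tx_count reaches BURST_SIZE, so the table can already hold up to BURST_SIZE - 1 packets when one oversized packet contributes up to MAX_PACKET_FRAG fragments; since 2 * MAX(a,b) >= a + b, TX_MBUF_TABLE_SIZE can never overflow. A minimal compile-time guard one could add (hypothetical, not part of this commit):

/* Hypothetical guard: a nearly-full batch plus one worst-case
 * fragmentation burst must fit in the TX mbuf table. */
_Static_assert(TX_MBUF_TABLE_SIZE >= (BURST_SIZE - 1) + MAX_PACKET_FRAG,
               "TX mbuf table too small for worst-case fragmentation");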
......@@ -12,6 +12,10 @@ struct rte_mempool *rx_pktmbuf_pool = NULL;
struct rte_mempool *tx_pktmbuf_pool = NULL;
struct rte_mempool *tx_pktmbuf_direct_pool = NULL;
struct rte_mempool *tx_pktmbuf_indirect_pool = NULL;
htable_item *udp_port_table = NULL;
struct exch_zone_info *exch_zone_desc = NULL;
......
......@@ -11,6 +11,7 @@
#include <rte_common.h>
#include <rte_eal.h>
#include <rte_errno.h>
#include <rte_ethdev.h>
#include <rte_launch.h>
#include <rte_lcore.h>
......@@ -35,6 +36,8 @@ extern struct exch_slot *exch_slots;
extern htable_item *udp_port_table;
extern struct rte_mempool *rx_pktmbuf_pool;
extern struct rte_mempool *tx_pktmbuf_pool;
extern struct rte_mempool *tx_pktmbuf_direct_pool;
extern struct rte_mempool *tx_pktmbuf_indirect_pool;
static pid_t poller_pid;
/* Get the name of the rings of exchange slots */
......@@ -57,14 +60,37 @@ static int init_mbuf_pools(void)
const unsigned int num_mbufs_tx = NUM_TX_DESC_DEFAULT; // TODO why sized like this?
const unsigned int num_mbufs_cache = 2 * MBUF_CACHE_SIZE;
const unsigned int num_mbufs = num_mbufs_rx + num_mbufs_tx + num_mbufs_cache;
const int socket = rte_socket_id();
rx_pktmbuf_pool = rte_pktmbuf_pool_create(PKTMBUF_POOL_RX_NAME, num_mbufs, MBUF_CACHE_SIZE, 0,
RTE_MBUF_DEFAULT_BUF_SIZE, socket);
if (rx_pktmbuf_pool == NULL) {
RTE_LOG(ERR, INIT, "Failed to allocate RX pool: %s\n", rte_strerror(rte_errno));
return -1;
}
tx_pktmbuf_pool = rte_pktmbuf_pool_create(PKTMBUF_POOL_TX_NAME, num_mbufs, MBUF_CACHE_SIZE, 0,
RTE_MBUF_DEFAULT_BUF_SIZE, socket); // used by the app (sendto) TODO size properly
if (tx_pktmbuf_pool == NULL) {
RTE_LOG(ERR, INIT, "Failed to allocate TX pool: %s\n", rte_strerror(rte_errno));
return -1;
}
tx_pktmbuf_direct_pool = rte_pktmbuf_pool_create(PKTMBUF_POOL_DIRECT_TX_NAME, num_mbufs, MBUF_CACHE_SIZE, 0,
RTE_MBUF_DEFAULT_BUF_SIZE, socket); // used by the poller TODO size properly
if (tx_pktmbuf_direct_pool == NULL) {
RTE_LOG(ERR, INIT, "Failed to allocate TX direct pool: %s\n", rte_strerror(rte_errno));
return -1;
}
tx_pktmbuf_indirect_pool = rte_pktmbuf_pool_create(PKTMBUF_POOL_INDIRECT_TX_NAME, num_mbufs, MBUF_CACHE_SIZE, 0,
RTE_MBUF_DEFAULT_BUF_SIZE, socket); // used by the poller TODO size properly
if (tx_pktmbuf_indirect_pool == NULL) {
RTE_LOG(ERR, INIT, "Failed to allocate TX indirect pool: %s\n", rte_strerror(rte_errno));
return -1;
}
return 0;
}
/* Initialize a DPDK port */
......
......@@ -37,18 +37,25 @@ static volatile int poller_alive = 1;
extern struct exch_zone_info *exch_zone_desc;
extern struct exch_slot *exch_slots;
extern htable_item *udp_port_table;
/* Descriptor of a RX queue */
struct rx_queue {
struct rte_mbuf *rx_mbuf_table[RX_MBUF_TABLE_SIZE];
struct rte_ip_frag_tbl *frag_tbl; // table to store incoming packet fragments
struct rte_mempool *pool; // pool of mbufs
uint16_t portid;
};
struct tx_queue {
struct rte_mbuf *tx_mbuf_table[TX_MBUF_TABLE_SIZE];
struct rte_mempool *direct_pool;   // for fragment headers (a fresh IPv4 header per fragment)
struct rte_mempool *indirect_pool; // for indirect mbufs referencing the original payload
};
/* Descriptor of each lcore (queue configuration) */
struct lcore_queue_conf {
struct rx_queue rx_queue;
struct tx_queue tx_queue;
struct rte_ip_frag_death_row death_row;
} __rte_cache_aligned;
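The death_row field collects fragments that the IP reassembly library schedules for deferred freeing; it has to be drained periodically, as DPDK's ip_reassembly example does. A hedged usage sketch (qconf stands for the per-lcore configuration above):

/* Drain mbufs queued for deferred free by the reassembly library;
 * PREFETCH_OFFSET as the prefetch argument follows the DPDK example. */
rte_ip_frag_free_death_row(&qconf->death_row, PREFETCH_OFFSET);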
......@@ -88,11 +95,11 @@ static int setup_queues(void)
qconf = &lcore_queue_conf[lcore_id];
frag_cycles = (rte_get_tsc_hz() + MS_PER_S - 1) / MS_PER_S * MAX_FLOW_TTL;
// Pool of mbufs for RX
// TODO actually unused because pool is needed only to initialize a queue, which is done in 'application' anyway
qconf->rx_queue.pool = rte_mempool_lookup(PKTMBUF_POOL_RX_NAME);
if (qconf->rx_queue.pool == NULL) {
RTE_LOG(ERR, POLLINIT, "Cannot retrieve mempool for mbufs\n");
RTE_LOG(ERR, POLLINIT, "Cannot retrieve pool of mbufs for RX\n");
return -1;
}
......@@ -105,6 +112,20 @@ static int setup_queues(void)
}
RTE_LOG(INFO, POLLINIT, "Created IP fragmentation table\n");
// Pool of direct mbufs for TX
qconf->tx_queue.direct_pool = rte_mempool_lookup(PKTMBUF_POOL_DIRECT_TX_NAME);
if (qconf->tx_queue.direct_pool == NULL) {
RTE_LOG(ERR, POLLINIT, "Cannot retrieve pool of direct mbufs for TX\n");
return -1;
}
// Pool of indirect mbufs for TX
qconf->tx_queue.indirect_pool = rte_mempool_lookup(PKTMBUF_POOL_INDIRECT_TX_NAME);
if (qconf->tx_queue.indirect_pool == NULL) {
RTE_LOG(ERR, POLLINIT, "Cannot retrieve pool of indirect mbufs for TX\n");
return -1;
}
return 0;
}
......@@ -198,8 +219,6 @@ int poller_init(int argc, char *argv[])
signal(SIGINT, poller_sighandler);
signal(SIGTERM, poller_sighandler);
return 0;
}
......@@ -304,58 +323,123 @@ static inline void reassemble(struct rte_mbuf *m, uint16_t portid, uint32_t queu
enqueue_rx_packet(sock_id, m);
}
/* Transmit a table of mbufs on the TX port, freeing any that the NIC did not accept */
static inline void flush_tx_table(struct rte_mbuf **tx_mbuf_table, uint16_t tx_count)
{
int tx_sent;
tx_sent = rte_eth_tx_burst(PORT_TX, QUEUE_TX, tx_mbuf_table, tx_count);
if (unlikely(tx_sent < tx_count)) {
// Free unsent mbufs
do {
rte_pktmbuf_free(tx_mbuf_table[tx_sent]);
} while (++tx_sent < tx_count);
}
}
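Design note: flush_tx_table frees whatever the NIC does not accept in a single burst call. A common alternative is to retry the burst a few times before dropping, trading a little latency for fewer drops when the TX ring is briefly full; a sketch under that assumption (the retry count of 3 is arbitrary, not from this commit):

static inline void flush_tx_table_retrying(struct rte_mbuf **tx_mbuf_table, uint16_t tx_count)
{
    uint16_t tx_sent = 0;
    // Retry a few times, advancing past whatever the NIC accepted
    for (int tries = 0; tries < 3 && tx_sent < tx_count; tries++) {
        tx_sent += rte_eth_tx_burst(PORT_TX, QUEUE_TX, tx_mbuf_table + tx_sent, tx_count - tx_sent);
    }
    // Free anything still refused after the retries
    while (tx_sent < tx_count) {
        rte_pktmbuf_free(tx_mbuf_table[tx_sent++]);
    }
}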
/* Packet polling routine */
void poller_body(void)
{
unsigned lcore_id;
uint64_t cur_tsc;
struct lcore_queue_conf *qconf;
struct rte_mbuf **rx_mbuf_table;
struct rte_mbuf **tx_mbuf_table;
struct rte_mbuf *pkt = NULL;
const struct rte_ether_hdr *old_eth_hdr;
struct rte_ether_hdr *new_eth_hdr;
uint16_t rx_count = 0, tx_count = 0;
uint64_t ol_flags;
int n_fragments;
int i, j;
lcore_id = rte_lcore_id();
qconf = &lcore_queue_conf[lcore_id];
rx_mbuf_table = qconf->rx_queue.rx_mbuf_table;
tx_mbuf_table = qconf->tx_queue.tx_mbuf_table;
while (poller_alive) {
// Get current timestamp (needed for reassembly)
cur_tsc = rte_rdtsc();
// Transmit packets to DPDK port 0 (queue 0)
for (i = 0; i < NUM_SOCKETS_MAX; i++) {
if (exch_zone_desc->slots[i].used) {
while (tx_count < BURST_SIZE) {
// Try to dequeue one packet (and move to next slot if this was empty)
if (rte_ring_dequeue(exch_slots[i].tx_q, (void **)&pkt) < 0) {
break;
}
// Fragment the packet if needed
if (likely(pkt->pkt_len <= IPV4_MTU_DEFAULT)) { // fragmentation not needed
tx_mbuf_table[tx_count] = pkt;
tx_count++;
} else { // fragmentation needed
// Save the Ethernet header and strip it (because fragmentation applies from IPv4 header)
old_eth_hdr = rte_pktmbuf_mtod(pkt, const struct rte_ether_hdr *);
rte_pktmbuf_adj(pkt, (uint16_t)sizeof(struct rte_ether_hdr));
// Put the fragments in the TX table, one after the other, starting right after the last queued mbuf
n_fragments = rte_ipv4_fragment_packet(pkt, &tx_mbuf_table[tx_count],
(uint16_t)(TX_MBUF_TABLE_SIZE - tx_count), IPV4_MTU_DEFAULT,
qconf->tx_queue.direct_pool, qconf->tx_queue.indirect_pool);
// Free the original mbuf; the fragments hold indirect references to its
// data, so the saved Ethernet header stays readable until they are freed
rte_pktmbuf_free(pkt);
// The IP checksum of each fragment must be recomputed (offloaded to the NIC)
ol_flags = (PKT_TX_IPV4 | PKT_TX_IP_CKSUM);
if (unlikely(n_fragments < 0)) {
RTE_LOG(ERR, POLLBODY, "Failed to fragment a packet\n");
break;
}
// Re-attach (and adjust) the Ethernet header to each fragment
for (j = tx_count; j < tx_count + n_fragments; j++) {
pkt = tx_mbuf_table[j];
new_eth_hdr = (struct rte_ether_hdr *)rte_pktmbuf_prepend(pkt, sizeof(struct rte_ether_hdr));
if (unlikely(new_eth_hdr == NULL)) {
RTE_LOG(ERR, POLLBODY, "mbuf has no room to rebuild the Ethernet header\n");
// Free all fragments of this packet; the decrement below is
// compensated by the unconditional 'tx_count += n_fragments' after the loop
for (int k = tx_count; k < tx_count + n_fragments; k++) {
    rte_pktmbuf_free(tx_mbuf_table[k]);
}
tx_count -= n_fragments;
break;
}
new_eth_hdr->ether_type = old_eth_hdr->ether_type;
rte_ether_addr_copy(&old_eth_hdr->s_addr, &new_eth_hdr->s_addr);
rte_ether_addr_copy(&old_eth_hdr->d_addr, &new_eth_hdr->d_addr);
pkt->ol_flags |= ol_flags;
pkt->l2_len = sizeof(struct rte_ether_hdr);
}
tx_count += n_fragments;
}
}
// If a batch of packets is ready, send it
if (tx_count >= BURST_SIZE) {
flush_tx_table(tx_mbuf_table, tx_count);
tx_count = 0;
}
}
}
// Flush remaining packets (otherwise we'd need a timeout to ensure progress for sporadic traffic)
if (tx_count > 0) {
flush_tx_table(tx_mbuf_table, tx_count);
tx_count = 0;
}
// Receive packets from DPDK port 0 (queue 0) TODO use more queues
rx_count = rte_eth_rx_burst(PORT_RX, QUEUE_RX, rx_mbuf_table, RX_MBUF_TABLE_SIZE);
if (likely(rx_count > 0)) {
// Prefetch some packets (to reduce cache misses later)
for (j = 0; j < PREFETCH_OFFSET && j < rx_count; j++) {
rte_prefetch0(rte_pktmbuf_mtod(rx_mbuf_table[j], void *));
}
// Prefetch the remaining packets, and reassemble the first ones
for (j = 0; j < (rx_count - PREFETCH_OFFSET); j++) {
rte_prefetch0(rte_pktmbuf_mtod(rx_mbuf_table[j + PREFETCH_OFFSET], void *));
reassemble(rx_mbuf_table[j], PORT_RX, QUEUE_RX, qconf, cur_tsc);
}
// Reassemble the second batch of fragments
for (; j < rx_count; j++) {
reassemble(rx_mbuf_table[j], PORT_RX, QUEUE_RX, qconf, cur_tsc);
}
// Effectively flush the packets to exchange buffers
......
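For intuition on how many fragments rte_ipv4_fragment_packet produces: each fragment carries at most the MTU minus the 20-byte IPv4 header (assuming no IP options), rounded down to a multiple of 8 bytes as the fragment-offset encoding requires; at the default 1500-byte MTU that is 1480 bytes, so a 4020-byte IPv4 packet splits into ceil(4000 / 1480) = 3 fragments. A hypothetical helper, not part of the commit:

#include <stdint.h>

/* Expected fragment count for an IPv4 packet of 'ipv4_len' bytes
 * (header included, no options) fragmented to 'mtu'. */
static inline uint16_t expected_frag_count(uint32_t ipv4_len, uint16_t mtu)
{
    const uint32_t hdr = 20;                                   /* IPv4 header */
    const uint32_t payload = (((uint32_t)mtu - hdr) / 8) * 8;  /* 8-byte aligned */
    return (uint16_t)((ipv4_len - hdr + payload - 1) / payload);
}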