Commit 4e3da39f authored by Leonardo Lai's avatar Leonardo Lai

implemented sendto body

parent 8b013930
...@@ -15,8 +15,9 @@ ...@@ -15,8 +15,9 @@
#define QUEUE_TX 0 #define QUEUE_TX 0
#define NUM_RX_DESC_DEFAULT 1024 #define NUM_RX_DESC_DEFAULT 1024
#define NUM_TX_DESC_DEFAULT 1024 #define NUM_TX_DESC_DEFAULT 1024
#define PKTMBUF_POOL_NAME "UDPDK_mbuf_pool"
#define MBUF_CACHE_SIZE 512 #define MBUF_CACHE_SIZE 512
#define PKTMBUF_POOL_RX_NAME "UDPDK_mbuf_pool_RX"
#define PKTMBUF_POOL_TX_NAME "UDPDK_mbuf_pool_TX"
/* Packet poller */ /* Packet poller */
#define PKT_READ_SIZE 32 #define PKT_READ_SIZE 32
...@@ -39,4 +40,10 @@ ...@@ -39,4 +40,10 @@
/* L4 port switching */ /* L4 port switching */
#define UDP_PORT_TABLE_NAME "UDPDK_UDP_port_table" #define UDP_PORT_TABLE_NAME "UDPDK_UDP_port_table"
/* IPv4 header */
#define IP_DEFTTL 64
#define IP_VERSION 0x40
#define IP_HDRLEN 0x05
#define IP_VHL_DEF (IP_VERSION | IP_HDRLEN)
#endif //UDPDK_CONSTANTS_H #endif //UDPDK_CONSTANTS_H
...@@ -8,6 +8,10 @@ ...@@ -8,6 +8,10 @@
volatile int interrupted = 0; volatile int interrupted = 0;
struct rte_mempool *rx_pktmbuf_pool = NULL;
struct rte_mempool *tx_pktmbuf_pool = NULL;
htable_item *udp_port_table = NULL; htable_item *udp_port_table = NULL;
struct exch_zone_info *exch_zone_desc = NULL; struct exch_zone_info *exch_zone_desc = NULL;
......
...@@ -33,7 +33,8 @@ extern int interrupted; ...@@ -33,7 +33,8 @@ extern int interrupted;
extern struct exch_zone_info *exch_zone_desc; extern struct exch_zone_info *exch_zone_desc;
extern struct exch_slot *exch_slots; extern struct exch_slot *exch_slots;
extern htable_item *udp_port_table; extern htable_item *udp_port_table;
static struct rte_mempool *pktmbuf_pool; extern struct rte_mempool *rx_pktmbuf_pool;
extern struct rte_mempool *tx_pktmbuf_pool;
static pid_t poller_pid; static pid_t poller_pid;
/* Get the name of the rings of exchange slots */ /* Get the name of the rings of exchange slots */
...@@ -50,16 +51,20 @@ static inline const char * get_exch_ring_name(unsigned id, enum exch_ring_func f ...@@ -50,16 +51,20 @@ static inline const char * get_exch_ring_name(unsigned id, enum exch_ring_func f
} }
/* Initialize a pool of mbuf for reception and transmission */ /* Initialize a pool of mbuf for reception and transmission */
static int init_mbuf_pool(void) static int init_mbuf_pools(void)
{ {
const unsigned int num_mbufs_rx = NUM_RX_DESC_DEFAULT; const unsigned int num_mbufs_rx = NUM_RX_DESC_DEFAULT;
const unsigned int num_mbufs_tx = NUM_TX_DESC_DEFAULT; const unsigned int num_mbufs_tx = NUM_TX_DESC_DEFAULT; // TODO why sized like this?
const unsigned int num_mbufs_cache = 2 * MBUF_CACHE_SIZE; const unsigned int num_mbufs_cache = 2 * MBUF_CACHE_SIZE;
const unsigned int num_mbufs = num_mbufs_rx + num_mbufs_tx + num_mbufs_cache; const unsigned int num_mbufs = num_mbufs_rx + num_mbufs_tx + num_mbufs_cache;
pktmbuf_pool = rte_pktmbuf_pool_create(PKTMBUF_POOL_NAME, num_mbufs, MBUF_CACHE_SIZE, 0, rx_pktmbuf_pool = rte_pktmbuf_pool_create(PKTMBUF_POOL_RX_NAME, num_mbufs, MBUF_CACHE_SIZE, 0,
RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
return pktmbuf_pool == NULL; // 0 on success
tx_pktmbuf_pool = rte_pktmbuf_pool_create(PKTMBUF_POOL_TX_NAME, num_mbufs, MBUF_CACHE_SIZE, 0,
RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); // TODO size properly
return (rx_pktmbuf_pool == NULL || tx_pktmbuf_pool == NULL); // 0 on success
} }
/* Initialize a DPDK port */ /* Initialize a DPDK port */
...@@ -95,7 +100,7 @@ static int init_port(uint16_t port_num) ...@@ -95,7 +100,7 @@ static int init_port(uint16_t port_num)
// Setup the RX queues // Setup the RX queues
for (q = 0; q < rx_rings; q++) { for (q = 0; q < rx_rings; q++) {
retval = rte_eth_rx_queue_setup(port_num, q, rx_ring_size, retval = rte_eth_rx_queue_setup(port_num, q, rx_ring_size,
rte_eth_dev_socket_id(port_num), NULL, pktmbuf_pool); rte_eth_dev_socket_id(port_num), NULL, rx_pktmbuf_pool);
if (retval < 0) { if (retval < 0) {
RTE_LOG(ERR, INIT, "Could not setup RX queue %d on port %d\n", q, port_num); RTE_LOG(ERR, INIT, "Could not setup RX queue %d on port %d\n", q, port_num);
return retval; return retval;
...@@ -256,10 +261,10 @@ int udpdk_init(int argc, char *argv[]) ...@@ -256,10 +261,10 @@ int udpdk_init(int argc, char *argv[])
argc -= retval; argc -= retval;
argv += retval; argv += retval;
// Initialize pool of mbuf // Initialize pools of mbuf
retval = init_mbuf_pool(); retval = init_mbuf_pools();
if (retval < 0) { if (retval < 0) {
RTE_LOG(ERR, INIT, "Cannot initialize pool of mbufs\n"); RTE_LOG(ERR, INIT, "Cannot initialize pools of mbufs\n");
return -1; return -1;
} }
......
...@@ -37,6 +37,7 @@ static volatile int poller_alive = 1; ...@@ -37,6 +37,7 @@ static volatile int poller_alive = 1;
extern struct exch_zone_info *exch_zone_desc; extern struct exch_zone_info *exch_zone_desc;
extern struct exch_slot *exch_slots; extern struct exch_slot *exch_slots;
extern htable_item *udp_port_table; extern htable_item *udp_port_table;
static struct rte_mempool *dummy_pool = NULL; // TODO dummy
/* Descriptor of a RX queue */ /* Descriptor of a RX queue */
struct rx_queue { struct rx_queue {
...@@ -89,7 +90,7 @@ static int setup_queues(void) ...@@ -89,7 +90,7 @@ static int setup_queues(void)
// Memory pool for mbufs // Memory pool for mbufs
// TODO actually unused because pool is needed only to initialize a queue, which is done in 'application' anyway // TODO actually unused because pool is needed only to initialize a queue, which is done in 'application' anyway
qconf->rx_queue.pool = rte_mempool_lookup(PKTMBUF_POOL_NAME); qconf->rx_queue.pool = rte_mempool_lookup(PKTMBUF_POOL_RX_NAME);
if (qconf->rx_queue.pool == NULL) { if (qconf->rx_queue.pool == NULL) {
RTE_LOG(ERR, POLLINIT, "Cannot retrieve mempool for mbufs\n"); RTE_LOG(ERR, POLLINIT, "Cannot retrieve mempool for mbufs\n");
return -1; return -1;
...@@ -197,17 +198,9 @@ int poller_init(int argc, char *argv[]) ...@@ -197,17 +198,9 @@ int poller_init(int argc, char *argv[])
signal(SIGINT, poller_sighandler); signal(SIGINT, poller_sighandler);
signal(SIGTERM, poller_sighandler); signal(SIGTERM, poller_sighandler);
return 0; dummy_pool = rte_pktmbuf_pool_create("DUMMYPOOL", 1024, 32, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); // TODO dummy
}
static void ipv4_int_to_str(unsigned int ip, char *buf) return 0;
{
unsigned char bytes[4];
bytes[0] = ip & 0xFF;
bytes[1] = (ip >> 8) & 0xFF;
bytes[2] = (ip >> 16) & 0xFF;
bytes[3] = (ip >> 24) & 0xFF;
snprintf(buf, 16, "%d.%d.%d.%d", bytes[3], bytes[2], bytes[1], bytes[0]);
} }
static void flush_rx_queue(uint16_t idx) static void flush_rx_queue(uint16_t idx)
...@@ -243,12 +236,7 @@ static inline uint16_t is_udp_pkt(struct rte_ipv4_hdr *ip_hdr) ...@@ -243,12 +236,7 @@ static inline uint16_t is_udp_pkt(struct rte_ipv4_hdr *ip_hdr)
static inline uint16_t get_udp_dst_port(struct rte_udp_hdr *udp_hdr) static inline uint16_t get_udp_dst_port(struct rte_udp_hdr *udp_hdr)
{ {
return rte_be_to_cpu_16(udp_hdr->dst_port); return udp_hdr->dst_port;
}
static inline uint32_t get_ip_dst(struct rte_ipv4_hdr *ip_hdr)
{
return rte_be_to_cpu_32(ip_hdr->dst_addr);
} }
static inline void reassemble(struct rte_mbuf *m, uint16_t portid, uint32_t queue, static inline void reassemble(struct rte_mbuf *m, uint16_t portid, uint32_t queue,
...@@ -260,7 +248,6 @@ static inline void reassemble(struct rte_mbuf *m, uint16_t portid, uint32_t queu ...@@ -260,7 +248,6 @@ static inline void reassemble(struct rte_mbuf *m, uint16_t portid, uint32_t queu
struct rte_ip_frag_death_row *dr; struct rte_ip_frag_death_row *dr;
struct rx_queue *rxq; struct rx_queue *rxq;
uint16_t udp_dst_port; uint16_t udp_dst_port;
uint32_t ip_dst;
int sock_id; int sock_id;
rxq = &qconf->rx_queue; rxq = &qconf->rx_queue;
...@@ -292,6 +279,7 @@ static inline void reassemble(struct rte_mbuf *m, uint16_t portid, uint32_t queu ...@@ -292,6 +279,7 @@ static inline void reassemble(struct rte_mbuf *m, uint16_t portid, uint32_t queu
m = mo; m = mo;
eth_hdr = rte_pktmbuf_mtod(m, struct rte_ether_hdr *); eth_hdr = rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
ip_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1); ip_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
// TODO must fix the IP header checksum as done in ip-sec example
} }
} }
} else { } else {
...@@ -305,12 +293,6 @@ static inline void reassemble(struct rte_mbuf *m, uint16_t portid, uint32_t queu ...@@ -305,12 +293,6 @@ static inline void reassemble(struct rte_mbuf *m, uint16_t portid, uint32_t queu
} }
udp_dst_port = get_udp_dst_port( udp_dst_port = get_udp_dst_port(
(struct rte_udp_hdr *)((unsigned char *)ip_hdr + sizeof(struct rte_ipv4_hdr))); (struct rte_udp_hdr *)((unsigned char *)ip_hdr + sizeof(struct rte_ipv4_hdr)));
ip_dst = get_ip_dst(ip_hdr);
char ip_str[16]; // TODO DEBUG
ipv4_int_to_str(ip_dst, ip_str); // TODO DEBUG
printf("[DBG] UDP dest port: %d\n", udp_dst_port); // TODO DEBUG
printf("[DBG] IP dest addr: %s\n", ip_str); // TODO DEBUG
// Find the sock_id corresponding to the UDP dst port (L4 switching) and enqueue the packet to its queue // Find the sock_id corresponding to the UDP dst port (L4 switching) and enqueue the packet to its queue
sock_id = htable_lookup(udp_port_table, udp_dst_port); sock_id = htable_lookup(udp_port_table, udp_dst_port);
...@@ -345,7 +327,7 @@ void poller_body(void) ...@@ -345,7 +327,7 @@ void poller_body(void)
for (i = 0; i < NUM_SOCKETS_MAX; i++) { for (i = 0; i < NUM_SOCKETS_MAX; i++) {
if (exch_zone_desc->slots[i].used) { if (exch_zone_desc->slots[i].used) {
tx_sendable = rte_ring_dequeue_burst(exch_slots[i].tx_q, (void **)txbuf, PKT_WRITE_SIZE, NULL); tx_sendable = rte_ring_dequeue_burst(exch_slots[i].tx_q, (void **)txbuf, PKT_WRITE_SIZE, NULL);
if (likely(tx_sendable > 0)) { if (tx_sendable > 0) {
tx_count = rte_eth_tx_burst(PORT_TX, QUEUE_TX, txbuf, tx_sendable); // TODO should call a send function that accoubts for fragmentation tx_count = rte_eth_tx_burst(PORT_TX, QUEUE_TX, txbuf, tx_sendable); // TODO should call a send function that accoubts for fragmentation
if (unlikely(tx_count < tx_sendable)) { if (unlikely(tx_count < tx_sendable)) {
do { do {
......
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
#include <netinet/in.h> #include <netinet/in.h>
#include <rte_log.h> #include <rte_log.h>
#include <rte_random.h>
#include "udpdk_api.h" #include "udpdk_api.h"
#include "udpdk_lookup_table.h" #include "udpdk_lookup_table.h"
...@@ -17,6 +18,7 @@ extern int interrupted; ...@@ -17,6 +18,7 @@ extern int interrupted;
extern htable_item *udp_port_table; extern htable_item *udp_port_table;
extern struct exch_zone_info *exch_zone_desc; extern struct exch_zone_info *exch_zone_desc;
extern struct exch_slot *exch_slots; extern struct exch_slot *exch_slots;
extern struct rte_mempool *tx_pktmbuf_pool;
static int socket_validate_args(int domain, int type, int protocol) static int socket_validate_args(int domain, int type, int protocol)
{ {
...@@ -113,6 +115,7 @@ int udpdk_bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen) ...@@ -113,6 +115,7 @@ int udpdk_bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen)
return -1; return -1;
} }
// Check if the port is already being used
port = addr_in->sin_port; port = addr_in->sin_port;
ret = htable_lookup(udp_port_table, port); ret = htable_lookup(udp_port_table, port);
if (ret != -1) { if (ret != -1) {
...@@ -123,10 +126,13 @@ int udpdk_bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen) ...@@ -123,10 +126,13 @@ int udpdk_bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen)
// Mark the slot as bound // Mark the slot as bound
exch_zone_desc->slots[sockfd].bound = 1; exch_zone_desc->slots[sockfd].bound = 1;
exch_zone_desc->slots[sockfd].udp_port = (int)port;
// Insert in the hashtable (port, sock_id) // Insert in the hashtable (port, sock_id)
htable_insert(udp_port_table, (int)port, sockfd); htable_insert(udp_port_table, (int)port, sockfd);
RTE_LOG(INFO, SYSCALL, "Binding port %d to sock_id %d\n", port, sockfd); RTE_LOG(INFO, SYSCALL, "Binding port %d to sock_id %d\n", port, sockfd);
// TODO must bind the IP address too (if INADDR_ANY, pick one)
return 0; return 0;
} }
...@@ -161,16 +167,111 @@ static int sendto_validate_args(int sockfd, const void *buf, size_t len, int fla ...@@ -161,16 +167,111 @@ static int sendto_validate_args(int sockfd, const void *buf, size_t len, int fla
return 0; return 0;
} }
// TODO move this elsewhere
static int get_free_udp_port(void)
{
int port;
if (exch_zone_desc->n_zones_active == NUM_SOCKETS_MAX) {
// No port available
return -1;
}
// Generate a random unused port
do {
port = (uint16_t)rte_rand();
} while (htable_lookup(udp_port_table, port) != -1);
return port;
}
ssize_t udpdk_sendto(int sockfd, const void *buf, size_t len, int flags, ssize_t udpdk_sendto(int sockfd, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen) const struct sockaddr *dest_addr, socklen_t addrlen)
{ {
struct rte_mbuf *pkt;
struct rte_ether_hdr *eth_hdr;
struct rte_ipv4_hdr *ip_hdr;
struct rte_udp_hdr *udp_hdr;
void *udp_data;
const struct sockaddr_in *dest_addr_in = (struct sockaddr_in *)dest_addr;
static struct rte_ether_addr src_eth_addr = { {0x68, 0x05, 0xca, 0x95, 0xf8, 0xec} }; // TODO from configuration
static uint32_t src_ip_addr = RTE_IPV4(2, 100, 31, 172); // TODO from bind (reversed for endianness=
static struct rte_ether_addr dst_eth_addr = { {0x68, 0x05, 0xca, 0x95, 0xfa, 0x64} }; // TODO from configuration
// Validate the arguments // Validate the arguments
if (sendto_validate_args(sockfd, buf, len, flags, dest_addr, addrlen) < 0) { if (sendto_validate_args(sockfd, buf, len, flags, dest_addr, addrlen) < 0) {
return -1; return -1;
} }
// TODO implement core // If the socket was not explicitly bound, bind it when the first packet is sent
return 0; if (unlikely(!exch_zone_desc->slots[sockfd].bound)) {
struct sockaddr_in saddr_in;
memset(&saddr_in, 0, sizeof(saddr_in));
saddr_in.sin_family = AF_INET;
saddr_in.sin_addr.s_addr = INADDR_ANY;
saddr_in.sin_port = get_free_udp_port();
if (udpdk_bind(sockfd, (const struct sockaddr *)&saddr_in, sizeof(saddr_in)) < 0) {
RTE_LOG(ERR, SYSCALL, "Send failed to bind\n");
return -1;
}
}
// Allocate one mbuf for the packet (will be freed when effectively sent)
pkt = rte_pktmbuf_alloc(tx_pktmbuf_pool);
if (!pkt) {
RTE_LOG(ERR, SYSCALL, "Sendto failed to allocate mbuf\n");
errno = ENOMEM;
return -1;
}
// Initialize the Ethernet header
eth_hdr = rte_pktmbuf_mtod(pkt, struct rte_ether_hdr *);
rte_ether_addr_copy(&src_eth_addr, &eth_hdr->s_addr);
rte_ether_addr_copy(&dst_eth_addr, &eth_hdr->d_addr);
eth_hdr->ether_type = rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4);
// Initialize the IP header
ip_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
memset(ip_hdr, 0, sizeof(*ip_hdr));
ip_hdr->version_ihl = IP_VHL_DEF;
ip_hdr->type_of_service = 0;
ip_hdr->fragment_offset = 0;
ip_hdr->time_to_live = IP_DEFTTL;
ip_hdr->next_proto_id = IPPROTO_UDP;
ip_hdr->packet_id = 0;
ip_hdr->src_addr = src_ip_addr; // TODO this should be determined by bind()
ip_hdr->dst_addr = dest_addr_in->sin_addr.s_addr;
ip_hdr->total_length = rte_cpu_to_be_16(len + sizeof(*ip_hdr) + sizeof(*udp_hdr));
ip_hdr->hdr_checksum = rte_ipv4_cksum(ip_hdr);
// Initialize the UDP header
udp_hdr = (struct rte_udp_hdr *)(ip_hdr + 1);
udp_hdr->src_port = exch_zone_desc->slots[sockfd].udp_port;
udp_hdr->dst_port = dest_addr_in->sin_port;
udp_hdr->dgram_cksum = 0; // UDP checksum is optional
udp_hdr->dgram_len = rte_cpu_to_be_16(len + sizeof(*udp_hdr));
// Fill other DPDK metadata
pkt->nb_segs = 1;
pkt->pkt_len = len + sizeof(*eth_hdr) + sizeof(*ip_hdr) + sizeof(*udp_hdr);
pkt->data_len = pkt->pkt_len;
pkt->l2_len = sizeof(struct rte_ether_hdr);
pkt->l3_len = sizeof(struct rte_ipv4_hdr);
pkt->l4_len = sizeof(struct rte_udp_hdr);
// Write payload
udp_data = (void *)(udp_hdr + 1);
strncpy(udp_data, buf, len);
// Put the packet in the tx_ring
if (rte_ring_enqueue(exch_slots[sockfd].tx_q, (void *)pkt) < 0) {
RTE_LOG(ERR, SYSCALL, "Sendto failed to put packet in the TX ring\n");
errno = ENOBUFS;
rte_pktmbuf_free(pkt);
return -1;
}
return len;
} }
static int recvfrom_validate_args(int sockfd, void *buf, size_t len, int flags, static int recvfrom_validate_args(int sockfd, void *buf, size_t len, int flags,
...@@ -239,6 +340,7 @@ ssize_t udpdk_recvfrom(int sockfd, void *buf, size_t len, int flags, ...@@ -239,6 +340,7 @@ ssize_t udpdk_recvfrom(int sockfd, void *buf, size_t len, int flags,
udp_hdr = (struct rte_udp_hdr *)(ip_hdr + 1); udp_hdr = (struct rte_udp_hdr *)(ip_hdr + 1);
udp_data = (void *)(udp_hdr + 1); udp_data = (void *)(udp_hdr + 1);
udp_data_len = pkt_len - sizeof(struct rte_ipv4_hdr) - sizeof(struct rte_udp_hdr); udp_data_len = pkt_len - sizeof(struct rte_ipv4_hdr) - sizeof(struct rte_udp_hdr);
printf("recfrom pktlen: %d\n", pkt_len); printf("recfrom pktlen: %d\n", pkt_len);
printf("recfrom udp_data_len: %d\n", udp_data_len); printf("recfrom udp_data_len: %d\n", udp_data_len);
...@@ -257,8 +359,8 @@ ssize_t udpdk_recvfrom(int sockfd, void *buf, size_t len, int flags, ...@@ -257,8 +359,8 @@ ssize_t udpdk_recvfrom(int sockfd, void *buf, size_t len, int flags,
struct sockaddr_in addr_in; struct sockaddr_in addr_in;
memset(&addr_in, 0, sizeof(addr_in)); memset(&addr_in, 0, sizeof(addr_in));
addr_in.sin_family = AF_INET; addr_in.sin_family = AF_INET;
addr_in.sin_port = rte_be_to_cpu_16(udp_hdr->src_port); addr_in.sin_port = udp_hdr->src_port;
addr_in.sin_addr.s_addr = rte_be_to_cpu_32(ip_hdr->src_addr); addr_in.sin_addr.s_addr = ip_hdr->src_addr;
if (sizeof(addr_in) <= *addrlen) { if (sizeof(addr_in) <= *addrlen) {
eff_addrlen = sizeof(addr_in); eff_addrlen = sizeof(addr_in);
} else { } else {
......
...@@ -28,6 +28,7 @@ struct exch_slot_info { ...@@ -28,6 +28,7 @@ struct exch_slot_info {
int used; // used by an open socket int used; // used by an open socket
int bound; // used by a socket that did 'bind' int bound; // used by a socket that did 'bind'
int sockfd; // TODO redundant because it matches the slot index in this implementation int sockfd; // TODO redundant because it matches the slot index in this implementation
int udp_port; // UDP port associated to the socket (only if bound)
} __rte_cache_aligned; } __rte_cache_aligned;
struct exch_zone_info { struct exch_zone_info {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment