Commit fe0b1a39 authored by Leonardo Lai's avatar Leonardo Lai

recvfrom bugfixes

partial sendto implementation

improved shutdown of processes
parent 9e447ba3
...@@ -18,6 +18,8 @@ extern "C" { ...@@ -18,6 +18,8 @@ extern "C" {
int udpdk_init(int argc, char *argv[]); int udpdk_init(int argc, char *argv[]);
void udpdk_interrupt(int signum);
void udpdk_cleanup(void); void udpdk_cleanup(void);
int udpdk_socket(int domain, int type, int protocol); int udpdk_socket(int domain, int type, int protocol);
......
udpdk_init udpdk_init
udpdk_interrupt
udpdk_cleanup udpdk_cleanup
udpdk_socket udpdk_socket
udpdk_bind udpdk_bind
......
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
#define PORT_RX 0 #define PORT_RX 0
#define PORT_TX 0 #define PORT_TX 0
#define QUEUE_RX 0 #define QUEUE_RX 0
#define QUEUE_TX 0
#define NUM_RX_DESC_DEFAULT 1024 #define NUM_RX_DESC_DEFAULT 1024
#define NUM_TX_DESC_DEFAULT 1024 #define NUM_TX_DESC_DEFAULT 1024
#define PKTMBUF_POOL_NAME "UDPDK_mbuf_pool" #define PKTMBUF_POOL_NAME "UDPDK_mbuf_pool"
...@@ -19,6 +20,7 @@ ...@@ -19,6 +20,7 @@
/* Packet poller */ /* Packet poller */
#define PKT_READ_SIZE 32 #define PKT_READ_SIZE 32
#define PKT_WRITE_SIZE 32
#define PREFETCH_OFFSET 4 #define PREFETCH_OFFSET 4
#define NUM_FLOWS_DEF 0x1000 #define NUM_FLOWS_DEF 0x1000
#define NUM_FLOWS_MIN 1 #define NUM_FLOWS_MIN 1
......
...@@ -6,6 +6,8 @@ ...@@ -6,6 +6,8 @@
#include "udpdk_lookup_table.h" #include "udpdk_lookup_table.h"
#include "udpdk_types.h" #include "udpdk_types.h"
volatile int interrupted = 0;
htable_item *udp_port_table = NULL; htable_item *udp_port_table = NULL;
struct exch_zone_info *exch_zone_desc = NULL; struct exch_zone_info *exch_zone_desc = NULL;
......
...@@ -26,7 +26,10 @@ ...@@ -26,7 +26,10 @@
#include "udpdk_types.h" #include "udpdk_types.h"
#define RTE_LOGTYPE_INIT RTE_LOGTYPE_USER1 #define RTE_LOGTYPE_INIT RTE_LOGTYPE_USER1
#define RTE_LOGTYPE_CLOSE RTE_LOGTYPE_USER1
#define RTE_LOGTYPE_INTR RTE_LOGTYPE_USER1
extern int interrupted;
extern struct exch_zone_info *exch_zone_desc; extern struct exch_zone_info *exch_zone_desc;
extern struct exch_slot *exch_slots; extern struct exch_slot *exch_slots;
extern htable_item *udp_port_table; extern htable_item *udp_port_table;
...@@ -218,8 +221,10 @@ static int init_exchange_slots(void) ...@@ -218,8 +221,10 @@ static int init_exchange_slots(void)
// Allocate enough memory to store the exchange slots // Allocate enough memory to store the exchange slots
exch_slots = rte_malloc(EXCH_SLOTS_NAME, sizeof(*exch_slots) * NUM_SOCKETS_MAX, 0); exch_slots = rte_malloc(EXCH_SLOTS_NAME, sizeof(*exch_slots) * NUM_SOCKETS_MAX, 0);
if (exch_slots == NULL) if (exch_slots == NULL) {
rte_exit(EXIT_FAILURE, "Cannot allocate memory for exchange slots\n"); RTE_LOG(ERR, INIT, "Cannot allocate memory for exchange slots\n");
return -1;
}
// Create a rte_ring for each RX and TX slot // Create a rte_ring for each RX and TX slot
for (i = 0; i < NUM_SOCKETS_MAX; i++) { for (i = 0; i < NUM_SOCKETS_MAX; i++) {
...@@ -313,6 +318,7 @@ int udpdk_init(int argc, char *argv[]) ...@@ -313,6 +318,7 @@ int udpdk_init(int argc, char *argv[])
}; };
sleep(1); // TODO use some synchronization mechanism between primary and secondary sleep(1); // TODO use some synchronization mechanism between primary and secondary
if (poller_init(poller_argc, poller_argv) < 0) { if (poller_init(poller_argc, poller_argv) < 0) {
RTE_LOG(INFO, INIT, "Poller initialization failed\n");
return -1; return -1;
} }
poller_body(); poller_body();
...@@ -321,19 +327,25 @@ int udpdk_init(int argc, char *argv[]) ...@@ -321,19 +327,25 @@ int udpdk_init(int argc, char *argv[])
return 0; return 0;
} }
void udpdk_interrupt(int signum)
{
RTE_LOG(INFO, INTR, "Killing the poller process (%d)...\n", poller_pid);
interrupted = 1;
}
void udpdk_cleanup(void) void udpdk_cleanup(void)
{ {
uint16_t port_id; uint16_t port_id;
pid_t pid; pid_t pid;
// Kill the poller process // Kill the poller process
RTE_LOG(INFO, INIT, "Killing the poller process (%d)...\n", poller_pid); RTE_LOG(INFO, CLOSE, "Killing the poller process (%d)...\n", poller_pid);
kill(poller_pid, SIGTERM); kill(poller_pid, SIGTERM);
pid = waitpid(poller_pid, NULL, 0); pid = waitpid(poller_pid, NULL, 0);
if (pid < 0) { if (pid < 0) {
RTE_LOG(WARNING, INIT, "Failed killing the poller process\n"); RTE_LOG(WARNING, CLOSE, "Failed killing the poller process\n");
} else { } else {
RTE_LOG(INFO, INIT, "...killed!\n"); RTE_LOG(INFO, CLOSE, "...killed!\n");
} }
// Stop and close DPDK ports // Stop and close DPDK ports
......
...@@ -112,18 +112,35 @@ static int setup_exch_zone(void) ...@@ -112,18 +112,35 @@ static int setup_exch_zone(void)
// Retrieve the exchange zone descriptor in shared memory // Retrieve the exchange zone descriptor in shared memory
mz = rte_memzone_lookup(EXCH_MEMZONE_NAME); mz = rte_memzone_lookup(EXCH_MEMZONE_NAME);
if (mz == NULL) if (mz == NULL) {
RTE_LOG(ERR, POLLINIT, "Cannot retrieve exchange memzone descriptor\n"); RTE_LOG(ERR, POLLINIT, "Cannot retrieve exchange memzone descriptor\n");
return -1;
}
exch_zone_desc = mz->addr; exch_zone_desc = mz->addr;
// Allocate enough memory to store the exchange slots
exch_slots = rte_zmalloc(EXCH_SLOTS_NAME, sizeof(*exch_slots) * NUM_SOCKETS_MAX, 0);
if (exch_slots == NULL) {
RTE_LOG(ERR, POLLINIT, "Cannot allocate memory for exchange slots\n");
return -1;
}
for (i = 0; i < NUM_SOCKETS_MAX; i++) { for (i = 0; i < NUM_SOCKETS_MAX; i++) {
// Retrieve the RX queue for each slot // Retrieve the RX queue for each slot
exch_slots[i].rx_q = rte_ring_lookup(get_exch_ring_name(i, EXCH_RING_RX)); exch_slots[i].rx_q = rte_ring_lookup(get_exch_ring_name(i, EXCH_RING_RX));
if (exch_slots[i].rx_q == NULL) if (exch_slots[i].rx_q == NULL) {
RTE_LOG(ERR, POLLINIT, "Failed to retrieve rx ring queue for exchanger %u\n", i); RTE_LOG(ERR, POLLINIT, "Failed to retrieve rx ring queue for exchanger %u\n", i);
return -1;
}
// Retrieve the TX queue for each slot
exch_slots[i].tx_q = rte_ring_lookup(get_exch_ring_name(i, EXCH_RING_TX));
if (exch_slots[i].tx_q == NULL) {
RTE_LOG(ERR, POLLINIT, "Failed to retrieve tx ring queue for exchanger %u\n", i);
return -1;
}
// rx_buffer and rx_count are already zeroed thanks to zmalloc
} }
// TODO do the same for TX rings
return 0; return 0;
} }
...@@ -287,17 +304,17 @@ static inline void reassemble(struct rte_mbuf *m, uint16_t portid, uint32_t queu ...@@ -287,17 +304,17 @@ static inline void reassemble(struct rte_mbuf *m, uint16_t portid, uint32_t queu
(struct rte_udp_hdr *)((unsigned char *)ip_hdr + sizeof(struct rte_ipv4_hdr))); (struct rte_udp_hdr *)((unsigned char *)ip_hdr + sizeof(struct rte_ipv4_hdr)));
ip_dst = get_ip_dst(ip_hdr); ip_dst = get_ip_dst(ip_hdr);
char ip_str[16]; // TODO DEBUG
ipv4_int_to_str(ip_dst, ip_str); // TODO DEBUG
printf("[DBG] UDP dest port: %d\n", udp_dst_port); // TODO DEBUG printf("[DBG] UDP dest port: %d\n", udp_dst_port); // TODO DEBUG
char ip_str[16]; printf("[DBG] IP dest addr: %s\n", ip_str); // TODO DEBUG
ipv4_int_to_str(ip_dst, ip_str);
printf("[DBG] IP dest addr: %s\n", ip_str);
// TODO based on UDP, find the appropriate exchange buffer // TODO based on UDP, find the appropriate exchange buffer
// TODO here enqueuing is a dummy round-robin, not based on actual port! // TODO here enqueuing is a dummy round-robin, not based on actual port!
if (foo & 1) { if (foo & 1) {
enqueue_rx_packet(0, m);
} else {
enqueue_rx_packet(1, m); enqueue_rx_packet(1, m);
} else {
enqueue_rx_packet(0, m);
} }
} }
...@@ -311,33 +328,49 @@ void poller_body(void) ...@@ -311,33 +328,49 @@ void poller_body(void)
lcore_id = rte_lcore_id(); lcore_id = rte_lcore_id();
qconf = &lcore_queue_conf[lcore_id]; qconf = &lcore_queue_conf[lcore_id];
// TODO check if the socket is active before doing things on it
while (poller_alive) { while (poller_alive) {
struct rte_mbuf *buf[PKT_READ_SIZE]; struct rte_mbuf *rxbuf[PKT_READ_SIZE];
uint16_t rx_count; struct rte_mbuf *txbuf[PKT_WRITE_SIZE];
uint16_t rx_count, tx_sendable, tx_count;
int i, j; int i, j;
// Get current timestamp (needed for reassembly) // Get current timestamp (needed for reassembly)
cur_tsc = rte_rdtsc(); cur_tsc = rte_rdtsc();
// Receive packets from DPDK port 0 (queue 0) # TODO use more queues // Transmit packets to DPDK port 0 (queue 0)
rx_count = rte_eth_rx_burst(PORT_RX, QUEUE_RX, buf, PKT_READ_SIZE); for (i = 0; i < NUM_SOCKETS_MAX; i++) {
if (exch_zone_desc->slots[i].used) {
tx_sendable = rte_ring_dequeue_burst(exch_slots[i].tx_q, (void **)txbuf, PKT_READ_SIZE, NULL);
if (likely(tx_sendable > 0)) {
tx_count = rte_eth_tx_burst(PORT_TX, QUEUE_TX, txbuf, tx_sendable); // TODO should call a send function that accoubts for fragmentation
if (unlikely(tx_count < tx_sendable)) {
do {
rte_pktmbuf_free(txbuf[tx_count]);
} while (++tx_count < tx_sendable);
}
}
}
}
// Receive packets from DPDK port 0 (queue 0) TODO use more queues
rx_count = rte_eth_rx_burst(PORT_RX, QUEUE_RX, rxbuf, PKT_READ_SIZE);
if (likely(rx_count > 0)) { if (likely(rx_count > 0)) {
printf("poller rxcount: %d\n", rx_count); // TODO debug
// Prefetch some packets (to reduce cache misses later) // Prefetch some packets (to reduce cache misses later)
for (j = 0; j < PREFETCH_OFFSET && j < rx_count; j++) { for (j = 0; j < PREFETCH_OFFSET && j < rx_count; j++) {
rte_prefetch0(rte_pktmbuf_mtod(buf[j], void *)); rte_prefetch0(rte_pktmbuf_mtod(rxbuf[j], void *));
} }
// Prefetch the remaining packets, and reassemble the first ones // Prefetch the remaining packets, and reassemble the first ones
for (j = 0; j < (rx_count - PREFETCH_OFFSET); j++) { for (j = 0; j < (rx_count - PREFETCH_OFFSET); j++) {
rte_prefetch0(rte_pktmbuf_mtod(buf[j + PREFETCH_OFFSET], void *)); rte_prefetch0(rte_pktmbuf_mtod(rxbuf[j + PREFETCH_OFFSET], void *));
reassemble(buf[j], PORT_RX, QUEUE_RX, qconf, cur_tsc); reassemble(rxbuf[j], PORT_RX, QUEUE_RX, qconf, cur_tsc);
} }
// Reassemble the second batch of fragments // Reassemble the second batch of fragments
for (; j < rx_count; j++) { for (; j < rx_count; j++) {
reassemble(buf[j], PORT_RX, QUEUE_RX, qconf, cur_tsc); reassemble(rxbuf[j], PORT_RX, QUEUE_RX, qconf, cur_tsc);
} }
// Effectively flush the packets to exchange buffers // Effectively flush the packets to exchange buffers
......
...@@ -6,11 +6,14 @@ ...@@ -6,11 +6,14 @@
#include "errno.h" #include "errno.h"
#include <netinet/in.h> #include <netinet/in.h>
#include <rte_log.h>
#include "udpdk_api.h" #include "udpdk_api.h"
#include "udpdk_lookup_table.h" #include "udpdk_lookup_table.h"
#define RTE_LOGTYPE_SYSCALL RTE_LOGTYPE_USER1 #define RTE_LOGTYPE_SYSCALL RTE_LOGTYPE_USER1
extern int interrupted;
extern htable_item *udp_port_table; extern htable_item *udp_port_table;
extern struct exch_zone_info *exch_zone_desc; extern struct exch_zone_info *exch_zone_desc;
extern struct exch_slot *exch_slots; extern struct exch_slot *exch_slots;
...@@ -126,24 +129,60 @@ int udpdk_bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen) ...@@ -126,24 +129,60 @@ int udpdk_bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen)
return 0; return 0;
} }
static int sendto_validate_args(int sockfd, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen)
{
// Ensure sockfd is not beyond max limit
if (sockfd >= NUM_SOCKETS_MAX) {
errno = ENOTSOCK;
return -1;
}
// Check if the sockfd is valid
if (!exch_zone_desc->slots[sockfd].used) {
errno = EBADF;
return -1;
}
// TODO check if buf is a legit address
// Check if flags are supported (atm none is supported)
if (flags != 0) {
errno = EINVAL;
return -1;
}
// Check if the sender is specified
if (dest_addr == NULL || addrlen == 0) {
errno = EINVAL;
return -1;
}
return 0;
}
ssize_t udpdk_sendto(int sockfd, const void *buf, size_t len, int flags, ssize_t udpdk_sendto(int sockfd, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen) const struct sockaddr *dest_addr, socklen_t addrlen)
{ {
// TODO implement // Validate the arguments
if (sendto_validate_args(sockfd, buf, len, flags, dest_addr, addrlen) < 0) {
return -1;
}
// TODO implement core
return 0; return 0;
} }
static int recvfrom_validate_args(int s, void *buf, size_t len, int flags, static int recvfrom_validate_args(int sockfd, void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen) struct sockaddr *src_addr, socklen_t *addrlen)
{ {
// Ensure sockfd is not beyond max limit // Ensure sockfd is not beyond max limit
if (s >= NUM_SOCKETS_MAX) { if (sockfd >= NUM_SOCKETS_MAX) {
errno = ENOTSOCK; errno = ENOTSOCK;
return -1; return -1;
} }
// Check if the sockfd is valid // Check if the sockfd is valid
if (!exch_zone_desc->slots[s].used) { if (!exch_zone_desc->slots[sockfd].used) {
errno = EBADF; errno = EBADF;
return -1; return -1;
} }
...@@ -164,10 +203,10 @@ static int recvfrom_validate_args(int s, void *buf, size_t len, int flags, ...@@ -164,10 +203,10 @@ static int recvfrom_validate_args(int s, void *buf, size_t len, int flags,
return 0; return 0;
} }
ssize_t udpdk_recvfrom(int s, void *buf, size_t len, int flags, ssize_t udpdk_recvfrom(int sockfd, void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen) struct sockaddr *src_addr, socklen_t *addrlen)
{ {
int ret = 0; int ret = -1;
struct rte_mbuf *pkt = NULL; struct rte_mbuf *pkt = NULL;
uint32_t pkt_len; uint32_t pkt_len;
uint32_t udp_data_len; uint32_t udp_data_len;
...@@ -178,14 +217,20 @@ ssize_t udpdk_recvfrom(int s, void *buf, size_t len, int flags, ...@@ -178,14 +217,20 @@ ssize_t udpdk_recvfrom(int s, void *buf, size_t len, int flags,
struct rte_udp_hdr *udp_hdr; struct rte_udp_hdr *udp_hdr;
void *udp_data; void *udp_data;
printf("Inside recvfrom\n");
// Validate the arguments // Validate the arguments
if (recvfrom_validate_args(s, buf, len, flags, src_addr, addrlen) < 0) { if (recvfrom_validate_args(sockfd, buf, len, flags, src_addr, addrlen) < 0) {
return -1; return -1;
} }
// Dequeue one packet (busy wait until one is available) // Dequeue one packet (busy wait until one is available)
while (ret != 0) { while (ret < 0 && !interrupted) {
ret = rte_ring_dequeue(exch_slots[s].rx_q, (void **)&pkt); ret = rte_ring_dequeue(exch_slots[sockfd].rx_q, (void **)&pkt);
}
if (interrupted) {
RTE_LOG(INFO, SYSCALL, "Recvfrom returning due to signal\n");
errno = EINTR;
return -1;
} }
// Get some useful pointers to headers and data // Get some useful pointers to headers and data
pkt_len = pkt->pkt_len; pkt_len = pkt->pkt_len;
...@@ -194,6 +239,8 @@ ssize_t udpdk_recvfrom(int s, void *buf, size_t len, int flags, ...@@ -194,6 +239,8 @@ ssize_t udpdk_recvfrom(int s, void *buf, size_t len, int flags,
udp_hdr = (struct rte_udp_hdr *)(ip_hdr + 1); udp_hdr = (struct rte_udp_hdr *)(ip_hdr + 1);
udp_data = (void *)(udp_hdr + 1); udp_data = (void *)(udp_hdr + 1);
udp_data_len = pkt_len - sizeof(struct rte_ipv4_hdr) - sizeof(struct rte_udp_hdr); udp_data_len = pkt_len - sizeof(struct rte_ipv4_hdr) - sizeof(struct rte_udp_hdr);
printf("recfrom pktlen: %d\n", pkt_len);
printf("recfrom udp_data_len: %d\n", udp_data_len);
// If the provided buffer is large enough to store it, then copy the whole packet, else only part of it // If the provided buffer is large enough to store it, then copy the whole packet, else only part of it
if (udp_data_len >= len) { if (udp_data_len >= len) {
......
...@@ -40,6 +40,6 @@ struct exch_slot { ...@@ -40,6 +40,6 @@ struct exch_slot {
struct rte_ring *tx_q; // TX queue struct rte_ring *tx_q; // TX queue
struct rte_mbuf *rx_buffer[EXCH_BUF_SIZE]; // buffers storing rx packets before flushing to rt_ring struct rte_mbuf *rx_buffer[EXCH_BUF_SIZE]; // buffers storing rx packets before flushing to rt_ring
uint16_t rx_count; // current number of packets in the rx buffer uint16_t rx_count; // current number of packets in the rx buffer
}; } __rte_cache_aligned;
#endif //UDPDK_TYPES_H #endif //UDPDK_TYPES_H
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment