Commit 8e1b4398 authored by Jie's avatar Jie

initial version for DirectDAQ

parent e3c3ba46
ROOTDIR=../..
DEPSDIR=${ROOTDIR}/deps
ifeq ($(RTE_TARGET),)
$(error "Please define RTE_TARGET environment variable")
endif
ifeq ($(UDPDK_PATH),)
UDPDK_PATH=${ROOTDIR}
endif
VPATH = #./
CC = gcc
CXX = g++
CFLAGS = -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE -D_REENTRANT -fPIC -O2 -DPERF_OPT -g -D_DEBUG #-D_THREAD_SAFE -W -g #_Wall
CFLAGS += -I./lib -I${ROOTDIR}/udpdk -I${ROOTDIR}/udpdk/list -I${DEPSDIR}/dpdk/${RTE_TARGET}/include
CXXFLAGS = -I./lib -fPIC #D_DEBUG
LDFLAGS = -L./lib -Wl,--rpath=./lib -L${UDPDK_PATH}/udpdk -Wl,--whole-archive,-ludpdk,--no-whole-archive -L${DEPSDIR}/dpdk/${RTE_TARGET}/lib -Wl,--whole-archive,-ldpdk,--no-whole-archive
LIBS = -lpthread -lm -Wl,--no-whole-archive -lrt -lm -ldl -lcrypto -pthread -lnuma #-static
MYLIBNAME = dd
CONS = dd_cons
PROD = dpdk_recv
SEND = dpdk_send
MYLIB = lib$(MYLIBNAME).so
TARGETS = $(MYLIB) $(PROD) $(SEND) $(CONS)
CONSOBJS = dd_cons.o
PRODOBJS = dpdk_recv.o
SENDOBJS = dpdk_send.o
MYLIBOBJS = dd.o
OBJS = $(PRODOBJS) $(CONSOBJS) $(MYLIBOBJS) $(EXTS)
.PHONY:all
all: $(TARGETS)
$(PROD): $(PRODOBJS) $(EXTS)
$(CC) $(CFLAGS) $(LDFLAGS) $^ -l$(MYLIBNAME) $(LIBS) -o $@ #-lstdc++
$(SEND): $(SENDOBJS) $(EXTS)
$(CC) $(CFLAGS) $(LDFLAGS) $^ -l$(MYLIBNAME) $(LIBS) -o $@ #-lstdc++
$(CONS): $(CONSOBJS) $(EXTS)
$(CC) $(CFLAGS) $(LDFLAGS) $^ -l$(MYLIBNAME) $(LIBS) -o $@ #-lstdc++
$(MYLIB): $(MYLIBOBJS)
$(CC) -shared $^ -o $@
cp $@ lib/
.c.o:
$(CC) $(CFLAGS) -c $< -o $@
.cpp.o:
$(CXX) $(CXXFLAGS) -c $< -o $@
.PHONY:clean
clean:
rm -f *.o $(OBJS) $(TARGETS) lib/$(MYLIB)
#For demo test:
Send host:
change config.ini according to local NIC mac/ip and remote mac
#delay 1 second in loop
sudo ./dpdk_send -c config.ini -d 1000000
Recv host:
swap local/remote setting in config.ini
in one terminal: mkdir /tmp/shm; sudo ./dpdk_recv -c config.ini
in another terminal: sudo ./dd_cons
#ifndef _SYS_BUF_RING_H_
#define _SYS_BUF_RING_H_
#define CACHE_LINE_SIZE 64
struct buf_ring {
volatile uint32_t br_prod_head;
volatile uint32_t br_prod_tail;
int br_prod_size;
int br_prod_mask;
uint64_t br_drops;
volatile uint32_t br_cons_head __attribute__((aligned(CACHE_LINE_SIZE)));
volatile uint32_t br_cons_tail;
int br_cons_size;
int br_cons_mask;
void *br_ring[0] __attribute__((aligned(CACHE_LINE_SIZE)));
};
/*
* multi-producer safe lock-free ring buffer enqueue
*
*/
static __inline int
buf_ring_enqueue(struct buf_ring *br, void *buf)
{
uint32_t prod_head, prod_next, cons_tail;
#ifdef DEBUG_BUFRING
int i;
for (i = br->br_cons_head; i != br->br_prod_head;
i = ((i + 1) & br->br_cons_mask))
if(br->br_ring[i] == buf)
panic("buf=%p already enqueue at %d prod=%d cons=%d",
buf, i, br->br_prod_tail, br->br_cons_tail);
#endif
//critical_enter();
do {
prod_head = br->br_prod_head;
prod_next = (prod_head + 1) & br->br_prod_mask;
cons_tail = br->br_cons_tail;
if (prod_next == cons_tail) {
//rmb();
__sync_synchronize();
if (prod_head == br->br_prod_head &&
cons_tail == br->br_cons_tail) {
br->br_drops++;
//critical_exit();
//return (ENOBUFS);
return -1;
}
continue;
}
} while (!__sync_bool_compare_and_swap(&br->br_prod_head, prod_head, prod_next));
#ifdef DEBUG_BUFRING
if (br->br_ring[prod_head] != NULL)
panic("dangling value in enqueue");
#endif
br->br_ring[prod_head] = buf;
/*
* If there are other enqueues in progress
* that preceded us, we need to wait for them
* to complete
*/
while (br->br_prod_tail != prod_head)
;//cpu_spinwait();
__sync_synchronize(); //memory barrier
//atomic_store_rel_int(&br->br_prod_tail, prod_next);
br->br_prod_tail = prod_next;
//critical_exit();
return (0);
}
/*
* multi-consumer safe dequeue
*
*/
static __inline void *
buf_ring_dequeue_mc(struct buf_ring *br)
{
uint32_t cons_head, cons_next;
void *buf;
//critical_enter();
do {
cons_head = br->br_cons_head;
cons_next = (cons_head + 1) & br->br_cons_mask;
if (cons_head == br->br_prod_tail) {
//critical_exit();
return (void*)(-1);
}
} while (!__sync_bool_compare_and_swap(&br->br_cons_head, cons_head, cons_next));
buf = br->br_ring[cons_head];
#ifdef DEBUG_BUFRING
br->br_ring[cons_head] = NULL;
#endif
/*
* If there are other dequeues in progress
* that preceded us, we need to wait for them
* to complete
*/
while (br->br_cons_tail != cons_head)
;//cpu_spinwait();
__sync_synchronize();
//atomic_store_rel_int(&br->br_cons_tail, cons_next);
br->br_cons_tail = cons_next;
//critical_exit();
return (buf);
}
static __inline int
buf_ring_full(struct buf_ring *br)
{
return (((br->br_prod_head + 1) & br->br_prod_mask) == br->br_cons_tail);
}
static __inline int
buf_ring_empty(struct buf_ring *br)
{
return (br->br_cons_head == br->br_prod_tail);
}
static __inline int
buf_ring_count(struct buf_ring *br)
{
return ((br->br_prod_size + br->br_prod_tail - br->br_cons_tail)
& br->br_prod_mask);
}
static __inline void *
buf_ring_dequeue_mc_blob(struct buf_ring *br, uint32_t blob_size)
{
uint32_t cons_head, cons_next;
void *buf;
if(buf_ring_count(br) < blob_size) {
return (void*)(-1);
}
//critical_enter();
do {
cons_head = br->br_cons_head;
cons_next = (cons_head + blob_size) & br->br_cons_mask;
if (buf_ring_count(br) < blob_size) { //no new whole blob
//get last blob
cons_head = (cons_head - blob_size) & br->br_cons_mask;
buf = br->br_ring[cons_head];
return (buf);
}
} while (!__sync_bool_compare_and_swap(&br->br_cons_head, cons_head, cons_next));
buf = br->br_ring[cons_head];
#ifdef DEBUG_BUFRING
br->br_ring[cons_head] = NULL;
#endif
/*
* If there are other dequeues in progress
* that preceded us, we need to wait for them
* to complete
*/
while (br->br_cons_tail != cons_head)
;//cpu_spinwait();
__sync_synchronize();
//atomic_store_rel_int(&br->br_cons_tail, cons_next);
br->br_cons_tail = cons_next;
//critical_exit();
return (buf);
}
#endif
# UDPDK configuration file
[dpdk]
lcores_primary=2
lcores_secondary=4
n_mem_channels=2
[port0]
mac_addr=24:42:53:30:28:27
ip_addr=172.31.100.2
[port0_dst]
mac_addr=24:42:53:30:29:37
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <signal.h>
#include <fcntl.h>
#include <ctype.h>
#include <termios.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <linux/types.h>
#include <math.h>
#include <getopt.h>
#include <sched.h>
#include "buf_ring.h"
#include "shm.h"
#include "dd.h"
#if defined(_DEBUG)
#define dd_pr(fmt,arg...) \
printf("%s:%d:"fmt, __FUNCTION__, __LINE__, ##arg)
#else
#define dd_pr(fmt,arg...)
#endif
#define dd_error(fmt,arg...) \
fprintf(stderr, "ERROR in %s:%d:"fmt, __FUNCTION__, __LINE__, ##arg)
#if defined(_DEBUG)
#else
#endif
typedef struct {
SHM_HEADER *pHeader;
struct buf_ring *br;
ADC_SAMPLE *sample;
uint32_t sample_tail; //single producer
uint32_t sample_blob_last;
uint32_t channel_num;
uint32_t blob_size; //number of sample group
uint32_t stream_mode;
} DD_Context;
static DD_Context g_ctx[JOB_NUM_MAX];
typedef struct {
uint32_t blob_size;
uint16_t blob_id;
uint16_t slice_id; //one UDP is a slice
uint32_t sample_rate;
uint8_t sample_group_count; //number of sample group(a group consists of all channels' sample)
uint8_t channels_enabled;
uint8_t downsampling;
uint8_t job_id;
} DD_PayloadHeader;
//utility functions
//interface
int32_t dd_init(uint32_t mode, uint32_t ring_size, uint32_t job_id)
{
char file[256];
int shm_fd = 0, size;
struct stat stBuf;
void * mem = NULL;
sprintf(file, "%s.%d", DD_SHM_PATH, job_id);
size = DD_SHM_SIZE;
if(mode == MODE_PRODUCER && stat(file, &stBuf) == -1) {
shm_fd = open(file, O_CREAT | O_RDWR, 00777);
if(shm_fd < 0) {
printf("create %s failed!\n", file);
return -1;
}
//adjust size
if (ftruncate(shm_fd, size + sizeof(SHM_HEADER)) < 0) {
close(shm_fd);
unlink(file);
printf("adjust file size to %d Bytes failed!\n", size);
return -1;
}
}
if(shm_fd == 0) {
shm_fd = open(file, O_RDWR);
if(shm_fd < 0) {
printf("open %s failed\n", file);
return -1;
}
}
mem = (void*)mmap(0, size + sizeof(SHM_HEADER), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
if(mem == MAP_FAILED) {
printf("mmap failed!\n");
exit(0);
}
g_ctx[job_id].pHeader = (SHM_HEADER *)mem;
g_ctx[job_id].br = &(g_ctx[job_id].pHeader->br);
if(mode == MODE_PRODUCER) {
g_ctx[job_id].pHeader->magic = DD_SHM_MAGIC;
memset(&(g_ctx[job_id].pHeader->br), 0, sizeof(struct buf_ring));
if(ring_size == 0) {
ring_size = DEFAULT_RING_SIZE;
}
else if(ring_size & (ring_size - 1) != 0){
printf("ring_size must be pow of 2!\n");
return -1;
}
g_ctx[job_id].pHeader->br.br_prod_size = ring_size;
g_ctx[job_id].pHeader->br.br_cons_size = ring_size;
g_ctx[job_id].pHeader->br.br_prod_mask = ring_size - 1 ;
g_ctx[job_id].pHeader->br.br_cons_mask = ring_size - 1 ;
}
else if(mode == MODE_CONSUMER && g_ctx[job_id].pHeader->magic != DD_SHM_MAGIC) {
printf("shm MAGIC doesn't match!\n");
return -1;
}
g_ctx[job_id].sample = (ADC_SAMPLE*)(mem + sizeof(SHM_HEADER) + 8*g_ctx[job_id].pHeader->br.br_prod_size);
dd_pr("pHeader: %p, pHeader->br: %p, pHeader->br.br_ring: %p, sample: %p\n", g_ctx[job_id].pHeader, &(g_ctx[job_id].pHeader->br), g_ctx[job_id].br->br_ring, g_ctx[job_id].sample);
return 0;
}
int32_t dd_config(uint32_t channel_num, uint32_t blob_size, uint32_t stream_mode, uint32_t job_id)
{
g_ctx[job_id].channel_num = channel_num;
g_ctx[job_id].blob_size = blob_size;
g_ctx[job_id].stream_mode = stream_mode;
return 0;
}
int32_t dd_status(uint32_t job_id)
{
if(g_ctx[job_id].pHeader == NULL) {
return -1;
}
dd_pr("size: %d, used: %d, tail: %d, head: %d, drop: %lu\n", g_ctx[job_id].br->br_prod_size, buf_ring_count(g_ctx[job_id].br), g_ctx[job_id].br->br_prod_tail, g_ctx[job_id].br->br_cons_tail, g_ctx[job_id].br->br_drops);
return 0;
}
int32_t dd_get_slice(ADC_SAMPLE *pSample, uint32_t num_group, uint32_t job_id)
{
int64_t ret;
while(num_group--) {
ret = (int64_t)buf_ring_dequeue_mc(g_ctx[job_id].br);
if(ret != -1) {
memcpy(pSample, &g_ctx[job_id].sample[ret * g_ctx[job_id].channel_num], sizeof(ADC_SAMPLE) * g_ctx[job_id].channel_num);
}
else return -1;
}
return (int32_t)ret;
}
int32_t dd_put_slice(ADC_SAMPLE *pSample, uint32_t num_group, uint32_t job_id)
{
int ret;
while(num_group--) {
memcpy(&g_ctx[job_id].sample[g_ctx[job_id].sample_tail * g_ctx[job_id].channel_num], pSample, sizeof(ADC_SAMPLE) * g_ctx[job_id].channel_num);
ret = buf_ring_enqueue(g_ctx[job_id].br, (void *)(uint64_t)g_ctx[job_id].sample_tail);
if(ret == 0) {
g_ctx[job_id].sample_tail = (g_ctx[job_id].sample_tail + 1) & g_ctx[job_id].br->br_prod_mask;
}
else return -1;
pSample += g_ctx[job_id].channel_num;
}
return 0;
}
int32_t dd_get_blob(ADC_SAMPLE *pSample, uint32_t job_id)
{
int64_t ret;
uint32_t prod_tail, sample_tail, sample_start, count;
prod_tail = (g_ctx[job_id].br->br_prod_tail - 1) & g_ctx[job_id].br->br_prod_mask;
sample_tail = (uint32_t)g_ctx[job_id].br->br_ring[prod_tail];
sample_start = (sample_tail - g_ctx[job_id].blob_size - 1) & g_ctx[job_id].br->br_prod_mask;
count = g_ctx[job_id].blob_size;
while(count--) {
memcpy(pSample, &g_ctx[job_id].sample[sample_start * g_ctx[job_id].channel_num], sizeof(ADC_SAMPLE) * g_ctx[job_id].channel_num);
sample_start = (sample_start + 1) & g_ctx[job_id].br->br_prod_mask;
pSample += g_ctx[job_id].channel_num;
}
return 0;
}
uint32_t dd_count(uint32_t job_id)
{
return buf_ring_count(g_ctx[job_id].br);
}
uint32_t dd_close();
#ifndef __DD_H__
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <stdio.h>
#define DEFAULT_RING_SIZE (128*1024)
#define CH_NUM_MAX 192
#define JOB_NUM_MAX 250
typedef uint16_t ADC_SAMPLE;
enum {
MODE_PRODUCER = 0,
MODE_CONSUMER
};
int32_t dd_init(uint32_t mode, uint32_t ring_size, uint32_t job_id);
int32_t dd_status(uint32_t job_id);
int32_t dd_config(uint32_t channel_num, uint32_t blob_size, uint32_t stream_mode, uint32_t job_id);
int32_t dd_get_slice(ADC_SAMPLE *pSample, uint32_t num_group, uint32_t job_id);
int32_t dd_put_slice(ADC_SAMPLE *pSample, uint32_t num_group, uint32_t job_id);
int32_t dd_get_blob(ADC_SAMPLE *pSample, uint32_t job_id); //risky
//int32_t dd_release_blob(ADC_SAMPLE *pSample);
int32_t dd_get_blob_copy(ADC_SAMPLE *buf, uint32_t offset, uint32_t count);
uint32_t dd_count(uint32_t job_id);
uint32_t dd_close();
#ifdef __cplusplus
}
#endif
#endif //__D_H__
#define _GNU_SOURCE
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <stdarg.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <signal.h>
#include <fcntl.h>
#include <ctype.h>
#include <termios.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <linux/types.h>
#include <math.h>
#include <getopt.h>
#include <sched.h>
#include <pthread.h>
#include "dd.h"
#define THREAD_NUM 1
static int32_t exitRequested = 0;
static pthread_t threads[THREAD_NUM];
/*
* sigintHandler --
*
* SIGINT handler, so we can gracefully exit when the user hits ctrl-C.
*/
static void
sigintHandler(int signum)
{
switch(signum) {
case SIGINT:
exitRequested = 1;
break;
case SIGUSR1:
break;
}
}
static void print_usage(const char *prog)
{
printf("Usage: %s [-DsA]\n", prog);
puts(
" -v verbose\n"
);
exit(1);
}
static void parse_opts(int argc, char *argv[])
{
while (1) {
int c;
char *endp;
c = getopt(argc, argv, "D:s:S:A:rvzw:d:nipomTN:O:P:I:D:V:W:X:Y:Z:");
if (c == -1)
break;
switch (c) {
default:
print_usage(argv[0]);
break;
}
}
}
//DD parameters
static uint32_t job_id = 1;
static uint32_t channels_enabled = 192;
static uint32_t blob_size = 3;
static void *cons_proc(void *param)
{
int32_t index, i;
ADC_SAMPLE sample[192];
while(!exitRequested) {
index = dd_get_slice(sample, 1, job_id);
if(index != -1) {
printf("thread #%ld: get index: %d\n", (int64_t)param, index);
printf("\t first 4 channel sample data:\n");
for(i = 0; i < 4; i++) {
printf("\t channel #%3d: 0x%4x\n", i, sample[i]);
}
}
usleep(200000); //200ms
}
}
int main(int argc, char **argv) {
struct timeval now;
uint64_t ts, ts1;
cpu_set_t mask;
struct sched_param param;
int ret, i;
//Set CPU affinity.
#if 0
CPU_ZERO(&mask);
CPU_SET(1, &mask); //cpu 1
if(sched_setaffinity(0, sizeof(cpu_set_t), &mask) == -1)
{
exit(EXIT_FAILURE);
}
#endif
#if 0
//Set scheduler and priority.
param.sched_priority = 50;
sched_setscheduler(0, SCHED_FIFO, &param);
#endif
parse_opts(argc, argv);
signal(SIGINT, sigintHandler);
printf("dd_cons\n");
ret = dd_init(MODE_CONSUMER, 0, job_id);
printf("dd_init(): %d\n", ret);
dd_config(channels_enabled, blob_size, 0, job_id);
dd_status(job_id);
//create theads to consume buf_ring
for(i = 0; i < THREAD_NUM; i++) {
ret = pthread_create(&threads[i], NULL, cons_proc, (void*)(uint64_t)i);
if(ret != 0) //failed
{
printf("Create thread #%d failed!\n", i);
return -1;
}
}
//wait for threads
for(i = 0; i < THREAD_NUM; i++) {
if( 0 == pthread_join(threads[i], NULL)) //thread quit normally
{
printf("Thread #%d finished\n", i);
}
}
out:
return 0;
}
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <time.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <udpdk_api.h>
#include "dd.h"
typedef struct {
uint32_t blob_size;
uint16_t blob_id;
uint16_t slice_id;
uint32_t sample_rate;
uint8_t sample_group_count;
uint8_t channels_enabled;
uint8_t downsampling;
uint8_t job_id;
} DD_PayloadHeader;
#define PORT_PING 10000
#define PORT_PONG 10001
#define IP_PONG "172.31.100.1"
#define MAX_SAMPLES 1000
static volatile int app_alive = 1;
static int log_enabled = 0;
static char *log_file;
static FILE *log;
static unsigned delay = 1000000;
static unsigned samples[MAX_SAMPLES];
static unsigned n_samples = 0;
static const char *progname;
static void signal_handler(int signum)
{
printf("Caught signal %d in pingpong main process\n", signum);
udpdk_interrupt(signum);
app_alive = 0;
}
static void dd_recv(void)
{
int sock, n;
struct sockaddr_in servaddr, cliaddr;
struct timespec ts_msg;
char buf[1936];
// Create a socket
if ((sock = udpdk_socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
fprintf(stderr, "Pong: socket creation failed");
return;
}
// Bind it
memset(&servaddr, 0, sizeof(servaddr));
memset(&cliaddr, 0, sizeof(cliaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = INADDR_ANY;
servaddr.sin_port = htons(PORT_PONG);
if (udpdk_bind(sock, (const struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
fprintf(stderr, "Pong: bind failed");
return;
}
/*
* typedef struct {
uint32_t blob_size;
uint16_t blob_id;
uint16_t slice_id;
uint32_t sample_rate;
uint8_t sample_group_count;
uint8_t channels_enabled;
uint8_t downsampling;
uint8_t job_id;
}
* */
printf("DD receiving UDP...\n");
while (app_alive) {
// Bounce incoming packets
int len = sizeof(cliaddr);
n = udpdk_recvfrom(sock, (void *)buf, sizeof(buf), 0, ( struct sockaddr *) &cliaddr, &len);
if (n > 0) {
DD_PayloadHeader *pHeader = (DD_PayloadHeader *)buf;
printf("=Received UDP packet: %d bytes...\n", n);
printf("blob_size: %d\n", pHeader->blob_size);
printf("blob_id: %d\n", pHeader->blob_id);
printf("slice_id: %d\n", pHeader->slice_id);
printf("sample_rate: %d\n", pHeader->sample_rate);
printf("sample_group_count: %d\n", pHeader->sample_group_count);
printf("channels_enabled: %d\n", pHeader->channels_enabled);
printf("downsampling: %d\n", pHeader->downsampling);
printf("job_id: %d\n", pHeader->job_id);
#if 1
if(dd_status(pHeader->job_id) < 0) { //first time
printf("=dd_init()\n");
dd_init(MODE_PRODUCER, 0, pHeader->job_id);
dd_config(pHeader->channels_enabled, pHeader->blob_size, 0, pHeader->job_id);
}
dd_put_slice((ADC_SAMPLE *)(buf + 16), pHeader->sample_group_count, pHeader->job_id);
#endif
}
}
}
static void usage(void)
{
printf("%s -c CONFIG -f FUNCTION \n"
" -c CONFIG: .ini configuration file\n"
" -d DELAY: delay (microseconds) between two ping invocations\n"
, progname);
}
static int parse_app_args(int argc, char *argv[])
{
int c;
progname = argv[0];
while ((c = getopt(argc, argv, "c:d:")) != -1) {
switch (c) {
case 'c':
// this is for the .ini cfg file needed by DPDK, not by the app
break;
case 'd':
delay = atoi(optarg);
break;
default:
fprintf(stderr, "Unknown option `-%c'.\n", optopt);
usage();
return -1;
}
}
return 0;
}
int main(int argc, char *argv[])
{
int retval;
// Register signals for shutdown
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
// Initialize UDPDK
retval = udpdk_init(argc, argv);
if (retval < 0) {
goto _end;
return -1;
}
printf("App: UDPDK Intialized\n");
// Parse app-specific arguments
printf("Parsing app arguments...\n");
retval = parse_app_args(argc, argv);
if (retval != 0) {
goto _end;
return -1;
}
dd_recv();
_end:
if (log_enabled) {
printf("Dumping %d samples to log...\n", n_samples);
for (unsigned i = 0; i < n_samples; ++i)
fprintf(log, "%d\n", samples[i]);
printf("Closing log...\n");
fclose(log);
}
udpdk_interrupt(0);
udpdk_cleanup();
return 0;
}
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <time.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <udpdk_api.h>
#include "dd.h"
typedef struct {
uint32_t blob_size;
uint16_t blob_id;
uint16_t slice_id;
uint32_t sample_rate;
uint8_t sample_group_count;
uint8_t channels_enabled;
uint8_t downsampling;
uint8_t job_id;
} DD_PayloadHeader;
#define PORT_PING 10000
#define PORT_PONG 10001
#define IP_PONG "172.31.100.1"
static volatile int app_alive = 1;
static int log_enabled = 0;
static char *log_file;
static FILE *log;
static unsigned delay = 1000000;
static const char *progname;
static void signal_handler(int signum)
{
printf("Caught signal %d in pingpong main process\n", signum);
udpdk_interrupt(signum);
app_alive = 0;
}
static void dd_send(void)
{
struct sockaddr_in servaddr, destaddr;
struct timespec ts, ts_msg, ts_now;
int n, i, j, len;
static char buf[1936];
char *p;
DD_PayloadHeader *pHeader = (DD_PayloadHeader *)buf;
fprintf(stderr, "DD Send sample data using DPDK...\n");
// Create a socket
int sock;
if ((sock = udpdk_socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
fprintf(stderr, "Ping: socket creation failed");
return;
}
// Bind it
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = INADDR_ANY;
servaddr.sin_port = htons(PORT_PING);
if (udpdk_bind(sock, (const struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
fprintf(stderr, "bind failed");
return;
}
/*
typedef struct {
uint32_t blob_size;
uint16_t blob_id;
uint16_t slice_id;
uint32_t sample_rate;
uint8_t sample_group_count;
uint8_t channels_enabled;
uint8_t downsampling;
uint8_t job_id;
} DD_PayloadHeader;
* */
n = 1;
while (app_alive) {
destaddr.sin_family = AF_INET;
destaddr.sin_addr.s_addr = inet_addr(IP_PONG);
destaddr.sin_port = htons(PORT_PONG);
clock_gettime(CLOCK_REALTIME, &ts);
pHeader-> blob_size = 3;
pHeader-> blob_id = n;
pHeader-> slice_id = n;
pHeader-> sample_rate = 200000; //200K Hz
pHeader-> sample_group_count = 3; //3*192*2 = 1152 bytes
pHeader-> channels_enabled = 192;
pHeader-> downsampling = 0;
pHeader-> job_id = 1;
len = 1152 + 16;
p = buf + 16;
for(i = 0; i < pHeader->channels_enabled * pHeader-> blob_size; i++) {
//little endian
p[2*i+1] = i & 0xFF ; //channel id
p[2*i] = n & 0xFF; //test byte data
}
printf("send UDP packet #%d...\n", n);
udpdk_sendto(sock, (void *)buf, len, 0,
(const struct sockaddr *) &destaddr, sizeof(destaddr));
usleep(delay);
n++;
}
}
static void usage(void)
{
printf("%s -c CONFIG -f FUNCTION \n"
" -c CONFIG: .ini configuration file\n"
" -d DELAY: delay (microseconds) between two ping invocations\n"
, progname);
}
static int parse_app_args(int argc, char *argv[])
{
int c;
progname = argv[0];
while ((c = getopt(argc, argv, "c:d:")) != -1) {
switch (c) {
case 'c':
// this is for the .ini cfg file needed by DPDK, not by the app
break;
case 'd':
delay = atoi(optarg);
break;
default:
fprintf(stderr, "Unknown option `-%c'.\n", optopt);
usage();
return -1;
}
}
return 0;
}
int main(int argc, char *argv[])
{
int retval;
// Register signals for shutdown
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
// Initialize UDPDK
retval = udpdk_init(argc, argv);
if (retval < 0) {
goto _end;
return -1;
}
printf("App: UDPDK Intialized\n");
// Parse app-specific arguments
printf("Parsing app arguments...\n");
retval = parse_app_args(argc, argv);
if (retval != 0) {
goto _end;
return -1;
}
dd_send();
_end:
udpdk_interrupt(0);
udpdk_cleanup();
return 0;
}
#ifndef _SHM_H_
#define _SHM_H_
#include "buf_ring.h"
enum {
FLAG_SCURVE = 1,
FLAG_PID_FE,
FLAG_PID_TE
};
#define DD_SHM_MAGIC 0xAA55AC57
typedef struct {
uint32_t magic;
uint32_t shm_len;
struct buf_ring br __attribute__((aligned(CACHE_LINE_SIZE)));
} SHM_HEADER;
#define DD_SHM_PATH "/tmp/shm/dd.shm"
#define DD_SHM_SIZE (64*1024*1024)
#endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <signal.h>
#include <fcntl.h>
#include "buf_ring.h"
#include "shm.h"
#include "dd.h"
int main(int argc, char *argv[]) {
struct buf_ring br;
printf("sizeof(buf_ring): %d Bytes\n", sizeof(struct buf_ring));
printf("br address: 0x%08x\n", &br);
printf("br.br_cons_head address: 0x%08x\n", &br.br_cons_head);
printf("br.br_ring address: 0x%08x\n", br.br_ring);
return 0;
}
\ No newline at end of file
......@@ -25,7 +25,7 @@ CFLAGS += $(WERROR_FLAGS) -O3
TARGET="pingpong"
all:
cc -I${ROOTDIR}/udpdk -I${DEPSDIR}/dpdk/${RTE_TARGET}/include -o ${TARGET} ${SRCS} ${LIBS}
cc -I${ROOTDIR}/udpdk -I${ROOTDIR}/udpdk/list -I${DEPSDIR}/dpdk/${RTE_TARGET}/include -o ${TARGET} ${SRCS} ${LIBS}
.PHONY: clean
clean:
......
......@@ -25,7 +25,7 @@ CFLAGS += $(WERROR_FLAGS) -O3
TARGET="pktgen"
all:
cc -I${ROOTDIR}/udpdk -I${DEPSDIR}/dpdk/${RTE_TARGET}/include -o ${TARGET} ${SRCS} ${LIBS}
cc -I${ROOTDIR}/udpdk -I${ROOTDIR}/udpdk/list -I${DEPSDIR}/dpdk/${RTE_TARGET}/include -o ${TARGET} ${SRCS} ${LIBS}
.PHONY: clean
clean:
......
......@@ -7,8 +7,8 @@ lcores_secondary=4
n_mem_channels=2
[port0]
mac_addr=68:05:ca:95:f8:ec
mac_addr=24:42:53:30:28:27
ip_addr=172.31.100.2
[port0_dst]
mac_addr=68:05:ca:95:fa:64
mac_addr=24:42:53:30:29:37
#for deps/dpdk compilation, add
HOST_WERROR_FLAGS := -w
WERROR_FLAGS := -w
to the bottom of deps/dpdk/mk/toolchain/gcc/rte.vars.mk
modprobe vfio-pci
echo 1 > /sys/module/vfio/parameters/enable_unsafe_noiommu_mode
export RTE_SDK=/mnt/source/UDPDK/deps/dpdk
export RTE_TARGET=x86_64-native-linux-gcc
......@@ -22,8 +22,9 @@ UDPDK_DPDK=${DEPSDIR}/dpdk/${RTE_TARGET}
INIH=${DEPSDIR}/inih
CFLAGS= -march=native -O2
CFLAGS+= -Wall -Wno-deprecated-declarations -Werror -Wno-unused-variable
CFLAGS= -march=native -O2 -msse4.1
#CFLAGS+= -Wall -Wno-deprecated-declarations -Werror -Wno-unused-variable
CFLAGS+= -Wall -Wno-deprecated-declarations -w -Wno-unused-variable
CFLAGS+= -fno-common -finline-limit=8000
CFLAGS+= --param inline-unit-growth=100
CFLAGS+= --param large-function-growth=1000
......
......@@ -135,14 +135,14 @@ static int init_port(uint16_t port_num)
const struct rte_eth_conf port_conf = {
.rxmode = {
.mq_mode = ETH_MQ_RX_RSS,
.max_rx_pkt_len = RTE_MIN(JUMBO_FRAME_MAX_SIZE, dev_info.max_rx_pktlen),
.max_rx_pkt_len = 3000, //RTE_MIN(JUMBO_FRAME_MAX_SIZE, dev_info.max_rx_pktlen),
.split_hdr_size = 0,
.offloads = (DEV_RX_OFFLOAD_CHECKSUM |
DEV_RX_OFFLOAD_SCATTER |
DEV_RX_OFFLOAD_JUMBO_FRAME),
.offloads = 0 //(DEV_RX_OFFLOAD_CHECKSUM) // |
//DEV_RX_OFFLOAD_SCATTER |
//DEV_RX_OFFLOAD_JUMBO_FRAME),
},
.txmode = {
.offloads = DEV_TX_OFFLOAD_MULTI_SEGS,
.offloads = 0 //DEV_TX_OFFLOAD_MULTI_SEGS,
}
};
......@@ -396,7 +396,7 @@ void udpdk_cleanup(void)
// Kill the poller process
RTE_LOG(INFO, CLOSE, "Killing the poller process (%d)...\n", poller_pid);
kill(poller_pid, SIGTERM);
//kill(poller_pid, SIGTERM);
pid = waitpid(poller_pid, NULL, 0);
if (pid < 0) {
RTE_LOG(WARNING, CLOSE, "Failed killing the poller process\n");
......
......@@ -331,7 +331,8 @@ static inline void reassemble(struct rte_mbuf *m, uint16_t portid, uint32_t queu
eth_hdr = rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
if (RTE_ETH_IS_IPV4_HDR(m->packet_type)) {
//if (RTE_ETH_IS_IPV4_HDR(m->packet_type)) { //vmxnet3 has no checksum offload for IP packet
if (1) {
ip_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
......@@ -472,7 +473,8 @@ void poller_body(void)
// Free the original mbuf
rte_pktmbuf_free(pkt);
// Checksum must be recomputed
ol_flags = (PKT_TX_IPV4 | PKT_TX_IP_CKSUM);
//ol_flags = (PKT_TX_IPV4 | PKT_TX_IP_CKSUM);
ol_flags = (PKT_TX_IPV4);
if (unlikely(n_fragments < 0)) {
RTE_LOG(ERR, POLLBODY, "Failed to fragment a packet\n");
break;
......
......@@ -11,6 +11,7 @@
#include "udpdk_api.h"
#include "udpdk_bind_table.h"
#include "udpdk_dump.h"
#define RTE_LOGTYPE_SYSCALL RTE_LOGTYPE_USER1
......@@ -363,7 +364,9 @@ ssize_t udpdk_sendto(int sockfd, const void *buf, size_t len, int flags,
rte_pktmbuf_free(pkt);
return -1;
}
#if 0
udpdk_dump_mbuf(pkt);
#endif
return len;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment