[dpdk-dev] [PATCH 06/12] eal: add channel for primary/secondary communication

Yuanhan Liu yliu at fridaylinux.org
Wed Sep 27 14:19:47 CEST 2017


On Fri, Aug 25, 2017 at 09:40:46AM +0000, Jianfeng Tan wrote:
> Previouly, there is only one way for primary/secondary to exchange
> messages, that is, primary process writes info into some predefind
> file, and secondary process reads info out. That cannot address
> the requirements:
>   a. Secondary wants to send info to primary.
>   b. Sending info at any time, instead of just initialization time.
>   c. Share FD with the other side.
> 
> This patch proposes to create a communication channel (as an unix
> socket connection) for above requirements.

Firstly, I think it's a good idea to have such a generic interface for
multi-process communication.

> Three new APIs are added:
> 
>   1. rte_eal_primary_secondary_add_action is used to register an action,

As you have said, it's for registration, so why use the verb "add" here?
Normally, "register" implies a one-time action, while "add" implies
it could be a repeated action.

Also, the function name is a bit long. Maybe something like
"rte_eal_mp_xxx" is shorter and better.

[...]
> +static struct action_entry *
> +find_action_entry_by_name(const char *name)
> +{
> +	int len = strlen(name);
> +	struct action_entry *entry;
> +
> +	TAILQ_FOREACH(entry, &action_entry_list, next) {
> +		if (strncmp(entry->action_name, name, len) == 0)
> +				break;

Broken indentation here.

> +	}
> +
> +	return entry;
> +}
> +
> +int
> +rte_eal_primary_secondary_add_action(const char *action_name,
> +				     rte_eal_primary_secondary_t action)
> +{
> +	struct action_entry *entry = malloc(sizeof(struct action_entry));
> +
> +	if (entry == NULL)
> +		return -ENOMEM;
> +
> +	strncpy(entry->action_name, action_name, MAX_ACTION_NAME_LEN);
> +	entry->action = action;
> +	TAILQ_INSERT_TAIL(&action_entry_list, entry, next);

Since you intend to support "one primary process and multiple secondary
processes", we need a lock here to protect the list.

I also wonder whether we really need that, I mean the 1:N model.

> +	return 0;
> +}
> +
> +void
> +rte_eal_primary_secondary_del_action(const char *name)
> +{
> +	struct action_entry *entry = find_action_entry_by_name(name);
> +
> +	TAILQ_REMOVE(&action_entry_list, entry, next);
> +	free(entry);
> +}
> +
> +#define MAX_SECONDARY_PROCS	8
> +
> +static int efd_pri_sec; /* epoll fd for primary/secondary channel thread */

I don't think it's a good idea to use "pri". For me, "private" comes to
mind first, not "primary".

> +static int fd_listen;   /* unix listen socket by primary */
> +static int fd_to_pri;   /* only used by secondary process */
> +static int fds_to_sec[MAX_SECONDARY_PROCS];

Too many vars. I'd suggest using a struct here, which could also make
the naming a bit simpler. For instance,

struct mp_fds {
        int efd;

        union {
                /* fds for primary process */
                struct {
                        int listen;
			/* fds used to send msg to secondary process */
                        int secondaries[...];
                };

                /* fds for secondary process */
                struct {
			/* fds used to send msg to the primary process */
                        int primary;
                };
        };
};

It also separates the scope well. Note that the above field naming does
not look perfect though. Feel free to come up with some better names.

[...]
> +static int
> +process_msg(int fd)
> +{
> +	int len;
> +	int params_len;
> +	char buf[1024];
> +	int fds[8]; /* accept at most 8 FDs per message */

Why is it 8? I think you are adding a vhost-user specific limitation to a
generic interface, which isn't quite right.

> +	struct msg_hdr *hdr;
> +	struct action_entry *entry;
> +
> +	len = read_msg(fd, buf, 1024, fds, 8);
> +	if (len < 0) {
> +		RTE_LOG(ERR, EAL, "failed to read message: %s\n",
> +			strerror(errno));
> +		return -1;
> +	}
> +
> +	hdr = (struct msg_hdr *) buf;
                                ^
An extra whitespace.

> +
> +	entry = find_action_entry_by_name(hdr->action_name);
> +	if (entry == NULL) {
> +		RTE_LOG(ERR, EAL, "cannot find action by: %s\n",
> +			hdr->action_name);
> +		return -1;
> +	}
> +
> +	params_len = len - sizeof(struct msg_hdr);
> +	return entry->action(hdr->params, params_len, fds, hdr->fds_num);
> +}
> +
> +static void *
> +thread_primary(__attribute__((unused)) void *arg)

Use __rte_unused instead, and put it after the var name.

> +{
> +	int fd;
> +	int i, n;
> +	struct epoll_event event;
> +	struct epoll_event *events;
> +
> +	event.events = EPOLLIN | EPOLLRDHUP;
> +	event.data.fd = fd_listen;
> +	if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_listen, &event) < 0) {
> +		RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n",
> +			strerror(errno));
> +		exit(EXIT_FAILURE);
> +	}
> +
> +	events = calloc(20, sizeof event);
> +
> +	while (1) {
> +		n = epoll_wait(efd_pri_sec, events, 20, -1);
> +		for (i = 0; i < n; i++) {
> +			if (events[i].data.fd == fd_listen) {
> +				if (events[i].events != EPOLLIN) {
> +					RTE_LOG(ERR, EAL, "what happens?\n");
> +					exit(EXIT_FAILURE);
> +				}
> +
> +				fd = accept(fd_listen, NULL, NULL);
> +				if (fd < 0) {
> +					RTE_LOG(ERR, EAL, "primary failed to accept: %s\n",
> +						strerror(errno));
> +					continue;
> +				}
> +
> +				event.data.fd = fd;
> +				if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd, &event) < 0) {
> +					RTE_LOG(ERR, EAL, "failed to add secondary: %s\n",
> +						strerror(errno));
> +					continue;
> +				}
> +				if (add_sec_proc(fd) < 0)
> +					RTE_LOG(ERR, EAL, "too many secondary processes\n");
> +
> +				continue;
> +			}
> +
> +			fd = events[i].data.fd;
> +
> +			if ((events[i].events & (EPOLLERR | EPOLLHUP))) {
> +				RTE_LOG(ERR, EAL,
> +					"secondary process exit: %d\n", fd);
> +				epoll_ctl(efd_pri_sec, EPOLL_CTL_DEL, fd, NULL);
> +				del_sec_proc(fd);
> +				continue;
> +			}
> +
> +			if ((events[i].events & EPOLLIN)) {
> +				RTE_LOG(INFO, EAL,
> +					"recv msg from secondary process\n");
> +
> +				process_msg(fd);
> +			}
> +		}
> +	}

Too much redundant code. You are doing the check twice, and it could be
simplified.

> +
> +	return NULL;
> +}
> +
> +static void *
> +thread_secondary(__attribute__((unused)) void *arg)

I'm also wondering whether this one can be removed. I think we just need
one handler thread.

> +{
> +	int fd;
> +	int i, n;
> +	struct epoll_event event;
> +	struct epoll_event *events;
> +
> +	event.events = EPOLLIN | EPOLLRDHUP;
> +	event.data.fd = fd_to_pri;
> +	if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_to_pri, &event) < 0) {
> +		RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n", strerror(errno));
> +		exit(EXIT_FAILURE);
> +	}
> +
> +	events = calloc(20, sizeof event);
> +
> +	while (1) {
> +		n = epoll_wait(efd_pri_sec, events, 20, -1);
> +		for (i = 0; i < n; i++) {
> +
> +			fd = events[i].data.fd;
> +
> +			if ((events[i].events & (EPOLLERR | EPOLLHUP))) {
> +				RTE_LOG(ERR, EAL, "primary exits, so do I\n");
> +				/* Do we need exit secondary when primary exits? */
> +				exit(EXIT_FAILURE);
> +			}
> +
> +			if ((events[i].events & EPOLLIN)) {
> +				RTE_LOG(INFO, EAL,
> +					"recv msg from primary process\n");
> +				process_msg(fd);
> +			}
> +		}
> +	}
> +
> +	return NULL;
> +}
> +
> +int
> +rte_eal_primary_secondary_channel_init(void)
> +{
> +	int i, fd, ret;
> +	const char *path;
> +	struct sockaddr_un un;
> +	pthread_t tid;
> +	void*(*fn)(void *);
> +	char thread_name[RTE_MAX_THREAD_NAME_LEN];
> +
> +	efd_pri_sec = epoll_create1(0);
> +	if (efd_pri_sec < 0) {
> +		RTE_LOG(ERR, EAL, "epoll_create1 failed\n");
> +		return -1;
> +	}
> +
> +	fd = socket(AF_UNIX, SOCK_STREAM, 0);
> +	if (fd < 0) {
> +		RTE_LOG(ERR, EAL, "Failed to create unix socket");
> +		return -1;
> +	}
> +
> +	memset(&un, 0, sizeof(un));
> +	un.sun_family = AF_UNIX;
> +	path = eal_primary_secondary_unix_path();
> +	strncpy(un.sun_path, path, sizeof(un.sun_path));
> +	un.sun_path[sizeof(un.sun_path) - 1] = '\0';
> +
> +	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
> +

Do not leave an extra blank line here.
> +		for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
> +			fds_to_sec[i] = -1;
> +
> +		if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0) {
> +			RTE_LOG(ERR, EAL, "cannot set nonblocking mode\n");
> +			close(fd);
> +			return -1;
> +		}
> +
> +		/* The file still exists since last run */
> +		unlink(path);
> +
> +		ret = bind(fd, (struct sockaddr *)&un, sizeof(un));
> +		if (ret < 0) {
> +			RTE_LOG(ERR, EAL, "failed to bind to %s: %s\n",
> +				path, strerror(errno));
> +			close(fd);
> +			return -1;
> +		}
> +		RTE_LOG(INFO, EAL, "primary bind to %s\n", path);
> +
> +		ret = listen(fd, 1024);
> +		if (ret < 0) {
> +			RTE_LOG(ERR, EAL, "failed to listen: %s", strerror(errno));
> +			close(fd);
> +			return -1;
> +		}
> +
> +		fn = thread_primary;
> +		fd_listen = fd;
> +	} else {
> +		ret = connect(fd, (struct sockaddr *)&un, sizeof(un));
> +		if (ret < 0) {
> +			RTE_LOG(ERR, EAL, "failed to connect primary\n");
> +			return -1;
> +		}
> +		fn = thread_secondary;
> +		fd_to_pri = fd;
> +	}
> +
> +	ret = pthread_create(&tid, NULL, fn, NULL);
> +	if (ret < 0) {
> +		RTE_LOG(ERR, EAL, "failed to create thead: %s\n",
> +			strerror(errno));
> +		close(fd);
> +		close(efd_pri_sec);
> +		return -1;
> +	}
> +
> +	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
> +		 "ps_channel");

It may not be that easy to know what "ps" stands for. Maybe "rte_mp_xx"
is better. A name starting with "rte_" easily reminds people that it's
a thread from DPDK.

[...]
> diff --git a/lib/librte_eal/common/eal_filesystem.h b/lib/librte_eal/common/eal_filesystem.h
> index 8acbd99..78bb4fb 100644
> --- a/lib/librte_eal/common/eal_filesystem.h
> +++ b/lib/librte_eal/common/eal_filesystem.h
> @@ -67,6 +67,24 @@ eal_runtime_config_path(void)
>  	return buffer;
>  }
>  
> +/** Path of primary/secondary communication unix socket file. */
> +#define PRIMARY_SECONDARY_UNIX_PATH_FMT "%s/.%s_unix"
> +static inline const char *
> +eal_primary_secondary_unix_path(void)
> +{
> +	static char buffer[PATH_MAX]; /* static so auto-zeroed */
> +	const char *directory = default_config_dir;
> +	const char *home_dir = getenv("HOME");

It's not good practice to create such a file in the HOME dir. Users would
be surprised to find it there. In the worst case, a user might delete
it.

The more common way is to put it in a tmp dir, like "/tmp".

> +
> +	if (getuid() != 0 && home_dir != NULL)
> +		directory = home_dir;
> +	snprintf(buffer, sizeof(buffer) - 1, PRIMARY_SECONDARY_UNIX_PATH_FMT,
> +		 directory, internal_config.hugefile_prefix);
> +
> +	return buffer;
> +
> +}
> +
[...]
> diff --git a/lib/librte_eal/linuxapp/eal/eal.c b/lib/librte_eal/linuxapp/eal/eal.c
> index 48f12f4..237c0b1 100644
> --- a/lib/librte_eal/linuxapp/eal/eal.c
> +++ b/lib/librte_eal/linuxapp/eal/eal.c
> @@ -873,6 +873,12 @@ rte_eal_init(int argc, char **argv)
>  
>  	eal_check_mem_on_local_socket();
>  
> +	if (rte_eal_primary_secondary_channel_init() < 0) {
> +		rte_eal_init_alert("Cannot create unix channel.");

The alert message doesn't quite match the function name. Actually, you
have already printed the specific error inside that init function when it
fails. Thus, you could just say "failed to init mp channel" or something
like this.

	--yliu


More information about the dev mailing list