[dpdk-dev] [PATCH 1/3] eal: add channel for multi-process communication

Burakov, Anatoly anatoly.burakov at intel.com
Mon Dec 11 12:04:33 CET 2017


On 30-Nov-17 6:44 PM, Jianfeng Tan wrote:
> Previously, there were three channels for multi-process
> (i.e., primary/secondary) communication.
>    1. Config-file based channel, in which, the primary process writes
>       info into a pre-defined config file, and the secondary process
>       reads info out.
>    2. vfio submodule has its own channel based on unix socket for the
>       secondary process to get container fd and group fd from the
>       primary process.
>    3. pdump submodule also has its own channel based on unix socket for
>       packet dump.
> 
> It'll be good to have a generic communication channel for multi-process
> communication to accommodate the requirements including:
>    a. Secondary wants to send info to primary, for example, secondary
>       would like to send request (about some specific vdev to primary).
>    b. Sending info at any time, instead of just initialization time.
>    c. Share FDs with the other side, for vdev like vhost, related FDs
>       (memory region, kick) should be shared.
>    d. A send message request needs the other side to respond immediately.
> 
> This patch proposes to create a communication channel, as an unix
> socket connection, for above requirements. Primary will listen on
> the unix socket; secondary will connect this socket to talk.
> 
> Three new APIs are added:
> 
>    1. rte_eal_mp_action_register is used to register an action,
>       indexed by a string; if the calling component wants to
>       respond to the messages from the corresponding component in
>       its primary process or secondary processes.
>    2. rte_eal_mp_action_unregister is used to unregister the action
>       if the calling component does not want to respond to the messages.
>    3. rte_eal_mp_sendmsg is used to send a message.
> 
> Signed-off-by: Jianfeng Tan <jianfeng.tan at intel.com>
> ---

<...snip...>

> +
> +int
> +rte_eal_mp_action_register(const char *action_name, rte_eal_mp_t action)
> +{
> +	struct action_entry *entry = malloc(sizeof(struct action_entry));
> +
> +	if (entry == NULL)
> +		return -ENOMEM;
> +
> +	if (find_action_entry_by_name(action_name) != NULL)
> +		return -EEXIST;

This should probably do a free(entry) before returning; otherwise the
entry allocated above is leaked on the -EEXIST path.

> +
> +	strncpy(entry->action_name, action_name, MAX_ACTION_NAME_LEN);
> +	entry->action = action;
> +	TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
> +	return 0;
> +}
> +

<...snip...>

> +
> +static int
> +add_secondary(void)
> +{
> +	int fd;
> +	struct epoll_event ev;
> +
> +	while (1) {
> +		fd = accept(mp_fds.listen, NULL, NULL);
> +		if (fd < 0 && errno == EAGAIN)
> +			break;
> +		else if (fd < 0) {
> +			RTE_LOG(ERR, EAL, "primary failed to accept: %s\n",
> +				strerror(errno));
> +			return -1;
> +		}
> +
> +		ev.events = EPOLLIN | EPOLLRDHUP;
> +		ev.data.fd = fd;
> +		if (epoll_ctl(mp_fds.efd, EPOLL_CTL_ADD, fd, &ev) < 0) {
> +			RTE_LOG(ERR, EAL, "failed to add secondary: %s\n",
> +				strerror(errno));
> +			break;
> +		}
> +		if (add_sec_proc(fd) < 0) {
> +			RTE_LOG(ERR, EAL, "too many secondary processes\n");
> +			close(fd);
> +			break;
> +		}
> +	}
> +
> +	return 0;
> +}
> +
> +static void *
> +mp_handler(void *arg __rte_unused)
> +{
> +	int fd;
> +	int i, n;
> +	struct epoll_event ev;
> +	struct epoll_event *events;
> +	int is_primary = rte_eal_process_type() == RTE_PROC_PRIMARY;
> +
> +	ev.events = EPOLLIN | EPOLLRDHUP;
> +	ev.data.fd = (is_primary) ? mp_fds.listen : mp_fds.primary;
> +	if (epoll_ctl(mp_fds.efd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0) {
> +		RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n",
> +			strerror(errno));
> +		exit(EXIT_FAILURE);

rte_exit?

> +	}
> +
> +	events = calloc(20, sizeof ev);
> +
> +	while (1) {
> +		n = epoll_wait(mp_fds.efd, events, 20, -1);
> +		for (i = 0; i < n; i++) {
> +			if (is_primary && events[i].data.fd == mp_fds.listen) {
> +				if (events[i].events != EPOLLIN) {
> +					RTE_LOG(ERR, EAL, "what happens?\n");

More descriptive error message would be nice :)

> +					exit(EXIT_FAILURE);

rte_exit?

> +				}
> +
> +				if (add_secondary() < 0)
> +					break;

Doing epoll_ctl in multiple different places hurts readability IMO. 
Might be a good idea to refactor add_secondary and mp_handler in a way 
that keeps all epoll handling in one place.

> +
> +				continue;
> +			}
> +
> +			fd = events[i].data.fd;
> +
> +			if ((events[i].events & EPOLLIN)) {
> +				if (process_msg(fd) < 0) {
> +					RTE_LOG(ERR, EAL,
> +						"failed to process msg\n");
> +					if (!is_primary)
> +						exit(EXIT_FAILURE);

rte_exit()?

> +				}
> +				continue;
> +			}
> +
> +			/* EPOLLERR, EPOLLHUP, etc */
> +			if (is_primary) {
> +				RTE_LOG(ERR, EAL, "secondary exit: %d\n", fd);
> +				epoll_ctl(mp_fds.efd, EPOLL_CTL_DEL, fd, NULL);
> +				del_sec_proc(fd);
> +				close(fd);
> +			} else {
> +				RTE_LOG(ERR, EAL, "primary exits, so do I\n");
> +				/* Exit secondary when primary exits? */
> +				exit(EXIT_FAILURE);

This is changing previous behavior. I don't think exiting secondary when
primary exits is something we want to do, so I would just print an
error, but not exit the process.

> +			}
> +		}
> +	}
> +
> +	return NULL;
> +}
> +
> +int
> +rte_eal_mp_channel_init(void)
> +{
> +	int i, fd, ret;
> +	const char *path;
> +	struct sockaddr_un un;
> +	pthread_t tid;
> +	char thread_name[RTE_MAX_THREAD_NAME_LEN];
> +
> +	mp_fds.efd = epoll_create1(0);
> +	if (mp_fds.efd < 0) {
> +		RTE_LOG(ERR, EAL, "epoll_create1 failed\n");
> +		return -1;
> +	}
> +
> +	fd = socket(AF_UNIX, SOCK_STREAM, 0);
> +	if (fd < 0) {
> +		RTE_LOG(ERR, EAL, "Failed to create unix socket\n");
> +		return -1;
> +	}
> +
> +	memset(&un, 0, sizeof(un));
> +	un.sun_family = AF_UNIX;
> +	path = eal_mp_unix_path();
> +	strncpy(un.sun_path, path, sizeof(un.sun_path));
> +	un.sun_path[sizeof(un.sun_path) - 1] = '\0';
> +
> +	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
> +		for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
> +			mp_fds.secondaries[i] = -1;
> +
> +		if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0) {
> +			RTE_LOG(ERR, EAL, "cannot set nonblocking mode\n");
> +			close(fd);
> +			return -1;
> +		}
> +
> +		/* The file still exists since last run */
> +		unlink(path);
> +
> +		ret = bind(fd, (struct sockaddr *)&un, sizeof(un));
> +		if (ret < 0) {
> +			RTE_LOG(ERR, EAL, "failed to bind to %s: %s\n",
> +				path, strerror(errno));
> +			close(fd);
> +			return -1;
> +		}
> +		RTE_LOG(INFO, EAL, "primary bind to %s\n", path);
> +
> +		ret = listen(fd, 1024);
> +		if (ret < 0) {
> +			RTE_LOG(ERR, EAL, "failed to listen: %s\n",
> +				strerror(errno));
> +			close(fd);
> +			return -1;
> +		}
> +		mp_fds.listen = fd;
> +	} else {
> +		ret = connect(fd, (struct sockaddr *)&un, sizeof(un));
> +		if (ret < 0) {
> +			RTE_LOG(ERR, EAL, "failed to connect primary\n");
> +			return -1;

Do we want to prevent secondary from launching if it can't connect to 
primary? Some use cases might rely on previous behavior. Maybe instead 
add some checks in handling functions to ensure that we have a valid 
connection to the primary before doing anything?

> +		}
> +		mp_fds.primary = fd;
> +	}
> +
> +	ret = pthread_create(&tid, NULL, mp_handler, NULL);
> +	if (ret < 0) {
> +		RTE_LOG(ERR, EAL, "failed to create thead: %s\n",
> +			strerror(errno));
> +		close(fd);
> +		close(mp_fds.efd);
> +		return -1;
> +	}

<...snip...>

> +	if (fds_num > SCM_MAX_FD) {
> +		RTE_LOG(ERR, EAL,
> +			"Cannot send more than %d FDs\n", SCM_MAX_FD);
> +		return -E2BIG;
> +	}
> +
> +	len_msg = sizeof(struct msg_hdr) + len_params;
> +	if (len_msg > MAX_MESSAGE_LENGTH) {
> +		RTE_LOG(ERR, EAL, "Message is too long\n");
> +		return -ENOMEM;

Nitpicking, but is this really -ENOMEM? Shouldn't this be -EINVAL or 
-E2BIG? Also, this is external API - maybe return -1 and set rte_errno?

> +	}
> +
> +	RTE_LOG(INFO, EAL, "send msg: %s, %d\n", action_name, len_msg);

Do we want this as INFO, not DEBUG?

> +
> +	msg = malloc(len_msg);
> +	if (!msg) {
> +		RTE_LOG(ERR, EAL, "Cannot alloc memory for msg\n");
> +		return -ENOMEM;
> +	}

<...snip...>

>   
>   /**
> + * Action function typedef used by other components.
> + *
> + * As we create unix socket channel for primary/secondary communication, use
> + * this function typedef to register action for coming messages.
> + */
> +typedef int (*rte_eal_mp_t)(const void *params, int len,
> +			    int fds[], int fds_num);

Nitpicking, but probably needs newlines before comments, here and after 
next function definition.

> +/**
> + * Register an action function for primary/secondary communication.
> + *
> + * Call this function to register an action, if the calling component wants
> + * to response the messages from the corresponding component in its primary
> + * process or secondary processes.
> + *
> + * @param action_name
> + *   The action_name argument plays as the nonredundant key to find the action.
> + *
> + * @param action
> + *   The action argument is the function pointer to the action function.
> + *
> + * @return
> + *  - 0 on success.
> + *  - (<0) on failure.
> + */

<...snip...>

> diff --git a/lib/librte_eal/linuxapp/eal/eal.c b/lib/librte_eal/linuxapp/eal/eal.c
> index 229eec9..a84eab4 100644
> --- a/lib/librte_eal/linuxapp/eal/eal.c
> +++ b/lib/librte_eal/linuxapp/eal/eal.c
> @@ -896,6 +896,15 @@ rte_eal_init(int argc, char **argv)
>   
>   	eal_check_mem_on_local_socket();
>   
> +	if (rte_eal_mp_channel_init() < 0) {
> +		rte_eal_init_alert("failed to init mp channel\n");
> +		rte_errno = EFAULT;
> +		return -1;
> +	}

As noted above, maybe only fail if it's primary process?

> +
> +	if (eal_plugins_init() < 0)
> +		rte_eal_init_alert("Cannot init plugins\n");

This is probably a leftover of some other patch?

> +
>   	eal_thread_init_master(rte_config.master_lcore);
>   
>   	ret = eal_thread_dump_affinity(cpuset, RTE_CPU_AFFINITY_STR_LEN);
> diff --git a/lib/librte_eal/rte_eal_version.map b/lib/librte_eal/rte_eal_version.map
> index f4f46c1..6762397 100644
> --- a/lib/librte_eal/rte_eal_version.map
> +++ b/lib/librte_eal/rte_eal_version.map
> @@ -235,4 +235,26 @@ EXPERIMENTAL {
>   	rte_service_set_stats_enable;
>   	rte_service_start_with_defaults;
>   
> +} DPDK_17.08;
> +
> +DPDK_17.11 {
> +	global:
> +
> +	rte_bus_get_iommu_class;
> +	rte_eal_iova_mode;
> +	rte_eal_mbuf_default_mempool_ops;
> +	rte_lcore_has_role;
> +	rte_memcpy_ptr;
> +	rte_pci_get_iommu_class;
> +	rte_pci_match;
> +
> +} DPDK_17.08;
> +

Same here, this looks like leftovers of rebase.

> +DPDK_18.02 {
> +	global:
> +
> +	rte_eal_mp_action_register;
> +	rte_eal_mp_action_unregister;
> +	rte_eal_mp_sendmsg;
> +
>   } DPDK_17.11;
> 


-- 
Thanks,
Anatoly


More information about the dev mailing list