[dpdk-dev] [RFC 02/24] vhost: move AF_UNIX code from socket.c to trans_af_unix.c

Stefan Hajnoczi stefanha at redhat.com
Fri Jan 19 14:44:22 CET 2018


The socket.c file serves two purposes:
1. librte_vhost public API entry points, e.g. rte_vhost_driver_register().
2. AF_UNIX socket management.

Move AF_UNIX socket code into trans_af_unix.c so that socket.c only
handles the librte_vhost public API entry points.  This will make it
possible to support other transports besides AF_UNIX.

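This patch is a preparatory step that simply moves code from socket.c to
trans_af_unix.c unmodified, apart from dropping 'static' qualifiers
where the two files now reference each other's symbols: socket.c calls
into trans_af_unix.c, and trans_af_unix.c uses the vhost_user global
that remains defined in socket.c.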

A lot of socket.c state is exposed in vhost.h, but this is a temporary
measure that will be cleaned up in later patches.  Moving the code
unmodified in this patch makes the actual refactoring that follows
easier to review.

Signed-off-by: Stefan Hajnoczi <stefanha at redhat.com>
---
 lib/librte_vhost/vhost.h         |  65 +++++
 lib/librte_vhost/socket.c        | 501 +--------------------------------------
 lib/librte_vhost/trans_af_unix.c | 451 +++++++++++++++++++++++++++++++++++
 3 files changed, 517 insertions(+), 500 deletions(-)

diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h
index 53811a8b1..8c6d6e524 100644
--- a/lib/librte_vhost/vhost.h
+++ b/lib/librte_vhost/vhost.h
@@ -5,6 +5,7 @@
 #ifndef _VHOST_NET_CDEV_H_
 #define _VHOST_NET_CDEV_H_
 #include <stdint.h>
+#include <stdbool.h>
 #include <stdio.h>
 #include <sys/types.h>
 #include <sys/queue.h>
@@ -12,12 +13,15 @@
 #include <linux/vhost.h>
 #include <linux/virtio_net.h>
 #include <sys/socket.h>
+#include <sys/un.h> /* TODO remove when trans_af_unix.c refactoring is done */
 #include <linux/if.h>
+#include <pthread.h>
 
 #include <rte_log.h>
 #include <rte_ether.h>
 #include <rte_rwlock.h>
 
+#include "fd_man.h"
 #include "rte_vhost.h"
 
 /* Used to indicate that the device is running on a data core */
@@ -259,6 +263,67 @@ struct virtio_net {
 	int			slave_req_fd;
 } __rte_cache_aligned;
 
+/* The vhost_user, vhost_user_socket, vhost_user_connection, and reconnect
+ * declarations are temporary measures for moving AF_UNIX code into
+ * trans_af_unix.c.  They will be cleaned up as socket.c is untangled from
+ * trans_af_unix.c.
+ */
+TAILQ_HEAD(vhost_user_connection_list, vhost_user_connection);
+
+/*
+ * Every time rte_vhost_driver_register() is invoked, an associated
+ * vhost_user_socket struct will be created.
+ */
+struct vhost_user_socket {
+	struct vhost_user_connection_list conn_list;
+	pthread_mutex_t conn_mutex;
+	char *path;
+	int socket_fd;
+	struct sockaddr_un un;
+	bool is_server;
+	bool reconnect;
+	bool dequeue_zero_copy;
+	bool iommu_support;
+
+	/*
+	 * The "supported_features" indicates the feature bits the
+	 * vhost driver supports. The "features" indicates the feature
+	 * bits after the rte_vhost_driver_features_disable/enable().
+	 * It is also the final feature bits used for vhost-user
+	 * features negotiation.
+	 */
+	uint64_t supported_features;
+	uint64_t features;
+
+	struct vhost_device_ops const *notify_ops;
+};
+
+struct vhost_user_connection {
+	struct vhost_user_socket *vsocket;
+	int connfd;
+	int vid;
+
+	TAILQ_ENTRY(vhost_user_connection) next;
+};
+
+#define MAX_VHOST_SOCKET 1024
+struct vhost_user {
+	struct vhost_user_socket *vsockets[MAX_VHOST_SOCKET];
+	struct fdset fdset;
+	int vsocket_cnt;
+	pthread_mutex_t mutex;
+};
+
+extern struct vhost_user vhost_user;
+
+int create_unix_socket(struct vhost_user_socket *vsocket);
+int vhost_user_start_server(struct vhost_user_socket *vsocket);
+int vhost_user_start_client(struct vhost_user_socket *vsocket);
+
+extern pthread_t reconn_tid;
+
+int vhost_user_reconnect_init(void);
+bool vhost_user_remove_reconnect(struct vhost_user_socket *vsocket);
 
 #define VHOST_LOG_PAGE	4096
 
diff --git a/lib/librte_vhost/socket.c b/lib/librte_vhost/socket.c
index 6e3857e7a..d681f9cae 100644
--- a/lib/librte_vhost/socket.c
+++ b/lib/librte_vhost/socket.c
@@ -4,17 +4,14 @@
 
 #include <stdint.h>
 #include <stdio.h>
-#include <stdbool.h>
 #include <limits.h>
 #include <stdlib.h>
 #include <unistd.h>
 #include <string.h>
 #include <sys/types.h>
 #include <sys/socket.h>
-#include <sys/un.h>
 #include <sys/queue.h>
 #include <errno.h>
-#include <fcntl.h>
 #include <pthread.h>
 
 #include <rte_log.h>
@@ -23,61 +20,7 @@
 #include "vhost.h"
 #include "vhost_user.h"
 
-
-TAILQ_HEAD(vhost_user_connection_list, vhost_user_connection);
-
-/*
- * Every time rte_vhost_driver_register() is invoked, an associated
- * vhost_user_socket struct will be created.
- */
-struct vhost_user_socket {
-	struct vhost_user_connection_list conn_list;
-	pthread_mutex_t conn_mutex;
-	char *path;
-	int socket_fd;
-	struct sockaddr_un un;
-	bool is_server;
-	bool reconnect;
-	bool dequeue_zero_copy;
-	bool iommu_support;
-
-	/*
-	 * The "supported_features" indicates the feature bits the
-	 * vhost driver supports. The "features" indicates the feature
-	 * bits after the rte_vhost_driver_features_disable/enable().
-	 * It is also the final feature bits used for vhost-user
-	 * features negotiation.
-	 */
-	uint64_t supported_features;
-	uint64_t features;
-
-	struct vhost_device_ops const *notify_ops;
-};
-
-struct vhost_user_connection {
-	struct vhost_user_socket *vsocket;
-	int connfd;
-	int vid;
-
-	TAILQ_ENTRY(vhost_user_connection) next;
-};
-
-#define MAX_VHOST_SOCKET 1024
-struct vhost_user {
-	struct vhost_user_socket *vsockets[MAX_VHOST_SOCKET];
-	struct fdset fdset;
-	int vsocket_cnt;
-	pthread_mutex_t mutex;
-};
-
-#define MAX_VIRTIO_BACKLOG 128
-
-static void vhost_user_server_new_connection(int fd, void *data, int *remove);
-static void vhost_user_read_cb(int fd, void *dat, int *remove);
-static int create_unix_socket(struct vhost_user_socket *vsocket);
-static int vhost_user_start_client(struct vhost_user_socket *vsocket);
-
-static struct vhost_user vhost_user = {
+struct vhost_user vhost_user = {
 	.fdset = {
 		.fd = { [0 ... MAX_FDS - 1] = {-1, NULL, NULL, NULL, 0} },
 		.fd_mutex = PTHREAD_MUTEX_INITIALIZER,
@@ -87,424 +30,6 @@ static struct vhost_user vhost_user = {
 	.mutex = PTHREAD_MUTEX_INITIALIZER,
 };
 
-/* return bytes# of read on success or negative val on failure. */
-int
-read_fd_message(int sockfd, char *buf, int buflen, int *fds, int fd_num)
-{
-	struct iovec iov;
-	struct msghdr msgh;
-	size_t fdsize = fd_num * sizeof(int);
-	char control[CMSG_SPACE(fdsize)];
-	struct cmsghdr *cmsg;
-	int ret;
-
-	memset(&msgh, 0, sizeof(msgh));
-	iov.iov_base = buf;
-	iov.iov_len  = buflen;
-
-	msgh.msg_iov = &iov;
-	msgh.msg_iovlen = 1;
-	msgh.msg_control = control;
-	msgh.msg_controllen = sizeof(control);
-
-	ret = recvmsg(sockfd, &msgh, 0);
-	if (ret <= 0) {
-		RTE_LOG(ERR, VHOST_CONFIG, "recvmsg failed\n");
-		return ret;
-	}
-
-	if (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC)) {
-		RTE_LOG(ERR, VHOST_CONFIG, "truncted msg\n");
-		return -1;
-	}
-
-	for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
-		cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
-		if ((cmsg->cmsg_level == SOL_SOCKET) &&
-			(cmsg->cmsg_type == SCM_RIGHTS)) {
-			memcpy(fds, CMSG_DATA(cmsg), fdsize);
-			break;
-		}
-	}
-
-	return ret;
-}
-
-int
-send_fd_message(int sockfd, char *buf, int buflen, int *fds, int fd_num)
-{
-
-	struct iovec iov;
-	struct msghdr msgh;
-	size_t fdsize = fd_num * sizeof(int);
-	char control[CMSG_SPACE(fdsize)];
-	struct cmsghdr *cmsg;
-	int ret;
-
-	memset(&msgh, 0, sizeof(msgh));
-	iov.iov_base = buf;
-	iov.iov_len = buflen;
-
-	msgh.msg_iov = &iov;
-	msgh.msg_iovlen = 1;
-
-	if (fds && fd_num > 0) {
-		msgh.msg_control = control;
-		msgh.msg_controllen = sizeof(control);
-		cmsg = CMSG_FIRSTHDR(&msgh);
-		cmsg->cmsg_len = CMSG_LEN(fdsize);
-		cmsg->cmsg_level = SOL_SOCKET;
-		cmsg->cmsg_type = SCM_RIGHTS;
-		memcpy(CMSG_DATA(cmsg), fds, fdsize);
-	} else {
-		msgh.msg_control = NULL;
-		msgh.msg_controllen = 0;
-	}
-
-	do {
-		ret = sendmsg(sockfd, &msgh, 0);
-	} while (ret < 0 && errno == EINTR);
-
-	if (ret < 0) {
-		RTE_LOG(ERR, VHOST_CONFIG,  "sendmsg error\n");
-		return ret;
-	}
-
-	return ret;
-}
-
-static void
-vhost_user_add_connection(int fd, struct vhost_user_socket *vsocket)
-{
-	int vid;
-	size_t size;
-	struct vhost_user_connection *conn;
-	int ret;
-
-	conn = malloc(sizeof(*conn));
-	if (conn == NULL) {
-		close(fd);
-		return;
-	}
-
-	vid = vhost_new_device();
-	if (vid == -1) {
-		goto err;
-	}
-
-	size = strnlen(vsocket->path, PATH_MAX);
-	vhost_set_ifname(vid, vsocket->path, size);
-
-	if (vsocket->dequeue_zero_copy)
-		vhost_enable_dequeue_zero_copy(vid);
-
-	RTE_LOG(INFO, VHOST_CONFIG, "new device, handle is %d\n", vid);
-
-	if (vsocket->notify_ops->new_connection) {
-		ret = vsocket->notify_ops->new_connection(vid);
-		if (ret < 0) {
-			RTE_LOG(ERR, VHOST_CONFIG,
-				"failed to add vhost user connection with fd %d\n",
-				fd);
-			goto err;
-		}
-	}
-
-	conn->connfd = fd;
-	conn->vsocket = vsocket;
-	conn->vid = vid;
-	ret = fdset_add(&vhost_user.fdset, fd, vhost_user_read_cb,
-			NULL, conn);
-	if (ret < 0) {
-		RTE_LOG(ERR, VHOST_CONFIG,
-			"failed to add fd %d into vhost server fdset\n",
-			fd);
-
-		if (vsocket->notify_ops->destroy_connection)
-			vsocket->notify_ops->destroy_connection(conn->vid);
-
-		goto err;
-	}
-
-	pthread_mutex_lock(&vsocket->conn_mutex);
-	TAILQ_INSERT_TAIL(&vsocket->conn_list, conn, next);
-	pthread_mutex_unlock(&vsocket->conn_mutex);
-	return;
-
-err:
-	free(conn);
-	close(fd);
-}
-
-/* call back when there is new vhost-user connection from client  */
-static void
-vhost_user_server_new_connection(int fd, void *dat, int *remove __rte_unused)
-{
-	struct vhost_user_socket *vsocket = dat;
-
-	fd = accept(fd, NULL, NULL);
-	if (fd < 0)
-		return;
-
-	RTE_LOG(INFO, VHOST_CONFIG, "new vhost user connection is %d\n", fd);
-	vhost_user_add_connection(fd, vsocket);
-}
-
-static void
-vhost_user_read_cb(int connfd, void *dat, int *remove)
-{
-	struct vhost_user_connection *conn = dat;
-	struct vhost_user_socket *vsocket = conn->vsocket;
-	int ret;
-
-	ret = vhost_user_msg_handler(conn->vid, connfd);
-	if (ret < 0) {
-		close(connfd);
-		*remove = 1;
-		vhost_destroy_device(conn->vid);
-
-		if (vsocket->notify_ops->destroy_connection)
-			vsocket->notify_ops->destroy_connection(conn->vid);
-
-		pthread_mutex_lock(&vsocket->conn_mutex);
-		TAILQ_REMOVE(&vsocket->conn_list, conn, next);
-		pthread_mutex_unlock(&vsocket->conn_mutex);
-
-		free(conn);
-
-		if (vsocket->reconnect) {
-			create_unix_socket(vsocket);
-			vhost_user_start_client(vsocket);
-		}
-	}
-}
-
-static int
-create_unix_socket(struct vhost_user_socket *vsocket)
-{
-	int fd;
-	struct sockaddr_un *un = &vsocket->un;
-
-	fd = socket(AF_UNIX, SOCK_STREAM, 0);
-	if (fd < 0)
-		return -1;
-	RTE_LOG(INFO, VHOST_CONFIG, "vhost-user %s: socket created, fd: %d\n",
-		vsocket->is_server ? "server" : "client", fd);
-
-	if (!vsocket->is_server && fcntl(fd, F_SETFL, O_NONBLOCK)) {
-		RTE_LOG(ERR, VHOST_CONFIG,
-			"vhost-user: can't set nonblocking mode for socket, fd: "
-			"%d (%s)\n", fd, strerror(errno));
-		close(fd);
-		return -1;
-	}
-
-	memset(un, 0, sizeof(*un));
-	un->sun_family = AF_UNIX;
-	strncpy(un->sun_path, vsocket->path, sizeof(un->sun_path));
-	un->sun_path[sizeof(un->sun_path) - 1] = '\0';
-
-	vsocket->socket_fd = fd;
-	return 0;
-}
-
-static int
-vhost_user_start_server(struct vhost_user_socket *vsocket)
-{
-	int ret;
-	int fd = vsocket->socket_fd;
-	const char *path = vsocket->path;
-
-	ret = bind(fd, (struct sockaddr *)&vsocket->un, sizeof(vsocket->un));
-	if (ret < 0) {
-		RTE_LOG(ERR, VHOST_CONFIG,
-			"failed to bind to %s: %s; remove it and try again\n",
-			path, strerror(errno));
-		goto err;
-	}
-	RTE_LOG(INFO, VHOST_CONFIG, "bind to %s\n", path);
-
-	ret = listen(fd, MAX_VIRTIO_BACKLOG);
-	if (ret < 0)
-		goto err;
-
-	ret = fdset_add(&vhost_user.fdset, fd, vhost_user_server_new_connection,
-		  NULL, vsocket);
-	if (ret < 0) {
-		RTE_LOG(ERR, VHOST_CONFIG,
-			"failed to add listen fd %d to vhost server fdset\n",
-			fd);
-		goto err;
-	}
-
-	return 0;
-
-err:
-	close(fd);
-	return -1;
-}
-
-struct vhost_user_reconnect {
-	struct sockaddr_un un;
-	int fd;
-	struct vhost_user_socket *vsocket;
-
-	TAILQ_ENTRY(vhost_user_reconnect) next;
-};
-
-TAILQ_HEAD(vhost_user_reconnect_tailq_list, vhost_user_reconnect);
-struct vhost_user_reconnect_list {
-	struct vhost_user_reconnect_tailq_list head;
-	pthread_mutex_t mutex;
-};
-
-static struct vhost_user_reconnect_list reconn_list;
-static pthread_t reconn_tid;
-
-static int
-vhost_user_connect_nonblock(int fd, struct sockaddr *un, size_t sz)
-{
-	int ret, flags;
-
-	ret = connect(fd, un, sz);
-	if (ret < 0 && errno != EISCONN)
-		return -1;
-
-	flags = fcntl(fd, F_GETFL, 0);
-	if (flags < 0) {
-		RTE_LOG(ERR, VHOST_CONFIG,
-			"can't get flags for connfd %d\n", fd);
-		return -2;
-	}
-	if ((flags & O_NONBLOCK) && fcntl(fd, F_SETFL, flags & ~O_NONBLOCK)) {
-		RTE_LOG(ERR, VHOST_CONFIG,
-				"can't disable nonblocking on fd %d\n", fd);
-		return -2;
-	}
-	return 0;
-}
-
-static void *
-vhost_user_client_reconnect(void *arg __rte_unused)
-{
-	int ret;
-	struct vhost_user_reconnect *reconn, *next;
-
-	while (1) {
-		pthread_mutex_lock(&reconn_list.mutex);
-
-		/*
-		 * An equal implementation of TAILQ_FOREACH_SAFE,
-		 * which does not exist on all platforms.
-		 */
-		for (reconn = TAILQ_FIRST(&reconn_list.head);
-		     reconn != NULL; reconn = next) {
-			next = TAILQ_NEXT(reconn, next);
-
-			ret = vhost_user_connect_nonblock(reconn->fd,
-						(struct sockaddr *)&reconn->un,
-						sizeof(reconn->un));
-			if (ret == -2) {
-				close(reconn->fd);
-				RTE_LOG(ERR, VHOST_CONFIG,
-					"reconnection for fd %d failed\n",
-					reconn->fd);
-				goto remove_fd;
-			}
-			if (ret == -1)
-				continue;
-
-			RTE_LOG(INFO, VHOST_CONFIG,
-				"%s: connected\n", reconn->vsocket->path);
-			vhost_user_add_connection(reconn->fd, reconn->vsocket);
-remove_fd:
-			TAILQ_REMOVE(&reconn_list.head, reconn, next);
-			free(reconn);
-		}
-
-		pthread_mutex_unlock(&reconn_list.mutex);
-		sleep(1);
-	}
-
-	return NULL;
-}
-
-static int
-vhost_user_reconnect_init(void)
-{
-	int ret;
-	char thread_name[RTE_MAX_THREAD_NAME_LEN];
-
-	ret = pthread_mutex_init(&reconn_list.mutex, NULL);
-	if (ret < 0) {
-		RTE_LOG(ERR, VHOST_CONFIG, "failed to initialize mutex");
-		return ret;
-	}
-	TAILQ_INIT(&reconn_list.head);
-
-	ret = pthread_create(&reconn_tid, NULL,
-			     vhost_user_client_reconnect, NULL);
-	if (ret != 0) {
-		RTE_LOG(ERR, VHOST_CONFIG, "failed to create reconnect thread");
-		if (pthread_mutex_destroy(&reconn_list.mutex)) {
-			RTE_LOG(ERR, VHOST_CONFIG,
-				"failed to destroy reconnect mutex");
-		}
-	} else {
-		snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
-			 "vhost-reconn");
-
-		if (rte_thread_setname(reconn_tid, thread_name)) {
-			RTE_LOG(DEBUG, VHOST_CONFIG,
-				"failed to set reconnect thread name");
-		}
-	}
-
-	return ret;
-}
-
-static int
-vhost_user_start_client(struct vhost_user_socket *vsocket)
-{
-	int ret;
-	int fd = vsocket->socket_fd;
-	const char *path = vsocket->path;
-	struct vhost_user_reconnect *reconn;
-
-	ret = vhost_user_connect_nonblock(fd, (struct sockaddr *)&vsocket->un,
-					  sizeof(vsocket->un));
-	if (ret == 0) {
-		vhost_user_add_connection(fd, vsocket);
-		return 0;
-	}
-
-	RTE_LOG(WARNING, VHOST_CONFIG,
-		"failed to connect to %s: %s\n",
-		path, strerror(errno));
-
-	if (ret == -2 || !vsocket->reconnect) {
-		close(fd);
-		return -1;
-	}
-
-	RTE_LOG(INFO, VHOST_CONFIG, "%s: reconnecting...\n", path);
-	reconn = malloc(sizeof(*reconn));
-	if (reconn == NULL) {
-		RTE_LOG(ERR, VHOST_CONFIG,
-			"failed to allocate memory for reconnect\n");
-		close(fd);
-		return -1;
-	}
-	reconn->un = vsocket->un;
-	reconn->fd = fd;
-	reconn->vsocket = vsocket;
-	pthread_mutex_lock(&reconn_list.mutex);
-	TAILQ_INSERT_TAIL(&reconn_list.head, reconn, next);
-	pthread_mutex_unlock(&reconn_list.mutex);
-
-	return 0;
-}
-
 static struct vhost_user_socket *
 find_vhost_user_socket(const char *path)
 {
@@ -688,30 +213,6 @@ rte_vhost_driver_register(const char *path, uint64_t flags)
 	return ret;
 }
 
-static bool
-vhost_user_remove_reconnect(struct vhost_user_socket *vsocket)
-{
-	int found = false;
-	struct vhost_user_reconnect *reconn, *next;
-
-	pthread_mutex_lock(&reconn_list.mutex);
-
-	for (reconn = TAILQ_FIRST(&reconn_list.head);
-	     reconn != NULL; reconn = next) {
-		next = TAILQ_NEXT(reconn, next);
-
-		if (reconn->vsocket == vsocket) {
-			TAILQ_REMOVE(&reconn_list.head, reconn, next);
-			close(reconn->fd);
-			free(reconn);
-			found = true;
-			break;
-		}
-	}
-	pthread_mutex_unlock(&reconn_list.mutex);
-	return found;
-}
-
 /**
  * Unregister the specified vhost socket
  */
diff --git a/lib/librte_vhost/trans_af_unix.c b/lib/librte_vhost/trans_af_unix.c
index 9ed04b7eb..636f69916 100644
--- a/lib/librte_vhost/trans_af_unix.c
+++ b/lib/librte_vhost/trans_af_unix.c
@@ -33,7 +33,458 @@
  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  */
 
+#include <fcntl.h>
+
+#include <rte_log.h>
+
 #include "vhost.h"
+#include "vhost_user.h"
+
+#define MAX_VIRTIO_BACKLOG 128
+
+static void vhost_user_read_cb(int connfd, void *dat, int *remove);
+
+/* return bytes# of read on success or negative val on failure. */
+int
+read_fd_message(int sockfd, char *buf, int buflen, int *fds, int fd_num)
+{
+	struct iovec iov;
+	struct msghdr msgh;
+	size_t fdsize = fd_num * sizeof(int);
+	char control[CMSG_SPACE(fdsize)];
+	struct cmsghdr *cmsg;
+	int ret;
+
+	memset(&msgh, 0, sizeof(msgh));
+	iov.iov_base = buf;
+	iov.iov_len  = buflen;
+
+	msgh.msg_iov = &iov;
+	msgh.msg_iovlen = 1;
+	msgh.msg_control = control;
+	msgh.msg_controllen = sizeof(control);
+
+	ret = recvmsg(sockfd, &msgh, 0);
+	if (ret <= 0) {
+		RTE_LOG(ERR, VHOST_CONFIG, "recvmsg failed\n");
+		return ret;
+	}
+
+	if (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC)) {
+		RTE_LOG(ERR, VHOST_CONFIG, "truncted msg\n");
+		return -1;
+	}
+
+	for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
+		cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
+		if ((cmsg->cmsg_level == SOL_SOCKET) &&
+			(cmsg->cmsg_type == SCM_RIGHTS)) {
+			memcpy(fds, CMSG_DATA(cmsg), fdsize);
+			break;
+		}
+	}
+
+	return ret;
+}
+
+int
+send_fd_message(int sockfd, char *buf, int buflen, int *fds, int fd_num)
+{
+
+	struct iovec iov;
+	struct msghdr msgh;
+	size_t fdsize = fd_num * sizeof(int);
+	char control[CMSG_SPACE(fdsize)];
+	struct cmsghdr *cmsg;
+	int ret;
+
+	memset(&msgh, 0, sizeof(msgh));
+	iov.iov_base = buf;
+	iov.iov_len = buflen;
+
+	msgh.msg_iov = &iov;
+	msgh.msg_iovlen = 1;
+
+	if (fds && fd_num > 0) {
+		msgh.msg_control = control;
+		msgh.msg_controllen = sizeof(control);
+		cmsg = CMSG_FIRSTHDR(&msgh);
+		cmsg->cmsg_len = CMSG_LEN(fdsize);
+		cmsg->cmsg_level = SOL_SOCKET;
+		cmsg->cmsg_type = SCM_RIGHTS;
+		memcpy(CMSG_DATA(cmsg), fds, fdsize);
+	} else {
+		msgh.msg_control = NULL;
+		msgh.msg_controllen = 0;
+	}
+
+	do {
+		ret = sendmsg(sockfd, &msgh, 0);
+	} while (ret < 0 && errno == EINTR);
+
+	if (ret < 0) {
+		RTE_LOG(ERR, VHOST_CONFIG,  "sendmsg error\n");
+		return ret;
+	}
+
+	return ret;
+}
+
+static void
+vhost_user_add_connection(int fd, struct vhost_user_socket *vsocket)
+{
+	int vid;
+	size_t size;
+	struct vhost_user_connection *conn;
+	int ret;
+
+	conn = malloc(sizeof(*conn));
+	if (conn == NULL) {
+		close(fd);
+		return;
+	}
+
+	vid = vhost_new_device();
+	if (vid == -1) {
+		goto err;
+	}
+
+	size = strnlen(vsocket->path, PATH_MAX);
+	vhost_set_ifname(vid, vsocket->path, size);
+
+	if (vsocket->dequeue_zero_copy)
+		vhost_enable_dequeue_zero_copy(vid);
+
+	RTE_LOG(INFO, VHOST_CONFIG, "new device, handle is %d\n", vid);
+
+	if (vsocket->notify_ops->new_connection) {
+		ret = vsocket->notify_ops->new_connection(vid);
+		if (ret < 0) {
+			RTE_LOG(ERR, VHOST_CONFIG,
+				"failed to add vhost user connection with fd %d\n",
+				fd);
+			goto err;
+		}
+	}
+
+	conn->connfd = fd;
+	conn->vsocket = vsocket;
+	conn->vid = vid;
+	ret = fdset_add(&vhost_user.fdset, fd, vhost_user_read_cb,
+			NULL, conn);
+	if (ret < 0) {
+		RTE_LOG(ERR, VHOST_CONFIG,
+			"failed to add fd %d into vhost server fdset\n",
+			fd);
+
+		if (vsocket->notify_ops->destroy_connection)
+			vsocket->notify_ops->destroy_connection(conn->vid);
+
+		goto err;
+	}
+
+	pthread_mutex_lock(&vsocket->conn_mutex);
+	TAILQ_INSERT_TAIL(&vsocket->conn_list, conn, next);
+	pthread_mutex_unlock(&vsocket->conn_mutex);
+	return;
+
+err:
+	free(conn);
+	close(fd);
+}
+
+/* call back when there is new vhost-user connection from client  */
+static void
+vhost_user_server_new_connection(int fd, void *dat, int *remove __rte_unused)
+{
+	struct vhost_user_socket *vsocket = dat;
+
+	fd = accept(fd, NULL, NULL);
+	if (fd < 0)
+		return;
+
+	RTE_LOG(INFO, VHOST_CONFIG, "new vhost user connection is %d\n", fd);
+	vhost_user_add_connection(fd, vsocket);
+}
+
+static void
+vhost_user_read_cb(int connfd, void *dat, int *remove)
+{
+	struct vhost_user_connection *conn = dat;
+	struct vhost_user_socket *vsocket = conn->vsocket;
+	int ret;
+
+	ret = vhost_user_msg_handler(conn->vid, connfd);
+	if (ret < 0) {
+		close(connfd);
+		*remove = 1;
+		vhost_destroy_device(conn->vid);
+
+		if (vsocket->notify_ops->destroy_connection)
+			vsocket->notify_ops->destroy_connection(conn->vid);
+
+		pthread_mutex_lock(&vsocket->conn_mutex);
+		TAILQ_REMOVE(&vsocket->conn_list, conn, next);
+		pthread_mutex_unlock(&vsocket->conn_mutex);
+
+		free(conn);
+
+		if (vsocket->reconnect) {
+			create_unix_socket(vsocket);
+			vhost_user_start_client(vsocket);
+		}
+	}
+}
+
+int
+create_unix_socket(struct vhost_user_socket *vsocket)
+{
+	int fd;
+	struct sockaddr_un *un = &vsocket->un;
+
+	fd = socket(AF_UNIX, SOCK_STREAM, 0);
+	if (fd < 0)
+		return -1;
+	RTE_LOG(INFO, VHOST_CONFIG, "vhost-user %s: socket created, fd: %d\n",
+		vsocket->is_server ? "server" : "client", fd);
+
+	if (!vsocket->is_server && fcntl(fd, F_SETFL, O_NONBLOCK)) {
+		RTE_LOG(ERR, VHOST_CONFIG,
+			"vhost-user: can't set nonblocking mode for socket, fd: "
+			"%d (%s)\n", fd, strerror(errno));
+		close(fd);
+		return -1;
+	}
+
+	memset(un, 0, sizeof(*un));
+	un->sun_family = AF_UNIX;
+	strncpy(un->sun_path, vsocket->path, sizeof(un->sun_path));
+	un->sun_path[sizeof(un->sun_path) - 1] = '\0';
+
+	vsocket->socket_fd = fd;
+	return 0;
+}
+
+int
+vhost_user_start_server(struct vhost_user_socket *vsocket)
+{
+	int ret;
+	int fd = vsocket->socket_fd;
+	const char *path = vsocket->path;
+
+	ret = bind(fd, (struct sockaddr *)&vsocket->un, sizeof(vsocket->un));
+	if (ret < 0) {
+		RTE_LOG(ERR, VHOST_CONFIG,
+			"failed to bind to %s: %s; remove it and try again\n",
+			path, strerror(errno));
+		goto err;
+	}
+	RTE_LOG(INFO, VHOST_CONFIG, "bind to %s\n", path);
+
+	ret = listen(fd, MAX_VIRTIO_BACKLOG);
+	if (ret < 0)
+		goto err;
+
+	ret = fdset_add(&vhost_user.fdset, fd, vhost_user_server_new_connection,
+		  NULL, vsocket);
+	if (ret < 0) {
+		RTE_LOG(ERR, VHOST_CONFIG,
+			"failed to add listen fd %d to vhost server fdset\n",
+			fd);
+		goto err;
+	}
+
+	return 0;
+
+err:
+	close(fd);
+	return -1;
+}
+
+struct vhost_user_reconnect {
+	struct sockaddr_un un;
+	int fd;
+	struct vhost_user_socket *vsocket;
+
+	TAILQ_ENTRY(vhost_user_reconnect) next;
+};
+
+TAILQ_HEAD(vhost_user_reconnect_tailq_list, vhost_user_reconnect);
+struct vhost_user_reconnect_list {
+	struct vhost_user_reconnect_tailq_list head;
+	pthread_mutex_t mutex;
+};
+
+static struct vhost_user_reconnect_list reconn_list;
+pthread_t reconn_tid;
+
+static int
+vhost_user_connect_nonblock(int fd, struct sockaddr *un, size_t sz)
+{
+	int ret, flags;
+
+	ret = connect(fd, un, sz);
+	if (ret < 0 && errno != EISCONN)
+		return -1;
+
+	flags = fcntl(fd, F_GETFL, 0);
+	if (flags < 0) {
+		RTE_LOG(ERR, VHOST_CONFIG,
+			"can't get flags for connfd %d\n", fd);
+		return -2;
+	}
+	if ((flags & O_NONBLOCK) && fcntl(fd, F_SETFL, flags & ~O_NONBLOCK)) {
+		RTE_LOG(ERR, VHOST_CONFIG,
+				"can't disable nonblocking on fd %d\n", fd);
+		return -2;
+	}
+	return 0;
+}
+
+static void *
+vhost_user_client_reconnect(void *arg __rte_unused)
+{
+	int ret;
+	struct vhost_user_reconnect *reconn, *next;
+
+	while (1) {
+		pthread_mutex_lock(&reconn_list.mutex);
+
+		/*
+		 * An equal implementation of TAILQ_FOREACH_SAFE,
+		 * which does not exist on all platforms.
+		 */
+		for (reconn = TAILQ_FIRST(&reconn_list.head);
+		     reconn != NULL; reconn = next) {
+			next = TAILQ_NEXT(reconn, next);
+
+			ret = vhost_user_connect_nonblock(reconn->fd,
+						(struct sockaddr *)&reconn->un,
+						sizeof(reconn->un));
+			if (ret == -2) {
+				close(reconn->fd);
+				RTE_LOG(ERR, VHOST_CONFIG,
+					"reconnection for fd %d failed\n",
+					reconn->fd);
+				goto remove_fd;
+			}
+			if (ret == -1)
+				continue;
+
+			RTE_LOG(INFO, VHOST_CONFIG,
+				"%s: connected\n", reconn->vsocket->path);
+			vhost_user_add_connection(reconn->fd, reconn->vsocket);
+remove_fd:
+			TAILQ_REMOVE(&reconn_list.head, reconn, next);
+			free(reconn);
+		}
+
+		pthread_mutex_unlock(&reconn_list.mutex);
+		sleep(1);
+	}
+
+	return NULL;
+}
+
+int
+vhost_user_reconnect_init(void)
+{
+	int ret;
+	char thread_name[RTE_MAX_THREAD_NAME_LEN];
+
+	ret = pthread_mutex_init(&reconn_list.mutex, NULL);
+	if (ret < 0) {
+		RTE_LOG(ERR, VHOST_CONFIG, "failed to initialize mutex");
+		return ret;
+	}
+	TAILQ_INIT(&reconn_list.head);
+
+	ret = pthread_create(&reconn_tid, NULL,
+			     vhost_user_client_reconnect, NULL);
+	if (ret != 0) {
+		RTE_LOG(ERR, VHOST_CONFIG, "failed to create reconnect thread");
+		if (pthread_mutex_destroy(&reconn_list.mutex)) {
+			RTE_LOG(ERR, VHOST_CONFIG,
+				"failed to destroy reconnect mutex");
+		}
+	} else {
+		snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
+			 "vhost-reconn");
+
+		if (rte_thread_setname(reconn_tid, thread_name)) {
+			RTE_LOG(DEBUG, VHOST_CONFIG,
+				"failed to set reconnect thread name");
+		}
+	}
+
+	return ret;
+}
+
+int
+vhost_user_start_client(struct vhost_user_socket *vsocket)
+{
+	int ret;
+	int fd = vsocket->socket_fd;
+	const char *path = vsocket->path;
+	struct vhost_user_reconnect *reconn;
+
+	ret = vhost_user_connect_nonblock(fd, (struct sockaddr *)&vsocket->un,
+					  sizeof(vsocket->un));
+	if (ret == 0) {
+		vhost_user_add_connection(fd, vsocket);
+		return 0;
+	}
+
+	RTE_LOG(WARNING, VHOST_CONFIG,
+		"failed to connect to %s: %s\n",
+		path, strerror(errno));
+
+	if (ret == -2 || !vsocket->reconnect) {
+		close(fd);
+		return -1;
+	}
+
+	RTE_LOG(INFO, VHOST_CONFIG, "%s: reconnecting...\n", path);
+	reconn = malloc(sizeof(*reconn));
+	if (reconn == NULL) {
+		RTE_LOG(ERR, VHOST_CONFIG,
+			"failed to allocate memory for reconnect\n");
+		close(fd);
+		return -1;
+	}
+	reconn->un = vsocket->un;
+	reconn->fd = fd;
+	reconn->vsocket = vsocket;
+	pthread_mutex_lock(&reconn_list.mutex);
+	TAILQ_INSERT_TAIL(&reconn_list.head, reconn, next);
+	pthread_mutex_unlock(&reconn_list.mutex);
+
+	return 0;
+}
+
+bool
+vhost_user_remove_reconnect(struct vhost_user_socket *vsocket)
+{
+	int found = false;
+	struct vhost_user_reconnect *reconn, *next;
+
+	pthread_mutex_lock(&reconn_list.mutex);
+
+	for (reconn = TAILQ_FIRST(&reconn_list.head);
+	     reconn != NULL; reconn = next) {
+		next = TAILQ_NEXT(reconn, next);
+
+		if (reconn->vsocket == vsocket) {
+			TAILQ_REMOVE(&reconn_list.head, reconn, next);
+			close(reconn->fd);
+			free(reconn);
+			found = true;
+			break;
+		}
+	}
+	pthread_mutex_unlock(&reconn_list.mutex);
+	return found;
+}
 
 static int
 af_unix_vring_call(struct virtio_net *dev __rte_unused,
-- 
2.14.3



More information about the dev mailing list