[dpdk-dev] [RFC 2/4] vhost: add support for SocketPair Broker

Ilya Maximets i.maximets at ovn.org
Wed Mar 17 21:25:28 CET 2021


New flag RTE_VHOST_USER_SOCKETPAIR_BROKER to say that provided socket
is a path to SocketPair Broker socket in a following format:

  '/path/to/socketpair/broker.sock,broker-key=<key>'

This format is chosen to avoid lots of code changes and refactoring
inside the vhost library, mainly because vhost library treats
socket path as a unique device identifier.

'<key>' is a broker key that will be used by a broker to identify
two clients that needs to be paired together, i.e. vhost device
will be connected with a client that provided the same key.

libspbroker needed for a build.

Signed-off-by: Ilya Maximets <i.maximets at ovn.org>
---
 doc/guides/prog_guide/vhost_lib.rst |  10 ++
 lib/librte_vhost/meson.build        |   7 +
 lib/librte_vhost/rte_vhost.h        |   1 +
 lib/librte_vhost/socket.c           | 245 +++++++++++++++++++++++++---
 4 files changed, 237 insertions(+), 26 deletions(-)

diff --git a/doc/guides/prog_guide/vhost_lib.rst b/doc/guides/prog_guide/vhost_lib.rst
index dc29229167..f0f0d3fde7 100644
--- a/doc/guides/prog_guide/vhost_lib.rst
+++ b/doc/guides/prog_guide/vhost_lib.rst
@@ -118,6 +118,16 @@ The following is an overview of some key Vhost API functions:
 
     It is disabled by default.
 
+  - ``RTE_VHOST_USER_SOCKETPAIR_BROKER``
+
+    Enabling of this flag makes vhost library to treat socket ``path`` as a
+    path to SocketPair Broker.  In this case ``path`` should include
+    ``,broker-key=<key>`` after the actual broker's socket path.  ``<key>``
+    will be used as a broker key, so it will be able to connect 2 processes
+    that provided the same key.
+
+    Incompatible with ``RTE_VHOST_USER_NO_RECONNECT``.
+
 * ``rte_vhost_driver_set_features(path, features)``
 
   This function sets the feature bits the vhost-user driver supports. The
diff --git a/lib/librte_vhost/meson.build b/lib/librte_vhost/meson.build
index 6185deab33..3292edcb52 100644
--- a/lib/librte_vhost/meson.build
+++ b/lib/librte_vhost/meson.build
@@ -15,6 +15,13 @@ elif (toolchain == 'clang' and cc.version().version_compare('>=3.7.0'))
 elif (toolchain == 'icc' and cc.version().version_compare('>=16.0.0'))
 	cflags += '-DVHOST_ICC_UNROLL_PRAGMA'
 endif
+
+spbroker_dep = dependency('spbroker', required: false)
+if spbroker_dep.found()
+	dpdk_conf.set('RTE_LIBRTE_VHOST_SOCKETPAIR_BROKER', 1)
+	ext_deps += spbroker_dep
+endif
+
 dpdk_conf.set('RTE_LIBRTE_VHOST_POSTCOPY',
 	      cc.has_header('linux/userfaultfd.h'))
 cflags += '-fno-strict-aliasing'
diff --git a/lib/librte_vhost/rte_vhost.h b/lib/librte_vhost/rte_vhost.h
index 010f160869..87662c9f7f 100644
--- a/lib/librte_vhost/rte_vhost.h
+++ b/lib/librte_vhost/rte_vhost.h
@@ -36,6 +36,7 @@ extern "C" {
 /* support only linear buffers (no chained mbufs) */
 #define RTE_VHOST_USER_LINEARBUF_SUPPORT	(1ULL << 6)
 #define RTE_VHOST_USER_ASYNC_COPY	(1ULL << 7)
+#define RTE_VHOST_USER_SOCKETPAIR_BROKER	(1ULL << 8)
 
 /* Features. */
 #ifndef VIRTIO_NET_F_GUEST_ANNOUNCE
diff --git a/lib/librte_vhost/socket.c b/lib/librte_vhost/socket.c
index 0169d36481..f0a1c9044c 100644
--- a/lib/librte_vhost/socket.c
+++ b/lib/librte_vhost/socket.c
@@ -16,6 +16,10 @@
 #include <fcntl.h>
 #include <pthread.h>
 
+#ifdef RTE_LIBRTE_VHOST_SOCKETPAIR_BROKER
+#include <socketpair-broker/helper.h>
+#endif
+
 #include <rte_log.h>
 
 #include "fd_man.h"
@@ -33,9 +37,11 @@ struct vhost_user_socket {
 	struct vhost_user_connection_list conn_list;
 	pthread_mutex_t conn_mutex;
 	char *path;
+	char *broker_key;
 	int socket_fd;
 	struct sockaddr_un un;
 	bool is_server;
+	bool is_broker;
 	bool reconnect;
 	bool iommu_support;
 	bool use_builtin_virtio_net;
@@ -81,7 +87,8 @@ struct vhost_user {
 static void vhost_user_server_new_connection(int fd, void *data, int *remove);
 static void vhost_user_read_cb(int fd, void *dat, int *remove);
 static int create_unix_socket(struct vhost_user_socket *vsocket);
-static int vhost_user_start_client(struct vhost_user_socket *vsocket);
+static int recreate_unix_socket(struct vhost_user_socket *vsocket);
+static int vhost_user_start(struct vhost_user_socket *vsocket);
 
 static struct vhost_user vhost_user = {
 	.fdset = {
@@ -283,6 +290,81 @@ vhost_user_add_connection(int fd, struct vhost_user_socket *vsocket)
 	close(fd);
 }
 
+#ifdef RTE_LIBRTE_VHOST_SOCKETPAIR_BROKER
+static int
+vhost_user_broker_msg_handler(int fd, struct vhost_user_socket *vsocket)
+{
+	int peer_fd;
+	char *err;
+
+	peer_fd = sp_broker_receive_set_pair(fd, &err);
+	if (peer_fd < 0) {
+		VHOST_LOG_CONFIG(ERR,
+			"failed to receive SP_BROKER_SET_PAIR on fd %d: %s\n",
+			fd, err);
+		free(err);
+		return -1;
+	}
+
+	VHOST_LOG_CONFIG(INFO, "new vhost user connection is %d\n", peer_fd);
+	vhost_user_add_connection(peer_fd, vsocket);
+	return 0;
+}
+
+static void
+vhost_user_broker_msg_cb(int connfd, void *dat, int *remove)
+{
+	struct vhost_user_socket *vsocket = dat;
+	int ret;
+
+	ret = vhost_user_broker_msg_handler(connfd, vsocket);
+
+	/* Don't need a broker connection anymore. */
+	*remove = 1;
+
+	if (ret < 0) {
+		recreate_unix_socket(vsocket);
+		vhost_user_start(vsocket);
+	}
+}
+
+static int
+vhost_user_start_broker_connection(
+	int fd __rte_unused,
+	struct vhost_user_socket *vsocket __rte_unused)
+{
+	char *err;
+	int ret;
+
+	ret = sp_broker_send_get_pair(fd, vsocket->broker_key,
+				      vsocket->is_server, &err);
+	if (ret) {
+		VHOST_LOG_CONFIG(ERR,
+			"failed to send SP_BROKER_GET_PAIR request on fd %d: %s\n",
+			fd, err);
+		free(err);
+		return -1;
+	}
+
+	ret = fdset_add(&vhost_user.fdset, fd, vhost_user_broker_msg_cb,
+			NULL, vsocket);
+	if (ret < 0) {
+		VHOST_LOG_CONFIG(ERR,
+			"failed to add broker fd %d to vhost fdset\n", fd);
+		return -1;
+	}
+	return 0;
+}
+#else
+static int
+vhost_user_start_broker_connection(
+	int fd __rte_unused,
+	struct vhost_user_socket *vsocket __rte_unused)
+{
+	return -1;
+}
+#endif
+
 /* call back when there is new vhost-user connection from client  */
 static void
 vhost_user_server_new_connection(int fd, void *dat, int *remove __rte_unused)
@@ -321,7 +403,7 @@ vhost_user_read_cb(int connfd, void *dat, int *remove)
 
 		if (vsocket->reconnect) {
 			create_unix_socket(vsocket);
-			vhost_user_start_client(vsocket);
+			vhost_user_start(vsocket);
 		}
 
 		pthread_mutex_lock(&vsocket->conn_mutex);
@@ -337,14 +419,17 @@ create_unix_socket(struct vhost_user_socket *vsocket)
 {
 	int fd;
 	struct sockaddr_un *un = &vsocket->un;
+	char *broker_key = strstr(vsocket->path, ",broker-key=");
 
 	fd = socket(AF_UNIX, SOCK_STREAM, 0);
 	if (fd < 0)
 		return -1;
-	VHOST_LOG_CONFIG(INFO, "vhost-user %s: socket created, fd: %d\n",
-		vsocket->is_server ? "server" : "client", fd);
+	VHOST_LOG_CONFIG(INFO, "vhost-user %s: %ssocket created, fd: %d\n",
+		vsocket->is_server ? "server" : "client",
+		vsocket->is_broker ? "broker " : "", fd);
 
-	if (!vsocket->is_server && fcntl(fd, F_SETFL, O_NONBLOCK)) {
+	if ((!vsocket->is_server || vsocket->is_broker)
+	    && fcntl(fd, F_SETFL, O_NONBLOCK)) {
 		VHOST_LOG_CONFIG(ERR,
 			"vhost-user: can't set nonblocking mode for socket, fd: "
 			"%d (%s)\n", fd, strerror(errno));
@@ -352,12 +437,21 @@ create_unix_socket(struct vhost_user_socket *vsocket)
 		return -1;
 	}
 
+	/* Temporarily limiting the path by the actual path. */
+	if (vsocket->is_broker && broker_key)
+		broker_key[0] = '\0';
+
 	memset(un, 0, sizeof(*un));
 	un->sun_family = AF_UNIX;
 	strncpy(un->sun_path, vsocket->path, sizeof(un->sun_path));
 	un->sun_path[sizeof(un->sun_path) - 1] = '\0';
 
 	vsocket->socket_fd = fd;
+
+	/* Restoring original path. */
+	if (vsocket->is_broker && broker_key)
+		broker_key[0] = ',';
+
 	return 0;
 }
 
@@ -425,7 +519,8 @@ static struct vhost_user_reconnect_list reconn_list;
 static pthread_t reconn_tid;
 
 static int
-vhost_user_connect_nonblock(int fd, struct sockaddr *un, size_t sz)
+vhost_user_connect_nonblock(int fd, struct sockaddr *un, size_t sz,
+			    bool disable_nonblock)
 {
 	int ret, flags;
 
@@ -433,6 +528,9 @@ vhost_user_connect_nonblock(int fd, struct sockaddr *un, size_t sz)
 	if (ret < 0 && errno != EISCONN)
 		return -1;
 
+	if (!disable_nonblock)
+		return 0;
+
 	flags = fcntl(fd, F_GETFL, 0);
 	if (flags < 0) {
 		VHOST_LOG_CONFIG(ERR,
@@ -447,8 +545,22 @@ vhost_user_connect_nonblock(int fd, struct sockaddr *un, size_t sz)
 	return 0;
 }
 
+static int
+recreate_unix_socket(struct vhost_user_socket *vsocket)
+{
+	close(vsocket->socket_fd);
+	if (create_unix_socket(vsocket) < 0) {
+		VHOST_LOG_CONFIG(ERR,
+			"Failed to re-create socket for %s\n",
+			vsocket->un.sun_path);
+		vsocket->socket_fd = -1;
+		return -2;
+	}
+	return 0;
+}
+
 static void *
-vhost_user_client_reconnect(void *arg __rte_unused)
+vhost_user_reconnect(void *arg __rte_unused)
 {
 	int ret;
 	struct vhost_user_reconnect *reconn, *next;
@@ -466,7 +578,8 @@ vhost_user_client_reconnect(void *arg __rte_unused)
 
 			ret = vhost_user_connect_nonblock(reconn->fd,
 						(struct sockaddr *)&reconn->un,
-						sizeof(reconn->un));
+						sizeof(reconn->un),
+						!reconn->vsocket->is_broker);
 			if (ret == -2) {
 				close(reconn->fd);
 				VHOST_LOG_CONFIG(ERR,
@@ -478,8 +591,26 @@ vhost_user_client_reconnect(void *arg __rte_unused)
 				continue;
 
 			VHOST_LOG_CONFIG(INFO,
-				"%s: connected\n", reconn->vsocket->path);
-			vhost_user_add_connection(reconn->fd, reconn->vsocket);
+				"%s: connected\n",
+				reconn->vsocket->un.sun_path);
+
+			if (reconn->vsocket->is_broker) {
+				struct vhost_user_socket *vsocket;
+
+				vsocket = reconn->vsocket;
+				if (vhost_user_start_broker_connection(
+					reconn->fd, vsocket)) {
+					if (recreate_unix_socket(vsocket)) {
+						goto remove_fd;
+					} else {
+						reconn->fd = vsocket->socket_fd;
+						continue;
+					}
+				}
+			} else {
+				vhost_user_add_connection(reconn->fd,
+							  reconn->vsocket);
+			}
 remove_fd:
 			TAILQ_REMOVE(&reconn_list.head, reconn, next);
 			free(reconn);
@@ -505,7 +636,7 @@ vhost_user_reconnect_init(void)
 	TAILQ_INIT(&reconn_list.head);
 
 	ret = rte_ctrl_thread_create(&reconn_tid, "vhost_reconn", NULL,
-			     vhost_user_client_reconnect, NULL);
+			     vhost_user_reconnect, NULL);
 	if (ret != 0) {
 		VHOST_LOG_CONFIG(ERR, "failed to create reconnect thread");
 		if (pthread_mutex_destroy(&reconn_list.mutex)) {
@@ -518,18 +649,25 @@ vhost_user_reconnect_init(void)
 }
 
 static int
-vhost_user_start_client(struct vhost_user_socket *vsocket)
+vhost_user_start(struct vhost_user_socket *vsocket)
 {
 	int ret;
 	int fd = vsocket->socket_fd;
-	const char *path = vsocket->path;
+	const char *path = vsocket->un.sun_path;
 	struct vhost_user_reconnect *reconn;
 
 	ret = vhost_user_connect_nonblock(fd, (struct sockaddr *)&vsocket->un,
-					  sizeof(vsocket->un));
+					  sizeof(vsocket->un),
+					  !vsocket->is_broker);
 	if (ret == 0) {
-		vhost_user_add_connection(fd, vsocket);
-		return 0;
+		if (!vsocket->is_broker) {
+			vhost_user_add_connection(fd, vsocket);
+			return 0;
+		} else if (vhost_user_start_broker_connection(fd, vsocket)) {
+			ret = recreate_unix_socket(vsocket);
+		} else {
+			return 0;
+		}
 	}
 
 	VHOST_LOG_CONFIG(WARNING,
@@ -822,6 +960,11 @@ rte_vhost_driver_get_queue_num(const char *path, uint32_t *queue_num)
 static void
 vhost_user_socket_mem_free(struct vhost_user_socket *vsocket)
 {
+	if (vsocket && vsocket->broker_key) {
+		free(vsocket->broker_key);
+		vsocket->broker_key = NULL;
+	}
+
 	if (vsocket && vsocket->path) {
 		free(vsocket->path);
 		vsocket->path = NULL;
@@ -946,15 +1089,50 @@ rte_vhost_driver_register(const char *path, uint64_t flags)
 #endif
 	}
 
-	if ((flags & RTE_VHOST_USER_CLIENT) != 0) {
+	if ((flags & RTE_VHOST_USER_SOCKETPAIR_BROKER) != 0) {
+#ifndef RTE_LIBRTE_VHOST_SOCKETPAIR_BROKER
+		VHOST_LOG_CONFIG(ERR,
+			"SocketPair Broker requested but not compiled\n");
+		ret = -1;
+		goto out_mutex;
+#endif
+		char *broker_key = strstr(vsocket->path, ",broker-key=");
+
+		if (!broker_key || !broker_key[12]) {
+			VHOST_LOG_CONFIG(ERR,
+				"Connection to SocketPair Broker requested but"
+				" key is not provided: %s\n", vsocket->path);
+			ret = -1;
+			goto out_mutex;
+		}
+		vsocket->is_broker = true;
+		vsocket->broker_key = strdup(broker_key + 12);
+		if (vsocket->broker_key == NULL) {
+			VHOST_LOG_CONFIG(ERR,
+				"error: failed to copy broker key\n");
+			ret = -1;
+			goto out_mutex;
+		}
+	}
+
+	if ((flags & RTE_VHOST_USER_CLIENT) == 0)
+		vsocket->is_server = true;
+
+	if (!vsocket->is_server || vsocket->is_broker) {
 		vsocket->reconnect = !(flags & RTE_VHOST_USER_NO_RECONNECT);
+		if (vsocket->is_broker && !vsocket->reconnect) {
+			VHOST_LOG_CONFIG(ERR,
+				"SocketPair Broker with NO_RECONNECT "
+				"is not supported\n");
+			ret = -1;
+			goto out_mutex;
+		}
 		if (vsocket->reconnect && reconn_tid == 0) {
 			if (vhost_user_reconnect_init() != 0)
 				goto out_mutex;
 		}
-	} else {
-		vsocket->is_server = true;
 	}
+
 	ret = create_unix_socket(vsocket);
 	if (ret < 0) {
 		goto out_mutex;
@@ -1052,7 +1230,23 @@ rte_vhost_driver_unregister(const char *path)
 			}
 			pthread_mutex_unlock(&vsocket->conn_mutex);
 
-			if (vsocket->is_server) {
+			if (vsocket->reconnect) {
+				if (vhost_user_remove_reconnect(vsocket)) {
+					/*
+					 * reconn->fd is a socket_fd for
+					 * client and broker connections and
+					 * it's closed now.
+					 */
+					vsocket->socket_fd = -1;
+				}
+			}
+
+			/*
+			 * socket_fd is still valid for server connection
+			 * or broker connection that is currently connected
+			 * to the broker.
+			 */
+			if (vsocket->socket_fd != -1) {
 				/*
 				 * If r/wcb is executing, release vhost_user's
 				 * mutex lock, and try again since the r/wcb
@@ -1063,13 +1257,12 @@ rte_vhost_driver_unregister(const char *path)
 					pthread_mutex_unlock(&vhost_user.mutex);
 					goto again;
 				}
-
 				close(vsocket->socket_fd);
-				unlink(path);
-			} else if (vsocket->reconnect) {
-				vhost_user_remove_reconnect(vsocket);
 			}
 
+			if (vsocket->is_server && !vsocket->is_broker)
+				unlink(path);
+
 			pthread_mutex_destroy(&vsocket->conn_mutex);
 			vhost_user_socket_mem_free(vsocket);
 
@@ -1152,8 +1345,8 @@ rte_vhost_driver_start(const char *path)
 		}
 	}
 
-	if (vsocket->is_server)
+	if (vsocket->is_server && !vsocket->is_broker)
 		return vhost_user_start_server(vsocket);
 	else
-		return vhost_user_start_client(vsocket);
+		return vhost_user_start(vsocket);
 }
-- 
2.26.2



More information about the dev mailing list