[dpdk-dev] [RFC 2/4] vhost: add support for SocketPair Broker
Ilya Maximets
i.maximets at ovn.org
Wed Mar 17 21:25:28 CET 2021
New flag RTE_VHOST_USER_SOCKETPAIR_BROKER to indicate that the provided socket
path is a path to a SocketPair Broker socket in the following format:
'/path/to/socketpair/broker.sock,broker-key=<key>'
This format was chosen to avoid extensive code changes and refactoring
inside the vhost library, mainly because the vhost library treats the
socket path as a unique device identifier.
'<key>' is the broker key that the broker uses to identify the
two clients that need to be paired together, i.e. the vhost device
will be connected with the client that provided the same key.
libspbroker is needed to build with this feature.
Signed-off-by: Ilya Maximets <i.maximets at ovn.org>
---
doc/guides/prog_guide/vhost_lib.rst | 10 ++
lib/librte_vhost/meson.build | 7 +
lib/librte_vhost/rte_vhost.h | 1 +
lib/librte_vhost/socket.c | 245 +++++++++++++++++++++++++---
4 files changed, 237 insertions(+), 26 deletions(-)
diff --git a/doc/guides/prog_guide/vhost_lib.rst b/doc/guides/prog_guide/vhost_lib.rst
index dc29229167..f0f0d3fde7 100644
--- a/doc/guides/prog_guide/vhost_lib.rst
+++ b/doc/guides/prog_guide/vhost_lib.rst
@@ -118,6 +118,16 @@ The following is an overview of some key Vhost API functions:
It is disabled by default.
+ - ``RTE_VHOST_USER_SOCKETPAIR_BROKER``
+
+ Enabling this flag makes the vhost library treat the socket ``path`` as
+ a path to a SocketPair Broker socket. In this case, ``path`` should
+ include ``,broker-key=<key>`` after the actual broker's socket path.
+ ``<key>`` is used as the broker key, so the broker will connect two
+ processes that provided the same key.
+
+ Incompatible with ``RTE_VHOST_USER_NO_RECONNECT``.
+
* ``rte_vhost_driver_set_features(path, features)``
This function sets the feature bits the vhost-user driver supports. The
diff --git a/lib/librte_vhost/meson.build b/lib/librte_vhost/meson.build
index 6185deab33..3292edcb52 100644
--- a/lib/librte_vhost/meson.build
+++ b/lib/librte_vhost/meson.build
@@ -15,6 +15,13 @@ elif (toolchain == 'clang' and cc.version().version_compare('>=3.7.0'))
elif (toolchain == 'icc' and cc.version().version_compare('>=16.0.0'))
cflags += '-DVHOST_ICC_UNROLL_PRAGMA'
endif
+
+spbroker_dep = dependency('spbroker', required: false)
+if spbroker_dep.found()
+ dpdk_conf.set('RTE_LIBRTE_VHOST_SOCKETPAIR_BROKER', 1)
+ ext_deps += spbroker_dep
+endif
+
dpdk_conf.set('RTE_LIBRTE_VHOST_POSTCOPY',
cc.has_header('linux/userfaultfd.h'))
cflags += '-fno-strict-aliasing'
diff --git a/lib/librte_vhost/rte_vhost.h b/lib/librte_vhost/rte_vhost.h
index 010f160869..87662c9f7f 100644
--- a/lib/librte_vhost/rte_vhost.h
+++ b/lib/librte_vhost/rte_vhost.h
@@ -36,6 +36,7 @@ extern "C" {
/* support only linear buffers (no chained mbufs) */
#define RTE_VHOST_USER_LINEARBUF_SUPPORT (1ULL << 6)
#define RTE_VHOST_USER_ASYNC_COPY (1ULL << 7)
+#define RTE_VHOST_USER_SOCKETPAIR_BROKER (1ULL << 8)
/* Features. */
#ifndef VIRTIO_NET_F_GUEST_ANNOUNCE
diff --git a/lib/librte_vhost/socket.c b/lib/librte_vhost/socket.c
index 0169d36481..f0a1c9044c 100644
--- a/lib/librte_vhost/socket.c
+++ b/lib/librte_vhost/socket.c
@@ -16,6 +16,10 @@
#include <fcntl.h>
#include <pthread.h>
+#ifdef RTE_LIBRTE_VHOST_SOCKETPAIR_BROKER
+#include <socketpair-broker/helper.h>
+#endif
+
#include <rte_log.h>
#include "fd_man.h"
@@ -33,9 +37,11 @@ struct vhost_user_socket {
struct vhost_user_connection_list conn_list;
pthread_mutex_t conn_mutex;
char *path;
+ char *broker_key;
int socket_fd;
struct sockaddr_un un;
bool is_server;
+ bool is_broker;
bool reconnect;
bool iommu_support;
bool use_builtin_virtio_net;
@@ -81,7 +87,8 @@ struct vhost_user {
static void vhost_user_server_new_connection(int fd, void *data, int *remove);
static void vhost_user_read_cb(int fd, void *dat, int *remove);
static int create_unix_socket(struct vhost_user_socket *vsocket);
-static int vhost_user_start_client(struct vhost_user_socket *vsocket);
+static int recreate_unix_socket(struct vhost_user_socket *vsocket);
+static int vhost_user_start(struct vhost_user_socket *vsocket);
static struct vhost_user vhost_user = {
.fdset = {
@@ -283,6 +290,81 @@ vhost_user_add_connection(int fd, struct vhost_user_socket *vsocket)
close(fd);
}
+#ifdef RTE_LIBRTE_VHOST_SOCKETPAIR_BROKER
+/*
+ * Handle the broker's reply on the broker connection fd: receive the fd
+ * returned by sp_broker_receive_set_pair() and register it as a new
+ * vhost-user connection of vsocket.
+ *
+ * Returns 0 on success, -1 if the SP_BROKER_SET_PAIR reply could not be
+ * received.
+ */
+static int
+vhost_user_broker_msg_handler(int fd, struct vhost_user_socket *vsocket)
+{
+	int peer_fd;
+	/* Initialized so the log/free below stay defined even if the
+	 * library does not set it on every failure path. */
+	char *err = NULL;
+
+	peer_fd = sp_broker_receive_set_pair(fd, &err);
+	if (peer_fd < 0) {
+		/* NOTE(review): free() implies err is a heap string owned
+		 * by the caller - confirm against libspbroker docs. */
+		VHOST_LOG_CONFIG(ERR,
+			"failed to receive SP_BROKER_SET_PAIR on fd %d: %s\n",
+			fd, err ? err : "unknown error");
+		free(err);
+		return -1;
+	}
+
+	VHOST_LOG_CONFIG(INFO, "new vhost user connection is %d\n", peer_fd);
+	vhost_user_add_connection(peer_fd, vsocket);
+	return 0;
+}
+
+/*
+ * fdset read callback for a connection to the SocketPair Broker.
+ *
+ * The broker connection is removed from the fdset after handling one
+ * message. If pairing failed, the socket is re-created and the
+ * connection to the broker is started over. If the socket cannot be
+ * re-created, there is no valid fd to restart with, so nothing is done
+ * (recreate_unix_socket() has already logged the failure).
+ */
+static void
+vhost_user_broker_msg_cb(int connfd, void *dat, int *remove)
+{
+	struct vhost_user_socket *vsocket = dat;
+	int ret;
+
+	ret = vhost_user_broker_msg_handler(connfd, vsocket);
+
+	/* Don't need a broker connection anymore. */
+	*remove = 1;
+
+	if (ret < 0) {
+		/* On failure socket_fd is -1; don't try to connect on it. */
+		if (recreate_unix_socket(vsocket) < 0)
+			return;
+		vhost_user_start(vsocket);
+	}
+}
+
+/*
+ * Send an SP_BROKER_GET_PAIR request carrying vsocket's broker key over
+ * the connected broker socket fd, then register fd in the vhost fdset so
+ * that the broker's reply is handled by vhost_user_broker_msg_cb().
+ *
+ * Returns 0 on success, -1 on failure. fd is not closed here; the
+ * callers re-create the socket on failure.
+ */
+static int
+vhost_user_start_broker_connection(int fd, struct vhost_user_socket *vsocket)
+{
+	/* Initialized so the log/free below stay defined even if the
+	 * library does not set it on every failure path. */
+	char *err = NULL;
+	int ret;
+
+	ret = sp_broker_send_get_pair(fd, vsocket->broker_key,
+				      vsocket->is_server, &err);
+	if (ret) {
+		VHOST_LOG_CONFIG(ERR,
+			"failed to send SP_BROKER_GET_PAIR request on fd %d: %s\n",
+			fd, err ? err : "unknown error");
+		free(err);
+		return -1;
+	}
+
+	ret = fdset_add(&vhost_user.fdset, fd, vhost_user_broker_msg_cb,
+			NULL, vsocket);
+	if (ret < 0) {
+		VHOST_LOG_CONFIG(ERR,
+			"failed to add broker fd %d to vhost fdset\n", fd);
+		return -1;
+	}
+	return 0;
+}
+#else
+/*
+ * Fallback used when the library is built without libspbroker
+ * (RTE_LIBRTE_VHOST_SOCKETPAIR_BROKER undefined): starting a broker
+ * connection always fails.
+ */
+static int
+vhost_user_start_broker_connection(int fd __rte_unused,
+				   struct vhost_user_socket *vsocket __rte_unused)
+{
+	/* SocketPair Broker support is not compiled in. */
+	return -1;
+}
+#endif
+
/* call back when there is new vhost-user connection from client */
static void
vhost_user_server_new_connection(int fd, void *dat, int *remove __rte_unused)
@@ -321,7 +403,7 @@ vhost_user_read_cb(int connfd, void *dat, int *remove)
if (vsocket->reconnect) {
create_unix_socket(vsocket);
- vhost_user_start_client(vsocket);
+ vhost_user_start(vsocket);
}
pthread_mutex_lock(&vsocket->conn_mutex);
@@ -337,14 +419,17 @@ create_unix_socket(struct vhost_user_socket *vsocket)
{
int fd;
struct sockaddr_un *un = &vsocket->un;
+ char *broker_key = strstr(vsocket->path, ",broker-key=");
fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (fd < 0)
return -1;
- VHOST_LOG_CONFIG(INFO, "vhost-user %s: socket created, fd: %d\n",
- vsocket->is_server ? "server" : "client", fd);
+ VHOST_LOG_CONFIG(INFO, "vhost-user %s: %ssocket created, fd: %d\n",
+ vsocket->is_server ? "server" : "client",
+ vsocket->is_broker ? "broker " : "", fd);
- if (!vsocket->is_server && fcntl(fd, F_SETFL, O_NONBLOCK)) {
+ if ((!vsocket->is_server || vsocket->is_broker)
+ && fcntl(fd, F_SETFL, O_NONBLOCK)) {
VHOST_LOG_CONFIG(ERR,
"vhost-user: can't set nonblocking mode for socket, fd: "
"%d (%s)\n", fd, strerror(errno));
@@ -352,12 +437,21 @@ create_unix_socket(struct vhost_user_socket *vsocket)
return -1;
}
+ /* Temporarily limiting the path by the actual path. */
+ if (vsocket->is_broker && broker_key)
+ broker_key[0] = '\0';
+
memset(un, 0, sizeof(*un));
un->sun_family = AF_UNIX;
strncpy(un->sun_path, vsocket->path, sizeof(un->sun_path));
un->sun_path[sizeof(un->sun_path) - 1] = '\0';
vsocket->socket_fd = fd;
+
+ /* Restoring original path. */
+ if (vsocket->is_broker && broker_key)
+ broker_key[0] = ',';
+
return 0;
}
@@ -425,7 +519,8 @@ static struct vhost_user_reconnect_list reconn_list;
static pthread_t reconn_tid;
static int
-vhost_user_connect_nonblock(int fd, struct sockaddr *un, size_t sz)
+vhost_user_connect_nonblock(int fd, struct sockaddr *un, size_t sz,
+ bool disable_nonblock)
{
int ret, flags;
@@ -433,6 +528,9 @@ vhost_user_connect_nonblock(int fd, struct sockaddr *un, size_t sz)
if (ret < 0 && errno != EISCONN)
return -1;
+ if (!disable_nonblock)
+ return 0;
+
flags = fcntl(fd, F_GETFL, 0);
if (flags < 0) {
VHOST_LOG_CONFIG(ERR,
@@ -447,8 +545,22 @@ vhost_user_connect_nonblock(int fd, struct sockaddr *un, size_t sz)
return 0;
}
+/*
+ * Close vsocket's current socket and create a fresh one in its place.
+ *
+ * Returns 0 on success. On failure, logs an error, sets socket_fd to -1
+ * and returns -2.
+ */
+static int
+recreate_unix_socket(struct vhost_user_socket *vsocket)
+{
+	int rc;
+
+	close(vsocket->socket_fd);
+	rc = create_unix_socket(vsocket);
+	if (rc >= 0)
+		return 0;
+
+	VHOST_LOG_CONFIG(ERR,
+		"Failed to re-create socket for %s\n",
+		vsocket->un.sun_path);
+	vsocket->socket_fd = -1;
+	return -2;
+}
+
static void *
-vhost_user_client_reconnect(void *arg __rte_unused)
+vhost_user_reconnect(void *arg __rte_unused)
{
int ret;
struct vhost_user_reconnect *reconn, *next;
@@ -466,7 +578,8 @@ vhost_user_client_reconnect(void *arg __rte_unused)
ret = vhost_user_connect_nonblock(reconn->fd,
(struct sockaddr *)&reconn->un,
- sizeof(reconn->un));
+ sizeof(reconn->un),
+ !reconn->vsocket->is_broker);
if (ret == -2) {
close(reconn->fd);
VHOST_LOG_CONFIG(ERR,
@@ -478,8 +591,26 @@ vhost_user_client_reconnect(void *arg __rte_unused)
continue;
VHOST_LOG_CONFIG(INFO,
- "%s: connected\n", reconn->vsocket->path);
- vhost_user_add_connection(reconn->fd, reconn->vsocket);
+ "%s: connected\n",
+ reconn->vsocket->un.sun_path);
+
+ if (reconn->vsocket->is_broker) {
+ struct vhost_user_socket *vsocket;
+
+ vsocket = reconn->vsocket;
+ if (vhost_user_start_broker_connection(
+ reconn->fd, vsocket)) {
+ if (recreate_unix_socket(vsocket)) {
+ goto remove_fd;
+ } else {
+ reconn->fd = vsocket->socket_fd;
+ continue;
+ }
+ }
+ } else {
+ vhost_user_add_connection(reconn->fd,
+ reconn->vsocket);
+ }
remove_fd:
TAILQ_REMOVE(&reconn_list.head, reconn, next);
free(reconn);
@@ -505,7 +636,7 @@ vhost_user_reconnect_init(void)
TAILQ_INIT(&reconn_list.head);
ret = rte_ctrl_thread_create(&reconn_tid, "vhost_reconn", NULL,
- vhost_user_client_reconnect, NULL);
+ vhost_user_reconnect, NULL);
if (ret != 0) {
VHOST_LOG_CONFIG(ERR, "failed to create reconnect thread");
if (pthread_mutex_destroy(&reconn_list.mutex)) {
@@ -518,18 +649,25 @@ vhost_user_reconnect_init(void)
}
static int
-vhost_user_start_client(struct vhost_user_socket *vsocket)
+vhost_user_start(struct vhost_user_socket *vsocket)
{
int ret;
int fd = vsocket->socket_fd;
- const char *path = vsocket->path;
+ const char *path = vsocket->un.sun_path;
struct vhost_user_reconnect *reconn;
ret = vhost_user_connect_nonblock(fd, (struct sockaddr *)&vsocket->un,
- sizeof(vsocket->un));
+ sizeof(vsocket->un),
+ !vsocket->is_broker);
if (ret == 0) {
- vhost_user_add_connection(fd, vsocket);
- return 0;
+ if (!vsocket->is_broker) {
+ vhost_user_add_connection(fd, vsocket);
+ return 0;
+ } else if (vhost_user_start_broker_connection(fd, vsocket)) {
+ ret = recreate_unix_socket(vsocket);
+ } else {
+ return 0;
+ }
}
VHOST_LOG_CONFIG(WARNING,
@@ -822,6 +960,11 @@ rte_vhost_driver_get_queue_num(const char *path, uint32_t *queue_num)
static void
vhost_user_socket_mem_free(struct vhost_user_socket *vsocket)
{
+ if (vsocket && vsocket->broker_key) {
+ free(vsocket->broker_key);
+ vsocket->broker_key = NULL;
+ }
+
if (vsocket && vsocket->path) {
free(vsocket->path);
vsocket->path = NULL;
@@ -946,15 +1089,50 @@ rte_vhost_driver_register(const char *path, uint64_t flags)
#endif
}
- if ((flags & RTE_VHOST_USER_CLIENT) != 0) {
+ if ((flags & RTE_VHOST_USER_SOCKETPAIR_BROKER) != 0) {
+#ifndef RTE_LIBRTE_VHOST_SOCKETPAIR_BROKER
+ VHOST_LOG_CONFIG(ERR,
+ "SocketPair Broker requested but not compiled\n");
+ ret = -1;
+ goto out_mutex;
+#endif
+ char *broker_key = strstr(vsocket->path, ",broker-key=");
+
+ if (!broker_key || !broker_key[12]) {
+ VHOST_LOG_CONFIG(ERR,
+ "Connection to SocketPair Broker requested but"
+ " key is not provided: %s\n", vsocket->path);
+ ret = -1;
+ goto out_mutex;
+ }
+ vsocket->is_broker = true;
+ vsocket->broker_key = strdup(broker_key + 12);
+ if (vsocket->broker_key == NULL) {
+ VHOST_LOG_CONFIG(ERR,
+ "error: failed to copy broker key\n");
+ ret = -1;
+ goto out_mutex;
+ }
+ }
+
+ if ((flags & RTE_VHOST_USER_CLIENT) == 0)
+ vsocket->is_server = true;
+
+ if (!vsocket->is_server || vsocket->is_broker) {
vsocket->reconnect = !(flags & RTE_VHOST_USER_NO_RECONNECT);
+ if (vsocket->is_broker && !vsocket->reconnect) {
+ VHOST_LOG_CONFIG(ERR,
+ "SocketPair Broker with NO_RECONNECT "
+ "is not supported\n");
+ ret = -1;
+ goto out_mutex;
+ }
if (vsocket->reconnect && reconn_tid == 0) {
if (vhost_user_reconnect_init() != 0)
goto out_mutex;
}
- } else {
- vsocket->is_server = true;
}
+
ret = create_unix_socket(vsocket);
if (ret < 0) {
goto out_mutex;
@@ -1052,7 +1230,23 @@ rte_vhost_driver_unregister(const char *path)
}
pthread_mutex_unlock(&vsocket->conn_mutex);
- if (vsocket->is_server) {
+ if (vsocket->reconnect) {
+ if (vhost_user_remove_reconnect(vsocket)) {
+ /*
+ * reconn->fd is a socket_fd for
+ * client and broker connections and
+ * it's closed now.
+ */
+ vsocket->socket_fd = -1;
+ }
+ }
+
+ /*
+ * socket_fd is still valid for server connection
+ * or broker connection that is currently connected
+ * to the broker.
+ */
+ if (vsocket->socket_fd != -1) {
/*
* If r/wcb is executing, release vhost_user's
* mutex lock, and try again since the r/wcb
@@ -1063,13 +1257,12 @@ rte_vhost_driver_unregister(const char *path)
pthread_mutex_unlock(&vhost_user.mutex);
goto again;
}
-
close(vsocket->socket_fd);
- unlink(path);
- } else if (vsocket->reconnect) {
- vhost_user_remove_reconnect(vsocket);
}
+ if (vsocket->is_server && !vsocket->is_broker)
+ unlink(path);
+
pthread_mutex_destroy(&vsocket->conn_mutex);
vhost_user_socket_mem_free(vsocket);
@@ -1152,8 +1345,8 @@ rte_vhost_driver_start(const char *path)
}
}
- if (vsocket->is_server)
+ if (vsocket->is_server && !vsocket->is_broker)
return vhost_user_start_server(vsocket);
else
- return vhost_user_start_client(vsocket);
+ return vhost_user_start(vsocket);
}
--
2.26.2
More information about the dev
mailing list