[spp] [PATCH 2/4] spp: add classes for thread management

ogawa.yasufumi at lab.ntt.co.jp ogawa.yasufumi at lab.ntt.co.jp
Fri Dec 8 09:31:32 CET 2017


From: Yasufumi Ogawa <ogawa.yasufumi at lab.ntt.co.jp>

In spp.py, threads are created with global methods, primarythread,
connectionthread and acceptthread. It makes the code hard to understand.

This update defines classes for these threads as sub-classes of
'threading.Thread' and global methods are moved into run() of each of
the classes. This update also adds stop() in the classes to terminate
them clearly and fix bug for 'Socket is not connected' error occured
when called 'sock.shutdown(socket.SHUT_RDWR)'.

Signed-off-by: Yasufumi Ogawa <ogawa.yasufumi at lab.ntt.co.jp>
---
 src/spp.py | 325 ++++++++++++++++++++++++++++++++++++-------------------------
 1 file changed, 190 insertions(+), 135 deletions(-)

diff --git a/src/spp.py b/src/spp.py
index 1ec2983..212d7ea 100755
--- a/src/spp.py
+++ b/src/spp.py
@@ -14,12 +14,12 @@ import socket
 import SocketServer
 import sys
 import threading
-from threading import Thread
+import traceback
 
 # Turn true if activate logger to debug remote command.
-logger = None
+logger = True
 
-if logger is not None:
+if logger is True:
     from logging import DEBUG
     from logging import Formatter
     from logging import getLogger
@@ -28,7 +28,7 @@ if logger is not None:
     handler = StreamHandler()
     handler.setLevel(DEBUG)
     formatter = Formatter(
-        '%(asctime)s - [%(name)s] - [%(levelname)s] - %(message)s')
+        '%(asctime)s,[%(filename)s][%(name)s][%(levelname)s]%(message)s')
     handler.setFormatter(formatter)
     logger.setLevel(DEBUG)
     logger.addHandler(handler)
@@ -98,43 +98,61 @@ MAIN2SEC = GrowingList()
 SEC2MAIN = GrowingList()
 
 
-def connectionthread(name, client_id, conn, m2s, s2m):
-    """Manage secondary process connections"""
+class ConnectionThread(threading.Thread):
 
-    cmd_str = 'hello'
+    def __init__(self, client_id, conn, m2s, s2m):
+        super(ConnectionThread, self).__init__()
+        self.daemon = True
 
-    # infinite loop so that function do not terminate and thread do not end.
-    while True:
-        try:
-            _, _, _ = select.select([conn, ], [conn, ], [], 5)
-        except select.error:
-            break
+        self.client_id = client_id
+        self.conn = conn
+        self.m2s = m2s
+        self.s2m = s2m
+        self.stop_event = threading.Event()
+        self.conn_opened = False
 
-        # Sending message to connected secondary
-        try:
-            cmd_str = m2s.get(True)
-            conn.send(cmd_str)  # send only takes string
-        except KeyError:
-            break
-        except Exception as excep:
-            print(excep, ",Error while sending msg in connectionthread()!")
-            break
+    def stop(self):
+        self.stop_event.set()
 
-        # Receiving from secondary
-        try:
-            # 1024 stands for bytes of data to be received
-            data = conn.recv(1024)
-            if data:
-                s2m.put("recv:%s:{%s}" % (str(conn.fileno()), data))
-            else:
-                s2m.put("closing:" + str(conn))
+    def run(self):
+        cmd_str = 'hello'
+
+        # infinite loop so that function do not terminate and thread do not
+        # end.
+        while True:
+            try:
+                _, _, _ = select.select(
+                    [self.conn, ], [self.conn, ], [], 5)
+            except select.error:
                 break
-        except Exception as excep:
-            print(excep, ",Error while receiving msg in connectionthread()!")
-            break
 
-    SECONDARY_LIST.remove(client_id)
-    conn.close()
+            # Sending message to connected secondary
+            try:
+                cmd_str = self.m2s.get(True)
+                self.conn.send(cmd_str)  # send only takes string
+            except KeyError:
+                break
+            except Exception as excep:
+                print(excep, ",Error while sending msg in connectionthread()!")
+                break
+
+            # Receiving from secondary
+            try:
+                # 1024 stands for bytes of data to be received
+                data = self.conn.recv(1024)
+                if data:
+                    self.s2m.put(
+                        "recv:%s:{%s}" % (str(self.conn.fileno()), data))
+                else:
+                    self.s2m.put("closing:" + str(self.conn))
+                    break
+            except Exception as excep:
+                print(
+                    excep, ",Error while receiving msg in connectionthread()!")
+                break
+
+        SECONDARY_LIST.remove(self.client_id)
+        self.conn.close()
 
 
 def getclientid(conn):
@@ -149,9 +167,12 @@ def getclientid(conn):
     if data is None:
         return -1
 
+    if logger is not None:
+        logger.debug("data: %s" % data)
     client_id = int(data.strip('\0'))
 
     if client_id < 0 or client_id > MAX_SECONDARY:
+        logger.debug("Failed to get client_id: %d" % client_id)
         return -1
 
     found = 0
@@ -175,6 +196,9 @@ def getclientid(conn):
             free_client_id = i
             break
 
+    if logger is not None:
+        logger.debug("Found free_client_id: %d" % free_client_id)
+
     if free_client_id < 0:
         return -1
 
@@ -184,37 +208,69 @@ def getclientid(conn):
     return free_client_id
 
 
-def acceptthread(sock, main2sec, sec2main):
-    """Listen for secondary processes"""
+class AcceptThread(threading.Thread):
 
-    global SECONDARY_COUNT
+    def __init__(self, host, port, main2sec, sec2main):
+        super(AcceptThread, self).__init__()
+        self.daemon = True
 
-    try:
-        while True:
-            # Accepting incoming connections
-            conn, _ = sock.accept()
+        # Creating secondary socket object
+        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
-            client_id = getclientid(conn)
-            if client_id < 0:
-                break
+        # Binding secondary socket to a address. bind() takes tuple of host
+        # and port.
+        self.sock.bind((host, port))
+
+        # Listening secondary at the address
+        self.sock.listen(MAX_SECONDARY)
+
+        self.main2sec = main2sec
+        self.sec2main = sec2main
+        self.stop_event = threading.Event()
+        self.sock_opened = False
+
+    def stop(self):
+        if self.sock_opened is True:
+            try:
+                self.sock.shutdown(socket.SHUT_RDWR)
+            except socket.error as excep:
+                print(excep, ", Error while closing sock in AcceptThread!")
+                traceback.print_exc()
+        self.sock.close()
+        self.stop_event.set()
 
-            # Creating new thread.
-            # Calling secondarythread function for this function and passing
-            # conn as argument.
-            SECONDARY_LIST.append(client_id)
-            main2sec[client_id] = Queue()
-            sec2main[client_id] = Queue()
-            connection_thread = Thread(target=connectionthread,
-                                       args=('secondary', client_id, conn,
-                                             main2sec[client_id],
-                                             sec2main[client_id]))
-            connection_thread.daemon = True
-            connection_thread.start()
-
-            SECONDARY_COUNT += 1
-    except Exception as excep:
-        print(excep, ", Error in acceptthread()!")
-        sock.close()
+    def run(self):
+        global SECONDARY_COUNT
+
+        try:
+            while True:
+                # Accepting incoming connections
+                conn, _ = self.sock.accept()
+
+                client_id = getclientid(conn)
+                if client_id < 0:
+                    break
+
+                # Creating new thread.
+                # Calling secondarythread function for this function and
+                # passing conn as argument.
+                SECONDARY_LIST.append(client_id)
+                self.main2sec[client_id] = Queue()
+                self.sec2main[client_id] = Queue()
+                connection_thread = ConnectionThread(
+                    client_id, conn,
+                    self.main2sec[client_id],
+                    self.sec2main[client_id])
+                connection_thread.daemon = True
+                connection_thread.start()
+
+                SECONDARY_COUNT += 1
+        except Exception as excep:
+            print(excep, ", Error in AcceptThread!")
+            traceback.print_exc()
+            self.sock_opened = False
+            self.sock.close()
 
 
 def command_primary(command):
@@ -266,49 +322,78 @@ def print_status():
         print ("Connected secondary id: %d" % i)
 
 
-def primarythread(sock, main2primary, primary2main):
-    """Manage primary process connection"""
+class PrimaryThread(threading.Thread):
 
-    global PRIMARY
-    cmd_str = ''
+    def __init__(self, host, port, main2primary, primary2main):
+        super(PrimaryThread, self).__init__()
+        self.daemon = True
 
-    while True:
-        # waiting for connection
-        PRIMARY = False
-        conn, addr = sock.accept()
-        PRIMARY = True
+        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        # Binding primary socket to a address. bind() takes tuple of host
+        # and port.
+        self.sock.bind((host, port))
 
-        while conn:
-            try:
-                _, _, _ = select.select([conn, ], [conn, ], [], 5)
-            except select.error:
-                break
+        # Listening primary at the address
+        self.sock.listen(1)  # 5 denotes the number of clients can queue
 
-            # Sending message to connected primary
-            try:
-                cmd_str = main2primary.get(True)
-                conn.send(cmd_str)  # send only takes string
-            except KeyError:
-                break
-            except Exception as excep:
-                print(excep, ", Error while sending msg in primarythread()!")
-                break
+        self.main2primary = main2primary
+        self.primary2main = primary2main
+        self.stop_event = threading.Event()
+        self.sock_opened = False
 
-            # Receiving from primary
-            try:
-                # 1024 stands for bytes of data to be received
-                data = conn.recv(1024)
-                if data:
-                    primary2main.put("recv:%s:{%s}" % (str(addr), data))
-                else:
-                    primary2main.put("closing:" + str(addr))
-                    conn.close()
+    def stop(self):
+        if self.sock_opened is True:
+            self.sock.shutdown(socket.SHUT_RDWR)
+        self.sock.close()
+        self.stop_event.set()
+
+    def run(self):
+        global PRIMARY
+        cmd_str = ''
+
+        while True:
+            # waiting for connection
+            PRIMARY = False
+            conn, addr = self.sock.accept()
+            PRIMARY = True
+
+            while conn:
+                try:
+                    _, _, _ = select.select([conn, ], [conn, ], [], 5)
+                except select.error:
                     break
-            except Exception as excep:
-                print(excep, ", Error while receiving msg in primarythread()!")
-                break
 
-    print ("primary communication thread end")
+                self.sock_opened = True
+                # Sending message to connected primary
+                try:
+                    cmd_str = self.main2primary.get(True)
+                    conn.send(cmd_str)  # send only takes string
+                except KeyError:
+                    break
+                except Exception as excep:
+                    print(
+                        excep,
+                        ", Error while sending msg in primarythread()!")
+                    break
+
+                # Receiving from primary
+                try:
+                    # 1024 stands for bytes of data to be received
+                    data = conn.recv(1024)
+                    if data:
+                        self.primary2main.put(
+                            "recv:%s:{%s}" % (str(addr), data))
+                    else:
+                        self.primary2main.put("closing:" + str(addr))
+                        conn.close()
+                        self.sock_opened = False
+                        break
+                except Exception as excep:
+                    print(
+                        excep,
+                        ", Error while receiving msg in primarythread()!")
+                    break
 
 
 def close_all_secondary():
@@ -593,36 +678,11 @@ def main(argv):
     print('secondary port : %d' % secondary_port)
     print('management port : %d' % management_port)
 
-    # Creating primary socket object
-    primary_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    primary_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-
-    # Binding primary socket to a address. bind() takes tuple of host and port.
-    primary_sock.bind((host, primary_port))
-
-    # Listening primary at the address
-    primary_sock.listen(1)  # 5 denotes the number of clients can queue
-
-    primary_thread = Thread(target=primarythread,
-                            args=(primary_sock, MAIN2PRIMARY, PRIMARY2MAIN))
-    primary_thread.daemon = True
+    primary_thread = PrimaryThread(
+        host, primary_port, MAIN2PRIMARY, PRIMARY2MAIN)
     primary_thread.start()
 
-    # Creating secondary socket object
-    secondary_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    secondary_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-
-    # Binding secondary socket to a address. bind() takes tuple of host
-    # and port.
-    secondary_sock.bind((host, secondary_port))
-
-    # Listening secondary at the address
-    secondary_sock.listen(MAX_SECONDARY)
-
-    # secondary process handling thread
-    accept_thread = Thread(target=acceptthread,
-                           args=(secondary_sock, MAIN2SEC, SEC2MAIN))
-    accept_thread.daemon = True
+    accept_thread = AcceptThread(host, secondary_port, MAIN2SEC, SEC2MAIN)
     accept_thread.start()
 
     shell = Shell()
@@ -641,16 +701,11 @@ def main(argv):
     shell = None
 
     try:
-        primary_sock.shutdown(socket.SHUT_RDWR)
-        primary_sock.close()
-    except socket.error as excep:
-        print(excep, ", Error while closing primary_sock in main()!")
-
-    try:
-        secondary_sock.shutdown(socket.SHUT_RDWR)
-        secondary_sock.close()
+        primary_thread.stop()
+        accept_thread.stop()
     except socket.error as excep:
-        print(excep, ", Error while closing secondary_sock in main()!")
+        print(excep, ", Error while terminating threads in main()!")
+        traceback.print_exc()
 
 
 if __name__ == "__main__":
-- 
2.13.1



More information about the spp mailing list