一区二区三区日韩精品-日韩经典一区二区三区-五月激情综合丁香婷婷-欧美精品中文字幕专区

分享

Python系列 之 socket模塊 - 非阻塞模式_python socket 非阻塞

 禁忌石 2023-11-17 發(fā)布于浙江

socket模塊學(xué)習(xí)

非阻塞模式

socket的默認(rèn)情況下是阻塞模式:socket.accept()方法在沒有接受到連接之前不能處理已經(jīng)建立連接的其他操作,以及在recv()方法或者其他接受數(shù)據(jù)的方法時候都是阻塞的,如果沒有接受到數(shù)據(jù)就會一直處于阻塞狀態(tài),來等待接受數(shù)據(jù),這種情況只有通過開啟新的進(jìn)程或者線程來解決來自不同客戶端的連接請求或者接受數(shù)據(jù);socket可以支持非阻塞的模式;可以使用以下兩種方法來設(shè)置socket的非阻塞模式:

# 設(shè)置套接字為阻塞或非阻塞模式:如果 flag 為 false,則將套接字設(shè)置為非阻塞,否則設(shè)置為阻塞。
# socket.setblocking(flag)
# 如果value賦為 0,則套接字將處于非阻塞模式。如果指定為 None,則套接字將處于阻塞模式。
# socket.settimeout(value)
# 阻塞
sock.setblocking(True)
sock.settimeout(None)
# 非阻塞
sock.setblocking(False)
sock.settimeout(0.0)

在非阻塞模式下可以實現(xiàn)在單線程模式下實現(xiàn)與多個客戶端連接的交互:

非阻塞模式的服務(wù)端:

# demo_socket_server_2.py文件
import logging
import socket

logging.basicConfig(level=logging.DEBUG,
                    format="%(asctime)s>%(message)s", datefmt="%Y-%m-%d %H:%M:%S")
logger = logging.getLogger(__name__)


class ServerClass(object):
    """docstring for ServerClass"""

    def __init__(self):
        self.__HOST = "127.0.0.1"
        self.__PORT = 9999
        self.ADDR = (self.__HOST, self.__PORT)
        self.__TCP_SOCKET = socket.socket(
            family=socket.AF_INET, type=socket.SOCK_STREAM)
        # 設(shè)置非阻塞
        # self.__TCP_SOCKET.setblocking(False)
        self.__TCP_SOCKET.settimeout(0.0)
        # 用來存放套接字對象的列表
        self.connlist = list()

    def start_server(self):
        with self.__TCP_SOCKET as sock:
            sock.bind(self.ADDR)
            sock.listen()
            logger.info("Server is Running")
            while True:
                try:
                    conn, addr = sock.accept()
                    # logger.info(conn)
                    # 將連接的套接字對象設(shè)置為非阻塞
                    conn.setblocking(False)
                    msg = f"Hi,{addr}"
                    self.send_data(conn, msg)
                    # 添加到列表
                    self.connlist.append(conn)
                # 如果沒有連接進(jìn)來需要捕獲BlockingIOError異常
                except BlockingIOError as e:
                    pass
                    # logger.debug("沒有新的客戶端連接")
                # 循環(huán)套接字對象列表 進(jìn)行收發(fā)數(shù)據(jù)
                for conn in self.connlist:
                    msg = self.recv_data(conn)
                    self.send_data(conn, msg)

    def recv_data(self, conn):
        """接收數(shù)據(jù)"""
        try:
            msg = conn.recv(1024).decode("utf-8")
            if not msg or msg in ["quit"]:
                logger.debug("斷開連接")
                # 將套接字對象從列表移除
                self.connlist.remove(conn)
            else:
                logger.info(msg)
                return msg
        except IOError as e:
            pass
            # logger.debug("沒有接收到數(shù)據(jù)")

    def send_data(self, conn, msg):
        """發(fā)送數(shù)據(jù)"""
        if msg:
            msg = f"From Server {msg}"
            try:
                conn.sendall(msg.encode("utf-8"))
            except ConnectionResetError as e:
                pass
                logger.debug("連接已斷開,無法再發(fā)送信息")


if __name__ == '__main__':
    ServerClass().start_server()

服務(wù)端非阻塞模式的情況下 主要通過循環(huán)控制不停的去捕獲BlockingIOError 異常來判斷是否有新的連接進(jìn)來,或者是是否有數(shù)據(jù)可以接受到;該情況下CPU的使用率會很高

客戶端代碼:

import logging
import time
import socket
import threading

# demo_logging_1.load_loggingconfig()
logging.basicConfig(level=logging.DEBUG,
                    format="%(asctime)s>%(message)s", datefmt="%Y-%m-%d %H:%M:%S")
logger = logging.getLogger(__name__)


class ClientClass(object):
    """docstring for ClientClass"""

    def __init__(self):
        self.__HOST = "127.0.0.1"
        self.__PORT = 9999
        self.__ADDR = (self.__HOST, self.__PORT)
        self.__TCP_SOCKET = socket.socket(
            family=socket.AF_INET, type=socket.SOCK_STREAM)

    def start_client(self):
        """啟動客戶端"""
        with self.__TCP_SOCKET as sock:
            # 鏈接服務(wù)端地址
            sock.connect(self.__ADDR)
            logger.info("%s" % sock.recv(1024).decode("utf-8"))
            recv_t = threading.Thread(
                target=self.recv_data, args=(sock,))
            # 向服務(wù)端發(fā)送數(shù)據(jù)
            send_t = threading.Thread(
                target=self.send_data, args=(sock,))
            # 接收數(shù)據(jù)線程設(shè)置為守護(hù)線程
            recv_t.setDaemon(True)
            recv_t.start()
            send_t.start()
            send_t.join()

    def send_data(self, sock):
        while True:
            send_data = input()
            sock.sendall(send_data.encode("utf-8"))
            # 如果輸入 quit 或者 exit 斷開連接
            if send_data in ("quit"):
                logger.info("正在退出...")
                break

    def recv_data(self, sock):
        while True:
            try:
                recv_data = sock.recv(1024).decode("utf-8")
                logger.info(recv_data)
            except Exception as e:
                pass
                # logger.error(e, exc_info=True)
                time.sleep(0.5)
                break


if __name__ == '__main__':
    ClientClass().start_client()

效果:
在這里插入圖片描述

服務(wù)端不需要多開啟進(jìn)程或者線程就可以實現(xiàn)與多個客戶端之間通信

select模塊

把上面非阻塞的服務(wù)端修改下,利用select模塊的select方法對套接字對象進(jìn)行監(jiān)控;

# select.select(rlist, wlist, xlist[, timeout])
  1. rlist:等待,直到可以開始讀取
  2. wlist:等待,直到可以開始寫入
  3. xlist:等待“異常情況”
  4. 可選的 timeout 參數(shù)以一個浮點數(shù)表示超時秒數(shù)

服務(wù)端:

# demo_socket_server_3.py文件
import select
import logging
import socket
import queue

logging.basicConfig(level=logging.DEBUG,
                    format="%(asctime)s>%(message)s", datefmt="%Y-%m-%d %H:%M:%S")
logger = logging.getLogger(__name__)


class ServerClass(object):
    """docstring for ServerClass"""

    def __init__(self):
        self.__HOST = "127.0.0.1"
        self.__PORT = 9999
        self.ADDR = (self.__HOST, self.__PORT)
        self.__TCP_SOCKET = socket.socket(
            family=socket.AF_INET, type=socket.SOCK_STREAM)
        # 設(shè)置非阻塞
        # self.__TCP_SOCKET.setblocking(False)
        self.__TCP_SOCKET.settimeout(0.0)
        # 用來存放套接字對象的列表
        self.inputlist = list()
        self.outputlist = list()
        # 存放客戶端發(fā)送過來的數(shù)據(jù)
        self.msg_dict = dict()

    def start_server(self):
        with self.__TCP_SOCKET as sock:
            sock.bind(self.ADDR)
            sock.listen()
            logger.info("Server is Running")
            # 將套接字對象添加到列表中
            self.inputlist.append(sock)
            while True:
                rlist, wlist, xlist = select.select(
                    self.inputlist, self.outputlist, self.inputlist)
                for r_conn in rlist:
                    # 如果套接字對象是self.__TCP_SOCKET;表示有新的連接進(jìn)來了需要接受
                    if r_conn is sock:
                        conn, addr = r_conn.accept()
                        logger.info(f"{addr} 已連接")
                        # 和連接的客戶端打個招呼
                        conn.sendall(f"Hi,{addr}".encode("utf-8"))
                        # 設(shè)置為非阻塞
                        conn.setblocking(False)
                        # 將新的連接添加到套接字對象列表進(jìn)行監(jiān)控
                        self.inputlist.append(conn)
                        # 每接受一個連接: 將連接作為鍵,一個空隊列作為值
                        self.msg_dict[conn] = queue.Queue()
                    # 否則 是已經(jīng)建立的連接發(fā)送過來數(shù)據(jù)了 需要接受數(shù)據(jù)
                    else:
                        client_addr = r_conn.getpeername()
                        try:
                            recv_data = r_conn.recv(1024)
                            # 如果有數(shù)據(jù) 接收;數(shù)據(jù)存儲,將該連接添加到self.outputlist準(zhǔn)備下一步發(fā)送數(shù)據(jù)
                            if recv_data and recv_data.decode("utf-8") != "quit":
                                logger.info(
                                    f"接收到來自 {client_addr} 的數(shù)據(jù):{recv_data.decode('utf-8')}")
                                # msg_dict: 連接為 字典鍵  接收到的數(shù)據(jù)放到隊列 為 字典值
                                self.msg_dict[r_conn].put(recv_data)
                                if r_conn not in self.outputlist:
                                    self.outputlist.append(r_conn)
                            # 否則 證明該連接已經(jīng)斷開了
                            else:
                                logger.info(f"{client_addr} 已斷開")
                                # 執(zhí)行清除
                                self.clear_conn(r_conn)
                        except ConnectionResetError as e:
                            # 捕獲 ConnectionResetError 表示 客戶端斷開
                            logger.info(f"{client_addr} 異常斷開")
                            # 執(zhí)行清除
                            self.clear_conn(r_conn)
                for w_conn in wlist:
                    try:
                        if w_conn in self.msg_dict:
                            msg = self.msg_dict[w_conn].get(False)
                    except queue.Empty as e:
                        # 數(shù)據(jù)隊列為空表示 該連接沒有發(fā)送數(shù)據(jù) 服務(wù)端沒有接收到
                        pass
                    else:
                        try:
                            w_conn.sendall(b'From Server ' + msg)
                        except ConnectionResetError as e:
                            # 捕獲ConnectionResetError 客戶端斷開 執(zhí)行清除                            
                            self.clear_conn(w_conn)

                for e_conn in xlist:
                    e_conn.close()
                    # 執(zhí)行清除
                    self.clear_conn(e_conn)

    def clear_conn(self, conn):
        """清除已經(jīng)斷開的連接"""
        if conn in self.inputlist:
            self.inputlist.remove(conn)
        if conn in self.outputlist:
            self.outputlist.remove(conn)
        if conn in self.msg_dict:
            del self.msg_dict[conn]


if __name__ == '__main__':
    ServerClass().start_server()

現(xiàn)在設(shè)定的socket對象雖然是非阻塞的,但是因為select方法的作用,使用起來好像和阻塞的沒有什么區(qū)別,是因為select把socket監(jiān)控起來

rlist, wlist, xlist = select.select(self.inputlist, self.outputlist, self.inputlist)

如果在inputlist,outpulist中有活動的socket對象就會返回在rlist,wlist,xlist中;然后循環(huán)調(diào)用每個list去完成邏輯處理。
該方法同樣是可以在單線程模式下實現(xiàn)服務(wù)端對多個客戶端之間進(jìn)行通信;

客戶端代碼:同上;

效果展示:
在這里插入圖片描述

selectors模塊

selectors模塊是高級 I/O 復(fù)用庫,它建立在 select 模塊原型的基礎(chǔ)之上。Python文檔推薦用戶改用此模塊。

# 默認(rèn)的選擇器類,使用當(dāng)前平臺上可用的最高效選擇器的實現(xiàn)
sel = selectors.DefaultSelector()
# 注冊一個用于選擇的文件對象,在其上監(jiān)視 I/O 事件
sel.register(fileobj, events, data=None)
# 注銷對一個文件對象的選擇,移除對它的監(jiān)視
sel.unregister(fileobj)

# 等待直到有已注冊的文件對象就緒,或是超過時限
# 返回由 (key, events) 元組構(gòu)成的列表,每項各表示一個就緒的文件對象,key 是對應(yīng)于就緒文件對象的 SelectorKey 實例。 events 是在此文件對象上等待的事件位掩碼。
events = sel.select()

selectors.SelectorKey類 用來將文件對象關(guān)聯(lián)到其下層的文件描述器、選定事件掩碼和附加數(shù)據(jù)等;有以下屬性:

名稱說明
fileobj已注冊的文件對象
fd下層的文件描述器
events此文件對象上被等待的事件
data可選的關(guān)聯(lián)到此文件對象的不透明數(shù)據(jù):例如,這可被用來存儲各個客戶端的會話 ID

events 是一個位掩碼,說明哪些 I/O 事件要在給定的文件對象上執(zhí)行等待。 它可以是以下模塊級常量的組合:

名稱說明
selectors.EVENT_READ可讀
selectors.EVENT_WRITE可寫

服務(wù)端代碼示例:

import selectors
import logging
import socket

logging.basicConfig(level=logging.DEBUG,
                    format="%(asctime)s>%(message)s", datefmt="%Y-%m-%d %H:%M:%S")
logger = logging.getLogger(__name__)


class ServerClass(object):
    """docstring for ServerClass"""

    def __init__(self):
        self.__HOST = "127.0.0.1"
        self.__PORT = 9999
        self.ADDR = (self.__HOST, self.__PORT)
        self.__TCP_SOCKET = socket.socket(
            family=socket.AF_INET, type=socket.SOCK_STREAM)
        # 設(shè)置非阻塞
        # self.__TCP_SOCKET.setblocking(False)
        self.__TCP_SOCKET.settimeout(0.0)
        self.sele = selectors.DefaultSelector()

    def start_server(self):
        with self.__TCP_SOCKET as sock:
            sock.bind(self.ADDR)
            sock.listen()
            logger.info("Server is Running")
            self.sele.register(
                sock, selectors.EVENT_READ, self.accept_conn)
            while True:
                events = self.sele.select()
                for key, mask in events:
                    callback = key.data
                    callback(key.fileobj, mask)

    def accept_conn(self, sock, mask):
        conn, addr = sock.accept()
        logger.info(f"{addr} 已連接")
        conn.sendall(f"Hi,{addr}".encode("utf-8"))
        conn.setblocking(False)
        self.sele.register(conn, selectors.EVENT_READ, self.read)

    def read(self, conn, mask):
        try:
            client_addr = conn.getpeername()
            recv_data = conn.recv(1024)
            if recv_data and recv_data.decode("utf-8") != "quit":
                logger.info(
                    f"接收到來自 {client_addr} 的數(shù)據(jù):{recv_data.decode('utf-8')}")
                conn.sendall(b'From Server ' + recv_data)
            else:
                logger.info(f"{client_addr} 已斷開")
                self.sele.unregister(conn)
        except ConnectionResetError as e:
            logger.info(f"{client_addr} 異常斷開")
            self.sele.unregister(conn)
            conn.close()


if __name__ == '__main__':
    ServerClass().start_server()

比select模塊的select方法更簡潔一些;

客戶端代碼 :同上;
效果展示:
在這里插入圖片描述
和select方法結(jié)果是一樣的;
主要參考:
一只小小寄居蟹 --這個博客
Python 中文文檔 selectors — 高級 I/O 復(fù)用庫

以上就是所有關(guān)于Python socket模塊的非阻塞模式的學(xué)習(xí)
如果有什么不對的地方,歡迎指正!

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多

    色老汉在线视频免费亚欧| 亚洲欧美日产综合在线网| 一区二区三区精品人妻| 国产日韩在线一二三区| 国产精品免费福利在线| 欧美日韩亚洲国产av| 91精品蜜臀一区二区三区| 亚洲欧美日本国产不卡| 国产精品伦一区二区三区四季| 中文字幕人妻av不卡| 欧美丰满人妻少妇精品| 人妻少妇av中文字幕乱码高清| 91人妻久久精品一区二区三区| 小黄片大全欧美一区二区| 成人欧美精品一区二区三区| 精品国产亚洲免费91| 极品熟女一区二区三区| 人妻一区二区三区多毛女| 成人午夜在线视频观看| 国产精品国三级国产专不卡| 在线视频三区日本精品| 麻豆视频传媒入口在线看| 欧美不雅视频午夜福利| 亚洲一区二区三区熟女少妇| 欧美大黄片在线免费观看| 欧美精品专区一区二区| 老熟妇2久久国内精品| 久久一区内射污污内射亚洲| 中文字幕不卡欧美在线| 91后入中出内射在线| 日韩精品视频香蕉视频| 91国内视频一区二区三区| 亚洲午夜精品视频观看| 国产精品午夜小视频观看| 亚洲黄色在线观看免费高清| 欧美一区二区三区五月婷婷| 国产精品丝袜美腿一区二区| 国产亚洲精品久久久优势| 亚洲天堂久久精品成人| 亚洲伦理中文字幕在线观看 | 国产日韩中文视频一区|