socket模塊學(xué)習(xí)
非阻塞模式
socket的默認(rèn)情況下是阻塞模式:socket.accept()方法在沒有接受到連接之前不能處理已經(jīng)建立連接的其他操作,以及在recv()方法或者其他接受數(shù)據(jù)的方法時候都是阻塞的,如果沒有接受到數(shù)據(jù)就會一直處于阻塞狀態(tài),來等待接受數(shù)據(jù),這種情況只有通過開啟新的進(jìn)程或者線程來解決來自不同客戶端的連接請求或者接受數(shù)據(jù);socket可以支持非阻塞的模式;可以使用以下兩種方法來設(shè)置socket的非阻塞模式:
# 設(shè)置套接字為阻塞或非阻塞模式:如果 flag 為 false,則將套接字設(shè)置為非阻塞,否則設(shè)置為阻塞。
# socket.setblocking(flag)
# 如果value賦為 0,則套接字將處于非阻塞模式。如果指定為 None,則套接字將處于阻塞模式。
# socket.settimeout(value)
# 阻塞
sock.setblocking(True)
sock.settimeout(None)
# 非阻塞
sock.setblocking(False)
sock.settimeout(0.0)
在非阻塞模式下可以實現(xiàn)在單線程模式下實現(xiàn)與多個客戶端連接的交互:
非阻塞模式的服務(wù)端:
# demo_socket_server_2.py文件
import logging
import socket
logging.basicConfig(level=logging.DEBUG,
format="%(asctime)s>%(message)s", datefmt="%Y-%m-%d %H:%M:%S")
logger = logging.getLogger(__name__)
class ServerClass(object):
"""docstring for ServerClass"""
def __init__(self):
self.__HOST = "127.0.0.1"
self.__PORT = 9999
self.ADDR = (self.__HOST, self.__PORT)
self.__TCP_SOCKET = socket.socket(
family=socket.AF_INET, type=socket.SOCK_STREAM)
# 設(shè)置非阻塞
# self.__TCP_SOCKET.setblocking(False)
self.__TCP_SOCKET.settimeout(0.0)
# 用來存放套接字對象的列表
self.connlist = list()
def start_server(self):
with self.__TCP_SOCKET as sock:
sock.bind(self.ADDR)
sock.listen()
logger.info("Server is Running")
while True:
try:
conn, addr = sock.accept()
# logger.info(conn)
# 將連接的套接字對象設(shè)置為非阻塞
conn.setblocking(False)
msg = f"Hi,{addr}"
self.send_data(conn, msg)
# 添加到列表
self.connlist.append(conn)
# 如果沒有連接進(jìn)來需要捕獲BlockingIOError異常
except BlockingIOError as e:
pass
# logger.debug("沒有新的客戶端連接")
# 循環(huán)套接字對象列表 進(jìn)行收發(fā)數(shù)據(jù)
for conn in self.connlist:
msg = self.recv_data(conn)
self.send_data(conn, msg)
def recv_data(self, conn):
"""接收數(shù)據(jù)"""
try:
msg = conn.recv(1024).decode("utf-8")
if not msg or msg in ["quit"]:
logger.debug("斷開連接")
# 將套接字對象從列表移除
self.connlist.remove(conn)
else:
logger.info(msg)
return msg
except IOError as e:
pass
# logger.debug("沒有接收到數(shù)據(jù)")
def send_data(self, conn, msg):
"""發(fā)送數(shù)據(jù)"""
if msg:
msg = f"From Server {msg}"
try:
conn.sendall(msg.encode("utf-8"))
except ConnectionResetError as e:
pass
logger.debug("連接已斷開,無法再發(fā)送信息")
if __name__ == '__main__':
ServerClass().start_server()
服務(wù)端非阻塞模式的情況下 主要通過循環(huán)控制不停的去捕獲BlockingIOError 異常來判斷是否有新的連接進(jìn)來,或者是是否有數(shù)據(jù)可以接受到;該情況下CPU的使用率會很高
客戶端代碼:
import logging
import time
import socket
import threading
# demo_logging_1.load_loggingconfig()
logging.basicConfig(level=logging.DEBUG,
format="%(asctime)s>%(message)s", datefmt="%Y-%m-%d %H:%M:%S")
logger = logging.getLogger(__name__)
class ClientClass(object):
"""docstring for ClientClass"""
def __init__(self):
self.__HOST = "127.0.0.1"
self.__PORT = 9999
self.__ADDR = (self.__HOST, self.__PORT)
self.__TCP_SOCKET = socket.socket(
family=socket.AF_INET, type=socket.SOCK_STREAM)
def start_client(self):
"""啟動客戶端"""
with self.__TCP_SOCKET as sock:
# 鏈接服務(wù)端地址
sock.connect(self.__ADDR)
logger.info("%s" % sock.recv(1024).decode("utf-8"))
recv_t = threading.Thread(
target=self.recv_data, args=(sock,))
# 向服務(wù)端發(fā)送數(shù)據(jù)
send_t = threading.Thread(
target=self.send_data, args=(sock,))
# 接收數(shù)據(jù)線程設(shè)置為守護(hù)線程
recv_t.setDaemon(True)
recv_t.start()
send_t.start()
send_t.join()
def send_data(self, sock):
while True:
send_data = input()
sock.sendall(send_data.encode("utf-8"))
# 如果輸入 quit 或者 exit 斷開連接
if send_data in ("quit"):
logger.info("正在退出...")
break
def recv_data(self, sock):
while True:
try:
recv_data = sock.recv(1024).decode("utf-8")
logger.info(recv_data)
except Exception as e:
pass
# logger.error(e, exc_info=True)
time.sleep(0.5)
break
if __name__ == '__main__':
ClientClass().start_client()
效果:
服務(wù)端不需要多開啟進(jìn)程或者線程就可以實現(xiàn)與多個客戶端之間通信
select模塊
把上面非阻塞的服務(wù)端修改下,利用select模塊的select方法對套接字對象進(jìn)行監(jiān)控;
# select.select(rlist, wlist, xlist[, timeout])
- rlist:等待,直到可以開始讀取
- wlist:等待,直到可以開始寫入
- xlist:等待“異常情況”
- 可選的 timeout 參數(shù)以一個浮點數(shù)表示超時秒數(shù)
服務(wù)端:
# demo_socket_server_3.py文件
import select
import logging
import socket
import queue
logging.basicConfig(level=logging.DEBUG,
format="%(asctime)s>%(message)s", datefmt="%Y-%m-%d %H:%M:%S")
logger = logging.getLogger(__name__)
class ServerClass(object):
"""docstring for ServerClass"""
def __init__(self):
self.__HOST = "127.0.0.1"
self.__PORT = 9999
self.ADDR = (self.__HOST, self.__PORT)
self.__TCP_SOCKET = socket.socket(
family=socket.AF_INET, type=socket.SOCK_STREAM)
# 設(shè)置非阻塞
# self.__TCP_SOCKET.setblocking(False)
self.__TCP_SOCKET.settimeout(0.0)
# 用來存放套接字對象的列表
self.inputlist = list()
self.outputlist = list()
# 存放客戶端發(fā)送過來的數(shù)據(jù)
self.msg_dict = dict()
def start_server(self):
with self.__TCP_SOCKET as sock:
sock.bind(self.ADDR)
sock.listen()
logger.info("Server is Running")
# 將套接字對象添加到列表中
self.inputlist.append(sock)
while True:
rlist, wlist, xlist = select.select(
self.inputlist, self.outputlist, self.inputlist)
for r_conn in rlist:
# 如果套接字對象是self.__TCP_SOCKET;表示有新的連接進(jìn)來了需要接受
if r_conn is sock:
conn, addr = r_conn.accept()
logger.info(f"{addr} 已連接")
# 和連接的客戶端打個招呼
conn.sendall(f"Hi,{addr}".encode("utf-8"))
# 設(shè)置為非阻塞
conn.setblocking(False)
# 將新的連接添加到套接字對象列表進(jìn)行監(jiān)控
self.inputlist.append(conn)
# 每接受一個連接: 將連接作為鍵,一個空隊列作為值
self.msg_dict[conn] = queue.Queue()
# 否則 是已經(jīng)建立的連接發(fā)送過來數(shù)據(jù)了 需要接受數(shù)據(jù)
else:
client_addr = r_conn.getpeername()
try:
recv_data = r_conn.recv(1024)
# 如果有數(shù)據(jù) 接收;數(shù)據(jù)存儲,將該連接添加到self.outputlist準(zhǔn)備下一步發(fā)送數(shù)據(jù)
if recv_data and recv_data.decode("utf-8") != "quit":
logger.info(
f"接收到來自 {client_addr} 的數(shù)據(jù):{recv_data.decode('utf-8')}")
# msg_dict: 連接為 字典鍵 接收到的數(shù)據(jù)放到隊列 為 字典值
self.msg_dict[r_conn].put(recv_data)
if r_conn not in self.outputlist:
self.outputlist.append(r_conn)
# 否則 證明該連接已經(jīng)斷開了
else:
logger.info(f"{client_addr} 已斷開")
# 執(zhí)行清除
self.clear_conn(r_conn)
except ConnectionResetError as e:
# 捕獲 ConnectionResetError 表示 客戶端斷開
logger.info(f"{client_addr} 異常斷開")
# 執(zhí)行清除
self.clear_conn(r_conn)
for w_conn in wlist:
try:
if w_conn in self.msg_dict:
msg = self.msg_dict[w_conn].get(False)
except queue.Empty as e:
# 數(shù)據(jù)隊列為空表示 該連接沒有發(fā)送數(shù)據(jù) 服務(wù)端沒有接收到
pass
else:
try:
w_conn.sendall(b'From Server ' + msg)
except ConnectionResetError as e:
# 捕獲ConnectionResetError 客戶端斷開 執(zhí)行清除
self.clear_conn(w_conn)
for e_conn in xlist:
e_conn.close()
# 執(zhí)行清除
self.clear_conn(e_conn)
def clear_conn(self, conn):
"""清除已經(jīng)斷開的連接"""
if conn in self.inputlist:
self.inputlist.remove(conn)
if conn in self.outputlist:
self.outputlist.remove(conn)
if conn in self.msg_dict:
del self.msg_dict[conn]
if __name__ == '__main__':
ServerClass().start_server()
現(xiàn)在設(shè)定的socket對象雖然是非阻塞的,但是因為select方法的作用,使用起來好像和阻塞的沒有什么區(qū)別,是因為select把socket監(jiān)控起來
rlist, wlist, xlist = select.select(self.inputlist, self.outputlist, self.inputlist)
如果在inputlist,outpulist中有活動的socket對象就會返回在rlist,wlist,xlist中;然后循環(huán)調(diào)用每個list去完成邏輯處理。 該方法同樣是可以在單線程模式下實現(xiàn)服務(wù)端對多個客戶端之間進(jìn)行通信;
客戶端代碼:同上;
效果展示:
selectors模塊
selectors模塊是高級 I/O 復(fù)用庫,它建立在 select 模塊原型的基礎(chǔ)之上。Python文檔推薦用戶改用此模塊。
# 默認(rèn)的選擇器類,使用當(dāng)前平臺上可用的最高效選擇器的實現(xiàn)
sel = selectors.DefaultSelector()
# 注冊一個用于選擇的文件對象,在其上監(jiān)視 I/O 事件
sel.register(fileobj, events, data=None)
# 注銷對一個文件對象的選擇,移除對它的監(jiān)視
sel.unregister(fileobj)
# 等待直到有已注冊的文件對象就緒,或是超過時限
# 返回由 (key, events) 元組構(gòu)成的列表,每項各表示一個就緒的文件對象,key 是對應(yīng)于就緒文件對象的 SelectorKey 實例。 events 是在此文件對象上等待的事件位掩碼。
events = sel.select()
selectors.SelectorKey類 用來將文件對象關(guān)聯(lián)到其下層的文件描述器、選定事件掩碼和附加數(shù)據(jù)等;有以下屬性:
名稱 | 說明 |
---|
fileobj | 已注冊的文件對象 | fd | 下層的文件描述器 | events | 此文件對象上被等待的事件 | data | 可選的關(guān)聯(lián)到此文件對象的不透明數(shù)據(jù):例如,這可被用來存儲各個客戶端的會話 ID |
events 是一個位掩碼,說明哪些 I/O 事件要在給定的文件對象上執(zhí)行等待。 它可以是以下模塊級常量的組合:
名稱 | 說明 |
---|
selectors.EVENT_READ | 可讀 | selectors.EVENT_WRITE | 可寫 |
服務(wù)端代碼示例:
import selectors
import logging
import socket
logging.basicConfig(level=logging.DEBUG,
format="%(asctime)s>%(message)s", datefmt="%Y-%m-%d %H:%M:%S")
logger = logging.getLogger(__name__)
class ServerClass(object):
"""docstring for ServerClass"""
def __init__(self):
self.__HOST = "127.0.0.1"
self.__PORT = 9999
self.ADDR = (self.__HOST, self.__PORT)
self.__TCP_SOCKET = socket.socket(
family=socket.AF_INET, type=socket.SOCK_STREAM)
# 設(shè)置非阻塞
# self.__TCP_SOCKET.setblocking(False)
self.__TCP_SOCKET.settimeout(0.0)
self.sele = selectors.DefaultSelector()
def start_server(self):
with self.__TCP_SOCKET as sock:
sock.bind(self.ADDR)
sock.listen()
logger.info("Server is Running")
self.sele.register(
sock, selectors.EVENT_READ, self.accept_conn)
while True:
events = self.sele.select()
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)
def accept_conn(self, sock, mask):
conn, addr = sock.accept()
logger.info(f"{addr} 已連接")
conn.sendall(f"Hi,{addr}".encode("utf-8"))
conn.setblocking(False)
self.sele.register(conn, selectors.EVENT_READ, self.read)
def read(self, conn, mask):
try:
client_addr = conn.getpeername()
recv_data = conn.recv(1024)
if recv_data and recv_data.decode("utf-8") != "quit":
logger.info(
f"接收到來自 {client_addr} 的數(shù)據(jù):{recv_data.decode('utf-8')}")
conn.sendall(b'From Server ' + recv_data)
else:
logger.info(f"{client_addr} 已斷開")
self.sele.unregister(conn)
except ConnectionResetError as e:
logger.info(f"{client_addr} 異常斷開")
self.sele.unregister(conn)
conn.close()
if __name__ == '__main__':
ServerClass().start_server()
比select模塊的select方法更簡潔一些;
客戶端代碼 :同上; 效果展示: 和select方法結(jié)果是一樣的; 主要參考: 一只小小寄居蟹 --這個博客 Python 中文文檔 selectors — 高級 I/O 復(fù)用庫
以上就是所有關(guān)于Python socket模塊的非阻塞模式的學(xué)習(xí) 如果有什么不對的地方,歡迎指正!
|