-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmsg_socket.py
67 lines (52 loc) · 1.82 KB
/
msg_socket.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import socket
import struct
class MsgSocket:
    """Message-oriented wrapper around a stream socket.

    Each message travels on the wire as a 4-byte big-endian unsigned length
    prefix followed by that many bytes of UTF-8 encoded text.  The wrapper
    exposes a handful of pass-through socket methods plus ``send_msg`` /
    ``recv_msg`` for framed text messages.
    """

    # Class-level fallback so the attribute exists even when ``__init__``
    # never completed (e.g. socket creation raised).
    __socket__ = None

    def __init__(self, sock=None):
        """Wrap *sock*, or create a new AF_INET / SOCK_STREAM socket if None."""
        if sock is None:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.__socket__ = sock

    def __del__(self):
        # The underlying socket may be missing if construction failed;
        # closing must not raise from the finalizer in that case.
        self.close()

    def __enter__(self):
        """Return self so the wrapper can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the underlying socket when leaving the ``with`` block."""
        self.close()

    def close(self):
        """Close the underlying socket, if there is one."""
        sock = self.__socket__
        if sock is not None:
            sock.close()

    def settimeout(self, timeout):
        """Forward to ``socket.settimeout`` on the underlying socket."""
        return self.__socket__.settimeout(timeout)

    def bind(self, address, port):
        """Bind to ``(str(address), int(port))``."""
        return self.__socket__.bind((str(address), int(port)))

    def listen(self, backlog):
        """Forward to ``socket.listen`` with the given *backlog*."""
        return self.__socket__.listen(backlog)

    def accept(self):
        """Accept a connection.

        Returns:
            A ``(MsgSocket, addr)`` pair: the accepted connection wrapped in
            a new ``MsgSocket`` and the peer address reported by the socket.
        """
        conn, addr = self.__socket__.accept()
        return MsgSocket(sock=conn), addr

    def connect(self, address, port):
        """Connect to ``(str(address), int(port))``."""
        return self.__socket__.connect((str(address), int(port)))

    def connect_ex(self, address, port):
        """Like ``connect`` but return the result of ``socket.connect_ex``."""
        return self.__socket__.connect_ex((str(address), int(port)))

    def getpeername(self):
        """Forward to ``socket.getpeername`` on the underlying socket."""
        return self.__socket__.getpeername()

    def send_msg(self, msg):
        """Frame *msg* and send it in full.

        Args:
            msg: The text to send; must be a ``str``.

        Raises:
            TypeError: If *msg* is not a ``str``.
        """
        data = self.__prepare_data__(msg)
        return self.__socket__.sendall(data)

    def recv_msg(self):
        """Receive one framed message.

        Returns:
            The decoded text, or ``None`` if the connection was closed or
            reset before a complete header and payload were received.
        """
        header = self.recv_all(4)
        if header is None:
            return None
        (msg_len,) = struct.unpack(">I", header)
        payload = self.recv_all(msg_len)
        # The peer may disappear between the header and the end of the
        # payload; report that the same way as a closed connection.
        if payload is None:
            return None
        return payload.decode("utf-8")

    def recv_all(self, n):
        """Read exactly *n* bytes from the underlying socket.

        Returns:
            The bytes read (``b""`` when *n* is not positive), or ``None``
            if the peer closed the connection or a ``ConnectionError`` was
            raised before *n* bytes arrived.
        """
        chunks = []
        remaining = n
        try:
            while remaining > 0:
                packet = self.__socket__.recv(remaining)
                if not packet:
                    # Empty read means the peer closed the connection.
                    return None
                chunks.append(packet)
                remaining -= len(packet)
        except ConnectionError:
            return None
        # Join once at the end instead of repeated bytes concatenation.
        return b"".join(chunks)

    @staticmethod
    def __prepare_data__(msg):
        """Encode *msg* as UTF-8 and prepend its 4-byte big-endian length.

        Raises:
            TypeError: If *msg* is not a ``str``.
        """
        if not isinstance(msg, str):
            raise TypeError("param 'msg' must be type of str")
        data = msg.encode("utf-8")
        return struct.pack('>I', len(data)) + data