from socket import SocketType
class SocketClosed(Exception):
    """Error signalling a read from a socket that has been closed."""
def recv_all(socket: SocketType, length: int) -> bytes:
    """Read exactly ``length`` bytes from a socket.

    Keeps calling ``recv_into`` on the socket until the requested amount
    has arrived, writing directly into a preallocated buffer.

    Args:
        socket: Socket to receive on.
        length: Number of bytes to receive.

    Returns:
        A bytearray holding the ``length`` bytes that were received.

    Raises:
        SocketClosed: If the socket was closed by the peer.
        RuntimeError: If a read error occurred.
    """
    buffer = bytearray(length)
    filled = 0
    with memoryview(buffer) as view:
        while filled < length:
            remaining = length - filled
            chunk_size = socket.recv_into(view[filled:], remaining)
            # A zero-byte read is treated as the peer having closed the
            # connection before all requested data arrived.
            if chunk_size == 0:
                raise SocketClosed("Socket closed while receiving")
            if chunk_size == -1:
                raise RuntimeError("Error receiving")
            filled += chunk_size
    return buffer
def send_int64(socket: SocketType, data: int) -> None:
    """Send an int as a 64-bit signed little-endian number.

    Args:
        socket: The socket to send on.
        data: The number to send. Must fit in a signed 64-bit integer,
            i.e. lie between -2**63 and 2**63 - 1 inclusive.

    Raises:
        OverflowError: If data does not fit in 64 signed bits.

    Errors raised by ``socket.sendall`` (``OSError`` for real sockets)
    propagate unchanged.
    """
    # signed=True produces the two's-complement encoding this function
    # promises; the default (unsigned) rejects every negative number.
    buf = data.to_bytes(8, byteorder='little', signed=True)
    socket.sendall(buf)
def recv_int64(socket: SocketType) -> int:
    """Receive an int sent as a 64-bit signed little-endian number.

    Args:
        socket: The socket to receive on.

    Returns:
        The decoded value, between -2**63 and 2**63 - 1 inclusive.

    Raises:
        SocketClosed: If the socket was closed by the peer.
        RuntimeError: If a read error occurred.
    """
    buf = recv_all(socket, 8)
    # Decode as two's complement: from_bytes defaults to unsigned, which
    # would turn every negative number into a large positive one.
    return int.from_bytes(buf, 'little', signed=True)