from errno import EBADF, ENOTCONN
from socket import SocketType
from typing_extensions import Buffer
import libmuscle.mark as mark
[docs]
class SocketClosed(Exception):
"""Raised when trying to read from a socket that was closed.
"""
pass
_CONNECTION_ERRORS = (BrokenPipeError, ConnectionError, SocketClosed, TimeoutError)
_CONNECTION_ERRNOS = (EBADF, ENOTCONN)
[docs]
def is_disconnect(exception: Exception) -> bool:
"""Checks whether this is a disconnect or another problem."""
if isinstance(exception, _CONNECTION_ERRORS):
return True
if isinstance(exception, OSError):
if exception.errno in _CONNECTION_ERRNOS:
return True
return False
[docs]
def recv_all(socket: SocketType, length: int) -> Buffer:
"""Receive length bytes from a socket.
Args:
socket: Socket to receive on.
length: Number of bytes to receive.
Raises:
SocketClosed: If the socket was closed by the peer.
RuntimeError: If a read error occurred.
"""
databuf = bytearray(length)
received_count = 0
while received_count < length:
mark.before_tcp_receive(socket)
bytes_left = length - received_count
received_now = socket.recv_into(
memoryview(databuf)[received_count:], bytes_left)
if received_now == 0:
raise SocketClosed("Socket closed while receiving")
if received_now == -1:
raise RuntimeError("Error receiving")
received_count += received_now
return databuf
[docs]
def send_int64(socket: SocketType, data: int) -> None:
"""Sends an int as a 64-bit signed little endian number.
Args:
socket: The socket to send on.
data: The number to send.
Raises:
RuntimeError: If there was an error sending the data.
"""
buf = data.to_bytes(8, byteorder='little')
mark.before_tcp_send(socket)
socket.sendall(buf)
[docs]
def recv_int64(socket: SocketType) -> int:
"""Receives an int as a 64-bit signed little endian number.
Args:
socket: The socket to receive on.
Raises:
SocketClosed: If the socket was closed by the peer.
RuntimeError: If a read error occurred.
"""
mark.before_tcp_receive(socket)
buf = recv_all(socket, 8)
return int.from_bytes(buf, 'little')
[docs]
def send_frame(socket: SocketType, data: Buffer) -> None:
"""Sends a frame as length + data.
Args:
socket: The socket to send on
data: The data to send
Raises:
RuntimeError: If there was an error sending the data.
"""
send_int64(socket, len(memoryview(data)))
socket.sendall(data)
[docs]
def recv_frame(socket: SocketType) -> Buffer:
"""Receives a frame as length + data.
Args:
socket: The socket to receive on
Returns:
The received data.
Raises:
RuntimeError: If there was an error receiving the data.
"""
length = recv_int64(socket)
data = recv_all(socket, length)
return data