Source code for libmuscle.mcp.tcp_util

from errno import EBADF, ENOTCONN
from socket import SocketType

from typing_extensions import Buffer

import libmuscle.mark as mark


[docs] class SocketClosed(Exception): """Raised when trying to read from a socket that was closed. """ pass
_CONNECTION_ERRORS = (BrokenPipeError, ConnectionError, SocketClosed, TimeoutError) _CONNECTION_ERRNOS = (EBADF, ENOTCONN)
[docs] def is_disconnect(exception: Exception) -> bool: """Checks whether this is a disconnect or another problem.""" if isinstance(exception, _CONNECTION_ERRORS): return True if isinstance(exception, OSError): if exception.errno in _CONNECTION_ERRNOS: return True return False
[docs] def recv_all(socket: SocketType, length: int) -> Buffer: """Receive length bytes from a socket. Args: socket: Socket to receive on. length: Number of bytes to receive. Raises: SocketClosed: If the socket was closed by the peer. RuntimeError: If a read error occurred. """ databuf = bytearray(length) received_count = 0 while received_count < length: mark.before_tcp_receive(socket) bytes_left = length - received_count received_now = socket.recv_into( memoryview(databuf)[received_count:], bytes_left) if received_now == 0: raise SocketClosed("Socket closed while receiving") if received_now == -1: raise RuntimeError("Error receiving") received_count += received_now return databuf
[docs] def send_int64(socket: SocketType, data: int) -> None: """Sends an int as a 64-bit signed little endian number. Args: socket: The socket to send on. data: The number to send. Raises: RuntimeError: If there was an error sending the data. """ buf = data.to_bytes(8, byteorder='little') mark.before_tcp_send(socket) socket.sendall(buf)
[docs] def recv_int64(socket: SocketType) -> int: """Receives an int as a 64-bit signed little endian number. Args: socket: The socket to receive on. Raises: SocketClosed: If the socket was closed by the peer. RuntimeError: If a read error occurred. """ mark.before_tcp_receive(socket) buf = recv_all(socket, 8) return int.from_bytes(buf, 'little')
[docs] def send_frame(socket: SocketType, data: Buffer) -> None: """Sends a frame as length + data. Args: socket: The socket to send on data: The data to send Raises: RuntimeError: If there was an error sending the data. """ send_int64(socket, len(memoryview(data))) socket.sendall(data)
[docs] def recv_frame(socket: SocketType) -> Buffer: """Receives a frame as length + data. Args: socket: The socket to receive on Returns: The received data. Raises: RuntimeError: If there was an error receiving the data. """ length = recv_int64(socket) data = recv_all(socket, length) return data