from errno import ENOTCONN
import socket
from typing import Optional, Tuple
from libmuscle.mcp.transport_client import ProfileData, TransportClient
from libmuscle.mcp.tcp_util import recv_all, recv_int64, send_int64
from libmuscle.profiling import ProfileTimestamp
class TcpTransportClient(TransportClient):
    """A client that connects to a TCPTransport server.

    The connection is established in the constructor and stays open
    until close() is called.
    """

    @staticmethod
    def can_connect_to(location: str) -> bool:
        """Whether this client class can connect to the given location.

        Args:
            location: The location to potentially connect to.

        Returns:
            True iff this class can connect to this location.
        """
        return location.startswith('tcp:')

    def __init__(self, location: str) -> None:
        """Create a TcpClient for a given location.

        The client will connect to this location and be able to request
        messages from any instance and port represented by it.

        Args:
            location: A location string for the peer, of the form
                ``tcp:<address>[,<address>...]``. The addresses are
                tried in order until one of them accepts a connection.

        Raises:
            RuntimeError: If none of the addresses could be connected
                to.
        """
        # Drop the 'tcp:' prefix; the remainder is a comma-separated
        # list of alternative addresses.
        addresses = location[4:].split(',')

        sock: Optional[socket.SocketType] = None
        for address in addresses:
            try:
                sock = self._connect(address)
                break
            except RuntimeError:
                # _connect() reports every failure, including malformed
                # or unresolvable addresses, as RuntimeError, so we can
                # simply move on to the next candidate.
                pass

        if sock is None:
            raise RuntimeError('Could not connect to the server at location'
                               ' {}'.format(location))

        try:
            # These options are not available on every platform.
            if hasattr(socket, "TCP_NODELAY"):
                sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
            if hasattr(socket, "TCP_QUICKACK"):
                sock.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1)
        except OSError:
            # Don't leak the connected socket if configuring it fails.
            sock.close()
            raise

        self._socket = sock

    def call(self, request: bytes) -> Tuple[bytes, ProfileData]:
        """Send a request to the server and receive the response.

        The length of the request is sent first, followed by the
        request itself. Then the length of the response is read,
        followed by the response.

        This is a blocking call.

        Args:
            request: The request to send

        Returns:
            A tuple containing the received response, and a tuple of
            three timestamps taken just before sending the request,
            after the response length was received, and after the
            complete response was received.
        """
        start_wait = ProfileTimestamp()
        send_int64(self._socket, len(request))
        self._socket.sendall(request)

        length = recv_int64(self._socket)
        start_transfer = ProfileTimestamp()

        response = recv_all(self._socket, length)
        stop_transfer = ProfileTimestamp()
        return response, (start_wait, start_transfer, stop_transfer)

    def close(self) -> None:
        """Closes this client.

        This closes any connections this client has and/or performs
        other shutdown activities.

        Raises:
            OSError: If shutting down the connection fails for a reason
                other than the peer having disconnected already.
        """
        try:
            self._socket.shutdown(socket.SHUT_RDWR)
        except OSError as e:
            # This can happen if the peer has shut down already when we
            # close our connection to it, which is fine.
            if e.errno != ENOTCONN:
                raise
        finally:
            # Always release the file descriptor, also if the shutdown
            # failed because the peer was gone already.
            self._socket.close()

    def _connect(self, address: str) -> socket.SocketType:
        """Connect to a single address.

        Args:
            address: An address of the form ``host:port``, where host
                may be enclosed in square brackets (as is needed for
                IPv6 addresses).

        Returns:
            A socket connected to the given address.

        Raises:
            RuntimeError: If the address is malformed, cannot be
                resolved, or cannot be connected to.
        """
        # Split on the last colon only, as IPv6 hosts contain colons.
        loc_parts = address.rsplit(':', 1)
        if len(loc_parts) != 2:
            raise RuntimeError('Invalid address')
        host, port_str = loc_parts

        if host.startswith('['):
            if not host.endswith(']'):
                raise RuntimeError('Invalid address')
            host = host[1:-1]

        try:
            port = int(port_str)
        except ValueError as e:
            raise RuntimeError('Invalid address') from e

        try:
            addrinfo = socket.getaddrinfo(
                    host, port, 0, socket.SOCK_STREAM, socket.IPPROTO_TCP)
        except (OSError, UnicodeError) as e:
            # Resolution failures (socket.gaierror is an OSError) and
            # host names that cannot be encoded must not escape as-is,
            # or the caller could not fall back to its next address.
            raise RuntimeError('Could not connect') from e

        for family, socktype, proto, _, sockaddr in addrinfo:
            try:
                sock = socket.socket(family, socktype, proto)
            except OSError:
                continue

            try:
                sock.connect(sockaddr)
            except OSError:
                sock.close()
                continue
            return sock

        raise RuntimeError('Could not connect')