libmuscle.mcp.session_state module

class libmuscle.mcp.session_state.SessionState[source]

Bases: object

Tracks the state of an RPC session.

Our TCP server is multithreaded, spawning a thread for every active connection. If a connection is lost, then the thread will die, but only when it realises that the connection is gone, and that only happens during a send or receive.

It’s therefore possible to have multiple threads for the same session at the same time, if the client has detected a broken connection and reconnected (to a new thread) while the existing thread on the server is still processing the request, and will only discover the broken connection when it tries to send the result.

This class facilitates collaboration between those threads, making sure that each request is processed exactly once, and that the response continues to be available until the client has successfully received it.

A connection can be in two states: 1) request n has been received and is being processed, and 2) request n has been received and has been processed so that response n can be sent. We transition from 1) to 2) when processing of request n completes, and we transition from 2) back to 1) when request n+1 is received, at which point n is incremented and we’re back to 1). The response is sent during 2), but sending it doesn’t itself change the state, because a TCP send just puts the data in a local buffer and we can’t tell whether it’ll actually reach the receiver.

Objects of this class therefore store n, the most recently received request, and, if and only if we’re in state 2), response n.

set_response(response: Buffer) None[source]

Set the response and notify that we’re done.

This sets the response and notifies any threads waiting in wait_get_response() that one is available.

If a connection is lost while we’re waiting for a response to become available, then there are two threads: one with a dead connection that’s still doing the processing, and one with a working connection that should send the result. If the connection breaks again, then there will be another one.

When the work is done, we wake up all threads and have them all try to send, only the one with the working connection will succeed and continue, the rest will fail and quit. Or maybe all of them fail, in which case the client will have to send another request to try again.

Parameters:

response – A newly generated response

triage_request(request_nr: int) tuple[bool, bool][source]

Decide what to do about an incoming request

This returns a tuple (should_process, should_send) that tells the handler thread, given the current state of the connection, whether it should try to process the request, and whether it should try to send the result.

When request n is received, then we know that any requests before that have been completed successfully, because the client won’t send request n until after it’s received the response for n-1 in good order. Therefore, if the current request is n-1 and we receive request n, then we can delete the response for n-1, set the current request to n, and start processing.

If we receive request n while n is already the current request, then this is a re-request submitted to a new handling thread because the client encountered an error either during a previous send of request n or receive of response n, and is trying again. In that case, another thread is already processing the request, and we should wait for a response to be available and send it. (The thread doing the processing will have a broken connection, so it will fail on sending and quit.) If we also fail to send due to another disconnect, then the client will make another request and there will be a new thread to try to send the response again, until it succeeds.

If we receive a request with number < n, then this is an old request that was received previously, but then the thread got suspended for a while and the connection broke, so the client tried again and it was handled by another thread, and now this one is way behind. In that case, we do nothing and quit.

If we receive a request n+2 or more while the current request is n, then the client is skipping numbers or sending them out of order, so that shouldn’t happen.

To summarise: there are three possible options: under normal conditions the request should be processed and the response sent. If this is a re-request, then another thread will be processing already and we just need to send the response when it becomes available. If the request is old and has already been completed successfully, then we neither process nor request.

Note that request numbers may wrap, in which case this will crash or hang. At a rate of 1000 requests per second, this will take a bit under 3 million years, so it shouldn’t be an issue except for astrophysics simulations, but then those should just use AMUSE anyway.

Parameters:

request_nr – The request number of the received request

Returns:

Whether to try to process the request, and whether to try to send the response.

wait_get_response(request_nr: int) Buffer | None[source]

Wait for a response to be available and return it

It shouldn’t be possible for anyone to be waiting for response n while response n-1 isn’t available yet, but it is possible to be waiting for response n while that response has already been returned and we’re processing n+1 or later. In that case we return None, signalling that that response no longer exists and doesn’t need to be sent again. The send would fail anyway because if the client moved on then it was because our connection is broken.

Returns:

The response for the current request, or None if it’s no longer available.