Dynamic port configuration
In the tutorial we have seen that, when creating an Instance, you need to indicate
the ports that the instance has. For example, the reaction component defined two
ports:
# Static port configuration: the ports are listed per operator when the
# Instance is created (here one F_INIT port and one O_F port).
instance = Instance({
        Operator.F_INIT: ['initial_state'], # 1D Grid
        Operator.O_F: ['final_state']}) # 1D Grid
// Static port configuration: the ports are listed per operator when the
// Instance is created (here one F_INIT port and one O_F port).
Instance instance(argc, argv, {
        {Operator::F_INIT, {"initial_state"}}, // 1D Grid
        {Operator::O_F, {"final_state"}}}); // 1D Grid
Some components in your simulation can be generic, for example merging multiple inputs into a single container. For these generic components, MUSCLE3 allows the component to have a dynamic port configuration: the ports will be created based on the yMMSL configuration instead of being statically configured when creating the Instance.
Changed in version 0.10.0: Added support for dynamic O_I and S ports.
Example: generic combiner component
The generic combiner component from this example receives inputs on multiple F_INIT
ports, combines them into a list and finally sends it on its output port(s). In the code
listings below, you see how the component:
Indicates it wants a dynamic port configuration, by not providing a port description when creating the Instance.
Requests which ports are available, and checks that it only has F_INIT and O_F ports.
Receives on all connected F_INIT ports.
Combines the message data from all inputs and sends it on all connected O_F ports.
docs/source/examples/python/combiner.py
import logging
from ymmsl import Operator
from libmuscle import Instance, InstanceFlags, Message
def main():
    """Run the generic combiner component.

    The component uses a dynamic port configuration: it asks the instance
    which ports exist, receives one message on every connected F_INIT port,
    collects the message data in a list (ordered by port name) and sends
    that list on every O_F port.

    Raises:
        RuntimeError: if any O_I or S ports are configured, or if no F_INIT
            port is connected.
    """
    # 1. No ports description is passed, which requests a dynamic port
    #    configuration. Instance flags could be passed as Instance(flags=...)
    instance = Instance()

    # 2. Ask which ports have been configured for this component
    ports = instance.list_ports()

    # Only F_INIT and O_F are supported by this component
    unsupported = (Operator.O_I, Operator.S)
    if any(ports.get(operator) for operator in unsupported):
        msg = "The combiner component does not support O_I or S ports."
        instance.error_shutdown(msg)
        raise RuntimeError(msg)

    # Keep only the connected F_INIT ports, ordered by name
    f_init_ports = [
            name for name in ports.get(Operator.F_INIT, [])
            if instance.is_connected(name)]
    f_init_ports.sort()

    # At least one input is needed to have something to combine
    if len(f_init_ports) == 0:
        msg = "The combiner actor requires at least one connected F_INIT port."
        instance.error_shutdown(msg)
        raise RuntimeError(msg)

    output_ports = ports.get(Operator.O_F, [])

    while instance.reuse_instance():
        # 3. Receive one message per connected F_INIT port, in name order
        received = [instance.receive(name) for name in f_init_ports]

        # 4. The timestamps of the first input are used for the output
        first = received[0]
        combined = Message(
                first.timestamp,
                first.next_timestamp,
                [message.data for message in received])

        for name in output_ports:
            instance.send(name, combined)
# Script entry point: only run the component when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    # Set up the default logging configuration before starting the component
    logging.basicConfig()
    main()
docs/source/examples/cpp/combiner.cpp
#include <algorithm>
#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>
#include <libmuscle/libmuscle.hpp>
#include <ymmsl/ymmsl.hpp>
using libmuscle::Data;
using libmuscle::Instance;
using libmuscle::Message;
using ymmsl::Operator;
int main(int argc, char * argv[]) {
// 1. Request dynamic port configuration by not providing a ports description
Instance instance(argc, argv);
// Optionally provide instance flags with: Instance(argc, argv, instance_flags)
// 2. Request which ports are available:
auto ports = instance.list_ports();
// check that we don't have any O_I or S ports defined
if (!ports[Operator::O_I].empty() || !ports[Operator::S].empty()) {
std::string msg = "The combiner component does not support O_I or S ports.";
instance.error_shutdown(msg);
throw std::runtime_error(msg);
}
// Find connected F_INIT ports
std::vector<std::string> f_init_ports;
for (auto && port_name : ports[Operator::F_INIT]) {
if (instance.is_connected(port_name)) {
f_init_ports.push_back(port_name);
}
}
// sort them by name,
std::sort(f_init_ports.begin(), f_init_ports.end());
// and check that we have at least one input
if (f_init_ports.empty()) {
std::string msg = "The combiner actor requires at least one connected F_INIT port.";
instance.error_shutdown(msg);
throw std::runtime_error(msg);
}
while (instance.reuse_instance()) {
// 3. Receive on all connected F_INIT ports
std::vector<libmuscle::Message> input_messages;
for (auto const & port : f_init_ports) {
input_messages.push_back(instance.receive(port));
}
// 4. Combine the input and send on connected O_F ports
auto timestamp = input_messages[0].timestamp();
auto next_timestamp = input_messages[0].next_timestamp();
Data data = Data::nils(input_messages.size());
for (std::size_t i = 0; i < input_messages.size(); ++i) {
Data copy;
copy.reseat(input_messages[i].data());
data[i] = copy;
}
Message output(timestamp, next_timestamp, data);
for (auto const & port : ports[Operator::O_F]) {
instance.send(port, output);
}
}
return 0;
}