Handling commands

Registering a command request callback

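The Agent SDK can receive commands and pass them to a callback that you register. The command_filter argument lists the commands that the callback handles.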

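from formant.sdk.agent.v1 import Client as FormantClient

fclient = FormantClient()

def handle_command_request(request):
    # print the name of the received command
    print(request.command)

fclient.register_command_request_callback(
    handle_command_request, command_filter=["sensor_check", "send_on_route"]
)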

Configure commands in Formant under Settings -> Commands.

Settings -> Commands

The "Command" field of each command in Formant must match an entry in command_filter; the same value arrives in the callback as request.command.

Settings for the "Sensor check" and "Send on route" commands

A single callback can handle multiple commands. The callback tells them apart by the value of request.command.
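
One way to organize a callback that serves several commands is a dispatch table keyed on request.command. The following is a minimal sketch, not part of the SDK: the helper functions check_sensors and send_on_route, the HANDLERS table, and the stand-in request built with SimpleNamespace are all hypothetical, and the sketch assumes only that a request exposes id and command attributes.

```python
from types import SimpleNamespace


def check_sensors(request):
    # hypothetical handler for the "sensor_check" command
    return "checking sensors for request " + request.id


def send_on_route(request):
    # hypothetical handler for the "send_on_route" command
    return "sending on route for request " + request.id


# map each command name to the function that handles it
HANDLERS = {
    "sensor_check": check_sensors,
    "send_on_route": send_on_route,
}


def handle_command_request(request):
    # look up the handler by command name; ignore unknown commands
    handler = HANDLERS.get(request.command)
    if handler is None:
        return None
    return handler(request)


# stand-in for a real command request (assumption: only id and command are used)
fake_request = SimpleNamespace(id="abc123", command="sensor_check")
print(handle_command_request(fake_request))
```

With this layout, passing command_filter=list(HANDLERS) when registering the callback would keep the filter and the table in sync.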

Registering multiple callbacks

📘

Single-receiver behavior

If there are many callbacks registered to the same agent, such as when multiple applications are running, only one callback will receive any given command request.

This contrasts with how teleop command streams are handled.

Example application

Read and experiment with this example application to see how to handle commands with the Formant Agent SDK.

import subprocess
import time

from formant.sdk.agent.v1 import Client as FormantClient

SENSOR_ADDRESSES = ["192.168.1.28", "192.168.1.29"]
MOTOR_ADDRESSES = ["192.168.30.90", "192.168.30.91", "192.168.30.92", "192.168.30.93"]

fclient = FormantClient()


def ping(address):
    # check if the address responds within 0.1 seconds
    shell_command = ["timeout", "0.1", "ping", "-c", "1", address]
    try:
        subprocess.check_output(shell_command)
        return True
    except (OSError, subprocess.CalledProcessError):
        return False


def handle_command_request(request):
    if request.command == "sensor_check":
        addresses = SENSOR_ADDRESSES
    elif request.command == "motor_check":
        addresses = MOTOR_ADDRESSES
    else:
        return

    if request.files is not None:
        for file in request.files:
            # example of reading files from a command
            print(
                "Command sent with file "
                + file["name"]
                + " accessible via "
                + file["url"]
            )

    # the check succeeds only if every address responds
    success = all(ping(address) for address in addresses)
    fclient.send_command_response(request.id, success=success)


if __name__ == "__main__":
    fclient.register_command_request_callback(
        handle_command_request, command_filter=["sensor_check", "motor_check"]
    )

    # idly spin while the command request callback listens for commands
    while True:
        time.sleep(0.1)
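
The handler logic above can be exercised without a running agent or a real network by swapping in stand-ins. The following is a rough sketch under stated assumptions: StubClient, make_handler, the fake ping function, and the SimpleNamespace requests are hypothetical test scaffolding, not part of the Formant SDK. The stub mirrors only the send_command_response(request.id, success=...) call used in the example.

```python
from types import SimpleNamespace


class StubClient:
    """Hypothetical stand-in for FormantClient that records responses."""

    def __init__(self):
        self.responses = []

    def send_command_response(self, request_id, success):
        # record the response instead of sending it to the agent
        self.responses.append((request_id, success))


def make_handler(client, ping, addresses_by_command):
    # build a handler with the same logic as the example application,
    # with the client, ping function, and address lists injected
    def handle_command_request(request):
        addresses = addresses_by_command.get(request.command)
        if addresses is None:
            return
        success = all(ping(address) for address in addresses)
        client.send_command_response(request.id, success=success)

    return handle_command_request


def fake_ping(address):
    # pretend that only 192.168.1.29 is unreachable
    return address != "192.168.1.29"


stub = StubClient()
handler = make_handler(
    stub,
    fake_ping,
    {
        "sensor_check": ["192.168.1.28", "192.168.1.29"],
        "motor_check": ["192.168.30.90", "192.168.30.91"],
    },
)

handler(SimpleNamespace(id="req-1", command="sensor_check"))
handler(SimpleNamespace(id="req-2", command="motor_check"))
handler(SimpleNamespace(id="req-3", command="unknown"))
print(stub.responses)  # [('req-1', False), ('req-2', True)]
```

Injecting the client and the ping function is a design choice made for this sketch; it lets the success and failure paths be checked without touching real sensors or motors.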