Create custom commands

Custom commands cover use cases that the built-in commands cannot handle, such as a special request to your robot or to a service on your system. Examples include restarting or stopping services, returning to dock, starting a mission, or pinging a specific sensor.

📘 Info

A custom command can only be run on devices where its command definition is available. Make sure the command definition is copied or moved to every device on which you want to run the command.

The client application uses the Agent API to read and respond to commands. This help page explains the process of creating a new custom command.

Use GetCommandRequestStream on the Formant agent gRPC API to retrieve an infinite stream of all "command requests" issued to the device.

Here is a basic example of handling arbitrary commands issued to a device. It only prints each received "command request" object, and it assumes the formant Python module is installed in your Python environment.

import grpc
from formant.protos.agent.v1 import agent_pb2, agent_pb2_grpc

channel = grpc.insecure_channel("localhost:5501")
agent = agent_pb2_grpc.AgentStub(channel)

stream_request = agent_pb2.GetCommandRequestStreamRequest(command_filter=[])

for r in agent.GetCommandRequestStream(stream_request):
    print("Received command request:\n%s" % r.request)

This example handler will receive all commands issued to the device. The command stream will continue to be processed until the for loop is exited.
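
As the number of commands grows, it can help to route each request to its own handler function instead of printing it. The following is a minimal sketch of that idea and is not part of the Formant SDK: `FakeRequest`, `FakeResponse`, `dispatch`, and the handler shown are hypothetical stand-ins so that the example runs without an agent. It assumes that each stream item exposes a `request` carrying `id` and `command` fields. In a real client, you would likely pass `agent.GetCommandRequestStream(stream_request)` as `responses`.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Iterable, List


# Hypothetical stand-ins for the agent's stream items, used only so this
# sketch runs without a Formant agent.
@dataclass
class FakeRequest:
    id: str
    command: str


@dataclass
class FakeResponse:
    request: FakeRequest


def dispatch(responses: Iterable, handlers: Dict[str, Callable]) -> List[str]:
    """Call the handler registered for each command; return unhandled names."""
    unhandled = []
    for response in responses:
        request = response.request
        handler = handlers.get(request.command)
        if handler is None:
            # No handler registered for this command name.
            unhandled.append(request.command)
            continue
        handler(request)
    return unhandled


docked = []
handlers = {"return_to_dock": lambda request: docked.append(request.id)}

stream = [
    FakeResponse(FakeRequest(id="1", command="return_to_dock")),
    FakeResponse(FakeRequest(id="2", command="unknown_command")),
]
unhandled = dispatch(stream, handlers)
print(docked, unhandled)
```

Keeping the handlers in a dictionary keyed by command name makes it easy to see which commands the client supports and to spot names that arrive without a handler.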

A filtered command request stream can be fetched by passing a "command filter" to the API call. Here is another, more in-depth example that uses commands to run health checks on local sensor and motor addresses. The first example used the GetCommandRequestStream call to retrieve an infinite stream. This example instead uses the GetCommandRequest call, with a GetCommandRequestRequest message, to fetch new commands periodically.

import subprocess
import time

import grpc
from formant.protos.agent.v1 import agent_pb2, agent_pb2_grpc
from formant.protos.model.v1.commands_pb2 import CommandResponse
from formant.protos.model.v1.datapoint_pb2 import Datapoint
from formant.protos.model.v1.text_pb2 import Text

channel = grpc.insecure_channel("localhost:5501")
agent = agent_pb2_grpc.AgentStub(channel)


def ping(address):
    # check if the address responds within 0.1 seconds
    shell_command = ["timeout", "0.1", "ping", "-c", "1", address]
    try:
        subprocess.check_output(shell_command)
        return True
    except (OSError, subprocess.CalledProcessError):
        return False


SENSOR_ADDRESSES = ["192.168.1.28"]
MOTOR_ADDRESSES = ["192.168.30.90", "192.168.30.91", "192.168.30.92", "192.168.30.93"]

command_request_request = agent_pb2.GetCommandRequestRequest(
    command_filter=["sensor_check", "motor_check"]
)

if __name__ == "__main__":
    while True:
        command_request = agent.GetCommandRequest(command_request_request).request
        command = command_request.command
        command_request_id = command_request.id

        if command == "sensor_check":
            addresses = SENSOR_ADDRESSES
        elif command == "motor_check":
            addresses = MOTOR_ADDRESSES
        else:
            # Nothing to handle; pause before polling again to avoid a busy loop.
            time.sleep(0.05)
            continue

        # Ping every address; the check succeeds only if all of them respond.
        success = all([ping(address) for address in addresses])

        if success:
            message = "Health check succeeded"
        else:
            message = "Health check failed"

        datapoint = Datapoint(
            stream=command,
            text=Text(value=message),
            timestamp=int(time.time() * 1000.0),
        )

        # Note: success is always True here; the health-check outcome is
        # carried in the datapoint text.
        command_response = CommandResponse(
            request_id=command_request_id, success=True, datapoint=datapoint
        )
        send_request = agent_pb2.SendCommandResponseRequest(
            response=command_response
        )
        agent.SendCommandResponse(send_request)

        time.sleep(0.05)
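
The loop above mixes polling, pinging, and response building, which makes the health-check logic hard to test off-device. One possible refactoring, sketched here on the assumption that you are free to restructure the client, isolates the check in a pure function. `run_health_check`, `fake_ping`, and `REACHABLE` are hypothetical names introduced for illustration; on a device you would pass the `ping` function defined above.

```python
from typing import Callable, Iterable, Tuple


def run_health_check(
    addresses: Iterable[str], ping: Callable[[str], bool]
) -> Tuple[bool, str]:
    """Ping every address and return (success, message)."""
    # Build the full list first so every address is pinged, even after a failure.
    results = [ping(address) for address in addresses]
    success = all(results)
    message = "Health check succeeded" if success else "Health check failed"
    return success, message


# Hypothetical ping replacement for testing: only these addresses "respond".
REACHABLE = {"192.168.30.90", "192.168.30.91"}


def fake_ping(address: str) -> bool:
    return address in REACHABLE


print(run_health_check(["192.168.30.90", "192.168.30.91"], fake_ping))
print(run_health_check(["192.168.30.90", "192.168.30.92"], fake_ping))
```

Because the ping function is passed in as an argument, the same check can be exercised on a development machine without any sensors or motors attached.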

Once the command processor is written and added to the device, configure the command in the web application. The command definition associates a command with program logic, so the command definition used in the client application must match the command definition used in-app.