Create custom commands
Custom commands cover use cases that the built-in commands cannot handle, such as a special request to your robot or to a service on your system. Examples include restarting or stopping services, returning to dock, going on a mission, or pinging a specific sensor.
Info
A custom command can only be run on devices where its command definition is available. Make sure the command definition is copied or moved to every device on which you want to run the command.
The client application uses the Agent API to read and respond to commands. This help page explains the process of creating a new custom command.
Use GetCommandRequestStream on the Formant agent gRPC API to retrieve an infinite stream of all "command requests" issued to the device.
Here is a basic example of handling arbitrary commands issued to a device. It only prints each received "command request" object. This example assumes the formant Python module is installed in your Python environment.
import grpc
from formant.protos.agent.v1 import agent_pb2, agent_pb2_grpc

# Connect to the Formant agent running on the device.
channel = grpc.insecure_channel("localhost:5501")
agent = agent_pb2_grpc.AgentStub(channel)

# An empty command filter requests all commands issued to the device.
stream_request = agent_pb2.GetCommandRequestStreamRequest(command_filter=[])
for r in agent.GetCommandRequestStream(stream_request):
    print("Received command request:\n%s" % r.request)
This example handler will receive all commands issued to the device. The command stream will continue to be processed until the for loop is exited.
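When a device handles several commands, the loop body can dispatch on the command name instead of only printing the request. The following is a minimal sketch, not part of the Formant API: `dispatch_commands` and the two handler functions are hypothetical names, and the only assumption about each stream item is that it exposes `request.command` and `request.id`, as the polling example later on this page does.

```python
from types import SimpleNamespace


def dispatch_commands(stream, handlers):
    """Call the handler registered for each command request in `stream`.

    `stream` is any iterable of objects exposing `.request.command` and
    `.request.id` (assumed shape). `handlers` maps a command name to a
    callable taking the request. Returns the IDs of the handled requests.
    """
    handled = []
    for item in stream:
        request = item.request
        handler = handlers.get(request.command)
        if handler is None:
            # No handler is registered for this command name; skip it.
            continue
        handler(request)
        handled.append(request.id)
    return handled


# Hypothetical handlers, for illustration only.
def return_to_dock(request):
    print("Returning to dock (request %s)" % request.id)


def restart_services(request):
    print("Restarting services (request %s)" % request.id)


HANDLERS = {
    "return_to_dock": return_to_dock,
    "restart_services": restart_services,
}

if __name__ == "__main__":
    # Stand-in objects replace the agent stream so the sketch runs without a device.
    fake_stream = [
        SimpleNamespace(request=SimpleNamespace(id="1", command="return_to_dock")),
        SimpleNamespace(request=SimpleNamespace(id="2", command="unknown")),
    ]
    dispatch_commands(fake_stream, HANDLERS)
```

With a real agent, the iterable returned by `agent.GetCommandRequestStream(stream_request)` would be passed as `stream`.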
A filtered command request stream can be fetched by passing a "command filter" to the API call. Here is another, more in-depth example that uses commands to run health checks on local sensor and motor addresses. The first example used the GetCommandRequestStream API to retrieve an infinite stream; this example instead polls for new commands periodically by calling GetCommandRequest with a GetCommandRequestRequest message.
import subprocess
import time

import grpc
from formant.protos.agent.v1 import agent_pb2, agent_pb2_grpc
from formant.protos.model.v1.commands_pb2 import CommandResponse
from formant.protos.model.v1.datapoint_pb2 import Datapoint
from formant.protos.model.v1.text_pb2 import Text

channel = grpc.insecure_channel("localhost:5501")
agent = agent_pb2_grpc.AgentStub(channel)


def ping(address):
    # check if the address responds within 0.1 seconds
    shell_command = ["timeout", "0.1", "ping", "-c", "1", address]
    try:
        subprocess.check_output(shell_command)
        return True
    except (OSError, subprocess.CalledProcessError):
        return False


SENSOR_ADDRESSES = ["192.168.1.28"]
MOTOR_ADDRESSES = ["192.168.30.90", "192.168.30.91", "192.168.30.92", "192.168.30.93"]

# Only fetch requests for the two health check commands.
command_request_request = agent_pb2.GetCommandRequestRequest(
    command_filter=["sensor_check", "motor_check"]
)

if __name__ == "__main__":
    while True:
        command_request = agent.GetCommandRequest(command_request_request).request
        command = command_request.command
        command_request_id = command_request.id

        if command == "sensor_check":
            addresses = SENSOR_ADDRESSES
        elif command == "motor_check":
            addresses = MOTOR_ADDRESSES
        else:
            # No matching command was returned; wait before polling again.
            time.sleep(0.05)
            continue

        # The health check succeeds only if every address responds.
        success = all(ping(address) for address in addresses)
        if success:
            message = "Health check succeeded"
        else:
            message = "Health check failed"

        # Report the result as a text datapoint on a stream named after the command.
        datapoint = Datapoint(
            stream=command,
            text=Text(value=message),
            timestamp=int(time.time() * 1000.0),
        )
        command_response = CommandResponse(
            request_id=command_request_id, success=True, datapoint=datapoint
        )
        response_request = agent_pb2.SendCommandResponseRequest(
            response=command_response
        )
        agent.SendCommandResponse(response_request)
        time.sleep(0.05)
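The health check above reports only whether every address responded. If the response message should also name the unreachable addresses, the check can be factored into a small helper. This is a sketch under stated assumptions: `summarize_health_check` is a hypothetical name, and `probe` stands for any callable that returns True when an address responds, such as the `ping` function above.

```python
def summarize_health_check(addresses, probe):
    """Probe every address and return a (success, message) pair.

    `probe` is any callable returning True when an address responds,
    for example the `ping` function from the example above.
    """
    failed = [address for address in addresses if not probe(address)]
    if failed:
        return False, "Health check failed: no response from " + ", ".join(failed)
    return True, "Health check succeeded"


if __name__ == "__main__":
    # A fake probe stands in for `ping` so the sketch runs without a network.
    reachable = {"192.168.30.90", "192.168.30.91"}
    success, message = summarize_health_check(
        ["192.168.30.90", "192.168.30.91", "192.168.30.92"],
        lambda address: address in reachable,
    )
    print(success, message)
```

The returned message can be used as the `Text` value of the datapoint sent back with the command response.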
Once the command processor is written and deployed on the device, configure the command in the web application. The command definition associates a command with program logic, so the command used in the client application must match the command definition configured in the web application.
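One way to reduce the risk of a mismatch is to define the command names in a single place in the client application and derive the command filter from it. The sketch below assumes the names `sensor_check` and `motor_check` from the example above are the ones configured in the web application; `COMMAND_ADDRESSES`, `COMMAND_FILTER`, and `addresses_for` are hypothetical names used for illustration.

```python
# Command names are defined once; they are assumed to match the names
# configured in the web application.
COMMAND_ADDRESSES = {
    "sensor_check": ["192.168.1.28"],
    "motor_check": ["192.168.30.90", "192.168.30.91"],
}

# The filter passed to the agent is derived from the same table.
COMMAND_FILTER = sorted(COMMAND_ADDRESSES)


def addresses_for(command):
    """Return the addresses for a command name, or None if it is unknown."""
    return COMMAND_ADDRESSES.get(command)
```

Here `COMMAND_FILTER` would be passed as `command_filter`, and `addresses_for(command)` would replace the if/elif chain in the polling loop.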