What is an Adapter?

An adapter is a program that can be run by the Formant agent on startup. The primary purpose of adapters is to communicate with the Formant plaform through the Agent SDK. This will allow you to ingest information if you are not running in a ROS environment, or information that is not available as a ROS topic.

How they are run

Adapters are managed in the "Adapters" section of the Settings menu in the Formant app.

2874

Where to find your adapters

Adding a new adapter allows you to automatically install and run that adapter on your device. For example, if we are provisioning many Nvidia Jetson based Jetbots, we can create an adapter for it and configure that adapter to be installed and run in our device configuration menu.

Adapters should be executable packages in a zip folder. The typical structure is to provide a python package with a shell script, like start.sh, that the agent can run on startup.

Example Adapter

Below, we'll look at the complete Jetbot Python adapter. See the source repository here for the full package.

Below, our adapter is publishing the following information:

  • Speed
  • Individual motor speeds
  • Location (Lat/Lng)
  • Battery charge percentage
  • Camera statistics (like capture rate, etc.)
  • Camera images
  • An event when it comes online

It also allows for Teleop control.

import sys
import time
import threading
import collections
from statistics import mean, stdev

from formant.sdk.agent.v1 import Client as FormantClient
import cv2

from jetbot import Robot, INA219


MAX_CHARGING_VOLTAGE = 12.6
MIN_CHARGING_VOLTAGE = 11.0
MAX_DISCHARGING_VOLTAGE = 12.1
MIN_DISCHARGING_VOLTAGE = 10.0
DEFAULT_MAX_SPEED = 0.7
DEFAULT_MIN_SPEED = 0.1
DEFAULT_START_SPEED = 0.1
DEFAULT_SPEED_DEADZONE = 0.25
DEFAULT_SPEED_INCREMENT = 0.025
DEFAULT_ANGULAR_REDUCTION = 0.50
DEFAULT_LATITUDE = 41.322937  # The pyramid of Enver Hoxha
DEFAULT_LONGITUDE = 19.820896
DEFAULT_GST_STRING = (
    "nvarguscamerasrc ! "
    "video/x-raw(memory:NVMM), width=(int)640, height=(int)480, format=(string)NV12, framerate=(fraction)30/1 ! "
    "nvvidconv ! "
    "video/x-raw, width=(int)640, height=(int)480, format=(string)BGRx ! "
    "videoconvert ! "
    "appsink "
)


class FormantJetBotAdapter:
    def __init__(self):
        print("INFO: Starting Formant JetBot Adapter")

        # Set global params
        self.max_speed = DEFAULT_MAX_SPEED
        self.min_speed = DEFAULT_MIN_SPEED
        self.speed_deadzone = DEFAULT_SPEED_DEADZONE
        self.speed_increment = DEFAULT_SPEED_INCREMENT
        self.angular_reduction = DEFAULT_ANGULAR_REDUCTION
        self.latitude = DEFAULT_LATITUDE
        self.longitude = DEFAULT_LONGITUDE
        self.gst_string = DEFAULT_GST_STRING
        self.start_speed = DEFAULT_START_SPEED
        self.speed = self.start_speed
        self.is_shutdown = False

        # Store frame rate information to publish
        self.camera_width = 0
        self.camera_height = 0
        self.camera_frame_timestamps = collections.deque([], maxlen=100)
        self.camera_frame_sizes = collections.deque([], maxlen=100)

        # Create clients
        self.robot = Robot()
        self.ina219 = INA219(addr=0x41)
        self.fclient = FormantClient(ignore_throttled=True, ignore_unavailable=True)

        self.update_from_app_config()
        self.publish_online_event()

        self.fclient.register_command_request_callback(self.handle_command_request)

        self.fclient.register_teleop_callback(
            self.handle_teleop, ["Joystick", "Buttons"]
        )

        print("INFO: Starting speed thread")
        threading.Thread(target=self.publish_speed, daemon=True).start()

        print("INFO: Starting motor states thread")
        threading.Thread(target=self.publish_motor_states, daemon=True).start()

        print("INFO: Starting location thread")
        threading.Thread(target=self.publish_location, daemon=True).start()

        print("INFO: Starting battery state thread")
        threading.Thread(target=self.publish_battery_state, daemon=True).start()

        print("INFO: Starting camera stats thread")
        threading.Thread(target=self.publish_camera_stats, daemon=True).start()

        # Start the camera feed
        self.publish_camera_feed()

    def __del__(self):
        self.is_shutdown = True

    def publish_speed(self):
        while not self.is_shutdown:
            # self.fclient.post_numeric("speed", self.speed)
            self.fclient.post_numericset(
                "Speed", {"speed": (self.speed + self.speed_deadzone, "m/s")},
            )
            time.sleep(1.0)

    def publish_motor_states(self):
        while not self.is_shutdown:
            self.fclient.post_numeric(
                "Motor Speed", self.robot.left_motor.value, {"value": "left"}
            )
            self.fclient.post_numeric(
                "Motor Speed", self.robot.right_motor.value, {"value": "right"}
            )
            time.sleep(0.5)

    def publish_location(self):
        while not self.is_shutdown:
            self.fclient.post_geolocation("Location", self.latitude, self.longitude)
            time.sleep(10.0)

    def publish_battery_state(self):
        while not self.is_shutdown:
            bus_voltage = self.ina219.getBusVoltage_V()
            shunt_voltage = self.ina219.getShuntVoltage_mV() / 1000
            current = self.ina219.getCurrent_mA() / 1000

            charging = False
            if shunt_voltage > 0.01 and current > 0.01:
                charging = True

            # approximate battery charge percentage calibration
            now = bus_voltage - MIN_DISCHARGING_VOLTAGE
            full = MAX_DISCHARGING_VOLTAGE - MIN_DISCHARGING_VOLTAGE
            charge_percentage = now / full * 100
            if charging:
                now = bus_voltage - MIN_CHARGING_VOLTAGE
                full = MAX_CHARGING_VOLTAGE - MIN_CHARGING_VOLTAGE
                charge_percentage = now / full * 100

            if charge_percentage >= 100:
                charge_percentage = 100

            self.fclient.post_battery(
                "Battery State", charge_percentage, voltage=bus_voltage, current=current
            )

            self.fclient.post_bitset(
                "Battery Charging", {"charging": charging, "discharging": not charging}
            )

            time.sleep(1.0)

    def publish_camera_stats(self):
        while not self.is_shutdown:
            try:
                timestamps = list(self.camera_frame_timestamps)
                sizes = list(self.camera_frame_sizes)
            except:
                print("ERROR: deque mutated while iterating")
                pass

            length = len(timestamps)
            if length > 2:
                size_mean = mean(sizes)
                size_stdev = stdev(sizes)
                jitter = self.calculate_jitter(timestamps)
                oldest = timestamps[0]
                newest = timestamps[-1]
                diff = newest - oldest
                if diff > 0:
                    hz = length / diff
                    self.fclient.post_numericset(
                        "Camera Statistics",
                        {
                            "Rate": (hz, "Hz"),
                            "Mean Size": (size_mean, "bytes"),
                            "Std Dev": (size_stdev, "bytes"),
                            "Mean Jitter": (jitter, "ms"),
                            "Width": (self.camera_width, "pixels"),
                            "Height": (self.camera_height, "pixels"),
                        },
                    )
            time.sleep(5.0)

    def publish_camera_feed(self):
        cap = cv2.VideoCapture(self.gst_string, cv2.CAP_GSTREAMER)
        if cap is None:
            print("ERROR: Could not read from video capture source.")
            sys.exit()

        while not self.is_shutdown:
            _, image = cap.read()

            try:
                encoded = cv2.imencode(".jpg", image)[1].tostring()
                self.fclient.post_image("Camera", encoded)

                # Track stats for publishing
                self.camera_frame_timestamps.append(time.time())
                self.camera_frame_sizes.append(len(encoded) * 3 / 4)
                self.camera_width = image.shape[1]
                self.camera_height = image.shape[0]
            except:
                print("ERROR: encoding failed")

    def publish_online_event(self):
        try:
            commit_hash_file = (
                "/home/jetbot/formant-jetbot-adapter/.git/refs/heads/main"
            )
            with open(commit_hash_file) as f:
                commit_hash = f.read()
        except Exception:
            print(
                "ERROR: formant-jetbot-adapter repo must be installed at "
                "/home/jetbot/formant-jetbot-adapter to receive online event"
            )

        self.fclient.create_event(
            "Formant JetBot adapter online",
            notify=False,
            tags={"hash": commit_hash.strip()},
        )

    def update_from_app_config(self):
        print("INFO: Updating configuration ...")
        self.max_speed = float(
            self.fclient.get_app_config("max_speed", DEFAULT_MAX_SPEED)
        )
        self.min_speed = float(
            self.fclient.get_app_config("min_speed", DEFAULT_MIN_SPEED)
        )
        self.speed_deadzone = float(
            self.fclient.get_app_config("speed_deadzone", DEFAULT_SPEED_DEADZONE)
        )
        self.speed_increment = float(
            self.fclient.get_app_config("speed_increment", DEFAULT_SPEED_INCREMENT)
        )
        self.angular_reduction = float(
            self.fclient.get_app_config("angular_reduction", DEFAULT_ANGULAR_REDUCTION)
        )
        self.latitude = float(
            self.fclient.get_app_config("latitude", DEFAULT_LATITUDE)
        )
        self.longitude = float(
            self.fclient.get_app_config("longitude", DEFAULT_LONGITUDE)
        )
        self.gst_string = self.fclient.get_app_config("gst_string", DEFAULT_GST_STRING)
        self.start_speed = float(
            self.fclient.get_app_config("start_speed", DEFAULT_START_SPEED)
        )

    def handle_command_request(self, request):
        if request.command == "jetbot.nudge_forward":
            self._handle_nudge_forward()
            self.fclient.send_command_response(request.id, success=True)
        elif request.command == "jetbot.nudge_backward":
            self._handle_nudge_backward()
            self.fclient.send_command_response(request.id, success=True)
        elif request.command == "jetbot.update_config":
            self.update_from_app_config()
            self.fclient.send_command_response(request.id, success=True)
        else:
            self.fclient.send_command_response(request.id, success=False)

    def handle_teleop(self, control_datapoint):
        if control_datapoint.stream == "Joystick":
            self.handle_joystick(control_datapoint)
        elif control_datapoint.stream == "Buttons":
            self.handle_buttons(control_datapoint)

    def handle_joystick(self, joystick):
        left_motor_value = 0.0
        right_motor_value = 0.0

        # Add contributions from the joysticks
        # TODO: turn this into a circle, not a square
        left_motor_value += (
            self.speed * joystick.twist.angular.z * self.angular_reduction
        )
        right_motor_value += (
            -self.speed * joystick.twist.angular.z * self.angular_reduction
        )

        left_motor_value += self.speed * joystick.twist.linear.x
        right_motor_value += self.speed * joystick.twist.linear.x

        # Improve the deadzone
        if left_motor_value > 0:
            left_motor_value += self.speed_deadzone
        elif left_motor_value < 0:
            left_motor_value -= self.speed_deadzone

        if right_motor_value > 0:
            right_motor_value += self.speed_deadzone
        elif right_motor_value < 0:
            right_motor_value -= self.speed_deadzone

        # Set the motor values
        self.robot.left_motor.value = left_motor_value
        self.robot.right_motor.value = right_motor_value

    def handle_buttons(self, _):
        if _.bitset.bits[0].key == "nudge forward":
            self._handle_nudge_forward()
        elif _.bitset.bits[0].key == "nudge backward":
            self._handle_nudge_backward()
        elif _.bitset.bits[0].key == "speed +":
            self._handle_increase_speed()
        elif _.bitset.bits[0].key == "speed -":
            self._handle_decrease_speed()

    def _handle_nudge_forward(self):
        self.fclient.post_text("Commands", "nudge forward")
        self.robot.forward(self.speed + self.speed_deadzone)
        time.sleep(0.5)
        self.robot.stop()

    def _handle_nudge_backward(self):
        self.fclient.post_text("Commands", "nudge backward")
        self.robot.backward(self.speed + self.speed_deadzone)
        time.sleep(0.5)
        self.robot.stop()

    def _handle_increase_speed(self):
        self.fclient.post_text("Commands", "increase speed")
        if self.speed + self.speed_increment <= self.max_speed:
            self.speed += self.speed_increment
        else:
            self.speed = self.max_speed

    def _handle_decrease_speed(self):
        self.fclient.post_text("Commands", "decrease speed")
        if self.speed - self.speed_increment >= self.min_speed:
            self.speed -= self.speed_increment
        else:
            self.speed = self.min_speed

    def calculate_jitter(self, timestamps):
        length = len(self.camera_frame_timestamps)
        oldest = self.camera_frame_timestamps[0]
        newest = self.camera_frame_timestamps[-1]
        step_value = (newest - oldest) / length

        # Make a list of the difference between the expected and actual step sizes
        jitters = []
        for n in range(length - 1):
            if n > 0:
                jitter = abs((timestamps[n] - timestamps[n - 1]) - step_value)
                jitters.append(jitter)

        return mean(jitters)


if __name__ == "__main__":
    adapter = FormantJetBotAdapter()

Formant Agent SDK Guide

Head here for a full guide on the Formant Agent SDK