Managing datapoint throttling

How throttling works

For a full description of throttling, see the section titled Mechanism #2: Throttling in How telemetry streams work. In short, throttling occurs when datapoints are posted at a rate greater than the "Stream rate" configured for a telemetry stream, and the excess datapoints are dropped.
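
To make the idea concrete, here is a toy model of a per-stream throttle. It is only a sketch, assuming the throttle accepts at most one datapoint per 1 / rate interval; the agent's actual behavior is the one described in How telemetry streams work and may differ. The names StreamThrottle and accept are hypothetical and are not part of the Formant SDK.

```python
class StreamThrottle:
    """Toy model: accept at most `rate_hz` datapoints per second on one stream."""

    def __init__(self, rate_hz):
        # Minimum spacing between two accepted datapoints, in milliseconds.
        self.min_interval_ms = 1000.0 / rate_hz
        self.last_accepted_ms = None

    def accept(self, timestamp_ms):
        """Return True if the datapoint passes, False if it is throttled."""
        if (
            self.last_accepted_ms is not None
            and timestamp_ms - self.last_accepted_ms < self.min_interval_ms
        ):
            return False
        self.last_accepted_ms = timestamp_ms
        return True


throttle = StreamThrottle(rate_hz=5)       # hypothetical stream rate of 5 Hz
timestamps_ms = range(0, 1000, 100)        # datapoints posted at 10 Hz for one second
results = [throttle.accept(t) for t in timestamps_ms]
accepted = sum(results)
dropped = len(results) - accepted
```

In this model, posting at twice the stream rate means every other datapoint is dropped.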

Catching throttled datapoints with the Agent SDK

When a datapoint posted through the Client object is throttled, the method that posted it raises a Throttled exception.

🚧

Note: Throttled only means telemetry datapoints are dropped; it does not affect teleop data streaming.

When posting a datapoint to a stream that is configured in teleop, the Throttled exception does not apply to the live, high-frequency teleop stream. It only refers to data being dropped from the telemetry ingestion pipeline.

The Throttled exception can be caught and handled by wrapping the call to a post method (such as post_text, post_numeric, or post_json) in a try / except clause.

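from formant.sdk.agent.v1 import Client as FormantClient
from formant.sdk.agent.v1.exceptions import Throttled

fclient = FormantClient()

try:
    fclient.post_text("example.text", "[INFO] In progress...")
except Throttled:
    print("Datapoint was throttled.")

# A second post right after the first is more likely to exceed the stream rate.
try:
    fclient.post_text("example.text", "[INFO] Still going ...")
except Throttled:
    print("Datapoint was throttled.")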
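
If many call sites need the same handling, the try / except can be factored into a small helper. The following is a sketch, not part of the SDK: post_or_report and FakeClient are hypothetical names, and a stand-in Throttled class is defined so the snippet runs without the SDK installed. In real code, import Throttled from formant.sdk.agent.v1.exceptions and pass a bound method of a real Client.

```python
class Throttled(Exception):
    """Stand-in for formant.sdk.agent.v1.exceptions.Throttled (assumption)."""


def post_or_report(post, stream, value):
    """Call post(stream, value); return True if accepted, False if throttled."""
    try:
        post(stream, value)
    except Throttled:
        return False
    return True


class FakeClient:
    """Hypothetical client that throttles every call after the first."""

    def __init__(self):
        self.calls = 0

    def post_text(self, stream, value):
        self.calls += 1
        if self.calls > 1:
            raise Throttled()


fake = FakeClient()
first = post_or_report(fake.post_text, "example.text", "[INFO] In progress...")
second = post_or_report(fake.post_text, "example.text", "[INFO] Still going ...")
```

Returning a boolean lets the caller decide whether to log, count, or retry the dropped datapoint.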

Ignoring Throttled exceptions with the Formant Agent SDK

Set the ignore_throttled keyword argument to True when instantiating a Client to ignore Throttled exceptions. The default value is False, so Throttled exceptions are raised unless you opt out.

from formant.sdk.agent.v1 import Client as FormantClient
fclient = FormantClient(ignore_throttled=True)

Ignoring Throttled exceptions can make code cleaner and easier to write if you are sure no data will be throttled, or if it doesn't matter that some datapoints are throttled and dropped.
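
If you want to ignore throttling only for selected calls rather than for the whole client, one option is Python's standard contextlib.suppress. This is a general Python pattern, not a Formant SDK feature. The sketch below uses a stand-in Throttled class and a hypothetical always_throttled function so that it runs without the SDK; in real code you would import Throttled from formant.sdk.agent.v1.exceptions and call a real post method.

```python
from contextlib import suppress


class Throttled(Exception):
    """Stand-in for formant.sdk.agent.v1.exceptions.Throttled (assumption)."""


def always_throttled(stream, value):
    """Hypothetical post method that is throttled on every call."""
    raise Throttled()


continued = False
with suppress(Throttled):
    # The exception is swallowed here, so a dropped datapoint is silent.
    always_throttled("example.text", "low-priority status")
continued = True  # execution resumes after the with block
```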

Example

Try running and playing with the following example to understand how it works.

import time

from formant.sdk.agent.v1 import Client as FormantClient
from formant.sdk.agent.v1.exceptions import Throttled


def ingest_with_exception_handling():
    fclient = FormantClient(enable_logging=False)

    print("Sending datapoints at a rate higher than the default stream throttle rate.")
    print("Catching all throttled exceptions.")
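    count = 0  # number of datapoints dropped by throttling
    for i in range(50):
        time.sleep(0.1)  # post at roughly 10 Hz
        try:
            fclient.post_numeric("example.numeric", i)
        except Throttled:
            count += 1
            # "\r" rewrites the same terminal line with the running total
            print("Throttled " + str(count) + " datapoints", end="\r")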
    print("\nComplete.\n")


def ingest_with_ignore_exceptions():
    fclient = FormantClient(ignore_throttled=True, enable_logging=False)

    print("Sending datapoints at a rate higher than the default stream throttle rate.")
    print("Ignoring all throttled exceptions.")
    for i in range(50):
        time.sleep(0.1)  # post at roughly 10 Hz
        # With ignore_throttled=True, throttled datapoints are dropped silently
        fclient.post_numeric("example.numeric", i)
    print("Complete.\n")


if __name__ == "__main__":
    ingest_with_exception_handling()
    ingest_with_ignore_exceptions()
