Cannot access directory watch files

Issue details

  • When I try to download a log file from my on-demand stream, I get an access denied error

Troubleshooting steps

This can happen if the file is moved or deleted between the time it is added to the on-demand buffer and the time it is ingested to Formant. The on-demand buffer retains only a reference to the file on disk, so once the file is moved or deleted, the Formant agent can no longer read the actual data.
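
The difference between buffering a reference and buffering the file contents can be sketched with the standard library alone. This is only an illustration: `buffer_by_reference`, `buffer_by_value`, and `ingest` are hypothetical stand-ins, not the Formant agent's actual data structures, and the local failure here is a `FileNotFoundError` rather than the access denied error seen when downloading from Formant.

```python
import os
import tempfile


def buffer_by_reference(path):
    """Hypothetical buffer entry that stores only the path (bytes are read later)."""
    return {"path": path}


def buffer_by_value(path):
    """Hypothetical buffer entry that reads the bytes now, so the file is no longer needed."""
    with open(path, "rb") as f:
        return {"filename": os.path.basename(path), "raw": f.read()}


def ingest(entry):
    """Return the bytes for an entry, reading from disk if only a path was stored."""
    if "raw" in entry:
        return entry["raw"]
    with open(entry["path"], "rb") as f:
        return f.read()


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "robot.log")
        with open(path, "wb") as f:
            f.write(b"log line\n")

        by_ref = buffer_by_reference(path)
        by_val = buffer_by_value(path)

        os.remove(path)  # the file disappears before ingestion

        try:
            ingest(by_ref)
            ref_ok = True
        except FileNotFoundError:
            ref_ok = False  # the reference now points at nothing

        return ref_ok, ingest(by_val)
```

Calling `demo()` shows that the reference-only entry can no longer be read after the file is removed, while the entry that captured the bytes is unaffected.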

To resolve this, make sure that files in the directory you are watching are not moved or deleted between being added to the on-demand buffer and being ingested.
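
If a cleanup job is what removes files from the watched directory, one possible approach is to delete only files older than a retention period. The sketch below assumes such a job exists on your device; `remove_old_files` and `min_age_seconds` are hypothetical names, and the retention period you choose would need to be longer than the longest time a file may wait in the on-demand buffer before ingestion.

```python
import os
import time


def remove_old_files(directory, min_age_seconds, now=None):
    """Delete regular files last modified at least min_age_seconds ago.

    Returns the list of removed paths. Newer files are left in place so that
    anything still referenced by a pending upload remains readable.
    """
    now = time.time() if now is None else now
    removed = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if not os.path.isfile(path):
            continue  # skip subdirectories and other non-file entries
        if now - os.path.getmtime(path) >= min_age_seconds:
            os.remove(path)
            removed.append(path)
    return removed
```

For example, `remove_old_files("/path/to/watched/dir", 24 * 3600)` would keep everything modified within the last day; the path is a placeholder.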

Alternatively, you can use the following script to ingest the entire file to the on-demand buffer, instead of just a file reference. Note that this increases the memory footprint of your on-demand buffer, because the buffer then holds the full contents of each file rather than a path.

import os
import time
from typing import Dict, Optional

from formant.protos.model.v1 import datapoint_pb2, file_pb2
from formant.sdk.agent.v1 import Client
from watchdog.events import FileSystemEvent, PatternMatchingEventHandler
from watchdog.observers import Observer


class DirectoryWatcher:
    """Watches a directory tree and posts each newly created matching file to Formant."""

    def __init__(
        self,
        path: str,
        pattern: str,
        stream_name: str,
        client: Client,
        tags: Optional[Dict[str, str]] = None,
    ):
        self.handler = FormantUploadHandler(path, pattern, stream_name, client, tags)
        self.observer = Observer()
        # recursive=True also watches subdirectories of path.
        self.observer.schedule(self.handler, path, recursive=True)

    def start(self):
        self.observer.start()

    def stop(self):
        self.observer.stop()

    def __del__(self):
        self.stop()


class FormantUploadHandler(PatternMatchingEventHandler):
    """Reads each new file into memory and posts its raw bytes, not a path reference."""

    def __init__(
        self,
        path: str,
        pattern: str,
        stream_name: str,
        client: Client,
        tags: Optional[Dict[str, str]] = None,
    ):
        super().__init__(patterns=[pattern])
        self.client = client
        self.path = path
        self.stream_name = stream_name
        # Use a fresh dict per handler instead of a shared mutable default argument.
        self.tags = tags if tags is not None else {}

    def on_created(self, event: FileSystemEvent):
        if event.is_directory:
            return
        # Timestamp in milliseconds since the Unix epoch.
        timestamp = int(time.time() * 1000)
        # Read the whole file now so the datapoint carries the data itself.
        with open(event.src_path, "rb") as f:
            raw = f.read()
        self.client.post_data(
            self.stream_name,
            datapoint_pb2.Datapoint(
                stream=self.stream_name,
                file=file_pb2.File(
                    raw=raw, filename=os.path.basename(event.src_path)
                ),
                tags=self.tags,
                timestamp=timestamp,
            ),
        )

    # Modified, moved, and deleted events are intentionally ignored.
    def on_modified(self, event):
        pass

    def on_moved(self, event):
        pass

    def on_deleted(self, event):
        pass

