Threaded Commands

Brief

Some commands are executed using our threading wrappers, which allow functions to run in separate threads in the Cubed command system. Conductor decides whether a command has failed based on its exit code (see Engine Steps): an exit code of 0 means success, and any other exit code means failure. Our threading wrappers swallow exceptions raised in their threads and log them to the database.

As a result, a command that fails inside a thread still exits with code 0, so conductor considers it successful even though it has actually failed. To solve this, the exit code of the command is set to 1 if the command fails in a thread. Conductor then knows that the command has failed and will retry it if necessary.
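
The problem can be reproduced with the standard library alone. The sketch below is not the Cubed threading wrapper: run_swallowing, divide, and command_exit_code are hypothetical names used only for illustration. It shows that an exception swallowed inside a worker thread never reaches the main thread, so the reported exit code stays 0 unless the collected exceptions are checked explicitly.

```python
import threading


def run_swallowing(target, args_list):
    """Run target once per args tuple, each in its own thread.

    Exceptions raised in a worker are swallowed and collected,
    loosely imitating a wrapper that stores thread exceptions.
    """
    exceptions = []
    lock = threading.Lock()

    def worker(args):
        try:
            target(*args)
        except Exception as exc:  # swallowed: never reaches the main thread
            with lock:
                exceptions.append(exc)

    threads = [threading.Thread(target=worker, args=(a,)) for a in args_list]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return exceptions


def divide(a, b):
    # Hypothetical threaded function; fails when b is 0.
    return a / b


def command_exit_code(check_threads):
    """Return the exit code a command would report to its runner."""
    exceptions = run_swallowing(divide, [(1, 2), (3, 0)])
    if check_threads and exceptions:
        return 1  # failure is surfaced
    return 0  # failure is hidden if the exceptions are ignored
```

With check_threads set to False the second call fails silently and the function still reports 0, which is the situation described above.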

How it works

We have implemented a mixin called ThreadedCommandExceptionMixin, which handles the exceptions stored on the threading class. Once the threads have completed, the mixin logs any failures to the console and raises a custom exception, CubedThreadExceptionInCommand. Conductor catches this exception and flags the command as failed.
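
The flow from collected exceptions to a failing exit code can be sketched as follows. Every name here is a hypothetical stand-in (SketchThreadError for CubedThreadExceptionInCommand, SketchThreadedMixin for the mixin, run_and_get_exit_code for whatever runs the command), and the mapping from the exception to exit code 1 is an assumption about the runner, not a description of conductor's actual code.

```python
class SketchThreadError(Exception):
    """Hypothetical stand-in for CubedThreadExceptionInCommand."""


class SketchThreadedMixin:
    """Hypothetical stand-in for ThreadedCommandExceptionMixin."""

    def print_failure(self, message):
        # Stub: records messages instead of printing to the console.
        self.failures.append(message)

    def handle_thread_error(self, exceptions):
        # Log every collected exception, then fail the whole command.
        if exceptions:
            self.print_failure(
                f"Threaded functions had {len(exceptions)} exceptions."
            )
            for exception in exceptions:
                self.print_failure(f"Exception: {exception}")
            raise SketchThreadError("Error in one of the threaded functions.")


class SketchCommand(SketchThreadedMixin):
    def __init__(self, collected):
        self.collected = collected  # exceptions gathered from the threads
        self.failures = []

    def handle(self):
        self.handle_thread_error(self.collected)


def run_and_get_exit_code(command):
    """Assumed runner behaviour: the custom exception becomes exit code 1."""
    try:
        command.handle()
    except SketchThreadError:
        return 1
    return 0
```

An empty exception list leaves the command untouched, so commands whose threads all succeed keep exiting with 0.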

How to use it

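To use this mixin, add it to the base classes of your command, collect the exceptions from each threading wrapper, and pass them to the mixin's handler once the threads have completed.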

example.py

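from client.commands_mixin import ThreadedCommandExceptionMixin
# DateClientPromptCommand and ThreadOutputWrapper must also be imported;
# their module paths are not shown here.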

class Command(ThreadedCommandExceptionMixin, DateClientPromptCommand):
    name = 'my_command'

    def handle_client_prompt(self, *args, **kwargs):

        # business logic

        thread_exceptions = []

        thread_object = ThreadOutputWrapper(
            target=self.my_threaded_function,
            args=[(1, 2), (3, 4)],
            chunksize=2,
            command_item=self.command_item
        )

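        # The wrapper stores the exceptions it swallowed in its threads.
        thread_exceptions.extend(thread_object.exceptions)

        # clean up

        # Raises CubedThreadExceptionInCommand if any thread failed.
        # The method name matches the mixin definition shown below.
        self.handle_thread_error(thread_exceptions)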

ThreadedCommandExceptionMixin

class ThreadedCommandExceptionMixin:
    def handle_thread_error(self, exceptions):
        """
        Log each exception seen in any thread, then raise
        CubedThreadExceptionInCommand so that the command fails.
        """
        if exceptions:
            self.print_failure(f"Threaded functions had {len(exceptions)} exceptions.")
            for exception in exceptions:
                self.print_failure(f"Exception: {exception}")
            raise CubedThreadExceptionInCommand("Error Occurred in one of the Threaded Functions.")

A detailed implementation can be found in backend/client/management/commands/pull_semrush_project_information.py