Threading in Backend

Threading in Python is abstracted in the backend by a class called ThreadOutputWrapper, which can be used for threading-related tasks. It is instantiated with the following keyword arguments: target, args, kwargs, chunksize, warn_only, name, command_item, parent, and thread_type_class.

Here is a description of the parameters:

  • target: The function object that will run in a thread.
  • args: The arguments that will be passed to the threaded function. It can be a list of tuples, a list, or a set. For example, if a function function_a takes three arguments (arg_1, arg_2, arg_3), then args can be a list of tuples such as [(1,2,3), (4,5,6), (7,8,9)], and function_a will be called inside the thread once per tuple, with that tuple as its arguments.
  • kwargs: This holds a fill value for args. For example, if function_a takes three arguments and we pass args as [(1, 2), (4, 5)] and kwargs as third_value=50, as in ThreadOutputWrapper(target=function_a, args=[(1, 2), (4, 5)], third_value=50), then function_a will be called as function_a(1, 2, 50) and function_a(4, 5, 50).
  • chunksize: This sets two things at once: the number of chunks that args is split into and the number of threads that are created. In the args example above, a chunksize of 3 fits because args has a length of 3. For longer args lists, pass an explicit chunksize so that the number of threads stays small. Usually, a chunksize of 20 works for large args lists.
  • warn_only: Determines whether the program stops or continues working on the other threads when a thread encounters an error.
  • name: The name of the process that is running all the threads.
  • command_item: When a thread is running for a command, the command ID is passed as command_item.
  • parent: This is the command that can log into the database.
  • thread_type_class: The class used to create the Python threads.
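
The parameter descriptions above can be illustrated with a small standard-library sketch. This is not the backend's ThreadOutputWrapper: run_in_chunks and split_into_chunks are hypothetical names, and the behavior shown (one thread per chunk, kwargs applied to every call, warn_only collecting errors instead of raising) is an assumption drawn from the descriptions, not from the real implementation.

```python
import threading


def split_into_chunks(items, chunksize):
    """Split items into at most `chunksize` chunks of roughly equal length."""
    items = list(items)
    chunksize = max(1, min(chunksize, len(items) or 1))
    return [items[i::chunksize] for i in range(chunksize)]


def run_in_chunks(target, args, kwargs=None, chunksize=20, warn_only=False):
    """Hypothetical stand-in: one thread per chunk, one target call per item."""
    kwargs = kwargs or {}
    errors = []
    errors_lock = threading.Lock()

    def worker(chunk):
        for item in chunk:
            try:
                # Each tuple is unpacked; kwargs fill in the remaining parameters.
                target(*item, **kwargs)
            except Exception as exc:
                with errors_lock:
                    errors.append((item, exc))
                if not warn_only:
                    return  # stop working on this chunk after the first error

    threads = [
        threading.Thread(target=worker, args=(chunk,))
        for chunk in split_into_chunks(args, chunksize)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    if errors and not warn_only:
        raise errors[0][1]
    return errors


# Usage: two argument tuples, with third_value supplied as a fill value.
results = []
results_lock = threading.Lock()


def function_a(arg_1, arg_2, third_value):
    with results_lock:
        results.append(arg_1 + arg_2 + third_value)


run_in_chunks(function_a, [(1, 2), (4, 5)], kwargs={"third_value": 50}, chunksize=2)
```

In this sketch, chunksize caps the number of threads regardless of how long args is, which is the reason the text recommends passing it explicitly for large lists.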

Threading can speed up the process in cases such as fetching data from the API using a large number of API calls.

An example of using ThreadOutputWrapper:

from queue import Queue

# ThreadOutputWrapper, DateClientPromptCommand, process_data, and
# ModelThatHoldsProcessedData come from the backend; their imports are omitted here.


class Command(DateClientPromptCommand):

    def handle(self, *args, **kwargs):
        list_of_data = [
            ({'name':'name-1', 'score':10, 'api-token':'token-1'}),
            ({'name':'name-2', 'score':20, 'api-token':'token-2'}),
            ({'name':'name-3', 'score':30, 'api-token':'token-3'})
        ]

        # Thread-safe queue that collects the results produced by the threads
        self.thread_safe_queue = Queue()

        # Process the data using threads
        ThreadOutputWrapper(
            target=self.process_using_thread,
            args=list_of_data,
            chunksize=3,
            command_item=self.command_item
        )

        # Save data to the database without using a thread
        self.save_data()

    # Target function: runs inside a thread and only queues its result
    def process_using_thread(self, data):
        processed_data = process_data(name=data['name'], score=data['score'], token=data['api-token'])
        self.thread_safe_queue.put(processed_data)

    # Runs in the main thread after the threaded processing
    def save_data(self):
        for processed_data in list(self.thread_safe_queue.queue):
            ModelThatHoldsProcessedData.objects.create(data_1=processed_data.data_1, data_2=processed_data.data_2)

In the above example, the data is processed inside threads, but the processed data is saved outside the threads, in the save_data method. The args list holds the data that is passed to the threaded function, and the target function is called inside a thread for each item in list_of_data. The chunksize is 3 because args has only three items, so it is reasonable to create one thread per item; three threads are created.
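
The "process in threads, save afterwards" pattern described above can be tried without the backend. The following is a minimal sketch using only the standard library; process_data here is a hypothetical stand-in for the real processing step, and plain threading.Thread objects take the place of ThreadOutputWrapper.

```python
import threading
from queue import Queue


def process_data(name, score):
    """Hypothetical stand-in for the real processing step."""
    return {"name": name, "doubled_score": score * 2}


def collect_with_threads(list_of_data):
    """Process each item in its own thread, then gather results in the caller."""
    result_queue = Queue()  # Queue is safe to share between threads

    def worker(data):
        result_queue.put(process_data(data["name"], data["score"]))

    threads = [threading.Thread(target=worker, args=(data,)) for data in list_of_data]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    # All threads have finished, so the queue can be drained from one thread.
    processed = []
    while not result_queue.empty():
        processed.append(result_queue.get())
    return processed


processed_items = collect_with_threads([
    {"name": "name-1", "score": 10},
    {"name": "name-2", "score": 20},
    {"name": "name-3", "score": 30},
])
```

The order of items in the queue depends on which thread finishes first, so code that saves the results should not rely on it matching the order of the input list.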

If there is a need to save data inside the thread, then ClientThreadOutputWrapper can be used.

ClientThreadOutputWrapper:

This is an extension of ThreadOutputWrapper; the difference lies in the thread_type_class. Here, ClientCubedThread is the thread_type_class, which allows database operations from inside the thread. However, the first element of each item in the argument list should always be an Account object, so that the database connection can be established inside the thread.

The function that is threaded using ClientThreadOutputWrapper should reserve its first two parameters. The first parameter should be the connection, and the second parameter should be an account. The remaining parameters can be the actual arguments required by the function.

An example of using ClientThreadOutputWrapper:

class Command(DateClientPromptCommand):

    def handle(self, *args, **kwargs):
        account = Accounts.objects.filter(active=True).first()
        # The first element of every tuple is the Account object
        list_of_data = [
            (account, {'name':'name-1', 'score':10, 'api-token':'token-1'}),
            (account, {'name':'name-2', 'score':20, 'api-token':'token-2'}),
            (account, {'name':'name-3', 'score':30, 'api-token':'token-3'})
        ]

        # Process the data and save to the database using threads
        ClientThreadOutputWrapper(
            target=self.process_and_save_to_database_using_thread,
            args=list_of_data,
            chunksize=3,
            command_item=self.command_item
        )

    # Target function: connection and account are the reserved first two parameters
    def process_and_save_to_database_using_thread(self, connection, account, data):
        processed_data = process_data(name=data['name'], score=data['score'], token=data['api-token'])
        ModelThatHoldsProcessedData.objects.create(data_1=processed_data.data_1, data_2=processed_data.data_2)

In the above example, process_and_save_to_database_using_thread is a function that will run inside the thread. It has connection and account as the first two arguments, and the last argument is the actual argument that the function needs. The process_data function returns the data that will be saved to the database while the function is running inside the thread.

In the command, the arguments are prepared so that the first element of each tuple in the list is an Account object. Then ClientThreadOutputWrapper is instantiated, which calls process_and_save_to_database_using_thread inside the thread, passing one item from args at a time.
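
The reserved (connection, account, ...) signature can be illustrated with a standard-library sketch. This is not how ClientCubedThread is implemented: ConnectionThread, the Account dataclass, its db_path field, and the use of sqlite3 are all assumptions made only to show a thread that opens its own connection and passes it to the target ahead of the account and the real arguments.

```python
import os
import sqlite3
import tempfile
import threading
from dataclasses import dataclass


@dataclass
class Account:
    """Hypothetical stand-in for the backend's Account object."""
    name: str
    db_path: str


class ConnectionThread(threading.Thread):
    """Hypothetical thread type: opens a connection, then calls the target."""

    def __init__(self, target, item):
        super().__init__()
        self._target_fn = target
        # The first element of the item is the account; the rest are real arguments.
        self._account, *self._rest = item

    def run(self):
        # Each thread opens its own connection, derived from the account.
        connection = sqlite3.connect(self._account.db_path, timeout=30)
        try:
            self._target_fn(connection, self._account, *self._rest)
            connection.commit()
        finally:
            connection.close()


def save_score(connection, account, data):
    """Target function: the first two parameters are reserved."""
    connection.execute(
        "INSERT INTO scores (account, name, score) VALUES (?, ?, ?)",
        (account.name, data["name"], data["score"]),
    )


def run_demo():
    db_path = os.path.join(tempfile.mkdtemp(), "demo.sqlite3")
    setup = sqlite3.connect(db_path)
    setup.execute("CREATE TABLE scores (account TEXT, name TEXT, score INTEGER)")
    setup.commit()
    setup.close()

    account = Account(name="demo", db_path=db_path)
    list_of_data = [
        (account, {"name": "name-1", "score": 10}),
        (account, {"name": "name-2", "score": 20}),
        (account, {"name": "name-3", "score": 30}),
    ]
    threads = [ConnectionThread(save_score, item) for item in list_of_data]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    check = sqlite3.connect(db_path)
    rows = check.execute("SELECT name, score FROM scores ORDER BY score").fetchall()
    check.close()
    return rows


saved_rows = run_demo()
```

Opening the connection inside run(), rather than sharing one connection across threads, is the design point the sketch is meant to show; the real wrapper may establish and manage its connection differently.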

A detailed implementation can be found in backend/base/tests/test_clientthreadoutputwrapper.py