Threading in Backend
The backend abstracts Python threading behind a class called ThreadOutputWrapper, which can be used for threading-related tasks. It is instantiated with the following keyword arguments: target, args, kwargs, chunksize, warn_only, name, command_item, parent, and thread_type_class.
Here is a description of the parameters:
- target: The function object that will run in a thread.
- args: The arguments that will be passed to the threaded function. It can be a list of tuples, a list, or a set. For example, if a function function_a takes 3 arguments (arg_1, arg_2, arg_3), then args can be a list of tuples such as [(1, 2, 3), (4, 5, 6), (7, 8, 9)], and function_a will be called with these arguments inside the thread, one tuple per call.
- kwargs: This holds a fill-value for args. So, for function_a as the target that takes three arguments, if we pass args as [(1, 2), (4, 5)] and kwargs as third_value=50, like ThreadOutputWrapper(target=function_a, args=[(1, 2), (4, 5)], third_value=50), then function_a will be called as function_a(1, 2, 50) and function_a(4, 5, 50).
- chunksize: This refers to two things: the number of threads and the number of chunks of arguments. In the args example above, [(1, 2, 3), (4, 5, 6), (7, 8, 9)], the chunksize is 3 because args has a length of 3. If args is longer, it is better to specify an appropriate chunksize, which will also be the number of threads created. Passing an explicit chunksize is advisable because it avoids creating a large number of threads. Usually, a chunksize of 20 works for large args lists.
- warn_only: This argument determines whether the program should stop or continue working on other threads when it encounters an error.
- name: This is the name of the process that is running all the threads.
- command_item: When a thread is running for a command, we pass the command ID as command_item.
- parent: This is the command that can log into the database.
- thread_type_class: This is the class used to create the Python threads.
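The roles of args, kwargs, and chunksize described above can be illustrated with a minimal sketch built only on the standard library. This is not the ThreadOutputWrapper implementation: the helpers split_into_chunks and run_in_chunks are hypothetical names introduced here, and the sketch assumes that the fill-value is forwarded as a keyword argument and that one thread is created per chunk.

```python
import threading


def split_into_chunks(items, chunksize):
    """Split items into at most `chunksize` roughly equal chunks (hypothetical helper)."""
    items = list(items)
    # Never create more chunks than there are items, and always at least one.
    chunksize = max(1, min(chunksize, len(items) or 1))
    return [items[i::chunksize] for i in range(chunksize)]


def run_in_chunks(target, args, chunksize, **kwargs):
    """Call target once per entry in args, using one thread per chunk (hypothetical helper)."""
    results = []
    lock = threading.Lock()

    def worker(chunk):
        for call_args in chunk:
            # kwargs acts as the fill-value shared by every call.
            value = target(*call_args, **kwargs)
            with lock:
                results.append(value)

    threads = [
        threading.Thread(target=worker, args=(chunk,))
        for chunk in split_into_chunks(args, chunksize)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results


def function_a(arg_1, arg_2, third_value):
    return arg_1 + arg_2 + third_value


results = run_in_chunks(function_a, [(1, 2), (4, 5)], chunksize=2, third_value=50)
print(sorted(results))  # [53, 59]
```

With a long args list and a chunksize of 20, this sketch would start 20 threads, each working through its own share of the arguments, rather than one thread per argument.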
An example of using ThreadOutputWrapper:
Threading can speed up the process in cases such as fetching data from the API using a large number of API calls.
from queue import Queue

class Command(DateClientPromptCommand):
    def handle(self, *args, **kwargs):
        account = Accounts.objects.filter(active=True).first()
        list_of_data = [
            ({'name': 'name-1', 'score': 10, 'api-token': 'token-1'}),
            ({'name': 'name-2', 'score': 20, 'api-token': 'token-2'}),
            ({'name': 'name-3', 'score': 30, 'api-token': 'token-3'})
        ]
        # Thread-safe queue that collects results produced inside the threads
        self.thread_safe_queue = Queue()
        # Process the data using threads
        ThreadOutputWrapper(
            target=self.process_using_thread,
            args=list_of_data,
            chunksize=3,
            command_item=self.command_item
        )
        # Save data to the database without using a thread.
        self.save_data()

    # Defining a target function
    def process_using_thread(self, data):
        processed_data = process_data(name=data['name'], score=data['score'], token=data['api-token'])
        self.thread_safe_queue.put(processed_data)

    def save_data(self):
        # Runs in the main thread, after the threaded processing
        for processed_data in list(self.thread_safe_queue.queue):
            ModelThatHoldsProcessedData.objects.create(data_1=processed_data.data_1, data_2=processed_data.data_2)
In the above example, the data is processed in threads, but the processed data is saved outside the threads, in the save_data method. The args list holds the data that is passed to the threaded function. The chunksize is 3 because args has three entries, which is small enough to use directly as the chunksize, so three threads are created. The target function is called inside a thread once for each entry in list_of_data.
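The collect-then-save pattern used above can be tried in isolation with plain standard-library threads. The following is a minimal sketch, not backend code: process_item is a hypothetical stand-in for the processing step, and the saved list stands in for the database writes that happen after the threads finish.

```python
import threading
from queue import Queue

result_queue = Queue()  # thread-safe: workers may call put() concurrently


def process_item(data):
    # Hypothetical stand-in for the real processing step.
    result_queue.put({"name": data["name"], "doubled": data["score"] * 2})


items = [
    {"name": "name-1", "score": 10},
    {"name": "name-2", "score": 20},
    {"name": "name-3", "score": 30},
]

threads = [threading.Thread(target=process_item, args=(item,)) for item in items]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()  # wait until every worker has put its result on the queue

# Drain the queue from the main thread only; this is where saving would happen.
saved = []
while not result_queue.empty():
    saved.append(result_queue.get())

print(sorted(row["doubled"] for row in saved))  # [20, 40, 60]
```

Results arrive on the queue in whatever order the threads finish, so anything that depends on ordering should sort or key the results rather than rely on the order of args.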
If there is a need to save data inside the thread, then ClientThreadOutputWrapper can be used.
ClientThreadOutputWrapper:
This is an extension of ThreadOutputWrapper, where the difference lies in the thread_type_class. Here, ClientCubedThread is the thread_type_class, which allows database operations from inside the thread. However, the first item of each entry in the argument list should always be an Account object, so that the database connection can be established inside the thread.
The function that is threaded using ClientThreadOutputWrapper should reserve the first 2 parameters. The first parameter should be the connection, while the second parameter should be an account. The rest of the parameters can be the actual arguments required for the function.
An example of using ClientThreadOutputWrapper:
class Command(DateClientPromptCommand):
    def handle(self, *args, **kwargs):
        account = Accounts.objects.filter(active=True).first()
        # The first element of every tuple is the Account object
        list_of_data = [
            (account, {'name': 'name-1', 'score': 10, 'api-token': 'token-1'}),
            (account, {'name': 'name-2', 'score': 20, 'api-token': 'token-2'}),
            (account, {'name': 'name-3', 'score': 30, 'api-token': 'token-3'})
        ]
        # Process the data and save to the database using threads:
        ClientThreadOutputWrapper(
            target=self.process_and_save_to_database_using_thread,
            args=list_of_data,
            chunksize=3,
            command_item=self.command_item
        )

    # Defining a target function; connection and account are the reserved parameters
    def process_and_save_to_database_using_thread(self, connection, account, data):
        processed_data = process_data(name=data['name'], score=data['score'], token=data['api-token'])
        ModelThatHoldsProcessedData.objects.create(data_1=processed_data.data_1, data_2=processed_data.data_2)
In the above example, process_and_save_to_database_using_thread is a function that will run inside the thread. It has connection and account as the first two arguments, and the last argument is the actual argument that the function needs. The process_data function returns the data that will be saved to the database while the function is running inside the thread.
In the command, the arguments are prepared so that the first element of each tuple in the list is an Account object. Then ClientThreadOutputWrapper is instantiated, which calls process_and_save_to_database_using_thread inside the thread, passing one item from args at a time.
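The calling convention, with connection and account reserved as the first two parameters, can be mimicked with a small stand-alone sketch. Everything here is an assumption made for illustration: FakeConnection and run_with_connection are hypothetical names, the account is a plain string, and the real ClientThreadOutputWrapper and ClientCubedThread may set up the connection quite differently.

```python
import threading


class FakeConnection:
    """Hypothetical stand-in for a database connection opened inside a thread."""

    def __init__(self, account):
        self.account = account
        self.rows = []

    def save(self, row):
        self.rows.append(row)


def run_with_connection(target, args):
    """Call target(connection, account, *rest) in one thread per args entry."""
    connections = []
    lock = threading.Lock()

    def worker(account, *rest):
        # The connection is created inside the thread, from the account
        # that leads each args entry.
        connection = FakeConnection(account)
        target(connection, account, *rest)
        with lock:
            connections.append(connection)

    threads = [threading.Thread(target=worker, args=tuple(entry)) for entry in args]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return connections


def process_and_save(connection, account, data):
    # The first two parameters are reserved; data is the actual argument.
    connection.save((account, data["name"], data["score"]))


connections = run_with_connection(
    process_and_save,
    [
        ("account-1", {"name": "name-1", "score": 10}),
        ("account-1", {"name": "name-2", "score": 20}),
    ],
)
saved_rows = sorted(row for conn in connections for row in conn.rows)
print(saved_rows)  # [('account-1', 'name-1', 10), ('account-1', 'name-2', 20)]
```

The point of the sketch is only the shape of the call: the account travels at the front of each args entry, and the target receives a connection ahead of it.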
A detailed implementation can be found in backend/base/tests/test_clientthreadoutputwrapper.py