Cron Agent

This agent is in charge of actually executing runs. Specifically, it executes Django management commands in separate processes, usually ones defined in update_agg_pct_batch.

Running the agent

The agent can be run as a standard Django management command:

sudo python3 manage.py command_agent_cron

Running continuously

By default, this will run up to the maximum number of concurrent processes (set by --concurrency, default 5) and then exit. To run it continuously, add the --forever flag.
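As a rough illustration of the two flags described above, here is a minimal standalone sketch using argparse (the same parser type Django passes to a management command's add_arguments). The flag names and the default of 5 come from this page; the help text, the int type, and the build_parser helper are assumptions, since the real command's argument definitions are not shown here.

```python
import argparse


def build_parser():
    """Hypothetical parser mirroring the flags documented for command_agent_cron."""
    parser = argparse.ArgumentParser(prog="command_agent_cron")
    # Maximum number of processes the agent runs at once (documented default: 5).
    parser.add_argument("--concurrency", type=int, default=5,
                        help="maximum number of concurrent processes")
    # Without this flag the agent exits after its batch; with it, it keeps going.
    parser.add_argument("--forever", action="store_true",
                        help="keep running instead of exiting")
    return parser


if __name__ == "__main__":
    args = build_parser().parse_args()
    print(args.concurrency, args.forever)
```

With no arguments the sketch yields concurrency 5 and forever disabled, matching the documented defaults.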

This is how the agent is run in production.

systemd

In production, this agent is set up to run via systemd. You can find the agent setup on cubed-command-a and cubed-command-b.

To check the status of the agent, you can run this command:

sudo systemctl status command-agent-cron

Internals

Process list

The agent keeps a list of processes in memory that it has spawned based on available runs.

It continuously polls these processes until they return a status code. If the return code is 0, it updates the run's status to Success; otherwise, it updates it to Failed.
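The polling behaviour described above can be sketched with the standard library alone. This is not the agent's actual code: the poll_processes and wait_for_all helpers, the run identifiers, and the use of plain strings for the statuses are all assumptions made for illustration, and the real agent updates a run record rather than returning a dictionary.

```python
import subprocess
import sys
import time


def poll_processes(processes):
    """Poll each (run_id, Popen) pair once and split running from finished."""
    still_running = []
    finished = {}
    for run_id, proc in processes:
        code = proc.poll()  # None while the process is still running
        if code is None:
            still_running.append((run_id, proc))
        else:
            # Return code 0 maps to Success, anything else to Failed.
            finished[run_id] = "Success" if code == 0 else "Failed"
    return still_running, finished


def wait_for_all(processes, interval=0.1):
    """Keep polling until every process has returned a status code."""
    results = {}
    while processes:
        processes, finished = poll_processes(processes)
        results.update(finished)
        if processes:
            time.sleep(interval)
    return results


def demo():
    """Spawn one process that exits cleanly and one that exits with code 3."""
    procs = [
        ("run-ok", subprocess.Popen([sys.executable, "-c", "raise SystemExit(0)"])),
        ("run-bad", subprocess.Popen([sys.executable, "-c", "raise SystemExit(3)"])),
    ]
    return wait_for_all(procs)
```

Popen.poll() is non-blocking, which is what lets a single loop watch several child processes at once instead of waiting on each in turn.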

Parallel flag

This works the same way as the --parallel flag available on the ClientCommand class used by most cron commands.

Graceful shutdown

Because the list of processes is kept in memory, it is lost when the agent is restarted (such as on deployment) or crashes.

The engine can recover from this (see the Timeout status). However, an improvement would be to store this list on disk or in Redis so the agent can reuse it when restarting.
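One possible shape for the on-disk variant of that improvement is sketched below. Nothing here exists in the agent today: the save_process_list and load_process_list helpers, the JSON layout (run id mapped to process id), and the file location are all hypothetical, and a restarted agent would still need to check that each stored process id refers to a live process it actually spawned.

```python
import json
import os
import tempfile


def save_process_list(path, entries):
    """Write a {run_id: pid} mapping to disk, replacing the old file atomically."""
    directory = os.path.dirname(os.path.abspath(path))
    # Write to a temporary file first so a crash never leaves a half-written list.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w") as handle:
        json.dump(entries, handle)
    os.replace(tmp_path, path)


def load_process_list(path):
    """Read the mapping back; a missing file means there is nothing to recover."""
    try:
        with open(path) as handle:
            return json.load(handle)
    except FileNotFoundError:
        return {}
```

The agent would call save_process_list whenever it spawns or reaps a process, and load_process_list once at startup.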

Multiple versions

The intention is for multiple versions of this agent to be running at one time (to spread the workload over multiple servers).

To prevent these versions from trying to execute the same run at the same time, the Django QuerySet method select_for_update is used when selecting the next run:

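# select_for_update() must be evaluated inside transaction.atomic().
# skip_locked=True skips rows already locked by another agent instead of
# blocking on them, so each agent picks up a different waiting run.
CommandRun.objects \
    .order_by('waiting_at') \
    .filter(slot__definition__source=CommandDefinitionSource.Cron) \
    .filter(status=CommandRunStatus.Waiting) \
    .select_for_update(skip_locked=True) \
    .first()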