
SQS

Each SQS queue follows the naming scheme visscore_<queue_type>_<account_token>.

Queue Type

There are three queue types: Main, Retry, and Test. Main and Retry replace the buffer system that the DCS uses. Currently the Visscore Service only uses the Main type.
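As a rough illustration of the naming scheme, the helper below builds a queue name from a queue type and an account token. The function name `queue_name`, the lowercase type spellings, and the example token are assumptions made for this sketch and are not taken from the Visscore code.

```python
# Hypothetical helper: builds a name of the form
# visscore_<queue_type>_<account_token>.
QUEUE_TYPES = ("main", "retry", "test")  # assumed lowercase spellings


def queue_name(queue_type, account_token):
    """Return the SQS queue name for the given type and account token."""
    if queue_type not in QUEUE_TYPES:
        raise ValueError("unknown queue type: %r" % (queue_type,))
    return "visscore_%s_%s" % (queue_type, account_token)


# "example-account" is a made-up token used only for illustration.
name = queue_name("main", "example-account")
```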

The first stage of this process is setting the server.accounts.can_poll dict and the worker class and worker ID variables.

Note

The Visscore Service uses 10 main workers and 5 retry workers. For example, the first two main workers are named worker.main.0 and worker.main.1.

The worker class and id are set in vissore.Server

    self.main_workers = [self.clsWorker('worker.main.%d' % i, self, self.main_buffer, self.retry_buffer)
        for i in range(0, int(self.config['server']['main_workers']))]

They are then split back out of the worker name in vissore.Worker

    worker_class, worker_id = self.name.split('.')[-2:]
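To make the split concrete, here is a small standalone sketch of the same idea. The function name `parse_worker_name` is hypothetical. Note that the ID comes out of the split as a string, which is why the Visscore snippets wrap it in `int()` before comparing it with 0.

```python
def parse_worker_name(name):
    """Split a name such as 'worker.main.1' into (class, numeric id)."""
    # The last two dot-separated parts are the worker class and worker ID.
    worker_class, worker_id = name.split('.')[-2:]
    return worker_class, int(worker_id)


worker_class, worker_id = parse_worker_name('worker.main.1')
# worker_class is 'main' and worker_id is 1
```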

The can_poll dict is set when accounts.BaseSingleAccountList is initialised:

    self.can_poll = {'main': False, 'retry': False}

Receiving Messages

The flow of receiving a message is as follows:

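    if self.can_poll():
        try:
            message = self.open_envelope(self.receive_messages(queue))
            if message:
                # A message arrived, so worker 0 re-enables polling for its class.
                if int(worker_id) == 0:
                    self.allow_polling(True, worker_class)

                # ... turn message into visit and return ...

            # No message was received: only worker 0 may keep polling.
            self.allow_polling(False, worker_class)
        # (exception handling is not shown in this excerpt)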

Note

If a message is not received, we can assume one of two things: either too many DPS Visscore boxes are processing the SQS queue, in which case we need to ask Positive Internet to drop one of the boxes, or the account is simply going through a quiet period of traffic.

To save on API call costs, not receiving a message triggers a call to self.allow_polling(False, worker_class). From then on only the master worker (worker 0) can poll the queue until the next message is received, which saves up to 90% of wasted API calls (one main worker polling instead of ten).
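The 90% figure can be checked with some simple arithmetic. The sketch below is a simplified model and not part of the Visscore code: it assumes the queue stays empty and that every worker allowed to poll makes exactly one receive call per cycle. The function name `idle_polls` is hypothetical.

```python
def idle_polls(workers, idle_cycles, gated):
    """Count receive calls made while the queue stays empty."""
    # With gating, only worker 0 keeps polling an idle queue.
    polling_workers = 1 if gated else workers
    return polling_workers * idle_cycles


ungated = idle_polls(10, 100, gated=False)  # 10 main workers all polling
gated = idle_polls(10, 100, gated=True)     # only worker 0 polling
saved_fraction = float(ungated - gated) / ungated
```

Under these assumptions the 10 main workers save 90% of idle calls. The same model gives 80% for the 5 retry workers.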

can_poll()

The 0th worker of each type is always allowed to poll. Every other worker can only poll for messages if can_poll has been set to True for its class. can_poll is updated by allow_polling().

    # Worker 0 may always poll; the others need the shared flag for their class.
    return int(worker_id) == 0 or bool(self.server.accounts.can_poll[worker_class])

receive_messages()

To save costs on SQS, the DPS uses long polling, which limits the number of API calls the system makes to SQS. With long polling, each worker waits on the queue for up to 20 seconds for a message. If no message has arrived after 20 seconds, the call returns without one.

    def receive_messages(self, queue):
        return queue.receive_messages(VisibilityTimeout=10, MaxNumberOfMessages=1, WaitTimeSeconds=20)
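With the boto3 resource API, `Queue.receive_messages` returns a list, and that list is empty when the long poll times out. The sketch below shows one way the result could be unwrapped. The real `open_envelope` is not shown in this document, so the version here is a guess at its shape, and `FakeQueue` and `FakeMessage` are hypothetical stand-ins so the example runs without AWS access.

```python
class FakeMessage(object):
    """Stand-in for a boto3 sqs.Message; only the body attribute is modelled."""

    def __init__(self, body):
        self.body = body


class FakeQueue(object):
    """Stand-in for a boto3 sqs.Queue so the sketch runs without AWS."""

    def __init__(self, bodies):
        self._bodies = list(bodies)

    def receive_messages(self, VisibilityTimeout=10, MaxNumberOfMessages=1,
                         WaitTimeSeconds=20):
        # A real long poll would block here for up to WaitTimeSeconds.
        batch = self._bodies[:MaxNumberOfMessages]
        del self._bodies[:MaxNumberOfMessages]
        return [FakeMessage(body) for body in batch]


def open_envelope(messages):
    """Assumed behaviour: return the first message body, or None if empty."""
    if not messages:
        return None
    return messages[0].body


queue = FakeQueue(['{"aid": "c-a-cubed-uk"}'])
first = open_envelope(queue.receive_messages())   # the one queued body
second = open_envelope(queue.receive_messages())  # queue is now empty
```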

allow_polling()

This function is called to turn polling on or off for all non-master workers in the class that was passed in.

    def allow_polling(self, allowed, worker_class):
        self.server.accounts.can_poll[worker_class] = allowed
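The interplay between can_poll() and allow_polling() can be sketched outside the worker classes. In this minimal model the `Accounts` class and the two module-level functions are stand-ins for the real methods, which live on the worker and reach the dict through self.server.accounts.

```python
class Accounts(object):
    """Stand-in for the account list that holds the shared polling flags."""

    def __init__(self):
        self.can_poll = {'main': False, 'retry': False}


def can_poll(accounts, worker_class, worker_id):
    # Worker 0 may always poll; the others follow the shared flag.
    return int(worker_id) == 0 or accounts.can_poll[worker_class]


def allow_polling(accounts, allowed, worker_class):
    accounts.can_poll[worker_class] = allowed


accounts = Accounts()
before = can_poll(accounts, 'main', '3')        # False: flag starts off
allow_polling(accounts, True, 'main')           # worker 0 received a message
after = can_poll(accounts, 'main', '3')         # True: main workers may poll
retry_state = can_poll(accounts, 'retry', '3')  # False: flags are per class
```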

Format of Messages

    {
        "aid": "c-a-cubed-uk",
        "sid": "<token>",
        "created": "<timestamp hit was serialized into an instance of a Visit class on the DCS>"
    }
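A consumer might validate this format before building a visit from it. The following is a minimal sketch, assuming the message body is a JSON string with exactly the three keys shown above. `VisitMessage` and `parse_message` are hypothetical names, and the `created` value is kept as a plain string because its timestamp format is not specified here.

```python
import json
from collections import namedtuple

# Hypothetical container for the three documented fields.
VisitMessage = namedtuple("VisitMessage", ["aid", "sid", "created"])


def parse_message(body):
    """Decode a JSON message body and check that all fields are present."""
    data = json.loads(body)
    missing = [key for key in VisitMessage._fields if key not in data]
    if missing:
        raise ValueError("message is missing fields: %s" % ", ".join(missing))
    return VisitMessage(*[data[key] for key in VisitMessage._fields])


# Placeholder values stand in for a real token and timestamp.
sample = '{"aid": "c-a-cubed-uk", "sid": "<token>", "created": "<timestamp>"}'
message = parse_message(sample)
```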