Skip to content

server.py

The server is the main class that is initialized from main.py on server boot. Part of the server's init() function is to spin up n Workers and n RetryWorkers. The server will then begin all gevent loops, and start loading all account data needed for visitor collection and processing.

init()

The init function takes an argument of config which will set various settings for the server such as buffer size, number of workers, machine code and geoip_reader. It will also create an instance of MetricGroup, create Workers for both the main and retry buffer and finally create instances of StatsRunner, AlertRunner and MetricRunner.

def __init__(self, config):
        logging.debug('server config %s')
        self.config = config

        self.main_buffer_size = int(config['server']['main_buffer_size'])
        self.retry_buffer_size = int(config['server']['retry_buffer_size'])

        self.main_buffer = Queue(maxsize=self.main_buffer_size)
        self.retry_buffer = Queue(maxsize=self.retry_buffer_size)
        self.machine_code = int(config['server']['machine_code'])
        self.geoip_reader = Reader(config['server']['geoip_database_path'])

        self.metrics = MetricGroup()    
        self.metrics.account = pydcs.accounts.metrics
        self.metrics.worker = pydcs.worker.metrics

        self.accounts = AccountList(self)

        self.main_workers = [Worker('worker.main.%d' % i, self, self.main_buffer, self.retry_buffer) 
            for i in range(0, int(config['server']['main_workers']))]
        self.retry_workers = [Worker('worker.retry.%d' % i, self, self.retry_buffer, self.retry_buffer, done_sleep=1) 
            for i in range(0, int(config['server']['retry_workers']))]

        self.statsRunner = StatsRunner(self)
        self.alertRunner = AlertRunner(self)
        self.metricRunner = MetricRunner(self)

        self.budget_max = 10
        self.budget_interval = 60
        self.budget_increment = 1
        self.budget_decrement = 1   

start() / stop()

Starts AccountListDebug which is used to continually update account, event, pattern and endpoint information. It will also start the alertRunner and statsRunner. If graphite is enabled in the config settings then metricRunner will start it's loop and metrics settings will be set using update_metrics. Lastly it will start Workers in the main and retry buffers, plus any remove Workers. Finally, Stop() will stop all those processes previously mentioned.

def start(self):        
        logging.debug('Starting server')
        lets = [
            gevent.spawn(self.accounts.start),
            gevent.spawn(self.alertRunner.start),
            gevent.spawn(self.statsRunner.run),]
        if 'graphite' in self.config and self.config['graphite']['enabled']:
            lets.append(gevent.spawn(self.metricRunner.loop))
            lets.append(gevent.spawn(self.update_metrics))
        for worker in self.main_workers:            
            lets.append(gevent.spawn(worker.start))
        for worker in self.retry_workers:
            lets.append(gevent.spawn(worker.start))
        gevent.joinall(lets)
        logging.debug('Server stopped')     

def update_metrics(self):
    while(True):
        self.metrics.buffer.main.length = self.main_buffer.qsize()
        self.metrics.buffer.main.max_length = self.main_buffer_size
        self.metrics.buffer.retry.length = self.retry_buffer.qsize()
        self.metrics.buffer.retry.max_length = self.retry_buffer_size
        gevent.sleep(1)

def stop(self):
    logging.debug('Stopping server')
    lets = [
        gevent.spawn(self.accounts.stop),
        gevent.spawn(self.alertRunner.stop),
        gevent.spawn(self.statsRunner.stop),
        ]
    for worker in self.main_workers:
        lets.append(gevent.spawn(worker.stop))
    for worker in self.retry_workers:
        lets.append(gevent.spawn(worker.stop))
    gevent.joinall(lets)
    logging.debug('Stopped server')

add_visit()

This method is called in the handle function of Main.py which will add a visit created out of deserialized data to the main buffer. Any visit in the main buffer will be later be taken by a worker and used to insert data into the database.

def add_visit(self, visit):     
        self.main_buffer.put_nowait(visit)