@igalarzab and @maraujop
"Resiliency is the ability to provide and maintain an acceptable level of service in the face of faults and challenges to normal operation."
RQ (Redis)
At least for us :)
from sqjobs.job import Job

class AdderJob(Job):
    name = 'adder_job'
    queue = 'my_queue'

    def run(self, *args, **kwargs):
        return sum(args)
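To enqueue this job, create a broker with your SQS credentials and push the job class together with its arguments: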
from django.conf import settings

from sqjobs import create_sqs_broker
from myapp.jobs import AdderJob

kwargs = {
    'access_key': settings.SQJOBS_SQS_ACCESS_KEY,
    'secret_key': settings.SQJOBS_SQS_SECRET_KEY
}
broker = create_sqs_broker(**kwargs)
broker.add_job(AdderJob, *[1, 2, 3, 4])
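Under the hood, add_job serializes the job name and its arguments into a base64-encoded JSON payload and pushes it to SQS, so any language that can talk to SQS can enqueue jobs. This is how we enqueue them from PHP: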
$payload = array(
    'name' => $task_name,
    'args' => $args,
    'kwargs' => $kwargs
);
$json_payload = json_encode($payload);

$this->_sqs = new AmazonSQS($amazon_config['aws_key'], $amazon_config['aws_secret_key']);
$result = $this->_sqs->send_message($this->_queue_urls[$queue_name], base64_encode($json_payload));
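On the consuming side, a worker listening on that queue decodes the payload and runs the job registered under that name: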
$ ./manage.py sqjobs worker $queue_name
>>> from sqjobs import create_eager_broker
>>> broker = create_eager_broker()
>>> from jobs import AdderJob
>>> job_added = broker.add_job(AdderJob, *[1, 2, 3])
>>> job_added
('fdb005d3-276f-4f75-8e8e-c8fcde67043c', AdderJob())
>>> job_added[1].result
6
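Note that the eager broker runs the job synchronously inside add_job, which is why the result is available immediately; this makes it a good fit for unit tests, where you don't want to depend on a real queue.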
from sqjobs.contrib.django.djsqjobs.result_job import ResultJob

class DummyResultJob(ResultJob):
    name = 'dummy_result_job'
    queue = 'dummy_queue'

    def run(self, *args, **kwargs):
        pass
>>> from sqjobs.contrib.django.djsqjobs.models import JobStatus
>>> my_job = JobStatus.objects.get(job_id='1234')
>>> if my_job.status == JobStatus.SUCCESS:
...     print my_job.result
Periodic jobs are scheduled by a dedicated process called the Beater.
from djsqjobs import PeriodicJob

class DummyPeriodicJob(PeriodicJob):
    name = "dummy_periodic_job"
    queue = "my_queue"

    schedule = "1 0 * * *"
    timezone = "Europe/Madrid"

    def run(self, *args, **kwargs):
        pass
$ ./manage.py sqjobs beater $queue_name
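When started, the beater periodically checks which periodic jobs are due according to their cron-style schedule, interpreted in their configured timezone, and enqueues them so a regular worker can execute them.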
Every job goes through three lifecycle methods: set_up, run and tear_down. Overriding set_up and tear_down lets you run code around run, as in this TimedJob mixin that measures how long a job takes:
from abc import abstractmethod, ABCMeta
from datetime import datetime
import logging

from six import add_metaclass

from sqjobs.job import Job

logger = logging.getLogger('timed_job')

@add_metaclass(ABCMeta)
class TimedJob(Job):
    def set_up(self, *args, **kwargs):
        super(TimedJob, self).set_up(*args, **kwargs)
        self.start_time = datetime.now()

    def tear_down(self, *args, **kwargs):
        end_time = datetime.now()
        delta = end_time - self.start_time
        logger.info('%s finished in %.3f seconds' % (self.name, delta.total_seconds()))
        super(TimedJob, self).tear_down(*args, **kwargs)

    @abstractmethod
    def run(self, *args, **kwargs):
        raise NotImplementedError
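A concrete job then only has to subclass the mixin and implement run. A minimal sketch (SlowAdderJob is a made-up example name, not part of sqjobs):

class SlowAdderJob(TimedJob):
    # Hypothetical job: inherits the timing behaviour from TimedJob,
    # so set_up and tear_down log how long the run took.
    name = 'slow_adder_job'
    queue = 'my_queue'

    def run(self, *args, **kwargs):
        return sum(args)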
Depending on the outcome of the job execution, either the on_success or the on_failure method will be called.
from abc import abstractmethod, ABCMeta
import logging

from six import add_metaclass

from sqjobs.job import Job

logger = logging.getLogger('logger_job')

@add_metaclass(ABCMeta)
class LoggerJob(Job):
    def on_success(self, *args, **kwargs):
        logger.info('Successfully finished job %s' % self.name)
        super(LoggerJob, self).on_success(*args, **kwargs)

    def on_failure(self, *args, **kwargs):
        logger.error('Failed job %s' % self.name)
        super(LoggerJob, self).on_failure(*args, **kwargs)

    @abstractmethod
    def run(self, *args, **kwargs):
        raise NotImplementedError
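Since both helpers are plain mixins over Job, they can also be combined. A minimal sketch (AuditedAdderJob is a made-up example name):

class AuditedAdderJob(TimedJob, LoggerJob):
    # Hypothetical job combining both mixins: it gets timing from TimedJob
    # and success/failure logging from LoggerJob through the MRO.
    name = 'audited_adder_job'
    queue = 'my_queue'

    def run(self, *args, **kwargs):
        return sum(args)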
@gnufede