diff --git a/Procfile b/Procfile index cab7526..b71ae6d 100644 --- a/Procfile +++ b/Procfile @@ -1,3 +1,3 @@ web: manage.py runserver $WEB_PORT -plugins: manage.py run_plugins +plugins: celery -A botbot worker -l info bot: botbot-bot -v=2 -logtostderr=true diff --git a/botbot/__init__.py b/botbot/__init__.py index e69de29..b64e43e 100644 --- a/botbot/__init__.py +++ b/botbot/__init__.py @@ -0,0 +1,5 @@ +from __future__ import absolute_import + +# This will make sure the app is always imported when +# Django starts so that shared_task will use this app. +from .celery import app as celery_app diff --git a/botbot/apps/plugins/management/__init__.py b/botbot/apps/plugins/management/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/botbot/apps/plugins/management/commands/__init__.py b/botbot/apps/plugins/management/commands/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/botbot/apps/plugins/management/commands/run_plugins.py b/botbot/apps/plugins/management/commands/run_plugins.py deleted file mode 100644 index 4902209..0000000 --- a/botbot/apps/plugins/management/commands/run_plugins.py +++ /dev/null @@ -1,18 +0,0 @@ -from optparse import make_option - -from django.core.management.base import NoArgsCommand - -from botbot.apps.plugins import runner - -class Command(NoArgsCommand): - - help = ("Starts up all plugins in the botbot.apps.bots.plugins module") - option_list = NoArgsCommand.option_list + ( - make_option('--with-gevent', - action='store_true', - dest='with_gevent', - default=False, - help='Use gevent for concurrency'), - ) - def handle_noargs(self, **options): - runner.start_plugins(use_gevent=options['with_gevent']) diff --git a/botbot/apps/plugins/runner.py b/botbot/apps/plugins/runner.py index ca6d17e..2732a12 100644 --- a/botbot/apps/plugins/runner.py +++ b/botbot/apps/plugins/runner.py @@ -131,10 +131,7 @@ class PluginRunner(object): Calls to plugins are done via greenlets """ - def __init__(self, use_gevent=False): - if use_gevent: - import gevent - self.gevent = gevent + def __init__(self): self.bot_bus = redis.StrictRedis.from_url( settings.REDIS_PLUGIN_QUEUE_URL) self.storage = redis.StrictRedis.from_url( @@ -176,32 +173,15 @@ def register(self, plugin): getattr(self, attr.route_rule[0] + '_router').setdefault( plugin.slug, []).append((attr.route_rule[1], attr, plugin)) - def listen(self): - """Listens for incoming messages on the Redis queue""" - while 1: - val = None - try: - val = self.bot_bus.blpop('q', 1) - - # Track q length - ql = self.bot_bus.llen('q') - statsd.gauge(".".join(["plugins", "q"]), ql) - - if val: - _, val = val - LOG.debug('Recieved: %s', val) - line = Line(json.loads(val), self) - - # Calculate the transport latency between go and the plugins. - delta = datetime.utcnow().replace(tzinfo=utc) - line._received - statsd.timing(".".join(["plugins", "latency"]), - delta.total_seconds() * 1000) + def process_line(self, line_json): + LOG.debug('Recieved: %s', line_json) + line = Line(json.loads(line_json), self) + # Calculate the transport latency between go and the plugins. + delta = datetime.utcnow().replace(tzinfo=utc) - line._received + statsd.timing(".".join(["plugins", "latency"]), + delta.total_seconds() * 1000) + self.dispatch(line) - self.dispatch(line) - except Exception: - LOG.error("Line Dispatch Failed", exc_info=True, extra={ - "line": val - }) def dispatch(self, line): """Given a line, dispatch it to the right plugins & functions.""" @@ -214,16 +194,11 @@ def dispatch(self, line): # firehose gets everything, no rule matching LOG.info('Match: %s.%s', plugin_slug, func.__name__) with statsd.timer(".".join(["plugins", plugin_slug])): - # FIXME: This will not have correct timing if go back to - # gevent. channel_plugin = self.setup_plugin_for_channel( plugin.__class__, line) new_func = log_on_error(LOG, getattr(channel_plugin, func.__name__)) - if hasattr(self, 'gevent'): - self.gevent.Greenlet.spawn(new_func, line) - else: - channel_plugin.respond(new_func(line)) + channel_plugin.respond(new_func(line)) # pass line to other routers if line._is_message: @@ -252,30 +227,12 @@ def check_for_plugin_route_matches(self, line, router): if match: LOG.info('Match: %s.%s', plugin_slug, func.__name__) with statsd.timer(".".join(["plugins", plugin_slug])): - # FIXME: This will not have correct timing if go back to - # gevent. # Instantiate a plugin specific to this channel channel_plugin = self.setup_plugin_for_channel( plugin.__class__, line) # get the method from the channel-specific plugin new_func = log_on_error(LOG, getattr(channel_plugin, func.__name__)) - if hasattr(self, 'gevent'): - grnlt = self.gevent.Greenlet(new_func, line, - **match.groupdict()) - grnlt.link_value(channel_plugin.greenlet_respond) - grnlt.start() - else: - channel_plugin.respond(new_func(line, - **match.groupdict())) - -def start_plugins(*args, **kwargs): - """ - Used by the management command to start-up plugin listener - and register the plugins. - """ - LOG.info('Starting plugins. Gevent=%s', kwargs['use_gevent']) - app = PluginRunner(**kwargs) - app.register_all_plugins() - app.listen() + channel_plugin.respond(new_func(line, + **match.groupdict())) diff --git a/botbot/apps/plugins/tasks.py b/botbot/apps/plugins/tasks.py new file mode 100644 index 0000000..3530399 --- /dev/null +++ b/botbot/apps/plugins/tasks.py @@ -0,0 +1,14 @@ +from botbot.celery import app +from .runner import PluginRunner + + +runner = PluginRunner() +runner.register_all_plugins() + +@app.task(bind=True) +def route_line(self, line_json): + try: + runner.process_line(line_json) + # For any error we retry after 10 seconds. + except Exception as exc: + raise self.retry(exc, countdown=10) diff --git a/botbot/celery.py b/botbot/celery.py new file mode 100644 index 0000000..6acc244 --- /dev/null +++ b/botbot/celery.py @@ -0,0 +1,16 @@ +from __future__ import absolute_import + +import os + +from celery import Celery + +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'botbot.settings') + +from django.conf import settings + +app = Celery('botbot') + +# Using a string here means the worker will not have to +# pickle the object when using Windows. +app.config_from_object('django.conf:settings') +app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) diff --git a/botbot/settings/base.py b/botbot/settings/base.py index 65a9ee0..61a41a5 100644 --- a/botbot/settings/base.py +++ b/botbot/settings/base.py @@ -272,7 +272,9 @@ # Third party app settings # ============================================================================== -# SOUTH_DATABASE_ADAPTERS = {'default': 'south.db.postgresql_psycopg2'} +CELERY_TASK_SERIALIZER='json' +CELERY_ACCEPT_CONTENT=['json'] +BROKER_URL = REDIS_PLUGIN_QUEUE_URL SOCIAL_AUTH_USER_MODEL = AUTH_USER_MODEL SOCIAL_AUTH_PROTECTED_USER_FIELDS = ['email'] diff --git a/manage.py b/manage.py index c9ee98a..cbbde66 100755 --- a/manage.py +++ b/manage.py @@ -2,11 +2,6 @@ import sys if __name__ == "__main__": - if (len(sys.argv) > 1 and - 'run_plugins' in sys.argv and '--with-gevent' in sys.argv): - # import gevent as soon as possible - from gevent import monkey; monkey.patch_all() - from psycogreen.gevent import patch_psycopg; patch_psycopg() import os os.environ.setdefault("DJANGO_SETTINGS_MODULE", "botbot.settings") diff --git a/requirements.txt b/requirements.txt index f725c27..7387602 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ django==1.7.2 +celery==3.1.18 pytz psycopg2==2.5.2