Skip to content

Commit

Permalink
Merge pull request #5 from tompedals/feature/poller
Browse files Browse the repository at this point in the history
Add support for polling rabbitmq
  • Loading branch information
rcwsr authored Mar 8, 2017
2 parents 9f07c7f + 6a4fd3c commit 33e06f5
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 33 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
],
"require": {
"php": ">=5.4.0",
"radish/radish": "~0.2",
"radish/radish": "~0.3",
"symfony/dependency-injection": "~2.6|~3.0",
"symfony/config": "~2.6|~3.0",
"symfony/http-kernel": "~2.6|~3.0",
Expand Down
31 changes: 31 additions & 0 deletions src/Command/PollCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

namespace Radish\RadishBundle\Command;

use Radish\Broker\Connection;
use Radish\Broker\ExchangeRegistry;
use Radish\Broker\QueueRegistry;
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class PollCommand extends ContainerAwareCommand
{
public function __construct()
{
parent::__construct('radish:poll');
}

public function configure()
{
$this->addArgument('poller', InputArgument::REQUIRED, 'The name of the consumer to consume');
}

public function execute(InputInterface $input, OutputInterface $output)
{
$consumerName = $input->getArgument('poller');
$this->getContainer()->get(sprintf('radish.poller.%s', $consumerName))->consume();
}
}
67 changes: 39 additions & 28 deletions src/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Radish\RadishBundle\DependencyInjection;

use Symfony\Component\Config\Definition\ArrayNode;
use Symfony\Component\Config\Definition\Builder\TreeBuilder;
use Symfony\Component\Config\Definition\ConfigurationInterface;

Expand Down Expand Up @@ -71,36 +72,46 @@ public function getConfigTreeBuilder()
->end()
->end()
->end()
->arrayNode('consumers')
->requiresAtLeastOneElement()
->useAttributeAsKey('name')
->prototype('array')
->validate()
->ifTrue(function ($v) {
return isset($v['queues']) && !empty($v['queues']) && isset($v['queue']) && !empty($v['queue']);
})
->thenInvalid('A consumer configuration can contain either "queue" or "queues", but not both.')
->end()
->children()
->scalarNode('queue')->end()
->scalarNode('worker')->end()
->arrayNode('queues')
->useAttributeAsKey('queue')
->prototype('array')
->children()
->scalarNode('queue')->end()
->scalarNode('worker')->end()
->end()
->end()
->end()
->arrayNode('middleware')
->prototype('variable')->end()
->append($this->consumerNode('consumers'))
->append($this->consumerNode('pollers'))
->end();

return $treeBuilder;
}

private function consumerNode($type)
{
$builder = new TreeBuilder();
$node = $builder->root($type);

$node->requiresAtLeastOneElement()
->useAttributeAsKey('name')
->prototype('array')
->validate()
->ifTrue(function ($v) {
return isset($v['queues']) && !empty($v['queues']) && isset($v['queue']) && !empty($v['queue']);
})
->thenInvalid('A consumer configuration can contain either "queue" or "queues", but not both.')
->end()
->children()
->scalarNode('queue')->end()
->scalarNode('worker')->end()
->arrayNode('queues')
->useAttributeAsKey('queue')
->prototype('array')
->children()
->scalarNode('queue')->end()
->scalarNode('worker')->end()
->end()
->end()
->end()
->end()
->end();
->arrayNode('middleware')
->prototype('variable')->end()
->end()
->scalarNode('interval')->end()
->end()
->end();

return $treeBuilder;
return $node;
}
}
}
31 changes: 29 additions & 2 deletions src/DependencyInjection/RadishExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public function load(array $configs, ContainerBuilder $container)
foreach ($config['consumers'] as $name => $consumer) {
$this->loadConsumer($name, $consumer, $container);
}
foreach ($config['pollers'] as $name => $consumer) {
$this->loadPoller($name, $consumer, $container);
}

foreach ($config['producers'] as $name => $producer) {
$this->loadProducer($name, $producer, $container);
Expand All @@ -60,15 +63,39 @@ private function loadConsumer($name, array $consumer, ContainerBuilder $containe
}

$definition = new DefinitionDecorator('radish.consumer');
$definition->setArguments([

$args = [
array_keys($consumer['queues']),
$consumer['middleware'],
$workers
]);
];

$definition->setArguments($args);

$container->setDefinition(sprintf('radish.consumer.%s', $name), $definition);
}

public function loadPoller($name, array $poller, ContainerBuilder $container)
{
$workers = [];
foreach ($poller['queues'] as $queueName => $queue) {
$workers[$queueName] = new Reference($queue['worker']);
}

$definition = new DefinitionDecorator('radish.poller');

$args = [
array_keys($poller['queues']),
$poller['middleware'],
$workers,
$poller['interval']
];

$definition->setArguments($args);

$container->setDefinition(sprintf('radish.poller.%s', $name), $definition);
}

private function loadProducer($name, array $producer, ContainerBuilder $container)
{
$definition = new DefinitionDecorator('radish.producer');
Expand Down
1 change: 1 addition & 0 deletions src/RadishBundle.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public function build(ContainerBuilder $container)
public function registerCommands(Application $application)
{
$application->add($this->container->get('radish.command.consume'));
$application->add($this->container->get('radish.command.poll'));
$application->add($this->container->get('radish.command.setup'));
}
}
34 changes: 32 additions & 2 deletions src/Resources/config/services.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ parameters:
radish.broker.radish.class: "Radish\\Broker\\Queue"
radish.broker.queue_binding.class: "Radish\\Broker\\QueueBinding"
radish.broker.queue_registry.class: "Radish\\Broker\\QueueRegistry"
radish.broker.queue_loader.class: "Radish\\Broker\\QueueLoader"
radish.consumer.class: "Radish\\Consumer\\Consumer"
radish.consumer_factory.class: "Radish\\Consumer\\ConsumerFactory"
radish.poller.class: "Radish\\Consumer\\Poller"
radish.poller_factory.class: "Radish\\Consumer\\PollerFactory"
radish.middleware_registry.class: "Radish\\Middleware\\MiddlewareRegistry"
radish.middleware_loader.class: "Radish\\Middleware\\MiddlewareLoader"
radish.middleware.ack.class: "Radish\\Middleware\\Ack\\AckMiddleware"
radish.middleware.doctrine.connection.class: "Radish\\Middleware\\Doctrine\\ConnectionMiddleware"
radish.middleware.doctrine.object_manager.class: "Radish\\Middleware\\Doctrine\\ObjectManagerMiddleware"
Expand All @@ -21,6 +25,7 @@ parameters:
radish.producer.class: "Radish\\Producer\\Producer"
radish.producer_factory.class: "Radish\\Producer\\ProducerFactory"
radish.command.consume.class: "Radish\\RadishBundle\\Command\\ConsumeCommand"
radish.command.poll.class: "Radish\\RadishBundle\\Command\\PollCommand"
radish.command.setup.class: "Radish\\RadishBundle\\Command\\SetupCommand"

services:
Expand All @@ -30,14 +35,27 @@ services:
radish.broker.exchange_registry:
class: "%radish.broker.exchange_registry.class%"

radish.broker.queue_loader:
class: "%radish.broker.queue_loader.class%"
arguments: ['@radish.broker.queue_registry']

radish.broker.queue_registry:
class: "%radish.broker.queue_registry.class%"

radish.consumer_factory:
class: "%radish.consumer_factory.class%"
arguments:
- "@radish.broker.queue_registry"
- "@radish.middleware_registry"
- "@radish.broker.queue_loader"
- "@radish.middleware_loader"
- "@logger"
tags:
- { name: monolog.logger, channel: queue }

radish.poller_factory:
class: "%radish.poller_factory.class%"
arguments:
- "@radish.broker.queue_loader"
- "@radish.middleware_loader"
- "@logger"
tags:
- { name: monolog.logger, channel: queue }
Expand All @@ -47,6 +65,15 @@ services:
factory: ["@radish.consumer_factory", "create"]
abstract: true

radish.poller:
class: "%radish.poller.class%"
factory: ["@radish.poller_factory", "create"]
abstract: true

radish.middleware_loader:
class: "%radish.middleware_loader.class%"
arguments: ['@radish.middleware_registry']

radish.middleware_registry:
class: "%radish.middleware_registry.class%"
arguments:
Expand Down Expand Up @@ -140,6 +167,9 @@ services:
radish.command.consume:
class: "%radish.command.consume.class%"

radish.command.poll:
class: "%radish.command.poll.class%"

radish.command.setup:
class: "%radish.command.setup.class%"
arguments:
Expand Down

0 comments on commit 33e06f5

Please sign in to comment.