diff --git a/composer.json b/composer.json index 5b69e93..7d444fd 100644 --- a/composer.json +++ b/composer.json @@ -12,7 +12,7 @@ ], "require": { "php": ">=5.4.0", - "radish/radish": "~0.2", + "radish/radish": "~0.3", "symfony/dependency-injection": "~2.6|~3.0", "symfony/config": "~2.6|~3.0", "symfony/http-kernel": "~2.6|~3.0", diff --git a/src/Command/PollCommand.php b/src/Command/PollCommand.php new file mode 100644 index 0000000..1202867 --- /dev/null +++ b/src/Command/PollCommand.php @@ -0,0 +1,31 @@ +addArgument('poller', InputArgument::REQUIRED, 'The name of the consumer to consume'); + } + + public function execute(InputInterface $input, OutputInterface $output) + { + $consumerName = $input->getArgument('poller'); + $this->getContainer()->get(sprintf('radish.poller.%s', $consumerName))->consume(); + } +} diff --git a/src/DependencyInjection/Configuration.php b/src/DependencyInjection/Configuration.php index 35f4731..245ede6 100644 --- a/src/DependencyInjection/Configuration.php +++ b/src/DependencyInjection/Configuration.php @@ -2,6 +2,7 @@ namespace Radish\RadishBundle\DependencyInjection; +use Symfony\Component\Config\Definition\ArrayNode; use Symfony\Component\Config\Definition\Builder\TreeBuilder; use Symfony\Component\Config\Definition\ConfigurationInterface; @@ -71,36 +72,46 @@ public function getConfigTreeBuilder() ->end() ->end() ->end() - ->arrayNode('consumers') - ->requiresAtLeastOneElement() - ->useAttributeAsKey('name') - ->prototype('array') - ->validate() - ->ifTrue(function ($v) { - return isset($v['queues']) && !empty($v['queues']) && isset($v['queue']) && !empty($v['queue']); - }) - ->thenInvalid('A consumer configuration can contain either "queue" or "queues", but not both.') - ->end() - ->children() - ->scalarNode('queue')->end() - ->scalarNode('worker')->end() - ->arrayNode('queues') - ->useAttributeAsKey('queue') - ->prototype('array') - ->children() - ->scalarNode('queue')->end() - ->scalarNode('worker')->end() - ->end() - ->end() - ->end() - ->arrayNode('middleware') - ->prototype('variable')->end() + ->append($this->consumerNode('consumers')) + ->append($this->consumerNode('pollers')) + ->end(); + + return $treeBuilder; + } + + private function consumerNode($type) + { + $builder = new TreeBuilder(); + $node = $builder->root($type); + + $node->requiresAtLeastOneElement() + ->useAttributeAsKey('name') + ->prototype('array') + ->validate() + ->ifTrue(function ($v) { + return isset($v['queues']) && !empty($v['queues']) && isset($v['queue']) && !empty($v['queue']); + }) + ->thenInvalid('A consumer configuration can contain either "queue" or "queues", but not both.') + ->end() + ->children() + ->scalarNode('queue')->end() + ->scalarNode('worker')->end() + ->arrayNode('queues') + ->useAttributeAsKey('queue') + ->prototype('array') + ->children() + ->scalarNode('queue')->end() + ->scalarNode('worker')->end() ->end() ->end() ->end() - ->end() - ->end(); + ->arrayNode('middleware') + ->prototype('variable')->end() + ->end() + ->scalarNode('interval')->end() + ->end() + ->end(); - return $treeBuilder; + return $node; } -} \ No newline at end of file +} diff --git a/src/DependencyInjection/RadishExtension.php b/src/DependencyInjection/RadishExtension.php index 1be300f..c1a368c 100644 --- a/src/DependencyInjection/RadishExtension.php +++ b/src/DependencyInjection/RadishExtension.php @@ -35,6 +35,9 @@ public function load(array $configs, ContainerBuilder $container) foreach ($config['consumers'] as $name => $consumer) { $this->loadConsumer($name, $consumer, $container); } + foreach ($config['pollers'] as $name => $consumer) { + $this->loadPoller($name, $consumer, $container); + } foreach ($config['producers'] as $name => $producer) { $this->loadProducer($name, $producer, $container); @@ -60,15 +63,39 @@ private function loadConsumer($name, array $consumer, ContainerBuilder $containe } $definition = new DefinitionDecorator('radish.consumer'); - $definition->setArguments([ + + $args = [ array_keys($consumer['queues']), $consumer['middleware'], $workers - ]); + ]; + + $definition->setArguments($args); $container->setDefinition(sprintf('radish.consumer.%s', $name), $definition); } + public function loadPoller($name, array $poller, ContainerBuilder $container) + { + $workers = []; + foreach ($poller['queues'] as $queueName => $queue) { + $workers[$queueName] = new Reference($queue['worker']); + } + + $definition = new DefinitionDecorator('radish.poller'); + + $args = [ + array_keys($poller['queues']), + $poller['middleware'], + $workers, + $poller['interval'] + ]; + + $definition->setArguments($args); + + $container->setDefinition(sprintf('radish.poller.%s', $name), $definition); + } + private function loadProducer($name, array $producer, ContainerBuilder $container) { $definition = new DefinitionDecorator('radish.producer'); diff --git a/src/RadishBundle.php b/src/RadishBundle.php index ef438fb..e07260c 100644 --- a/src/RadishBundle.php +++ b/src/RadishBundle.php @@ -19,6 +19,7 @@ public function build(ContainerBuilder $container) public function registerCommands(Application $application) { $application->add($this->container->get('radish.command.consume')); + $application->add($this->container->get('radish.command.poll')); $application->add($this->container->get('radish.command.setup')); } } diff --git a/src/Resources/config/services.yml b/src/Resources/config/services.yml index e305215..82cfaa8 100644 --- a/src/Resources/config/services.yml +++ b/src/Resources/config/services.yml @@ -6,9 +6,13 @@ parameters: radish.broker.radish.class: "Radish\\Broker\\Queue" radish.broker.queue_binding.class: "Radish\\Broker\\QueueBinding" radish.broker.queue_registry.class: "Radish\\Broker\\QueueRegistry" + radish.broker.queue_loader.class: "Radish\\Broker\\QueueLoader" radish.consumer.class: "Radish\\Consumer\\Consumer" radish.consumer_factory.class: "Radish\\Consumer\\ConsumerFactory" + radish.poller.class: "Radish\\Consumer\\Poller" + radish.poller_factory.class: "Radish\\Consumer\\PollerFactory" radish.middleware_registry.class: "Radish\\Middleware\\MiddlewareRegistry" + radish.middleware_loader.class: "Radish\\Middleware\\MiddlewareLoader" radish.middleware.ack.class: "Radish\\Middleware\\Ack\\AckMiddleware" radish.middleware.doctrine.connection.class: "Radish\\Middleware\\Doctrine\\ConnectionMiddleware" radish.middleware.doctrine.object_manager.class: "Radish\\Middleware\\Doctrine\\ObjectManagerMiddleware" @@ -21,6 +25,7 @@ parameters: radish.producer.class: "Radish\\Producer\\Producer" radish.producer_factory.class: "Radish\\Producer\\ProducerFactory" radish.command.consume.class: "Radish\\RadishBundle\\Command\\ConsumeCommand" + radish.command.poll.class: "Radish\\RadishBundle\\Command\\PollCommand" radish.command.setup.class: "Radish\\RadishBundle\\Command\\SetupCommand" services: @@ -30,14 +35,27 @@ services: radish.broker.exchange_registry: class: "%radish.broker.exchange_registry.class%" + radish.broker.queue_loader: + class: "%radish.broker.queue_loader.class%" + arguments: ['@radish.broker.queue_registry'] + radish.broker.queue_registry: class: "%radish.broker.queue_registry.class%" radish.consumer_factory: class: "%radish.consumer_factory.class%" arguments: - - "@radish.broker.queue_registry" - - "@radish.middleware_registry" + - "@radish.broker.queue_loader" + - "@radish.middleware_loader" + - "@logger" + tags: + - { name: monolog.logger, channel: queue } + + radish.poller_factory: + class: "%radish.poller_factory.class%" + arguments: + - "@radish.broker.queue_loader" + - "@radish.middleware_loader" - "@logger" tags: - { name: monolog.logger, channel: queue } @@ -47,6 +65,15 @@ services: factory: ["@radish.consumer_factory", "create"] abstract: true + radish.poller: + class: "%radish.poller.class%" + factory: ["@radish.poller_factory", "create"] + abstract: true + + radish.middleware_loader: + class: "%radish.middleware_loader.class%" + arguments: ['@radish.middleware_registry'] + radish.middleware_registry: class: "%radish.middleware_registry.class%" arguments: @@ -140,6 +167,9 @@ services: radish.command.consume: class: "%radish.command.consume.class%" + radish.command.poll: + class: "%radish.command.poll.class%" + radish.command.setup: class: "%radish.command.setup.class%" arguments: