From a10725b8d368343b6e6fd2bd75936571e5a0becd Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 20 Dec 2018 15:02:43 +0200 Subject: [PATCH] Add standalone setup broker feature. --- .../DependencyInjection/EnqueueExtension.php | 1 + pkg/enqueue-bundle/EnqueueBundle.php | 2 + .../Resources/config/services.yml | 9 +++ pkg/enqueue/SetupBroker/ChainSetupBroker.php | 33 ++++++++ .../SetupBroker/SetupBrokerInterface.php | 9 +++ .../BuildSetupBrokerPass.php | 47 ++++++++++++ .../DependencyInjection/TransportFactory.php | 10 +++ pkg/enqueue/Symfony/SetupBrokerCommand.php | 76 +++++++++++++++++++ 8 files changed, 187 insertions(+) create mode 100644 pkg/enqueue/SetupBroker/ChainSetupBroker.php create mode 100644 pkg/enqueue/SetupBroker/SetupBrokerInterface.php create mode 100644 pkg/enqueue/Symfony/DependencyInjection/BuildSetupBrokerPass.php create mode 100644 pkg/enqueue/Symfony/SetupBrokerCommand.php diff --git a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php index b5ae10ab9..70f7bde04 100644 --- a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php +++ b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php @@ -59,6 +59,7 @@ public function load(array $configs, ContainerBuilder $container): void $transportFactory->buildContext($container, []); $transportFactory->buildQueueConsumer($container, $modules['consumption']); $transportFactory->buildRpcClient($container, []); + $transportFactory->buildSetupBroker($container, []); // client if (isset($modules['client'])) { diff --git a/pkg/enqueue-bundle/EnqueueBundle.php b/pkg/enqueue-bundle/EnqueueBundle.php index 6db17e15d..1cbcaea76 100644 --- a/pkg/enqueue-bundle/EnqueueBundle.php +++ b/pkg/enqueue-bundle/EnqueueBundle.php @@ -14,6 +14,7 @@ use Enqueue\Symfony\Client\DependencyInjection\BuildTopicSubscriberRoutesPass as BuildClientTopicSubscriberRoutesPass; use Enqueue\Symfony\DependencyInjection\BuildConsumptionExtensionsPass; use Enqueue\Symfony\DependencyInjection\BuildProcessorRegistryPass; +use Enqueue\Symfony\DependencyInjection\BuildSetupBrokerPass; use Symfony\Component\DependencyInjection\Compiler\PassConfig; use Symfony\Component\DependencyInjection\ContainerBuilder; use Symfony\Component\HttpKernel\Bundle\Bundle; @@ -25,6 +26,7 @@ public function build(ContainerBuilder $container): void //transport passes $container->addCompilerPass(new BuildConsumptionExtensionsPass()); $container->addCompilerPass(new BuildProcessorRegistryPass()); + $container->addCompilerPass(new BuildSetupBrokerPass()); //client passes $container->addCompilerPass(new BuildClientConsumptionExtensionsPass()); diff --git a/pkg/enqueue-bundle/Resources/config/services.yml b/pkg/enqueue-bundle/Resources/config/services.yml index a207569c0..2f6b305d2 100644 --- a/pkg/enqueue-bundle/Resources/config/services.yml +++ b/pkg/enqueue-bundle/Resources/config/services.yml @@ -15,6 +15,15 @@ services: tags: - { name: 'console.command' } + enqueue.transport.setup_broker: + class: 'Enqueue\Symfony\SetupBrokerCommand' + arguments: + - '@enqueue.locator' + - '%enqueue.default_transport%' + - 'enqueue.transport.%s.setup_broker' + tags: + - { name: 'console.command' } + enqueue.client.consume_command: class: 'Enqueue\Symfony\Client\ConsumeCommand' arguments: diff --git a/pkg/enqueue/SetupBroker/ChainSetupBroker.php b/pkg/enqueue/SetupBroker/ChainSetupBroker.php new file mode 100644 index 000000000..4b5a94026 --- /dev/null +++ b/pkg/enqueue/SetupBroker/ChainSetupBroker.php @@ -0,0 +1,33 @@ +setupBrokers = []; + + array_walk($setupBrokers, function (SetupBrokerInterface $setupBroker) { + $this->setupBrokers[] = $setupBroker; + }); + } + + public function setupBroker(LoggerInterface $logger = null): void + { + foreach ($this->setupBrokers as $setupBroker) { + $logger->info(get_class($setupBroker)); + + $setupBroker->setupBroker($logger); + } + } +} diff --git a/pkg/enqueue/SetupBroker/SetupBrokerInterface.php b/pkg/enqueue/SetupBroker/SetupBrokerInterface.php new file mode 100644 index 000000000..685007354 --- /dev/null +++ b/pkg/enqueue/SetupBroker/SetupBrokerInterface.php @@ -0,0 +1,9 @@ +hasParameter('enqueue.transports')) { + throw new \LogicException('The "enqueue.transports" parameter must be set.'); + } + + $names = $container->getParameter('enqueue.transports'); + $defaultName = $container->getParameter('enqueue.default_transport'); + + foreach ($names as $name) { + $diUtils = DiUtils::create(TransportFactory::MODULE, $name); + + $setupBrokerId = $diUtils->format('setup_broker'); + if (false == $container->hasDefinition($setupBrokerId)) { + throw new \LogicException(sprintf('Service "%s" not found', $setupBrokerId)); + } + + $tag = 'enqueue.transport.setup_broker'; + $map = []; + foreach ($container->findTaggedServiceIds($tag) as $serviceId => $tagAttributes) { + foreach ($tagAttributes as $tagAttribute) { + $transport = $tagAttribute['transport'] ?? $defaultName; + + if ($transport !== $name && 'all' !== $transport) { + continue; + } + + $map[] = new Reference($serviceId); + } + } + + $setupBroker = $container->getDefinition($setupBrokerId); + $setupBroker->setArgument(0, $map); + } + } +} diff --git a/pkg/enqueue/Symfony/DependencyInjection/TransportFactory.php b/pkg/enqueue/Symfony/DependencyInjection/TransportFactory.php index 5cc1a6507..dd2664766 100644 --- a/pkg/enqueue/Symfony/DependencyInjection/TransportFactory.php +++ b/pkg/enqueue/Symfony/DependencyInjection/TransportFactory.php @@ -11,6 +11,7 @@ use Enqueue\Resources; use Enqueue\Rpc\RpcClient; use Enqueue\Rpc\RpcFactory; +use Enqueue\SetupBroker\ChainSetupBroker; use Enqueue\Symfony\ContainerProcessorRegistry; use Enqueue\Symfony\DiUtils; use Interop\Queue\ConnectionFactory; @@ -243,6 +244,15 @@ public function buildRpcClient(ContainerBuilder $container, array $config): void } } + public function buildSetupBroker(ContainerBuilder $container, array $config): void + { + $container->register($this->diUtils->format('setup_broker'), ChainSetupBroker::class) + ->addArgument([]) + ; + + $this->addServiceToLocator($container, 'setup_broker'); + } + private function assertServiceExists(ContainerBuilder $container, string $serviceId): void { if (false == $container->hasDefinition($serviceId)) { diff --git a/pkg/enqueue/Symfony/SetupBrokerCommand.php b/pkg/enqueue/Symfony/SetupBrokerCommand.php new file mode 100644 index 000000000..3541302e8 --- /dev/null +++ b/pkg/enqueue/Symfony/SetupBrokerCommand.php @@ -0,0 +1,76 @@ +container = $container; + $this->defaultTransport = $defaultTransport; + $this->setupBrokerIdPattern = $setupBrokerIdPattern; + + parent::__construct(static::$defaultName); + } + + protected function configure(): void + { + $this + ->setDescription('Setup broker. Configure the broker, creates queues, topics and so on.') + ->addOption('transport', 't', InputOption::VALUE_OPTIONAL, 'The transport to consume messages from.', $this->defaultTransport) + ; + } + + protected function execute(InputInterface $input, OutputInterface $output): ?int + { + $transport = $input->getOption('transport'); + + $logger = new ConsoleLogger($output); + + try { + $setupBroker = $this->getSetupBroker($transport); + + $logger->info(get_class($setupBroker)); + + $setupBroker->setupBroker(); + } catch (NotFoundExceptionInterface $e) { + throw new \LogicException(sprintf('Transport "%s" is not supported.', $transport), null, $e); + } + + $output->writeln('Broker set up'); + + return null; + } + + private function getSetupBroker(string $transport): SetupBrokerInterface + { + return $this->container->get(sprintf($this->setupBrokerIdPattern, $transport)); + } +}