ä»æ¥ã¯ãSymfonyã§RabbitMQã䜿çšããæ¹æ³ã«ã€ããŠã話ãããããšæããŸããããæ°Žäžã³ãŒã ã«ã€ããŠã¯ã»ãšãã©èª¬æããŸããã§ããã æåŸã«ãå®å šã«æŠè»ã«ä¹ã£ãŠãã人ã®ããã«ããŠãµã®ã«ã€ããŠã®èå³æ·±ãç¬éïŒãã·ã¢èªã®ããŠãµã®ãã®ç¿»èš³ïŒãããã€ãæžããŸãã
RabbitMQèªäœã«ã€ããŠã¯èª¬æããŸããã®ã§ãããããŸã ç¥ããªãå Žåã¯ã次ã®ç¿»èš³ããèªã¿ãã ããã
èšäº1
ã»ã¯ã·ã§ã³2
ã»ã¯ã·ã§ã³3
ã»ã¯ã·ã§ã³4
ã»ã¯ã·ã§ã³5
çç ãpythonã®äŸãæããŠã¯ãããŸãã-ããã¯æãããããšã§ã¯ãããŸããããã¹ãŠã¯ãœãŒã¹ã³ãŒãããååã«æããã§ãã
+ãããŠãããèªãã ãšãããã¹ãŠãååã«è©³çŽ°ã«èšè¿°ãããŠããã®ã§ãã³ãŒãã粟ç¥çã«è§£éããŠãã©ã®ããã«ããããŠãªãããç解ããã®ã«ååã§ããã
æ¶è²»è ãäœã§ãããããããŠãã®äžã«$ em-> clearïŒïŒ+ gc_collect_cyclesãå®è¡ããããŒã¿ããŒã¹æ¥ç¶ãéããå¿ èŠãããçç±ããã§ã«ç¥ã£ãŠããå Žåãã»ãšãã©ã®å Žåãæ°ããããšã¯åŠç¿ããŸããã ãã®èšäºã¯ãAMQPãããã³ã«ã«å¯ŸåŠããããªãããä»ãããã¥ãŒãé©çšããå¿ èŠããã人ããäœããã®çç±ã§RabbitMQã§ã®éžæãæ°ã«ãããåã軜éã®
ãã€ã¯ããµãŒãã¹ã¢ãŒããã¯ãã£ããããAMQPãä»ããŠã³ã³ããŒãã³ãéã®éä¿¡ã溶æ¥ããæ¹æ³ãRPCãçŸããããæ¹æ³ãæããŠããããšæåŸ ããŠããå Žåãç§èªèº«ã¯ãã®ãããªããšãéåžžã«é·ãéåŸ ã£ãŠããŸãã...
RabbitMQã䜿çšããŠãã¥ãŒå ã®ã¡ãŒã«ã«ã¡ãã»ãŒãžãéä¿¡ãããã©ãŒã«ããã¬ã©ã³ã¹ãæäŸããã¿ã¹ã¯ããããŸãïŒã¡ãŒã«ãµãŒããŒãã¿ã€ã ã¢ãŠããŸãã¯ä»ã®äœãã§å¿çããå Žåã30ç§åŸã«ã¿ã¹ã¯ãå床å®è¡ããå¿ èŠããããŸãã
ãããã£ãŠã ãã³ãã«ãã€ã³ã¹ããŒã«ããŸã ã
AppKernelã®composer requireã³ãã³ããšè¡ãã³ããŒããæ¹æ³ã説æããã®ãé¢åã§ãã
ããªãèªèº«ããããå®è¡ãããã³ãã«ã®æ§æãéå§ããæºåãã§ããŠããããšãæ¬åœã«é¡ã£ãŠããŸãã
ããã§ãªãå Žåãæå°ã®å®å
šãªã¬ã€ãã¯æ¬¡ã®ãšããã§ãã
RabbitMQã®ã€ã³ã¹ããŒã«ïŒ
ããã§ã localhost ïŒ15672ãaccountïŒguest guestã®äžã§éããããã«ç解ããç·ã®ããã«æããå€ãã®ã¯ãŒã«ãªãã®ãèŠãããšãã§ããŸãã
次ã«ããã³ãã«èªäœãã€ã³ã¹ããŒã«ããŸãã
ãããã¢ããªã±ãŒã·ã§ã³ã«ç»é²ããŸãã
以äžã§ãã
echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add - sudo apt-get update sudo apt-get install rabbitmq-server sudo rabbitmq-plugins enable rabbitmq_management
ããã§ã localhost ïŒ15672ãaccountïŒguest guestã®äžã§éããããã«ç解ããç·ã®ããã«æããå€ãã®ã¯ãŒã«ãªãã®ãèŠãããšãã§ããŸãã
次ã«ããã³ãã«èªäœãã€ã³ã¹ããŒã«ããŸãã
composer require php-amqplib/rabbitmq-bundle
ãããã¢ããªã±ãŒã·ã§ã³ã«ç»é²ããŸãã
// app/AppKernel.php public function registerBundles() { $bundles = array( new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle(), ); }
以äžã§ãã
ãã³ãã«æ§æïŒ
old_sound_rabbit_mq: connections: default: host: 'localhost' port: 5672 user: 'guest' password: 'guest' vhost: '/' lazy: false connection_timeout: 3 read_write_timeout: 3 keepalive: false heartbeat: 0 use_socket: true producers: send_email: connection: default exchange_options: { name: 'notification.v1.send_email', type: direct } consumers: send_email: connection: default exchange_options: { name: 'notification.v1.send_email', type: direct } queue_options: { name: 'notification.v1.send_email' } callback: app.consumer.mail_sender
ããã§ã¯ãçç£è ãšæ¶è²»è ã«å€§ããªæ³šæãæãå¿ èŠããããŸãã éåžžã«çãåçŽãªå ŽåïŒãããã¥ãŒãµãŒã¯RabbitMQãä»ããŠã³ã³ã·ã¥ãŒããŒã«ã¡ãã»ãŒãžãéä¿¡ããã³ã³ã·ã¥ãŒããŒã¯ãããã®ã¡ãã»ãŒãžãåä¿¡ããŠââåŠçããŸãã ããã§ãexchange_optionsã¯äº€ææ©ã®ãªãã·ã§ã³ã§ãïŒèšäºã®åé ã«ããrabbitmqã«é¢ããèšäºãèªã¿ãŸãããïŒïŒãQueue_optionsã¯ãã¥ãŒã®ãªãã·ã§ã³ã§ãïŒåæ§ã«ïŒã ã³ã³ã·ã¥ãŒããŒã®ã³ãŒã«ããã¯ã«ã泚æãæã䟡å€ããããŸããConsumerInterfaceïŒã¡ãã»ãŒãžåŒæ°ãæå®ããå®è¡ã¡ãœããïŒãæ¡åŒµãããµãŒãã¹IDã¯æ¬¡ã®ãšããã§ãã
ãªããªã ãããŸã§ã®ãšãããããªãã¯ãããæã£ãŠããŸãããã¢ããªã±ãŒã·ã§ã³ãèµ·åããããã³ã³ãããã³ã³ãã€ã«ãããšããµãŒãã¹ãèŠã€ãããªãã£ããšããããã€ãã®DIäŸå€ãåãåããŸããããããèŠæ±ããŸãã ããã§ã¯ããµãŒãã¹ãäœæããŸãããã
#app/config/services.yml services: app.consumer.mail_sender: class: AppBundle\Consumer\MailSenderConsumer
ãããŠãã¯ã©ã¹èªäœïŒ
namespace AppBundle\Consumer; use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; use PhpAmqpLib\Message\AMQPMessage; /** * Class NotificationConsumer */ class MailSenderConsumer implements ConsumerInterface { /** * @var AMQPMessage $msg * @return void */ public function execute(AMQPMessage $msg) { echo ' : '.$msg->getBody().PHP_EOL; echo ' !...'; } }
ããŠãããªãã¯ç§ãèšäºã«SwiftMailerã䜿çšããæ¹æ³ãå«ããªãã£ãããšã«è ¹ãç«ãŠãŸããã§ãããïŒ :)ããã§ã¯ãã¡ãã»ãŒãžãã¥ãŒãä»ããŠéåæçã«è¡ãé ä¿¡ãããããšãéèŠã§ãããã®è¡ã®åŠçæ¹æ³ã¯ããžãã¹ã§ãã ã¡ãŒã«ã¯ã±ãŒã¹ã®äžäŸã§ãã
æ¶è²»è ã«æååãæž¡ãæ¹æ³ã¯ïŒ ãããè¡ãã«ã¯ããã¹ãã³ãã³ããäœæããŸãããã
namespace AppBundle\Command; use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; class TestConsumerCommand extends ContainerAwareCommand { /** * {@inheritdoc} */ protected function configure() { $this ->setName('app:test-consumer') ->setDescription('Hello PhpStorm'); } /** * {@inheritdoc} */ protected function execute(InputInterface $input, OutputInterface $output) { $this->getContainer()->get('old_sound_rabbit_mq.send_email_producer')->publish(' ...'); }
ç¹°ãè¿ããŸãããç§ã¯ããªãã®ããã«çŸããåã§ã³ã³ãããŒã©ãŒãäœæããªãã£ãããšããizeã³ããŸã-ç§ã¯ãããé¢åã§ãã ã¯ããå€ãããŸãã ããããç§ã¯çµæžçã«æªãã®ã§ãæãã®ã奜ãã§ããçè«ãšã¢ããªã±ãŒã·ã§ã³ã¢ãŒããã¯ãã£ãå°ã倢èŠãŠããŸãã æ°ãæ£ãã
次ã«ãã³ã³ã·ã¥ãŒããèµ·åããRabbitMQããã®ã¡ãã»ãŒãžãåŸ ã€ããã«åœŒã«åœä»€ããŸãã
bin/console rabbitmq:consumer send_email -vvv
ãããŠããã¹ãããŒã ããã¡ãã»ãŒãžãéä¿¡ããŸãã
bin/console app:test-consumer
ãããŠä»ãrabbitmqïŒæ¶è²»è ã®ããã»ã¹ã§ãç§ãã¡ã®ã¡ãã»ãŒãžãèŠãããšãã§ããŸãïŒ ãããŠããã®ç䌌ãã£ã¹ãããã¯æåããŸããã
次ã«ããšã©ãŒãçºçããå Žåã«é 延ã¡ãã»ãŒãžåŠçãå®è£ ããæ¹æ³ãèŠãŠã¿ãŸãããã ä¿çäžã®ã¡ãã»ãŒãžã«RabbitMQãã©ã°ã€ã³ã䜿çšããŸããã ãããå®çŸããã«ã¯ãæ°ãããã¥ãŒãäœæããŸãããã®ãã¥ãŒã§ã¯ãã¡ãã»ãŒãžã®æå¹æéã30ç§ã«æå®ãã次ã®èšå®ãèšå®ããŸããæ»åŸ-ã¡ã€ã³ãã¥ãŒã«è»¢éããŸãã
æ°ãããããã¥ãŒãµãŒãè¿œå ããã ãã§ãïŒ
producers: send_email: connection: default exchange_options: { name: 'notification.v1.send_email', type: direct } delayed_send_email: connection: default exchange_options: name: 'notification.v1.send_email_delayed_30000' type: direct queue_options: name: 'notification.v1.send_email_delayed_30000' arguments: x-message-ttl: ['I', 30000] x-dead-letter-exchange: ['S', 'notification.v1.send_email']
ããã§ã¯ãã³ã³ã·ã¥ãŒãã®ããžãã¯ãå€æŽããŸãããã
namespace AppBundle\Consumer; use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface; use PhpAmqpLib\Message\AMQPMessage; /** * Class NotificationConsumer */ class MailSenderConsumer implements ConsumerInterface { private $delayedProducer; /** * MailSenderConsumer constructor. * @param ProducerInterface $delayedProducer */ public function __construct(ProducerInterface $delayedProducer) { $this->delayedProducer = $delayedProducer; } /** * @var AMQPMessage $msg * @return void */ public function execute(AMQPMessage $msg) { $body = $msg->getBody(); echo ' '.$body.' ...'.PHP_EOL; try { if ($body == 'bad') { throw new \Exception(); } echo ' ...'.PHP_EOL; } catch (\Exception $exception) { echo 'ERROR'.PHP_EOL; $this->delayedProducer->publish($body); } } }
äžè¬ã«ãåºåã«LoggerInterfaceã䜿çšãããšäŸ¿å©ã§ãããŸããçŸãããŠã¹ã±ãŒã©ãã«ã§ãã
ããããç§ãã¡ã¯æ ãããã§ãè¿œå ã®ãæèããäœããããããŸããããïŒ ãã ç¥ã£ãŠããã
ããã§ãé 延ãã¥ãŒã®ãããã¥ãŒãµãŒãã¹ãããããå¿ èŠããããŸãã
#app/config/services.yml services: app.consumer.mail_sender: class: AppBundle\Consumer\MailSenderConsumer arguments: ['@old_sound_rabbit_mq.delayed_send_email_producer']
ãããŠãã³ãã³ããå€æŽããŸãã
namespace AppBundle\Command; use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; class TestConsumerCommand extends ContainerAwareCommand { /** * {@inheritdoc} */ protected function configure() { $this ->setName('app:test-consumer') ->setDescription('Hello PhpStorm'); } /** * {@inheritdoc} */ protected function execute(InputInterface $input, OutputInterface $output) { $this->getContainer()->get('old_sound_rabbit_mq.send_email_producer')->publish(', ...'); $this->getContainer()->get('old_sound_rabbit_mq.send_email_producer')->publish('bad'); } }
ä»ã圌女ã¯éåžžã®ã¡ãã»ãŒãžãšãšãã«ãæªãã¡ãã»ãŒãžãéä¿¡ããŸãã
å®è¡ãããšã次ã®åºåã衚瀺ãããŸãã
, ... ... bad... ERROR
30ç§åŸãåŠçã¡ãã»ãŒãžãåã³è¡šç€ºãããŸãã
bad... ERROR
ãããŠç¡éã«ã æ倧詊è¡åæ°ãªã©ã®ããžã㯠èªåã§èããŠãã ããã 次ã«ã販売ãšããã€ãã®æ©èœã«é¢ãããã³ããããã€ã玹ä»ããŸãã
ä»ããªãã®è²©å£²ã®ããã®ãã³ãïŒ
1ïŒåŠçã®è©Šè¡åæ°ãæ倧ã«ããŠãããã¯ããéžââè±ããããšãªããäœæ¥äžã®ã³ã³ããã¹ãã®ãã¹ãŠã®äŸå€ã®102ïŒ ã«æ³šæããŠãã ããïŒ ååŠçãå¿ èŠãªå Žåãšããã§ãªãå Žåãæ³åããŠãã ãããããã§ãªãå Žåã¯ããã°ãããŽãã«ãªããäœãèµ·ãã£ãŠããã®ãç解ã§ããŸããã RabbitMQã§å®éã®ããŒã¿ã®éåžžã®ã¿ã¹ã¯ã䜿çšããŠãbeatãããã¿ã¹ã¯ãå転ããŠããå Žåãã³ã³ã·ã¥ãŒããŒã®ã³ãŒããæŽæ°ããŠåèµ·åããããšãªããç Žæããã¿ã¹ã¯ãæŸèæãªãã§ç Žæ£ããããšã¯ã§ããŸããã ããã«èãçŽããŠãã ããã ãã®å Žåãäžéšã®SMTPTimeOutExceptionã®ã¿ããã£ããããã®ãæ£ããã§ãããã
ãŸãããã®ãããªã¢ãã«ã§ã¯ã次ã®ããšãç解ããããšãéèŠã§ããæåã®æ®µéã§-1ã€ã®ãäœãã®ç¶æ ãå€æŽããããã®ã°ããŒãã«ãªè²¬ä»»ãã å±éºãªã¿ã¹ã¯ãã¯ãŒã«ãŒã«äžããããªãã§ãã ããã 1Cã䜿çšãããªãã·ã§ã³ãæ€èšããå Žåãåé¡ã¯æ¬¡ã®ããã«ãªããŸãïŒããšãã°ã補åã1Cã«æ£åžžã«ãŸãã¯æ£åžžã«å€æŽããªãå ŽåããŸãã¯è£œåã1Cã«è¿œå ããå ŽåãããŒã¿ããŒã¹ã«äœããæžã蟌ã¿ãŸãïŒããšãã°ãæåŸã«æåããåæã®æ¥ä»ãŸãã¯å€±æããæ¥ä»ãªã©ïŒã ããªãã¡ ããã§ã¯ã1CããŒã¿ããŒã¹ãšã¢ããªã±ãŒã·ã§ã³ããŒã¿ããŒã¹ã®2ã€ã®ããŒã¿ããŒã¹ãåæã«æŽæ°ãããŸãã ãã¹ãŠã1Cã§æ£åžžã«äœæãããå ŽåããæåŸã«æåããåæã®æ¥ä»ããã£ãŒã«ããããŒã¿ããŒã¹ã§æŽæ°ããããšä»®å®ããŸã-ãããããšã©ãŒããããã¢ããããããŒã¿ããŒã¹ãµãŒããŒãå¿çããŸãã-ã¿ã¹ã¯ããåŸã§ã延æãããããŒã¿ããŒã¹ãå¿çãå§ãããŸã§ç¹°ãè¿ãããŸãã åæã«ã1Cã§ã®ãšã³ãã£ãã£ã®äœæã«é¢é£ããããµãã¿ã¹ã¯ããæ£åžžã«å®è¡ããããã³ã«ããµã€ãããŒã¿ããŒã¹ãžã®æžã蟌ã¿è©Šè¡ã倱æãããã³ã«ãããã¯ééã£ãŠããŸãã
2ïŒRabbitMQã䜿çšããŠãããããèä¹ æ§ã«ã€ããŠãèªã¿ãã ããã PSïŒãã³ãã«æ§æãç¹ã«exchange_optionsããã³queue_optionsã®ãèä¹ æ§ã®ããããã©ã°ãtrueãŸãã¯falseãšããŠèµ·åããŸã
3ïŒããã°ã©ã ãäœæ¥ãå®äºããåŸãããªãã®äººçã¯ãã¹ãŠããŒã¿ããŒã¹ãžã®æ¥ç¶ãéããŸãã ãŸããEMã¯ãªãŒã³ã¢ãããå®è¡ããã¬ããŒãžã³ã¬ã¯ã¿ãŒã®åŸã«ãªã³ã¯ãã¯ãªãŒã³ã¢ããããŸãã ããªãã¡ æçµçã«ãæ¶è²»è ã¯æ¬¡ã®ããã«ãªããŸãã
class MailSenderConsumer implements ConsumerInterface { private $delayedProducer; private $entityManager; /** * MailSenderConsumer constructor. * @param ProducerInterface $delayedProducer * @param EntityManagerInterface $entityManager */ public function __construct(ProducerInterface $delayedProducer, EntityManagerInterface $entityManager) { $this->delayedProducer = $delayedProducer; $this->entityManager = $entityManager; gc_enable(); } /** * @var AMQPMessage $msg * @return void */ public function execute(AMQPMessage $msg) { $body = $msg->getBody(); echo ' '.$body.' ...'.PHP_EOL; try { if ($body == 'bad') { throw new \Exception(); } echo ' ...'.PHP_EOL; } catch (\Exception $exception) { echo 'ERROR'.PHP_EOL; $this->delayedProducer->publish($body); } $this->entityManager->clear(); $this->entityManager->getConnection()->close(); gc_collect_cycles(); } }
ã³ã³ã·ã¥ãŒããŒã¯ããŒã¢ã³ã®ããã«åäœããããããªã³ã¯ã絶ããæ ŒçŽããããŒã¿ããŒã¹ãžã®æ¥ç¶ãç¶æããã®ã¯æªãããšã§ãã MySQLã®å ŽåãMySQLãµãŒããŒããªããªã£ãŠããŸããŸãã
4ïŒä¿çäžã®ã¡ãã»ãŒãžã¢ãã«ãäºæ³å€ã«ããžãã¹ã殺ãå¯èœæ§ãããçç±ãããèããŠãã ããã ããšãã°ã管çããã«ã§è£œåãå€æŽãããšãã«ããããã®å€æŽã1Cã®ãã¥ãŒã«æµã蟌ãã¡ã«ããºã ããããŸãã ããã§ç¶æ³ãæ³åããŠãã ããã管çè ã補åãå€æŽããŸã->ã¿ã¹ã¯1ãäœæããã1CããŒã¿ããŒã¹ã®åãããŒã¿ãå€æŽããããšããŸãã ãµãŒããŒ1Cãå¿çããªãããããã¹ãŠãæ©èœãããŸã§ã¿ã¹ã¯ã¯åçŽã«ã·ããããŸãã ãã®éã管çè ã¯åã補åã§ä»ã®äœããä¿®æ£ããããšã決å®ããŸããã ã¿ã¹ã¯ïŒ2ãèšé²ãããŸãã
ããã§ãã¿ã¹ã¯ïŒ1ãšïŒ2ã1ã€ãã€å®è¡ããã延æãããç¶æ³ãæ³åããŠãã ããã
ã¿ã¹ã¯2ãå®äºãããŸã§ã«1CãåäœããŠããå Žåã¯ã©ããªããŸããïŒ ã¿ã¹ã¯ãå®äºããææ°ã®å€æŽãããµããŸãã 次ã«ãã¿ã¹ã¯ïŒ1ã䜿çšãããå®å®ããå€æŽãæ¶å»ãããŸã:)
çµäºïŒã¿ã€ã ã¹ã¿ã³ããããŒãžã§ã³ãšããŠéä¿¡ããã¿ã¹ã¯ããéå»ãããã®å Žåããããç Žæ£ããŸãã
5ïŒéåæç¶æ ã«ãªããŸã-競åç¶æ ãããŸããŸãªãã·ã³ã§ã®æ¶è²»è ã®äžäžèŽãªã©ãå€ãã®ã¢ãŒããã¯ãã£äžã®åé¡ã«ã€ããŠãèªã¿ãã ããã
6ïŒãã¥ãŒã«ããŒãžã§ã³ãæžã蟌ã¿ãŸã...ãããŒããããå®éã®è£œåã§ã©ã®ããã«åœ¹ç«ã€ãã ååãšããŠããã®äŸã§ã¯ãããè¡ããŸããã
7ïŒãããããRabbitMQãšAMQPãããã³ã«å šäœã¯å¿ èŠãããŸããã beanstalkdã«æ³šç®ããŠãã ããã
8ïŒã¹ãŒããŒãã€ã¶ãŒãä»ããŠphpã§ã³ã³ã·ã¥ãŒããŒããã³ãã®ä»ãã¹ãŠã®æªéãå®è¡ãããã®ããã»ã¹ã®ãã©ãŒã«ã®å®å šãªãã®ã³ã°ãæ¥ç¶ããŸãã 圌ã¯ãŸãããã®ãã¹ãŠã®ããžãã¹ã管çããããã®Webã€ã³ã¿ãŒãã§ãŒã¹ãæã£ãŠããŸãã åžžã«åé¡ããããŸãã