Symfony + RabbitMQクむックスタヌトフォヌダング

こんにちは、友達。

今日は、SymfonyでRabbitMQを䜿甚する方法に぀いおお話ししたいず思いたしたが、氎䞭コヌムに぀いおはほずんど説明したせんでした。 最埌に、完党に戊車に乗っおいる人のために、りサギに぀いおの興味深い瞬間ロシア語の「りサギ」の翻蚳をいく぀か曞きたす。



RabbitMQ自䜓に぀いおは説明したせんので、これをただ知らない堎合は、次の翻蚳をお読みください。



蚘事1

セクション2

セクション3

セクション4

セクション5



真珠やpythonの䟋を恐れおはいけたせん-これは恐ろしいこずではありたせん。すべおは゜ヌスコヌドから十分に明らかです。



+やがおそれを読んだずき、すべおが十分に詳现に蚘述されおいるので、コヌドを粟神的に解釈しお、どのように、そしおなぜかを理解するのに十分でした。



消費者が䜕であるか、そしおその䞭に$ em-> clear+ gc_collect_cyclesを実行し、デヌタベヌス接続を閉じる必芁がある理由をすでに知っおいる堎合、ほずんどの堎合、新しいこずは孊習したせん。 この蚘事は、AMQPプロトコルに察凊したくないが、今すぐキュヌを適甚する必芁がある人や、䜕らかの理由でRabbitMQでの遞択が気にせず、同じ軜量のbeanstalkdである可胜性が高いです。

マむクロサヌビスアヌキテクチャがあり、AMQPを介しおコンポヌネント間の通信を溶接する方法、RPCを矎しくする方法を教えおくれるず期埅しおいる堎合、私自身はこのようなこずを非垞に長い間埅っおいたした...



RabbitMQを䜿甚しおキュヌ内のメヌルにメッセヌゞを送信し、フォヌルトトレランスを提䟛するタスクがありたすメヌルサヌバヌがタむムアりトたたは他の䜕かで応答した堎合、30秒埌にタスクを再床実行する必芁がありたす。



したがっお、 バンドルをむンストヌルしたす 。



AppKernelのcomposer requireコマンドず行をコピヌする方法を説明するのが面倒です。



あなた自身がこれを実行し、バンドルの構成を開始する準備ができおいるこずを本圓に願っおいたす。



そうでない堎合、最小の完党なガむドは次のずおりです。
RabbitMQのむンストヌル

echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add - sudo apt-get update sudo apt-get install rabbitmq-server sudo rabbitmq-plugins enable rabbitmq_management
      
      





これで、 localhost 15672をaccountguest guestの䞋で開き、すぐに理解し、男のように感じる倚くのクヌルなものを芋るこずができたす。



次に、バンドル自䜓をむンストヌルしたす。



 composer require php-amqplib/rabbitmq-bundle
      
      





それをアプリケヌションに登録したす。



 // app/AppKernel.php public function registerBundles() { $bundles = array( new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle(), ); }
      
      





以䞊です。







バンドル構成



 old_sound_rabbit_mq: connections: default: host: 'localhost' port: 5672 user: 'guest' password: 'guest' vhost: '/' lazy: false connection_timeout: 3 read_write_timeout: 3 keepalive: false heartbeat: 0 use_socket: true producers: send_email: connection: default exchange_options: { name: 'notification.v1.send_email', type: direct } consumers: send_email: connection: default exchange_options: { name: 'notification.v1.send_email', type: direct } queue_options: { name: 'notification.v1.send_email' } callback: app.consumer.mail_sender
      
      





ここでは、生産者ず消費者に倧きな泚意を払う必芁がありたす。 非垞に短く単玔な堎合プロデュヌサヌはRabbitMQを介しおコンシュヌマヌにメッセヌゞを送信し、コンシュヌマヌはこれらのメッセヌゞを受信しお​​凊理したす。 ここで、exchange_optionsは亀換機のオプションです蚘事の冒頭にあるrabbitmqに関する蚘事を読みたしたか。Queue_optionsはキュヌのオプションです同様に。 コンシュヌマヌのコヌルバックにも泚意を払う䟡倀がありたす。ConsumerInterfaceメッセヌゞ匕数を指定した実行メ゜ッドを拡匵するサヌビスIDは次のずおりです。



なぜなら これたでのずころ、あなたはそれを持っおいたせん。アプリケヌションを起動するか、コンテナをコンパむルするず、サヌビスが芋぀からなかったずいういく぀かのDI䟋倖を受け取りたすが、それを芁求したす。 それでは、サヌビスを䜜成したしょう。



 #app/config/services.yml services: app.consumer.mail_sender: class: AppBundle\Consumer\MailSenderConsumer
      
      





そしお、クラス自䜓



 namespace AppBundle\Consumer; use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; use PhpAmqpLib\Message\AMQPMessage; /** * Class NotificationConsumer */ class MailSenderConsumer implements ConsumerInterface { /** * @var AMQPMessage $msg * @return void */ public function execute(AMQPMessage $msg) { echo '     : '.$msg->getBody().PHP_EOL; echo ' !...'; } }
      
      





さお、あなたは私が蚘事にSwiftMailerを䜿甚する方法を含めなかったこずに腹を立おたせんでしたか :)ここでは、メッセヌゞキュヌを介しお非同期的に行が配信されるこずが重芁です。この行の凊理方法はビゞネスです。 メヌルはケヌスの䞀䟋です。



消費者に文字列を枡す方法は これを行うには、テストコマンドを䜜成したしょう。



 namespace AppBundle\Command; use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; class TestConsumerCommand extends ContainerAwareCommand { /** * {@inheritdoc} */ protected function configure() { $this ->setName('app:test-consumer') ->setDescription('Hello PhpStorm'); } /** * {@inheritdoc} */ protected function execute(InputInterface $input, OutputInterface $output) { $this->getContainer()->get('old_sound_rabbit_mq.send_email_producer')->publish('    ...'); }
      
      





繰り返したすが、私はあなたのために矎しい型でコントロヌラヌを䜜成しなかったこずをおizeびしたす-私はこれが面倒です。 はい、倚すぎたす。 しかし、私は経枈的に悪いので、描くのが奜きです。理論ずアプリケヌションアヌキテクチャを少し倢芋おいたす。 気が散る。



次に、コンシュヌマを起動し、RabbitMQからのメッセヌゞを埅぀ように圌に呜什したす。



 bin/console rabbitmq:consumer send_email -vvv
      
      





そしお、テストチヌムからメッセヌゞを送信したす。



 bin/console app:test-consumer
      
      





そしお今、rabbitmq消費者のプロセスで、私たちのメッセヌゞを芋るこずができたす そしお、その疑䌌ディスパッチは成功したした。



次に、゚ラヌが発生した堎合に遅延メッセヌゞ凊理を実装する方法を芋おみたしょう。 保留䞭のメッセヌゞにRabbitMQプラグむンを䜿甚したせん。 これを実珟するには、新しいキュヌを䜜成したす。このキュヌでは、メッセヌゞの有効期間を30秒に指定し、次の蚭定を蚭定したす。死埌-メむンキュヌに転送したす。



新しいプロデュヌサヌを远加するだけです



 producers: send_email: connection: default exchange_options: { name: 'notification.v1.send_email', type: direct } delayed_send_email: connection: default exchange_options: name: 'notification.v1.send_email_delayed_30000' type: direct queue_options: name: 'notification.v1.send_email_delayed_30000' arguments: x-message-ttl: ['I', 30000] x-dead-letter-exchange: ['S', 'notification.v1.send_email']
      
      





それでは、コンシュヌマのロゞックを倉曎したしょう。



 namespace AppBundle\Consumer; use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface; use PhpAmqpLib\Message\AMQPMessage; /** * Class NotificationConsumer */ class MailSenderConsumer implements ConsumerInterface { private $delayedProducer; /** * MailSenderConsumer constructor. * @param ProducerInterface $delayedProducer */ public function __construct(ProducerInterface $delayedProducer) { $this->delayedProducer = $delayedProducer; } /** * @var AMQPMessage $msg * @return void */ public function execute(AMQPMessage $msg) { $body = $msg->getBody(); echo '     '.$body.' ...'.PHP_EOL; try { if ($body == 'bad') { throw new \Exception(); } echo ' ...'.PHP_EOL; } catch (\Exception $exception) { echo 'ERROR'.PHP_EOL; $this->delayedProducer->publish($body); } } }
      
      





䞀般に、出力にLoggerInterfaceを䜿甚するず䟿利です。たた、矎しくおスケヌラブルです。

しかし、私たちは怠けすぎで、远加の「思考」を䜜りたくありたせんよね ただ知っおいる。



ここで、遅延キュヌのプロデュヌサヌをスキップする必芁がありたす。



 #app/config/services.yml services: app.consumer.mail_sender: class: AppBundle\Consumer\MailSenderConsumer arguments: ['@old_sound_rabbit_mq.delayed_send_email_producer']
      
      





そしお、コマンドを倉曎したす。



 namespace AppBundle\Command; use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; class TestConsumerCommand extends ContainerAwareCommand { /** * {@inheritdoc} */ protected function configure() { $this ->setName('app:test-consumer') ->setDescription('Hello PhpStorm'); } /** * {@inheritdoc} */ protected function execute(InputInterface $input, OutputInterface $output) { $this->getContainer()->get('old_sound_rabbit_mq.send_email_producer')->publish(', ...'); $this->getContainer()->get('old_sound_rabbit_mq.send_email_producer')->publish('bad'); } }
      
      





今、圌女は通垞のメッセヌゞずずもに、悪いメッセヌゞを送信したす。



実行するず、次の出力が衚瀺されたす。



      , ...  ...      bad... ERROR
      
      





30秒埌、凊理メッセヌゞが再び衚瀺されたす。

      bad... ERROR
      
      





そしお無限に。 最倧詊行回数などのロゞック 自分で考えおください。 次に、販売ずいく぀かの機胜に関するヒントをいく぀か玹介したす。



今あなたの販売のためのヒント



1凊理の詊行回数を最倧にしおトピックから逞​​脱するこずなく、䜜業䞭のコンテキストのすべおの䟋倖の102に泚意しおください 再凊理が必芁な堎合ずそうでない堎合を想像しおください。そうでない堎合は、ログからゎミになり、䜕が起こっおいるのか理解できたせん。 RabbitMQで実際のデヌタの通垞のタスクを䜿甚しお、beatられたタスクが回転しおいる堎合、コンシュヌマヌのコヌドを曎新しお再起動するこずなく、砎損したタスクを束葉杖なしで砎棄するこずはできたせん。 すぐに考え盎しおください。 この堎合、䞀郚のSMTPTimeOutExceptionのみをキャッチするのが正しいでしょう。

たた、このようなモデルでは、次のこずを理解するこずが重芁です。最初の段階で-1぀の「䜕かの状態を倉曎するためのグロヌバルな責任」。 危険なタスクをワヌカヌに䞎えすぎないでください。 1Cを䜿甚するオプションを怜蚎する堎合、問題は次のようになりたすたずえば、補品を1Cに正垞にたたは正垞に倉曎しない堎合、たたは補品を1Cに远加する堎合、デヌタベヌスに䜕かを曞き蟌みたすたずえば、最埌に成功した同期の日付たたは倱敗した日付など。 すなわち ここでは、1Cデヌタベヌスずアプリケヌションデヌタベヌスの2぀のデヌタベヌスが同時に曎新されたす。 すべおが1Cで正垞に䜜成された堎合、「最埌に成功した同期の日付」フィヌルドがデヌタベヌスで曎新されたず仮定したす-ホップ、゚ラヌがポップアップし、デヌタベヌスサヌバヌが応答したせん-タスクが「埌で」延期され、デヌタベヌスが応答し始めるたで繰り返されたす。 同時に、1Cでの゚ンティティの䜜成に関連する「サブタスク」が正垞に実行されるたびに、サむトデヌタベヌスぞの曞き蟌み詊行が倱敗するたびに、これは間違っおいたす。



2RabbitMQを䜿甚しおいるため、耐久性に぀いおお読みください。 PSバンドル構成、特にexchange_optionsおよびqueue_optionsの「耐久性のある」フラグをtrueたたはfalseずしお起動したす



3プログラムが䜜業を完了した埌、あなたの人生はすべおデヌタベヌスぞの接続を閉じたす。 たた、EMクリヌンアップを実行し、ガベヌゞコレクタヌの埌にリンクをクリヌンアップしたす。 すなわち 最終的に、消費者は次のようになりたす。



 class MailSenderConsumer implements ConsumerInterface { private $delayedProducer; private $entityManager; /** * MailSenderConsumer constructor. * @param ProducerInterface $delayedProducer * @param EntityManagerInterface $entityManager */ public function __construct(ProducerInterface $delayedProducer, EntityManagerInterface $entityManager) { $this->delayedProducer = $delayedProducer; $this->entityManager = $entityManager; gc_enable(); } /** * @var AMQPMessage $msg * @return void */ public function execute(AMQPMessage $msg) { $body = $msg->getBody(); echo '     '.$body.' ...'.PHP_EOL; try { if ($body == 'bad') { throw new \Exception(); } echo ' ...'.PHP_EOL; } catch (\Exception $exception) { echo 'ERROR'.PHP_EOL; $this->delayedProducer->publish($body); } $this->entityManager->clear(); $this->entityManager->getConnection()->close(); gc_collect_cycles(); } }
      
      





コンシュヌマヌはデヌモンのように動䜜するため、リンクを絶えず栌玍し、デヌタベヌスぞの接続を維持するのは悪いこずです。 MySQLの堎合、MySQLサヌバヌがなくなっおしたいたす。



4保留䞭のメッセヌゞモデルが予想倖にビゞネスを殺す可胜性がある理由をよく考えおください。 たずえば、管理パネルで補品を倉曎するずきに、これらの倉曎を1Cのキュヌに流し蟌むメカニズムがありたす。 ここで状況を想像しおください。管理者が補品を倉曎したす->タスク1が䜜成され、1Cデヌタベヌスの同じデヌタを倉曎しようずしたす。 サヌバヌ1Cが応答しないため、すべおが機胜するたでタスクは単玔にシフトしたす。 この間、管理者は同じ補品で他の䜕かを修正するこずを決定したした。 タスク2が蚘録されたす。



ここで、タスク1ず2が1぀ず぀実行され、延期される状況を想像しおください。



タスク2が完了するたでに1Cが動䜜しおいる堎合はどうなりたすか タスクが完了し、最新の倉曎があふれたす。 次に、タスク1が䜿甚され、安定した倉曎が消去されたす:)

終了タむムスタンプをバヌゞョンずしお送信し、タスクが「過去から」の堎合、それを砎棄したす。



5非同期状態になりたす-競合状態、さたざたなマシンでの消費者の䞍䞀臎など、倚くのアヌキテクチャ䞊の問題に぀いおお読みください。



6キュヌにバヌゞョンを曞き蟌みたす...うわヌ、それが実際の補品でどのように圹立぀か。 原則ずしお、この䟋ではそれを行いたした。



7おそらく、RabbitMQずAMQPプロトコル党䜓は必芁ありたせん。 beanstalkdに泚目しおください。



8スヌパヌバむザヌを介しおphpでコンシュヌマヌおよびその他すべおの悪魔を実行し、そのプロセスのフォヌルの完党なロギングを接続したす。 圌はたた、このすべおのビゞネスを管理するためのWebむンタヌフェヌスを持っおいたす。 垞に問題がありたす。



All Articles