C ++のRabbitMQチュートリアル

rabbitmq.comチュートリアルセクションでは、さまざまな言語での実装例を示していますが、C ++はありません。 ネコの下では、ネタバレの下で翻訳されたマニュアル、資料、コードへのリンクが収集されます。



GitHubインターフェースからコードを表示する方が便利な場合は、すぐにリポジトリにアクセスできます



この資料では、 AMQP-CPPおよびPOCO C ++のクライアント実装を使用して、ソケットを操作します。



「RabbitMQチュートリアル1-Hello World」



receive.cpp
#include <iostream> #include "SimplePocoHandler.h" int main(void) { SimplePocoHandler handler("localhost", 5672); AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); AMQP::Channel channel(&connection); channel.declareQueue("hello"); channel.consume("hello", AMQP::noack).onReceived( [](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { std::cout <<" [x] Received "<<message.message() << std::endl; }); std::cout << " [*] Waiting for messages. To exit press CTRL-C\n"; handler.loop(); return 0; }
      
      







send.cpp
 #include <iostream> #include "SimplePocoHandler.h" int main(void) { SimplePocoHandler handler("localhost", 5672); AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); AMQP::Channel channel(&connection); channel.onReady([&]() { if(handler.connected()) { channel.publish("", "hello", "Hello World!"); std::cout << " [x] Sent 'Hello World!'" << std::endl; handler.quit(); } }); handler.loop(); return 0; }
      
      







「RabbitMQチュートリアル2-タスクキュー」



worker.cpp
 #include <iostream> #include <algorithm> #include <thread> #include <chrono> #include "SimplePocoHandler.h" int main(void) { SimplePocoHandler handler("localhost", 5672); AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); AMQP::Channel channel(&connection); channel.setQos(1); channel.declareQueue("task_queue", AMQP::durable); channel.consume("task_queue").onReceived( [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { const auto body = message.message(); std::cout<<" [x] Received "<<body<<std::endl; size_t count = std::count(body.cbegin(), body.cend(), '.'); std::this_thread::sleep_for (std::chrono::seconds(count)); std::cout<<" [x] Done"<<std::endl; channel.ack(deliveryTag); }); std::cout << " [*] Waiting for messages. To exit press CTRL-C\n"; handler.loop(); return 0; }
      
      







new_task.cpp
 #include <iostream> #include "SimplePocoHandler.h" #include "tools.h" int main(int argc, const char* argv[]) { const std::string msg = argc > 1 ? join(&argv[1], &argv[argc], " ") : "Hello World!"; SimplePocoHandler handler("localhost", 5672); AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); AMQP::Channel channel(&connection); AMQP::QueueCallback callback = [&](const std::string &name, int msgcount, int consumercount) { AMQP::Envelope env(msg); env.setDeliveryMode(2); channel.publish("", "task_queue", env); std::cout<<" [x] Sent '"<<msg<<"'\n"; handler.quit(); }; channel.declareQueue("task_queue", AMQP::durable).onSuccess(callback); handler.loop(); return 0; }
      
      







「RabbitMQチュートリアル3-発行/購読」



receive_logs.cpp
 #include <iostream> #include "SimplePocoHandler.h" int main(void) { SimplePocoHandler handler("localhost", 5672); AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); AMQP::Channel channel(&connection); auto receiveMessageCallback = [](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { std::cout <<" [x] "<<message.message() << std::endl; }; AMQP::QueueCallback callback = [&](const std::string &name, int msgcount, int consumercount) { channel.bindQueue("logs", name,""); channel.consume(name, AMQP::noack).onReceived(receiveMessageCallback); }; AMQP::SuccessCallback success = [&]() { channel.declareQueue(AMQP::exclusive).onSuccess(callback); }; channel.declareExchange("logs", AMQP::fanout).onSuccess(success); std::cout << " [*] Waiting for messages. To exit press CTRL-C\n"; handler.loop(); return 0; }
      
      







emit_log.cpp
 #include <iostream> #include "SimplePocoHandler.h" #include "tools.h" int main(int argc, const char* argv[]) { const std::string msg = argc > 1 ? join(&argv[1], &argv[argc], " ") : "info: Hello World!"; SimplePocoHandler handler("localhost", 5672); AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); AMQP::Channel channel(&connection); channel.declareExchange("logs", AMQP::fanout).onSuccess([&]() { channel.publish("logs", "", msg); std::cout << " [x] Sent "<<msg<< std::endl; handler.quit(); }); handler.loop(); return 0; }
      
      







「RabbitMQチュートリアル4-ルーティング」



receive_logs_direct.cpp
 #include <iostream> #include <algorithm> #include "SimplePocoHandler.h" int main(int argc, const char* argv[]) { if(argc==1) { std::cout<<"Usage: "<<argv[0]<<" [info] [warning] [error]"<<std::endl; return 1; } SimplePocoHandler handler("localhost", 5672); AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); AMQP::Channel channel(&connection); channel.declareExchange("direct_logs", AMQP::direct); auto receiveMessageCallback = [](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { std::cout <<" [x] " <<message.routingKey() <<":" <<message.message() << std::endl; }; AMQP::QueueCallback callback = [&](const std::string &name, int msgcount, int consumercount) { std::for_each(&argv[1], &argv[argc], [&](const char* severity) { channel.bindQueue("direct_logs","", severity); channel.consume(name, AMQP::noack).onReceived(receiveMessageCallback); }); }; channel.declareQueue(AMQP::exclusive).onSuccess(callback); std::cout << " [*] Waiting for messages. To exit press CTRL-C\n"; handler.loop(); return 0; }
      
      







emit_log_direct.cpp
 #include <iostream> #include "SimplePocoHandler.h" #include "tools.h" int main(int argc, const char* argv[]) { const std::string severity = argc > 2 ? argv[1] : "info"; const std::string msg = argc > 2 ? join(&argv[2], &argv[argc], " ") : "Hello World!"; SimplePocoHandler handler("localhost", 5672); AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); AMQP::Channel channel(&connection); channel.declareExchange("direct_logs", AMQP::direct).onSuccess([&]() { channel.publish("direct_logs", severity, msg); std::cout << " [x] Sent "<<severity<<":"<<msg<< std::endl; handler.quit(); }); handler.loop(); return 0; }
      
      







「RabbitMQチュートリアル5-トピック」



receive_logs_topic.cpp
 #include <iostream> #include <algorithm> #include "SimplePocoHandler.h" int main(int argc, const char* argv[]) { if(argc==1) { std::cout<<"Usage: "<<argv[0]<<" [binding_key]..."<<std::endl; return 1; } SimplePocoHandler handler("localhost", 5672); AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); AMQP::Channel channel(&connection); channel.declareExchange("topic_logs", AMQP::topic); auto receiveMessageCallback = [](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { std::cout <<" [x] " <<message.routingKey() <<":" <<message.message() << std::endl; }; AMQP::QueueCallback callback = [&](const std::string &name, int msgcount, int consumercount) { std::for_each(&argv[1], &argv[argc], [&](const char* bindingKeys) { std::cout<<bindingKeys<<std::endl; channel.bindQueue("topic_logs",name, bindingKeys); channel.consume(name, AMQP::noack).onReceived(receiveMessageCallback); }); }; channel.declareQueue(AMQP::exclusive).onSuccess(callback); std::cout << " [*] Waiting for messages. To exit press CTRL-C\n"; handler.loop(); return 0; }
      
      







emit_log_topic.cpp
 #include <iostream> #include "SimplePocoHandler.h" #include "tools.h" int main(int argc, const char* argv[]) { const std::string msg = argc > 1 ? join(&argv[2], &argv[argc], " ") : "Hello World!"; const std::string routing_key = argc > 1 ? argv[1] : "anonymous.info"; SimplePocoHandler handler("localhost", 5672); AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); AMQP::Channel channel(&connection); channel.declareExchange("topic_logs", AMQP::topic).onSuccess([&]() { channel.publish("topic_logs", routing_key, msg); std::cout << " [x] Sent "<<routing_key<<":"<<msg<< std::endl; handler.quit(); }); handler.loop(); return 0; }
      
      







「RabbitMQチュートリアル6-リモートプロシージャコール」



rpc_server.cpp
 #include <iostream> #include <algorithm> #include <thread> #include <chrono> #include "SimplePocoHandler.h" int fib(int n) { switch (n) { case 0: return 0; case 1: return 1; default: return fib(n - 1) + fib(n - 2); } } int main(void) { SimplePocoHandler handler("localhost", 5672); AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); AMQP::Channel channel(&connection); channel.setQos(1); channel.declareQueue("rpc_queue"); channel.consume("").onReceived([&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { const auto body = message.message(); std::cout<<" [.] fib("<<body<<")"<<std::endl; AMQP::Envelope env(std::to_string(fib(std::stoi(body)))); env.setCorrelationID(message.correlationID()); channel.publish("", message.replyTo(), env); channel.ack(deliveryTag); }); std::cout << " [x] Awaiting RPC requests" << std::endl; handler.loop(); return 0; }
      
      







rpc_client.cpp
 #include <iostream> #include "tools.h" #include "SimplePocoHandler.h" int main(int argc, const char* argv[]) { const std::string correlation(uuid()); SimplePocoHandler handler("localhost", 5672); AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/"); AMQP::Channel channel(&connection); AMQP::QueueCallback callback = [&](const std::string &name, int msgcount, int consumercount) { AMQP::Envelope env("30"); env.setCorrelationID(correlation); env.setReplyTo(name); channel.publish("","rpc_queue",env); std::cout<<" [x] Requesting fib(30)"<<std::endl; }; channel.declareQueue(AMQP::exclusive).onSuccess(callback); auto receiveCallback = [&](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { if(message.correlationID() != correlation) return; std::cout<<" [.] Got "<<message.message()<<std::endl; handler.quit(); }; channel.consume("", AMQP::noack).onReceived(receiveCallback); handler.loop(); return 0; }
      
      







UPD: 2015.04.09修正:プリフェッチカウントが正しくインストールされました。 作業チュートリアル2。 コードはg ++ 4.7でビルドされます



All Articles