SSHを介したリモートアプリケーションとの非同期データ交換

良い一日、友人と同僚。 私の名前は今でもDmitry Smirnovであり、それでも嬉しいことに、私はISPsystemの開発者です。 少し前に、私は完全に新しいプロジェクトに取り組み始めました。それは私に多くのインスピレーションを与えました。なぜなら、新しいプロジェクトはレガシーコードと古いコンパイラのサポートの欠如だからです。 こんにちは、Boost、C ++ 17、およびその他の最新の開発の喜び。



私の過去のプロジェクトはすべてマルチスレッドであったため、非同期ソリューションの経験はほとんどありませんでした。 これは、最新の強力なツールに加えて、この開発で私にとって最も楽しいものになりました。



最後の関連タスクの1つは、 Boost.Asioを使用し、2つ以下のスレッドを生成できる非同期アプリケーションの現実で、 libssh2ライブラリにラッパーを書き込む必要があることでした。 これについて説明します。







注:著者は、読者が非同期開発とブースト:: asioの基本に精通していることを前提としています。



挑戦する



一般的な用語では、タスクは次のとおりでした。rsaキーまたはユーザー名とパスワードを使用してリモートサーバーに接続します。 スクリプトをリモートマシンにアップロードして実行します。 彼の答えを読み、同じ接続を介してコマンドを送信します。 この場合、もちろん、フローをブロックすることなく(これは可能な総プールの半分です)。



免責事項 :PocoはSSHで動作することは知っていますが、Asioと結婚する方法を見つけられませんでした。自分で何かを書くほうが面白かったです:-)。



初期化



ライブラリを初期化して最小化するために、通常のシングルトンを使用することにしました。



初期化()
class LibSSH2 { public: static void Init() { static LibSSH2 instance; } private: explicit LibSSH2() { if (libssh2_init(0) != 0) { throw std::runtime_error("libssh2 initialization failed"); } } ~LibSSH2() { std::cout << "shutdown libssh2" << std::endl; libssh2_exit(); } };
      
      









もちろん、この決定には落とし穴があります。私のお気に入りのハンドブック「C ++であなたの足を撃つ千と一つの方法」によると。 誰かが突くのを忘れたストリームを生成し、メインストリームが早く終了した場合、興味深い特殊効果が発生する可能性があります。 しかし、この場合、この可能性を考慮しません。



キーエンティティ



サンプルを分析した後、小さなライブラリには3つの単純なエンティティ(ソケット、セッション、チャネル)が必要であることが明らかになります。 同期ツールがあると便利なため、ここではAsioを無視します。



簡単なソケットから始めましょう。



ソケット
 class Socket { public: explicit Socket() : m_sock(socket(AF_INET, SOCK_STREAM, 0)) { if (m_sock == -1) { throw std::runtime_error("failed to create socket"); } } ~Socket() { close(m_sock); } private: int m_sock = -1; }
      
      







今のセッション:



セッション
 class Session { public: explicit Session(const bool enable_compression) : m_session(libssh2_session_init()) { if (m_session == nullptr) { throw std::runtime_error("failed to create libssh2 session"); } libssh2_session_set_blocking(m_session, 0); if (enable_compression) { libssh2_session_flag(m_session, LIBSSH2_FLAG_COMPRESS, 1); } } ~Session() { const std::string desc = "Shutting down libssh2 session"; libssh2_session_disconnect(m_session, desc.c_str()); libssh2_session_free(m_session); } private: LIBSSH2_SESSION *m_session; }
      
      







これでソケットとセッションができたので、libssh2の現実にソケットの待機関数を書くといいでしょう。



待機ソケット
 int WaitSocket() const { pollfd fds{}; fds.fd = sock; fds.events = 0; if ((libssh2_session_block_directions(session) & LIBSSH2_SESSION_BLOCK_INBOUND) != 0) { fds.events |= POLLIN; } if ((libssh2_session_block_directions(session) & LIBSSH2_SESSION_BLOCK_OUTBOUND) != 0) { fds.events |= POLLOUT; } return poll(&fds, 1, 10); }
      
      







実際には、これはポーリングの代わりにselectを使用することを除いて、上記の例と実質的に違いはありません。



チャンネルは残ります。 libssh2にはいくつかのタイプのチャネルがあります:シンプル、SCP、ダイレクトtcp。 最も簡単で基本的なチャネルに興味があります。



チャンネル
 class SimpleChannel { public: explicit SimpleChannel(session) { while ((m_channel = libssh2_channel_open_session(session) == nullptr && GetSessionLastError() == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } if (m_channel == nullptr) { throw std::runtime_error("Critical error while opening simple channel"); } } void SendEof() { while (libssh2_channel_send_eof(m_channel) == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } while (libssh2_channel_wait_eof(m_channel) == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } } ~SimpleChannel() { CloseChannel(); } private: void CloseChannel() { int rc; while ((rc = libssh2_channel_close(m_channel)) == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } libssh2_channel_free(m_channel); } LIBSSH2_CHANNEL *m_channel; };
      
      







すべての基本的なツールの準備ができたので、ホストとの接続を確立し、必要な操作を実行します。 もちろん、チャネルへの非同期記録と同期は非常に異なりますが、接続を確立するプロセスはそうではありません。



したがって、基本クラスを記述します。



基本的な接続
 class BaseConnectionImpl { protected: explicit BaseConnectionImpl(const SshConnectData &connect_data) ///<    ,     : m_session(connect_data.enable_compression) , m_connect_data(connect_data) { LibSSH2::Init(); ConnectSocket(); HandShake(); ProcessKnownHosts(); Auth(); } ///       bool CheckSocket(int type) const { pollfd fds{}; fds.fd = m_sock; fds.events = type; return poll(&fds, 1, 0) == 1; } bool WantRead() const { return CheckSocket(POLLIN); } bool WantWrite() const { return CheckSocket(POLLOUT); } /*   ,   ,       *  - . */ void ConnectSocket() {...} void HandShake() {...} void Auth() {...} class Socket m_sock; class Session m_session; class SimpleChannel; SshConnectData m_connect_data; };
      
      







これで、リモートホストに接続してコマンドを実行する最も簡単なクラスを作成する準備ができました。



同期接続
 class Connection::Impl : public BaseConnectionImpl { public: explicit Impl(const SshConnectData &connect_data) : BaseConnectionImpl(connect_data) {} template <typename Begin> void WriteToChannel(LIBSSH2_CHANNEL *channel, Begin ptr, size_t size) { do { int rc; while ((rc = libssh2_channel_write(channel, ptr, size)) == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } if (rc < 0) { break; } size -= rc; ptr += rc; } while (size != 0); } void ExecuteCommand(const std::string &command, const std::string &in = "") { SimpleChannel channel(*this); int return_code = libssh2_channel_exec(channel, command.c_str()); if (return_code != 0 && return_code != LIBSSH2_ERROR_EAGAIN) { throw std::runtime_error("Critical error while executing ssh command"); } if (!in.empty()) { WriteToChannel(channel, in.c_str(), in.size()); channel.SendEof(); } std::string response; for (;;) { int rc; do { std::array<char, 4096> buffer{}; rc = libssh2_channel_read(channel, buffer.data(), buffer.size()); if (rc > 0) { boost::range::copy(boost::adaptors::slice(buffer, 0, rc), std::back_inserter(response)); } else if (rc < 0 && rc != LIBSSH2_ERROR_EAGAIN) { throw std::runtime_error("libssh2_channel_read error (" + std::to_string(rc) + ")"); } } while (rc > 0); if (rc == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } else { break; } } } };
      
      







これまでのところ、私たちが書いたのは、libssh2の例を単純化して、より文明的な形式にすることだけです。 しかし、今では、チャネルにデータを同期的に書き込むためのすべての簡単なツールが揃ったので、Asioに進むことができます。



標準のソケットを使用することは良いことですが、プロセスで独自のビジネスを行っている間に読み取り/書き込みを非同期に待機する必要がある場合、あまり実用的ではありません。 ここで、boost :: asio :: ip :: tcp :: socketが助けとなり、素晴らしい方法があります:



 async_wait(wait_type, WaitHandler)
      
      





これは通常のソケットから素晴らしく構築されており、事前に接続を設定し、アプリケーションの実行コンテキストである:: asio :: io_contextを後押しします。



非同期接続コンストラクター
 class AsyncConnection::Impl : public BaseConnectionImpl, public std::enable_shared_from_this<AsyncConnection::Impl> { public: Impl(boost::asio::io_context &context, const SshConnectData &connect_data) : BaseConnectionImpl(connect_data) , m_tcp_socket(context, tcp::v4(), m_sock.GetSocket()) { m_tcp_socket.non_blocking(true); } };
      
      









ここで、リモートホストでコマンドの実行を開始し、コマンドが到着したらすぐにコールバックに送信する必要があります。



 void AsyncRun(const std::string &command, CallbackType &&callback) { m_read_callback = std::move(callback); auto ec = libssh2_channel_exec(*m_channel, command.c_str()); TryRead(); }
      
      





したがって、コマンドを実行することで、TryRead()メソッドに制御を移します。



 void TryRead() { if (m_read_in_progress) { return; } m_tcp_socket.async_wait(tcp::socket::wait_read, [this, self = shared_from_this()](auto ec) { if (WantRead()) { ReadHandler(ec); } if (m_complete) { return; } TryRead(); }); }
      
      





まず、読み取りプロセスが以前の呼び出しによって既に実行されているかどうかを確認します。 そうでない場合、ソケットの読み取り準備が整うことを期待し始めます。 shared_from_this()をキャプチャした通常のラムダが待機ハンドラとして使用されます。



WantRead()への呼び出しに注意してください。 判明したように、Async_waitにも欠陥があり、タイムアウトによって単純に戻ることができます。 この場合の不必要なアクションを回避するために、タイムアウトなしでポーリングを介してソケットをチェックすることにしました-ソケットは実際に読み取りたいですか? そうでない場合は、TryRead()を再度実行して待機します。 それ以外の場合、データの読み取りとコールバックへの転送をすぐに開始します。



 void ReadHandler(const boost::system::error_code &error) { if (error != boost::system::errc::success) { return; } m_read_in_progress = true; int ec = LIBSSH2_ERROR_EAGAIN; std::array<char, 4096> buffer {}; while ((ec = libssh2_channel_read(*m_channel, buffer.data(), buffer.size())) > 0) { std::string tmp; boost::range::copy(boost::adaptors::slice(buffer, 0, ec), std::back_inserter(tmp)); if (m_read_callback != nullptr) { m_read_callback(tmp); } } m_read_in_progress = false; }
      
      





したがって、実行中のアプリケーションから無限の非同期読み取りサイクルが開始されます。 次のステップは、アプリケーションに指示を送信することです。



 void AsyncWrite(const std::string &data, WriteCallbackType &&callback) { m_input += data; m_write_callback = std::move(callback); TryWrite(); }
      
      





非同期記録に転送されたデータとコールバックは、接続内に保存されます。 そして次のサイクルを実行します。今回はエントリのみです:



記録周期
 void TryWrite() { if (m_input.empty() || m_write_in_progress) { return; } m_tcp_socket.async_wait(tcp::socket::wait_write, [this, self = shared_from_this()](auto ec) { if (WantWrite()) { WriteHandler(ec); } if (m_complete) { return; } TryWrite(); }); } void WriteHandler(const boost::system::error_code &error) { if (error != boost::system::errc::success) { return; } m_write_in_progress = true; int ec = LIBSSH2_ERROR_EAGAIN; while (!m_input.empty()) { auto ptr = m_input.c_str(); auto read_size = m_input.size(); while ((ec = libssh2_channel_write(*m_channel, ptr, read_size)) > 0) { read_size -= ec; ptr += ec; } AssertResult(ec); m_input.erase(0, m_input.size() - read_size); if (ec == LIBSSH2_ERROR_EAGAIN) { break; } } if (m_input.empty() && m_write_callback != nullptr) { m_write_callback(); } m_write_in_progress = false; }
      
      







したがって、データがすべて正常に転送されるまで、チャネルにデータを書き込みます。 次に、制御を呼び出し元に戻し、新しいデータを転送できるようにします。 そのため、ホスト上の一部のアプリケーションに命令を送信できるだけでなく、たとえば、スレッドをブロックせずに、任意のサイズのファイルを小さな部分でアップロードすることもできます。これは重要です。



このライブラリを使用すると、出力を読み取り、さまざまなコマンドを送信しながら、ファイルシステムへの変更を追跡するスクリプトをリモートサーバーで正常に実行できました。 一般的に:Boostを使用して、siスタイルライブラリを最新のC ++プロジェクトに適合させる非常に貴重な経験。



経験豊富なBoost.Asioユーザーのヒントを読んで、私のソリューションをさらに学び、改善してください:-)。



All Articles