KSQL、Raspberry Pi、およびラジオを使用したノイズのマッピング





一見したところ、この物語には、飛行機、愛、少しスパイ、そして最後に猫(より正確には猫)という3月8日の前夜のロマンチックな断食のステータスを獲得するためのすべてが含まれています。 これらすべてが、Kafka、KSQL、および実験「ホームインフォメーションテクノロジーを使用して最も騒がしい航空機を見つける方法」に直接関連していることを想像するのは困難です。 難しいことですが、必要です。サイモン・オベリーが実施したこのような実験であり、彼の著者の記事をプロセスのすべての詳細の説明とともに翻訳しました。



スノーフレークという名前の新しい猫が早起きします。 家の上を飛んでいる飛行機の音が彼女を目覚めさせます。 しかし、Apache Kafka、KSQL、およびRaspberry Piを使用して、猫が目を覚ましている飛行機を特定できたらどうでしょうか? 面白い追跡パネルを作成して、猫が注意を向けることができたら嬉しいです。



一般的に





KafkaとKSQLを使用して、飛行機を空からグラフィックスに転送します



航空機はGPS受信機を使用して位置を特定します。 オンボード送信機は、短い無線送信を使用して、船舶の位置、識別番号、高度、速度を定期的に報告します。 これらのブロードキャスト自動依存監視( AZN-V )のブロードキャストは、基本的に地上局からのアクセス用に開かれたデータパケットです。



Raspberry Piなどの1つのマイクロコンピューター、およびいくつかの補助コンポーネントは、私の家の上を走り回っている航空機のオンボード送信機からメッセージを受信するために必要なものすべてです。



航空機の空中信号は、絡み合ったメッセージのボールのように見え、システム化が必要です。 これらの混oticとしたデータストリームを認識することは、騒々しいパーティーで会話を聞くようなものです。 したがって、猫を心配させる飛行機を見つけるために、KafkaとKSQLの組み合わせを使用することにしました。





覚醒した猫とラズベリーパイ



Raspberry Piを使用したAZN-B測定値のコレクション



オンボード送信を収集するために、私はRaspberry PiとRTL2832Uを使用しました。RTL2832Uは、コンピューターでデジタルテレビを視聴するためのデバイスとして元々販売されていたUSBモデムです。 Raspberry Piにdump1090をインストールしました 。これは、小さなアンテナを使用してRTL2832U経由でAZN-Vからデータを受信するプログラムです。





Raspberry PiおよびRTL2832Uからの私のソフトウェアラジオ



AZN-B信号をKafkaテーマに変換する



生のAZN-B信号のストリームを受信したので、トラフィックに注意を払う必要があります。 Raspberry Piには本格的なコンピューティングに十分なパワーがないため、Kafkaのローカルクラスターにデータ処理を転送する必要がありました。







受信したメッセージは、 ロケーションメッセージ 、またはボードの識別に関する メッセージに分割されます 。 場所は次の形式のメッセージのように見えます。 「7c6db8ボードは、座標-33.8,151.0で高度6,250フィートで飛行します。 ボードの識別に関する情報は次のようになります。 「ボード7c451cはルートQJE1726に沿って飛行します。



Raspberry Pi用の小さなPythonスクリプトは、着信AZN-Bメッセージをすべて共有します。 Confluent Rest Proxyプロキシを使用して、Raspberry PiからKafkaのlocation-topicおよびident-topicテーマにデータを配布しました。 プロキシサーバーは、Kafkaクラスター用のRESTfulインターフェイスを提供します。これにより、Piで簡単なREST呼び出しを使用してメッセージを簡単に作成できます。







私はどの飛行機が私の屋根の上を飛ぶのか、そしてどのルートを理解したかったのです。 OpenFlightsデータベースを使用すると、国際民間航空機関(ICAO)によって割り当てられた7C6DB8などの空港コードを航空機のタイプ(この場合はボーイング737)と比較できます 。 マッピングデータicao-to-aircraftテーマにアップロードしました



KSQLは、Apache Kafkaトピックに関するデータをリアルタイムで処理できるようにする「SQLエンジン」を提供します。 たとえば、オンボードコード7C6DB8を見つけるには、次のクエリを記述できます。



CREATE TABLE icao_to_aircraft WITH (KAFKA_TOPIC='ICAO_TO_AIRCRAFT_REKEY', VALUE_FORMAT='AVRO', KEY='ICAO'); ksql> SELECT manufacturer, aircraft, registration \ FROM icao_to_aircraft \ WHERE icao = '7C6DB8'; Boeing | B738 | VH-VYI
      
      





同様に、 コールサインの詳細のトピックで、コールサインをダウンロードしました(つまり、QFA563、これはブリスベンからシドニーへのカンタス航空のフライトです)。



 CREATE TABLE callsign_details WITH (KAFKA_TOPIC='CALLSIGN_DETAILS_REKEY', VALUE_FORMAT='AVRO', KEY='CALLSIGN'); ksql> SELECT operatorname, fromairport, toairport \ FROM callsign_details \ WHERE callsign = 'QFA563'; Qantas | Brisbane | Sydney
      
      





次に、 場所トピックのデータストリームを見てみましょう。 ここでは、飛行中の飛行機の位置に関する着信メッセージの絶え間ない流れを観察できます。



 kafka-avro-console-consumer --bootstrap-server localhost:9092 --property --topic location-topic {"ico":"7C6DB8","height":"6250","location":"-33.807724,151.091495"}
      
      





KSQLクエリは次のようになります。

 ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yyyy-MM-dd HH:mm:ss'), \ ico, height, location \ FROM location_stream \ WHERE ico = '7C6DB8'; 2018-09-19 07:13:33 | 7C6DB8 | 6250.0 | -33.807724,151.091495
      
      





KSQL:ストリームの調和...



KSQLの本当の価値は、着信ロケーションデータストリームをトピックソースデータと結合できることです( 03_ksql.sqlを参照) 。つまり 、生データストリームに有用な情報を追加します。 これは、従来のデータベースの「左結合」に非常に似ています。 その結果、Javaコードを1行も使わずに作成された別のKafkaテーマができました!



ソース> CREATE STREAM location_and_details_stream AS \

SELECT l.ico、l.height、l.location、t.aircraft \

FROM location_stream l \

LEFT JOIN icao_to_aircraft t ON l.ico = t.icao;

さらに、KSQLクエリを受け取ります。 データストリームは次のようになります。



 ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yy-MM-dd HH:mm:ss') \ , manufacturer \ , aircraft \ , registration \ , height \ , location \ FROM location_and_details_stream; 18-09-27 09:53:28 | Boeing | B738 | VH-YIA | 7225 | -33.821,151.052 18-09-27 09:53:31 | Boeing | B738 | VH-YIA | 7375 | -33.819,151.049 18-09-27 09:53:32 | Boeing | B738 | VH-YIA | 7425 | -33.818,151.048
      
      





さらに、 コールサイン入力ストリームを固定のcallsign_detailsテーマと組み合わせることができます。



 CREATE STREAM ident_callsign_stream AS \ SELECT i.ico \ , c.operatorname \ , c.callsign \ , c.fromairport \ , c.toairport \ FROM ident_stream i \ LEFT JOIN callsign_details c ON i.indentification = c.callsign; ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yy-MM-dd HH:mm:ss') \ , operatorname \ , callsign \ , fromairport \ , toairport \ FROM ident_callsign_stream ; 18-09-27 13:33:19 | Qantas | QFA926 | Sydney | Cairns 18-09-27 13:44:11 | China Eastern | CES777 | Kunming | Sydney 18-09-27 14:00:54 | Air New Zealand | ANZ110 | Sydney | Auckland
      
      





現在、2つの有益なトピックがあります。



  1. location_and_details_streamは、航空機の位置と速度に関する更新された情報のストリームを提供します。
  2. ident_callsign_stream 。航空会社や目的地など、フライトの詳細を記述します。


これらの絶え間なく更新されるテーマを使用して、優れた概要パネルを作成できます。 Kafka Connectを使用して、KSQLによって作成されたKafkaテーマをElasticsearchにアップロードしました(完全なスクリプトはこちら )。



Kibanaダッシュボード



これは、地図上の飛行機の位置を示すダッシュボードの例です。 さらに、航空会社ごとのチャート、飛行高度のグラフ、および主要な目的地ごとのワードクラウドを表示できます。 ヒートマップは、航空機が集中しているエリア、つまりノイズレベルが最も高いエリアを示します。







猫に戻る



今日、猫は午前6時頃に目を覚ました。 KSQLを使用すると、この家の上空で高度3,500フィート未満で飛行した飛行機を見つけることができますか?



 select timestamptostring(rowtime, 'yyyy-MM-dd HH:mm:ss') , manufacturer , aircraft , registration , height from location_and_details_stream where height < 3500 and rowtime > stringtotimestamp('18-09-27 06:10', 'yy-MM-dd HH:mm') and rowtime < stringtotimestamp('18-09-27 06:20', 'yy-MM-dd HH:mm'); 2018-09-27 06:15:39 | Airbus | A388 | A6-EOD | 2100.0 2018-09-27 06:15:58 | Airbus | A388 | A6-EOD | 3050.0
      
      





すごい! 午前6時15分に屋根の上の飛行機を特定できます。 スノーフレークは、ドバイに飛んだエアバスA380(ちなみに巨大なライナー)によって目が覚めたことがわかりました。



わずか数日間の休みで、KSQLを使用したスト​​リーミング処理システムができました。 さらに、これにより、興味深いデータイベントをすばやく見つけることができます。 スノーフレークはそれらについて懐疑的かもしれませんが。










All Articles