heka + elasticsearch + kibanaを使用した集中アプリケーションログ

この記事では、かなり新しいHekaプロジェクトを使用して、さまざまなタイプのアプリケーション(Python、Java(java.util.logging)、Go、bash)の中央ロギングを構成する方法について説明します。



HekaはMozillaで開発され、Goで書かれています。 そのため、同様の機能を持つlogstashの代わりに使用します。



Hekaは、5つのタイプのプラグインに基づいています。

  1. 入力-何らかの方法でデータを受信します(ポートをリッスンし、ファイルを読み取ります)。
  2. デコーダ-着信要求を処理し、メッセージの内部構造に変換します。
  3. フィルター-メッセージでアクションを実行します。
  4. エンコーダー(翻訳方法が不明)-内部メッセージを出力プラグインを介して送信される形式にエンコードします。
  5. 週末-どこかにデータを送信します。


たとえば、Javaアプリケーションの場合、入力プラグインはLogstreamerInputであり、ログファイルへの変更を探します。 ログの新しい行はPayloadRegexDecoderデコーダーによって(指定された形式に従って)処理され、ElasticSearchOutput出力プラグインを介してelasticsearchに送信されます。 出力プラグインは、内部構造からESJsonEncoderを介してelasticsearch形式にメッセージをエンコードします。



ヘカのインストール



すべてのインストール方法は、Webサイト( http://hekad.readthedocs.org/en/v0.8.2/installing.html#binaries )で説明されています。 しかし、最も簡単な方法は、 https://github.com/mozilla-services/heka/releasesページからシステムの既製のバイナリパッケージをダウンロードすることです。



私はubuntuの下でサーバーを使用しているため、説明はこのシステムのものになります。 この場合、インストールは、debパッケージ自体のインストール、/ etc / hekad.toml構成ファイルの構成、およびupstartのサービスへの追加に要約されます。



/etc/hekad.tomlの基本設定には、プロセスの数(コアの数を設定します)、ダッシュボード(プラグインを表示できます)、およびポート5565のudpサーバーが含まれます。 python and go applications):



maxprocs = 4 [Dashboard] type = "DashboardOutput" address = ":4352" ticker_interval = 15 [UdpInput] address = "127.0.0.1:5565" parser_type = "message.proto" decoder = "ProtobufDecoder"
      
      





upstart /etc/init/hekad.confの構成:



 start on runlevel [2345] respawn exec /usr/bin/hekad -config=/etc/hekad.toml
      
      







Pythonアプリケーションのロギング



これは、 https://github.com/kalail/heka-pyライブラリとロギングモジュール用の特別なハンドラーを使用します 。 コード:



 import logging from traceback import format_exception try: import heka HEKA_LEVELS = { logging.CRITICAL: heka.severity.CRITICAL, logging.ERROR: heka.severity.ERROR, logging.WARNING: heka.severity.WARNING, logging.INFO: heka.severity.INFORMATIONAL, logging.DEBUG: heka.severity.DEBUG, logging.NOTSET: heka.severity.NOTICE, } except ImportError: heka = None class HekaHandler(logging.Handler): _notified = None conn = None host = '127.0.0.1:5565' def __init__(self, name, host=None): if host is not None: self.host = host self.name = name super(HekaHandler, self).__init__() def emit(self, record): if heka is None: return fields = { 'Message': record.getMessage(), 'LineNo': record.lineno, 'Filename': record.filename, 'Logger': record.name, 'Pid': record.process, 'Exception': '', 'Traceback': '', } if record.exc_info: trace = format_exception(*record.exc_info) fields['Exception'] = trace[-1].strip() fields['Traceback'] = ''.join(trace[:-1]).strip() msg = heka.Message( type=self.name, severity=HEKA_LEVELS[record.levelno], fields=fields, ) try: if self.conn is None: self.__class__.conn = heka.HekaConnection(self.host) self.conn.send_message(msg) except: if self.__class__._notified is None: print "Sending HEKA message failed" self.__class__._notified = True
      
      





デフォルトでは、ハンドラーはHekaがポート5565でリッスンすることを想定しています。



Goアプリケーションのロギング



ロギングについては、 https://github.com/ivpusic/gologをロギングするためのライブラリを分岐し、Hekaに直接メッセージを送信する機能を追加しました。 結果はここにあります: https : //github.com/ildus/golog



使用法:



 import "github.com/ildus/golog" import "github.com/ildus/golog/appenders" ... logger := golog.Default logger.Enable(appenders.Heka(golog.Conf{ "addr": "127.0.0.1", "proto": "udp", "env_version": "2", "message_type": "myserver.log", })) ... logger.Debug("some message")
      
      







Javaアプリケーションのロギング



Javaアプリケーションの場合、特別な正規表現デコーダーを備えたLogstreamerInputタイプの入力プラグインが使用されます。 特定の形式で書き込まれる必要があるファイルからアプリケーションログを読み取ります。



ログの読み取りとデコードを行うhekaの構成:



 [JLogs] type = "LogstreamerInput" log_directory = "/some/path/to/logs" file_match = 'app_(?P<Seq>\d+\.\d+)\.log' decoder = "JDecoder" priority = ["Seq"] [JDecoder] type = "PayloadRegexDecoder" #Parses com.asdf[INFO|main|2014-01-01 3:08:06]: Server started match_regex = '^(?P<LoggerName>[\w\.]+)\[(?P<Severity>[AZ]+)\|(?P<Thread>[\w\d\-]+)\|(?P<Timestamp>[\d\-\s:]+)\]: (?P<Message>.*)' timestamp_layout = "2006-01-02 15:04:05" timestamp_location = "Europe/Moscow" [JDecoder.severity_map] SEVERE = 3 WARNING = 4 INFO = 6 CONFIG = 6 FINE = 6 FINER = 7 FINEST = 7 [JDecoder.message_fields] Type = "myserver.log" Message = "%Message%" Logger = "%LoggerName%" Thread = "%Thread%" Payload = ""
      
      







アプリケーションでは、logging.propertiesを使用してFormatterを変更する必要があります。 logging.propertiesの例:



 handlers= java.util.logging.FileHandler java.util.logging.ConsoleHandler java.util.logging.FileHandler.level=ALL java.util.logging.FileHandler.pattern = logs/app_%g.%u.log java.util.logging.FileHandler.limit = 1024000 java.util.logging.FileHandler.formatter = com.asdf.BriefLogFormatter java.util.logging.FileHandler.append=tru java.util.logging.ConsoleHandler.level=ALL java.util.logging.ConsoleHandler.formatter=com.asdf.BriefLogFormatter
      
      







BriefLogFormatterコード:



 package com.asdf; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.logging.Formatter; import java.util.logging.LogRecord; public class BriefLogFormatter extends Formatter { private static final DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); private static final String lineSep = System.getProperty("line.separator"); /** * A Custom format implementation that is designed for brevity. */ public String format(LogRecord record) { String loggerName = record.getLoggerName(); if(loggerName == null) { loggerName = "root"; } StringBuilder output = new StringBuilder() .append(loggerName) .append("[") .append(record.getLevel()).append('|') .append(Thread.currentThread().getName()).append('|') .append(format.format(new Date(record.getMillis()))) .append("]: ") .append(record.getMessage()).append(' ') .append(lineSep); return output.toString(); } }
      
      







スクリプトロギング(bash)



bashの場合、hekaはTcpInput入力フィルター(特定のポートでリッスンする)とメッセージをデコードするためのPayloadRegexDecoderを追加します。 hekad.tomlの構成:



 [TcpInput] address = "127.0.0.1:5566" parser_type = "regexp" decoder = "TcpPayloadDecoder" [TcpPayloadDecoder] type = "PayloadRegexDecoder" #Parses space_checker[INFO|2014-01-01 3:08:06]: Need more space on disk /dev/sda6 match_regex = '^(?P<LoggerName>[\w\.\-]+)\[(?P<Hostname>[^\|]+)\|(?P<Severity>[AZ]+)\|(?P<Timestamp>[\d\-\s:]+)\]: (?P<Message>.*)' timestamp_layout = "2006-01-02 15:04:05" timestamp_location = "Europe/Moscow" [TcpPayloadDecoder.severity_map] ERROR = 3 WARNING = 4 INFO = 6 ALERT = 1 [TcpPayloadDecoder.message_fields] Type = "scripts" Message = "%Message%" Logger = "%LoggerName%" Hostname = "%Hostname%" Payload = "[%Hostname%|%LoggerName%] %Message%"
      
      





ロギングのために、指定された形式でメッセージをTCPポートに送信する関数が作成されました。



 log() { if [ "$1" ]; then echo -e "app1[`hostname`|INFO|`date '+%Y-%m-%d %H:%M:%S'`]: $1" | nc 127.0.0.1 5566 || true echo $1 fi }
      
      





タイプapp1でINFOレベルのメッセージを送信します。



 log "test test test"
      
      







elasticsearchへのレコードの送信



次の構成がhekad.confに追加されます。



 [ESJsonEncoder] index = "heka-%{Type}-%{2006.01.02}" es_index_from_timestamp = true type_name = "%{Type}" [ElasticSearchOutput] message_matcher = "Type == 'myserver.log' || Type=='scripts' || Type=='nginx.access' || Type=='nginx.error'" server = "http://<elasticsearch_ip>:9200" flush_interval = 5000 flush_count = 10 encoder = "ESJsonEncoder"
      
      





ここでは、elasticsearchの場所、インデックスの作成方法、そこに送信するメッセージの種類を示します。



ログを見る



ログを表示するには、Kibana 4を使用しますが、まだベータ版ですが、すでにかなり機能しています。 インストールするには、ページhttp://www.elasticsearch.org/overview/kibana/installation/からアーカイブをダウンロードし、サーバー上のフォルダーに解凍し、config / kibana.ymlファイル(elasticsearch_urlパラメーター)でelasticsearchサーバーの場所を指定する必要があります。



最初の起動時に、[設定]タブでインデックスを追加する必要があります。 インデックステンプレートを追加してフィールドを正しく定義できるようにするには、各アプリケーションからテストメッセージを送信する必要があります。 次に、heka-*(すべてのメッセージを表示します)またはheka-scripts- *の形式のインデックステンプレートを追加して、アプリケーションを互いに分離できます。 次に、[検出]タブに移動して、レコード自体を確認できます。



おわりに



Hekaで記録できる内容の一部のみを示しました。

興味のある方は、Ansible構成の一部を表示できます。この構成では、すべてのサーバー、および選択したElasticsearch with kibanaにhekaが自動的にインストールされます。



All Articles