Metric Processing Tricks in Kapacitor

These days hardly anyone questions why they should collect service metrics. The next logical step is to configure alerting on the collected metrics, so that it notifies you of any deviations in the data through the channels convenient for you (email, Slack, Telegram). At Ostrovok.ru, an online hotel booking service, all metrics from our services are written to InfluxDB and displayed in Grafana, where basic alerting is also configured. For tasks like "you need to calculate something and compare it against something" we use Kapacitor.









Kapacitor is the part of the TICK stack that processes metrics from InfluxDB. It can join several measurements, calculate something useful from the resulting data, write the result back to InfluxDB, and send alerts to Slack / Telegram / email.
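
To make that concrete, here is a minimal hypothetical stream task. The measurement, field, and threshold are invented for illustration, and the Slack handler is assumed to be configured in kapacitor.conf; this is a sketch, not our production code.

dbrp "telegraf"."autogen"

stream
    |from()
        .measurement('requests')
        .groupBy('host')
    |window()
        .period(1m)
        .every(1m)
    |sum('errors')
        .as('errors_per_min')
    |alert()
        // hypothetical threshold just to show the shape of the task
        .crit(lambda: "errors_per_min" > 100)
        .message('{{ .ID }}: {{ index .Fields "errors_per_min" }} errors in the last minute')
        .slack()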



The whole stack has nice, detailed documentation, but there are always useful things that are not spelled out explicitly in the manuals. In this article I decided to collect a number of such useful non-obvious tips (the basic TICKscript syntax is described here) and show how they can be applied, using one of our tasks as an example.



Go!



float & int, calculation errors



An absolutely standard problem that is solved with casts:



var alert_float = 5.0
var alert_int = 10

data
    |eval(lambda: float("value") > alert_float OR float("value") < float(alert_int))





Using default()



If a tag/field is not filled in, errors will occur in calculations:



|default()
    .tag('status', 'empty')
    .field('value', 0)





fill() in join (inner vs outer)



By default, join drops points for which there is no matching data (inner join).

With fill('null') an outer join is performed, after which you need to apply default() and fill in the empty values:



var data = res1
    |join(res2)
        .as('res1', 'res2')
        .fill('null')
    |default()
        .field('res1.value', 0.0)
        .field('res2.value', 100.0)





There is one more nuance: if in the example above one of the series (res1 or res2) is empty, the resulting series (data) will also be empty. There are several tickets about this on GitHub (1633, 1871, 6967); we are waiting for fixes and suffering a little in the meantime.



Using conditions in calculations (if in lambda)



|eval(lambda: if("value" > 0, true, false))





The last five minutes out of a pipeline with a longer period



For example, you need to compare the values of the last five minutes with those of the previous week. You can fetch two portions of data with two separate batches, or extract part of the data from the larger period:



  |where(lambda: duration((unixNano(now()) - unixNano("time"))/1000, 1u) < 5m)
      
      





An alternative for the last five minutes is the BarrierNode, which cuts off data older than the specified period:



|barrier()
    .period(5m)





Examples of using Go templates in message



The templates follow the format of the text/template package; below are a few common cases.



if-else



Keeping things tidy, so we don't needlessly alarm people with the message text:



|alert()
    ...
    .message(
        '{{ if eq .Level "OK" }}It is ok now{{ else }}Chief, everything is broken{{end}}'
    )





Two decimal places in message



Improving the readability of the message:



|alert()
    ...
    .message(
        'now value is {{ index .Fields "value" | printf "%0.2f" }}'
    )





Expanding variables in message



We include more information in the message to answer the question "why is it firing?"



var warnAlert = 10

|alert()
    ...
    .message(
        'Today value less than ' + string(warnAlert) + '%'
    )





Unique Alert Identifier



This is essential when there is more than one group in the data; otherwise only a single alert will be generated:



|alert()
    ...
    .id('{{ index .Tags "myname" }}/{{ index .Tags "myfield" }}')





Custom handlers



The long list of handlers includes exec, which lets you run your own script with the alert data passed to it (via stdin), leaving plenty of room for creativity.



One of our custom handlers is a small Python script for sending notifications to Slack.

At first we wanted the message to include a Grafana chart image that sits behind authorization. Then we wanted the "OK" to be posted as a reply in the thread of the previous alert from the same group, rather than as a separate message. A little later we added the most frequent error of the last X minutes to the message.



A separate topic is interaction with other services and any actions triggered by an alert (only if your monitoring works well enough).

An example of a handler definition, where slack_handler.py is our own script:



topic: slack_graph
id: slack_graph.alert
match: level() != INFO AND changed() == TRUE
kind: exec
options:
  prog: /sbin/slack_handler.py
  args: ["-c", "CHANNELID", "--graph", "--search"]





How to debug?



Log output option



|log()
    .level("error")
    .prefix("something")





View (CLI): kapacitor -url host-or-ip:9092 logs lvl=error



Variant with httpOut



Shows data in the current pipeline:



 |httpOut('something')
      
      





View (GET): host-or-ip:9092/kapacitor/v1/tasks/task_name/something



Execution scheme







Where else you can step on a rake



InfluxDB timestamp on writeback



For example, we configure an alert on the sum of requests per hour (groupBy(1h)) and want to record the incident in InfluxDB (to show the problem nicely on a Grafana graph).



influxDBOut() writes the time value from the alert into the timestamp, so the point on the chart ends up earlier or later than the moment the alert actually arrived.
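
For illustration, a hedged sketch of that setup (the threshold and output measurement are made up): the point written by influxDBOut() carries the timestamp of the aggregated data, not the wall-clock time when the alert fired.

batch
    |query('SELECT sum("count") AS value FROM "supplier"."autogen"."requests"')
        .period(1h)
        .every(1h)
        .groupBy(time(1h))
    |alert()
        // hypothetical threshold for the hourly sum
        .crit(lambda: "value" > 1000)
        .stateChangesOnly()
    // this point gets the timestamp of the hourly group, which can be
    // noticeably earlier than the moment the alert actually went off
    |influxDBOut()
        .database('kapacitor')
        .retentionPolicy('autogen')
        .measurement('alerts')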



When accuracy is required, we work around this by calling a custom handler that writes the data to InfluxDB with the current timestamp.



Docker, build and deploy



At startup, Kapacitor can load tasks, templates, and handlers from the directory specified in the config in the [load] block.



To correctly create a task, the following things are needed:



  1. The file name: it becomes the task id / script name
  2. The type: stream or batch
  3. dbrp: a keyword specifying which database + retention policy the script works with (dbrp "supplier"."autogen"); see the sketch below
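
A minimal sketch of a task file that satisfies all three points, assuming it is saved as, say, requests_check.tick (the file name and measurement are made up for illustration):

// requests_check.tick: the file name becomes the task id
dbrp "supplier"."autogen"

stream
    |from()
        .measurement('requests')
    |log()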


If any batch task is missing the dbrp line, the entire service will refuse to start and will honestly say so in the log.



In Chronograf, on the contrary, this line must not be present: it is not accepted through the UI and raises an error.



A hack when building the container: the Dockerfile exits with -1 if there are lines matching //.+dbrp (i.e. a commented-out dbrp), so the reason for the failure is immediately clear when the build breaks.



join one to many



Example task: you need to take the 95th percentile of the service's response time over a week and compare every minute of the last 10 against that value.



You cannot do a one-to-many join: last/mean/median over a group of points turn the node into a stream, and you will get the error "cannot add child mismatched edges: batch -> stream".



The result of a batch also cannot be substituted as a variable into a lambda expression.



One option is to save the needed numbers from the first batch into a file via a UDF and then load that file with sideload.
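
A rough sketch of the sideload half of that workaround, assuming a hypothetical directory /etc/kapacitor/sideload where a UDF has already written percentiles/default.yml with a p95 value:

stream
    |from()
        .measurement('requests')
    // load the precomputed percentile from disk; field() provides a default
    // if the file or key is missing
    |sideload()
        .source('file:///etc/kapacitor/sideload')
        .order('percentiles/default.yml')
        .field('p95', 0.0)
    |alert()
        .warn(lambda: "value" > "p95")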



What problem were we solving?



We have about 100 hotel suppliers, and each of them can have several connections, which we call channels. There are roughly 300 such channels, and any of them can go down. Of all the metrics we record, we will monitor the error rate (requests and errors).



Why not Grafana?



Alerts on errors configured in Grafana have several drawbacks. Some are critical, others you can turn a blind eye to, depending on the situation.



Grafana cannot do calculations across measurements combined with alerting, and we need a rate: (requests - errors) / requests.



The errors on their own look menacing:







And less menacing when shown alongside successful requests:







Okay, we could pre-calculate the rate in the service itself before it reaches Grafana, and in some cases that would be enough. But not in ours, because each channel has its own ratio that counts as "normal", and the alerts work with static thresholds (we eyeball them and adjust the threshold if the alert fires too often).



Here are examples of what "normal" looks like for different channels:











Let's ignore the previous paragraph and assume that all suppliers have a similar "normal" picture. Now everything is fine, so can we get by with alerts in Grafana?

We can, but we really don't want to, because we would have to choose one of two options:

a) make many charts, one for each channel (and painfully maintain them)

b) keep a single chart with all channels (and get lost in the colorful lines and the tuning of alerts)







How did we do it?



Once again, the documentation has a good starting example (Calculating rates across joined series) that you can peek at or take as a basis for similar tasks.



Here is what we ended up with:





In my opinion, we achieved everything we wanted at the output as elegantly as possible (and even a bit more, thanks to the custom handlers).



On github.com you can find the sample code and a minimal (graphviz) diagram of the resulting script.



Example of the resulting code:
 dbrp "supplier"."autogen" var name = 'requests.rate' var grafana_dash = 'pczpmYZWU/mydashboard' var grafana_panel = '26' var period = 8h var todayPeriod = 10m var every = 1m var warnAlert = 15 var warnReset = 5 var reqQuery = 'SELECT sum("count") AS value FROM "supplier"."autogen"."requests"' var errQuery = 'SELECT sum("count") AS value FROM "supplier"."autogen"."errors"' var prevErr = batch |query(errQuery) .period(period) .every(every) .groupBy(1m, 'channel', 'supplier') var prevReq = batch |query(reqQuery) .period(period) .every(every) .groupBy(1m, 'channel', 'supplier') var rates = prevReq |join(prevErr) .as('req', 'err') .tolerance(1m) .fill('null') //   ,     |default() .field('err.value', 0.0) .field('req.value', 0.0) // if  lambda:  ,     |eval(lambda: if("err.value" > 0, 100.0 * (float("req.value") - float("err.value")) / float("req.value"), 100.0)) .as('rate') //      rates |influxDBOut() .quiet() .create() .database('kapacitor') .retentionPolicy('autogen') .measurement('rates') //     10 ,   var todayRate = rates |where(lambda: duration((unixNano(now()) - unixNano("time")) / 1000, 1u) < todayPeriod) |median('rate') .as('median') var prevRate = rates |median('rate') .as('median') var joined = todayRate |join(prevRate) .as('today', 'prev') |httpOut('join') var trigger = joined |alert() .warn(lambda: ("prev.median" - "today.median") > warnAlert) .warnReset(lambda: ("prev.median" - "today.median") < warnReset) .flapping(0.25, 0.5) .stateChangesOnly() //   message      .message( '{{ .Level }}: {{ index .Tags "channel" }} err/req ratio ({{ index .Tags "supplier" }}) {{ if eq .Level "OK" }}It is ok now{{ else }} '+string(todayPeriod)+' median is {{ index .Fields "today.median" | printf "%0.2f" }}%, by previous '+string(period)+' is {{ index .Fields "prev.median" | printf "%0.2f" }}%{{ end }} http://grafana.ostrovok.in/d/'+string(grafana_dash)+ '?var-supplier={{ index .Tags "supplier" }}&var-channel={{ index .Tags "channel" }}&panelId='+string(grafana_panel)+'&fullscreen&tz=UTC%2B03%3A00' ) .id('{{ index .Tags "name" }}/{{ index .Tags "channel" }}') .levelTag('level') .messageField('message') .durationField('duration') .topic('slack_graph') // "today.median"   "value",        (keep) trigger |eval(lambda: "today.median") .as('value') .keep() |influxDBOut() .quiet() .create() .database('kapacitor') .retentionPolicy('autogen') .measurement('alerts') .tag('alertName', name)
      
      







And what is the conclusion?



Kapacitor is great for alerting on metrics with many groups, performing additional calculations on metrics that are already recorded, executing custom actions and running scripts (UDFs).



The entry barrier is not very high; give Kapacitor a try if Grafana or other tools do not fully cover your wish list.


