Automate flow delivery in Apache NiFi

Hello!







The task is as follows: there is a flow (shown in the picture above) that must be rolled out to N servers running Apache NiFi. The flow is a test one: a file is generated and sent to another NiFi instance. Data is transferred using the NiFi Site-to-Site protocol.




NiFi Site-to-Site (S2S) is a secure, easily configurable way to transfer data between NiFi instances. How S2S works is described in the documentation; it is also important not to forget to configure the NiFi instance so that S2S is enabled, see here.



When data is transferred using S2S, one instance is called the client and the other the server. The client sends the data, the server receives it. There are two ways to configure the transfer between them:



  1. Push: from the client instance, data is sent using a Remote Process Group (RPG); on the server instance, data is received through an Input Port.
  2. Pull: the server pulls data using an RPG; the client exposes it through an Output Port.



We store the flow to be rolled out in the Apache NiFi Registry.




Apache NiFi Registry is a subproject of Apache NiFi that provides a tool for flow storage and version control - a sort of git for flows. Information on installing, configuring, and working with the registry can be found in the official documentation. A flow is combined into a process group and stored in the registry in that form. We will return to this later in the article.




At the start, when N is small, the flow is delivered and updated by hand in an acceptable amount of time.



But as N grows, the problems pile up:



  1. updating the flow takes more and more time, since you have to visit every server
  2. template updates go wrong: this server was updated, that one was forgotten
  3. human errors creep in when performing a large number of identical operations


All this means the process has to be automated. I tried the following ways to solve this problem:



  1. Use MiNiFi instead of NiFi
  2. NiFi CLI
  3. NiPyAPI


Using MiNiFi



Apache MiNiFi is a subproject of Apache NiFi. MiNiFi is a compact agent that uses the same processors as NiFi, so you can build the same flows as in NiFi. The agent's lightness is achieved, among other things, by the fact that MiNiFi has no graphical interface for configuring the flow. The lack of a graphical interface means the problem of delivering flow to MiNiFi has to be solved some other way. Since MiNiFi is actively used in IoT, there are many such components, and the process of delivering flow to the final MiNiFi instances needs to be automated. A familiar task, right?



Another subproject helps solve this problem - MiNiFi C2 Server. This product is designed to be the central point of a configuration-rollout architecture. How to set up the environment is described in this article on Habré, and there is enough information there to solve the problem. MiNiFi, working together with the C2 server, updates its configuration automatically. The only drawback of this approach is that you have to maintain templates on the C2 Server; a simple commit to the registry is not enough.



The option described in that article works and is not difficult to implement, but keep the following in mind:



  1. MiNiFi does not include all of the NiFi processors.
  2. The processor versions in MiNiFi lag behind the processor versions in NiFi.


At the time of writing, the latest version of NiFi is 1.9.2, while the latest version of MiNiFi ships processors from version 1.7.0. Processors can be added to MiNiFi, but because of the version discrepancies between NiFi and MiNiFi processors this may not work.



NiFi CLI



Judging by the description on the official website, this is a tool for automating the interaction between NiFi and NiFi Registry for flow delivery and process control. To get started, download the tool from here.



Run the utility:



 ./bin/cli.sh
 [Apache NiFi ASCII-art banner]
 CLI v1.9.2
 Type 'help' to see a list of available commands, use tab to auto-complete.





To load the required flow from the registry, we need to know the bucket identifier and the flow identifier. This data can be obtained either through the CLI or in the NiFi Registry web interface. In the web interface it looks like this:







With the CLI it looks like this:



 #> registry list-buckets -u http://nifi-registry:18080

 #   Name          Id                                     Description
 -   -----------   ------------------------------------   -----------
 1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

 #> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

 #   Name        Id                                     Description
 -   ---------   ------------------------------------   -----------
 1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85





Now we import the process group from the registry:



 #> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
 7f522a13-016e-1000-e504-d5b15587f2f3





An important point: any NiFi instance can be specified as the host on which the process group is rolled out.



The process group is added with its processors stopped, so they have to be started:



 #> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
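Once the commands are known, rolling the flow out to N servers with the CLI boils down to repeating them per instance. Below is a rough sketch of such a loop (not from the article; the host names and the cli.sh path are illustrative), using the same pg-import and pg-start commands in the toolkit's standalone mode:

 import subprocess

 BUCKET_ID = '709d387a-9ce9-4535-8546-3621efe38e96'
 FLOW_ID = 'd27af00a-5b47-4910-89cd-9c664cd91e85'
 # illustrative list of target instances, one entry per server
 HOSTS = ['http://nifi-1:8080', 'http://nifi-2:8080']

 for host in HOSTS:
     # import the versioned flow as a process group; pg-import prints the new group id
     result = subprocess.run(
         ['./bin/cli.sh', 'nifi', 'pg-import',
          '-b', BUCKET_ID, '-f', FLOW_ID, '-fv', '1', '-u', host],
         capture_output=True, text=True, check=True)
     pg_id = result.stdout.strip()

     # start the processors of the imported process group
     subprocess.run(
         ['./bin/cli.sh', 'nifi', 'pg-start', '-pgid', pg_id, '-u', host],
         check=True)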
      
      





Great, the processors have started. However, according to the task, we need the NiFi instances to send data to other instances. Suppose we chose the Push method of transferring data to the server. To organize the transfer, transmission has to be enabled (Enable transmitting) on the Remote Process Group (RPG) that is already part of our flow.







In the CLI documentation and other sources I did not find a way to enable transmission. If you know how to do this, please write in the comments.



Since we have bash and we are ready to go all the way, we will find a way out: we can use the NiFi API to solve this problem. We use the following method, taking the ID from the examples above (in our case 7f522a13-016e-1000-e504-d5b15587f2f3). A description of the NiFi API methods is here.





In the request body, you need to pass JSON of the following form:



 { "revision": { "clientId": "value", "version": 0, "lastModifier": "value" }, "state": "value", "disconnectedNodeAcknowledged": true }
      
      





Parameters that must be filled in for the call to work:

state - the transmission status: TRANSMITTING to enable data transfer, STOPPED to disable it

version - the revision version of the component



version defaults to 0 when the component is created, but the current values can also be obtained by requesting the same component with a GET call.
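For illustration, here is a rough sketch of this call in Python with requests (the article itself does it from bash). The exact endpoint was shown in the screenshot; here I assume the remote process group run-status resource, which accepts the JSON body above - verify it against the API documentation for your NiFi version:

 import requests

 NIFI_API = 'http://nifi:8080/nifi-api'
 RPG_ID = '7f522a13-016e-1000-e504-d5b15587f2f3'  # id taken from the examples above

 # read the current state of the component to get its revision (version is 0 right after creation)
 rpg = requests.get(f'{NIFI_API}/remote-process-groups/{RPG_ID}').json()

 body = {
     'revision': rpg['revision'],
     'state': 'TRANSMITTING',              # STOPPED to disable transmission
     'disconnectedNodeAcknowledged': True,
 }

 # assumed endpoint: PUT .../remote-process-groups/{id}/run-status
 resp = requests.put(f'{NIFI_API}/remote-process-groups/{RPG_ID}/run-status', json=body)
 resp.raise_for_status()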







For lovers of bash scripts this approach may seem fine, but it is hard going for me - bash scripts are not my favorite. The next approach is more interesting and, in my opinion, more convenient.



NiPyAPI



NiPyAPI is a Python library for interacting with NiFi instances. The documentation page contains the information needed to work with the library, and a quick start is described in the GitHub project.



Our script for rolling out the configuration is a Python program. Let's get to the code.

First we set up the configuration for the rest of the work. We will need the following parameters:



 nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api'                        # nifi-api of the instance where the process group will be deployed
 nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' # nifi-registry-api of the registry
 nipyapi.config.registry_name = 'MyBeutifulRegistry'                                  # name under which the registry is added to nifi
 nipyapi.config.bucket_name = 'BucketName'                                            # bucket in which the flow is stored
 nipyapi.config.flow_name = 'FlowName'                                                # flow to be rolled out





Below I will give the names of the library methods used; they are described here.



Connect the registry to the NiFi instance with



 nipyapi.versioning.create_registry_client
      
      





At this step you can also check whether the registry has already been added to the instance; for this, use the method



 nipyapi.versioning.list_registry_clients
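A minimal sketch combining both calls, using the config values defined earlier (the way I read the name out of the returned entities is an assumption about the client objects):

 import nipyapi

 # reuse the existing registry client if one with this name is already configured
 existing = [c for c in nipyapi.versioning.list_registry_clients().registries
             if c.component.name == nipyapi.config.registry_name]

 if existing:
     registry_client = existing[0]
 else:
     registry_client = nipyapi.versioning.create_registry_client(
         name=nipyapi.config.registry_name,
         uri='http://nifi-registry:18080',   # registry base URL, without /nifi-registry-api
         description='added by the rollout script')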
      
      





Find the bucket, so we can then search for the flow inside it:



 nipyapi.versioning.get_registry_bucket
      
      





Search the bucket for the flow:



 nipyapi.versioning.get_flow_in_bucket
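A small sketch of both lookups, taking the names from the config above and assuming the library's default name-based identifier resolution:

 # find the bucket by name, then the flow inside it
 bucket = nipyapi.versioning.get_registry_bucket(nipyapi.config.bucket_name)
 flow = nipyapi.versioning.get_flow_in_bucket(bucket.identifier, nipyapi.config.flow_name)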
      
      





Next, it is important to understand whether this process group has already been added. A process group is placed at specific coordinates on the canvas, and a second one can end up right on top of the first - I checked, this can happen :) To get all of the added process groups, we use the method



 nipyapi.canvas.list_all_process_groups
      
      





and then we can search through them, for example by name.
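For example, a minimal check by name could look like this (where exactly the group name lives in the returned entities is my assumption):

 # collect process groups whose name matches the flow we are about to deploy
 same_name = [pg for pg in nipyapi.canvas.list_all_process_groups()
              if pg.status.name == nipyapi.config.flow_name]
 already_deployed = len(same_name) > 0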



I will not describe the template update process in detail; I will only say that if processors are added in the new version of the template, there are no problems with messages sitting in the queues. But if processors are removed, problems can arise (NiFi will not allow a processor to be deleted if a message queue has accumulated in front of it). If you are interested in how I solved this problem, please write to me and we will discuss it; contact details are at the end of the article. Let's move on to the step of adding the process group.



When debugging the script, I discovered that the latest flow version is not always picked up, so I recommend determining that version explicitly first:



 nipyapi.versioning.get_latest_flow_ver
      
      





Deploy the process group:



 nipyapi.versioning.deploy_flow_version
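Putting the two steps together - pin the version explicitly and deploy the group onto the root canvas. A sketch assuming the objects obtained in the previous steps (registry_client, bucket, flow); the coordinates are arbitrary:

 # ask the registry for the latest snapshot and use its version number explicitly
 latest = nipyapi.versioning.get_latest_flow_ver(bucket.identifier, flow.identifier)

 deployed_pg = nipyapi.versioning.deploy_flow_version(
     parent_id=nipyapi.canvas.get_root_pg_id(),       # deploy onto the root canvas
     location=(500, 200),                             # canvas coordinates for the new group
     bucket_id=bucket.identifier,
     flow_id=flow.identifier,
     reg_client_id=registry_client.id,
     version=latest.snapshot_metadata.version)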
      
      





Start the processors:



 nipyapi.canvas.schedule_process_group
      
      





Remember how in the CLI section I wrote that data transmission is not automatically enabled on the remote process group? When implementing the script I ran into the same problem. At the time I could not start transmission through the API, so I decided to write to the developer of the NiPyAPI library and ask for advice. The developer answered, we discussed the problem, and he wrote that he needed time to "check something". And then, a couple of days later, an email arrived containing a Python function that solved my problem! At the time the NiPyAPI version was 0.13.3 and, of course, there was nothing like it in the library. But in version 0.14.0, which was released recently, this function is already included. Meet



 nipyapi.canvas.set_remote_process_group_transmission
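A sketch of the final steps - start the deployed group and enable transmission on its remote process groups. I assume here that list_all_remote_process_groups is available in 0.14.0 alongside the transmission helper; check the NiPyAPI docs for the exact parameter names:

 # start all processors of the deployed process group
 nipyapi.canvas.schedule_process_group(deployed_pg.id, True)

 # enable Site-to-Site transmission on every remote process group inside it
 for rpg in nipyapi.canvas.list_all_remote_process_groups(deployed_pg.id):
     nipyapi.canvas.set_remote_process_group_transmission(rpg.id, True)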
      
      





So, using the NiPyAPI library, we connected the registry, rolled out the flow, and even started the processors and the data transmission. After that you can polish the code and add all kinds of checks and logging, but that is a completely different story.



Of the automation options I considered, the last one seemed the most effective to me. First, it is still Python code, into which you can embed auxiliary logic and take full advantage of the programming language. Second, the NiPyAPI project is actively developed, and if problems arise you can write to the developer. Third, NiPyAPI is the more flexible tool for interacting with NiFi when solving complex problems - for example, determining whether the message queues in the flow are currently empty and whether the process group can be updated.



That's all. I have described three approaches to automating flow delivery in NiFi, the pitfalls a developer may run into, and working code for automating the rollout. If you find this topic as interesting as I do - write!


