Principles for building streaming analytics systems




Designing streaming analytics and streaming data processing systems has its own nuances, its own problems, and its own technology stack. We talked about this at a recent open lesson, held on the eve of the launch of the Data Engineer course.



The webinar covered:





The lecturer was Egor Mateshuk, Senior Data Engineer at MaximaTelecom.



When is streaming needed? Stream vs Batch



First of all, we need to figure out when streaming is needed and when batch processing is enough. Let's go over the strengths and weaknesses of each approach.



So, the disadvantages of batch processing:





But batch processing has its advantages:





Advantages of streaming data processing:





The main disadvantage of streaming processing:



So, if you are wondering whether you need streaming, answer the following questions for yourself:



  1. Do you really need real-time?
  2. Are there many streaming sources?
  3. Is losing one record critical?


Let's look at two examples:



Example 1. Stock analytics for retail:



In this example, it is better to use batch.



Example 2. Analytics for a web portal:





Imagine that the analytics reflects how visitors to a web portal feel about using your product. For example, you rolled out a new release and need to understand within 10-30 minutes whether everything is in order and whether any user-facing features have broken. Say the label on the “Order” button disappeared: analytics will let you react quickly to the sharp drop in the number of orders, and you will immediately understand that you need to roll back.



Thus, in the second example, it is better to use streams.



SPOD elements



Data engineers capture, move, deliver, transform, and store this very data (yes, storing data is an active process too!).

Therefore, to build a streaming data processing system (SPOD), we will need the following elements:



  1. a data loader (a means of delivering data to the storage);
  2. a data exchange bus (not always needed, but in streaming there is no way around it, because you need a system through which you will exchange data in real time);
  3. data storage (we can't do without it);
  4. an ETL engine (needed for filtering, sorting, and other transformations);
  5. BI (to display the results);
  6. an orchestrator (ties the whole process together, coordinating multi-stage data processing).


In our case, we will consider the simplest situation and focus only on the first three elements.



Data Stream Processing Tools



We have several “candidates” for the role of data loader:





Apache Flume



The first one we'll talk about is Apache Flume, a tool for transporting data between different sources and storage systems.






Pros:





Cons:





As for its configuration, it looks something like this:



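The exact slide is not reproduced here, but a minimal sketch in the spirit of the standard netcat-to-logger example from the Flume documentation (the agent, channel, and port names are arbitrary) looks roughly like this:

    # one agent: a netcat source listening on a port,
    # an in-memory channel, and a sink that simply logs the events
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    a1.sources.r1.channels = c1

    a1.channels.c1.type = memory

    a1.sinks.k1.type = logger
    a1.sinks.k1.channel = c1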



Above, we create one simple channel that listens on a port, takes data from there, and simply logs it. For a single process this is still manageable, but when you have dozens of such processes, the configuration file turns into hell. Some people bolt on visual configurators, but why bother when there are tools that provide this out of the box, such as NiFi and StreamSets?



Apache NiFi



In fact, it performs the same role as Flume, but with a visual interface, which is a big plus, especially when there are a lot of processes.



A couple of facts about NiFi





The system looks something like this:



[screenshot: the NiFi web interface]



We get a canvas onto which we drop data processing stages. There are connectors for virtually every system imaginable, and so on.



StreamSets



This is also a data flow management system with a visual interface. It was developed by people from Cloudera, installs easily as a parcel on CDH, and has a special edition, SDC Edge, for collecting data from devices.



It consists of two components:





It looks something like this:



[screenshot: the StreamSets web interface]



One unpleasant detail: StreamSets has both free and paid parts.



Data bus



Now let's figure out where we will send this data. The candidates:





Apache Kafka is the best option, but if your company already runs RabbitMQ or NATS and you only need to add a little analytics, then deploying Kafka from scratch may not be worth it.



In all other cases, Kafka is a great choice. It is essentially a horizontally scalable message broker with enormous throughput. It integrates well with the entire ecosystem of data tools and can withstand heavy loads. It has a universal interface and serves as the circulatory system of our data processing.



Internally, Kafka is divided into topics: separate data streams of messages with the same schema or, at least, the same purpose.
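For reference (this is not from the webinar), creating a topic with the standard Kafka CLI looks roughly like this; the topic name and settings are just an example, and older Kafka versions use --zookeeper instead of --bootstrap-server:

    kafka-topics.sh --create \
      --bootstrap-server localhost:9092 \
      --topic clickstream \
      --partitions 3 \
      --replication-factor 1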



To discuss the next nuance, keep in mind that data sources can differ, and the data format matters a great deal here.






The Apache Avro serialization format deserves special mention. Avro describes the data structure (schema) in JSON, while the data itself is serialized into a compact binary format. As a result, we save a huge amount of space, and serialization/deserialization is cheaper.
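As an illustration (not taken from the webinar), an Avro schema for a click event might look roughly like this; the record and field names are made up for the example:

    {
      "type": "record",
      "name": "ClickEvent",
      "fields": [
        {"name": "timestamp", "type": "long"},
        {"name": "eventType", "type": "string"},
        {"name": "location",  "type": "string"},
        {"name": "sessionId", "type": "string"}
      ]
    }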



Everything seems fine, but keeping schemas in separate files poses a problem, since we need to exchange those files between different systems. It sounds simple, but when different teams are involved, the people on the other end can change something and move on, and everything breaks on your side.



So that you don't have to pass these files around on flash drives, floppy disks, and cave paintings, there is a dedicated service: Schema Registry. It keeps Avro schemas in sync between the services that write to and read from Kafka.
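For example, with the Confluent Schema Registry (an assumption; the talk does not name a specific implementation), the writing side registers a new schema version over REST, and readers fetch it from the same place. A rough sketch:

    # register a schema version for the values of the "clickstream" topic
    curl -X POST http://localhost:8081/subjects/clickstream-value/versions \
      -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      --data '{"schema": "{\"type\":\"record\",\"name\":\"ClickEvent\",\"fields\":[{\"name\":\"eventType\",\"type\":\"string\"}]}"}'

    # fetch the latest registered version
    curl http://localhost:8081/subjects/clickstream-value/versions/latest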






In Kafka terms, a producer is the one who writes data, and a consumer is the one who reads it.



Data store



The candidates (there are actually many more options, but we will take only a few):





Before choosing storage, recall what idempotency is. Wikipedia defines idempotence (from Latin idem, “the same”, and potens, “capable”) as the property of an operation whereby applying it to an object repeatedly gives the same result as applying it once. In our case, streaming processing should be built so that re-loading the source data still leaves the result correct.



How to achieve this in streaming systems:





HDFS + Hive storage does not provide idempotency for streaming writes “out of the box”, so we have:





Kudu is a storage engine suitable for analytical queries that also has a primary key for deduplication. Impala is the SQL interface to this storage (and to several others).
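A rough sketch of what idempotent writes look like there, assuming Impala on top of Kudu (the table and column names are invented for the example); replaying the same UPSERT does not change the result:

    -- a Kudu table with a primary key, created through Impala
    CREATE TABLE clickstream_kudu (
      pageViewId STRING PRIMARY KEY,
      eventType  STRING,
      ts         BIGINT
    )
    PARTITION BY HASH (pageViewId) PARTITIONS 4
    STORED AS KUDU;

    -- re-running this statement with the same key leaves a single row
    UPSERT INTO clickstream_kudu VALUES ('pv-123', 'pageView', 1570000000);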



As for ClickHouse, it is an analytical database from Yandex. Its main purpose is analytics on a table fed by a large stream of raw data. Among its advantages is the ReplacingMergeTree engine, which deduplicates rows by key (this deduplication is designed to save space, may leave duplicates in some cases, and you need to keep its nuances in mind).
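In practice this means a query may still see duplicates until a background merge happens. A common workaround, sketched here against the clickstream table we create later in this article, is to collapse duplicates at read time or force a merge:

    -- collapse duplicate rows by the sorting key at query time (slower)
    SELECT * FROM clickstream FINAL;

    -- or force an out-of-schedule merge (a heavy operation, use with care)
    OPTIMIZE TABLE clickstream FINAL;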



It remains to say a few words about Divolte. As you remember, we talked about the fact that some data needs to be captured. If you need to organize analytics for a portal quickly and easily, Divolte is an excellent service for capturing user events on a web page via JavaScript.
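For reference, embedding it usually comes down to a small tag on the page; the host name and the custom event below are assumptions for illustration, not taken from the webinar:

    <!-- divolte.js is served by the Divolte Collector itself (default port 8290) -->
    <script defer async src="//tracking.example.com:8290/divolte.js"></script>

    <!-- page views are tracked automatically; custom events are sent explicitly -->
    <button onclick="divolte.signal('addToBasket', { productId: 'abc-123' })">
      Add to basket
    </button>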






Practical example



What are we going to do? Let's try to build a pipeline for collecting clickstream data in real time. A clickstream is the virtual footprint a user leaves while browsing your site. We will capture the data with Divolte and write it to Kafka.






To follow along, you need Docker, and you will also need to clone the following repository. Everything that happens will be launched in containers. To start several containers at once in a consistent way, docker-compose.yml is used. In addition, there is a Dockerfile that builds our StreamSets image with certain dependencies.



There are also three folders:



  1. clickhouse-data, where the ClickHouse data will be written;
  2. an identical folder (sdc-data) for StreamSets, where the system can store its configuration;
  3. a third folder (examples) containing a query file and a pipeline configuration file for StreamSets.







To start, enter the following command:



docker-compose up
      
      





Then we watch the containers start up, slowly but surely. Once they are running, we can open http://localhost:18630/ and immediately try out Divolte.






So, Divolte has already received some events and written them to Kafka. Let's try to count them using StreamSets: http://localhost:18630/ (username/password: admin/admin).






To save yourself some effort, it is easier to import a pipeline, naming it, for example, clickstream_pipeline, and importing clickstream.json from the examples folder. If everything is OK, we will see the following picture:



[screenshot: the imported pipeline in StreamSets]



So, we created a connection to Kafka, specified which broker we need and which topic we are interested in, selected the fields we care about, and then added a sink back into Kafka, again specifying the broker and the topic. The difference is that in one case the data format is Avro, and in the other it is plain JSON.



Moving on, we can, for example, run a preview that captures a few records from Kafka in real time. Then we write everything out.



After launching, we’ll see that a stream of events flies to Kafka, and this happens in real time:



[screenshot: events streaming into Kafka in real time]



Now you can set up storage for this data in ClickHouse. To work with ClickHouse, you can use its simple native client by running the following command:



 docker run -it --rm --network divolte-ss-ch_default yandex/clickhouse-client --host clickhouse
      
      





Note that this line specifies the Docker network to connect to. Depending on how you named the folder with the repository, your network name may differ. In general, the command looks like this:



 docker run -it --rm --network {your_network_name} yandex/clickhouse-client --host clickhouse
      
      





The list of networks can be viewed with the command:



 docker network ls
      
      





There is not much left to do:



1. First, subscribe our ClickHouse to Kafka, telling it what format the data in the topic has:



    CREATE TABLE IF NOT EXISTS clickstream_topic (
        firstInSession UInt8,
        timestamp UInt64,
        location String,
        partyId String,
        sessionId String,
        pageViewId String,
        eventType String,
        userAgentString String
    ) ENGINE = Kafka
    SETTINGS
        kafka_broker_list = 'kafka:9092',
        kafka_topic_list = 'clickstream',
        kafka_group_name = 'clickhouse',
        kafka_format = 'JSONEachRow';
      
      





2. Now create a real table where we will put the final data:



    CREATE TABLE clickstream (
        firstInSession UInt8,
        timestamp UInt64,
        location String,
        partyId String,
        sessionId String,
        pageViewId String,
        eventType String,
        userAgentString String
    ) ENGINE = ReplacingMergeTree()
    ORDER BY (timestamp, pageViewId);
      
      





3. Then link these two tables with a materialized view:



 CREATE MATERIALIZED VIEW clickstream_consumer TO clickstream AS SELECT * FROM clickstream_topic;
      
      





4. And now query the data:



 SELECT * FROM clickstream;
      
      





As a result, selecting from the target table gives us the result we need.
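From here you can run ordinary analytical queries on the table, for example (a made-up query, not from the webinar), to count events by type:

    SELECT eventType, count() AS events
    FROM clickstream
    GROUP BY eventType
    ORDER BY events DESC;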







That's it: this was the simplest clickstream pipeline you can build. If you want to go through the steps above yourself, watch the full video.


