
Failure detection and alert from real-time data flow using Kafka and KSQL

The Apache Kafka ecosystem is well suited to detecting anomalies (failure detection) in huge volumes of data. These data sets can come from various industries, including IoT, banking, and insurance. In this article, we present a use case showing how a cloud-native ecosystem built on Kafka, MQTT, Kafka Connect, Kafka Streams, and KSQL can be used to build a scalable and resilient solution for real-time IoT anomaly detection.

Processing huge volumes of data for anomaly detection is one of the most critical topics for industries, especially as they try to migrate from large, heavy batch processing to something faster and closer to real time.

Integrating devices and machines to process data in real time and at scale poses a huge challenge for enterprises, and the challenge grows once they add anomaly detection. This is where the Kafka ecosystem, including Kafka Connect, Kafka Streams and KSQL, becomes a preferred technology for industries.

The Kafka architecture supports low latency, high scalability, fault tolerance, integration of different data sources, and detection of anomalies in real time. It is relevant to IoT (Internet of Things), banking and insurance transactions, or any other large incoming data set.

One area of study involves IoT device systems, e.g. connected-car infrastructures, smart cities and homes, smart retail, and intelligent manufacturing. Machine learning plays a huge role here. This is shown below with one use case of IoT devices with MQTT.

 

Figure: Anomaly detection architecture (Technaura)

The architecture above shows a cloud-native ecosystem with Kafka, MQTT, Kafka Connect, KSQL, Kafka Streams, microservices, etc. It consists of four related parts:

A) Kafka as the heart of the architecture,

B) KSQL integrated with a UDF for the alerting system,

C) microservices, and lastly

D) MQTT.

We are using the Debezium Kafka and ZooKeeper images, an enriched Debezium Kafka Connect image with MQTT support, as well as Elasticsearch and Kibana for visualization.

The detailed design diagram and the step-by-step implementation are described below.

Figure: Detailed fault detection design (Technaura)

Instructions for installing this use case

Prerequisites

Windows 10 (64 or 32 bit), macOS, or Linux

  • Docker (Docker Desktop on Windows)
  • Postman

Set the Docker memory limit to 4 GB.

Installation Step by Step

Run the required services

A.1 Run Zookeeper,

docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.9

A.2 Run Kafka,

docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:0.9

A.3 Run Elasticsearch,

docker run -it --rm --name elastic -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:6.0.1

A.4 Run an enriched Debezium Kafka Connect image with MQTT and Elasticsearch support,

docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link elastic:elastic --link kafka:kafka technauraservices/kafka-connect:1.0

Kafka Connect is now running with the required configuration and container links.

Note: You can always docker pull the image from our Docker Hub. This customised Debezium Connect image additionally supports both MQTT (source and sink) and Elasticsearch (source and sink). I built it with a custom script and Dockerfile, and it is published as the technauraservices/kafka-connect:1.0 image.
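To fetch it ahead of time:

docker pull technauraservices/kafka-connect:1.0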

As part of our use case, we assume that IoT data arrives from MQTT sources and that the end users are another set of MQTT clients. Both sides use MQTT pub-sub, and the data exchange happens over asynchronous communication.

A.5 Let’s run the MQTT producer,

docker run --name mqtt-producer -ti -p 1883:1883 -p 9001:9001 toke/mosquitto

A.6 Run the MQTT receiver,

docker run --name mqtt-receiver -ti -p 1884:1883 -p 9007:9001 toke/mosquitto

So, now we have the MQTT producer and receiver running. Next, we need to establish the data pipeline as per our design diagram.

Checking for available services in Kafka Connect using Postman:

GET - localhost:8083/connectors/
[]
Should return status 200.
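If you prefer the command line to Postman, the same check can be done with curl (assuming the default port mapping used above):

curl -s http://localhost:8083/connectors/
[]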

Configure Pipelines

If I run docker ps, I see the following containers running on my Docker machine,

CONTAINER ID   IMAGE                                                 NAMES
4bcff8b91cfc   toke/mosquitto                                        mqtt-receiver
3c8f25248dd5   toke/mosquitto                                        mqtt-producer
d0dc13f2193b   swarnavac/kafka-connect:1.0                           connect
b8647baf94d8   docker.elastic.co/elasticsearch/elasticsearch:6.0.1   elastic
4a366180f131   debezium/kafka:0.9                                    kafka
0b9ea09102a0   debezium/zookeeper:0.9                                zookeeper

Now we inspect mqtt-producer using its container ID (3c8f25248dd5 above) and check its IPAddress:

docker inspect 3c8f25248dd5

"Networks": {
"bridge": {
"IPAMConfig": null,
"Links": null,
"Aliases": null,
"NetworkID": "711ab97129d7dd87c080a70eeefbbfb6e4d80dbec419d232baba2930d1816efa",
"EndpointID": "cd232d07f3f5952bf02dd0d7469ca66cae456e34885920a2bd857655522179bf",
"Gateway": "172.17.0.1",
"IPAddress": "172.17.0.6",
"IPPrefixLen": 16,
"IPv6Gateway": "",
"GlobalIPv6Address": "",
"GlobalIPv6PrefixLen": 0,
"MacAddress": "02:42:ac:11:00:06",
"DriverOpts": null
}
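As a shortcut, docker inspect can print just the IP address via a Go-template filter instead of the full JSON:

docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' 3c8f25248dd5
172.17.0.6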

B.1 Creating a service with Kafka Connect for mqtt-source with the topic name “electric-temperature”, using the IP address found above (172.17.0.6), in Postman:

POST - localhost:8083/connectors/

{
  "name": "mqtt-source",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max": "1",
    "mqtt.server.uri": "tcp://172.17.0.6:1883",
    "mqtt.topics": "electric-temperature",
    "kafka.topic": "electric-temperature",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "confluent.topic.bootstrap.servers": "kafka:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license": ""
  }
}

It returns the response,

{
  "name": "mqtt-source",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max": "1",
    "mqtt.server.uri": "tcp://172.17.0.6:1883",
    "mqtt.topics": "electric-temperature",
    "kafka.topic": "electric-temperature",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "confluent.topic.bootstrap.servers": "kafka:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license": "",
    "name": "mqtt-source"
  },
  "tasks": [
    {
      "connector": "mqtt-source",
      "task": 0
    }
  ],
  "type": "source"
}
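The same connector can also be created without Postman by saving the JSON payload above to a file (named mqtt-source.json here purely for illustration) and posting it with curl:

curl -s -X POST -H "Content-Type: application/json" --data @mqtt-source.json http://localhost:8083/connectors/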


Now we inspect mqtt-receiver using its container ID (4bcff8b91cfc, from the docker ps listing above):

docker inspect 4bcff8b91cfc

"Networks": {
  "bridge": {
    ...
    "IPAddress": "172.17.0.7",
    ...
  }
}

B.2 Similarly, creating another service with Kafka Connect for the mqtt-sink connector (after taking the IP from docker inspect), using Postman:

POST - localhost:8083/connectors/

{
  "name": "mqtt-sink",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSinkConnector",
    "tasks.max": "1",
    "mqtt.server.uri": "tcp://172.17.0.7:1883",
    "topics": "electric-temperature",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "confluent.topic.replication.factor": 1,
    "confluent.topic.bootstrap.servers": "kafka:9092",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Now rechecking available Kafka Connect services,

GET - localhost:8083/connectors/
["mqtt-sink", "mqtt-source"]
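Kafka Connect also exposes a per-connector status endpoint, which is a quick way to confirm that both connectors and their tasks are actually RUNNING rather than just registered:

GET - localhost:8083/connectors/mqtt-source/status
GET - localhost:8083/connectors/mqtt-sink/status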

So far, we have completed the full MQTT pipeline through Kafka Connect.

Test the Data Pipeline

Let’s test it with some data. Each data set consists of a unique number (the key), followed by ‘#’-separated temperature readings, as sketched below. To send and receive data, we need to enter interactive mode in both the MQTT source and sink containers.
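Schematically, each message looks like this (a shortened sketch; the real payloads below carry around 200 readings):

<key>, <reading1># <reading2># <reading3># ...
101, 2.10# 2.13# 2.19# 2.28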

C.1 Checking the data on the MQTT sink side. Replace the container ID accordingly and enter the two commands below. The data shown after the mosquitto_sub command will only appear after the next step, as a reaction to the data being sent.

λ docker exec -it 4bcff8b91cfc bash
root@4bcff8b91cfc:/# mosquitto_sub -v -t 'electric-temperature'
electric-temperature 100, 2.10# 2.13# 2.19# 2.28# 2.44# 2.62# 2.80# 3.04# 3.36# 3.69# 3.97# 4.24# 4.53#4.80# 5.02# 5.21# 5.40# 5.57# 5.71# 5.79# 5.86# 5.92# 5.98# 6.02# 6.06# 6.08# 6.14# 6.18# 6.22# 6.27#6.32# 6.35# 6.38# 6.45# 6.49# 6.53# 6.57# 6.64# 6.70# 6.73# 6.78# 6.83# 6.88# 6.92# 6.94# 6.98# 7.01#7.03# 7.05# 7.06# 7.07# 7.08# 7.06# 7.04# 7.03# 6.99# 6.94# 6.88# 6.83# 6.77# 6.69# 6.60# 6.53# 6.45#6.36# 6.27# 6.19# 6.11# 6.03# 5.94# 5.88# 5.81# 5.75# 5.68# 5.62# 5.61# 5.54# 5.49# 5.45# 5.42# 5.38#5.34# 5.31# 5.30# 5.29# 5.26# 5.23# 5.23# 5.22# 5.20# 5.19# 5.18# 5.19# 5.17# 5.15# 5.14# 5.17# 5.16#5.15# 5.15# 5.15# 5.14# 5.14# 5.14# 5.15# 5.14# 5.14# 5.13# 5.15# 5.15# 5.15# 5.14# 5.16# 5.15# 5.15#5.14# 5.14# 5.15# 5.15# 5.14# 5.13# 5.14# 5.14# 5.11# 5.12# 5.12# 5.12# 5.09# 5.09# 5.09# 5.10# 5.08# 5.08# 5.08# 5.08# 5.06# 5.05# 5.06# 5.07# 5.05# 5.03# 5.03# 5.04# 5.03# 5.01# 5.01# 5.02# 5.01# 5.01#5.00# 5.00# 5.02# 5.01# 4.98# 5.00# 5.00# 5.00# 4.99# 5.00# 5.01# 5.02# 5.01# 5.03# 5.03# 5.02# 5.02#5.04# 5.04# 5.04# 5.02# 5.02# 5.01# 4.99# 4.98# 4.96# 4.96# 4.96# 4.94# 4.93# 4.93# 4.93# 4.93# 4.93# 5.02# 5.27# 5.80# 5.94# 5.58# 5.39# 5.32# 5.25# 5.21# 5.13# 4.97# 4.71# 4.39# 4.05# 3.69# 3.32# 3.05#2.99# 2.74# 2.61# 2.47# 2.35# 2.26# 2.20# 2.15# 2.10# 2.08

C.2 In the MQTT source container (replace the container ID accordingly), enter two commands: one to start bash and one to publish the data,

λ docker exec -it 3c8f25248dd5 bash
root@3c8f25248dd5:/# mosquitto_pub -h 172.17.0.6 -t 'electric-temperature' -m "101, 2.10# 2.13# 2.19# 2.28# 2.44# 2.62# 2.80# 3.04# 3.36# 3.69# 3.97# 4.24# 4.53#4.80# 5.02# 5.21# 5.40# 5.57# 5.71# 5.79# 5.86# 5.92# 5.98# 6.02# 6.06# 6.08# 6.14# 6.18# 6.22# 6.27#6.32# 6.35# 6.38# 6.45# 6.49# 6.53# 6.57# 6.64# 6.70# 6.73# 6.78# 6.83# 6.88# 6.92# 6.94# 6.98# 7.01#7.03# 7.05# 7.06# 7.07# 7.08# 7.06# 7.04# 7.03# 6.99# 6.94# 6.88# 6.83# 6.77# 6.69# 6.60# 6.53# 6.45#6.36# 6.27# 6.19# 6.11# 6.03# 5.94# 5.88# 5.81# 5.75# 5.68# 5.62# 5.61# 5.54# 5.49# 5.45# 5.42# 5.38#5.34# 5.31# 5.30# 5.29# 5.26# 5.23# 5.23# 5.22# 5.20# 5.19# 5.18# 5.19# 5.17# 5.15# 5.14# 5.17# 5.16#5.15# 5.15# 5.15# 5.14# 5.14# 5.14# 5.15# 5.14# 5.14# 5.13# 5.15# 5.15# 5.15# 5.14# 5.16# 5.15# 5.15#5.14# 5.14# 5.15# 5.15# 5.14# 5.13# 5.14# 5.14# 5.11# 5.12# 5.12# 5.12# 5.09# 5.09# 5.09# 5.10# 5.08# 5.08# 5.08# 5.08# 5.06# 5.05# 5.06# 5.07# 5.05# 5.03# 5.03# 5.04# 5.03# 5.01# 5.01# 5.02# 5.01# 5.01#5.00# 5.00# 5.02# 5.01# 4.98# 5.00# 5.00# 5.00# 4.99# 5.00# 5.01# 5.02# 5.01# 5.03# 5.03# 5.02# 5.02#5.04# 5.04# 5.04# 5.02# 5.02# 5.01# 4.99# 4.98# 4.96# 4.96# 4.96# 4.94# 4.93# 4.93# 4.93# 4.93# 4.93# 5.02# 5.27# 5.80# 5.94# 5.58# 5.39# 5.32# 5.25# 5.21# 5.13# 4.97# 4.71# 4.39# 4.05# 3.69# 3.32# 3.05#2.99# 2.74# 2.61# 2.47# 2.35# 2.26# 2.20# 2.15# 2.10# 2.08"

This completes the data pipeline using Kafka, Kafka Connect and MQTT.

 

Configure Data Filtering with KSQL

Since we have completed the pipeline and the full data flow, we will now incorporate real-time data filtering and anomaly detection using KSQL and a UDF (user-defined function) that applies a deep learning model.

Note: The enriched KSQL server image with the custom UDF supporting this use case is published as the technauraservices/technaura-ksql-server:1.0 image.

First, we get the Kafka broker container’s ID (4a366180f131, from the docker ps listing above) and inspect it to find the address to use for KSQL_BOOTSTRAP_SERVERS:

docker inspect 4a366180f131

"Networks": {
  "bridge": {
    ...
    "IPAddress": "172.17.0.3",
    ...
  }
}

D.1 Deploying the enriched KSQL image with the UDF as a container,

docker run -p 127.0.0.1:8088:8088 -e KSQL_BOOTSTRAP_SERVERS=172.17.0.3:9092 -e KSQL_LISTENERS=http://0.0.0.0:8088/ -e KSQL_KSQL_SERVICE_ID=ksql_standalone_1_ technauraservices/technaura-ksql-server:1.0
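Once the container is up, a quick way to verify that the KSQL server is reachable is its REST /info endpoint (using the port mapping above):

GET - localhost:8088/info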

D.2 Open a KSQL CLI,

docker run --net=host --interactive --tty confluentinc/cp-ksql-cli:5.3.1 http://127.0.0.1:8088

Checking the UDF,

ksql> show functions;

 Function Name   | Type
------------------------------
 ABS             | SCALAR
 ANOMALY         | SCALAR
 ANOMALYBASE64   | SCALAR
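If your KSQL version supports it, the signature of the custom UDF can be inspected as well (output omitted here):

ksql> DESCRIBE FUNCTION ANOMALY;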

D.3 Create a new stream for the data coming from topic ‘electric-temperature’

CREATE STREAM TEMPERATURE_ANOMALY (eventid integer, sensorinput varchar) WITH (kafka_topic='electric-temperature', value_format='DELIMITED');

From now on, any real-time data arriving on the topic ‘electric-temperature’ will create a new row in this stream. We will come back to this later.
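A quick sanity check of the stream definition:

ksql> DESCRIBE TEMPERATURE_ANOMALY;

This should list the EVENTID (INTEGER) and SENSORINPUT (VARCHAR) columns declared above, alongside the implicit ROWTIME and ROWKEY columns.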

D.4 and D.5 Creating a new Kafka stream (and an alerting topic) that fires when the calculated value exceeds the allowed range,

ksql> CREATE STREAM TEMPERATURE_ANOMALY_DETECTION AS SELECT EVENTID, CAST(ANOMALY(SENSORINPUT) AS DOUBLE) AS ANOMALY_VAL FROM TEMPERATURE_ANOMALY;

 Message
----------------------------
 Stream created and running
----------------------------

ksql> CREATE STREAM ANOMALYDETECTIONWITHFILTER AS SELECT EVENTID, ANOMALY_VAL FROM TEMPERATURE_ANOMALY_DETECTION WHERE ANOMALY_VAL > 1.5;

 Message
----------------------------
 Stream created and running
----------------------------
ksql>

Check that all streams are created:

ksql> show streams;

 Stream Name                     | Kafka Topic                     | Format
------------------------------------------------------------------------------
 TEMPERATURE_ANOMALY_DETECTION   | TEMPERATURE_ANOMALY_DETECTION   | DELIMITED
 ANOMALYDETECTIONWITHFILTER      | ANOMALYDETECTIONWITHFILTER      | DELIMITED
 TEMPERATURE_ANOMALY             | electric-temperature            | DELIMITED
------------------------------------------------------------------------------

Checking existing topics inside Kafka,

λ docker exec -it 4a366180f131 bash
[kafka@4a366180f131 ~]$ cd bin
[kafka@4a366180f131 bin]$ ./kafka-topics.sh --list --zookeeper zookeeper:2181
ANOMALYDETECTIONWITHFILTER
TEMPERATURE_ANOMALY_DETECTION
_confluent-ksql-ksql_standalone_1__command_topic
electric-temperature
my_connect_configs
my_connect_offsets
my_connect_statuses
[kafka@4a366180f131 bin]$

Run with Test Data

Remember the expected behaviour:

We send different sets of test data and check whether the MSE (mean squared error computed by the UDF) exceeds a certain threshold.

If the MSE is greater than 1.5, an alert should be generated.

We pass this data to the MQTT producer and expect that:

  • the data is passed unchanged to the MQTT receiver for all values, and
  • the key and the MSE are passed to an alert if the value is greater than the threshold.

So, we send different sets of data to the MQTT producer and check the MQTT receiver and the alert topic.

1.1 Open a window to see what data arrives at the MQTT receiver -> go to step C.1

1.2 Open a window to see data in KSQL -> go to step D.2

1.3 Open a window to see data in the alerting topic of Kafka -> see E.1 below

2. Pass different sets of data -> go to the window of C.2
3. Check the windows from step 1.x for data

Example data to be passed at step (2) in window C.2:
Data with MSE < 1.5

root@3c8f25248dd5:/# mosquitto_pub -h 172.17.0.6 -t 'electric-temperature' -m "101, 2.10# 2.13# 2.19# 2.28# 2.44# 2.62# 2.80# 3.04# 3.36# 3.69# 3.97# 4.24# 4.53#4.80# 5.02# 5.21# 5.40# 5.57# 5.71# 5.79# 5.86# 5.92# 5.98# 6.02# 6.06# 6.08# 6.14# 6.18# 6.22# 6.27#6.32# 6.35# 6.38# 6.45# 6.49# 6.53# 6.57# 6.64# 6.70# 6.73# 6.78# 6.83# 6.88# 6.92# 6.94# 6.98# 7.01#7.03# 7.05# 7.06# 7.07# 7.08# 7.06# 7.04# 7.03# 6.99# 6.94# 6.88# 6.83# 6.77# 6.69# 6.60# 6.53# 6.45#6.36# 6.27# 6.19# 6.11# 6.03# 5.94# 5.88# 5.81# 5.75# 5.68# 5.62# 5.61# 5.54# 5.49# 5.45# 5.42# 5.38#5.34# 5.31# 5.30# 5.29# 5.26# 5.23# 5.23# 5.22# 5.20# 5.19# 5.18# 5.19# 5.17# 5.15# 5.14# 5.17# 5.16#5.15# 5.15# 5.15# 5.14# 5.14# 5.14# 5.15# 5.14# 5.14# 5.13# 5.15# 5.15# 5.15# 5.14# 5.16# 5.15# 5.15#5.14# 5.14# 5.15# 5.15# 5.14# 5.13# 5.14# 5.14# 5.11# 5.12# 5.12# 5.12# 5.09# 5.09# 5.09# 5.10# 5.08# 5.08# 5.08# 5.08# 5.06# 5.05# 5.06# 5.07# 5.05# 5.03# 5.03# 5.04# 5.03# 5.01# 5.01# 5.02# 5.01# 5.01#5.00# 5.00# 5.02# 5.01# 4.98# 5.00# 5.00# 5.00# 4.99# 5.00# 5.01# 5.02# 5.01# 5.03# 5.03# 5.02# 5.02#5.04# 5.04# 5.04# 5.02# 5.02# 5.01# 4.99# 4.98# 4.96# 4.96# 4.96# 4.94# 4.93# 4.93# 4.93# 4.93# 4.93# 5.02# 5.27# 5.80# 5.94# 5.58# 5.39# 5.32# 5.25# 5.21# 5.13# 4.97# 4.71# 4.39# 4.05# 3.69# 3.32# 3.05#2.99# 2.74# 2.61# 2.47# 2.35# 2.26# 2.20# 2.15# 2.10# 2.08"

Data with MSE > 1.5

root@3c8f25248dd5:/# mosquitto_pub -h 172.17.0.6 -t 'electric-temperature' -m "106, 9.88# 9.13# 8.19# 9.28# 6.44# 9.62# 9.80# 7.04# 6.36# 8.69# 9.97# 9.24# 9.53#6.80# 9.02# 9.21# 9.40# 9.57# 6.71# 8.79# 8.86# 6.92# 5.98# 6.02# 6.06# 6.08# 6.14# 6.18# 6.22# 6.27#6.32# 6.35# 6.38# 6.45# 6.49# 6.53# 6.57# 6.64# 6.70# 6.73# 6.78# 6.83# 6.88# 6.92# 6.94# 6.98# 7.01#7.03# 7.05# 7.06# 7.07# 7.08# 7.06# 7.04# 7.03# 6.99# 6.94# 6.88# 6.83# 6.77# 6.69# 6.60# 6.53# 6.45#6.36# 6.27# 6.19# 6.11# 6.03# 5.94# 5.88# 5.81# 5.75# 5.68# 5.62# 5.61# 5.54# 5.49# 5.45# 5.42# 5.38#5.34# 5.31# 5.30# 5.29# 5.26# 5.23# 5.23# 5.22# 5.20# 5.19# 5.18# 5.19# 5.17# 5.15# 5.14# 5.17# 5.16#5.15# 5.15# 5.15# 5.14# 5.14# 5.14# 5.15# 5.14# 5.14# 5.13# 5.15# 5.15# 5.15# 5.14# 5.16# 5.15# 5.15#5.14# 5.14# 5.15# 5.15# 5.14# 5.13# 5.14# 5.14# 5.11# 5.12# 5.12# 5.12# 5.09# 5.09# 5.09# 5.10# 5.08# 5.08# 5.08# 5.08# 5.06# 5.05# 5.06# 5.07# 5.05# 5.03# 5.03# 5.04# 5.03# 5.01# 5.01# 5.02# 5.01# 5.01#5.00# 5.00# 5.02# 5.01# 4.98# 5.00# 5.00# 5.00# 4.99# 5.00# 5.01# 5.02# 5.01# 5.03# 5.03# 5.02# 5.02#5.04# 5.04# 5.04# 5.02# 5.02# 5.01# 4.99# 4.98# 4.96# 4.96# 4.96# 4.94# 4.93# 4.93# 4.93# 4.93# 4.93# 5.02# 5.27# 5.80# 5.94# 5.58# 5.39# 5.32# 5.25# 5.21# 5.13# 4.97# 4.71# 4.39# 4.05# 3.69# 3.32# 3.05#2.99# 6.74# 5.61# 7.47# 9.35# 9.26# 9.20# 9.15# 9.10# 9.08"

Attention: when applying the test data, please make sure that different keys are used (the first number in each message, 101 and 106 above).

After passing this data, the results show up in windows C.1, D.2 and E.1.

C.1:

Technaura: DL-UDF starting with sensorinput - 9.88# 9.13# 8.19# 9.28# 6.44# 9.62# 9.80# 7.04# 6.36# 8.69# 9.97# 9.24# 9.53#6.80# 9.02# 9.21# 9.40# 9.57# 6.71# 8.79# 8.86# 6.92# 5.98# 6.02# 6.06# 6.08# 6.14# 6.18# 6.22# 6.27#6.32# 6.35# 6.38# 6.45# 6.49# 6.53# 6.57# 6.64# 6.70# 6.73# 6.78# 6.83# 6.88# 6.92# 6.94# 6.98# 7.01#7.03# 7.05# 7.06# 7.07# 7.08# 7.06# 7.04# 7.03# 6.99# 6.94# 6.88# 6.83# 6.77# 6.69# 6.60# 6.53# 6.45#6.36# 6.27# 6.19# 6.11# 6.03# 5.94# 5.88# 5.81# 5.75# 5.68# 5.62# 5.61# 5.54# 5.49# 5.45# 5.42# 5.38#5.34# 5.31# 5.30# 5.29# 5.26# 5.23# 5.23# 5.22# 5.20# 5.19# 5.18# 5.19# 5.17# 5.15# 5.14# 5.17# 5.16#5.15# 5.15# 5.15# 5.14# 5.14# 5.14# 5.15# 5.14# 5.14# 5.13# 5.15# 5.15# 5.15# 5.14# 5.16# 5.15# 5.15#5.14# 5.14# 5.15# 5.15# 5.14# 5.13# 5.14# 5.14# 5.11# 5.12# 5.12# 5.12# 5.09# 5.09# 5.09# 5.10# 5.08# 5.08# 5.08# 5.08# 5.06# 5.05# 5.06# 5.07# 5.05# 5.03# 5.03# 5.04# 5.03# 5.01# 5.01# 5.02# 5.01# 5.01#5.00# 5.00# 5.02# 5.01# 4.98# 5.00# 5.00# 5.00# 4.99# 5.00# 5.01# 5.02# 5.01# 5.03# 5.03# 5.02# 5.02#5.04# 5.04# 5.04# 5.02# 5.02# 5.01# 4.99# 4.98# 4.96# 4.96# 4.96# 4.94# 4.93# 4.93# 4.93# 4.93# 4.93# 5.02# 5.27# 5.80# 5.94# 5.58# 5.39# 5.32# 5.25# 5.21# 5.13# 4.97# 4.71# 4.39# 4.05# 3.69# 3.32# 3.05#2.99# 6.74# 5.61# 7.47# 9.35# 9.26# 9.20# 9.15# 9.10# 9.08
MSE: 2.2022320685879437

D.2:

Check the KSQL stream TEMPERATURE_ANOMALY by running the KSQL query,

SELECT EVENTID, ANOMALY(SENSORINPUT) FROM TEMPERATURE_ANOMALY;

100 | 1.2104138026620321

For the value 2.5642755212297654 (> 1.5) the alert is triggered,

ksql> SELECT * FROM ANOMALYDETECTIONWITHFILTER;
1581416576774 | {"schema":{"type":"string","optional":false},"payload":"electric-temperature"} | 103 | 2.5642755212297654

E.1 Values greater than 1.5 are alerted on the new alerting topic. Checking the topic ANOMALYDETECTIONWITHFILTER,

λ docker run -it --name watcher-alert --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:0.9 watch-topic -a -k ANOMALYDETECTIONWITHFILTER
WARNING: Using default BROKER_ID=1, which is valid only for non-clustered installations.
Using ZOOKEEPER_CONNECT=172.17.0.2:2181
Using KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.9:9092
Using KAFKA_BROKER=172.17.0.3:9092
Contents of topic ANOMALYDETECTIONWITHFILTER:
{"schema":{"type":"string","optional":false},"payload":"electric-temperature"} 103,2.5642755212297654

The intermediate topic TEMPERATURE_ANOMALY_DETECTION can be checked as well. Here, all data sets arrive together with their MSE.

λ docker run -it --name watcher --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:0.9 watch-topic -a -k TEMPERATURE_ANOMALY_DETECTION
WARNING: Using default BROKER_ID=1, which is valid only for non-clustered installations.
Using ZOOKEEPER_CONNECT=172.17.0.2:2181
Using KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.9:9092
Using KAFKA_BROKER=172.17.0.3:9092
Contents of topic TEMPERATURE_ANOMALY_DETECTION:
{"schema":{"type":"string","optional":false},"payload":"electric-temperature"} 102,1.2748240934018513
{"schema":{"type":"string","optional":false},"payload":"electric-temperature"} 103,2.5642755212297654
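Alternatively, both topics can be read with the stock console consumer inside the Kafka container (a sketch, using the broker address 172.17.0.3:9092 found earlier):

λ docker exec -it 4a366180f131 bash
[kafka@4a366180f131 ~]$ cd bin
[kafka@4a366180f131 bin]$ ./kafka-console-consumer.sh --bootstrap-server 172.17.0.3:9092 --topic ANOMALYDETECTIONWITHFILTER --from-beginning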

Congratulations, you are done! This is the complete implementation of an alerting system for real-time anomaly (failure) detection using Kafka.

Written by Swarnava Chakraboty

The inspiration for this project comes from Kai Waehner and his project Deep Learning UDF for KSQL.

Read similar articles:

https://www.datanami.com/2019/06/03/cassandra-kafka-help-scale-anomaly-detection/

https://towardsdatascience.com/anomaly-detection-for-dummies-15f148e559c1

https://www.confluent.io/blog/tag/anomaly-detection/
