The Apache Kafka ecosystem is well suited to detecting anomalies (failure detection) in huge volumes of data. These data sets can come from various industries, including IoT, banking and insurance. In this article, we present a use case showing how to use a cloud-native ecosystem built on Kafka, MQTT, Kafka Connect, Kafka Streams and KSQL to build scalable and resilient solutions for real-time IoT anomaly detection.
Processing huge volumes of data for anomaly detection is one of the most critical topics for industries, especially as they try to migrate from large, heavy batch processing to something faster and real-time.
Integrating devices and machines to process data in real time and at scale poses a huge challenge for enterprises, and the challenge grows once they start doing anomaly detection. This is where the Kafka ecosystem, including Kafka Connect, Kafka Streams and KSQL, becomes a preferred technology for industry.
The Kafka architecture supports low latency, high scalability, fault tolerance, integration of different data sources and real-time anomaly detection. It is relevant to IoT (Internet of Things), banking and insurance transactions, or any other large incoming data stream.
One area of application involves IoT device systems, e.g. connected-car infrastructures, smart cities and homes, smart retail and intelligent manufacturing. Machine learning plays a huge role here. This is shown below with a use case of IoT devices communicating over MQTT.
The architecture above shows a cloud-native ecosystem with Kafka, MQTT, Kafka Connect, KSQL, Kafka Streams, microservices, etc. It consists of four related parts:
A) Kafka as the heart of the architecture,
B) KSQL integrated with a UDF for the alerting system,
C) Microservices, and lastly
D) MQTT
We are using the Debezium Kafka and Zookeeper images, an enriched Debezium Kafka Connect image with MQTT support, as well as Elasticsearch and Kibana for visualization.
The detailed design diagram and the step-by-step implementation are described below.
Instructions to install this use case
Prerequisites
Windows 10 (64- or 32-bit), macOS or Linux
- Docker (Docker Desktop on Windows)
- Postman
Set the memory for Docker to 4 GB.
Installation Step by Step
Run the required services
A.1 Run Zookeeper,
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.9
A.2 Run Kafka,
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:0.9
A.3 Run Elasticsearch,
docker run -it --rm --name elastic -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:6.0.1
A.4 Run an enriched Debezium Kafka Connect image with MQTT and Elasticsearch support,
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link elastic:elastic --link kafka:kafka technauraservices/kafka-connect:1.0
Now we have Kafka Connect running with the required configuration and container links.
Note: You can always docker pull the image from our Docker Hub. This customised Debezium Connect image additionally supports both MQTT (source and sink) and Elasticsearch (source and sink). I built this image with a custom script and a Dockerfile; it is available as the technauraservices/kafka-connect:1.0 image.
As part of our use case, we assume that IoT data comes in from MQTT sources and that the end users are another set of MQTT clients. Both sides are MQTT pub-sub participants, and the data exchange happens via asynchronous communication.
A.5 Let’s run the MQTT producer,
docker run --name mqtt-producer -ti -p 1883:1883 -p 9001:9001 toke/mosquitto
A.6 Run the MQTT receiver,
docker run --name mqtt-receiver -ti -p 1884:1883 -p 9007:9001 toke/mosquitto
So, now we have the MQTT producer and receiver running. Next, we need to establish the data pipeline as per our design diagram.
Checking the available services in Kafka Connect using Postman,

GET: localhost:8083/connectors/

[]

It should return status 200.
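The same check can also be done from the command line with curl, assuming Kafka Connect is reachable on localhost:8083:

```shell
# List registered connectors; a fresh install should return HTTP 200 and []
curl -s -w "\nHTTP %{http_code}\n" http://localhost:8083/connectors/
```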
Configure Pipelines
If I run docker ps, I get the following containers running on my Docker machine:

CONTAINER ID   IMAGE                                                 NAMES
4bcff8b91cfc   toke/mosquitto                                        mqtt-receiver
3c8f25248dd5   toke/mosquitto                                        mqtt-producer
d0dc13f2193b   technauraservices/kafka-connect:1.0                   connect
b8647baf94d8   docker.elastic.co/elasticsearch/elasticsearch:6.0.1   elastic
4a366180f131   debezium/kafka:0.9                                    kafka
0b9ea09102a0   debezium/zookeeper:0.9                                zookeeper
Now we inspect the mqtt-producer container using its ID (3c8f25248dd5 in the docker ps output above) and check its IPAddress:
docker inspect 3c8f25248dd5
"Networks": {
"bridge": {
"IPAMConfig": null,
"Links": null,
"Aliases": null,
"NetworkID": "711ab97129d7dd87c080a70eeefbbfb6e4d80dbec419d232baba2930d1816efa",
"EndpointID": "cd232d07f3f5952bf02dd0d7469ca66cae456e34885920a2bd857655522179bf",
"Gateway": "172.17.0.1",
"IPAddress": "172.17.0.6",
"IPPrefixLen": 16,
"IPv6Gateway": "",
"GlobalIPv6Address": "",
"GlobalIPv6PrefixLen": 0,
"MacAddress": "02:42:ac:11:00:06",
"DriverOpts": null
}
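Instead of reading through the full docker inspect output, the IP address can also be pulled out directly with docker inspect's --format option:

```shell
# Print only the bridge-network IP address of the mqtt-producer container
docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' mqtt-producer
```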
B.1 Creating a service with Kafka Connect for the MQTT source with topic name “electric-temperature”, using the IP address found above (172.17.0.6), in Postman,
POST - localhost:8083/connectors/

{
  "name": "mqtt-source",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max": "1",
    "mqtt.server.uri": "tcp://172.17.0.6:1883",
    "mqtt.topics": "electric-temperature",
    "kafka.topic": "electric-temperature",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "confluent.topic.bootstrap.servers": "kafka:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license": ""
  }
}
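The same request can be sent without Postman. A sketch with curl, assuming the JSON body above has been saved to a file named mqtt-source.json (hypothetical filename):

```shell
# Register the MQTT source connector via the Kafka Connect REST API
curl -s -X POST -H "Content-Type: application/json" \
  --data @mqtt-source.json \
  http://localhost:8083/connectors/
```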
It returns the response,
{
  "name": "mqtt-source",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max": "1",
    "mqtt.server.uri": "tcp://172.17.0.6:1883",
    "mqtt.topics": "electric-temperature",
    "kafka.topic": "electric-temperature",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "confluent.topic.bootstrap.servers": "kafka:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license": "",
    "name": "mqtt-source"
  },
  "tasks": [
    {
      "connector": "mqtt-source",
      "task": 0
    }
  ],
  "type": "source"
}
Now we inspect the mqtt-receiver container using its ID (4bcff8b91cfc in the docker ps output above),
docker inspect 4bcff8b91cfc
"Networks": {
"bridge": {
"IPAMConfig": null,
"Links": null,
"Aliases": null,
"NetworkID": "711ab97129d7dd87c080a70eeefbbfb6e4d80dbec419d232baba2930d1816efa",
"EndpointID": "cd232d07f3f5952bf02dd0d7469ca66cae456e34885920a2bd857655522179bf",
"Gateway": "172.17.0.1",
"IPAddress": "172.17.0.7",
"IPPrefixLen": 16,
"IPv6Gateway": "",
"GlobalIPv6Address": "",
"GlobalIPv6PrefixLen": 0,
"MacAddress": "02:42:ac:11:00:06",
"DriverOpts": null
}
B.2 Similarly, creating another service (after taking the IP using docker inspect) with Kafka Connect for the MQTT sink connector, using Postman,
POST - localhost:8083/connectors/

{
  "name": "mqtt-sink",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSinkConnector",
    "tasks.max": "1",
    "mqtt.server.uri": "tcp://172.17.0.7:1883",
    "topics": "electric-temperature",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "confluent.topic.replication.factor": 1,
    "confluent.topic.bootstrap.servers": "kafka:9092",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
Now rechecking the available Kafka Connect services,

GET - localhost:8083/connectors/

["mqtt-sink", "mqtt-source"]
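Beyond listing the connector names, the Kafka Connect REST API also exposes a status endpoint per connector, which is a quick way to verify that both connectors and their tasks are healthy:

```shell
# Both calls should report "state": "RUNNING" for the connector and its task
curl -s http://localhost:8083/connectors/mqtt-source/status
curl -s http://localhost:8083/connectors/mqtt-sink/status
```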
So far, we have completed the full MQTT pipeline with Kafka Connect.
Test the Data Pipeline
Let’s test it with some data. Each data set consists of a unique number (the key) followed by ‘#’-separated temperature readings. To do so, we need to enter interactive mode in both the MQTT source and sink containers.
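To make the record format concrete, here is a minimal shell sketch (illustration only, not part of the pipeline) that splits one such record into its key and its ‘#’-separated readings:

```shell
# Example record: a unique key, a comma, then '#'-separated temperature readings
msg="101, 2.10# 2.13# 2.19# 2.28"

key=${msg%%,*}                                                # -> 101
readings=${msg#*, }                                           # the reading series only
count=$(echo "$readings" | tr '#' '\n' | wc -l | tr -d ' ')   # number of readings

echo "key=$key readings=$count"                               # -> key=101 readings=4
```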
C.1 Checking the data in the MQTT sink container. Replace the container ID accordingly and enter the two commands below. The subscriber output shown after the second command will appear only after the next step, as a reaction to the data being sent,
λ docker exec -it 4bcff8b91cfc bash
root@4bcff8b91cfc:/# mosquitto_sub -v -t 'electric-temperature'
electric-temperature 100, 2.10# 2.13# 2.19# 2.28# 2.44# 2.62# 2.80# 3.04# 3.36# 3.69# 3.97# 4.24# 4.53#4.80# 5.02# 5.21# 5.40# 5.57# 5.71# 5.79# 5.86# 5.92# 5.98# 6.02# 6.06# 6.08# 6.14# 6.18# 6.22# 6.27#6.32# 6.35# 6.38# 6.45# 6.49# 6.53# 6.57# 6.64# 6.70# 6.73# 6.78# 6.83# 6.88# 6.92# 6.94# 6.98# 7.01#7.03# 7.05# 7.06# 7.07# 7.08# 7.06# 7.04# 7.03# 6.99# 6.94# 6.88# 6.83# 6.77# 6.69# 6.60# 6.53# 6.45#6.36# 6.27# 6.19# 6.11# 6.03# 5.94# 5.88# 5.81# 5.75# 5.68# 5.62# 5.61# 5.54# 5.49# 5.45# 5.42# 5.38#5.34# 5.31# 5.30# 5.29# 5.26# 5.23# 5.23# 5.22# 5.20# 5.19# 5.18# 5.19# 5.17# 5.15# 5.14# 5.17# 5.16#5.15# 5.15# 5.15# 5.14# 5.14# 5.14# 5.15# 5.14# 5.14# 5.13# 5.15# 5.15# 5.15# 5.14# 5.16# 5.15# 5.15#5.14# 5.14# 5.15# 5.15# 5.14# 5.13# 5.14# 5.14# 5.11# 5.12# 5.12# 5.12# 5.09# 5.09# 5.09# 5.10# 5.08# 5.08# 5.08# 5.08# 5.06# 5.05# 5.06# 5.07# 5.05# 5.03# 5.03# 5.04# 5.03# 5.01# 5.01# 5.02# 5.01# 5.01#5.00# 5.00# 5.02# 5.01# 4.98# 5.00# 5.00# 5.00# 4.99# 5.00# 5.01# 5.02# 5.01# 5.03# 5.03# 5.02# 5.02#5.04# 5.04# 5.04# 5.02# 5.02# 5.01# 4.99# 4.98# 4.96# 4.96# 4.96# 4.94# 4.93# 4.93# 4.93# 4.93# 4.93# 5.02# 5.27# 5.80# 5.94# 5.58# 5.39# 5.32# 5.25# 5.21# 5.13# 4.97# 4.71# 4.39# 4.05# 3.69# 3.32# 3.05#2.99# 2.74# 2.61# 2.47# 2.35# 2.26# 2.20# 2.15# 2.10# 2.08
C.2 For the MQTT source container (replace the container ID accordingly), enter two commands: one to start bash and one to publish data,
λ docker exec -it 3c8f25248dd5 bash
root@3c8f25248dd5:/# mosquitto_pub -h 172.17.0.6 -t 'electric-temperature' -m "101, 2.10# 2.13# 2.19# 2.28# 2.44# 2.62# 2.80# 3.04# 3.36# 3.69# 3.97# 4.24# 4.53#4.80# 5.02# 5.21# 5.40# 5.57# 5.71# 5.79# 5.86# 5.92# 5.98# 6.02# 6.06# 6.08# 6.14# 6.18# 6.22# 6.27#6.32# 6.35# 6.38# 6.45# 6.49# 6.53# 6.57# 6.64# 6.70# 6.73# 6.78# 6.83# 6.88# 6.92# 6.94# 6.98# 7.01#7.03# 7.05# 7.06# 7.07# 7.08# 7.06# 7.04# 7.03# 6.99# 6.94# 6.88# 6.83# 6.77# 6.69# 6.60# 6.53# 6.45#6.36# 6.27# 6.19# 6.11# 6.03# 5.94# 5.88# 5.81# 5.75# 5.68# 5.62# 5.61# 5.54# 5.49# 5.45# 5.42# 5.38#5.34# 5.31# 5.30# 5.29# 5.26# 5.23# 5.23# 5.22# 5.20# 5.19# 5.18# 5.19# 5.17# 5.15# 5.14# 5.17# 5.16#5.15# 5.15# 5.15# 5.14# 5.14# 5.14# 5.15# 5.14# 5.14# 5.13# 5.15# 5.15# 5.15# 5.14# 5.16# 5.15# 5.15#5.14# 5.14# 5.15# 5.15# 5.14# 5.13# 5.14# 5.14# 5.11# 5.12# 5.12# 5.12# 5.09# 5.09# 5.09# 5.10# 5.08# 5.08# 5.08# 5.08# 5.06# 5.05# 5.06# 5.07# 5.05# 5.03# 5.03# 5.04# 5.03# 5.01# 5.01# 5.02# 5.01# 5.01#5.00# 5.00# 5.02# 5.01# 4.98# 5.00# 5.00# 5.00# 4.99# 5.00# 5.01# 5.02# 5.01# 5.03# 5.03# 5.02# 5.02#5.04# 5.04# 5.04# 5.02# 5.02# 5.01# 4.99# 4.98# 4.96# 4.96# 4.96# 4.94# 4.93# 4.93# 4.93# 4.93# 4.93# 5.02# 5.27# 5.80# 5.94# 5.58# 5.39# 5.32# 5.25# 5.21# 5.13# 4.97# 4.71# 4.39# 4.05# 3.69# 3.32# 3.05#2.99# 2.74# 2.61# 2.47# 2.35# 2.26# 2.20# 2.15# 2.10# 2.08"
This completes the data pipeline using Kafka, Kafka Connect and MQTT.
Configure Data Filtering with KSQL
Since we have completed the pipeline and the full flow, we will now incorporate real-time data filtering and anomaly detection using KSQL and a UDF (user-defined function) that embeds a deep learning model.
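The UDF returns a mean squared error (MSE): the reconstruction error of an autoencoder trained on normal temperature curves. As a toy illustration of the thresholding idea only (the fixed reference value 5.0 below is a stand-in for the autoencoder's reconstruction, not the real model), the MSE-versus-threshold decision looks like this:

```shell
# Toy stand-in for the ANOMALY UDF: mean squared deviation of '#'-separated
# readings from a hypothetical reference value of 5.0; alert if MSE > 1.5
readings="5.1# 4.9# 5.0# 5.2"
echo "$readings" | tr '#' '\n' | awk -v ref=5.0 '
  {d = $1 - ref; sum += d*d; n++}
  END {mse = sum/n; printf "MSE=%.4f alert=%s\n", mse, (mse > 1.5 ? "yes" : "no")}'
```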
Note: The enriched KSQL server image with the custom UDF supporting this use case is available as the technauraservices/technaura-ksql-server:1.0 image.
First, we get the container ID of the Kafka broker (kafka, 4a366180f131 in the docker ps output above) and inspect it to find the address for KSQL_BOOTSTRAP_SERVERS:

docker inspect 4a366180f131

"Networks": {
"bridge": {
"IPAMConfig": null,
"Links": null,
"Aliases": null,
"NetworkID": "711ab97129d7dd87c080a70eeefbbfb6e4d80dbec419d232baba2930d1816efa",
"EndpointID": "cd232d07f3f5952bf02dd0d7469ca66cae456e34885920a2bd857655522179bf",
"Gateway": "172.17.0.1",
"IPAddress": "172.17.0.3",
"IPPrefixLen": 16,
"IPv6Gateway": "",
"GlobalIPv6Address": "",
"GlobalIPv6PrefixLen": 0,
"MacAddress": "02:42:ac:11:00:06",
"DriverOpts": null
}
D.1 Deploying the enriched KSQL image with the UDF as a container,

docker run -p 127.0.0.1:8088:8088 -e KSQL_BOOTSTRAP_SERVERS=172.17.0.3:9092 -e KSQL_LISTENERS=http://0.0.0.0:8088/ -e KSQL_KSQL_SERVICE_ID=ksql_standalone_1_ technauraservices/technaura-ksql-server:1.0
D.2 Opening a KSQL CLI,
docker run --net=host --interactive --tty confluentinc/cp-ksql-cli:5.3.1 http://127.0.0.1:8088
Checking the UDF,
ksql> show functions;
Function Name | Type |
D.3 Create a new stream for the data coming from the topic ‘electric-temperature’,

CREATE STREAM TEMPERATURE_ANOMALY (eventid integer, sensorinput varchar) WITH (kafka_topic='electric-temperature', value_format='DELIMITED');
Now any real-time data published to the topic ‘electric-temperature’ will create a new row in this stream. We will come back to it later.
D.4 and D.5 Creating a new Kafka stream (and an alerting topic) that fires if the calculated value exceeds the range,
ksql> CREATE STREAM TEMPERATURE_ANOMALY_DETECTION AS SELECT EVENTID, CAST (ANOMALY(SENSORINPUT) AS DOUBLE) AS ANOMALY_VAL FROM TEMPERATURE_ANOMALY;
ksql> CREATE STREAM ANOMALYDETECTIONWITHFILTER AS SELECT EVENTID, ANOMALY_VAL FROM TEMPERATURE_ANOMALY_DETECTION WHERE ANOMALY_VAL > 1.5;
Check that all streams are created:
ksql> show streams;
Stream Name | Kafka Topic | Format |
Checking the existing topics inside Kafka (exec into the kafka container, ID 4a366180f131 above),
λ docker exec -it 4a366180f131 bash
[kafka@4a366180f131 ~]$ cd bin
[kafka@4a366180f131 bin]$ ./kafka-topics.sh --list --zookeeper zookeeper:2181
ANOMALYDETECTIONWITHFILTER
TEMPERATURE_ANOMALY_DETECTION
_confluent-ksql-ksql_standalone_1__command_topic
electric-temperature
my_connect_configs
my_connect_offsets
my_connect_statuses
[kafka@4a366180f131 bin]$
Run with Test Data
Remember the expected behaviour:
- We use different sets of test data and check whether the MSE exceeds a certain threshold.
- If the MSE is greater than 1.5, an alert should be generated.
- We pass this data to the MQTT producer and expect that:
  - the data is passed on to the MQTT receiver unchanged, for all values,
  - the key and the MSE are passed to an alert if the value is greater than the threshold.
So, we send different sets of data to the MQTT producer and check the MQTT receiver and the alert topic.
1.1 Open a window to see what data arrives at the MQTT receiver -> go to step C.1
1.2 Open a window to see the data in KSQL -> go to step D.2
1.3 Open a window to see the data in the alerting topic of Kafka -> see E.1 below
2. Pass different sets of data -> go to the window of step C.2
3. Check windows 1.x for the data
Example data to be passed at step (2) in the window of C.2:
Data with MSE < 1.5
root@3c8f25248dd5:/# mosquitto_pub -h 172.17.0.6 -t 'electric-temperature' -m "101, 2.10# 2.13# 2.19# 2.28# 2.44# 2.62# 2.80# 3.04# 3.36# 3.69# 3.97# 4.24# 4.53#4.80# 5.02# 5.21# 5.40# 5.57# 5.71# 5.79# 5.86# 5.92# 5.98# 6.02# 6.06# 6.08# 6.14# 6.18# 6.22# 6.27#6.32# 6.35# 6.38# 6.45# 6.49# 6.53# 6.57# 6.64# 6.70# 6.73# 6.78# 6.83# 6.88# 6.92# 6.94# 6.98# 7.01#7.03# 7.05# 7.06# 7.07# 7.08# 7.06# 7.04# 7.03# 6.99# 6.94# 6.88# 6.83# 6.77# 6.69# 6.60# 6.53# 6.45#6.36# 6.27# 6.19# 6.11# 6.03# 5.94# 5.88# 5.81# 5.75# 5.68# 5.62# 5.61# 5.54# 5.49# 5.45# 5.42# 5.38#5.34# 5.31# 5.30# 5.29# 5.26# 5.23# 5.23# 5.22# 5.20# 5.19# 5.18# 5.19# 5.17# 5.15# 5.14# 5.17# 5.16#5.15# 5.15# 5.15# 5.14# 5.14# 5.14# 5.15# 5.14# 5.14# 5.13# 5.15# 5.15# 5.15# 5.14# 5.16# 5.15# 5.15#5.14# 5.14# 5.15# 5.15# 5.14# 5.13# 5.14# 5.14# 5.11# 5.12# 5.12# 5.12# 5.09# 5.09# 5.09# 5.10# 5.08# 5.08# 5.08# 5.08# 5.06# 5.05# 5.06# 5.07# 5.05# 5.03# 5.03# 5.04# 5.03# 5.01# 5.01# 5.02# 5.01# 5.01#5.00# 5.00# 5.02# 5.01# 4.98# 5.00# 5.00# 5.00# 4.99# 5.00# 5.01# 5.02# 5.01# 5.03# 5.03# 5.02# 5.02#5.04# 5.04# 5.04# 5.02# 5.02# 5.01# 4.99# 4.98# 4.96# 4.96# 4.96# 4.94# 4.93# 4.93# 4.93# 4.93# 4.93# 5.02# 5.27# 5.80# 5.94# 5.58# 5.39# 5.32# 5.25# 5.21# 5.13# 4.97# 4.71# 4.39# 4.05# 3.69# 3.32# 3.05#2.99# 2.74# 2.61# 2.47# 2.35# 2.26# 2.20# 2.15# 2.10# 2.08"
Data with MSE > 1.5
root@3c8f25248dd5:/# mosquitto_pub -h 172.17.0.6 -t 'electric-temperature' -m "106, 9.88# 9.13# 8.19# 9.28# 6.44# 9.62# 9.80# 7.04# 6.36# 8.69# 9.97# 9.24# 9.53#6.80# 9.02# 9.21# 9.40# 9.57# 6.71# 8.79# 8.86# 6.92# 5.98# 6.02# 6.06# 6.08# 6.14# 6.18# 6.22# 6.27#6.32# 6.35# 6.38# 6.45# 6.49# 6.53# 6.57# 6.64# 6.70# 6.73# 6.78# 6.83# 6.88# 6.92# 6.94# 6.98# 7.01#7.03# 7.05# 7.06# 7.07# 7.08# 7.06# 7.04# 7.03# 6.99# 6.94# 6.88# 6.83# 6.77# 6.69# 6.60# 6.53# 6.45#6.36# 6.27# 6.19# 6.11# 6.03# 5.94# 5.88# 5.81# 5.75# 5.68# 5.62# 5.61# 5.54# 5.49# 5.45# 5.42# 5.38#5.34# 5.31# 5.30# 5.29# 5.26# 5.23# 5.23# 5.22# 5.20# 5.19# 5.18# 5.19# 5.17# 5.15# 5.14# 5.17# 5.16#5.15# 5.15# 5.15# 5.14# 5.14# 5.14# 5.15# 5.14# 5.14# 5.13# 5.15# 5.15# 5.15# 5.14# 5.16# 5.15# 5.15#5.14# 5.14# 5.15# 5.15# 5.14# 5.13# 5.14# 5.14# 5.11# 5.12# 5.12# 5.12# 5.09# 5.09# 5.09# 5.10# 5.08# 5.08# 5.08# 5.08# 5.06# 5.05# 5.06# 5.07# 5.05# 5.03# 5.03# 5.04# 5.03# 5.01# 5.01# 5.02# 5.01# 5.01#5.00# 5.00# 5.02# 5.01# 4.98# 5.00# 5.00# 5.00# 4.99# 5.00# 5.01# 5.02# 5.01# 5.03# 5.03# 5.02# 5.02#5.04# 5.04# 5.04# 5.02# 5.02# 5.01# 4.99# 4.98# 4.96# 4.96# 4.96# 4.94# 4.93# 4.93# 4.93# 4.93# 4.93# 5.02# 5.27# 5.80# 5.94# 5.58# 5.39# 5.32# 5.25# 5.21# 5.13# 4.97# 4.71# 4.39# 4.05# 3.69# 3.32# 3.05#2.99# 6.74# 5.61# 7.47# 9.35# 9.26# 9.20# 9.15# 9.10# 9.08"
Attention: when applying the test data, please check that different keys are used (101 and 106 in the examples above).
After passing this data, the results show up in windows C.1, D.2 and E.1.
C.1:
Technaura: DL-UDF starting with sensorinput - 9.88# 9.13# 8.19# 9.28# 6.44# 9.62# 9.80# 7.04# 6.36# 8.69# 9.97# 9.24# 9.53#6.80# 9.02# 9.21# 9.40# 9.57# 6.71# 8.79# 8.86# 6.92# 5.98# 6.02# 6.06# 6.08# 6.14# 6.18# 6.22# 6.27#6.32# 6.35# 6.38# 6.45# 6.49# 6.53# 6.57# 6.64# 6.70# 6.73# 6.78# 6.83# 6.88# 6.92# 6.94# 6.98# 7.01#7.03# 7.05# 7.06# 7.07# 7.08# 7.06# 7.04# 7.03# 6.99# 6.94# 6.88# 6.83# 6.77# 6.69# 6.60# 6.53# 6.45#6.36# 6.27# 6.19# 6.11# 6.03# 5.94# 5.88# 5.81# 5.75# 5.68# 5.62# 5.61# 5.54# 5.49# 5.45# 5.42# 5.38#5.34# 5.31# 5.30# 5.29# 5.26# 5.23# 5.23# 5.22# 5.20# 5.19# 5.18# 5.19# 5.17# 5.15# 5.14# 5.17# 5.16#5.15# 5.15# 5.15# 5.14# 5.14# 5.14# 5.15# 5.14# 5.14# 5.13# 5.15# 5.15# 5.15# 5.14# 5.16# 5.15# 5.15#5.14# 5.14# 5.15# 5.15# 5.14# 5.13# 5.14# 5.14# 5.11# 5.12# 5.12# 5.12# 5.09# 5.09# 5.09# 5.10# 5.08# 5.08# 5.08# 5.08# 5.06# 5.05# 5.06# 5.07# 5.05# 5.03# 5.03# 5.04# 5.03# 5.01# 5.01# 5.02# 5.01# 5.01#5.00# 5.00# 5.02# 5.01# 4.98# 5.00# 5.00# 5.00# 4.99# 5.00# 5.01# 5.02# 5.01# 5.03# 5.03# 5.02# 5.02#5.04# 5.04# 5.04# 5.02# 5.02# 5.01# 4.99# 4.98# 4.96# 4.96# 4.96# 4.94# 4.93# 4.93# 4.93# 4.93# 4.93# 5.02# 5.27# 5.80# 5.94# 5.58# 5.39# 5.32# 5.25# 5.21# 5.13# 4.97# 4.71# 4.39# 4.05# 3.69# 3.32# 3.05#2.99# 6.74# 5.61# 7.47# 9.35# 9.26# 9.20# 9.15# 9.10# 9.08
MSE: 2.2022320685879437
D.2:
Check the KSQL stream TEMPERATURE_ANOMALY by running the KSQL query,
SELECT EVENTID, ANOMALY(SENSORINPUT) FROM TEMPERATURE_ANOMALY;
100 | 1.2104138026620321
For the value 2.5642755212297654 (> 1.5) it triggers the alert,
ksql> SELECT * FROM ANOMALYDETECTIONWITHFILTER;
1581416576774 | {"schema":{"type":"string","optional":false},"payload":"electric-temperature"} | 103 | 2.5642755212297654
E.1 Values greater than 1.5 are alerted via the new alerting topic.
Checking the topic “ANOMALYDETECTIONWITHFILTER”,
λ docker run -it --name watcher-alert --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:0.9 watch-topic -a -k ANOMALYDETECTIONWITHFILTER
WARNING: Using default BROKER_ID=1, which is valid only for non-clustered installations.
Using ZOOKEEPER_CONNECT=172.17.0.2:2181
Using KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.9:9092
Using KAFKA_BROKER=172.17.0.3:9092
Contents of topic ANOMALYDETECTIONWITHFILTER:
{"schema":{"type":"string","optional":false},"payload":"electric-temperature"} 103,2.5642755212297654
The intermediate topic “TEMPERATURE_ANOMALY_DETECTION” can be checked as well. Here, all sets of data arrive with their MSE,
λ docker run -it --name watcher --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:0.9 watch-topic -a -k TEMPERATURE_ANOMALY_DETECTION
WARNING: Using default BROKER_ID=1, which is valid only for non-clustered installations.
Using ZOOKEEPER_CONNECT=172.17.0.2:2181
Using KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.9:9092
Using KAFKA_BROKER=172.17.0.3:9092
Contents of topic TEMPERATURE_ANOMALY_DETECTION:
{"schema":{"type":"string","optional":false},"payload":"electric-temperature"} 102,1.2748240934018513
{"schema":{"type":"string","optional":false},"payload":"electric-temperature"} 103,2.5642755212297654
Congrats, you are done! This is a complete implementation of an alerting system for anomaly detection (failure or fraud detection) on real-time data using Kafka.
Written by Swarnava Chakraboty
Inspiration for this project comes from Kai Waehner and his project Deep Learning UDF for KSQL
Read similar articles:
https://www.datanami.com/2019/06/03/cassandra-kafka-help-scale-anomaly-detection/
https://towardsdatascience.com/anomaly-detection-for-dummies-15f148e559c1
https://www.confluent.io/blog/tag/anomaly-detection/