A Reference Architecture for Real-Time IoT Analytics feat. Apache Druid

by Eric Graham · February 28, 2020

By 2025, Statista estimates that the total number of Internet of Things (IoT) connected devices will reach 75.44 billion worldwide. Regardless of whether it happens that quickly, we will eventually have 10x more devices than people. There are already billions of devices around today, and many companies need to support thousands to millions of IoT devices, from mobile phones and home devices to devices in smarter cars, factories, the supply chain, and oil and energy networks.

Analyzing the enormous amounts of data collected and created by these devices goes way beyond the capacity of existing data warehouses or data lakes. Data warehouses and data lakes simply aren’t optimal architectures for providing real-time intelligence into IoT’s constant stream of small amounts of data from an extremely large number of sources.

Fortunately, companies have already implemented IoT analytics using Imply, the real-time intelligence platform built on Apache Druid, the leading open source real-time analytics database.

There are several reasons companies choose Imply for real-time intelligence for IoT. First, it is designed for real-time analytics against streaming pipelines like Apache Kafka and Amazon Kinesis. Second, Druid, the underlying analytics database, is built to handle vast amounts of streaming data, storing and processing billions of rows in a fault-tolerant manner. Third, Imply Pivot, our visual analytics UI, offers real-time alerting, dashboarding and visualization for analyzing streaming data stored in Druid.

An IoT Reference Architecture

What follows is a simple IoT reference architecture you can use as a starting point. I’m not a developer, but I decided to build a reference architecture that could collect streaming JSON data from an example IoT device: my iPhone. After some quick research, I decided to use SensorLog (available on the Apple App Store). I chose this application because it already supports streaming JSON and includes a large number of iPhone metrics that it can stream to a collection endpoint.

I chose to use the following open source and enterprise tools for my data pipeline:

  • SensorLog on my iPhone to generate streaming JSON that includes interesting, ever-changing metrics. This is the source of my IoT data.
  • Apache NiFi is a great transformation tool. It lets you build transformation pipelines using object modeling. I included NiFi in this architecture because it already ships with processors for accepting TCP sessions and for writing to Kafka. NiFi is a scalable system capable of handling a large number of TCP sessions. Thanks to the author of SensorLog, the JSON already arrives in the correct single-line object format.
  • Apache Kafka is a scalable, fault-tolerant message bus capable of handling very large streaming datasets, and it works well with both NiFi and Imply.
  • Imply includes Druid (the analytics datastore) and Pivot, an easy-to-use UI for fast OLAP-style queries.

Here is the high-level reference architecture. In this case NiFi, Kafka and Imply are running on a single AWS EC2 instance and SensorLog on my iPhone.

This architecture can scale up to many NiFi nodes, as well as large Kafka and Imply clusters, to support a large number of IoT devices.

The SensorLog setup was simple. In the configuration section of the app, I set the log rate to the lowest value, enabled JSON, named my device, enabled client mode for streaming, and entered the IP address and port configured in my ListenTCP processor in NiFi. At the bottom of the app, you can enable the specific output parameters you want to collect.

SensorLog config:

  • Log format: JSON
  • JSON pretty print: off (only available in the very latest version)
  • Log to Stream: Enabled
  • Streaming Settings:
      Mode: client
      Protocol: tcp
      ip: X.X.X.X (should be your NiFi instance IP)
      Port: the port NiFi is listening on, as defined in your ListenTCP processor
  • Sensor and Data: Choose whatever metrics are interesting for you.
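To exercise the pipeline without a phone, a small script can imitate what client mode does: open a TCP connection and write one single-line JSON object per event. The sketch below is only a stand-in under stated assumptions, not SensorLog's actual implementation. The function names are hypothetical, only a few of SensorLog's fields are included, and the timestamp is always written in UTC.

```python
import json
import socket
import time
from datetime import datetime, timezone


def encode_event(event: dict) -> bytes:
    """Serialize one event as single-line JSON terminated by a newline."""
    # json.dumps without indent never emits raw newlines, so the trailing
    # "\n" is the only line break and can act as the message delimiter.
    return (json.dumps(event, separators=(",", ":")) + "\n").encode("utf-8")


def make_event(device_id: str, sample_nr: int, battery: float) -> dict:
    """Build a small event using a few field names from SensorLog's output."""
    now = datetime.now(timezone.utc)
    # Same layout as SensorLog's loggingTime: "2020-02-05 10:42:40.551 -0500".
    millis = f"{now.microsecond // 1000:03d}"
    logging_time = f"{now.strftime('%Y-%m-%d %H:%M:%S')}.{millis} {now.strftime('%z')}"
    return {
        "deviceID": device_id,
        "logSampleNr": str(sample_nr),      # SensorLog sends values as strings
        "batteryLevel": f"{battery:.6f}",
        "loggingTime": logging_time,
    }


def stream_events(host: str, port: int, count: int = 5, interval: float = 1.0) -> None:
    """Open one TCP connection and send `count` events over it."""
    with socket.create_connection((host, port)) as sock:
        for n in range(count):
            sock.sendall(encode_event(make_event("my_iOS_device", n, 0.48)))
            time.sleep(interval)
```

Calling `stream_events("X.X.X.X", 8011)` with your NiFi instance IP and listening port would send five test events, one per second.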

With most options enabled, SensorLog outputs JSON similar to the following for a single event:

{"accelerometerAccelerationX":"0.206345","accelerometerAccelerationY":"-0.149734","accelerometerAccelerationZ":"-0.708450","accelerometerTimestamp_sinceReboot":"12322.923060","activity":"unknown","activityActivityConfidence":"0","activityActivityStartDate":"null","activityTimestamp_sinceReboot":"0.000000","altimeterPressure":"101.091057","altimeterRelativeAltitude":"-0.068699","altimeterReset":"0","altimeterTimestamp_sinceReboot":"12319.933139","avAudioRecorderAveragePower":"-120.000000","avAudioRecorderPeakPower":"-120.000000","batteryLevel":"0.480000","batteryState":"1","deviceID":"my_iOS_device","deviceOrientation":"5","gyroRotationX":"1.595904","gyroRotationY":"-0.880825","gyroRotationZ":"-0.463508","gyroTimestamp_sinceReboot":"12321.937155","identifierForVendor":"50D68147-40C0-481C-AB78-9BECF6416B74","IP_en0":"10.0.0.114","IP_pdp_ip0":"100.119.128.241","label":"0","locationAltitude":"35.824947","locationCourse":"57.656250","locationFloor":"-9999","locationHeadingAccuracy":"13.419568","locationHeadingTimestamp_since1970":"1580917360.120752","locationHeadingX":"16.425980","locationHeadingY":"-12.129242","locationHeadingZ":"-36.213379","locationHorizontalAccuracy":"10.000000","locationLatitude":"43.089976","locationLongitude":"-70.973703","locationMagneticHeading":"241.354004","locationSpeed":"0.310000","locationTimestamp_since1970":"1580917358.999484","locationTrueHeading":"226.733871","locationVerticalAccuracy":"8.000000","loggingTime":"2020-02-05 10:42:40.551 -0500","logSampleNr":"33","magnetometerTimestamp_sinceReboot":"12322.782553","magnetometerX":"181.729111","magnetometerY":"235.620132","magnetometerZ":"-855.741699","motionAttitudeReferenceFrame":"XArbitraryCorrectedZVertical","motionGravityX":"0.065677","motionGravityY":"-0.191383","motionGravityZ":"-0.979316","motionMagneticFieldCalibrationAccuracy":"2.000000","motionMagneticFieldX":"17.921021","motionMagneticFieldY":"-10.764999","motionMagneticFieldZ":"-36.255920","motionPitch":"0.192571","motionQuaternionW":"0.632697","motionQuaternionX":"0.035703","motionQuaternionY":"0.095223","motionQuaternionZ":"0.767693","motionRoll":"0.066964","motionRotationRateX":"-0.220875","motionRotationRateY":"-0.100239","motionRotationRateZ":"-0.071315","motionTimestamp_sinceReboot":"12322.512960","motionUserAccelerationX":"0.092631","motionUserAccelerationY":"-0.047748","motionUserAccelerationZ":"-0.046274","motionYaw":"1.756530","pedometerAverageActivePace":"0.000000","pedometerCurrentCadence":"0.000000","pedometerCurrentPace":"0.000000","pedometerDistance":"0.000000","pedometerEndDate":"null","pedometerFloorsAscended":"0","pedometerFloorsDescended":"0","pedometerNumberOfSteps":"0","pedometerStartDate":"null"}
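Note that every value in this event is a JSON string, including the numbers and the literal "null". If you want to inspect events outside Druid, a small helper can convert them to native types. This is a minimal sketch: the function names and the choice of fields kept as text are assumptions, not part of SensorLog or Imply.

```python
import json
from typing import Any

# Fields that should stay text even if they happen to look numeric (assumed list).
KEEP_AS_TEXT = {"deviceID", "identifierForVendor", "loggingTime"}


def coerce_value(raw: Any) -> Any:
    """Convert a string-encoded value to None, int, or float when possible."""
    if not isinstance(raw, str):
        return raw                      # already a native JSON type
    if raw == "null":
        return None                     # SensorLog writes missing values as "null"
    for cast in (int, float):
        try:
            return cast(raw)
        except ValueError:
            continue
    return raw                          # e.g. "unknown" or an IP address


def coerce_event(line: str) -> dict:
    """Parse one single-line JSON event and coerce its values."""
    event = json.loads(line)
    return {
        key: value if key in KEEP_AS_TEXT else coerce_value(value)
        for key, value in event.items()
    }


sample = '{"batteryLevel":"0.480000","batteryState":"1","activity":"unknown","pedometerEndDate":"null","deviceID":"my_iOS_device"}'
print(coerce_event(sample))
```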

The NiFi flow is designed as follows. A copy of the underlying flow JSON is included as well. The ListenTCP processor defines a TCP endpoint that a TCP client, in this case SensorLog, can connect to. The PublishKafka processor interacts with the Kafka API and writes data to the specified Kafka topic.
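To make the listening side concrete, here is a rough Python stand-in for a newline-delimited TCP endpoint. It is not how NiFi implements ListenTCP, only a sketch of the same idea: accept connections, split the byte stream on the message delimiter, and treat each piece as one JSON event. The port 8011, the "\n" delimiter, and the 65507-byte receive buffer mirror the values in the flow JSON in this post. The names `split_messages`, `EventHandler` and `serve` are hypothetical.

```python
import json
import socketserver
from typing import List, Tuple


def split_messages(buffer: bytes, delimiter: bytes = b"\n") -> Tuple[List[bytes], bytes]:
    """Split a receive buffer into complete messages plus the unfinished tail."""
    *complete, tail = buffer.split(delimiter)
    # Drop empty pieces produced by consecutive delimiters.
    return [message for message in complete if message], tail


class EventHandler(socketserver.BaseRequestHandler):
    """Handle one client connection; each delimited message is one event."""

    def handle(self) -> None:
        pending = b""
        while True:
            chunk = self.request.recv(65507)
            if not chunk:               # client closed the connection
                break
            messages, pending = split_messages(pending + chunk)
            for message in messages:
                event = json.loads(message)
                # A real pipeline would publish to Kafka here; we just print.
                print(event.get("deviceID"), event.get("loggingTime"))


def serve(host: str = "0.0.0.0", port: int = 8011) -> None:
    """Listen for newline-delimited JSON until interrupted."""
    with socketserver.ThreadingTCPServer((host, port), EventHandler) as server:
        server.serve_forever()
```

Running `serve()` on a test machine and pointing SensorLog at it is a quick way to confirm the phone is streaming before NiFi and Kafka are involved.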

The NiFi flow JSON config:

{"flowContents":{"identifier":"31996acd-c76a-3aeb-9969-2984e167227f","name":"IOT","comments":"","position":{"x":-152.0,"y":-256.0},"processGroups":[],"remoteProcessGroups":[],"processors":[{"identifier":"ab0e67f3-b3b5-3f98-8943-28c980b37ff1","name":"PublishKafka_0_11","comments":"","position":{"x":-144.0,"y":224.0},"bundle":{"group":"org.apache.nifi","artifact":"nifi-kafka-0-11-nar","version":"1.11.0"},"style":{},"type":"org.apache.nifi.processors.kafka.pubsub.PublishKafka_0_11","properties":{"compression.type":"none","ack.wait.time":"5 secs","use-transactions":"false","attribute-name-regex":null,"acks":"0","bootstrap.servers":"localhost:9092","sasl.kerberos.principal":null,"sasl.kerberos.service.name":null,"security.protocol":"PLAINTEXT","kerberos-credentials-service":null,"ssl.context.service":null,"max.request.size":"1 MB","kafka-key":null,"max.block.ms":"5 sec","partitioner.class":"org.apache.kafka.clients.producer.internals.DefaultPartitioner","sasl.kerberos.keytab":null,"topic":"iphone5","message-header-encoding":"UTF-8","message-demarcator":null,"key-attribute-encoding":"utf-8","transactional-id-prefix":null},"propertyDescriptors":{"compression.type":{"name":"compression.type","displayName":"Compression Type","identifiesControllerService":false,"sensitive":false},"ack.wait.time":{"name":"ack.wait.time","displayName":"Acknowledgment Wait Time","identifiesControllerService":false,"sensitive":false},"use-transactions":{"name":"use-transactions","displayName":"Use Transactions","identifiesControllerService":false,"sensitive":false},"attribute-name-regex":{"name":"attribute-name-regex","displayName":"Attributes to Send as Headers (Regex)","identifiesControllerService":false,"sensitive":false},"acks":{"name":"acks","displayName":"Delivery Guarantee","identifiesControllerService":false,"sensitive":false},"bootstrap.servers":{"name":"bootstrap.servers","displayName":"Kafka 
Brokers","identifiesControllerService":false,"sensitive":false},"sasl.kerberos.principal":{"name":"sasl.kerberos.principal","displayName":"Kerberos Principal","identifiesControllerService":false,"sensitive":false},"sasl.kerberos.service.name":{"name":"sasl.kerberos.service.name","displayName":"Kerberos Service Name","identifiesControllerService":false,"sensitive":false},"security.protocol":{"name":"security.protocol","displayName":"Security Protocol","identifiesControllerService":false,"sensitive":false},"kerberos-credentials-service":{"name":"kerberos-credentials-service","displayName":"Kerberos Credentials Service","identifiesControllerService":true,"sensitive":false},"ssl.context.service":{"name":"ssl.context.service","displayName":"SSL Context Service","identifiesControllerService":true,"sensitive":false},"max.request.size":{"name":"max.request.size","displayName":"Max Request Size","identifiesControllerService":false,"sensitive":false},"kafka-key":{"name":"kafka-key","displayName":"Kafka Key","identifiesControllerService":false,"sensitive":false},"max.block.ms":{"name":"max.block.ms","displayName":"Max Metadata Wait Time","identifiesControllerService":false,"sensitive":false},"partitioner.class":{"name":"partitioner.class","displayName":"Partitioner class","identifiesControllerService":false,"sensitive":false},"sasl.kerberos.keytab":{"name":"sasl.kerberos.keytab","displayName":"Kerberos Keytab","identifiesControllerService":false,"sensitive":false},"topic":{"name":"topic","displayName":"Topic Name","identifiesControllerService":false,"sensitive":false},"message-header-encoding":{"name":"message-header-encoding","displayName":"Message Header Encoding","identifiesControllerService":false,"sensitive":false},"message-demarcator":{"name":"message-demarcator","displayName":"Message Demarcator","identifiesControllerService":false,"sensitive":false},"key-attribute-encoding":{"name":"key-attribute-encoding","displayName":"Key Attribute 
Encoding","identifiesControllerService":false,"sensitive":false},"transactional-id-prefix":{"name":"transactional-id-prefix","displayName":"Transactional Id Prefix","identifiesControllerService":false,"sensitive":false}},"schedulingPeriod":"0 sec","schedulingStrategy":"TIMER_DRIVEN","executionNode":"ALL","penaltyDuration":"30 sec","yieldDuration":"1 sec","bulletinLevel":"WARN","runDurationMillis":0,"concurrentlySchedulableTaskCount":1,"autoTerminatedRelationships":["success","failure"],"scheduledState":"ENABLED","componentType":"PROCESSOR","groupIdentifier":"31996acd-c76a-3aeb-9969-2984e167227f"},{"identifier":"64dcf628-82bf-31a6-b3aa-f9f03d183810","name":"ListenTCP","comments":"","position":{"x":-152.0,"y":-256.0},"bundle":{"group":"org.apache.nifi","artifact":"nifi-standard-nar","version":"1.11.0"},"style":{},"type":"org.apache.nifi.processors.standard.ListenTCP","properties":{"SSL Context Service":null,"Max Size of Socket Buffer":"1 MB","Max Number of TCP Connections":"2","Local Network Interface":"en0","Max Batch Size":"10","Port":"8011","Receive Buffer Size":"65507 B","Character Set":"UTF-8","Client Auth":"REQUIRED","Max Size of Message Queue":"10000","Message Delimiter":"\\n"},"propertyDescriptors":{"SSL Context Service":{"name":"SSL Context Service","displayName":"SSL Context Service","identifiesControllerService":true,"sensitive":false},"Max Size of Socket Buffer":{"name":"Max Size of Socket Buffer","displayName":"Max Size of Socket Buffer","identifiesControllerService":false,"sensitive":false},"Max Number of TCP Connections":{"name":"Max Number of TCP Connections","displayName":"Max Number of TCP Connections","identifiesControllerService":false,"sensitive":false},"Local Network Interface":{"name":"Local Network Interface","displayName":"Local Network Interface","identifiesControllerService":false,"sensitive":false},"Max Batch Size":{"name":"Max Batch Size","displayName":"Max Batch 
Size","identifiesControllerService":false,"sensitive":false},"Port":{"name":"Port","displayName":"Port","identifiesControllerService":false,"sensitive":false},"Receive Buffer Size":{"name":"Receive Buffer Size","displayName":"Receive Buffer Size","identifiesControllerService":false,"sensitive":false},"Character Set":{"name":"Character Set","displayName":"Character Set","identifiesControllerService":false,"sensitive":false},"Client Auth":{"name":"Client Auth","displayName":"Client Auth","identifiesControllerService":false,"sensitive":false},"Max Size of Message Queue":{"name":"Max Size of Message Queue","displayName":"Max Size of Message Queue","identifiesControllerService":false,"sensitive":false},"Message Delimiter":{"name":"Message Delimiter","displayName":"Batching Message Delimiter","identifiesControllerService":false,"sensitive":false}},"schedulingPeriod":"0 sec","schedulingStrategy":"TIMER_DRIVEN","executionNode":"ALL","penaltyDuration":"30 sec","yieldDuration":"1 sec","bulletinLevel":"WARN","runDurationMillis":0,"concurrentlySchedulableTaskCount":1,"autoTerminatedRelationships":[],"scheduledState":"ENABLED","componentType":"PROCESSOR","groupIdentifier":"31996acd-c76a-3aeb-9969-2984e167227f"}],"inputPorts":[],"outputPorts":[],"connections":[{"identifier":"2f93f26e-92a1-376d-8423-f6e988c727e8","name":"","source":{"id":"64dcf628-82bf-31a6-b3aa-f9f03d183810","type":"PROCESSOR","groupId":"31996acd-c76a-3aeb-9969-2984e167227f","name":"ListenTCP","comments":""},"destination":{"id":"ab0e67f3-b3b5-3f98-8943-28c980b37ff1","type":"PROCESSOR","groupId":"31996acd-c76a-3aeb-9969-2984e167227f","name":"PublishKafka_0_11","comments":""},"labelIndex":1,"zIndex":0,"selectedRelationships":["success"],"backPressureObjectThreshold":10000,"backPressureDataSizeThreshold":"1 GB","flowFileExpiration":"0 
sec","prioritizers":[],"bends":[],"loadBalanceStrategy":"DO_NOT_LOAD_BALANCE","partitioningAttribute":"","loadBalanceCompression":"DO_NOT_COMPRESS","componentType":"CONNECTION","groupIdentifier":"31996acd-c76a-3aeb-9969-2984e167227f"}],"labels":[],"funnels":[],"controllerServices":[],"variables":{},"componentType":"PROCESS_GROUP"},"externalControllerServices":{},"parameterContexts":{},"flowEncodingVersion":"1.0"}
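The export is hard to read at a glance, so a short script can reduce it to what matters: which processors exist, which properties are actually set, and how the processors are wired. This is a sketch that assumes the key names seen in the export above ("flowContents", "processors", "properties", "connections"). The `summarize_flow` helper is hypothetical, and the inline `flow` value is a trimmed stand-in for the full export.

```python
import json


def summarize_flow(flow: dict) -> dict:
    """Reduce an exported flow to its processors, set properties, and wiring."""
    contents = flow["flowContents"]
    return {
        "processors": {
            proc["name"]: {k: v for k, v in proc["properties"].items() if v is not None}
            for proc in contents["processors"]
        },
        "connections": [
            (conn["source"]["name"], conn["destination"]["name"], conn["selectedRelationships"])
            for conn in contents["connections"]
        ],
    }


# Trimmed stand-in for the export above; only a few properties are kept.
flow = {
    "flowContents": {
        "processors": [
            {"name": "PublishKafka_0_11",
             "properties": {"bootstrap.servers": "localhost:9092", "topic": "iphone5", "kafka-key": None}},
            {"name": "ListenTCP",
             "properties": {"Port": "8011", "Message Delimiter": "\\n", "SSL Context Service": None}},
        ],
        "connections": [
            {"source": {"name": "ListenTCP"},
             "destination": {"name": "PublishKafka_0_11"},
             "selectedRelationships": ["success"]},
        ],
    }
}

print(json.dumps(summarize_flow(flow), indent=2))
```

The summary makes the shape of the flow obvious: ListenTCP on port 8011 feeds its "success" relationship straight into PublishKafka_0_11, which writes to the iphone5 topic on localhost:9092.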

In the Druid data loader, use the Kafka connector and specify the Kafka broker IP and port, along with the topic.

For “Parse Time”, define the timestamp using the loggingTime column and a format of yyyy-MM-dd HH:mm:ss.SSS Z, based on the Joda-Time format.
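In that pattern, yyyy-MM-dd is the date, HH:mm:ss.SSS is the 24-hour time with milliseconds, and Z is the UTC offset without a colon (for example -0500). Druid does the parsing itself. The snippet below is only a Python analogue for sanity-checking a loggingTime value before loading, using the roughly equivalent strptime directives.

```python
from datetime import datetime, timezone


def parse_logging_time(value: str) -> datetime:
    """Parse a SensorLog loggingTime such as '2020-02-05 10:42:40.551 -0500'."""
    # %f accepts the three millisecond digits; %z accepts an offset like -0500.
    return datetime.strptime(value, "%Y-%m-%d %H:%M:%S.%f %z")


ts = parse_logging_time("2020-02-05 10:42:40.551 -0500")
print(ts.astimezone(timezone.utc).isoformat())  # 2020-02-05T15:42:40.551000+00:00
```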

In the “Configure schema” section, disable rollup.

Step through the rest of the data loader and Druid will start loading your iPhone IoT data.
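Behind the scenes, the data loader turns these choices into a Kafka ingestion (supervisor) spec. The sketch below builds an approximation of such a spec as a Python dictionary so the pieces are visible in one place. Treat it as illustrative only: the exact layout varies by Druid version, the spec generated by the data loader is the authoritative one, and the datasource name `iphone_iot`, the dimension list, and the segment granularity are assumptions. The topic and broker address come from the NiFi flow above.

```python
import json


def build_supervisor_spec(datasource: str, topic: str, brokers: str) -> dict:
    """Assemble an approximate Kafka supervisor spec for the SensorLog stream."""
    return {
        "type": "kafka",
        "spec": {
            "dataSchema": {
                "dataSource": datasource,
                # The "Parse Time" step: column and Joda-style format.
                "timestampSpec": {
                    "column": "loggingTime",
                    "format": "yyyy-MM-dd HH:mm:ss.SSS Z",
                },
                # A few example columns; the data loader detects the full set.
                "dimensionsSpec": {
                    "dimensions": [
                        "deviceID",
                        "activity",
                        {"type": "double", "name": "batteryLevel"},
                        {"type": "double", "name": "locationLatitude"},
                        {"type": "double", "name": "locationLongitude"},
                    ]
                },
                # The "Configure schema" step: rollup disabled.
                "granularitySpec": {
                    "type": "uniform",
                    "segmentGranularity": "HOUR",
                    "queryGranularity": "NONE",
                    "rollup": False,
                },
            },
            # The Kafka connector step: broker address and topic.
            "ioConfig": {
                "type": "kafka",
                "topic": topic,
                "consumerProperties": {"bootstrap.servers": brokers},
                "inputFormat": {"type": "json"},
                "useEarliestOffset": True,
            },
            "tuningConfig": {"type": "kafka"},
        },
    }


print(json.dumps(build_supervisor_spec("iphone_iot", "iphone5", "localhost:9092"), indent=2))
```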

Now we can visualize the data in Pivot.

In this case I defined “Battery Level” as a metric, which I am tracking on my iPhone 6 over time. SensorLog outputs a large number of iPhone data points that you can play with including geo, pedometer, compass, battery, activity level and more.
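The same battery trend can also be pulled without Pivot, through Druid's SQL HTTP endpoint. The following is a hedged sketch rather than what Pivot runs internally. The datasource name `iphone_iot` and the broker URL are assumptions, and the CAST is only needed if batteryLevel was ingested as a string column.

```python
import json
import urllib.request


def battery_query(datasource: str) -> dict:
    """Build a Druid SQL request body: average battery level per minute."""
    sql = (
        'SELECT TIME_FLOOR(__time, \'PT1M\') AS "minute", "deviceID", '
        'AVG(CAST("batteryLevel" AS DOUBLE)) AS "avgBatteryLevel" '
        f'FROM "{datasource}" '
        "GROUP BY 1, 2 ORDER BY 1"
    )
    return {"query": sql}


def run_query(broker_url: str, payload: dict) -> list:
    """POST the query to the Druid SQL endpoint and return the JSON rows."""
    request = urllib.request.Request(
        broker_url.rstrip("/") + "/druid/v2/sql",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


print(battery_query("iphone_iot")["query"])
```

With a running cluster, `run_query("http://localhost:8888", battery_query("iphone_iot"))` would return one row per device per minute. The URL here is a placeholder for your own Druid router or broker address.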

This is just a fun example of what is possible with my iPhone and this reference architecture. The same approach could easily be used in real-world architectures to collect data from any IoT device. Other ingest layers or NiFi processors could be added to parse protocols like MQTT, OPC-UA, Modbus and more.

If you would like to connect to an Imply Pivot portal with your iPhone data, please email eric.graham@imply.io with your iPhone IP address and the name you are using in the “Device ID” field in your SensorLog application. In the email subject line, specify “IoT blog”. I will respond with the endpoint IP and port you can send data to, and a link to log in to the Imply portal so you can visualize your statistics.

Don’t Miss Druid Summit

If you’re interested in more details about Druid, come to the Druid Summit where streaming services like Netflix and Twitch, carriers like NTT, BT and Charter (Spectrum) and supply chain specialists will talk about their experiences implementing IoT backbones.
