Automatic Kafka Stream Topic Detection and Ingestion

The technological world is a maze of data. Every interaction, transaction, and process generates data that could be invaluable for businesses. But to truly benefit from data, organizations need effective ways to process and analyze it as it is created. Enter Kafka and Druid. Kafka serves as a robust message-broker system, adept at handling real-time data feeds efficiently, while Druid is a high-performance, real-time analytics database designed for workflows where fast queries, low latency, and stream ingestion really matter. However, managing data streams can be challenging, especially when you have to deal with large volumes of data from disparate sources. Streamlining and automating the process can greatly increase operational efficiency and make managing dynamic data easier.

What will you learn?

In this blog post, you will:

  • Learn how Kafka and Druid work together to manage and analyze data streams, with Kafka serving as a robust message-broker system and Druid as a high-performance, real-time analytics database.
  • Gain insights on the process of automatically detecting new topics in Kafka and automating their ingestion into Druid.
  • Become familiar with the use of Python scripts to connect to the Kafka server, fetch the list of topics, and compare it with a previously stored list to detect new topics and ingest the new data into Druid.
  • Discover how an ‘Ingestion Spec’ is created for new topics and how it guides the ingestion and processing of data associated with these topics.
  • Learn about the schema auto-detection feature in Druid that simplifies the ingestion process.
  • Explore how to run SQL queries from within the Druid console.
  • Understand how automating the detection and ingestion of Kafka topics into Druid enhances operational efficiency and scalability.
  • Learn about the potential benefits of this automation, such as reducing manual workload, eliminating time delays, and ensuring immediate access to insights derived from the latest data.
  • Find out how to implement these automation processes through a step-by-step guide and the provided code samples.

Improve Efficiency by Automating Data Streams

Managing large-scale, real-time data requires substantial resources, and the risk of error is high. Automation offers a compelling solution to these challenges. It’s not only about doing things faster; it’s about scalability, reducing the risk of error, and freeing up valuable personnel for more complex tasks. By automating data ingestion from Kafka to Druid, organizations can streamline their data pipeline, enhancing efficiency and scalability.

In a dynamic data environment, new Kafka topics can appear at any time. Keeping track of these new topics and setting up ingestion processes for them manually is both time-consuming and prone to errors. This article demonstrates an automated discovery process that periodically checks for new topics and kickstarts the ingestion process whenever a new topic appears. Python scripts connect to the Kafka server, fetch the list of topics, and compare it with a previously stored list. Any new topics detected are then flagged for ingestion. This automatic discovery of topics ensures that no new data is missed due to a delay in setting up ingestion. It also reduces manual workload and the chance of errors that could occur during the setup process.
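The comparison step described above can be sketched as a set difference against a stored list. This is a minimal illustration rather than the script used later in this post: the JSON file used as the store and the function names are assumptions, and skipping Kafka-internal topics (names starting with "__") is an extra safeguard added here.

```python
import json
import os
import tempfile
from pathlib import Path


def load_known_topics(path):
    """Return the set of topic names saved by a previous run (empty if none)."""
    p = Path(path)
    if not p.exists():
        return set()
    return set(json.loads(p.read_text()))


def detect_new_topics(current_topics, known_topics):
    """Return topics present now but absent from the stored set.

    Kafka-internal topics (names starting with "__") are skipped.
    """
    return sorted(
        t for t in set(current_topics) - set(known_topics)
        if not t.startswith("__")
    )


def save_known_topics(path, topics):
    """Persist the topic names so the next check can compare against them."""
    Path(path).write_text(json.dumps(sorted(topics)))


if __name__ == "__main__":
    # Hypothetical store location; a temporary directory keeps the demo tidy
    store = os.path.join(tempfile.mkdtemp(), "known_topics.json")
    known = load_known_topics(store)
    # Stand-in for the topic list fetched from the broker
    current = ["orders", "clicks", "__consumer_offsets"]
    new = detect_new_topics(current, known)
    print("New topics:", new)
    save_known_topics(store, known | set(new))
```

Keeping the stored list on disk means a restart of the monitor does not treat every existing topic as new.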

Once a new topic has been identified, the next step is ingesting the data associated with that topic into Druid. This is accomplished by creating an ingestion spec, which is a JSON object describing where to find the data and how to ingest and process it, and submitting that spec to a Druid cluster. Druid’s schema auto-detection greatly simplifies the ingestion spec because the columns do not have to be listed by hand. When schema auto-detection is set to “true”, Druid can ingest data without a user-defined data structure.
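To make the difference concrete, the sketch below builds the "dimensionsSpec" part of an ingestion spec in two ways: with hand-listed columns and with schema auto-detection. The helper name and the example column "message_number" are hypothetical; the "useSchemaDiscovery" flag is the one used in the full spec later in this post.

```python
import json


def dimensions_spec(explicit_dimensions=None):
    """Build the dimensionsSpec part of a Druid ingestion spec.

    With no explicit dimensions, schema discovery is switched on and Druid
    infers the columns from the incoming data.
    """
    if explicit_dimensions:
        return {"dimensions": list(explicit_dimensions)}
    return {"dimensions": [], "useSchemaDiscovery": True}


# Hand-written schema: every column is named (and optionally typed) up front.
# "message_number" is a made-up column used only for illustration.
manual = dimensions_spec(["message", {"type": "long", "name": "message_number"}])

# Auto-detected schema: no columns listed, discovery enabled.
auto = dimensions_spec()

print(json.dumps(manual))
print(json.dumps(auto))
```

With the second form, the same spec template can be reused for any new topic because nothing in it depends on the shape of the messages.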

How The Automatic Topic Ingestion Works

Kafka topics and corresponding messages can be automatically ingested into Druid using the following process.

Prerequisites

To begin, you will need to install and configure Druid on your local machine or server. Then install Kafka locally, write code to monitor the topics, and automatically set up the ingestion configuration in Druid with schema auto-detection enabled.

Install Druid Locally

1. Download Druid from apache.org and extract the file.

2. From the terminal, change to the distribution directory. The path depends on where you extracted Druid and on the version; for example:

         cd druid_26/distribution/target/apache-druid-27.0.0-SNAPSHOT

3. From the Druid package root (the directory you changed into in the previous step), run the following command to start Druid:

         ./bin/start-druid

4. This starts up instances of ZooKeeper and the Druid services.

5. After the Druid services finish startup, open the web UI at http://localhost:8888.

Note: To stop Druid at any time, use CTRL+C in the terminal. This exits the script and terminates all Druid processes.

Install Kafka

Kafka is a high-throughput message bus that works well with Druid. 

1.  Download Kafka using a command similar to the one below in your terminal (this walkthrough uses version 2.7.0; adjust the version as needed):

         curl -O https://archive.apache.org/dist/kafka/2.7.0/kafka_2.13-2.7.0.tgz

2.  Extract the .tgz file:

         tar -xzf kafka_2.13-2.7.0.tgz

3.  Go to the Kafka directory:

         cd kafka_2.13-2.7.0

4.  In the Kafka root directory, run this command to start a Kafka broker. Kafka 2.7.0 depends on ZooKeeper, so start Druid first; the broker then connects to the ZooKeeper instance that Druid launched:

         ./bin/kafka-server-start.sh config/server.properties

Note: To stop Kafka at any time, use CTRL+C in the terminal. This exits the start script and terminates all Kafka processes.
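Before running the scripts that follow, it can help to confirm that both services are accepting connections. The sketch below is one way to do that with only the standard library; the function name is hypothetical, and the ports are the defaults used in this post (8888 for the Druid web console, 9092 for the Kafka broker). An open port only shows that the process is listening, not that it is fully initialized.

```python
import socket
import time


def wait_for_port(host, port, timeout=60.0, interval=1.0):
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means something is listening on the port
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


if __name__ == "__main__":
    for name, port in (("Druid web console", 8888), ("Kafka broker", 9092)):
        ok = wait_for_port("localhost", port, timeout=2.0, interval=0.5)
        print(f"{name} on port {port}: {'reachable' if ok else 'not reachable'}")
```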

Detect New Topics

Once it is executed, this script continuously scans the Kafka server for new topics. A topic is a distinct category where messages of a certain type are stored. The script establishes a connection to the Kafka server and identifies existing topics. It then enters a continuous loop, looking for any new topics that might appear. When a new topic is discovered, the script initiates the ingestion process into Druid, preparing the new data for immediate analysis. An ingestion task is automatically created in Druid that consumes the messages from the new topic. This process of checking for new topics, setting up ingestion, and reading the messages continues until the script is terminated, ensuring your data is always up-to-date and ready for use.

Python
import time
from confluent_kafka import Consumer
from confluent_kafka.admin import AdminClient

# Import Kafka ingest example for downstream processing
from kafka_ingest_example import create_kafka_ingest

# Kafka admin client setup with bootstrap servers specified
kafka_client = AdminClient({
    'bootstrap.servers': 'localhost:9092'  # Replace with your Kafka server address
})

# Consumer configuration
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'group_id',
    'auto.offset.reset': 'earliest'
}


def get_existing_topics():
    """
    Retrieves the list of existing topics on the Kafka broker
    """
    # Request metadata from Kafka server
    metadata = kafka_client.list_topics(timeout=5)
    # Extract topic names from metadata and return them as a list
    topics = [topic.topic for topic in metadata.topics.values()]
    return topics


def msg_process(msg):
    """
    Processes and prints the consumed Kafka message
    """
    print(f'Message on {msg.topic()} [{msg.partition()}] at offset {msg.offset()} with key {msg.key()}: {msg.value()}')


def basic_consume_loop(conf, topic_name, max_empty_polls=5):
    """
    Consumes messages from a topic until it stays quiet for a few polls
    """
    # Kafka Consumer creation with the specified configuration
    consumer = Consumer(conf)

    try:
        # Subscribe the consumer to the specified topic
        consumer.subscribe([topic_name])

        # Keep polling until max_empty_polls consecutive polls return nothing,
        # so control returns to the topic check in the main loop
        empty_polls = 0
        while empty_polls < max_empty_polls:
            # Poll a message from the consumer
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                # Nothing arrived within the timeout
                empty_polls += 1
            elif msg.error():
                # Report the error and stop consuming from this topic
                print(f'Consumer error: {msg.error()}')
                break
            else:
                # Process the message if there's no error
                empty_polls = 0
                msg_process(msg)
    finally:
        # Close the consumer to commit final offsets
        consumer.close()


def get_new_topics(existing_topics):
    """
    Checks for new topics on the Kafka broker
    """
    # Request metadata from Kafka server
    metadata = kafka_client.list_topics(timeout=5)
    # Extract new topic names, skipping Kafka-internal topics such as
    # __consumer_offsets, which should not be ingested
    new_topics = [topic.topic for topic in metadata.topics.values()
                  if topic.topic not in existing_topics
                  and not topic.topic.startswith('__')]
    # Add the new topics to the existing ones
    existing_topics.extend(new_topics)
    return new_topics


if __name__ == "__main__":
    # Retrieve existing topics
    existing_topics = get_existing_topics()
    print(f"Existing topics: {existing_topics}")

    # Infinite loop for checking and consuming from new topics
    while True:
        # Check for new topics
        new_topics = get_new_topics(existing_topics)
        if new_topics:
            print(f'New topics: {new_topics}')
            for topic_name in new_topics:
                # Start downstream Druid ingestion process
                create_kafka_ingest(topic_name)
                # Pause briefly so Druid can start the ingestion job
                # before this script reads from the topic
                time.sleep(5)
                # Start consuming from the new topic
                basic_consume_loop(conf, topic_name)
        else:
            print('No new topics')
        # Pause for a while before the next check
        time.sleep(10)

Ingest New Topics Into Druid

Once the monitoring code identifies a new topic, I automatically create an ingestion job for it using the code sample below. The code builds a detailed ‘Ingestion Spec’ that describes how to process messages from the new Kafka topic. The spec names the Kafka bootstrap servers and the specific topic, and sets schema auto-detection to ‘true’. It also tells Druid to start from the earliest message in the topic and disables rollup, so messages are stored individually rather than aggregated. The segment granularity is set to ‘hour’, which means the data is divided into one-hour blocks. When the script is executed, it submits the spec to the Druid host, which starts processing messages according to the spec. This process ensures that the messages from the new Kafka topic are properly processed and organized in the Druid analytics database, keeping your data up-to-date and ready for analysis.

Python
import json
import requests


# Define a function to create a Kafka Ingestion Spec for Druid
def create_kafka_ingest(topic_name, bootstrap_servers='localhost:9092',
                        druid_host='http://localhost:8081/druid/indexer/v1/supervisor'):
    # Create the Kafka Ingestion Spec in JSON format
    kafka_ingestion_spec = json.dumps(
        {
            "type": "kafka",
            "spec": {
                "ioConfig": {
                    "type": "kafka",
                    "consumerProperties": {
                        "bootstrap.servers": bootstrap_servers
                    },
                    "topic": topic_name,
                    "inputFormat": {
                        "type": "kafka",
                        "valueFormat": {
                            "type": "json"
                        }
                    },
                    "useEarliestOffset": True
                },
                "tuningConfig": {
                    "type": "kafka"
                },
                "dataSchema": {
                    "dataSource": topic_name,
                    "timestampSpec": {
                        "column": "kafka.timestamp",
                        "format": "millis"
                    },
                    "dimensionsSpec": {
                        "dimensions": [],
                        "dimensionExclusions": [],
                        "spatialDimensions": [],
                        "useSchemaDiscovery": True
                    },
                    "granularitySpec": {
                        "queryGranularity": "none",
                        "rollup": False,
                        "segmentGranularity": "hour"
                    }
                }
            }
        }
    )

    # Set headers for the post request
    headers = {
        'Content-Type': 'application/json'
    }

    print(f'Creating Kafka ingestion spec for {topic_name}')
    print(f'using this ingestion spec:\n{kafka_ingestion_spec}')

    try:
        # Make a post request to Druid with the Kafka ingestion spec
        kafka_supervisor_post = requests.post(druid_host, data=kafka_ingestion_spec,
                                              headers=headers, timeout=30)
        # Raise an exception if the request was unsuccessful
        kafka_supervisor_post.raise_for_status()
    except requests.exceptions.RequestException as e:
        print("Something went wrong with the request:", e)
    else:
        # Print the response only when the request succeeded; if the request
        # itself failed, there is no response object to print
        print(kafka_supervisor_post.text)

Note that with the new schema auto-detection feature, no schema is required in the ingestion spec; see the “dimensionsSpec” section in the spec above, where the “dimensions” list is empty and “useSchemaDiscovery” is set to true.
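After the spec has been submitted, it can be useful to confirm that the ingestion job is actually running. The sketch below queries the supervisor status endpoint and extracts the state fields. It assumes the supervisor ID equals the datasource name (the topic name in this post) and that the same host and port as in the code above are used; the helper names are hypothetical.

```python
import json
import urllib.request
from urllib.parse import quote


def supervisor_status_url(topic_name, base_url="http://localhost:8081"):
    """URL of the status endpoint for the supervisor named after the topic."""
    return f"{base_url}/druid/indexer/v1/supervisor/{quote(topic_name, safe='')}/status"


def summarize_status(status):
    """Pick the state fields out of a supervisor status response."""
    payload = status.get("payload", {})
    return {
        "state": payload.get("state", "UNKNOWN"),
        "healthy": bool(payload.get("healthy", False)),
    }


def fetch_supervisor_summary(topic_name, base_url="http://localhost:8081"):
    """Query Druid and return the summarized status (raises on HTTP errors)."""
    url = supervisor_status_url(topic_name, base_url)
    with urllib.request.urlopen(url, timeout=10) as resp:
        return summarize_status(json.load(resp))


if __name__ == "__main__":
    try:
        print(fetch_supervisor_summary("my_topic_new"))
    except (OSError, ValueError) as e:
        # OSError covers connection and HTTP errors; ValueError covers bad JSON
        print("Could not read supervisor status:", e)
```

A state other than RUNNING, or a healthy value of False, is a cue to inspect the supervisor in the Druid console.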

Create Topic

I tested the monitoring and ingestion code using the code sample below. This script creates a Kafka topic and sends a series of messages to it. Once the monitoring code started earlier recognizes the topic, the topic is ingested into Druid. If the script is interrupted, it flushes the producer so that all the messages that have already been generated are sent. This ensures that no messages are lost.

Python
import json
import subprocess
from datetime import datetime
from confluent_kafka import Producer
from confluent_kafka import KafkaException

# Location of the Kafka installation; replace with your own path
KAFKA_HOME = '/Users/rick/IdeaProjects/CodeProjects/myKafka/kafka_2.13-2.7.0/kafka_2.13-2.7.0'


def create_topic(topic_name):
    try:
        # Run the Kafka script from the Kafka directory to create a new topic
        subprocess.run(
            ['./bin/kafka-topics.sh', '--create', '--topic', topic_name, '--bootstrap-server', 'localhost:9092'],
            cwd=KAFKA_HOME,
            check=True)
    except Exception as e:
        # Log and raise any exception that occurs while creating the topic
        print(f"Failed to create topic: {e}")
        raise


def simple_message(val):
    try:
        # Create a timestamp and a message
        now = datetime.now().isoformat()
        msg = f'This is message number {val} from Kafka created at {now}'

        # Bundle the message and the timestamp into a JSON object
        data = {
            "message": msg,
            "timestamp": now,
        }

        return json.dumps(data)
    except Exception as e:
        # Log and raise any exception that occurs while creating the message
        print(f"Failed to create message: {e}")
        raise


def send_simple_messages(i, producer, topic):
    try:
        # Create a simple message
        msg = simple_message(i)
        print(msg)
        # Produce the message to the Kafka topic
        producer.produce(topic, value=msg)
        producer.flush(30)
    except KafkaException as e:
        # Log and raise any KafkaException that occurs while sending the message
        print(f"Failed to send message: {e}")
        raise


def run_producer(topic_name):
    # Defined up front so the finally block is safe if topic creation fails
    producer = None
    try:
        # Create the new topic
        # Comment out this call if the topic already exists
        create_topic(topic_name)

        producer = Producer({'bootstrap.servers': 'localhost:9092'})
        print(f'The following messages will be sent to the topic: {topic_name}')

        for i in range(1, 11):  # Send 10 messages
            send_simple_messages(i, producer, topic_name)
    except KeyboardInterrupt:
        # If the user interrupts the process, log the event and stop the producer
        print("KeyboardInterrupt occurred, stopping producer...")
    except Exception as e:
        # Log any general exception that occurs while running the producer
        print(f"Failed to run producer: {e}")
    finally:
        # Ensure that all messages are delivered before exiting
        if producer is not None:
            producer.flush(30)


if __name__ == "__main__":
    try:
        topic_name = 'my_topic_new'
        # Run the Kafka producer with the specified topic
        run_producer(topic_name)
    except Exception as e:
        # Log any exception that occurs during the program's execution
        print(f"Error occurred: {e}")

Query Data

To verify that the messages were ingested, open the query view in the Druid console and run a SQL query against the datasource named after the new topic (my_topic_new in this walkthrough).
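As an illustration, a query such as SELECT * FROM "my_topic_new" ORDER BY "__time" DESC LIMIT 10 can be pasted into the console. The sketch below sends the same kind of query through Druid's SQL HTTP endpoint instead, in case you want to script the check. The helper names and the default address (the local console address used earlier in this post) are assumptions.

```python
import json
import urllib.request


def latest_rows_sql(datasource, limit=10):
    """SQL that returns the most recent rows of a datasource.

    The datasource name is inserted as-is, so only pass trusted names.
    """
    return f'SELECT * FROM "{datasource}" ORDER BY "__time" DESC LIMIT {int(limit)}'


def build_sql_request(sql, base_url="http://localhost:8888"):
    """Build a POST request for Druid's SQL endpoint (/druid/v2/sql)."""
    body = json.dumps({"query": sql}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/druid/v2/sql",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    request = build_sql_request(latest_rows_sql("my_topic_new"))
    try:
        with urllib.request.urlopen(request, timeout=10) as resp:
            # The response is a JSON array with one object per row
            for row in json.load(resp):
                print(row)
    except (OSError, ValueError) as e:
        # OSError covers connection and HTTP errors; ValueError covers bad JSON
        print("Query failed:", e)
```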

The Analytics Query UI within the Druid Console allows for easy querying and analysis of the stored data. The Console reflects new data as it is ingested into Druid, enabling near-immediate analysis of large datasets. Users can create custom queries using SQL or utilize pre-built templates for common analytical tasks. Advanced features such as filters, aggregations, and rollups enable complex data analysis, providing insights to enhance your business performance. Additionally, integration with other tools such as Grafana, Tableau, and Superset allows for seamless end-to-end analytics workflows that improve your business intelligence processes.

Conclusion

In conclusion, this blog highlights the importance of effectively managing and analyzing data streams in the modern data-centric world. This solution utilizes Kafka as a powerful message-broker system and Druid as a high-performance, real-time analytics database. Automating the detection and ingestion of Kafka topics into Druid significantly enhances operational efficiency and scalability.

This blog describes some benefits, such as reducing manual workload, eliminating time delays, and ensuring immediate access to insights derived from the latest data. Also included is a step-by-step guide with implementation code for the automation process. So don’t let your valuable data get lost in the shuffle or languish in latency. Start automating your data ingestion today, and take advantage of new opportunities to harness the power of your data.

About the Author

Rick Jacobs is a Senior Technical Product Marketing Manager at Imply. His varied background includes experience at IBM, Cloudera, and Couchbase. He has over 20 years of technology experience garnered from serving in development, consulting, data science, sales engineering, and other roles. He holds several academic degrees including an MS in Computational Science from George Mason University. When not working on technology, Rick is trying to learn Spanish and pursuing his dream of becoming a beach bum. 
