Automate Streaming Data Ingestion with Kafka and Druid
Jul 20, 2023
Rick Jacobs
The technological world is a maze of data. Every interaction, transaction, and process generates data that could potentially be invaluable for businesses. But to truly benefit from data, organizations need effective ways to process and analyze it, as it’s created. Enter Kafka and Druid. Kafka serves as a robust message-broker system, adept at handling real-time data feeds efficiently, while Druid is a high-performance real-time analytics database, designed for workflows where fast queries, low latency, and stream ingest really matter. However, managing data streams can be a challenging task, especially when you have to deal with large volumes of data from disparate sources. Streamlining and automating the process can greatly increase operational efficiency and make managing dynamic data easier.
What will you learn?
In this blog post, you will:
Learn how Kafka and Druid work together to manage and analyze data streams, with Kafka serving as a robust message-broker system and Druid, as a high-performance, real-time analytics database.
Gain insights on the process of automatically detecting new topics in Kafka and automating their ingestion into Druid.
Become familiar with the use of Python scripts to connect to the Kafka server, fetch the list of topics, and compare it with a previously stored list to detect new topics and ingest the new data into Druid.
Discover how an ‘Ingestion Spec’ is created for new topics and how it guides the ingestion and processing of data associated with these topics.
Learn about the schema auto-detection feature in Druid that simplifies the ingestion process.
Explore how to run SQL queries from within the Druid console.
Understand how automating the detection and ingestion of Kafka topics into Druid enhances operational efficiency and scalability.
Learn about the potential benefits of this automation, such as reducing manual workload, eliminating time delays, and ensuring immediate access to insights derived from the latest data.
Find out how to implement these automation processes through a step-by-step guide and the provided code samples.
Improve Efficiency by Automating Data Streams
Managing large-scale, real-time data requires substantial resources, and the risk of error is high. Automation offers a compelling solution to these challenges. It’s not only about doing things faster; it’s about scalability, reducing the risk of error, and freeing up valuable personnel for more complex tasks. By automating data ingestion from Kafka to Druid, organizations can streamline their data pipeline, enhancing efficiency and scalability.
In a dynamic data environment, new Kafka topics can appear at any time. Keeping track of these new topics and setting up ingestion processes for them manually is both time-consuming and prone to errors. This article demonstrates an automated discovery process that periodically checks for new topics and kickstarts the ingestion process whenever a new topic appears. Python scripts connect to the Kafka server, fetch the list of topics, and compare it with a previously stored list. Any new topics detected are then flagged for ingestion. This automatic discovery of topics ensures that no new data is missed due to a delay in setting up ingestion. It also reduces manual workload and the chance of errors that could occur during the setup process.
Once a new topic has been identified, the next step is ingesting the data associated with that topic into Druid. This is accomplished by creating an ingestion spec, which is a JSON object describing where to find the data and how to ingest and process it, and executing that spec on a Druid cluster. Druid’s schema auto-detection greatly simplifies the ingestion spec because a data schema is not required. When schema auto-detection is set to “true”, Druid is capable of ingesting data without a user-defined data structure.
How The Automatic Topic Ingestion Works
Kafka topics and corresponding messages can be automatically ingested into Druid using the following process.
Prerequisites
To begin, you will need to install and configure Druid on your local machine or server. Then install Kafka locally, write code to monitor the topics, and automatically set up the ingestion configuration in Druid with enable auto-schema detection enabled.
Install Druid Locally
1. Download Druid from apache.org and extract the file.
2. From the terminal, change directories to the distribution directory.
cd druid_26/distribution/target/apache-druid-27.0.0-SNAPSHOT
3. From the apache-druid-26.0.0 package root, run the following command to start the micro-quickstart configuration:
./bin/start-druid
4. This starts up instances of ZooKeeper and the Druid services
Note: To stop Kafka at any time, use CTRL+C in the terminal. This exits the start script and terminates all Kafka processes.
Detect New Topics
Once it is executed, this script continuously scans the Kafka server for new topics. Which is a distinct category where messages of a certain type are stored. The script establishes a connection to the Kafka server and identifies existing topics. It then enters a continuous loop, looking for any new topics that might appear. When a new topic is discovered, the script initiates the ingestion process into Druid, preparing the new data for immediate analysis. An ingestion task is automatically created in Druid that consumes the messages from the new topic. This process of checking for new topics, preparing for message delivery, and reading the messages continues until the script is terminated, ensuring your data is always up-to-date and ready for use.
Python
import timefrom confluent_kafka import Consumerfrom confluent_kafka.admin import AdminClient# Import Kafka ingest example for downstream processingfrom kafka_ingest_example import create_kafka_ingest# Kafka client setup with bootstrap servers specifiedkafka_client =AdminClient({'bootstrap.servers':'localhost:9092'# Replace with your Kafka server address})# Consumer configurationconf ={'bootstrap.servers':"localhost:9092",'group.id':"group_id",'auto.offset.reset':'earliest'}defget_existing_topics():""" Retrieves the list of existing topics on the Kafka broker"""# Request metadata from Kafka server metadata = kafka_client.list_topics(timeout=5)# Extract topic names from metadata and return them as a list topics =[topic.topic for topic in metadata.topics.values()]return topicsdefmsg_process(msg):""" Processes and prints the consumed Kafka message"""print(f'Message on {msg.topic()} [{msg.partition()}] at offset {msg.offset()} with key {msg.key()}: {msg.value()}')defbasic_consume_loop(conf,topic_name):""" Main loop for consuming messages from Kafka"""# Kafka Consumer creation with the specified configuration consumer =Consumer(conf)try:# Subscribe the consumer to the specified topic consumer.subscribe([topic_name])# Infinite loop for consuming messageswhileTrue:# Poll a message from the consumer msg = consumer.poll(timeout=1.0)if msg isNone:breakelse:# Process the message if there's no errormsg_process(msg)finally:# Close the consumer to commit final offsets consumer.close()defget_new_topics(existing_topics):""" Checks for new topics on the Kafka broker"""# Request metadata from Kafka server metadata = kafka_client.list_topics(timeout=5)# Extract new topic names from metadata new_topics =[topic.topic for topic in metadata.topics.values()if topic.topic notin existing_topics]# Add the new topics to the existing ones existing_topics.extend(new_topics)return new_topicsif __name__ =="__main__":# Retrieve existing topics existing_topics =get_existing_topics()print(f"Existing topics: {existing_topics}")# Infinite loop for checking and consuming from new topicswhileTrue:# Check for new topics new_topics =get_new_topics(existing_topics)if new_topics:print(f'New topics: {new_topics}')for topic_name in new_topics:# Start downstream Druid ingestion processcreate_kafka_ingest(topic_name)# Pause for a while before consuming messages and sending to new topic# created in Druid time.sleep(5)# Start consuming from the new topicbasic_consume_loop(conf, topic_name)else:print(f'No new topics')# Pause for a while before the next check time.sleep(10)
Ingest New Topics Into Druid
Once the monitoring code identifies the new topic, I automatically create an ingestion job for that new topic using the code sample below which creates a detailed ‘Ingestion Spec’, for how to process messages from the new Kafka topic. This plan includes information about where the messages are coming from, and the specific topic, and sets schema auto-detection to ‘true’. It also includes instructions to start from the earliest message in the topic and not to group the messages. The granularity of the messages is set to ‘hour’, which means the data is divided into one-hour blocks. When the script is executed, the Druid host will start processing messages according to the plan. This process ensures that the messages from the new Kafka topic are properly processed and organized in the Druid analytics database, keeping your data up-to-date and ready for analysis.
Python
import jsonimport requests# Define a function to create a Kafka Ingestion Spec for Druiddefcreate_kafka_ingest(topic_name,bootstrap_servers='localhost:9092',druid_host='http://localhost:8081/druid/indexer/v1/supervisor'):# Create the Kafka Ingestion Spec in JSON format kafka_ingestion_spec = json.dumps({"type":"kafka","spec":{"ioConfig":{"type":"kafka","consumerProperties":{"bootstrap.servers": bootstrap_servers},"topic": topic_name,"inputFormat":{"type":"kafka","valueFormat":{"type":"json"}},"useEarliestOffset":True},"tuningConfig":{"type":"kafka"},"dataSchema":{"dataSource": topic_name,"timestampSpec":{"column":"kafka.timestamp","format":"millis"},"dimensionsSpec":{"dimensions":[],"dimensionExclusions":[],"spatialDimensions":[],"useSchemaDiscovery":True},"granularitySpec":{"queryGranularity":"none","rollup":False,"segmentGranularity":"hour"}}}})# Set headers for the post request headers ={'Content-Type':'application/json'}print(f'Creating Kafka ingestion spec for {topic_name}')print(f'using this ingestion spec:\n{kafka_ingestion_spec}')try:# Make a post request to Druid with the Kafka ingestion spec kafka_supervisor_post = requests.post(druid_host, kafka_ingestion_spec,headers=headers)# Raise an exception if the request was unsuccessful kafka_supervisor_post.raise_for_status()exceptExceptionas e:print("Something went wrong with the request:", e)# Print the responseprint(kafka_supervisor_post.text)
Note that with the new schema auto detection feature no schema requited in ingestion spec, see the “dimensionsSpec” section in the spec above.
Create Topic
I tested the monitoring and ingestion code using the code sample below. This script sends a series of messages to a specific Kafka topic. Once the topic is recognized by the monitoring code we initiated earlier, then the topic is ingested into Druid. If the code is interrupted, it makes sure that all the messages that have been generated are sent. This ensures that no messages are lost.
Python
import jsonimport osimport subprocessfrom datetime import datetimefrom confluent_kafka import Producerfrom confluent_kafka import KafkaExceptiondefcreate_topic(topic_name):try:# Change directory to the location of the Kafka scripts os.chdir('/Users/rick/IdeaProjects/CodeProjects/myKafka/kafka_2.13-2.7.0/kafka_2.13-2.7.0')# Run the Kafka script to create a new topic subprocess.run(['./bin/kafka-topics.sh','--create','--topic', topic_name,'--bootstrap-server','localhost:9092'],check=True)exceptExceptionas e:# Log and raise any exception that occurs while creating the topicprint(f"Failed to create topic: {e}")raisedefsimple_message(val):try:# Create a timestamp and a message now = datetime.now().isoformat() msg =f'This is message number {val} from Kafka created at {now}'# Bundle the message and the timestamp into a JSON object data ={"message": msg,"timestamp": now,}return json.dumps(data)exceptExceptionas e:# Log and raise any exception that occurs while creating the messageprint(f"Failed to create message: {e}")raisedefsend_simple_messages(i,producer,topic):try:# Create a simple message msg =simple_message(i)print(msg)# Produce the message to the Kafka topic producer.produce(topic,value=msg) producer.flush(30)except KafkaException as e:# Log and raise any KafkaException that occurs while sending the messageprint(f"Failed to send message: {e}")raisedefrun_producer(topic_name):try:# Create the new topic# Todo - Comment out the below line to prevent the topic from being created# print('Line commented out to prevent the topic from being created')create_topic(topic_name) producer =Producer({'bootstrap.servers':'localhost:9092'})print(f'The following messages will be sent to the topic: {topic_name}')for i inrange(1,11):# Adjusted to send only 10 messagessend_simple_messages(i, producer, topic_name)exceptKeyboardInterrupt:# If the user interrupts the process, log the event and stop the producerprint("KeyboardInterrupt occurred, stopping producer...")exceptExceptionas e:# Log and raise any general exception that occurs while running the producerprint(f"Failed to run producer: {e}")finally:# Ensure that all messages are delivered before exiting producer.flush(30)if __name__ =="__main__":try: topic_name ='my_topic_new'# Run the Kafka producer with the specified topicrun_producer(topic_name)exceptExceptionas e:# Log and raise any exception that occurs during the program's executionprint(f"Error occurred: {e}")
Query Data
To verify that the messages were ingested, select the query interface from within the Druid console. And execute SQL similar to the example below:
The Analytics Query UI within the Druid Console allows for easy querying and analysis of the data stored. The Console provides real-time updates as data is ingested into Druid enabling instantaneous analysis of large data. Users can create custom queries using SQL query language or utilize pre-built templates for common analytical tasks. Advanced features such as filters, aggregations, and rollups enable complex data analysis thereby providing insights to enhance your business performance. Additionally, integration with other tools such as Grafana, Tableau, and Supersets allows for seamless end-to-end analytics workflows that improve your business intelligence processes.
Conclusion
In conclusion, this blog highlights the importance of effectively managing and analyzing data streams in the modern data-centric world. This solution utilizes Kafka as a powerful message-broker system and Druid as a high-performance, real-time analytics database. Automating the detection and ingestion of Kafka topics into Druid significantly enhances operational efficiency and scalability.
This blog describes some benefits, such as reducing manual workload, eliminating time delays, and ensuring immediate access to insights derived from the latest data. Also included is a step-by-step guide with implementation code for the automation process. So don’t let your valuable data get lost in the shuffle or languish in latency. Start automating your data ingestion today, and take advantage of new opportunities to harness the power of your data.
About the Author
Rick Jacobs is a Senior Technical Product Marketing Manager at Imply. His varied background includes experience at IBM, Cloudera, and Couchbase. He has over 20 years of technology experience garnered from serving in development, consulting, data science, sales engineering, and other roles. He holds several academic degrees including an MS in Computational Science from George Mason University. When not working on technology, Rick is trying to learn Spanish and pursuing his dream of becoming a beach bum.
Other blogs you might find interesting
No records found...
Nov 14, 2024
Recap: Druid Summit 2024 – A Vibrant Community Shaping the Future of Data Analytics
In today’s fast-paced world, organizations rely on real-time analytics to make critical decisions. With millions of events streaming in per second, having an intuitive, high-speed data exploration tool to...
Pivot by Imply: A High-Speed Data Exploration UI for Druid
In today’s fast-paced world, organizations rely on real-time analytics to make critical decisions. With millions of events streaming in per second, having an intuitive, high-speed data exploration tool to...