Farewell Lambda Architectures: Exactly-Once Streaming Ingestion in Druid
Jul 05, 2016
David Lim
The recent rise of stream analytics has been generating palpable excitement in the big data world, and it’s not hard to see why. Gone are the days when slow and unwieldy batch processing systems with their high query latencies were the only option for processing large quantities of data. Today, many companies are turning to nimble streaming solutions that let them understand their data and make business decisions from it immediately, resulting in an operational agility that was unthinkable only a few years ago. In many cases, the ability to analyze data in real time has not only made businesses more responsive but has also opened up new avenues of opportunity to better understand and engage their customers.
On the Druid project, we are extremely interested in the maturation of streaming technologies. Although Druid can handle vast amounts of data from both batch and streaming sources simultaneously, it is in stream-oriented systems where Druid really shines, allowing users to execute sub-second queries against dynamic indices that are updated in real time. This has given rise to a new breed of exploratory analytics powerful enough to support interactive dashboards backed not only by petabytes of historical data but also by real-time events.
Engineering problems typically involve tradeoffs, and constructing distributed data pipelines is no exception. Batch processing systems are very reliable but have frustratingly high latencies that stretch out the waiting time between event and insight. Streaming systems are able to provide low latency processing, but often have difficulties supporting reprocessing and offering exactly-once message delivery guarantees.
Previously, reliable Druid deployments utilized a combination of streaming and batch pipelines known as a Lambda Architecture to support the immediate querying of real-time events while maintaining long-term correctness when message delivery was not guaranteed. Queries performed against the streamed data would return “good enough” results that might be missing some data or contain repeated data, while a batch ingestion job ran periodically to rebuild the indices once we were sure that all the data was available.
Lambda Architectures are effective, but are rightly challenged for their inherent complexity and the engineering and operational overhead involved in working with two separate pipelines that must play nicely together. We have been working hard to advance Druid’s ingestion technology to take advantage of the latency benefits of streaming systems while offering better correctness guarantees, and the 0.9.1.1 release represents a significant milestone in this pursuit.
The Difficulty With Exactly-Once Ingestion
In an ideal computing world, messages passed between components of a distributed system would be delivered to the recipient exactly one time. In practice, however, achieving this kind of guarantee is non-trivial, and progress towards this ideal requires thoughtful design on the part of both the sender and the receiver.
The difficulty in achieving exactly-once delivery is fairly easy to reason about. As an example: having sent a message and not receiving an acknowledgment, what action should the sender subsequently take? The lack of acknowledgment could be because the message was not received, which leaves ambiguity in whether or not the message will ever be received in the future. If on the other hand the message delivery was successful, the lack of response may indicate that the recipient failed during processing, or it may be because the acknowledgment was lost on the way back to the sender. Without knowing exactly what happened, the system must make a decision between retrying or continuing on, and the decision it makes could lead to dropped data or repeated data.
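To make the dilemma concrete, here is a minimal simulation of a sender whose messages and acknowledgments are sometimes lost. Everything in it (the `deliver` function, the `loss_rate` parameter, the fixed seed) is a hypothetical illustration rather than anything from Druid or Kafka. It only shows that retrying on a missing acknowledgment can produce duplicates, while not retrying can produce drops.

```python
import random


def deliver(messages, retry_on_missing_ack, loss_rate=0.3, seed=7):
    """Simulate sending messages over a link that loses messages and acks.

    Returns the list of messages as seen by the receiver.
    """
    rng = random.Random(seed)
    received = []
    for msg in messages:
        while True:
            # The message itself may be lost on the way to the receiver.
            delivered = rng.random() >= loss_rate
            if delivered:
                received.append(msg)
            # Even if delivered, the acknowledgment may be lost on the way back.
            acked = delivered and rng.random() >= loss_rate
            # Without an ack the sender cannot tell which case occurred:
            # it either retries (risking a duplicate) or moves on (risking a drop).
            if acked or not retry_on_missing_ack:
                break
    return received


messages = list(range(20))
at_least_once = deliver(messages, retry_on_missing_ack=True)
at_most_once = deliver(messages, retry_on_missing_ack=False)

print("retrying:    ", len(at_least_once), "deliveries of", len(messages), "messages")
print("not retrying:", len(at_most_once), "deliveries of", len(messages), "messages")
```

With retries, every message arrives at least once but some may arrive more than once; without retries, no message is repeated but some may never arrive. Neither policy alone gives exactly-once behavior.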
Exactly-once delivery requires a coordination mechanism between the sender and receiver that is able to tolerate failures in the system. In the case of Druid ingestion, we need to be resilient to failures in the worker nodes and have the ability to reprocess data that was previously indexed but was still held in memory or on local disk when the failure occurred. The ability to coordinate a re-read of data is implementation-specific, and in Druid 0.9.1.1 we are introducing a new indexing service that is able to provide exactly-once delivery guarantees when ingesting data from Apache Kafka.
The Kafka Indexing Task
Apache Kafka is an ideal system to integrate with Druid, not only for its high throughput and reliability, but also because it has a well-designed architecture that allows downstream systems fine and deterministic control over their read position in the message stream. Kafka has the following properties that make exactly-once ingestion possible:
Each message written to Kafka is placed into an ordered and immutable sequence called a partition and is assigned a sequentially incrementing identifier called an offset. Thus, an individual message can be uniquely identified by its partition-offset pair and a message’s position will never change relative to all other preceding and succeeding elements in the sequence.
Messages are pulled by the consumers rather than being pushed by the brokers. This allows consumers to manage their own rate of ingestion and avoids a number of complications inherent in push-based systems.
Consumers can seek to any offset in any partition, allowing them to “rewind” the stream to any position in the past while the data is still present in Kafka’s buffers.
Messages are tagged with metadata that includes their partition and offset. This provides consumers with a mechanism to verify that they received what they expected and that no messages were inadvertently dropped or re-sent. Equally importantly, it provides consumers with markers that can be used to coordinate reads between processes, suspend and resume ingestion, and rewind to an exact position in the stream for re-reading.
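The properties above can be illustrated with a toy in-memory model. This is a sketch only: the `Partition` and `Consumer` classes below are hypothetical stand-ins written for this example and are not the Kafka client API. They show an append-only log where each record's offset is its position, a consumer that pulls at its own pace, and a seek that replays exactly the same records.

```python
class Partition:
    """An append-only, ordered log; a record's offset is its index."""

    def __init__(self):
        self._records = []

    def append(self, value):
        self._records.append(value)
        return len(self._records) - 1  # offset assigned to this record

    def fetch(self, offset, max_records):
        end = min(offset + max_records, len(self._records))
        # Every record is returned together with its offset.
        return [(o, self._records[o]) for o in range(offset, end)]


class Consumer:
    """Pull-based reader that owns its read position in the partition."""

    def __init__(self, partition):
        self._partition = partition
        self.position = 0

    def seek(self, offset):
        self.position = offset

    def poll(self, max_records=2):
        batch = self._partition.fetch(self.position, max_records)
        self.position += len(batch)
        return batch


log = Partition()
for event in ["a", "b", "c", "d"]:
    log.append(event)

consumer = Consumer(log)
first = consumer.poll()     # reads offsets 0 and 1
consumer.seek(0)            # rewind to the start of the partition
replayed = consumer.poll()  # the same two records, in the same order
print(first == replayed)
```

Because the log is immutable and the consumer controls its own position, a re-read after a failure sees the same records at the same offsets as the original read.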
In Druid, the Kafka indexing task utilizes these properties to achieve exactly-once ingestion. Each task is assigned a set of partitions with corresponding start and end offsets and will begin reading messages from Kafka sequentially until all assigned offsets have been read. During reading, every message received is verified to ensure that it follows in sequence from the previously received message before being parsed and added to the index.
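The sequence check described above might look roughly like the following. This is a simplified sketch, not the task's actual code: `read_assigned_range` is a hypothetical helper, it handles a single partition, and it assumes the end offset is exclusive.

```python
def read_assigned_range(records, start_offset, end_offset):
    """Consume (offset, value) pairs for one partition over [start_offset, end_offset).

    Raises ValueError if a record is missing, repeated, or out of order.
    """
    expected = start_offset
    rows = []
    for offset, value in records:
        if expected >= end_offset:
            break  # everything assigned to this task has been read
        if offset != expected:
            # A gap means a dropped message; a lower offset means a repeat.
            raise ValueError(f"expected offset {expected}, got {offset}")
        rows.append(value)  # stand-in for parsing and adding to the index
        expected += 1
    if expected != end_offset:
        raise ValueError(f"stream ended at offset {expected}, before {end_offset}")
    return rows


rows = read_assigned_range([(5, "x"), (6, "y"), (7, "z")], start_offset=5, end_offset=7)
print(rows)
```

The record at offset 7 is left unread here because it belongs to whichever task is assigned the next range.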
When all messages assigned to the task have been read, the task will push the generated segments to deep storage to be loaded by historical nodes and will publish the segments by writing entries in the segment metadata table. Crucially for exactly-once ingestion, the task will also atomically record the final Kafka offsets in the same metadata transaction as the segment entry. This transaction prevents the segment from being published without the offset marker being updated or vice versa. Hence a successful task is guaranteed to have written both a segment descriptor and the corresponding Kafka offsets and a failed task is guaranteed to have written neither.
The offset is used to ensure that no messages are lost or duplicated between indexing tasks, and that indexing tasks which may have read the same offsets cannot both publish their segments. This requirement is enforced by a consistency check that happens when the offset marker is written to the metadata store: if the starting offsets of the to-be-published segment match the ending offsets of the last published segment then the transaction succeeds; otherwise the segment is rejected, since allowing it would mean that at least one message will be repeated or has been dropped. Synchronizing processed events at segment insertion time allows the task to provide an exactly-once guarantee without incurring a performance penalty while the index is being generated.
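As a rough illustration of the publish step, the sketch below uses an in-memory SQLite database as a stand-in for the metadata store. The table layout, the `publish` function, and the rule that the very first publish is always accepted are assumptions made for this example; Druid's real schema and code differ. What it tries to show is the idea of checking the offsets and writing the segment entry and the new offsets in a single transaction.

```python
import json
import sqlite3


def open_metadata_store():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE segments (id TEXT PRIMARY KEY)")
    conn.execute(
        "CREATE TABLE offsets (datasource TEXT PRIMARY KEY, committed TEXT NOT NULL)"
    )
    return conn


def publish(conn, datasource, segment_id, start_offsets, end_offsets):
    """Publish a segment and its ending offsets atomically.

    Returns True if published, False if rejected by the consistency check.
    """
    start = json.dumps(start_offsets, sort_keys=True)
    end = json.dumps(end_offsets, sort_keys=True)
    with conn:  # commits on success, rolls back if an exception is raised
        row = conn.execute(
            "SELECT committed FROM offsets WHERE datasource = ?", (datasource,)
        ).fetchone()
        # The segment must start exactly where the last published one ended.
        if row is not None and row[0] != start:
            return False
        conn.execute("INSERT INTO segments (id) VALUES (?)", (segment_id,))
        conn.execute(
            "INSERT OR REPLACE INTO offsets (datasource, committed) VALUES (?, ?)",
            (datasource, end),
        )
    return True


conn = open_metadata_store()
print(publish(conn, "events", "seg-a", {0: 0}, {0: 100}))    # first publish
print(publish(conn, "events", "seg-b", {0: 0}, {0: 100}))    # replica of the same range
print(publish(conn, "events", "seg-c", {0: 100}, {0: 200}))  # continues from offset 100
```

In this sketch the second call is rejected because its starting offsets no longer match what was last committed, which is how two tasks that read the same offsets are prevented from both publishing.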
The Kafka Supervisor
Retrying tasks that have failed due to consistency violations is one of the jobs of the Kafka supervisor, which, together with the indexing task, makes up the Kafka indexing service. The supervisor runs as a component of the Druid overlord and manages the lifecycle of Kafka indexing tasks. A supervisor is configured by submitting a specification to the overlord that contains an indexing schema, the Kafka broker address and topic, and the number of concurrent tasks to run for scalability and redundancy. Supervisors are also given a duration defining how long tasks should run. This is necessary because indexing tasks do not push segments to deep storage until they complete, and long-lived tasks are not recommended for stability or scalability.
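For a sense of what such a specification contains, the sketch below builds one as a Python dictionary and prints it as JSON. The field names follow our recollection of the Kafka indexing service documentation and should be checked against the docs for your Druid version; the data source, topic, column names, and broker address are placeholders invented for this example, and `build_supervisor_spec` is a hypothetical helper.

```python
import json


def build_supervisor_spec(datasource, topic, brokers, task_count, replicas, task_duration):
    """Assemble a Kafka supervisor spec as a plain dict (illustrative only)."""
    return {
        "type": "kafka",
        # The indexing schema: how to parse events and what to aggregate.
        "dataSchema": {
            "dataSource": datasource,
            "parser": {
                "type": "string",
                "parseSpec": {
                    "format": "json",
                    "timestampSpec": {"column": "timestamp", "format": "auto"},
                    "dimensionsSpec": {"dimensions": ["page", "country"]},
                },
            },
            "metricsSpec": [
                {"type": "count", "name": "count"},
                {"type": "longSum", "name": "value_sum", "fieldName": "value"},
            ],
            "granularitySpec": {
                "type": "uniform",
                "segmentGranularity": "HOUR",
                "queryGranularity": "NONE",
            },
        },
        # Where to read from, how many tasks to run, and how long each runs.
        "ioConfig": {
            "topic": topic,
            "consumerProperties": {"bootstrap.servers": brokers},
            "taskCount": task_count,
            "replicas": replicas,
            "taskDuration": task_duration,  # ISO 8601 period
        },
    }


spec = build_supervisor_spec(
    datasource="events",
    topic="events-topic",
    brokers="localhost:9092",
    task_count=2,
    replicas=2,
    task_duration="PT1H",
)
print(json.dumps(spec, indent=2))
```

The resulting JSON is what would be submitted to the overlord. Here `taskCount` is the scalability setting, `replicas` the redundancy setting, and `taskDuration` the run length discussed above.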
Once the supervisor is configured, it will create the necessary indexing tasks to achieve the scalability and redundancy targets and will monitor their progress, recreating failed tasks and coordinating the creation of subsequent tasks when the previous ones have completed. The Kafka supervisors are persistent and will survive overlord restarts and leadership changes. Supervisors will also coordinate schema migrations, by automatically stopping tasks running with the old schema and creating new tasks with the new configuration such that no messages are dropped or duplicated during transition.
Farewell Window Periods!
The Kafka indexing task is the first real-time ingestion option in Druid that does not require events to fall within a window period. The window period mechanism existed to simplify the optimal generation of segments but restricted streaming ingestion to relatively recent events. With the removal of the window period restriction, the Kafka indexing service can be used to ingest data with arbitrarily old timestamps, making a batch pipeline unnecessary in many situations.
Note that if your event stream contains a wide range of timestamps relative to your segment granularity, this will result in a large number of segments being created which may have an adverse effect on query performance. If your data falls into this category, you should monitor the number and size of segments created and periodically run batch indexing tasks to compact the segments.
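A quick way to reason about this is to count how many distinct segment intervals a batch of events touches. The sketch below assumes hourly segment granularity and uses made-up timestamps; `hourly_intervals` is a hypothetical helper, and the real number of segments also depends on partitioning and task configuration.

```python
from datetime import datetime, timedelta, timezone


def hourly_intervals(timestamps):
    """Return the set of hour-aligned interval starts touched by the timestamps."""
    return {ts.replace(minute=0, second=0, microsecond=0) for ts in timestamps}


now = datetime(2016, 7, 5, 12, 30, tzinfo=timezone.utc)

# Events from the last 80 minutes fall into just two hourly intervals.
recent = [now - timedelta(minutes=m) for m in range(0, 90, 10)]

# One late event per day over the past 30 days touches 30 separate intervals.
backfill = [now - timedelta(days=d) for d in range(30)]

print(len(hourly_intervals(recent)))    # 2
print(len(hourly_intervals(backfill)))  # 30
```

A handful of widely scattered events can therefore create as many segment intervals as a much larger volume of recent data, which is why compaction matters for this kind of stream.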
A Few Numbers
The following results were obtained on an Amazon EC2 r3.8xlarge instance (Intel Xeon E5-2670 v2) ingesting randomly generated events. Each event consisted of a value field (processed with a longSum, longMin, and longMax aggregator) and a number of dimensions of varying cardinality. Your results may vary based on tuning, hardware used, and data complexity.
Dimensions/Cardinality    Ingestion rate (events/sec/task)
1 low                     160k
1 high                    80k
5 low                     70k
5 low, 1 high             60k
10 low, 1 high            50k
10 low, 3 high            40k
30 low                    30k
50 low                    25k
50 high                   15k
In our testing, we were able to achieve a sustained aggregate ingestion rate of 3.3M events/sec on a single r3.8xlarge instance when indexing simple events with very high roll-up. When ingesting more complicated data (10 low cardinality dimensions + 1 high cardinality dimension) which required more processing power for index generation and frequent spills to disk, a single instance was able to handle just over 600k events/sec. It is worth noting that these numbers are comparable to Druid’s other stream ingestion methods, demonstrating that the Kafka indexing service is able to provide its additional correctness guarantees without sacrificing performance.
What’s Next?
We are continuing to refine the Kafka indexing service and welcome any suggestions and feature requests. Additionally, the components that make up the indexing service can be easily extended to support ingestion from other sources of data. If you are interested in adding to Druid’s ingestion capabilities, the community warmly welcomes your contributions!
Final Thoughts
The Kafka indexing service is an exciting milestone in the maturity of Druid’s ingestion technology, giving users an easy-to-use mechanism to stream arbitrarily old data into Druid with exactly-once correctness. While it’s important to note that constructing an end-to-end exactly-once streaming pipeline is still a challenging engineering problem, with the Kafka indexing service Druid is making it easier than ever to realize business value from your big data and garner immediate insights from your real-time event streams.
If you are interested in trying out Druid with the Kafka indexing service, we recommend that you check out the documentation and work through our Kafka real-time quickstart which will get you up and running with Druid in minutes.