The Tale of Two Vehicles: Apache Druid’s New Shape Takes Form
Apache Druid today is not the Druid you know. It has evolved, or dare I say shapeshifted, into something new, something even better.
Druid is great for slice-and-dice at scale, but some analytics applications need longer-running and more complex queries. In the past, this sometimes posed a bit of a challenge. The same broker process that enabled subsecond performance across petabyte queries could also become a bottleneck. The old workaround was to use other systems alongside Druid, forcing developers to load the same data in two locations, creating two separate pipelines, adding complexity, and adding cost.
But in 2022, the dedicated Druid community changed all that with the introduction of a multi-stage query engine.
Fast forward to today: Druid has transformed the way it handles queries, allowing data transformation and ingestion within Druid from one table to another using SQL. As a result, SQL-based batch ingestion is about 40% faster. Druid has become a garage with two powerful vehicles: a Ferrari, for when you need speed, and an SUV, for when you need more storage. You now have options for all your data needs.
But the work on MSQ and continued improvements to Druid aren’t done. Listen to the podcast to learn more about exciting developments coming this year, including:
- Cold tier storage
- Asynchronous queries
- More complex JOINs and more
With these updates, Druid is poised to become an even more powerful tool for managing and analyzing large amounts of data.
- Apache Druid in 5 Minutes
- Introducing Apache Druid 25.0
- Get started with Druid with Imply Polaris
About the guest
Reena Leone is the host and showrunner of the Tales at Scale podcast and Senior Developer Evangelist at Imply. She’s been creating content for a variety of audiences, from business leaders to technical experts, for the better part of a decade. Prior to Imply, she was at Pegasystems, Acquia, Sony Electronics and Mirum.
Welcome to Tales at Scale, a podcast that cracks open the world of analytics projects. I’m your host, Reena from Imply. I’m here to bring you stories from developers doing cool things with analytics, way beyond your basic BI. I’m talking about analytics applications that are taking data and insights to a whole new level.
Today, I’m rolling solo as I dive into the ins and outs of the multi-stage query engine for Apache Druid. The work around MSQ started long before its release in Druid 24 in 2022 and is fundamentally changing what Druid can do. This is a whole new shape for Druid. But what does that mean exactly? What changes needed to be made? What are the benefits? Basically…why was this built? Let’s go back to November of 2021.
Gian Merlino, a committer for Apache Druid and CTO and Co-Founder of Imply, announced Project Shapeshift. The next phase of Druid had three key goals:
- Cloud native: enabling a true database-as-a-service built from Druid. Checkmark here for Imply Polaris.
- Simple: pretty self-explanatory, improving the overall ease of use across Druid, and
- Complete: expanding the Druid architecture, making it the most capable database for analytics apps.
For this discussion, I’m going to focus on the latter two.
Now if you’re familiar with Druid, you probably know that it’s great for slice-and-dice at scale, but some analytics applications need more. One thing they need is longer-running and more complex queries. And this previously could sometimes pose a bit of a challenge for Druid. The same broker process that enables subsecond performance across petabyte queries can also, for some queries, become a bottleneck. The old workaround was to use other systems alongside Druid, but that’s a pain because now you’re loading the same data in two locations, creating two separate pipelines, adding complexity, and adding cost.
But in 2022, the dedicated Druid community changed all that with the introduction of a multi-stage query engine. But the work that went into building that engine actually enabled some other helpful features, like the ability to run queries and ingestion using a single system and a single SQL language.
This eliminates the need to have separate operational models for ingestion and query execution by allowing developers to query external data, and enables deployment modes with separated storage and compute. This new functionality has transformed Druid, and sets it up to be THE real-time analytics database that will work for all use cases.
A good way to think of this is that Druid is the garage, and inside you have two vehicles: a Ferrari, because in this scenario you’re independently wealthy and thriving, and a Suburban. The Ferrari is super fast, but the space inside for stuff is limited. No amount of pizza and beer will get you to use this one to help your friend move. That’s Druid’s core query engine: fast and great for interactive queries.
Next to it is your big, huge SUV that fits like 7-8 people, or 6 people and 3-5 dogs depending on the size of the dogs. You can put a lot in here, but it’s not AS fast. That’s the new multi-stage query engine, built to move large amounts of data. It also enables SQL ingestion, long-running queries, long reports, etc., but we’ll get to that.
The point is, you have a choice for when you need to move a lot of stuff (data) and when you need lots of speed.
Previously, Druid was just a Ferrari. But now, you get the multi-stage query engine / Chevy Suburban as well.
Actually, it’s more like an 18 wheeler but I liked my garage metaphor and no one wants to see a truck that big cruising down the road haha.
So let’s get into it…by going back a little bit
I think Gian explained it best so I’m just going to quote him here “Druid doesn’t load data from disk to memory, or from memory to CPU, when it isn’t needed for a query. It doesn’t decode data when it can operate directly on encoded data. It doesn’t read the full dataset when it can read a smaller index. It doesn’t start up new processes for each query when it can use a long-running process. It doesn’t send data unnecessarily across process boundaries or from server to server.”
So what does that mean? Druid achieves its high level of efficiency through a tightly integrated query engine and storage format, designed in tandem to minimize the amount of work that each data server has to do.
And if you’ve worked with Druid, you know one of the key elements is that it’s distributed. Druid’s distributed design partitions tables into segments, balances those segments between servers automatically, then quickly identifies which segments are relevant to a query, and then pushes as much computation as possible down to individual data servers.
This brings us back to the aforementioned broker. So the broker is the query entry point. Druid uses scatter/gather to execute queries, which works pretty much like it sounds: it sends bits of the query to the relevant data servers, then gathers up the results from each, merges them, and returns the result to the user. It’s great for common queries. The technique enables Druid to scale from running on a single server or laptop to petabytes of data served by thousands of data servers. This also means the broker isn’t dealing with the large amounts of data, because it only needs to pull together the query results returned by the data nodes. So the broker can handle a query that spans thousands of data servers and billions or trillions of rows.
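The pruning and scatter/gather flow can be sketched in miniature. This is a toy Python model for illustration only; the server layout, segment keys, and function names are all invented here, not Druid’s actual API:

```python
# Toy model of Druid's scatter/gather (illustration, not Druid internals).
# Each "data server" holds segments keyed by day; the "broker" prunes
# segments by time, scatters the partial query, then gathers and merges.

servers = [
    {1: [5, 9], 2: [7]},   # data server A: segments for days 1 and 2
    {2: [3], 3: [8, 1]},   # data server B: segments for days 2 and 3
]

def scan_segment(events, predicate):
    """Partial aggregation done locally on a data server."""
    return sum(1 for e in events if predicate(e))

def broker_query(servers, day, predicate):
    """Broker: prune segments to the queried day, scatter, gather, merge."""
    partials = []
    for server in servers:
        if day in server:                    # segment pruning by time
            partials.append(scan_segment(server[day], predicate))
    return sum(partials)                     # lightweight merge step

# "How many events on day 2 were greater than 4?"
print(broker_query(servers, 2, lambda e: e > 4))  # 1
```

The broker only merges small partial counts, which is why the full-row-export case described below is the one that hurts.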
OK so what’s the problem then? The Broker can become a bottleneck. This can happen when query results are very large, or when a query needs multiple passes over the data.
For one example, if you have a table with a billion events and you query “how many events happened last Tuesday between 9 am and 10 am that were marked “important””, then the Druid broker will scatter the query across the right segments on the data nodes, then gather the results back to give you an answer, which might be 37,010,392. Druid will usually do this query in under a second. But if, instead, you query, “give me all the events from last Tuesday between 9 am and 10 am that were marked “important””, then Druid needs to return 37,010,392 rows. That can overload the broker: if each row is 1 kilobyte, that’s over 37 gigabytes of data to return. It won’t happen in under a second, and it might cause the query to fail.
But sometimes we need Druid to provide these sorts of data exports or long downloadable reports.
What is the solution? Shuffle mesh, which is just fun to say.
With the multi-stage query engine, Druid has a system that splits queries into stages and enables data to be exchanged in a shuffle mesh between stages. Each stage can run in parallel across many data servers at once. Druid runs multiple broker processes automatically using skew-resistant shuffling and cooperative scheduling, so no tuning is needed.
By allowing each stage of a multi-stage query to run distributed across the shuffle-mesh of brokers, we can efficiently carry much larger sets of data through all stages of the query, so we don’t get the bottleneck.
But you can also keep running single-stage queries. They’ll execute using the same fast and efficient scatter/gather technique that Druid has always used.
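In miniature, a shuffled multi-stage aggregation looks something like this toy Python sketch (illustration only; the worker layout and function names are invented, not Druid internals):

```python
from collections import Counter

# Toy multi-stage shuffle (illustration, not Druid internals).
# Stage 1: each worker aggregates its local rows into partial counts.
# Shuffle: partials are repartitioned by key so each stage-2 worker
# owns a disjoint slice of keys -- no single process merges everything.

def stage1(rows):
    return Counter(rows)

def shuffle(partials, n_workers):
    partitions = [Counter() for _ in range(n_workers)]
    for partial in partials:
        for key, count in partial.items():
            partitions[hash(key) % n_workers][key] += count
    return partitions

def stage2(partition):
    return dict(partition)

worker_inputs = [["us", "us", "fr"], ["fr", "de"], ["us"]]
partials = [stage1(rows) for rows in worker_inputs]
results = [stage2(p) for p in shuffle(partials, n_workers=2)]
merged = {k: v for r in results for k, v in r.items()}
print(merged)  # counts: us=3, fr=2, de=1 (key order may vary)
```

Because the final counts live spread across the stage-2 workers, no single process ever has to hold the whole result, which is the point of the mesh.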
OK, so we’ve tackled the broker bottleneck, but why stop at having a shuffle-capable query engine only for queries? Go big or go home, right? The multi-stage query engine has also helped reimagine Druid’s ingestion. Think of ingestion as a query that reads external data, clusters and partitions it, and writes the result to a Druid table.
Before Druid 24, in order to load data into Druid, you’d need to learn the Druid ingestion spec. While the Druid console makes this not-too-hard to do, it was a bit of a pain because, let’s be real, more developers are familiar with SQL. But now you can use SQL queries to load data into Druid, thanks to the multi-stage query engine.
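As a sketch, a SQL ingestion statement looks roughly like the one below, assembled as a Python string purely for illustration. The table name, input URI, and columns are hypothetical, and the exact EXTERN signature can vary between Druid versions, so check the Druid docs before relying on it:

```python
# Hypothetical MSQ-style SQL ingestion statement (names are made up;
# verify EXTERN syntax against the Druid docs for your version).
ingest_sql = """
INSERT INTO wikipedia_edits
SELECT
  TIME_PARSE("timestamp") AS __time,
  "user",
  "page"
FROM TABLE(
  EXTERN(
    '{"type": "http", "uris": ["https://example.com/edits.json"]}',
    '{"type": "json"}',
    '[{"name": "timestamp", "type": "string"},
      {"name": "user", "type": "string"},
      {"name": "page", "type": "string"}]'
  )
)
PARTITIONED BY DAY
"""
```

The shape is the familiar INSERT ... SELECT: the external file plays the role of a source table, and PARTITIONED BY controls how the result is segmented by time.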
To help ease the transition into the new engine, it is now easier to connect with data sources like CSV or Parquet or protobuf files, parse the incoming data’s schema and generate the SQL query to copy the data into the Druid tables you want. The web console also has a new query editor, complete with feature tabs because who doesn’t love an organized workspace?
This isn’t just easier than using the classic ingestion spec, but faster, too: batch ingestion with multi-stage query engine-powered SQL is about 40% faster.
Another data source you can use is existing Druid tables, so you can transform data inside of Druid!
But the biggest benefit here is that you can transform data on ingestion, and inside of Druid from one table to another, all using SQL. The new multi-stage query engine-powered SQL batch ingestion experience is the recommended method to load batch data into Druid.
So what is coming next?
As I said before, the multi-stage query engine has enabled a lot of other features and functionalities. Everything I just mentioned is in Druid now, but here are a few things to keep an eye out for in upcoming releases.
The first one is cold tier, or cold storage, which will let you choose which data you need high speed for and which you don’t, to save money.
You’ll be able to designate Druid tables, or parts of tables based on time, to exist only on Druid’s deep storage tier, and still query them. Normally, every Druid segment has a copy both in deep storage (which is cloud object storage like Amazon S3, Azure Blob, or Google GCS, or, if you’re off the cloud, an HDFS cluster) and on the data nodes (the Ferrari). For the data you designate for cold storage, the only copy is on the reliable, but slower, deep storage (the SUV). It’s much less expensive, but queries will be slower.
Even though it’s slower, these are still Druid segments, with all of the indexing, dictionary encoding, and compression that makes Druid fast. This gives you the full power of Druid for any sort of query, and Druid will figure out automatically the best way to access each segment it needs, whether that’s the Ferrari of the Druid data nodes or the SUV of Druid cold storage. The benefit here is that it makes Druid much less expensive and uses less infrastructure, but the trade-off is that it’s a bit slower. But not everything needs to be at top speed. One thing we’ve talked about on this show before is who needs real-time data and who doesn’t. Even if you do need real-time data, you likely only need it for a certain time frame, or else it’s not real time. So what do you do with the rest?
Say you’re working with a data stream that delivers a terabyte of data every day. You might find that 98% of your queries only need to look at the last 90 days at any given time, so that’s loaded on data nodes for subsecond queries. But data that’s 91 days or older, you can put in deep storage. How often do you need data from the last 5 years? Likely once in a while. If you find 90 days is too long and you need data for the last 60 instead, you can change it. Or you can change it to 120 days. Or you can set different rules for different tables. You can choose which data you need high speed for and which you don’t, to save money.
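A retention rule like the one described can be modeled as a tiny decision function. This toy Python sketch is illustration only; the tier names and function are invented, not Druid’s actual configuration:

```python
from datetime import date

# Toy retention rule (illustration, not Druid's actual config): segments no
# older than `hot_days` live on data nodes (the Ferrari); older segments
# exist only in deep storage (the SUV).

def storage_tier(segment_day, today, hot_days=90):
    age_days = (today - segment_day).days
    return "data-nodes" if age_days <= hot_days else "deep-storage"

today = date(2023, 6, 1)
print(storage_tier(date(2023, 5, 1), today))               # data-nodes
print(storage_tier(date(2022, 6, 1), today))               # deep-storage
print(storage_tier(date(2023, 5, 1), today, hot_days=20))  # deep-storage
```

Changing the 90-day window to 60 or 120 days is just changing `hot_days`, and different tables could carry different values.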
The next big thing for Druid is asynchronous queries.
If I do a query against a petabyte dataset, say a trillion log entries, and I want to ask how many log entries came from a particular application, it’s going to scan a lot of stuff and send back a number. But this gets harder when you want to send back all of the entries in the table that deal with that application and put them in order by country. Let’s say that garners 10 million results that you need organized by country. The system will find and pull those 10 million, and then put them in groups by country. This is another case where you can create a bottleneck.
Back to our good friend, the broker. When the answer is a whole lotta data rows, the broker can become overloaded. With the shuffle mesh, a team of brokers creates and shuffles the data across the mesh. So instead of one broker trying its best, they work together to pull the data together AND organize it in the way that you’re asking.
This also changes how queries time out. With asynchronous queries, each query can run for as long as it needs to complete, since you’re no longer dependent on a single broker. This means you can run longer queries. With scatter/gather, they could time out. Now Druid will update you with a percentage: x percent complete. If you’re doing a really complex thing and you get, like, 0.0001% complete after a minute, you can decide to keep going or to cancel it. So that’s coming soon to Druid.
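The client-side experience might look something like this toy query handle. It’s a made-up Python sketch of the poll-progress-or-cancel idea, not Druid’s actual API:

```python
# Toy asynchronous query handle (illustration, not Druid's API): instead of
# blocking until a timeout, the client polls progress and can cancel a query
# that is barely moving.

class AsyncQuery:
    def __init__(self, total_rows):
        self.total = total_rows
        self.done = 0
        self.cancelled = False

    def advance(self, rows):
        """Simulate the engine completing some work."""
        self.done = min(self.total, self.done + rows)

    def progress(self):
        """Percent complete, for the client to poll."""
        return 100.0 * self.done / self.total

    def cancel(self):
        self.cancelled = True

q = AsyncQuery(total_rows=1_000_000)
q.advance(250_000)
print(f"{q.progress():.1f}% complete")  # 25.0% complete
# If progress were crawling (say 0.0001% after a minute), the client
# could call q.cancel() instead of waiting for a timeout.
```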
Now, I love a good myth busting. One that has plagued Druid is that it can’t do joins. But that’s not true. JOINs have been part of Druid for a while. Coming this year, Druid will be able to do much more COMPLEX joins.
In a data schema environment, there are usually fact tables and dimension tables. A fact table usually has millions or billions of rows, so maybe that’s orders or customers for a large enterprise. A dimension table usually has from a few hundred to a million or so. So maybe that’s a table to convert abbreviations to state names for the United States, which is actually more than 50 if you count DC, Guam, and so on. Anyway…in Druid, we do joins between fact tables and dimension tables. But if you want to join a fact table to another fact table, you’ll time out: once more, you’re creating a broker bottleneck. Moving forward, you’ll be able to do those more complex joins as Druid uses the multi-stage query engine shuffle mesh.
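The reason a shuffle makes fact-to-fact joins feasible can be shown with a toy partitioned hash join. This Python sketch is illustration only; the tables, keys, and functions are invented, not Druid internals:

```python
# Toy shuffled hash join (illustration, not Druid internals): both fact
# tables are partitioned by the join key, so each worker joins only its own
# slice -- no single broker has to hold either table in full.

def partition_by_key(rows, key, n):
    parts = [[] for _ in range(n)]
    for row in rows:
        parts[hash(row[key]) % n].append(row)
    return parts

def join_partition(left, right, key):
    index = {}
    for row in right:
        index.setdefault(row[key], []).append(row)
    return [{**l, **r} for l in left for r in index.get(l[key], [])]

orders = [{"cust": 1, "amt": 30}, {"cust": 2, "amt": 15}]      # fact table
payments = [{"cust": 1, "paid": 30}, {"cust": 3, "paid": 99}]  # fact table

n = 2
joined = []
for lp, rp in zip(partition_by_key(orders, "cust", n),
                  partition_by_key(payments, "cust", n)):
    joined.extend(join_partition(lp, rp, "cust"))
print(joined)  # [{'cust': 1, 'amt': 30, 'paid': 30}]
```

Rows with the same key always land in the same partition, so the per-partition joins can run in parallel and their results simply concatenate.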
Also on the horizon is SQL stream ingestion. And just like batch, it will be simpler with SQL commands.
So to reiterate, with the addition of the multi-stage query engine, Druid now gives you two fancy vehicles to cover all your data-driving needs. And again, this is just the beginning of a whole new Druid.
So this is the time where I usually thank my guests haha. But instead I’ll just say I’ll be back to catch you up on the latest Druid features, improvements, and fixes. In the meantime, if you want to learn more or try any of Druid’s new capabilities, you can visit druid.apache.org for more info and if you need help getting up and running on Druid or need support, visit imply.io. Until then, keep it real.