If you thought you had perfect rollups before, you might have been wrong!

Apr 29, 2021
Maytas Monsereenusorn

The Druid engineering team at Imply recently made some changes to how rollup works with sparse data. This blog explains how rollups work, and the impact of the changes that we made. Interested? Of course you are! These changes make Druid an even faster and more storage-efficient database!

In Apache Druid, you can roll up duplicate rows into a single row to optimize storage and improve query performance. Rollup pre-aggregates data at ingestion time, which reduces the amount of data the query engine needs to process at query time. Rollup combines rows that have the same timestamp and dimension values into a single row. Depending on the type of partitioning in the ingestionSpec, a rollup can be either “perfect” or “best effort”. A perfect rollup guarantees that all duplicate rows are aggregated at ingestion time. This post explains how perfect rollup works, and describes a bug affecting perfect rollup of sparse data that we recently fixed. If you thought you had perfect rollups before, you might have been wrong! Keep reading to understand why.
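
For context, here is what requesting perfect rollup looks like in a native batch ingestion spec. This is an abbreviated sketch, not a complete spec, though forceGuaranteedRollup and partitionsSpec are real tuningConfig options:

"tuningConfig": {
  "type": "index_parallel",
  "forceGuaranteedRollup": true,
  "partitionsSpec": { "type": "hashed" }
}

With forceGuaranteedRollup set to true (which requires a hashed or single_dim partitionsSpec), the batch task must produce perfectly rolled-up segments; otherwise, rollup is best effort.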

In dimensionsSpec, you can specify dimensions for your input data in any order you want. The ordering of the dimensions is important: it can affect both how the data is compressed and the ultimate size of Druid segments. In the past, ordering your dimensions non-lexicographically (that is, non-alphabetically) could break perfect rollup during ingestion. The bug caused perfect rollup to fail whenever a non-lexicographical dimension ordering was specified in the dimensionsSpec and at least one of the dimensions contained sparse data. Sparse data means that a large proportion of the values within a given dimension are null. In other words, data ingested with perfect rollup enabled could still end up with duplicate rows: rows with the same timestamp and the same values for all rollup dimensions. This bug did not produce any error logs or user-facing failure messages.

The next section explains what was going wrong, and also how we fixed it.

Why sparse data impeded perfect rollups

When a batch task ingests data, row by row, it creates intermediate Druid segments that are persisted to disk whenever the task runs out of in-memory storage. Persisting allows the task to free up memory and continue ingesting more data. Each intermediate segment contains a subset of the input data assigned to the batch task, with rows ordered according to the dimension ordering specified in dimensionsSpec. After the task finishes reading its assigned input data, it merges all the intermediate segments it generated into a new segment. The merge is supposed to guarantee that duplicate rows across all intermediate segments are perfectly rolled up, and that the rows of the merged segment remain ordered by the specified dimension ordering. When the intermediate segments contained sparse data, the bug caused the merged segment to retain duplicate rows and to be ordered differently from the specified dimension ordering.
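
As an aside, the persist threshold is configurable. In the task's tuningConfig, maxRowsInMemory (a real Druid option; the value shown is its illustrative default) caps how many rows the task buffers before persisting an intermediate segment:

"tuningConfig": {
  "type": "index_parallel",
  "maxRowsInMemory": 1000000
}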

A problem with merging intermediate Druid segments

When Druid merges all the intermediate segments, it first tries to determine a dimension ordering shared by all of them. If there isn’t one, Druid falls back to a lexicographic dimension ordering. With sparse data there is often no shared ordering, so this fallback kicked in, and it is the root cause of the bug.

To illustrate what was going wrong, let’s say we ordered three dimensions in the dimensionsSpec as follows: dimB, dimA, dimC.
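
In the ingestion spec, that ordering would look something like this minimal dimensionsSpec (in practice, each entry can also be an object specifying a type):

"dimensionsSpec": {
  "dimensions": ["dimB", "dimA", "dimC"]
}

Note that this ordering is non-lexicographical: dimB comes before dimA.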

Let’s imagine that the rows of data below are the data that the task received. Three of the rows (the ones with dimA "H" and dimB "X") are duplicates: they have the same timestamp, dimensions, and dimension values. Hence, we expect the task to roll up all the duplicate rows perfectly:

{"time":"2015-09-12T00:46:58.771Z","dimA":"C","dimB:"F"}
{"time":"2015-09-12T00:46:58.771Z","dimA":"C","dimB":"J"}
{"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"X"}
{"time":"2015-09-12T00:46:58.771Z","dimA":"Z","dimB":"S"}
{"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"X"}
{"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"Z"}
{"time":"2015-09-12T00:46:58.771Z","dimA":"J","dimB":"R"}
{"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"T"}
{"time":"2015-09-12T00:46:58.771Z","dimA":"H","dimB":"X"}
{"time":"2015-09-12T00:46:58.771Z","dimC":"A","dimB":"X"}

To demonstrate how the bug happened, let’s assume that Druid can hold three rows of input data in memory before having to persist them to disk as an intermediate segment. These intermediate segments are shown below. Notice that each intermediate segment has its rows ordered by the specified dimension ordering: dimB, dimA, dimC. You can tell because the rows in each segment are ordered alphabetically by the values of dimB (dimB is evaluated first, since it is the first dimension in the ordering).

Segment 1: {"time":"2015-09-12T00:46:58.771Z","dimB":"F","dimA":"C"}
Segment 1: {"time":"2015-09-12T00:46:58.771Z","dimB":"J","dimA":"C"}
Segment 1: {"time":"2015-09-12T00:46:58.771Z","dimB":"X","dimA":"H"}
Segment 2: {"time":"2015-09-12T00:46:58.771Z","dimB":"S","dimA":"Z"} Segment 2: {"time":"2015-09-12T00:46:58.771Z","dimB":"X","dimA":"H"} Segment 2: {"time":"2015-09-12T00:46:58.771Z","dimB":"Z","dimA":"H"}
Segment 3: {"time":"2015-09-12T00:46:58.771Z","dimB":"R","dimA":"J"} Segment 3: {"time":"2015-09-12T00:46:58.771Z","dimB":"T","dimA":"H"} Segment 3: {"time":"2015-09-12T00:46:58.771Z","dimB":"X","dimA":"H"}
Segment 4: {"time":"2015-09-12T00:46:58.771Z","dimB":"X","dimC":"A"}

Notice that no single intermediate segment has all three dimensions (dimB, dimA, dimC), because dimA and dimC are sparse. Specifically, rows in the first three segments do not contain dimC, and the rows in the fourth segment do not contain dimA. The dimensions of each intermediate segment are shown below:

Intermediate segment 1: dimB, dimA
Intermediate segment 2: dimB, dimA
Intermediate segment 3: dimB, dimA
Intermediate segment 4: dimB, dimC

Since there is not a common dimension ordering across all four intermediate segments, merging them used to fall back to a lexicographic ordering: dimA, dimB, dimC.

The process of merging intermediate segments begins by finding the smallest (timestamp, rollup dimensions) tuple value across the first rows of all the intermediate segments. Since the timestamps in this example are all the same, the process starts by finding the smallest value of the first dimension in the merge ordering: dimA, in the lexicographically reordered case. The smallest value of dimA is null, which occurs in the first row of Segment 4: {"dimB":"X", "dimC":"A"}, so that row is emitted first. After this row, the candidates for the next iteration of merging are the first rows of the three remaining intermediate segments.

By contrast, with the original ordering from the dimensionsSpec (dimB, dimA, dimC), the merging process would have begun by looking for the smallest value of dimB. The smallest value of dimB belongs to a completely different row, namely the first row of Segment 1: {"dimA":"C", "dimB":"F"}. After this row, the candidates for the next iteration are the first rows of Segments 2, 3, and 4, as well as the second row of Segment 1.

Let’s compare how the merging process would iterate through all ten rows in this example with the dimensionsSpec dimension ordering versus the lexicographical reordering:

Intended ordering (dimB, dimA, dimC)    Lexicographical ordering (dimA, dimB, dimC)
Seg 1: {"dimB":"F", "dimA":"C"}         Seg 4: {"dimB":"X", "dimC":"A"}
Seg 1: {"dimB":"J", "dimA":"C"}         Seg 1: {"dimA":"C", "dimB":"F"}
Seg 3: {"dimB":"R", "dimA":"J"}         Seg 1: {"dimA":"C", "dimB":"J"}
Seg 2: {"dimB":"S", "dimA":"Z"}         Seg 1: {"dimA":"H", "dimB":"X"}
Seg 3: {"dimB":"T", "dimA":"H"}         Seg 3: {"dimA":"J", "dimB":"R"}
Seg 4: {"dimB":"X", "dimC":"A"}         Seg 3: {"dimA":"H", "dimB":"T"}
Seg 1: {"dimB":"X", "dimA":"H"}         Seg 3: {"dimA":"H", "dimB":"X"}
Seg 2: {"dimB":"X", "dimA":"H"}         Seg 2: {"dimA":"Z", "dimB":"S"}
Seg 3: {"dimB":"X", "dimA":"H"}         Seg 2: {"dimA":"H", "dimB":"X"}
Seg 2: {"dimB":"Z", "dimA":"H"}         Seg 2: {"dimA":"H", "dimB":"Z"}

The problem with the lexicographical ordering on the right is that rollup can only happen when duplicate rows are returned in consecutive iterations; rollup cannot happen when non-duplicate rows arrive in between the duplicates. Merging the intermediate segments uses an n-way merge algorithm in which each intermediate segment is already sorted. Druid gathers every row for a given (time, rollup dimensions) tuple value across all intermediate segments and rolls them up into a single row before moving on to a new tuple value. Once Druid moves to a new tuple value, it no longer keeps track of the rows for the previous one. Therefore, even if Druid later encounters a row with the previous tuple value, it cannot roll that row up, even though it ideally should. This is exactly what happened with the lexicographical reordering on the right. In the fourth iteration, Druid encountered {"dimA":"H", "dimB":"X"}. But in the fifth iteration, Druid moved to a new tuple value: {"dimA":"J", "dimB":"R"}. So when Druid encountered the duplicate {"dimA":"H", "dimB":"X"} rows later, it could no longer roll them up.
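
To make the mechanism concrete, here is a minimal Java sketch of an n-way merge with rollup. This is not Druid's actual implementation (the class and method names are invented for illustration, and no metrics are aggregated); it only demonstrates that the merge remembers the key of the row it is currently building, so only consecutive duplicates roll up:

import java.util.*;

// A minimal sketch (not Druid's actual code) of an n-way merge with rollup.
// Each "segment" is a list of rows already sorted by some dimension ordering;
// a row is modeled as a map of dimension name -> value.
public class NWayMergeRollupSketch {
    // A cursor pairs a segment's current row with the rest of its rows.
    record Cursor(Map<String, String> row, Iterator<Map<String, String>> rest) {}

    static List<Map<String, String>> merge(
            List<List<Map<String, String>>> segments, List<String> dimOrder) {
        // Compare rows on the merge-time dimension ordering; an absent
        // (sparse) dimension compares as null, i.e. lowest.
        Comparator<Map<String, String>> rowCmp = (a, b) -> {
            for (String dim : dimOrder) {
                int c = Comparator.nullsFirst(String::compareTo)
                        .compare(a.get(dim), b.get(dim));
                if (c != 0) {
                    return c;
                }
            }
            return 0;
        };

        // Priority queue of per-segment cursors, ordered by their current row.
        PriorityQueue<Cursor> pq =
                new PriorityQueue<>((x, y) -> rowCmp.compare(x.row(), y.row()));
        for (List<Map<String, String>> segment : segments) {
            Iterator<Map<String, String>> it = segment.iterator();
            if (it.hasNext()) {
                pq.add(new Cursor(it.next(), it));
            }
        }

        List<Map<String, String>> merged = new ArrayList<>();
        Map<String, String> current = null;   // the row being built
        while (!pq.isEmpty()) {
            Cursor c = pq.poll();
            if (current == null || rowCmp.compare(current, c.row()) != 0) {
                // New tuple value: emit the finished row and forget the old
                // key. A duplicate of that key seen later starts a fresh row.
                if (current != null) {
                    merged.add(current);
                }
                current = c.row();
            }
            // else: same key as the row being built, so the rows roll up
            // (a no-op here, since this sketch aggregates no metrics).
            if (c.rest().hasNext()) {
                pq.add(new Cursor(c.rest().next(), c.rest()));
            }
        }
        if (current != null) {
            merged.add(current);
        }
        return merged;
    }
}

If each segment is sorted by (dimB, dimA) but the merge comparator uses (dimA, dimB), the per-segment cursors no longer surface rows in merge order, so duplicate keys can arrive non-consecutively. That is exactly the failure shown in the right-hand column above.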

Fixing getLongestSharedDimOrder() for perfect rollups

The problem outlined above begins with a method called getLongestSharedDimOrder. This method used to return null when the intermediate segments did not share a common dimension ordering. The returned null value triggered the switch from the dimension ordering specified in the dimensionsSpec to a lexicographical dimension ordering for merging the intermediate segments. (Note that if the dimension ordering in the dimensionsSpec is itself lexicographical, this problem would not occur, since the fallback produced the same ordering as the one specified in the dimensionsSpec.)

To preserve a non-lexicographical dimensionsSpec dimension ordering, we now pass this ordering as a new argument into the getLongestSharedDimOrder method.

In getLongestSharedDimOrder, we first try to find a common dimension ordering, as before. But if the intermediate segments do not share a dimension ordering, we now try an extra step before reverting to lexicographical ordering. This new step uses the dimension ordering from the dimensionsSpec and checks whether the following conditions are met (see the sketch after this list):

  • The dimension ordering in the dimensionsSpec is not null and not empty.
  • The dimension ordering in the dimensionsSpec does not have extra or missing columns compared to the set of dimension names from all intermediate segments.
  • The dimensions in all intermediate segments follow the same relative ordering as in the dimensionsSpec. For example, if the dimensionsSpec ordering is dimB, dimC, dimA and one intermediate segment’s dimension ordering is dimB, dimA, then that segment’s ordering is valid. But if a segment’s ordering is dimA, dimB, then it is invalid.
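
Here is a hedged Java sketch of that validation (chooseMergeOrdering is an invented name; the real logic lives in getLongestSharedDimOrder, and its exact signature may differ). It returns the dimensionsSpec ordering when that ordering is safe to drive the merge, or null to signal the lexicographic fallback:

import java.util.*;

public class DimOrderCheckSketch {
    // Returns specOrder if it can drive the merge, else null (fallback).
    static List<String> chooseMergeOrdering(
            List<String> specOrder, List<List<String>> segmentDimLists) {
        // Condition 1: the spec ordering exists and is non-empty.
        if (specOrder == null || specOrder.isEmpty()) {
            return null;
        }
        // Condition 2: the spec covers exactly the set of dimension names
        // seen across all intermediate segments (no extras, none missing).
        Set<String> seen = new HashSet<>();
        segmentDimLists.forEach(seen::addAll);
        if (!seen.equals(new HashSet<>(specOrder))) {
            return null;
        }
        // Condition 3: each segment's dimensions must be an ordered
        // subsequence of the spec ordering, e.g. [dimB, dimA] is valid
        // against [dimB, dimC, dimA], while [dimA, dimB] is not.
        for (List<String> segDims : segmentDimLists) {
            int pos = 0;
            for (String dim : segDims) {
                int idx = specOrder.subList(pos, specOrder.size()).indexOf(dim);
                if (idx < 0) {
                    return null;   // out of order relative to the spec
                }
                pos += idx + 1;
            }
        }
        return specOrder;
    }
}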

For batch ingestion with a non-lexicographical dimension ordering specified, all three of these conditions should always hold. (Druid also supports schema-less dimensions, which can still fall back to lexicographic ordering, but that is out of scope here.) Now that getLongestSharedDimOrder correctly returns the same non-lexicographical dimension ordering that was used to create the intermediate segments, perfect rollups are possible, even with sparse data.

Conclusion

Since this bug didn’t produce any error messages, you might have thought you were creating perfect rollups when, in fact, you were not. With the dimension ordering from the dimensionsSpec now passed into getLongestSharedDimOrder, perfect rollups happen even with a non-lexicographical ordering and sparse data. While the fix is simple, the impact is significant: better storage efficiency and faster queries, two of the most important characteristics of a Druid deployment, for users who specify a non-lexicographical dimension ordering in the dimensionsSpec when ingesting sparse data.
