The Promise (and Limitations) of Range Partitions

Range partitioning, also known as multi-dimensional range partitioning, is a popular partitioning strategy for Druid, as it is the best method for improving read times in most, if not all, use cases. (Note that multi-dimensional range partitioning is a secondary approach; partitioning by time is always the primary strategy.)

Several nuances are discussed in this developer blog post and in this video log.

While range partitioning is the logical choice in most cases, thanks to the improved performance it delivers through dimension-based pruning and other optimizations, there are some nuances to keep in mind.

How to partition data with multi-dimensional range partitioning

In a batch ingest scenario, there are two ways range partitioning can be applied:

MSQ 

With SQL-based ingestion, range partitioning is easy to apply. In the example below, the CLUSTERED BY "channel", "page" clause at the very bottom partitions the data by those two dimensions. At query time, pruning then occurs on these dimensions in addition to the __time dimension.

REPLACE INTO wiki_de
OVERWRITE ALL
SELECT * FROM (
 SELECT
  TIME_PARSE("timestamp") AS "__time",
  "isRobot",
  "channel",
  '***Added***' as "flags",
  "isUnpatrolled",
  "page",
  "diffUrl",
  "added",
  "comment",
  "commentLength",
  "isNew",
  "isMinor",
  "delta",
  "user",
  "deltaBucket",
  "deleted",
  "namespace"
 FROM  TABLE(
  EXTERN(
    '{"type":"http","uris":["https://druid.apache.org/data/wikipedia.json.gz"]}',
    '{"type":"json"}',
    '[{"name":"isRobot","type":"string"},{"name":"channel","type":"string"},{"name":"timestamp","type":"string"},{"name":"flags","type":"string"},{"name":"isUnpatrolled","type":"string"},{"name":"page","type":"string"},{"name":"diffUrl","type":"string"},{"name":"added","type":"long"},{"name":"comment","type":"string"},{"name":"commentLength","type":"long"},{"name":"isNew","type":"string"},{"name":"isMinor","type":"string"},{"name":"delta","type":"long"},{"name":"isAnonymous","type":"string"},{"name":"user","type":"string"},{"name":"deltaBucket","type":"long"},{"name":"deleted","type":"long"},{"name":"namespace","type":"string"},{"name":"cityName","type":"string"},{"name":"countryName","type":"string"},{"name":"regionIsoCode","type":"string"},{"name":"metroCode","type":"long"},{"name":"countryIsoCode","type":"string"},{"name":"regionName","type":"string"}]'
  )
 ))
 WHERE __time >= TIMESTAMP '2016-06-11 20:00:00' AND __time < TIMESTAMP '2016-06-30 21:00:00.000'
  AND channel = '#de.wikipedia'
PARTITIONED BY HOUR
CLUSTERED BY "channel", "page"

Native/JSON Ingest 

In native ingestion, range partitioning is specified within the partitionsSpec of the tuningConfig, as demonstrated below:

"partitionsSpec": {
      "type": "range",
      "targetRowsPerSegment": 5000000,
      "maxRowsPerSegment": null,
      "partitionDimensions": [
        "channel",
        "page"
      ],
      "assumeGrouped": false

Streaming ingest

In the case of streaming, Druid honors the write-time partitioning strategy of dynamic partitioning. However, range partitioning can be applied as a post-ingest process, in the form of auto-compaction, where the partitionsSpec is specified as above. This achieves range partitioning for streaming ingestion as well.
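
As a minimal sketch, the auto-compaction configuration below applies range partitioning to a streamed datasource; it would be submitted to the Coordinator's automatic compaction endpoint (POST /druid/coordinator/v1/config/compaction). The datasource name wiki_de and the skipOffsetFromLatest value are illustrative assumptions, and exact fields may vary by Druid version.

{
  "dataSource": "wiki_de",
  "skipOffsetFromLatest": "PT1H",
  "tuningConfig": {
    "partitionsSpec": {
      "type": "range",
      "targetRowsPerSegment": 5000000,
      "partitionDimensions": ["channel", "page"]
    }
  }
}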

Caveats

Some of the pain points that customers encounter with the range partitioning strategy, particularly when it is applied after streaming ingestion, are:

Slow speeds

Compared to the other two partitioning schemes (dynamic and hashed), range partitioning is the slowest, as it runs three subtasks; the final merge task alone takes about 40 to 50% of the total partitioning time.

Need for large temp/local space in MiddleManager 

When compaction begins, one of the tasks is partial_range_index_generate, which creates the partitioned data. During this phase, data is stored in the local storage of the MiddleManager process. During legacy data loads, data re-indexing, or when many subtasks are spawned in parallel, this local storage tends to fill up and the task errors out. One mitigation is to cap the parallelism of these subtasks, as sketched below.
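
As a hedged sketch (the value of 4 is an assumption and should be sized against the MiddleManager's available disk and task slots), capping maxNumConcurrentSubTasks in the compaction tuningConfig limits how many partial_range_index_generate subtasks write to local storage at the same time:

"tuningConfig": {
  "maxNumConcurrentSubTasks": 4,
  "partitionsSpec": {
    "type": "range",
    "targetRowsPerSegment": 5000000,
    "partitionDimensions": ["channel", "page"]
  }
}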

Need for high heap space  

During the final phase of range partitioning (partial_index_generic_merge), the partitioned data is shuffled based on time chunks and partitionDimensions values. Each worker task reads the segments that belong to the same partition of the same range from multiple peon tasks and merges them to create the final segments.

If the cardinality of the dimensions defined in partitionDimensions is high, the heap tends to fill up fast, leading to an OutOfMemoryError. In some (but not all) cases, tweaks to maxRowsPerSegment or targetRowsPerSegment may do the trick.
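
For illustration only (the value 3000000 is an assumption and should be tuned against the available heap and the desired segment size), one such tweak is to lower targetRowsPerSegment in the partitionsSpec so that each merge produces smaller segments:

"partitionsSpec": {
  "type": "range",
  "targetRowsPerSegment": 3000000,
  "partitionDimensions": ["channel", "page"]
}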

There is one possible workaround. When range partitioning is applied using MSQ ingestion, the MSQ engine is optimized to handle high-cardinality partition dimensions and thus does not fail with an OutOfMemoryError. The only downside is that MSQ is not yet an option for auto-compaction; as of publishing time, the latest Imply release was 2023.12.1 STS.

Still a winner

Despite the caveats mentioned above, range partitioning has proven to be the most optimized partitioning strategy and the best read-time strategy thus far.
