Co-authors: Paul Rogers, Adarsh Sanjeev
A brief intro to Druid-SQL and Apache Calcite
Apache Calcite is a “dynamic data management framework” that provides a SQL parser, an API for converting the SQL parse tree to relational algebra, and much more. Calcite is a general-purpose library used by many popular databases, including Druid.
Druid-SQL is Druid’s dialect of SQL, built using Apache Calcite. Druid-SQL simplifies working with Druid’s native query interface: it validates and processes a SQL statement, then translates it into a native query (a JSON string), which is deserialized into a Java object and executed.
Nuances of ingestion in Druid
Ingestion in Druid is a bit different than a standard SQL INSERT. Some of the nuances include:
- INSERT typically adds just one or a handful of rows at a time. However, Druid works by ingesting large numbers of rows (millions or more).
- INSERT typically receives data directly from the application (the order to insert, say). In Druid, data comes from an external source such as a set of CSV or JSON files.
- Druid has a special column called __time, and partitions data by time using that column.
- Normally, INSERT uses system metadata to determine which indexes or partitions to create. In Druid, we must specify the time partitioning explicitly.
- Druid also provides secondary partitioning: a set of columns used to further partition the segments which Druid creates.
- INSERT typically adds data to an existing table, using an existing table schema. Druid creates new segments, with a possibly distinct schema, on each ingest: there is no pre-existing schema.
A quick detour: The importance of partitioning during ingestion
Druid partitions data into files called segments. Each segment contains data for a specific time interval, like an hour or a day. Incoming data typically has data from multiple time periods: perhaps mostly for the 1-2 PM interval, but perhaps some lagging data for earlier times. Primary partitioning is the process of shuffling the data so that rows from the same interval end up together in the same segment. We use secondary partitioning to further split up the data to achieve the target segment size. We choose the secondary partitioning key to provide optimal query performance by grouping together data which is typically queried together. The ability to tweak the partitioning configuration contributes significantly to Druid’s performance on the dataset.
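To make this concrete, here is a minimal Java sketch (not Druid’s code; the row type and column names are made up) that buckets rows by day, as primary partitioning does, and then clusters each day’s bucket by a column, as secondary partitioning does:

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Toy model of an incoming row: a timestamp plus a couple of dimensions.
record Row(Instant time, String customerId, String commodity) {}

public class PartitioningSketch {
  public static void main(String[] args) {
    List<Row> rows = List.of(
        new Row(Instant.parse("2023-01-01T13:05:00Z"), "c2", "wheat"),
        new Row(Instant.parse("2023-01-01T13:30:00Z"), "c1", "corn"),
        new Row(Instant.parse("2023-01-02T09:00:00Z"), "c1", "corn"));

    // Primary partitioning: bucket rows by day (the PARTITIONED BY interval).
    Map<Instant, List<Row>> byDay = rows.stream()
        .collect(Collectors.groupingBy(r -> r.time().truncatedTo(ChronoUnit.DAYS)));

    // Secondary partitioning: within each day, sort/cluster by customerId
    // (the CLUSTERED BY column) so related rows end up together.
    byDay.forEach((day, bucket) -> {
      List<Row> clustered = bucket.stream()
          .sorted(Comparator.comparing(Row::customerId))
          .collect(Collectors.toList());
      System.out.println(day + " -> " + clustered);
    });
  }
}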
Towards a Druid syntax
Given the above requirements for Druid ingestion, it becomes necessary that a Druid INSERT statement include several components in addition to the standard SQL syntax. While these might be provided as context parameters (key-value pairs), it is cleaner to have first-class support for them in the INSERT statement itself:
- A source of the data, including external data sources
- The schema of that incoming data
- Primary partitioning (the segment time interval)
- Secondary partitioning
We met these requirements by adding new PARTITIONED BY and CLUSTERED BY clauses to the standard SQL syntax.
Example:
INSERT INTO sales
SELECT *
FROM customer_data
PARTITIONED BY day
CLUSTERED BY customer_id, commodity
Here, customer_data is our source of data. Typically this is an external data source, but we’ll omit those details here for brevity. The Druid-specific extensions are:
- PARTITIONED BY: Specifies the time granularity on which the datasource is primarily partitioned during ingestion. It supports keywords such as HOUR, DAY, MONTH, YEAR, and ALL, as well as expressions such as FLOOR(__time TO DAY) and TIME_FLOOR(__time, 'PT1H').
- CLUSTERED BY: Specifies the dimensions for secondary partitioning. This can be the name of one or more columns in the datasource.
The datasource is physically partitioned on the granularity specified in the PARTITIONED BY clause, and within those partitions, it’s sorted and clustered (grouped) based on the columns specified in the CLUSTERED BY clause.
Replacing Data
Druid sometimes acts like a materialized view: it holds a snapshot of data where the copy-of-record resides somewhere else. Perhaps we load into Druid a copy of events from a sales system because we want to perform real-time analytical queries on it. The sales database is the ultimate source of truth. Suppose we want to refresh the view in Druid with updated values. In Druid, we replace the existing data for a specific time period with the new data. That is, in Druid, we do not replace individual records; instead we replace all data within some specified time period (which generally corresponds to the time partitioning explained above). To express this idea in SQL we have added a REPLACE statement which looks something like this:
REPLACE INTO sales
OVERWRITE ALL
SELECT *
FROM customer_data
PARTITIONED BY day
CLUSTERED BY customer_id, commodity
To replace specific chunks of data, we can specify WHERE conditions on the __time column in the OVERWRITE clause instead of using OVERWRITE ALL.
Extending Calcite’s SQL grammar
Given the specificity of INSERT and REPLACE in Druid, and the need for a custom syntax to properly express insert and replace queries, we extended the SQL syntax to support the PARTITIONED BY and CLUSTERED BY clauses and the REPLACE statement.
Here comes the fun part – implementing the extended syntax by adding rules to the Calcite parser configuration file. Before we get into the details, here are the requirements for the new rules and the code that accompanies them:
- Add the syntax as an extension, avoiding changes to Calcite itself. This allows easier upgrades as we adopt newer Calcite versions.
- Don’t make the code dependent on a particular Calcite version.
- Reuse, rather than copy, Calcite’s code wherever possible.
Under the hood, Calcite uses JavaCC (a Java compiler generator) to define its accepted SQL syntax. To get started, we have to understand a bit about how JavaCC works and the syntax used.
What’s EBNF and why should I care?
Every parser is based on a grammar: a definition of the strings that are valid according to a language’s syntax. SQL syntax is notoriously complex. Fortunately, Calcite has done most of the work for us. Our job is to choose extensions that are consistent with the existing grammar. JavaCC uses Extended Backus–Naur form (EBNF) to express a grammar. EBNF contains the following:
- Terminal symbols: Words that can appear in the final statement (sentence).
- Non-terminal symbols: Symbols that do not appear in the statement themselves but help to define the structure of the grammar through production rules.
Each grammar contains an initial non-terminal symbol and a series of production rules, in which the left side is a single non-terminal symbol and the right side can be a concatenation of both terminal and non-terminal symbols. When working with SQL, a good general rule is to avoid introducing new terminals, as doing so may conflict with table or column names in user queries.
A simple grammar which accepts the letters of the English alphabet could look something like this:
Alphabets -> Vowels | Consonants
Vowels -> <A> | <E> | <I> |…
Consonants -> <B> | <C> | <D> | <F> | …
How JavaCC works as a parser generator
JavaCC is a parser generator. It takes a grammar file (written in EBNF format, along with some enhancements) and converts it into a Java program that recognizes strings accepted by the grammar. JavaCC lets us attach actions, expressed in Java, that say what to do as the parser recognizes each parser rule. That’s where we put our code to capture information from the parser and build up our AST (abstract syntax tree).
Calcite defines its parser in a file called Parser.jj which:
- Defines the SQL syntax (EBNF rules)
- Provides the top-level parser function
- Uses the parser to analyze a SQL string and to build the Calcite AST
An excerpt of the rules looks something like the following:
SqlStmt -> SqlInsert | SqlSelect | SqlCreate | SqlExplain…
SqlInsert -> <INSERT> <INTO> identifier SqlSelect
SqlSelect -> <SELECT> columnList <FROM> identifier
SqlReplace -> <REPLACE> <INTO> identifier SqlSelect
SqlExplain -> <EXPLAIN> <PLAN> <FOR> (SqlSelect | SqlInsert | SqlReplace)
Note that this is a highly simplified version of the rules.
The standard SQL grammar resides in a file called Parser.jj. We want to extend the parser, but we don’t want to modify the Parser.jj file itself. Calcite allows us to define new parser rules in our own file, and to combine them with the Parser.jj file. Calcite uses the Apache FreeMarker template engine to combine the pieces to create the final parser.
To make this work, Calcite’s Parser.jj file is itself a FreeMarker template. It accepts parameters and injects them into the generated parser file. These parameters include things such as new keywords, statements, and built-in functions. Here, the main object of our interest is custom statements: these are added to the right-hand side of the statement rule, which is exactly what we want for the new INSERT and REPLACE statements.
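Conceptually, the combined grammar then looks something like the following, again highly simplified, with rule names that are illustrative rather than Druid’s actual ones:
SqlStmt -> SqlInsert | SqlSelect | SqlCreate | SqlExplain | … | DruidSqlInsert | DruidSqlReplace
DruidSqlInsert -> <INSERT> <INTO> identifier SqlSelect PartitionedByClause [ ClusteredByClause ]
DruidSqlReplace -> <REPLACE> <INTO> identifier OverwriteClause SqlSelect PartitionedByClause [ ClusteredByClause ]
PartitionedByClause -> <PARTITIONED> <BY> granularity
ClusteredByClause -> <CLUSTERED> <BY> columnList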
Validating the query
The parser returns a SqlNode, Calcite’s class for the SQL parse tree (AST).
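For illustration, here is roughly what that step looks like using Calcite’s generic SqlParser entry point (Druid wires in its own generated parser through the parser configuration, so treat this as a simplified sketch):

import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;

public class ParseExample {
  public static void main(String[] args) throws SqlParseException {
    // Parse a SQL string into Calcite's AST. Syntax errors surface here as
    // SqlParseException; semantic problems are caught later.
    SqlParser parser = SqlParser.create("INSERT INTO sales SELECT * FROM customer_data");
    SqlNode root = parser.parseStmt();

    // The root node tells us what kind of statement we received.
    System.out.println(root.getKind() == SqlKind.INSERT);  // true
  }
}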
The parser’s job is to make sure there are no syntax errors present. Semantic checking is done as a separate step within Calcite. In Druid, semantic analysis is split between the Calcite validator and the Druid SQL planner.
For instance, since we are only adding rules to the Calcite parser instead of writing an entirely new set of rules, statements such as UPDATE and DELETE will also be parsed successfully, so we need to assert that the statement starts with either INSERT or REPLACE. The statement could also be trying to read from a datasource which doesn’t exist or for which the user doesn’t have permissions. Some of these checks are simple: to check the type of statement, we can just look at the root node and filter out anything that isn’t an INSERT or REPLACE. The datasource case is a little more complicated. We can’t simply check the root node, because the statement could contain any number of datasources, for example in nested join statements. Instead, we use a class called SqlShuttle to visit each node and extract the required information. We can extend it to do a variety of tasks, such as collecting all the datasources for validation.
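A rough sketch of such a visitor might look like the following; it is simplified to handle only the case where the FROM clause is a plain table name, and the class name is ours, not Druid’s:

import java.util.HashSet;
import java.util.Set;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.util.SqlShuttle;

// Walks the parse tree and records the table names referenced in FROM clauses.
// Nested queries are reached automatically because SqlShuttle recurses into
// the operands of every SqlCall it visits.
public class DatasourceCollector extends SqlShuttle {
  private final Set<String> datasources = new HashSet<>();

  @Override
  public SqlNode visit(SqlCall call) {
    if (call.getKind() == SqlKind.SELECT) {
      SqlNode from = ((SqlSelect) call).getFrom();
      if (from instanceof SqlIdentifier) {
        datasources.add(from.toString());
      }
      // Joins, aliased subqueries, etc. would need additional cases here.
    }
    return super.visit(call);
  }

  public Set<String> getDatasources() {
    return datasources;
  }
}

We would then run it with something like root.accept(new DatasourceCollector()) and check the collected names against the authorization system.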
Extracting the custom parameters
One question you may be wondering about: did we add any rules or tweak Calcite’s code to handle the custom clauses in the syntax we defined above? The answer is no, we didn’t. Instead, in an intermediate step, we extract the information passed in the custom clauses. For PARTITIONED BY it is the granularity, for CLUSTERED BY it is the column list, and for a REPLACE statement we also extract the time range to replace; we then use this information to modify the final SQL query. The underlying Calcite engine has no idea about the voodoo and the custom clauses that we passed along in the original query. This works to our advantage because it means we do not have to write custom code inside Calcite and maintain a separate version of it.
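As an illustration of that intermediate step (the class and method names below are invented for the sketch, not Druid’s actual ones), the custom parser rule can produce a thin wrapper that carries the extra clauses alongside a perfectly ordinary SqlInsert, which is all that Calcite ever sees:

import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlNodeList;

// Hypothetical wrapper produced by our custom parser rule: a standard
// SqlInsert plus the information from the Druid-specific clauses.
public class PartitionedInsert {
  private final SqlInsert insert;        // the standard INSERT ... SELECT part
  private final String partitionedBy;    // e.g. "DAY", from PARTITIONED BY
  private final SqlNodeList clusteredBy; // columns from CLUSTERED BY, may be null

  public PartitionedInsert(SqlInsert insert, String partitionedBy, SqlNodeList clusteredBy) {
    this.insert = insert;
    this.partitionedBy = partitionedBy;
    this.clusteredBy = clusteredBy;
  }

  // The planner pulls the extra pieces out ...
  public String getPartitionedBy() { return partitionedBy; }
  public SqlNodeList getClusteredBy() { return clusteredBy; }

  // ... and hands the plain SqlInsert to Calcite, which never sees the
  // custom clauses at all.
  public SqlInsert unwrap() { return insert; }
}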
Converting the parse tree to a native Druid Query
Once we have validated the SqlNode, we convert it to a RelNode, a relational expression. It is in this form that Calcite optimizes queries, by repeatedly applying planner rules that we define. Calcite lets us provide rules which determine the kind of optimizations that will be applied to the query. We do this by extending RelOptRule and supplying those rules while converting the SqlNode into a RelNode.
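For reference, a planner rule skeleton looks roughly like this; it is an illustrative, do-nothing rule that matches a Project over a Filter, not one of Druid’s actual rules:

import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalProject;

// Skeleton of a planner rule: the operand tree in the constructor says which
// shape of RelNode subtree the rule matches, and onMatch builds a replacement.
public class ExampleDruidRule extends RelOptRule {
  public ExampleDruidRule() {
    super(operand(LogicalProject.class, operand(LogicalFilter.class, any())));
  }

  @Override
  public void onMatch(RelOptRuleCall call) {
    LogicalProject project = call.rel(0);
    LogicalFilter filter = call.rel(1);
    // A real rule would construct an equivalent, cheaper RelNode here and
    // register it with:
    // call.transformTo(replacement);
  }
}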
Finally, we can convert this into a DruidRel, a Calcite relational operator that represents an entire Druid native query. This is the final tree, which Calcite has fully validated and optimized, and which can be directly converted to a native JSON Druid query and executed on Druid.
Wrapping up
We have attempted to describe the entire arduous process from parsing to query execution in a few paragraphs, and we hope it sheds some light on how things work when you click the RUN button in Druid’s super cool web console. You can try out the result in Druid today, and share your thoughts on the new INSERT syntax and the brand new SQL-based ingestion in Druid.