Continuing the Virtual Druid Summit Conversation: Netflix Closes the Loop
Sep 30, 2020
Jelena Zanko
Our Virtual Druid Summit speakers fielded so many good questions from the community following their sessions that it’s no surprise there wasn’t enough time to tackle them all. But no question shall be left behind! We promised you closure, and we’re offering exactly that in a neatly packaged blog post. Ben Sykes, Sr. Software Engineer at Netflix, has answered the questions that he wasn’t able to address during his Virtual Druid Summit II session: How Netflix Uses Druid in Real-time to Ensure a High Quality Streaming Experience.
How is historical data stored? For example, AWS EBS (either on Kafka or Druid) is at most 4 times more expensive than S3, do you archive the data after a retention period? The real-time nodes hand off their segments via S3. Historical nodes then load the segments from S3 into EBS. We use EBS on each historical node to provide sufficient storage volume and throughput.
How do you autoscale your cluster to handle query load spikes? The cluster is sized to accommodate more than our anticipated query load. We don’t auto-scale for query traffic. The only auto-scale policy we have is for data volume. When we hit that threshold, the auto-scaler adds a new historical node. We let the overlord and coordinator rebalance the segments.
How do you handle late events with real-time indexing when a new 1h segment is created? There is a balance. Our use case is real-time alerting and monitoring, so we’re not overly concerned with old data. We have other systems for long-term fact data. We use the lateMessageRejectionPeriod Kafka ingestion configuration option to discard any very late data that might fall into time chunks that have already been compacted. Any late-arriving data in the real-time nodes will create a new segment for the time chunk that the data falls into. At worst, that segment would have only that one event in it. It will get merged into more efficient segments during compaction. Compaction is scheduled to run with a few hours’ delay to accommodate this situation and ensure that all segments have been handed off by that point.
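To make the late-data cutoff concrete, here is a minimal sketch, not Netflix's actual configuration. It assumes lateMessageRejectionPeriod sits in the ioConfig of the Kafka supervisor spec as an ISO 8601 period and is measured back from the time the indexing task was created. The topic name, the six-hour value, and the is_rejected helper are illustrative assumptions.

```python
from datetime import datetime, timedelta, timezone

# Fragment of a Kafka supervisor spec's ioConfig (other fields omitted).
# The topic name and the period value are illustrative assumptions.
io_config = {
    "topic": "playback-events",
    "lateMessageRejectionPeriod": "PT6H",  # ISO 8601 period: six hours
}


def is_rejected(event_time, task_created, rejection_period):
    """Return True if the event is older than the rejection cutoff.

    Hypothetical helper that mirrors the assumed semantics: events with a
    timestamp earlier than (task creation time - period) are dropped.
    """
    return event_time < task_created - rejection_period


task_created = datetime(2020, 9, 30, 12, 0, tzinfo=timezone.utc)
period = timedelta(hours=6)  # same duration as "PT6H" above

slightly_late = task_created - timedelta(hours=2)
very_late = task_created - timedelta(hours=7)

print(is_rejected(slightly_late, task_created, period))
print(is_rejected(very_late, task_created, period))
```

In this sketch the event that is two hours late is still ingested and lands in a small segment that compaction later merges, while the event that is seven hours late is discarded.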
Since you are ingesting real-time data, did you encounter a scenario where you had multiple schemas in the same topic? If yes, how did you manage that? Our schemas evolve over time. We add and remove dimensions and metrics as our needs change. Druid supports having different schemas per segment, so it’s not a huge issue. The utilities that we use to query the data periodically run a schema query to learn which dimensions and metrics are available in each datasource. The query layer uses this information to direct queries to the right datasources and to populate the UI with the values that are available to query.
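The routing idea can be sketched as follows. This is an illustration under stated assumptions, not the query layer described in the talk: the datasource names, column names, and the datasources_for helper are all hypothetical, and the schemas dictionary stands in for whatever the periodic schema query returns.

```python
def datasources_for(query_columns, schemas):
    """Return the datasources whose schema contains every requested column.

    `schemas` maps a datasource name to the set of dimensions and metrics
    found by a periodic schema lookup (assumed to be cached elsewhere).
    """
    needed = set(query_columns)
    return sorted(name for name, cols in schemas.items() if needed <= cols)


# Hypothetical cached result of the schema lookup.
schemas = {
    "playback_v1": {"device", "country", "error_count"},
    "playback_v2": {"device", "country", "isp", "error_count"},
}

print(datasources_for(["device", "error_count"], schemas))
print(datasources_for(["isp", "error_count"], schemas))
```

A query that only needs long-standing columns can go to either datasource, while one that needs the newer isp dimension is routed only to the datasource that has it.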
What is the scale on the rollup ratio? It’s a ratio computed as SUM("count") / COUNT("count"), that is, the number of ingested events divided by the number of rows stored after rollup.
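As a worked example of that ratio, the sketch below rolls up a handful of made-up events in plain Python and computes the equivalent of SUM("count") / COUNT("count"). The event fields and helper names are assumptions chosen for illustration; Druid performs the rollup itself at ingestion time.

```python
from collections import Counter


def rollup(events, dims):
    """Group events that share the same dimension values into one row.

    Each output row carries a "count" of how many raw events it absorbed,
    mimicking a count metric defined at ingestion time.
    """
    counts = Counter(tuple(e[d] for d in dims) for e in events)
    return [dict(zip(dims, key), count=n) for key, n in counts.items()]


def rollup_ratio(rows):
    """Total ingested events divided by the number of stored rows."""
    return sum(r["count"] for r in rows) / len(rows)


# Six raw events, with timestamps already truncated to the minute.
events = (
    [{"minute": "12:00", "device": "tv"}] * 4
    + [{"minute": "12:00", "device": "phone"}] * 2
)

rows = rollup(events, ["minute", "device"])
print(len(rows))           # 2 stored rows
print(rollup_ratio(rows))  # 3.0
```

A ratio of 3.0 here means each stored row represents three raw events on average; a ratio of 1.0 would mean rollup is not combining anything.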
Can you include an array type to accommodate some kind of sub-cardinality? Multi-value dimensions can help where you need to capture multiple values for a given dimension. We use this for some tags. However, this does not help with rollup because only rows with identical arrays can be rolled-up together.
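The effect on rollup can be illustrated with a small sketch. The rows and tag values are made up, and the grouping key is a simplification of how rollup treats a multi-value dimension: the whole array has to match, not just one of its values.

```python
from collections import Counter


def rollup_key(row):
    """Rows roll up together only if the time bucket and the whole tag array match."""
    return (row["minute"], tuple(row["tags"]))


# Four raw rows that all share the tag "a" (illustrative data).
raw_rows = [
    {"minute": "12:00", "tags": ["a", "b"]},
    {"minute": "12:00", "tags": ["a", "b"]},
    {"minute": "12:00", "tags": ["a"]},
    {"minute": "12:00", "tags": ["a", "c"]},
]

rolled = Counter(rollup_key(r) for r in raw_rows)
print(len(rolled))  # 3 stored rows from 4 raw rows
```

Only the two rows with the identical array ["a", "b"] are combined, so a high-cardinality set of tag combinations still produces many stored rows.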
Do you have any tips and tricks for ingesting a large amount of data from Kafka? Use dedicated machines for the middle managers. Recycle the indexer tasks semi-regularly (daily or so). We found that they can get stuck and lag in some circumstances, especially if the Kafka nodes change IP addresses. Tune the number of peon slots so that you don’t overwhelm the middle manager machines. You’ll need some extra capacity when the indexer processes are handing off and new ones are starting. Balance the number of indexers with the number of partitions on your topic. Try to spread the load evenly so that no indexer reads from more partitions than any other, because that indexer will be the first to lag when the rate picks up.
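The partition-balancing tip can be checked with a little arithmetic. The sketch below assumes partitions are spread across indexing tasks round-robin (partition number modulo task count); the partition and task counts are illustrative, not Netflix's.

```python
def partitions_per_task(num_partitions, task_count):
    """Count how many partitions each task reads under round-robin assignment."""
    loads = [0] * task_count
    for partition in range(num_partitions):
        loads[partition % task_count] += 1
    return loads


def is_balanced(num_partitions, task_count):
    """True when every task reads the same number of partitions."""
    loads = partitions_per_task(num_partitions, task_count)
    return max(loads) == min(loads)


print(partitions_per_task(12, 5))  # [3, 3, 2, 2, 2]
print(partitions_per_task(12, 4))  # [3, 3, 3, 3]
print(is_balanced(12, 6))          # True
```

With 12 partitions, 5 tasks leave two of them reading 50% more partitions than the rest, whereas 4 or 6 tasks divide the topic evenly.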
Do you use any approximation? We haven’t yet had a case where we’ve needed to do so.
Have you tried using Kafka keys/partitioning around the rollup dimensions? It would limit the amount of compaction rollups. I guess the time cost of compaction is lower for Netflix than the main cost of having Kafka split data partitions up? We have not tried keyed topics, but have discussed it. The main reason for not pursuing it was to avoid hotspotting, where we heavily overload some partitions vs. others. The compaction cost is negligible because it runs on an existing shared Hadoop cluster and not within the Druid cluster, so it doesn’t interfere with ingestion or queries. Even with perfect topic keys, we’d likely still need to run compaction to get the best rollup. We have ongoing experiments to test whether using different dimensions as the primary partition (after time) makes a difference to segment size or query performance.