Lately we have noticed a surge in demand for GCP amongst enterprises. The volume of data moving to the cloud is growing, and part of that data is of very high value, with analysts and operators requiring lightning-fast access in order for the business to identify and act on important trends. New technologies that enable low-latency queries at scale in the cloud need to be adopted. One of them is Apache Druid (incubating).
Druid is a distributed, real-time database designed to deliver sub-second query response on batch and streaming data at petabyte scale. On top of Druid, Imply provides an interactive query-and-visualize UI so that non-technical business operators can iteratively explore the data and quickly discover opportunities for improvement.
Imply was founded by the authors of Druid and delivers an enterprise-ready Druid solution – including visualization, management and security – to customers across the globe. Imply enables enterprises to operate on-prem or via their cloud platform of choice, including GCP.
To help you get to know GCP and Druid, the tutorial below walks you through how to install and configure Druid to work with Dataproc (GCP’s managed Hadoop offering) for Hadoop indexing, and then shows you how to ingest and query data.
Hadoop indexing with Druid is an important use case: the majority of enterprises today have Hadoop deployments, but Hadoop does not natively support indexing or low-latency real-time queries.
There are several key requirements that need to be completed before Imply and Dataproc are deployed.
The service account that will be used to deploy the GCP VMs for the Imply and Dataproc environments must have the following roles.
Here’s a sample entry from the GCP IAM page.
When creating the GCP compute VMs, make sure to choose the service account you provisioned with the proper roles and that the Cloud API access scope is set to “Allow full access to all Cloud APIs”. Do not use the default GCE service account.
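If you prefer the command line, a rough sketch of creating such a VM with gcloud is shown below. The instance name, zone, machine type, and service account email are placeholder assumptions, not values from this walkthrough.

# Sketch only: create an Imply VM bound to the provisioned service account
# with full Cloud API access (name, zone, and machine type are assumptions).
gcloud compute instances create imply-data-1 \
  --zone=us-central1-a \
  --machine-type=n1-highmem-16 \
  --service-account=imply-deployer@your-project.iam.gserviceaccount.com \
  --scopes=cloud-platform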
Ensure that the VMs provisioned for Imply and Dataproc are visible to one another. It is recommended that you put Dataproc in its own subnet. A high-speed network is ideal: for large batch ingests in the double-digit-TB range or larger, it is best to have 100 Gbps of bandwidth, especially for time-sensitive processing. For time-insensitive ingest, 10 to 40 Gbps is sufficient.
The only thing to consider for the firewall is to make sure that you provide IP ranges for each rule along with its port or range of ports. The default rules are sufficient, but a specific rule needs to be set up for Pivot. As shown below, all rules except “pivot-2” are defaults. The IPs that need access to Pivot (port 9095), the Coordinator (8081), the Overlord (8090), and the Broker (8082) must be defined. The term “Ingress” is equivalent to “Inbound” and “Egress” is the same as “Outbound”.
Also make sure to allow HTTP traffic.
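Such a rule can also be created from the command line. The sketch below is illustrative only; the rule name, network, and source range are assumptions, and the ports match the ones listed above.

# Sketch: allow inbound access to Pivot, Coordinator, Overlord, and Broker
# from an assumed internal range (replace the source range with your own IPs).
gcloud compute firewall-rules create pivot-2 \
  --network=default \
  --direction=INGRESS \
  --action=ALLOW \
  --rules=tcp:9095,tcp:8081,tcp:8090,tcp:8082 \
  --source-ranges=10.128.0.0/9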
Log in to your GCP account and validate that you can create folders, upload files, and delete files in your assigned bucket.
You can download the Google Cloud SDK here. There are two tools from this SDK that you will be using regularly: gcloud and gsutil. Install the SDK and follow the instructions for initializing it; it will eventually ask you to authenticate with your Google account.
There are several options for logging in to the Imply and Dataproc VMs. You can review all the options here, but it is often easiest to use OS Login since it doesn’t require managing SSH keys. Before deploying VMs, make sure to update your metadata key/value pair as shown below: the key is “enable-oslogin” and the value is “TRUE”. This is set at the project level, so if you have multiple projects and want to use the OS Login option, make sure to enter the same information in each project. If you already have VMs deployed prior to enabling OS Login, you can edit the VM configuration under “Custom metadata” and enter the same key/value pair. It is not necessary to restart the VM; the change takes effect immediately.
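The same project-level metadata can also be set with gcloud; this is only a sketch of that step:

# Sketch: enable OS Login for all VMs in the current project.
gcloud compute project-info add-metadata --metadata enable-oslogin=TRUE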
Here are the two steps to log in to your VMs:
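As a rough sketch (the exact flow depends on your setup), with OS Login enabled this typically amounts to granting the roles/compute.osLogin IAM role to your account and then connecting with gcloud. The instance name and zone below are placeholder assumptions.

# Sketch: connect to a VM using OS Login via gcloud (no SSH key management needed).
gcloud compute ssh imply-data-1 --zone=us-central1-a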
Create your VMs with a configuration that meets your use case. GCP gives you the ability to expand memory up to 624 GB per VM. A sample setup is shown below.
Before installing Imply, make sure that the VMs can read from and write to your GCS bucket. You can use the sample command format below. Most of the commands from here on will require “sudo”-level access.
To write to a bucket:
sudo gsutil cp /path_to_file gs://some_bucket
To read data from a bucket:
sudo gsutil cp gs://some_bucket /path_to_file_directory
If both commands are successful, your VMs have the correct access to your buckets.
At this point, you can now install Imply. Use the latest version available here. Update all the necessary Druid configuration files.
Make sure to deploy Dataproc in the same GCP region as Imply. Choose the right number of CPUs and sufficient memory to meet the SLA for Hadoop ingest into Druid. Note that the “Local SSDs (0-8)” field is for storing temporary/staging data while Hadoop jobs are running; choose the appropriate number of disks for your use case.
The Advanced section below highlights Dataproc version 1.3. This version (or greater) is required, as it ships with druid-google-extensions and a gcs-connector-hadoop that are compatible with Imply 2.8.x and above.
You can leave the defaults in the last section. If you need to automate configuration after the cluster is installed, you can define initialization scripts stored in a GCS bucket; they will be executed when VM provisioning completes.
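The same cluster can be created from the command line. The sketch below is illustrative only: the cluster name, region, machine types, worker count, local SSD count, and initialization script path are assumptions, while the 1.3 image version matches the requirement above.

# Sketch: create a Dataproc 1.3 cluster in the same region as the Imply VMs.
gcloud dataproc clusters create druid-hadoop-indexing \
  --region=us-central1 \
  --image-version=1.3 \
  --master-machine-type=n1-highmem-16 \
  --worker-machine-type=n1-highmem-16 \
  --num-workers=4 \
  --num-worker-local-ssds=2 \
  --initialization-actions=gs://your_bucket/init-scripts/setup.sh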
Run a test to read files from the bucket using the Hadoop command below. It should list all directories/files underneath it.
hadoop fs -ls gs://your_bucket
Dataproc has some configuration files that, along with the packaged open-source Druid jars, need to be copied over to Imply. Here is the list; make sure to apply the proper permissions after copying them.
Create a bucket that will store these configuration files. You can use a command similar to the one below to copy them over. Log in to one of the Dataproc VMs and run the command.
sudo gsutil cp /usr/lib/hadoop/etc/hadoop/*-site.xml gs://your_bucket
This will copy all the Hadoop configurations to your bucket. The next step is to copy the druid-google-extensions folder, the Google client/API jars, and the gcs-connector jar to your bucket. The gcs-connector and the rest of the Google jars also need to be copied to all Dataproc VMs under /usr/lib/hadoop/lib/.
sudo gsutil cp -r \
/opt/druid/apache-druid-0.13.0-incubating/extensions/druid-google-extensions \
gs://your_bucket
sudo gsutil cp /usr/lib/hadoop/lib/gcs-connector.jar gs://your_bucket
sudo cp \
/opt/druid/apache-druid-0.13.0-incubating/extensions/druid-google-extensions/*google* \
/usr/lib/hadoop/lib/
sudo cp \
/opt/druid/apache-druid-0.13.0-incubating/extensions/druid-google-extensions/gcs-connector.jar \
/usr/lib/hadoop/lib/
Update the hadoop-env.sh script to include the gcs-connector.jar in HADOOP_CLASSPATH. Your entry should look like the line below; it will automatically be picked up by Dataproc.
HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/usr/lib/hadoop/lib/gcs-connector.jar
First and foremost, remove the druid-hdfs-storage extension if it is in your loadList. The loadList in your common.runtime.properties file should look similar to the one below.
druid.extensions.loadList=["druid-parser-route","druid-lookups-cached-global","mysql-metadata-storage","druid-google-extensions"]
If you don’t remove the HDFS extension, you will see the error below, which will prevent your Dataproc job from running to completion.
1) Error injecting constructor, java.lang.IllegalArgumentException: Can not create a Path from an empty string
  at org.apache.druid.storage.hdfs.HdfsDataSegmentKiller.<init>(HdfsDataSegmentKiller.java:47)
  while locating org.apache.druid.storage.hdfs.HdfsDataSegmentKiller
  at org.apache.druid.storage.hdfs.HdfsStorageDruidModule.configure(HdfsStorageDruidModule.java:94) (via modules: com.google.inject.util.Modules$OverrideModule -> org.apache.druid.storage.hdfs.HdfsStorageDruidModule)
  while locating org.apache.druid.segment.loading.DataSegmentKiller annotated with @com.google.inject.multibindings.Element(setName=,uniqueId=146, type=MAPBINDER, keyType=java.lang.String)
Run the following commands to copy all the configurations and jars from the bucket and place them in their respective locations. All of these commands should be run on every Imply VM, and the proper permissions should be set afterwards.
sudo gsutil cp gs://your_bucket/gcs-connector.jar <imply_home_dir>/dist/druid/lib
sudo gsutil cp -r gs://your_bucket/druid-google-extensions \
<imply_home_dir>/dist/druid/extensions
sudo gsutil cp gs://your_bucket/gcs-connector.jar \
<imply_home_dir>/dist/druid/extensions/druid-google-extensions
sudo gsutil cp gs://your_bucket/*-site.xml <imply_home_dir>/conf/druid/_common
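If you have several Imply VMs, one way to apply these copies everywhere is to loop over the hosts with gcloud. This is only a sketch; the hostnames and zone are placeholder assumptions, and <imply_home_dir> is the same placeholder used above.

# Sketch: run the same copy command on each Imply VM (hostnames and zone assumed).
for host in imply-data-1 imply-data-2 imply-query-1; do
  gcloud compute ssh "$host" --zone=us-central1-a \
    --command="sudo gsutil cp gs://your_bucket/*-site.xml <imply_home_dir>/conf/druid/_common"
done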
The default Hadoop working tmp directory from Imply doesn’t exist in Dataproc, so the MiddleManager runtime.properties needs to be updated to reflect the Hadoop working tmp directory (/hadoop/tmp) in Dataproc. The updated property file should look like this.
druid.indexer.task.hadoopWorkingPath=/hadoop/tmp
Update the common.runtime.properties as shown below.
# For GCS as Deep Storage
druid.storage.type=google
druid.google.bucket=<bucketname>
druid.google.prefix=druid
# Indexing service logs
druid.indexer.logs.type=google
druid.indexer.logs.bucket=<bucketname>
druid.indexer.logs.prefix=druid/indexing-logs
There are two levels of testing required to validate the initial setup before doing Hadoop indexing with Dataproc.
Go to the Pivot UI at http://pivot_ip:9095, then follow the steps below to do a native batch ingest and store the segments in the GCS bucket.
The Historicals should also have loaded the Wikipedia segments.
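One way to confirm that the segments actually landed in deep storage is to list the bucket and prefix configured above. This is a sketch: segments are normally written under the configured prefix and the datasource name, but the exact layout can vary, so adjust the path if needed.

# Sketch: verify that wikipedia segments were written under the configured
# deep storage prefix (druid) in your GCS bucket.
gsutil ls -r gs://<bucketname>/druid/wikipedia/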
This completes the batch ingest into GCS.
We will use the same Wikipedia dataset for this test. Copy the compressed Wikipedia file locally onto your machine and load it into GCS.
$ wget https://static.imply.io/data/wikipedia.json.gz
$ gunzip wikipedia.json.gz
$ gsutil cp wikipedia.json gs://your_bucket
Follow the steps below to run the Hadoop indexer.
{ "type" : "index_hadoop", "spec" : { "dataSchema" : { "dataSource" : "wikipedia", "parser" : { "type" : "hadoopyString", "parseSpec" : { "format" : "json", "dimensionsSpec" : { "dimensions" : [ "isRobot", "diffUrl", { "name": "added", "type": "long" }, "channel", "flags", { "name": "delta", "type": "long" }, "isUnpatrolled", "isNew", { "name": "deltaBucket", "type": "long" }, "isMinor", "isAnonymous", { "name": "deleted", "type": "long" }, "namespace", "comment", "page", { "name": "commentLength", "type": "long" }, "user", "countryIsoCode", "regionName", "cityName", "countryName", "regionIsoCode", { "name": "metroCode", "type": "long" } ] }, "timestampSpec": { "column": "timestamp", "format": "iso" } } }, "metricsSpec" : [], "granularitySpec" : { "type": "uniform", "segmentGranularity": "DAY", "queryGranularity": { "type": "none" }, "rollup": false, "intervals": null } }, "ioConfig" : { "type" : "hadoop", "inputSpec" : { "type" : "static", "paths" : "gs://your_bucket/wikipedia.json" } }, "tuningConfig" : { "type" : "hadoop", "partitionsSpec" : { "type" : "hashed", "targetPartitionSize" : 5000000 }, "forceExtendableShardSpecs" : true, "jobProperties" : { "mapreduce.job.classloader": "true", "mapreduce.job.user.classpath.first": "true", "mapreduce.map.java.opts":"-Duser.timezone=UTC -Dfile.encoding=UTF-8", "mapreduce.reduce.java.opts":"-Duser.timezone=UTC -Dfile.encoding=UTF-8" } } } }
Below are some errors you may run into during setup, along with their likely causes.
com.google.inject.Guice - UnknownHostException
This usually indicates that the Imply VMs cannot resolve the Dataproc hostnames referenced in the copied *-site.xml files, so verify network visibility and name resolution between the two environments.
2019-03-16T19:54:08,402 INFO [main] com.google.inject.Guice - An exception was caught and reported. Message: java.net.UnknownHostException: some-host-m java.lang.IllegalArgumentException: java.net.UnknownHostException: some-host-m
2019-03-19T15:09:38,646 INFO [task-runner-0-priority-0] org.apache.druid.indexer.DetermineHashedPartitionsJob - Path[var/druid/hadoop-tmp/wikipedia-2/2019-03-19T150739.100Z_1c16425db4864050bbf859979c3da5b2/20160627T000000.000Z_20160628T000000.000Z/partitions.json] didn't exist!?
2019-03-19T15:09:38,646 INFO [task-runner-0-priority-0] org.apache.druid.indexer.DetermineHashedPartitionsJob - DetermineHashedPartitionsJob took 106649 millis
2019-03-19T15:09:38,647 INFO [task-runner-0-priority-0] org.apache.druid.indexer.JobHelper - Deleting path[var/druid/hadoop-tmp/wikipedia-2/2019-03-19T150739.100Z_1c16425db4864050bbf859979c3da5b2]
2019-03-19T15:09:38,781 INFO [task-runner-0-priority-0] org.apache.druid.indexing.common.actions.RemoteTaskActionClient - Performing action for task[index_hadoop_wikipedia-2_2019-03-19T15:07:39.100Z]: LockListAction{}
2019-03-19T15:09:38,784 INFO [task-runner-0-priority-0] org.apache.druid.indexing.common.actions.RemoteTaskActionClient - Submitting action for task[index_hadoop_wikipedia-2_2019-03-19T15:07:39.100Z] to overlord: [LockListAction{}].
2019-03-19T15:09:38,802 INFO [task-runner-0-priority-0] org.apache.druid.indexing.common.task.HadoopIndexTask - Setting version to: 2019-03-19T15:07:39.112Z
2019-03-19T15:09:39,075 INFO [task-runner-0-priority-0] org.apache.druid.indexing.common.task.HadoopIndexTask - Starting a hadoop index generator job...
2019-03-19T15:09:39,126 INFO [task-runner-0-priority-0] org.apache.druid.indexer.path.StaticPathSpec - Adding paths[gs://imply-walmart/test/wikipedia-2016-06-27-sampled.json]
2019-03-19T15:09:39,130 INFO [task-runner-0-priority-0] org.apache.druid.indexer.HadoopDruidIndexerJob - No metadataStorageUpdaterJob set in the config. This is cool if you are running a hadoop index task, otherwise nothing will be uploaded to database.
2019-03-19T15:09:39,189 ERROR [task-runner-0-priority-0] org.apache.druid.indexing.common.task.HadoopIndexTask - Encountered exception in HadoopIndexGeneratorInnerProcessing.
com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem not found
This typically means the gcs-connector.jar is not on the classpath; double-check the HADOOP_CLASSPATH entry in hadoop-env.sh and the copy of gcs-connector.jar into <imply_home_dir>/dist/druid/lib described above.
2019-03-19T13:54:59,913 ERROR [task-runner-0-priority-0] org.apache.druid.indexing.common.task.HadoopIndexTask - Got invocation target exception in run(), cause: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem not found
com.google.inject.ProvisionException
This is the same error described earlier: it appears when the druid-hdfs-storage extension is still present in the loadList.
com.google.inject.ProvisionException: Unable to provision, see the following errors:
1) Error injecting constructor, java.lang.IllegalArgumentException: Can not create a Path from an empty string
  at org.apache.druid.storage.hdfs.HdfsDataSegmentKiller.<init>(HdfsDataSegmentKiller.java:47)
  while locating org.apache.druid.storage.hdfs.HdfsDataSegmentKiller
  at org.apache.druid.storage.hdfs.HdfsStorageDruidModule.configure(HdfsStorageDruidModule.java:94) (via modules: com.google.inject.util.Modules$OverrideModule -> org.apache.druid.storage.hdfs.HdfsStorageDruidModule)
  while locating org.apache.druid.segment.loading.DataSegmentKiller annotated with @com.google.inject.multibindings.Element(setName=,uniqueId=147, type=MAPBINDER, keyType=java.lang.String)
  at org.apache.druid.guice.Binders.dataSegmentKillerBinder(Binders.java:41) (via modules: com.google.inject.util.Modules$OverrideModule -> org.apache.druid.storage.hdfs.HdfsStorageDruidModule -> com.google.inject.multibindings.MapBinder$RealMapBinder)
  while locating java.util.Map<java.lang.String, org.apache.druid.segment.loading.DataSegmentKiller> for the 1st parameter of org.apache.druid.segment.loading.OmniDataSegmentKiller.<init>(OmniDataSegmentKiller.java:38)
  while locating org.apache.druid.segment.loading.OmniDataSegmentKiller
  at org.apache.druid.cli.CliPeon$1.configure(CliPeon.java:218) (via modules: com.google.inject.util.Modules$OverrideModule -> com.google.inject.util.Modules$OverrideModule -> org.apache.druid.cli.CliPeon$1)
  while locating org.apache.druid.segment.loading.DataSegmentKiller for the 5th parameter of org.apache.druid.indexing.common.TaskToolboxFactory.<init>(TaskToolboxFactory.java:113)
  at org.apache.druid.cli.CliPeon$1.configure(CliPeon.java:201) (via modules: com.google.inject.util.Modules$OverrideModule -> com.google.inject.util.Modules$OverrideModule -> org.apache.druid.cli.CliPeon$1)
  while locating org.apache.druid.indexing.common.TaskToolboxFactory for the 1st parameter of org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner.<init>(SingleTaskBackgroundRunner.java:95)
  at org.apache.druid.cli.CliPeon$1.configure(CliPeon.java:240) (via modules: com.google.inject.util.Modules$OverrideModule -> com.google.inject.util.Modules$OverrideModule -> org.apache.druid.cli.CliPeon$1)
  while locating org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner
  while locating org.apache.druid.indexing.overlord.TaskRunner for the 4th parameter of org.apache.druid.indexing.worker.executor.ExecutorLifecycle.<init>(ExecutorLifecycle.java:79)
  at org.apache.druid.cli.CliPeon$1.configure(CliPeon.java:224) (via modules: com.google.inject.util.Modules$OverrideModule -> com.google.inject.util.Modules$OverrideModule -> org.apache.druid.cli.CliPeon$1)
  while locating org.apache.druid.indexing.worker.executor.ExecutorLifecycle