Migrate Analytics Data from MongoDB to Apache Druid
Sep 21, 2023
Rick Jacobs
In recent years, MongoDB has become a popular online transactional processing (OLTP) option because of its scalability and the flexibility inherent in NoSQL databases. However, as data scales, some developers feel the need to migrate to a platform built for online analytical processing (OLAP), such as Apache Druid, for analytical capabilities beyond what MongoDB can offer. This blog walks through migrating data from MongoDB to Druid, including the steps and code to connect to MongoDB, extract the data, and then ingest it into Druid.
Druid is specifically designed for high-performance analytical queries. It stores data in a columnar format, but what sets Druid apart is its indexing engine and segmented data storage, which together boost query performance. The indexing allows Druid to filter and aggregate data at the segment level. Consequently, Druid can fetch only the data pertinent to a specific query instead of scanning the entire dataset. This cuts down the I/O operations required to pull data from storage, leading to faster query results.
The Data Migration Process
To execute the steps outlined in this blog, you will need the following prerequisites:
Extracting Data from MongoDB
The Python script below is designed to fetch data from a MongoDB database and store the retrieved records in a CSV file. The script loads MongoDB connection configurations from the database_configs.json file using the load_config function. Then the connect_to_mongodb function establishes a connection to the MongoDB instance, utilizing the credentials and connection details specified in the configuration. With the connection established, the fetch_documents function is invoked to pull all documents from the customer_data collection in the my_sample database. These documents are stored in a CSV file with the save_results_to_csv function. If any exceptions or errors arise during the process, relevant error messages are shown to the user. After the data retrieval and storage processes are completed, the script calls the create_ingest_spec function from the mongodb_ingest module which will be shown later.
Python
import csv
import json
import pymongo
import sys
from pymongo import MongoClient
from pymongo.server_api import ServerApi
from mongodb_ingest import create_ingest_spec


def load_config(file_path='database_configs.json'):
    """Load MongoDB configuration from a JSON file."""
    with open(file_path, 'r') as file:
        return json.load(file)


def connect_to_mongodb(config):
    """Connect to MongoDB using the provided configuration."""
    user = config['mongodb']['user']
    password = config['mongodb']['password']
    cluster = config['mongodb']['cluster']
    # No separator is inserted between password and cluster, so the
    # "cluster" value must itself supply the "@" that precedes the host.
    uri = f"mongodb+srv://{user}:{password}{cluster}"
    client = MongoClient(uri, server_api=ServerApi('1'))
    return client


def fetch_documents(client, db_name, collection_name):
    """Fetch documents from the specified database and collection."""
    db = client[db_name]
    collection = db[collection_name]
    return list(collection.find())


def save_results_to_csv(documents, output_file="data/mongodb_data.csv"):
    """Save the result set to a CSV file."""
    with open(output_file, 'w', newline='') as file:
        writer = csv.writer(file)
        if documents:
            # The header row comes from the first document's field names.
            writer.writerow(documents[0].keys())
            for document in documents:
                writer.writerow(document.values())


def get_data():
    try:
        config = load_config()
        with connect_to_mongodb(config) as client:
            print("Connected to MongoDB!")
            results = fetch_documents(client, 'my_sample', 'customer_data')
            save_results_to_csv(results)
    except pymongo.errors.ConfigurationError:
        print("Invalid URI host error. Check the Atlas host name in your connection string.")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        sys.exit(1)


if __name__ == "__main__":
    get_data()
    create_ingest_spec()
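One thing to watch in save_results_to_csv is that the header comes from the first document only, so documents with missing or extra fields can end up with misaligned columns. The sketch below is one possible alternative, not part of the original script: it uses csv.DictWriter with the union of all field names. The function name write_documents_csv and the sample documents are hypothetical.

```python
import csv
import io


def write_documents_csv(documents, file):
    """Write dict-like documents to CSV, tolerating documents with different fields."""
    # Collect every field name in first-seen order so no column is dropped.
    fieldnames = []
    for document in documents:
        for key in document:
            if key not in fieldnames:
                fieldnames.append(key)

    # restval fills in an empty cell when a document lacks a field.
    writer = csv.DictWriter(file, fieldnames=fieldnames, restval="")
    writer.writeheader()
    for document in documents:
        writer.writerow(document)
    return fieldnames


# Two hypothetical documents that do not share all fields.
sample = [
    {"_id": 1, "NAME": "Ana", "CREATED_AT": "2023-09-01T10:00:00Z"},
    {"_id": 2, "CREATED_AT": "2023-09-02T11:30:00Z", "CITY": "Lima"},
]
buffer = io.StringIO()
columns = write_documents_csv(sample, buffer)
print(buffer.getvalue())
```

Writing to a file instead of a StringIO buffer works the same way; open it with newline='' as the original script does.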
Note that the load_config function uses a config file to store MongoDB connection data.
Below is an example of the `database_configs.json` that stores the database configuration details that are used in the script above.
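As a rough illustration of the shape this file needs, the snippet below builds a dictionary with the three keys the script reads (user, password, cluster) and prints it as JSON. All values are placeholders, not real settings. Because the script joins the password and cluster values with no separator, the placeholder cluster value here starts with "@"; that is an assumption about how the file is laid out.

```python
import json

# Placeholder values only; substitute your own credentials and host.
example_config = {
    "mongodb": {
        "user": "<db_user>",
        "password": "<db_password>",
        # Assumed to carry the leading "@", since the script adds no separator.
        "cluster": "@<cluster_host>",
    }
}

# This is the text that would be saved as database_configs.json.
print(json.dumps(example_config, indent=2))

# Assemble the URI the same way connect_to_mongodb does, to check the shape.
mongodb = example_config["mongodb"]
uri = f"mongodb+srv://{mongodb['user']}:{mongodb['password']}{mongodb['cluster']}"
print(uri)
```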
For this batch upload process, the script below (the mongodb_ingest module imported by the extraction script) constructs an ingestion specification for Druid using the construct_ingestion_spec function. This function takes the name of the data source and the directory where the CSV data is located. The post_ingest_spec_to_druid function then sends the specification to Druid, using the requests library to dispatch it via an HTTP POST request. If any errors arise during this process, they are captured and presented to the user. For added convenience, the script comes with default values for the data source, directory, and Druid host, which can be overridden by passing alternate parameters to the create_ingest_spec function.
Python
import json
import requests


def construct_ingestion_spec(DATA_SOURCE, BASE_DIR):
    """Construct the ingestion spec based on the data source and directory."""
    return {
        "type": "index_parallel",
        "spec": {
            "ioConfig": {
                "type": "index_parallel",
                # Read <DATA_SOURCE>.csv from the local directory BASE_DIR.
                "inputSource": {
                    "type": "local",
                    "baseDir": BASE_DIR,
                    "filter": f"{DATA_SOURCE}.csv"
                },
                "inputFormat": {
                    "type": "csv",
                    "findColumnsFromHeader": True
                }
            },
            "tuningConfig": {
                "type": "index_parallel",
                "partitionsSpec": {"type": "dynamic"}
            },
            "dataSchema": {
                "dataSource": DATA_SOURCE,
                # CREATED_AT becomes the Druid timestamp column.
                "timestampSpec": {"column": "CREATED_AT", "format": "auto"},
                "dimensionsSpec": {
                    "useSchemaDiscovery": True,
                    "dimensionExclusions": []
                },
                "granularitySpec": {
                    "queryGranularity": "none",
                    "rollup": False,
                    "segmentGranularity": "hour"
                }
            }
        }
    }


def post_ingest_spec_to_druid(spec, DRUID_HOST):
    """Post the ingestion spec to Druid and return the response."""
    headers = {'Content-Type': 'application/json'}
    response = requests.post(DRUID_HOST, json.dumps(spec), headers=headers)
    return response


def create_ingest_spec(DATA_SOURCE='mongodb_data',
                       BASE_DIR='/Users/rick/IdeaProjects/CodeProjects/druid_data_integrations/data',
                       DRUID_HOST='http://localhost:8081/druid/indexer/v1/task'):
    """Create the ingestion spec and post it to Druid."""
    spec = construct_ingestion_spec(DATA_SOURCE, BASE_DIR)
    print(f'Creating .csv ingestion spec for {DATA_SOURCE}')
    try:
        response = post_ingest_spec_to_druid(spec, DRUID_HOST)
        response.raise_for_status()
        print(response.text)
    except requests.HTTPError:
        print(f"HTTP error occurred: {response.text}")
    except Exception as e:
        print(f"An error occurred: {e}")


if __name__ == "__main__":
    create_ingest_spec()
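Posting the spec only submits the task; the ingestion itself runs asynchronously. The sketch below shows one way the task could be polled until it finishes, assuming the submit response carries the task ID in a "task" field and that the status endpoint sits at <task endpoint>/<task ID>/status. The helper names are hypothetical, and the exact field that holds the state is treated as an assumption, so both "statusCode" and "status" are checked.

```python
import json
import time
import urllib.request

TERMINAL_STATES = {"SUCCESS", "FAILED"}


def task_status_url(task_endpoint, task_id):
    """Build the status URL for a task from the endpoint the spec was posted to."""
    return f"{task_endpoint.rstrip('/')}/{task_id}/status"


def extract_state(status_payload):
    """Pull the task state out of a status response; returns None if absent."""
    status = status_payload.get("status", {})
    return status.get("statusCode") or status.get("status")


def wait_for_task(task_endpoint, task_id, poll_seconds=5, max_polls=120):
    """Poll until the task reaches a terminal state or the polls run out."""
    url = task_status_url(task_endpoint, task_id)
    state = None
    for _ in range(max_polls):
        with urllib.request.urlopen(url) as response:
            state = extract_state(json.load(response))
        if state in TERMINAL_STATES:
            break
        time.sleep(poll_seconds)
    return state
```

With the script above, the task ID would come from the JSON body of the POST response, and the task endpoint would be the same DRUID_HOST value passed to create_ingest_spec.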
After the initial data ingestion, subsequent updates from MongoDB can be managed using a change data capture (CDC) approach. This method uses the LAST_UPDATED timestamp to identify new or modified records, which are then appended or refreshed in Druid using either the same data loading techniques or an ETL tool. For example:
MongoDB Change Streams: MongoDB boasts a feature termed “Change Streams” which lets users track and respond to data modifications in real-time. Leveraging MongoDB Change Streams, you can pinpoint altered data and then employ ETL tools or bespoke scripts to channel these modifications into Druid.
ETL Tools: Established tools such as Apache NiFi, Talend, or StreamSets offer robust capabilities to design workflows that can capture, process, and transfer alterations from MongoDB straight to Druid.
Custom Scripts: If your changes exhibit a unique complexity or sheer volume, it might be worthwhile to develop custom scripts. These scripts can be scheduled to routinely check MongoDB for modifications and subsequently relay these updates to Druid.
Third-party Solutions: The market offers specialized third-party solutions and services with inherent CDC features. These can be tailored to bridge MongoDB and Druid, offering a hands-off, managed approach for those who prefer it.
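To make the custom-script option more concrete, here is a minimal sketch of the timestamp-based approach, assuming each document carries a LAST_UPDATED field as described above. The function names are hypothetical, and FakeCollection is an in-memory stand-in so the example runs without a database; a real pymongo collection could be passed to fetch_changes instead, since only its find() method is used.

```python
from datetime import datetime, timezone


def incremental_filter(last_sync):
    """MongoDB query filter matching documents changed after the last sync."""
    return {"LAST_UPDATED": {"$gt": last_sync}}


def next_watermark(documents, last_sync):
    """Return the newest LAST_UPDATED seen, or the old watermark if nothing changed."""
    return max((doc["LAST_UPDATED"] for doc in documents), default=last_sync)


def fetch_changes(collection, last_sync):
    """Fetch changed documents from any object exposing a pymongo-style find()."""
    documents = list(collection.find(incremental_filter(last_sync)))
    return documents, next_watermark(documents, last_sync)


class FakeCollection:
    """In-memory stand-in for a pymongo collection, for illustration only."""

    def __init__(self, documents):
        self._documents = documents

    def find(self, query):
        cutoff = query["LAST_UPDATED"]["$gt"]
        return [d for d in self._documents if d["LAST_UPDATED"] > cutoff]


docs = [
    {"_id": 1, "LAST_UPDATED": datetime(2023, 9, 1, tzinfo=timezone.utc)},
    {"_id": 2, "LAST_UPDATED": datetime(2023, 9, 3, tzinfo=timezone.utc)},
    {"_id": 3, "LAST_UPDATED": datetime(2023, 9, 5, tzinfo=timezone.utc)},
]
last_sync = datetime(2023, 9, 2, tzinfo=timezone.utc)
changed, watermark = fetch_changes(FakeCollection(docs), last_sync)
print([d["_id"] for d in changed], watermark.isoformat())
```

The returned watermark would need to be stored between runs (for example in a small state file) so that the next scheduled run only picks up newer changes.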
Sample Query
To verify that the data was ingested, select the query interface from within the Druid console and execute SQL similar to the example below:
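The same check can also be scripted against Druid's SQL HTTP endpoint. The sketch below is an illustration rather than the query from the original post: it assumes the default data source name mongodb_data used by the ingestion script and a SQL endpoint at http://localhost:8888/druid/v2/sql, which may differ in your deployment. The SQL string itself can be pasted into the console's query view as well.

```python
import json
import urllib.request

# Row count and time range of the ingested data; __time is Druid's timestamp column.
VERIFY_SQL = (
    'SELECT COUNT(*) AS row_count, MIN(__time) AS earliest, MAX(__time) AS latest '
    'FROM "mongodb_data"'
)


def build_sql_request(sql, sql_endpoint="http://localhost:8888/druid/v2/sql"):
    """Build a POST request carrying the SQL statement as a JSON body."""
    payload = json.dumps({"query": sql}).encode("utf-8")
    return urllib.request.Request(
        sql_endpoint,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def run_sql(sql, sql_endpoint="http://localhost:8888/druid/v2/sql"):
    """Send the SQL statement to Druid and return the decoded JSON result."""
    with urllib.request.urlopen(build_sql_request(sql, sql_endpoint)) as response:
        return json.load(response)


# Example call (requires a running Druid instance):
# print(run_sql(VERIFY_SQL))
```

A non-zero row_count, with earliest and latest values matching the CREATED_AT range in the CSV file, suggests the ingestion went through.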
Summary
MongoDB is one of the more popular NoSQL databases among developers. However, for fast analytics on large datasets, moving data to Druid may offer more efficient data analysis and quicker dynamic visualizations. Druid’s design philosophy is centered around delivering swift analytical queries. It employs a columnar data structure, liberal indexing, and segment-oriented storage, which form the foundation of its superior performance. These design choices ensure that only the relevant data chunks are accessed during queries, optimizing resource usage. Additionally, Druid’s native support for real-time data streams establishes it as a top choice for real-time analytics. By adhering to the steps outlined in this blog, you can harness your data more effectively for near real-time analytics and ensure swifter data-based decision-making.
About the Author
Rick Jacobs is a Senior Technical Product Marketing Manager at Imply. His varied background includes experience at IBM, Cloudera, and Couchbase. He has over 20 years of technology experience garnered from serving in development, consulting, data science, sales engineering, and other roles. He holds several academic degrees including an MS in Computational Science from George Mason University. When not working on technology, Rick is trying to learn Spanish and pursuing his dream of becoming a beach bum.