Snowflake Data Pipelines on Kubernetes: An Event-Driven Approach with Microsoft Azure, Argo Events and Argo Workflows
Event-driven data ingestion loads data into a target system in response to specific events or triggers. Compared with scheduled batch processing, it typically reduces latency because data is loaded as soon as it arrives rather than at the next batch window. Snowflake supports several ingestion methods, including bulk loading and continuous loading, and can load data from the following cloud storage services:
- Amazon S3
- Google Cloud Storage
- Microsoft Azure Blob Storage
In this article, we’ll explore a practical setup in which data ingestion is triggered by new file uploads to a Microsoft Azure Blob Storage account, which will function as our external stage for Snowflake. The diagram below illustrates the architecture we’ll be building:
Table of Contents
- Prerequisites
- Introducing GloboLatte
- Create the Azure components
- Create the Snowflake components
- Create the Argo components
- Putting It to the Test
- Conclusion
Prerequisites
Before you get started, please ensure you have the following:
- Snowflake account: This is where we will set up the Snowflake resources such as data warehouses, databases and tables to support the data ingestion processes.
- Microsoft Azure account: This is where we will set up the cloud infrastructure needed to support the data ingestion processes.
- Azure service principal: For Terraform provider authentication
- Terraform: You will need this to provision the Cloud and Snowflake resources we need to support the data ingestion processes.
- SnowSQL: This is the command line client for connecting to Snowflake to execute SQL queries and perform all DDL and DML operations.
- kubectl: You’ll need this to interact with a Kubernetes cluster to create the Argo components.
- Kubernetes: A cluster with Argo Events and Argo Workflows installed. Check out Part 1 and Part 2 of my series on this topic.
Introducing GloboLatte
GloboLatte is a fictitious company that specializes in selling coffee-derived products like beverages and pastries. Their goal is to provide the best coffee, offering swift service regardless of when and where customers place their orders.
GloboLatte operates business units in America, Canada, and Mexico. At the end of each day, the operations team at each business unit uploads sales data files to an Azure Blob Storage account. To improve operational efficiency, GloboLatte aims to implement an event-driven architecture for data ingestion into their Snowflake account. This system will allow them to react promptly to the new sales data uploaded by each business unit.
To design an effective Snowflake database for GloboLatte, we’ll establish a structured schema that accommodates their sales data and optimizes for event-driven data ingestion. Below is a proposed design including database, schema, tables, and warehouses.
Snowflake database design
- Database: GLOBO_LATTE_DB
- Warehouse: GLOBO_LATTE_WH
- Schema: SALES_DATA
- File format: CSV
Ingestion architecture overview
The image below illustrates the relationship between event publishers, event subscriptions, and event handlers.
- Data upload: At the end of each day, the sales operations team from each business unit uploads their sales data files to Azure Blob Storage for centralized access and analysis.
- Event generation: Each time a new data file is uploaded, a BlobCreated event is triggered in Azure Blob Storage. This event is then pushed using Azure Event Grid to an Event Hub subscriber. Event Grid uses event subscriptions to route event messages to subscribers.
- Event handling: GloboLatte utilizes Azure Event Hubs to capture these BlobCreated events in real time, tracking every file upload efficiently across their global network.
- Workflow execution: The BlobCreated events are routed to Argo Events, which triggers an Argo workflow. Within this workflow, we load the files into a Snowflake internal stage. Finally, we execute a COPY command to transfer the data from the internal stage into Snowflake tables.
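To make the event flow above more concrete, here is a minimal Python sketch of what a BlobCreated message might look like by the time it reaches a consumer, and how the container and blob path could be pulled out of it. The field names follow the Event Grid event schema for Blob Storage; the storage account `globolatte`, the container `sales`, and the helper `extract_blob_locations` are hypothetical and used only for illustration.

```python
import json

# Hypothetical sample of an Event Grid BlobCreated event (Event Grid schema).
# The account, container, and blob names are invented for this sketch.
SAMPLE_MESSAGE = json.dumps([{
    "topic": "/subscriptions/<sub-id>/resourceGroups/<rg>/providers/"
             "Microsoft.Storage/storageAccounts/globolatte",
    "subject": "/blobServices/default/containers/sales/blobs/us/sales_transaction.csv",
    "eventType": "Microsoft.Storage.BlobCreated",
    "id": "00000000-0000-0000-0000-000000000000",
    "eventTime": "2024-01-01T23:59:00Z",
    "data": {
        "api": "PutBlob",
        "contentType": "text/csv",
        "contentLength": 1024,
        "blobType": "BlockBlob",
        "url": "https://globolatte.blob.core.windows.net/sales/us/sales_transaction.csv",
    },
    "dataVersion": "",
    "metadataVersion": "1",
}])


def extract_blob_locations(raw_message):
    """Return container, blob path, and URL for each BlobCreated event."""
    events = json.loads(raw_message)
    if isinstance(events, dict):  # tolerate a single event object
        events = [events]
    locations = []
    for event in events:
        if event.get("eventType") != "Microsoft.Storage.BlobCreated":
            continue  # ignore deletes, renames, and other event types
        # Subject format: /blobServices/default/containers/<name>/blobs/<path>
        _, _, rest = event["subject"].partition("/containers/")
        container, _, blob_path = rest.partition("/blobs/")
        locations.append({
            "container": container,
            "blob": blob_path,
            "url": event["data"]["url"],
        })
    return locations


if __name__ == "__main__":
    for location in extract_blob_locations(SAMPLE_MESSAGE):
        print(location)
```

In the pipeline described here, this kind of extraction would happen inside the triggered workflow, which needs the blob path to know which file to load.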
Differences from Snowpipe
While Snowpipe is a powerful tool for continuous data ingestion into Snowflake, our Kubernetes-based approach offers several advantages, particularly in terms of cost efficiency and resource utilization.
- Cost Considerations: Snowpipe can be expensive at high data volumes due to its pricing model based on the amount of data processed and the frequency of loading.
- Kubernetes Integration: By leveraging existing Kubernetes clusters, we take advantage of built-in mechanisms for cost savings.
- Open-Source Flexibility: Our solution employs Argo Events and Argo Workflows, both of which are open-source tools, so it avoids the ongoing charges associated with Snowpipe. This can make our approach more budget-friendly for organizations with high ingestion needs.
- Enhanced Control: Unlike Snowpipe, which can feel like a black box, our Kubernetes-based solution provides greater control and allows for extensive customization of the data ingestion process.
Now that we have an example scenario to work with and the benefits outlined above, let’s explore how we can implement this architecture for the GloboLatte data platform.
Create the Azure components
This implementation uses several cloud components, including Azure Storage and Azure Event Hubs, alongside Snowflake resources such as databases, warehouses, and tables. All resources are defined and provisioned with Terraform. To set up the foundation for our data ingestion platform, we’ll start by deploying the necessary resources in Azure.
Apply the Terraform configuration to provision the resources.
Here’s a breakdown of what gets created:
- A dedicated resource group is created to organize and manage all related resources.
- Virtual network
  - Established with a defined address space.
  - A core subnet is created within the virtual network.
  - Service endpoints are configured for both Azure Storage and Azure Event Hubs for secure access.
- Azure Event Hubs namespace
  - Trusted service access is enabled, and the default action is set to “Deny,” ensuring a secure environment.
  - Specific IP rules are established to allow access from designated IP addresses.
  - A virtual network rule is configured to enable access from the specified subnet.
  - A system-assigned identity is created for secure access management.
  - A private endpoint is created, linking the namespace to a specific subnet and isolating it from the public internet.
  - A private DNS zone group is configured so that DNS resolution for the private endpoint works correctly.
  - An Event Hub named “snowflake” is created within the namespace.
- Event Grid system topic, set up to connect to the Azure Storage account
  - This topic is where BlobCreated events originate.
  - It routes events from the storage account to the Event Hub.
  - A system-assigned identity is also configured for secure interactions.
  - A role assignment grants the “Azure Event Hubs Data Sender” role to the Event Grid system topic.
- An event subscription is created for the Event Grid system topic, specifically configured to handle “BlobCreated” events. This subscription routes these events to the Event Hub, enabling real-time processing of new data files uploaded to Azure Blob Storage.
- Private Link access is configured for direct access to system topics and domains within Event Grid, ensuring that events can be sent securely without exposing data to the public internet.
Create the Snowflake components
To establish the necessary infrastructure in Snowflake for GloboLatte’s data ingestion and analysis, let’s apply the Terraform configuration for the Snowflake resources.
Here’s a breakdown of what gets created:
- A database named GLOBO_LATTE_DB. This serves as the primary container for all data objects, including schemas, tables, and file formats related to GloboLatte’s operations.
- A schema named SALES_DATA within the GLOBO_LATTE_DB database. Schemas group related tables logically, providing a clear structure for managing data assets associated with sales transactions and other related information.
- A warehouse named GLOBO_LATTE_WH with its size set to Small. This warehouse will be used for processing queries, loading data, and running analytics. The Small size is suitable for initial workloads and can be scaled as needed.
- Tables:
  - A sales transactions table to store transaction records.
  - A products table to hold product details.
  - A customer table to store information about customers.
  - A business units table to hold records of the different business units.
- File format named CSV_FORMAT which specifies how CSV files will be handled when loaded into Snowflake. The format configuration includes settings such as field delimiters, skipping headers, handling blank lines, and compression settings. This prepares the environment for seamless data ingestion from CSV files.
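For readers who want to see the objects above in SQL terms, the sketch below generates roughly equivalent Snowflake DDL statements. The article provisions these objects with Terraform, so this is only an illustration: the helper `snowflake_ddl` is hypothetical, and the file format option values (comma delimiter, one header row, automatic compression detection) are assumptions rather than the project's actual settings.

```python
def snowflake_ddl(database="GLOBO_LATTE_DB", schema="SALES_DATA",
                  warehouse="GLOBO_LATTE_WH", file_format="CSV_FORMAT"):
    """Build DDL statements that mirror the Terraform-managed objects.

    The file format options are assumed values for illustration only.
    """
    return [
        f"CREATE DATABASE IF NOT EXISTS {database};",
        f"CREATE SCHEMA IF NOT EXISTS {database}.{schema};",
        f"CREATE WAREHOUSE IF NOT EXISTS {warehouse} "
        "WITH WAREHOUSE_SIZE = 'SMALL';",
        f"CREATE FILE FORMAT IF NOT EXISTS {database}.{schema}.{file_format} "
        "TYPE = 'CSV' FIELD_DELIMITER = ',' SKIP_HEADER = 1 "
        "SKIP_BLANK_LINES = TRUE COMPRESSION = 'AUTO';",
    ]


if __name__ == "__main__":
    for statement in snowflake_ddl():
        print(statement)
```

Keeping these definitions in Terraform, as the article does, has the advantage that the Azure and Snowflake resources are versioned and applied together.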
You can verify that all components have been created using the SnowSQL CLI commands below:
Create the Argo Events components
With the necessary resources for our data ingestion pipeline established in Azure and Snowflake, let’s now enhance our system by integrating event-driven capabilities through Argo Events and Argo Workflows. The goal is to listen for messages in Azure Event Hubs and trigger workflows in Argo based on these events, which will execute data ingestion commands using the SnowSQL CLI.
To accomplish this, we will set up the following resources:
- EventSource: The EventSource defines the configuration required to consume events from external sources, transform them into CloudEvents, and dispatch them to the EventBus. In our setup, the EventSource is configured to consume events from Azure Event Hubs.
- EventBus: The EventBus serves as the transport layer for Argo Events, connecting our EventSource and Sensor. EventSources publish events, while Sensors subscribe to these events to execute the corresponding triggers. In our setup, the Azure Event Hubs EventSource publishes messages to the Argo Events EventBus.
- Sensor: The Sensor defines a set of event dependencies (inputs) and triggers (outputs). It listens for events on the EventBus and acts as an event dependency manager, resolving and executing triggers as events are received. In our setup, the Sensor uses the EventSource and EventBus as its dependencies.
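As a rough illustration of how the EventSource and Sensor fit together, the sketch below builds both manifests as Python dictionaries and prints them as JSON, which `kubectl apply -f` also accepts. The Event Hub name `snowflake` comes from the Azure setup above; every other name (the secret, the service account, the event name, the WorkflowTemplate) is a hypothetical placeholder, and the manifests actually used in this project may differ.

```python
import json

API_VERSION = "argoproj.io/v1alpha1"


def event_source(name="azure-events-hub", event_name="blob-created",
                 fqdn="<namespace>.servicebus.windows.net",
                 hub_name="snowflake", secret_name="azure-event-hub-secret"):
    """EventSource that consumes messages from an Azure Event Hub."""
    return {
        "apiVersion": API_VERSION,
        "kind": "EventSource",
        "metadata": {"name": name},
        "spec": {"azureEventsHub": {event_name: {
            "fqdn": fqdn,
            "hubName": hub_name,
            # Both values are read from a Kubernetes Secret (assumed name).
            "sharedAccessKeyName": {"name": secret_name, "key": "sharedAccessKeyName"},
            "sharedAccessKey": {"name": secret_name, "key": "sharedAccessKey"},
        }}},
    }


def sensor(name="snowflake-ingest", source_name="azure-events-hub",
           event_name="blob-created", workflow_template="snowflake-ingest"):
    """Sensor that creates a Workflow whenever the EventSource fires."""
    workflow = {
        "apiVersion": API_VERSION,
        "kind": "Workflow",
        "metadata": {"generateName": "snowflake-ingest-"},
        "spec": {
            "workflowTemplateRef": {"name": workflow_template},
            "arguments": {"parameters": [{"name": "event", "value": ""}]},
        },
    }
    return {
        "apiVersion": API_VERSION,
        "kind": "Sensor",
        "metadata": {"name": name},
        "spec": {
            "template": {"serviceAccountName": "operate-workflow-sa"},
            "dependencies": [{
                "name": "blob-created-dep",
                "eventSourceName": source_name,
                "eventName": event_name,
            }],
            "triggers": [{"template": {
                "name": "ingest-workflow",
                "k8s": {
                    "operation": "create",
                    "source": {"resource": workflow},
                    # Copy the event body into the first workflow parameter.
                    "parameters": [{
                        "src": {"dependencyName": "blob-created-dep", "dataKey": "body"},
                        "dest": "spec.arguments.parameters.0.value",
                    }],
                },
            }}],
        },
    }


if __name__ == "__main__":
    print(json.dumps(event_source(), indent=2))
    print(json.dumps(sensor(), indent=2))
```

The key link is that the Sensor dependency's `eventSourceName` and `eventName` must match the EventSource's name and the key under `azureEventsHub`; otherwise the Sensor never receives the event.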
Connect to your Kubernetes cluster and create the resources following the steps below:
After deploying the resources, verify that they have been successfully created by running the following commands:
Create the Argo Workflow components
Before creating the Workflow component, which is the final piece of our ingestion pipeline, we need to configure a Snowflake storage integration to allow Snowflake to read data from and write data to an Azure container referenced in an external (Azure) stage.
Integrations are named, first-class Snowflake objects that avoid the need for passing explicit cloud provider credentials such as secret keys or access tokens. Integration objects store an Azure identity and access management (IAM) user ID called the app registration. You need to grant the app the necessary permissions in the Azure account.
Let’s configure the integration in three simple steps:
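As a rough sketch of the Snowflake side of such a setup, the snippet below builds the two SQL statements typically involved: one to create the storage integration and one to describe it, so that the consent URL and app name can be retrieved before granting the app a storage role in Azure. The integration name, tenant ID placeholder, and container URL are hypothetical, and `storage_integration_sql` is an illustrative helper, not part of any library.

```python
def storage_integration_sql(name, tenant_id, allowed_location):
    """Build the SQL to create and then inspect an Azure storage integration.

    All argument values are supplied by the caller; nothing here is taken
    from the project's real configuration.
    """
    create = (
        f"CREATE STORAGE INTEGRATION {name} "
        "TYPE = EXTERNAL_STAGE "
        "STORAGE_PROVIDER = 'AZURE' "
        "ENABLED = TRUE "
        f"AZURE_TENANT_ID = '{tenant_id}' "
        f"STORAGE_ALLOWED_LOCATIONS = ('{allowed_location}');"
    )
    # The DESC output includes AZURE_CONSENT_URL and
    # AZURE_MULTI_TENANT_APP_NAME, which are needed on the Azure side.
    describe = f"DESC STORAGE INTEGRATION {name};"
    return [create, describe]


if __name__ == "__main__":
    statements = storage_integration_sql(
        name="GLOBO_LATTE_AZURE_INT",  # hypothetical integration name
        tenant_id="<tenant-id>",
        allowed_location="azure://<account>.blob.core.windows.net/<container>/",
    )
    for statement in statements:
        print(statement)
```

After consent is granted through the URL shown in the DESC output, the Snowflake app still needs a role such as Storage Blob Data Reader or Storage Blob Data Contributor on the storage account before a stage can use the integration.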
Define the ingestion Workflow: The Workflow defines the SnowSQL steps needed to transfer files from the named external stage (in this case, our Azure Blob Storage account) into our Snowflake internal stage, and subsequently copy them into the tables. The Workflow is structured as a directed acyclic graph (DAG) with the following steps for data ingestion:
- Create a named external stage.
- Load the data from the named external stage into a target table.
- Validate the load by verifying that the rows were successfully loaded into the table.
- Clean up the stage after confirming the successful data load.
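The four steps above can be sketched as an ordered list of SQL statements, each of which would run as one SnowSQL task in the DAG and depend on the task before it. This is a minimal illustration under stated assumptions: the stage, table, and integration names are hypothetical, and `ingestion_statements` is a made-up helper rather than the Workflow definition used in this project.

```python
def ingestion_statements(stage, table, container_url, integration,
                         file_format, blob_path):
    """Return (step_name, sql) pairs in the order the DAG would run them."""
    return [
        ("create-stage",
         f"CREATE OR REPLACE STAGE {stage} "
         f"URL = '{container_url}' "
         f"STORAGE_INTEGRATION = {integration} "
         f"FILE_FORMAT = (FORMAT_NAME = '{file_format}');"),
        ("load-data",
         f"COPY INTO {table} FROM @{stage} "
         f"FILES = ('{blob_path}') ON_ERROR = 'ABORT_STATEMENT';"),
        ("validate-load",
         f"SELECT COUNT(*) FROM {table};"),
        ("cleanup-stage",
         f"DROP STAGE IF EXISTS {stage};"),
    ]


if __name__ == "__main__":
    steps = ingestion_statements(
        stage="SALES_STAGE",                  # hypothetical stage name
        table="SALES_TRANSACTIONS",           # hypothetical table name
        container_url="azure://<account>.blob.core.windows.net/<container>/",
        integration="GLOBO_LATTE_AZURE_INT",  # hypothetical integration name
        file_format="CSV_FORMAT",
        blob_path="sales_transaction.csv",
    )
    for step_name, sql in steps:
        print(f"{step_name}: {sql}")
```

Running the validation before the cleanup matters: if the row count looks wrong, the stage is still available for inspecting the source files.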
Connect to your Kubernetes cluster and create the resources following the steps below:
Putting It to the Test
Now that we have deployed and configured all components, it’s time to test our event-driven data ingestion pipeline. For this test, we will upload a sales_transaction.csv file, which can be downloaded from the /snowflake/sample_data folder in the GitHub repository for this project. Once the file is uploaded, we should see our Argo workflow initiate and execute all defined steps.
The following log entry from the Argo EventSource indicates an event was successfully published following our file upload.
Additionally, the log from the Argo Sensor indicates that the workflow was successfully triggered:
Monitoring these logs confirms that the pipeline is functioning as intended and gives visibility into each step of the workflow execution, from start to finish, as you can see below.
- Step 1: Creating the Internal Stage
- Step 2: Loading Data into the Table
- Step 3: Validating Loaded Rows
- Step 4: Cleaning Up by Dropping the Stage
Conclusion
In conclusion, the event-driven architecture we have implemented for GloboLatte marks a significant leap toward achieving their operational excellence. By seamlessly integrating sales data uploads from their business units in America, Canada, and Mexico into a centralized Snowflake database, GloboLatte can now react swiftly to real-time data insights.
This architecture also enhances GloboLatte’s ability to analyze sales trends, manage inventory efficiently, and ultimately provide exceptional service to their customers. The structured schema we designed supports their data needs, ensuring that as new sales data flows in, it can be processed and used effectively.
As GloboLatte continues to evolve, this implementation will empower them to adapt quickly to market demands, improving both customer satisfaction and business performance.
I’d love to hear your thoughts! If you found this article helpful, please leave a comment below and share it with your network. Your insights and feedback are invaluable.