
How to Load and Stream Data from AWS S3 to Snowflake Using Snowpipe


Michael Segner

Michael writes about data engineering, data quality, and data teams.

Do you want near real-time streaming of data from Amazon S3 to Snowflake? Using Snowflake’s Snowpipe, it’s possible to upload a CSV to an S3 bucket and see the data populated in a Snowflake table within 60 seconds.

It works like this:

  1. A Snowpipe is built between an S3 bucket and a Snowflake data warehouse. 
  2. An Amazon SQS event notification is added to the S3 bucket.
  3. When new files are inserted into the S3 bucket, Snowpipe is notified via SQS. 
  4. Snowpipe reads the data from S3 and appends it to the target Snowflake table.
Near real-time streaming architecture

More specifically, in order for this to work seamlessly:

  1. A secure connection needs to be built between AWS and Snowflake
    1. We will use AWS IAM to establish the secure connection
      1. We will create a policy and attach it to a role
  2. An S3 bucket needs to be created
    1. An SQS event notification needs to be generated when new CSV files are loaded into this S3 bucket
      1. The event notifier will be connected to the Snowpipe notification channel
  3. A Snowpipe needs to be created
    1. We will also create a storage integration object to securely transport data

In the following example, we’ll assume the use case we’re solving for is this: CSV files are dropped into an S3 bucket, and the data in each new file is appended to an existing Snowflake table.
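
For reference, here is a minimal sketch of what that target table might look like. The name streaming_table matches the Snowpipe definition later in this post, but the columns are purely hypothetical; match them to the layout of your own CSVs.

-- Hypothetical target table; the column names and types are examples only
create table if not exists streaming_table (
	id NUMBER,
	event_time TIMESTAMP_NTZ,
	payload VARCHAR
);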

Let’s goooo! 

Create AWS IAM Policy

  1. Choose IAM (Identity & Access Management) from the AWS console
  2. Click on “Policies” and choose “Create policy”
  3. Paste the following code on the JSON tab
    1. Note: change the <bucket> name and <prefix> to suit your case
{ 
	"Version": "2012-10-17", 
	"Statement": [
	{ 
		"Effect": "Allow",
		"Action": [
			"s3:GetObject",
			"s3:GetObjectVersion" 
		], 
		"Resource": "arn:aws:s3:::<bucket>/<prefix>/*" 
	},
	{
		"Effect": "Allow",
		"Action": [
			"s3:ListBucket",
			"s3:GetBucketLocation"
		],
		"Resource": "arn:aws:s3:::<bucket>",
		"Condition": {
			"StringLike": {
				"s3:prefix": [
				"<prefix>/*" 
				]
			}
		}
	}
	] 
}
  4. Enter the policy name as snowflake_access and create the policy

Create AWS IAM Role

  1. Choose “Roles” and click “Create role” under Identity and Access Management
  2. Select “Trusted entity type” = “AWS account”
    1. Choose “Another AWS account”
      1. Account ID should be a random 12 digit number for now
      2. External ID should be a random 4 digit number for now
  3. Name the role mysnowflakerole and attach the policy created in the previous step
  4. On the summary page of the role you will find an ARN (Amazon Resource Name) that looks like this: arn:aws:iam::965746391279:role/mysnowflake
    1. Keep track of this; we will need it later.

Integrate Snowflake and AWS

In this step, the IAM role created on AWS is registered in Snowflake as a storage integration. This helps data engineers avoid passing credentials during every transaction.

Run the following command inside Snowflake to create an Integration object: 

CREATE STORAGE INTEGRATION S3_Snowflake
	TYPE = EXTERNAL_STAGE
	STORAGE_PROVIDER = 'S3'
	ENABLED = TRUE
	-- STORAGE_AWS_ROLE_ARN retrieved from the previous step
	STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::965746391279:role/mysnowflake'
	-- Plug in your own bucket name here
	STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket>/');

(Note: replace the role ARN above with the one you recorded earlier, i.e., the one containing your own 12 digit account ID.)

After creating the integration object, run DESC INTEGRATION S3_Snowflake; and record the values of STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID.
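
For reference, the describe command looks like this; the two values you need appear in the property_value column of its output:

DESC INTEGRATION S3_Snowflake;
-- Record these two rows from the output:
--   STORAGE_AWS_IAM_USER_ARN  -> used as the "AWS" principal in the trust policy below
--   STORAGE_AWS_EXTERNAL_ID   -> used as the sts:ExternalId in the trust policy below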


Update AWS IAM Role

The IAM role we created earlier needs to be updated with the new values retrieved from the Snowflake integration object.

In the AWS console search for IAM and click it. 

And then click Roles → mysnowflakerole → Trust relationships → Edit trust relationships and paste the following code:

{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Sid": "",
			"Effect": "Allow",
			"Principal": {
				"AWS": "<snowflake_user_arn>"
			},
			"Action": "sts:AssumeRole",
			"Condition": {
				"StringEquals": {
					"sts:ExternalId": "<snowflake_external_id>"
				}
			}
		}
	]
}

Note: Replace <snowflake_user_arn> and <snowflake_external_id> with the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID values recorded in the previous step, then save the changes.


Create an External Snowflake Stage

We need to create an external stage so that Snowpipe can read data from S3.

Run the following code in a Snowflake worksheet:

create stage mystage
	url = 's3://<bucket>/'
	-- We created this integration in the previous step
	storage_integration = S3_Snowflake;
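
A quick way to sanity-check the stage and the IAM setup behind it is to list the files the stage can see (an empty bucket simply returns no rows):

-- Errors here usually point to a problem with the IAM role or trust policy
list @mystage;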


Create an Auto Ingesting Snowpipe

This object helps with near real-time streaming of data. 

Run the following code in a Snowflake worksheet:

create pipe S3_Snowflake auto_ingest=true as 
	copy into streaming_table
	from @mystage
	file_format = (type = 'CSV');

Note: The above statement assumes that there is a table called streaming_table within the database, and that its structure matches the structure of the CSVs that will be uploaded to the S3 bucket.
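
Once the pipe exists, you can confirm it is running and listening on its notification channel with Snowflake’s pipe status function:

-- Returns a JSON document with executionState, pendingFileCount, etc.
select system$pipe_status('S3_Snowflake');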


Create an S3 Event Notification

We need to create an event notification on S3. This event notifier will inform the Snowpipe when a new CSV is uploaded to the S3 bucket.

Execute show pipes; in a Snowflake worksheet and copy the ARN of the SQS queue for the stage from the notification_channel column.
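
For example, you can filter to just the pipe we created; the ARN you need is in the notification_channel column of the output:

-- The notification_channel column contains the SQS queue ARN to use below
show pipes like 'S3_Snowflake';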

  1. On the S3 bucket click on “Properties”, scroll down, and click on “Create event notification”
  2. Key in an appropriate name
  3. Choose “All object create events”
  4. Scroll down to the Destination section and choose “SQS queue”
    1. Enter the SQS queue ARN copied from the notification_channel column in the previous step
    2. Click “Save changes”

The near real-time streaming of data between S3 and Snowflake is complete. Upload a CSV to the S3 bucket and try refreshing the corresponding table in Snowflake. Within 60 seconds you should see the data populated in the corresponding Snowflake table.
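
If the data doesn’t appear, Snowflake’s copy history is a good first place to look. A minimal sketch, assuming the streaming_table target used above:

-- Show Snowpipe load activity for the target table over the last hour,
-- including any per-file error messages
select *
from table(information_schema.copy_history(
	table_name => 'STREAMING_TABLE',
	start_time => dateadd(hours, -1, current_timestamp())
));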

