diff --git a/snowflake-py-pulumi-search-export/.gitignore b/snowflake-py-pulumi-search-export/.gitignore
new file mode 100644
index 000000000..6dc143627
--- /dev/null
+++ b/snowflake-py-pulumi-search-export/.gitignore
@@ -0,0 +1,4 @@
+*.pyc
+venv/
+.vscode
+.build
\ No newline at end of file
diff --git a/snowflake-py-pulumi-search-export/Pulumi.yaml b/snowflake-py-pulumi-search-export/Pulumi.yaml
new file mode 100644
index 000000000..8ccc0980a
--- /dev/null
+++ b/snowflake-py-pulumi-search-export/Pulumi.yaml
@@ -0,0 +1,6 @@
+name: snowflake-pulumi-search-export-py
+runtime:
+  name: python
+  options:
+    virtualenv: venv
+description: Exports Pulumi Cloud search data to Snowflake using AWS Lambda and Snowpipe
diff --git a/snowflake-py-pulumi-search-export/README.md b/snowflake-py-pulumi-search-export/README.md
new file mode 100644
index 000000000..8a4832f86
--- /dev/null
+++ b/snowflake-py-pulumi-search-export/README.md
@@ -0,0 +1,81 @@
+# Pulumi Cloud Data Export to Snowflake using AWS Lambda
+
+This folder contains an example that extracts a [Pulumi Cloud Data Export](https://www.pulumi.com/docs/pulumi-cloud/cloud-rest-api/#data-export) and loads it into Snowflake. Pulumi Cloud's exported data contains detailed information about all of your resources that are managed by Pulumi. After deploying and running this example, you can query your Pulumi Cloud data in Snowflake directly or join it to other data sets of your choosing (like pricing) to create dashboards that provide valuable visibility into your organization's cloud usage.
+
+The infrastructure contains the following resources:
+
+![Architecture diagram showing a Lambda function reading a CSV from the Pulumi API and writing it to an S3 bucket, and a Snowflake pipe reading the file into Snowflake](images/snowflake-pulumi-architecture.png)
+
+- An S3 bucket that will contain the data exported from Pulumi Cloud. (The exported data is in CSV format.)
+- An AWS Lambda function that queries the Pulumi Cloud REST API to [export search data](https://www.pulumi.com/docs/pulumi-cloud/cloud-rest-api/#resource-search) and places the file in the S3 bucket.
+- Snowflake resources (database, schema, table) to hold the data, along with [Snowpipe](https://docs.snowflake.com/en/user-guide/data-load-snowpipe-intro) resources that automatically import the data whenever a file is written to the S3 bucket. The Snowflake table is designed to be append-only while still allowing easy point-in-time queries.
+
+## Prerequisites
+
+1. [Install Pulumi](https://www.pulumi.com/docs/get-started/install/)
+1. [Configure AWS credentials](https://www.pulumi.com/registry/packages/aws/installation-configuration/#configuration)
+1. [Configure Snowflake credentials](https://www.pulumi.com/registry/packages/snowflake/installation-configuration/#configuring-credentials)
+1. [Install Python](https://www.pulumi.com/docs/languages-sdks/python/)
+
+## Deploy the App
+
+### Step 1: Initialize the Project
+
+1. Install packages:
+
+    ```bash
+    python3 -m venv venv
+    venv/bin/pip install -r requirements.txt
+    ```
+
+1. Create a new Pulumi stack:
+
+    ```bash
+    pulumi stack init
+    ```
+
+1. Deploy the Pulumi stack:
+
+    ```bash
+    pulumi up
+    ```
+
+Once the `pulumi up` command completes, we'll execute the Lambda, which will pull the data from the Pulumi Cloud API and place it in the S3 bucket.
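+
+Under the hood, the Lambda calls the Pulumi Cloud resource search export endpoint and writes the returned CSV to the bucket. Below is a simplified sketch of that request (see `search_export_to_s3/handler.py` for the full handler); the organization name and the environment variable used for the access token are placeholders:
+
+```python
+import os
+import requests
+
+ORG = "my-org"  # Placeholder: replace with your Pulumi organization name.
+
+# Request the resource search export as CSV from the Pulumi Cloud REST API.
+response = requests.get(
+    f"https://api.pulumi.com/api/orgs/{ORG}/search/resources/export",
+    headers={
+        "Authorization": f"token {os.environ['PULUMI_ACCESS_TOKEN']}",
+        "Accept": "application/vnd.pulumi+8",
+        "Content-Type": "application/json",
+    },
+)
+print(response.text)  # CSV contents of the export
+```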
+
+### Step 2: Trigger the Lambda
+
+Trigger the Lambda with the following command:
+
+```bash
+aws lambda invoke --function-name $(pulumi stack output lambdaArn) /dev/stdout
+```
+
+You should see output similar to the following:
+
+```json
+{
+    "StatusCode": 200,
+    "ExecutedVersion": "$LATEST"
+}
+```
+
+After a few seconds, your data should be visible in your Snowflake database:
+
+![Screenshot of Snowflake Worksheet showing querying of imported Pulumi data](images/snowflake-query.png)
+
+## Clean Up
+
+Once you're finished experimenting, you can destroy your stack and remove it to avoid incurring any additional cost:
+
+```bash
+pulumi destroy
+pulumi stack rm
+```
+
+## Summary
+
+In this tutorial, you created a simple extract/load process that exports data from the Pulumi Cloud API to Snowflake. Now you can query this data in Snowflake and join it with other data sets to gain valuable insights into your organization's cloud usage!
+
+## Next Steps
+
+To enhance this architecture, you could [add a rule to run the Lambda on a schedule](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-run-lambda-schedule.html).
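+
+A sketch of what that might look like if added to `__main__.py` (resource names here are illustrative, and `function` is the Lambda already defined by this program):
+
+```python
+import pulumi_aws as aws
+
+# Run the export Lambda once a day.
+schedule = aws.cloudwatch.EventRule(
+    "daily-search-export",
+    schedule_expression="rate(1 day)")
+
+# Allow EventBridge to invoke the function.
+aws.lambda_.Permission(
+    "allow-eventbridge",
+    action="lambda:InvokeFunction",
+    function=function.name,
+    principal="events.amazonaws.com",
+    source_arn=schedule.arn)
+
+# Point the scheduled rule at the Lambda.
+aws.cloudwatch.EventTarget(
+    "daily-search-export-target",
+    rule=schedule.name,
+    arn=function.arn)
+```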
diff --git a/snowflake-py-pulumi-search-export/__main__.py b/snowflake-py-pulumi-search-export/__main__.py
new file mode 100644
index 000000000..ea27af95e
--- /dev/null
+++ b/snowflake-py-pulumi-search-export/__main__.py
@@ -0,0 +1,352 @@
+"""A Pulumi program that exports Pulumi Cloud search data to S3 and loads it into Snowflake."""
+
+import pulumi
+import pulumi_aws as aws
+import pulumi_command as command
+import pulumi_snowflake as snowflake
+
+import datetime
+import json
+import os
+
+bucket = aws.s3.Bucket("pulumi-search-export")
+
+config = pulumi.Config()
+# NOTE: In a production scenario, for security reasons this should call
+# `config.require_secret()`, use a Pulumi access token specifically designated
+# for this export process, and not fall back to an environment variable:
+pulumi_access_token = config.get_secret(
+    "pulumi-access-token") or os.environ['PULUMI_ACCESS_TOKEN']
+
+lambda_name = "pulumi-search-export-to-s3"
+
+ssm_parameter = aws.ssm.Parameter(
+    "pulumi-token",
+    name=f'/{lambda_name}/pulumi-access-token',
+    type="SecureString",
+    value=pulumi_access_token,
+    description="Pulumi Token that has access to invoke the Pulumi Cloud REST API to export search results",
+)
+
+lambda_role = aws.iam.Role(
+    "lambda-role",
+    assume_role_policy=json.dumps({
+        "Version": "2012-10-17",
+        "Statement": [
+            {
+                "Effect": "Allow",
+                "Principal": {
+                    "Service": "lambda.amazonaws.com"
+                },
+                "Action": "sts:AssumeRole"
+            }
+        ]
+    })
+)
+
+aws.iam.RolePolicyAttachment(
+    "lambda-execute-policy",
+    role=lambda_role.name,
+    policy_arn="arn:aws:iam::aws:policy/AWSLambdaExecute",
+)
+
+ssm_policy = aws.iam.Policy(
+    f"{lambda_name}-policy",
+    description=f"All permissions needed by Lambda function {lambda_name}",
+    policy=pulumi.Output.all(ssm_parameter.arn, bucket.arn).apply(lambda args: json.dumps({
+        "Version": "2012-10-17",
+        "Statement": [
+            {
+                "Action": [
+                    "ssm:GetParameter",
+                    "ssm:GetParameters",
+                    "ssm:GetParametersByPath"
+                ],
+                "Effect": "Allow",
+                "Resource": args[0]
+            },
+            {
+                "Action": [
+                    "s3:PutObject",
+                ],
+                "Effect": "Allow",
+                # s3:PutObject applies to objects, so grant it on the objects
+                # within the bucket rather than on the bucket itself:
+                "Resource": f"{args[1]}/*"
+            },
+        ]
+    })),
+)
+
+aws.iam.RolePolicyAttachment(
+    "ssm-params-attachment",
+    aws.iam.RolePolicyAttachmentArgs(
+        role=lambda_role.name,
+        policy_arn=ssm_policy.arn
+    )
+)
+
+
+vendor_deps = command.local.Command(
+    "vendor-deps",
+    command.local.CommandArgs(
+        create="rm -rf .build && mkdir .build && pip install -r ./search_export_to_s3/requirements.txt --target .build && cp search_export_to_s3/handler.py .build",
+        # TODO: See if there's an easy way to base this off the contents of requirements.txt
+        triggers=[str(datetime.datetime.now())]
+    )
+)
+
+function = aws.lambda_.Function(
+    lambda_name,
+    aws.lambda_.FunctionArgs(
+        role=lambda_role.arn,
+        runtime="python3.8",
+        handler="handler.handle",
+        code=pulumi.FileArchive('./.build'),
+        timeout=60,
+        environment=aws.lambda_.FunctionEnvironmentArgs(
+            variables={
+                'DESTINATION_BUCKET_NAME': bucket.bucket,
+            }
+        )
+    ),
+    opts=pulumi.ResourceOptions(
+        depends_on=[vendor_deps]
+    )
+)
+
+pulumi.export('lambdaArn', function.arn)
+
+ROLE_NAME = "pulumi-snowflake-storage-integration"
+
+account_id = aws.get_caller_identity().account_id
+
+storage_integration = snowflake.StorageIntegration(
+    "snowflake-storage-integration",
+    enabled=True,
+    storage_aws_role_arn=f"arn:aws:iam::{account_id}:role/{ROLE_NAME}",
+    storage_provider="S3",
+    type="EXTERNAL_STAGE",
+    storage_allowed_locations=["*"]
+)
+
+snowflake_assume_role_policy = pulumi.Output.all(storage_integration.storage_aws_iam_user_arn, storage_integration.storage_aws_external_id).apply(lambda args: json.dumps({
+    "Version": "2012-10-17",
+    "Statement": [{
+        "Effect": "Allow",
+        "Principal": {"AWS": args[0]},
+        "Action": "sts:AssumeRole",
+        "Condition": {
+            "StringEquals": {"sts:ExternalId": args[1]}
+        }
+    }]
+}))
+
+snowflake_role = aws.iam.Role(
+    "snowflake-integration-role",
+    name=ROLE_NAME,
+    description="Allows Snowflake to access the bucket containing Pulumi Cloud search export files",
+    assume_role_policy=snowflake_assume_role_policy
+)
+
+snowflake_policy = aws.iam.Policy(
+    "snowflake-storage-integration-policy",
+    policy=bucket.arn.apply(lambda arn: json.dumps({
+        "Version": "2012-10-17",
+        "Statement": [
+            {
+                "Effect": "Allow",
+                "Action": [
+                    "s3:PutObject",
+                    "s3:GetObject",
+                    "s3:GetObjectVersion",
+                    "s3:DeleteObject",
+                    "s3:DeleteObjectVersion"
+                ],
+                "Resource": f"{arn}/*"
+            },
+            {
+                "Effect": "Allow",
+                "Action": [
+                    "s3:ListBucket",
+                    "s3:GetBucketLocation"
+                ],
+                "Resource": f"{arn}",
+            }
+        ]
+    }))
+)
+
+aws.iam.RolePolicyAttachment(
+    "snowflake-policy-attachment",
+    role=ROLE_NAME,
+    policy_arn=snowflake_policy.arn
+)
+
+database = snowflake.Database(
+    "pulumi-snowflake-integration",
+)
+
+schema = snowflake.Schema(
+    "pulumi-snowflake-integration",
+    name="PULUMI_SNOWFLAKE_INTEGRATION",
+    database=database.name,
+)
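+
+# The table mirrors the columns of the Pulumi Cloud search export CSV and adds
+# three metadata columns (FILENAME, LAST_MODIFIED_AT, LOADED_AT) that the COPY
+# statement below populates on every load. Because rows are only ever appended,
+# every export is preserved, and a point-in-time query can filter on LOADED_AT,
+# for example (illustrative SQL, using the default names created below):
+#
+#   SELECT * FROM PULUMI_SEARCH_EXPORTS
+#   WHERE LOADED_AT = (SELECT MAX(LOADED_AT) FROM PULUMI_SEARCH_EXPORTS);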
+table = snowflake.Table(
+    "pulumi-search-exports",
+    name="PULUMI_SEARCH_EXPORTS",
+    database=database.name,
+    schema=schema.name,
+    columns=[
+        # Metadata fields:
+        {
+            "name": "FILENAME",
+            "type": "STRING",
+            "nullable": False
+        },
+        {
+            "name": "LAST_MODIFIED_AT",
+            "type": "TIMESTAMP_NTZ",
+            "nullable": False
+        },
+        {
+            "name": "LOADED_AT",
+            "type": "TIMESTAMP_NTZ",
+            "nullable": False
+        },
+        # Fields from the exported file:
+        {
+            "name": "CREATED",
+            "type": "TIMESTAMP_NTZ",
+            "nullable": True
+        },
+        {
+            "name": "CUSTOM",
+            "type": "BOOLEAN",
+            "nullable": False
+        },
+        {
+            "name": "DELETE",
+            "type": "BOOLEAN",
+            "nullable": False
+        },
+        {
+            "name": "ID",
+            "type": "VARCHAR",
+            "nullable": True
+        },
+        {
+            "name": "MODIFIED",
+            "type": "TIMESTAMP_NTZ",
+            "nullable": False
+        },
+        {
+            "name": "MODULE",
+            "type": "VARCHAR",
+            "nullable": False
+        },
+        {
+            "name": "NAME",
+            "type": "VARCHAR",
+            "nullable": False
+        },
+        {
+            "name": "PACKAGE",
+            "type": "VARCHAR",
+            "nullable": False
+        },
+        {
+            "name": "PARENT_URL",
+            "type": "VARCHAR",
+            "nullable": True
+        },
+        # TODO: Our sample data does not have any rows that have a value in this
+        # column. Try to refine this column's definition.
+        {
+            "name": "PENDING",
+            "type": "VARCHAR",
+            "nullable": True
+        },
+        {
+            "name": "PROJECT",
+            "type": "VARCHAR",
+            "nullable": False
+        },
+        {
+            "name": "PROTECTED",
+            "type": "BOOLEAN",
+            "nullable": False
+        },
+        {
+            "name": "PROVIDER_URN",
+            "type": "VARCHAR",
+            "nullable": True
+        },
+        {
+            "name": "STACK",
+            "type": "VARCHAR",
+            "nullable": False
+        },
+        {
+            "name": "TYPE",
+            "type": "VARCHAR",
+            "nullable": False
+        },
+        {
+            "name": "URN",
+            "type": "VARCHAR",
+            "nullable": False
+        },
+        {
+            "name": "TEAMS",
+            "type": "VARCHAR",
+            "nullable": True
+        },
+        {
+            "name": "PROPERTIES",
+            "type": "VARCHAR",
+            "nullable": True
+        },
+    ]
+)
+
+stage = snowflake.Stage(
+    "snowpipe-stage",
+    url=pulumi.Output.format("s3://{0}", bucket.bucket),
+    database=database.name,
+    schema=schema.name,
+    storage_integration=storage_integration.name,
+    comment="Loads data from an S3 bucket containing Pulumi Insights export data"
+)
+
+# Notes:
+# 1. The Snowflake PATTERN arguments are regex-style, not `ls` style.
+# 2. The PATTERN clause ensures the COPY statement only runs for files we want to import.
+copy_statement = pulumi.Output.format("""
+COPY INTO \"{0}\".\"{1}\".\"{2}\"
+FROM (SELECT metadata$filename, metadata$file_last_modified, sysdate(), $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18 FROM @"{0}"."{1}"."{3}")
+FILE_FORMAT = (TYPE = CSV, SKIP_HEADER = 1)
+PATTERN='pulumi-search-exports/.*.csv'
+""", database.name, schema.name, table.name, stage.name)
+
+
+pulumi.export("copy_statement", copy_statement)
+
+pipe = snowflake.Pipe(
+    "pipe",
+    auto_ingest=True,
+    comment="Automatically loads Pulumi Cloud search export files from S3 into Snowflake",
+    copy_statement=copy_statement,
+    database=database.name,
+    schema=schema.name
+)
+
+# The pipe's notification channel is an SQS queue managed by Snowflake; sending
+# ObjectCreated events to it triggers Snowpipe to load each new file.
+aws.s3.BucketNotification(
+    "bucket-notification",
+    bucket=bucket.bucket,
+    queues=[{
+        "queue_arn": pipe.notification_channel,
+        "events": ["s3:ObjectCreated:*"]
+    }]
+)
diff --git a/snowflake-py-pulumi-search-export/images/snowflake-pulumi-architecture.png b/snowflake-py-pulumi-search-export/images/snowflake-pulumi-architecture.png
new file mode 100644
index 000000000..6fc4cb9f0
Binary files /dev/null and b/snowflake-py-pulumi-search-export/images/snowflake-pulumi-architecture.png differ
diff --git a/snowflake-py-pulumi-search-export/images/snowflake-query.png b/snowflake-py-pulumi-search-export/images/snowflake-query.png
new file mode 100644
index 000000000..500a2917a
Binary files /dev/null and b/snowflake-py-pulumi-search-export/images/snowflake-query.png differ
diff --git a/snowflake-py-pulumi-search-export/requirements.txt b/snowflake-py-pulumi-search-export/requirements.txt
new file mode 100644
index 000000000..0a42dcbe4
--- /dev/null
+++ b/snowflake-py-pulumi-search-export/requirements.txt
@@ -0,0 +1,4 @@
+pulumi>=3.0.0,<4.0.0
+pulumi_aws>=5.0.0,<6.0.0
+pulumi_command>=0.7.2,<1.0.0
+pulumi_snowflake>=0.2.0,<1.0.0
\ No newline at end of file
diff --git a/snowflake-py-pulumi-search-export/search_export_to_s3/handler.py b/snowflake-py-pulumi-search-export/search_export_to_s3/handler.py
new file mode 100644
index 000000000..4f6a421e8
--- /dev/null
+++ b/snowflake-py-pulumi-search-export/search_export_to_s3/handler.py
@@ -0,0 +1,63 @@
+import boto3
+import requests
+import os
+import datetime
+
+API_ENDPOINT = 'https://api.pulumi.com/api'
+ORG = 'jkodrofftest'  # TODO: Make org an env var
+
+
+def get_pulumi_token():
+    ssm_path = '/pulumi-search-export-to-s3/pulumi-access-token'
+
+    ssm = boto3.client('ssm')
+    response = ssm.get_parameter(
+        Name=ssm_path,
+        WithDecryption=True
+    )
+
+    value = response['Parameter']['Value']
+    if not value:
+        raise Exception(
+            f"An SSM secret must be defined at the path '{ssm_path}' and must be set to the value of a valid Pulumi access token.")
+
+    return value
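+
+
+# The statements below run at import time, i.e. once per Lambda cold start, so
+# the SSM lookup and the environment-variable check happen once and their
+# results are reused across warm invocations.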
+PULUMI_TOKEN = get_pulumi_token()
+
+HEADERS = {
+    'Authorization': f'token {PULUMI_TOKEN}',
+    'Accept': 'application/vnd.pulumi+8',
+    'Content-Type': 'application/json',
+}
+
+
+def get_bucket_name():
+    key = 'DESTINATION_BUCKET_NAME'
+    if key not in os.environ:
+        raise Exception(
+            f"Environment variable '{key}' must be set to the name of the destination bucket in which to place exported search results.")
+
+    return os.environ[key]
+
+
+BUCKET_NAME = get_bucket_name()
+
+
+def handle(event, context):
+    timestamp = datetime.datetime.now()
+    r = requests.get(
+        f'{API_ENDPOINT}/orgs/{ORG}/search/resources/export', headers=HEADERS)
+    # Fail loudly if the export request was rejected (e.g. an invalid token)
+    # rather than writing an error body to S3 as if it were CSV data:
+    r.raise_for_status()
+
+    filename = f'{str(timestamp).replace(" ", "-").replace(":", "-")}.csv'
+
+    s3 = boto3.client('s3')
+    s3.put_object(
+        Bucket=BUCKET_NAME,
+        Key=f"pulumi-search-exports/{filename}",
+        Body=r.content
+    )
+
+    print(r.content)
+    return "OK"
diff --git a/snowflake-py-pulumi-search-export/search_export_to_s3/requirements.txt b/snowflake-py-pulumi-search-export/search_export_to_s3/requirements.txt
new file mode 100644
index 000000000..1ddfcee01
--- /dev/null
+++ b/snowflake-py-pulumi-search-export/search_export_to_s3/requirements.txt
@@ -0,0 +1,2 @@
+requests>=2.30.0,<3.0.0
+boto3>=1.26.129,<2.0.0
\ No newline at end of file