Add Lambda to requeue undownloaded granules (#35)
Fixes #31
chuckwondo authored Dec 6, 2023
1 parent 21a1283 commit 5f10f4c
Showing 14 changed files with 1,920 additions and 8 deletions.
32 changes: 32 additions & 0 deletions .github/workflows/ci.yml
@@ -37,6 +37,7 @@ jobs:
echo "downloader=$(python -c 'import sys; import base64; import hashlib; print(base64.urlsafe_b64encode(hashlib.sha256(sys.argv[-1].encode()).digest()[:6]).decode()[:8])' $(pwd)/lambdas/downloader/Pipfile)" >> $GITHUB_OUTPUT
echo "mockscihubproductapi=$(python -c 'import sys; import base64; import hashlib; print(base64.urlsafe_b64encode(hashlib.sha256(sys.argv[-1].encode()).digest()[:6]).decode()[:8])' $(pwd)/lambdas/mock_scihub_product_api/Pipfile)" >> $GITHUB_OUTPUT
echo "mockscihubsearchapi=$(python -c 'import sys; import base64; import hashlib; print(base64.urlsafe_b64encode(hashlib.sha256(sys.argv[-1].encode()).digest()[:6]).decode()[:8])' $(pwd)/lambdas/mock_scihub_search_api/Pipfile)" >> $GITHUB_OUTPUT
echo "requeuer=$(python -c 'import sys; import base64; import hashlib; print(base64.urlsafe_b64encode(hashlib.sha256(sys.argv[-1].encode()).digest()[:6]).decode()[:8])' $(pwd)/lambdas/requeuer/Pipfile)" >> $GITHUB_OUTPUT
# echo "db=$(python -c 'import sys; import base64; import hashlib; print(base64.urlsafe_b64encode(hashlib.sha256(sys.argv[-1].encode()).digest()[:6]).decode()[:8])' $(pwd)/layers/db/Pipfile)" >> $GITHUB_OUTPUT
- name: Setup root cache
@@ -88,6 +89,13 @@ jobs:
path: /home/runner/.local/share/virtualenvs/mock_scihub_search_api-${{ steps.hashes.outputs.mockscihubsearchapi }}
key: ${{ hashFiles('/home/runner/work/hls-sentinel2-downloader-serverless/hls-sentinel2-downloader-serverless/lambdas/mock_scihub_search_api/Pipfile.lock') }}

- name: Setup requeuer cache
uses: actions/cache@v3
id: requeuer-cache
with:
path: /home/runner/.local/share/virtualenvs/requeuer-${{ steps.hashes.outputs.requeuer }}
key: ${{ hashFiles('/home/runner/work/hls-sentinel2-downloader-serverless/hls-sentinel2-downloader-serverless/lambdas/requeuer/Pipfile.lock') }}

# - name: Setup db cache
# uses: actions/cache@v3
# id: db-cache
@@ -130,6 +138,11 @@ jobs:
run: |
make -C lambdas/mock_scihub_search_api install
- name: Install requeuer dependencies
if: steps.requeuer-cache.outputs.cache-hit != 'true'
run: |
make -C lambdas/requeuer install
- name: Install db dependencies
# if: steps.db-cache.outputs.cache-hit != 'true'
run: |
@@ -168,6 +181,7 @@ jobs:
echo "downloader=$(python -c 'import sys; import base64; import hashlib; print(base64.urlsafe_b64encode(hashlib.sha256(sys.argv[-1].encode()).digest()[:6]).decode()[:8])' $(pwd)/lambdas/downloader/Pipfile)" >> $GITHUB_OUTPUT
echo "mockscihubproductapi=$(python -c 'import sys; import base64; import hashlib; print(base64.urlsafe_b64encode(hashlib.sha256(sys.argv[-1].encode()).digest()[:6]).decode()[:8])' $(pwd)/lambdas/mock_scihub_product_api/Pipfile)" >> $GITHUB_OUTPUT
echo "mockscihubsearchapi=$(python -c 'import sys; import base64; import hashlib; print(base64.urlsafe_b64encode(hashlib.sha256(sys.argv[-1].encode()).digest()[:6]).decode()[:8])' $(pwd)/lambdas/mock_scihub_search_api/Pipfile)" >> $GITHUB_OUTPUT
echo "requeuer=$(python -c 'import sys; import base64; import hashlib; print(base64.urlsafe_b64encode(hashlib.sha256(sys.argv[-1].encode()).digest()[:6]).decode()[:8])' $(pwd)/lambdas/requeuer/Pipfile)" >> $GITHUB_OUTPUT
# echo "db=$(python -c 'import sys; import base64; import hashlib; print(base64.urlsafe_b64encode(hashlib.sha256(sys.argv[-1].encode()).digest()[:6]).decode()[:8])' $(pwd)/layers/db/Pipfile)" >> $GITHUB_OUTPUT
- name: Setup root cache
@@ -219,6 +233,13 @@ jobs:
path: /home/runner/.local/share/virtualenvs/mock_scihub_search_api-${{ steps.hashes.outputs.mockscihubsearchapi }}
key: ${{ hashFiles('/home/runner/work/hls-sentinel2-downloader-serverless/hls-sentinel2-downloader-serverless/lambdas/mock_scihub_search_api/Pipfile.lock') }}

- name: Setup requeuer cache
uses: actions/cache@v3
id: requeuer-cache
with:
path: /home/runner/.local/share/virtualenvs/requeuer-${{ steps.hashes.outputs.requeuer }}
key: ${{ hashFiles('/home/runner/work/hls-sentinel2-downloader-serverless/hls-sentinel2-downloader-serverless/lambdas/requeuer/Pipfile.lock') }}

# - name: Setup db cache
# uses: actions/cache@v3
# id: db-cache
@@ -261,6 +282,11 @@ jobs:
run: |
make -C lambdas/mock_scihub_search_api install
- name: Install requeuer dependencies
if: steps.requeuer-cache.outputs.cache-hit != 'true'
run: |
make -C lambdas/requeuer install
- name: Install db dependencies
# if: steps.db-cache.outputs.cache-hit != 'true'
run: |
@@ -281,6 +307,12 @@ jobs:
PG_DB="test-db"
AWS_DEFAULT_REGION="us-east-1"
EOF
cat <<EOF >> lambdas/requeuer/.env
PG_PASSWORD="test-pass"
PG_USER="test-user"
PG_DB="test-db"
AWS_DEFAULT_REGION="us-east-1"
EOF
cat <<EOF >> layers/db/.env
PG_PASSWORD="test-pass"
PG_USER="test-user"
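For context, each one-liner in the "hashes" steps of this workflow computes the 8-character suffix that pipenv appears to append to a virtualenv directory name (a hash of the Pipfile's absolute path), which is why the cache paths reference, for example, requeuer-${{ steps.hashes.outputs.requeuer }}. A standalone sketch of the same computation, with an illustrative path only:

import base64
import hashlib


def pipenv_venv_suffix(pipfile_path: str) -> str:
    # Mirror the inline python used for the workflow cache keys:
    # sha256 of the Pipfile path, first 6 bytes, urlsafe base64, first 8 chars.
    digest = hashlib.sha256(pipfile_path.encode()).digest()[:6]
    return base64.urlsafe_b64encode(digest).decode()[:8]


# Illustrative path; in CI it is $(pwd)/lambdas/requeuer/Pipfile on the runner.
print(pipenv_venv_suffix("/home/runner/work/hls-sentinel2-downloader-serverless/hls-sentinel2-downloader-serverless/lambdas/requeuer/Pipfile"))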
19 changes: 19 additions & 0 deletions .github/workflows/deploy-on-release.yml
@@ -33,6 +33,7 @@ jobs:
echo "downloader=$(python -c 'import sys; import base64; import hashlib; print(base64.urlsafe_b64encode(hashlib.sha256(sys.argv[-1].encode()).digest()[:6]).decode()[:8])' $(pwd)/lambdas/downloader/Pipfile)" >> $GITHUB_OUTPUT
echo "mockscihubproductapi=$(python -c 'import sys; import base64; import hashlib; print(base64.urlsafe_b64encode(hashlib.sha256(sys.argv[-1].encode()).digest()[:6]).decode()[:8])' $(pwd)/lambdas/mock_scihub_product_api/Pipfile)" >> $GITHUB_OUTPUT
echo "mockscihubsearchapi=$(python -c 'import sys; import base64; import hashlib; print(base64.urlsafe_b64encode(hashlib.sha256(sys.argv[-1].encode()).digest()[:6]).decode()[:8])' $(pwd)/lambdas/mock_scihub_search_api/Pipfile)" >> $GITHUB_OUTPUT
echo "requeuer=$(python -c 'import sys; import base64; import hashlib; print(base64.urlsafe_b64encode(hashlib.sha256(sys.argv[-1].encode()).digest()[:6]).decode()[:8])' $(pwd)/lambdas/requeuer/Pipfile)" >> $GITHUB_OUTPUT
# echo "db=$(python -c 'import sys; import base64; import hashlib; print(base64.urlsafe_b64encode(hashlib.sha256(sys.argv[-1].encode()).digest()[:6]).decode()[:8])' $(pwd)/layers/db/Pipfile)" >> $GITHUB_OUTPUT
- name: Setup root cache
@@ -84,6 +85,13 @@ jobs:
path: /home/runner/.local/share/virtualenvs/mock_scihub_search_api-${{ steps.hashes.outputs.mockscihubsearchapi }}
key: ${{ hashFiles('/home/runner/work/hls-sentinel2-downloader-serverless/hls-sentinel2-downloader-serverless/lambdas/mock_scihub_search_api/Pipfile.lock') }}

- name: Setup requeuer cache
uses: actions/cache@v3
id: requeuer-cache
with:
path: /home/runner/.local/share/virtualenvs/requeuer-${{ steps.hashes.outputs.requeuer }}
key: ${{ hashFiles('/home/runner/work/hls-sentinel2-downloader-serverless/hls-sentinel2-downloader-serverless/lambdas/requeuer/Pipfile.lock') }}

# - name: Setup db cache
# uses: actions/cache@v3
# id: db-cache
@@ -126,6 +134,11 @@ jobs:
run: |
make -C lambdas/mock_scihub_search_api install
- name: Install requeuer dependencies
if: steps.requeuer-cache.outputs.cache-hit != 'true'
run: |
make -C lambdas/requeuer install
- name: Install db dependencies
# if: steps.db-cache.outputs.cache-hit != 'true'
run: |
Expand All @@ -144,6 +157,12 @@ jobs:
PG_DB="test-db"
AWS_DEFAULT_REGION="us-east-1"
EOF
cat <<EOF >> lambdas/requeuer/.env
PG_PASSWORD="test-pass"
PG_USER="test-user"
PG_DB="test-db"
AWS_DEFAULT_REGION="us-east-1"
EOF
cat <<EOF >> layers/db/.env
PG_PASSWORD="test-pass"
PG_USER="test-user"
5 changes: 5 additions & 0 deletions Makefile
@@ -22,6 +22,7 @@ clean:
$(MAKE) -C lambdas/downloader clean
$(MAKE) -C lambdas/mock_scihub_search_api clean
$(MAKE) -C lambdas/mock_scihub_product_api clean
$(MAKE) -C lambdas/requeuer clean
$(MAKE) -C alembic_migration clean

install:
Expand All @@ -32,6 +33,7 @@ install:
$(MAKE) -C lambdas/downloader install
$(MAKE) -C lambdas/mock_scihub_product_api install
$(MAKE) -C lambdas/mock_scihub_search_api install
$(MAKE) -C lambdas/requeuer install
$(MAKE) -C layers/db install

lint:
Expand All @@ -43,6 +45,7 @@ lint:
$(MAKE) -C lambdas/downloader lint
$(MAKE) -C lambdas/mock_scihub_search_api lint
$(MAKE) -C lambdas/mock_scihub_product_api lint
$(MAKE) -C lambdas/requeuer lint
$(MAKE) -C layers/db lint
$(MAKE) -C alembic_migration lint

Expand All @@ -54,6 +57,7 @@ format:
$(MAKE) -C lambdas/downloader format
$(MAKE) -C lambdas/mock_scihub_search_api format
$(MAKE) -C lambdas/mock_scihub_product_api format
$(MAKE) -C lambdas/requeuer format
$(MAKE) -C layers/db format
$(MAKE) -C alembic_migration format

@@ -81,6 +85,7 @@ unit-tests:
$(MAKE) -C lambdas/downloader test
$(MAKE) -C lambdas/mock_scihub_search_api test
$(MAKE) -C lambdas/mock_scihub_product_api test
$(MAKE) -C lambdas/requeuer test
$(MAKE) -C layers/db test
$(MAKE) -C alembic_migration test

80 changes: 72 additions & 8 deletions cdk/downloader_stack.py
@@ -1,4 +1,4 @@
from typing import Optional
from typing import Optional, Sequence

from aws_cdk import (
aws_cloudwatch,
@@ -93,11 +93,15 @@ def __init__(
if removal_policy_destroy
else core.RemovalPolicy.RETAIN,
)
downloader_rds_secret = downloader_rds.secret

# Make static type checkers happy
assert downloader_rds_secret

aws_ssm.StringParameter(
self,
id=f"{identifier}-downloader-rds-secret-arn",
string_value=downloader_rds.secret.secret_arn,
string_value=downloader_rds_secret.secret_arn,
parameter_name=f"/integration_tests/{identifier}/downloader_rds_secret_arn",
)

@@ -163,10 +167,10 @@ def __init__(
db_layer,
psycopg2_layer,
],
environment={"DB_CONNECTION_SECRET_ARN": downloader_rds.secret.secret_arn},
environment={"DB_CONNECTION_SECRET_ARN": downloader_rds_secret.secret_arn},
)

downloader_rds.secret.grant_read(migration_function)
downloader_rds_secret.grant_read(migration_function)

core.CustomResource(
self,
@@ -224,7 +228,7 @@ def __init__(
link_fetcher_environment_vars = {
"STAGE": identifier,
"TO_DOWNLOAD_SQS_QUEUE_URL": to_download_queue.queue_url,
"DB_CONNECTION_SECRET_ARN": downloader_rds.secret.secret_arn,
"DB_CONNECTION_SECRET_ARN": downloader_rds_secret.secret_arn,
**({"SEARCH_URL": search_url} if search_url else {}),
}

@@ -280,7 +284,7 @@ def __init__(

downloader_environment_vars = {
"STAGE": identifier,
"DB_CONNECTION_SECRET_ARN": downloader_rds.secret.secret_arn,
"DB_CONNECTION_SECRET_ARN": downloader_rds_secret.secret_arn,
"UPLOAD_BUCKET": upload_bucket,
"USE_INTHUB2": "YES" if use_inthub2 else "NO",
**({"COPERNICUS_ZIPPER_URL": zipper_url} if zipper_url else {}),
@@ -336,8 +340,8 @@ def __init__(

self.downloader.role.add_managed_policy(lambda_insights_policy)

downloader_rds.secret.grant_read(link_fetcher)
downloader_rds.secret.grant_read(self.downloader)
downloader_rds_secret.grant_read(link_fetcher)
downloader_rds_secret.grant_read(self.downloader)

scihub_credentials = aws_secretsmanager.Secret.from_secret_name_v2(
self,
@@ -457,3 +461,63 @@ def __init__(
id=f"{identifier}-link-fetch-rule",
schedule=aws_events.Schedule.expression("cron(0 12 * * ? *)"),
).add_target(aws_events_targets.SfnStateMachine(link_fetcher_step_function))

add_requeuer(
self,
identifier=identifier,
secret=downloader_rds_secret,
layers=[db_layer, psycopg2_layer],
queue=to_download_queue,
removal_policy_destroy=removal_policy_destroy,
)


def add_requeuer(
scope: core.Construct,
*,
identifier: str,
layers: Sequence[aws_lambda.ILayerVersion],
removal_policy_destroy: bool,
secret: aws_secretsmanager.ISecret,
queue: aws_sqs.Queue,
) -> None:
# Requeuer Lambda function for manually requeuing undownloaded granules for
# a given date.
requeuer = aws_lambda_python.PythonFunction(
scope,
id=f"{identifier}-requeuer",
entry="lambdas/requeuer",
index="handler.py",
handler="handler",
layers=layers,
memory_size=200,
timeout=core.Duration.minutes(15),
runtime=aws_lambda.Runtime.PYTHON_3_8,
environment={
"STAGE": identifier,
"TO_DOWNLOAD_SQS_QUEUE_URL": queue.queue_url,
"DB_CONNECTION_SECRET_ARN": secret.secret_arn,
},
)

aws_logs.LogGroup(
scope,
id=f"{identifier}-requeuer-log-group",
log_group_name=f"/aws/lambda/{requeuer.function_name}",
removal_policy=core.RemovalPolicy.DESTROY
if removal_policy_destroy
else core.RemovalPolicy.RETAIN,
retention=aws_logs.RetentionDays.ONE_DAY
if removal_policy_destroy
else aws_logs.RetentionDays.TWO_WEEKS,
)

secret.grant_read(requeuer)
queue.grant_send_messages(requeuer)

core.CfnOutput(
scope,
id=f"{identifier}-requeuer-function-name",
value=requeuer.function_name,
export_name=f"{identifier}-requeuer-function-name",
)
19 changes: 19 additions & 0 deletions lambdas/requeuer/Makefile
@@ -0,0 +1,19 @@
.PHONY: clean install lint format test

clean:
pipenv --rm || true

install:
pipenv install --dev

lint:
pipenv run flake8 .
pipenv run isort --check-only --profile black .
pipenv run black --check --diff .

format:
pipenv run isort --profile black *.py tests/
pipenv run black *.py tests/

test:
pipenv run pytest -v --cov=handler --cov-report term-missing tests/
25 changes: 25 additions & 0 deletions lambdas/requeuer/Pipfile
@@ -0,0 +1,25 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[packages]
boto3 = "*"
iso8601 = "*"

[dev-packages]
alembic = "*"
black = "*"
boto3-stubs = {extras = ["lambda", "sqs", "secretsmanager"], version = "*"}
db = {editable = true, path = "./../../layers/db"}
flake8 = "*"
isort = "*"
moto = "*"
mypy = "*"
psycopg2 = "*"
pytest = "*"
pytest-cov = "*"
pytest-docker = "*"

[requires]
python_version = "3.8"
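The handler module itself appears later in the diff and is not shown in this excerpt, but the runtime dependencies above (boto3, iso8601) and the environment variables wired up in add_requeuer suggest its general shape. A rough sketch under those assumptions; the Granule model, its column names, the session helper, and the SQS message body are illustrative placeholders rather than the committed implementation:

import json
import os
from datetime import timedelta

import boto3
import iso8601

# The imports below are assumptions about the shared db layer's API.
from db.models.granule import Granule
from db.session import get_session_maker


def handler(event, _context):
    # Hypothetical event shape: {"date": "2023-11-30"}.
    day = iso8601.parse_date(event["date"]).date()
    queue_url = os.environ["TO_DOWNLOAD_SQS_QUEUE_URL"]
    sqs = boto3.client("sqs")

    session_maker = get_session_maker()
    with session_maker() as session:
        # Column names are illustrative: select granules never downloaded
        # whose sensing time falls on the requested day.
        granules = (
            session.query(Granule)
            .filter(Granule.downloaded.is_(False))
            .filter(Granule.beginposition >= day)
            .filter(Granule.beginposition < day + timedelta(days=1))
            .all()
        )

    for granule in granules:
        sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(
                {
                    "id": granule.id,
                    "filename": granule.filename,
                    "download_url": granule.download_url,
                }
            ),
        )

    return {"requeued": len(granules)}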
