Skip to content

Commit

Permalink
Adds duplication detection via DynamoDB with policy.
Browse files Browse the repository at this point in the history
  • Loading branch information
mattdonders committed Dec 30, 2019
1 parent a99ab82 commit 1cfa805
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 5 deletions.
24 changes: 24 additions & 0 deletions dynamodb-policy.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"dynamodb:CreateTable",
"dynamodb:PutItem",
"dynamodb:DeleteItem",
"dynamodb:GetItem",
"dynamodb:Query",
"dynamodb:UpdateItem"
],
"Resource": "arn:aws:dynamodb:<region>:<account>:table/<table-name>"
},
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": "dynamodb:ListTables",
"Resource": "*"
}
]
}
40 changes: 36 additions & 4 deletions shotmaps_gamescraper/lambda_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ def get_game_id(event: dict):
sns_event = sns_record.get("Sns")
topic = sns_event.get("TopicArn")

# Set TESTING global based on topic name
TESTING = True if topic in TEST_TOPICS else False

msg = sns_event.get("Message")
msg_json = json.loads(msg)
game_id = msg_json.get("gamePk")
Expand Down Expand Up @@ -82,6 +79,28 @@ def get_game_id(event: dict):
return {"status": True, "game_id": game_id}


def check_db_for_last_period(game_id):
dynamo_client = boto3_client("dynamodb")
response = dynamo_client.get_item(
TableName='nhl-shotmaps-tracking',
Key={'gamePk': {'N': game_id}}
)

try:
item = response['Item']
last_period_processed = int(item['lastPeriodProcessed']['N'])
except KeyError:
logging.info("NEW Game Detected - record does not exist yet.")
last_period_processed = 0

return last_period_processed


def is_event_period_newer(game_id, event_period):
db_last_period = check_db_for_last_period(game_id)
return bool(event_period > db_last_period)


def lambda_handler(event, context):
LAMBDA_GENERATOR = os.environ.get("LAMBDA_GENERATOR")
IS_SNS_TRIGGER = bool(event.get("Records"))
Expand All @@ -96,6 +115,7 @@ def lambda_handler(event, context):
msg = json.loads(msg)

game_id = msg['gamePk']
period = msg['play']['about']['period']
goals = msg['play']['about']['goals']
home_score = goals['home']
away_score = goals['away']
Expand All @@ -111,6 +131,18 @@ def lambda_handler(event, context):
# If status is True, set the game_id variable
game_id = game_id_dict["game_id"]

# Check that the event period is greater than the last processed period
is_event_newer = is_event_period_newer(game_id, period)
if not is_event_newer:
logging.error("The event received for %s is not newer than the last "
"event recorded in the database - skip this record.", game_id)

return {
'status': 409,
'body': 'A shotmap was already produced for this event.'
}

# If all of the above checks pass, scrape the game.
scraped_data = hockey_scraper.scrape_games([game_id], False, data_format="Pandas")
pbp = scraped_data.get("pbp")

Expand All @@ -126,7 +158,7 @@ def lambda_handler(event, context):
pbp.columns = map(str.lower, pbp.columns)

pbp_json = pbp.to_json()
payload = {"pbp_json": pbp_json, "testing": TESTING, "home_score": home_score, "away_score": away_score}
payload = {"pbp_json": pbp_json, "game_id": game_id, "testing": TESTING, "home_score": home_score, "away_score": away_score}
small_payload = {"game_id": game_id, "testing": TESTING, "home_score": home_score, "away_score": away_score}

logging.info("Scraping completed. Triggering the generator & twitter Lambda.")
Expand Down
38 changes: 37 additions & 1 deletion shotmaps_generator_sendtweet/lambda_handler.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import datetime
import json
import logging
import math
import os
import time

import requests
import tweepy
import pandas as pd
from boto3 import client as boto3_client

# Custom Imports
import clean_pbp
Expand All @@ -16,6 +19,31 @@

ordinal = lambda n: "%d%s" % (n, "tsnrhtdd"[(math.floor(n / 10) % 10 != 1) * (n % 10 < 4) * n % 10 :: 4])

def db_upsert_event(game_id, event_period):
current_ts = int(time.time())

current_dt = datetime.datetime.fromtimestamp(current_ts)
ttl_dt = current_dt + datetime.timedelta(days=90)
ttl_ts = int(ttl_dt.timestamp())

dynamo_client = boto3_client("dynamodb")
response = dynamo_client.update_item(
TableName='nhl-shotmaps-tracking',
Key={'gamePk': {'N': game_id}},
UpdateExpression="SET lastPeriodProcessed = :period, #ts = :ts, tsPlusTTL = :ts_ttl",
ExpressionAttributeNames={
"#ts": "timestamp"
},
ExpressionAttributeValues={
':period': {'N': str(event_period)},
':ts': {'N': str(current_ts)},
':ts_ttl': {'N': str(ttl_ts)}
},
ReturnValues="ALL_NEW"
)

logging.info("DynamoDB Record Updated: %s", response)


def send_shotmap_discord(testing: bool, images: list, text: str):
""" Takes a completed shotmap path & some text and sends out a message to a Discord webhook.
Expand Down Expand Up @@ -93,7 +121,7 @@ def get_team_from_abbreviation(abbreviation: str):
team_name_dict = {
"NJD": {"team_name": "New Jersey Devils", "short_name": "Devils", "hashtag": "#NJDevils"},
"NYI": {"team_name": "New York Islanders", "short_name": "Islanders", "hashtag": "#Isles"},
"NYR": {"team_name": "New York Rangers", "short_name": "Rangers", "hashtag": "#PlayLikeANewYorker"},
"NYR": {"team_name": "New York Rangers", "short_name": "Rangers", "hashtag": "#NYR"},
"PHI": {"team_name": "Philadelphia Flyers", "short_name": "Flyers", "hashtag": "#FlyOrDie"},
"PIT": {"team_name": "Pittsburgh Penguins", "short_name": "Penguins", "hashtag": "#LetsGoPens"},
"BOS": {"team_name": "Boston Bruins", "short_name": "Bruins", "hashtag": "#NHLBruins"},
Expand Down Expand Up @@ -130,6 +158,8 @@ def get_team_from_abbreviation(abbreviation: str):
def lambda_handler(event, context):
# logging.info(event)
testing = event.get("testing")
game_id = event.get("game_id")
logging

# Get the JSON-serialized DataFrame from the payload & convert back to a DataFrame
pbp_json = event.get("pbp_json")
Expand Down Expand Up @@ -246,3 +276,9 @@ def lambda_handler(event, context):
status = send_shotmap_tweet(testing=testing, images=shotmap_files, tweet_text=tweet_text)
discord_status = send_shotmap_discord(testing=testing, images=shotmap_files, text=tweet_text)

logging.info("Shotmap Text: %s", tweet_text)
logging.info("Twitter Status: %s", status)
logging.info("Discord Status: %s", discord_status)

# Update DynamoDB with last processed period
db_upsert_event(game_id, period)

0 comments on commit 1cfa805

Please sign in to comment.