-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathHope Azure Cost Analysis Fabric Refine.txt
54 lines (40 loc) · 2.55 KB
/
Hope Azure Cost Analysis Fabric Refine.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# Configuration for the refine run: which billing month to load, where the
# raw parquet files live, and the Delta table the refined output lands in.
refinestarter = '202406'          # billing period to process, as YYYYMM
relative_path = "Files/raw/azurecosts/"
filetype = '*.snappy.parquet'
table_name = 'HopeAzureCosts'

# Turn 'YYYYMM' into the 'YYYY/MM/' folder layout used under the raw path.
year_part, month_part = refinestarter[:4], refinestarter[4:]
formatted_date = f"{year_part}/{month_part}/"
print(formatted_date)

# Full glob pattern for every snappy parquet file in that month's folder.
fullpath = f"{relative_path}{formatted_date}{filetype}"
print(fullpath)
# Easy-button-generated code, tweaked to read every parquet file matching the
# glob in `fullpath` (Files/raw/azurecosts/<YYYY>/<MM>/*.snappy.parquet).
df = spark.read.parquet(fullpath)
# df is a Spark DataFrame holding the raw Azure cost rows for the month.
# (The generator's original hard-coded path was Files/refined/azurecosts/2024/08/
# part-00000-...snappy.parquet; it has been replaced by the fullpath glob above.)
#display(df)
# Preview the first 10 rows to sanity-check the load before refining.
display(df.head(10))
from pyspark.sql.functions import split
from datetime import datetime

# Each timestamp column arrives as an ISO-8601 string like
# '2024-06-01T00:00:00Z'. Derive two new columns per source column:
#   <col>Date  - the part before 'T'  (e.g. '2024-06-01')
#   <col>Time  - the part between 'T' and 'Z' (e.g. '00:00:00')
# The original code repeated the identical split pattern four times;
# loop over the column names instead so the rule lives in one place.
_timestamp_columns = [
    "BillingPeriodStart",
    "BillingPeriodEnd",
    "ChargePeriodStart",
    "ChargePeriodEnd",
]

dfrefine = df
for _col in _timestamp_columns:
    _date_part = split(dfrefine[_col], "T")
    dfrefine = (
        dfrefine
        .withColumn(_col + "Date", _date_part.getItem(0))
        .withColumn(_col + "Time", split(_date_part.getItem(1), "Z").getItem(0))
    )

# Preview the refined frame to confirm the derived columns look right.
display(dfrefine.head(10))
# Columns to keep for the refined Delta table: the FOCUS cost fields plus the
# date/time columns derived above. Everything else from the raw export is dropped.
keep_columns = [
    "BilledCost",
    "BillingAccountId",
    "BillingAccountName",
    "BillingPeriodStart",
    "BillingPeriodStartDate",
    "BillingPeriodStartTime",
    "BillingPeriodEnd",
    "BillingPeriodEndDate",
    "BillingPeriodEndTime",
    "ChargePeriodStart",
    "ChargePeriodStartDate",
    "ChargePeriodStartTime",
    "ChargePeriodEnd",
    "ChargePeriodEndDate",
    "ChargePeriodEndTime",
    "ConsumedQuantity",
    "ConsumedUnit",
    "ContractedCost",
    "ContractedUnitPrice",
    "EffectiveCost",
    "ListCost",
    "ListUnitPrice",
    "PricingUnit",
    "RegionName",
    "ResourceId",
    "ResourceName",
    "ServiceCategory",
    "ServiceName",
    "SubAccountId",
    "SubAccountName",
    "Tags",
]

# Project the refined frame down to just the kept columns.
slimdf = dfrefine.select(*keep_columns)

# Preview the slimmed frame.
display(slimdf.head(10))
# Count the rows being written so the run log shows the volume loaded.
record_count = slimdf.count()
print(f"Number of records: {record_count}")
# Alternative: overwrite replaces the whole table with this month's data.
#df.write.format('delta').mode('overwrite').saveAsTable(table_name)
# Append this month's slimmed rows to the HopeAzureCosts Delta table.
# NOTE(review): mode('append') will duplicate rows if this notebook is re-run
# for the same billing period — confirm re-runs are guarded upstream, or
# consider a delete-then-append / merge on the billing period.
slimdf.write.format('delta').mode('append').saveAsTable(table_name)