Skip to content

Commit

Permalink
Merge pull request #8 from opencost/atm/dd-grpc
Browse files Browse the repository at this point in the history
  • Loading branch information
cliffcolvin authored Mar 7, 2024
2 parents 0ba2a63 + a74596e commit 0c240d0
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 40 deletions.
44 changes: 22 additions & 22 deletions datadog/cmd/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ import (
"github.com/DataDog/datadog-api-client-go/v2/api/datadog"
"github.com/DataDog/datadog-api-client-go/v2/api/datadogV2"
"golang.org/x/time/rate"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/hashicorp/go-plugin"
datadogplugin "github.com/opencost/opencost-plugins/datadog/datadogplugin"
"github.com/opencost/opencost/core/pkg/log"
"github.com/opencost/opencost/core/pkg/model"
"github.com/opencost/opencost/core/pkg/model/pb"
"github.com/opencost/opencost/core/pkg/opencost"
ocplugin "github.com/opencost/opencost/core/pkg/plugin"
)
Expand All @@ -43,14 +44,14 @@ type DatadogCostSource struct {
rateLimiter *rate.Limiter
}

func (d *DatadogCostSource) GetCustomCosts(req model.CustomCostRequestInterface) []model.CustomCostResponse {
results := []model.CustomCostResponse{}
func (d *DatadogCostSource) GetCustomCosts(req pb.CustomCostRequest) []pb.CustomCostResponse {
results := []pb.CustomCostResponse{}

targets, err := opencost.GetWindows(*req.GetTargetWindow().Start(), *req.GetTargetWindow().End(), req.GetTargetResolution())
targets, err := opencost.GetWindows(req.Start.AsTime(), req.End.AsTime(), req.Resolution.AsDuration())
if err != nil {
log.Errorf("error getting windows: %v", err)
errResp := model.CustomCostResponse{
Errors: []error{err},
errResp := pb.CustomCostResponse{
Errors: []string{fmt.Sprintf("error getting windows: %v", err)},
}
results = append(results, errResp)
return results
Expand All @@ -60,8 +61,8 @@ func (d *DatadogCostSource) GetCustomCosts(req model.CustomCostRequestInterface)
listPricing, err := scrapeDatadogPrices(url)
if err != nil {
log.Errorf("error getting dd pricing: %v", err)
errResp := model.CustomCostResponse{
Errors: []error{err},
errResp := pb.CustomCostResponse{
Errors: []string{fmt.Sprintf("error getting dd pricing: %v", err)},
}
results = append(results, errResp)
return results
Expand Down Expand Up @@ -105,22 +106,24 @@ func main() {
plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: handshakeConfig,
Plugins: pluginMap,
GRPCServer: plugin.DefaultGRPCServer,
})
}

func boilerplateDDCustomCost(win opencost.Window) model.CustomCostResponse {
return model.CustomCostResponse{
func boilerplateDDCustomCost(win opencost.Window) pb.CustomCostResponse {
return pb.CustomCostResponse{
Metadata: map[string]string{"api_client_version": "v2"},
Costsource: "observability",
CostSource: "observability",
Domain: "datadog",
Version: "v1",
Currency: "USD",
Window: win,
Errors: []error{},
Costs: []*model.CustomCost{},
Start: timestamppb.New(*win.Start()),
End: timestamppb.New(*win.End()),
Errors: []string{},
Costs: []*pb.CustomCost{},
}
}
func (d *DatadogCostSource) getDDCostsForWindow(window opencost.Window, listPricing *datadogplugin.PricingInformation) model.CustomCostResponse {
func (d *DatadogCostSource) getDDCostsForWindow(window opencost.Window, listPricing *datadogplugin.PricingInformation) pb.CustomCostResponse {
ccResp := boilerplateDDCustomCost(window)
params := datadogV2.NewGetHourlyUsageOptionalParameters()
params.FilterTimestampEnd = window.End()
Expand All @@ -137,21 +140,19 @@ func (d *DatadogCostSource) getDDCostsForWindow(window opencost.Window, listPric
err := d.rateLimiter.Wait(context.TODO())
if err != nil {
log.Errorf("error waiting on rate limiter`: %v\n", err)
ccResp.Errors = append(ccResp.Errors, err)
ccResp.Errors = append(ccResp.Errors, err.Error())
return ccResp
}


resp, r, err := d.usageApi.GetHourlyUsage(d.ddCtx, *window.Start(), "all", *params)
if err != nil {
log.Errorf("Error when calling `UsageMeteringApi.GetHourlyUsage`: %v\n", err)
log.Errorf("Full HTTP response: %v\n", r)
ccResp.Errors = append(ccResp.Errors, err)
ccResp.Errors = append(ccResp.Errors, err.Error())
}

for _, hourlyUsageData := range resp.Data {
for _, meas := range hourlyUsageData.Attributes.Measurements {
clonedPtr := window.Clone()
usageQty := float32(0.0)

if meas.Value.IsSet() {
Expand All @@ -165,7 +166,7 @@ func (d *DatadogCostSource) getDDCostsForWindow(window opencost.Window, listPric

desc, usageUnit, pricing, currency := getListingInfo(*hourlyUsageData.Attributes.ProductFamily, *meas.UsageType, listPricing)
ccResp.Currency = currency
cost := model.CustomCost{
cost := pb.CustomCost{
Zone: *hourlyUsageData.Attributes.Region,
AccountName: *hourlyUsageData.Attributes.OrgName,
ChargeCategory: "usage",
Expand All @@ -174,11 +175,10 @@ func (d *DatadogCostSource) getDDCostsForWindow(window opencost.Window, listPric
ResourceType: *hourlyUsageData.Attributes.ProductFamily,
Id: *hourlyUsageData.Id,
ProviderId: *hourlyUsageData.Attributes.PublicId + "/" + *meas.UsageType,
Window: &clonedPtr,
Labels: map[string]string{},
ListCost: usageQty * pricing,
ListUnitPrice: pricing,
UsageQty: usageQty,
UsageQuantity: usageQty,
UsageUnit: usageUnit,
ExtendedAttributes: nil,
}
Expand Down
14 changes: 8 additions & 6 deletions datadog/tests/datadog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ import (
datadogplugin "github.com/opencost/opencost-plugins/datadog/datadogplugin"
harness "github.com/opencost/opencost-plugins/test/pkg/harness"
"github.com/opencost/opencost/core/pkg/log"
"github.com/opencost/opencost/core/pkg/model"
"github.com/opencost/opencost/core/pkg/opencost"
"github.com/opencost/opencost/core/pkg/model/pb"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

// this test gets DD keys from env vars, writes a config file
Expand Down Expand Up @@ -75,10 +76,11 @@ func TestDDCostRetrieval(t *testing.T) {
windowStart := time.Date(2024, 2, 27, 0, 0, 0, 0, time.UTC)
// query for qty 2 of 1 hour windows
windowEnd := time.Date(2024, 2, 27, 2, 0, 0, 0, time.UTC)
win := opencost.NewClosedWindow(windowStart, windowEnd)
req := model.CustomCostRequest{
TargetWindow: &win,
Resolution: time.Hour,

req := pb.CustomCostRequest{
Start: timestamppb.New(windowStart),
End: timestamppb.New(windowEnd),
Resolution: durationpb.New(time.Hour),
}
response := harness.InvokePlugin(file.Name(), pluginFile, req)

Expand Down
25 changes: 13 additions & 12 deletions test/pkg/harness/harness.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
package harness

import (
"log"
"os"
"os/exec"
"path"
"strings"

"github.com/opencost/opencost/core/pkg/log"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
"github.com/opencost/opencost/core/pkg/model"
"github.com/opencost/opencost/core/pkg/model/pb"
ocplugin "github.com/opencost/opencost/core/pkg/plugin"
)

// the test harness is designed to run plugins locally, and return the results
// the harness expects to be given a path to a valid config, and a path to a plugin implementation
// it does not run binaries, and instead uses go run
func InvokePlugin(pathToConfig, pathToPluginSrc string, req model.CustomCostRequest) []model.CustomCostResponse {
func InvokePlugin(pathToConfig, pathToPluginSrc string, req pb.CustomCostRequest) []pb.CustomCostResponse {
filename := path.Base(pathToConfig)
pluginName := strings.Split(filename, "_")[0]
// Create an hclog.Logger
Expand All @@ -36,28 +37,28 @@ func InvokePlugin(pathToConfig, pathToPluginSrc string, req model.CustomCostRequ
}
// We're a host! Start by launching the plugin process.
client := plugin.NewClient(&plugin.ClientConfig{
HandshakeConfig: handshakeConfig,
Plugins: pluginMap,
Cmd: exec.Command("go", "run", pathToPluginSrc, pathToConfig),
Logger: logger,
HandshakeConfig: handshakeConfig,
Plugins: pluginMap,
Cmd: exec.Command("go", "run", pathToPluginSrc, pathToConfig),
Logger: logger,
AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
})

defer client.Kill()

//log.Infof("got protocol: %s", client.Protocol())
// Connect via RPC
rpcClient, err := client.Client()
if err != nil {
log.Fatal(err)
log.Fatalf(err.Error())
}

// Request the plugin
raw, err := rpcClient.Dispense("CustomCostSource")
if err != nil {
log.Fatal(err)
log.Fatalf(err.Error())
}

src := raw.(ocplugin.CustomCostSource)
var iface model.CustomCostRequestInterface = &req
resp := src.GetCustomCosts(iface)
resp := src.GetCustomCosts(req)
return resp
}

0 comments on commit 0c240d0

Please sign in to comment.