diff --git a/sample_apps/go/ingestion-csv-sample.go b/sample_apps/go/ingestion-csv-sample.go index ab3ba5d9..41192952 100644 --- a/sample_apps/go/ingestion-csv-sample.go +++ b/sample_apps/go/ingestion-csv-sample.go @@ -4,10 +4,6 @@ import ( "encoding/csv" "flag" "fmt" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/timestreamquery" - "github.com/aws/aws-sdk-go/service/timestreamwrite" "io" "net" "net/http" @@ -15,6 +11,11 @@ import ( "strconv" "time" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/timestreamquery" + "github.com/aws/aws-sdk-go/service/timestreamwrite" + "golang.org/x/net/http2" ) @@ -86,8 +87,7 @@ func main() { _, err = writeSvc.CreateDatabase(createDatabaseInput) if err != nil { - fmt.Println("Error:") - fmt.Println(err) + panic(fmt.Sprintf("Error while creating database: %s", err)) } } } else { @@ -117,8 +117,7 @@ func main() { _, err = writeSvc.CreateTable(createTableInput) if err != nil { - fmt.Println("Error:") - fmt.Println(err) + panic(fmt.Sprintf("Error while creating table: %s", err)) } } } else { @@ -129,7 +128,7 @@ func main() { csvfile, err := os.Open(*testFileName) records := make([]*timestreamwrite.Record, 0) if err != nil { - fmt.Println("Couldn't open the csv file", err) + panic(fmt.Sprintf("Couldn't open the csv file %s", err)) } // Get current time in nano seconds. @@ -172,24 +171,17 @@ func main() { counter++ // WriteRecordsRequest has 100 records limit per request. if counter%100 == 0 { - writeRecordsInput := ×treamwrite.WriteRecordsInput{ - DatabaseName: aws.String(*databaseName), - TableName: aws.String(*tableName), - Records: records, - } - _, err = writeSvc.WriteRecords(writeRecordsInput) - - if err != nil { - fmt.Println("Error:") - fmt.Println(err) - } else { - fmt.Print("Ingested ", counter) - fmt.Println(" records to the table.") - } + writeBatch(writeSvc, *databaseName, *tableName, records) records = make([]*timestreamwrite.Record, 0) + fmt.Printf("Ingested %d records to the table.\n", counter) } } + if len(records) > 0 { + writeBatch(writeSvc, *databaseName, *tableName, records) + fmt.Printf("Ingested %d records to the table.\n", counter) + } + queryInput := ×treamquery.QueryInput{ QueryString: aws.String("select count(*) from " + *databaseName + "." + *tableName), } @@ -203,3 +195,16 @@ func main() { fmt.Println(query) } } + +func writeBatch(client *timestreamwrite.TimestreamWrite, databaseName string, tableName string, records []*timestreamwrite.Record) { + writeRecordsInput := ×treamwrite.WriteRecordsInput{ + DatabaseName: aws.String(databaseName), + TableName: aws.String(tableName), + Records: records, + } + _, err := client.WriteRecords(writeRecordsInput) + + if err != nil { + panic(fmt.Sprintf("Error while ingesting: %s", err)) + } +}