Skip to content

Commit

Permalink
Merge pull request awslabs#63 from tiratatp/bug/go_ingestion
Browse files Browse the repository at this point in the history
Ingest the remaining records outside the main loop
  • Loading branch information
tiratatp authored Apr 8, 2021
2 parents 486743e + c4d99a2 commit 1bf87d3
Showing 1 changed file with 28 additions and 23 deletions.
51 changes: 28 additions & 23 deletions sample_apps/go/ingestion-csv-sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@ import (
"encoding/csv"
"flag"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/timestreamquery"
"github.com/aws/aws-sdk-go/service/timestreamwrite"
"io"
"net"
"net/http"
"os"
"strconv"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/timestreamquery"
"github.com/aws/aws-sdk-go/service/timestreamwrite"

"golang.org/x/net/http2"
)

Expand Down Expand Up @@ -86,8 +87,7 @@ func main() {
_, err = writeSvc.CreateDatabase(createDatabaseInput)

if err != nil {
fmt.Println("Error:")
fmt.Println(err)
panic(fmt.Sprintf("Error while creating database: %s", err))
}
}
} else {
Expand Down Expand Up @@ -117,8 +117,7 @@ func main() {
_, err = writeSvc.CreateTable(createTableInput)

if err != nil {
fmt.Println("Error:")
fmt.Println(err)
panic(fmt.Sprintf("Error while creating table: %s", err))
}
}
} else {
Expand All @@ -129,7 +128,7 @@ func main() {
csvfile, err := os.Open(*testFileName)
records := make([]*timestreamwrite.Record, 0)
if err != nil {
fmt.Println("Couldn't open the csv file", err)
panic(fmt.Sprintf("Couldn't open the csv file %s", err))
}

// Get current time in nano seconds.
Expand Down Expand Up @@ -172,24 +171,17 @@ func main() {
counter++
// WriteRecordsRequest has 100 records limit per request.
if counter%100 == 0 {
writeRecordsInput := &timestreamwrite.WriteRecordsInput{
DatabaseName: aws.String(*databaseName),
TableName: aws.String(*tableName),
Records: records,
}
_, err = writeSvc.WriteRecords(writeRecordsInput)

if err != nil {
fmt.Println("Error:")
fmt.Println(err)
} else {
fmt.Print("Ingested ", counter)
fmt.Println(" records to the table.")
}
writeBatch(writeSvc, *databaseName, *tableName, records)
records = make([]*timestreamwrite.Record, 0)
fmt.Printf("Ingested %d records to the table.\n", counter)
}
}

if len(records) > 0 {
writeBatch(writeSvc, *databaseName, *tableName, records)
fmt.Printf("Ingested %d records to the table.\n", counter)
}

queryInput := &timestreamquery.QueryInput{
QueryString: aws.String("select count(*) from " + *databaseName + "." + *tableName),
}
Expand All @@ -203,3 +195,16 @@ func main() {
fmt.Println(query)
}
}

func writeBatch(client *timestreamwrite.TimestreamWrite, databaseName string, tableName string, records []*timestreamwrite.Record) {
writeRecordsInput := &timestreamwrite.WriteRecordsInput{
DatabaseName: aws.String(databaseName),
TableName: aws.String(tableName),
Records: records,
}
_, err := client.WriteRecords(writeRecordsInput)

if err != nil {
panic(fmt.Sprintf("Error while ingesting: %s", err))
}
}

0 comments on commit 1bf87d3

Please sign in to comment.