Skip to content

Commit

Permalink
Merge pull request awslabs#61 from manalishah/manashah
Browse files Browse the repository at this point in the history
Add support for endpoint url in continous ingestor.
  • Loading branch information
tiratatp authored Feb 18, 2021
2 parents 18de7d1 + 2fb3f86 commit 15f860a
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 10 deletions.
12 changes: 8 additions & 4 deletions tools/continuous-ingestor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@ A script to generate a continuous stream of records that are ingested into Times
----
## How to use it

1. Install and configure Boto3 set up following the instructions at https://boto3.amazonaws.com/v1/documentation/api/latest/index.html

1. Install numpy
1. Requires the latest boto and numpy packages. Install requirements
```
pip3 install numpy
pip3 install -r requirements.txt
```

1. Run the following command to continuously generate and ingest sample data into Timestream.
Expand All @@ -36,6 +34,12 @@ Starts a single-threaded ingest process the continues until SIGINT signal (CTRL
python3 timestream_sample_continuous_data_ingestor_application.py -c 1 --host-scale 1 -d testDb -t testTable -e 'us-east-1'
```
#### Single-threaded ingest to specified endpoint
Starts a single-threaded ingest process the continues until SIGINT signal (CTRL + C) is received.
```
python3 timestream_sample_continuous_data_ingestor_application.py -c 1 --host-scale 1 -d testDb -t testTable -e 'us-east-1' -url 'https://ingest-cell2.timestream.us-east-1.amazonaws.com'

```
#### Concurrent ingest
Starts a multi-threaded ingest process the continues until SIGINT signal (CTRL + C) is received. The number of threads is controlled by the option -c or --concurrency.
Expand Down
2 changes: 2 additions & 0 deletions tools/continuous-ingestor/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
boto3
numpy
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ def ingestRecords(tsClient, dimensionsMetrics, dimensionsEvents, args):
#########################################
######### Timestream API calls ##########
#########################################
def createWriteClient(region, profile = None):
def createWriteClient(region, endpoint_url, profile = None):
if profile == None:
print("Using credentials from the environment")

Expand All @@ -362,11 +362,11 @@ def createWriteClient(region, profile = None):
if profile != None:
session = boto3.Session(profile_name = profile)
client = session.client(service_name = 'timestream-write',
region_name = region, config = config)
region_name = region, endpoint_url=endpoint_url, config = config)
else:
session = boto3.Session()
client = session.client(service_name = 'timestream-write',
region_name = region, config = config)
region_name = region, endpoint_url=endpoint_url, config = config)
return client

def describeTable(client, databaseName, tableName):
Expand All @@ -387,7 +387,8 @@ def writeRecords(client, databaseName, tableName, commonAttributes, records):

parser.add_argument('--database-name', '-d', dest="databaseName", action = "store", required = True, help = "The database name in Amazon Timestream - must be already created.")
parser.add_argument('--table-name', '-t', dest="tableName", action = "store", required = True, help = "The table name in Amazon Timestream - must be already created.")
parser.add_argument('--endpoint', '-e', action = "store", required = True, help="Specify the service region endpoint. E.g. 'us-east-1'")
parser.add_argument('--endpoint', '-e', action = "store", required = True, help="Specify the service region. E.g. 'us-east-1'")
parser.add_argument('--endpoint-url', '-url', action = "store", required = False, help="Specify the service endpoint url that you have been mapped to. E.g. 'https://ingest-cell2.timestream.us-east-1.amazonaws.com'")
parser.add_argument('--concurrency', '-c', action = "store", type = int, default = 30, help = "Number of concurrent ingestion threads (default: 1)")
parser.add_argument('--host-scale', dest = "hostScale", action = "store", type = int, default = 10, help = "The scale factor that determines the number of hosts emitting events and metrics (default: 1).")
parser.add_argument('--profile', action = "store", type = str, default= None, help = "The AWS Config profile to use.")
Expand All @@ -412,11 +413,11 @@ def writeRecords(client, databaseName, tableName, commonAttributes, records):

## Verify the table
try:
tsClient = createWriteClient(args.endpoint, profile=args.profile)
tsClient = createWriteClient(args.endpoint, args.endpoint_url, profile=args.profile)
describeTable(tsClient, args.databaseName, args.tableName)
except Exception as e:
print(e)
sys.exit(0)

## Run the ingestion load.
ingestRecords(tsClient, dimensionsMetrics, dimensionsEvents, args)
ingestRecords(tsClient, dimensionsMetrics, dimensionsEvents, args)

0 comments on commit 15f860a

Please sign in to comment.