From f81b5130e89181f11d196aca8dccf745aab1b6e4 Mon Sep 17 00:00:00 2001 From: Manali Shah Date: Thu, 18 Feb 2021 11:36:55 -0800 Subject: [PATCH 1/2] Add support for endpoint url in continous ingestor. --- ...m_sample_continuous_data_ingestor_application.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/tools/continuous-ingestor/timestream_sample_continuous_data_ingestor_application.py b/tools/continuous-ingestor/timestream_sample_continuous_data_ingestor_application.py index 63ffb180..2de99ace 100644 --- a/tools/continuous-ingestor/timestream_sample_continuous_data_ingestor_application.py +++ b/tools/continuous-ingestor/timestream_sample_continuous_data_ingestor_application.py @@ -353,7 +353,7 @@ def ingestRecords(tsClient, dimensionsMetrics, dimensionsEvents, args): ######################################### ######### Timestream API calls ########## ######################################### -def createWriteClient(region, profile = None): +def createWriteClient(region, endpoint_url, profile = None): if profile == None: print("Using credentials from the environment") @@ -362,11 +362,11 @@ def createWriteClient(region, profile = None): if profile != None: session = boto3.Session(profile_name = profile) client = session.client(service_name = 'timestream-write', - region_name = region, config = config) + region_name = region, endpoint_url=endpoint_url, config = config) else: session = boto3.Session() client = session.client(service_name = 'timestream-write', - region_name = region, config = config) + region_name = region, endpoint_url=endpoint_url, config = config) return client def describeTable(client, databaseName, tableName): @@ -387,7 +387,8 @@ def writeRecords(client, databaseName, tableName, commonAttributes, records): parser.add_argument('--database-name', '-d', dest="databaseName", action = "store", required = True, help = "The database name in Amazon Timestream - must be already created.") parser.add_argument('--table-name', 
'-t', dest="tableName", action = "store", required = True, help = "The table name in Amazon Timestream - must be already created.") - parser.add_argument('--endpoint', '-e', action = "store", required = True, help="Specify the service region endpoint. E.g. 'us-east-1'") + parser.add_argument('--endpoint', '-e', action = "store", required = True, help="Specify the service region. E.g. 'us-east-1'") + parser.add_argument('--endpoint-url', '-url', action = "store", required = False, help="Specify the service endpoint url that you have been mapped to. E.g. 'https://ingest-cell2.timestream.us-east-1.amazonaws.com'") parser.add_argument('--concurrency', '-c', action = "store", type = int, default = 30, help = "Number of concurrent ingestion threads (default: 1)") parser.add_argument('--host-scale', dest = "hostScale", action = "store", type = int, default = 10, help = "The scale factor that determines the number of hosts emitting events and metrics (default: 1).") parser.add_argument('--profile', action = "store", type = str, default= None, help = "The AWS Config profile to use.") @@ -412,11 +413,11 @@ def writeRecords(client, databaseName, tableName, commonAttributes, records): ## Verify the table try: - tsClient = createWriteClient(args.endpoint, profile=args.profile) + tsClient = createWriteClient(args.endpoint, args.endpoint_url, profile=args.profile) describeTable(tsClient, args.databaseName, args.tableName) except Exception as e: print(e) sys.exit(0) ## Run the ingestion load. 
- ingestRecords(tsClient, dimensionsMetrics, dimensionsEvents, args) \ No newline at end of file + ingestRecords(tsClient, dimensionsMetrics, dimensionsEvents, args) From 2fb3f86242b48966e6ffa3595117c44082e7ae09 Mon Sep 17 00:00:00 2001 From: Manali Shah Date: Thu, 18 Feb 2021 13:15:16 -0800 Subject: [PATCH 2/2] Install prerequisites through requirements and update readme --- tools/continuous-ingestor/README.md | 12 ++++++++---- tools/continuous-ingestor/requirements.txt | 2 ++ 2 files changed, 10 insertions(+), 4 deletions(-) create mode 100644 tools/continuous-ingestor/requirements.txt diff --git a/tools/continuous-ingestor/README.md b/tools/continuous-ingestor/README.md index e9c0ce09..75b2b96a 100644 --- a/tools/continuous-ingestor/README.md +++ b/tools/continuous-ingestor/README.md @@ -11,11 +11,9 @@ A script to generate a continuous stream of records that are ingested into Times ---- ## How to use it -1. Install and configure Boto3 set up following the instructions at https://boto3.amazonaws.com/v1/documentation/api/latest/index.html - -1. Install numpy +1. Requires the latest boto3 and numpy packages. Install them from the requirements file: ``` - pip3 install numpy + pip3 install -r requirements.txt ``` 1. Run the following command to continuously generate and ingest sample data into Timestream. @@ -36,6 +34,12 @@ Starts a single-threaded ingest process the continues until SIGINT signal (CTRL python3 timestream_sample_continuous_data_ingestor_application.py -c 1 --host-scale 1 -d testDb -t testTable -e 'us-east-1' ``` +#### Single-threaded ingest to specified endpoint +Starts a single-threaded ingest process that continues until SIGINT signal (CTRL + C) is received. 
+``` +python3 timestream_sample_continuous_data_ingestor_application.py -c 1 --host-scale 1 -d testDb -t testTable -e 'us-east-1' -url 'https://ingest-cell2.timestream.us-east-1.amazonaws.com' + +``` #### Concurrent ingest Starts a multi-threaded ingest process the continues until SIGINT signal (CTRL + C) is received. The number of threads is controlled by the option -c or --concurrency. diff --git a/tools/continuous-ingestor/requirements.txt b/tools/continuous-ingestor/requirements.txt new file mode 100644 index 00000000..dd868d2e --- /dev/null +++ b/tools/continuous-ingestor/requirements.txt @@ -0,0 +1,2 @@ +boto3 +numpy