Merge pull request #112 from InfluxCommunity/fix/103-batch-write-example
fix: 103 batch write example

@jstirnaman Concerning the failing test: it fails when run through the IDE because the relative path to the source CSV file cannot be resolved, but it passes when running `pytest` from the project root, which is how it is run in the build.
karel-rehor authored Sep 23, 2024
2 parents 43b8811 + 232b4d3 commit 0303b3f
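One conventional fix for this kind of IDE-versus-project-root discrepancy is to resolve the fixture path against the test module itself rather than the working directory. A minimal sketch; the test name, directory layout, and constant are hypothetical illustrations, not what this PR actually does:

```python
# Hypothetical sketch: resolve the sample CSV against this test file's
# location so the path works from any working directory (IDE or CLI).
from pathlib import Path

# Assumes the test lives in <project-root>/tests/ and the CSV ships with
# the examples; both layout details are assumptions for illustration.
CSV_PATH = Path(__file__).resolve().parents[1] / "Examples" / "file-import" / "out.csv"


def test_sample_csv_is_resolvable():
    assert CSV_PATH.is_file(), f"sample file not found: {CSV_PATH}"
```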
Showing 10 changed files with 670 additions and 312 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
### Features

1. [#108](https://github.com/InfluxCommunity/influxdb3-python/pull/108): Better expose access to response headers in `InfluxDBError`. Example `handle_http_error` added.
2. [#112](https://github.com/InfluxCommunity/influxdb3-python/pull/112): Update batching examples, add integration tests of batching.

### Bug Fixes

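The changelog's #108 entry mentions exposing HTTP response headers on `InfluxDBError`. A hedged sketch of an error callback that tries to surface them; the `response`/`headers` attribute names are assumptions here, so check PR #108's `handle_http_error` example for the accessor it actually added:

```python
from influxdb_client_3 import InfluxDBError


def handle_error(conf, data: str, exception: InfluxDBError):
    # Attribute names below are assumptions for illustration; PR #108
    # defines the supported way to reach the response headers.
    response = getattr(exception, "response", None)
    headers = getattr(response, "headers", None)
    # InfluxDB Cloud responses typically carry a trace id useful for support.
    trace_id = headers.get("trace-id") if headers else None
    print(f"write failed: {exception} (trace-id: {trace_id})")
```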
217 changes: 122 additions & 95 deletions Examples/batching_example.py
import datetime
import random
import time

from bson import ObjectId

import influxdb_client_3 as InfluxDBClient3
from influxdb_client_3 import write_client_options, WritePrecision, WriteOptions, InfluxDBError

from config import Config


class BatchingCallback(object):

    def __init__(self):
        self.write_status_msg = None
        self.write_count = 0
        self.retry_count = 0
        self.start = time.time_ns()

    def success(self, conf, data: str):
        print(f"Written batch: {conf}, data: {data}")
        self.write_count += 1
        self.write_status_msg = f"SUCCESS: {self.write_count} writes"

    def error(self, conf, data: str, exception: InfluxDBError):
        print(f"Cannot write batch: {conf}, data: {data} due: {exception}")
        self.write_status_msg = f"FAILURE - cause: {exception}"

    def retry(self, conf, data: str, exception: InfluxDBError):
        print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")
        self.retry_count += 1

    def elapsed(self) -> int:
        return time.time_ns() - self.start


def main() -> None:
    conf = Config()

    # Creating 100 gatewayId values as MongoDB ObjectIDs
    gatewayIds = [ObjectId() for x in range(0, 100)]

    # Setting decimal precision to 2
    precision = 2

    # Setting timestamp for first sensor reading
    sample_window_days = 7
    now = datetime.datetime.now()
    now = now - datetime.timedelta(days=sample_window_days)
    target_sample_count = sample_window_days * 24 * 60 * 6

    callback = BatchingCallback()

    write_options = WriteOptions(batch_size=5_000,
                                 flush_interval=10_000,
                                 jitter_interval=2_000,
                                 retry_interval=5_000,
                                 max_retries=5,
                                 max_retry_delay=30_000,
                                 max_close_wait=600_000,
                                 exponential_base=2)

    wco = write_client_options(success_callback=callback.success,
                               error_callback=callback.error,
                               retry_callback=callback.retry,
                               write_options=write_options)

    # Opening InfluxDB client with a batch size of 5k points, a flush
    # interval of 10k ms, and gzip compression
    with InfluxDBClient3.InfluxDBClient3(token=conf.token,
                                         host=conf.host,
                                         org=conf.org,
                                         database=conf.database,
                                         enable_gzip=True,
                                         write_client_options=wco) as _client:
        # Iterating over one week's worth of timestamps (6 sensor readings
        # per minute, 10 seconds apart) for each gateway
        print(f"Writing {target_sample_count * len(gatewayIds)} data points.")
        for i in range(0, target_sample_count):
            # Adding 10 seconds to timestamp of previous sensor reading
            now = now + datetime.timedelta(seconds=10)
            # Iterating over gateways
            for gatewayId in gatewayIds:
                # Creating random test data for 12 fields to be stored in
                # timeseries database
                bcW = random.randrange(1501)
                bcWh = round(random.uniform(0, 4.17), precision)
                bdW = random.randrange(71)
                bdWh = round(random.uniform(0, 0.12), precision)
                cPvWh = round(random.uniform(0.51, 27.78), precision)
                cW = random.randrange(172, 10001)
                cWh = round(random.uniform(0.51, 27.78), precision)
                eWh = round(random.uniform(0, 41.67), precision)
                iWh = round(random.uniform(0, 16.67), precision)
                pW = random.randrange(209, 20001)
                pWh = round(random.uniform(0.58, 55.56), precision)
                scWh = round(random.uniform(0.58, 55.56), precision)
                # Creating point to be ingested into InfluxDB
                p = (InfluxDBClient3.Point("stream")
                     .tag("gatewayId", str(gatewayId))
                     .field("bcW", bcW)
                     .field("bcWh", bcWh)
                     .field("bdW", bdW)
                     .field("bdWh", bdWh)
                     .field("cPvWh", cPvWh)
                     .field("cW", cW)
                     .field("cWh", cWh)
                     .field("eWh", eWh)
                     .field("iWh", iWh)
                     .field("pW", pW)
                     .field("pWh", pWh)
                     .field("scWh", scWh)
                     .time(now.strftime('%Y-%m-%dT%H:%M:%SZ'), WritePrecision.S))

                # Writing point (the client batches writes into sets of
                # 5k points)
                _client.write(record=p)

    print(callback.write_status_msg)
    print(f"Write retries: {callback.retry_count}")
    print(f"Wrote {target_sample_count * len(gatewayIds)} data points.")
    print(f"Elapsed time ms: {int(callback.elapsed() / 1_000_000)}")


if __name__ == "__main__":
    main()
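The reworked example pulls connection details from `config.py` (`from config import Config`) instead of hardcoding them. That module lives elsewhere in the repo; a minimal stand-in, assuming environment variables (the variable names are hypothetical):

```python
# Hypothetical stand-in for Examples/config.py; the real module may read
# different variable names or a config file.
import os


class Config:
    def __init__(self):
        self.token = os.environ.get("INFLUXDB_TOKEN", "")
        self.host = os.environ.get("INFLUXDB_HOST", "")
        self.org = os.environ.get("INFLUXDB_ORG", "")
        self.database = os.environ.get("INFLUXDB_DATABASE", "")
```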
74 changes: 50 additions & 24 deletions Examples/file-import/csv_write.py
import logging

import influxdb_client_3 as InfluxDBClient3
from influxdb_client_3 import write_client_options, WriteOptions, InfluxDBError


class BatchingCallback(object):

    def __init__(self):
        self.write_count = 0

    def success(self, conf, data: str):
        self.write_count += 1
        print(f"Written batch: {conf}, data: {data}")

    def error(self, conf, data: str, exception: InfluxDBError):
        print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

    def retry(self, conf, data: str, exception: InfluxDBError):
        print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")


def main() -> None:

    # allow detailed inspection
    logging.basicConfig(level=logging.DEBUG)

    callback = BatchingCallback()

    write_options = WriteOptions(batch_size=100,
                                 flush_interval=10_000,
                                 jitter_interval=2_000,
                                 retry_interval=5_000,
                                 max_retries=5,
                                 max_retry_delay=30_000,
                                 exponential_base=2)

    wco = write_client_options(success_callback=callback.success,
                               error_callback=callback.error,
                               retry_callback=callback.retry,
                               write_options=write_options)

    # token: access token generated in cloud
    # host: ATTN: could be another AWS region or even another cloud provider
    # org: organization associated with account and database
    # database: should have retention policy 'forever' to handle older sample data timestamps
    # write_client_options: see above
    # debug: allows low-level inspection of communications and context-manager termination
    with InfluxDBClient3.InfluxDBClient3(
            token="INSERT_TOKEN",
            host="https://us-east-1-1.aws.cloud2.influxdata.com/",
            org="INSERT_ORG",
            database="example_data_forever",
            write_client_options=wco,
            debug=True) as client:
        client.write_file(
            file='./out.csv',
            timestamp_column='time', tag_columns=["provider", "machineID"])

    print(f'DONE writing from csv in {callback.write_count} batch(es)')


if __name__ == "__main__":
    main()
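`write_file` here expects `./out.csv` with a `time` column plus `provider` and `machineID` tag columns. A small sketch that generates a compatible file; the `temperature` field and all values are invented for illustration:

```python
# Hypothetical generator for out.csv: 'time', 'provider', and 'machineID'
# match the write_file call above; 'temperature' is an invented field.
import csv
import datetime

with open("out.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["time", "provider", "machineID", "temperature"])
    now = datetime.datetime.now(datetime.timezone.utc)
    for i in range(10):
        ts = now - datetime.timedelta(minutes=i)
        writer.writerow([ts.isoformat(), "aws", f"machine-{i % 3}", round(20 + i * 0.1, 1)])
```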
(The remaining 7 changed files are not shown here.)
