fix: code style of source files #80

Merged (8 commits) on Apr 5, 2024
12 changes: 11 additions & 1 deletion .circleci/config.yml
@@ -68,6 +68,16 @@ jobs:
command: |
pip install flake8 --user
flake8 influxdb_client_3/
- run:
name: Checks style consistency across tests.
command: |
pip install flake8 --user
flake8 tests/
- run:
name: Checks style consistency across examples.
command: |
pip install flake8 --user
flake8 Examples/
check-twine:
docker:
- image: *default-python
@@ -102,7 +112,7 @@ workflows:
not:
equal: [ scheduled_pipeline, << pipeline.trigger_source >> ]
jobs:
# - check-code-style
- check-code-style
# - check-docstyle
- check-twine
- tests-python:
6 changes: 6 additions & 0 deletions .flake8
@@ -0,0 +1,6 @@
[flake8]
count = True
max-line-length = 120

# W504: Line break occurred after a binary operator
ignore = W504
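
For context, W504 fires when a wrapped expression breaks the line after a binary operator; with W504 ignored (and max-line-length raised to 120), that wrapping style remains valid. A minimal, self-contained illustration with made-up variable names:

    # With W504 in the ignore list, breaking the line after the "+" operator
    # is accepted; W504 would otherwise report exactly this pattern.
    battery_charge_wh = 120.0
    battery_discharge_wh = 80.0

    battery_total_wh = (battery_charge_wh +
                        battery_discharge_wh)
    print(battery_total_wh)

The checks the CI jobs above run can be reproduced locally with `pip install flake8 --user` followed by `flake8 influxdb_client_3/ tests/ Examples/` from the repository root.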
6 changes: 5 additions & 1 deletion CHANGELOG.md
@@ -2,4 +2,8 @@

### Bugfix

1. [#77](https://github.com/InfluxCommunity/influxdb3-python/pull/77): Support using pandas nullable types
1. [#77](https://github.com/InfluxCommunity/influxdb3-python/pull/77): Support using pandas nullable types

### Others

- [#80](https://github.com/InfluxCommunity/influxdb3-python/pull/80): Integrate code style check into CI
69 changes: 32 additions & 37 deletions Examples/batching_example.py
karel-rehor (Contributor) commented on Apr 5, 2024:
Ran this example against an initial user account; it eventually resulted in the following error:

The retriable error occurred during request. Reason: 'org fbb67cd4c8905be6 has exceeded limited_write plan limit'. Retry in 206s.

While this is a good example of batching, I'm not sure, from a marketing perspective, that it is a good example for newer users.

Member Author replied:
This issue should be addressed by #82
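
A possible local workaround until then (hypothetical, not part of this PR): generate one day of per-minute readings instead of the full year the example currently produces (525,600 points), so a limited_write plan is not exhausted while trying it out. Sketch:

    import datetime

    # Hypothetical tweak: 1,440 per-minute readings (one day) instead of
    # 525,600 (one year) keeps the example's write volume small.
    MINUTES_PER_DAY = 24 * 60
    start = datetime.datetime(2024, 4, 5, tzinfo=datetime.timezone.utc)

    for i in range(MINUTES_PER_DAY):
        now = start + datetime.timedelta(minutes=i)
        # ...build the Point for `now` and write it exactly as in the example body...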

@@ -1,13 +1,10 @@
import datetime
import random
import pymongo
import pandas as pd

from bson import ObjectId

import influxdb_client_3 as InfluxDBClient3
import pandas as pd
import numpy as np
from influxdb_client_3 import write_client_options, WritePrecision, WriteOptions, InfluxDBError
import datetime
import time


class BatchingCallback(object):
@@ -42,26 +39,24 @@ def retry(self, conf, data: str, exception: InfluxDBError):
callback = BatchingCallback()

write_options = WriteOptions(batch_size=5_000,
flush_interval=10_000,
jitter_interval=2_000,
retry_interval=5_000,
max_retries=5,
max_retry_delay=30_000,
exponential_base=2)
flush_interval=10_000,
jitter_interval=2_000,
retry_interval=5_000,
max_retries=5,
max_retry_delay=30_000,
exponential_base=2)

wco = write_client_options(success_callback=callback.success,
error_callback=callback.error,
retry_callback=callback.retry,
WriteOptions=write_options
)
error_callback=callback.error,
retry_callback=callback.retry,
WriteOptions=write_options
)
# Opening InfluxDB client with a batch size of 5k points or flush interval
# of 10k ms and gzip compression
with InfluxDBClient3.InfluxDBClient3(token=token,
host=url,
org=org,
database=database, enable_gzip=True, write_client_options=wco) as _client:


# Creating iterator for one hour worth of data (6 sensor readings per
# minute)
for i in range(0, 525600):
@@ -93,25 +88,25 @@ def retry(self, conf, data: str, exception: InfluxDBError):
bcWh).field(
"bdW",
bdW).field(
"bdWh",
bdWh).field(
"cPvWh",
cPvWh).field(
"cW",
cW).field(
"cWh",
cWh).field(
"eWh",
eWh).field(
"iWh",
iWh).field(
"pW",
pW).field(
"pWh",
pWh).field(
"scWh",
scWh).time(
now.strftime('%Y-%m-%dT%H:%M:%SZ'),
"bdWh",
bdWh).field(
"cPvWh",
cPvWh).field(
"cW",
cW).field(
"cWh",
cWh).field(
"eWh",
eWh).field(
"iWh",
iWh).field(
"pW",
pW).field(
"pWh",
pWh).field(
"scWh",
scWh).time(
now.strftime('%Y-%m-%dT%H:%M:%SZ'),
WritePrecision.S)

# Writing point (InfluxDB automatically batches writes into sets of
3 changes: 0 additions & 3 deletions Examples/cloud_dedicated_query.py
A Contributor commented:
Tried this example as well against a new user account. Apparently the "flight2" bucket needs to be created in another example.

It failed with the following error:

pyarrow.lib.ArrowInvalid: Flight returned invalid argument error, with message: bucket "flight2" not found. gRPC client debug context: UNKNOWN:Error received from peer ipv4:34.196.233.7:443 {created_time:"2024-04-05T10:59:19.006603056+02:00", grpc_status:3, grpc_message:"bucket \"flight2\" not found"}. Client context: IOError: Server never sent a data message. Detail: Internal

Member Author replied:
This issue should be addressed by #82
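
In the meantime, a defensive sketch (hypothetical, not part of this PR; connection details copied from the example above) is to catch the pyarrow error that surfaces when the target database does not exist yet:

    import influxdb_client_3 as InfluxDBClient3
    import pyarrow

    client = InfluxDBClient3.InfluxDBClient3(
        token="",
        host="b0c7cce5-8dbc-428e-98c6-7f996fb96467.a.influxdb.io",
        org="6a841c0c08328fb1",
        database="flight2")

    try:
        table = client.query(
            query="SELECT * FROM flight WHERE time > now() - 4h",
            language="influxql")
        print(table)
    except pyarrow.ArrowInvalid as e:
        # Raised when the "flight2" database has not been created yet.
        print(f"Query failed - has the database been created? {e}")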

@@ -1,14 +1,11 @@
import influxdb_client_3 as InfluxDBClient3
import pandas as pd
import numpy as np

client = InfluxDBClient3.InfluxDBClient3(
token="",
host="b0c7cce5-8dbc-428e-98c6-7f996fb96467.a.influxdb.io",
org="6a841c0c08328fb1",
database="flight2")


table = client.query(
query="SELECT * FROM flight WHERE time > now() - 4h",
language="influxql")
47 changes: 25 additions & 22 deletions Examples/community/custom_url.py
@@ -1,7 +1,9 @@
from influxdb_client_3 import InfluxDBClient3,InfluxDBError,WriteOptions,write_client_options
import pandas as pd
import random

import pandas as pd

from influxdb_client_3 import InfluxDBClient3, InfluxDBError, WriteOptions, write_client_options


class BatchingCallback(object):

@@ -14,36 +16,38 @@ def error(self, conf, data: str, exception: InfluxDBError):
def retry(self, conf, data: str, exception: InfluxDBError):
print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")

callback = BatchingCallback()

callback = BatchingCallback()

write_options = WriteOptions(batch_size=100,
flush_interval=10_000,
jitter_interval=2_000,
retry_interval=5_000,
max_retries=5,
max_retry_delay=30_000,
exponential_base=2)
flush_interval=10_000,
jitter_interval=2_000,
retry_interval=5_000,
max_retries=5,
max_retry_delay=30_000,
exponential_base=2)

wco = write_client_options(success_callback=callback.success,
error_callback=callback.error,
retry_callback=callback.retry,
WriteOptions=write_options
)
error_callback=callback.error,
retry_callback=callback.retry,
WriteOptions=write_options
)

client = InfluxDBClient3(
token="",
host="https://eu-central-1-1.aws.cloud2.influxdata.com:442",
org="6a841c0c08328fb1",
database="pokemon-codex", enable_gzip=True, write_client_options=wco, write_port_overwrite=443, query_port_overwrite=443)
database="pokemon-codex", enable_gzip=True, write_client_options=wco, write_port_overwrite=443,
query_port_overwrite=443)

now = pd.Timestamp.now(tz='UTC').floor('ms')
now = pd.Timestamp.now(tz='UTC').floor('ms')

# Lists of possible trainers
trainers = ["ash", "brock", "misty", "gary", "jessie", "james"]

# Read the CSV into a DataFrame
pokemon_df = pd.read_csv("https://gist.githubusercontent.com/ritchie46/cac6b337ea52281aa23c049250a4ff03/raw/89a957ff3919d90e6ef2d34235e6bf22304f3366/pokemon.csv")
pokemon_df = pd.read_csv(
"https://gist.githubusercontent.com/ritchie46/cac6b337ea52281aa23c049250a4ff03/raw/89a957ff3919d90e6ef2d34235e6bf22304f3366/pokemon.csv") # noqa: E501

# Creating an empty list to store the data
data = []
@@ -57,17 +61,17 @@ def retry(self, conf, data: str, exception: InfluxDBError):
# Generating random data
for i in range(num_entries):
trainer = random.choice(trainers)

# Randomly select a row from pokemon_df
random_pokemon = pokemon_df.sample().iloc[0]
caught = random_pokemon['Name']

# Count the number of times this trainer has caught this Pokémon
if (trainer, caught) in trainer_pokemon_counts:
trainer_pokemon_counts[(trainer, caught)] += 1
else:
trainer_pokemon_counts[(trainer, caught)] = 1

# Get the number for this combination of trainer and Pokémon
num = trainer_pokemon_counts[(trainer, caught)]

@@ -93,9 +97,8 @@ def retry(self, conf, data: str, exception: InfluxDBError):
# Print the DataFrame
print(caught_pokemon_df)


try:
client.write(caught_pokemon_df, data_frame_measurement_name='caught',
data_frame_tag_columns=['trainer', 'id', 'num'])
data_frame_tag_columns=['trainer', 'id', 'num'])
except Exception as e:
print(f"Error writing point: {e}")
print(f"Error writing point: {e}")
45 changes: 17 additions & 28 deletions Examples/community/database_transfer.py
@@ -1,14 +1,8 @@
import random
import pymongo
import pandas as pd
from bson import ObjectId
import influxdb_client_3 as InfluxDBClient3
import pandas as pd
import numpy as np
from influxdb_client_3 import write_client_options, WritePrecision, WriteOptions, InfluxDBError
import datetime
import time

import influxdb_client_3 as InfluxDBClient3
from influxdb_client_3 import write_client_options, WriteOptions, InfluxDBError


class BatchingCallback(object):

@@ -22,44 +16,41 @@ def retry(self, conf, data: str, exception: InfluxDBError):
print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")




# InfluxDB connection details
token = ""
org = "6a841c0c08328fb1"
dbfrom = "a"
dbto = "b"
url = "eu-central-1-1.aws.cloud2.influxdata.com"
measurement = "airSensors"
taglist= []
taglist = []

callback = BatchingCallback()

write_options = WriteOptions(batch_size=5_000,
flush_interval=10_000,
jitter_interval=2_000,
retry_interval=5_000,
max_retries=5,
max_retry_delay=30_000,
exponential_base=2)
flush_interval=10_000,
jitter_interval=2_000,
retry_interval=5_000,
max_retries=5,
max_retry_delay=30_000,
exponential_base=2)

wco = write_client_options(success_callback=callback.success,
error_callback=callback.error,
retry_callback=callback.retry,
WriteOptions=write_options
)
error_callback=callback.error,
retry_callback=callback.retry,
WriteOptions=write_options
)
# Opening InfluxDB client with a batch size of 5k points or flush interval
# of 10k ms and gzip compression
with InfluxDBClient3.InfluxDBClient3(token=token,
host=url,
org=org,
enable_gzip=True, write_client_options=wco) as _client:
enable_gzip=True, write_client_options=wco) as _client:
query = f"SHOW TAG KEYS FROM {measurement}"
tags = _client.query(query=query, language="influxql", database=dbfrom)
tags = tags.to_pydict()
taglist = tags['tagKey']


query = f"SELECT * FROM {measurement}"
reader = _client.query(query=query, language="influxql", database=dbfrom, mode="chunk")
try:
@@ -69,10 +60,8 @@ def retry(self, conf, data: str, exception: InfluxDBError):
pd = batch.to_pandas()
pd = pd.set_index('time')
print(pd)
_client.write(database=dbto, record=pd, data_frame_measurement_name=measurement, data_frame_tag_columns=taglist)
_client.write(database=dbto, record=pd, data_frame_measurement_name=measurement,
data_frame_tag_columns=taglist)
time.sleep(2)
except StopIteration:
print("No more chunks to read")


