-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinflux.py
45 lines (34 loc) · 1.62 KB
/
influx.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from datetime import datetime, timedelta
from influxdb_client import InfluxDBClient
import logging
class InfluxConnector:
def __init__(self, bucket: str, token: str, org: str, url: str):
self.bucket = bucket
self.token = token
self.org = org
self.url = url
def __get_client(self) -> InfluxDBClient:
return InfluxDBClient(url=self.url, token=self.token, org=self.org, debug=False)
def get_last_recorded_time(self, measurement: str, max_hours: int, to_time: datetime) -> datetime:
query = f'from(bucket: "{self.bucket}") |> range(start: -{max_hours}h) |> filter(fn: (r) => r._measurement == "{measurement}") |> last()'
result = self.__run_query(query)
results = list(result)
if not results:
logging.info(f"Found no records dated less than {max_hours} hour(s) in influx bucket {self.bucket} measurement {measurement}.")
return to_time - timedelta(hours=max_hours)
fluxtable = results[-1]
fluxrecord = fluxtable.records[-1]
fluxtime = fluxrecord.get_time()
return fluxtime
def add_samples(self, records: list) -> None:
if not records:
logging.debug("No records to import")
return
logging.info(f"Importing {len(records)} record(s) to influx")
with self.__get_client() as client:
with client.write_api() as write_api:
write_api.write(bucket=self.bucket, record=records)
def __run_query(self, query):
with self.__get_client() as client:
query_api = client.query_api()
return query_api.query(query)