Python examples
In this article you will find examples in python of common use cases for the Ardexa API.
Last updated
Was this helpful?
In this article you will find examples in python of common use cases for the Ardexa API.
Last updated
Was this helpful?
To complete the examples below you will need the following information:
Token - An API token or a personal token.
Workgroup ID - ID of the workgroup containing the data. This can be found under .
Cloud - Cluster hosting the data: "app" for US and "eur1" for Europe. This can be confirmed by checking the first part of the URL when accessing the Ardexa platform.
Table - Data table storing the data.
A list of all available tables can be visualized on the selector.
The tables where a given node is logging data and the specific fields can be visualized in .
A list of tables can be extracted via API (refer to example below)
Device ID - ID of the Node to extract data from. This can be found on
Source - Source which collected the desired data.
A list of available sources can be visualized on to extract data from.
Field(s) - Name of the data fields to extract.
A list of available fields per source can be visualized on .
A list of fields within a table can be extracted via API (refer to example below)
Date field - Field selected as timestamp.
Check the differences between date fields in our .
A list of available date fields can be visualized on the selector.
For additional information, check the in our knowledge base and the .
import json
import requests
# Connection settings -- replace the placeholders with your own values.
TOKEN = "<API or personal token>"
WORKGROUP_ID = "<Workgroup ID>"
CLOUD = "<Cloud>"  # Cluster prefix of the host serving the data: app/eur1/other

# Workgroup endpoint on the selected cluster.
url = "https://{}.ardexa.com/api/v1/orgs/{}".format(CLOUD, WORKGROUP_ID)

# The token is sent as a bearer credential in the authorization header.
headers = {"authorization": "Bearer {}".format(TOKEN)}

# Request the workgroup record and print the decoded JSON body.
response = requests.get(url, headers=headers)
results = response.json()
print(results)
import json
import requests
# Connection settings -- replace the placeholders with your own values.
TOKEN = "<API or personal token>"
WORKGROUP_ID = "<Workgroup ID>"
CLOUD = "<Cloud>"  # Modify according to the server hosting the data, app/eur1/other

headers = {
    "authorization": "Bearer {}".format(TOKEN),
}

# Request the devices registered under the workgroup.
r = requests.get(
    "https://{}.ardexa.com/api/v1/devices/{}".format(CLOUD, WORKGROUP_ID),
    headers=headers,
)
# Stop on an HTTP error instead of iterating over an error payload below.
r.raise_for_status()
results = r.json()

# The response is iterated as a sequence of device records; keep each "name".
node_names = [device["name"] for device in results]
print(node_names)
import json
import requests
# Connection settings -- replace the placeholders with your own values.
TOKEN = "<API or personal token>"
WORKGROUP_ID = "<Workgroup ID>"
CLOUD = "<Cloud>"  # Cluster prefix of the host serving the data: app/eur1/other

# Bearer-token authorization header shared by the request below.
headers = {"authorization": "Bearer {}".format(TOKEN)}

# Tables endpoint for the workgroup on the selected cluster.
tables_url = "https://{}.ardexa.com/api/v1/tables/{}".format(CLOUD, WORKGROUP_ID)

# Fetch, decode the JSON body and print it as-is.
results = requests.get(tables_url, headers=headers).json()
print(results)
import json
import requests
# Connection settings -- replace the placeholders with your own values.
TOKEN = "<API or personal token>"
WORKGROUP_ID = "<Workgroup ID>"
CLOUD = "<Cloud>"  # Modify according to the server hosting the data, app/eur1/other
TABLE = "<Table>"  # Input table containing the desired data

headers = {
    "authorization": "Bearer {}".format(TOKEN),
}

# Request the description of a single table in the workgroup.
r = requests.get(
    "https://{}.ardexa.com/api/v1/tables/{}/{}".format(CLOUD, WORKGROUP_ID, TABLE),
    headers=headers,
)
# Stop on an HTTP error instead of failing with a KeyError on "fields" below.
r.raise_for_status()
results = r.json()

# Iterating results["fields"] yields one entry per field; collect them in order.
field_names = list(results["fields"])
print(field_names)
import json
import requests
# Connection settings -- replace the placeholders with your own values.
TOKEN = "<API or personal token>"
WORKGROUP_ID = "<Workgroup ID>"
CLOUD = "<Cloud>"  # Modify according to the server hosting the data, app/eur1/other
TABLE = "<Table>"  # Input table containing the desired data
DEVICE_ID = "<Node id>"  # Input node to extract data from. This will be applied as a filter
SOURCE = "<Source>"  # Input source to extract data from. This will be applied as a filter

headers = {
    "authorization": "Bearer {}".format(TOKEN),
}

# Apply filters to data stored in table <TABLE>, filter by device and source
# NOTE(review): "IN" is paired with a single string value here -- confirm
# against the API documentation whether a list of sources is expected.
filters = [{
    "field": "device",
    "operator": "=",
    "value": DEVICE_ID,
}, {
    "field": "source",
    "operator": "IN",
    "value": SOURCE,
}]

# Configure search parameters
params = {
    "dateField": "Datetime",
    "fields": ["Datetime", "device", "source", "Exported Active Energy"],  # List of fields to extract
    "rows": 10000,  # maximum of 10000
    "scroll": True,  # enable Scroll search
    "sort": "Datetime",
    "timeframe": "2025-01-01T00:00:00 until 2025-01-01T23:59:59",  # configure timeframe
    "timezone": "Europe/Madrid",
    # Each filter is sent as its own JSON-encoded query value. A list is used
    # rather than a one-shot map() iterator so params stays reusable/printable.
    "filters": [json.dumps(f) for f in filters],
}

print("Fetching data...")

# Initial API call
r = requests.get(
    "https://{}.ardexa.com/api/v1/tables/{}/{}/search".format(CLOUD, WORKGROUP_ID, TABLE),
    params=params,
    headers=headers,
)
# Stop on an HTTP error instead of failing with a KeyError on "records" below.
r.raise_for_status()
results = r.json()
print("GOT RESULTS: {} ROWS".format(len(results["records"])))
print(results)

# Additional API calls for Scroll Searching: keep requesting the next page for
# as long as the previous response carries a scroll_id.
while 'scroll_id' in results:
    params = {"scroll_id": results["scroll_id"]}  # Input Scroll ID as sole parameter
    r = requests.get(
        "https://{}.ardexa.com/api/v1/tables/{}/{}/search/scroll".format(CLOUD, WORKGROUP_ID, TABLE),
        params=params,
        headers=headers,
    )
    r.raise_for_status()
    results = r.json()
    print("GOT RESULTS (scroll): {}".format(len(results["records"])))
    print(results)

print("*** Raw data fetch completed ***")
import json
import requests
# Connection settings -- replace the placeholders with your own values.
TOKEN = "<API or personal token>"
WORKGROUP_ID = "<Workgroup ID>"
CLOUD = "<Cloud>"  # Modify according to the server hosting the data, app/eur1/other
TABLE = "<Table>"  # Input table containing the desired data
DEVICE_ID = "<Node id>"  # Input node to extract data from. This will be applied as a filter
SOURCE = "<Source>"  # Input source to extract data from. This will be applied as a filter

headers = {
    "authorization": "Bearer {}".format(TOKEN),
}

# Configure aggregation parameters
aggs = [{
    "field": "Exported Active Energy",
    "dateField": "Datetime",
    "type": "date_histogram",
    "interval": "15m",
    "groupBy": ["source"],
}]

# Apply filters to data stored in table <TABLE>, filter by device and source
filters = [{
    "field": "device",
    "operator": "=",
    "value": DEVICE_ID,
}, {
    "field": "source",
    "operator": "=",
    "value": SOURCE,
}]

params = {
    "dateField": "Datetime",
    # NOTE(review): "AC Power" is listed here while the aggregation targets
    # "Exported Active Energy" -- confirm which field is intended.
    "fields": ["Datetime", "device", "source", "AC Power"],
    "rows": 0,  # Rows must be set to 0 and no Scroll Search must be set.
    "sort": "Datetime",
    "timeframe": "2022-03-01T00:00:00+01:00 until 2024-10-31T23:59:59+01:00",
    "timezone": "Europe/Madrid",
    # Each filter/aggregation is sent as its own JSON-encoded query value. Lists
    # are used rather than one-shot map() iterators so params stays reusable.
    "filters": [json.dumps(f) for f in filters],
    "aggs": [json.dumps(a) for a in aggs],
}

print("Fetching data...")

r = requests.get(
    f"https://{CLOUD}.ardexa.com/api/v1/tables/{WORKGROUP_ID}/{TABLE}/search",
    params=params,
    headers=headers,
)
# Stop on an HTTP error so a failed request is not reported as completed below.
r.raise_for_status()
results = r.json()  # Convert the response to JSON
print(results)

print("*** Downsampled data fetch completed ***")