diff --git a/README.md b/README.md
index 2f553d1..319fce4 100644
--- a/README.md
+++ b/README.md
@@ -17,6 +17,7 @@ This readme contains three sections:
## Benchmark
Using the test setup from this repository we ran tests against PostgreSQL, CockroachDB, YugabyteDB, ArangoDB, Cassandra and InfluxDB with (aside from PostgreSQL and InfluxDB) 3 nodes and 16 parallel workers that insert data (more concurrent workers might speed up the process but we didn't test that as this test was only to establish a baseline). All tests were done on a k3s cluster consisting of 8 m5ad.2xlarge (8vCPU, 32GB memory) and 3 i3.8xlarge (32vCPU, 244GB memory) EC2 instances on AWS. The database pods were run on the i3 instances with a resource limit of 8 cores and 10 GB memory per node, the client pods of the benchmark on the m5ad instances. In the text below all mentions of nodes refer to database nodes (pods) and not VMs or kubernetes nodes.
+For Azure Data Explorer the test setup was adapted as follows: an AKS cluster with 3 Standard_D8_v5 nodes.
All tests were run on an empty database.
@@ -31,23 +32,31 @@ For generating primary keys for the events we have two modes: Calculating it on
For the insert testcase we use single inserts (for PostgreSQL with autocommit) to simulate an ingest where each message needs to be persisted as soon as possible so no batching of messages is possible. Depending on the architecture and implementation buffering and batching of messages is possible. To see what effect this has on the performance we have implemented a batch mode for the test.
For ArangoDB batch mode is implemented using the document batch API (`/_api/document`), the older generic batch API (`/_api/batch`) will be deprecated and produces worse performance so we did not use it. For PostgreSQL we implemented batch mode by doing a manual commit every x inserts and using COPY instead of INSERT. Another way to implement batch inserts is to use values lists (one insert statement with a list of values tuples) but this is not as fast as COPY. This mode can however be activated by passing `--extra-option use_values_lists=true`.
+Azure Data Explorer offers compute-optimized and storage-optimized SKU types, as well as batch and stream ingestion. For the storage-optimized variant we tested against Standard_L8s_v2, for the compute-optimized variant against Standard_E8a_v4; in both cases a SKU capacity of 2 was used.
+
### Insert performance
The table below shows the best results for the databases for a 3 node cluster and a resource limit of 8 cores and 10 GB memory per node. The exceptions are PostgreSQL, InfluxDB and TimescaleDB which were launched as only a single instance. Influx provides a clustered variant only with their Enterprise product and for TimescaleDB there is no official and automated way to create a cluster with a distributed hypertable. All tests were run with the newest available version of the databases at the time of testing and using the opensource or free versions.
Inserts were done with 16 parallel workers, and each test was run 3 times with the result being the average of these runs. For each run the inserts per second was calculated as the number of inserts divided by the sumed up duration of the workers.
-| Database (Version tested) | Inserts/s | Insert mode | Primary-key mode |
-|---------------------------------------------|------------|--------------------------------------|------------------|
-| PostgreSQL | 428000 | copy, size 1000 | sql |
-| CockroachDB (22.1.3) | 91000 | values lists, size 1000 | db |
-| YugabyteDB YSQL (2.15.0) | 295000 | copy, size 1000 | sql |
-| YugabyteDB YCQL (2.15.0) | 288000 | batch, size 1000 | - |
-| ArrangoDB | 137000 | batch, size 1000 | db |
-| Cassandra sync inserts | 389000 | batch, size 1000, max_sync_calls 1 | - |
-| Cassandra async inserts | 410000 | batch, size 1000, max_sync_calls 120 | - |
-| InfluxDB | 460000 | batch, size 1000 | - |
-| TimescaleDB | 600000 | copy, size 1000 | - |
-| Elasticsearch | 170000 | batch, size 10000 | db |
+| Database (Version tested) | Inserts/s | Insert mode | Primary-key mode |
+|-----------------------------------------|-----------|--------------------------------------|------------------|
+| PostgreSQL | 428000 | copy, size 1000 | sql |
+| CockroachDB (22.1.3) | 91000 | values lists, size 1000 | db |
+| YugabyteDB YSQL (2.15.0) | 295000 | copy, size 1000 | sql |
+| YugabyteDB YCQL (2.15.0) | 288000 | batch, size 1000 | - |
+| ArangoDB                                 | 137000    | batch, size 1000                     | db               |
+| Cassandra sync inserts | 389000 | batch, size 1000, max_sync_calls 1 | - |
+| Cassandra async inserts | 410000 | batch, size 1000, max_sync_calls 120 | - |
+| InfluxDB | 460000 | batch, size 1000 | - |
+| TimescaleDB | 600000 | copy, size 1000 | - |
+| Elasticsearch | 170000 | batch, size 10000 | db |
+| Azure Data Explorer (Storage optimized) | 36000* | batch, size 1000 | - |
+| Azure Data Explorer (Storage optimized) | 30000* | stream, size 1000 | - |
+| Azure Data Explorer (Compute optimized) | 38000* | batch, size 1000 | - |
+| Azure Data Explorer (Compute optimized) | 53000* | stream, size 1000 | - |
+
+*Inserts are not written to the database immediately, but only queued.
You can find additional results from older runs in [old-results.md](old-results.md) but be aware that comparing them with the current ones is not always possible due to different conditions during the runs.
@@ -64,16 +73,21 @@ Although the results of our benchmarks show a drastic improvement, batching in m
For TimescaleDB the insert performance depends a lot on the number and size of chunks that are written to. In a fill-level test with 50 million inserts per step where in each step the timestamps started again (so the same chunks were written to as in the last step) performance degraded rapidly. But in the more realistic case of ever increasing timestamps (so new chunks being added) performance stayed relatively constant.
+A notable characteristic of Azure Data Explorer is that inserts are not written to the database immediately; instead they are queued. The larger the batch/stream, the longer it takes until the queue
+starts to be worked off, but once started it keeps pace. For example, a couple of rows take 20 - 30 seconds to appear in the database, 1000 rows take 5 - 6 minutes, and 12.5 million rows also require only 5 - 6 minutes.
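+
+As a rough way to observe this queueing latency, one can poll the row count of the `events` table with the same Kusto client the simulator uses. This is only a minimal sketch under assumptions: the connection values are the ones from `config.yaml` and the `adx-secret`, and the expected row count and polling interval are illustrative, not part of the benchmark.
+
+```python
+import os
+import time
+
+from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
+
+kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
+    "https://adxcompare.westeurope.kusto.windows.net",  # kusto_uri from config.yaml
+    os.environ["adx_aad_app_id"], os.environ["adx_app_key"], os.environ["adx_authority_id"])
+
+expected_rows = 12_500_000  # illustrative target, e.g. after a 12.5 million row ingest
+with KustoClient(kcsb) as client:
+    while True:
+        result = client.execute("SampleDB", "events | count")
+        count = [row[0] for row in result.primary_results[0]][0]
+        print(f"rows visible in the database: {count}", flush=True)
+        if count >= expected_rows:
+            break
+        time.sleep(30)  # poll every 30 seconds while the ingestion queue drains
+```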
+
### Query performance
-| Database / Query | count-events | temperature-min-max | temperature-stats | temperature-stats-per-device | newest-per-device |
-|------------------|--------------|---------------------|-------------------|------------------------------|-------------------|
-| PostgreSQL | 39 | 0.01 | 66 | 119 | 92 |
-| CockroachDB | 123 | 153 | 153 | 153 | 150 |
-| InfluxDB | 10 | 48 | 70 | 71 | 0.1 |
-| TimescaleDB | 30 | 0.17 | 34 | 42 | 38 |
-| Elasticsearch | 0.04 | 0.03 | 5.3 | 11 | 13 |
-| Yugabyte (YSQL) | 160 | 0.03 | 220 | 1700 | failure |
+| Database / Query | count-events | temperature-min-max | temperature-stats | temperature-stats-per-device | newest-per-device |
+|-----------------------------------------|--------------|---------------------|-------------------|------------------------------|-------------------|
+| PostgreSQL | 39 | 0.01 | 66 | 119 | 92 |
+| CockroachDB | 123 | 153 | 153 | 153 | 150 |
+| InfluxDB | 10 | 48 | 70 | 71 | 0.1 |
+| TimescaleDB | 30 | 0.17 | 34 | 42 | 38 |
+| Elasticsearch | 0.04 | 0.03 | 5.3 | 11 | 13 |
+| Yugabyte (YSQL) | 160 | 0.03 | 220 | 1700 | failure |
+| Azure Data Explorer (Storage optimized) | 0.33 | 0.76 | 1.0 | 3.2 | 8.6 |
+| Azure Data Explorer (Compute optimized) | 0.29 | 0.55 | 0.69 | 2.2 | failure |
The table gives the average query duration in seconds
@@ -101,6 +115,10 @@ For TimescaleDB query performance is also very dependent on the number and size
Elasticsearch seems to cache query results, as such running the queries several times will yield millisecond response times for all queries. The times noted in the table above are against a freshly started elasticsearch cluster.
+For the newest-per-device query (compute optimized) Azure Data Explorer did not succeed but terminated with "partition operator exceed amount of maximum partitions allowed (64)".
+When a query is run against Azure Data Explorer, the query engine tries to optimize it by breaking it down into smaller, parallelizable tasks that can be executed across multiple partitions.
+If the query requires more partitions than the maximum allowed limit, it will fail with the error message above.
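+
+A possible workaround, offered only as a hedged sketch and not part of the measured runs, is to express newest-per-device with the `arg_max` aggregation instead of the `partition` operator, e.g. by adapting the `_queries` dict in the simulator module:
+
+```python
+# Hedged variant of the simulator's _queries dict (simulator/modules/azure_data_explorer.py):
+# only newest-per-device is changed to use arg_max, which aggregates per device_id without
+# the partition operator and is therefore not subject to the 64-partition limit.
+# This variant was NOT used for the results above.
+_queries = {
+    "count-events": "events | count",
+    "temperature-min-max": "events | summarize max(temperature), min(temperature)",
+    "temperature-stats": "events | summarize max(temperature), avg(temperature), min(temperature)",
+    "temperature-stats-per-device": "events | summarize max(temperature), avg(temperature), min(temperature) by device_id",
+    "newest-per-device": "events | summarize arg_max(timestamp, temperature) by device_id",
+}
+```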
+
For all databases there seems to be a rough linear correlation between query times and database size. So when running the tests with only 50 million rows the query times were about 10 times as fast.
## Running the tests
@@ -136,9 +154,9 @@ To run the test use `python run.py insert`. You can use the following options:
* `--workers`: Set of worker counts to try, default is `1,4,8,12,16` meaning the test will try with 1 concurrent worker, then with 4, then 8, then 12 and finally 16
* `--runs`: How often should the test be repeated for each worker count, default is `3`
* `--primary-key`: Defines how the primary key should be generated, see below for choices. Defaults to `db`
-* `--tables`: To simulate how the databases behave if inserts are done to several tables this option can be changed from `single` to `multiple` to have the test write into four instead of just one table
+* `--tables`: Simulates how the databases behave if inserts are done to several tables. Change this option from `single` to `multiple` to have the test write into four tables instead of just one
* `--num-inserts`: The number of inserts each worker should do, by default 10000 to get a quick result. Increase this to see how the databases behave under constant load. Also increase the timout option accordingly
-* `--timeout`: How long should the script wait for the insert test to complete in seconds. Default is `0`. Increase accordingly if you increase the number of inserts or disable by stting to `0`
+* `--timeout`: How long should the script wait for the insert test to complete in seconds. Default is `0`. Increase accordingly if you increase the number of inserts or disable by setting to `0`
* `--batch`: Switch to batch mode (for postgres this means manual commits, for arangodb using the [batch api](https://docs.python-arango.com/en/main/batch.html)). Specify the number of inserts per transaction/batch
* `--extra-option`: Extra options to supply to the test scripts, can be used multiple times. Currently only used for ArangoDB (see below)
* `--clean` / `--no-clean`: By default the simulator will clean and recreate tables to always have the same basis for the runs. Can be disabled
@@ -312,6 +330,47 @@ kubectl apply -f dbinstall/elastic-deployment.yaml
```
+### Azure Data Explorer
+
+Azure Data Explorer (ADX) is a fully managed, high-performance big data analytics platform that can ingest, process, and store large volumes of varied data. You can use Azure Data Explorer for near real-time queries and advanced analytics.
+
+To deploy an ADX cluster, change the service principal (AAD) to your own service principal inside dbinstall/azure_data_explorer/main.tf:
+```
+data "azuread_service_principal" "service-principle" {
+ display_name = "mw_iot_ADX-DB-Comparison"
+}
+```
+
+Additionally, adjust the following fields depending on your performance needs:
+
+```
+resource "azurerm_kusto_cluster" "adxcompare" {
+ ...
+
+ sku {
+ name = "Dev(No SLA)_Standard_E2a_v4"
+ capacity = 1
+ }
+```
+
+To apply the infrastructure, run:
+```bash
+az login
+terraform apply
+```
+
+Finally, create the kubernetes secret with the application (client) ID, the client secret and the tenant ID of your service principal; the simulator pods read these values from `adx-secret` via `envFrom`:
+
+```bash
+kubectl create secret generic adx-secret --from-literal=adx_aad_app_id= --from-literal=adx_app_key= --from-literal=adx_authority_id= -n default
+```
+To enable streaming, set the `batch` parameter of the config to `false`. This will not apply to existing tables.
+Streaming ingestion at the cluster level is enabled by default by the terraform cluster config.
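+
+Whether streaming ingestion is actually active for a table can be checked with the management command the simulator module also notes in a comment. The sketch below assumes the connection values from `config.yaml` and the `adx-secret`:
+
+```python
+import os
+
+from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
+
+kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
+    "https://adxcompare.westeurope.kusto.windows.net",  # kusto_uri from config.yaml
+    os.environ["adx_aad_app_id"], os.environ["adx_app_key"], os.environ["adx_authority_id"])
+
+with KustoClient(kcsb) as client:
+    # Show the streaming ingestion policy of the events table
+    response = client.execute_mgmt("SampleDB", ".show table events policy streamingingestion")
+    for row in response.primary_results[0]:
+        print(row)
+```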
+
+
+There is an emulator for ADX that can run locally, but using it for any kind of benchmark test is not recommended, since its performance profile is very different. Furthermore, doing so is even prohibited by the license terms.
+
+
## Remarks on the databases
This is a collection of problems / observations collected over the course of the tests.
diff --git a/config.yaml b/config.yaml
index 3dcb377..1e08559 100644
--- a/config.yaml
+++ b/config.yaml
@@ -56,4 +56,9 @@ targets:
elasticsearch:
module: elasticsearch
endpoint: https://elastic-es-http.default.svc.cluster.local:9200
+ azure_data_explorer:
+ module: azure_data_explorer
+ kusto_uri: https://adxcompare.westeurope.kusto.windows.net
+ kusto_ingest_uri: https://ingest-adxcompare.westeurope.kusto.windows.net
+ kusto_db: SampleDB
namespace: default
diff --git a/dbinstall/azure_data_explorer/main.tf b/dbinstall/azure_data_explorer/main.tf
new file mode 100644
index 0000000..d0a1d18
--- /dev/null
+++ b/dbinstall/azure_data_explorer/main.tf
@@ -0,0 +1,73 @@
+terraform {
+ required_providers {
+ azurerm = {
+ source = "hashicorp/azurerm"
+ version = "~> 3.0.2"
+ }
+ }
+
+ required_version = ">= 1.1.0"
+}
+
+provider "azurerm" {
+ features {}
+}
+
+
+resource "azurerm_resource_group" "rg-compare" {
+ name = "db-performance-comparison"
+ location = "West Europe"
+
+ tags = {
+ "cost center" = "IOT",
+ environment = "iot-lab"
+ }
+}
+
+resource "azurerm_kusto_cluster" "adxcompare" {
+ name = "adxcompare"
+ location = azurerm_resource_group.rg-compare.location
+ resource_group_name = azurerm_resource_group.rg-compare.name
+ streaming_ingestion_enabled = true
+
+ # Change depending on your performance needs
+ sku {
+ name = "Dev(No SLA)_Standard_E2a_v4"
+ capacity = 1
+ }
+
+ tags = {
+ "cost center" = "IOT",
+ environment = "iot-lab"
+ }
+}
+
+resource "azurerm_kusto_database" "sample-db" {
+ name = "SampleDB"
+ resource_group_name = azurerm_resource_group.rg-compare.name
+ location = azurerm_resource_group.rg-compare.location
+ cluster_name = azurerm_kusto_cluster.adxcompare.name
+
+ hot_cache_period = "P1D"
+ soft_delete_period = "P1D"
+}
+
+data "azurerm_client_config" "current" {
+}
+
+# Change to your own service principal
+data "azuread_service_principal" "service-principle" {
+ display_name = "mw_iot_ADX-DB-Comparison"
+}
+
+resource "azurerm_kusto_database_principal_assignment" "ad-permission" {
+ name = "AD-Permission"
+ resource_group_name = azurerm_resource_group.rg-compare.name
+ cluster_name = azurerm_kusto_cluster.adxcompare.name
+ database_name = azurerm_kusto_database.sample-db.name
+
+ tenant_id = data.azurerm_client_config.current.tenant_id
+ principal_id = data.azuread_service_principal.service-principle.id
+ principal_type = "App"
+ role = "Admin"
+}
diff --git a/deployment/templates/collector.yaml b/deployment/templates/collector.yaml
index 3283e3b..9e39de2 100644
--- a/deployment/templates/collector.yaml
+++ b/deployment/templates/collector.yaml
@@ -26,6 +26,10 @@ spec:
value: "{{ .Values.target_module }}"
- name: RUN_CONFIG
value: "{{ .Values.run_config }}"
+ envFrom:
+ - secretRef:
+ name: adx-secret
+
terminationGracePeriodSeconds: 2
nodeSelector:
{{ .Values.nodeSelector | toYaml | indent 8 }}
diff --git a/deployment/templates/worker.yaml b/deployment/templates/worker.yaml
index fad7450..302d596 100644
--- a/deployment/templates/worker.yaml
+++ b/deployment/templates/worker.yaml
@@ -37,6 +37,9 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.name
+ envFrom:
+ - secretRef:
+ name: adx-secret
restartPolicy: Never
terminationGracePeriodSeconds: 2
nodeSelector:
diff --git a/simulator/Dockerfile b/simulator/Dockerfile
index 379e5f1..c40173e 100644
--- a/simulator/Dockerfile
+++ b/simulator/Dockerfile
@@ -2,8 +2,8 @@ FROM python:3.8-alpine as base
FROM base as builder
-RUN mkdir /install
-RUN apk update && apk add postgresql-dev gcc python3-dev musl-dev
+RUN mkdir /install
+RUN apk update && apk add postgresql-dev gcc python3-dev musl-dev libc-dev make git libffi-dev openssl-dev libxml2-dev libxslt-dev automake g++
WORKDIR /install
COPY requirements.txt /requirements.txt
RUN pip install --prefix=/install -r /requirements.txt
@@ -13,7 +13,7 @@ FROM base
COPY --from=builder /install /usr/local
COPY requirements.txt /
RUN pip install -r requirements.txt
-RUN apk --no-cache add libpq
+RUN apk --no-cache add libpq libstdc++
ADD . /simulator
WORKDIR /simulator
CMD ["python", "main.py"]
diff --git a/simulator/modules/__init__.py b/simulator/modules/__init__.py
index cad5df9..fa9a996 100644
--- a/simulator/modules/__init__.py
+++ b/simulator/modules/__init__.py
@@ -20,5 +20,8 @@ def select_module():
elif mod == "elasticsearch":
from . import elasticsearch
return elasticsearch
+ elif mod == "azure_data_explorer":
+ from . import azure_data_explorer
+ return azure_data_explorer
else:
raise Exception(f"Unknown module: {mod}")
diff --git a/simulator/modules/azure_data_explorer.py b/simulator/modules/azure_data_explorer.py
new file mode 100644
index 0000000..df280af
--- /dev/null
+++ b/simulator/modules/azure_data_explorer.py
@@ -0,0 +1,218 @@
+import io
+import itertools
+import os
+import time
+
+import pandas as pd
+from azure.kusto.data import DataFormat
+from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
+from azure.kusto.data.exceptions import KustoApiError
+from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, StreamDescriptor
+
+from .config import config
+
+AAD_APP_ID = os.getenv("adx_aad_app_id")
+APP_KEY = os.getenv("adx_app_key")
+AUTHORITY_ID = os.getenv("adx_authority_id")
+
+KUSTO_URI = config["kusto_uri"]
+KUSTO_INGEST_URI = config["kusto_ingest_uri"]
+KUSTO_DATABASE = config["kusto_db"]
+
+TABLE_NAMES = ["events0", "events1", "events2", "events3"] if config["use_multiple_tables"] else ["events"]
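+# CSV ingestion mapping for the event tables; "Ordinal" is the zero-based CSV column position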
+EVENT_TABLE_MAPPING = """'[{"Name":"timestamp","datatype":"long","Ordinal":0}, {"Name":"device_id","datatype":"string","Ordinal":1}, {"Name":"sequence_number","datatype":"long","Ordinal":2}, {"Name":"temperature","datatype":"real","Ordinal":3}]'"""
+
+
+def init():
+ with _kusto_client() as kusto_client:
+ existing_tables = _get_existing_tables(kusto_client)
+ if config["clean_database"]:
+ _clean_database(existing_tables, kusto_client)
+ else:
+ table_names = _get_tables_requiring_creation(existing_tables, TABLE_NAMES)
+ for table_name in table_names:
+ _create_table(kusto_client, table_name)
+ _configure_stream_ingestion(kusto_client, table_name)
+ _create_ingestion_mapping(kusto_client, table_name)
+
+
+def prefill_events(events):
+ _insert_events(events, True, 1_000)
+
+
+def insert_events(events):
+ batch_mode = config.get("batch_mode", False)
+ batch_size = config.get("batch_size", 1_000)
+ _insert_events(events, batch_mode, batch_size)
+
+
+_queries = {
+ "count-events": "events | count",
+ "temperature-min-max": "events| summarize max(temperature), min(temperature)",
+ "temperature-stats": "events| summarize max(temperature), avg(temperature), min(temperature)",
+ "temperature-stats-per-device": "events | summarize max(temperature), avg(temperature), min(temperature) by device_id",
+ "newest-per-device": "events | partition by device_id (top 1 by timestamp desc | project device_id, temperature)",
+}
+
+
+def queries():
+ _filter_queries_to_execute()
+ query_times = dict([(name, []) for name in _queries.keys()])
+ for _ in range(int(config["runs"])):
+ for name, query in _queries.items():
+ _execute_query(name, query, query_times)
+ return query_times
+
+
+def _ingestion_client():
+ kcsb_ingest = KustoConnectionStringBuilder.with_aad_application_key_authentication(KUSTO_INGEST_URI, AAD_APP_ID, APP_KEY, AUTHORITY_ID)
+ return QueuedIngestClient(kcsb_ingest)
+
+
+def _kusto_client():
+ kcsb_data = KustoConnectionStringBuilder.with_aad_application_key_authentication(KUSTO_URI, AAD_APP_ID, APP_KEY, AUTHORITY_ID)
+ return KustoClient(kcsb_data)
+
+
+def _get_tables_requiring_creation(existing_tables, table_names):
+ return [table_name for table_name in table_names if table_name not in existing_tables]
+
+
+def _clean_database(existing_tables, kusto_client):
+ for table_name in existing_tables:
+ try:
+ print(f"Delete table {table_name}")
+ delete_table_command = f".drop table {table_name}"
+ kusto_client.execute_mgmt(KUSTO_DATABASE, delete_table_command)
+ except KustoApiError as error:
+ print(f"Could not delete table, due to:\n {error}")
+
+
+def _get_existing_tables(kusto_client):
+ response = kusto_client.execute(KUSTO_DATABASE, f""".show tables | where DatabaseName == "{KUSTO_DATABASE}" """)
+ existing_tables = [row[0] for row in response.primary_results[0]]
+ print(f"Following tables already exist: {existing_tables}")
+ return existing_tables
+
+
+def _create_ingestion_mapping(kusto_client, table_name):
+ create_mapping_command = f""".create table {table_name} ingestion csv mapping '{table_name}_CSV_Mapping' {EVENT_TABLE_MAPPING}"""
+ kusto_client.execute_mgmt(KUSTO_DATABASE, create_mapping_command)
+
+
+def _create_table(kusto_client, table_name):
+ print(f"Create table {table_name}")
+ create_table_command = f".create table {table_name} (timestamp: long, device_id: string, sequence_number: long, temperature: real)"
+ kusto_client.execute_mgmt(KUSTO_DATABASE, create_table_command)
+
+
+def _configure_stream_ingestion(kusto_client, table_name):
+ if not config.get("batch_mode", True):
+ print(f"Enable streaming for {table_name}")
+ enable_streaming_command = f".alter table {table_name} policy streamingingestion enable"
+ kusto_client.execute_mgmt(KUSTO_DATABASE, enable_streaming_command)
+        # Manual check: .show table policy streamingingestion
+
+
+def _batch_insert(events, batch_size, table_names):
+ count = 0
+ timestamps = []
+ device_ids = []
+ sequence_numbers = []
+ temperatures = []
+ for idx, event in enumerate(events):
+ timestamps.append(event.timestamp)
+ device_ids.append(event.device_id)
+ sequence_numbers.append(event.sequence_number)
+ temperatures.append(event.temperature)
+ count += 1
+ if count >= batch_size:
+ table = _determine_table_for_ingestion(batch_size, idx, table_names)
+ print(f"Insert {count} entries into {table}")
+ _ingest(table, timestamps, device_ids, sequence_numbers, temperatures)
+ timestamps.clear()
+ device_ids.clear()
+ sequence_numbers.clear()
+ temperatures.clear()
+ count = 0
+ if count > 0:
+ print(f"Insert {count} entries into {table_names[0]}")
+ _ingest(table_names[0], timestamps, device_ids, sequence_numbers, temperatures)
+
+
+def _determine_table_for_ingestion(batch_size, idx, table_names):
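+    # Round-robin over the configured tables: a new table is chosen after every full batch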
+ return table_names[int(idx / batch_size) % len(table_names)]
+
+
+def _stream_insert(events, table_names):
+ number_of_tables = len(table_names)
+ number_of_inserts = int(config["num_inserts"])
+ inserts_per_table = number_of_inserts // number_of_tables
+ print("Stream ingestion", flush=True)
+ with _ingestion_client() as ingestion_client:
+ for table in table_names:
+ _ingest_by_stream(events, ingestion_client, inserts_per_table, table)
+
+
+def _ingest_by_stream(events, ingestion_client, inserts_per_table, table):
+ print(f"Ingest {inserts_per_table} into {table}", flush=True)
+ events_partition = list(itertools.islice(events, inserts_per_table))
+ json_string = _to_json(events_partition)
+ stream_descriptor = _create_stream_descriptor(json_string)
+ ingestion_props = IngestionProperties(database=KUSTO_DATABASE, table=table,
+ data_format=DataFormat.SINGLEJSON)
+ result = ingestion_client.ingest_from_stream(stream_descriptor, ingestion_props)
+ print(result)
+
+
+def _create_stream_descriptor(json_string):
+ bytes_array = json_string.encode("utf-8")
+ byte_stream = io.BytesIO(bytes_array)
+ byte_stream.flush()
+ stream_descriptor = StreamDescriptor(byte_stream)
+ return stream_descriptor
+
+
+def _to_json(events_partition):
+    # Serialize each event as one JSON document per line
+    return "".join(event.to_json() + "\n" for event in events_partition)
+
+
+def _ingest(table, timestamps, device_ids, sequence_numbers, temperatures):
+ with _ingestion_client() as ingestion_client:
+ ingestion_data = {'timestamp': timestamps, 'device_id': device_ids, 'sequence_number': sequence_numbers,
+ 'temperature': temperatures}
+ dataframe = pd.DataFrame(data=ingestion_data)
+ ingestion_props = IngestionProperties(database=KUSTO_DATABASE, table=table, data_format=DataFormat.CSV,
+ ignore_first_record=True)
+ result = ingestion_client.ingest_from_dataframe(dataframe, ingestion_props)
+ print(result)
+
+
+def _insert_events(events, batch_mode, batch_size):
+ print("Inserting events", flush=True)
+ if batch_mode:
+ _batch_insert(events, batch_size, TABLE_NAMES)
+ else:
+ _stream_insert(events, TABLE_NAMES)
+
+
+def _execute_query(name, query, query_times):
+ with _kusto_client() as kusto_client:
+ print(f"Executing query {name}", flush=True)
+ start = time.time()
+ kusto_client.execute(KUSTO_DATABASE, query)
+ duration = time.time() - start
+ print(f"Finished query. Duration: {duration}", flush=True)
+ query_times[name].append(duration)
+
+
+def _filter_queries_to_execute():
+ if "queries" in config:
+ included = config["queries"].split(",")
+ for key in list(_queries.keys()):
+ if key not in included:
+ del _queries[key]
+ print(_queries)
diff --git a/simulator/modules/influxdb.py b/simulator/modules/influxdb.py
index f07ec21..c1369f8 100644
--- a/simulator/modules/influxdb.py
+++ b/simulator/modules/influxdb.py
@@ -8,7 +8,7 @@
BUCKET_NAME = "dbtest"
CURRENT_YEAR = date.today().year
-NEXT_YEAR = (date.today()+ timedelta(days=366)).year
+NEXT_YEAR = (date.today() + timedelta(days=366)).year
def _db():
diff --git a/simulator/requirements.txt b/simulator/requirements.txt
index fa5cc64..174ea20 100644
--- a/simulator/requirements.txt
+++ b/simulator/requirements.txt
@@ -1,7 +1,10 @@
-Flask==1.1.2
+Flask==2.2.2
dataclasses-json==0.5.2
psycopg2-binary==2.8.6
python-arango==7.1.0
cassandra-driver==3.25.0
influxdb-client==1.21.0
elasticsearch==7.15.1
+azure-kusto-data==4.1.2
+azure-kusto-ingest==4.1.2
+pandas==1.5.3
\ No newline at end of file