diff --git a/.github/workflows/test_matrix.yml b/.github/workflows/test_matrix.yml
index b6942287..82342c60 100644
--- a/.github/workflows/test_matrix.yml
+++ b/.github/workflows/test_matrix.yml
@@ -3,7 +3,7 @@ name: "test_matrix"
 on:  # yamllint disable-line rule:truthy
   pull_request:
-    branches: main
+    branches: dap-main
   push:
     branches-ignore:
       - '*_test'
@@ -12,6 +12,7 @@ on:  # yamllint disable-line rule:truthy
     paths-ignore:
       - '**.md'
      - 'LICENSE'
+  workflow_dispatch:
 
 jobs:
   tests:
@@ -23,16 +24,9 @@ jobs:
     strategy:
       matrix:
         python-version:
-          - '3.9'
-          - '3.10'
           - '3.11'
-          - '3.12'
         clickhouse-version:
-          - '23.8'
-          - '24.1'
-          - '24.2'
-          - '24.3'
-          - latest
+          - '24.12'
 
     steps:
       - name: Checkout
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1d9e77ef..8062cf83 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,78 @@
+### Release [1.8.9], 2025-02-16
+
+#### Improvements
+* It is now possible to configure a TLS client certificate using the `client_cert` and `client_cert_key` profile parameters. ([#413](https://github.com/ClickHouse/dbt-clickhouse/pull/413))
+* Added support for `insert_overwrite` in cluster setups with the incremental and distributed_incremental materializations. ([#394](https://github.com/ClickHouse/dbt-clickhouse/pull/394))
+* Improved the index and projection creation process. ([#421](https://github.com/ClickHouse/dbt-clickhouse/pull/421))
+
+#### Bug Fixes
+* Reverted breaking changes in the MV materialization. ([#416](https://github.com/ClickHouse/dbt-clickhouse/pull/416))
+* A fix was introduced for distributed tables, where an incremental local table could have been dropped if the distributed table was missing. ([#363](https://github.com/ClickHouse/dbt-clickhouse/pull/363))
+
+
+### Release [1.8.8], 2025-02-05
+
+#### Improvements
+* The materialized view materialization now attempts to use `ALTER TABLE...MODIFY QUERY` to update existing materialized views. This is an atomic operation, so data is not lost. ([#390](https://github.com/ClickHouse/dbt-clickhouse/pull/390))
+* Make view materialization updates atomic. ([#412](https://github.com/ClickHouse/dbt-clickhouse/pull/412))
+* Added a blacklist of settings to ignore based on the configured engine. ([#367](https://github.com/ClickHouse/dbt-clickhouse/pull/367))
+
+#### New Features
+* [ClickHouse indexes](https://clickhouse.com/docs/en/optimize/sparse-primary-indexes) are now fully supported for the `table` materialization.
+The index config should be added to the model config.
+For example:
+```python
+{{ config(
+      materialized='table',
+      indexes=[{
+        'name': 'your_index_name',
+        'definition': 'your_column TYPE minmax GRANULARITY 2'
+      }]
+) }}
+```
+
+### Release [1.8.7], 2025-01-05
+
+#### New Features
+* Added support for [refreshable materialized views](https://clickhouse.com/docs/en/materialized-view/refreshable-materialized-view) ([#401](https://github.com/ClickHouse/dbt-clickhouse/pull/401))
+
+#### Improvements
+* Avoid potential data loss by using `CREATE OR REPLACE DICTIONARY` to atomically update a dictionary ([#393](https://github.com/ClickHouse/dbt-clickhouse/pull/393))
+* Removed support for Python 3.8, as it is no longer supported by dbt ([#402](https://github.com/ClickHouse/dbt-clickhouse/pull/402))
+
+#### Bug Fixes
+* Fixed a minor bug related to validating the existence of an old hanging MV ([#396]())
+
+### Release [1.8.6], 2024-12-05
+
+#### Improvements
+* On MV model creation, the target table is populated with historical data based on the query provided in the MV definition. This catchup mechanism is now behind a config flag, which remains enabled by default. ([#399](https://github.com/ClickHouse/dbt-clickhouse/pull/399))
+
+
+### Release [1.8.5], 2024-11-19
+
+#### New Features
+* Added support for creating more than one materialized view that inserts records into the same target table. ([#360](https://github.com/ClickHouse/dbt-clickhouse/pull/364))
+
+#### Improvements
+* Added support for [range_hashed](https://clickhouse.com/docs/en/sql-reference/dictionaries#range_hashed) and [complex_key_range_hashed](https://clickhouse.com/docs/en/sql-reference/dictionaries#complex_key_range_hashed) layouts to the dictionary materialization. ([#361](https://github.com/ClickHouse/dbt-clickhouse/pull/361))
+* Truncated the stack trace of database errors for cleaner output when the HIDE_STACK_TRACE environment variable is set to any value. ([#382](https://github.com/ClickHouse/dbt-clickhouse/pull/382))
+* It is now possible to pass query settings not only on table creation but also on query. ([#362](https://github.com/ClickHouse/dbt-clickhouse/pull/362))
+
+#### Bug Fixes
+* Before this version, the `split_part` macro added an extra quotation mark. That was fixed in ([#338](https://github.com/ClickHouse/dbt-clickhouse/pull/338))
+* Existing local tables are no longer dropped/recreated in case of missing distributed tables in `distributed_incremental` materialization mode. ([#363](https://github.com/ClickHouse/dbt-clickhouse/pull/363))
+
+### Release [1.8.4], 2024-09-17
+
+#### Improvements
+* The S3 helper macro now supports a `role_arn` parameter as an alternative way to provide authentication for S3-based models. Thanks to
+[Mitchell Bregman](https://github.com/mitchbregs) for the contribution!
+
 ### Release [1.8.3], 2024-09-01
 ### Bug Fixes
 * An [issue](https://github.com/ClickHouse/dbt-clickhouse/issues/348) was detected when using multiple projections. We solved it and added a test to cover that use case.
 ([#349](https://github.com/ClickHouse/dbt-clickhouse/pull/349))
@@ -522,4 +597,4 @@ for Replicated tables that use the `{uuid}` macro in the path to avoid name conf
 [0.19.1]: https://github.com/ClickHouse/dbt-clickhouse/compare/v0.19.0.2...v0.19.1
 [0.19.0.2]: https://github.com/ClickHouse/dbt-clickhouse/compare/v0.19.0.1...v0.19.0.2
 [0.19.0.1]: https://github.com/ClickHouse/dbt-clickhouse/compare/v0.19.0...v0.19.0.1
-[0.19.0]: https://github.com/ClickHouse/dbt-clickhouse/compare/eb3020a...v0.19.0
\ No newline at end of file
+[0.19.0]: https://github.com/ClickHouse/dbt-clickhouse/compare/eb3020a...v0.19.0
diff --git a/README.md b/README.md
index 99da7ff6..232f5ee5 100644
--- a/README.md
+++ b/README.md
@@ -6,18 +6,21 @@
 This plugin ports [dbt](https://getdbt.com) functionality to [Clickhouse](https://clickhouse.tech/).
 
-The plugin uses syntax that requires ClickHouse version 22.1 or newer. We do not test older versions of Clickhouse. We also do not currently test
+The plugin uses syntax that requires ClickHouse version 22.1 or newer. We do not test older versions of ClickHouse. We
+also do not currently test
 Replicated tables or the related `ON CLUSTER` functionality.
 
 ## Installation
 
 Use your favorite Python package manager to install the app from PyPI, e.g.
+
 ```bash
 pip install dbt-core dbt-clickhouse
 ```
 
-> **_NOTE:_** Beginning in v1.8, dbt-core and adapters are decoupled. Therefore, the installation mentioned above explicitly includes both dbt-core and the desired adapter.If you use a version prior to 1.8.0 the pip installation command should look like this:
-
+> **_NOTE:_** Beginning in v1.8, dbt-core and adapters are decoupled. Therefore, the installation mentioned above
+> explicitly includes both dbt-core and the desired adapter. If you use a version prior to 1.8.0, the pip installation
+> command should look like this:
 
 ```bash
 pip install dbt-clickhouse
@@ -34,7 +37,7 @@ pip install dbt-clickhouse
 - [x] Docs generate
 - [x] Tests
 - [x] Snapshots
-- [x] Most dbt-utils macros (now included in dbt-core) 
+- [x] Most dbt-utils macros (now included in dbt-core)
 - [x] Ephemeral materialization
 - [x] Distributed table materialization (experimental)
 - [x] Distributed incremental materialization (experimental)
@@ -43,17 +46,20 @@ pip install dbt-clickhouse
 # Usage Notes
 
 ## SET Statement Warning
+
 In many environments, using the SET statement to persist a ClickHouse setting across all DBT queries is not reliable
 and can cause unexpected failures. This is particularly true when using HTTP connections through a load balancer that
 distributes queries across multiple nodes (such as ClickHouse cloud), although in some circumstances this can also
 happen with native ClickHouse connections. Accordingly, we recommend configuring any required ClickHouse settings in the
 "custom_settings" property of the DBT profile as a best practice, instead of relying on a prehook "SET" statement as
 has been occasionally suggested.
 
 ## Database
 
-The dbt model relation identifier `database.schema.table` is not compatible with Clickhouse because Clickhouse does not support a `schema`.
-So we use a simplified approach `schema.table`, where `schema` is the Clickhouse database. Using the `default` database is not recommended.
+The dbt model relation identifier `database.schema.table` is not compatible with ClickHouse because ClickHouse does not
+support a `schema`.
+So we use a simplified approach `schema.table`, where `schema` is the ClickHouse database. Using the `default` database
+is not recommended.
 
 ## Example Profile
 
@@ -76,6 +82,8 @@ your_profile_name:
     cluster: [] # If set, certain DDL/table operations will be executed with the `ON CLUSTER` clause using this cluster. Distributed materializations require this setting to work. See the following ClickHouse Cluster section for more details.
     verify: [True] # Validate TLS certificate if using TLS/SSL
     secure: [False] # Use TLS (native protocol) or HTTPS (http protocol)
+    client_cert: [null] # Path to a TLS client certificate in .pem format
+    client_cert_key: [null] # Path to the private key for the TLS client certificate
     retries: [1] # Number of times to retry a "retriable" database exception (such as a 503 'Service Unavailable' error)
     compression: [] # Use gzip compression if truthy (http), or compression type for a native connection
     connect_timeout: [10] # Timeout in seconds to establish a connection to ClickHouse
@@ -111,113 +119,166 @@ your_profile_name:
 | settings       | A map/dictionary of "TABLE" settings to be used in DDL statements like 'CREATE TABLE' with this model                                             |                |
 | query_settings | A map/dictionary of ClickHouse user level settings to be used with `INSERT` or `DELETE` statements in conjunction with this model                 |                |
 | ttl            | A TTL expression to be used with the table. The TTL expression is a string that can be used to specify the TTL for the table.                     |                |
-
+| indexes        | A list of indexes to create, available only for `table` materialization. For examples look at ([#397](https://github.com/ClickHouse/dbt-clickhouse/pull/397)) |                |
 
 ## Column Configuration
 
 | Option | Description                                                                                                                                                | Default if any |
-| ------ | -------------------------------------------------------------------------------------------------------------------------------------------------------- | -------------- |
+|--------|------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------|
 | codec  | A string consisting of arguments passed to `CODEC()` in the column's DDL. For example: `codec: "Delta, ZSTD"` will be interpreted as `CODEC(Delta, ZSTD)`. |                |
 
 ## ClickHouse Cluster
 
 The `cluster` setting in profile enables dbt-clickhouse to run against a ClickHouse cluster.
 
 ### Effective Scope
 
 If `cluster` is set in the profile, `on_cluster_clause` will now return cluster info for:
+
 - Database creation
 - View materialization
 - Distributed materializations
 - Models with Replicated engines
 
-table and incremental materializations with non-replicated engine will not be affected by `cluster` setting (model would be created on the connected node only).
-
-### Compatibility
+By default, tables and incremental materializations with non-replicated engines will not be affected by the `cluster` setting (the model would be created on the connected node only).
+To force relations to be created on a cluster regardless of their engine or materialization, use the `force_on_cluster` argument:
+
+```sql
+{{ config(
+    engine='Null',
+    materialized='materialized_view',
+    force_on_cluster='true'
+  )
+}}
+```
 
-If a model has been created without a `cluster` setting, dbt-clickhouse will detect the situation and run all DDL/DML without `on cluster` clause for this model.
-## A Note on Model Settings
+### Compatibility
 
-ClickHouse has several types/levels of "settings". In the model configuration above, two types of these are configurable. `settings` means the `SETTINGS`
-clause used in `CREATE TABLE/VIEW` types of DDL statements, so this is generally settings that are specific to the specific ClickHouse table engine. The new
-`query_settings` is use to add a `SETTINGS` clause to the `INSERT` and `DELETE` queries used for model materialization (including incremental materializations).
-There are hundreds of ClickHouse settings, and it's not always clear which is a "table" setting and which is a "user" setting (although the latter are generally
-available in the `system.settings` table.) In general the defaults are recommended, and any use of these properties should be carefully researched and tested.
+If a model has been created without a `cluster` setting, dbt-clickhouse will detect the situation and run all DDL/DML
+without an `on cluster` clause for this model.
 
+## A Note on Model Settings
+
+ClickHouse has several types/levels of "settings". In the model configuration above, two of these types are
+configurable. `settings` means the `SETTINGS`
+clause used in `CREATE TABLE/VIEW` types of DDL statements, so these are generally settings that are specific to the
+ClickHouse table engine in use. The new
+`query_settings` is used to add a `SETTINGS` clause to the `INSERT` and `DELETE` queries used for model materialization
+(including incremental materializations).
+There are hundreds of ClickHouse settings, and it's not always clear which is a "table" setting and which is a "user"
+setting (although the latter are generally
+available in the `system.settings` table). In general the defaults are recommended, and any use of these properties
+should be carefully researched and tested.
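+
+As a hedged illustration, a model might combine both levels like this (the specific setting names below are examples
+chosen for illustration, not recommendations; verify each one against your ClickHouse version before relying on it):
+
+```sql
+{{ config(
+    materialized='table',
+    settings={'allow_nullable_key': 1},
+    query_settings={'max_insert_threads': 4}
+) }}
+
+-- `settings` is appended to the CREATE TABLE DDL; `query_settings` is appended to the INSERT that populates the model
+select * from {{ source('raw', 'events') }}
+```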
 ## Known Limitations
 
-* Ephemeral models/CTEs don't work if placed before the "INSERT INTO" in a ClickHouse insert statement, see https://github.com/ClickHouse/ClickHouse/issues/30323. This
-should not affect most models, but care should be taken where an ephemeral model is placed in model definitions and other SQL statements.
+* Ephemeral models/CTEs don't work if placed before the "INSERT INTO" in a ClickHouse insert statement,
+  see https://github.com/ClickHouse/ClickHouse/issues/30323. This
+  should not affect most models, but care should be taken where an ephemeral model is placed in model definitions and
+  other SQL statements.
 
 ## Incremental Model Strategies
 
 As of version 1.3.2, dbt-clickhouse supports three incremental model strategies.
 
 ### The Default (Legacy) Strategy
 
-Historically ClickHouse has had only limited support for updates and deletes, in the form of asynchronous "mutations." To emulate expected dbt behavior,
-dbt-clickhouse by default creates a new temporary table containing all unaffected (not deleted, not changed) "old" records, plus any new or updated records,
-and then swaps or exchanges this temporary table with the existing incremental model relation. This is the only strategy that preserves the original relation if something
-goes wrong before the operation completes; however, since it involves a full copy of the original table, it can be quite expensive and slow to execute.
+Historically ClickHouse has had only limited support for updates and deletes, in the form of asynchronous "mutations."
+To emulate expected dbt behavior,
+dbt-clickhouse by default creates a new temporary table containing all unaffected (not deleted, not changed) "old"
+records, plus any new or updated records,
+and then swaps or exchanges this temporary table with the existing incremental model relation. This is the only strategy
+that preserves the original relation if something
+goes wrong before the operation completes; however, since it involves a full copy of the original table, it can be quite
+expensive and slow to execute.
 
 ### The Delete+Insert Strategy
 
-ClickHouse added "lightweight deletes" as an experimental feature in version 22.8. Lightweight deletes are significantly faster than ALTER TABLE ... DELETE
-operations, because they don't require rewriting ClickHouse data parts. The incremental strategy `delete+insert` utilizes lightweight deletes to implement
-incremental materializations that perform significantly better than the "legacy" strategy. However, there are important caveats to using this strategy:
-- Lightweight deletes must be enabled on your ClickHouse server using the setting `allow_experimental_lightweight_delete=1` or you
-must set `use_lw_deletes=true` in your profile (which will enable that setting for your dbt sessions)
-- Lightweight deletes are now production ready, but there may be performance and other problems on ClickHouse versions earlier than 23.3.
-- This strategy operates directly on the affected table/relation (with creating any intermediate or temporary tables), so if there is an issue during the operation, the
-data in the incremental model is likely to be in an invalid state
-- When using lightweight deletes, dbt-clickhouse enabled the setting `allow_nondeterministic_mutations`. In some very rare cases using non-deterministic incremental_predicates
-this could result in a race condition for the updated/deleted items (and related log messages in the ClickHouse logs). To ensure consistent results the
-incremental predicates should only include sub-queries on data that will not be modified during the incremental materialization.
+ClickHouse added "lightweight deletes" as an experimental feature in version 22.8. Lightweight deletes are significantly
+faster than ALTER TABLE ... DELETE
+operations, because they don't require rewriting ClickHouse data parts. The incremental strategy `delete+insert`
+utilizes lightweight deletes to implement
+incremental materializations that perform significantly better than the "legacy" strategy. However, there are important
+caveats to using this strategy:
+
+- Lightweight deletes must be enabled on your ClickHouse server using the setting
+  `allow_experimental_lightweight_delete=1`, or you
+  must set `use_lw_deletes=true` in your profile (which will enable that setting for your dbt sessions)
+- Lightweight deletes are now production ready, but there may be performance and other problems on ClickHouse versions
+  earlier than 23.3.
+- This strategy operates directly on the affected table/relation (without creating any intermediate or temporary tables),
+  so if there is an issue during the operation, the
+  data in the incremental model is likely to be in an invalid state
+- When using lightweight deletes, dbt-clickhouse enables the setting `allow_nondeterministic_mutations`. In some very
+  rare cases using non-deterministic incremental_predicates
+  this could result in a race condition for the updated/deleted items (and related log messages in the ClickHouse logs).
+  To ensure consistent results the
+  incremental predicates should only include sub-queries on data that will not be modified during the incremental
+  materialization.
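+
+A hedged sketch of a model using this strategy (the column and source names are illustrative):
+
+```sql
+{{ config(
+    materialized='incremental',
+    incremental_strategy='delete+insert',
+    unique_key='id',
+    order_by='id'
+) }}
+
+select id, updated_at, payload
+from {{ source('raw', 'events') }}
+{% if is_incremental() %}
+where updated_at > (select max(updated_at) from {{ this }})
+{% endif %}
+```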
 ### The Append Strategy
 
-This strategy replaces the `inserts_only` setting in previous versions of dbt-clickhouse. This approach simply appends new rows to the existing relation.
-As a result duplicate rows are not eliminated, and there is no temporary or intermediate table. It is the fastest approach if duplicates are either permitted
+This strategy replaces the `inserts_only` setting in previous versions of dbt-clickhouse. This approach simply appends
+new rows to the existing relation.
+As a result duplicate rows are not eliminated, and there is no temporary or intermediate table. It is the fastest
+approach if duplicates are either permitted
 in the data or excluded by the incremental query WHERE clause/filter.
 
 ### The insert_overwrite Strategy (Experimental)
 
 > [IMPORTANT]
 > Currently, the insert_overwrite strategy is not fully functional with distributed materializations.
 
 Performs the following steps:
 
-1. Create a staging (temporary) table with the same structure as the incremental model relation: `CREATE TABLE AS `.
+1. Create a staging (temporary) table with the same structure as the incremental model relation:
+   `CREATE TABLE AS `.
 2. Insert only new records (produced by `SELECT`) into the staging table.
 3. Replace only new partitions (present in the staging table) into the target table.
 
 This approach has the following advantages:
 
 - It is faster than the default strategy because it doesn't copy the entire table.
-- It is safer than other strategies because it doesn't modify the original table until the INSERT operation completes successfully: in case of intermediate failure, the original table is not modified.
-- It implements "partitions immutability" data engineering best practice. Which simplifies incremental and parallel data processing, rollbacks, etc.
+- It is safer than other strategies because it doesn't modify the original table until the INSERT operation completes
+  successfully: in case of intermediate failure, the original table is not modified.
+- It implements the "partition immutability" data engineering best practice, which simplifies incremental and parallel
+  data processing, rollbacks, etc.
 
-The strategy requires `partition_by` to be set in the model configuration. Ignores all other strategies-specific parameters of the model config.
+The strategy requires `partition_by` to be set in the model configuration. All other strategy-specific parameters of the
+model config are ignored.
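+
+A hedged sketch of an insert_overwrite model (the partition expression and source are illustrative):
+
+```sql
+{{ config(
+    materialized='incremental',
+    incremental_strategy='insert_overwrite',
+    partition_by='toYYYYMM(created_at)',
+    order_by='(created_at, id)'
+) }}
+
+select id, created_at, item
+from {{ source('db', 'table') }}
+```
+
+Only the partitions present in the staging table are replaced in the target table; all other partitions are left
+untouched.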
 
 ## Additional ClickHouse Macros
 
 ### Model Materialization Utility Macros
 
 The following macros are included to facilitate creating ClickHouse specific tables and views:
 
-- `engine_clause` -- Uses the `engine` model configuration property to assign a ClickHouse table engine. dbt-clickhouse uses the `MergeTree` engine by default.
-- `partition_cols` -- Uses the `partition_by` model configuration property to assign a ClickHouse partition key. No partition key is assigned by default.
-- `order_cols` -- Uses the `order_by` model configuration to assign a ClickHouse order by/sorting key. If not specified ClickHouse will use an empty tuple() and the table will be unsorted
-- `primary_key_clause` -- Uses the `primary_key` model configuration property to assign a ClickHouse primary key. By default, primary key is set and ClickHouse will use the order by clause as the primary key.
-- `on_cluster_clause` -- Uses the `cluster` profile property to add an `ON CLUSTER` clause to certain dbt-operations: distributed materializations, views creation, database creation.
-- `ttl_config` -- Uses the `ttl` model configuration property to assign a ClickHouse table TTL expression. No TTL is assigned by default.
+- `engine_clause` -- Uses the `engine` model configuration property to assign a ClickHouse table engine. dbt-clickhouse
+  uses the `MergeTree` engine by default.
+- `partition_cols` -- Uses the `partition_by` model configuration property to assign a ClickHouse partition key. No
+  partition key is assigned by default.
+- `order_cols` -- Uses the `order_by` model configuration to assign a ClickHouse order by/sorting key. If not specified,
+  ClickHouse will use an empty tuple() and the table will be unsorted.
+- `primary_key_clause` -- Uses the `primary_key` model configuration property to assign a ClickHouse primary key. By
+  default, the primary key is set and ClickHouse will use the order by clause as the primary key.
+- `on_cluster_clause` -- Uses the `cluster` profile property to add an `ON CLUSTER` clause to certain dbt operations:
+  distributed materializations, view creation, database creation.
+- `ttl_config` -- Uses the `ttl` model configuration property to assign a ClickHouse table TTL expression. No TTL is
+  assigned by default.
 
 ### s3Source Helper Macro
 
-The `s3source` macro simplifies the process of selecting ClickHouse data directly from S3 using the ClickHouse S3 table function. It works by
-populating the S3 table function parameters from a named configuration dictionary (the name of the dictionary must end in `s3`). The macro
-first looks for the dictionary in the profile `vars`, and then in the model configuration. The dictionary can contain any of the following
-keys used to populate the parameters of the S3 table function:
+The `s3source` macro simplifies the process of selecting ClickHouse data directly from S3 using the ClickHouse S3 table
+function. It works by
+populating the S3 table function parameters from a named configuration dictionary (the name of the dictionary must end
+in `s3`). The macro
+first looks for the dictionary in the profile `vars`, and then in the model configuration. The dictionary can contain
+any of the following
+keys used to populate the parameters of the S3 table function:
 
 | Argument Name | Description |
@@ -230,42 +291,145 @@ keys used to populate the parameters of the S3 table function:
 | aws_secret_access_key | The S3 secret key.                                                                                                                                                       |
 | role_arn              | The ARN of a ClickhouseAccess IAM role to use to securely access the S3 objects. See this [documentation](https://clickhouse.com/docs/en/cloud/security/secure-s3) for more information. |
 | compression           | The compression method used with the S3 objects. If not provided ClickHouse will attempt to determine compression based on the file name.                                |
 
-See the [S3 test file](https://github.com/ClickHouse/dbt-clickhouse/blob/main/tests/integration/adapter/clickhouse/test_clickhouse_s3.py) for examples of how to use this macro.
+See the [S3 test file](https://github.com/ClickHouse/dbt-clickhouse/blob/main/tests/integration/adapter/clickhouse/test_clickhouse_s3.py)
+for examples of how to use this macro.
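+
+As a hedged illustration (the `taxi_s3` vars entry, the exact key names, and the macro call signature are assumptions
+based on the argument table above; see the S3 test file for authoritative usage):
+
+```sql
+{{ config(materialized='table') }}
+
+-- assumes a profile/project vars entry such as:
+--   taxi_s3:
+--     bucket: 'my-bucket.s3.amazonaws.com'
+--     path: '/trips/part_0.gz'
+--     fmt: 'TabSeparated'
+select *
+from {{ s3source('taxi_s3') }}
+```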
 # Contracts and Constraints
 
-Only exact column type contracts are supported. For example, a contract with a UInt32 column type will fail if the model returns a UInt64 or other integer type.
-ClickHouse also support _only_ `CHECK` constraints on the entire table/model. Primary key, foreign key, unique, and column level CHECK constraints are not supported.
+Only exact column type contracts are supported. For example, a contract with a UInt32 column type will fail if the model
+returns a UInt64 or other integer type.
+ClickHouse also supports _only_ `CHECK` constraints on the entire table/model. Primary key, foreign key, unique, and
+column level CHECK constraints are not supported.
 (See ClickHouse documentation on primary/order by keys.)
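+
+A minimal sketch of an enforced contract in a model's YAML (the model and column names are illustrative):
+
+```yaml
+models:
+  - name: events
+    config:
+      contract:
+        enforced: true
+    columns:
+      - name: id
+        data_type: UInt32
+      - name: created_at
+        data_type: DateTime
+```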
 
 # Materialized Views (Experimental)
 
-A `materialized_view` materialization should be a `SELECT` from an existing (source) table. The adapter will create a target table with the model name
-and a ClickHouse MATERIALIZED VIEW with the name `_mv`. Unlike PostgreSQL, a ClickHouse materialized view is not "static" (and has
-no corresponding REFRESH operation). Instead, it acts as an "insert trigger", and will insert new rows into the target table using the defined `SELECT`
-"transformation" in the view definition on rows inserted into the source table. See the [test file]
-(https://github.com/ClickHouse/dbt-clickhouse/blob/main/tests/integration/adapter/materialized_view/test_materialized_view.py) for an introductory example
+A `materialized_view` materialization should be a `SELECT` from an existing (source) table. The adapter will create a
+target table with the model name
+and a ClickHouse MATERIALIZED VIEW with the name `_mv`. Unlike PostgreSQL, a ClickHouse materialized view is
+not "static" (and has
+no corresponding REFRESH operation). Instead, it acts as an "insert trigger", and will insert new rows into the target
+table using the defined `SELECT`
+"transformation" in the view definition on rows inserted into the source table. See the [test file](https://github.com/ClickHouse/dbt-clickhouse/blob/main/tests/integration/adapter/materialized_view/test_materialized_view.py)
+for an introductory example
 of how to use this functionality.
 
+ClickHouse provides the ability for more than one materialized view to write records to the same target table. To
+support this in dbt-clickhouse, you can construct a `UNION` in your model file, such that the SQL for each of your
+materialized views is wrapped with comments of the form `--my_mv_name:begin` and `--my_mv_name:end`.
+
+For example, the following will build two materialized views, both writing data to the same destination table of the
+model. The names of the materialized views will take the form `_mv1` and `_mv2`:
+
+```
+--mv1:begin
+select a,b,c from {{ source('raw', 'table_1') }}
+--mv1:end
+union all
+--mv2:begin
+select a,b,c from {{ source('raw', 'table_2') }}
+--mv2:end
+```
+
+> IMPORTANT!
+>
+> When updating a model with multiple materialized views (MVs), especially when renaming one of the MV names,
+> dbt-clickhouse does not automatically drop the old MV. Instead, you will encounter the following warning:
+> `Warning - Table was detected with the same pattern as model name but was not found in this run. In case it is a renamed mv that was previously part of this model, drop it manually (!!!)`
+
+## Data catchup
+
+Currently, when creating a materialized view (MV), the target table is first populated with historical data before the
+MV itself is created.
+
+In other words, dbt-clickhouse initially creates the target table and preloads it with historical data based on the
+query defined for the MV. Only after this step is the MV created.
+
+If you prefer not to preload historical data during MV creation, you can disable this behavior by setting the `catchup`
+config to `False`:
+
+```python
+{{config(
+    materialized='materialized_view',
+    engine='MergeTree()',
+    order_by='(id)',
+    catchup=False
+)}}
+```
+
+## Refreshable Materialized Views
+
+To use a [refreshable materialized view](https://clickhouse.com/docs/en/materialized-view/refreshable-materialized-view),
+adjust the following configs as needed in your MV model (all of these configs are supposed to be set inside a
+`refreshable` config object):
+
+| Option                | Description                                                                                                                                                               | Required | Default Value |
+|-----------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------|
+| refresh_interval      | The interval clause (required)                                                                                                                                            | Yes      |               |
+| randomize             | The randomization clause, will appear after `RANDOMIZE FOR`                                                                                                               |          |               |
+| append                | If set to `True`, each refresh inserts rows into the table without deleting existing rows. The insert is not atomic, just like a regular INSERT SELECT.                   |          | False         |
+| depends_on            | A dependencies list for the refreshable MV. Please provide the dependencies in the following format `{schema}.{view_name}`                                               |          |               |
+| depends_on_validation | Whether to validate the existence of the dependencies provided in `depends_on`. In case a dependency doesn't contain a schema, the validation occurs on schema `default` |          | False         |
+
+A config example for a refreshable materialized view:
+
+```python
+{{
+    config(
+        materialized='materialized_view',
+        refreshable={
+            "interval": "EVERY 5 MINUTE",
+            "randomize": "1 MINUTE",
+            "append": True,
+            "depends_on": ['schema.depend_on_model'],
+            "depends_on_validation": True
+        }
+    )
+}}
+```
+
+### Limitations
+
+* When creating a refreshable materialized view (MV) in ClickHouse that has a dependency, ClickHouse does not throw an
+  error if the specified dependency does not exist at the time of creation. Instead, the refreshable MV remains in an
+  inactive state, waiting for the dependency to be satisfied before it starts processing updates or refreshing.
+  This behavior is by design, but it may lead to delays in data availability if the required dependency is not addressed
+  promptly. Users are advised to ensure all dependencies are correctly defined and exist before creating a refreshable
+  materialized view.
+* As of today, there is no actual "dbt linkage" between the MV and its dependencies; therefore, the creation order is not
+  guaranteed.
+* The refreshable feature was not tested with multiple MVs writing to the same target model.
 
 # Dictionary materializations (experimental)
 
-See the tests in https://github.com/ClickHouse/dbt-clickhouse/blob/main/tests/integration/adapter/dictionary/test_dictionary.py for examples of how to
-implement materializations for ClickHouse dictionaries
+See the tests
+in https://github.com/ClickHouse/dbt-clickhouse/blob/main/tests/integration/adapter/dictionary/test_dictionary.py for
+examples of how to
+implement materializations for ClickHouse dictionaries.
 
 # Distributed materializations
 
 Notes:
 
-- dbt-clickhouse queries now automatically include the setting `insert_distributed_sync = 1` in order to ensure that downstream incremental
-materialization operations execute correctly. This could cause some distributed table inserts to run more slowly than expected.
+- dbt-clickhouse queries now automatically include the setting `insert_distributed_sync = 1` in order to ensure that
+  downstream incremental
+  materialization operations execute correctly. This could cause some distributed table inserts to run more slowly than
+  expected.
 
 ## Distributed table materialization
 
 A distributed table is created with the following steps:
 
 1. Create a temp view with the SQL query to get the right structure
 2. Create empty local tables based on the view
 3. Create a distributed table based on the local tables.
 4. Data is inserted into the distributed table, so it is distributed across shards without duplication.
 
 ### Distributed table model example
 
 ```sql
 {{
     config(
@@ -276,7 +440,8 @@ Distributed table created with following steps:
        )
 }}
 
-select id, created_at, item from {{ source('db', 'table') }}
+select id, created_at, item
+from {{ source('db', 'table') }}
 ```
 
 ### Generated migrations
 
 ```sql
 CREATE TABLE db.table_local on cluster cluster
 (
     `id` UInt64,
     `created_at` DateTime,
     `item` String
 )
 ENGINE = ReplacingMergeTree
 ORDER BY (id, created_at)
 SETTINGS index_granularity = 8192;
 
 CREATE TABLE db.table on cluster cluster
 (
     `id` UInt64,
     `created_at` DateTime,
     `item` String
 )
 ENGINE = Distributed('cluster', 'db', 'table_local', cityHash64(id));
 ```
 
 ## Distributed incremental materialization
 
-Incremental model based on the same idea as distributed table, the main difficulty is to process all incremental strategies correctly.
+The incremental model is based on the same idea as the distributed table; the main difficulty is to process all
+incremental strategies correctly.
 
 1. _The Append Strategy_ just inserts data into the distributed table.
 2. _The Delete+Insert Strategy_ creates a distributed temp table to work with all data on every shard.
 3. _The Default (Legacy) Strategy_ creates distributed temp and intermediate tables for the same reason.
 
-Only shard tables are replacing, because distributed table does not keep data.
+Only the shard tables are replaced, because the distributed table does not hold data.
 The distributed table reloads only when the full_refresh mode is enabled or the table structure may have changed.
### Distributed incremental model example
 
 ```sql
 {{
     config(
@@ -324,7 +509,8 @@ The distributed table reloads only when the full_refresh mode is enabled or the
        )
 }}
 
-select id, created_at, item from {{ source('db', 'table') }}
+select id, created_at, item
+from {{ source('db', 'table') }}
 ```
 
 ### Generated migrations
 
@@ -332,27 +518,46 @@ select id, created_at, item from {{ source('db', 'table') }}
 ```sql
 CREATE TABLE db.table_local on cluster cluster
 (
     `id` UInt64,
     `created_at` DateTime,
     `item` String
 )
 ENGINE = MergeTree
 SETTINGS index_granularity = 8192;
 
 CREATE TABLE db.table on cluster cluster
 (
     `id` UInt64,
     `created_at` DateTime,
     `item` String
 )
 ENGINE = Distributed('cluster', 'db', 'table_local', cityHash64(id));
 ```
 
 ## Contributing
 
-We welcome contributions from the community to help improve the dbt-ClickHouse adapter. Whether you’re fixing a bug, adding a new feature, or enhancing documentation, your efforts are greatly appreciated!
-Please take a moment to read our [Contribution Guide](CONTRIBUTING.md) to get started. The guide provides detailed instructions on setting up your environment, running tests, and submitting pull requests.
+We welcome contributions from the community to help improve the dbt-ClickHouse adapter. Whether you’re fixing a bug,
+adding a new feature, or enhancing documentation, your efforts are greatly appreciated!
+
+Please take a moment to read our [Contribution Guide](CONTRIBUTING.md) to get started. The guide provides detailed
+instructions on setting up your environment, running tests, and submitting pull requests.
 
 ## Original Author
 
-ClickHouse wants to thank @[silentsokolov](https://github.com/silentsokolov) for creating this connector and for their valuable contributions.
+ClickHouse wants to thank @[silentsokolov](https://github.com/silentsokolov) for creating this connector and for their
+valuable contributions.
diff --git a/dbt/adapters/clickhouse/__version__.py b/dbt/adapters/clickhouse/__version__.py index cf085ece..49ac8b03 100644 --- a/dbt/adapters/clickhouse/__version__.py +++ b/dbt/adapters/clickhouse/__version__.py @@ -1 +1 @@ -version = '1.8.3' +version = '1.8.9' diff --git a/dbt/adapters/clickhouse/credentials.py b/dbt/adapters/clickhouse/credentials.py index 939977e4..c3a89c63 100644 --- a/dbt/adapters/clickhouse/credentials.py +++ b/dbt/adapters/clickhouse/credentials.py @@ -24,6 +24,8 @@ class ClickHouseCredentials(Credentials): cluster_mode: bool = False secure: bool = False verify: bool = True + client_cert: Optional[str] = None + client_cert_key: Optional[str] = None connect_timeout: int = 10 send_receive_timeout: int = 300 sync_request_timeout: int = 5 @@ -73,6 +75,8 @@ def _connection_keys(self): 'cluster_mode', 'secure', 'verify', + 'client_cert', + 'client_cert_key', 'connect_timeout', 'send_receive_timeout', 'sync_request_timeout', diff --git a/dbt/adapters/clickhouse/httpclient.py b/dbt/adapters/clickhouse/httpclient.py index e809bca7..3f2dbaf9 100644 --- a/dbt/adapters/clickhouse/httpclient.py +++ b/dbt/adapters/clickhouse/httpclient.py @@ -8,6 +8,7 @@ from dbt.adapters.clickhouse import ClickHouseColumn from dbt.adapters.clickhouse.__version__ import version as dbt_clickhouse_version from dbt.adapters.clickhouse.dbclient import ChClientWrapper, ChRetryableException +from dbt.adapters.clickhouse.util import hide_stack_trace class ChHttpClient(ChClientWrapper): @@ -15,13 +16,15 @@ def query(self, sql, **kwargs): try: return self._client.query(sql, **kwargs) except DatabaseError as ex: - raise DbtDatabaseError(str(ex).strip()) from ex + err_msg = hide_stack_trace(ex) + raise DbtDatabaseError(err_msg) from ex def command(self, sql, **kwargs): try: return self._client.command(sql, **kwargs) except DatabaseError as ex: - raise DbtDatabaseError(str(ex).strip()) from ex + err_msg = hide_stack_trace(ex) + raise DbtDatabaseError(err_msg) from ex def columns_in_query(self, sql: str, **kwargs) -> List[ClickHouseColumn]: try: @@ -34,7 +37,8 @@ def columns_in_query(self, sql: str, **kwargs) -> List[ClickHouseColumn]: for name, ch_type in zip(query_result.column_names, query_result.column_types) ] except DatabaseError as ex: - raise DbtDatabaseError(str(ex).strip()) from ex + err_msg = hide_stack_trace(ex) + raise DbtDatabaseError(err_msg) from ex def get_ch_setting(self, setting_name): setting = self._client.server_settings.get(setting_name) @@ -62,6 +66,8 @@ def _create_client(self, credentials): send_receive_timeout=credentials.send_receive_timeout, client_name=f'dbt-adapters/{dbt_adapters_version} dbt-clickhouse/{dbt_clickhouse_version}', verify=credentials.verify, + client_cert=credentials.client_cert, + client_cert_key=credentials.client_cert_key, query_limit=0, settings=self._conn_settings, ) diff --git a/dbt/adapters/clickhouse/impl.py b/dbt/adapters/clickhouse/impl.py index 0c4ac60a..156ad485 100644 --- a/dbt/adapters/clickhouse/impl.py +++ b/dbt/adapters/clickhouse/impl.py @@ -50,10 +50,16 @@ GET_CATALOG_MACRO_NAME = 'get_catalog' LIST_SCHEMAS_MACRO_NAME = 'list_schemas' +IGNORED_SETTINGS = { + 'Memory': ['replicated_deduplication_window'], + 'S3': ['replicated_deduplication_window'], +} + @dataclass class ClickHouseConfig(AdapterConfig): engine: str = 'MergeTree()' + force_on_cluster: Optional[bool] = False order_by: Optional[Union[List[str], str]] = 'tuple()' partition_by: Optional[Union[List[str], str]] = None sharding_key: Optional[Union[List[str], str]] = 'rand()' @@ 
-492,12 +498,13 @@ def run_sql_for_tests(self, sql, fetch, conn): conn.state = 'close' @available - def get_model_settings(self, model): + def get_model_settings(self, model, engine='MergeTree'): settings = model['config'].get('settings', {}) materialization_type = model['config'].get('materialized') conn = self.connections.get_if_exists() conn.handle.update_model_settings(settings, materialization_type) res = [] + settings = self.filter_settings_by_engine(settings, engine) for key in settings: res.append(f' {key}={settings[key]}') settings_str = '' if len(res) == 0 else 'SETTINGS ' + ', '.join(res) + '\n' @@ -506,6 +513,24 @@ def get_model_settings(self, model): {settings_str} """ + @available + def filter_settings_by_engine(self, settings, engine): + filtered_settings = {} + + if engine.endswith('MergeTree'): + # Special case for MergeTree due to all its variations. + ignored_settings = IGNORED_SETTINGS.get('MergeTree', []) + else: + ignored_settings = IGNORED_SETTINGS.get(engine, []) + + for key, value in settings.items(): + if key in ignored_settings: + logger.warning(f"Setting {key} not available for engine {engine}, ignoring.") + else: + filtered_settings[key] = value + + return filtered_settings + @available def get_model_query_settings(self, model, additional_settings: dict = None): settings = model['config'].get('query_settings', {}) diff --git a/dbt/adapters/clickhouse/nativeclient.py b/dbt/adapters/clickhouse/nativeclient.py index d0d7fdd5..dfa4c5bd 100644 --- a/dbt/adapters/clickhouse/nativeclient.py +++ b/dbt/adapters/clickhouse/nativeclient.py @@ -10,6 +10,7 @@ from dbt.adapters.clickhouse.__version__ import version as dbt_clickhouse_version from dbt.adapters.clickhouse.dbclient import ChClientWrapper, ChRetryableException from dbt.adapters.clickhouse.logger import logger +from dbt.adapters.clickhouse.util import hide_stack_trace try: driver_version = pkg_resources.get_distribution('clickhouse-driver').version @@ -22,7 +23,8 @@ def query(self, sql, **kwargs): try: return NativeClientResult(self._client.execute(sql, with_column_types=True, **kwargs)) except clickhouse_driver.errors.Error as ex: - raise DbtDatabaseError(str(ex).strip()) from ex + err_msg = hide_stack_trace(ex) + raise DbtDatabaseError(err_msg) from ex def command(self, sql, **kwargs): try: @@ -30,7 +32,8 @@ def command(self, sql, **kwargs): if len(result) and len(result[0]): return result[0][0] except clickhouse_driver.errors.Error as ex: - raise DbtDatabaseError(str(ex).strip()) from ex + err_msg = hide_stack_trace(ex) + raise DbtDatabaseError(err_msg) from ex def columns_in_query(self, sql: str, **kwargs) -> List[ClickHouseColumn]: try: @@ -40,7 +43,8 @@ def columns_in_query(self, sql: str, **kwargs) -> List[ClickHouseColumn]: ) return [ClickHouseColumn.create(column[0], column[1]) for column in columns] except clickhouse_driver.errors.Error as ex: - raise DbtDatabaseError(str(ex).strip()) from ex + err_msg = hide_stack_trace(ex) + raise DbtDatabaseError(err_msg) from ex def get_ch_setting(self, setting_name): try: @@ -64,6 +68,8 @@ def _create_client(self, credentials: ClickHouseCredentials): client_name=f'dbt-adapters/{dbt_adapters_version} dbt-clickhouse/{dbt_clickhouse_version} clickhouse-driver/{driver_version}', secure=credentials.secure, verify=credentials.verify, + certfile=credentials.client_cert, + keyfile=credentials.client_cert_key, connect_timeout=credentials.connect_timeout, send_receive_timeout=credentials.send_receive_timeout, sync_request_timeout=credentials.sync_request_timeout, diff --git 
a/dbt/adapters/clickhouse/relation.py b/dbt/adapters/clickhouse/relation.py index 3a8fae03..34339601 100644 --- a/dbt/adapters/clickhouse/relation.py +++ b/dbt/adapters/clickhouse/relation.py @@ -114,6 +114,8 @@ def create_from( # If the database is set, and the source schema is "defaulted" to the source.name, override the # schema with the database instead, since that's presumably what's intended for clickhouse schema = relation_config.schema + + cluster = quoting.credentials.cluster or '' can_on_cluster = None engine = None # We placed a hardcoded const (instead of importing it from dbt-core) in order to decouple the packages @@ -121,6 +123,9 @@ def create_from( if schema == relation_config.source_name and relation_config.database: schema = relation_config.database + if cluster and str(relation_config.config.get("force_on_cluster")).lower() == "true": + can_on_cluster = True + else: cluster = quoting.credentials.cluster or '' materialized = relation_config.config.get('materialized', '') diff --git a/dbt/adapters/clickhouse/util.py b/dbt/adapters/clickhouse/util.py index 9410ad7d..7c120386 100644 --- a/dbt/adapters/clickhouse/util.py +++ b/dbt/adapters/clickhouse/util.py @@ -1,4 +1,4 @@ -from dataclasses import dataclass +import os from dbt_common.exceptions import DbtRuntimeError @@ -13,3 +13,12 @@ def compare_versions(v1: str, v2: str) -> int: except ValueError: raise DbtRuntimeError("Version must consist of only numbers separated by '.'") return 0 + + +def hide_stack_trace(ex: Exception) -> str: + + if not os.getenv("HIDE_STACK_TRACE", ''): + return str(ex).strip() + + err_msg = str(ex).split("Stack trace")[0].strip() + return err_msg diff --git a/dbt/include/clickhouse/macros/materializations/dictionary.sql b/dbt/include/clickhouse/macros/materializations/dictionary.sql index 486ff0b0..a0a94c42 100644 --- a/dbt/include/clickhouse/macros/materializations/dictionary.sql +++ b/dbt/include/clickhouse/macros/materializations/dictionary.sql @@ -2,38 +2,21 @@ {%- set existing_relation = load_cached_relation(this) -%} {%- set target_relation = this.incorporate(type='dictionary') -%} - {%- set intermediate_relation = make_intermediate_relation(target_relation) -%} - {%- set existing_intermediate_relation = load_cached_relation(intermediate_relation) -%} - {%- set backup_relation_type = 'dictionary' if existing_relation is none else existing_relation.type -%} - {%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%} - {%- set existing_backup_relation = load_cached_relation(backup_relation) -%} {%- set cluster_clause = on_cluster_clause(target_relation) -%} {%- set grant_config = config.get('grants') -%} {{ run_hooks(pre_hooks, inside_transaction=False) }} - {{ drop_dictionary_if_exists(existing_backup_relation, cluster_clause) }} - {{ drop_dictionary_if_exists(existing_intermediate_relation, cluster_clause) }} - {{ run_hooks(pre_hooks, inside_transaction=True) }} {# create our new dictionary #} {% call statement('main') -%} - {{ clickhouse__get_create_dictionary_as_sql(intermediate_relation, cluster_clause, sql) }} + {{ clickhouse__get_create_dictionary_as_sql(target_relation, cluster_clause, sql) }} {%- endcall %} - {# cleanup #} - {% if existing_relation is not none %} - {% set existing_relation = load_cached_relation(existing_relation) %} - {% if existing_relation is not none %} - {{ adapter.rename_relation(existing_relation, backup_relation) }} - {% endif %} - {% endif %} - {{ adapter.rename_relation(intermediate_relation, target_relation) }} - - {% set 
should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %} + {% set should_revoke = should_revoke(target_relation, full_refresh_mode=True) %} {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} {% do persist_docs(target_relation, model) %} @@ -42,7 +25,6 @@ {{ adapter.commit() }} - {{ drop_dictionary_if_exists(backup_relation, cluster_clause) }} {{ run_hooks(post_hooks, inside_transaction=False) }} @@ -55,7 +37,7 @@ {%- set fields = config.get('fields') -%} {%- set source_type = config.get('source_type') -%} - CREATE DICTIONARY {{ relation }} {{ cluster_clause }} + CREATE OR REPLACE DICTIONARY {{ relation }} {{ cluster_clause }} ( {%- for (name, data_type) in fields -%} {{ name }} {{ data_type }}{%- if not loop.last -%},{%- endif -%} @@ -71,6 +53,9 @@ ) LAYOUT({{ config.get('layout') }}) LIFETIME({{ config.get('lifetime') }}) + {%- if config.get('range') %} + RANGE({{ config.get('range') }}) + {%- endif %} {% endmacro %} diff --git a/dbt/include/clickhouse/macros/materializations/distributed_table.sql b/dbt/include/clickhouse/macros/materializations/distributed_table.sql index 1ad2cf66..ccf31bda 100644 --- a/dbt/include/clickhouse/macros/materializations/distributed_table.sql +++ b/dbt/include/clickhouse/macros/materializations/distributed_table.sql @@ -142,7 +142,7 @@ {{ order_cols(label="order by") }} {{ primary_key_clause(label="primary key") }} {{ partition_cols(label="partition by") }} - {{ adapter.get_model_settings(model) }} + {{ adapter.get_model_settings(model, config.get('engine', default='MergeTree')) }} {%- endmacro %} {% macro create_distributed_local_table(distributed_relation, shard_relation, structure_relation, sql_query=none) -%} diff --git a/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql index d06c99e0..37bd3bfd 100644 --- a/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql +++ b/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql @@ -52,7 +52,7 @@ {% endcall %} {% if existing_relation_local is none %} - -- No existing table, simply create a new one + -- No existing local table, recreate local and distributed tables {{ create_distributed_local_table(target_relation, target_relation_local, view_relation, sql) }} {% elif full_refresh_mode %} @@ -61,7 +61,7 @@ {% do adapter.drop_relation(distributed_intermediate_relation) or '' %} {% set need_swap = true %} - {% elif inserts_only or unique_key is none -%} + {% elif inserts_only -%} -- There are no updates/deletes or duplicate keys are allowed. Simply add all of the new rows to the existing -- table. It is the user's responsibility to avoid duplicates. Note that "inserts_only" is a ClickHouse adapter -- specific configurable that is used to avoid creating an expensive intermediate table. 
@@ -74,10 +74,10 @@ {%- endif -%} {% else %} {% if existing_relation is none %} - {{ drop_relation_if_exists(existing_relation) }} {% do run_query(create_distributed_table(target_relation, target_relation_local)) %} {% set existing_relation = target_relation %} {% endif %} + {% set incremental_strategy = adapter.calculate_incremental_strategy(config.get('incremental_strategy')) %} {% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %} {%- if on_schema_change != 'ignore' %} @@ -92,6 +92,15 @@ {% endif %} {% if incremental_strategy == 'delete_insert' %} {% do clickhouse__incremental_delete_insert(existing_relation, unique_key, incremental_predicates, True) %} + {% elif incremental_strategy == 'insert_overwrite' %} + {%- set partition_by = config.get('partition_by') -%} + {% if partition_by is none or partition_by|length == 0 %} + {% do exceptions.raise_compiler_error(incremental_strategy + ' strategy requires nonempty partition_by. Current partition_by is ' ~ partition_by) %} + {% endif %} + {% if inserts_only or unique_key is not none or incremental_predicates is not none %} + {% do exceptions.raise_compiler_error(incremental_strategy + ' strategy does not support inserts_only, unique_key, and incremental predicates.') %} + {% endif %} + {% do clickhouse__incremental_insert_overwrite(existing_relation, partition_by, True) %} {% elif incremental_strategy == 'append' %} {%- if language == 'python' -%} {%- call statement('main', language=language) -%} diff --git a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql index 3427b669..a193819c 100644 --- a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql +++ b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql @@ -76,7 +76,7 @@ {% call statement('main') %} {{ clickhouse__insert_into(target_relation, sql) }} {% endcall %} - {% elif incremental_strategy == 'insert_overwrite' %}#} + {% elif incremental_strategy == 'insert_overwrite' %} {%- set partition_by = config.get('partition_by') -%} {% if partition_by is none or partition_by|length == 0 %} {% do exceptions.raise_compiler_error(incremental_strategy + ' strategy requires nonempty partition_by. 
Current partition_by is ' ~ partition_by) %} @@ -84,7 +84,7 @@ {% if inserts_only or unique_key is not none or incremental_predicates is not none %} {% do exceptions.raise_compiler_error(incremental_strategy + ' strategy does not support inserts_only, unique_key, and incremental predicates.') %} {% endif %} - {% do clickhouse__incremental_insert_overwrite(existing_relation, intermediate_relation, partition_by) %} %} + {% do clickhouse__incremental_insert_overwrite(existing_relation, partition_by, False) %} {% endif %} {% endif %} @@ -266,40 +266,57 @@ {{ drop_relation_if_exists(distributed_new_data_relation) }} {% endmacro %} -{% macro clickhouse__incremental_insert_overwrite(existing_relation, intermediate_relation, partition_by) %} +{% macro clickhouse__incremental_insert_overwrite(existing_relation, partition_by, is_distributed=False) %} {%- set new_data_relation = make_intermediate_relation(existing_relation, '__dbt_new_data') -%} {{ drop_relation_if_exists(new_data_relation) }} - {% call statement('create_new_data_temp') -%} - {{ get_create_table_as_sql(False, new_data_relation, sql) }} - {%- endcall %} - {% call statement('main') -%} - create table {{ intermediate_relation }} as {{ existing_relation }} - {%- endcall %} - {% call statement('insert_new_data') -%} - insert into {{ intermediate_relation }} select * from {{ new_data_relation }} - {%- endcall %} + {%- set distributed_new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier + '__dbt_distributed_new_data'}) -%} + + + {%- set local_suffix = adapter.get_clickhouse_local_suffix() -%} + {%- set local_db_prefix = adapter.get_clickhouse_local_db_prefix() -%} + {% set existing_local = existing_relation.incorporate(path={"identifier": this.identifier + local_suffix, "schema": local_db_prefix + this.schema}) if existing_relation is not none else none %} + + {% if is_distributed %} + {{ create_distributed_local_table(distributed_new_data_relation, new_data_relation, existing_relation, sql) }} + {% else %} + {% call statement('main') %} + {{ get_create_table_as_sql(False, new_data_relation, sql) }} + {% endcall %} + {% endif %} + + {# Get the parts from the cluster table, since the partitions between shards may not overlap due to distribution #} {% if execute %} {% set select_changed_partitions %} SELECT DISTINCT partition_id - FROM system.parts + {% if is_distributed %} + FROM cluster({{ adapter.get_clickhouse_cluster_name() }}, system.parts) + {% else %} + FROM system.parts + {% endif %} WHERE active - AND database = '{{ intermediate_relation.schema }}' - AND table = '{{ intermediate_relation.identifier }}' + AND database = '{{ new_data_relation.schema }}' + AND table = '{{ new_data_relation.identifier }}' {% endset %} {% set changed_partitions = run_query(select_changed_partitions).rows %} {% else %} {% set changed_partitions = [] %} {% endif %} + {% if changed_partitions %} - {% call statement('replace_partitions') %} - alter table {{ existing_relation }} - {%- for partition in changed_partitions %} - replace partition id '{{ partition['partition_id'] }}' - from {{ intermediate_relation }} - {{- ', ' if not loop.last }} - {%- endfor %} + {% call statement('replace_partitions') %} + {% if is_distributed %} + alter table {{ existing_local }} {{ on_cluster_clause(existing_relation) }} + {% else %} + alter table {{ existing_relation }} + {% endif %} + {%- for partition in changed_partitions %} + replace partition id '{{ partition['partition_id'] }}' + from {{ new_data_relation }} + {{- ', ' if 
not loop.last }}
+        {%- endfor %}
     {% endcall %}
   {% endif %}

-  {% do adapter.drop_relation(intermediate_relation) %}
+
+  {% do adapter.drop_relation(distributed_new_data_relation) %}
   {% do adapter.drop_relation(new_data_relation) %}

{% endmacro %}
diff --git a/dbt/include/clickhouse/macros/materializations/materialized_view.sql b/dbt/include/clickhouse/macros/materializations/materialized_view.sql
index 28cb626d..7479d425 100644
--- a/dbt/include/clickhouse/macros/materializations/materialized_view.sql
+++ b/dbt/include/clickhouse/macros/materializations/materialized_view.sql
@@ -6,8 +6,9 @@
 {%- materialization materialized_view, adapter='clickhouse' -%}

   {%- set target_relation = this.incorporate(type='table') -%}
-  {%- set mv_relation = target_relation.derivative('_mv', 'materialized_view') -%}
   {%- set cluster_clause = on_cluster_clause(target_relation) -%}
+  {%- set refreshable_clause = refreshable_mv_clause() -%}
+
   {# look for an existing relation for the target table and create backup relations if necessary #}
   {%- set existing_relation = load_cached_relation(this) -%}
@@ -35,33 +36,79 @@
   -- `BEGIN` happens here:
   {{ run_hooks(pre_hooks, inside_transaction=True) }}

+  -- extract the names of the materialized views from the sql
+  {% set view_names = modules.re.findall('--(?:\s)?([^:]+):begin', sql) %}
+
+  -- extract the sql for each of the materialized views into a map
+  {% set views = {} %}
+  {% if view_names %}
+    {% for view_name in view_names %}
+      {% set view_sql = modules.re.findall('--(?:\s)?' + view_name + ':begin(.*)--(?:\s)?' + view_name + ':end', sql, flags=modules.re.DOTALL)[0] %}
+      {%- set _ = views.update({view_name: view_sql}) -%}
+    {% endfor %}
+  {% else %}
+    {%- set _ = views.update({"mv": sql}) -%}
+  {% endif %}
+
  {% if backup_relation is none %}
    {{ log('Creating new materialized view ' + target_relation.name )}}
-    {% call statement('main') -%}
-      {{ clickhouse__get_create_materialized_view_as_sql(target_relation, sql) }}
-    {%- endcall %}
+    {% set catchup_data = config.get("catchup", True) %}
+    {{ clickhouse__get_create_materialized_view_as_sql(target_relation, sql, views, catchup_data) }}
  {% elif existing_relation.can_exchange %}
    {{ log('Replacing existing materialized view ' + target_relation.name) }}
-    {% call statement('drop existing materialized view') %}
-      drop view if exists {{ mv_relation }} {{ cluster_clause }}
-    {% endcall %}
+    -- in this section, we look for mvs that have the same naming pattern as this model but, for some reason,
+    -- are not listed in the model. This might happen when using multiple mvs and renaming one of them in the model.
+    -- If such an mv is found, we raise a warning telling the user they may need to drop it manually.
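+    -- detection works by listing views in information_schema.tables whose names
+    -- contain this model's name and comparing them against the expected
+    -- '<model name>_<view key>' names derived from the views map built above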
+    {{ log('Searching for existing materialized views with the pattern of ' + target_relation.name) }}
+    {{ log('Views dictionary contents: ' + views | string) }}
+
+    {% set tables_query %}
+      select table_name
+      from information_schema.tables
+      where table_schema = '{{ existing_relation.schema }}'
+      and table_name like '%{{ target_relation.name }}%'
+      and table_type = 'VIEW'
+    {% endset %}
+
+    {% set tables_result = run_query(tables_query) %}
+    {% if tables_result is not none and tables_result.columns %}
+      {% set tables = tables_result.columns[0].values() %}
+      {{ log('Current mvs found in ClickHouse are: ' + tables | join(', ')) }}
+      {% set mv_names = [] %}
+      {% for key in views.keys() %}
+        {% do mv_names.append(target_relation.name ~ "_" ~ key) %}
+      {% endfor %}
+      {{ log('Model mvs to replace: ' + mv_names | string) }}
+      {% for table in tables %}
+        {% if table not in mv_names %}
+          {{ log('Warning - Table "' + table + '" matches the naming pattern of model "' + target_relation.name + '" but was not found in this run. If it is a renamed mv that was previously part of this model, drop it manually.') }}
+        {% endif %}
+      {% endfor %}
+    {% else %}
+      {{ log('No existing mvs found matching the pattern. Continuing...', info=True) }}
+    {% endif %}

    {% if should_full_refresh() %}
+      {{ clickhouse__drop_mvs(target_relation, cluster_clause, views) }}
+
      {% call statement('main') -%}
        {{ get_create_table_as_sql(False, backup_relation, sql) }}
      {%- endcall %}

      {% do exchange_tables_atomic(backup_relation, existing_relation) %}
+
+      {{ clickhouse__create_mvs(existing_relation, cluster_clause, refreshable_clause, views) }}
    {% else %}
      -- we need to have a 'main' statement
      {% call statement('main') -%}
        select 1
      {%- endcall %}
+
+      -- try to alter each view first to replace its sql; otherwise drop and recreate it
+      {{ clickhouse__update_mvs(target_relation, cluster_clause, refreshable_clause, views) }}
+
    {% endif %}
-    {% call statement('create new materialized view') %}
-      {{ clickhouse__create_mv_sql(mv_relation, existing_relation, cluster_clause, sql) }}
-    {% endcall %}
  {% else %}
    {{ log('Replacing existing materialized view ' + target_relation.name) }}
-    {{ clickhouse__replace_mv(target_relation, existing_relation, intermediate_relation, backup_relation, sql) }}
+    {{ clickhouse__replace_mv(target_relation, existing_relation, intermediate_relation, backup_relation, sql, views) }}
  {% endif %}

  -- cleanup
@@ -78,7 +125,12 @@

  {{ run_hooks(post_hooks, inside_transaction=False) }}

-  {{ return({'relations': [target_relation, mv_relation]}) }}
+  {% set relations = [target_relation] %}
+  {% for view in views %}
+    {{ relations.append(target_relation.derivative('_' + view, 'materialized_view')) }}
+  {% endfor %}
+
+  {{ return({'relations': relations}) }}

{%- endmaterialization -%}
@@ -89,30 +141,80 @@
 2. 
Create a materialized view using the SQL in the model that inserts data into the table created during step 1
#}
-{% macro clickhouse__get_create_materialized_view_as_sql(relation, sql) -%}
-  {% call statement('create_target_table') %}
+{% macro clickhouse__get_create_materialized_view_as_sql(relation, sql, views, catchup=True) -%}
+  {% call statement('main') %}
+    {% if catchup == True %}
    {{ get_create_table_as_sql(False, relation, sql) }}
+    {% else %}
+    {{ log('Catchup data config was set to false, skipping the initial insertion into the mv target table') }}
+    {% set has_contract = config.get('contract').enforced %}
+    {{ create_table_or_empty(False, relation, sql, has_contract) }}
+    {% endif %}
  {% endcall %}
  {%- set cluster_clause = on_cluster_clause(relation) -%}
+  {%- set refreshable_clause = refreshable_mv_clause() -%}
  {%- set mv_relation = relation.derivative('_mv', 'materialized_view') -%}
-  {{ clickhouse__create_mv_sql(mv_relation, relation, cluster_clause, sql) }}
+  {{ clickhouse__create_mvs(relation, cluster_clause, refreshable_clause, views) }}
{%- endmacro %}

+{% macro clickhouse__drop_mv(mv_relation, cluster_clause) -%}
+  {% call statement('drop existing mv: ' + mv_relation.name) -%}
+    drop view if exists {{ mv_relation }} {{ cluster_clause }}
+  {% endcall %}
+{%- endmacro %}
+
+{% macro clickhouse__create_mv(mv_relation, target_relation, cluster_clause, refreshable_clause, view_sql) -%}
+  {% call statement('create mv: ' + mv_relation.name) -%}
+    create materialized view if not exists {{ mv_relation }} {{ cluster_clause }}
+    {{ refreshable_clause }}
+    to {{ target_relation }}
+    as {{ view_sql }}
+  {% endcall %}
+{%- endmacro %}
+
+{% macro clickhouse__modify_mv(mv_relation, cluster_clause, view_sql) -%}
+  {% call statement('modify existing mv: ' + mv_relation.name) -%}
+    alter table {{ mv_relation }} {{ cluster_clause }} modify query {{ view_sql }}
+  {% endcall %}
+{%- endmacro %}
+
+{% macro clickhouse__update_mv(mv_relation, target_relation, cluster_clause, refreshable_clause, view_sql) -%}
+  {% set existing_relation = adapter.get_relation(database=mv_relation.database, schema=mv_relation.schema, identifier=mv_relation.identifier) %}
+  {% if existing_relation %}
+    {{ clickhouse__modify_mv(mv_relation, cluster_clause, view_sql) }};
+  {% else %}
+    {{ clickhouse__drop_mv(mv_relation, cluster_clause) }};
+    {{ clickhouse__create_mv(mv_relation, target_relation, cluster_clause, refreshable_clause, view_sql) }};
+  {% endif %}

-{% macro clickhouse__create_mv_sql(mv_relation, target_table, cluster_clause, sql) -%}
-  create materialized view if not exists {{ mv_relation }} {{ cluster_clause }}
-  to {{ target_table }}
-  as {{ sql }}
{%- endmacro %}

+{% macro clickhouse__drop_mvs(target_relation, cluster_clause, views) -%}
+  {% for view in views.keys() %}
+    {%- set mv_relation = target_relation.derivative('_' + view, 'materialized_view') -%}
+    {{ clickhouse__drop_mv(mv_relation, cluster_clause) }};
+  {% endfor %}
+{%- endmacro %}
+
+{% macro clickhouse__create_mvs(target_relation, cluster_clause, refreshable_clause, views) -%}
+  {% for view, view_sql in views.items() %}
+    {%- set mv_relation = target_relation.derivative('_' + view, 'materialized_view') -%}
+    {{ clickhouse__create_mv(mv_relation, target_relation, cluster_clause, refreshable_clause, view_sql) }};
+  {% endfor %}
+{%- endmacro %}

-{% macro clickhouse__replace_mv(target_relation, existing_relation, intermediate_relation, backup_relation, sql) %}
+{% macro clickhouse__update_mvs(target_relation, cluster_clause, 
refreshable_clause, views) -%} + {% for view, view_sql in views.items() %} + {%- set mv_relation = target_relation.derivative('_' + view, 'materialized_view') -%} + {{ clickhouse__update_mv(mv_relation, target_relation, cluster_clause, refreshable_clause, view_sql) }}; + {% endfor %} +{%- endmacro %} + +{% macro clickhouse__replace_mv(target_relation, existing_relation, intermediate_relation, backup_relation, sql, views) %} {# drop existing materialized view while we recreate the target table #} {%- set cluster_clause = on_cluster_clause(target_relation) -%} - {%- set mv_relation = target_relation.derivative('_mv', 'materialized_view') -%} - {% call statement('drop existing mv') -%} - drop view if exists {{ mv_relation }} {{ cluster_clause }} - {%- endcall %} + {%- set refreshable_clause = refreshable_mv_clause() -%} + {{ clickhouse__drop_mvs(target_relation, cluster_clause, views) }} {# recreate the target table #} {% call statement('main') -%} @@ -122,5 +224,111 @@ {{ adapter.rename_relation(intermediate_relation, target_relation) }} {# now that the target table is recreated, we can finally create our new view #} - {{ clickhouse__create_mv_sql(mv_relation, target_relation, cluster_clause, sql) }} + {{ clickhouse__create_mvs(target_relation, cluster_clause, refreshable_clause, views) }} {% endmacro %} + +{% macro refreshable_mv_clause() %} + {%- if config.get('refreshable') is not none -%} + + {% set refreshable_config = config.get('refreshable') %} + {% if refreshable_config is not mapping %} + {% do exceptions.raise_compiler_error( + "The 'refreshable' configuration must be defined as a dictionary. Please review the docs for more details." + ) %} + {% endif %} + + {% set refresh_interval = refreshable_config.get('interval', none) %} + {% set refresh_randomize = refreshable_config.get('randomize', none) %} + {% set depends_on = refreshable_config.get('depends_on', none) %} + {% set depends_on_validation = refreshable_config.get('depends_on_validation', false) %} + {% set append = refreshable_config.get('append', false) %} + + {% if not refresh_interval %} + {% do exceptions.raise_compiler_error( + "The 'refreshable' configuration is defined, but 'interval' is missing. " + ~ "This is required to create a refreshable materialized view." + ) %} + {% endif %} + + {% if refresh_interval %} + REFRESH {{ refresh_interval }} + {# This is a comment to force a new line between REFRESH and RANDOMIZE clauses #} + {%- if refresh_randomize -%} + RANDOMIZE FOR {{ refresh_randomize }} + {%- endif -%} + {% endif %} + + {% if depends_on %} + {% set depends_on_list = [] %} + + {% if depends_on is string %} + {% set depends_on_list = [depends_on] %} + {% elif depends_on is iterable %} + {% set temp_list = depends_on_list %} + {%- for dep in depends_on %} + {% if dep is string %} + {% do temp_list.append(dep) %} + {% else %} + {% do exceptions.raise_compiler_error( + "The 'depends_on' configuration must be either a string or a list of strings." + ) %} + {% endif %} + {% endfor %} + {% set depends_on_list = temp_list %} + {% else %} + {% do exceptions.raise_compiler_error( + "The 'depends_on' configuration must be either a string or a list of strings." 
+ ) %} + {% endif %} + + {% if depends_on_validation and depends_on_list | length > 0 %} + {%- for dep in depends_on_list %} + {% do validate_refreshable_mv_existence(dep) %} + {%- endfor %} + {% endif %} + + DEPENDS ON {{ depends_on_list | join(', ') }} + {% endif %} + + {%- if append -%} + APPEND + {%- endif -%} + + {%- endif -%} +{% endmacro %} + + +{% macro validate_refreshable_mv_existence(mv) %} + {{ log(mv + ' was recognized as a refreshable mv dependency, checking its existence') }} + {% set default_database = "default" %} + + {%- set parts = mv.split('.') %} + {%- if parts | length == 2 %} + {%- set database = parts[0] %} + {%- set table = parts[1] %} + {%- else %} + {%- set database = default_database %} + {%- set table = parts[0] %} + {%- endif %} + + {%- set condition = "database='" + database + "' and view='" + table + "'" %} + + {% set query %} + select count(*) + from system.view_refreshes + where {{ condition }} + {% endset %} + + {% set tables_result = run_query(query) %} + {{ log(tables_result.columns[0].values()[0]) }} + {% if tables_result.columns[0].values()[0] > 0 %} + {{ log('MV ' + mv + ' exists.') }} + {% else %} + {% do exceptions.raise_compiler_error( + 'No existing MV found matching MV: ' + mv + ) %} + {% endif %} +{% endmacro %} + + + diff --git a/dbt/include/clickhouse/macros/materializations/table.sql b/dbt/include/clickhouse/macros/materializations/table.sql index f543530b..c521b614 100644 --- a/dbt/include/clickhouse/macros/materializations/table.sql +++ b/dbt/include/clickhouse/macros/materializations/table.sql @@ -130,10 +130,11 @@ {% macro on_cluster_clause(relation, force_sync) %} {% set active_cluster = adapter.get_clickhouse_cluster_name() %} {%- if active_cluster is not none and relation.should_on_cluster %} - ON CLUSTER '{{ active_cluster }}' - {% if force_sync %} + {# Add trailing whitespace to avoid problems when this clause is not last #} + ON CLUSTER {{ active_cluster + ' ' }} + {%- if force_sync %} SYNC - {% endif %} + {%- endif %} {%- endif %} {%- endmacro -%} @@ -146,9 +147,8 @@ {% call statement('create_table_empty') %} {{ create_table }} {% endcall %} - {% if config.get('projections')%} - {{ projection_statement(relation) }} - {% endif %} + {{ add_index_and_projections(relation) }} + {%- set language = model['language'] -%} {%- if language == 'python' -%} {%- set code = py_write(compiled_code, relation) -%} @@ -162,18 +162,40 @@ {%- endif -%} {%- endmacro %} -{% macro projection_statement(relation) %} +{# + A macro that adds any configured projections or indexes at the same time. + We optimise to reduce the number of ALTER TABLE statements that are run to avoid + Code: 517. + DB::Exception: Metadata on replica is not up to date with common metadata in Zookeeper. + It means that this replica still not applied some of previous alters. Probably too many + alters executing concurrently (highly not recommended). 
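+
+  As an illustration, with one configured projection and one index the macro
+  below emits a single combined statement of the form (table, projection and
+  index names here are hypothetical):
+
+    ALTER TABLE my_table
+      ADD PROJECTION my_projection (SELECT a, sum(b) GROUP BY a),
+      ADD INDEX my_index my_column TYPE minmax GRANULARITY 2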
+#} +{% macro add_index_and_projections(relation) %} {%- set projections = config.get('projections', default=[]) -%} - - {%- for projection in projections %} - {% call statement('add_projections') %} - ALTER TABLE {{ relation }} ADD PROJECTION {{ projection.get('name') }} - ( - {{ projection.get('query') }} - ) - {%endcall %} - {%- endfor %} -{%- endmacro %} + {%- set indexes = config.get('indexes', default=[]) -%} + + {% if projections | length > 0 or indexes | length > 0 %} + {% call statement('add_projections_and_indexes') %} + ALTER TABLE {{ relation }} + {%- if projections %} + {%- for projection in projections %} + ADD PROJECTION {{ projection.get('name') }} ({{ projection.get('query') }}) + {%- if not loop.last or indexes | length > 0 -%} + , + {% endif %} + {%- endfor %} + {%- endif %} + {%- if indexes %} + {%- for index in indexes %} + ADD INDEX {{ index.get('name') }} {{ index.get('definition') }} + {%- if not loop.last -%} + , + {% endif %} + {% endfor %} + {% endif %} + {% endcall %} + {% endif %} +{% endmacro %} {% macro create_table_or_empty(temporary, relation, sql, has_contract) -%} {%- set sql_header = config.get('sql_header', none) -%} @@ -184,7 +206,7 @@ {% if temporary -%} create temporary table {{ relation }} engine Memory - {{ adapter.get_model_settings(model) }} + {{ adapter.get_model_settings(model, 'Memory') }} as ( {{ sql }} ) @@ -200,7 +222,7 @@ {{ primary_key_clause(label="primary key") }} {{ partition_cols(label="partition by") }} {{ ttl_config(label="ttl")}} - {{ adapter.get_model_settings(model) }} + {{ adapter.get_model_settings(model, config.get('engine', default='MergeTree')) }} {%- if not has_contract %} {%- if not adapter.is_before_version('22.7.1.2484') %} @@ -210,6 +232,7 @@ {{ sql }} ) {%- endif %} + {{ adapter.get_model_query_settings(model) }} {%- endif %} {%- endmacro %} diff --git a/dbt/include/clickhouse/macros/materializations/view.sql b/dbt/include/clickhouse/macros/materializations/view.sql index f59d2d50..94a7494c 100644 --- a/dbt/include/clickhouse/macros/materializations/view.sql +++ b/dbt/include/clickhouse/macros/materializations/view.sql @@ -2,52 +2,24 @@ {%- set existing_relation = load_cached_relation(this) -%} {%- set target_relation = this.incorporate(type='view') -%} - {%- set backup_relation = none -%} - {%- set preexisting_backup_relation = none -%} - {%- set preexisting_intermediate_relation = none -%} - - {% if existing_relation is not none %} - {%- set backup_relation_type = existing_relation.type -%} - {%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%} - {%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%} - {% if not existing_relation.can_exchange %} - {%- set intermediate_relation = make_intermediate_relation(target_relation) -%} - {%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation) -%} - {% endif %} - {% endif %} {% set grant_config = config.get('grants') %} {{ run_hooks(pre_hooks, inside_transaction=False) }} - -- drop the temp relations if they exist already in the database - {{ drop_relation_if_exists(preexisting_intermediate_relation) }} - {{ drop_relation_if_exists(preexisting_backup_relation) }} - -- `BEGIN` happens here: {{ run_hooks(pre_hooks, inside_transaction=True) }} - {% if backup_relation is none %} + {% if existing_relation is none %} {{ log('Creating new relation ' + target_relation.name )}} - -- There is not existing relation, so we can just create - {% call statement('main') -%} - {{ 
get_create_view_as_sql(target_relation, sql) }} - {%- endcall %} - {% elif existing_relation.can_exchange %} - -- We can do an atomic exchange, so no need for an intermediate - {% call statement('main') -%} - {{ get_create_view_as_sql(backup_relation, sql) }} - {%- endcall %} - {% do exchange_tables_atomic(backup_relation, existing_relation) %} {% else %} - -- We have to use an intermediate and rename accordingly - {% call statement('main') -%} - {{ get_create_view_as_sql(intermediate_relation, sql) }} - {%- endcall %} - {{ adapter.rename_relation(existing_relation, backup_relation) }} - {{ adapter.rename_relation(intermediate_relation, target_relation) }} + {{ log('Relation ' + target_relation.name + ' already exists, replacing it' )}} {% endif %} + {% call statement('main') -%} + {{ get_create_view_as_sql(target_relation, sql) }} + {%- endcall %} + -- cleanup {% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %} {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} @@ -58,8 +30,6 @@ {{ adapter.commit() }} - {{ drop_relation_if_exists(backup_relation) }} - {{ run_hooks(post_hooks, inside_transaction=False) }} {{ return({'relations': [target_relation]}) }} @@ -70,11 +40,13 @@ {% macro clickhouse__create_view_as(relation, sql) -%} {%- set sql_header = config.get('sql_header', none) -%} {{ sql_header if sql_header is not none }} - create view {{ relation.include(database=False) }} {{ on_cluster_clause(relation)}} as ( + + create or replace view {{ relation.include(database=False) }} {{ on_cluster_clause(relation)}} {% set contract_config = config.get('contract') %} {% if contract_config.enforced %} {{ get_assert_columns_equivalent(sql) }} {%- endif %} + as ( {% if sql is none %} {{clickhouse__create_select_query_from_schema()}} {%- else -%} @@ -82,8 +54,8 @@ {%- endif -%} {{ adapter.get_model_query_settings(model) }} ) - {% if model.get('config').get('materialized') == 'view' %} - {{ adapter.get_model_settings(model) }} - {%- endif %} + {% if model.get('config').get('materialized') == 'view' %} + {{ adapter.get_model_settings(model, config.get('engine', default='MergeTree')) }} + {%- endif %} {%- endmacro %} diff --git a/dbt/include/clickhouse/macros/utils/utils.sql b/dbt/include/clickhouse/macros/utils/utils.sql index 1bf0ea5f..a07a6137 100644 --- a/dbt/include/clickhouse/macros/utils/utils.sql +++ b/dbt/include/clickhouse/macros/utils/utils.sql @@ -54,7 +54,7 @@ {% macro clickhouse__split_part(string_text, delimiter_text, part_number) %} - splitByChar('{{delimiter_text}}', {{ string_text }})[{{ part_number }}] + splitByString({{delimiter_text}}, {{ string_text }})[{{ part_number }}] {% endmacro %} @@ -108,3 +108,4 @@ {% macro clickhouse__array_concat(array_1, array_2) -%} arrayConcat({{ array_1 }}, {{ array_2 }}) {% endmacro %} + diff --git a/setup.py b/setup.py index 43b2151e..b156c6a6 100644 --- a/setup.py +++ b/setup.py @@ -59,7 +59,7 @@ def _dbt_clickhouse_version(): 'clickhouse-driver>=0.2.6', 'setuptools>=0.69', ], - python_requires=">=3.8", + python_requires=">=3.9", platforms='any', classifiers=[ 'Development Status :: 5 - Production/Stable', @@ -67,7 +67,6 @@ def _dbt_clickhouse_version(): 'Operating System :: Microsoft :: Windows', 'Operating System :: MacOS :: MacOS X', 'Operating System :: POSIX :: Linux', - 'Programming Language :: Python :: 3.8', 'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: 3.10', 'Programming Language :: Python :: 3.11', diff --git 
a/tests/integration/adapter/clickhouse/test_clickhouse_table_materializations.py b/tests/integration/adapter/clickhouse/test_clickhouse_table_materializations.py index ff6e2efb..c75396ff 100644 --- a/tests/integration/adapter/clickhouse/test_clickhouse_table_materializations.py +++ b/tests/integration/adapter/clickhouse/test_clickhouse_table_materializations.py @@ -249,3 +249,105 @@ def test_base(self, project): assert len(results) == 1 self.assert_total_count_correct(project) + + +class TestMergeTreeForceClusterMaterialization(BaseSimpleMaterializations): + '''Test MergeTree materialized view is created across a cluster using the + `force_on_cluster` config argument + ''' + + @pytest.fixture(scope="class") + def models(self): + config_force_on_cluster = """ + {{ config( + engine='MergeTree', + materialized='materialized_view', + force_on_cluster='true' + ) + }} + """ + + return { + "force_on_cluster.sql": config_force_on_cluster + model_base, + "schema.yml": schema_base_yml, + } + + @pytest.fixture(scope="class") + def seeds(self): + return { + "schema.yml": base_seeds_schema_yml, + "base.csv": seeds_base_csv, + } + + def assert_total_count_correct(self, project): + '''Check if table is created on cluster''' + cluster = project.test_config['cluster'] + + # check if data is properly distributed/replicated + table_relation = relation_from_name(project.adapter, "force_on_cluster") + # ClickHouse cluster in the docker-compose file + # under tests/integration is configured with 3 nodes + host_count = project.run_sql( + f"select count(host_name) as host_count from system.clusters where cluster='{cluster}'", + fetch="one", + ) + assert host_count[0] > 1 + + table_count = project.run_sql( + f"select count() From clusterAllReplicas('{cluster}', system.tables) " + f"where database='{table_relation.schema}' and name='{table_relation.identifier}'", + fetch="one", + ) + + assert table_count[0] == 3 + + mv_count = project.run_sql( + f"select count() From clusterAllReplicas('{cluster}', system.tables) " + f"where database='{table_relation.schema}' and name='{table_relation.identifier}_mv'", + fetch="one", + ) + + assert mv_count[0] == 3 + + @pytest.mark.skipif( + os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster' + ) + def test_base(self, project): + # cluster setting must exist + cluster = project.test_config['cluster'] + assert cluster + + # seed command + results = run_dbt(["seed"]) + # seed result length + assert len(results) == 1 + + # run command + results = run_dbt() + # run result length + assert len(results) == 1 + + # names exist in result nodes + check_result_nodes_by_name(results, ["force_on_cluster"]) + + # check relation types + expected = { + "base": "table", + "replicated": "table", + } + check_relation_types(project.adapter, expected) + + relation = relation_from_name(project.adapter, "base") + # table rowcount + result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one") + assert result[0] == 10 + + # relations_equal + self.assert_total_count_correct(project) + + # run full refresh + results = run_dbt(['--debug', 'run', '--full-refresh']) + # run result length + assert len(results) == 1 + + self.assert_total_count_correct(project) diff --git a/tests/integration/adapter/dictionary/test_dictionary.py b/tests/integration/adapter/dictionary/test_dictionary.py index 77ee1aae..70c6ea8c 100644 --- a/tests/integration/adapter/dictionary/test_dictionary.py +++ b/tests/integration/adapter/dictionary/test_dictionary.py @@ -114,6 +114,33 @@ 
- name: people """ +RANGE_DICTIONARY = """ +{{ config( + materialized='dictionary', + fields=[ + ('id', 'UInt8'), + ('start', 'UInt8'), + ('stop', 'UInt8'), + ('value', 'String') + ], + primary_key='id', + layout='RANGE_HASHED()', + lifetime='MIN 0 MAX 0', + source_type='clickhouse', + range='min start max stop' +) }} + +select + c1 as id, + c2 as start, + c3 as stop, + c4 as value +from values( + (0, 0, 2, 'foo'), + (0, 3, 5, 'bar') +) +""" + class TestQueryDictionary: @pytest.fixture(scope="class") @@ -193,3 +220,17 @@ def test_create(self, project): "select count(distinct LocationID) from taxi_zone_dictionary", fetch="all" ) assert results[0][0] == 265 + + +class TestRangeDictionary: + @pytest.fixture(scope="class") + def models(self): + return {"range_dictionary.sql": RANGE_DICTIONARY} + + def test_create(self, project): + run_dbt() + + results = project.run_sql("select dictGet(range_dictionary, 'value', 0, 1)", fetch="all") + assert results[0][0] == "foo" + results = project.run_sql("select dictGet(range_dictionary, 'value', 0, 5)", fetch="all") + assert results[0][0] == "bar" diff --git a/tests/integration/adapter/incremental/test_base_incremental.py b/tests/integration/adapter/incremental/test_base_incremental.py index 0c522df7..ea372b9b 100644 --- a/tests/integration/adapter/incremental/test_base_incremental.py +++ b/tests/integration/adapter/incremental/test_base_incremental.py @@ -195,7 +195,7 @@ def models(self): SELECT partitionKey1, partitionKey2, orderKey, value FROM VALUES( 'partitionKey1 UInt8, partitionKey2 String, orderKey UInt8, value String', - (1, 'p1', 1, 'a'), (1, 'p1', 1, 'b'), (2, 'p1', 1, 'c'), (2, 'p2', 1, 'd') + (1, 'p1', 1, 'a'), (1, 'p1', 2, 'b'), (2, 'p1', 3, 'c'), (2, 'p2', 4, 'd') ) {% else %} SELECT partitionKey1, partitionKey2, orderKey, value @@ -207,7 +207,7 @@ def models(self): """ -class TestInsertReplaceIncremental: +class TestInsertOverwriteIncremental: @pytest.fixture(scope="class") def models(self): return {"insert_overwrite_inc.sql": insert_overwrite_inc} @@ -220,9 +220,9 @@ def test_insert_overwrite_incremental(self, project): ) assert result == [ (1, 'p1', 1, 'a'), - (1, 'p1', 1, 'b'), - (2, 'p1', 1, 'c'), - (2, 'p2', 1, 'd'), + (1, 'p1', 2, 'b'), + (2, 'p1', 3, 'c'), + (2, 'p2', 4, 'd'), ] run_dbt() result = project.run_sql( @@ -231,7 +231,63 @@ def test_insert_overwrite_incremental(self, project): ) assert result == [ (1, 'p1', 2, 'e'), - (2, 'p1', 1, 'c'), - (2, 'p2', 1, 'd'), + (2, 'p1', 3, 'c'), + (2, 'p2', 4, 'd'), + (3, 'p1', 2, 'f'), + ] + + +# "ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/{table}/{uuid}/', '{replica}')" +insert_overwrite_replicated_inc = """ +{{ config( + materialized='incremental', + incremental_strategy='insert_overwrite', + partition_by=['partitionKey1', 'partitionKey2'], + order_by=['orderKey'], + engine="ReplicatedMergeTree('/clickhouse/tables/{uuid}/one_shard', '{server_index}')" + ) +}} +{% if not is_incremental() %} + SELECT partitionKey1, partitionKey2, orderKey, value + FROM VALUES( + 'partitionKey1 UInt8, partitionKey2 String, orderKey UInt8, value String', + (1, 'p1', 1, 'a'), (1, 'p1', 2, 'b'), (2, 'p1', 3, 'c'), (2, 'p2', 4, 'd') + ) +{% else %} + SELECT partitionKey1, partitionKey2, orderKey, value + FROM VALUES( + 'partitionKey1 UInt8, partitionKey2 String, orderKey UInt8, value String', + (1, 'p1', 2, 'e'), (3, 'p1', 2, 'f') + ) +{% endif %} +""" + + +class TestInsertOverwriteReplicatedIncremental: + @pytest.fixture(scope="class") + def models(self): + return 
{"insert_overwrite_replicated_inc.sql": insert_overwrite_replicated_inc} + + def test_insert_overwrite_replicated_incremental(self, project): + run_dbt() + result = project.run_sql( + "select * from insert_overwrite_replicated_inc order by partitionKey1, partitionKey2, orderKey", + fetch="all", + ) + assert result == [ + (1, 'p1', 1, 'a'), + (1, 'p1', 2, 'b'), + (2, 'p1', 3, 'c'), + (2, 'p2', 4, 'd'), + ] + run_dbt() + result = project.run_sql( + "select * from insert_overwrite_replicated_inc order by partitionKey1, partitionKey2, orderKey", + fetch="all", + ) + assert result == [ + (1, 'p1', 2, 'e'), + (2, 'p1', 3, 'c'), + (2, 'p2', 4, 'd'), (3, 'p1', 2, 'f'), ] diff --git a/tests/integration/adapter/incremental/test_distributed_incremental.py b/tests/integration/adapter/incremental/test_distributed_incremental.py index 2a4e73f2..71568547 100644 --- a/tests/integration/adapter/incremental/test_distributed_incremental.py +++ b/tests/integration/adapter/incremental/test_distributed_incremental.py @@ -264,3 +264,60 @@ def models(self): ) def test_incremental_not_schema_change(self, project): super().test_incremental_not_schema_change(project) + + +insert_overwrite_dist_inc = """ +{{ config( + materialized='distributed_incremental', + incremental_strategy='insert_overwrite', + partition_by=['partitionKey'], + order_by=['orderKey'], + sharding_key='shardingKey' + ) +}} +{% if not is_incremental() %} + SELECT shardingKey, partitionKey, orderKey, value + FROM VALUES( + 'shardingKey UInt8, partitionKey String, orderKey UInt8, value String', + (1, 'p1', 1, 'a'), (1, 'p1', 2, 'b'), (2, 'p1', 3, 'c'), (2, 'p2', 4, 'd') + ) +{% else %} + SELECT shardingKey, partitionKey, orderKey, value + FROM VALUES( + 'shardingKey UInt8, partitionKey String, orderKey UInt8, value String', + (1, 'p1', 2, 'e'), (3, 'p1', 2, 'f') + ) +{% endif %} +""" + + +class TestInsertOverwriteDistributedIncremental: + @pytest.fixture(scope="class") + def models(self): + return {"insert_overwrite_dist_inc.sql": insert_overwrite_dist_inc} + + @pytest.mark.skipif( + os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster' + ) + def test_insert_overwrite_distributed_incremental(self, project): + run_dbt() + result = project.run_sql( + "select * from insert_overwrite_dist_inc order by shardingKey, partitionKey, orderKey", + fetch="all", + ) + assert result == [ + (1, 'p1', 1, 'a'), + (1, 'p1', 2, 'b'), + (2, 'p1', 3, 'c'), + (2, 'p2', 4, 'd'), + ] + run_dbt() + result = project.run_sql( + "select * from insert_overwrite_dist_inc order by shardingKey, partitionKey, orderKey", + fetch="all", + ) + assert result == [ + (1, 'p1', 2, 'e'), + (2, 'p2', 4, 'd'), + (3, 'p1', 2, 'f'), + ] diff --git a/tests/integration/adapter/materialized_view/test_materialized_view.py b/tests/integration/adapter/materialized_view/test_materialized_view.py index ce651ff3..b8cb8214 100644 --- a/tests/integration/adapter/materialized_view/test_materialized_view.py +++ b/tests/integration/adapter/materialized_view/test_materialized_view.py @@ -15,6 +15,9 @@ 1231,Dade,33,engineering 6666,Ksenia,48,engineering 8888,Kate,50,engineering +1000,Alfie,10,sales +2000,Bill,20,sales +3000,Charlie,30,sales """.lstrip() # This model is parameterized, in a way, by the "run_type" dbt project variable @@ -25,10 +28,11 @@ materialized='materialized_view', engine='MergeTree()', order_by='(id)', - schema='custom_schema', + schema='catchup' if var('run_type', '') == 'catchup' else 'custom_schema', + **({'catchup': False} if var('run_type', '') == 
'catchup' else {}) ) }} -{% if var('run_type', '') == '' %} +{% if var('run_type', '') in ['', 'catchup'] %} select id, name, @@ -40,8 +44,7 @@ from {{ source('raw', 'people') }} where department = 'engineering' -{% else %} - +{% elif var('run_type', '') == 'extended_schema' %} select id, name, @@ -58,7 +61,6 @@ {% endif %} """ - SEED_SCHEMA_YML = """ version: 2 @@ -128,6 +130,49 @@ def test_create(self, project): result = project.run_sql(f"select count(*) from {schema}.hackers", fetch="all") assert result[0][0] == 4 + def test_disabled_catchup(self, project): + """ + 1. create a base table via dbt seed + 2. create a model with catchup disabled as a materialized view, selecting from the table created in (1) + 3. insert data into the base table and make sure it's there in the target table created in (2) + """ + schema = quote_identifier(project.test_schema + "_catchup") + results = run_dbt(["seed"]) + assert len(results) == 1 + columns = project.run_sql("DESCRIBE TABLE people", fetch="all") + assert columns[0][1] == "Int32" + + # create the model with catchup disabled + run_vars = {"run_type": "catchup"} + run_dbt(["run", "--vars", json.dumps(run_vars)]) + # check that we only have the new row, without the historical data + assert len(results) == 1 + + columns = project.run_sql(f"DESCRIBE TABLE {schema}.hackers", fetch="all") + assert columns[0][1] == "Int32" + + columns = project.run_sql(f"DESCRIBE {schema}.hackers_mv", fetch="all") + assert columns[0][1] == "Int32" + + check_relation_types( + project.adapter, + { + "hackers_mv": "view", + "hackers": "table", + }, + ) + + # insert some data and make sure it reaches the target table + project.run_sql( + f""" + insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department") + values (1232,'Dade',16,'engineering'), (9999,'eugene',40,'malware'); + """ + ) + + result = project.run_sql(f"select count(*) from {schema}.hackers", fetch="all") + assert result[0][0] == 1 + class TestUpdateMV: @pytest.fixture(scope="class") diff --git a/tests/integration/adapter/materialized_view/test_multiple_materialized_views.py b/tests/integration/adapter/materialized_view/test_multiple_materialized_views.py new file mode 100644 index 00000000..9a2d9850 --- /dev/null +++ b/tests/integration/adapter/materialized_view/test_multiple_materialized_views.py @@ -0,0 +1,248 @@ +""" +test materialized view creation. 
This is ClickHouse specific, which has a significantly different implementation +of materialized views from PostgreSQL or Oracle +""" + +import json + +import pytest +from dbt.tests.util import check_relation_types, run_dbt + +from dbt.adapters.clickhouse.query import quote_identifier + +PEOPLE_SEED_CSV = """ +id,name,age,department +1231,Dade,33,engineering +6666,Ksenia,48,engineering +8888,Kate,50,engineering +1000,Alfie,10,sales +2000,Bill,20,sales +3000,Charlie,30,sales +""".lstrip() + +# This model is parameterized, in a way, by the "run_type" dbt project variable +# This is to be able to switch between different model definitions within +# the same test run and allow us to test the evolution of a materialized view + +MULTIPLE_MV_MODEL = """ +{{ config( + materialized='materialized_view', + engine='MergeTree()', + order_by='(id)', + schema='custom_schema_for_multiple_mv', +) }} + +{% if var('run_type', '') == '' %} + +--mv1:begin +select + id, + name, + case + when name like 'Dade' then 'crash_override' + when name like 'Kate' then 'acid burn' + else 'N/A' + end as hacker_alias +from {{ source('raw', 'people') }} +where department = 'engineering' +--mv1:end + +union all + +--mv2:begin +select + id, + name, + -- sales people are not cool enough to have a hacker alias + 'N/A' as hacker_alias +from {{ source('raw', 'people') }} +where department = 'sales' +--mv2:end + +{% elif var('run_type', '') == 'extended_schema' %} + +--mv1:begin +select + id, + name, + case + -- Dade wasn't always known as 'crash override'! + when name like 'Dade' and age = 11 then 'zero cool' + when name like 'Dade' and age != 11 then 'crash override' + when name like 'Kate' then 'acid burn' + else 'N/A' + end as hacker_alias +from {{ source('raw', 'people') }} +where department = 'engineering' +--mv1:end + +union all + +--mv2:begin +select + id, + name, + -- sales people are not cool enough to have a hacker alias + 'N/A' as hacker_alias +from {{ source('raw', 'people') }} +where department = 'sales' +--mv2:end + +{% endif %} +""" + + +SEED_SCHEMA_YML = """ +version: 2 + +sources: + - name: raw + schema: "{{ target.schema }}" + tables: + - name: people +""" + + +class TestMultipleMV: + @pytest.fixture(scope="class") + def seeds(self): + """ + we need a base table to pull from + """ + return { + "people.csv": PEOPLE_SEED_CSV, + "schema.yml": SEED_SCHEMA_YML, + } + + @pytest.fixture(scope="class") + def models(self): + return { + "hackers.sql": MULTIPLE_MV_MODEL, + } + + def test_create(self, project): + """ + 1. create a base table via dbt seed + 2. create a model as a materialized view, selecting from the table created in (1) + 3. 
insert data into the base table and make sure it's there in the target table created in (2) + """ + schema = quote_identifier(project.test_schema + "_custom_schema_for_multiple_mv") + results = run_dbt(["seed"]) + assert len(results) == 1 + columns = project.run_sql("DESCRIBE TABLE people", fetch="all") + assert columns[0][1] == "Int32" + + # create the model + run_dbt(["run"]) + assert len(results) == 1 + + columns = project.run_sql(f"DESCRIBE TABLE {schema}.hackers", fetch="all") + assert columns[0][1] == "Int32" + + columns = project.run_sql(f"DESCRIBE {schema}.hackers_mv1", fetch="all") + assert columns[0][1] == "Int32" + + columns = project.run_sql(f"DESCRIBE {schema}.hackers_mv2", fetch="all") + assert columns[0][1] == "Int32" + + with pytest.raises(Exception): + columns = project.run_sql(f"DESCRIBE {schema}.hackers_mv", fetch="all") + + check_relation_types( + project.adapter, + { + "hackers_mv": "view", + "hackers": "table", + }, + ) + + # insert some data and make sure it reaches the target table + project.run_sql( + f""" + insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department") + values (4000,'Dave',40,'sales'), (9999,'Eugene',40,'engineering'); + """ + ) + + result = project.run_sql(f"select * from {schema}.hackers order by id", fetch="all") + assert result == [ + (1000, 'Alfie', 'N/A'), + (1231, 'Dade', 'crash_override'), + (2000, 'Bill', 'N/A'), + (3000, 'Charlie', 'N/A'), + (4000, 'Dave', 'N/A'), + (6666, 'Ksenia', 'N/A'), + (8888, 'Kate', 'acid burn'), + (9999, 'Eugene', 'N/A'), + ] + + +class TestUpdateMultipleMV: + @pytest.fixture(scope="class") + def seeds(self): + """ + we need a base table to pull from + """ + return { + "people.csv": PEOPLE_SEED_CSV, + "schema.yml": SEED_SCHEMA_YML, + } + + @pytest.fixture(scope="class") + def models(self): + return { + "hackers.sql": MULTIPLE_MV_MODEL, + } + + def test_update_incremental(self, project): + schema = quote_identifier(project.test_schema + "_custom_schema_for_multiple_mv") + # create our initial materialized view + run_dbt(["seed"]) + run_dbt() + + # re-run dbt but this time with the new MV SQL + run_vars = {"run_type": "extended_schema"} + run_dbt(["run", "--vars", json.dumps(run_vars)]) + + project.run_sql( + f""" + insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department") + values (1232,'Dade',11,'engineering'), (9999,'eugene',40,'malware'); + """ + ) + + # assert that we now have both of Dade's aliases in our hackers table + result = project.run_sql( + f"select distinct hacker_alias from {schema}.hackers where name = 'Dade' order by hacker_alias", + fetch="all", + ) + assert len(result) == 2 + assert result[0][0] == "crash_override" + assert result[1][0] == "zero cool" + + def test_update_full_refresh(self, project): + schema = quote_identifier(project.test_schema + "_custom_schema_for_multiple_mv") + # create our initial materialized view + run_dbt(["seed"]) + run_dbt() + + # re-run dbt but this time with the new MV SQL + run_vars = {"run_type": "extended_schema"} + run_dbt(["run", "--full-refresh", "--vars", json.dumps(run_vars)]) + + project.run_sql( + f""" + insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department") + values (1232,'Dade',11,'engineering'), (9999,'eugene',40,'malware'); + """ + ) + + # assert that we now have both of Dade's aliases in our hackers table + result = project.run_sql( + f"select distinct hacker_alias from {schema}.hackers where name = 'Dade' order by hacker_alias", + 
fetch="all", + ) + print(result) + assert len(result) == 2 + assert result[0][0] == "crash override" + assert result[1][0] == "zero cool" diff --git a/tests/integration/adapter/materialized_view/test_refreshable_materialized_view.py b/tests/integration/adapter/materialized_view/test_refreshable_materialized_view.py new file mode 100644 index 00000000..dac51f4d --- /dev/null +++ b/tests/integration/adapter/materialized_view/test_refreshable_materialized_view.py @@ -0,0 +1,125 @@ +""" +test refreshable materialized view creation. This is ClickHouse specific, which has a significantly different implementation +of materialized views from PostgreSQL or Oracle +""" + +import json + +import pytest +from dbt.tests.util import check_relation_types, run_dbt + +PEOPLE_SEED_CSV = """ +id,name,age,department +1231,Dade,33,engineering +6666,Ksenia,48,engineering +8888,Kate,50,engineering +1000,Alfie,10,sales +2000,Bill,20,sales +3000,Charlie,30,sales +""".lstrip() + +# This model is parameterized, in a way, by the "run_type" dbt project variable +# This is to be able to switch between different model definitions within +# the same test run and allow us to test the evolution of a materialized view +MV_MODEL = """ +{{ config( + materialized='materialized_view', + engine='MergeTree()', + order_by='(department)', + refreshable=( + { + "interval": "EVERY 2 MINUTE", + "depends_on": ['depend_on_model'], + "depends_on_validation": True + } if var('run_type', '') == 'validate_depends_on' else { + "interval": "EVERY 2 MINUTE" + } + ) + ) + }} +select + department, + avg(age) as average + from {{ source('raw', 'people') }} +group by department +""" + +SEED_SCHEMA_YML = """ +version: 2 + +sources: + - name: raw + schema: "{{ target.schema }}" + tables: + - name: people +""" + + +class TestBasicRefreshableMV: + @pytest.fixture(scope="class") + def seeds(self): + """ + we need a base table to pull from + """ + return { + "people.csv": PEOPLE_SEED_CSV, + "schema.yml": SEED_SCHEMA_YML, + } + + @pytest.fixture(scope="class") + def models(self): + return { + "hackers.sql": MV_MODEL, + } + + def test_create(self, project): + """ + 1. create a base table via dbt seed + 2. create a model as a refreshable materialized view, selecting from the table created in (1) + 3. check in system.view_refreshes for the table existence + """ + results = run_dbt(["seed"]) + assert len(results) == 1 + columns = project.run_sql(f"DESCRIBE TABLE {project.test_schema}.people", fetch="all") + assert columns[0][1] == "Int32" + + # create the model + results = run_dbt() + assert len(results) == 1 + + columns = project.run_sql(f"DESCRIBE TABLE hackers", fetch="all") + assert columns[0][1] == "String" + + columns = project.run_sql(f"DESCRIBE hackers_mv", fetch="all") + assert columns[0][1] == "String" + + check_relation_types( + project.adapter, + { + "hackers_mv": "view", + "hackers": "table", + }, + ) + + result = project.run_sql( + f"select database, view, status from system.view_refreshes where database= '{project.test_schema}' and view='hackers_mv'", + fetch="all", + ) + assert result[0][2] == 'Scheduled' + + def test_validate_dependency(self, project): + """ + 1. create a base table via dbt seed + 2. create a refreshable mv model with non exist dependency and validation config, selecting from the table created in (1) + 3. 
make sure we get an error + """ + results = run_dbt(["seed"]) + assert len(results) == 1 + columns = project.run_sql(f"DESCRIBE TABLE {project.test_schema}.people", fetch="all") + assert columns[0][1] == "Int32" + + # re-run dbt but this time with the new MV SQL + run_vars = {"run_type": "validate_depends_on"} + result = run_dbt(["run", "--vars", json.dumps(run_vars)], False) + assert result[0].status == 'error' + assert 'No existing MV found matching MV' in result[0].message diff --git a/tests/integration/adapter/query_settings/test_query_settings.py b/tests/integration/adapter/query_settings/test_query_settings.py new file mode 100644 index 00000000..83ef370c --- /dev/null +++ b/tests/integration/adapter/query_settings/test_query_settings.py @@ -0,0 +1,65 @@ +import pytest +from dbt.tests.util import run_dbt + +nullable_column_model = """ +{{ + config( + materialized='table', + query_settings={ + 'join_use_nulls': 1 + } + ) +}} +select t2.id as test_id +from (select 1 as id) t1 + left join (select 2 as id) t2 +on t1.id=t2.id +""" + + +class TestNullableColumnJoin: + @pytest.fixture(scope="class") + def models(self): + return { + "nullable_column_model.sql": nullable_column_model, + } + + def test_nullable_column_join(self, project): + run_dbt(["run", "--select", "nullable_column_model"]) + result = project.run_sql( + "select isNullable(test_id) as is_nullable_column from nullable_column_model", + fetch="one", + ) + assert result[0] == 1 + + +not_nullable_column_model = """ +{{ + config( + materialized='table', + query_settings={ + 'join_use_nulls': 0 + } + ) +}} +select t2.id as test_id +from (select 1 as id) t1 + left join (select 2 as id) t2 +on t1.id=t2.id +""" + + +class TestNotNullableColumnJoin: + @pytest.fixture(scope="class") + def models(self): + return { + "not_nullable_column_model.sql": not_nullable_column_model, + } + + def test_nullable_column_join(self, project): + run_dbt(["run", "--select", "not_nullable_column_model"]) + result = project.run_sql( + "select isNullable(test_id) as is_nullable_column from not_nullable_column_model", + fetch="one", + ) + assert result[0] == 0 diff --git a/tests/integration/adapter/utils/test_split_part.py b/tests/integration/adapter/utils/test_split_part.py index 81e1afaa..95677075 100644 --- a/tests/integration/adapter/utils/test_split_part.py +++ b/tests/integration/adapter/utils/test_split_part.py @@ -10,7 +10,7 @@ ) select - {{ split_part('parts', '|', 1) }} as actual, + {{ split_part('parts', "'|'", 1) }} as actual, result_1 as expected from data @@ -18,7 +18,7 @@ union all select - {{ split_part('parts', '|', 2) }} as actual, + {{ split_part('parts', "'|'", 2) }} as actual, result_2 as expected from data @@ -26,7 +26,7 @@ union all select - {{ split_part('parts', '|', 3) }} as actual, + {{ split_part('parts', "'|'", 3) }} as actual, result_3 as expected from data diff --git a/tests/integration/adapter/view/test_view.py b/tests/integration/adapter/view/test_view.py new file mode 100644 index 00000000..fe47a335 --- /dev/null +++ b/tests/integration/adapter/view/test_view.py @@ -0,0 +1,70 @@ +""" +Test ClickHouse view materialization in dbt-clickhouse +""" + +import json + +import pytest +from dbt.tests.util import run_dbt + +PEOPLE_SEED_CSV = """ +id,name,age,department +1231,Dade,33,engineering +6666,Ksenia,48,engineering +8888,Kate,50,engineering +""".lstrip() + +PEOPLE_VIEW_MODEL = """ +{{ config( + materialized='view' +) }} + +{% if var('run_type', '') == '' %} + select id, name, age from {{ source('raw', 'people') }} +{% elif 
var('run_type', '') == 'update_view' %} + select id, name, age, department from {{ source('raw', 'people') }} +{% endif %} +""" + + +SEED_SCHEMA_YML = """ +version: 2 + +sources: + - name: raw + schema: "{{ target.schema }}" + tables: + - name: people +""" + + +class TestClickHouseView: + @pytest.fixture(scope="class") + def seeds(self): + return { + "people.csv": PEOPLE_SEED_CSV, + "schema.yml": SEED_SCHEMA_YML, + } + + @pytest.fixture(scope="class") + def models(self): + return {"people_view.sql": PEOPLE_VIEW_MODEL} + + def test_create_view(self, project): + # Load seed data + run_dbt(["seed"]) + + # Run dbt to create the view + run_dbt() + + # Query the view and check if it returns expected data + result = project.run_sql("SELECT COUNT(*) FROM people_view", fetch="one") + assert result[0] == 3 # 3 records in the seed data + + # Run dbt again to apply the update + run_dbt(["run", "--vars", json.dumps({"run_type": "update_view"})]) + + # Verify the new column is present + result = project.run_sql("DESCRIBE TABLE people_view", fetch="all") + columns = {row[0] for row in result} + assert "department" in columns # New column should be present diff --git a/tests/integration/docker-compose.yml b/tests/integration/docker-compose.yml index e7810f0f..578995a3 100644 --- a/tests/integration/docker-compose.yml +++ b/tests/integration/docker-compose.yml @@ -22,6 +22,7 @@ services: - SERVER_INDEX=1 - SHARD_NUM=${SHARD_NUM:-1} - REPLICA_NUM=${REPLICA_NUM:-1} + - CLICKHOUSE_SKIP_USER_SETUP=1 ports: - "8123:8123" - "8443:8443" @@ -37,6 +38,7 @@ services: - SERVER_INDEX=2 - SHARD_NUM=${SHARD_NUM:-2} - REPLICA_NUM=${REPLICA_NUM:-2} + - CLICKHOUSE_SKIP_USER_SETUP=1 <<: *ch-common ch2: image: clickhouse/clickhouse-server:${DBT_CH_TEST_CH_VERSION:-latest} @@ -44,6 +46,7 @@ services: - SERVER_INDEX=3 - SHARD_NUM=${SHARD_NUM:-3} - REPLICA_NUM=${REPLICA_NUM:-3} + - CLICKHOUSE_SKIP_USER_SETUP=1 <<: *ch-common networks: diff --git a/tests/unit/test_util.py b/tests/unit/test_util.py index d87d2e57..2d287921 100644 --- a/tests/unit/test_util.py +++ b/tests/unit/test_util.py @@ -1,4 +1,6 @@ -from dbt.adapters.clickhouse.util import compare_versions +from unittest.mock import patch + +from dbt.adapters.clickhouse.util import compare_versions, hide_stack_trace def test_is_before_version(): @@ -11,3 +13,19 @@ def test_is_before_version(): assert compare_versions('22.0.0', '21.0.0') == 1 assert compare_versions('21.0.1', '21.0.0') == 1 assert compare_versions('21.0.1', '21.0') == 0 + + +def test_hide_stack_trace_no_env_var(): + # Test when HIDE_STACK_TRACE is not set + with patch('os.getenv', return_value=''): + exception = Exception("Error occurred\nStack trace details follow...") + result = hide_stack_trace(exception) + assert result == "Error occurred\nStack trace details follow..." + + +def test_hide_stack_trace_env_var_set(): + # Test when HIDE_STACK_TRACE is set + with patch('os.getenv', return_value='1'): + exception = Exception("Error occurred\nStack trace details follow...") + result = hide_stack_trace(exception) + assert result == "Error occurred"
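For reference, a minimal sketch of the `hide_stack_trace` behavior these unit tests pin down, assuming only what the tests above show (the actual helper lives in `dbt/adapters/clickhouse/util.py`):

```python
import os


def hide_stack_trace(ex: Exception) -> str:
    """Illustrative sketch: truncate an exception message to its first line
    when the HIDE_STACK_TRACE environment variable is set to any value."""
    if not os.getenv('HIDE_STACK_TRACE', ''):
        # env var unset or empty: keep the full message, stack trace included
        return str(ex)
    # env var set: keep only the first line of the message
    return str(ex).split('\n')[0]
```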