Skip to content

Add Quix__State__Dir environment variable to configure app state dire… #844

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions quixstreams/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def __init__(
>***NOTE:*** the environment variable is set for you in the Quix Cloud
:param consumer_group: Kafka consumer group.
Passed as `group.id` to `confluent_kafka.Consumer`.
Linked Environment Variable: `Quix__Consumer__Group`.
Linked Environment Variable: `Quix__Consumer_Group`.
Default - "quixstreams-default" (set during init)
>***NOTE:*** Quix Applications will prefix it with the Quix workspace id.
:param commit_interval: How often to commit the processed messages in seconds.
Expand All @@ -188,6 +188,7 @@ def __init__(
:param producer_extra_config: A dictionary with additional options that
will be passed to `confluent_kafka.Producer` as is.
:param state_dir: path to the application state directory.
Linked Environment Variable: `Quix__State__Dir`.
Default - `"state"`.
:param rocksdb_options: RocksDB options.
If `None`, the default options will be used.
Expand Down Expand Up @@ -232,7 +233,9 @@ def __init__(
consumer_extra_config = consumer_extra_config or {}

if state_dir is None:
state_dir = "/app/state" if is_quix_deployment() else "state"
state_dir = os.getenv(
"Quix__State__Dir", "/app/state" if is_quix_deployment() else "state"
)
state_dir = Path(state_dir)

# We can't use os.getenv as defaults (and have testing work nicely)
Expand Down
28 changes: 22 additions & 6 deletions tests/test_quixstreams/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,12 +479,13 @@ def test_consumer_group_env(self):
"""
Sanity check consumer_group gets set from the environment via getenv.
"""
consumer_group = "my_group"
with patch.dict(os.environ, {"Quix__Consumer__group": consumer_group}):
app = Application(
broker_address="my_address", consumer_group=consumer_group
)
assert app.config.consumer_group == consumer_group
with patch.dict(os.environ, {"Quix__Consumer_Group": "environ_group"}):
app = Application(broker_address="my_address")
assert app.config.consumer_group == "environ_group"

with patch.dict(os.environ, {"Quix__Consumer_Group": "environ_group"}):
app = Application(broker_address="my_address", consumer_group="app_group")
assert app.config.consumer_group == "app_group"

def test_consumer_group_default(self):
"""
Expand All @@ -494,6 +495,21 @@ def test_consumer_group_default(self):
app = Application(broker_address="my_address")
assert app.config.consumer_group == "quixstreams-default"

def test_state_dir_env(self):
"""
Sanity check state_dir gets set from the environment via getenv.
"""
app = Application(broker_address="my_address")
assert app.config.state_dir == Path("state")

with patch.dict(os.environ, {"Quix__State__Dir": "/path/to/state"}):
app = Application(broker_address="my_address")
assert app.config.state_dir == Path("/path/to/state")

with patch.dict(os.environ, {"Quix__State__Dir": "/path/to/state"}):
app = Application(broker_address="my_address", state_dir="/path/to/other")
assert app.config.state_dir == Path("/path/to/other")


class TestAppGroupBy:
def test_group_by(
Expand Down