diff --git a/factory/factory_pattern_real_example/README.md b/factory/factory_pattern_real_example/README.md new file mode 100644 index 0000000..5639bfe --- /dev/null +++ b/factory/factory_pattern_real_example/README.md @@ -0,0 +1,30 @@ +# Real world example: + +Here is the realworld example that I have used for one of my project with factory_pattern. + +### Project Description + +Files +``` +... +- main.py --> Fast-API app +- onboard.py --> Trigger factory class -- Each TPA must have class with construcy_payload method +- validation --> Uses pydantic model to validate incoming request payload for each different TPA. +``` + +There is a FastAPI endpoint that receives DAG trigger requests from the frontend and initiates onboarding for them. There are a total of 15 TPAs. + +* TPA1: Facebook +* TPA2: Instagram +* TPA3: Tiktok +* TPA4: Snapchat +* ... + +Each TPA triggers the corresponding pipeline in Airflow. + + + + +### Insight + +Using the factory pattern enhances the readability and maintainability of the code. Additionally, there is neat code present before anything else. CAUTION: While it could be implemented more effectively using a different pattern, it is being used for educational purposes. Also code will not work if you try to run it. \ No newline at end of file diff --git a/factory/factory_pattern_real_example/main.py b/factory/factory_pattern_real_example/main.py new file mode 100644 index 0000000..c37c15a --- /dev/null +++ b/factory/factory_pattern_real_example/main.py @@ -0,0 +1,33 @@ + +from fastapi import FastAPI, HTTPException, Request, Response, status +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse + +from validation.onboard_item_validator import OnboardItemValidator +from onboard import ExampleTriggerFactory + +APP_NAME = "demo-pattern" +app = FastAPI( + title="Fast-API airflow trigger", + description="Snippet to explain dp with real world DE example, API to triggers airflow jobs", + version=0.1, +) + +@app.post( + "/{tpa_shortcut}/trigger_inital", + summary="Triggers initial run {tpa_shortcut}", + description="Triggers the initial run and sets a daily schedule for incoming jobs. There are a total of 10 jobs, with the number increasing each day.", +) +async def trigger_historical(onboard_payload: dict, tpa_shortcut: str): + # Determine and validate onboard item + validator = OnboardItemValidator(tpa_shortcut) + onboard_item = validator.validate_and_get_item(onboard_payload) + + # + handler = ExampleTriggerFactory.get_handler(onboard_item) + airflow_response = handler.handle_trigger() + + + return JSONResponse( + status_code=status.HTTP_200_OK, content=({"detail": airflow_response}) + ) \ No newline at end of file diff --git a/factory/factory_pattern_real_example/onboard.py b/factory/factory_pattern_real_example/onboard.py new file mode 100644 index 0000000..eb66302 --- /dev/null +++ b/factory/factory_pattern_real_example/onboard.py @@ -0,0 +1,90 @@ +from abc import ABC, abstractmethod +import os + +from validation.data_models import * + +class TriggerHandler(ABC): + """ Method to handle triggers. + It explains developer to override construct_payload method for each new example. + Each Incoming sends a hit to the airflow-api to trigger its corresponding Airflow DAG." + + :param _type_ ABC: _description_ + """ + def __init__(self, onboard_item, dag_name): + self.onboard_item = onboard_item + self.dag_name = dag_name + common_3 = "hardcoded_value" + + self.default_parameters = { + # Internal common conf + "common_1_key": self.onboard_item.common1, + "common_2_key": self.onboard_item.common2, + "common_3_key": common_3, + "common_4_key": self.onboard_item.common4, + "common_5_key": self.onboard_item.common5, + + # AWS related configs + "aws_common_value_example_1": os.environ["example1"], + "aws_common_value_example_2": os.environ["example2"], + "aws_common_value_example_3": os.environ["example3"], + "aws_common_value_example_4": os.environ["example4"], + "aws_common_value_example_5": os.environ["example5"], + "aws_common_value_example_6": os.environ["example6"], + } + + @abstractmethod + def construct_payload(self): + pass + + def handle_trigger(self): + # Each class will trigger the "self.dag_name" DAG + payload = self.construct_payload() + + # Triggers Airflow, No need to implement this logic + response = AirflowClient().trigger_dag(payload=payload, dag_name=self.dag_name) + return response + + +class Example1TriggerHandler(TriggerHandler): + def construct_payload(self): + tpa_payload = { + "specific_key_1": "specific_value_1", + "specific_key_2": "specific_value_2", + "specific_key_3": "specific_value_3", + "specific_key_4": "specific_value_4", + "specific_key_5": "specific_value_5", + "specific_key_6": "specific_value_6", + "specific_key_7": "specific_value_7", + "specific_key_8": "specific_value_8", + "specific_key_9": "specific_value_9", + "specific_key_10": "specific_value_10", + } + + return {"conf": {**self.default_parameters, **tpa_payload}} + + +class Example2TriggerHandler(TriggerHandler): + def construct_payload(self): + tpa_payload = { + "specific_key_1": "specific_value_1", + "specific_key_2": "specific_value_2", + "specific_key_3": "specific_value_3", + "specific_key_4": "specific_value_4", + "specific_key_5": "specific_value_5", + "specific_key_6": "specific_value_6", + "specific_key_7": "specific_value_7", + "specific_key_8": "specific_value_8", + } + + return {"conf": {**self.default_parameters, **tpa_payload}} + +class ExampleTriggerFactory: + @staticmethod + def get_handler(onboard_item): + if isinstance(onboard_item, Example_1_OnboardItem): + return Example1TriggerHandler(onboard_item, "Example1-dag-name") + elif isinstance(onboard_item, Example_2_OnboardItem): + return Example2TriggerHandler(onboard_item, "Example2-dag-name") + # elif isinstance(onboard_item, Example_3_OnboardItem): + # return Example3TriggerHandler(onboard_item, "Example3-dag-name") + # ... Add other onboard items \ No newline at end of file diff --git a/factory/factory_pattern_real_example/validation/data_models.py b/factory/factory_pattern_real_example/validation/data_models.py new file mode 100644 index 0000000..5684e85 --- /dev/null +++ b/factory/factory_pattern_real_example/validation/data_models.py @@ -0,0 +1,47 @@ +from enum import Enum +from typing import Dict, Optional + +from pydantic import BaseModel + +class OnboardItem(BaseModel): + necessery_common_item: str + id_common: int + op_kwargs: Optional[Dict] = {} + +class Example_1_OnboardItem(OnboardItem): + user_id: str + name: str + +class Example_2_OnboardItem(OnboardItem): + ip_v: str + api_version: str + +class Example_3_OnboardItem(OnboardItem): + customer_ids: int + yaml_path_groom: str + yaml_path_map: str + base_url: str + + +class ExampleTPA(Enum): + EXAMPLE_1 = ("Example1", Example_1_OnboardItem) + EXAMPLE_2 = ("Example2", Example_2_OnboardItem) + EXAMPLE_3 = ("Example3", Example_3_OnboardItem) + # EXAMPLE_4 = ("Example4", Example_4_OnboardItem) + # EXAMPLE_5 = ("Example5", Example_5_OnboardItem) + # EXAMPLE_6 = ("Example6", Example_6_OnboardItem) + # EXAMPLE_7 = ("Example7", Example_7_OnboardItem) + # EXAMPLE_8 = ("Example8", Example_8_OnboardItem) + # ... + + def __init__(self, name, item): + self._name = name + self._item = item + + @property + def name(self): + return self._name + + @property + def item_name(self): + return self._item diff --git a/factory/factory_pattern_real_example/validation/onboard_item_validator.py b/factory/factory_pattern_real_example/validation/onboard_item_validator.py new file mode 100644 index 0000000..2c707b8 --- /dev/null +++ b/factory/factory_pattern_real_example/validation/onboard_item_validator.py @@ -0,0 +1,19 @@ +from fastapi import HTTPException +from pydantic.error_wrappers import ValidationError + +from data_models import ExampleTPA + +class OnboardItemValidator: + def __init__(self, tpa_str: str): + tpa = [t for t in ExampleTPA if t.shorthand == tpa_str] + if not tpa: + raise HTTPException(400, f'{tpa_str} does not exist') + + self.tpa = tpa.pop() + + def validate_and_get_item(self, item): + try: + onboard_item = self.tpa.item_name(**item) + return onboard_item, self.tpa.handler + except ValidationError as e: + raise HTTPException(status_code=400, detail=e.errors())