Skip to content

Add bulk version of fast-api airflow example #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions factory/factory_pattern_real_example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Real world example:

Here is the realworld example that I have used for one of my project with factory_pattern.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we can remove this


### Project Description

Files
```
...
- main.py --> Fast-API app
Copy link
Owner

@egehanyorulmaz egehanyorulmaz Sep 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

- onboard.py --> Trigger factory class -- Each TPA must have class with construcy_payload method
- validation --> Uses pydantic model to validate incoming request payload for each different TPA.
```

There is a FastAPI endpoint that receives DAG trigger requests from the frontend and initiates onboarding for them. There are a total of 15 TPAs.

* TPA1: Facebook
* TPA2: Instagram
* TPA3: Tiktok
* TPA4: Snapchat
* ...

Each TPA triggers the corresponding pipeline in Airflow.




### Insight

Using the factory pattern enhances the readability and maintainability of the code. Additionally, there is neat code present before anything else. CAUTION: While it could be implemented more effectively using a different pattern, it is being used for educational purposes. Also code will not work if you try to run it.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we have any reference to any other source? if we have, we should also mention them in a credits topic. You can refer to other README.md files

33 changes: 33 additions & 0 deletions factory/factory_pattern_real_example/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@

from fastapi import FastAPI, HTTPException, Request, Response, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse

from validation.onboard_item_validator import OnboardItemValidator
from onboard import ExampleTriggerFactory

APP_NAME = "demo-pattern"
app = FastAPI(
title="Fast-API airflow trigger",
description="Snippet to explain dp with real world DE example, API to triggers airflow jobs",
version=0.1,
)

@app.post(
"/{tpa_shortcut}/trigger_inital",
summary="Triggers initial run {tpa_shortcut}",
description="Triggers the initial run and sets a daily schedule for incoming jobs. There are a total of 10 jobs, with the number increasing each day.",
)
async def trigger_historical(onboard_payload: dict, tpa_shortcut: str):
# Determine and validate onboard item
validator = OnboardItemValidator(tpa_shortcut)
onboard_item = validator.validate_and_get_item(onboard_payload)

#
handler = ExampleTriggerFactory.get_handler(onboard_item)
airflow_response = handler.handle_trigger()


return JSONResponse(
status_code=status.HTTP_200_OK, content=({"detail": airflow_response})
)
90 changes: 90 additions & 0 deletions factory/factory_pattern_real_example/onboard.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
from abc import ABC, abstractmethod
import os

from validation.data_models import *

class TriggerHandler(ABC):
""" Method to handle triggers.
It explains developer to override construct_payload method for each new example.
Each Incoming sends a hit to the airflow-api to trigger its corresponding Airflow DAG."

:param _type_ ABC: _description_
"""
def __init__(self, onboard_item, dag_name):
self.onboard_item = onboard_item
self.dag_name = dag_name
common_3 = "hardcoded_value"

self.default_parameters = {
# Internal common conf
"common_1_key": self.onboard_item.common1,
"common_2_key": self.onboard_item.common2,
"common_3_key": common_3,
"common_4_key": self.onboard_item.common4,
"common_5_key": self.onboard_item.common5,

# AWS related configs
"aws_common_value_example_1": os.environ["example1"],
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is sufficient to define 2-3 example here.

"aws_common_value_example_2": os.environ["example2"],
"aws_common_value_example_3": os.environ["example3"],
"aws_common_value_example_4": os.environ["example4"],
"aws_common_value_example_5": os.environ["example5"],
"aws_common_value_example_6": os.environ["example6"],
}

@abstractmethod
def construct_payload(self):
pass

def handle_trigger(self):
# Each class will trigger the "self.dag_name" DAG
payload = self.construct_payload()

# Triggers Airflow, No need to implement this logic
response = AirflowClient().trigger_dag(payload=payload, dag_name=self.dag_name)
return response


class Example1TriggerHandler(TriggerHandler):
def construct_payload(self):
tpa_payload = {
"specific_key_1": "specific_value_1",
"specific_key_2": "specific_value_2",
"specific_key_3": "specific_value_3",
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same applies here

"specific_key_4": "specific_value_4",
"specific_key_5": "specific_value_5",
"specific_key_6": "specific_value_6",
"specific_key_7": "specific_value_7",
"specific_key_8": "specific_value_8",
"specific_key_9": "specific_value_9",
"specific_key_10": "specific_value_10",
}

return {"conf": {**self.default_parameters, **tpa_payload}}


class Example2TriggerHandler(TriggerHandler):
def construct_payload(self):
tpa_payload = {
"specific_key_1": "specific_value_1",
"specific_key_2": "specific_value_2",
"specific_key_3": "specific_value_3",
"specific_key_4": "specific_value_4",
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same applies here, we can keep the number of parameters consistent

"specific_key_5": "specific_value_5",
"specific_key_6": "specific_value_6",
"specific_key_7": "specific_value_7",
"specific_key_8": "specific_value_8",
}

return {"conf": {**self.default_parameters, **tpa_payload}}

class ExampleTriggerFactory:
@staticmethod
def get_handler(onboard_item):
if isinstance(onboard_item, Example_1_OnboardItem):
return Example1TriggerHandler(onboard_item, "Example1-dag-name")
elif isinstance(onboard_item, Example_2_OnboardItem):
return Example2TriggerHandler(onboard_item, "Example2-dag-name")
# elif isinstance(onboard_item, Example_3_OnboardItem):
# return Example3TriggerHandler(onboard_item, "Example3-dag-name")
# ... Add other onboard items
47 changes: 47 additions & 0 deletions factory/factory_pattern_real_example/validation/data_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from enum import Enum
from typing import Dict, Optional

from pydantic import BaseModel

class OnboardItem(BaseModel):
necessery_common_item: str
id_common: int
op_kwargs: Optional[Dict] = {}

class Example_1_OnboardItem(OnboardItem):
user_id: str
name: str

class Example_2_OnboardItem(OnboardItem):
ip_v: str
api_version: str

class Example_3_OnboardItem(OnboardItem):
customer_ids: int
yaml_path_groom: str
yaml_path_map: str
base_url: str


class ExampleTPA(Enum):
EXAMPLE_1 = ("Example1", Example_1_OnboardItem)
EXAMPLE_2 = ("Example2", Example_2_OnboardItem)
EXAMPLE_3 = ("Example3", Example_3_OnboardItem)
# EXAMPLE_4 = ("Example4", Example_4_OnboardItem)
# EXAMPLE_5 = ("Example5", Example_5_OnboardItem)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove extra examples here, and keep it consistent with other files

# EXAMPLE_6 = ("Example6", Example_6_OnboardItem)
# EXAMPLE_7 = ("Example7", Example_7_OnboardItem)
# EXAMPLE_8 = ("Example8", Example_8_OnboardItem)
# ...

def __init__(self, name, item):
self._name = name
self._item = item

@property
def name(self):
return self._name

@property
def item_name(self):
return self._item
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from fastapi import HTTPException
from pydantic.error_wrappers import ValidationError

from data_models import ExampleTPA

class OnboardItemValidator:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add docstring

def __init__(self, tpa_str: str):
tpa = [t for t in ExampleTPA if t.shorthand == tpa_str]
if not tpa:
raise HTTPException(400, f'{tpa_str} does not exist')

self.tpa = tpa.pop()

def validate_and_get_item(self, item):
try:
onboard_item = self.tpa.item_name(**item)
return onboard_item, self.tpa.handler
except ValidationError as e:
raise HTTPException(status_code=400, detail=e.errors())