Skip to content

Commit 9b6b46a

Browse files
committed
Update solutions
1 parent 1876381 commit 9b6b46a

13 files changed

+1045
-23
lines changed

dags/generate_twitter.py

+115
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
""" Simple example of creating subdags and generating work dynamically"""
from airflow import DAG
# NOTE(review): SqliteHook is imported but never referenced below — the tasks
# use MySqlHook; confirm before removing.
from airflow.hooks import SqliteHook

from airflow.hooks.mysql_hook import MySqlHook
# NOTE(review): Variable is imported but not referenced in this module.
from airflow.models import Variable
from airflow.operators.email_operator import EmailOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator


# Project-local helpers: the twitter search callable, the tweet dump dir,
# and the child DAG attached via SubDagOperator below.
from twitter_airflow import search_twitter, RAW_TWEET_DIR
from subdags.twitter_subdag import subdag
from datetime import datetime, timedelta
import pandas as pd
import re
import random


# Terms the dynamically generated search_*_twitter tasks will query for.
SEARCH_TERMS = ["#python", "#pydata", "#airflow", "data wrangling", "data pipelines"]


# NOTE(review): a dynamic start_date (datetime.now() - ...) shifts on every
# DAG-file parse, which Airflow discourages — confirm this is intentional.
default_args = {
    "owner": "admin",
    "depends_on_past": False,
    "start_date": datetime.now() - timedelta(days=4),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Parent DAG; the SubDagOperator task below attaches the imported `subdag`.
dag = DAG(
    "generate_twitter_dags", default_args=default_args, schedule_interval="@daily"
)
36+
37+
def fill_terms(my_terms=SEARCH_TERMS, **kwargs):
    """Seed the `twitter_terms` MySQL table with the default search terms.

    Args:
        my_terms: iterable of search-term strings to load into the table
            (defaults to the module-level SEARCH_TERMS).
        **kwargs: Airflow context injected by provide_context=True (unused).
    """
    dbconn = MySqlHook(mysql_conn_id="mysql_default")
    # Bug fix: the hook's DB-API connection comes from get_conn();
    # get_connection() is BaseHook's lookup of a Connection *record* and
    # requires a conn_id argument, so the original call raised a TypeError.
    conn = dbconn.get_conn()
    df = pd.DataFrame(my_terms, columns=["search_term"])
    try:
        # NOTE(review): pandas' to_sql only officially supports a raw DB-API
        # connection for sqlite; a SQLAlchemy engine is safer for MySQL.
        df.to_sql("twitter_terms", conn)
    except ValueError:
        # table already exists
        pass
48+
49+
50+
def generate_search_terms(**kwargs):
    """Branch callable: pick which twitter-search task runs this interval.

    Reads every term from the `twitter_terms` table and returns the task_id
    of one randomly chosen `search_<term>_twitter` task; the returned id is
    how BranchPythonOperator selects the downstream task to execute.

    Args:
        **kwargs: Airflow context injected by provide_context=True (unused).

    Returns:
        str: a task_id matching one generated in the module-level loop.
    """
    dbconn = MySqlHook(mysql_conn_id="mysql_default")
    # Bug fix: get_conn() returns the live DB-API connection; get_connection()
    # is the BaseHook metadata lookup and requires a conn_id argument.
    conn = dbconn.get_conn()
    query = "select * from twitter_terms"
    df = pd.read_sql_query(query, conn)
    # Task ids must mirror the naming used when the tasks were created:
    # "search_" + term stripped of non-word characters + "_twitter".
    return random.choice(
        [
            "search_{}_twitter".format(re.sub(r"\W+", "", t))
            for t in df.search_term.values
        ]
    )
63+
64+
65+
# Seeds the twitter_terms table before any search runs.
fill_search_terms = PythonOperator(
    task_id="fill_terms", provide_context=True, python_callable=fill_terms, dag=dag
)

# Branch task: returns the task_id of the one search task to run.
gen_search_terms = BranchPythonOperator(
    task_id="generate_search_terms",
    provide_context=True,
    python_callable=generate_search_terms,
    dag=dag,
)

# NOTE(review): EmailOperator normally requires a `to=` recipient; none is
# visible in this view (the line here is blank — possibly stripped by the
# page scraper). Confirm the recipient argument is present in the real file.
email_links = EmailOperator(
    task_id="email_best_links",

    subject="Latest popular links",
    html_content="Check out the latest!!",
    files=["{}/latest_links.txt".format(RAW_TWEET_DIR)],
    dag=dag,
)

# Runs the imported child DAG; "one_success" lets it fire as soon as any
# one of the branched search tasks succeeds.
sub = SubDagOperator(
    subdag=subdag, task_id="insert_and_id_pop", trigger_rule="one_success", dag=dag
)

# Cleanup: remove the links file produced for this run.
clear_latest = BashOperator(
    bash_command="rm -rf {}/latest_links.txt".format(RAW_TWEET_DIR),
    task_id="clear_latest",
    dag=dag,
)

gen_search_terms.set_upstream(fill_search_terms)

# One search task per term; the branch task above picks exactly one to run.
for term in SEARCH_TERMS:
    # Strip non-word characters so the term is a legal Airflow task_id.
    term_without_punctuation = re.sub(r"\W+", "", term)
    simple_search = PythonOperator(
        task_id="search_{}_twitter".format(term_without_punctuation),
        provide_context=True,
        python_callable=search_twitter,
        dag=dag,
        params={"query": term},
    )
    simple_search.set_upstream(gen_search_terms)
    simple_search.set_downstream(sub)

# Pipeline tail: subdag -> email report -> cleanup.
sub.set_downstream(email_links)
email_links.set_downstream(clear_latest)

dags/parameters.py

+92
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
"""
This example uses the existing Dummy Operator and Variable model to
demonstrate dynamic creation of DAGs based on a Variable setting. As
shown below, a list of customer objects is retrieved and used to create
unique dags based on the input.
"""

from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator

# Create JSON Variable if it doesn't exist

# Fallback customer list used when the "customer_list" Variable is absent.
# NOTE(review): each entry has a blank line between "customer_id" and
# "schedule_interval" — if a field (e.g. "email") was stripped from this
# view, restore it; create_dag's override logic reads customer keys that
# match its default_args keys.
CUSTOMERS = [
    {
        "customer_name": "Faux Customer",
        "customer_id": "faux_customer",

        "schedule_interval": None,
        "enabled": True,
    },
    {
        "customer_name": "Bogus Customer",
        "customer_id": "bogus_customer",

        "schedule_interval": "@once",
        "enabled": True,
    },
]

# Get JSON Variable
# Prefer the Airflow Variable "customer_list" (stored as JSON); fall back to
# the in-code list above when the Variable has not been created yet.
CUSTOMERS = Variable.get("customer_list", default_var=CUSTOMERS, deserialize_json=True)
34+
35+
36+
def create_dag(customer):
    """Build a per-customer DAG from a customer parameters dict.

    Any key in `customer` that matches a default_args key (owner, email,
    retries, start_date, ...) and is not None overrides that default, so a
    customer object can customize its DAG via the Variable alone.

    Args:
        customer: dict with at least "customer_id" and "schedule_interval",
            plus optional default_args overrides.

    Returns:
        DAG: a DAG whose id is "load_clickstream_data_<customer_id>".
    """
    default_args = {
        "owner": "airflow",
        "depends_on_past": False,
        "email": "[email protected]",
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        "start_date": datetime(2017, 1, 1, 0, 0),
        "end_date": None,
    }

    # Per-customer overrides: a key present (and not None) in the customer
    # object wins over the default. The explicit `is None` test preserves
    # falsy-but-meaningful customer values such as False or 0.
    # (Was a floating triple-quoted string acting as a comment — a no-op
    # string statement — now a real comment.)
    replaced_args = {
        k: default_args[k] if customer.get(k) is None else customer[k]
        for k in default_args
    }

    dag_id = "{base_name}_{id}".format(
        base_name="load_clickstream_data", id=customer["customer_id"]
    )

    return DAG(
        dag_id=dag_id,
        default_args=replaced_args,
        schedule_interval=customer["schedule_interval"],
    )
73+
74+
# Register one DAG per enabled customer object.
for cust in CUSTOMERS:
    if not cust["enabled"]:
        # TODO Create but programmatically pause
        continue

    dag = create_dag(cust)
    # The globals() assignment exposes each DAG at module level, which is
    # how Airflow's DAG discovery finds dynamically created DAGs.
    globals()[dag.dag_id] = dag

    # Placeholder ETL pipeline: extract -> transform -> load.
    extract = DummyOperator(task_id="extract_data", dag=dag)
    transform = DummyOperator(task_id="transform_data", dag=dag)
    load = DummyOperator(task_id="load_data", dag=dag)
    extract.set_downstream(transform)
    transform.set_downstream(load)

dags/simple_dag.py

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
from datetime import datetime, timedelta
2+
3+
from airflow import DAG
4+
from airflow.operators.dummy_operator import DummyOperator
5+
from airflow.operators.python_operator import PythonOperator
6+
7+
8+
def print_hello():
    """Return the greeting string used by the hello_task PythonOperator."""
    greeting = "Hello world!"
    return greeting
10+
11+
12+
# Defaults applied to every task in the DAG below.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=2),
}

# Runs daily at 12:00; catchup=False skips backfilling missed intervals.
dag = DAG(
    "hello_world",
    description="Simple tutorial DAG",
    schedule_interval="0 12 * * *",
    default_args=default_args,
    catchup=False,
)

# retries=3 overrides the default_args value of 1 for this task only.
t1 = DummyOperator(task_id="dummy_task", retries=3, dag=dag)

t2 = PythonOperator(task_id="hello_task", python_callable=print_hello, dag=dag)

# sets downstream for t1
t1 >> t2

# equivalent
# t2.set_upstream(t1)

dags/subdags/twitter_subdag.py

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
""" Simple subdag example """
2+
from airflow import DAG
3+
from airflow.operators import PythonOperator
4+
from twitter_airflow import csv_to_sql, identify_popular_links
5+
from datetime import datetime, timedelta
6+
7+
8+
default_args = {
9+
"owner": "admin",
10+
"depends_on_past": False,
11+
"start_date": datetime(2016, 1, 1),
12+
"retries": 1,
13+
"retry_delay": timedelta(minutes=5),
14+
}
15+
16+
subdag = DAG("generate_twitter_dags.insert_and_id_pop", default_args=default_args)
17+
18+
move_tweets_to_sql = PythonOperator(
19+
task_id="csv_to_sqlite",
20+
provide_context=True,
21+
python_callable=csv_to_sql,
22+
dag=subdag,
23+
)
24+
25+
id_popular = PythonOperator(
26+
task_id="identify_popular_links",
27+
provide_context=True,
28+
python_callable=identify_popular_links,
29+
dag=subdag,
30+
params={"write_mode": "a"},
31+
)
32+
33+
id_popular.set_upstream(move_tweets_to_sql)

0 commit comments

Comments
 (0)