from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.hooks.postgres_hook import PostgresHook
from datetime import datetime
import json
import pandas as pd
import glob
import os
def debug_json_load(x):
    """Safely parse a JSON string.

    Args:
        x: Value to parse. Only non-empty ``str`` inputs are parsed; anything
           else (``None``, pandas ``NaN``, numbers) yields an empty dict.

    Returns:
        The parsed JSON value on success (usually a dict, but ``json.loads``
        may also return a list/number depending on the input), otherwise ``{}``.
    """
    if not x or not isinstance(x, str):
        return {}
    try:
        return json.loads(x)
    except json.JSONDecodeError as e:
        # Include the decode error itself so bad rows can be diagnosed
        # from the task logs (the original bound `e` but never used it).
        print(f"Error parsing JSON: {x} ({e})")
        return {}
def check_if_data(**kwargs):
    """Look for incoming ``NEW_*`` files and stage the most recent one.

    When at least one matching file exists in the DAG folder, the newest one
    (by modification time) is renamed to drop the ``NEW_`` prefix, and both
    the renamed path and a timestamp-based target table name are pushed to
    XCom for the downstream ``create_table`` and ``load_data`` tasks.
    """
    data_dir = "/airflow/dags"
    candidates = glob.glob(os.path.join(data_dir, "NEW_*"))
    if not candidates:
        print("No files starting with 'NEW_' found in the directory.")
        return

    newest_file = max(candidates, key=os.path.getmtime)
    new_filename = os.path.join(
        data_dir, os.path.basename(newest_file).replace("NEW_", "")
    )
    os.rename(newest_file, new_filename)

    ti = kwargs['ti']
    ti.xcom_push(key='filename', value=new_filename)
    print(f"Renamed {newest_file} to {new_filename}")

    # Unique, sortable table name derived from the current wall-clock time.
    table_name = 'data_' + datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
    print(f"Table name will be: {table_name}")
    ti.xcom_push(key='table_name', value=table_name)
def create_table(**kwargs):
    """Create the target Oxla table using the name generated upstream.

    Pulls the table name from XCom (pushed by ``check_if_data``); when none
    is present the task is a no-op. The DDL is executed through
    ``PostgresHook`` — consistent with how ``load_data`` talks to the
    database — instead of instantiating a nested ``PostgresOperator`` inside
    a PythonOperator callable and calling ``.execute()`` by hand (an
    anti-pattern that also reused the duplicate task_id "create_table").
    """
    table_name = kwargs['ti'].xcom_pull(task_ids='check_if_data', key='table_name')
    if not table_name:
        print("No table name found, skipping table creation.")
        return

    # table_name is generated internally (timestamp-based) by check_if_data,
    # so interpolating it into the DDL is safe here; SQL identifiers cannot
    # be bound as query parameters anyway.
    create_table_sql = f"""
        CREATE TABLE {table_name} (
            id INT,
            metadata JSON,
            some_date TIMESTAMP,
            age INT,
            date_of_birth TIMESTAMP
        );
    """
    PostgresHook(postgres_conn_id="oxla").run(create_table_sql)
    print(f"Table {table_name} created.")
def load_data(**kwargs):
    """Load data from the staged CSV file into the Oxla table.

    - Pulls the file path and table name from XCom (pushed by 'check_if_data');
      does nothing when either is missing.
    - Reads the semicolon-delimited CSV into a DataFrame.
    - Parses the 'metadata' JSON column and derives 'age' / 'date_of_birth'.
    - Inserts the rows into the target table via PostgresHook.
    """
    ti = kwargs['ti']
    filename = ti.xcom_pull(task_ids='check_if_data', key='filename')
    table_name = ti.xcom_pull(task_ids='check_if_data', key='table_name')
    if not (filename and table_name):
        print("No new files")
        return

    df = pd.read_csv(filename, delimiter=";")
    df['metadata'] = df['metadata'].apply(debug_json_load)
    current_date = datetime.now()
    df['age'] = df['metadata'].apply(
        lambda x: x.get('age') if isinstance(x, dict) else None
    )

    def estimated_birth_date(age):
        # BUG FIX: the original crashed with TypeError when 'age' was None
        # (e.g. invalid or missing metadata JSON made debug_json_load return
        # {}). Return None so the row lands with a NULL date_of_birth
        # instead of failing the whole task.
        if age is None:
            return None
        try:
            return current_date.replace(year=current_date.year - int(age)).strftime('%Y-%m-%d')
        except ValueError:
            # Running on Feb 29: the target year may not be a leap year;
            # fall back to Feb 28 rather than raising.
            return current_date.replace(
                year=current_date.year - int(age), day=28
            ).strftime('%Y-%m-%d')

    df['date_of_birth'] = df['age'].apply(estimated_birth_date)

    # Interpolating table_name is safe: it is generated internally by
    # check_if_data; row values are bound as query parameters.
    insert_sql = f"""
        INSERT INTO {table_name} (id, metadata, some_date, age, date_of_birth)
        VALUES (%s, %s, %s, %s, %s)
    """
    pg_hook = PostgresHook(postgres_conn_id='oxla')
    conn = pg_hook.get_conn()
    try:
        conn.autocommit = True
        # Context manager closes the cursor even when an insert fails;
        # the original leaked cursor and connection on any exception.
        with conn.cursor() as cursor:
            for _, row in df.iterrows():
                cursor.execute(
                    insert_sql,
                    (
                        row['id'],
                        json.dumps(row['metadata']),
                        row['some_date'],
                        row['age'],
                        row['date_of_birth'],
                    ),
                )
    finally:
        conn.close()
with DAG(
    'etl_example',
    description="example",
    # A start_date is required for Airflow to schedule the DAG at all; the
    # original omitted it. schedule_interval=None means manual/externally
    # triggered runs only, and catchup=False prevents a backfill of
    # historical runs when the DAG is first enabled.
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    start_pipeline = EmptyOperator(task_id='start_pipeline')
    end_pipeline = EmptyOperator(task_id='end_pipeline')

    # Detect and stage a new 'NEW_*' input file, pushing its path and a
    # generated table name to XCom.
    check = PythonOperator(
        task_id='check_if_data',
        python_callable=check_if_data,
    )

    # Create the target table using the name produced by check_if_data.
    create = PythonOperator(
        task_id='create_table',
        python_callable=create_table,
    )

    # Parse the staged CSV and insert its rows into the new table.
    load = PythonOperator(
        task_id='load_data',
        python_callable=load_data,
    )

    start_pipeline >> check >> create >> load >> end_pipeline