-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathetl_utils.py
More file actions
63 lines (51 loc) · 1.64 KB
/
etl_utils.py
File metadata and controls
63 lines (51 loc) · 1.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import pandas as pd
import numpy as np
import requests
from io import StringIO
from sklearn.preprocessing import StandardScaler
from sqlalchemy import create_engine
# ========== EXTRACT ========== #
def extract_csv(file):
return pd.read_csv(file)
def extract_json(file):
return pd.read_json(file)
def extract_api(url):
res = requests.get(url)
if res.status_code == 200:
data = res.json()
return pd.DataFrame(data)
return None
# ========== TRANSFORM ========== #
def clean_data(df):
df.columns = [col.lower().strip().replace(" ", "_") for col in df.columns]
df.drop_duplicates(inplace=True)
return df
def handle_missing_data(df):
return df.dropna(axis=0)
def parse_dates(df):
for col in df.columns:
if "date" in col or "time" in col:
try:
df[col] = pd.to_datetime(df[col])
except:
pass
return df
def convert_types(df):
return df.convert_dtypes()
def create_dummies(df):
return pd.get_dummies(df, drop_first=True)
def remove_outliers(df, z_thresh=3):
return df[(np.abs((df - df.mean()) / df.std()) < z_thresh).all(axis=1)]
def scale_features(df):
scaler = StandardScaler()
num_df = df.select_dtypes(include=[np.number])
df[num_df.columns] = scaler.fit_transform(num_df)
return df
def feature_engineering(df):
if 'price' in df.columns and 'quantity' in df.columns:
df['total_value'] = df['price'] * df['quantity']
return df
# ========== LOAD ========== #
def load_to_database(df, db_url, table_name):
engine = create_engine(db_url)
df.to_sql(table_name, engine, if_exists='replace', index=False)