Initial commit
halilbilgin committed May 24, 2019
0 parents commit 1d36300
Showing 16 changed files with 701,453 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -0,0 +1,3 @@
.ipynb_checkpoints
__pycache__
.idea
Empty file added code/__init__.py
Empty file.
183 changes: 183 additions & 0 deletions code/etl.py
@@ -0,0 +1,183 @@
import psycopg2 as pg
import yaml
import pandas as pd
import argparse
from utils import *
from os import path
import numpy as np
from sqlalchemy import create_engine
import sqlalchemy
from io import StringIO
import csv

def psql_insert_copy(table, conn, keys, data_iter):
"""
    Fast insert into PostgreSQL. Used as the method argument of pandas.DataFrame.to_sql.
    Adapted from https://stackoverflow.com/questions/23103962/how-to-write-dataframe-to-postgres-table
"""
# gets a DBAPI connection that can provide a cursor
dbapi_conn = conn.connection
with dbapi_conn.cursor() as cur:
s_buf = StringIO()
writer = csv.writer(s_buf)
writer.writerows(data_iter)
s_buf.seek(0)

columns = ', '.join('"{}"'.format(k) for k in keys)
if table.schema:
table_name = '{}.{}'.format(table.schema, table.name)
else:
table_name = table.name

sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
table_name, columns)
cur.copy_expert(sql=sql, file=s_buf)

def get_engine(database_config):
## Needed for pandas to_sql command
engine = create_engine('postgresql+psycopg2://' + database_config['user'] + ':' + database_config['password'] + \
'@' + database_config['host'] + ':' + str(database_config['port']) + '/' + database_config[
'dbname'], use_batch_mode=True)

return engine

def load_config(config_path):
with open(config_path) as schema_file:
        config = yaml.safe_load(schema_file)
return config

def create_tables(config: list, engine: sqlalchemy.engine.base.Engine):
con = engine.connect()

for table in config:
name = table.get('name')
schema = table.get('schema')
ddl = f"""DROP TABLE IF EXISTS {name}"""
con.execute(ddl)

ddl = f"""CREATE TABLE {name} ({schema})"""
con.execute(ddl)

def load_tables(engine: sqlalchemy.engine.base.Engine, config: list, data_path: str):

for table in config:
table_name = table.get('name')
print(table_name)
table_source = path.join(data_path, f"{table_name}.csv")

df = pd.read_csv(table_source)
df.columns = map(str.lower, df.columns)
#df_reorder = df[table.get('columns')] # rearrange column here
df.to_sql(table_name, engine, index=False, if_exists='append', method=psql_insert_copy)

def etl(database_config, schema_path: str, data_path):
engine = get_engine(database_config)

if type(schema_path) == str:
config = load_config(schema_path)
else:
config = schema_path

create_tables(config=config, engine=engine)
load_tables(engine=engine, config=config, data_path=data_path)

def get_transactions(engine: sqlalchemy.engine.base.Engine):

    ## THIS WAS BEFORE THE TRANSACTIONS TABLE WAS UPDATED BY REVOLUT
    ## I assume that currency rates (excluding cryptocurrencies) don't vary significantly over time, so I fetched the rates for 2016-01-01.
    ## I also use USD as the reference currency and convert all amounts to USD so that transaction amounts can be compared fairly.
#df_currency_rates = get_currency_rate()
## ---

df_transactions = pd.read_sql("""SELECT t.*, c.name AS merchant_country_name, cd.is_crypto, cd.exponent,
CASE WHEN f.user_id IS NULL
THEN FALSE
ELSE TRUE
END AS is_fraudster
FROM transactions AS t
LEFT JOIN currency_details AS cd ON cd.ccy = t.currency
LEFT JOIN countries AS c ON UPPER(c.code3)= UPPER(t.merchant_country)
LEFT JOIN fraudsters as f ON f.user_id = t.user_id
""", engine)

df_transactions.loc[:, 'created_date'] = df_transactions['created_date'].astype('datetime64[ns]')

df_transactions.loc[:, 'amount_usd'] = df_transactions['amount_usd'] * np.power(10.0, -2)

return df_transactions

def get_users(engine: sqlalchemy.engine.base.Engine, df_transactions=None):
    if df_transactions is None:
df_transactions = get_transactions(engine)

df_users = pd.read_sql("""SELECT u.*, c.name as country_name
FROM users AS u
INNER JOIN countries AS c ON c.code = u.country
""", engine).set_index('id', drop=False)

transaction_count_by_user = df_transactions.groupby('user_id')['amount'].count().rename('transaction_count')

mean_transaction_period = df_transactions.groupby('user_id')['created_date'].apply(lambda x: x.diff().abs().mean())
    mean_transaction_period = mean_transaction_period.astype('timedelta64[h]').rename('mean_transaction_period')

df_users = pd.concat([df_users, transaction_count_by_user, mean_transaction_period], axis=1)

## Combine transactions and users table by using summary statistics (sum and median)

usd_total_amount_by_user = df_transactions.groupby('user_id')['amount_usd'].sum()\
.rename('total_amount_usd').astype(np.float32)
usd_total_amount_by_user = usd_total_amount_by_user.loc[usd_total_amount_by_user.index.intersection(df_users.index)]

usd_median_amount_by_user = df_transactions.groupby('user_id')['amount_usd'].median()\
.rename('median_amount_usd').astype(np.float32)
usd_median_amount_by_user = usd_median_amount_by_user.loc[usd_median_amount_by_user.index.intersection(df_users.index)]

df_users = pd.concat([df_users, usd_total_amount_by_user, usd_median_amount_by_user], axis=1)

return df_users

def load_features(engine: sqlalchemy.engine.base.Engine, processed_data, processed_labels):
processed_data.to_sql('features', con=engine, index_label='user_id', if_exists='replace', method=psql_insert_copy)

processed_labels = pd.DataFrame({'is_fraudster': processed_labels}, index=processed_labels.index)

processed_labels.to_sql('labels', con=engine, index_label='user_id', if_exists='replace', method=psql_insert_copy)

with engine.connect() as con:
con.execute('ALTER TABLE features ADD PRIMARY KEY (user_id);')
con.execute('ALTER TABLE labels ADD PRIMARY KEY (user_id);')

def get_features(engine, user_id=None):
"""
    Load features from the database.
    :param user_id: user_id to retrieve; if None, retrieve the features of all users.
    :return: (features, labels) where features is a pandas.DataFrame and labels is the is_fraudster pandas.Series
"""
where_clause = ""
params = None
    if user_id is not None:
where_clause = "WHERE f.user_id= %(user_id)s"
params = {'user_id': user_id}

df_features = pd.read_sql("""SELECT f.*, l.is_fraudster FROM features AS f
INNER JOIN labels AS l ON l.user_id=f.user_id
""" + where_clause, engine,
params=params).set_index('user_id', drop=True)
df_features = df_features.replace({None: np.nan})

processed_dataset = df_features.drop('is_fraudster', axis=1)
processed_labels = df_features['is_fraudster']

return processed_dataset, processed_labels
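# Example (illustrative): fetching the labelled features of a single user versus all users.
# The user id below is a placeholder, not an id taken from the data.
#   features_one, label_one = get_features(engine, user_id='0e0b1e42-...')
#   features_all, labels_all = get_features(engine)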

if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("--database_config_path", help="Database config file (.yaml)",
default='../misc/database_config.yaml', required=False)
parser.add_argument("--schema_path", help="Schema file (.yaml)", default='../misc/schemas.yaml', required=False)
parser.add_argument("--data_path", help="Data path containing csv files of the data that will be loaded into database"
, default='../data/', required=False)

args = parser.parse_args()
database_config = load_config(args.database_config_path)[0]
etl(database_config, args.schema_path, args.data_path)
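
For reference, a minimal sketch of driving etl() programmatically instead of through the YAML files; the connection details, table names, and column types below are illustrative assumptions, not values taken from this repository.

# A minimal sketch of calling etl() directly (assumed values, for illustration only).
from etl import etl

database_config = {
    'user': 'postgres', 'password': 'postgres',
    'host': 'localhost', 'port': 5432, 'dbname': 'fraud'
}

# etl() accepts either a path to schemas.yaml or an already-parsed list of
# {'name': ..., 'schema': ...} entries; here the parsed form is passed directly.
schema = [
    {'name': 'countries', 'schema': 'code VARCHAR(2), code3 VARCHAR(3), name TEXT'},
    {'name': 'fraudsters', 'schema': 'user_id UUID'},
]

# Expects countries.csv and fraudsters.csv to exist under data_path.
etl(database_config, schema, data_path='../data/')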
52 changes: 52 additions & 0 deletions code/features.py
@@ -0,0 +1,52 @@
import pandas as pd
from etl import get_transactions, get_users, load_config, load_features, get_engine
import psycopg2 as pg
from utils import fill_missing_amount_usd

def create_features(df_users, df_transactions):
""" Create features from user and transaction tables. """

processed_dataset = pd.DataFrame()

one_hot_vector_features = ['currency', 'merchant_country', 'source', 'state']

for feature in one_hot_vector_features:
feature_onehot = pd.concat([pd.get_dummies(df_transactions[feature], prefix=feature), df_transactions['user_id']], axis=1)

feature_onehot = feature_onehot.groupby('user_id').sum()

        ## Drop rare categories: keep only columns that account for more than 0.1% of all one-hot counts.
feature_onehot = feature_onehot.loc[:,(feature_onehot.sum(axis=0)/feature_onehot.values.sum() > 0.001)]
feature_onehot = feature_onehot.apply(lambda x: x/x.sum(), axis=1)

processed_dataset = pd.concat([processed_dataset, feature_onehot], axis=1)

processed_dataset = pd.concat([df_users, processed_dataset], axis=1)
processed_dataset.drop(['terms_version', 'created_date', 'id', 'country_name', 'country',
'phone_country', 'state'], axis=1, inplace=True)

processed_dataset.loc[processed_dataset['kyc'] != 'PASSED', 'kyc'] = 0
processed_dataset.loc[processed_dataset['kyc'] == 'PASSED', 'kyc'] = 1
processed_dataset.loc[:, 'has_email'] = processed_dataset['has_email'].replace({False: 0, True: 1})

processed_labels = processed_dataset['is_fraudster'].replace({False: -1, True: 1})

processed_dataset.drop('is_fraudster', axis=1, inplace=True)

return processed_dataset, processed_labels

if __name__ == '__main__':
database_config = load_config('../misc/database_config.yaml')[0]

engine = get_engine(database_config)

df_transactions = get_transactions(engine)
df_transactions = fill_missing_amount_usd(df_transactions)
df_users = get_users(engine, df_transactions)

processed_data, processed_labels = create_features(df_users, df_transactions)

processed_data.to_csv('../data/processed_dataset.csv', index_label='user_id')
processed_labels.to_csv('../data/processed_labels.csv', index_label='user_id')

load_features(engine, processed_data, processed_labels)
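
As a toy illustration of the per-user one-hot aggregation performed inside create_features (the user ids and currency values below are made up for illustration):

import pandas as pd

# Three transactions for two users; each transaction has a categorical 'currency'.
df = pd.DataFrame({'user_id': ['u1', 'u1', 'u2'],
                   'currency': ['USD', 'EUR', 'USD']})

# One-hot encode the category, sum per user, then normalise each row so it
# describes the share of a user's transactions falling into each category.
onehot = pd.concat([pd.get_dummies(df['currency'], prefix='currency'),
                    df['user_id']], axis=1)
counts = onehot.groupby('user_id').sum()
shares = counts.apply(lambda x: x / x.sum(), axis=1)
print(shares)  # u1: currency_EUR 0.5, currency_USD 0.5; u2: currency_USD 1.0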
50 changes: 50 additions & 0 deletions code/patrol.py
@@ -0,0 +1,50 @@
import yaml
import pickle
from os import path
from etl import load_config, get_engine, get_features
import sqlalchemy


def patrol(user_id, database_config='../misc/database_config.yaml',
patrol_config='../misc/patrol_config.yaml', model_file='../artifacts/model.pkl'):
"""
    Takes a user_id and suggests an action.
    It may be wise to lock a user when we are highly certain that the user is a fraudster. To do this, we aim to
    maximize our utility. I defined one cost that denotes the company's loss when we don't lock a fraudster but only
    alert the agent, and another cost that denotes the company's loss when we lock a normal user even though they
    are innocent. The algorithm determines the action based on the costs given by the company.
    :param user_id: id of the user to evaluate
    :param database_config: str or sqlalchemy.engine.base.Engine. If str, a connection will be initialized; if it is
    a sqlalchemy Engine object, it will be used directly for retrieving data from the database.
:param patrol_config: str, the path where the cost values are stored
:param model_file: str, the path where the model is stored
:return: action: str, {LOCK_USER, ALERT_AGENT, NONE}
"""
if not path.exists(model_file):
raise Exception("Model file could not be found.")

if type(database_config) == sqlalchemy.engine.base.Engine:
engine = database_config
else:
database_config = load_config(database_config)[0]
engine = get_engine(database_config)

user_features, _ = get_features(engine, user_id)

with open(model_file, 'rb') as f:
model = pickle.load(f)

prob_being_fraudster = model.predict_proba(user_features)[0][1]
prob_being_normal = 1 - prob_being_fraudster

costs = load_config(patrol_config)

action_costs = {
'LOCK_USER': prob_being_normal * costs['cost_of_false_negative'],
'ALERT_AGENT': prob_being_fraudster * costs['cost_of_false_negative'],
'NONE': prob_being_fraudster * costs['cost_of_false_positive']
}

return max(action_costs, key=action_costs.get)
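
A short usage sketch, assuming ../artifacts/model.pkl has already been produced by train.py and that patrol_config.yaml contains the two keys read above (cost_of_false_negative and cost_of_false_positive); the user id below is a placeholder:

# Illustrative call to patrol(); the user id is a placeholder, not real data.
from etl import load_config, get_engine
from patrol import patrol

# patrol() also accepts an already-open SQLAlchemy engine instead of a config path.
engine = get_engine(load_config('../misc/database_config.yaml')[0])

# Returns one of 'LOCK_USER', 'ALERT_AGENT' or 'NONE'.
action = patrol('0e0b1e42-0000-0000-0000-000000000000', database_config=engine)
print(action)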
71 changes: 71 additions & 0 deletions code/train.py
@@ -0,0 +1,71 @@
from utils import load_transformed_tables_from_csv
from features import create_features
import pickle
from sklearn.model_selection import StratifiedKFold, RandomizedSearchCV, train_test_split
from xgboost import XGBClassifier
from sklearn.metrics import f1_score
from sklearn.metrics import make_scorer
from etl import get_engine, get_features, load_config
from os import path
import os

f1_scorer = make_scorer(f1_score, pos_label=1)

def train_xgb_model(train_set, train_labels, n_iter=80, n_splits=3, scoring=f1_scorer):
"""
    Train an XGB model with hyperparameter tuning using randomized search over a pre-defined parameter grid.
    Stratified K-fold cross-validation is used to select the best model. The metric is the F1 score by default.
:param train_set: pandas.DataFrame object, train set containing features
:param train_labels: pandas.Series object, train labels
:param n_iter: int, Number of samples drawn from parameter grid
:param n_splits: int, Number of splits used in Stratified KFold cross validation
:param scoring: callable or string, scoring metric used in cross validation
"""
param_grid = {
'silent': [False],
'max_depth': [6, 10, 15, 20],
        'learning_rate': [0.001, 0.01, 0.1, 0.2, 0.3],
'subsample': [0.5, 0.6, 0.7, 0.8, 0.9, 1.0],
'colsample_bytree': [0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0],
'colsample_bylevel': [0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0],
'min_child_weight': [0.5, 1.0, 3.0, 5.0, 7.0, 10.0],
'gamma': [0, 0.25, 0.5, 1.0],
'reg_lambda': [0.1, 1.0, 5.0, 10.0, 50.0, 100.0],
'n_estimators': [70, 100, 150],
'scale_pos_weight': [1, 2, 4, (train_labels==-1).sum()/(train_labels==1).sum()]
}

clf = XGBClassifier()

    rs_clf = RandomizedSearchCV(clf, param_grid, n_iter=n_iter,
                                n_jobs=10, verbose=2, cv=StratifiedKFold(n_splits=n_splits),
                                scoring=scoring, refit=True, random_state=42)

rs_clf.fit(train_set, train_labels)

return rs_clf

def save_model(model, model_path):
with open(model_path, 'wb') as f:
pickle.dump(model.best_estimator_, f)

if __name__ == '__main__':
database_config = load_config('../misc/database_config.yaml')[0]

engine = get_engine(database_config)

processed_data, processed_labels = get_features(engine)
samples_with_labels = ~processed_labels.isna()
processed_data = processed_data[samples_with_labels]
processed_labels = processed_labels[samples_with_labels]

model = train_xgb_model(processed_data, processed_labels)

model_path = '../artifacts'

if not path.exists(model_path):
os.makedirs(model_path)


save_model(model, path.join(model_path, 'model.pkl'))
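
A hedged sketch of inspecting the tuned model on a hold-out split; train.py itself tunes on all labelled users, so the split and the reduced n_iter below are only for illustration:

# Illustrative evaluation of train_xgb_model() on a stratified hold-out split.
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
from etl import get_engine, get_features, load_config
from train import train_xgb_model

engine = get_engine(load_config('../misc/database_config.yaml')[0])
X, y = get_features(engine)
mask = ~y.isna()

# Keep only labelled users and hold out 20% for evaluation.
X_train, X_test, y_train, y_test = train_test_split(
    X[mask], y[mask], test_size=0.2, stratify=y[mask], random_state=42)

rs_clf = train_xgb_model(X_train, y_train, n_iter=20)
print(rs_clf.best_params_)
print('hold-out F1:', f1_score(y_test, rs_clf.predict(X_test), pos_label=1))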