-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
executable file
·3369 lines (2737 loc) · 125 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Standard Imports
import html
import logging
import os
import random
import re
import secrets
import smtplib
import string
from datetime import date, datetime, timedelta
from functools import wraps

# Third-Party Imports
import bleach  # sanitization of data to avoid data breaches
import psycopg2
import requests
from flask import Flask, render_template, request, session, flash, redirect, url_for, g
from flask import send_from_directory
from flask_mail import Mail, Message
from requests import RequestException
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.utils import secure_filename
app = Flask(__name__, static_folder="Uploads", static_url_path="/Uploads")
# ALL CONFIGURATIONS.
UPLOAD_FOLDER = "Uploads/Profile"
app.config["UPLOAD_FOLDER"] = UPLOAD_FOLDER
ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg"}
# Set the maximum file size to 1MB for file uploads
app.config["MAX_CONTENT_LENGTH"] = 1 * 1024 * 1024 # 1MB limit
# Set session lifetime to 60 minutes (3600 seconds) == 1 hour
app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(minutes=60)
# Set up Flask app configuration with updated reCAPTCHA keys
app.config['RECAPTCHA_PUBLIC_KEY'] = '6Ldm9jAqAAAAAPSaTkngMWx9FnjfySGIepLn6sfw'
app.config['RECAPTCHA_PRIVATE_KEY'] = '6Ldm9jAqAAAAAACxamBNIs0ZVTXT9LcObcKU0BdH'
app.config["SECRET_KEY"] = "*(SbXi=a<bV~8a4v@AWlOT-w"
app.config["MAIL_SERVER"] = "smtp.gmail.com"
app.config["MAIL_PORT"] = 587
app.config["MAIL_USE_TLS"] = True
app.config["MAIL_USE_SSL"] = False
app.config["MAIL_USERNAME"] = "[email protected]"
app.config["MAIL_PASSWORD"] = "shou wmat ekvz jbkp" # app password
app.config["MAIL_DEFAULT_SENDER"] = "[email protected]"
mail = Mail(app)
# please replace this with your actual database credentials
# Database connection
conn = psycopg2.connect(
database="authflaskapp",
user="postgres",
password="authenticate",
host="localhost",
port="5432",
)
@app.teardown_appcontext
def close_db_connection(exception=None):
    """
    Close the context-scoped database connection when the app context ends.
    Parameters:
    - exception: Exception object handed over at teardown (default is None);
      it is not inspected here.
    Functionality:
    - Looks up '_db_connection' on the global context ('g') and closes it
      when one has been stored there. Does nothing otherwise.
    """
    connection = getattr(g, "_db_connection", None)
    if connection is None:
        # No connection was opened for this context; nothing to release.
        return
    connection.close()
# Lazily open (and cache on 'g') a PostgreSQL connection
def get_db_connection():
    """
    Return the PostgreSQL connection stored on the global context ('g').
    Returns:
    - PostgreSQL database connection.
    Functionality:
    - Reuses the connection kept in 'g._db_connection' when there is one.
    - Otherwise opens a new psycopg2 connection, stores it on 'g' and
      returns it, so later calls in the same context share it.
    """
    if getattr(g, "_db_connection", None) is None:
        # First use in this context: connect and remember the connection.
        g._db_connection = psycopg2.connect(
            host="localhost",
            port="5432",
            database="authflaskapp",
            user="postgres",
            password="authenticate",
        )
    return g._db_connection
# NOTE: flagged by the original author as not yet tested.
def get_old_email(user_id):
    """
    Retrieve the email address currently stored for a user.
    Parameters:
    - user_id: ID of the user in the 'accounts' table.
    Returns:
    - The stored email address, or None when no account has that id.
    Functionality:
    - Runs a parameterized SELECT on the module-level connection ('conn').
    - The cursor is closed even when the query raises.
    """
    # The context manager closes the cursor on both success and failure.
    with conn.cursor() as cursor:
        cursor.execute("SELECT email FROM accounts WHERE id = %s", (user_id,))
        row = cursor.fetchone()
    # fetchone() yields None for an unknown id; indexing it would raise
    # TypeError, so report "not found" explicitly instead.
    if row is None:
        return None
    return row[0]
# generate a random verification token of length 64
def generate_verification_token(length=64):
    """
    Generate a random verification token.
    Parameters:
    - length: Length of the token (default is 64).
    Returns:
    - Randomly generated token made of ASCII letters and digits.
    Functionality:
    - Draws every character with the 'secrets' module. The token is used as
      a security credential, so it must come from a cryptographically
      secure source rather than the predictable 'random' generator.
    """
    characters = string.ascii_letters + string.digits
    return "".join(secrets.choice(characters) for _ in range(length))
# Static pages: index, about, terms of service, privacy policy, services.
@app.route("/")
def index():
    """
    Serve the main landing page.
    Returns:
    - flask.render_template: The rendered 'index-.html' template.
    """
    landing_template = "index-.html"
    return render_template(landing_template)
@app.route("/about")
def about():
    """
    Serve the 'About' page.
    Returns:
    - flask.render_template: The rendered 'Intuitivers/about-.html' template.
    """
    about_template = "Intuitivers/about-.html"
    return render_template(about_template)
@app.route("/terms")
def terms_of_services():
    """
    Serve the 'Terms of Service' page.
    Returns:
    - flask.render_template: The rendered
      'Intuitivers/terms-of-services-.html' template.
    """
    terms_template = "Intuitivers/terms-of-services-.html"
    return render_template(terms_template)
@app.route("/privacy_policy")
def privacy_policy():
    """
    Serve the 'Privacy Policy' page.
    Returns:
    - flask.render_template: The rendered
      'Intuitivers/privacy-policy-.html' template.
    """
    policy_template = "Intuitivers/privacy-policy-.html"
    return render_template(policy_template)
@app.route("/services")
def services():
    """
    Serve the 'Services' page.
    Returns:
    - flask.render_template: The rendered 'Intuitivers/services-.html'
      template.
    """
    services_template = "Intuitivers/services-.html"
    return render_template(services_template)
@app.route('/Intuitivers/contact-.html')
def contacts():
    """
    Serve the contact page under its template-style URL.
    Returns:
    - flask.render_template: The rendered 'Intuitivers/contact.html' template.
    """
    # NOTE(review): this renders 'contact.html' (no dash) while the /contact
    # route renders 'contact-.html' -- confirm both templates exist.
    contact_template = 'Intuitivers/contact.html'
    return render_template(contact_template)
@app.route("/contact", methods=["GET", "POST"])
def contact():
    """
    Show the contact form and forward submitted messages to support.
    GET renders the contact form template. POST reads the 'name', 'email',
    'message' and 'subject' form fields, sanitizes them with bleach, emails
    them to the support address, flashes a confirmation and redirects back
    to this same route.
    Returns:
        flask.render_template or flask.redirect:
        The rendered form (GET) or a redirect to the contact page (POST).
    """
    if request.method != "POST":
        return render_template("/Intuitivers/contact-.html")

    form = request.form
    # Read every field first, then sanitize the user-supplied values.
    raw_name, raw_email = form["name"], form["email"]
    raw_message, raw_subject = form["message"], form["subject"]
    name = bleach.clean(raw_name)
    email = bleach.clean(raw_email)
    message = bleach.clean(raw_message)
    subject = bleach.clean(raw_subject)

    # Forward the user's message to the support team's mailbox.
    support_email = "[email protected]"
    mail_subject = f"{subject.upper()} from {name.title()}"
    email_message = Message(mail_subject, recipients=[support_email])
    email_message.body = (
        f"Name: {name.title()}\n"
        f"Email: {email.lower()}\n\n"
        f"Message:\n{message.upper()}"
    )
    mail.send(email_message)

    flash(
        "success: Message has been sent to our support team. We will get back to you soon!",
        "success",
    )
    # Redirect (rather than render) so a page refresh does not resubmit.
    return redirect(url_for("contact"))
# Minimum number of characters required for the first name, last name and
# username (this is what the length check in register() compares against).
MIN_LENGTH = 3
# Security PIN helper (author note: works well, version 2.0, Nov 14).
def generate_security_pin(first_name, last_name, country, username):
    """
    Build the security PIN used for account deletion.
    The PIN is the upper-cased first character of each argument, in the
    order first name, last name, country, username.
    Parameters:
    - first_name: First name of the user.
    - last_name: Last name of the user.
    - country: Country of the user.
    - username: Username of the user.
    Returns:
    - Security PIN string made of the four upper-cased initials.
    Note:
    - Every argument must be non-empty; indexing an empty string raises
      IndexError.
    """
    pin_sources = (first_name, last_name, country, username)
    return "".join(value[0].upper() for value in pin_sources)
# Security PIN email (author note: works well, version 2.0).
def send_security_pin_email(email, pin, username):
    """
    Email the account-deletion security PIN to the user.
    Parameters:
    - email: Recipient email address.
    - pin: Security PIN to include in the message.
    - username: Username used in the greeting (title-cased).
    Returns:
    - True when the message was sent. False when sending failed with an
      SMTP error, in which case an error message is flashed.
    """
    try:
        pin_message = Message(
            "Security PIN for Account Deletion", recipients=[email]
        )
        pin_message.body = (
            f"Hello {username.title()}!!,\n\n"
            f"Your security PIN for account deletion is: {pin}\n\n"
            "Please keep this PIN secure, as it will be used "
            "during account deletion purposes. If you lose this PIN, "
            "please contact our support team for assistance. Please note that"
            " we may require additional information to verify your identity.\n\n"
            "Thank you for choosing us!\n\n"
            "Best regards,\n"
            "The AuthFlaskApp Team"
        )
        mail.send(pin_message)
    except smtplib.SMTPAuthenticationError:
        # Must stay ahead of SMTPException, of which it is a subclass.
        flash(
            "Error: Failed to authenticate with the email server. "
            "Please contact our support team for assistance.",
            "error",
        )
        return False
    except smtplib.SMTPException:
        # Any other SMTP-level failure.
        flash(
            "Error: An error occurred while sending the email. "
            "Please contact our support team for assistance.",
            "error",
        )
        return False
    else:
        return True
# Password strength helper (author note: version 2.0, Nov 14).
def is_strong_password(password):
    """
    Check if a password meets the criteria for a strong password.
    Parameters:
    - password (str): The password to be checked.
    Returns:
    - bool: True if the password is strong, False otherwise.
    Criteria:
    - The password must be at least eight characters long.
    - It must contain at least one of: a digit, one of the special
      characters !@#$%^&*(),.?":{}|<> or a space.
    """
    if len(password) < 8:
        return False
    # bool() keeps the documented contract: without it the function would
    # hand back an re.Match object instead of True.
    return bool(
        re.search(r"\d", password)
        or re.search(r'[!@#$%^&*(),.?":{}|<>]', password)
        or " " in password
    )
# Registration (author note: version 2.0, Nov 14).
def _registration_form_error(email, password, first_name, last_name, username, country):
    """Return the first validation error message for the form, or None if valid."""
    if not email or "@" not in email:
        return "Error: Invalid email address."

    password_is_weak = (
        len(password) < 8
        or not re.search(r"[A-Z]", password)
        or not re.search(r"[a-z]", password)
        or not re.search(r"[0-9]", password)
        or not re.search(r'[!@#$%^&*(),.?":{}|<>]', password)
    )
    if password_is_weak:
        return (
            "Error: Password must be at least 8 characters long "
            "and contain at least one uppercase letter, one lowercase letter,"
            " one digit, and one special character."
        )

    if (
        len(first_name) < MIN_LENGTH
        or len(last_name) < MIN_LENGTH
        or len(username) < MIN_LENGTH
    ):
        return (
            f"Error: Name and username must be at least {MIN_LENGTH}"
            " characters long."
        )

    # str.isalpha() is False for an empty string, so blank fields are
    # rejected here as well.
    if not first_name.isalpha():
        return "Invalid first name."
    if not last_name.isalpha():
        return "Invalid last name."
    if not username.isalpha():
        return "Invalid username."
    if not country.isalpha():
        return "Invalid country name."
    return None


@app.route("/register", methods=["GET", "POST"])
def register():
    """
    Handle user registration.
    If the request method is POST:
    - Read the registration form and sanitize the text fields with bleach.
      The password is NOT sanitized: bleach rewrites characters such as
      '&', '<' and '>', so the stored hash would no longer match what the
      user types at login.
    - Validate email, password strength, first and last name, username and
      country. On failure, flash the error and redirect back to the form.
    - Hash the password using PBKDF2 with SHA-256 (salt_length=8).
    - Reject the request if the email or username already exists.
    - Build the security PIN from the user's initials and generate a random
      32-character verification token.
    - Insert the account and its token, committing both together. On a
      database error, roll back, flash an error and redirect to the form.
    - Send the verification, security PIN and welcome emails.
    If the request method is GET:
    - Render the registration form.
    Returns:
        flask.render_template: The registration form for GET requests.
        flask.redirect: Back to 'register' when validation fails, the
            email/username is taken, or the database write fails.
        flask.render_template: 'email_send_error.html' when the security
            PIN email cannot be sent.
        flask.render_template: The registration success page otherwise.
    """
    if request.method != "POST":
        return render_template("/Auth/registration-form-.html")

    form = request.form
    # Sanitize the values that may later be shown on pages or in emails.
    email = bleach.clean(form["email"].lower())
    first_name = bleach.clean(form["first_name"].title())
    last_name = bleach.clean(form["last_name"].title())
    username = bleach.clean(form["username"])
    country = bleach.clean(form["country"].title())
    # Kept verbatim on purpose (see docstring); it is only ever hashed.
    password = form["password"]

    error_message = _registration_form_error(
        email, password, first_name, last_name, username, country
    )
    if error_message is not None:
        flash(error_message, "error")
        return redirect(url_for("register"))

    # Hash the password using PBKDF2 with SHA-256 before storing it
    hashed_password = generate_password_hash(
        password, method="pbkdf2:sha256", salt_length=8
    )
    # Built only after validation: the PIN helper indexes the first character
    # of each field and would raise IndexError on an empty value.
    pin = generate_security_pin(first_name, last_name, country, username)
    # The verification token is a credential, so use a secure random source.
    token_alphabet = string.ascii_letters + string.digits
    verification_token = "".join(
        secrets.choice(token_alphabet) for _ in range(32)
    )
    registration_date = datetime.now()

    cursor = conn.cursor()
    try:
        # Check if the email or username already exists
        cursor.execute(
            "SELECT * FROM accounts WHERE email = %s OR username = %s",
            (email, username),
        )
        if cursor.fetchone():
            flash(
                "Error: username or email address already used. "
                "Use another email and or username",
                "error",
            )
            return redirect(url_for("register"))

        # Insert the user data into the database
        cursor.execute(
            "INSERT INTO accounts (email, first_name, last_name, "
            "username, password, country, day, month, year, verified, pin) "
            "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id",
            (
                email,
                first_name,
                last_name,
                username,
                hashed_password,
                country,
                registration_date.day,
                registration_date.month,
                registration_date.year,
                False,
                pin,
            ),
        )
        # Get the id of the newly inserted account
        account_id = cursor.fetchone()[0]
        cursor.execute(
            "INSERT INTO tokens "
            "(account_id, username, email, verification_token) VALUES (%s, %s, %s, %s)",
            (account_id, username, email, verification_token),
        )
        # Persist the account and its token together; without this commit
        # the rows stay in an open transaction and can be lost.
        conn.commit()
    except psycopg2.Error:
        # A failed statement aborts the transaction on the shared connection;
        # roll back so later requests can keep using it.
        conn.rollback()
        flash("Error: Database error occurred. Please try again later.", "error")
        return redirect(url_for("register"))
    finally:
        cursor.close()

    # Send an email verification message
    email_message = Message("Account Activation.", recipients=[email])
    # local server is running on port 5000
    server_address = "http://127.0.0.1:5000"
    email_message.body = (
        f"Hello {username},\n\n"
        f"We are excited to welcome you to our AuthFlaskApp platform! To complete your registration please "
        f"verify your email address, click the link below:\n\n"
        f"Verification Link: {server_address}/verify/{verification_token}\n\n"
        f"This link is valid for 10 minutes. If you didn't sign up for an account, please ignore "
        f"this email someone might mistyped your email address. "
        f"For any assistance or questions, feel free to contact our support team.\n\n"
        f"Best regards,\n"
        f"The AuthFlaskApp Team"
    )
    mail.send(email_message)

    # Send security PIN email
    if not send_security_pin_email(email, pin, username):
        # Handle email sending failure
        return render_template("email_send_error.html")

    # Send a congratulatory email for social media account registration
    congratulatory_message = Message(
        "Welcome to Our Social Media Community", recipients=[email]
    )
    congratulatory_message.body = (
        "Hello,\n\n"
        "Welcome to our social media community! We are delighted to have you on board. You have successfully registered "
        "for our platform, and we can't wait for you to start connecting with others and exploring the exciting content "
        "our community has to offer.\n\n"
        "Here are a few things you can do to get started:\n"
        "- Complete your profile: Add a profile picture and a short bio to let others know more about you.\n"
        "- Connect with others: Find and connect with friends, family, and people with shared interests.\n"
        "- Explore content: Dive into posts, photos, videos, and discussions shared by our vibrant community members.\n\n"
        "If you have any questions or need assistance, feel free to reach out to our support team. Thank you for joining "
        "us, and we hope you have a wonderful experience!\n\n"
        "Best regards,\n"
        "The AuthFlaskApp Team"
    )
    mail.send(congratulatory_message)
    return render_template("/General/Info/Success/registration-success-.html")
@app.route("/verify/<token>", methods=["GET"])
def verify_email(token):
    """
    Verify the user's email using a verification token.
    Looks the token up in the 'tokens' table. A token sent at most 10 minutes
    ago marks the owning account as verified; an older token is treated as
    expired. In both cases the token row is deleted and the change is
    committed. A token that is not in the table renders an error page.
    Args:
        token (str): The verification token extracted from the URL.
    Returns:
        flask.Response: A redirect to the login page (verified), a redirect
        to 'resend_verification' (expired), or the rendered
        'General/verification_error.html' template (unknown token).
    Raises:
        psycopg2.Error: Re-raised after rolling back the shared connection.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(
            "SELECT * FROM tokens WHERE verification_token = %s", (token,)
        )
        token_data = cursor.fetchone()
        if not token_data:
            # The token is not present in the database (e.g. already used).
            return render_template("General/verification_error.html")

        token_id = token_data[0]
        account_id = token_data[1]
        # verification_sent_time is in the 6th column
        verification_sent_time = token_data[5]
        # The verification link is valid for 10 minutes.
        link_is_valid = (
            datetime.now() - verification_sent_time <= timedelta(minutes=10)
        )
        if link_is_valid:
            # Mark the user as verified in the table accounts
            cursor.execute(
                "UPDATE accounts SET verified = TRUE WHERE id = %s",
                (account_id,),
            )
        # The token is single-use: remove it whether it was valid or expired.
        cursor.execute("DELETE FROM tokens WHERE id = %s", (token_id,))
        conn.commit()
    except psycopg2.Error:
        # A failed statement leaves the shared connection in an aborted
        # transaction; roll back so other requests can keep using it.
        conn.rollback()
        raise
    finally:
        # Release the cursor on every path, including the unknown-token one.
        cursor.close()

    if link_is_valid:
        # Uses the same "success" category as the other success flashes.
        flash("success: Account verified, you can now log in.", "success")
        return redirect(url_for("login"))

    flash(
        "Error: The token already "
        "expired due to time. Request a new token here.",
        "error",
    )
    # Let the user request a new verification email.
    return redirect(url_for("resend_verification"))
# Function to generate a 6-digit random token for 2FA accounts
def generate_token():
    """
    Generate a 6-digit numeric code for two-factor authentication.
    Returns:
    - str: Six random decimal digits (leading zeros are possible).
    Functionality:
    - Digits are drawn with the 'secrets' module because the code is a login
      credential and must not be predictable.
    - The code is deliberately not printed, so it cannot end up in
      server output or logs.
    """
    return "".join(secrets.choice(string.digits) for _ in range(6))
# Function to insert 2FA token into the accounts table if 2FA is enabled for the user
def insert_tfa_token_to_table(user_id, token):
    """
    Store a 2FA token for the user, but only when 2FA is enabled.
    Reads the 'tfa' flag of the account and, when it equals "T", writes the
    token and the current timestamp into 'auth_token' / 'ttmp'. Errors are
    reported with print() and a flashed message; nothing is re-raised.
    Args:
        user_id (int): The user's ID.
        token (str): The generated 2FA token.
    Returns:
        None
    """
    try:
        # Leaving the connection's 'with' block because of an exception rolls
        # the transaction back, so the handlers below need no rollback call.
        with get_db_connection() as db_conn:
            with db_conn.cursor() as cursor:
                cursor.execute(
                    "SELECT tfa FROM accounts WHERE id = %s", (user_id,)
                )
                enable_tfa = cursor.fetchone()
                if enable_tfa and enable_tfa[0] == "T":
                    update_query = (
                        "UPDATE accounts "
                        "SET auth_token = %s, ttmp = %s WHERE id = %s"
                    )
                    token_timestamp = datetime.now()  # Get the current timestamp
                    cursor.execute(
                        update_query, (token, token_timestamp, user_id)
                    )
                    # Debug output: now a real f-string, and the token value
                    # itself is left out so it is not written to output.
                    print(f"Stored 2FA token for User ID: {user_id}")
                    # Commit the transaction to save changes to the database
                    db_conn.commit()
                else:
                    print(
                        f"2FA is not enabled for User ID: {user_id}. Token not stored."
                    )
    # handle database-specific error
    except psycopg2.Error as db_error:
        print(f"Database error: {db_error}")
        flash("Error: Database error occurred. Please try again later.", "error")
    # handle network-related error
    except RequestException as request_error:
        print(f"Network request error: {request_error}")
        flash(
            "Error: Network request error occurred. Please try again later.", "error"
        )
    # handle value-related error
    except ValueError as value_error:
        print(f"Value error: {value_error}")
        flash(
            "Error: Invalid value error occurred. Please check your input.", "error"
        )
    except Exception as e:
        # No explicit rollback here: the connection variable is unbound when
        # get_db_connection() itself failed, which would raise
        # UnboundLocalError from inside this handler.
        print(f"Unexpected error: {e}")
        flash(
            "Error: An unexpected error occurred. Please try again later.", "error"
        )
def send_tfa_token_email(email, token, username):
    """
    Email a 2FA verification code to the user.
    Args:
        email (str): Recipient email address.
        token (str): The 2FA code to include in the message.
        username (str): Name used in the greeting; HTML-escaped before it is
            placed in the message body.
    Returns:
        None
    """
    # Debug output without the code itself, so the credential is not written
    # to server output.
    print(f"Sending 2FA token to email: {email}")
    # The body is HTML, so escape the user-controlled name before embedding it.
    safe_username = html.escape(str(username))
    msg = Message(
        "Authentication Code for Your Account",
        sender="[email protected]",
        recipients=[email],
    )
    msg.html = (
        f"<p>Hello {safe_username},</p>"
        f"<p>We detected a new login attempt on your account. "
        f"To continue, please enter the verification code below:</p>"
        f"<p style='font-size: 30px; font-weight: bold; color: teal'>Verification Code: {token}</p>"
        f"<p>Please enter this code to complete the login process. "
        f"If you did not request this, please ignore this email.</p>"
        f"<p>For your account security, if you did not initiate this login attempt, "
        f"your password might be compromised, "
        f"we recommend changing your password immediately to prevent"
        f" unauthorized access.</p>"
        f"<p>Thank you for using our service!</p>"
        f"<p>Best regards,<br>The AuthFlaskApp Team</p>"
    )
    mail.send(msg)
# Persist a freshly generated 2FA token for a user
def store_tfa_token(user_id, token):
    """
    Save a 2FA (Two-Factor Authentication) token for the given user.
    Writes the token into 'auth_token' and the current time into 'ttmp' on
    the user's row in the table accounts, then commits.
    Args:
        user_id (int): The user's ID.
        token (str): The generated 2FA token.
    Returns:
        None
    """
    stamped_at = datetime.now()
    with get_db_connection() as db, db.cursor() as cur:
        cur.execute(
            "UPDATE accounts SET auth_token = %s, ttmp = %s WHERE id = %s",
            (token, stamped_at, user_id),
        )
        db.commit()
# Look up one account row by its username
def get_user_by_username(username):
    """
    Fetch the account row that matches a username.
    Args:
        username (str): The username of the user.
    Returns:
        The row returned by cursor.fetchone() for the matching account, or
        None when no account matches or a database error occurs (the error
        is printed).
    """
    lookup_sql = "SELECT * FROM accounts WHERE username = %s"
    try:
        with get_db_connection() as db, db.cursor() as cur:
            cur.execute(lookup_sql, (username,))
            return cur.fetchone()
    except psycopg2.Error as e:
        # Report the failure and signal "no user" to the caller.
        print(f"Database error: {e}")
        return None
# Function to get stored 2FA token and timestamp from the database based on user_id
def get_stored_tfa_token_and_timestamp(user_id):
    """
    Get stored 2FA token and timestamp from the database based on user_id.
    Args:
        user_id (int): The user's ID.
    Returns:
        tuple: (stored_token, token_timestamp) read from 'auth_token' and
        'ttmp'. (None, None) is returned when no account has that id or when
        a database error occurs (the error is printed).
    """
    try:
        with get_db_connection() as db_conn:
            with db_conn.cursor() as cursor:
                cursor.execute(
                    "SELECT auth_token, ttmp FROM accounts WHERE id = %s",
                    (user_id,),
                )
                row = cursor.fetchone()
    except psycopg2.Error as e:
        print(f"Database error: {e}")
        return None, None
    # fetchone() yields None for an unknown id; unpacking it directly would
    # raise TypeError, so return the same "nothing stored" pair instead.
    if row is None:
        return None, None
    stored_token, token_timestamp = row
    return stored_token, token_timestamp
# Login lockout settings.
# NOTE(review): neither constant is referenced in the code shown before or
# in the visible part of login() -- confirm where they are enforced.
MAX_FAILED_ATTEMPTS = 3  # Maximum allowed consecutive failed attempts
LOCKOUT_DURATION_MINUTES = 5  # Lockout duration in minutes
@app.route("/login", methods=["GET", "POST"])
def login():
"""
Handle user login functionality.
If the request method is POST, retrieve the entered username and password from the database.
Check if the username exists in the database and verify the entered password to match with the stored password.
If the username and password match with the stored user data AND If the user is verified and has 2FA enabled,
generate a token, send it via email, and proceed to the 2FA verification process.
Else:
If 2FA is not enabled, proceed to the dashboard and store user data in the session.
If the user is not verified, render an account not verified template to inform a user.
If the username or password is incorrect, display a flash message for incorrect login credentials.
If the request method is GET, render the login form again.
Returns:
If login credentials (username and or password) are incorrect, flash an invalid username or password message.
A rendered template or a redirect to another route based on the login outcome.
"""
if "user_id" in session:
# If the user is already logged in,
# and their session data is still active
# try to access the login page to log in again.
# Direct them directly to the dashboard without
# the need to provide login credentials at this time
return redirect(url_for("dashboard"))
if request.method == "POST":
username = request.form["username"]
password = request.form["password"]
# Check if the username exists in the database
with get_db_connection() as conn:
with conn.cursor() as cursor:
cursor.execute(
"SELECT * FROM accounts " "WHERE username = %s", (username,)
)
user = cursor.fetchone()
# check if the password during login matches with the stored hashed password
if user and check_password_hash(user[5], password):
print(f"User {username} found and password matched.")
# Check if the user is verified
if user[12]: # verified column is the 13th column in the table accounts
print("User is verified.")
# Check if 2FA is activated for the user's account
if user[14] == "T": # tfa column is the 15th column in the table accounts
print("2FA is enabled.")
# Generate a 6-digit token
token = generate_token() # generate_token: The function responsible
# Print the token for debugging
print(f"Generated Token: {token}")
# Inside the login route, after setting session variables
# Send the token to the user's email address and store it in the database
# user[1] is the email column, and its (2nd column in table accounts)
# send_tfa_token_email: The function responsible
send_tfa_token_email(user[1], token, username)
# Insert the token into the table accounts
# 'insert_tfa_token_to_table': The function responsible
insert_tfa_token_to_table(user[0], token)
conn.commit() # Commit transaction to save changes in the database
# Inside the '2FA is enabled' branch
print(
f"Setting session variables: "
f"user_id={user[0]}, username={user[4]}, "
f"email={user[1]}, first_name={user[2]}, "
f"last_name={user[3]}, 2fa_token={token}"
)
session["user_id"] = user[0]
session["username"] = user[4]
session["email"] = user[1]
session["first_name"] = user[2]
session["last_name"] = user[3]
session["2fa_token"] = token
# Before redirecting or rendering templates,
# print the session variables again to confirm their values
print(f"Session variables after setting: {session}")
# Redirect to the 2FA verification page
return render_template("/2FA/tfa-login-verification-.html")
elif user[14] == "F":
print(f"2FA is not enabled. Token Not generated hence not stored.")
# After setting session variables
# 2FA is not enabled, proceed to the dashboard
# Store user data in the session
session["user_id"] = user[0]
print(
f"Setting session variables: "
f"user_id={user[0]}, username={user[4]}, "
f"email={user[1]}, first_name={user[2]}, last_name={user[3]}"
)
print(
f"user_id: {session['user_id']}, "
f"2fa_token: {session.get('2fa_token')}"
)
# 5th column in the table accounts
session["username"] = user[4]
# 2nd column in the table accounts
session["email"] = user[1]
# 3rd column in the table accounts
session["first_name"] = user[2]
# 4th column in the table accounts
session["last_name"] = user[3]
print(f"Session variables after setting: {session}")
flash("success: You logged in successfully.", "success")
# Redirect to the dashboard after successful login
return redirect(url_for("dashboard"))
else:
# if an account is not verified and user tries to log in.
return render_template("/Info/Failure/account-not-verified-.html")
else:
# Extend the login process if the username or password is incorrect.
flash("Error: invalid username or password.", "error")
# the GET request