This repository has been archived by the owner on Mar 26, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathauth.py
185 lines (152 loc) · 5.99 KB
/
auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# -*- coding: UTF-8 -*-
from typing import Tuple, Optional, Any
from pydantic import BaseModel
from fastapi import Depends, Response, HTTPException, APIRouter, Body
from fastapi_sessions import SessionCookie, SessionInfo
from fastapi_sessions.backends import InMemoryBackend
import sqlite3, uuid
import config, utils
router = APIRouter()
class SessionData(BaseModel):
userID: int
username: str
role: str
class UserInfo(BaseModel):
username: str
password: str
class ModifyPasswordUserInfo(BaseModel):
oldPassword: str
newPassword: str
curSession = SessionCookie(
name="session",
secret_key=config.SESSION_SECRET_KEY,
backend=InMemoryBackend(),
data_model=SessionData,
auto_error=False,
secure=True,
samesite='None'
)
async def checkLogin(
session_data: Optional[SessionInfo] = Depends(curSession)
):
if session_data is None:
raise HTTPException(
status_code=403,
detail="Not Authenticated"
)
async def needAdminRole(
session_data: Optional[SessionInfo] = Depends(curSession)
):
if session_data[1].role == 'user':
raise HTTPException(
status_code=403,
detail="Not Authenticated, you are not administrator."
)
@router.post("/users/login", tags=["users"])
async def userLogin(
user: UserInfo,
response: Response,
session_info: Optional[SessionInfo] = Depends(curSession)
):
old_session = None
if session_info:
old_session = session_info[0]
# Authentication: Password is MD5 Digested
with sqlite3.connect(config.DB_PATH) as DBConn:
params = (user.username, utils.MD5(user.password))
cursor = DBConn.execute("SELECT UID, Username, Role FROM User WHERE Username = ? and Password = ?", params)
userFinded = False
Role = ""
UID = 0
Username = ""
for row in cursor:
userFinded = True
UID = row[0]
Username = row[1]
Role = row[2]
break
if userFinded:
userSessionData = SessionData(userID=UID, username=Username, role=Role)
await curSession.create_session(userSessionData, response, old_session)
return {"status": 200, "message": "Logged in successfully.", "user": userSessionData}
else:
return {"status": 403, "message": "Username or password is wrong."}
@router.post("/users/logout", tags=["users"])
async def userLogout(
response: Response,
session_info: Optional[SessionInfo] = Depends(curSession)
):
await checkLogin(session_info)
await curSession.end_session(session_info[0], response)
return {"status": 200, "message": "You have logged out now.", "user": session_info}
@router.get("/users/status", tags=["users"])
async def userStatus(
session_data: Optional[SessionInfo] = Depends(curSession)
):
await checkLogin(session_data)
return {"status": 200, "message": "You are logged in.", "user": session_data}
@router.post("/users/register", tags=["users"])
async def userRegister(
user: UserInfo
):
regEn = 0
with sqlite3.connect(config.DB_PATH) as DBConn:
cursor = DBConn.execute("SELECT Value FROM Setting WHERE Name = 'RegisterEnabled'")
for row in cursor:
regEn = int(row[0])
break
if regEn == 0:
return {"status": 403, "message": "Registration Closed."}
# Authentication: Password is MD5 Digested
with sqlite3.connect(config.DB_PATH) as DBConn:
params_name = list()
params_folder_info = list()
params_name.append(user.username)
params_folder_info.append(str(uuid.uuid4()))
params_folder_info.append(user.username)
cursor = DBConn.execute("SELECT Username FROM User WHERE Username = ?", params_name)
userFinded = False
for row in cursor:
userFinded = True
break
params = (user.username, utils.MD5(user.password))
if userFinded:
return {"status": 202, "message": "User name already exists."}
else:
DBConn.execute("INSERT INTO User(Username, Password, Role) VALUES (?, ?, 'user')", params)
DBConn.execute("INSERT INTO Folder(FUUID, Name, Username, Shared) VALUES (?, '默认文件夹', ?, FALSE)", params_folder_info)
UID = DBConn.execute("SELECT UID FROM User WHERE Username = ?", params_name)
FID = DBConn.execute("SELECT FID FROM Folder WHERE Username = ?", params_name)
params_uid_fid = []
for uid in UID:
params_uid_fid.append(uid[0])
break
for fid in FID:
params_uid_fid.append(fid[0])
break
return {"status": 200, "message": "Registered successfully."}
@router.post("/users/modifypassword", tags=["users"])
async def userModifyPassword(
user: ModifyPasswordUserInfo,
session_info: Optional[SessionInfo] = Depends(curSession)
):
await checkLogin(session_info)
# Authentication: Password is MD5 Digested
with sqlite3.connect(config.DB_PATH) as DBConn:
params = (utils.MD5(user.newPassword), session_info[1].username, utils.MD5(user.oldPassword))
if params[0] == params[2]:
return {"status": 201, "message": "Password modification failed, the old password is identical to the new password."}
cursor = DBConn.execute("UPDATE User SET Password = ? WHERE Username = ? AND Password = ?", params)
if cursor.rowcount == 1:
return {"status": 200, "message": "Password modified successfully."}
else:
return {"status": 202, "message": "Password modification failed, the old password is wrong."}
@router.get("/users/isadmin", tags=["users"])
async def userIsAdmin(
session_info: Optional[SessionInfo] = Depends(curSession)
):
await checkLogin(session_info)
if session_info[1].role == 'user':
return {"status": 200, "message": "Is user.", "flag": False}
else:
return {"status": 200, "message": "Is admin.", "flag": True}