-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmagic_item_put.py
168 lines (138 loc) · 4.91 KB
/
magic_item_put.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# Import for data-typing
from google.auth.transport.requests import AuthorizedSession
# Built-in libraries
import json
import threading
# Created modules
import connection_error
# Global variables
# Shared failure log: RequestThread.run() appends one
# "<file name> Error: <status code>" string per non-200 response (while holding
# its error_lock), and main() prints the entries after all threads have joined.
error_files: list[str] = []
# accommodate
class RequestThread(threading.Thread):
    """Worker thread that updates one existing database record with an HTTP PUT.

    Each thread handles a single JSON file: it looks the file name up in
    index.json to obtain the record key, renames the ``$schema``/``$id`` keys
    of the document, and PUTs the result to ``<full_url><key>.json``.
    """

    def __init__(
        self,
        auth_session: "AuthorizedSession",
        file_dir: str,
        index_json_dir: str,
        full_url: str,
        file_name: str,
        index_lock: threading.Lock,
        print_lock: threading.Lock,
        error_lock: threading.Lock,
    ) -> None:
        """
        Thread to handle PUT requests

        Args:
            auth_session (AuthorizedSession): Authenticated request object
            file_dir (str): Directory of JSONs
            index_json_dir (str): Path of index.json
            full_url (str): Base URL of the database folder; the record key and
                ".json" are appended to it when the request is sent
            file_name (str): Name of the file to update in the database
            index_lock (threading.Lock): Lock for index.json access. It is stored
                but not acquired by this class, which only reads index.json
            print_lock (threading.Lock): Lock to prevent race conditions while printing
            error_lock (threading.Lock): Lock to prevent race conditions while adding to error list
        """
        super().__init__()
        self.session = auth_session
        self.file_dir = file_dir
        self.index_json_dir = index_json_dir
        self.full_url = full_url
        self.file_name = file_name
        self.index_lock = index_lock
        self.print_lock = print_lock
        self.error_lock = error_lock

    def run(self) -> None:
        """Send the file's contents to the database if the file is listed in index.json.

        Files missing from the index are skipped with a printed notice. A
        response status other than 200 is recorded in the module-level
        ``error_files`` list. ``self.full_url`` is left unchanged.
        """
        # index.json maps file names to the record keys used in the database URL
        with open(self.index_json_dir, "r", encoding="utf-8") as f:
            index_json = json.load(f)

        # If the file is NOT in the index there is no record to update: skip it
        if self.file_name not in index_json:
            with self.print_lock:
                print(f"{self.file_name} doesn't already exist! ")
            return

        # Opens JSON file (each thread is given its own file name)
        full_file_dir = f"{self.file_dir}/{self.file_name}"
        with open(full_file_dir, "r", encoding="utf-8") as f:
            document = json.load(f)

        # Renames keys since Firebase does not like $'s in keys. Only keys that
        # are actually present are renamed, so a document without "$schema" or
        # "$id" no longer raises KeyError and kills the thread.
        for dollar_key in ("$schema", "$id"):
            if dollar_key in document:
                document[dollar_key[1:]] = document.pop(dollar_key)
        payload = json.dumps(document, indent=4, sort_keys=True)

        # Builds the record URL in a local so the instance's base URL is not
        # overwritten as a side effect of running
        target_url = f"{self.full_url}{index_json[self.file_name]}.json"

        # Sends JSON to database
        response = self.session.put(target_url, data=payload)

        if response.status_code == 200:
            with self.print_lock:
                print(f"{self.file_name} successfully updated!\n")
        else:
            # Appending does not rebind the global, so no `global` statement is needed
            with self.error_lock:
                error_files.append(
                    f"{self.file_name} Error: {response.status_code}"
                )
def main(
    auth_session: AuthorizedSession,
    file_dir: str,
    index_json_dir: str,
    db_folder_url: str,
    file_list: list[str],
) -> None:
    """
    Main function to update existing JSON records in the database

    Starts one RequestThread per entry in file_list, waits for all of them to
    finish, then prints every failure recorded in the module-level error_files.

    Args:
        auth_session (AuthorizedSession): Authenticated request object
        file_dir (str): Directory of JSONs
        index_json_dir (str): Path of index.json
        db_folder_url (str): URL of target database folder
        file_list (list[str]): List of file names within target directory
    """
    # error_files is module-level state: reset it so a repeated call to main()
    # does not report failures left over from an earlier call
    error_files.clear()

    # Creates locks (shared by every thread started below)
    index_lock = threading.Lock()
    print_lock = threading.Lock()
    error_lock = threading.Lock()

    # Creates one thread per file
    threads = [
        RequestThread(
            auth_session,
            file_dir,
            index_json_dir,
            db_folder_url,
            file_name,
            index_lock,
            print_lock,
            error_lock,
        )
        for file_name in file_list
    ]

    # Starts every thread before joining any, so the requests run concurrently
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    # Error notifications
    if error_files:
        print("The following files had errors:\n")
        for entry in error_files:
            print(entry)
if __name__ == "__main__":
    # This module is meant to be imported; running it directly only prints a notice.
    _direct_run_notice = "This code is not meant to be executed directly, please execute main.py instead."
    print(_direct_run_notice)