-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
134 lines (107 loc) · 3.95 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import os
import re
import json
import base64
import humanize
from datetime import datetime
from pydantic import BaseModel
from fastapi import FastAPI, Request, Response
from fastapi.responses import JSONResponse
# Directory under which every key is stored as a file; override with the
# DATA_PATH environment variable.
DATA_PATH = os.getenv("DATA_PATH", "data/")

# Application object that the middleware and route handlers in this module
# register themselves on.
app = FastAPI()
@app.middleware("http")
async def cors_handler(request: Request, call_next):
    """Add CORS headers to responses, configured through environment variables.

    The settings are read on every request:

    * ``CORS_ORIGINS`` -- comma-separated allowed origins; ``*`` as the first
      entry allows any origin (default ``*``).
    * ``CORS_METHODS`` -- comma-separated allowed methods
      (default ``GET,PUT,OPTIONS``).
    * ``CORS_HEADERS`` -- comma-separated allowed request headers
      (default ``*``).

    Args:
        request: The incoming HTTP request.
        call_next: Callable that passes the request on and returns the
            response.

    Returns:
        The downstream response, with CORS headers added when the request's
        origin is permitted.
    """
    response: Response = await call_next(request)

    # Trim whitespace around each origin so "a.com, b.com" matches both
    # entries, consistent with how methods and headers ignore spaces.
    allowed_origins = [
        origin.strip()
        for origin in os.environ.get("CORS_ORIGINS", "*").split(",")
    ]
    allowed_methods = ", ".join(
        os.environ.get("CORS_METHODS", "GET,PUT,OPTIONS").replace(" ", "").split(",")
    )
    allowed_headers = ", ".join(
        os.environ.get("CORS_HEADERS", "*").replace(" ", "").split(",")
    )

    request_origin = request.headers.get("Origin")
    if allowed_origins[0] == "*" or request_origin in allowed_origins:
        response.headers["Access-Control-Allow-Methods"] = allowed_methods
        response.headers["Access-Control-Allow-Headers"] = allowed_headers
        if request_origin:
            # The caller's origin is echoed back rather than sending "*".
            response.headers["Access-Control-Allow-Origin"] = request_origin
            # The response now depends on the Origin header, so caches must
            # key on it to avoid serving one origin's headers to another.
            response.headers.add_vary_header("Origin")
    return response
def make_boolean(value):
    """Interpret a loosely-typed flag (e.g. a query-string value) as a bool.

    Args:
        value: A bool, a string, a number, or ``None``.

    Returns:
        ``value`` itself when it is already a bool; ``True`` for the strings
        "true", "yes", "y", "1" (case-insensitive) and for the number 1;
        ``False`` for everything else, including "false", "no", "n", "0",
        0, ``None`` and unrecognised input.
    """
    if isinstance(value, bool):
        return value
    # Only strings have a case to normalise; numbers and None are compared
    # as-is instead of failing on a missing ``.lower()``.
    if isinstance(value, str):
        value = value.lower()
    # Anything that is not an accepted "true" spelling is False, so a
    # separate list of "false" spellings is unnecessary.
    return value in ("true", "yes", "y", "1", 1)
@app.get("/")
def read_root():
    """Return the list of paths advertised at the service root."""
    advertised_paths = ["/"]
    return {"paths": advertised_paths}
@app.options("/{full_path:path}")
async def options_handler(full_path: str):
    """Answer an OPTIONS request on any path with an empty JSON object.

    Args:
        full_path: The requested path captured by the route; unused.
    """
    empty_body = {}
    return empty_body
@app.put("/{full_path:path}")
async def write(full_path: str, request: Request):
    """Store the request body under the key ``full_path``.

    The body is base64-encoded and written to the file ``DATA_PATH/full_path``;
    missing parent folders are created.

    Args:
        full_path: Key to write, taken from the URL path.
        request: Incoming request whose raw body is the value to store.

    Returns:
        A dict with ``created`` (True when the key did not exist before),
        ``size`` (bytes on disk, i.e. of the base64 text) and ``size_human``.
        A 400 JSON response when the key resolves outside ``DATA_PATH``, or a
        409 JSON response when the key collides with an existing key or
        folder.
    """
    file_path = os.path.join(DATA_PATH, full_path)
    folder_path = os.path.dirname(file_path)

    # The key comes straight from the URL: refuse anything ("..", an absolute
    # path, a symlink) that would land outside the data directory.
    data_root = os.path.realpath(DATA_PATH)
    resolved = os.path.realpath(file_path)
    if resolved != data_root and not resolved.startswith(os.path.join(data_root, "")):
        error_message = {"msg": f"key '{full_path}' is not a valid key!"}
        return JSONResponse(content=error_message, status_code=400)

    # Ensure all parent folders exist. A parent that is already a file (an
    # existing key) surfaces as FileExistsError or NotADirectoryError,
    # depending on which path component it is.
    try:
        os.makedirs(folder_path, exist_ok=True)
    except (FileExistsError, NotADirectoryError) as error:
        key = error.filename
        error_message = {"msg": f"key '{key}' is already a key!"}
        return JSONResponse(content=error_message, status_code=409)

    # Write the base64-encoded body.
    exists = os.path.exists(file_path)
    data = base64.b64encode(await request.body())
    try:
        with open(file_path, "wb") as f:
            f.write(data)
    except IsADirectoryError as error:
        key_path = error.filename
        error_message = {"msg": f"key '{key_path}' is already part of a key!"}
        return JSONResponse(content=error_message, status_code=409)

    size = os.path.getsize(file_path)
    return {
        "created": not exists,
        "size": size,
        "size_human": humanize.naturalsize(size)
    }
@app.get("/{full_path:path}")
async def read(full_path: str, plain = False, b64 = False, prefix = None):
    """Read the key ``full_path``, or list the keys in a folder.

    Args:
        full_path: Key or folder to read, taken from the URL path.
        plain: Truthy flag; return the decoded content as a ``text/plain``
            response instead of JSON. Takes precedence over ``b64``.
        b64: Truthy flag; leave ``content`` base64-encoded in the JSON
            response.
        prefix: When listing a folder, keep only names starting with this
            string.

    Returns:
        ``{"keys": [...]}`` for a folder; for a file, either a plain-text
        response or a dict with ``size``, ``size_human``, ``timestamp``
        (file modification time, UTC, ISO 8601 without offset) and
        ``content``. A 400 JSON response when the key resolves outside
        ``DATA_PATH``, or a 404 JSON response when nothing exists at the key.
    """
    from datetime import timezone

    b64 = make_boolean(b64)
    plain = make_boolean(plain)
    path = os.path.join(DATA_PATH, full_path)

    # The key comes straight from the URL: refuse anything ("..", an absolute
    # path, a symlink) that would read or list outside the data directory.
    data_root = os.path.realpath(DATA_PATH)
    resolved = os.path.realpath(path)
    if resolved != data_root and not resolved.startswith(os.path.join(data_root, "")):
        error_message = {"msg": f"key '{full_path}' is not a valid key!"}
        return JSONResponse(content=error_message, status_code=400)

    # A folder yields its entries (keys), optionally filtered by prefix.
    if os.path.isdir(path):
        keys = os.listdir(path)
        if prefix:
            keys = [key for key in keys if key.startswith(prefix)]
        return {"keys": keys}

    if os.path.isfile(path):
        with open(path, 'rb') as f:
            data = f.read()
        if plain:
            data = base64.b64decode(data)
            return Response(content=data, media_type="text/plain")
        size = os.path.getsize(path)
        timestamp = os.path.getmtime(path)
        # Build the time as UTC, then drop tzinfo so the string keeps the
        # offset-free format that datetime.utcfromtimestamp() produced.
        iso8601_date_time = (
            datetime.fromtimestamp(timestamp, tz=timezone.utc)
            .replace(tzinfo=None)
            .isoformat()
        )
        if not b64:
            # NOTE(review): the decoded bytes are returned inside a JSON
            # body; content that is not valid text may fail to serialize --
            # confirm, and use plain or b64 for binary values.
            data = base64.b64decode(data)
        return {
            "size": size,
            "size_human": humanize.naturalsize(size),
            "timestamp": iso8601_date_time,
            "content": data
        }

    error_message = {"msg": f"key '{path}' not found!"}
    return JSONResponse(content=error_message, status_code=404)