-
Notifications
You must be signed in to change notification settings - Fork 29
Expand file tree
/
Copy pathlocaliface.py
More file actions
326 lines (281 loc) · 13.6 KB
/
localiface.py
File metadata and controls
326 lines (281 loc) · 13.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
'''
localiface.py
Local storage interface for the IOP WOPI server.
Note that this interface is meant for development purposes only,
and it is supported on Linux and WSL for Windows, not
on native Windows nor on native MacOS systems as they lack
support for extended attributes.
Main author: Giuseppe.LoPresti@cern.ch, CERN/IT-ST
'''
import time
import os
import fcntl
import warnings
from stat import S_ISDIR
import core.commoniface as common
# module-wide state, populated by init()
# configuration object passed to init(); queried via config.get()/config.getint()
config = None
# logger object passed to init()
log = None
# root folder of the local storage, read from the 'storagehomepath' option of the 'local' config section
homepath = None
class Flock:
    '''Context manager that holds an exclusive flock() on a file while inside a `with` block.
    Credits: https://github.com/misli/python-flock/blob/master/flock.py
    Could be used as a PoC for the production storage interfaces'''

    def __init__(self, fd, blocking=False):
        '''Keep the file to be locked and precompute the flock operation:
        exclusive, and non-blocking unless blocking=True is given'''
        self.fd = fd
        self.op = fcntl.LOCK_EX if blocking else fcntl.LOCK_EX | fcntl.LOCK_NB

    def __enter__(self):
        '''Acquire the lock when entering the `with` block'''
        fcntl.flock(self.fd, self.op)
        return self

    def __exit__(self, _exc_type, _exc_value, _traceback):
        '''Release the lock when leaving the `with` block; exceptions are not suppressed'''
        fcntl.flock(self.fd, fcntl.LOCK_UN)
def _getfilepath(filepath):
    '''Translate a storage filepath into a path on the local fs: the module-level homepath
    (see storagehomepath in wopiserver.conf) is prepended and the result is normalized'''
    joined = os.sep.join((homepath, filepath))
    return os.path.normpath(joined)
def init(inconfig, inlog):
    '''Set up the module-level state (config, log, homepath) and check that
    the configured storagehomepath is an existing directory.
    Raises IOError if the folder cannot be stat'ed or is not a directory.'''
    global config, log, homepath    # pylint: disable=global-statement
    common.config = inconfig
    config = inconfig
    log = inlog
    homepath = config.get('local', 'storagehomepath')
    try:
        # the storagehomepath must exist and be a folder
        if not S_ISDIR(os.stat(homepath).st_mode):
            raise IOError('Not a directory')
    except IOError as e:
        raise IOError(f'Could not stat storagehomepath folder {homepath}: {e}') from e
    # everything is fine, still remind the user what this interface is for
    log.warning('msg="Use this local storage interface for test/development purposes only, not for production"')
def healthcheck():
    '''Probe the storage by stat'ing its root and return a status message.
    As this is a test/dev storage interface, the best possible outcome is 'Warning';
    on failure the error message is returned instead.'''
    try:
        stat(None, '/', None)
    except IOError as e:
        reason = str(e)
        if reason != 'Is a directory':
            # anything but the expected "root is a directory" outcome is a failure
            log.error('msg="Health check failed against storage root" error="%s"' % e)
            return reason
        # stat refuses directories, so this is the expected path for a healthy root
        log.debug('msg="Executed health check against storage root"')
    # also covers the no-exception case, which is not expected to happen (kept to please CodeQL)
    return 'Warning'
def getuseridfromcreds(_token, _wopiuser):
    '''Return the (userid, wopiuser) pair to be used for accessing the storage given a Reva token
    and a wopiuser. On the local fs both arguments are ignored and root is always returned'''
    rootid = '0:0'
    return rootid, 'root!' + rootid
def stat(_endpoint, filepath, _userid):
    '''Stat a file and return a dict with inode, filepath, ownerid, size, mtime, etag and xattrs.
    This method assumes that the given userid has access.
    Raises IOError('Is a directory') if filepath is a folder, and IOError if the file
    is not found or not accessible. A failure to read the extended attributes is only
    logged, and an empty xattrs dict is returned in that case.'''
    fullpath = _getfilepath(filepath)
    try:
        tstart = time.time()
        statInfo = os.stat(fullpath)
        tend = time.time()
        log.info('msg="Invoked stat" inode="%d" filepath="%s" elapsedTimems="%.1f"' %
                 (statInfo.st_ino, fullpath, (tend - tstart) * 1000))
        if S_ISDIR(statInfo.st_mode):
            raise IOError('Is a directory')
        try:
            xattrs = {}
            for k in os.listxattr(fullpath):
                # drop only the leading 'user.' namespace: str.strip('user.') would instead strip
                # any of the characters u/s/e/r/. from both ends of the key and mangle it
                name = k[len('user.'):] if k.startswith('user.') else k
                xattrs[name] = os.getxattr(fullpath, k).decode()
        except OSError as e:
            # the format arguments must be a tuple, otherwise the % operator itself raises TypeError
            log.info('msg="Failed to invoke listxattr/getxattr" inode="%d" filepath="%s" exception="%s"' %
                     (statInfo.st_ino, fullpath, e))
            xattrs = {}
        return {
            'inode': common.encodeinode('local', str(statInfo.st_ino)),
            'filepath': filepath,
            'ownerid': str(statInfo.st_uid) + ':' + str(statInfo.st_gid),
            'size': statInfo.st_size,
            'mtime': statInfo.st_mtime,
            'etag': str(statInfo.st_mtime),
            'xattrs': xattrs,
        }
    except (FileNotFoundError, PermissionError) as e:
        raise IOError(e) from e
def statx(endpoint, filepath, userid):
    '''Extended stat (inode, filepath, ownerid, size, mtime): on local storage
    this simply delegates to stat and returns its result'''
    statinfo = stat(endpoint, filepath, userid)
    return statinfo
def _validatelock(filepath, currlock, lockmd, op, log):
    '''Shared lock validation logic, duplicating what EOS and Reva natively implement
    on the other storage interfaces. lockmd is an (appname, lockid) pair or a falsy value,
    currlock is the lock currently held on the file, if any.
    Returns None when the lock matches; otherwise logs a warning and raises IOError.'''
    appname, value = lockmd if lockmd else (None, None)
    try:
        if not currlock:
            # nothing is locked, so the given lock cannot match
            raise IOError(common.EXCL_ERROR)
        if appname and currlock['app_name'] != appname:
            # the app name is checked only when one was given
            raise IOError(common.EXCL_ERROR + f", file is locked by {currlock['app_name']}")
        if value != currlock['lock_id']:
            raise IOError(common.EXCL_ERROR)
    except IOError as e:
        details = (op, filepath, appname, value, currlock, e)
        log.warning('msg="Failed to %s" filepath="%s" appname="%s" lockid="%s" currlock="%s" reason="%s"' %
                    details)
        raise
def setxattr(endpoint, filepath, userid, key, value, lockmd):
    '''Store <value> (converted to string) in the extended attribute <key>, within the user namespace.
    The endpoint, userid and lockmd arguments are not used on local storage.
    Raises IOError if the attribute cannot be set.'''
    target = _getfilepath(filepath)
    attrname = 'user.' + key
    payload = str(value).encode()
    try:
        os.setxattr(target, attrname, payload)
    except OSError as e:
        log.error(f'msg="Failed to setxattr" filepath="{filepath}" key="{key}" exception="{e}"')
        raise IOError(e) from e
def _getxattr(filepath, key):
    '''Internal only: return the value of the extended attribute <key> as a string,
    or None (after logging a warning) if it is missing or cannot be read'''
    try:
        raw = os.getxattr(_getfilepath(filepath), 'user.' + key)
        return raw.decode('UTF-8')
    except OSError as e:
        log.warning(f'msg="Failed to getxattr or missing key" filepath="{filepath}" key="{key}" exception="{e}"')
        return None
def rmxattr(endpoint, filepath, userid, key, lockmd):
    '''Delete the extended attribute <key> from the user namespace of the given file.
    The endpoint, userid and lockmd arguments are not used on local storage.
    Raises IOError if the attribute cannot be removed.'''
    target = _getfilepath(filepath)
    attrname = 'user.' + key
    try:
        os.removexattr(target, attrname)
    except OSError as e:
        log.error(f'msg="Failed to rmxattr" filepath="{filepath}" key="{key}" exception="{e}"')
        raise IOError(e) from e
def setlock(endpoint, filepath, userid, appname, value):
    '''Lock the file by storing the lock as an xattr.
    Raises IOError(common.EXCL_ERROR) if a valid lock is already there
    or if the file is currently flocked by someone else.'''
    log.debug(f'msg="Invoked setlock" filepath="{filepath}" value="{value}"')
    with open(_getfilepath(filepath)) as fd:
        try:
            # the flock makes the check-then-set sequence below atomic
            with Flock(fd):
                if getlock(endpoint, filepath, userid):
                    raise IOError(common.EXCL_ERROR)
                log.debug(f'msg="setlock: invoking setxattr" filepath="{filepath}" value="{value}"')
                setxattr(endpoint, filepath, '0:0', common.LOCKKEY, common.genrevalock(appname, value), None)
        except BlockingIOError as e:
            log.error(f'msg="File already flocked" filepath="{filepath}" exception="{e}"')
            raise IOError(common.EXCL_ERROR) from e
def getlock(endpoint, filepath, _userid):
    '''Return the lock metadata stored as an xattr on the file, or None if the file
    has no lock or its lock has expired. An expired lock is removed as a side effect.'''
    rawl = _getxattr(filepath, common.LOCKKEY)
    if not rawl:
        return None
    lock = common.retrieverevalock(rawl)
    if time.time() < lock['expiration']['seconds']:
        log.debug(f'msg="Invoked getlock" filepath="{filepath}"')
        return lock
    # the lock is stale: get rid of it and report the file as unlocked
    log.debug(f'msg="getlock: removed stale lock" filepath="{filepath}"')
    rmxattr(endpoint, filepath, '0:0', common.LOCKKEY, None)
    return None
def refreshlock(endpoint, filepath, userid, appname, value, oldvalue=None):
    '''Replace the lock stored as an xattr with a new one carrying the given value.
    The current lock must match appname and oldvalue; when oldvalue is not given,
    the id of the current lock is used. Raises IOError on mismatch.'''
    currlock = getlock(endpoint, filepath, userid)
    if currlock and not oldvalue:
        # plain refresh: validate against whatever lock id is currently set
        oldvalue = currlock['lock_id']
    _validatelock(filepath, currlock, (appname, oldvalue), 'refreshlock', log)
    # not atomic, but reaching this point means the lock was already held
    log.debug(f'msg="Invoked refreshlock" filepath="{filepath}" value="{value}"')
    newlock = common.genrevalock(appname, value)
    setxattr(endpoint, filepath, '0:0', common.LOCKKEY, newlock, None)
def unlock(endpoint, filepath, userid, appname, value):
    '''Drop the lock stored as an xattr, after validating it against the given
    appname and value. Raises IOError if the validation fails.'''
    currlock = getlock(endpoint, filepath, userid)
    _validatelock(filepath, currlock, (appname, value), 'unlock', log)
    log.debug(f'msg="Invoked unlock" filepath="{filepath}" value="{value}"')
    rmxattr(endpoint, filepath, '0:0', common.LOCKKEY, None)
def readfile(_endpoint, filepath, _userid, _lockid):
    '''Generator yielding the content of a file in chunks of the configured io/chunksize;
    it is meant to be consumed by the app server.
    Raises IOError if the file does not exist or cannot be read.'''
    log.debug(f'msg="Invoking readFile" filepath="{filepath}"')
    try:
        tstart = time.time()
        chunksize = config.getint('io', 'chunksize')
        with open(_getfilepath(filepath), mode='rb', buffering=chunksize) as f:
            tend = time.time()
            log.info(f'msg="File open for read" filepath="{filepath}" elapsedTimems="{(tend - tstart) * 1000:.1f}"')
            # hand out one chunk at a time until the end of the file is hit
            while True:
                chunk = f.read(chunksize)
                if chunk == b'':
                    break
                yield chunk
    except FileNotFoundError as fnfe:
        # a missing file is logged at info level only, to keep the logs cleaner
        log.info(f'msg="File not found on read" filepath="{filepath}"')
        raise IOError('No such file or directory') from fnfe
    except OSError as e:
        log.error(f'msg="Error opening the file for read" filepath="{filepath}" error="{e}"')
        raise IOError(e) from e
def _writecontent(f, content, stream):
    '''Internal only: write content to the open binary file f and return the number of bytes written.
    With stream=True, content is a file-like object read in chunks of the configured io/chunksize,
    otherwise it is a bytes object written in one go.'''
    if not stream:
        return f.write(content)
    chunksize = config.getint('io', 'chunksize')
    written = 0
    while True:
        chunk = content.read(chunksize)
        if len(chunk) == 0:
            break
        written += f.write(chunk)
    return written


def writefile(endpoint, filepath, userid, content, size, lockmd, islock=False):
    '''Write a file to the local storage on behalf of the given userid. The entire content
    is written and any pre-existing file is overwritten.
    If size is -1, content is a str or bytes object and its length is taken as size;
    otherwise content is a file-like object that is read in chunks, and size is the expected length.
    If lockmd is given, it must match the lock currently held on the file; if it is not given,
    the file must not be locked.
    With islock=True, the file is opened with O_CREAT|O_EXCL, i.e. it must not exist yet.
    Raises IOError on lock mismatch (common.EXCL_ERROR), if the file exists and islock is True
    (common.EXCL_ERROR), on any I/O error, or if the bytes written do not match size.'''
    stream = True
    if size == -1:
        if isinstance(content, str):
            content = bytes(content, 'UTF-8')
        size = len(content)
        stream = False
    if lockmd:
        _validatelock(filepath, getlock(endpoint, filepath, userid), lockmd, 'writefile', log)
    elif getlock(endpoint, filepath, userid):
        raise IOError(common.EXCL_ERROR)
    log.debug('msg="Invoking writeFile" filepath="%s" size="%d"' % (filepath, size))
    tstart = time.time()
    written = 0
    if islock:
        try:
            # the builtin open() cannot express O_CREAT|O_EXCL without truncation semantics for
            # binary streams in all cases, so the file is created with the os-level open().
            # O_WRONLY is required: without an access mode the descriptor is read-only, and the
            # content would be silently dropped when the buffered writer is flushed.
            fd = os.open(_getfilepath(filepath), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            # the file object owns the descriptor: leaving the with block flushes and closes it
            with os.fdopen(fd, mode='wb') as f:
                tend = time.time()
                written = _writecontent(f, content, stream)
        except FileExistsError as e:
            log.info(f'msg="File exists on write but islock flag requested" filepath="{filepath}"')
            raise IOError(common.EXCL_ERROR) from e
        except OSError as e:
            log.warning(f'msg="Error writing file in O_EXCL mode" filepath="{filepath}" error="{e}"')
            raise IOError(e) from e
    else:
        try:
            with open(_getfilepath(filepath), mode='wb') as f:
                tend = time.time()
                written = _writecontent(f, content, stream)
        except OSError as e:
            log.error(f'msg="Error writing file" filepath="{filepath}" error="{e}"')
            raise IOError(e) from e
    if written != size:
        raise IOError('Written %d bytes but content is %d bytes' % (written, size))
    # note that the elapsed time covers the time taken to open the file
    log.info('msg="File written successfully" filepath="%s" elapsedTimems="%.1f" islock="%s"' %
             (filepath, (tend - tstart) * 1000, islock))
def renamefile(endpoint, origfilepath, newfilepath, userid, lockmd):
    '''Move a file from origfilepath to newfilepath. If the original file is locked,
    the given lockmd must match its lock. Raises IOError on lock mismatch or if the rename fails.'''
    currlock = getlock(endpoint, origfilepath, userid)
    if currlock:
        # the lock is enforced only when one is set on the file
        _validatelock(origfilepath, currlock, lockmd, 'renamefile', log)
    source = _getfilepath(origfilepath)
    target = _getfilepath(newfilepath)
    try:
        os.rename(source, target)
    except OSError as e:
        raise IOError(e) from e
def removefile(_endpoint, filepath, _userid, force=False):
    '''Delete a file from the local storage. Raises IOError if the removal fails.
    The force argument has no meaning on local storage and is ignored.'''
    target = _getfilepath(filepath)
    try:
        os.remove(target)
    except OSError as e:
        raise IOError(e) from e