
Commit f69d426

kodek16 authored and accek committed
Implemented keeping track of file logical sizes. (#41)
* Implemented keeping track of file logical sizes. This makes Client.file_size() consistent regardless of whether the file is cached.
1 parent 0d9ad03 commit f69d426
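For context, blobs are stored gzip-compressed on the server, so before this change a HEAD response's Content-Length described the compressed blob rather than the original file. A minimal stdlib-only sketch of that discrepancy (illustrative, not part of the commit):

import gzip
import os
import tempfile

# Write a highly compressible payload to a temporary file.
fd, raw_path = tempfile.mkstemp()
with os.fdopen(fd, 'wb') as raw:
    raw.write(b'x' * 100000)

# Store it the way the server does: gzip-compressed.
blob_path = raw_path + '.gz'
with open(raw_path, 'rb') as src, gzip.open(blob_path, 'wb') as blob:
    blob.write(src.read())

print(os.stat(raw_path).st_size)   # logical size: 100000
print(os.stat(blob_path).st_size)  # blob size (the old Content-Length): far smaller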

File tree

5 files changed: +122 −59 lines

filetracker/client/remote_data_store.py

Lines changed: 5 additions & 3 deletions
@@ -4,9 +4,10 @@
 import functools
 import gzip
 import logging
+import os
 import shutil
-import time
 import tempfile
+import time
 
 import requests
 from six.moves.urllib.request import pathname2url
@@ -116,7 +117,8 @@ def add_file(self, name, filename, compress_hint=True):
                 with gzip.GzipFile(fileobj=tmp, mode='wb') as gz:
                     shutil.copyfileobj(f, gz)
                 tmp.seek(0)
-                headers.update({'Content-Encoding': 'gzip'})
+                headers['Content-Encoding'] = 'gzip'
+                headers['Logical-Size'] = str(os.stat(filename).st_size)
                 response = self._put_file(url, version, tmp, headers)
             else:
                 response = self._put_file(url, version, f, headers)
@@ -165,7 +167,7 @@ def file_size(self, name):
         url, version = self._parse_name(name)
         response = requests.head(url, allow_redirects=True)
         response.raise_for_status()
-        return int(response.headers.get('content-length', 0))
+        return int(response.headers.get('logical-size', 0))
 
     @_verbose_http_errors
     def delete_file(self, filename):
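The upshot on the wire: file_size() still issues a HEAD request, but now reads the new Logical-Size header instead of Content-Length. A sketch of that behaviour, assuming a patched server is running at the hypothetical URL below:

import requests

url = 'http://localhost:8000/files/size.txt'  # hypothetical server and file

response = requests.head(url, allow_redirects=True)
response.raise_for_status()

# Content-Length describes the stored (gzip-compressed) blob;
# Logical-Size carries the decompressed size recorded at upload time.
print(response.headers.get('content-length'))
print(response.headers.get('logical-size', 0))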

filetracker/interaction_test.py

Lines changed: 10 additions & 0 deletions
@@ -125,6 +125,16 @@ def test_file_version_should_be_set_to_current_time_on_upload(self):
         self.assertNotEqual(version, 1)
         self.assertTrue(pre_upload <= version <= post_upload)
 
+    def test_file_size_should_return_decompressed_size_without_cache(self):
+        src_file = os.path.join(self.temp_dir, 'size.txt')
+        with open(src_file, 'wb') as sf:
+            sf.write(b'hello size')  # size = 10
+
+        self.client.put_file('/size.txt', src_file, to_local_store=False)
+
+        self.assertEqual(
+            self.client.file_size('/size.txt'), len(b'hello size'))
+
     def test_every_link_should_have_independent_version(self):
         src_file = os.path.join(self.temp_dir, 'foo.txt')
         with open(src_file, 'wb') as sf:

filetracker/scripts/recover.py

Lines changed: 1 addition & 0 deletions
@@ -96,6 +96,7 @@ def main():
     processed_blobs = 0
     broken_blobs = 0
 
+    # TODO this script should be updated to recalculate file logical sizes.
     with progress_bar.conditional(show=not silent,
                                   widgets=blobs_widgets) as bar:
         for cur_dir, _, files in os.walk(file_storage.blobs_dir):
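A minimal sketch of what that TODO could do, assuming every blob is stored gzip-compressed as in filetracker/servers/storage.py; recompute_logical_size is a hypothetical helper, not part of this commit:

import gzip

_BUFFER_SIZE = 64 * 1024

def recompute_logical_size(blob_path):
    """Streams a gzip blob through decompression, counting the bytes."""
    size = 0
    with gzip.open(blob_path, 'rb') as decompressed:
        while True:
            buf = decompressed.read(_BUFFER_SIZE)
            if not buf:
                break
            size += len(buf)
    return size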

filetracker/servers/files.py

Lines changed: 10 additions & 6 deletions
@@ -52,27 +52,31 @@ def handle_PUT(self, environ, start_response):
         compressed = environ.get('HTTP_CONTENT_ENCODING', None) == 'gzip'
 
         digest = environ.get('HTTP_SHA256_CHECKSUM', None)
+        logical_size = environ.get('HTTP_LOGICAL_SIZE', None)
 
         version = self.storage.store(name=path,
                                      data=environ['wsgi.input'],
                                      version=last_modified,
                                      size=content_length,
                                      compressed=compressed,
-                                     digest=digest)
+                                     digest=digest,
+                                     logical_size=logical_size)
         start_response('200 OK', [
             ('Content-Type', 'text/plain'),
             ('Last-Modified', email.utils.formatdate(version)),
         ])
         return []
 
-    def _file_headers(self, path):
-        link_st = os.lstat(path)
-        blob_st = os.stat(path)
+    def _file_headers(self, name):
+        link_st = os.lstat(os.path.join(self.dir, name))
+        blob_st = os.stat(os.path.join(self.dir, name))
+        logical_size = self.storage.logical_size(name)
         return [
-            ('Last-Modified', email.utils.formatdate(link_st.st_mtime)),
             ('Content-Type', 'application/octet-stream'),
             ('Content-Length', str(blob_st.st_size)),
-            ('Content-Encoding', 'gzip')
+            ('Content-Encoding', 'gzip'),
+            ('Last-Modified', email.utils.formatdate(link_st.st_mtime)),
+            ('Logical-Size', str(logical_size)),
         ]
 
     def handle_GET(self, environ, start_response):
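Note that WSGI exposes request headers in environ with an HTTP_ prefix and dashes mapped to underscores, which is why the client's Logical-Size header surfaces as HTTP_LOGICAL_SIZE in handle_PUT above. A self-contained toy app (not the filetracker handler) demonstrating the mapping:

from wsgiref.simple_server import make_server

def app(environ, start_response):
    # A request header 'Logical-Size: 10' arrives as environ['HTTP_LOGICAL_SIZE'].
    logical_size = environ.get('HTTP_LOGICAL_SIZE', 'missing')
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return [logical_size.encode()]

if __name__ == '__main__':
    make_server('localhost', 8000, app).serve_forever()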

filetracker/servers/storage.py

Lines changed: 96 additions & 50 deletions
@@ -79,7 +79,8 @@ def __del__(self):
         self.db.close()
         self.db_env.close()
 
-    def store(self, name, data, version, size=0, compressed=False, digest=None):
+    def store(self, name, data, version, size=0,
+              compressed=False, digest=None, logical_size=None):
         """Adds a new file to the storage.
 
         If the file with the same name existed before, it's not
@@ -108,62 +109,53 @@ def store(self, name, data, version, size=0, compressed=False, digest=None):
             digest: SHA256 digest of the file before compression
                 If specified, the digest will not be computed again, saving
                 resources.
+            logical_size: if ``data`` is gzip-compressed, this parameter
+                has to be set to decompressed file size.
         """
         with _exclusive_lock(self._lock_path('links', name)):
             link_path = self._link_path(name)
             if _path_exists(link_path) and _file_version(link_path) > version:
                 return _file_version(link_path)
 
-            # Path to temporary file that may be created in some cases.
-            temp_file_path = None
-
-            if digest is None:
-                # Write data to temp file and calculate hash.
-                temp_file_fd, temp_file_path = tempfile.mkstemp()
-                temp_file = os.fdopen(temp_file_fd, 'wb')
-                _copy_stream(data, temp_file, size)
-                temp_file.close()
-
-                if compressed:
-                    # If data was already compressed, we have to decompress it
-                    # before calculating the digest.
-                    with gzip.open(temp_file_path, 'rb') as compressed_file:
-                        digest = file_digest(compressed_file)
-                else:
-                    digest = file_digest(temp_file_path)
+            # data is managed by contents now, and shouldn't be used directly
+            with _InputStreamWrapper(data, size) as contents:
+                if digest is None or logical_size is None:
+                    contents.save()
+                    if compressed:
+                        # This shouldn't occur if the request came from a proper
+                        # filetracker client, so we don't care if it's slow.
+                        with gzip.open(
+                                contents.current_path, 'rb') as decompressed:
+                            digest = file_digest(decompressed)
+                        with gzip.open(
+                                contents.current_path, 'rb') as decompressed:
+                            logical_size = _read_stream_for_size(decompressed)
+                    else:
+                        digest = file_digest(contents.current_path)
+                        logical_size = os.stat(contents.current_path).st_size
+
+                blob_path = self._blob_path(digest)
+
+                with self._lock_blob_with_txn(digest) as txn:
+                    digest_bytes = digest.encode()
 
-            blob_path = self._blob_path(digest)
-
-            with self._lock_blob_with_txn(digest) as txn:
-                digest_bytes = digest.encode('utf8')
-                try:
                     link_count = int(self.db.get(digest_bytes, 0, txn=txn))
-                except KeyError:
-                    link_count = 0
+                    new_count = str(link_count + 1).encode()
+                    self.db.put(digest_bytes, new_count, txn=txn)
 
-                new_count = str(link_count + 1).encode('utf8')
-                self.db.put(digest_bytes, new_count, txn=txn)
+                    # Create a new blob if this isn't a duplicate.
+                    if link_count == 0:
+                        _create_file_dirs(blob_path)
+                        self.db.put('{}:logical_size'.format(digest).encode(),
+                                    str(logical_size).encode())
 
-                if link_count == 0:
-                    # Create a new blob.
-                    _create_file_dirs(blob_path)
-                    if compressed:
-                        if temp_file_path:
-                            shutil.move(temp_file_path, blob_path)
+                        if compressed:
+                            contents.save(blob_path)
                         else:
-                            with open(blob_path, 'wb') as blob:
-                                _copy_stream(data, blob, size)
-                    else:
-                        if temp_file_path:
-                            with open(temp_file_path, 'rb') as raw,\
+                            contents.save()
+                            with open(contents.current_path, 'rb') as raw,\
                                     gzip.open(blob_path, 'wb') as blob:
                                 shutil.copyfileobj(raw, blob)
-                        else:
-                            with gzip.open(blob_path, 'wb') as blob:
-                                _copy_stream(data, blob, size)
-
-            if temp_file_path and os.path.exists(temp_file_path):
-                os.unlink(temp_file_path)
 
             if _path_exists(link_path):
                 # Lend the link lock to delete().
@@ -206,29 +198,34 @@ def delete(self, name, version, _lock=True):
             digest = self._digest_for_link(name)
             with self._lock_blob_with_txn(digest) as txn:
                 os.unlink(link_path)
-                digest_bytes = digest.encode('utf8')
+                digest_bytes = digest.encode()
                 link_count = self.db.get(digest_bytes, txn=txn)
                 if link_count is None:
                     raise RuntimeError("File exists but has no key in db")
                 link_count = int(link_count)
                 if link_count == 1:
                     self.db.delete(digest_bytes, txn=txn)
+                    self.db.delete(
+                        '{}:logical_size'.format(digest).encode(), txn=txn)
                     os.unlink(self._blob_path(digest))
                 else:
-                    new_count = str(link_count - 1).encode('utf8')
+                    new_count = str(link_count - 1).encode()
                     self.db.put(digest_bytes, new_count, txn=txn)
         return True
 
     def stored_version(self, name):
-        """
-        Returns the version of file `name` that is currently stored
-        or None if it doesn't exist.
-        """
+        """Returns the version of file `name` or None if it doesn't exist."""
         link_path = self._link_path(name)
         if not _path_exists(link_path):
             return None
         return _file_version(link_path)
 
+    def logical_size(self, name):
+        """Returns the logical size (before compression) of file `name`."""
+        digest = self._digest_for_link(name)
+        return int(self.db.get('{}:logical_size'
+                               .format(digest).encode()).decode())
+
     def _link_path(self, name):
         return os.path.join(self.links_dir, name)
 
@@ -261,6 +258,44 @@ def _digest_for_link(self, name):
         return digest
 
 
+class _InputStreamWrapper(object):
+    """A wrapper for lazy reading and moving contents of 'wsgi.input'.
+
+    Should be used as a context manager.
+    """
+    def __init__(self, data, size):
+        self._data = data
+        self._size = size
+        self.current_path = None
+        self.saved_in_temp = False
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, _exc_type, _exc_value, _traceback):
+        """Removes file if it was last saved as a temporary file."""
+        if self.saved_in_temp:
+            os.unlink(self.current_path)
+
+    def save(self, new_path=None):
+        """Moves or creates the file with stream contents to a new location.
+
+        Args:
+            new_path: path to move to, if None a temporary file is created.
+        """
+        self.saved_in_temp = new_path is None
+        if new_path is None:
+            fd, new_path = tempfile.mkstemp()
+            os.close(fd)
+
+        if self.current_path:
+            shutil.move(self.current_path, new_path)
+        else:
+            with open(new_path, 'wb') as dest:
+                _copy_stream(self._data, dest, self._size)
+        self.current_path = new_path
+
+
 _BUFFER_SIZE = 64 * 1024
 
 
@@ -292,6 +327,17 @@ def _copy_stream(src, dest, length=0):
         bytes_left -= buf_size
 
 
+def _read_stream_for_size(stream):
+    """Reads a stream discarding the data read and returns its size."""
+    size = 0
+    while True:
+        buf = stream.read(_BUFFER_SIZE)
+        size += len(buf)
+        if not buf:
+            break
+    return size
+
+
 def _create_file_dirs(file_path):
     """Creates directory tree to file if it doesn't exist."""
     dir_name = os.path.dirname(file_path)
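Finally, a short usage sketch of the new _InputStreamWrapper, with io.BytesIO standing in for 'wsgi.input' (assumes the helpers from filetracker/servers/storage.py are in scope; the /tmp path is illustrative):

import io

payload = b'hello size'
with _InputStreamWrapper(io.BytesIO(payload), size=len(payload)) as contents:
    contents.save()                 # no path given: spills the stream to a temp file
    print(contents.current_path)    # the temp file now holds the payload
    contents.save('/tmp/blob')      # moves the temp file to its final location
# __exit__ unlinks nothing here: the last save() targeted a real path,
# so saved_in_temp is False and the blob survives the context exit.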
