Skip to content

Commit

Permalink
rgw: stream get_obj operation
Browse files Browse the repository at this point in the history
Fixes: ceph#2941
Instead of iterating through the parts one by one when reading
an object, we can now send multiple requests in parallel. Two new
configurables were added to control the maximum size of a single
request and the total size of pending requests.

Signed-off-by: Yehuda Sadeh <[email protected]>
  • Loading branch information
yehudasa committed Feb 8, 2013
1 parent 3383618 commit 278dfe5
Show file tree
Hide file tree
Showing 11 changed files with 496 additions and 63 deletions.
5 changes: 3 additions & 2 deletions src/common/Throttle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ enum {
l_throttle_last,
};

Throttle::Throttle(CephContext *cct, std::string n, int64_t m, bool use_perf)
Throttle::Throttle(CephContext *cct, std::string n, int64_t m, bool _use_perf)
: cct(cct), name(n), logger(NULL),
max(m),
lock("Throttle::lock")
lock("Throttle::lock"),
use_perf(_use_perf)
{
assert(m >= 0);

Expand Down
3 changes: 2 additions & 1 deletion src/common/Throttle.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ class Throttle {
ceph::atomic_t count, max;
Mutex lock;
list<Cond*> cond;
bool use_perf;

public:
Throttle(CephContext *cct, std::string n, int64_t m = 0, bool use_perf = true);
Throttle(CephContext *cct, std::string n, int64_t m = 0, bool _use_perf = true);
~Throttle();

private:
Expand Down
2 changes: 2 additions & 0 deletions src/common/config_opts.h
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,8 @@ OPTION(rgw_resolve_cname, OPT_BOOL, false) // should rgw try to resolve hostnam
OPTION(rgw_obj_stripe_size, OPT_INT, 4 << 20)
OPTION(rgw_extended_http_attrs, OPT_STR, "") // list of extended attrs that can be set on objects (beyond the default)
OPTION(rgw_exit_timeout_secs, OPT_INT, 120) // how many seconds to wait for process to go down before exiting unconditionally
OPTION(rgw_get_obj_window_size, OPT_INT, 16 << 20) // window size in bytes for single get obj request
OPTION(rgw_get_obj_max_req_size, OPT_INT, 4 << 20) // max length of a single get obj rados op

OPTION(mutex_perf_counter, OPT_BOOL, false) // enable/disable mutex perf counter

Expand Down
90 changes: 44 additions & 46 deletions src/rgw/rgw_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -386,13 +386,13 @@ int RGWGetObj::read_user_manifest_part(rgw_bucket& bucket, RGWObjEnt& ent, RGWAc
if (ret < 0)
goto done_err;

len = bl.length();
off_t len = bl.length();
cur_ofs += len;
ofs += len;
ret = 0;
perfcounter->tinc(l_rgw_get_lat,
(ceph_clock_now(s->cct) - start_time));
send_response_data(bl);
send_response_data(bl, 0, len);

start_time = ceph_clock_now(s->cct);
}
Expand Down Expand Up @@ -526,34 +526,63 @@ int RGWGetObj::handle_user_manifest(const char *prefix)
return 0;
}

class RGWGetObj_CB : public RGWGetDataCB
{
RGWGetObj *op;
public:
RGWGetObj_CB(RGWGetObj *_op) : op(_op) {}
virtual ~RGWGetObj_CB() {}

int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) {
return op->get_data_cb(bl, bl_ofs, bl_len);
}
};

/*
 * Data callback for a GET operation.
 *
 * If the current time is past gc_invalidate_time, asks the store to defer
 * garbage collection for the object and pushes gc_invalidate_time forward
 * by half of rgw_gc_obj_min_wait. A defer_gc() failure is only logged; it
 * does not change the return value.
 *
 * Returns the result of send_response_data(bl, bl_ofs, bl_len).
 */
int RGWGetObj::get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len)
{
  /* garbage collection related handling */
  utime_t now = ceph_clock_now(s->cct);

  if (now > gc_invalidate_time) {
    if (store->defer_gc(s->obj_ctx, obj) < 0) {
      dout(0) << "WARNING: could not defer gc entry for obj" << dendl;
    }

    /* re-arm the deadline relative to the time sampled above */
    gc_invalidate_time = now;
    gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2);
  }

  return send_response_data(bl, bl_ofs, bl_len);
}

void RGWGetObj::execute()
{
void *handle = NULL;
utime_t start_time = s->time;
bufferlist bl;
utime_t gc_invalidate_time = ceph_clock_now(s->cct);
gc_invalidate_time = ceph_clock_now(s->cct);
gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2);

RGWGetObj_CB cb(this);

map<string, bufferlist>::iterator attr_iter;

perfcounter->inc(l_rgw_get);
off_t new_ofs, new_end;

ret = get_params();
if (ret < 0)
goto done;
goto done_err;

ret = init_common();
if (ret < 0)
goto done;
goto done_err;

new_ofs = ofs;
new_end = end;

ret = store->prepare_get_obj(s->obj_ctx, obj, &new_ofs, &new_end, &attrs, mod_ptr,
unmod_ptr, &lastmod, if_match, if_nomatch, &total_len, &s->obj_size, &handle, &s->err);
if (ret < 0)
goto done;
goto done_err;

attr_iter = attrs.find(RGW_ATTR_USER_MANIFEST);
if (attr_iter != attrs.end()) {
Expand All @@ -570,53 +599,22 @@ void RGWGetObj::execute()
start = ofs;

if (!get_data || ofs > end)
goto done;
goto done_err;

perfcounter->inc(l_rgw_get_b, end - ofs);

while (ofs <= end) {
ret = store->get_obj(s->obj_ctx, &handle, obj, bl, ofs, end);
if (ret < 0) {
goto done;
}
len = ret;

if (!len) {
dout(0) << "WARNING: failed to read object, returned zero length" << dendl;
ret = -EIO;
goto done;
}

ofs += len;
ret = 0;
ret = store->get_obj_iterate(s->obj_ctx, &handle, obj, ofs, end, &cb);

perfcounter->tinc(l_rgw_get_lat,
(ceph_clock_now(s->cct) - start_time));
ret = send_response_data(bl);
bl.clear();
if (ret < 0) {
dout(0) << "NOTICE: failed to send response to client" << dendl;
goto done;
}

start_time = ceph_clock_now(s->cct);

if (ofs <= end) {
if (start_time > gc_invalidate_time) {
int r = store->defer_gc(s->obj_ctx, obj);
if (r < 0) {
dout(0) << "WARNING: could not defer gc entry for obj" << dendl;
}
gc_invalidate_time = start_time;
gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2);
}
}
perfcounter->tinc(l_rgw_get_lat,
(ceph_clock_now(s->cct) - start_time));
if (ret < 0) {
goto done_err;
}

return;
store->finish_get_obj(&handle);

done:
send_response_data(bl);
done_err:
send_response_data(bl, 0, 0);
store->finish_get_obj(&handle);
}

Expand Down
7 changes: 4 additions & 3 deletions src/rgw/rgw_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ class RGWGetObj : public RGWOp {
const char *if_match;
const char *if_nomatch;
off_t ofs;
uint64_t len;
uint64_t total_len;
off_t start;
off_t end;
Expand All @@ -76,6 +75,7 @@ class RGWGetObj : public RGWOp {
bool get_data;
bool partial_content;
rgw_obj obj;
utime_t gc_invalidate_time;

int init_common();
public:
Expand All @@ -87,7 +87,6 @@ class RGWGetObj : public RGWOp {
if_nomatch = NULL;
start = 0;
ofs = 0;
len = 0;
total_len = 0;
end = -1;
mod_time = 0;
Expand All @@ -112,8 +111,10 @@ class RGWGetObj : public RGWOp {
uint64_t *ptotal_len, bool read_data);
int handle_user_manifest(const char *prefix);

int get_data_cb(bufferlist& bl, off_t ofs, off_t len);

virtual int get_params() = 0;
virtual int send_response_data(bufferlist& bl) = 0;
virtual int send_response_data(bufferlist& bl, off_t ofs, off_t len) = 0;

virtual const char *name() { return "get_obj"; }
};
Expand Down
Loading

0 comments on commit 278dfe5

Please sign in to comment.