Skip to content

Commit

Permalink
Add Ruby progress callback + customise our GIL unblock function (#151)
Browse files Browse the repository at this point in the history
We should provide some ability for users to re-acquire the GIL and call Ruby callback blocks. We will start with a progress callback which can be used for progress bars and speed computation. We also need to alter the way we unblock the thread so that we correctly handle signals.

This adds the following API:

sess.progress_callback = ->(download_total, download_now, upload_total, upload_now) {
  progress_bar.update(download_total) # ...or other interesting actions
}
The GIL will only be reacquired if this is configured on the Session/Request objects, otherwise we proceed through the same fast path as before with the GIL unlocked.
  • Loading branch information
julik authored Jan 30, 2018
1 parent f747d8f commit ba2161c
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 19 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Added `Session#progress_callback`, which accepts a callable object that is invoked to report
transfer progress while a request is being executed.
* Fixed parsing of response headers when multiple responses are involved (redirect chains and HTTP proxies)

### 0.10.0
Expand Down
82 changes: 67 additions & 15 deletions ext/patron/session_ext.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ struct patron_curl_state {
membuffer header_buffer;
membuffer body_buffer;
size_t download_byte_limit;
VALUE user_progress_blk;
int interrupt;
size_t dltotal;
size_t dlnow;
size_t ultotal;
size_t ulnow;
};


Expand Down Expand Up @@ -69,16 +74,45 @@ static size_t file_write_handler(void* stream, size_t size, size_t nmemb, FILE*
}
}

/* Calls the user-supplied Ruby progress callback with the transfer counters
 * most recently stored in the curl state.
 *
 * vd_curl_state - the `struct patron_curl_state*` of the running request,
 *                 passed as `void*` so that this function can be handed to
 *                 rb_thread_call_with_gvl().
 *
 * The callback receives four separate Integer arguments, in this order:
 * dltotal, dlnow, ultotal, ulnow.
 *
 * Always returns 0; the callback's own return value is currently ignored.
 *
 * This function calls into Ruby (rb_funcall), so the caller re-acquires the
 * GIL before invoking it when the request runs with the GIL released.
 *
 * NOTE(review): the caller casts this function to `void *(*)(void *)` even
 * though it returns `int`; changing the return type here would require
 * updating the call sites in the same change.
 * NOTE(review): the counters are `size_t` but are converted with LONG2NUM,
 * which presumably narrows on platforms where `long` is smaller than
 * `size_t` - confirm whether that matters for large transfers.
 */
static int call_user_rb_progress_blk(void* vd_curl_state) {
  struct patron_curl_state* state = (struct patron_curl_state*)vd_curl_state;

  /* The result of the callback is deliberately discarded: it is not (yet)
   * used to abort the transfer, so there is no need to keep it in a local. */
  rb_funcall(state->user_progress_blk,
             rb_intern("call"), 4,
             LONG2NUM(state->dltotal),
             LONG2NUM(state->dlnow),
             LONG2NUM(state->ultotal),
             LONG2NUM(state->ulnow));
  return 0;
}


/* A non-zero return value from the progress handler will terminate the current
* request. We use this fact in order to interrupt any request when either the
* user calls the "interrupt" method on the session or when the Ruby interpreter
* is attempting to exit.
*/
static int session_progress_handler(void *clientp, size_t dltotal, size_t dlnow, size_t ultotal, size_t ulnow) {
static int session_progress_handler(void* clientp, size_t dltotal, size_t dlnow, size_t ultotal, size_t ulnow) {
struct patron_curl_state* state = (struct patron_curl_state*) clientp;
UNUSED_ARGUMENT(dlnow);
UNUSED_ARGUMENT(ultotal);
UNUSED_ARGUMENT(ulnow);
state->dltotal = dltotal;
state->dlnow = dlnow;
state->ultotal = ultotal;
state->ulnow = ulnow;

// If a progress proc has been set, re-acquire the GIL and call it using
// `call_user_rb_progress_blk`. TODO: use the retval of that proc
// to permit premature abort
if(RTEST(state->user_progress_blk)) {
// Even though it is not documented, rb_thread_call_with_gvl is available even when
// rb_thread_call_without_gvl is not. See https://bugs.ruby-lang.org/issues/5543#note-4
// > rb_thread_call_with_gvl() is globally-visible (but not in headers)
// > for 1.9.3: https://bugs.ruby-lang.org/issues/4328
#if (defined(HAVE_TBR) || defined(HAVE_TCWOGVL)) && defined(USE_TBR)
rb_thread_call_with_gvl((void *(*)(void *)) call_user_rb_progress_blk, (void*)state);
#else
call_user_rb_progress_blk((void*)state);
#endif
}

// Set the interrupt if the download byte limit has been reached
if(state->download_byte_limit != 0 && (dltotal > state->download_byte_limit)) {
Expand Down Expand Up @@ -410,13 +444,20 @@ static void set_options_from_request(VALUE self, VALUE request) {
VALUE action_name = rb_funcall(request, rb_intern("action"), 0);
VALUE a_c_encoding = rb_funcall(request, rb_intern("automatic_content_encoding"), 0);
VALUE download_byte_limit = rb_funcall(request, rb_intern("download_byte_limit"), 0);
VALUE maybe_progress_proc = rb_funcall(request, rb_intern("progress_callback"), 0);

if (RTEST(download_byte_limit)) {
state->download_byte_limit = FIX2INT(download_byte_limit);
} else {
state->download_byte_limit = 0;
}

if (rb_obj_is_proc(maybe_progress_proc)) {
state->user_progress_blk = maybe_progress_proc;
} else {
state->user_progress_blk = Qnil;
}

headers = rb_funcall(request, rb_intern("headers"), 0);
if (RTEST(headers)) {
if (rb_type(headers) != T_HASH) {
Expand Down Expand Up @@ -718,6 +759,17 @@ static VALUE select_error(CURLcode code) {
return error;
}


/* Used as the unblocking function when the thread running Patron gets
 * signaled while the request executes outside the GIL.
 *
 * Unlike session_interrupt, this runs without the GIL, so it must not touch
 * any Ruby structures. Writing to our own internal curl state struct is
 * permitted, so the only thing done here is to flag the state with
 * INTERRUPT_ABORT.
 *
 * patron_state - the `struct patron_curl_state*` of the running request,
 *                passed as `void*`.
 */
void session_ubf_abort(void* patron_state) {
  ((struct patron_curl_state*) patron_state)->interrupt = INTERRUPT_ABORT;
}

/* Perform the actual HTTP request by calling libcurl. */
static VALUE perform_request(VALUE self) {
struct patron_curl_state *state = get_patron_curl_state(self);
Expand Down Expand Up @@ -745,17 +797,17 @@ static VALUE perform_request(VALUE self) {
}

#if (defined(HAVE_TBR) || defined(HAVE_TCWOGVL)) && defined(USE_TBR)
#if defined(HAVE_TCWOGVL)
ret = (CURLcode) rb_thread_call_without_gvl(
(void *(*)(void *)) curl_easy_perform, curl,
RUBY_UBF_IO, 0
);
#else
ret = (CURLcode) rb_thread_blocking_region(
(rb_blocking_function_t*) curl_easy_perform, curl,
RUBY_UBF_IO, 0
);
#endif
#if defined(HAVE_TCWOGVL)
ret = (CURLcode) rb_thread_call_without_gvl(
(void *(*)(void *)) curl_easy_perform, curl,
session_ubf_abort, (void*)state
);
#else
ret = (CURLcode) rb_thread_blocking_region(
(rb_blocking_function_t*) curl_easy_perform, curl,
session_ubf_abort, (void*)state
);
#endif
#else
ret = curl_easy_perform(curl);
#endif
Expand Down
4 changes: 2 additions & 2 deletions lib/patron/request.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ def initialize
:ignore_content_length, :multipart, :action, :timeout, :connect_timeout,
:max_redirects, :headers, :auth_type, :upload_data, :buffer_size, :cacert,
:ssl_version, :http_version, :automatic_content_encoding, :force_ipv4, :download_byte_limit,
:low_speed_time, :low_speed_limit
:low_speed_time, :low_speed_limit, :progress_callback
]

WRITER_VARS = [
:url, :username, :password, :file_name, :proxy, :proxy_type, :insecure,
:ignore_content_length, :multipart, :cacert, :ssl_version, :http_version, :automatic_content_encoding, :force_ipv4, :download_byte_limit,
:low_speed_time, :low_speed_limit
:low_speed_time, :low_speed_limit, :progress_callback
]

attr_reader *READER_VARS
Expand Down
6 changes: 6 additions & 0 deletions lib/patron/session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ class Session

private :handle_request, :add_cookie_file, :set_debug_file

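# @return [Proc, nil] callable object that will be called with 4 arguments
#   during request/response execution - `dltotal`, `dlnow`, `ultotal`, `ulnow`.
#   All these arguments are in bytes. The native extension only honors `Proc`
#   objects (including lambdas); any other value disables the callback.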
attr_accessor :progress_callback

# Create a new Session object for performing requests.
#
# @param args[Hash] options for the Session (same names as the writable attributes of the Session)
Expand Down Expand Up @@ -372,6 +377,7 @@ def build_request(action, url, headers, options = {})
req.ignore_content_length = options.fetch :ignore_content_length, self.ignore_content_length
req.buffer_size = options.fetch :buffer_size, self.buffer_size
req.download_byte_limit = options.fetch :download_byte_limit, self.download_byte_limit
req.progress_callback = options.fetch :progress_callback, self.progress_callback
req.multipart = options[:multipart]
req.upload_data = options[:data]
req.file_name = options[:file]
Expand Down
53 changes: 53 additions & 0 deletions spec/session_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,59 @@
expect {@session.get("/slow")}.to raise_error(Patron::TimeoutError)
end

it "is able to terminate the thread that is running a slow request using Thread#kill (uses the custom unblock)" do
  # Run the slow request on a separate thread so that it can be killed from here.
  worker = Thread.new {
    sess = Patron::Session.new
    sess.timeout = 40
    sess.base_url = "http://localhost:9001"
    sess.get("/slow")
  }

  # The test server only starts sending the body after 20 seconds, which
  # leaves a window in which the request can be aborted.
  began_at = Time.now.to_i
  sleep 5      # shorter than the time the server needs to respond
  worker.kill  # forcibly terminate the thread
  worker.join  # if Patron were still busy, this join would take another 15s

  elapsed_s = Time.now.to_i - began_at
  expect(elapsed_s).to be_within(2).of(5)
end

# Checks that a request blocked on the slow endpoint can be aborted by sending
# SIGINT to the current process, and that the request thread then finishes
# roughly 5 seconds after it started instead of waiting for the server.
it "is able to terminate the thread that is running a slow request" do
t = Thread.new do
# NOTE(review): this handler is never restored after the example; signal
# handlers are presumably process-wide, so it may leak into later examples - confirm.
trap('SIGINT') do
exit # intended to exit the thread; NOTE(review): confirm which thread `exit` actually terminates
end
session = Patron::Session.new
session.timeout = 40 # longer than the 20 second server delay mentioned below
session.base_url = "http://localhost:9001"
session.get("/slow")
end

# Our test server starts sending the body only after 20 seconds. We should be able to abort
# using a signal during that time.
started = Time.now.to_i
sleep 5 # Less than what it takes for the server to respond
Process.kill("INT", Process.pid) # Signal ourselves...
t.join # Wrap up the thread. If Patron is still busy there, this join call will still take 15s.
delta_s = Time.now.to_i - started
# Whole-second timestamps are compared, so allow 2 seconds of slack around the 5 second sleep.
expect(delta_s).to be_within(2).of(5)
end

it "receives progress callbacks" do
  # Collect every set of counters the session reports during the request.
  reports = []
  recorder = Proc.new do |dl_total, dl_now, ul_total, ul_now|
    reports << [dl_total, dl_now, ul_total, ul_now]
  end

  sess = Patron::Session.new
  sess.timeout = 40
  sess.base_url = "http://localhost:9001"
  sess.progress_callback = recorder
  sess.get("/slow")

  # At least one progress report must have arrived while the request ran.
  expect(reports).not_to be_empty
end

it "should follow redirects by default" do
@session.max_redirects = 1
response = @session.get("/redirect")
Expand Down
5 changes: 3 additions & 2 deletions spec/support/test_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ def do_GET(req,res)

# Test servlet that answers GET requests slowly with a plain-text body.
# Presumably this is the handler behind the "/slow" path used by the session
# specs - confirm against the server's mount table.
#
# NOTE(review): this listing appears to contain both sides of a diff (the hunk
# is reported as "3 additions & 2 deletions"), so `sleep 3` and
# `res.body = 'beep'` may be removed lines rather than live code - confirm
# against the actual file before relying on the sequence below.
class SlowServlet < HTTPServlet::AbstractServlet
# Handles GET: sets the content type, fills the body in two steps and sleeps
# in between so that the response takes a long time to complete.
def do_GET(req,res)
sleep 3
res.header['Content-Type'] = 'text/plain'
res.body = 'beep'
res.body << 'x'
sleep 20 # the long pause the specs rely on when aborting a request mid-flight
res.body << 'rest of body'
end
end

Expand Down

0 comments on commit ba2161c

Please sign in to comment.