Skip to content
This repository was archived by the owner on Nov 9, 2017. It is now read-only.

Commit 5929b0e

Browse files
committed
Concurrent uploading in mirror services
1 parent daa40ac commit 5929b0e

File tree

3 files changed

+50
-9
lines changed

3 files changed

+50
-9
lines changed

Gemfile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ gemspec
55
gem "rake"
66
gem "byebug"
77

8-
gem 'sqlite3'
9-
gem 'httparty'
10-
gem 'concurrent-ruby'
8+
gem "sqlite3"
9+
gem "httparty"
10+
gem "concurrent-ruby"
1111

1212
gem "aws-sdk", "~> 2", require: false
1313
gem "google-cloud-storage", "~> 1.3", require: false

lib/active_storage/async_uploader.rb

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
require "concurrent/promise"
2+
3+
class ActiveStorage::AsyncUploader
4+
class << self
5+
def result(uploaders)
6+
promises = uploaders.map(&:promise)
7+
Concurrent::Promise.zip(*promises).value!
8+
end
9+
end
10+
11+
attr_reader :promise
12+
13+
def initialize(service, key, checksum: nil)
14+
@data = ""
15+
@eof = false
16+
@promise = Concurrent::Promise.execute do
17+
until eof? do; end
18+
service.upload key, StringIO.new(@data), checksum: checksum
19+
end
20+
end
21+
22+
def eof?
23+
@eof
24+
end
25+
26+
def write(chunk)
27+
@data << chunk
28+
@eof = false
29+
end
30+
31+
def close
32+
@eof = true
33+
end
34+
end

lib/active_storage/service/mirror_service.rb

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
require "active_support/core_ext/module/delegation"
2+
require "active_storage/async_uploader"
23
require "concurrent/promise"
34

45
class ActiveStorage::Service::MirrorService < ActiveStorage::Service
6+
CHUNK_SIZE = 1024
57
attr_reader :primary, :mirrors
68

79
delegate :download, :exist?, :url, to: :primary
@@ -18,24 +20,29 @@ def initialize(primary:, mirrors:)
1820
end
1921

2022
def upload(key, io, checksum: nil)
21-
each_service.collect do |service|
22-
service.upload key, io.tap(&:rewind), checksum: checksum
23+
uploaders = each_service.collect do |service|
24+
ActiveStorage::AsyncUploader.new(service, key, checksum: checksum)
2325
end
26+
io.rewind
27+
while chunk = io.read(CHUNK_SIZE)
28+
uploaders.each { |uploader| uploader.write(chunk) }
29+
end
30+
ActiveStorage::AsyncUploader.result(uploaders.each(&:close))
2431
end
2532

2633
def delete(key)
27-
perform_across_services :delete, key
34+
perform_async_across_services :delete, key
2835
end
2936

3037
private
3138
def each_service(&block)
3239
[ primary, *mirrors ].each(&block)
3340
end
3441

35-
def perform_across_services(method, *args)
36-
promises = services.collect do |service|
42+
def perform_async_across_services(method, *args)
43+
promises = each_service.collect do |service|
3744
Concurrent::Promise.execute { service.public_send method, *args }
3845
end
39-
Concurrent::Promise.zip(*promises).value
46+
Concurrent::Promise.zip(*promises).value!
4047
end
4148
end

0 commit comments

Comments
 (0)