Skip to content

Commit

Permalink
*: Update tokio ecosystem to 1.0 (tikv#10175)
Browse files Browse the repository at this point in the history
* Update prometheus and yatp

Signed-off-by: koushiro <[email protected]>

* update rand & cargo fmt

Signed-off-by: koushiro <[email protected]>

* Update tokio to 1.0 ecosystem

Signed-off-by: koushiro <[email protected]>

* fix tests

Signed-off-by: koushiro <[email protected]>

* update some dependencies

Signed-off-by: koushiro <[email protected]>

* fix

Signed-off-by: koushiro <[email protected]>

* update yatp

Signed-off-by: koushiro <[email protected]>

* use tikv/rusoto

Signed-off-by: koushiro <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
koushiro and ti-chi-bot authored Jun 10, 2021
1 parent a0a0ba0 commit a4a96e5
Show file tree
Hide file tree
Showing 78 changed files with 877 additions and 1,249 deletions.
1,278 changes: 436 additions & 842 deletions Cargo.lock

Large diffs are not rendered by default.

20 changes: 10 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,11 @@ futures-util = { version = "0.3.1", default-features = false, features = ["io",
grpcio = { version = "0.9", default-features = false, features = ["openssl-vendored"] }
grpcio-health = { version = "0.9", default-features = false }
hex = "0.4"
hyper-tls = "0.4"
itertools = "0.8"
hyper-tls = "0.5"
itertools = "0.10"
openssl = "0.10"
hyper = "0.13"
hyper-openssl = "0.8"
hyper = { version = "0.14", features = ["full"] }
hyper-openssl = "0.9"
http = "0"
into_other = { path = "components/into_other", default-features = false }
keys = { path = "components/keys", default-features = false }
Expand All @@ -175,7 +175,7 @@ nom = { version = "5.1.0", default-features = false, features = ["std"] }
notify = "4"
num_cpus = "1"
pd_client = { path = "components/pd_client", default-features = false }
pin-project = "0.4.8"
pin-project = "1.0"
pnet_datalink = "0.23"
prost = "0.7"
pprof = { version = "^0.4", default-features = false, features = ["flamegraph", "protobuf"] }
Expand All @@ -195,8 +195,8 @@ serde_json = "1.0"
slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] }
slog-global = { version = "0.1", git = "https://github.com/breeswish/slog-global.git", rev = "d592f88e4dbba5eb439998463054f1a44fbf17b9" }
parking_lot = "0.11"
prometheus = { version = "0.10", features = ["nightly"] }
prometheus-static-metric = "0.4"
prometheus = { version = "0.12", features = ["nightly"] }
prometheus-static-metric = "0.5"
sst_importer = { path = "components/sst_importer", default-features = false }
sysinfo = "0.16"
tempfile = "3.0"
Expand All @@ -215,9 +215,9 @@ collections = { path = "components/collections" }
coprocessor_plugin_api = { path = "components/coprocessor_plugin_api" }
time = "0.1"
tipb = { git = "https://github.com/pingcap/tipb.git", default-features = false }
tokio = { version = "0.2", features = ["full"] }
tokio = { version = "1.5", features = ["full"] }
tokio-timer = "0.2"
tokio-openssl = "0.4"
tokio-openssl = "0.6"
toml = "0.5"
txn_types = { path = "components/txn_types", default-features = false }
url = "2"
Expand All @@ -232,7 +232,7 @@ example_plugin = { path = "components/test_coprocessor_plugin/example_plugin" }
panic_hook = { path = "components/panic_hook" }
test_sst_importer = { path = "components/test_sst_importer", default-features = false }
test_util = { path = "components/test_util", default-features = false }
tokio = { version = "0.2", features = ["macros", "rt-threaded", "time"] }
tokio = { version = "1.5", features = ["macros", "rt-multi-thread", "time"] }
zipf = "6.1.0"


Expand Down
7 changes: 3 additions & 4 deletions cmd/tikv-ctl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ error_code = { path = "../../components/error_code", default-features = false }
file_system = { path = "../../components/file_system", default-features = false }
fs2 = "0.4"
futures = "0.3"
tokio = { version = "0.2", features = ["rt-threaded", "time"] }
tokio = { version = "1.5", features = ["rt-multi-thread", "time"] }
grpcio = { version = "0.9", default-features = false, features = ["openssl-vendored"] }
hex = "0.4"
keys = { path = "../../components/keys", default-features = false }
Expand All @@ -103,13 +103,13 @@ log = { version = "0.4", features = ["max_level_trace", "release_max_level_debug
log_wrappers = { path = "../../components/log_wrappers" }
nix = "0.19"
pd_client = { path = "../../components/pd_client", default-features = false }
prometheus = { version = "0.10", features = ["nightly"] }
prometheus = { version = "0.12", features = ["nightly"] }
promptly = "0.3.0"
protobuf = "2.8"
raft = { version = "0.6.0-alpha", default-features = false }
raft_log_engine = { path = "../../components/raft_log_engine", default-features = false }
raftstore = { path = "../../components/raftstore", default-features = false }
rand = "0.7"
rand = "0.8"
security = { path = "../../components/security", default-features = false }
serde_json = "1.0"
slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] }
Expand All @@ -121,7 +121,6 @@ tikv_util = { path = "../../components/tikv_util", default-features = false }
collections = { path = "../../components/collections" }
toml = "0.5"
txn_types = { path = "../../components/txn_types", default-features = false }
yatp = { git = "https://github.com/tikv/yatp.git", branch = "master" }

[build-dependencies]
time = "0.1"
Expand Down
5 changes: 2 additions & 3 deletions components/backup/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ engine_rocks = { path = "../engine_rocks", default-features = false }
engine_traits = { path = "../engine_traits", default-features = false }
error_code = { path = "../error_code", default-features = false }
external_storage_export = { path = "../external_storage/export", default-features = false }
failure = "0.1"
file_system = { path = "../file_system", default-features = false }
futures = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["io"] }
Expand All @@ -82,7 +81,7 @@ kvproto = { rev = "7a046020d1c091638e1e8aba623c8c1e8962219d", git = "https://git
lazy_static = "1.3"
log_wrappers = { path = "../log_wrappers" }
pd_client = { path = "../pd_client", default-features = false }
prometheus = { version = "0.10", default-features = false, features = ["nightly"] }
prometheus = { version = "0.12", default-features = false, features = ["nightly"] }
raft = { version = "0.6.0-alpha", default-features = false }
raftstore = { path = "../raftstore", default-features = false }
security = { path = "../security", default-features = false }
Expand All @@ -101,5 +100,5 @@ txn_types = { path = "../txn_types", default-features = false }
yatp = { git = "https://github.com/tikv/yatp.git", branch = "master" }

[dev-dependencies]
rand = "0.7"
rand = "0.8"
tempfile = "3.0"
4 changes: 2 additions & 2 deletions components/cdc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ thiserror = "1.0"
tikv = { path = "../../", default-features = false }
tikv_util = { path = "../tikv_util", default-features = false }
collections = { path = "../collections" }
tokio = { version = "0.2", features = ["rt-threaded"]}
tokio = { version = "1.5", features = ["rt-multi-thread"]}
txn_types = { path = "../txn_types", default-features = false }
concurrency_manager = { path = "../concurrency_manager", default-features = false }
fail = "0.4"
lazy_static = "1.3"
log_wrappers = { path = "../log_wrappers" }
prometheus = { version = "0.10", default-features = false, features = ["nightly"] }
prometheus = { version = "0.12", default-features = false, features = ["nightly"] }
protobuf = "2.8"
prost = "0.7"
futures-timer = "3.0"
Expand Down
2 changes: 1 addition & 1 deletion components/cdc/src/delegate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ impl Delegate {
for (tag, count) in cf_details.iter() {
CDC_OLD_VALUE_SCAN_DETAILS
.with_label_values(&[*cf, *tag])
.inc_by(*count as i64);
.inc_by(*count as u64);
}
}
}
Expand Down
15 changes: 6 additions & 9 deletions components/cdc/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,17 +264,15 @@ impl<T: 'static + RaftStoreRouter<RocksEngine>> Endpoint<T> {
security_mgr: Arc<SecurityManager>,
sink_memory_quota: MemoryQuota,
) -> Endpoint<T> {
let workers = Builder::new()
.threaded_scheduler()
let workers = Builder::new_multi_thread()
.thread_name("cdcwkr")
.core_threads(cfg.incremental_scan_threads)
.worker_threads(cfg.incremental_scan_threads)
.build()
.unwrap();
let scan_concurrency_semaphore = Arc::new(Semaphore::new(cfg.incremental_scan_concurrency));
let tso_worker = Builder::new()
.threaded_scheduler()
let tso_worker = Builder::new_multi_thread()
.thread_name("tso")
.core_threads(1)
.worker_threads(1)
.build()
.unwrap();
CDC_SINK_CAP.set(sink_memory_quota.cap() as i64);
Expand Down Expand Up @@ -1446,10 +1444,9 @@ mod tests {
let quota = crate::channel::MemoryQuota::new(usize::MAX);
let (sink, drain) = crate::channel::channel(buffer, quota);

let pool = Builder::new()
.threaded_scheduler()
let pool = Builder::new_multi_thread()
.thread_name("test-initializer-worker")
.core_threads(4)
.worker_threads(4)
.build()
.unwrap();
let downstream_state = Arc::new(AtomicCell::new(DownstreamState::Normal));
Expand Down
2 changes: 1 addition & 1 deletion components/cloud/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ futures-io = "0.3"
kvproto = { rev = "7a046020d1c091638e1e8aba623c8c1e8962219d", git = "https://github.com/pingcap/kvproto.git", default-features = false }
openssl = "0.10"
protobuf = "2.8"
rusoto_core = "0.45.0"
rusoto_core = "0.46.0"
thiserror = "1.0"
tikv_util = { path = "../tikv_util", default-features = false }
url = "2.0"
Expand Down
21 changes: 10 additions & 11 deletions components/cloud/aws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ failpoints = ["fail/failpoints"]

[dependencies]
async-trait = "0.1"
bytes = "0.5"
bytes = "1.0"
cloud = { path = "../", default-features = false }
fail = "0.4"
futures = "0.3"
Expand All @@ -30,22 +30,21 @@ futures-util = { version = "0.3", default-features = false, features = ["io"] }
# makes `cargo test -p aws` link correctly.
grpcio = { version = "0.9", default-features = false, features = ["openssl-vendored"] }
http = "0.2.0"
hyper = "0.13.3"
hyper-tls = "0.4.1"
hyper = "0.14"
hyper-tls = "0.5"
kvproto = { rev = "7a046020d1c091638e1e8aba623c8c1e8962219d", git = "https://github.com/pingcap/kvproto.git", default-features = false }
rusoto_core = "0.45.0"
rusoto_credential = "0.45.0"
rusoto_kms = { version = "0.45.0", features = ["serialize_structs"] }
rusoto_sts = "0.45.0"
rusoto_s3 = "0.45.0"
rusoto_core = "0.46.0"
rusoto_credential = "0.46.0"
rusoto_kms = { version = "0.46.0", features = ["serialize_structs"] }
rusoto_sts = "0.46.0"
rusoto_s3 = "0.46.0"
slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] }
# better to not use slog-global, but pass in the logger
tokio = { version = "0.2.13", features = ["time"] }
tokio = { version = "1.5", features = ["time"] }
slog-global = { version = "0.1", git = "https://github.com/breeswish/slog-global.git", rev = "d592f88e4dbba5eb439998463054f1a44fbf17b9" }
tikv_util = { path = "../../tikv_util", default-features = false }
url = "2.0"

[dev-dependencies]
matches = "0.1.8"
futures = "0.3"
rusoto_mock = "0.45.0"
rusoto_mock = "0.46.0"
21 changes: 11 additions & 10 deletions components/cloud/aws/src/s3.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,28 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use std::io;
use std::time::Duration;

use fail::fail_point;
use futures_util::{
future::FutureExt,
io::{AsyncRead, AsyncReadExt},
stream::TryStreamExt,
};
use rusoto_core::{
request::DispatchSignedRequest,
{ByteStream, RusotoError},
};
use rusoto_credential::{ProvideAwsCredentials, StaticProvider};
use rusoto_s3::{util::AddressingStyle, *};
use tokio::time::{delay_for, timeout};
use tokio::time::{sleep, timeout};

use crate::util;
use cloud::blob::{none_to_empty, BlobConfig, BlobStorage, BucketConf, StringNonEmpty};
use fail::fail_point;
use futures_util::{
future::FutureExt,
io::{AsyncRead, AsyncReadExt},
stream::TryStreamExt,
};
pub use kvproto::backup::{Bucket as InputBucket, CloudDynamic, S3 as InputConfig};
use std::time::Duration;
use tikv_util::debug;
use tikv_util::stream::{block_on_external_io, error_stream, retry};

use crate::util;

const CONNECTION_TIMEOUT: Duration = Duration::from_secs(900);
pub const STORAGE_VENDOR_NAME_AWS: &str = "aws";

Expand Down Expand Up @@ -406,7 +407,7 @@ impl<'client> S3Uploader<'client> {
let delay_duration = Duration::from_millis(0);

if delay_duration > Duration::from_millis(0) {
delay_for(delay_duration).await;
sleep(delay_duration).await;
}

#[cfg(feature = "failpoints")]
Expand Down
10 changes: 5 additions & 5 deletions components/cloud/gcp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@ prost-codec = [
[dependencies]
futures-util = { version = "0.3", default-features = false, features = ["io"] }
http = "0.2.0"
hyper = "0.13.3"
hyper-tls = "0.4.1"
hyper = "0.14"
hyper-tls = "0.5"
kvproto = { rev = "7a046020d1c091638e1e8aba623c8c1e8962219d", git = "https://github.com/pingcap/kvproto.git", default-features = false }
slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] }
# better to not use slog-global, but pass in the logger
slog-global = { version = "0.1", git = "https://github.com/breeswish/slog-global.git", rev = "d592f88e4dbba5eb439998463054f1a44fbf17b9" }
tame-gcs = { version = "0.8.0", features = ["async-multipart"] }
tame-oauth = "0.4.2"
tame-gcs = { version = "0.10", features = ["async-multipart"] }
tame-oauth = "0.4.7"
cloud = { path = "../", default-features = false }
tikv_util = { path = "../../tikv_util", default-features = false }
tokio = { version = "0.2.13", features = ["time"] }
tokio = { version = "1.5", features = ["time"] }
url = "2.0"

[dev-dependencies]
Expand Down
4 changes: 2 additions & 2 deletions components/codec/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ tikv_alloc = { path = "../tikv_alloc" }
[dev-dependencies]
panic_hook = { path = "../panic_hook" }
protobuf = "2"
bytes = "0.5"
rand = "0.7"
bytes = "1.0"
rand = "0.8"
4 changes: 2 additions & 2 deletions components/concurrency_manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ prost-codec = [

[dependencies]
parking_lot = "0.11"
tokio = { version = "0.2", features = ["macros", "sync", "time"] }
tokio = { version = "1.5", features = ["macros", "sync", "time"] }
txn_types = { path = "../txn_types", default-features = false }
fail = "0.4"

Expand All @@ -26,7 +26,7 @@ branch = "tikv-5.0"
package = "crossbeam-skiplist"

[dev-dependencies]
rand = "0.7.3"
rand = "0.8.3"
futures = "0.3"
criterion = "0.3"
tikv_alloc = { path = "../tikv_alloc", features = ["jemalloc"] }
Expand Down
4 changes: 2 additions & 2 deletions components/concurrency_manager/benches/lock_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ fn bench_point_check(c: &mut Criterion) {
fn range_check_baseline(c: &mut Criterion) {
c.bench_function("range_check_baseline", |b| {
b.iter(|| {
let start = thread_rng().gen_range(0u8, 245);
let start = thread_rng().gen_range(0u8..245);
black_box(Key::from_raw(&[start]));
black_box(Key::from_raw(&[start + 25]));
})
Expand All @@ -79,7 +79,7 @@ fn bench_range_check(c: &mut Criterion) {
let ts_set = TsSet::Empty;
c.bench_function("range_check_1k_in_10k_locks", |b| {
b.iter(|| {
let start = thread_rng().gen_range(0u8, 230);
let start = thread_rng().gen_range(0u8..230);
let start_key = Key::from_raw(&[start]);
let end_key = Key::from_raw(&[start + 25]);
// The key range is roughly 1/10 the key space.
Expand Down
4 changes: 2 additions & 2 deletions components/concurrency_manager/src/key_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ mod tests {
sync::atomic::{AtomicUsize, Ordering},
time::Duration,
};
use tokio::time::delay_for;
use tokio::time::sleep;

#[tokio::test]
async fn test_key_mutex() {
Expand All @@ -118,7 +118,7 @@ mod tests {
// Modify an atomic counter with a mutex guard. The value of the counter
// should remain unchanged if the mutex works.
let counter_val = counter.fetch_add(1, Ordering::SeqCst) + 1;
delay_for(Duration::from_millis(1)).await;
sleep(Duration::from_millis(1)).await;
assert_eq!(counter.load(Ordering::SeqCst), counter_val);
});
handles.push(handle);
Expand Down
4 changes: 2 additions & 2 deletions components/concurrency_manager/src/lock_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ mod test {
sync::atomic::{AtomicUsize, Ordering},
time::Duration,
};
use tokio::time::delay_for;
use tokio::time::sleep;
use txn_types::LockType;

#[tokio::test]
Expand All @@ -143,7 +143,7 @@ mod test {
// Modify an atomic counter with a mutex guard. The value of the counter
// should remain unchanged if the mutex works.
let counter_val = counter.fetch_add(1, Ordering::SeqCst) + 1;
delay_for(Duration::from_millis(1)).await;
sleep(Duration::from_millis(1)).await;
assert_eq!(counter.load(Ordering::SeqCst), counter_val);
});
handles.push(handle);
Expand Down
Loading

0 comments on commit a4a96e5

Please sign in to comment.