Skip to content

use auto-formatting #305

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,18 @@ jobs:
- name: Run tests
run: cargo test -p capnp -p capnpc -p capnp-futures -p capnp-rpc

fmt:
name: formatting
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions-rs/toolchain@v1
with:
toolchain: nightly
override: true
profile: minimal
components: rustfmt
- uses: actions-rs/cargo@v1
with:
command: fmt
args: --all -- --check
1 change: 1 addition & 0 deletions .rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
format_generated_files = false
53 changes: 31 additions & 22 deletions async-byte-channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ impl Drop for Receiver {

pub fn channel() -> (Sender, Receiver) {
let inner = Arc::new(Mutex::new(Inner::new()));
let sender = Sender { inner: inner.clone() };
let sender = Sender {
inner: inner.clone(),
};
let receiver = Receiver { inner };
(sender, receiver)
}
Expand All @@ -70,8 +72,8 @@ impl AsyncRead for Receiver {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut futures::task::Context,
buf: &mut [u8]) -> futures::task::Poll<Result<usize, futures::io::Error>>
{
buf: &mut [u8],
) -> futures::task::Poll<Result<usize, futures::io::Error>> {
let mut inner = self.inner.lock().unwrap();
if inner.read_cursor == inner.write_cursor {
if inner.write_end_closed {
Expand All @@ -83,7 +85,8 @@ impl AsyncRead for Receiver {
} else {
assert!(inner.read_cursor < inner.write_cursor);
let copy_len = std::cmp::min(buf.len(), inner.write_cursor - inner.read_cursor);
buf[0..copy_len].copy_from_slice(&inner.buffer[inner.read_cursor .. inner.read_cursor + copy_len]);
buf[0..copy_len]
.copy_from_slice(&inner.buffer[inner.read_cursor..inner.read_cursor + copy_len]);
inner.read_cursor += copy_len;
if let Some(write_waker) = inner.write_waker.take() {
write_waker.wake();
Expand All @@ -97,19 +100,22 @@ impl AsyncWrite for Sender {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut futures::task::Context,
buf: &[u8]) -> futures::task::Poll<Result<usize, futures::io::Error>>
{
buf: &[u8],
) -> futures::task::Poll<Result<usize, futures::io::Error>> {
let mut inner = self.inner.lock().unwrap();
if inner.read_end_closed {
return Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::ConnectionAborted, "read end closed")))
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::ConnectionAborted,
"read end closed",
)));
}
if inner.write_cursor == inner.buffer.len() {
if inner.read_cursor == inner.buffer.len() {
inner.write_cursor = 0;
inner.read_cursor = 0;
} else {
inner.write_waker = Some(cx.waker().clone());
return Poll::Pending
return Poll::Pending;
}
}

Expand All @@ -127,17 +133,15 @@ impl AsyncWrite for Sender {

fn poll_flush(
self: Pin<&mut Self>,
_cx: &mut futures::task::Context)
-> Poll<Result<(), futures::io::Error>>
{
_cx: &mut futures::task::Context,
) -> Poll<Result<(), futures::io::Error>> {
Poll::Ready(Ok(()))
}

fn poll_close(
self: Pin<&mut Self>,
_cx: &mut futures::task::Context)
-> Poll<Result<(), futures::io::Error>>
{
_cx: &mut futures::task::Context,
) -> Poll<Result<(), futures::io::Error>> {
let mut inner = self.inner.lock().unwrap();
inner.write_end_closed = true;
if let Some(read_waker) = inner.read_waker.take() {
Expand All @@ -149,20 +153,26 @@ impl AsyncWrite for Sender {

#[cfg(test)]
pub mod test {
use futures::task::LocalSpawnExt;
use futures::{AsyncReadExt, AsyncWriteExt};
use futures::task::{LocalSpawnExt};

#[test]
fn basic() {
let (mut sender, mut receiver) = crate::channel();
let buf: Vec<u8> = vec![1,2,3,4,5].into_iter().cycle().take(20000).collect();
let buf: Vec<u8> = vec![1, 2, 3, 4, 5]
.into_iter()
.cycle()
.take(20000)
.collect();
let mut pool = futures::executor::LocalPool::new();

let buf2 = buf.clone();
pool.spawner().spawn_local(async move {
sender.write_all(&buf2).await.unwrap();
()
}).unwrap();
pool.spawner()
.spawn_local(async move {
sender.write_all(&buf2).await.unwrap();
()
})
.unwrap();

let mut buf3 = vec![];
pool.run_until(receiver.read_to_end(&mut buf3)).unwrap();
Expand All @@ -176,8 +186,7 @@ pub mod test {
drop(receiver);

let mut pool = futures::executor::LocalPool::new();
let result = pool.run_until(sender.write_all(&[0,1,2]));
let result = pool.run_until(sender.write_all(&[0, 1, 2]));
assert!(result.is_err());
}
}

Loading