Skip to content

Commit

Permalink
Concurrency and Async Programming
Browse files Browse the repository at this point in the history
  • Loading branch information
0xbojiTheSecond committed Jan 2, 2025
1 parent 3ea156e commit c0b3442
Show file tree
Hide file tree
Showing 8 changed files with 414 additions and 1 deletion.
49 changes: 49 additions & 0 deletions src/day28_to_day30/async_basics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use std::time::Duration;

pub fn run() {
println!("\nAsync Basics Examples:");

// To run async code, we need a runtime like tokio
// This is just a demonstration of async syntax
println!("Note: These are syntax examples. To run async code, use tokio or another async runtime.");

// Example async function
async fn fetch_data(id: u32) -> String {
// Simulated async operation
// In real code, you would use tokio::time::sleep
std::thread::sleep(Duration::from_millis(100));
format!("Data {}", id)
}

// Example of async block
async fn process_data() {
let data = fetch_data(1).await;
println!("Processed: {}", data);
}

// Example of multiple async operations
async fn parallel_operations() {
let future1 = fetch_data(1);
let future2 = fetch_data(2);

// In real code with tokio:
// let (result1, result2) = tokio::join!(future1, future2);
}
}

// Example struct with async methods
struct AsyncProcessor {
id: u32,
}

impl AsyncProcessor {
async fn process(&self) -> String {
format!("Processed by AsyncProcessor {}", self.id)
}

async fn run_tasks(&self) {
// Example of async/.await syntax
let result = self.process().await;
println!("{}", result);
}
}
39 changes: 39 additions & 0 deletions src/day28_to_day30/async_io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Note: To actually run this code, you need to add tokio to your dependencies:
// tokio = { version = "1.0", features = ["full"] }

pub fn run() {
println!("\nAsync I/O Examples:");
println!("Note: These examples require the tokio runtime.");

// The following code demonstrates the structure of async I/O operations
// To run it, you would need to add tokio and uncomment the code

/*
#[tokio::main]
async fn main() {
// File operations
async fn read_file() -> std::io::Result<String> {
use tokio::fs::File;
use tokio::io::AsyncReadExt;
let mut file = File::open("test.txt").await?;
let mut contents = String::new();
file.read_to_string(&mut contents).await?;
Ok(contents)
}
// Network operations
async fn fetch_url(url: &str) -> Result<String, Box<dyn std::error::Error>> {
let response = reqwest::get(url).await?;
let body = response.text().await?;
Ok(body)
}
// Error handling in async context
match read_file().await {
Ok(contents) => println!("File contents: {}", contents),
Err(e) => println!("Error reading file: {}", e),
}
}
*/
}
38 changes: 38 additions & 0 deletions src/day28_to_day30/futures.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
pub fn run() {
println!("\nFutures Examples:");
println!("Note: These examples demonstrate Future concepts.");

// The following code shows the structure of working with futures
// To run it, you would need to add the futures crate and uncomment the code

/*
use futures::future::{self, Future};
use futures::stream::{self, Stream};
async fn future_combinators() {
// Creating a future
let future1 = future::ready(1);
let future2 = future::ready(2);
// Combining futures
let combined = future::join(future1, future2);
// Chaining futures
let mapped = future1.map(|x| x + 1);
// Working with streams
let stream = stream::iter(vec![1, 2, 3]);
// Processing streams
let sum = stream
.fold(0, |acc, x| future::ready(acc + x))
.await;
}
// Example of Pin
use std::pin::Pin;
struct AsyncStruct {
future: Pin<Box<dyn Future<Output = ()>>>,
}
*/
}
92 changes: 92 additions & 0 deletions src/day28_to_day30/message_passing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

pub fn run() {
println!("\nMessage Passing Examples:");

// Example 1: Basic channel
basic_channel_example();

// Example 2: Multiple producers
multiple_producers();

// Example 3: Channel with complex data
complex_data_channel();
}

fn basic_channel_example() {
println!("\nBasic Channel Example:");
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
let messages = vec!["Hello", "from", "the", "thread"];
for msg in messages {
tx.send(msg).unwrap();
thread::sleep(Duration::from_millis(100));
}
});

for received in rx {
println!("Got: {}", received);
}
}

fn multiple_producers() {
println!("\nMultiple Producers Example:");
let (tx, rx) = mpsc::channel();

// Clone the transmitter for the second thread
let tx2 = tx.clone();

// First producer
thread::spawn(move || {
let messages = vec![1, 2, 3];
for msg in messages {
tx.send(msg).unwrap();
thread::sleep(Duration::from_millis(100));
}
});

// Second producer
thread::spawn(move || {
let messages = vec![4, 5, 6];
for msg in messages {
tx2.send(msg).unwrap();
thread::sleep(Duration::from_millis(100));
}
});

// Receive all messages
for received in rx {
println!("Got: {}", received);
}
}

// Example of sending complex data
#[derive(Debug)]
struct Task {
id: u32,
description: String,
}

fn complex_data_channel() {
println!("\nComplex Data Channel Example:");
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
let tasks = vec![
Task { id: 1, description: String::from("First task") },
Task { id: 2, description: String::from("Second task") },
];

for task in tasks {
tx.send(task).unwrap();
thread::sleep(Duration::from_millis(100));
}
});

for task in rx {
println!("Received task: {:?}", task);
}
}
16 changes: 16 additions & 0 deletions src/day28_to_day30/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
pub mod threads;
pub mod message_passing;
pub mod shared_state;
pub mod async_basics;
pub mod async_io;
pub mod futures;

pub fn run() {
println!("Days 28-30: Concurrency and Async Programming");
threads::run();
message_passing::run();
shared_state::run();
async_basics::run();
async_io::run();
futures::run();
}
92 changes: 92 additions & 0 deletions src/day28_to_day30/shared_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::sync::atomic::{AtomicUsize, Ordering};

pub fn run() {
println!("\nShared State Examples:");

// Example 1: Mutex example
mutex_example();

// Example 2: RwLock example
rwlock_example();

// Example 3: Atomic types
atomic_example();
}

fn mutex_example() {
println!("\nMutex Example:");

let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];

for _ in 0..3 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
println!("Thread incremented counter to: {}", *num);
});
handles.push(handle);
}

for handle in handles {
handle.join().unwrap();
}

println!("Final counter value: {}", *counter.lock().unwrap());
}

fn rwlock_example() {
println!("\nRwLock Example:");

let data = Arc::new(RwLock::new(vec![1, 2, 3, 4]));
let mut handles = vec![];

// Spawn reader threads
for i in 0..2 {
let data = Arc::clone(&data);
handles.push(thread::spawn(move || {
let values = data.read().unwrap();
println!("Reader {} sees: {:?}", i, *values);
}));
}

// Spawn writer thread
{
let data = Arc::clone(&data);
handles.push(thread::spawn(move || {
let mut values = data.write().unwrap();
values.push(5);
println!("Writer added value: {:?}", *values);
}));
}

for handle in handles {
handle.join().unwrap();
}
}

fn atomic_example() {
println!("\nAtomic Example:");

let counter = Arc::new(AtomicUsize::new(0));
let mut handles = vec![];

for _ in 0..3 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
for _ in 0..100 {
counter.fetch_add(1, Ordering::SeqCst);
}
});
handles.push(handle);
}

for handle in handles {
handle.join().unwrap();
}

println!("Final atomic counter value: {}", counter.load(Ordering::SeqCst));
}
Loading

0 comments on commit c0b3442

Please sign in to comment.