From a5df7865a1dcb0dffed88d28e7cac3129ae535e2 Mon Sep 17 00:00:00 2001 From: David Westgate Date: Thu, 30 Nov 2023 14:11:56 -0800 Subject: [PATCH] fix issue with room msg logic; add client keepalive watchdog --- src/client.rs | 39 ++++++++++++++++++----- src/lib.rs | 1 + src/server.rs | 88 ++++++++++++++++++++++++++++++++------------------- 3 files changed, 87 insertions(+), 41 deletions(-) diff --git a/src/client.rs b/src/client.rs index 251b8c6..065c792 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,9 +2,9 @@ use prompted::input; use rust_irc::{clear, codes}; use std::io::{Read, Write}; use std::net::TcpStream; +use std::sync::{Arc, Mutex}; use std::thread; - -use crate::client; +use std::time::{Duration, Instant}; fn no_param_op(opcode: u8, stream: &mut TcpStream) { stream.write(&[opcode]).unwrap(); @@ -40,7 +40,7 @@ fn two_param_op(opcode: u8, stream: &mut TcpStream, param0: &str, param1: &str) stream.write(&out_buf).unwrap(); } -fn read_messages(mut stream: TcpStream, nick: &str) { +fn read_messages(mut stream: TcpStream, nick: &str, timestamp: &mut Arc>) { let mut buffer: [u8; 1024] = [0; 1024]; loop { match stream.read(&mut buffer) { @@ -49,6 +49,8 @@ fn read_messages(mut stream: TcpStream, nick: &str) { std::process::exit(0); } Ok(size) => { + let mut lock = timestamp.lock().unwrap(); + *lock = Instant::now(); let msg_bytes: &[u8] = &buffer[..size]; process_message(msg_bytes, nick); } @@ -60,7 +62,6 @@ fn read_messages(mut stream: TcpStream, nick: &str) { } fn process_message(msg_bytes: &[u8], nick: &str) { - println!(); match msg_bytes[0] { codes::ERROR => match msg_bytes[1] { codes::error::INVALID_ROOM => { @@ -75,6 +76,9 @@ fn process_message(msg_bytes: &[u8], nick: &str) { codes::error::NOT_IN_ROOM => { eprintln!("Cannot send a message before joining room. Use /join [room].") } + codes::error::EMPTY_ROOM => { + eprintln!("Room is Empty"); + } _ => { eprintln!("Error code: {:x?}", msg_bytes[1]); } @@ -158,23 +162,42 @@ pub fn start() { } // let host: String = input!("Enter the server host: "); - let host: &str = "localhost"; + let host: &str = "fab04.cecs.pdx.edu"; if let Ok(mut stream) = TcpStream::connect(host.to_owned() + ":6667") { println!("Connected to {}", host); //another stream for reading messages - let stream_clone: TcpStream = stream.try_clone().expect("Failed to clone stream"); + let reader_clone: TcpStream = stream.try_clone().expect("Failed to clone stream"); + let mut keepalive_clone = stream.try_clone().expect("failed to clone stream"); let nick_clone: String = nick.clone(); + + //timestamp for detecting unresponsive server + let timestamp = Arc::new(Mutex::new(Instant::now())); + let mut timestamp_clone: Arc> = Arc::clone(×tamp); + thread::spawn(move || { - read_messages(stream_clone, &nick_clone); + read_messages(reader_clone, &nick_clone, &mut timestamp_clone); + }); + + //watchdog to send keep_alive and stop client if server fails to respond + thread::spawn(move || loop { + thread::sleep(Duration::from_secs(5)); + let lock = timestamp.lock().unwrap(); + let now = Instant::now(); + if now.duration_since(*lock) > Duration::from_secs(30) { + eprintln!("Server is unresponsive. Stopping client"); + std::process::exit(1); + } else if now.duration_since(*lock) > Duration::from_secs(5) { + keepalive_clone.write_all(&[codes::KEEP_ALIVE]).unwrap(); + } }); //try to register the nickname one_param_op(codes::client::REGISTER_NICK, &mut stream, &nick); loop { - let mut inp = String::new(); + let inp; if active_room.is_empty() { inp = input!(""); } else { diff --git a/src/lib.rs b/src/lib.rs index f1e1cc9..9f15807 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,6 +26,7 @@ pub mod codes { pub const MALFORMED: u8 = 0x15; pub const ALREADY_IN_ROOM: u8 = 0x16; pub const NOT_IN_ROOM: u8 = 0x17; + pub const EMPTY_ROOM: u8 = 0x8; } } diff --git a/src/server.rs b/src/server.rs index 3888e59..3be05f4 100644 --- a/src/server.rs +++ b/src/server.rs @@ -60,25 +60,57 @@ fn message(room: &str, msg: &str, sender: &str, server: &Arc>) { let mut guard: std::sync::MutexGuard<'_, Server> = server.lock().unwrap(); let server: &mut Server = guard.deref_mut(); + //1: Make sure specified rooms exists, ifn error + //2: Make sure sender is a member of the room, ifn error + //3: Message all non-sender users in the room the message, ifnone error empty room + //4: Message the sender RESPONSE_OK let room_users: Option<&Vec> = server.rooms.get(room); + let mut sender_stream = server + .users + .get_mut(sender) + .unwrap() + .try_clone() + .expect("Clone issue"); + //1 match room_users { Some(users) => { + //2 + let mut is_member = false; for user in users { - let stream: Option<&mut TcpStream> = server.users.get_mut(user); - match stream { - Some(str) => { - str.write_all(&out_buf).unwrap(); - } - None => { - //TODO send error msg to sender - eprintln!("Error: Invalid message from client"); + if user.eq(sender) { + is_member = true; + } + } + + if is_member { + for user in users { + if user.eq(sender) { + //4 + sender_stream.write_all(&[codes::RESPONSE_OK]).unwrap(); + } else { + //3 + let recipient_stream: Option<&mut TcpStream> = server.users.get_mut(user); + match recipient_stream { + Some(str) => { + println!("Sending msg {:?}", out_buf.to_ascii_lowercase()); + str.write_all(&out_buf).unwrap(); + } + None => { + eprintln!("Server error: could not find user"); + } + } } } + } else { + sender_stream + .write_all(&[codes::ERROR, codes::error::NOT_IN_ROOM]) + .unwrap(); } } None => { - //TODO send error msg to sender - eprintln!("Error: Invalid message from client"); + sender_stream + .write_all(&[codes::ERROR, codes::error::EMPTY_ROOM]) + .unwrap(); } } } @@ -184,36 +216,21 @@ fn handle_client( codes::client::MESSAGE => { #[cfg(debug_assertions)] println!("MESSAGE"); + stream.write_all(&[codes::RESPONSE_OK]).unwrap(); + } + + codes::KEEP_ALIVE => { + println!("kEEP alive"); + stream.write_all(&[codes::RESPONSE_OK]).unwrap(); } codes::client::MESSAGE_ROOM => { let p: String = String::from_utf8_lossy(param_bytes).to_string(); let params: Option<(&str, &str)> = p.split_once(" "); + match params { Some((room, msg)) => { - let unlocked_server: std::sync::MutexGuard<'_, Server> = server.lock().unwrap(); - let users_in_room: Option<&Vec> = unlocked_server.rooms.get(room); - match users_in_room { - Some(users) => { - let is_user_in_room: Option<&String> = - users.iter().find(|&u| u.eq(nickname)); - match is_user_in_room { - Some(_) => { - message(room, msg, nickname, server); - } - None => { - stream - .write_all(&[codes::ERROR, codes::error::NOT_IN_ROOM]) - .unwrap(); - } - } - } - None => { - stream - .write_all(&[codes::ERROR, codes::error::INVALID_ROOM]) - .unwrap(); - } - } + message(room, msg, nickname, server); } _ => { stream @@ -402,6 +419,7 @@ pub fn start() { println!("1: list connected users"); println!("2: list rooms"); println!("3: Broadcast message to all"); + println!("4: Freeze server via double lock (for testing)"); let inp: String = input!(":"); match inp.parse::() { Ok(num) => match num { @@ -416,6 +434,10 @@ pub fn start() { let inp2 = input!("Enter message: "); broadcast(codes::client::MESSAGE, &server, &inp2); } + 4 => { + let s1 = server.lock().unwrap(); + let s2 = server.lock().unwrap(); + } _ => println!("Invalid Input"), }, Err(_) => {