diff --git a/src/bytecode/src/bytecode.rs b/src/bytecode/src/bytecode.rs index bb35421..8eef331 100644 --- a/src/bytecode/src/bytecode.rs +++ b/src/bytecode/src/bytecode.rs @@ -12,7 +12,7 @@ pub type ThreadID = i64; /// and implementation details. #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub enum ByteCode { - /// Signal that the program has finished executing. + /// Signal that the thread has finished executing. DONE, /// Assign the top of the operant stack to the given symbol in the current environment. ASSIGN(Symbol), @@ -40,8 +40,8 @@ pub enum ByteCode { LDF(usize, Vec), /// Call a function with the given number of arguments. CALL(usize), - /// Spawn a new thread. - SPAWN, + /// Spawn a new thread with the address of the instruction for the child to execute. + SPAWN(usize), /// Join a thread. JOIN(ThreadID), /// Yield the current thread. diff --git a/src/bytecode/src/stack_frame.rs b/src/bytecode/src/stack_frame.rs index 409b422..46de77a 100644 --- a/src/bytecode/src/stack_frame.rs +++ b/src/bytecode/src/stack_frame.rs @@ -8,7 +8,6 @@ use crate::Environment; pub enum FrameType { BlockFrame, CallFrame, - ThreadFrame, } #[derive(Debug, Clone)] @@ -29,8 +28,8 @@ impl StackFrame { pub fn new_with_address( frame_type: FrameType, - address: usize, env: Rc>, + address: usize, ) -> Self { StackFrame { frame_type, diff --git a/vm/ignite/src/error.rs b/vm/ignite/src/error.rs index d5829f0..cd65fb4 100644 --- a/vm/ignite/src/error.rs +++ b/vm/ignite/src/error.rs @@ -1,3 +1,4 @@ +use bytecode::ThreadID; use thiserror::Error; #[derive(Error, Debug)] @@ -20,6 +21,9 @@ pub enum VmError { #[error("Runtime stack underflow")] RuntimeStackUnderflow, + #[error("Thread not found: {0}")] + ThreadNotFound(ThreadID), + #[error("PC out of bounds: {0}")] PcOutOfBounds(usize), diff --git a/vm/ignite/src/micro_code/done.rs b/vm/ignite/src/micro_code/done.rs index 32cfdae..b0aac63 100644 --- a/vm/ignite/src/micro_code/done.rs +++ b/vm/ignite/src/micro_code/done.rs @@ -1,6 +1,8 @@ -use anyhow::Result; +use std::collections::hash_map::Entry; -use crate::{Runtime, ThreadState}; +use anyhow::{Ok, Result}; + +use crate::{Runtime, ThreadState, VmError}; /// Set the state of the current thread to done. /// @@ -10,10 +12,18 @@ use crate::{Runtime, ThreadState}; /// /// # Errors /// -/// Infallible. +/// * If the current thread is not found in the thread state hashmap. pub fn done(rt: &mut Runtime) -> Result<()> { - rt.current_thread.state = ThreadState::Done; - Ok(()) + let tid = rt.current_thread.thread_id; + let entry = rt.thread_states.entry(tid); + + match entry { + Entry::Vacant(_) => Err(VmError::ThreadNotFound(tid).into()), + Entry::Occupied(mut entry) => { + entry.insert(ThreadState::Done); + Ok(()) + } + } } #[cfg(test)] @@ -24,7 +34,7 @@ mod tests { fn test_done() -> Result<()> { let mut rt = Runtime::new(vec![]); done(&mut rt)?; - assert_eq!(rt.current_thread.state, ThreadState::Done); + assert_eq!(rt.thread_states.get(&1), Some(&ThreadState::Done)); Ok(()) } } diff --git a/vm/ignite/src/micro_code/join.rs b/vm/ignite/src/micro_code/join.rs index 44bc849..80e6918 100644 --- a/vm/ignite/src/micro_code/join.rs +++ b/vm/ignite/src/micro_code/join.rs @@ -1,7 +1,9 @@ +use std::collections::hash_map::Entry; + use anyhow::Result; use bytecode::ThreadID; -use crate::{Runtime, ThreadState}; +use crate::{Runtime, ThreadState, VmError}; /// Set the state of the current thread to joining the thread with the given ID. /// @@ -13,10 +15,18 @@ use crate::{Runtime, ThreadState}; /// /// # Errors /// -/// Infallible. +/// * If the thread with the given ID is not found in the thread state hashmap. pub fn join(rt: &mut Runtime, tid: ThreadID) -> Result<()> { - rt.current_thread.state = ThreadState::Joining(tid); - Ok(()) + let current_tid = rt.current_thread.thread_id; + let entry = rt.thread_states.entry(current_tid); + + match entry { + Entry::Vacant(_) => Err(VmError::ThreadNotFound(current_tid).into()), + Entry::Occupied(mut entry) => { + entry.insert(ThreadState::Joining(tid)); + Ok(()) + } + } } #[cfg(test)] @@ -27,7 +37,7 @@ mod tests { fn test_join() -> Result<()> { let mut rt = Runtime::new(vec![]); join(&mut rt, 2)?; - assert_eq!(rt.current_thread.state, ThreadState::Joining(2)); + assert_eq!(rt.thread_states.get(&1), Some(&ThreadState::Joining(2))); Ok(()) } } diff --git a/vm/ignite/src/micro_code/reset.rs b/vm/ignite/src/micro_code/reset.rs index 7fd9172..0a36c47 100644 --- a/vm/ignite/src/micro_code/reset.rs +++ b/vm/ignite/src/micro_code/reset.rs @@ -83,7 +83,7 @@ mod tests { let some_frame = StackFrame::new(FrameType::CallFrame, Rc::clone(&env_a)); let block_frame = - StackFrame::new_with_address(FrameType::BlockFrame, 123, Rc::clone(&env_c)); + StackFrame::new_with_address(FrameType::BlockFrame, Rc::clone(&env_c), 123); let call_frame = StackFrame::new(FrameType::CallFrame, Rc::clone(&env_b)); rt.current_thread.runtime_stack.push(some_frame); diff --git a/vm/ignite/src/micro_code/spawn.rs b/vm/ignite/src/micro_code/spawn.rs index 761b785..1dbf244 100644 --- a/vm/ignite/src/micro_code/spawn.rs +++ b/vm/ignite/src/micro_code/spawn.rs @@ -1,10 +1,11 @@ use anyhow::Result; -use crate::Runtime; +use crate::{Runtime, ThreadState}; /// Spawn a new thread that clones the main thread at the time of the spawn. -/// The new thread is added to the ready queue. +/// The new thread is added to the thread state hashmap with a state of Ready. /// The new thread is given a unique thread ID. +/// The new thread is added to the ready queue. /// This thread ID is pushed onto the operand stack of the parent thread. /// 0 is pushed onto the operand stack of the child thread. /// @@ -15,17 +16,19 @@ use crate::Runtime; /// # Errors /// /// Infallible. -pub fn spawn(rt: &mut Runtime) -> Result<()> { +pub fn spawn(rt: &mut Runtime, addr: usize) -> Result<()> { rt.thread_count += 1; - let new_thread_id = rt.thread_count; - let mut new_thread = rt.current_thread.spawn_new(new_thread_id); + let child_thread_id = rt.thread_count; + let mut child_thread = rt.current_thread.spawn_new(child_thread_id, addr); + // Add the child thread to the thread state hashmap. + rt.thread_states.insert(child_thread_id, ThreadState::Ready); - // The child thread ID is pushed onto the operand stack of the parent thread. - rt.current_thread.operand_stack.push(new_thread_id.into()); // 0 is pushed onto the operand stack of the child thread. - new_thread.operand_stack.push(0.into()); + child_thread.operand_stack.push(0.into()); + // The child thread ID is pushed onto the operand stack of the parent thread. + rt.current_thread.operand_stack.push(child_thread_id.into()); - rt.ready_queue.push_back(new_thread); + rt.ready_queue.push_back(child_thread); Ok(()) } diff --git a/vm/ignite/src/micro_code/yield_.rs b/vm/ignite/src/micro_code/yield_.rs index 97bd275..c58f29b 100644 --- a/vm/ignite/src/micro_code/yield_.rs +++ b/vm/ignite/src/micro_code/yield_.rs @@ -1,6 +1,8 @@ +use std::collections::hash_map::Entry; + use anyhow::Result; -use crate::{Runtime, ThreadState}; +use crate::{Runtime, ThreadState, VmError}; /// Yield the current thread. /// This will set the yield flag of the current thread to true. @@ -14,8 +16,16 @@ use crate::{Runtime, ThreadState}; /// /// Infallible. pub fn yield_(rt: &mut Runtime) -> Result<()> { - rt.current_thread.state = ThreadState::Yielded; - Ok(()) + let tid = rt.current_thread.thread_id; + let entry = rt.thread_states.entry(tid); + + match entry { + Entry::Vacant(_) => Err(VmError::ThreadNotFound(tid).into()), + Entry::Occupied(mut entry) => { + entry.insert(ThreadState::Yielded); + Ok(()) + } + } } #[cfg(test)] @@ -26,7 +36,7 @@ mod tests { fn test_yield() -> Result<()> { let mut rt = Runtime::new(vec![]); yield_(&mut rt)?; - assert_eq!(rt.current_thread.state, ThreadState::Yielded); + assert_eq!(rt.thread_states.get(&1), Some(&ThreadState::Yielded)); Ok(()) } } diff --git a/vm/ignite/src/runtime.rs b/vm/ignite/src/runtime.rs index ee7e8cf..1af3e60 100644 --- a/vm/ignite/src/runtime.rs +++ b/vm/ignite/src/runtime.rs @@ -1,10 +1,10 @@ use std::{ - collections::VecDeque, + collections::{HashMap, VecDeque}, time::{Duration, Instant}, }; use anyhow::Result; -use bytecode::ByteCode; +use bytecode::{ByteCode, ThreadID}; use crate::{micro_code, Thread, ThreadState, VmError}; @@ -22,21 +22,29 @@ pub struct Runtime { time_quantum: Duration, pub instrs: Vec, pub thread_count: i64, + pub thread_states: HashMap, pub current_thread: Thread, pub ready_queue: VecDeque, pub suspended_queue: VecDeque, + pub zombie_threads: HashMap, } impl Runtime { pub fn new(instrs: Vec) -> Self { + let mut thread_states = HashMap::new(); + thread_states.insert(MAIN_THREAD_ID, ThreadState::Ready); + let current_thread = Thread::new(MAIN_THREAD_ID); + Runtime { time: Instant::now(), time_quantum: DEFAULT_TIME_QUANTUM, instrs, thread_count: 1, - current_thread: Thread::new(MAIN_THREAD_ID), + thread_states, + current_thread, ready_queue: VecDeque::new(), suspended_queue: VecDeque::new(), + zombie_threads: HashMap::new(), } } @@ -69,7 +77,7 @@ pub fn run(mut rt: Runtime) -> Result { } if rt.is_current_thread_joining() { - rt = rt.join_current_thread(); + rt = rt.join_current_thread()?; } let instr = rt.fetch_instr()?; @@ -81,7 +89,7 @@ pub fn run(mut rt: Runtime) -> Result { } if !rt.is_current_main_thread() { - rt = rt.drop_current_thread(); + rt = rt.zombify_current_thread(); continue; } @@ -123,7 +131,7 @@ pub fn execute(rt: &mut Runtime, instr: ByteCode) -> Result<()> { ByteCode::ENTERSCOPE(syms) => micro_code::enter_scope(rt, syms)?, ByteCode::EXITSCOPE => micro_code::exit_scope(rt)?, ByteCode::CALL(arity) => micro_code::call(rt, arity)?, - ByteCode::SPAWN => micro_code::spawn(rt)?, + ByteCode::SPAWN(addr) => micro_code::spawn(rt, addr)?, ByteCode::JOIN(tid) => micro_code::join(rt, tid)?, ByteCode::YIELD => micro_code::yield_(rt)?, } @@ -131,6 +139,21 @@ pub fn execute(rt: &mut Runtime, instr: ByteCode) -> Result<()> { } impl Runtime { + /// Get the current state of the current thread. + /// Panics if the current thread is not found. + pub fn get_current_thread_state(&self) -> ThreadState { + let current_thread_id = self.current_thread.thread_id; + *self + .thread_states + .get(¤t_thread_id) + .ok_or(VmError::ThreadNotFound(current_thread_id)) + .expect("Current thread not found") + } + + pub fn set_thread_state(&mut self, thread_id: ThreadID, state: ThreadState) { + self.thread_states.insert(thread_id, state); + } + /// Check if the time quantum has expired. /// The time quantum is the maximum amount of time a thread can run before it is preempted. pub fn time_quantum_expired(&self) -> bool { @@ -140,15 +163,18 @@ impl Runtime { /// Check if the current thread should yield. /// This is set by the `YIELD` instruction. pub fn should_yield_current_thread(&self) -> bool { - self.current_thread.state == ThreadState::Yielded + self.get_current_thread_state() == ThreadState::Yielded } /// Yield the current thread. Set the state of the current thread to `Ready` and push it onto the ready queue. /// Pop the next thread from the ready queue and set it as the current thread. /// The timer is reset to the current time. + /// Panics if the current thread is not found. pub fn yield_current_thread(mut self) -> Self { - let mut current_thread = self.current_thread; - current_thread.state = ThreadState::Ready; // Reset the state + let current_thread_id = self.current_thread.thread_id; + self.set_thread_state(current_thread_id, ThreadState::Ready); + + let current_thread = self.current_thread; self.ready_queue.push_back(current_thread); let next_ready_thread = self @@ -161,18 +187,27 @@ impl Runtime { self } - pub fn drop_current_thread(mut self) -> Self { + /// Zombify the current thread. Set the state of the current thread to `Zombie` and add it into the zombie threads. + /// Pop the next thread from the ready queue and set it as the current thread. + pub fn zombify_current_thread(mut self) -> Self { + let current_thread = self.current_thread; + let current_thread_id = current_thread.thread_id; let next_ready_thread = self .ready_queue .pop_front() .expect("No threads in ready queue"); + self.zombie_threads + .insert(current_thread_id, current_thread); + self.thread_states + .insert(current_thread_id, ThreadState::Zombie); + self.current_thread = next_ready_thread; self } pub fn is_current_thread_done(&self) -> bool { - self.current_thread.state == ThreadState::Done + self.get_current_thread_state() == ThreadState::Done } pub fn is_current_main_thread(&self) -> bool { @@ -180,33 +215,52 @@ impl Runtime { } pub fn is_current_thread_joining(&self) -> bool { - matches!(self.current_thread.state, ThreadState::Joining(_)) + matches!(self.get_current_thread_state(), ThreadState::Joining(_)) } /// Join the current thread with the thread with the given ThreadID based on the current thread's state. - /// If the thread to join is in the ready or suspended queue, then the current thread will yield. - /// Otherwise, the current thread will be set to ready. - pub fn join_current_thread(mut self) -> Self { - if let ThreadState::Joining(tid) = self.current_thread.state { - let thread_to_join = self - .ready_queue - .iter() - .chain(self.suspended_queue.iter()) - .find(|t| t.thread_id == tid); - - if thread_to_join.is_some() { - // If the thread to join in the ready or suspended queue, then we need to yield the current thread. - let rt = self.yield_current_thread(); - return rt; - }; - - // Otherwise we can just set the current thread to ready. - self.current_thread.state = ThreadState::Ready; - } else { + /// If the thread to join is in zombie state, then the current thread will be set to ready and the result + /// of the zombie thread will be pushed onto the current thread's operand stack. The zombie thread is deallocated. + /// If the thread to join is not found, then panic. + /// Otherwise, the current thread will yield. + pub fn join_current_thread(mut self) -> Result { + let current_thread_id = self.current_thread.thread_id; + + let ThreadState::Joining(tid) = self.get_current_thread_state() else { panic!("Current thread is not joining"); + }; + + let thread_to_join_state = self.thread_states.get(&tid); + + match thread_to_join_state { + // If the thread to join does not exist, then panic + None => { + panic!("Thread to join not found"); + } + // If the thread to join is in zombie state, then the current thread will be set to ready + Some(ThreadState::Zombie) => { + self.set_thread_state(current_thread_id, ThreadState::Ready); + let mut zombie_thread = self + .zombie_threads + .remove(&tid) + .ok_or(VmError::ThreadNotFound(tid))?; + + let result = zombie_thread + .operand_stack + .pop() + .ok_or(VmError::OperandStackUnderflow)?; + + self.thread_states.remove(&tid); // Deallocate the zombie thread + self.current_thread.operand_stack.push(result); + Ok(self) + } + // Otherwise we will yield the current thread + _ => { + self.current_thread.pc -= 1; // Decrement the program counter to re-execute the join instruction + let rt = self.yield_current_thread(); + Ok(rt) + } } - - self } } @@ -237,7 +291,7 @@ mod tests { use super::*; use anyhow::{Ok, Result}; - use bytecode::{builtin, BinOp, ByteCode, FrameType, UnOp, Value}; + use bytecode::{builtin, BinOp, ByteCode, FrameType, Symbol, UnOp, Value}; #[test] fn test_pc() { @@ -437,8 +491,8 @@ mod tests { } #[test] - fn test_simple_concurrency() -> Result<()> { - let instrs = vec![ByteCode::SPAWN, ByteCode::DONE]; + fn test_concurrency_01() -> Result<()> { + let instrs = vec![ByteCode::SPAWN(1), ByteCode::DONE]; let mut rt = Runtime::new(instrs); rt.set_time_quantum(Duration::from_millis(u64::MAX)); // Set the time quantum to infinity @@ -454,53 +508,82 @@ mod tests { vec![Value::Int(MAIN_THREAD_ID + 1)] ); + Ok(()) + } + + #[test] + fn test_concurrency_02() -> Result<()> { + // fn simple(n) { + // return n; + // } + // + // spawn simple(123); + // join 2 let instrs = vec![ - ByteCode::SPAWN, // Parent operand stack will have child tid 2, child operand stack will have 0 - ByteCode::enterscope(vec!["tid", "child", "parent"]), // all threads enters a new scope - ByteCode::assign("tid"), // Parent thread assigns the child tid to the tid symbol (2), child tid is 0 - ByteCode::ld("tid"), - ByteCode::ldc(0), - ByteCode::BINOP(BinOp::Eq), // Check if the child operand stack has 0 - ByteCode::JOF(10), // Parent jumps since (child_tid == 0) == false - ByteCode::ldc("Child thread"), // Child thread loads the value onto its operand stack - ByteCode::assign("child"), // Child thread sets the value of the child in the environment - ByteCode::GOTO(13), // Child jump to the DONE instruction - ByteCode::ldc("Parent thread"), // Parent thread loads the value onto its operand stack - ByteCode::assign("parent"), // Parent thread sets the value of the parent in the environment + ByteCode::enterscope(vec!["simple"]), + ByteCode::ldf(3, vec!["n"]), + ByteCode::GOTO(5), // Jump past function body + ByteCode::ld("n"), + ByteCode::RESET(FrameType::CallFrame), + ByteCode::assign("simple"), + ByteCode::SPAWN(8), // Parent operand stack will have child tid 2, child operand stack will have + ByteCode::GOTO(13), // Parent jump past CALL and DONE + ByteCode::POP, + ByteCode::ld("simple"), + ByteCode::ldc(123), + ByteCode::CALL(1), + ByteCode::DONE, ByteCode::JOIN(MAIN_THREAD_ID + 1), // Parent thread joins the child thread ByteCode::DONE, ]; let rt = Runtime::new(instrs); - let rt = run(rt)?; + let mut rt = run(rt)?; + + println!("{:?}", rt.current_thread.operand_stack); assert_eq!( - rt.current_thread.env.borrow().get(&"parent".to_string()), - Some(Value::String("Parent thread".to_string())) - ); - assert_eq!( - rt.current_thread.env.borrow().get(&"child".to_string()), - Some(Value::Unitialized) // The parent thread environment should be unchanged + rt.current_thread.operand_stack.pop().unwrap(), + Value::Int(123) ); + Ok(()) + } + + #[test] + fn test_concurrency_03() -> Result<()> { + // let count = 0; + // fn infinite_increment() { + // loop { + // count = count + 1; + // } + // } + // spawn infinite_increment(); + // yield; + // // no join + + let empty_str_arr: Vec = vec![]; + let instrs = vec![ - ByteCode::enterscope(vec!["count"]), // Line 0 - ByteCode::ldc(0), // Line 1 - ByteCode::assign("count"), // Line 2 - ByteCode::SPAWN, // Line 3: Parent operand stack will have child tid 2, child operand stack will have 0 - ByteCode::enterscope(vec!["tid"]), // Line 4 - ByteCode::assign("tid"), // Line 5: Parent thread assigns the child tid to the tid symbol (2), child tid is 0 - ByteCode::ld("tid"), // Line 6 - ByteCode::ldc(0), // Line 7 - ByteCode::BINOP(BinOp::Eq), // Line 8: Check if the child operand stack has 0 - ByteCode::JOF(15), // Line 9: Parent jumps since (child_tid == 0) == false - ByteCode::ldc(1), // Line 10: Child thread loads the value onto its operand stack - ByteCode::ld("count"), // Line 11: Child thread loads the value of count onto the stack - ByteCode::BINOP(BinOp::Add), // Line 12: Child thread adds the value of count and 1 - ByteCode::assign("count"), // Line 13: Child thread sets the value of count in the environment - ByteCode::GOTO(10), // Line 14: Child loops infinitely (until time quantum expires) - ByteCode::YIELD, // Line 15: Parent thread yields - ByteCode::DONE, // Line 16 + ByteCode::enterscope(vec!["count", "infinite_increment"]), + ByteCode::ldc(0), + ByteCode::assign("count"), // Set count to 0 + ByteCode::ldf(6, empty_str_arr), + ByteCode::assign("infinite_increment"), // assign function + ByteCode::GOTO(11), // Jump past function body + ByteCode::ld("count"), // Start of function body + ByteCode::ldc(1), + ByteCode::BINOP(BinOp::Add), + ByteCode::assign("count"), + ByteCode::GOTO(6), // End of function body + ByteCode::SPAWN(13), // Parent operand stack will have child tid 2, child operand stack will have + ByteCode::GOTO(17), // Parent jump past CALL and DONE + ByteCode::POP, + ByteCode::ld("infinite_increment"), + ByteCode::CALL(0), + ByteCode::DONE, + ByteCode::YIELD, // Parent thread yields to child thread + ByteCode::DONE, ]; let mut rt = Runtime::new(instrs); @@ -516,7 +599,6 @@ mod tests { .try_into()?; assert!(final_count > 0); - Ok(()) } } diff --git a/vm/ignite/src/thread.rs b/vm/ignite/src/thread.rs index 5ea0bd9..692df2a 100644 --- a/vm/ignite/src/thread.rs +++ b/vm/ignite/src/thread.rs @@ -5,13 +5,13 @@ use bytecode::{Environment, StackFrame, Symbol, ThreadID, Value}; use crate::VmError; -#[derive(Debug, Default, Clone, PartialEq)] +#[derive(Debug, Default, Clone, Copy, PartialEq)] pub enum ThreadState { - Running, #[default] Ready, Joining(ThreadID), Yielded, + Zombie, Done, } @@ -20,7 +20,6 @@ pub enum ThreadState { pub struct Thread { /// The unique identifier of the thread. pub thread_id: ThreadID, - pub state: ThreadState, pub env: Rc>, pub operand_stack: Vec, pub runtime_stack: Vec, @@ -40,10 +39,14 @@ impl Thread { } impl Thread { - pub fn spawn_new(&self, thread_id: i64) -> Thread { - let mut new_thread = self.clone(); - new_thread.thread_id = thread_id; - new_thread + pub fn spawn_new(&self, thread_id: i64, pc: usize) -> Thread { + Thread { + thread_id, + env: Rc::clone(&self.env), + operand_stack: Vec::new(), + runtime_stack: Vec::new(), + pc, + } } }