mirror of
https://github.com/AmbiML/sparrow-kata-full.git
synced 2025-07-14 14:34:19 +00:00
MlCoord: Execution queue
This CL enables multiple outstanding periodic executions. To do so the MlCoordinator now includes an array of started models and a queue of models that are ready to be executed immediately. Additionally, each periodic model has an associated timer. When that timer fires the model is added to the execution queue. When a model finishes executing, the next model is popped off the queue and executed. If a model becomes ready when there's already an execution for it queued, that execution is dropped and a warning printed. A cancel command is added to remove periodic or outstanding executions. A state debug command was also added. Currently we can only load a single model due to limitations with the StorageManager, but we can do so multiple times. Tests: Single shot: https://paste.googleplex.com/6704629669691392 Two periodic execs: https://paste.googleplex.com/5288292800004096 Overloaded warning: https://paste.googleplex.com/4549962219126784 Debug State: KATA> state_mlcoord kata_ml_coordinator::Running model: fubar:mobilenet_v1_emitc_static.model kata_ml_coordinator::Loaded model: fubar:mobilenet_v1_emitc_static.model kata_ml_coordinator::Loadable Models: kata_ml_coordinator:: LoadableModel { bundle_id: "fubar", model_id: "mobilenet_v1_emitc_static.model", rate_in_ms: Some(2000) } kata_ml_coordinator:: LoadableModel { bundle_id: "fubar", model_id: "mobilenet_v1_emitc_static.model", rate_in_ms: Some(6000) } kata_ml_coordinator::Execution Queue: kata_ml_coordinator:: fubar:mobilenet_v1_emitc_static.model kata_ml_coordinator::Statistics: Statistics { load_failures: 0, already_queued: 0 } Change-Id: I7637c9c390eb6ffd9ae22088f37b98c056a441c2 GitOrigin-RevId: 18c0d3fe740a37381f7f1eddee8f2224f679fd61
This commit is contained in:
parent
20f1d1aa9d
commit
1662e80ef1
@ -44,7 +44,7 @@ DeclareCAmkESComponent(MemoryManager
|
||||
RustAddLibrary(
|
||||
kata_ml_coordinator
|
||||
SOURCE_DIR ${CMAKE_CURRENT_LIST_DIR}/components/MlCoordinator
|
||||
LIB_FILENAME libkata_ml_coordinator.a
|
||||
LIB_FILENAME libkata_ml_component.a
|
||||
)
|
||||
|
||||
DeclareCAmkESComponent(MlCoordinator
|
||||
|
@ -10,7 +10,7 @@ use cpio::CpioNewcReader;
|
||||
use kata_io as io;
|
||||
use kata_line_reader::LineReader;
|
||||
use kata_memory_interface::*;
|
||||
use kata_ml_interface::kata_mlcoord_execute;
|
||||
use kata_ml_interface::*;
|
||||
use kata_os_common::sel4_sys;
|
||||
use kata_os_common::slot_allocator;
|
||||
use kata_proc_interface::kata_pkg_mgmt_install;
|
||||
@ -153,14 +153,17 @@ fn dispatch_command(
|
||||
"test_alloc" => test_alloc_command(output),
|
||||
"test_alloc_error" => test_alloc_error_command(output),
|
||||
"test_bootinfo" => test_bootinfo_command(output),
|
||||
"test_mlcancel" => test_mlcancel_command(&mut args, output),
|
||||
"test_mlexecute" => test_mlexecute_command(&mut args, output),
|
||||
"test_mlcontinuous" => test_mlcontinuous_command(&mut args),
|
||||
"test_mlperiodic" => test_mlperiodic_command(&mut args, output),
|
||||
"test_obj_alloc" => test_obj_alloc_command(output),
|
||||
"test_panic" => test_panic_command(),
|
||||
"test_timer_async" => test_timer_async_command(&mut args, output),
|
||||
"test_timer_blocking" => test_timer_blocking_command(&mut args, output),
|
||||
"test_timer_completed" => test_timer_completed_command(output),
|
||||
|
||||
"state_mlcoord" => state_mlcoord_command(),
|
||||
|
||||
_ => Err(CommandError::UnknownCommand),
|
||||
};
|
||||
if let Err(e) = result {
|
||||
@ -634,30 +637,52 @@ fn test_panic_command() -> Result<(), CommandError> {
|
||||
panic!("testing");
|
||||
}
|
||||
|
||||
/// Implements a command that runs an ML execution.
|
||||
fn test_mlexecute_command(
|
||||
/// Implements a command that cancels an ML execution.
|
||||
fn test_mlcancel_command(
|
||||
args: &mut dyn Iterator<Item = &str>,
|
||||
_output: &mut dyn io::Write,
|
||||
output: &mut dyn io::Write,
|
||||
) -> Result<(), CommandError> {
|
||||
let bundle_id = args.next().ok_or(CommandError::BadArgs)?;
|
||||
let model_id = args.next().ok_or(CommandError::BadArgs)?;
|
||||
kata_mlcoord_execute(bundle_id, model_id)
|
||||
.map_err(|_| CommandError::IO)
|
||||
|
||||
if let Err(e) = kata_mlcoord_cancel(bundle_id, model_id) {
|
||||
writeln!(output, "Cancel {:?} {:?} err: {:?}", bundle_id, model_id, e)?;
|
||||
} else {
|
||||
writeln!(output, "Cancelled {:?} {:?}", bundle_id, model_id)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Implements a command that sets whether the ml execution is continuous.
|
||||
fn test_mlcontinuous_command(args: &mut dyn Iterator<Item = &str>) -> Result<(), CommandError> {
|
||||
extern "C" {
|
||||
fn mlcoord_set_continuous_mode(mode: bool);
|
||||
/// Implements a command that runs a oneshot ML execution.
|
||||
fn test_mlexecute_command(
|
||||
args: &mut dyn Iterator<Item = &str>,
|
||||
output: &mut dyn io::Write,
|
||||
) -> Result<(), CommandError> {
|
||||
let bundle_id = args.next().ok_or(CommandError::BadArgs)?;
|
||||
let model_id = args.next().ok_or(CommandError::BadArgs)?;
|
||||
|
||||
if let Err(e) = kata_mlcoord_oneshot(bundle_id, model_id) {
|
||||
writeln!(output, "Execute {:?} {:?} err: {:?}", bundle_id, model_id, e)?;
|
||||
}
|
||||
if let Some(mode_str) = args.next() {
|
||||
let mode = mode_str.parse::<bool>()?;
|
||||
unsafe {
|
||||
mlcoord_set_continuous_mode(mode);
|
||||
}
|
||||
return Ok(());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Implements a command that runs a periodic ML execution.
|
||||
fn test_mlperiodic_command(
|
||||
args: &mut dyn Iterator<Item = &str>,
|
||||
output: &mut dyn io::Write,
|
||||
) -> Result<(), CommandError> {
|
||||
let bundle_id = args.next().ok_or(CommandError::BadArgs)?;
|
||||
let model_id = args.next().ok_or(CommandError::BadArgs)?;
|
||||
let rate_str = args.next().ok_or(CommandError::BadArgs)?;
|
||||
let rate_in_ms = rate_str.parse::<u32>()?;
|
||||
|
||||
if let Err(e) = kata_mlcoord_periodic(bundle_id, model_id, rate_in_ms) {
|
||||
writeln!(output, "Periodic {:?} {:?} err: {:?}", bundle_id, model_id, e)?;
|
||||
}
|
||||
Err(CommandError::BadArgs)
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn test_obj_alloc_command(output: &mut dyn io::Write) -> Result<(), CommandError> {
|
||||
@ -808,3 +833,7 @@ fn test_timer_completed_command(
|
||||
) -> Result<(), CommandError> {
|
||||
return Ok(writeln!(output, "Timers completed: {:#032b}", timer_service_completed_timers())?);
|
||||
}
|
||||
|
||||
fn state_mlcoord_command() -> Result<(), CommandError> {
|
||||
return Ok(kata_mlcoord_debug_state());
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ cargo-features = ["edition2021"]
|
||||
[workspace]
|
||||
|
||||
members = [
|
||||
"kata-ml-component",
|
||||
"kata-ml-coordinator",
|
||||
"kata-ml-interface",
|
||||
"kata-vec-core"
|
||||
|
@ -3,8 +3,12 @@ import <MlCoordinatorInterface.camkes>;
|
||||
import <SecurityCoordinatorInterface.camkes>;
|
||||
|
||||
component MlCoordinator {
|
||||
control;
|
||||
|
||||
provides MlCoordinatorInterface mlcoord;
|
||||
|
||||
uses Timer timer;
|
||||
|
||||
consumes Interrupt host_req;
|
||||
consumes Interrupt finish;
|
||||
consumes Interrupt instruction_fault;
|
||||
|
@ -0,0 +1,22 @@
|
||||
cargo-features = ["edition2021"]
|
||||
|
||||
[package]
|
||||
name = "kata-ml-component"
|
||||
version = "0.1.0"
|
||||
authors = ["Adam Jesionowski <jesionowski@google.com>"]
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
cstr_core = { version = "0.2.3", default-features = false }
|
||||
kata-os-common = { path = "../../kata-os-common" }
|
||||
kata-memory-interface = { path = "../../MemoryManager/kata-memory-interface" }
|
||||
kata-ml-coordinator = { path = "../kata-ml-coordinator" }
|
||||
kata-ml-interface = { path = "../kata-ml-interface" }
|
||||
kata-timer-interface = { path = "../../TimerService/kata-timer-interface" }
|
||||
log = "0.4"
|
||||
spin = "0.9"
|
||||
|
||||
[lib]
|
||||
name = "kata_ml_component"
|
||||
path = "src/run.rs"
|
||||
crate-type = ["staticlib"]
|
@ -0,0 +1,164 @@
|
||||
#![no_std]
|
||||
#![allow(clippy::missing_safety_doc)]
|
||||
|
||||
extern crate alloc;
|
||||
|
||||
use alloc::string::String;
|
||||
use cstr_core::CStr;
|
||||
use kata_ml_coordinator::MLCoordinator;
|
||||
use kata_ml_coordinator::ModelIdx;
|
||||
use kata_ml_interface::MlCoordError;
|
||||
use kata_os_common::allocator;
|
||||
use kata_os_common::logger::KataLogger;
|
||||
use kata_os_common::sel4_sys;
|
||||
use kata_os_common::slot_allocator::KATA_CSPACE_SLOTS;
|
||||
use kata_timer_interface::*;
|
||||
use log::{error, trace};
|
||||
use sel4_sys::seL4_CPtr;
|
||||
use spin::Mutex;
|
||||
|
||||
static mut ML_COORD: Mutex<MLCoordinator> = Mutex::new(MLCoordinator::new());
|
||||
|
||||
extern "C" {
|
||||
static SELF_CNODE_FIRST_SLOT: seL4_CPtr;
|
||||
static SELF_CNODE_LAST_SLOT: seL4_CPtr;
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn pre_init() {
|
||||
static KATA_LOGGER: KataLogger = KataLogger;
|
||||
log::set_logger(&KATA_LOGGER).unwrap();
|
||||
log::set_max_level(log::LevelFilter::Trace);
|
||||
|
||||
// TODO(sleffler): temp until we integrate with seL4
|
||||
static mut HEAP_MEMORY: [u8; 4 * 1024] = [0; 4 * 1024];
|
||||
allocator::ALLOCATOR.init(HEAP_MEMORY.as_mut_ptr() as usize, HEAP_MEMORY.len());
|
||||
trace!(
|
||||
"setup heap: start_addr {:p} size {}",
|
||||
HEAP_MEMORY.as_ptr(),
|
||||
HEAP_MEMORY.len()
|
||||
);
|
||||
|
||||
KATA_CSPACE_SLOTS.init(
|
||||
/*first_slot=*/ SELF_CNODE_FIRST_SLOT,
|
||||
/*size=*/ SELF_CNODE_LAST_SLOT - SELF_CNODE_FIRST_SLOT,
|
||||
);
|
||||
trace!(
|
||||
"setup cspace slots: first slot {} free {}",
|
||||
KATA_CSPACE_SLOTS.base_slot(),
|
||||
KATA_CSPACE_SLOTS.free_slots()
|
||||
);
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn mlcoord__init() {
|
||||
ML_COORD.lock().init();
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub extern "C" fn run() {
|
||||
loop {
|
||||
timer_service_wait();
|
||||
let completed = timer_service_completed_timers();
|
||||
|
||||
for i in 0..31 {
|
||||
let idx: u32 = 1 << i;
|
||||
if completed & idx != 0 {
|
||||
unsafe {
|
||||
if let Err(e) = ML_COORD.lock().timer_completed(i as ModelIdx) {
|
||||
error!("Error when trying to run periodic model: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
unsafe fn validate_ids(
|
||||
c_bundle_id: *const cstr_core::c_char,
|
||||
c_model_id: *const cstr_core::c_char,
|
||||
) -> Result<(String, String), MlCoordError> {
|
||||
let bundle_id = CStr::from_ptr(c_bundle_id)
|
||||
.to_str()
|
||||
.map_err(|_| MlCoordError::InvalidBundleId)?;
|
||||
let model_id = CStr::from_ptr(c_model_id)
|
||||
.to_str()
|
||||
.map_err(|_| MlCoordError::InvalidModelId)?;
|
||||
Ok((String::from(bundle_id), String::from(model_id)))
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn mlcoord_oneshot(
|
||||
c_bundle_id: *const cstr_core::c_char,
|
||||
c_model_id: *const cstr_core::c_char,
|
||||
) -> MlCoordError {
|
||||
let (bundle_id, model_id) = match validate_ids(c_bundle_id, c_model_id) {
|
||||
Ok(ids) => ids,
|
||||
Err(e) => return e,
|
||||
};
|
||||
|
||||
if let Err(e) = ML_COORD.lock().oneshot(bundle_id, model_id) {
|
||||
return e;
|
||||
}
|
||||
|
||||
MlCoordError::MlCoordOk
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn mlcoord_periodic(
|
||||
c_bundle_id: *const cstr_core::c_char,
|
||||
c_model_id: *const cstr_core::c_char,
|
||||
rate_in_ms: u32,
|
||||
) -> MlCoordError {
|
||||
let (bundle_id, model_id) = match validate_ids(c_bundle_id, c_model_id) {
|
||||
Ok(ids) => ids,
|
||||
Err(e) => return e,
|
||||
};
|
||||
if let Err(e) = ML_COORD.lock().periodic(bundle_id, model_id, rate_in_ms) {
|
||||
return e;
|
||||
}
|
||||
|
||||
MlCoordError::MlCoordOk
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn mlcoord_cancel(
|
||||
c_bundle_id: *const cstr_core::c_char,
|
||||
c_model_id: *const cstr_core::c_char,
|
||||
) -> MlCoordError {
|
||||
let (bundle_id, model_id) = match validate_ids(c_bundle_id, c_model_id) {
|
||||
Ok(ids) => ids,
|
||||
Err(e) => return e,
|
||||
};
|
||||
|
||||
if let Err(e) = ML_COORD.lock().cancel(bundle_id, model_id) {
|
||||
return e;
|
||||
}
|
||||
|
||||
MlCoordError::MlCoordOk
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn host_req_handle() {
|
||||
ML_COORD.lock().handle_host_req_interrupt();
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn finish_handle() {
|
||||
ML_COORD.lock().handle_return_interrupt();
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn instruction_fault_handle() {
|
||||
ML_COORD.lock().handle_instruction_fault_interrupt();
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn data_fault_handle() {
|
||||
ML_COORD.lock().handle_data_fault_interrupt();
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn mlcoord_debug_state() {
|
||||
ML_COORD.lock().debug_state();
|
||||
}
|
@ -12,10 +12,7 @@ kata-os-common = { path = "../../kata-os-common" }
|
||||
kata-memory-interface = { path = "../../MemoryManager/kata-memory-interface" }
|
||||
kata-ml-interface = { path = "../kata-ml-interface" }
|
||||
kata-security-interface = { path = "../../SecurityCoordinator/kata-security-interface" }
|
||||
kata-timer-interface = { path = "../../TimerService/kata-timer-interface" }
|
||||
kata-vec-core = { path = "../kata-vec-core" }
|
||||
log = "0.4"
|
||||
|
||||
[lib]
|
||||
name = "kata_ml_coordinator"
|
||||
path = "src/run.rs"
|
||||
crate-type = ["staticlib"]
|
||||
spin = "0.9"
|
||||
|
@ -0,0 +1,359 @@
|
||||
#![no_std]
|
||||
|
||||
// ML Coordinator Design Doc: go/sparrow-ml-doc
|
||||
|
||||
extern crate alloc;
|
||||
|
||||
use alloc::string::String;
|
||||
use alloc::vec::Vec;
|
||||
use kata_ml_interface::MlCoordError;
|
||||
use kata_ml_interface::MlCoreInterface;
|
||||
use kata_os_common::cspace_slot::CSpaceSlot;
|
||||
use kata_security_interface::*;
|
||||
use kata_timer_interface::*;
|
||||
use kata_vec_core::MlCore;
|
||||
use log::{error, info, trace, warn};
|
||||
|
||||
/// The maximum number of models that the MLCoordinator can handle, bounded by
|
||||
/// timer slots. It's unlikely we'll be anywhere near this.
|
||||
const MAX_MODELS: usize = 32;
|
||||
|
||||
/// Represents a single loadable model.
|
||||
#[derive(Debug)]
|
||||
struct LoadableModel {
|
||||
bundle_id: String,
|
||||
model_id: String,
|
||||
rate_in_ms: Option<u32>,
|
||||
}
|
||||
|
||||
/// Statistics on non-happy-path events.
|
||||
#[derive(Debug)]
|
||||
struct Statistics {
|
||||
load_failures: u32,
|
||||
already_queued: u32,
|
||||
}
|
||||
|
||||
pub struct MLCoordinator {
|
||||
/// The currently running model index, if any.
|
||||
running_model: Option<ModelIdx>,
|
||||
/// The currently loaded model.
|
||||
// NB: This will be removed once the WMMU allows for multiple models loaded
|
||||
loaded_model: Option<ModelIdx>,
|
||||
/// A list of all models that have been requested for oneshot or periodic
|
||||
/// execution.
|
||||
models: [Option<LoadableModel>; MAX_MODELS],
|
||||
/// A queue of models that are ready for immediate execution on the vector
|
||||
/// core, once the currently running model has finished.
|
||||
execution_queue: Vec<ModelIdx>,
|
||||
statistics: Statistics,
|
||||
ml_core: MlCore,
|
||||
}
|
||||
|
||||
// The index of a model in MLCoordinator.models
|
||||
pub type ModelIdx = usize;
|
||||
|
||||
// NB: Can't use `None` as it's Option<T>, need to clarify its Option<Model>
|
||||
const INIT_NONE: Option<LoadableModel> = None;
|
||||
|
||||
impl MLCoordinator {
|
||||
pub const fn new() -> Self {
|
||||
MLCoordinator {
|
||||
running_model: None,
|
||||
loaded_model: None,
|
||||
models: [INIT_NONE; MAX_MODELS],
|
||||
execution_queue: Vec::new(),
|
||||
statistics: Statistics{load_failures: 0, already_queued: 0},
|
||||
ml_core: MlCore {},
|
||||
}
|
||||
}
|
||||
|
||||
/// Initialize the vector core.
|
||||
pub fn init(&mut self) {
|
||||
self.ml_core.enable_interrupts(true);
|
||||
self.execution_queue.reserve(MAX_MODELS);
|
||||
}
|
||||
|
||||
/// Load a model by copying it into the Vector Core's TCM, if it's not
|
||||
/// already been loaded.
|
||||
fn load_model(&mut self, model_idx: ModelIdx) -> Result<(), MlCoordError> {
|
||||
if self.loaded_model == Some(model_idx) {
|
||||
trace!("Model already loaded, skipping load");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Ensure we have a model at the passed index. This shouldn't error.
|
||||
let model = self.models[model_idx]
|
||||
.as_ref()
|
||||
.ok_or(MlCoordError::LoadModelFailed)?;
|
||||
|
||||
// Loads |model_id| associated with |bundle_id| from the
|
||||
// SecurityCoordinator. The data are returned as unmapped
|
||||
// page frames in a CNode container left in |container_slot|.
|
||||
// To load the model into the vector core the pages must be
|
||||
// mapped into the MlCoordinator's VSpace before being copied
|
||||
// to their destination.
|
||||
let container_slot = CSpaceSlot::new();
|
||||
match kata_security_load_model(&model.bundle_id, &model.model_id, &container_slot) {
|
||||
Ok(model_frames) => {
|
||||
match self.ml_core.load_image(&model_frames) {
|
||||
Err(e) => {
|
||||
error!(
|
||||
"Load of {}:{} failed: {:?}",
|
||||
&model.bundle_id, &model.model_id, e
|
||||
);
|
||||
// May have corrupted TCM.
|
||||
self.loaded_model = None;
|
||||
self.statistics.load_failures += 1;
|
||||
Err(MlCoordError::LoadModelFailed)
|
||||
}
|
||||
Ok(sections) => {
|
||||
info!("Load successful.");
|
||||
self.ml_core.set_wmmu(§ions);
|
||||
self.loaded_model = Some(model_idx);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
"LoadModel of bundle {}:{} failed: {:?}",
|
||||
&model.bundle_id, &model.model_id, e
|
||||
);
|
||||
self.statistics.load_failures += 1;
|
||||
Err(MlCoordError::LoadModelFailed)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// If there is a next model in the queue, load it onto the core and start
|
||||
/// running. If there's already a running model, don't do anything.
|
||||
fn schedule_next_model(&mut self) -> Result<(), MlCoordError> {
|
||||
if !self.running_model.is_some() && !self.execution_queue.is_empty() {
|
||||
let next_idx = self.execution_queue.remove(0);
|
||||
// If load model fails we won't try and re-queue this model.
|
||||
// It's very unlikely for load errors to be transient, it should
|
||||
// only happen in the case of a mal-formed model.
|
||||
self.load_model(next_idx)?;
|
||||
self.ml_core.run(); // Unhalt, start at default PC.
|
||||
self.running_model = Some(next_idx);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn handle_return_interrupt(&mut self) {
|
||||
extern "C" {
|
||||
fn finish_acknowledge() -> u32;
|
||||
}
|
||||
|
||||
// TODO(jesionowski): Move the result from TCM to SRAM,
|
||||
// update the input/model.
|
||||
let return_code = MlCore::get_return_code();
|
||||
let fault = MlCore::get_fault_register();
|
||||
|
||||
// TODO(jesionowski): Signal the application that there was a failure.
|
||||
if return_code != 0 {
|
||||
error!(
|
||||
"vctop execution failed with code {}, fault pc: {:#010X}",
|
||||
return_code, fault
|
||||
);
|
||||
}
|
||||
|
||||
self.running_model = None;
|
||||
|
||||
// TODO(jesionowski): Signal the application that owns this model
|
||||
// that there was a failure.
|
||||
if let Err(e) = self.schedule_next_model() {
|
||||
error!("Running next model failed with {:?}", e)
|
||||
}
|
||||
|
||||
MlCore::clear_finish();
|
||||
assert!(unsafe { finish_acknowledge() == 0 });
|
||||
}
|
||||
|
||||
// Constructs a new model and add to an open slot, returning the index
|
||||
// of that slot.
|
||||
fn ready_model(
|
||||
&mut self,
|
||||
bundle_id: String,
|
||||
model_id: String,
|
||||
rate_in_ms: Option<u32>,
|
||||
) -> Result<ModelIdx, MlCoordError> {
|
||||
// Return None if all slots are full.
|
||||
let index = self
|
||||
.models
|
||||
.iter()
|
||||
.position(|m| m.is_none())
|
||||
.ok_or(MlCoordError::NoModelSlotsLeft)?;
|
||||
|
||||
self.models[index] = Some(LoadableModel {
|
||||
bundle_id,
|
||||
model_id,
|
||||
rate_in_ms,
|
||||
});
|
||||
|
||||
Ok(index)
|
||||
}
|
||||
|
||||
/// Start a one-time model execution, to be executed immediately.
|
||||
pub fn oneshot(&mut self, bundle_id: String, model_id: String) -> Result<(), MlCoordError> {
|
||||
let model_idx = self.ready_model(bundle_id, model_id, None)?;
|
||||
|
||||
self.execution_queue.push(model_idx);
|
||||
self.schedule_next_model()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Start a periodic model execution, to be executed immediately and
|
||||
/// then every rate_in_ms.
|
||||
pub fn periodic(
|
||||
&mut self,
|
||||
bundle_id: String,
|
||||
model_id: String,
|
||||
rate_in_ms: u32,
|
||||
) -> Result<(), MlCoordError> {
|
||||
let model_idx = self.ready_model(bundle_id, model_id, Some(rate_in_ms))?;
|
||||
|
||||
self.execution_queue.push(model_idx);
|
||||
self.schedule_next_model()?;
|
||||
|
||||
timer_service_periodic(model_idx as u32, rate_in_ms);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Cancels an outstanding execution.
|
||||
pub fn cancel(&mut self, bundle_id: String, model_id: String) -> Result<(), MlCoordError> {
|
||||
// Find the model index matching the bundle/model id.
|
||||
let model_idx = self
|
||||
.models
|
||||
.iter()
|
||||
.position(|optm| {
|
||||
if let Some(m) = optm {
|
||||
m.bundle_id == bundle_id && m.model_id == model_id
|
||||
} else {
|
||||
false
|
||||
}
|
||||
})
|
||||
.ok_or(MlCoordError::NoSuchModel)?;
|
||||
|
||||
// If the model is periodic, cancel the timer.
|
||||
if self.models[model_idx]
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.rate_in_ms
|
||||
.is_some()
|
||||
{
|
||||
timer_service_cancel(model_idx as u32);
|
||||
}
|
||||
|
||||
// If the model is scheduled to be executed, remove it.
|
||||
let execution_idx = self
|
||||
.execution_queue
|
||||
.iter()
|
||||
.position(|idx| *idx == model_idx);
|
||||
if let Some(idx) = execution_idx {
|
||||
self.execution_queue.remove(idx);
|
||||
}
|
||||
|
||||
self.models[model_idx] = None;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Enqueues the model associated with the completed timer.
|
||||
pub fn timer_completed(&mut self, model_idx: ModelIdx) -> Result<(), MlCoordError> {
|
||||
// There's a small chance the model was removed at the same time the
|
||||
// timer interrupt fires, in which case we just ignore it.
|
||||
if self.models[model_idx].is_some() {
|
||||
// We don't want the queue to grow unbounded, so don't requeue
|
||||
// an execution if there's one scheduled already.
|
||||
if self.execution_queue.iter().any(|idx| *idx == model_idx) {
|
||||
let model = self.models[model_idx].as_ref().unwrap();
|
||||
warn!(
|
||||
"Dropping {}:{} periodic execution as it has an execution outstanding already.",
|
||||
&model.bundle_id, &model.model_id
|
||||
);
|
||||
self.statistics.already_queued += 1;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.execution_queue.push(model_idx);
|
||||
self.schedule_next_model()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn handle_host_req_interrupt(&self) {
|
||||
extern "C" {
|
||||
fn host_req_acknowledge() -> u32;
|
||||
}
|
||||
MlCore::clear_host_req();
|
||||
unsafe {
|
||||
assert!(host_req_acknowledge() == 0);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handle_instruction_fault_interrupt(&self) {
|
||||
extern "C" {
|
||||
fn instruction_fault_acknowledge() -> u32;
|
||||
}
|
||||
error!("Instruction fault in Vector Core.");
|
||||
MlCore::clear_instruction_fault();
|
||||
unsafe {
|
||||
assert!(instruction_fault_acknowledge() == 0);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handle_data_fault_interrupt(&self) {
|
||||
extern "C" {
|
||||
fn data_fault_acknowledge() -> u32;
|
||||
}
|
||||
error!("Data fault in Vector Core.");
|
||||
MlCore::clear_data_fault();
|
||||
unsafe {
|
||||
assert!(data_fault_acknowledge() == 0);
|
||||
}
|
||||
}
|
||||
|
||||
fn ids_at(&self, idx: ModelIdx) -> (&str, &str) {
|
||||
match self.models[idx].as_ref() {
|
||||
Some(model) => (&model.bundle_id, &model.model_id),
|
||||
None => ("None", "None"),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn debug_state(&self) {
|
||||
match self.running_model {
|
||||
Some(idx) => {
|
||||
let (bundle, model) = self.ids_at(idx);
|
||||
info!("Running model: {}:{}", bundle, model);
|
||||
}
|
||||
None => info!("No running model.")
|
||||
}
|
||||
|
||||
match self.loaded_model {
|
||||
Some(idx) => {
|
||||
let (bundle, model) = self.ids_at(idx);
|
||||
info!("Loaded model: {}:{}", bundle, model);
|
||||
}
|
||||
None => info!("No loaded model.")
|
||||
}
|
||||
|
||||
info!("Loadable Models:");
|
||||
for model in self.models.as_ref() {
|
||||
if let Some(m) = model {
|
||||
info!(" {:?}", m);
|
||||
}
|
||||
}
|
||||
|
||||
info!("Execution Queue:");
|
||||
for idx in &self.execution_queue {
|
||||
let (bundle, model) = self.ids_at(*idx);
|
||||
info!(" {}:{}", bundle, model);
|
||||
}
|
||||
|
||||
info!("Statistics: {:?}", self.statistics);
|
||||
}
|
||||
}
|
@ -1,225 +0,0 @@
|
||||
#![no_std]
|
||||
#![allow(clippy::missing_safety_doc)]
|
||||
|
||||
// ML Coordinator Design Doc: go/sparrow-ml-doc
|
||||
|
||||
extern crate alloc;
|
||||
|
||||
use alloc::string::String;
|
||||
use cstr_core::CStr;
|
||||
use kata_ml_interface::MlCoordinatorInterface;
|
||||
use kata_ml_interface::MlCoreInterface;
|
||||
use kata_os_common::allocator;
|
||||
use kata_os_common::cspace_slot::CSpaceSlot;
|
||||
use kata_os_common::logger::KataLogger;
|
||||
use kata_os_common::sel4_sys;
|
||||
use kata_os_common::slot_allocator::KATA_CSPACE_SLOTS;
|
||||
use kata_security_interface::*;
|
||||
use kata_vec_core::MlCore;
|
||||
use log::{error, info, trace};
|
||||
|
||||
use sel4_sys::seL4_CPtr;
|
||||
|
||||
extern "C" {
|
||||
static SELF_CNODE_FIRST_SLOT: seL4_CPtr;
|
||||
static SELF_CNODE_LAST_SLOT: seL4_CPtr;
|
||||
}
|
||||
|
||||
pub struct MLCoordinator {
|
||||
loaded_bundle: Option<String>,
|
||||
loaded_model: Option<String>,
|
||||
is_running: bool,
|
||||
continous_mode: bool,
|
||||
ml_core: MlCore,
|
||||
}
|
||||
|
||||
pub static mut ML_COORD: MLCoordinator = MLCoordinator {
|
||||
loaded_bundle: None,
|
||||
loaded_model: None,
|
||||
is_running: false,
|
||||
continous_mode: false,
|
||||
ml_core: MlCore {},
|
||||
};
|
||||
|
||||
impl MLCoordinator {
|
||||
fn init(&mut self) {
|
||||
self.ml_core.enable_interrupts(true);
|
||||
}
|
||||
|
||||
fn is_loaded(&self) -> bool {
|
||||
self.loaded_bundle.is_some() && self.loaded_model.is_some()
|
||||
}
|
||||
|
||||
fn cmp_loaded(&self, bundle_id: &str, model_id: &str) -> bool {
|
||||
self.loaded_bundle.as_deref() == Some(bundle_id) &&
|
||||
self.loaded_model.as_deref() == Some(model_id)
|
||||
}
|
||||
|
||||
fn handle_return_interrupt(&mut self) {
|
||||
extern "C" {
|
||||
fn finish_acknowledge() -> u32;
|
||||
}
|
||||
|
||||
// TODO(jesionowski): Move the result from TCM to SRAM,
|
||||
// update the input/model.
|
||||
let return_code = MlCore::get_return_code();
|
||||
let fault = MlCore::get_fault_register();
|
||||
|
||||
if return_code != 0 {
|
||||
error!(
|
||||
"{}: vctop execution failed with code {}, fault pc: {:#010X}",
|
||||
self.loaded_model.as_ref().unwrap(), return_code, fault
|
||||
);
|
||||
self.continous_mode = false;
|
||||
}
|
||||
|
||||
self.is_running = false;
|
||||
if self.continous_mode {
|
||||
// TODO(sleffler): can !is_loaded happen?
|
||||
// XXX needs proper state machine
|
||||
// XXX what is the threading/locking model?
|
||||
if self.is_loaded() {
|
||||
self.ml_core.run(); // Unhalt, start at default PC.
|
||||
self.is_running = true;
|
||||
}
|
||||
}
|
||||
|
||||
MlCore::clear_finish();
|
||||
assert!(unsafe { finish_acknowledge() == 0 });
|
||||
}
|
||||
}
|
||||
|
||||
impl MlCoordinatorInterface for MLCoordinator {
|
||||
fn execute(&mut self, bundle_id: &str, model_id: &str) {
|
||||
if self.is_running {
|
||||
trace!("Skip execute with {}:{} already running",
|
||||
self.loaded_bundle.as_ref().unwrap(),
|
||||
self.loaded_model.as_ref().unwrap());
|
||||
return;
|
||||
}
|
||||
|
||||
if !self.cmp_loaded(bundle_id, model_id) {
|
||||
// Loads |model_id| associated with |bundle_id| from the
|
||||
// SecurityCoordinator. The data are returned as unmapped
|
||||
// page frames in a CNode container left in |container_slot|.
|
||||
// To load the model into the vector core the pages must be
|
||||
// mapped into the MlCoordinator's VSpace before being copied
|
||||
// to their destination.
|
||||
let container_slot = CSpaceSlot::new();
|
||||
match kata_security_load_model(bundle_id, model_id, &container_slot) {
|
||||
Ok(model_frames) => {
|
||||
let res = self.ml_core.load_image(&model_frames);
|
||||
if let Err(e) = res {
|
||||
error!("Load of {}:{} failed: {:?}",
|
||||
bundle_id, model_id, e);
|
||||
// NB: may have corrupted TCM, clear loaded state
|
||||
self.loaded_bundle = None;
|
||||
self.loaded_model = None;
|
||||
} else {
|
||||
info!("Load successful.");
|
||||
self.ml_core.set_wmmu(&res.unwrap());
|
||||
self.loaded_bundle = Some(String::from(bundle_id));
|
||||
self.loaded_model = Some(String::from(model_id));
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("LoadModel of bundle {} model {} failed: {:?}",
|
||||
bundle_id, model_id, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if self.is_loaded() {
|
||||
self.ml_core.run(); // Unhalt, start at default PC.
|
||||
self.is_running = true;
|
||||
}
|
||||
}
|
||||
|
||||
fn set_continuous_mode(&mut self, continous: bool) {
|
||||
self.continous_mode = continous;
|
||||
}
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn pre_init() {
|
||||
static KATA_LOGGER: KataLogger = KataLogger;
|
||||
log::set_logger(&KATA_LOGGER).unwrap();
|
||||
log::set_max_level(log::LevelFilter::Trace);
|
||||
|
||||
// TODO(sleffler): temp until we integrate with seL4
|
||||
static mut HEAP_MEMORY: [u8; 4 * 1024] = [0; 4 * 1024];
|
||||
allocator::ALLOCATOR.init(HEAP_MEMORY.as_mut_ptr() as usize, HEAP_MEMORY.len());
|
||||
trace!(
|
||||
"setup heap: start_addr {:p} size {}",
|
||||
HEAP_MEMORY.as_ptr(),
|
||||
HEAP_MEMORY.len()
|
||||
);
|
||||
|
||||
KATA_CSPACE_SLOTS.init(
|
||||
/*first_slot=*/ SELF_CNODE_FIRST_SLOT,
|
||||
/*size=*/ SELF_CNODE_LAST_SLOT - SELF_CNODE_FIRST_SLOT
|
||||
);
|
||||
trace!("setup cspace slots: first slot {} free {}",
|
||||
KATA_CSPACE_SLOTS.base_slot(),
|
||||
KATA_CSPACE_SLOTS.free_slots());
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn mlcoord__init() {
|
||||
ML_COORD.init();
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn mlcoord_execute(
|
||||
c_bundle_id: *const cstr_core::c_char,
|
||||
c_model_id: *const cstr_core::c_char,
|
||||
) {
|
||||
match CStr::from_ptr(c_bundle_id).to_str() {
|
||||
Ok(bundle_id) => match CStr::from_ptr(c_model_id).to_str() {
|
||||
Ok(model_id) => {
|
||||
ML_COORD.execute(bundle_id, model_id)
|
||||
}
|
||||
_ => error!("Invalid model_id"),
|
||||
}
|
||||
_ => error!("Invalid bundle_id"),
|
||||
}
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn mlcoord_set_continuous_mode(mode: bool) {
|
||||
ML_COORD.set_continuous_mode(mode);
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn host_req_handle() {
|
||||
extern "C" {
|
||||
fn host_req_acknowledge() -> u32;
|
||||
}
|
||||
MlCore::clear_host_req();
|
||||
assert!(host_req_acknowledge() == 0);
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn finish_handle() {
|
||||
ML_COORD.handle_return_interrupt();
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn instruction_fault_handle() {
|
||||
extern "C" {
|
||||
fn instruction_fault_acknowledge() -> u32;
|
||||
}
|
||||
error!("Instruction fault in Vector Core.");
|
||||
MlCore::clear_instruction_fault();
|
||||
assert!(instruction_fault_acknowledge() == 0);
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn data_fault_handle() {
|
||||
extern "C" {
|
||||
fn data_fault_acknowledge() -> u32;
|
||||
}
|
||||
error!("Data fault in Vector Core.");
|
||||
MlCore::clear_data_fault();
|
||||
assert!(data_fault_acknowledge() == 0);
|
||||
}
|
@ -0,0 +1,4 @@
|
||||
INTERFACES=../../../interfaces
|
||||
|
||||
${INTERFACES}/MlCoordBindings.h: src/lib.rs
|
||||
cbindgen -c cbindgen.toml $? -o $@
|
@ -0,0 +1,7 @@
|
||||
language = "C"
|
||||
include_guard = "__ML_COORDINATOR_BINDINGS_H__"
|
||||
autogen_warning = "/* Warning, this file is autogenerated by cbindgen. Don't modify this manually. */"
|
||||
no_includes = true
|
||||
|
||||
[export]
|
||||
include = ["MlCoordError"]
|
@ -1,5 +1,5 @@
|
||||
#![no_std]
|
||||
|
||||
#[allow(dead_code)]
|
||||
use cstr_core::CString;
|
||||
use kata_memory_interface::ObjDescBundle;
|
||||
|
||||
@ -19,11 +19,29 @@ pub struct ModelSections {
|
||||
pub data: Window,
|
||||
}
|
||||
|
||||
pub trait MlCoordinatorInterface {
|
||||
fn execute(&mut self, bundle_id: &str, model_id: &str);
|
||||
fn set_continuous_mode(&mut self, mode: bool);
|
||||
/// Errors that can occur when interacting with the MlCoordinator.
|
||||
#[repr(C)]
|
||||
#[derive(Debug, Eq, PartialEq)]
|
||||
pub enum MlCoordError {
|
||||
MlCoordOk,
|
||||
InvalidModelId,
|
||||
InvalidBundleId,
|
||||
LoadModelFailed,
|
||||
NoModelSlotsLeft,
|
||||
NoSuchModel,
|
||||
}
|
||||
|
||||
impl From<MlCoordError> for Result<(), MlCoordError> {
|
||||
fn from(err: MlCoordError) -> Result<(), MlCoordError> {
|
||||
if err == MlCoordError::MlCoordOk {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Abstraction layer over the hardware vector core.
|
||||
pub trait MlCoreInterface {
|
||||
fn set_wmmu(&mut self, sections: &ModelSections);
|
||||
fn enable_interrupts(&mut self, enabled: bool);
|
||||
@ -38,19 +56,57 @@ pub trait MlCoreInterface {
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[allow(dead_code)]
|
||||
pub fn kata_mlcoord_execute(bundle_id: &str, model_id: &str)
|
||||
-> Result<(),cstr_core:: NulError>
|
||||
{
|
||||
pub fn kata_mlcoord_oneshot(bundle_id: &str, model_id: &str) -> Result<(), MlCoordError> {
|
||||
extern "C" {
|
||||
// NB: this assumes the MlCoordinator component is named "mlcoord".
|
||||
fn mlcoord_execute(
|
||||
fn mlcoord_oneshot(
|
||||
c_bundle_id: *const cstr_core::c_char,
|
||||
c_model_id: *const cstr_core::c_char
|
||||
);
|
||||
c_model_id: *const cstr_core::c_char,
|
||||
) -> MlCoordError;
|
||||
}
|
||||
let bundle_id_cstr = CString::new(bundle_id)?;
|
||||
let model_id_cstr = CString::new(model_id)?;
|
||||
unsafe { mlcoord_execute(bundle_id_cstr.as_ptr(), model_id_cstr.as_ptr()) };
|
||||
Ok(())
|
||||
let bundle_id_cstr = CString::new(bundle_id).map_err(|_| MlCoordError::InvalidBundleId)?;
|
||||
let model_id_cstr = CString::new(model_id).map_err(|_| MlCoordError::InvalidModelId)?;
|
||||
|
||||
unsafe { mlcoord_oneshot(bundle_id_cstr.as_ptr(), model_id_cstr.as_ptr()) }.into()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn kata_mlcoord_periodic(
|
||||
bundle_id: &str,
|
||||
model_id: &str,
|
||||
rate_in_ms: u32,
|
||||
) -> Result<(), MlCoordError> {
|
||||
extern "C" {
|
||||
fn mlcoord_periodic(
|
||||
c_bundle_id: *const cstr_core::c_char,
|
||||
c_model_id: *const cstr_core::c_char,
|
||||
rate_in_ms: u32,
|
||||
) -> MlCoordError;
|
||||
}
|
||||
let bundle_id_cstr = CString::new(bundle_id).map_err(|_| MlCoordError::InvalidBundleId)?;
|
||||
let model_id_cstr = CString::new(model_id).map_err(|_| MlCoordError::InvalidModelId)?;
|
||||
|
||||
unsafe { mlcoord_periodic(bundle_id_cstr.as_ptr(), model_id_cstr.as_ptr(), rate_in_ms) }.into()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn kata_mlcoord_cancel(bundle_id: &str, model_id: &str) -> Result<(), MlCoordError> {
|
||||
extern "C" {
|
||||
fn mlcoord_cancel(
|
||||
c_bundle_id: *const cstr_core::c_char,
|
||||
c_model_id: *const cstr_core::c_char,
|
||||
) -> MlCoordError;
|
||||
}
|
||||
let bundle_id_cstr = CString::new(bundle_id).map_err(|_| MlCoordError::InvalidBundleId)?;
|
||||
let model_id_cstr = CString::new(model_id).map_err(|_| MlCoordError::InvalidModelId)?;
|
||||
|
||||
unsafe { mlcoord_cancel(bundle_id_cstr.as_ptr(), model_id_cstr.as_ptr()) }.into()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn kata_mlcoord_debug_state() {
|
||||
extern "C" {
|
||||
fn mlcoord_debug_state();
|
||||
}
|
||||
unsafe { mlcoord_debug_state() };
|
||||
}
|
||||
|
15
apps/system/interfaces/MlCoordBindings.h
Normal file
15
apps/system/interfaces/MlCoordBindings.h
Normal file
@ -0,0 +1,15 @@
|
||||
#ifndef __ML_COORDINATOR_BINDINGS_H__
|
||||
#define __ML_COORDINATOR_BINDINGS_H__
|
||||
|
||||
/* Warning, this file is autogenerated by cbindgen. Don't modify this manually. */
|
||||
|
||||
typedef enum MlCoordError {
|
||||
MlCoordOk,
|
||||
InvalidModelId,
|
||||
InvalidBundleId,
|
||||
LoadModelFailed,
|
||||
NoModelSlotsLeft,
|
||||
NoSuchModel,
|
||||
} MlCoordError;
|
||||
|
||||
#endif /* __ML_COORDINATOR_BINDINGS_H__ */
|
@ -1,4 +1,9 @@
|
||||
procedure MlCoordinatorInterface {
|
||||
void execute(in string bundle_id, in string model_id);
|
||||
void set_continuous_mode(bool mode);
|
||||
include <MlCoordBindings.h>;
|
||||
|
||||
MlCoordError oneshot(in string bundle_id, in string model_id);
|
||||
MlCoordError periodic(in string bundle_id, in string model_id, in uint32_t rate_in_ms);
|
||||
MlCoordError cancel(in string bundle_id, in string model_id);
|
||||
|
||||
void debug_state();
|
||||
};
|
||||
|
@ -108,6 +108,7 @@ assembly {
|
||||
connection seL4HardwareInterrupt timer_interrupt(from timer.timer_interrupt,
|
||||
to timer_service.timer_interrupt);
|
||||
connection seL4RPCCallSignal timer_rpc(from debug_console.timer,
|
||||
from ml_coordinator.timer,
|
||||
to timer_service.timer);
|
||||
|
||||
// Hookup ProcessManager to DebugConsole for shell commands.
|
||||
|
Loading…
Reference in New Issue
Block a user