Skip to content

Commit

Permalink
Creating persistent single workload socket for EdgeAgent (#6435)
Browse files Browse the repository at this point in the history
* Create persistent workload socket for edgeAgent
  • Loading branch information
bilalsellak authored Jun 28, 2022
1 parent d89dfe5 commit 35f8781
Show file tree
Hide file tree
Showing 6 changed files with 877 additions and 12 deletions.
3 changes: 3 additions & 0 deletions edgelet/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions edgelet/iotedged/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ publish = false
edition = "2018"

[dependencies]
tempfile = "3"
serde_derive = "1.0"
base64 = "0.9"
clap = "2.31"
chrono = "0.4"
env_logger = "0.9"
failure = "0.1"
config = { version = "0.9", default-features = false, features = ["yaml", "json"] }
futures = "0.1"
hyper = "0.12.17"
hyper-tls = { version = "0.3", optional = true }
Expand All @@ -32,6 +35,7 @@ edgelet-docker = { path = "../edgelet-docker" }
edgelet-hsm = { path = "../edgelet-hsm" }
edgelet-http = { path = "../edgelet-http" }
edgelet-http-external-provisioning = { path = "../edgelet-http-external-provisioning" }
edgelet-test-utils = { path = "../edgelet-test-utils" }
edgelet-http-mgmt = { path = "../edgelet-http-mgmt" }
edgelet-http-workload = { path = "../edgelet-http-workload" }
edgelet-iothub = { path = "../edgelet-iothub" }
Expand Down
50 changes: 38 additions & 12 deletions edgelet/iotedged/src/workload_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::error::{Error, ErrorKind, InitializeErrorReason};

const SOCKET_DEFAULT_PERMISSION: u32 = 0o666;
const MAX_CONCURRENCY: ConcurrencyThrottling = ConcurrencyThrottling::Limited(10);
const EDGEAGENT: &str = "edgeAgent";

pub struct WorkloadManager<K, C, W, M>
where
Expand Down Expand Up @@ -142,14 +143,34 @@ where
// "stop"
// There is still a chance that 2 concurrent servers are launch with concurrence,
// But it is extremely unlikely and anyway doesn't have any side effect expect memory footprint.
if let Some(shutdown_sender) = self.shutdown_senders.remove(module_id) {
info!(
"Listener {} already started, removing old listener",
module_id
);
shutdown_sender
.send(())
.map_err(|()| Error::from(ErrorKind::WorkloadService))?;

match module_id {
// If edgeAgent's listener exists, then we do not create a new one
EDGEAGENT => {
if self.shutdown_senders.contains_key(EDGEAGENT) {
info!(
"Listener {} already started, keeping old listener and socket",
module_id
);
if let Some(signal_socket_created) = signal_socket_created {
signal_socket_created.send(()).map_err(|()| {
ErrorKind::Initialize(InitializeErrorReason::WorkloadService)
})?;
}
return Ok(());
}
}
_ => {
if let Some(shutdown_sender) = self.shutdown_senders.remove(module_id) {
info!(
"Listener {} already started, removing old listener",
module_id
);
shutdown_sender
.send(())
.map_err(|()| Error::from(ErrorKind::WorkloadService))?;
}
}
}

let (shutdown_sender, shutdown_receiver) = oneshot::channel();
Expand Down Expand Up @@ -308,6 +329,7 @@ where
})?;

// Ignore error, we don't want the server to close on error.
// We do not stop edgeAgent's socket, as we want it to persist
let server = create_socket_channel_rcv.for_each(move |module_id| match module_id {
ModuleAction::Start(module_id, sender) => {
if let Err(err) = workload_manager.start(&module_id, Some(sender)) {
Expand All @@ -317,11 +339,15 @@ where
Ok(())
}
ModuleAction::Stop(module_id) => {
if let Err(err) = workload_manager.stop(&module_id) {
log_failure(Level::Warn, &err);
if let EDGEAGENT = module_id.as_ref() {
Ok(())
} else {
if let Err(err) = workload_manager.stop(&module_id) {
log_failure(Level::Warn, &err);
}

Ok(())
}

Ok(())
}
});

Expand Down
103 changes: 103 additions & 0 deletions edgelet/iotedged/tests/crypto.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright (c) Microsoft. All rights reserved.

use edgelet_core::crypto::MemoryKey;
use edgelet_core::{
CertificateProperties, CreateCertificate, Decrypt, Encrypt, GetTrustBundle, KeyBytes,
KeyIdentity, KeyStore, MasterEncryptionKey, PrivateKey,
};
use edgelet_test_utils::cert::TestCert;

#[derive(Clone, Default, Debug)]
pub struct TestCrypto;

impl GetTrustBundle for TestCrypto {
type Certificate = TestCert;

fn get_trust_bundle(&self) -> Result<Self::Certificate, edgelet_core::Error> {
unimplemented!()
}
}

impl MasterEncryptionKey for TestCrypto {
fn create_key(&self) -> Result<(), edgelet_core::Error> {
unimplemented!();
}
fn destroy_key(&self) -> Result<(), edgelet_core::Error> {
unimplemented!();
}
}

impl CreateCertificate for TestCrypto {
type Certificate = TestCert;

fn create_certificate(
&self,
_properties: &CertificateProperties,
) -> Result<Self::Certificate, edgelet_core::Error> {
Ok(TestCert::default()
.with_cert(vec![1, 2, 3])
.with_private_key(PrivateKey::Key(KeyBytes::Pem("some key".to_string())))
.with_fail_pem(false)
.with_fail_private_key(false))
}

fn destroy_certificate(&self, _alias: String) -> Result<(), edgelet_core::Error> {
unimplemented!()
}

fn get_certificate(&self, _alias: String) -> Result<Self::Certificate, edgelet_core::Error> {
unimplemented!()
}
}

impl Encrypt for TestCrypto {
type Buffer = Vec<u8>;

fn encrypt(
&self,
_client_id: &[u8],
_plaintext: &[u8],
_initialization_vector: &[u8],
) -> Result<Self::Buffer, edgelet_core::Error> {
unimplemented!()
}
}

impl Decrypt for TestCrypto {
// type Buffer = Buffer;
type Buffer = Vec<u8>;

fn decrypt(
&self,
_client_id: &[u8],
_ciphertext: &[u8],
_initialization_vector: &[u8],
) -> Result<Self::Buffer, edgelet_core::Error> {
unimplemented!()
}
}

#[derive(Clone, Default, Debug)]
pub struct TestKeyStore;

impl KeyStore for TestKeyStore {
type Key = MemoryKey;

fn get(
&self,
_identity: &KeyIdentity,
_key_name: &str,
) -> Result<Self::Key, edgelet_core::Error> {
unimplemented!()
}
}

pub struct TestCertificateManager<C: CreateCertificate + Clone> {
_crypto: C,
}

impl<C: CreateCertificate + Clone> TestCertificateManager<C> {
pub fn new(crypto: C) -> Self {
Self { _crypto: crypto }
}
}
Loading

0 comments on commit 35f8781

Please sign in to comment.