diff --git a/ledger_service/src/lib.rs b/ledger_service/src/lib.rs index 23d5921..be8467d 100644 --- a/ledger_service/src/lib.rs +++ b/ledger_service/src/lib.rs @@ -461,58 +461,80 @@ impl LedgerService { // This vector holds per-blob responses with the same ordering as the original // blob_metadata field. For legacy mode, this vector contains exactly one entry. - let mut authorized_blob_keys = Vec::with_capacity(blob_metadata.len()); - let mut ledger_key_ids = HashSet::new(); + let mut authorized_blob_keys: Vec = Vec::default(); + authorized_blob_keys.resize_with(blob_metadata.len(), Default::default); + // This map is used to sort indices of authorized blobs per key_id. + let mut blob_indices_per_key_id = BTreeMap::, Vec>::default(); + // IDs of visited blobs to ensure that there are no duplicating blobs. let mut blob_ids = HashSet::new(); - for blob in blob_metadata { - authorized_blob_keys.push( - match self.authorize_blob_access( - blob, - &access_policy, - &access_policy_sha256, - event.transform_index.try_into().unwrap(), + // The total number of blobs that have been authorized either successfully or + // not. + let mut num_authorized_blobs = 0; + // Authorize all blobs. + for i in 0..blob_metadata.len() { + match self.authorize_blob_access( + &blob_metadata[i], + &access_policy, + &access_policy_sha256, + event.transform_index.try_into().unwrap(), + &mut blob_ids, + &blob_range, + ) { + Ok(ledger_key_id) => { + blob_indices_per_key_id.entry(ledger_key_id).or_default().push(i); + } + Err(error) => { + if legacy_mode { + return Err(error); + } + authorized_blob_keys[i].status = + Some(Status { code: error.code as i32, message: error.message.into() }); + num_authorized_blobs += 1; + } + }; + } + + // Find the earliest expiring ledger key to be used for reencrypting derived + // objects. + let mut reencryption_public_key = Vec::default(); + let mut reencryption_public_key_expiration: Duration = Duration::MAX; + for (key_id, _) in &blob_indices_per_key_id { + let per_key_ledger = self.get_per_key_ledger(key_id)?; + if per_key_ledger.expiration < reencryption_public_key_expiration { + reencryption_public_key_expiration = per_key_ledger.expiration; + reencryption_public_key = per_key_ledger.public_key.clone(); + } + } + + for (key_id, blob_indices) in blob_indices_per_key_id { + let per_key_ledger = self.get_per_key_ledger(&key_id)?; + let private_key = &per_key_ledger.private_key; + // Rewrap symmetric keys for every blob for the current ledger key. + for i in blob_indices { + match Self::rewrap_symmetric_key( + &blob_metadata[i], + &reencryption_public_key, + private_key, &recipient_public_key, - &mut blob_ids, - &blob_range, ) { - Ok((encapsulated_key, encrypted_symmetric_key, ledger_key_id)) => { - ledger_key_ids.insert(ledger_key_id); - AuthorizedBlobKeys { + Ok((encapsulated_key, encrypted_symmetric_key)) => { + authorized_blob_keys[i] = AuthorizedBlobKeys { encapsulated_key, encrypted_symmetric_key, status: Some(Status { code: 0, ..Default::default() }), ..Default::default() - } + }; + num_authorized_blobs += 1; } Err(error) => { if legacy_mode { return Err(error); } - - AuthorizedBlobKeys { - status: Some(Status { - code: error.code as i32, - message: error.message.into(), - }), - ..Default::default() - } + authorized_blob_keys[i].status = + Some(Status { code: error.code as i32, message: error.message.into() }); + num_authorized_blobs += 1; } - }, - ); - } - - let mut reencryption_public_key = Vec::default(); - let mut reencryption_public_key_expiration: Duration = Duration::MAX; - for key_id in &ledger_key_ids { - let per_key_ledger = self.per_key_ledgers.get_mut(key_id).ok_or_else(|| { - micro_rpc::Status::new_with_message( - micro_rpc::StatusCode::NotFound, - "public key not found", - ) - })?; - if per_key_ledger.expiration < reencryption_public_key_expiration { - reencryption_public_key_expiration = per_key_ledger.expiration; - reencryption_public_key = per_key_ledger.public_key.clone(); + }; } let mut policy_budget_tracker = per_key_ledger.budget_tracker.get_policy_budget( &access_policy_sha256, @@ -523,6 +545,12 @@ impl LedgerService { policy_budget_tracker.update_budget(&blob_range); } + if (num_authorized_blobs != blob_metadata.len()) { + return Err(micro_rpc::Status::new_with_message( + micro_rpc::StatusCode::FailedPrecondition, + "num_authorized_blobs must match blob_metadata.len(). This is likely an internal bug.", + )); + } // TODO: b/288282266 - Include the selected transform's destination node id in // the response. if legacy_mode { @@ -542,16 +570,27 @@ impl LedgerService { } } + fn get_per_key_ledger( + &mut self, + key_id: &Vec, + ) -> Result<&mut PerKeyLedger, micro_rpc::Status> { + self.per_key_ledgers.get_mut(key_id).ok_or_else(|| { + micro_rpc::Status::new_with_message( + micro_rpc::StatusCode::NotFound, + "public key not found", + ) + }) + } + fn authorize_blob_access( &mut self, - blob: BlobMetadata, + blob: &BlobMetadata, access_policy: &DataAccessPolicy, access_policy_sha256: &Vec, transform_index: usize, - recipient_public_key: &CoseKey, blob_ids: &mut HashSet, range: &BlobRange, - ) -> Result<(Vec, Vec, Vec), micro_rpc::Status> { + ) -> Result, micro_rpc::Status> { // Decode the blob header. let header = BlobHeader::decode(blob.blob_header.as_ref()).map_err(|err| { micro_rpc::Status::new_with_message( @@ -561,12 +600,7 @@ impl LedgerService { })?; // Find the right per-key ledger. - let per_key_ledger = self.per_key_ledgers.get_mut(&header.key_id).ok_or_else(|| { - micro_rpc::Status::new_with_message( - micro_rpc::StatusCode::NotFound, - "public key not found", - ) - })?; + let per_key_ledger = self.get_per_key_ledger(&header.key_id)?; // Verify that all blobs use the same policy. if header.access_policy_sha256 != *access_policy_sha256 { @@ -616,15 +650,23 @@ impl LedgerService { )); } + Ok(header.key_id.clone()) + } + + fn rewrap_symmetric_key( + blob: &BlobMetadata, + public_key: &Vec, + private_key: &PrivateKey, + recipient_public_key: &CoseKey, + ) -> Result<(Vec, Vec), micro_rpc::Status> { // Re-wrap the blob's symmetric key. This should be done before budgets are // updated in case there are decryption errors (e.g., due to invalid // associated data). - let wrap_associated_data = - [&per_key_ledger.public_key[..], &blob.recipient_nonce[..]].concat(); - let (encapsulated_key, encrypted_symmetric_key) = cfc_crypto::rewrap_symmetric_key( + let wrap_associated_data = [&public_key[..], &blob.recipient_nonce[..]].concat(); + cfc_crypto::rewrap_symmetric_key( &blob.encrypted_symmetric_key, &blob.encapsulated_key, - &per_key_ledger.private_key, + &private_key, /* unwrap_associated_data= */ &blob.blob_header, recipient_public_key, &wrap_associated_data, @@ -634,9 +676,7 @@ impl LedgerService { micro_rpc::StatusCode::InvalidArgument, format!("failed to re-wrap symmetric key: {:?}", err), ) - })?; - - Ok((encapsulated_key, encrypted_symmetric_key, header.key_id.clone())) + }) } /// Saves the current state into LedgerSnapshot as a part of snapshot @@ -972,8 +1012,8 @@ mod tests { #[test] fn test_authorize_access_multiple_blobs() { - // Create 2 public keys. - let mut ledger_public_key = Vec::with_capacity(2); + // Create 3 public keys. + let mut ledger_public_key = Vec::with_capacity(3); let (mut ledger, public_key) = create_ledger_service(); ledger_public_key.push(public_key); ledger_public_key.push( @@ -985,9 +1025,19 @@ mod tests { .unwrap() .public_key, ); - let mut cose_key = Vec::with_capacity(2); + ledger_public_key.push( + ledger + .create_key(CreateKeyRequest { + ttl: Some(prost_types::Duration { seconds: 10800, ..Default::default() }), + ..Default::default() + }) + .unwrap() + .public_key, + ); + let mut cose_key = Vec::with_capacity(3); cose_key.push(extract_key_from_cwt(&ledger_public_key[0]).unwrap()); cose_key.push(extract_key_from_cwt(&ledger_public_key[1]).unwrap()); + cose_key.push(extract_key_from_cwt(&ledger_public_key[2]).unwrap()); // Define an access policy that grants access. let recipient_tag = "tag"; @@ -1003,17 +1053,19 @@ mod tests { } .encode_to_vec(); - // Construct 4 client messages, 2 per key. + // Construct 6 client messages, 2 per key. let plaintext = b"plaintext"; - let mut blob_header = Vec::with_capacity(4); - let mut ciphertexts = Vec::with_capacity(4); - let mut blob_metadata = Vec::with_capacity(4); + let mut blob_header = Vec::with_capacity(6); + let mut ciphertexts = Vec::with_capacity(6); + let mut blob_metadata = Vec::with_capacity(6); let recipient_nonce: &[u8] = b"nonce"; - for i in 0..4 { + let mut key_index = 0; + let mut curr_count = 0; + for i in 0..6 { blob_header.push( BlobHeader { blob_id: BlobId::from(i as u128).to_vec(), - key_id: cose_key[i / 2].key_id.clone(), + key_id: cose_key[key_index].key_id.clone(), access_policy_sha256: Sha256::digest(&access_policy).to_vec(), ..Default::default() } @@ -1021,7 +1073,8 @@ mod tests { ); let (ciphertext, encapsulated_key, encrypted_symmetric_key) = - cfc_crypto::encrypt_message(plaintext, &cose_key[i / 2], &blob_header[i]).unwrap(); + cfc_crypto::encrypt_message(plaintext, &cose_key[key_index], &blob_header[i]) + .unwrap(); blob_metadata.push(BlobMetadata { blob_header: blob_header[i].clone(), @@ -1030,6 +1083,12 @@ mod tests { recipient_nonce: recipient_nonce.to_vec(), }); ciphertexts.push(ciphertext); + + curr_count += 1; + if curr_count == 2 { + key_index += 1; + curr_count = 0; + } } // Add some blobs with invalid header and invalid policy hash. blob_metadata.push(BlobMetadata { @@ -1041,7 +1100,7 @@ mod tests { blob_metadata.push(BlobMetadata { blob_header: BlobHeader { blob_id: BlobId::from(4).to_vec(), - key_id: cose_key[0].key_id.clone(), + key_id: cose_key[1].key_id.clone(), access_policy_sha256: "invalid".into(), ..Default::default() } @@ -1055,30 +1114,40 @@ mod tests { let (recipient_private_key, recipient_public_key) = cfc_crypto::gen_keypair(b"key-id"); let response = ledger .authorize_access(AuthorizeAccessRequest { + // Since `now` is after the first key's expiration time, access should be denied for + // the first 2 blobs. + now: Some(prost_types::Timestamp { seconds: 4000, ..Default::default() }), access_policy, recipient_public_key: create_recipient_cwt(recipient_public_key), recipient_tag: recipient_tag.to_owned(), blob_metadata, blob_range: Some(Range { start: BlobId::from(0).to_vec(), - end: BlobId::from(6).to_vec(), + end: BlobId::from(8).to_vec(), }), ..Default::default() }) .unwrap(); + // The reencryption key must be the earliest expiring key from the batch of + // authorized blobs. + let expected_reencryption_key = ledger_public_key[1].clone(); // Verify that the response contains the right public key and allows the message // to be read. - assert_eq!(response.reencryption_public_key, ledger_public_key[0]); - assert_eq!(response.authorized_blob_keys.len(), 6); - for i in 0..4 { + assert_eq!(response.reencryption_public_key, expected_reencryption_key); + assert_eq!(response.authorized_blob_keys.len(), 8); + for i in 2..6 { let authorized_blob_key = response.authorized_blob_keys.get(i).unwrap(); + assert_eq!( + authorized_blob_key.status.as_ref().unwrap().code, + micro_rpc::StatusCode::Ok as i32 + ); assert_eq!( cfc_crypto::decrypt_message( &ciphertexts[i], &blob_header[i], &authorized_blob_key.encrypted_symmetric_key, - &[&ledger_public_key[i / 2], recipient_nonce].concat(), + &[&expected_reencryption_key, recipient_nonce].concat(), &authorized_blob_key.encapsulated_key, &recipient_private_key ) @@ -1086,8 +1155,15 @@ mod tests { plaintext ); } + // First 2 blobs must correspond to the expired key. + for i in 0..2 { + assert_eq!( + response.authorized_blob_keys.get(i).unwrap().status.as_ref().unwrap().code, + micro_rpc::StatusCode::NotFound as i32 + ); + } // Last 2 responses must correspond to invalid blobs. - for i in 4..6 { + for i in 6..8 { assert_eq!( response.authorized_blob_keys.get(i).unwrap().status.as_ref().unwrap().code, micro_rpc::StatusCode::InvalidArgument as i32