Skip to content

Commit

Permalink
Add a skeleton implementation for StreamInitialize to `Confidential…
Browse files Browse the repository at this point in the history
…TransformBase` class.

This CL adds the skeleton implementation for `StreamInitialize` in the confidential compute server base class. Follow up CLs will implement the actual StreamInitialize logic to the `fed_sql` server.

This CL also updates the WORKSPACE with the updated (not the latest) federated-compute repo archive that contains StreamInitialize proto definition.

Bug: 378744150
Change-Id: Id1280ed7c1d9ea024dd96653b11834594b4a5878
  • Loading branch information
ryi06 committed Jan 6, 2025
1 parent 04e5989 commit 54d7f57
Show file tree
Hide file tree
Showing 8 changed files with 597 additions and 11 deletions.
6 changes: 3 additions & 3 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,9 @@ http_archive(
"//third_party/federated_compute:libcppbor.patch",
"//third_party/federated_compute:visibility.patch",
],
sha256 = "801b08bbc5cb19fc24ba81028dc847a41d9eb5bd51218e4a3dd18edce68a7a6e",
strip_prefix = "federated-compute-33a0495a0c4462001a8ca67eac8e65c2e414478a",
url = "https://github.com/google/federated-compute/archive/33a0495a0c4462001a8ca67eac8e65c2e414478a.tar.gz",
sha256 = "c6e624ae40407cc729a4e6bad3cb97f34adc34296aee2a290903bb88cb9dc1ab",
strip_prefix = "federated-compute-d921ead2ea9f227a46ca1666610ebd8f95ea9f07",
url = "https://github.com/google/federated-compute/archive/d921ead2ea9f227a46ca1666610ebd8f95ea9f07.tar.gz",
)

http_archive(
Expand Down
95 changes: 89 additions & 6 deletions containers/confidential_transform_server_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ using ::fcp::confidentialcompute::InitializeRequest;
using ::fcp::confidentialcompute::InitializeResponse;
using ::fcp::confidentialcompute::SessionRequest;
using ::fcp::confidentialcompute::SessionResponse;
using ::fcp::confidentialcompute::StreamInitializeRequest;
using ::fcp::confidentialcompute::WriteRequest;
using ::grpc::ServerContext;

Expand Down Expand Up @@ -84,8 +85,7 @@ absl::Status HandleWrite(
} // namespace

absl::Status ConfidentialTransformBase::InitializeInternal(
const fcp::confidentialcompute::InitializeRequest* request,
fcp::confidentialcompute::InitializeResponse* response) {
const InitializeRequest* request, InitializeResponse* response) {
FCP_ASSIGN_OR_RETURN(google::protobuf::Struct config_properties,
InitializeTransform(request));
const BlobDecryptor* blob_decryptor;
Expand All @@ -98,10 +98,11 @@ absl::Status ConfidentialTransformBase::InitializeInternal(
blob_decryptor_.emplace(crypto_stub_, config_properties);
session_tracker_.emplace(request->max_num_sessions());

// Since blob_decryptor_ is set once in Initialize and never
// modified, and the underlying object is threadsafe, it is safe to store a
// local pointer to it and access the object without a lock after we check
// under the mutex that a value has been set for the std::optional wrapper.
// Since blob_decryptor_ is set once in Initialize or StreamInitialize and
// never modified, and the underlying object is threadsafe, it is safe to
// store a local pointer to it and access the object without a lock after we
// check under the mutex that a value has been set for the std::optional
// wrapper.
blob_decryptor = &*blob_decryptor_;
}

Expand All @@ -110,6 +111,82 @@ absl::Status ConfidentialTransformBase::InitializeInternal(
return absl::OkStatus();
}

absl::Status ConfidentialTransformBase::StreamInitializeInternal(
grpc::ServerReader<StreamInitializeRequest>* reader,
InitializeResponse* response) {
google::protobuf::Struct config_properties;
StreamInitializeRequest request;
bool contain_initialize_request = false;
uint32_t max_num_sessions;
while (reader->Read(&request)) {
switch (request.kind_case()) {
case StreamInitializeRequest::kInitializeRequest: {
// StreamInitializeRequest.initialize_request should be the last request
// sent by the client's StreamInitializeRequest stream. Each stream
// should only have exactly one
// StreamInitializeRequest.initialize_request.
if (contain_initialize_request) {
return absl::FailedPreconditionError(
"Expect one of the StreamInitializeRequests to be "
"configured with a InitializeRequest, found more than one.");
}
max_num_sessions = request.initialize_request().max_num_sessions();
FCP_ASSIGN_OR_RETURN(config_properties,
StreamInitializeTransform(&request));
contain_initialize_request = true;
break;
}
case StreamInitializeRequest::kWriteConfiguration: {
// Each stream should contain zero or more
// StreamInitializeRequest.write_configurations, all of which must be
// sent before the StreamInitializeRequest.initialize_request. The first
// write_configuration for a blob will contain the metadata about the
// blob, while the last will have a value of `commit` set to True.
if (contain_initialize_request) {
return absl::FailedPreconditionError(
"Expect all StreamInitializeRequests.write_configurations to be "
"sent before the StreamInitializeRequest.initialize_request.");
}
FCP_RETURN_IF_ERROR(
ReadWriteConfigurationRequest(request.write_configuration()));
break;
}
default:
return absl::FailedPreconditionError(
absl::StrCat("StreamInitializeRequest expected a "
"WriteConfigurationRequest or a "
"InitializeRequest, but received request of type: ",
request.kind_case()));
}
}
if (!contain_initialize_request) {
return absl::FailedPreconditionError(absl::StrCat(
"Expect one of the StreamInitializeRequests to be configured with a "
"InitializeRequest, found zero."));
}

const BlobDecryptor* blob_decryptor;
{
absl::MutexLock l(&mutex_);
if (blob_decryptor_ != std::nullopt) {
return absl::FailedPreconditionError(
"StreamInitialize can only be called once.");
}
blob_decryptor_.emplace(crypto_stub_, config_properties);
session_tracker_.emplace(max_num_sessions);

// Since blob_decryptor_ is set once in Initialize or StreamInitialize and
// never modified, and the underlying object is threadsafe, it is safe to
// store a local pointer to it and access the object without a lock after we
// check under the mutex that a value has been set for the std::optional
// wrapper.
blob_decryptor = &*blob_decryptor_;
}
FCP_ASSIGN_OR_RETURN(*response->mutable_public_key(),
blob_decryptor->GetPublicKey());
return absl::OkStatus();
}

absl::Status ConfidentialTransformBase::SessionInternal(
grpc::ServerReaderWriter<SessionResponse, SessionRequest>* stream) {
BlobDecryptor* blob_decryptor;
Expand Down Expand Up @@ -203,6 +280,12 @@ grpc::Status ConfidentialTransformBase::Initialize(
return ToGrpcStatus(InitializeInternal(request, response));
}

grpc::Status ConfidentialTransformBase::StreamInitialize(
ServerContext* context, grpc::ServerReader<StreamInitializeRequest>* reader,
InitializeResponse* response) {
return ToGrpcStatus(StreamInitializeInternal(reader, response));
}

grpc::Status ConfidentialTransformBase::Session(
ServerContext* context,
grpc::ServerReaderWriter<SessionResponse, SessionRequest>* stream) {
Expand Down
20 changes: 20 additions & 0 deletions containers/confidential_transform_server_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ class ConfidentialTransformBase
const fcp::confidentialcompute::InitializeRequest* request,
fcp::confidentialcompute::InitializeResponse* response) override;

grpc::Status StreamInitialize(
grpc::ServerContext* context,
grpc::ServerReader<fcp::confidentialcompute::StreamInitializeRequest>*
reader,
fcp::confidentialcompute::InitializeResponse* response) override;

grpc::Status Session(
grpc::ServerContext* context,
grpc::ServerReaderWriter<fcp::confidentialcompute::SessionResponse,
Expand All @@ -54,6 +60,15 @@ class ConfidentialTransformBase

virtual absl::StatusOr<google::protobuf::Struct> InitializeTransform(
const fcp::confidentialcompute::InitializeRequest* request) = 0;
virtual absl::StatusOr<google::protobuf::Struct> StreamInitializeTransform(
fcp::confidentialcompute::StreamInitializeRequest* request) = 0;
// Handles a WriteConfigurationRequest that contains a blob or a chunk of a
// blob used for container initialization. Must be implemented by a subclass.
// The first WriteConfigurationRequest for each blob must contain the metadata
// for the blob, while the last must have `commit` set to True.
virtual absl::Status ReadWriteConfigurationRequest(
const fcp::confidentialcompute::WriteConfigurationRequest&
write_configuration) = 0;
virtual absl::StatusOr<
std::unique_ptr<confidential_federated_compute::Session>>
CreateSession() = 0;
Expand All @@ -63,6 +78,11 @@ class ConfidentialTransformBase
const fcp::confidentialcompute::InitializeRequest* request,
fcp::confidentialcompute::InitializeResponse* response);

absl::Status StreamInitializeInternal(
grpc::ServerReader<fcp::confidentialcompute::StreamInitializeRequest>*
reader,
fcp::confidentialcompute::InitializeResponse* response);

absl::Status SessionInternal(
grpc::ServerReaderWriter<fcp::confidentialcompute::SessionResponse,
fcp::confidentialcompute::SessionRequest>*
Expand Down
Loading

0 comments on commit 54d7f57

Please sign in to comment.