Skip to content

Commit

Permalink
Refactor confidential_transform_server so that it can be used from mu…
Browse files Browse the repository at this point in the history
…ltiple containers.

This adds a ConfidentialTransformServer base class and a new Session
interface which can be implemented to store individual session state.

Note that the FedSqlSession implementation is not currently threadsafe
because each session is currently handled sequentially. This will likely
need to change if blobs are incorporated in parallel.

Change-Id: I683eb2b40aa93d9b5534b45a1147c89f559c9e27
  • Loading branch information
mayaspivak committed Jul 25, 2024
1 parent b6dd662 commit 6eb0941
Show file tree
Hide file tree
Showing 8 changed files with 1,191 additions and 206 deletions.
45 changes: 45 additions & 0 deletions containers/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,48 @@ cc_test(
"@oak//proto/containers:interfaces_cc_proto",
],
)

cc_library(
name = "confidential_transform_server_base",
srcs = ["confidential_transform_server_base.cc"],
hdrs = ["confidential_transform_server_base.h"],
deps = [
":blob_metadata",
":crypto",
":session",
"@com_github_grpc_grpc//:grpc++",
"@com_google_absl//absl/log:check",
"@com_google_absl//absl/log:die_if_null",
"@com_google_absl//absl/status",
"@com_google_absl//absl/synchronization",
"@com_google_protobuf//:protobuf",
"@federated-compute//fcp/base",
"@federated-compute//fcp/base:status_converters",
"@federated-compute//fcp/confidentialcompute:crypto",
"@federated-compute//fcp/protos/confidentialcompute:confidential_transform_cc_grpc",
"@federated-compute//fcp/protos/confidentialcompute:confidential_transform_cc_proto",
"@federated-compute//fcp/protos/confidentialcompute:fed_sql_container_config_cc_proto",
"@oak//proto/containers:orchestrator_crypto_cc_grpc",
],
)

cc_test(
name = "confidential_transform_server_base_test",
size = "small",
srcs = ["confidential_transform_server_base_test.cc"],
deps = [
":blob_metadata",
":confidential_transform_server_base",
":crypto",
":crypto_test_utils",
"//testing:parse_text_proto",
"@com_github_grpc_grpc//:grpc++",
"@com_google_absl//absl/log",
"@com_google_absl//absl/log:check",
"@federated-compute//fcp/confidentialcompute:crypto",
"@federated-compute//fcp/protos/confidentialcompute:confidential_transform_cc_grpc",
"@federated-compute//fcp/protos/confidentialcompute:confidential_transform_cc_proto",
"@federated-compute//fcp/protos/confidentialcompute:fed_sql_container_config_cc_proto",
"@googletest//:gtest_main",
],
)
220 changes: 220 additions & 0 deletions containers/confidential_transform_server_base.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
// Copyright 2024 Google LLC.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "containers/confidential_transform_server_base.h"

#include <execution>
#include <memory>
#include <optional>
#include <string>
#include <thread>

#include "absl/log/check.h"
#include "absl/log/die_if_null.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "containers/blob_metadata.h"
#include "containers/crypto.h"
#include "containers/session.h"
#include "fcp/base/status_converters.h"
#include "fcp/confidentialcompute/crypto.h"
#include "fcp/protos/confidentialcompute/confidential_transform.grpc.pb.h"
#include "fcp/protos/confidentialcompute/confidential_transform.pb.h"
#include "google/protobuf/repeated_ptr_field.h"
#include "grpcpp/support/status.h"

namespace confidential_federated_compute {

using ::fcp::base::ToGrpcStatus;
using ::fcp::confidential_compute::NonceChecker;
using ::fcp::confidentialcompute::BlobMetadata;
using ::fcp::confidentialcompute::ConfidentialTransform;
using ::fcp::confidentialcompute::InitializeRequest;
using ::fcp::confidentialcompute::InitializeResponse;
using ::fcp::confidentialcompute::SessionRequest;
using ::fcp::confidentialcompute::SessionResponse;
using ::fcp::confidentialcompute::WriteRequest;
using ::grpc::ServerContext;

namespace {

// Decrypts and parses a record and incorporates the record into the session.
//
// Reports status to the client in WriteFinishedResponse.
//
// TODO: handle blobs that span multiple WriteRequests.
absl::Status HandleWrite(
const WriteRequest& request, BlobDecryptor* blob_decryptor,
NonceChecker& nonce_checker,
grpc::ServerReaderWriter<SessionResponse, SessionRequest>* stream,
Session* session) {
if (absl::Status nonce_status =
nonce_checker.CheckBlobNonce(request.first_request_metadata());
!nonce_status.ok()) {
stream->Write(ToSessionWriteFinishedResponse(nonce_status));
return absl::OkStatus();
}

absl::StatusOr<std::string> unencrypted_data = blob_decryptor->DecryptBlob(
request.first_request_metadata(), request.data());
if (!unencrypted_data.ok()) {
stream->Write(ToSessionWriteFinishedResponse(unencrypted_data.status()));
return absl::OkStatus();
}

FCP_ASSIGN_OR_RETURN(
SessionResponse response,
session->SessionWrite(request, std::move(unencrypted_data.value())));

stream->Write(response);
return absl::OkStatus();
}

} // namespace

absl::Status ConfidentialTransformBase::InitializeInternal(
const fcp::confidentialcompute::InitializeRequest* request,
fcp::confidentialcompute::InitializeResponse* response) {
FCP_ASSIGN_OR_RETURN(google::protobuf::Struct config_properties,
InitializeTransform(request));
const BlobDecryptor* blob_decryptor;
{
absl::MutexLock l(&mutex_);
if (blob_decryptor_ != std::nullopt) {
return absl::FailedPreconditionError(
"Initialize can only be called once.");
}
blob_decryptor_.emplace(crypto_stub_, config_properties);

// Since blob_decryptor_ is set once in Initialize and never
// modified, and the underlying object is threadsafe, it is safe to store a
// local pointer to it and access the object without a lock after we check
// under the mutex that a value has been set for the std::optional wrapper.
blob_decryptor = &*blob_decryptor_;
}

FCP_ASSIGN_OR_RETURN(*response->mutable_public_key(),
blob_decryptor->GetPublicKey());
return absl::OkStatus();
}

absl::Status ConfidentialTransformBase::SessionInternal(
grpc::ServerReaderWriter<SessionResponse, SessionRequest>* stream) {
BlobDecryptor* blob_decryptor;
{
absl::MutexLock l(&mutex_);
if (blob_decryptor_ == std::nullopt) {
return absl::FailedPreconditionError(
"Initialize must be called before Session.");
}

// Since blob_decryptor_ is set once in Initialize and never
// modified, and the underlying object is threadsafe, it is safe to store a
// local pointer to it and access the object without a lock after we check
// under the mutex that values have been set for the std::optional wrappers.
blob_decryptor = &*blob_decryptor_;
}

SessionRequest configure_request;
bool success = stream->Read(&configure_request);
if (!success) {
return absl::AbortedError("Session failed to read client message.");
}

if (!configure_request.has_configure()) {
return absl::FailedPreconditionError(
"Session must be configured with a ConfigureRequest before any other "
"requests.");
}
FCP_ASSIGN_OR_RETURN(
std::unique_ptr<confidential_federated_compute::Session> session,
CreateSession());
FCP_RETURN_IF_ERROR(session->ConfigureSession(configure_request));
SessionResponse configure_response;
NonceChecker nonce_checker;
*configure_response.mutable_configure()->mutable_nonce() =
nonce_checker.GetSessionNonce();
stream->Write(configure_response);

// Initialze result_blob_metadata with unencrypted metadata since
// EarliestExpirationTimeMetadata expects inputs to have either unencrypted or
// hpke_plus_aead_data.
BlobMetadata result_blob_metadata;
result_blob_metadata.mutable_unencrypted();
SessionRequest session_request;
while (stream->Read(&session_request)) {
switch (session_request.kind_case()) {
case SessionRequest::kWrite: {
const WriteRequest& write_request = session_request.write();
// Use the metadata with the earliest expiration timestamp for
// encrypting any results.
absl::StatusOr<BlobMetadata> earliest_expiration_metadata =
EarliestExpirationTimeMetadata(
result_blob_metadata, write_request.first_request_metadata());
if (!earliest_expiration_metadata.ok()) {
stream->Write(ToSessionWriteFinishedResponse(absl::Status(
earliest_expiration_metadata.status().code(),
absl::StrCat("Failed to extract expiration timestamp from "
"BlobMetadata with encryption: ",
earliest_expiration_metadata.status().message()))));
break;
}
result_blob_metadata = *earliest_expiration_metadata;
// TODO: spin up a thread to incorporate each blob.
FCP_RETURN_IF_ERROR(HandleWrite(write_request, blob_decryptor,
nonce_checker, stream, session.get()));
break;
}
case SessionRequest::kFinalize: {
FCP_ASSIGN_OR_RETURN(
SessionResponse finalize_response,
session->FinalizeSession(session_request.finalize(),
result_blob_metadata));
stream->Write(finalize_response);
return absl::OkStatus();
}
case SessionRequest::kConfigure:
default:
return absl::FailedPreconditionError(absl::StrCat(
"Session expected a write request but received request of type: ",
session_request.kind_case()));
}
}

return absl::AbortedError(
"Session failed to read client write or finalize message.");
}

grpc::Status ConfidentialTransformBase::Initialize(
ServerContext* context, const InitializeRequest* request,
InitializeResponse* response) {
return ToGrpcStatus(InitializeInternal(request, response));
}

grpc::Status ConfidentialTransformBase::Session(
ServerContext* context,
grpc::ServerReaderWriter<SessionResponse, SessionRequest>* stream) {
if (absl::Status session_status = session_tracker_.AddSession();
!session_status.ok()) {
return ToGrpcStatus(session_status);
}
grpc::Status status = ToGrpcStatus(SessionInternal(stream));
absl::Status remove_session = session_tracker_.RemoveSession();
if (!remove_session.ok()) {
return ToGrpcStatus(remove_session);
}
return status;
}

} // namespace confidential_federated_compute
85 changes: 85 additions & 0 deletions containers/confidential_transform_server_base.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2024 Google LLC.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef CONFIDENTIAL_FEDERATED_COMPUTE_CONTAINERS_CONFIDENTIAL_TRANSFORM_SERVER_BASE_H_
#define CONFIDENTIAL_FEDERATED_COMPUTE_CONTAINERS_CONFIDENTIAL_TRANSFORM_SERVER_BASE_H_

#include <optional>
#include <string>

#include "absl/base/thread_annotations.h"
#include "absl/log/die_if_null.h"
#include "absl/status/status.h"
#include "absl/synchronization/mutex.h"
#include "containers/crypto.h"
#include "containers/session.h"
#include "fcp/protos/confidentialcompute/confidential_transform.grpc.pb.h"
#include "fcp/protos/confidentialcompute/confidential_transform.pb.h"
#include "google/protobuf/repeated_ptr_field.h"
#include "grpcpp/server_context.h"
#include "grpcpp/support/status.h"
#include "proto/containers/orchestrator_crypto.grpc.pb.h"

namespace confidential_federated_compute {

// Base class that implements the ConfidentialTransform service protocol.
class ConfidentialTransformBase
: public fcp::confidentialcompute::ConfidentialTransform::Service {
public:
grpc::Status Initialize(
grpc::ServerContext* context,
const fcp::confidentialcompute::InitializeRequest* request,
fcp::confidentialcompute::InitializeResponse* response) override;

grpc::Status Session(
grpc::ServerContext* context,
grpc::ServerReaderWriter<fcp::confidentialcompute::SessionResponse,
fcp::confidentialcompute::SessionRequest>*
stream) override;

protected:
ConfidentialTransformBase(
oak::containers::v1::OrchestratorCrypto::StubInterface* crypto_stub,
int max_num_sessions)
: crypto_stub_(*ABSL_DIE_IF_NULL(crypto_stub)),
session_tracker_(max_num_sessions) {}

virtual absl::StatusOr<google::protobuf::Struct> InitializeTransform(
const fcp::confidentialcompute::InitializeRequest* request) = 0;
virtual absl::StatusOr<
std::unique_ptr<confidential_federated_compute::Session>>
CreateSession() = 0;

private:
absl::Status InitializeInternal(
const fcp::confidentialcompute::InitializeRequest* request,
fcp::confidentialcompute::InitializeResponse* response);

absl::Status SessionInternal(
grpc::ServerReaderWriter<fcp::confidentialcompute::SessionResponse,
fcp::confidentialcompute::SessionRequest>*
stream);

oak::containers::v1::OrchestratorCrypto::StubInterface& crypto_stub_;
confidential_federated_compute::SessionTracker session_tracker_;
absl::Mutex mutex_;
// The mutex is used to protect the optional wrapping blob_decryptor_ to
// ensure the BlobDecryptor is initialized, but the BlobDecryptor is itself
// threadsafe.
std::optional<confidential_federated_compute::BlobDecryptor> blob_decryptor_
ABSL_GUARDED_BY(mutex_);
};

} // namespace confidential_federated_compute

#endif // CONFIDENTIAL_FEDERATED_COMPUTE_CONTAINERS_CONFIDENTIAL_TRANSFORM_SERVER_BASE_H_
Loading

0 comments on commit 6eb0941

Please sign in to comment.