Skip to content

Commit

Permalink
Add test concat server that implements the ConfidentialTransform API.
Browse files Browse the repository at this point in the history
Change-Id: Ib1624dc41a7989f9f73068ca492f8b8b4b739bfa
  • Loading branch information
zpgong committed Jul 11, 2024
1 parent 854e42d commit d887783
Show file tree
Hide file tree
Showing 5 changed files with 672 additions and 8 deletions.
44 changes: 44 additions & 0 deletions containers/test_concat/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,47 @@ cc_test(
"@oak//proto/containers:orchestrator_crypto_cc_grpc",
],
)

cc_library(
name = "confidential_transform_server",
srcs = ["confidential_transform_server.cc"],
hdrs = ["confidential_transform_server.h"],
deps = [
"//containers:blob_metadata",
"//containers:crypto",
"//containers:session",
"@com_github_grpc_grpc//:grpc++",
"@com_google_absl//absl/log",
"@com_google_absl//absl/log:die_if_null",
"@com_google_absl//absl/status",
"@com_google_absl//absl/synchronization",
"@federated-compute//fcp/base",
"@federated-compute//fcp/base:status_converters",
"@federated-compute//fcp/protos/confidentialcompute:confidential_transform_cc_grpc",
"@federated-compute//fcp/protos/confidentialcompute:confidential_transform_cc_proto",
"@oak//proto/containers:orchestrator_crypto_cc_grpc",
"@org_tensorflow_federated//tensorflow_federated/cc/core/impl/aggregation/base",
],
)

cc_test(
name = "confidential_transform_server_test",
srcs = ["confidential_transform_server_test.cc"],
deps = [
"confidential_transform_server",
"//containers:blob_metadata",
"//containers:crypto",
"//containers:crypto_test_utils",
"//containers:session",
"//testing:parse_text_proto",
"@com_github_grpc_grpc//:grpc++",
"@com_google_absl//absl/log",
"@com_google_absl//absl/log:check",
"@com_google_absl//absl/status",
"@federated-compute//fcp/protos/confidentialcompute:confidential_transform_cc_grpc",
"@federated-compute//fcp/protos/confidentialcompute:confidential_transform_cc_proto",
"@googletest//:gtest_main",
"@oak//proto/containers:orchestrator_crypto_cc_grpc",
"@org_tensorflow_federated//tensorflow_federated/cc/core/impl/aggregation/base",
],
)
17 changes: 9 additions & 8 deletions containers/test_concat/README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
# Test Concat Server
# Test Concat Servers

This folder contains a simple test container that can run in a Trusted Execution
Environment that implements the gRPC Pipeline Transform API. The implementation
uses the common cryptographic protocols required for confidential federated
compute containers to access and transform encrypted data. The transformation it
implements is trivial- it simply concatenates the decrypted strings together.
Currently, it supports being configured only once before any data is
This folder contains simple test containers that can run in a Trusted Execution
Environment. One implements the gRPC Pipeline Transform API and the other
implements the gRPC Confidential Transform API. Both implementations uses the
common cryptographic protocols required for confidential federated compute
containers to access and transform encrypted data. The transformation they both
implement is trivial- they simply concatenates the decrypted strings together.
Currently, they support being configured only once before any data is
transformed.

This container implementation is meant to be used for e2e tests of the
These container implementations are meant to be used for e2e tests of the
cryptographic protocols used throughout the system.
183 changes: 183 additions & 0 deletions containers/test_concat/confidential_transform_server.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// Copyright 2024 Google LLC.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "containers/test_concat/confidential_transform_server.h"

#include <execution>
#include <memory>
#include <optional>
#include <string>
#include <thread>

#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "containers/blob_metadata.h"
#include "containers/crypto.h"
#include "containers/session.h"
#include "fcp/base/status_converters.h"
#include "fcp/protos/confidentialcompute/confidential_transform.grpc.pb.h"
#include "fcp/protos/confidentialcompute/confidential_transform.pb.h"
#include "grpcpp/support/status.h"
#include "tensorflow_federated/cc/core/impl/aggregation/base/monitoring.h"

namespace confidential_federated_compute::test_concat {

namespace {

using ::fcp::base::ToGrpcStatus;
using ::fcp::confidential_compute::NonceChecker;
using ::fcp::confidentialcompute::BlobMetadata;
using ::fcp::confidentialcompute::ConfidentialTransform;
using ::fcp::confidentialcompute::ConfigureRequest;
using ::fcp::confidentialcompute::ConfigureResponse;
using ::fcp::confidentialcompute::FinalizeRequest;
using ::fcp::confidentialcompute::InitializeRequest;
using ::fcp::confidentialcompute::InitializeResponse;
using ::fcp::confidentialcompute::ReadResponse;
using ::fcp::confidentialcompute::SessionRequest;
using ::fcp::confidentialcompute::SessionResponse;
using ::fcp::confidentialcompute::WriteFinishedResponse;
using ::fcp::confidentialcompute::WriteRequest;
using ::grpc::ServerContext;

} // namespace

absl::Status TestConcatConfidentialTransform::Initialize(
const fcp::confidentialcompute::InitializeRequest* request,
fcp::confidentialcompute::InitializeResponse* response) {
const BlobDecryptor* blob_decryptor;
{
absl::MutexLock l(&mutex_);
blob_decryptor_.emplace(crypto_stub_);

// Since blob_decryptor_ is set once in Initialize and never
// modified, and the underlying object is threadsafe, it is safe to store a
// local pointer to it and access the object without a lock after we check
// under the mutex that a value has been set for the std::optional wrapper.
blob_decryptor = &*blob_decryptor_;
}

FCP_ASSIGN_OR_RETURN(*response->mutable_public_key(),
blob_decryptor->GetPublicKey());
return absl::OkStatus();
}

absl::Status TestConcatConfidentialTransform::Session(
grpc::ServerReaderWriter<SessionResponse, SessionRequest>* stream) {
BlobDecryptor* blob_decryptor;
{
absl::MutexLock l(&mutex_);
if (blob_decryptor_ == std::nullopt) {
return absl::FailedPreconditionError(
"Initialize must be called before Session.");
}

// Since blob_decryptor_ is set once in Initialize and never
// modified, and the underlying object is threadsafe, it is safe to store a
// local pointer to it and access the object without a lock after we check
// under the mutex that values have been set for the std::optional wrappers.
blob_decryptor = &*blob_decryptor_;
}

SessionRequest configure_request;
bool success = stream->Read(&configure_request);
if (!success) {
return absl::AbortedError("Session failed to read client message.");
}

if (!configure_request.has_configure()) {
return absl::FailedPreconditionError(
"Session must be configured with a ConfigureRequest before any other "
"requests.");
}
SessionResponse configure_response;
NonceChecker nonce_checker;
*configure_response.mutable_configure()->mutable_nonce() =
nonce_checker.GetSessionNonce();
configure_response.mutable_configure();
stream->Write(configure_response);

SessionRequest session_request;
std::string state = "";
while (stream->Read(&session_request)) {
switch (session_request.kind_case()) {
case SessionRequest::kWrite: {
const WriteRequest& write_request = session_request.write();
if (absl::Status nonce_status = nonce_checker.CheckBlobNonce(
write_request.first_request_metadata());
!nonce_status.ok()) {
stream->Write(ToSessionWriteFinishedResponse(nonce_status,
/*available_memory*/ 0));
break;
}

absl::StatusOr<std::string> unencrypted_data =
blob_decryptor->DecryptBlob(write_request.first_request_metadata(),
write_request.data());
if (!unencrypted_data.ok()) {
stream->Write(
ToSessionWriteFinishedResponse(unencrypted_data.status(),
/*available_memory*/ 0));
break;
}

absl::StrAppend(&state, *unencrypted_data);
stream->Write(ToSessionWriteFinishedResponse(
absl::OkStatus(), /*available_memory*/ 0,
write_request.first_request_metadata().total_size_bytes()));
break;
}
case SessionRequest::kFinalize: {
SessionResponse response;
ReadResponse* read_response = response.mutable_read();
read_response->set_finish_read(true);
*(read_response->mutable_data()) = state;

BlobMetadata result_metadata;
result_metadata.mutable_unencrypted();
result_metadata.set_total_size_bytes(state.length());
result_metadata.set_compression_type(
BlobMetadata::COMPRESSION_TYPE_NONE);
*(read_response->mutable_first_response_metadata()) = result_metadata;

stream->Write(response);
return absl::OkStatus();
}
case SessionRequest::kConfigure:
default:
return absl::FailedPreconditionError(
absl::StrCat("Session expected a write or finalize request but "
"received request of type: ",
session_request.kind_case()));
}
}

return absl::AbortedError(
"Session failed to read client write or finalize message.");
}

grpc::Status TestConcatConfidentialTransform::Initialize(
ServerContext* context, const InitializeRequest* request,
InitializeResponse* response) {
return ToGrpcStatus(Initialize(request, response));
}

grpc::Status TestConcatConfidentialTransform::Session(
ServerContext* context,
grpc::ServerReaderWriter<SessionResponse, SessionRequest>* stream) {
grpc::Status status = ToGrpcStatus(Session(stream));
return status;
}

} // namespace confidential_federated_compute::test_concat
76 changes: 76 additions & 0 deletions containers/test_concat/confidential_transform_server.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2024 Google LLC.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef CONFIDENTIAL_FEDERATED_COMPUTE_CONTAINERS_TEST_CONCAT_CONFIDENTIAL_TRANSFORM_SERVER_H_
#define CONFIDENTIAL_FEDERATED_COMPUTE_CONTAINERS_TEST_CONCAT_CONFIDENTIAL_TRANSFORM_SERVER_H_

#include <optional>
#include <string>

#include "absl/base/thread_annotations.h"
#include "absl/log/die_if_null.h"
#include "absl/status/status.h"
#include "absl/synchronization/mutex.h"
#include "containers/crypto.h"
#include "fcp/protos/confidentialcompute/confidential_transform.grpc.pb.h"
#include "fcp/protos/confidentialcompute/confidential_transform.pb.h"
#include "grpcpp/server_context.h"
#include "grpcpp/support/status.h"
#include "proto/containers/orchestrator_crypto.grpc.pb.h"

namespace confidential_federated_compute::test_concat {

// Test ConfidentialTransform service that concatenates inputs. This test
// service doesn't return `write_capacity_bytes`, nor does it manage the number
// of sessions.
class TestConcatConfidentialTransform final
: public fcp::confidentialcompute::ConfidentialTransform::Service {
public:
// The OrchestratorCrypto stub must not be NULL and must outlive this object.
explicit TestConcatConfidentialTransform(
oak::containers::v1::OrchestratorCrypto::StubInterface* crypto_stub)
: crypto_stub_(*ABSL_DIE_IF_NULL(crypto_stub)) {}

grpc::Status Initialize(
grpc::ServerContext* context,
const fcp::confidentialcompute::InitializeRequest* request,
fcp::confidentialcompute::InitializeResponse* response) override;

grpc::Status Session(
grpc::ServerContext* context,
grpc::ServerReaderWriter<fcp::confidentialcompute::SessionResponse,
fcp::confidentialcompute::SessionRequest>*
stream) override;

private:
absl::Status Initialize(
const fcp::confidentialcompute::InitializeRequest* request,
fcp::confidentialcompute::InitializeResponse* response);

absl::Status Session(
grpc::ServerReaderWriter<fcp::confidentialcompute::SessionResponse,
fcp::confidentialcompute::SessionRequest>*
stream);

oak::containers::v1::OrchestratorCrypto::StubInterface& crypto_stub_;
absl::Mutex mutex_;
// The mutex is used to protect the optional wrapping blob_decryptor_ to
// ensure the BlobDecryptor is initialized, but the BlobDecryptor is itself
// threadsafe.
std::optional<confidential_federated_compute::BlobDecryptor> blob_decryptor_
ABSL_GUARDED_BY(mutex_);
};

} // namespace confidential_federated_compute::test_concat

#endif // CONFIDENTIAL_FEDERATED_COMPUTE_CONTAINERS_TEST_CONCAT_CONFIDENTIAL_TRANSFORM_SERVER_H_
Loading

0 comments on commit d887783

Please sign in to comment.