Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DNM] persist: implement Blob using Postgres and hook it up to sqllogictest #17382

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/persist/src/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use crate::file::{FileBlob, FileBlobConfig};
use crate::location::{Blob, Consensus, ExternalError};
use crate::mem::{MemBlob, MemBlobConfig, MemConsensus};
use crate::metrics::PostgresConsensusMetrics;
use crate::postgres::{PostgresConsensus, PostgresConsensusConfig};
use crate::postgres::{
PostgresBlob, PostgresBlobConfig, PostgresConsensus, PostgresConsensusConfig,
};
use crate::s3::{S3Blob, S3BlobConfig};

/// Config for an implementation of [Blob].
Expand All @@ -31,6 +33,8 @@ pub enum BlobConfig {
File(FileBlobConfig),
/// Config for [S3Blob].
S3(S3BlobConfig),
/// Config for [PostgresBlob].
Postgres(PostgresBlobConfig),
/// Config for [MemBlob], only available in testing to prevent
/// footguns.
Mem,
Expand All @@ -42,6 +46,7 @@ impl BlobConfig {
match self {
BlobConfig::File(config) => Ok(Arc::new(FileBlob::open(config).await?)),
BlobConfig::S3(config) => Ok(Arc::new(S3Blob::open(config).await?)),
BlobConfig::Postgres(config) => Ok(Arc::new(PostgresBlob::open(config).await?)),
BlobConfig::Mem => Ok(Arc::new(MemBlob::open(MemBlobConfig::default()))),
}
}
Expand Down Expand Up @@ -89,6 +94,16 @@ impl BlobConfig {
query_params.clear();
Ok(BlobConfig::Mem)
}
"postgres" | "postgresql" => {
if !cfg!(debug_assertions) {
warn!("persist unexpectedly using postgres blob in a release binary");
}
query_params.clear();
Ok(BlobConfig::Postgres(PostgresBlobConfig::new(
value.to_owned(),
"".to_owned(),
)?))
}
p => Err(anyhow!(
"unknown persist blob scheme {}: {}",
p,
Expand Down
183 changes: 177 additions & 6 deletions src/persist/src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
//! Implementation of [Consensus] backed by Postgres.

use crate::cfg::ConsensusKnobs;
use anyhow::{anyhow, bail};
use anyhow::{anyhow, bail, Context};
use async_trait::async_trait;
use bytes::Bytes;
use deadpool_postgres::tokio_postgres::config::SslMode;
Expand All @@ -31,13 +31,16 @@ use std::fmt::Formatter;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::task::JoinHandle;
use tracing::debug;

use crate::error::Error;
use crate::location::{Consensus, ExternalError, SeqNo, VersionedData, SCAN_ALL};
use crate::location::{
Atomicity, Blob, BlobMetadata, Consensus, ExternalError, SeqNo, VersionedData, SCAN_ALL,
};
use crate::metrics::PostgresConsensusMetrics;

const SCHEMA: &str = "
const CONSENSUS_SCHEMA: &str = "
CREATE TABLE IF NOT EXISTS consensus (
shard text NOT NULL,
sequence_number bigint NOT NULL,
Expand Down Expand Up @@ -230,7 +233,7 @@ impl PostgresConsensus {
let mut client = pool.get().await?;

let tx = client.transaction().await?;
tx.batch_execute(SCHEMA).await?;
tx.batch_execute(CONSENSUS_SCHEMA).await?;

// The `consensus` table creates and deletes rows at a high frequency, generating many
// tombstoned rows. If Cockroach's GC interval is set high (the default is 25h) and
Expand All @@ -256,7 +259,7 @@ impl PostgresConsensus {
// this could be a TRUNCATE if we're confident the db won't reuse any state
let client = self.get_connection().await?;
client.execute("DROP TABLE consensus", &[]).await?;
client.execute(SCHEMA, &[]).await?;
client.execute(CONSENSUS_SCHEMA, &[]).await?;
Ok(())
}

Expand Down Expand Up @@ -492,16 +495,160 @@ impl Consensus for PostgresConsensus {
}
}

const BLOB_SCHEMA: &str = "
CREATE TABLE IF NOT EXISTS blob (
prefix STRING(1024) NOT NULL,
key STRING(1024) NOT NULL,
data BYTEA NOT NULL,
PRIMARY KEY(prefix, key)
);
";

/// Configuration to connect to a Postgres backed implementation of [Blob].
#[derive(Clone, Debug)]
pub struct PostgresBlobConfig {
url: String,
prefix: String,
}

impl PostgresBlobConfig {
/// Returns a new [PostgresBlobConfig] for use in production.
pub fn new(url: String, prefix: String) -> Result<Self, Error> {
Ok(PostgresBlobConfig { url, prefix })
}

/// Returns a new [PostgresBlobConfig] for use in unit tests.
///
/// By default, persist tests that use external storage (like Postgres) are
/// no-ops so that `cargo test` works on new environments without any
/// configuration. To activate the tests for [PostgresBlob] set the
/// `MZ_PERSIST_EXTERNAL_STORAGE_TEST_POSTGRES_URL` environment variable
/// with a valid connection url [1].
///
/// [1]: https://docs.rs/tokio-postgres/latest/tokio_postgres/config/struct.Config.html#url
pub fn new_for_test() -> Result<Option<Self>, Error> {
let url = match std::env::var(PostgresConsensusConfig::EXTERNAL_TESTS_POSTGRES_URL) {
Ok(url) => url,
Err(_) => {
if mz_ore::env::is_var_truthy("CI") {
panic!("CI is supposed to run this test but something has gone wrong!");
}
return Ok(None);
}
};

// Give each test a unique prefix so they don't conflict. WIP delete
// data somehow.
let prefix = uuid::Uuid::new_v4().to_string();
let config = PostgresBlobConfig { url, prefix };
Ok(Some(config))
}
}

/// Implementation of [Blob] over a Postgres database.
pub struct PostgresBlob {
prefix: String,
client: tokio_postgres::Client,
_handle: JoinHandle<()>,
}

impl std::fmt::Debug for PostgresBlob {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PostgresBlob").finish_non_exhaustive()
}
}

impl PostgresBlob {
/// Open a Postgres [Blob] instance with `config`, for the collection
/// named `shard`.
pub async fn open(config: PostgresBlobConfig) -> Result<Self, ExternalError> {
let pg_config: tokio_postgres::Config = config.url.parse()?;
let tls = make_tls(&pg_config)?;
let (mut client, conn) = tokio_postgres::connect(&config.url, tls)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would certainly need to be able to reconnect, so should either the actual connection and checking in the Blob impl functions, or else use deadpool or something.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I kinda felt like maybe it should share the pool with PostgresConsensus or something? didn't want to deal thinking through the tradeoffs for a demo, so just did the obvious thing

.await
.with_context(|| "error connecting to postgres".to_owned())?;
let handle = mz_ore::task::spawn(|| "pg_consensus_client", async move {
if let Err(e) = conn.await {
tracing::error!("connection error: {}", e);
}
});

let tx = client.transaction().await?;
tx.batch_execute(BLOB_SCHEMA).await?;
tx.commit().await?;
Ok(PostgresBlob {
prefix: config.prefix,
client,
_handle: handle,
})
}
}

#[async_trait]
impl Blob for PostgresBlob {
async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, ExternalError> {
const Q: &str = "SELECT data FROM blob WHERE prefix = $1 AND key = $2";
let row = self.client.query_opt(&*Q, &[&self.prefix, &key]).await?;
let row = match row {
None => return Ok(None),
Some(row) => row,
};

let data: Vec<u8> = row.try_get("data")?;
Ok(Some(data))
}

async fn list_keys_and_metadata(
&self,
key_prefix: &str,
f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
) -> Result<(), ExternalError> {
const Q: &str = "SELECT key, length(data) AS len FROM blob WHERE prefix = $1 AND key LIKE concat($2, '%')";
let rows = self.client.query(&*Q, &[&self.prefix, &key_prefix]).await?;
for row in rows {
let key: &str = row.try_get("key")?;
let data_len: i64 = row.try_get("len")?;
let size_in_bytes = u64::try_from(data_len).expect("len should not be negative");
f(BlobMetadata { key, size_in_bytes });
}
Ok(())
}

async fn set(&self, key: &str, value: Bytes, _atomic: Atomicity) -> Result<(), ExternalError> {
const Q: &str = "UPSERT INTO blob VALUES ($1, $2, $3)";
let _ = self
.client
.execute(&*Q, &[&self.prefix, &key, &value.as_ref()])
.await?;
Ok(())
}

async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
const Q: &str =
"DELETE FROM blob WHERE prefix = $1 AND key = $2 LIMIT 1 RETURNING length(data) AS len";
let row = self.client.query_opt(&*Q, &[&self.prefix, &key]).await?;
let row = match row {
None => return Ok(None),
Some(row) => row,
};

let data_len: i64 = row.try_get("len")?;
let data_len = u64::try_from(data_len).expect("len should not be negative");
Ok(Some(usize::cast_from(data_len)))
}
}

#[cfg(test)]
mod tests {
use crate::location::tests::consensus_impl_test;
use crate::location::tests::{blob_impl_test, consensus_impl_test};
use tracing::info;
use uuid::Uuid;

use super::*;

#[tokio::test(flavor = "multi_thread")]
async fn postgres_consensus() -> Result<(), ExternalError> {
mz_ore::test::init_logging();
let config = match PostgresConsensusConfig::new_for_test()? {
Some(config) => config,
None => {
Expand Down Expand Up @@ -536,4 +683,28 @@ mod tests {

Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn postgres_blob() -> Result<(), ExternalError> {
mz_ore::test::init_logging();
let config = match PostgresBlobConfig::new_for_test()? {
Some(config) => config,
None => {
info!(
"{} env not set: skipping test that uses external service",
PostgresConsensusConfig::EXTERNAL_TESTS_POSTGRES_URL
);
return Ok(());
}
};

blob_impl_test(|path| {
let config = PostgresBlobConfig {
url: config.url.clone(),
prefix: format!("{}/{}", config.prefix, path),
};
PostgresBlob::open(config)
})
.await
}
}
3 changes: 2 additions & 1 deletion src/sqllogictest/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,8 @@ impl RunnerInner {
clusterd_image: "clusterd".into(),
init_container_image: None,
persist_location: PersistLocation {
blob_uri: format!("file://{}/persist/blob", temp_dir.path().display()),
// blob_uri: format!("file://{}/persist/blob", temp_dir.path().display()),
blob_uri: consensus_uri.clone(),
consensus_uri,
},
persist_clients,
Expand Down
Loading