Merge pull request #22542 from def-/pr-pw-backup-restore
parallel-workload: Backup&Restore, multiple DBs, less locking
def- authored Oct 30, 2023
2 parents 9330b90 + a1ba253 commit fac720b
Showing 11 changed files with 868 additions and 455 deletions.
27 changes: 18 additions & 9 deletions ci/nightly/pipeline.template.yml
@@ -797,7 +797,7 @@ steps:
- id: parallel-workload-dml
label: "Parallel Workload (DML)"
artifact_paths: [junit_*.xml, parallel-workload-queries.log.zst]
-    timeout_in_minutes: 40
+    timeout_in_minutes: 60
agents:
queue: builder-linux-x86_64
plugins:
@@ -808,7 +808,7 @@
- id: parallel-workload-ddl
label: "Parallel Workload (DDL)"
artifact_paths: [junit_*.xml, parallel-workload-queries.log.zst]
-    timeout_in_minutes: 40
+    timeout_in_minutes: 60
agents:
queue: builder-linux-x86_64
plugins:
@@ -819,19 +819,18 @@
- id: parallel-workload-100-threads
label: "Parallel Workload (100 threads)"
artifact_paths: [junit_*.xml, parallel-workload-queries.log.zst]
-    timeout_in_minutes: 40
+    timeout_in_minutes: 60
agents:
queue: builder-linux-x86_64
plugins:
- ./ci/plugins/mzcompose:
composition: parallel-workload
args: [--runtime=1500, --threads=100]
skip: "TODO(def-): Reenable when #21954 is fixed"

- id: parallel-workload-rename-naughty
label: "Parallel Workload (rename + naughty identifiers)"
artifact_paths: [junit_*.xml, parallel-workload-queries.log.zst]
-    timeout_in_minutes: 40
+    timeout_in_minutes: 60
agents:
queue: builder-linux-x86_64
plugins:
@@ -842,7 +841,7 @@ steps:
- id: parallel-workload-rename
label: "Parallel Workload (rename)"
artifact_paths: [junit_*.xml, parallel-workload-queries.log.zst]
-    timeout_in_minutes: 40
+    timeout_in_minutes: 60
agents:
queue: builder-linux-x86_64
plugins:
@@ -853,7 +852,7 @@ steps:
- id: parallel-workload-cancel
label: "Parallel Workload (cancel)"
artifact_paths: [junit_*.xml, parallel-workload-queries.log.zst]
-    timeout_in_minutes: 40
+    timeout_in_minutes: 60
agents:
queue: builder-linux-x86_64
plugins:
@@ -864,14 +863,24 @@ steps:
- id: parallel-workload-kill
label: "Parallel Workload (kill)"
artifact_paths: [junit_*.xml, parallel-workload-queries.log.zst]
-    timeout_in_minutes: 40
+    timeout_in_minutes: 60
agents:
queue: builder-linux-x86_64
plugins:
- ./ci/plugins/mzcompose:
composition: parallel-workload
args: [--runtime=1500, --scenario=kill]
skip: "TODO(def-) Enable after figuring out restoring catalog"

+  - id: parallel-workload-backup-restore
+    label: "Parallel Workload (backup & restore)"
+    artifact_paths: [junit_*.xml, parallel-workload-queries.log.zst]
+    timeout_in_minutes: 60
+    agents:
+      queue: builder-linux-x86_64
+    plugins:
+      - ./ci/plugins/mzcompose:
+          composition: parallel-workload
+          args: [--runtime=1500, --scenario=backup-restore, --naughty-identifiers]

- id: incident-70
label: "Test for incident 70"
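Note: the new backup-restore step mirrors its siblings but combines a scenario with --naughty-identifiers, exercising backup & restore under hostile object names. It can presumably also be run locally with something like `bin/mzcompose --find parallel-workload run default --runtime=1500 --scenario=backup-restore --naughty-identifiers`; that invocation is an assumption based on the repository's usual mzcompose workflow, not part of this diff.
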
19 changes: 11 additions & 8 deletions misc/python/materialize/data_ingest/data_type.py
@@ -60,7 +60,7 @@ def random_value(
record_size: RecordSize = RecordSize.LARGE,
in_query: bool = False,
) -> Any:
-        return random.choice((True, False))
+        return rng.choice((True, False))

@staticmethod
def name(backend: Backend = Backend.POSTGRES) -> str:
@@ -231,7 +231,7 @@ def random_value(
if rng.randrange(10) == 0:
result = rng.choice(
[
# "NULL", # TODO: Reenable after #21937 is fixed
"NULL",
"0.0",
"True",
# "",
@@ -243,13 +243,13 @@
# chars = string.printable
chars = string.ascii_letters + string.digits
if record_size == RecordSize.TINY:
result = random.choice(("foo", "bar", "baz"))
result = rng.choice(("foo", "bar", "baz"))
elif record_size == RecordSize.SMALL:
result = "".join(random.choice(chars) for _ in range(3))
result = "".join(rng.choice(chars) for _ in range(3))
elif record_size == RecordSize.MEDIUM:
result = "".join(random.choice(chars) for _ in range(10))
result = "".join(rng.choice(chars) for _ in range(10))
elif record_size == RecordSize.LARGE:
result = "".join(random.choice(chars) for _ in range(100))
result = "".join(rng.choice(chars) for _ in range(100))
else:
raise ValueError(f"Unexpected record size {record_size}")

@@ -357,10 +357,13 @@ def numeric_value(num: int, in_query: bool = False) -> Any:
return f"'{values_str}'::map[text=>text]" if in_query else values_str


-DATA_TYPES = list(all_subclasses(DataType))
+# Sort to keep determinism for reproducible runs with specific seed
+DATA_TYPES = sorted(list(all_subclasses(DataType)), key=repr)

# fastavro._schema_common.UnknownType: record
# bytea requires Python bytes type instead of str
-DATA_TYPES_FOR_AVRO = list(set(DATA_TYPES) - {TextTextMap, Jsonb, Bytea, Boolean})
+DATA_TYPES_FOR_AVRO = sorted(
+    list(set(DATA_TYPES) - {TextTextMap, Jsonb, Bytea, Boolean}), key=repr
+)

NUMBER_TYPES = [SmallInt, Int, Long, Float, Double]
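Both kinds of change in this file serve seeded reproducibility: values are drawn from the caller-supplied rng rather than the global random module, and the type lists are sorted because set iteration and subclass-discovery order differ between interpreter runs. A minimal sketch of the idea, using hypothetical DataType subclasses rather than the real ones:

import random

class DataType: ...
class Int(DataType): ...
class Text(DataType): ...

types = {Int, Text}               # set order is hash-based and can vary per run
stable = sorted(types, key=repr)  # repr embeds the class name, so order is stable

rng = random.Random(42)           # seeded rng, as threaded through random_value
print(rng.choice(stable))         # identical pick on every run with this seed
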
29 changes: 15 additions & 14 deletions misc/python/materialize/data_ingest/executor.py
@@ -25,6 +25,7 @@

from materialize.data_ingest.data_type import Backend
from materialize.data_ingest.field import Field, formatted_value
+from materialize.data_ingest.query_error import QueryError
from materialize.data_ingest.row import Operation
from materialize.data_ingest.transaction import Transaction

@@ -76,9 +77,9 @@ def execute(self, cur: pg8000.Cursor, query: str) -> None:
self.reconnect()
with self.mz_conn.cursor() as cur:
self.execute(cur, query)
-        except:
-            print(f"Query failed: {query}")
-            raise
+        except Exception as e:
+            print(f"Query failed: {query} {e}")
+            raise QueryError(str(e), query)

def execute_with_retry_on_error(
self,
@@ -217,10 +218,10 @@ def create(self) -> None:
with self.mz_conn.cursor() as cur:
self.execute(
cur,
f"""CREATE SOURCE {identifier(self.schema)}.{identifier(self.table)}
FROM KAFKA CONNECTION kafka_conn (TOPIC '{self.topic}')
f"""CREATE SOURCE {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.table)}
FROM KAFKA CONNECTION materialize.public.kafka_conn (TOPIC '{self.topic}')
FORMAT AVRO
USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
USING CONFLUENT SCHEMA REGISTRY CONNECTION materialize.public.csr_conn
ENVELOPE UPSERT""",
)
self.mz_conn.autocommit = False
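Since the workload now spans multiple databases, the executor qualifies every object it creates with both database and schema, and addresses the shared connections by their full materialize.public path, so these statements resolve regardless of the session's current database. The remaining hunks in this file apply the same qualification to the table, sink, source, and DML statements.
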
@@ -333,7 +334,7 @@ def create(self) -> None:
)
self.execute(
cur,
f"""CREATE SOURCE {identifier(self.schema)}.{identifier(self.source)}
f"""CREATE SOURCE {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.source)}
FROM POSTGRES CONNECTION pg{self.num} (PUBLICATION 'postgres_source')
FOR TABLES ({identifier(self.table)} AS {identifier(self.table)})""",
)
@@ -425,13 +426,13 @@ def create(self) -> None:
self.execute(cur, f"DROP TABLE IF EXISTS {identifier(self.table_original)}")
self.execute(
cur,
f"""CREATE TABLE {identifier(self.schema)}.{identifier(self.table_original)} (
f"""CREATE TABLE {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.table_original)} (
{", ".join(values)},
PRIMARY KEY ({", ".join(keys)}));""",
)
self.execute(
cur,
f"""CREATE SINK {identifier(self.schema)}.sink{self.num} FROM {identifier(self.table_original)}
f"""CREATE SINK {identifier(self.database)}.{identifier(self.schema)}.sink{self.num} FROM {identifier(self.table_original)}
INTO KAFKA CONNECTION kafka_conn (TOPIC '{self.topic}')
KEY ({", ".join([identifier(key) for key in keys])})
FORMAT AVRO
@@ -440,7 +441,7 @@ def create(self) -> None:
)
self.execute_with_retry_on_error(
cur,
f"""CREATE SOURCE {identifier(self.schema)}.{identifier(self.table)}
f"""CREATE SOURCE {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.table)}
FROM KAFKA CONNECTION kafka_conn (TOPIC '{self.topic}')
FORMAT AVRO
USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
@@ -466,7 +467,7 @@ def run(self, transaction: Transaction) -> None:
)
self.execute(
cur,
f"""INSERT INTO {identifier(self.schema)}.{identifier(self.table_original)}
f"""INSERT INTO {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.table_original)}
VALUES ({values_str})
""",
)
@@ -492,7 +493,7 @@ def run(self, transaction: Transaction) -> None:
self.mz_conn.autocommit = True
self.execute(
cur,
f"""UPDATE {identifier(self.schema)}.{identifier(self.table_original)}
f"""UPDATE {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.table_original)}
SET {set_str}
WHERE {cond_str}
""",
@@ -504,7 +505,7 @@ def run(self, transaction: Transaction) -> None:
)
self.execute(
cur,
f"""INSERT INTO {identifier(self.schema)}.{identifier(self.table_original)}
f"""INSERT INTO {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.table_original)}
VALUES ({values_str})
""",
)
@@ -518,7 +519,7 @@ def run(self, transaction: Transaction) -> None:
self.mz_conn.autocommit = True
self.execute(
cur,
f"""DELETE FROM {identifier(self.schema)}.{identifier(self.table_original)}
f"""DELETE FROM {identifier(self.database)}.{identifier(self.schema)}.{identifier(self.table_original)}
WHERE {cond_str}
""",
)
17 changes: 17 additions & 0 deletions misc/python/materialize/data_ingest/query_error.py
@@ -0,0 +1,17 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.


class QueryError(Exception):
msg: str
query: str

def __init__(self, msg: str, query: str):
self.msg = msg
self.query = query
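
A minimal sketch of how calling code might use the new exception to surface the failing statement; the surrounding executor API here is assumed for illustration, and only QueryError itself comes from this diff:

from materialize.data_ingest.query_error import QueryError

def run_logged(executor, query: str) -> None:
    try:
        executor.execute(query)  # hypothetical caller of the wrapped execute
    except QueryError as e:
        # e.query pinpoints the offending SQL without re-parsing the message
        print(f"failed query {e.query!r}: {e.msg}")
        raise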
