diff --git a/src/persist-types/src/schema.rs b/src/persist-types/src/schema.rs index 83b35f0a3604d..eece78663f5ee 100644 --- a/src/persist-types/src/schema.rs +++ b/src/persist-types/src/schema.rs @@ -139,10 +139,12 @@ impl StructArrayMigration { *fields = f.finish().fields; } DropField { name } => { - let (idx, _) = fields + let (idx, field) = fields .find(name) .unwrap_or_else(|| panic!("expected to find field {} in {:?}", name, fields)); - arrays.remove(idx); + let array = arrays.remove(idx); + // Defensive check to make sure we removed the correct array. + assert_eq!(array.data_type(), field.data_type()); let mut f = SchemaBuilder::from(&*fields); f.remove(idx); *fields = f.finish().fields; diff --git a/src/repr/src/row/encoding2.rs b/src/repr/src/row/encoding2.rs index 57158fb321c03..c0f445fcddaf4 100644 --- a/src/repr/src/row/encoding2.rs +++ b/src/repr/src/row/encoding2.rs @@ -1167,6 +1167,13 @@ impl RowColumnarDecoder { nullability: col.logical_nulls(), }) } + + /// Returns if all of the resulting [`Row`]s would be composed entirely of [`Datum::Null`]. + pub fn all_null(&self) -> bool { + self.decoders + .iter() + .all(|(_name, null_count, _decoder)| *null_count == Some(self.len)) + } } impl ColumnDecoder for RowColumnarDecoder { diff --git a/src/storage-types/proptest-regressions/sources.txt b/src/storage-types/proptest-regressions/sources.txt index f9f351989c047..7f4f7f83684f7 100644 --- a/src/storage-types/proptest-regressions/sources.txt +++ b/src/storage-types/proptest-regressions/sources.txt @@ -6,3 +6,5 @@ # everyone who runs the test benefits from these saved cases. cc d1ea0360ec508a3282786c6c4c1dec3b9fcfeab508f358b779c4d3d93ed70cc0 # shrinks to (old, diffs, datas) = (RelationDesc { typ: RelationType { column_types: [ColumnType { scalar_type: List { element_type: Char { length: None }, custom_id: Some(User(61546888405338325)) }, nullable: false }, ColumnType { scalar_type: Time, nullable: false }, ColumnType { scalar_type: VarChar { max_length: Some(VarCharMaxLength(178)) }, nullable: false }, ColumnType { scalar_type: Jsonb, nullable: true }], keys: [] }, names: [ColumnName("N"), ColumnName("^`hMRqf"), ColumnName("`xI]zZmwxNuE_]f"), ColumnName("zKI_9")] }, [AddColumn { name: ColumnName("YC"), typ: ColumnType { scalar_type: Char { length: Some(CharLength(52)) }, nullable: false } }, AddColumn { name: ColumnName("RTJBRC]DHW"), typ: ColumnType { scalar_type: Int64, nullable: false } }, AddColumn { name: ColumnName("`sLRisp"), typ: ColumnType { scalar_type: Record { fields: [(ColumnName("LuU`a"), ColumnType { scalar_type: String, nullable: true }), (ColumnName("X]O]A`LBcd"), ColumnType { scalar_type: PgLegacyName, nullable: false }), (ColumnName("]j`O]J"), ColumnType { scalar_type: UInt64, nullable: false })], custom_id: Some(Transient(11897034714846114187)) }, nullable: false } }, AddColumn { name: ColumnName("y^Ckw"), typ: ColumnType { scalar_type: List { element_type: MzTimestamp, custom_id: None }, nullable: false } }, AddColumn { name: ColumnName("gX\u{36455}oeZh"), typ: ColumnType { scalar_type: Timestamp { precision: Some(TimestampPrecision(101)) }, nullable: true } }, AddColumn { name: ColumnName("QT"), typ: ColumnType { scalar_type: List { element_type: VarChar { max_length: Some(VarCharMaxLength(293)) }, custom_id: None }, nullable: false } }, AddColumn { name: ColumnName("JbbuiQ"), typ: ColumnType { scalar_type: Record { fields: [(ColumnName("bJBkZx"), ColumnType { scalar_type: Array(VarChar { max_length: Some(VarCharMaxLength(46)) }), nullable: true }), (ColumnName("KQleY]Ka"), ColumnType { scalar_type: Float32, nullable: false }), (ColumnName("\u{50665}OunQGW"), ColumnType { scalar_type: Float32, nullable: false }), (ColumnName("yNO"), ColumnType { scalar_type: MzTimestamp, nullable: true }), (ColumnName("lJENw"), ColumnType { scalar_type: AclItem, nullable: false }), (ColumnName("I"), ColumnType { scalar_type: RegType, nullable: false })], custom_id: None }, nullable: false } }, MovePosition { name: ColumnName("^`hMRqf"), new_pos: 2 }, MovePosition { name: ColumnName("zKI_9"), new_pos: 0 }, ToggleNullability { name: ColumnName("`xI]zZmwxNuE_]f") }, ToggleNullability { name: ColumnName("zKI_9") }, ChangeType { name: ColumnName("`xI]zZmwxNuE_]f"), typ: ColumnType { scalar_type: RegProc, nullable: true } }], [SourceData(Ok(Row{[List([String("$\u{7d58c}\0�/J[y\u{3e716}{\u{4bf2e}>\t&U?Ⱥ5\u{7fe1d}Cu\u{b82f4}¥"), String("\u{f683e}*'¥¥\u{80}:/"), String("\u{6cc5a}\u{b5fd9}`𨚆<\u{1b}"), String("="), String(":\t𥕯$\t/r\u{ea63e}png\u{87bbd}H*"), String(".2�:\0`.\"{>\r\u{202e}\u{4}\u{8}\u{33369}\t\u{2}/\u{b8710}Y\u{1b}*𦐪\t\u{10cfd5}\u{202e}\u{41b44}\u{e9119}/�\u{ebad}¥"), String("\u{9cd3d}MȺh+'Y/\u{db4bf}\u{b}\u{33402}V�"), String("s|l\u{10206}`[\0\"¥\u{1038ba}h"), String("\0?/\u{e568b}A𘘠\t"), String("Ѩ\u{ad8b4}\u{7f9dd}\u{feff}%\tD\r\u{7f}|\r/<¥iÐ\u{34081}:\\D{6\u{1b}'4\u{3dbca}\\Ⱥ&o\\"), String("\u{367d5}Yt\u{202e}4\u{202e}@<🕴\u{10303b}\u{1b}\u{64393}*�\u{b9b81}?>]'M\u{7f}{Ѩ\u{93645}\u{d8cc4}%\u{d6f51}\u{1b}U"), String("\u{5}{\u{feff}\u{484de}\\\u{6bc9a}s\u{7}\"\u{a348b}🕴{\u{7658b}.="), String("\u{4d79e}-4\u{b8622}\u{669b6}YG\u{1b}û\u{9a3ed}\\\u{7f}=_1qȺ6!i\t\t\u{a50f1}\t$\t\""), String("=y,\u{81388}Ⱥ<Ѩ'\u{b}\u{153d0}{\u{cccd0}N\u{1b}\u{fac9d}!$\\Ѩ�/\u{202e}齬\u{6ae43}\u{9e}\rm\u{b}"), Null, String("{"), String("\u{57820}\u{a99d9}+cKJ{&`°?\u{ddf69}\u{6ff79}Ñz%8{.\r¥{\u{45fb9}"), String(":\u{1b}y\""), String("\u{69c27}I*¥\rÄ=*🕴\u{c8a58}'$%\u{dc2a9}W5&\u{6fb8e}🕴.c."), String("\u{15776}\u{fef88}\u{b}�w3&&\t3🕴\u{8e13a}?Ѩ\0\r"), String("\u{ad}🕴Ⱥ\u{b5f20}\rѨ\"\u{1cd11}0 @Rsz\u{a5e31}\u{f8ffb}"), String("=\u{1db9a}\u{6ef61}."), String("Ѩ\"K\u{6f47a}q*R\u{811b6}\u{8f7b4}%~:\u{b0be8}'"), String("\u{90e8c}\rx]/:z\u{ee3ed}p\u{9bb30}\u{9e}\u{1b}\u{7f}\u{202e}\u{bd6a0}?\u{3}\0&\u{7f}\u{e3851}\u{d8eab}z\u{40a88}T\u{392ff}Ò\u{feff}{\u{1}\u{4}\u{4}"), String("`]\u{202e}\u{1}tj´@\u{d8901}\u{c8325}/$J\t\u{8f79b}�\u{86d3d}v\u{646a5}\u{feff}[\u{4}\u{1}.Ⱥ'"), String("gk`9\u{6cae0}\0&*2$\u{bdb7f}t*I\u{b}[fT\u{5a361}\u{4}ᅥ\r\u{1}\t\u{7}¥\u{8dbd7}*¥.à"), String("§Ⱥ'fG\u{954bc}\u{3e37d}\\%\u{7f}{."), Null, String("A7\u{7}\u{7f}¥\u{3}%D\"V\u{dde6c}"), String("\u{cb14f}\u{2}Ï"), String("\u{202e}:\u{abfd1}🕴/\u{1b}{&o\05\u{b}O\u{ffad8}=\u{c64d1}�\u{61577}\u{10e1f4}🕴\u{b}"), String("\u{d8e6a}`%\u{16c48}\r�kd\u{babb8}\u{1}[f=d\u{15af5}=W="), String("\u{b}}\u{5}\r{B\u{1b}"), String("\tK\u{6}.`=\t\0`*&(\0\u{520e6}\u{92358}?\t\u{1}N\u{1b}\u{919f0}/\u{8c}.%\u{7f}`&\u{e0c6e}%\r"), String("¥\u{202e}E\u{b}\u{1c33e}🕴\u{f365a}=🕴\u{c8b93}\u{2}Ѩ🕴?%\u{352da}\u{7abb1}\"\u{73241}"), String("M\u{74b90}\u{feff}\u{1b}kg�\u{2f70d}1?\u{1b}?\u{8d0a0}\u{3}\u{feff}\u{5eef2}�\0<"), String("x\u{945f5}\\𧻪6\\I\u{6f85b}\u{e7d12}"), String("\u{95}\u{84}𭆜?\u{7f}e\r\u{7db87}\t\tP\u{64514}&J㰶a\t?\t¥\u{e5be9}\u{68ebb}\u{5}&\u{b}\u{e3f49}"), String("<\0𪐊%\u{5607d}'\u{5}Äd\u{3af82}:Ѩ{"), String("�\u{202e}%9\u{202e}/PѨz¥\u{7b44b}l"), String("\u{7f}틭¥`:🕴\u{6b856}$𩌑\0\t"), String("{{🕴�\u{202e})"), String("�?:':"), Null, String(",E\u{1b}\0\u{7f}\0\u{98}\u{4}ȺȺ%*)&:\u{8e}뱴\u{b}\u{1086ae}\t.<"), String("n"), String("\\?Ѩ=7\ts\rH#.y'#\u{7f}¥C\u{2}")]), Time(17:35:04.184552), String("\\\u{4f49f}Ѩ\u{7b23f}g\u{46dd4}\n𥃧;k:.%<\u{cc0b9}\nI`"), Null]})), SourceData(Ok(Row{[List([String("\u{e5390}`J\u{d1d4e}\u{1}A\u{bebad}D\u{1b}$.꤉*@\u{1b}*"), String("\u{7f}$*\u{528dd}\u{1b}`\u{f13c7}\u{41124}W&>"), String("\u{3}/%\u{b}\u{4d199}\u{6ae78}awk\"¥'\u{db1e4}*U.Ⱥ\u{4}q\u{6649d}?"), String("\u{10592f}\u{78162}\u{869ca}&\u{fcef6}/'¥:Ö*\u{90a1b}=\u{87}\u{1a75a}\u{1b}=\u{6d7fb}\u{6}gȺ"), String("\u{202e}\u{202e}Ⱥ\u{b5703}=\t"), String("\\\r\r.🕴<(\0`\0ó=\u{4f001}\u{5}\u{feff}=<:\u{8c}\t\u{3af84}*/O\u{3a7a0}'\r"), String("\u{afc86}\u{db4e5}<[\u{9a5ce}\u{d672c}{.=𦎔1\u{3}$`\u{8}\u{10ab65}`\t¥"), String("\0/\u{beeaf}\u{1034a5}0Ⱥ~𤷞\u{52a9f}E\r%2|\u{b}?\r\u{94262}q$w\u{202e}:"), String("@{8.G%~HM::&$\u{3522d}\u{98917}\\C\u{b0406}\r&𫪰\r¥\u{b}"), String("=È&\u{7f}<\u{1a1d8}\u{b}ꔟ\t_`"), String("%!x:\u{5e00d}:?\r\u{da62a}\u{43bda}\\"), String("\u{102bdd}e$i\u{f8654}\0\u{865d0}#Ѩ\0*=$"), String(":🕴?=🕴.@n\u{6072d}:\0'𢡣𮌵?Ⱥ|>ì\u{a7c16}="), String("\\"), String("\u{2}U\0\u{1b}*\u{473b2}l\u{a401e}\u{62c52}\u{1}I矍\\<\u{cb94a}\u{327c6}\\\u{7f}\u{a3ed5}\u{a69f3}^ 𪩂\u{46151}?h\u{2}"), String("`K=?7�\0\\{\u{a3df5}í\r&`\u{ca5da}\u{6d306}?"), String("u&`\u{d2e4c}\u{6589a}\u{1b}\r\u{45d92}Ѩ*:%Ѩ\u{16326}\\\u{3}=\u{1b}\u{61641}\u{368f4}&�$\u{5}\u{202e}"), String("$/\u{202e}*\rÔ=Ⱥf\u{7f}d\"\u{8}Ky\u{c5254}\u{ab2ce}\u{41f39}&£\u{94a6e}.\u{b8fe7}':?\0"), String("\u{9d}�b\\\u{1b}\u{202e}\rȺ\\\r<\0`C$uȺ\u{15a1f}.\u{7492e}�1\u{b}?\u{7f}I"), String("𤱯\u{1b}/ㄶ\u{a8c90}"), String("\\èѨ.Öy*\u{81de2}\u{b}`\rE*aȺ\u{8}$Ѩ\u{1b}\u{feff}"), String("(\u{3}$%/\u{1}`&*{\u{6c9ba}🕴\"c\u{ac2e9}'M\u{3d282}\u{5}\"'뮩"), String(":?i'%=\rZ\r"), Null, String("`\u{7f}{£\"¥¡`\t�:\u{b4f04}Ê\u{b}v?\u{6ecec}:\u{7}\u{b9efd}8ik(\u{b}\u{4}"), String("\0\r\u{578a6}.¥\u{7f}\"I`k\u{6088b}\u{84083}\\שln� for SourceDataColumnarDecoder { }; self.row_decoder.decode(idx, row); } - (true, true) => panic!("should have one of 'ok' or 'err'"), - (false, false) => panic!("cannot have both 'ok' and 'err'"), + (true, true) => panic!( + "should have one of 'ok' or 'err' @ {idx} \n\n {:?} \n\n {:?}", + self.row_decoder, self.err_decoder + ), + (false, false) => { + // When Persist migrates a `RelationDesc` with no columns to one with columns it + // replaces the single arrow::NullArray with an arrow::StructArray where all inner + // fields entirely Null. We sniff that scenario out here. + // + // TODO(parkmycar): Add more validation here once we have a versioned RelationDesc. + + let SourceDataRowColumnarDecoder::Row(decoder) = &self.row_decoder else { + panic!("found non-Row decoder when we have both 'ok' and 'err' values"); + }; + if !decoder.all_null() { + panic!("cannot have both 'ok' and 'err'"); + } + + let err = self.err_decoder.value(idx); + let err = ProtoDataflowError::decode(err) + .expect("proto should be valid") + .into_rust() + .expect("error should be valid"); + val.0 = Err(err); + } } } @@ -2003,6 +2026,68 @@ mod tests { }); } + #[mz_ore::test] + #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux` + fn migrated_source_data_roundtrips() { + fn test_case(old: RelationDesc, diffs: Vec, datas: Vec) { + let mut new = old.clone(); + for diff in diffs.into_iter() { + diff.apply(&mut new); + } + let old_data_type = get_data_type(&old); + let new_data_type = get_data_type(&new); + + let Some(migration) = backward_compatible(&old_data_type, &new_data_type) else { + // womp womp. + return; + }; + + // Encode data with our original schema. + let mut encoder = >::encoder(&old).unwrap(); + for data in &datas { + encoder.append(data); + } + + // Migrate the columnar data. + let col = encoder.finish(); + let col = migration.migrate(Arc::new(col)); + + // Make sure we can decode it with the new schema. + let decoder = >::decoder_any(&new, &col).unwrap(); + let mut rnd_data = SourceData(Ok(Row::default())); + for (idx, og_data) in datas.iter().enumerate() { + decoder.decode(idx, &mut rnd_data); + + // TODO(parkmycar): Add logic to make sure the migration did what we expect at + // the Row level. + match (&og_data, &rnd_data) { + (SourceData(Ok(_)), SourceData(Ok(_))) => (), + (SourceData(Err(og_err)), SourceData(Err(rnd_err))) => { + assert_eq!(og_err, rnd_err) + } + (og, rnd) => { + panic!("SourceData changed type during migration! {og:?}, {rnd:?}") + } + } + } + } + + let strat = any::() + .prop_flat_map(|desc| { + proptest::collection::vec(arb_source_data_for_relation_desc(&desc), 0..8) + .no_shrink() + .prop_map(move |datas| (desc.clone(), datas)) + }) + .prop_flat_map(|(desc, datas)| { + arb_relation_desc_diff(&desc) + .prop_map(move |diffs| (desc.clone(), diffs, datas.clone())) + }); + + proptest!(|((desc, diffs, source_datas) in strat)| { + test_case(desc, diffs, source_datas); + }); + } + fn is_sorted(array: &dyn Array) -> bool { let Ok(cmp) = build_compare(array, array) else { // TODO: arrow v51.0.0 doesn't support comparing structs. When @@ -2017,6 +2102,22 @@ mod tests { .all(|(i, j)| cmp(i, j).is_le()) } + /// Returns if we should be able to migrate a [`RelationDesc`] with the + /// provided set of [`PropRelationDescDiff`]s. + fn should_be_able_to_migrate(diffs: &Vec) -> bool { + // TODO(parkmycar): As we iterate on schema migrations more things should become compatible. + diffs.iter().all(|diff| match diff { + // We only support adding nullable columns. + PropRelationDescDiff::AddColumn { + typ: ColumnType { nullable, .. }, + .. + } => *nullable, + // TODO(parkmycar): Re-enable DropColumn. + // PropRelationDescDiff::DropColumn { .. } => true, + _ => false, + }) + } + fn get_data_type(schema: &impl Schema2) -> arrow::datatypes::DataType { use mz_persist_types::columnar::ColumnEncoder; let array = Schema2::encoder(schema).expect("valid schema").finish(); @@ -2083,17 +2184,7 @@ mod tests { #[cfg_attr(miri, ignore)] fn backward_compatible_migrate_from_common() { fn test_case(old: RelationDesc, diffs: Vec, datas: Vec) { - // TODO(parkmycar): As we iterate on schema migrations more things should become compatible. - let should_be_compatible = diffs.iter().all(|diff| match diff { - // We only support adding nullable columns. - PropRelationDescDiff::AddColumn { - typ: ColumnType { nullable, .. }, - .. - } => *nullable, - // TODO(parkmycar): Re-enable DropColumn. - // PropRelationDescDiff::DropColumn { .. } => true, - _ => false, - }); + let should_be_compatible = should_be_able_to_migrate(&diffs); let mut new = old.clone(); for diff in diffs.into_iter() {