diff --git a/pychunkedgraph/graph/attributes.py b/pychunkedgraph/graph/attributes.py index ea03d2216..b58a6f0f8 100644 --- a/pychunkedgraph/graph/attributes.py +++ b/pychunkedgraph/graph/attributes.py @@ -106,7 +106,7 @@ class Connectivity: L2CrossChunkEdge = _AttributeArray( pattern=b"l2_cross_edge_%d", - family_id="3", + family_id="4", serializer=serializers.NumPyArray( dtype=basetypes.NODE_ID, shape=(-1, 2), compression_level=22 ), @@ -114,13 +114,13 @@ class Connectivity: FakeEdges = _Attribute( key=b"fake_edges", - family_id="3", + family_id="4", serializer=serializers.NumPyArray(dtype=basetypes.NODE_ID, shape=(-1, 2)), ) CrossChunkEdge = _AttributeArray( pattern=b"atomic_cross_edges_%d", - family_id="4", + family_id="3", serializer=serializers.NumPyArray( dtype=basetypes.NODE_ID, shape=(-1, 2), compression_level=22 ), diff --git a/pychunkedgraph/graph/client/bigtable/client.py b/pychunkedgraph/graph/client/bigtable/client.py index 486cbdd73..19a08b9a8 100644 --- a/pychunkedgraph/graph/client/bigtable/client.py +++ b/pychunkedgraph/graph/client/bigtable/client.py @@ -636,9 +636,9 @@ def _create_column_families(self): f.create() f = self._table.column_family("2") f.create() - f = self._table.column_family("3") + f = self._table.column_family("3", gc_rule=MaxAgeGCRule(datetime.timedelta(days=1))) f.create() - f = self._table.column_family("4", gc_rule=MaxAgeGCRule(datetime.timedelta(days=1))) + f = self._table.column_family("4") f.create() def _get_ids_range(self, key: bytes, size: int) -> typing.Tuple: diff --git a/pychunkedgraph/ingest/create/atomic_layer.py b/pychunkedgraph/ingest/create/atomic_layer.py index d87638b26..a59bc9f20 100644 --- a/pychunkedgraph/ingest/create/atomic_layer.py +++ b/pychunkedgraph/ingest/create/atomic_layer.py @@ -101,7 +101,13 @@ def _get_remapping(chunk_edges_d: dict): def _process_component( - cg, chunk_edges_d, parent_id, node_ids, sparse_indices, remapping, time_stamp, + cg, + chunk_edges_d, + parent_id, + node_ids, + sparse_indices, + remapping, + time_stamp, ): nodes = [] chunk_out_edges = [] # out = between + cross @@ -145,3 +151,49 @@ def _get_outgoing_edges(node_id, chunk_edges_d, sparse_indices, remapping): # edges that this node is part of chunk_out_edges = np.concatenate([chunk_out_edges, edges[row_ids]]) return chunk_out_edges + + +def postprocess_atomic_chunk( + cg: ChunkedGraph, + chunk_coord: np.ndarray, + time_stamp: Optional[datetime.datetime] = None, +): + time_stamp = get_valid_timestamp(time_stamp) + + chunk_id = cg.get_chunk_id( + layer=2, x=chunk_coord[0], y=chunk_coord[1], z=chunk_coord[2] + ) + + properties = [ + attributes.Connectivity.CrossChunkEdge[l] for l in range(2, cg.meta.layer_count) + ] + + chunk_rr = cg.range_read_chunk( + chunk_id, properties=properties, time_stamp=time_stamp + ) + + result = {} + for l2id, raw_cx_edges in chunk_rr.items(): + try: + cx_edges = { + prop.index: val[0].value.copy() for prop, val in raw_cx_edges.items() + } + result[l2id] = cx_edges + except KeyError: + continue + + nodes = [] + val_dicts = [] + for l2id, cx_edges in result.items(): + val_dict = {} + for layer, edges in cx_edges.items(): + l2_edges = np.zeros_like(edges) + l2_edges[:, 0] = l2id + l2_edges[:, 1] = cg.get_parents(edges[:, 1]) + col = attributes.Connectivity.L2CrossChunkEdge[layer] + val_dict[col] = np.unique(l2_edges, axis=0) + val_dicts.append(val_dict) + + r_key = serializers.serialize_uint64(l2id) + nodes.append(cg.client.mutate_row(r_key, val_dict, time_stamp=time_stamp)) + cg.client.write(nodes)