-
Notifications
You must be signed in to change notification settings - Fork 134
/
Copy pathschema_test.py
243 lines (186 loc) · 10.1 KB
/
schema_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
import time
import pytest
import logging
from cassandra.concurrent import execute_concurrent_with_args
from tools.assertions import assert_invalid, assert_all, assert_one, assert_none, assert_some
from dtest import Tester, create_ks, create_cf
# Shorthand for the ``since`` pytest marker; applied below as ``@since(...)`` with version bounds.
since = pytest.mark.since
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class TestSchema(Tester):
    """
    Schema-alteration tests: adding and dropping columns, dropping tables and keyspaces,
    and checking that the results hold across flushes, compaction and node restarts.
    """

    def test_table_alteration(self):
        """
        Tests that table alters return as expected with many sstables at different schema points
        """
        cluster = self.cluster
        cluster.populate(1).start()
        node1, = cluster.nodelist()
        session = self.patient_cql_connection(node1)
        create_ks(session, 'ks', 1)
        session.execute("use ks;")

        # NOTE(review): the 1024 thresholds presumably keep the flushed sstables from being
        # compacted together, so several sstables per schema version remain -- confirm.
        session.execute("create table tbl_o_churn (id int primary key, c0 text, c1 text) "
                        "WITH compaction = {'class': 'SizeTieredCompactionStrategy', 'min_threshold': 1024, 'max_threshold': 1024 };")

        rows_to_insert = 50

        # Five flushed batches written under the original schema (id, c0, c1).
        stmt1 = session.prepare("insert into tbl_o_churn (id, c0, c1) values (?, ?, ?)")
        for n in range(5):
            parameters = [(x, 'aaa', 'bbb') for x in range(n * rows_to_insert, (n + 1) * rows_to_insert)]
            execute_concurrent_with_args(session, stmt1, parameters, concurrency=rows_to_insert)
            node1.flush()

        session.execute("alter table tbl_o_churn add c2 text")
        session.execute("alter table tbl_o_churn drop c0")

        # Five more flushed batches written under the altered schema (id, c1, c2).
        stmt2 = session.prepare("insert into tbl_o_churn (id, c1, c2) values (?, ?, ?);")
        for n in range(5, 10):
            parameters = [(x, 'ccc', 'ddd') for x in range(n * rows_to_insert, (n + 1) * rows_to_insert)]
            execute_concurrent_with_args(session, stmt2, parameters, concurrency=rows_to_insert)
            node1.flush()

        rows = list(session.execute("select * from tbl_o_churn"))
        # Guard against a vacuous pass: with no rows the per-row checks below would never run.
        assert len(rows) == 10 * rows_to_insert

        for row in rows:
            # The dropped column must not be visible, whichever schema the row was written under.
            assert not hasattr(row, 'c0')
            if row.id < rows_to_insert * 5:
                # Written before the alter: c2 did not exist yet.
                assert row.c1 == 'bbb'
                assert row.c2 is None
            else:
                assert row.c1 == 'ccc'
                assert row.c2 == 'ddd'

    @since("2.0", max_version="3.X")  # Compact Storage
    def test_drop_column_compact(self):
        """
        Dropping a column from a COMPACT STORAGE table must be rejected.
        """
        session = self.prepare()

        session.execute("USE ks")
        session.execute("CREATE TABLE cf (key int PRIMARY KEY, c1 int, c2 int) WITH COMPACT STORAGE")

        assert_invalid(session, "ALTER TABLE cf DROP c1", "Cannot drop columns from a")

    def test_drop_column_compaction(self):
        """
        After c1 is dropped and re-added, then the node is flushed and compacted, rows written
        before the drop must read c1 as null while the row written afterwards keeps its value.
        """
        session = self.prepare()

        session.execute("USE ks")
        session.execute("CREATE TABLE cf (key int PRIMARY KEY, c1 int, c2 int)")

        # insert some data.
        session.execute("INSERT INTO cf (key, c1, c2) VALUES (0, 1, 2)")
        session.execute("INSERT INTO cf (key, c1, c2) VALUES (1, 2, 3)")
        session.execute("INSERT INTO cf (key, c1, c2) VALUES (2, 3, 4)")

        # drop and readd c1.
        session.execute("ALTER TABLE cf DROP c1")
        session.execute("ALTER TABLE cf ADD c1 int")

        # add another row.
        session.execute("INSERT INTO cf (key, c1, c2) VALUES (3, 4, 5)")

        node = self.cluster.nodelist()[0]
        node.flush()
        node.compact()

        # test that c1 values have been compacted away.
        session = self.patient_cql_connection(node)
        assert_all(session, "SELECT c1 FROM ks.cf", [[None], [None], [None], [4]], ignore_order=True)

    def test_drop_column_queries(self):
        """
        After c1 is dropped and re-added, queries (full scans, by key, and through the index
        on c2) must return null for pre-drop c1 values and the real value for post-drop ones.
        """
        session = self.prepare()

        session.execute("USE ks")
        session.execute("CREATE TABLE cf (key int PRIMARY KEY, c1 int, c2 int)")
        session.execute("CREATE INDEX ON cf(c2)")

        # insert some data.
        session.execute("INSERT INTO cf (key, c1, c2) VALUES (0, 1, 2)")
        session.execute("INSERT INTO cf (key, c1, c2) VALUES (1, 2, 3)")
        session.execute("INSERT INTO cf (key, c1, c2) VALUES (2, 3, 4)")

        # drop and readd c1.
        session.execute("ALTER TABLE cf DROP c1")
        session.execute("ALTER TABLE cf ADD c1 int")

        # add another row.
        session.execute("INSERT INTO cf (key, c1, c2) VALUES (3, 4, 5)")

        # test that old (pre-drop) c1 values aren't returned and new ones are.
        assert_all(session, "SELECT c1 FROM cf", [[None], [None], [None], [4]], ignore_order=True)

        assert_all(session, "SELECT * FROM cf", [[0, None, 2], [1, None, 3], [2, None, 4], [3, 4, 5]], ignore_order=True)

        assert_one(session, "SELECT c1 FROM cf WHERE key = 0", [None])

        assert_one(session, "SELECT c1 FROM cf WHERE key = 3", [4])

        assert_one(session, "SELECT * FROM cf WHERE c2 = 2", [0, None, 2])

        assert_one(session, "SELECT * FROM cf WHERE c2 = 5", [3, 4, 5])

    def test_drop_column_and_restart(self):
        """
        Simply insert data in a table, drop a column involved in the insert and restart the node afterwards.
        This ensures that the dropped_columns system table is properly flushed on the alter or the restart
        fails as in CASSANDRA-11050.

        @jira_ticket CASSANDRA-11050
        """
        session = self.prepare()

        session.execute("USE ks")
        session.execute("CREATE TABLE t (k int PRIMARY KEY, c1 int, c2 int)")

        session.execute("INSERT INTO t (k, c1, c2) VALUES (0, 0, 0)")
        session.execute("ALTER TABLE t DROP c2")

        assert_one(session, "SELECT * FROM t", [0, 0])

        self.cluster.stop()
        self.cluster.start()

        # The old session belonged to the stopped node; reconnect before querying again.
        session = self.patient_cql_connection(self.cluster.nodelist()[0])

        session.execute("USE ks")
        assert_one(session, "SELECT * FROM t", [0, 0])

    def test_drop_static_column_and_restart(self):
        """
        Dropping a static column caused an sstable corrupt exception after restarting, here
        we test that we can drop a static column and restart safely.

        @jira_ticket CASSANDRA-12582
        """
        session = self.prepare()

        session.execute("USE ks")
        session.execute("CREATE TABLE ts (id1 int, id2 int, id3 int static, val text, PRIMARY KEY (id1, id2))")

        session.execute("INSERT INTO ts (id1, id2, id3, val) VALUES (1, 1, 0, 'v1')")
        session.execute("INSERT INTO ts (id1, id2, id3, val) VALUES (1, 2, 0, 'v2')")
        session.execute("INSERT INTO ts (id1, id2, id3, val) VALUES (2, 1, 1, 'v3')")

        # Flush so the static column is written to an sstable before it is dropped.
        self.cluster.nodelist()[0].nodetool('flush ks ts')
        assert_all(session, "SELECT * FROM ts", [[1, 1, 0, 'v1'], [1, 2, 0, 'v2'], [2, 1, 1, 'v3']])

        session.execute("alter table ts drop id3")
        assert_all(session, "SELECT * FROM ts", [[1, 1, 'v1'], [1, 2, 'v2'], [2, 1, 'v3']])

        self.cluster.stop()
        self.cluster.start()

        session = self.patient_cql_connection(self.cluster.nodelist()[0])

        session.execute("USE ks")
        assert_all(session, "SELECT * FROM ts", [[1, 1, 'v1'], [1, 2, 'v2'], [2, 1, 'v3']])

    @since('3.0')
    def test_drop_table_reflected_in_size_estimates(self):
        """
        A dropped table should result in its entries being removed from size estimates, on both
        nodes that are up and down at the time of the drop.

        @jira_ticket CASSANDRA-14905
        """
        cluster = self.cluster
        cluster.populate(2).start()
        node1, node2 = cluster.nodelist()
        session = self.patient_exclusive_cql_connection(node1)
        create_ks(session, 'ks1', 2)
        create_ks(session, 'ks2', 2)
        create_cf(session, 'ks1.cf1', columns={'c1': 'text', 'c2': 'text'})
        create_cf(session, 'ks2.cf1', columns={'c1': 'text', 'c2': 'text'})
        create_cf(session, 'ks2.cf2', columns={'c1': 'text', 'c2': 'text'})

        node1.nodetool('refreshsizeestimates')
        node2.nodetool('refreshsizeestimates')

        # node2 is down while the table and the keyspace are dropped.
        node2.stop()
        session.execute('DROP TABLE ks2.cf1')
        session.execute('DROP KEYSPACE ks1')
        node2.start(wait_for_binary_proto=True)
        session2 = self.patient_exclusive_cql_connection(node2)

        session.cluster.control_connection.wait_for_schema_agreement()

        # node1 (up during the drops): dropped entries gone, surviving table still present.
        assert_none(session, "SELECT * FROM system.size_estimates WHERE keyspace_name='ks1'")
        assert_none(session, "SELECT * FROM system.size_estimates WHERE keyspace_name='ks2' AND table_name='cf1'")
        assert_some(session, "SELECT * FROM system.size_estimates WHERE keyspace_name='ks2' AND table_name='cf2'")

        # node2 (down during the drops): the same three checks, made through node2's own session.
        assert_none(session2, "SELECT * FROM system.size_estimates WHERE keyspace_name='ks1'")
        assert_none(session2, "SELECT * FROM system.size_estimates WHERE keyspace_name='ks2' AND table_name='cf1'")
        assert_some(session2, "SELECT * FROM system.size_estimates WHERE keyspace_name='ks2' AND table_name='cf2'")

    @since('3.0')
    def test_invalid_entries_removed_from_size_estimates_on_restart(self):
        """
        Entries for dropped tables/keyspaces should be cleared from size_estimates on restart.

        @jira_ticket CASSANDRA-14905
        """
        cluster = self.cluster
        cluster.populate(1).start()
        node = cluster.nodelist()[0]
        session = self.patient_cql_connection(node)
        session.execute("USE system;")
        # Valid keyspace but invalid table
        session.execute("INSERT INTO size_estimates (keyspace_name, table_name, range_start, range_end, mean_partition_size, partitions_count) VALUES ( 'system_auth', 'bad_table', '-5', '5', 0, 0);")
        # Invalid keyspace and table
        session.execute("INSERT INTO size_estimates (keyspace_name, table_name, range_start, range_end, mean_partition_size, partitions_count) VALUES ( 'bad_keyspace', 'bad_table', '-5', '5', 0, 0);")
        node.stop()
        node.start()
        session = self.patient_cql_connection(node)
        assert_none(session, "SELECT * FROM system.size_estimates WHERE keyspace_name='system_auth' AND table_name='bad_table'")
        assert_none(session, "SELECT * FROM system.size_estimates WHERE keyspace_name='bad_keyspace'")

    def prepare(self):
        """
        Start a single-node cluster, create keyspace 'ks' on it and return the connected session.

        The keyspace is created with ``create_ks(session, 'ks', 1)``; the trailing 1 is
        presumably the replication factor -- confirm against ``dtest.create_ks``.
        """
        cluster = self.cluster
        cluster.populate(1).start()
        # NOTE(review): the reason for this short pause is not visible here; presumably it
        # gives the freshly started node a moment to settle before connecting -- confirm.
        time.sleep(.5)
        nodes = cluster.nodelist()
        session = self.patient_cql_connection(nodes[0])
        create_ks(session, 'ks', 1)
        return session

    # Former method names, kept as aliases so existing references still resolve. They do not
    # start with ``test``, so pytest's default naming rules will not collect them a second time.
    drop_table_reflected_in_size_estimates_test = test_drop_table_reflected_in_size_estimates
    invalid_entries_removed_from_size_estimates_on_restart_test = test_invalid_entries_removed_from_size_estimates_on_restart