forked from couchbase/couchbase-cli
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcbrestorewrapper
executable file
·299 lines (264 loc) · 13.1 KB
/
cbrestorewrapper
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
#!/usr/bin/env python3
# -*-python-*-
import base64
import optparse
import os
import platform
import re
import json
import subprocess
import sys
import urllib.request, urllib.error, urllib.parse
"""Written by Daniel Owen [email protected] on 27 June 2014
Version 1.4 Last Updated 17 Sept 2014 (Ian McCloy)
This script is to be used to restore backups made using cbbackupwrapper.py script.
It is a wrapper to the cbrestore command that comes with Couchbase Server 2.5.1
The script uses the same number of processes for each bucket that were used to produce
original backup. You can specify a bucket to restore, if no bucket is specified then
all buckets will be restored. Note: The destination cluster must have the
buckets created first.
An example invocation is as follows:
python cbrestorewrapper.py ../backup/ http://127.0.0.1:8091 -u Administrator \
-p myPassword --path /opt/couchbbase/bin/ -v
This will restore all the buckets from ../backup onto cluster 127.0.0.1
Access to the cluster is authenticated using username=Administrator and
password=myPassword. Finally, cbrestore will be found in /opt/couchbase/bin
Run python cbrestorewrapper -h for more information."""
bucketList = []
processes = {}
bucket_target = ""
def opt_extra_help(parser, extra_defaults):
extra_help = "; ".join([f'{k}={extra_defaults[k][0]} ({ extra_defaults[k][1]})'
for k in sorted(extra_defaults.keys())])
group = optparse.OptionGroup(parser, "Available extra config parameters (-x)",
extra_help)
parser.add_option_group(group)
def opt_extra_defaults():
return {
"batch_max_size": (1000, "Transfer this # of documents per batch"),
"batch_max_bytes": (400000, "Transfer this # of bytes per batch"),
"cbb_max_mb": (100000, "Split backup file on destination cluster if it exceeds MB"),
"max_retry": (10, "Max number of sequential retries if transfer fails"),
"report": (5, "Number batches transferred before updating progress bar in console"),
"report_full": (2000, "Number batches transferred before emitting progress information in console"),
"recv_min_bytes": (4096, "Amount of bytes for every TCP/IP call transferred"),
"rehash": (0, "For value 1, rehash the partition id's of each item; \
this is needed when transferring data between clusters with different number of partitions, \
such as when transferring data from an OSX server to a non-OSX cluster"),
"conflict_resolve":(1, "By default, enable conflict resolution."),
"data_only": (0, "For value 1, only transfer data from a backup file or cluster"),
"design_doc_only": (0, "For value 1, transfer design documents only from a backup file or cluster"),
"seqno": (0, "By default, start seqno from beginning."),
"uncompress": (0, "For value 1, restore data in uncompressed mode"),
"backoff_cap": (10, "Max backoff time during rebalance period"),
"flow_control": (1, "For value 0, disable flow control to improve throughput"),
"dcp_consumer_queue_length": (1000,"A DCP client needs a queue for incoming documents/messages. A large length is more efficient, but memory proportional to length*avg. doc size. Below length 150, performance degrades significantly."),
}
def opt_parse_extra(extra, extra_defaults):
"""Convert an extra string (comma-separated key=val pairs) into
a dict, using default values from extra_defaults dict."""
extra_in = dict([(x[0], x[1]) for x in
[(kv + '=').split('=') for kv in
(extra or "").split(',')]])
for k, v in extra_in.items():
if k and not extra_defaults.get(k):
sys.exit(f'error: unknown extra option: {k}')
return dict([(k, float(extra_in.get(k, extra_defaults[k][0])))
for k in extra_defaults.keys()])
def argumentParsing():
usage = "usage: %prog BACKUPDIR CLUSTER OPTIONS"
parser = optparse.OptionParser(usage)
opt_extra_help(parser, opt_extra_defaults())
parser.add_option("-a", "--add", action="store_true", default=False,
help="""use add instead of set to not overwrite existing items in the destination""")
parser.add_option('-b', '--bucket-source', default='',
help='Specify the bucket to restore. Defaults to all buckets')
parser.add_option('-B', '--bucket-destination', default='',
help='Target bucket on destination cluster. Defaults to bucket-source name')
#This allows you to transfer to a bucket with a different name
#Only valid if --bucket-source is specified
parser.add_option('-u', '--username', default='Administrator',
help='REST username for source cluster or server node. Default is Administrator')
parser.add_option('-p', '--password', default='PASSWORD',
help='REST password for source cluster or server node. Defaults to PASSWORD')
parser.add_option("-s", "--ssl",
action="store_true", default=False,
help="Transfer data with SSL enabled")
parser.add_option('-v', '--verbose', action='store_true',
default=False, help='Enable verbose messaging')
parser.add_option('--path', default='.',
help='Specify the path to cbrestore. Defaults to current directory')
parser.add_option('--port', default='11210',
help='Specify the bucket port. Defaults to 11210')
parser.add_option("", "--from-date",
action="store", type="string", default=None,
help="""restore data from the date specified as yyyy-mm-dd. By default,
all data from the very beginning will be restored""")
parser.add_option("", "--to-date",
action="store", type="string", default=None,
help="""restore data till the date specified as yyyy-mm-dd. By default,
all data that are collected will be restored""")
parser.add_option('-x', '--extra', default=None,
help="""Provide extra, uncommon config parameters;
comma-separated key=val(,key=val)* pairs""")
options, rest = parser.parse_args()
if len(rest) != 2:
parser.print_help()
sys.exit("\nError: please provide both backup directory path and cluster IP.")
opt_parse_extra(options.extra, opt_extra_defaults())
return options, rest[0], rest[1]
# Get the buckets that exist on the cluster
def getBuckets(node, rest_port, username, password):
request = urllib.request.Request(
'http://' + node + ':' + rest_port + '/pools/default/buckets')
base64string = base64.encodebytes(f'{username}:{password}'.encode()).decode().replace('\n', '')
request.add_header('Authorization', f'Basic {base64string}')
try:
response = urllib.request.urlopen(request)
except:
print('Authorization failed. Please check username and password.')
sys.exit(1)
bucketsOnCluster = []
data = json.loads(response.read())
for item in data:
bucket = item['name']
bucketsOnCluster.append(bucket)
return bucketsOnCluster
def getVbucketsToRestore(path, bucket):
vBucketList = []
# for each file in the directory
files = os.listdir(path)
regex = re.compile(r'^(\d+)-(\d+)$')
cleaned_list = list(filter(regex.search, files))
return cleaned_list
if __name__ == '__main__':
# Parse the arguments given.
args, backupDir, cluster = argumentParsing()
restore_exe = 'cbrestore'
if platform.system() == "Windows":
restore_exe = 'cbrestore.exe'
# Remove any white-spaces from start and end of strings
backupDir = backupDir.strip()
path = args.path.strip()
# Check to see if root backup directory exists
if not os.path.isdir(backupDir):
print(f'\n\nThe directory {backupDir} does not exist')
print('Please enter a different backup directory\n')
sys.exit(1)
# Check to see if path is correct
if not os.path.isdir(path):
print('The path to cbrestore does not exist')
print('Please run with a different path')
sys.exit(1)
if not os.path.isfile(os.path.join(path, restore_exe)):
print(f'cbrestore could not be found in {path}')
sys.exit(1)
# Check to see if log directory exists if not create it
dir = os.path.join(backupDir, 'logs')
try:
os.stat(dir)
except:
try:
os.mkdir(dir)
except:
print(f'Error trying to create directory {dir}')
sys.exit(1)
# Separate out node and REST port
matchObj = re.match(r'^http://(.*):(\d+)$', cluster, re.I)
if matchObj:
node = matchObj.group(1)
rest = matchObj.group(2)
else:
print("Please enter the destination as http://hostname:port")
print("For example http://localhost:8091 or http://127.0.0.1:8091")
sys.exit(1)
# Check to see if restoring all buckets or just a specified bucket
if args.bucket_source == '':
if not args.bucket_destination == '':
print('please specify a bucket_source')
sys.exit(1)
bucketList = getBuckets(
node, rest, args.username, args.password)
else:
# Check that the bucket exists
if not args.bucket_destination == '':
bucket_target = args.bucket_destination
else:
bucket_target = args.bucket_source
for item in getBuckets(node, rest, args.username, args.password):
if item == bucket_target:
bucketList.append(bucket_target)
if len(bucketList) == 0:
print(f'Bucket {bucket_target} does not exist')
print('Please enter a different bucket')
sys.exit(1)
extra_options = ''
if args.extra:
extra_options = ' -x ' + args.extra
ssl_option = ''
if args.ssl:
ssl_option = ' -s '
add_option = ''
if args.add:
add_option = ' -a '
from_date_option = ''
if args.from_date:
from_date_option = ' --from-date=' + args.from_date
to_date_option = ''
if args.to_date:
to_date_option = ' --to-date=' + args.to_date
for bucket in bucketList:
if not args.bucket_destination == '':
vbucketList = getVbucketsToRestore(backupDir, args.bucket_source.strip())
if len(vbucketList) == 0:
print('Error reading source backup vBuckets for bucket', args.bucket_source.strip())
sys.exit(1)
else:
vbucketList = getVbucketsToRestore(backupDir, bucket)
if len(vbucketList) == 0:
print('Error reading source backup vBuckets for bucket', bucket)
sys.exit(1)
for vbuckets in vbucketList:
# Invoke cbrestore on each of the active vbuckets that reside on
# the node
if args.verbose:
print("vBucket: ", vbuckets)
if not args.bucket_destination == '':
command_line = '"' + os.path.join(path, restore_exe) + '"' + ' -v -t 1 -b ' \
+ args.bucket_source.strip() + ' -B ' + args.bucket_destination \
+ ' ' + '"' + os.path.join(backupDir, vbuckets) + '"' \
+ ' http://' + node + ':' + rest \
+ ' -u ' + args.username + ' -p ' + args.password + extra_options + ssl_option + add_option + from_date_option + to_date_option \
+ ' 2> ' + '"' + os.path.join(backupDir, 'logs', vbuckets) \
+ '-restore-' + bucket + '.err' + '"'
else:
command_line = '"' + os.path.join(path, restore_exe) + '"' + ' -v -t 1 -b ' + bucket \
+ ' ' + '"' + os.path.join(backupDir, vbuckets) + '"' + ' http://' + node + ':' + rest \
+ ' -u ' + args.username + ' -p ' + args.password + from_date_option + to_date_option + add_option + extra_options \
+ ' 2> ' + '"' + os.path.join(backupDir, 'logs', vbuckets) \
+ '-restore-' + bucket + '.err' + '"'
if args.verbose:
print(command_line)
p = subprocess.Popen(command_line, shell=True)
processes[p] = vbuckets + '-restore-' + bucket
# Did we restore anything?
if len(processes) == 0:
print('Did not restore anything')
print('Please check that the backup directory contains data to restore')
print('Also please check that you have the correct buckets created on ' + args.node)
sys.exit(1)
else:
print('Waiting for the restore to complete...')
successCount = 0
for p in processes:
p.wait()
if p.returncode == 1:
print(f'Error with backup - look in '
f'{os.path.join(backupDir, "logs", processes[p])}.err for details')
else:
successCount += 1
if successCount == len(processes):
print('SUCCESSFULLY COMPLETED!')
else:
print('ERROR!')
sys.exit(1)