-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathindex.js
258 lines (225 loc) · 7.49 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
var fs = require("fs");
var knox = require("knox");
var mime = require("mime");
var waitress = require("waitress");
/**
 * @class Retry
 * @constructor
 * @param opts {Object} an object containing config parameters
 * `key`, `secret` and `bucket` are all required.
 * `region` is the S3 region. It defaults to us-west-2
 * `maxRetries` is the number of times to retry before failing. It defaults to 3
 * `backoffInterval` is a multiplier used to calculate exponential backoff.
 * It defaults to 51.
 * @param s3Lib {Object} knox library by default, or mocked version for testing
 * @param mimeLib {Object} mime lib by default, or mocked version for testing
 */
function Retry(opts, s3Lib, mimeLib) {
  // Fall back to the real libraries unless test doubles were injected.
  this.knox = typeof s3Lib === "undefined" ? knox : s3Lib;
  this.mime = typeof mimeLib === "undefined" ? mime : mimeLib;
  validateRequiredOptions(opts);
  // Accept both knox-style (key/secret) and AWS-SDK-style option names.
  this.key = opts.key || opts.accessKeyId;
  this.secret = opts.secret || opts.secretAccessKey;
  this.bucket = opts.bucket;
  this.region = opts.region || "us-west-2";
  this.s3Client = this.knox.createClient(opts);
  this.maxRetries = opts.maxRetries || 3;
  this.backoffInterval = opts.backoffInterval || 51;
}
/**
 * Throw if any of the required connection options is missing.
 *
 * @private
 * @param opts {Object} options passed to the Retry constructor
 * @throws {Error} when no key/accessKeyId, no secret/secretAccessKey,
 * or no bucket is provided
 */
function validateRequiredOptions(opts) {
  var hasKey = opts.key || opts.accessKeyId;
  var hasSecret = opts.secret || opts.secretAccessKey;
  if (!hasKey) {
    throw new Error("Missing required 'key' option from opts.");
  }
  if (!hasSecret) {
    throw new Error("Missing required 'secret' option from opts.");
  }
  if (!opts.bucket) {
    throw new Error("Missing required 'bucket' option from opts.");
  }
}
/**
 * Compute a randomized exponential backoff delay.
 *
 * @param numRetries {Number} how many retries have already been attempted
 * @returns {Number} backoffInterval scaled by a random integer drawn from
 * [1, 2^(numRetries + 2) - 1], so delays grow exponentially with retries
 */
Retry.prototype.calculateBackoff = function(numRetries) {
  var ceiling = Math.pow(2, numRetries + 2) - 1;
  var multiplier = Math.ceil(Math.random() * ceiling);
  return multiplier * this.backoffInterval;
};
/**
 * Upload a file at sourcePath with automatic retries and exponential backoff
 *
 * @param sourcePath {String} location of the file to upload on the fs
 * @param destination {String} path in s3 to upload file to
 * @param _headers {Object} (optional) custom headers to pass around.
 * @param cb {Function} function(err) called when upload is done or has failed too many times
 */
Retry.prototype.upload = function(sourcePath, destination, _headers, cb) {
  var self = this;
  // Allow the headers argument to be omitted entirely.
  if (typeof _headers === "function" && typeof cb === "undefined") {
    cb = _headers;
    _headers = {};
  }
  fs.readFile(sourcePath, function(err, file) {
    if (err) {
      return cb(err);
    }
    var headers = {
      "Content-Type": self.mime.lookup(sourcePath),
      "Content-Length": file.length
    };
    // Merge in caller-supplied headers, but never let them clobber the
    // Content-Type / Content-Length derived from the file itself.
    for (var name in _headers) {
      var lowered = name.toLowerCase();
      if (
        !_headers[name] ||
        lowered === "content-type" ||
        lowered === "content-length"
      ) {
        continue;
      }
      headers[name] = _headers[name];
    }
    self.uploadWithRetries(file, headers, destination, cb);
  });
};
/**
 * Upload a buffer with accompanying headers to S3.
 *
 * @param buffer {Buffer} buffer to put to s3
 * @param _headers {Object} headers. Will set default Content-Type and
 * Content-Length if none is provided.
 * @param destination {String} path to put buffer to
 * @param cb {Function} function(err, res) called when upload has succeeded
 * or failed too many times.
 */
Retry.prototype.uploadBuffer = function(buffer, _headers, destination, cb) {
  // Copy only truthy header values into a fresh object so the caller's
  // header object is never mutated downstream.
  var headers = {};
  for (var name in _headers) {
    if (_headers[name]) {
      headers[name] = _headers[name];
    }
  }
  this.uploadWithRetries(buffer, headers, destination, cb);
};
/**
 * Upload several buffers with accompanying headers to S3.
 *
 * @param buffers {Array} descriptors of the buffers to put to s3; each entry
 * is { data: {Buffer}, headers: {Object} (optional), dest: {String} } —
 * content, headers and destination of the buffer to upload
 * @param cb {Function} function(err, res) called when every upload has
 * succeeded or one has failed too many times.
 */
Retry.prototype.uploadBuffers = function(buffers, cb) {
  var self = this;
  var done = waitress(buffers.length, cb);
  buffers.forEach(function(buffer) {
    // Default missing/invalid headers locally instead of mutating the
    // caller-owned descriptor object (the original assigned buffer.headers).
    var headers =
      typeof buffer.headers === "object" && buffer.headers !== null
        ? buffer.headers
        : {};
    self.uploadBuffer(buffer.data, headers, buffer.dest, done);
  });
};
/**
 * Upload a buffer with accompanying headers to s3. Recursively calls itself
 * until `timesRetried` exceeds `this.maxRetries`.
 *
 * @private
 * @param data {Buffer} data to put to S3
 * @param headers {Object} headers to send with request to S3. Will set a default
 * Content-Length and Content-Type if none is provided.
 * @param destination {String} path to put the buffer to S3
 * @param timesRetried {Number} number of times this current upload has retried.
 * Defaults to 0 if not passed in, and will increment every time an upload fails.
 * @param cb {Function} function(err, res) called when upload is done or has failed
 * too many times.
 */
Retry.prototype.uploadWithRetries = function(
  data,
  headers,
  destination,
  timesRetried,
  cb
) {
  var self = this;
  // prevent callback from being called twice
  var callbackCalled = false;
  // sometimes knox failures will give EPIPE errors after sending a non-200
  // status code, so we have to guard against recursing twice
  var recursionScheduled = false;
  // Set content type and length if they aren't included
  headers = headers || {};
  if (!(headers["Content-Type"] || headers["content-type"])) {
    headers["Content-Type"] = "application/octet-stream";
  }
  // BUG FIX: this condition used `||`, which overwrote a caller-supplied
  // "Content-Length" with data.length whenever the other casing was absent
  // (i.e. almost always). Default only when NEITHER casing is present.
  if (
    headers["Content-Length"] == undefined &&
    headers["content-length"] == undefined
  ) {
    headers["Content-Length"] = data.length;
  }
  // Support the 4-argument form uploadWithRetries(data, headers, dest, cb).
  if (typeof timesRetried === "function") {
    cb = timesRetried;
    // start at -1 so we can increment up to 0 on the first call
    timesRetried = -1;
  }
  timesRetried++;
  function endWithError(err) {
    if (callbackCalled) {
      return;
    }
    if (timesRetried >= self.maxRetries) {
      // Out of retries: report the final error to the caller.
      callbackCalled = true;
      return cb(err, null, timesRetried);
    }
    if (recursionScheduled) {
      return;
    }
    recursionScheduled = true;
    // Retry after an exponentially-growing randomized delay.
    setTimeout(
      self.uploadWithRetries.bind(self),
      self.calculateBackoff(timesRetried),
      data,
      headers,
      destination,
      timesRetried,
      cb
    );
  }
  function endWithResponse(res) {
    if (res.statusCode !== 200) {
      return endWithError(new Error("Invalid status code: " + res.statusCode));
    }
    if (!callbackCalled) {
      callbackCalled = true;
      return cb(null, res, timesRetried);
    }
  }
  var client = this.s3Client.put(destination, headers);
  client.on("response", endWithResponse);
  client.on("error", endWithError);
  client.end(data);
};
/**
 * Upload several files with automatic retries and exponential backoff.
 *
 * @param files {Array} entries of { src: /path, dest: /path, headers: {} } —
 * location, destination and optional headers of each file to upload from the fs
 * @param cb {Function} function(err) called when all uploads are done or have
 * failed too many times
 */
Retry.prototype.uploadFiles = function(files, cb) {
  var self = this;
  var done = waitress(files.length, cb);
  files.forEach(function(file) {
    var headers = file.headers;
    // Only forward headers when the entry actually carries an object.
    if (typeof headers === "object" && headers !== null) {
      self.upload(file.src, file.dest, headers, done);
    } else {
      self.upload(file.src, file.dest, done);
    }
  });
};
// Expose the Retry constructor as this module's public interface.
module.exports = Retry;