Merge branch 'elasticity' into 3861-tgw-shutdown-all-tservers-fix
dlmarion committed Oct 24, 2023
2 parents e347f46 + e39bdb6 commit 63b3e34
Showing 123 changed files with 1,812 additions and 2,374 deletions.
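Taken together, the hunks below remove per-compaction-service rate limiting from the file I/O path (only the first several of the 123 changed files are rendered on this page). The three tserver.compaction.major.service.{root,meta,default}.rate.limit properties are deleted from Property; the RateLimiter plumbing is stripped from FileOptions and its builders (rateLimiter, withRateLimiter, readLimiter); CachableBuilder.input(...) narrows from InputStream to FSDataInputStream; and BCFile reads and writes go directly through the FSData stream classes instead of the RateLimitedInputStream/RateLimitedOutputStream wrappers, so each out.position() becomes out.getPos().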
@@ -347,10 +347,10 @@ public Iterator<Entry<Key,Value>> iterator() {
 
     for (int i = 0; i < sources.length; i++) {
       // TODO may have been a bug with multiple files and caching in older version...
-      FSDataInputStream inputStream = (FSDataInputStream) sources[i].getInputStream();
-      CachableBuilder cb =
-          new CachableBuilder().input(inputStream, "source-" + i).length(sources[i].getLength())
-              .conf(opts.in.getConf()).cacheProvider(cacheProvider).cryptoService(cryptoService);
+      CachableBuilder cb = new CachableBuilder()
+          .input((FSDataInputStream) sources[i].getInputStream(), "source-" + i)
+          .length(sources[i].getLength()).conf(opts.in.getConf()).cacheProvider(cacheProvider)
+          .cryptoService(cryptoService);
       readers.add(RFile.getReader(cb, sources[i].getRange()));
     }
 
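The hunk above is a call-site update for the CachableBuilder change further down in this commit: input(...) now takes an FSDataInputStream rather than a plain InputStream, so the cast moves inline, the temporary variable disappears, and the chain is reflowed. Behavior is unchanged.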
15 changes: 0 additions & 15 deletions core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -589,11 +589,6 @@ public enum Property {
   TSERV_COMPACTION_SERVICE_ROOT_PLANNER("tserver.compaction.major.service.root.planner",
       DefaultCompactionPlanner.class.getName(), PropertyType.CLASSNAME,
       "Compaction planner for root tablet service", "2.1.0"),
-  TSERV_COMPACTION_SERVICE_ROOT_RATE_LIMIT("tserver.compaction.major.service.root.rate.limit", "0B",
-      PropertyType.BYTES,
-      "Maximum number of bytes to read or write per second over all major"
-          + " compactions in this compaction service, or 0B for unlimited.",
-      "2.1.0"),
   TSERV_COMPACTION_SERVICE_ROOT_MAX_OPEN(
       "tserver.compaction.major.service.root.planner.opts.maxOpen", "30", PropertyType.COUNT,
       "The maximum number of files a compaction will open", "2.1.0"),
@@ -606,11 +601,6 @@ public enum Property {
   TSERV_COMPACTION_SERVICE_META_PLANNER("tserver.compaction.major.service.meta.planner",
       DefaultCompactionPlanner.class.getName(), PropertyType.CLASSNAME,
       "Compaction planner for metadata table", "2.1.0"),
-  TSERV_COMPACTION_SERVICE_META_RATE_LIMIT("tserver.compaction.major.service.meta.rate.limit", "0B",
-      PropertyType.BYTES,
-      "Maximum number of bytes to read or write per second over all major"
-          + " compactions in this compaction service, or 0B for unlimited.",
-      "2.1.0"),
   TSERV_COMPACTION_SERVICE_META_MAX_OPEN(
       "tserver.compaction.major.service.meta.planner.opts.maxOpen", "30", PropertyType.COUNT,
       "The maximum number of files a compaction will open", "2.1.0"),
@@ -623,11 +613,6 @@ public enum Property {
   TSERV_COMPACTION_SERVICE_DEFAULT_PLANNER("tserver.compaction.major.service.default.planner",
       DefaultCompactionPlanner.class.getName(), PropertyType.CLASSNAME,
       "Planner for default compaction service.", "2.1.0"),
-  TSERV_COMPACTION_SERVICE_DEFAULT_RATE_LIMIT("tserver.compaction.major.service.default.rate.limit",
-      "0B", PropertyType.BYTES,
-      "Maximum number of bytes to read or write per second over all major"
-          + " compactions in this compaction service, or 0B for unlimited.",
-      "2.1.0"),
   TSERV_COMPACTION_SERVICE_DEFAULT_MAX_OPEN(
       "tserver.compaction.major.service.default.planner.opts.maxOpen", "10", PropertyType.COUNT,
       "The maximum number of files a compaction will open", "2.1.0"),
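All three deleted properties shared the same semantics: a ceiling, in bytes per second, across every major compaction in the named service, with 0B meaning unlimited. A hedged sketch of how such a BYTES-typed property could be read (getAsBytes is the accessor used elsewhere in this commit; the actual consuming call site is not part of this excerpt, and the enum constant no longer exists after the merge):

    import org.apache.accumulo.core.conf.AccumuloConfiguration;
    import org.apache.accumulo.core.conf.Property;

    class RateLimitLookup {
      // Illustrative only: reads the now-deleted default-service limit as a byte
      // count; 0 corresponded to "0B", i.e. unlimited. Compiles only against
      // pre-merge sources, since the constant is removed in this commit.
      static long bytesPerSecond(AccumuloConfiguration conf) {
        return conf.getAsBytes(Property.TSERV_COMPACTION_SERVICE_DEFAULT_RATE_LIMIT);
      }
    }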
@@ -34,7 +34,6 @@
 import org.apache.accumulo.core.metadata.TabletFile;
 import org.apache.accumulo.core.metadata.UnreferencedTabletFile;
 import org.apache.accumulo.core.spi.crypto.CryptoService;
-import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -171,7 +170,6 @@ protected static class FileOptions {
     public final TabletFile file;
     public final FileSystem fs;
     public final Configuration fsConf;
-    public final RateLimiter rateLimiter;
     // writer only objects
     public final String compression;
     public final FSDataOutputStream outputStream;
@@ -188,15 +186,14 @@ protected static class FileOptions {
     public final boolean dropCacheBehind;
 
     protected FileOptions(AccumuloConfiguration tableConfiguration, TabletFile file, FileSystem fs,
-        Configuration fsConf, RateLimiter rateLimiter, String compression,
-        FSDataOutputStream outputStream, boolean enableAccumuloStart, CacheProvider cacheProvider,
-        Cache<String,Long> fileLenCache, boolean seekToBeginning, CryptoService cryptoService,
-        Range range, Set<ByteSequence> columnFamilies, boolean inclusive, boolean dropCacheBehind) {
+        Configuration fsConf, String compression, FSDataOutputStream outputStream,
+        boolean enableAccumuloStart, CacheProvider cacheProvider, Cache<String,Long> fileLenCache,
+        boolean seekToBeginning, CryptoService cryptoService, Range range,
+        Set<ByteSequence> columnFamilies, boolean inclusive, boolean dropCacheBehind) {
       this.tableConfiguration = tableConfiguration;
       this.file = Objects.requireNonNull(file);
       this.fs = fs;
       this.fsConf = fsConf;
-      this.rateLimiter = rateLimiter;
       this.compression = compression;
       this.outputStream = outputStream;
       this.enableAccumuloStart = enableAccumuloStart;
@@ -226,10 +223,6 @@ public Configuration getConfiguration() {
       return fsConf;
     }
 
-    public RateLimiter getRateLimiter() {
-      return rateLimiter;
-    }
-
     public String getCompression() {
       return compression;
     }
@@ -279,7 +272,6 @@ public static class FileHelper {
     private TabletFile file;
     private FileSystem fs;
     private Configuration fsConf;
-    private RateLimiter rateLimiter;
     private CryptoService cryptoService;
     private boolean dropCacheBehind = false;
 
@@ -303,11 +295,6 @@ protected FileHelper tableConfiguration(AccumuloConfiguration tableConfiguration) {
       return this;
     }
 
-    protected FileHelper rateLimiter(RateLimiter rateLimiter) {
-      this.rateLimiter = rateLimiter;
-      return this;
-    }
-
     protected FileHelper cryptoService(CryptoService cs) {
       this.cryptoService = Objects.requireNonNull(cs);
       return this;
@@ -320,28 +307,27 @@ protected FileHelper dropCacheBehind(boolean drop) {
 
     protected FileOptions toWriterBuilderOptions(String compression,
         FSDataOutputStream outputStream, boolean startEnabled) {
-      return new FileOptions(tableConfiguration, file, fs, fsConf, rateLimiter, compression,
-          outputStream, startEnabled, NULL_PROVIDER, null, false, cryptoService, null, null, true,
+      return new FileOptions(tableConfiguration, file, fs, fsConf, compression, outputStream,
+          startEnabled, NULL_PROVIDER, null, false, cryptoService, null, null, true,
           dropCacheBehind);
     }
 
     protected FileOptions toReaderBuilderOptions(CacheProvider cacheProvider,
         Cache<String,Long> fileLenCache, boolean seekToBeginning) {
-      return new FileOptions(tableConfiguration, file, fs, fsConf, rateLimiter, null, null, false,
+      return new FileOptions(tableConfiguration, file, fs, fsConf, null, null, false,
          cacheProvider == null ? NULL_PROVIDER : cacheProvider, fileLenCache, seekToBeginning,
          cryptoService, null, null, true, dropCacheBehind);
     }
 
     protected FileOptions toIndexReaderBuilderOptions(Cache<String,Long> fileLenCache) {
-      return new FileOptions(tableConfiguration, file, fs, fsConf, rateLimiter, null, null, false,
-          NULL_PROVIDER, fileLenCache, false, cryptoService, null, null, true, dropCacheBehind);
+      return new FileOptions(tableConfiguration, file, fs, fsConf, null, null, false, NULL_PROVIDER,
+          fileLenCache, false, cryptoService, null, null, true, dropCacheBehind);
     }
 
     protected FileOptions toScanReaderBuilderOptions(Range range, Set<ByteSequence> columnFamilies,
         boolean inclusive) {
-      return new FileOptions(tableConfiguration, file, fs, fsConf, rateLimiter, null, null, false,
-          NULL_PROVIDER, null, false, cryptoService, range, columnFamilies, inclusive,
-          dropCacheBehind);
+      return new FileOptions(tableConfiguration, file, fs, fsConf, null, null, false, NULL_PROVIDER,
+          null, false, cryptoService, range, columnFamilies, inclusive, dropCacheBehind);
     }
 
     protected AccumuloConfiguration getTableConfiguration() {
@@ -388,11 +374,6 @@ public WriterBuilder withCompression(String compression) {
       return this;
     }
 
-    public WriterBuilder withRateLimiter(RateLimiter rateLimiter) {
-      rateLimiter(rateLimiter);
-      return this;
-    }
-
     public WriterBuilder dropCachesBehind() {
       this.dropCacheBehind(true);
       return this;
@@ -441,11 +422,6 @@ public ReaderBuilder withFileLenCache(Cache<String,Long> fileLenCache) {
       return this;
     }
 
-    public ReaderBuilder withRateLimiter(RateLimiter rateLimiter) {
-      rateLimiter(rateLimiter);
-      return this;
-    }
-
    public ReaderBuilder dropCachesBehind() {
      this.dropCacheBehind(true);
      return this;
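Net effect on the public file-operations API: callers migrate by deleting the withRateLimiter(...) step from their builder chains; everything else is untouched. A hedged sketch of a reader chain (the newReaderBuilder()/forFile(...) steps and all variable names are assumed from the surrounding API, not shown in this excerpt):

    // Old chain (no longer compiles after this commit):
    //   fileOperations.newReaderBuilder().forFile(file, fs, fsConf, cryptoService)
    //       .withTableConfiguration(tableConf).withRateLimiter(readLimiter)
    //       .withFileLenCache(lenCache).build();
    // New chain: drop withRateLimiter(...) and keep the rest.
    FileSKVIterator reader = fileOperations.newReaderBuilder()
        .forFile(file, fs, fsConf, cryptoService)
        .withTableConfiguration(tableConf)
        .withFileLenCache(lenCache)
        .build();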
@@ -33,17 +33,14 @@
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Reader.BlockReader;
 import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
-import org.apache.accumulo.core.file.streams.RateLimitedInputStream;
 import org.apache.accumulo.core.spi.cache.BlockCache;
 import org.apache.accumulo.core.spi.cache.BlockCache.Loader;
 import org.apache.accumulo.core.spi.cache.CacheEntry;
 import org.apache.accumulo.core.spi.crypto.CryptoService;
-import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Seekable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,11 +66,10 @@ public static String pathToCacheId(Path p) {
 
   public static class CachableBuilder {
     String cacheId = null;
-    IoeSupplier<InputStream> inputSupplier = null;
+    IoeSupplier<FSDataInputStream> inputSupplier = null;
     IoeSupplier<Long> lengthSupplier = null;
     Cache<String,Long> fileLenCache = null;
     volatile CacheProvider cacheProvider = CacheProvider.NULL_PROVIDER;
-    RateLimiter readLimiter = null;
     Configuration hadoopConf = null;
     CryptoService cryptoService = null;
 
@@ -109,7 +105,7 @@ public CachableBuilder fsPath(FileSystem fs, Path dataFile, boolean dropCacheBehind) {
       return this;
     }
 
-    public CachableBuilder input(InputStream is, String cacheId) {
+    public CachableBuilder input(FSDataInputStream is, String cacheId) {
       this.cacheId = cacheId;
       this.inputSupplier = () -> is;
       return this;
@@ -130,11 +126,6 @@ public CachableBuilder cacheProvider(CacheProvider cacheProvider) {
       return this;
     }
 
-    public CachableBuilder readLimiter(RateLimiter readLimiter) {
-      this.readLimiter = readLimiter;
-      return this;
-    }
-
     public CachableBuilder cryptoService(CryptoService cryptoService) {
       this.cryptoService = cryptoService;
       return this;
@@ -145,7 +136,6 @@ public CachableBuilder cryptoService(CryptoService cryptoService) {
   * Class wraps the BCFile reader.
   */
  public static class Reader implements Closeable {
-    private final RateLimiter readLimiter;
    // private BCFile.Reader _bc;
    private final String cacheId;
    private CacheProvider cacheProvider;
@@ -155,7 +145,7 @@ public static class Reader implements Closeable {
     private final Configuration conf;
     private final CryptoService cryptoService;
 
-    private final IoeSupplier<InputStream> inputSupplier;
+    private final IoeSupplier<FSDataInputStream> inputSupplier;
     private final IoeSupplier<Long> lengthSupplier;
     private final AtomicReference<BCFile.Reader> bcfr = new AtomicReference<>();
 
@@ -185,8 +175,7 @@ private BCFile.Reader getBCFile(byte[] serializedMetadata) throws IOException {
 
       BCFile.Reader reader = bcfr.get();
       if (reader == null) {
-        RateLimitedInputStream fsIn =
-            new RateLimitedInputStream((InputStream & Seekable) inputSupplier.get(), readLimiter);
+        FSDataInputStream fsIn = inputSupplier.get();
         BCFile.Reader tmpReader = null;
         if (serializedMetadata == null) {
           if (fileLenCache == null) {
@@ -385,7 +374,6 @@ public Reader(CachableBuilder b) {
       this.lengthSupplier = b.lengthSupplier;
       this.fileLenCache = b.fileLenCache;
       this.cacheProvider = b.cacheProvider;
-      this.readLimiter = b.readLimiter;
       this.conf = b.hadoopConf;
       this.cryptoService = Objects.requireNonNull(b.cryptoService);
     }
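With readLimiter gone, CachableBuilder's read path is strictly FSDataInputStream-based: input(...) and the internal supplier both narrow their types, and getBCFile uses the supplied stream directly instead of wrapping it in a RateLimitedInputStream. A minimal sketch against the new API (fs, path, conf, and cs are assumed stand-ins; fs.open(...) already returns an FSDataInputStream, and cacheProvider keeps its NULL_PROVIDER default from the field initializer above):

    import java.io.IOException;
    import org.apache.accumulo.core.spi.crypto.CryptoService;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class CachableReaderSketch {
      // Sketch only: a plain InputStream no longer satisfies input(...), and
      // cryptoService stays mandatory (the Reader constructor requireNonNulls it).
      static CachableBuilder builder(FileSystem fs, Path path, Configuration conf,
          CryptoService cs) throws IOException {
        return new CachableBuilder()
            .input(fs.open(path), "example-" + path.getName())
            .length(fs.getFileStatus(path).getLen())
            .conf(conf)
            .cryptoService(cs);
      }
    }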
@@ -58,8 +58,7 @@ private static RFileSKVIterator getReader(FileOptions options) throws IOException {
     CachableBuilder cb = new CachableBuilder()
         .fsPath(options.getFileSystem(), options.getFile().getPath(), options.dropCacheBehind)
         .conf(options.getConfiguration()).fileLen(options.getFileLenCache())
-        .cacheProvider(options.cacheProvider).readLimiter(options.getRateLimiter())
-        .cryptoService(options.getCryptoService());
+        .cacheProvider(options.cacheProvider).cryptoService(options.getCryptoService());
     return RFile.getReader(cb, options.getFile());
   }
 
@@ -156,8 +155,7 @@ protected FileSKVWriter openWriter(FileOptions options) throws IOException {
       }
     }
 
-    BCFile.Writer _cbw = new BCFile.Writer(outputStream, options.getRateLimiter(), compression,
-        conf, options.cryptoService);
+    BCFile.Writer _cbw = new BCFile.Writer(outputStream, compression, conf, options.cryptoService);
 
     return new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize, samplerConfig, sampler);
   }
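BCFile.Writer drops its RateLimiter parameter, leaving a four-argument constructor, and RFile.Writer wraps it exactly as before. A hedged sketch of the resulting write path (fs, path, conf, cs, and the block-size/sampler values are stand-ins mirroring the calls above):

    // New signature: (output stream, compression, Hadoop conf, crypto service).
    // The old second argument, a RateLimiter that was often passed as null, is gone.
    FSDataOutputStream out = fs.create(path);
    BCFile.Writer cbw = new BCFile.Writer(out, "gz", conf, cs);
    RFile.Writer writer =
        new RFile.Writer(cbw, (int) blockSize, (int) indexBlockSize, samplerConfig, sampler);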
@@ -92,10 +92,9 @@ public void execute(String[] args) throws Exception {
     int blockSize = (int) aconf.getAsBytes(Property.TABLE_FILE_BLOCK_SIZE);
     try (
         Writer small = new RFile.Writer(
-            new BCFile.Writer(fs.create(new Path(smallName)), null, "gz", conf, cs), blockSize);
+            new BCFile.Writer(fs.create(new Path(smallName)), "gz", conf, cs), blockSize);
         Writer large = new RFile.Writer(
-            new BCFile.Writer(fs.create(new Path(largeName)), null, "gz", conf, cs),
-            blockSize)) {
+            new BCFile.Writer(fs.create(new Path(largeName)), "gz", conf, cs), blockSize)) {
       small.startDefaultLocalityGroup();
       large.startDefaultLocalityGroup();
 
@@ -38,7 +38,6 @@
 import org.apache.accumulo.core.crypto.CryptoUtils;
 import org.apache.accumulo.core.file.rfile.bcfile.Utils.Version;
 import org.apache.accumulo.core.file.streams.BoundedRangeFileInputStream;
-import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
 import org.apache.accumulo.core.file.streams.SeekableDataInputStream;
 import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
 import org.apache.accumulo.core.spi.crypto.CryptoEnvironment.Scope;
@@ -47,7 +46,6 @@
 import org.apache.accumulo.core.spi.crypto.FileEncrypter;
 import org.apache.accumulo.core.spi.crypto.NoFileDecrypter;
 import org.apache.accumulo.core.spi.crypto.NoFileEncrypter;
-import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -104,7 +102,7 @@ private BCFile() {
    * BCFile writer, the entry point for creating a new BCFile.
    */
   public static class Writer implements Closeable {
-    private final RateLimitedOutputStream out;
+    private final FSDataOutputStream out;
     private final Configuration conf;
     private FileEncrypter encrypter;
     private CryptoEnvironmentImpl cryptoEnvironment;
@@ -131,18 +129,18 @@ private static final class WBlockState {
       private final CompressionAlgorithm compressAlgo;
       private Compressor compressor; // !null only if using native
                                      // Hadoop compression
-      private final RateLimitedOutputStream fsOut;
+      private final FSDataOutputStream fsOut;
       private final OutputStream cipherOut;
       private final long posStart;
       private final SimpleBufferedOutputStream fsBufferedOutput;
       private OutputStream out;
 
-      public WBlockState(CompressionAlgorithm compressionAlgo, RateLimitedOutputStream fsOut,
+      public WBlockState(CompressionAlgorithm compressionAlgo, FSDataOutputStream fsOut,
           BytesWritable fsOutputBuffer, Configuration conf, FileEncrypter encrypter)
           throws IOException {
         this.compressAlgo = compressionAlgo;
         this.fsOut = fsOut;
-        this.posStart = fsOut.position();
+        this.posStart = fsOut.getPos();
 
         fsOutputBuffer.setCapacity(getFSOutputBufferSize(conf));
 
@@ -174,7 +172,7 @@ OutputStream getOutputStream() {
       * @return The current byte offset in underlying file.
       */
      long getCurrentPos() {
-       return fsOut.position() + fsBufferedOutput.size();
+       return fsOut.getPos() + fsBufferedOutput.size();
      }
 
      long getStartPos() {
@@ -311,13 +309,13 @@ public void close() throws IOException {
      *          blocks.
      * @see Compression#getSupportedAlgorithms
      */
-    public Writer(FSDataOutputStream fout, RateLimiter writeLimiter, String compressionName,
-        Configuration conf, CryptoService cryptoService) throws IOException {
+    public Writer(FSDataOutputStream fout, String compressionName, Configuration conf,
+        CryptoService cryptoService) throws IOException {
       if (fout.getPos() != 0) {
         throw new IOException("Output file not at zero offset.");
       }
 
-      this.out = new RateLimitedOutputStream(fout, writeLimiter);
+      this.out = fout;
       this.conf = conf;
       dataIndex = new DataIndex(compressionName);
       metaIndex = new MetaIndex();
@@ -349,10 +347,10 @@ public void close() throws IOException {
           dataIndex.write(appender);
         }
 
-        long offsetIndexMeta = out.position();
+        long offsetIndexMeta = out.getPos();
         metaIndex.write(out);
 
-        long offsetCryptoParameter = out.position();
+        long offsetCryptoParameter = out.getPos();
         byte[] cryptoParams = this.encrypter.getDecryptionParameters();
         out.writeInt(cryptoParams.length);
         out.write(cryptoParams);
@@ -362,7 +360,7 @@ public void close() throws IOException {
         API_VERSION_3.write(out);
         Magic.write(out);
         out.flush();
-        length = out.position();
+        length = out.getPos();
         out.close();
       }
     } finally {
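For readers without the deleted classes at hand: RateLimitedOutputStream wrapped an FSDataOutputStream, exposed its offset as position(), and metered writes through a RateLimiter, which is why each out.position() above becomes out.getPos() and the bare stream is otherwise a drop-in replacement. Below is a rough reconstruction inferred from these call sites, not the original source; the real class lived in org.apache.accumulo.core.file.streams and may have differed in detail:

    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;

    // Stub standing in for the removed org.apache.accumulo.core.util.ratelimit
    // interface; the shape is assumed.
    interface RateLimiter {
      void acquire(long permits);
    }

    // Hypothetical reconstruction of the removed wrapper, inferred from this diff.
    class RateLimitedOutputStream extends DataOutputStream {
      private final FSDataOutputStream wrapped;
      private final RateLimiter writeLimiter; // null meant "no limiting"

      RateLimitedOutputStream(FSDataOutputStream out, RateLimiter limiter) {
        super(out);
        this.wrapped = out;
        this.writeLimiter = limiter;
      }

      // Call sites used position(); the replacement code calls getPos() directly.
      public long position() throws IOException {
        return wrapped.getPos();
      }

      @Override
      public void write(byte[] b, int off, int len) throws IOException {
        if (writeLimiter != null) {
          writeLimiter.acquire(len); // wait until len bytes of quota are available
        }
        wrapped.write(b, off, len);
      }
      // (the single-byte write(int) override is omitted for brevity)
    }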
