Skip to content

Commit

Permalink
Merge branch 'elasticity' into 3632-move-replacements-to-manager
Browse files Browse the repository at this point in the history
  • Loading branch information
dlmarion committed Oct 31, 2023
2 parents b4a7167 + 300b39f commit 8a3b072
Show file tree
Hide file tree
Showing 49 changed files with 1,047 additions and 1,000 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.net.URI;

import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.metadata.CompactableFileImpl;

/**
Expand All @@ -33,6 +34,16 @@ public interface CompactableFile {

public URI getUri();

/**
* @return A range associated with the file. If a file has an associated range then Accumulo will
* limit reads to within the range. Not all files have an associated range, it a file does
* not have a range then an infinite range is returned. The URI plus this range uniquely
* identify a file.
*
* @since 3.1.0
*/
public Range getRange();

public long getEstimatedSize();

public long getEstimatedEntries();
Expand All @@ -41,4 +52,12 @@ static CompactableFile create(URI uri, long estimatedSize, long estimatedEntries
return new CompactableFileImpl(uri, estimatedSize, estimatedEntries);
}

/**
* Creates a new CompactableFile object that implements this interface.
*
* @since 3.1.0
*/
static CompactableFile create(URI uri, Range range, long estimatedSize, long estimatedEntries) {
return new CompactableFileImpl(uri, range, estimatedSize, estimatedEntries);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class RootClientTabletCache extends ClientTabletCache {
public <T extends Mutation> void binMutations(ClientContext context, List<T> mutations,
Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures) {
CachedTablet rootCachedTablet = getRootTabletLocation(context);
if (rootCachedTablet != null) {
if (rootCachedTablet != null && rootCachedTablet.getTserverLocation().isPresent()) {
var tsm = new TabletServerMutations<T>(rootCachedTablet.getTserverSession().orElseThrow());
for (T mutation : mutations) {
tsm.addMutation(RootTable.EXTENT, mutation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.CompactableFileImpl;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletFile;
Expand Down Expand Up @@ -119,32 +120,33 @@ private static String getSize(Collection<CompactableFile> files) {
* Lazily converts TableFile to file names. The lazy part is really important because when it is
* not called with log.isDebugEnabled().
*/
private static Collection<String> asFileNames(Collection<CompactableFile> files) {
return Collections2.transform(files, CompactableFile::getFileName);
private static Collection<String> asMinimalString(Collection<CompactableFile> files) {
return Collections2.transform(files,
cf -> CompactableFileImpl.toStoredTabletFile(cf).toMinimalString());
}

public static void selected(KeyExtent extent, CompactionKind kind,
Collection<StoredTabletFile> inputs) {
fileLog.trace("{} changed compaction selection set for {} new set {}", extent, kind,
Collections2.transform(inputs, StoredTabletFile::getFileName));
Collections2.transform(inputs, StoredTabletFile::toMinimalString));
}

public static void compacting(KeyExtent extent, CompactionJob job, CompactionConfig config) {
if (fileLog.isDebugEnabled()) {
if (config == null) {
fileLog.debug("Compacting {} on {} for {} from {} size {}", extent, job.getExecutor(),
job.getKind(), asFileNames(job.getFiles()), getSize(job.getFiles()));
job.getKind(), asMinimalString(job.getFiles()), getSize(job.getFiles()));
} else {
fileLog.debug("Compacting {} on {} for {} from {} size {} config {}", extent,
job.getExecutor(), job.getKind(), asFileNames(job.getFiles()), getSize(job.getFiles()),
config);
job.getExecutor(), job.getKind(), asMinimalString(job.getFiles()),
getSize(job.getFiles()), config);
}
}
}

public static void compacted(KeyExtent extent, CompactionJob job, StoredTabletFile output) {
fileLog.debug("Compacted {} for {} created {} from {}", extent, job.getKind(), output,
asFileNames(job.getFiles()));
asMinimalString(job.getFiles()));
}

public static void flushed(KeyExtent extent, Optional<StoredTabletFile> newDatafile) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,13 @@
import org.apache.accumulo.core.spi.balancer.TabletBalancer;
import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AssignmentParamsImpl implements TabletBalancer.AssignmentParameters {

private static final Logger LOG = LoggerFactory.getLogger(AssignmentParamsImpl.class);

private final SortedMap<TabletServerId,TServerStatus> currentStatus;
private final Map<TabletId,TabletServerId> unassigned;
private final Map<TabletId,TabletServerId> assignmentsOut;
Expand All @@ -50,16 +55,26 @@ public static AssignmentParamsImpl fromThrift(
Map<KeyExtent,TServerInstance> unassigned, Map<KeyExtent,TServerInstance> assignmentsOut) {

SortedMap<TabletServerId,TServerStatus> currentStatusNew = new TreeMap<>();
currentStatus.forEach((tsi, status) -> currentStatusNew.put(new TabletServerIdImpl(tsi),
TServerStatusImpl.fromThrift(status)));

Map<String,Set<TabletServerId>> tserverGroups = new HashMap<>();
currentTServerGrouping.forEach((k, v) -> {
currentTServerGrouping.forEach((group, serversInGroup) -> {
Set<TabletServerId> servers = new HashSet<>();
v.forEach(tsi -> servers.add(TabletServerIdImpl.fromThrift(tsi)));
tserverGroups.put(k, servers);
serversInGroup.forEach(tsi -> {
TabletServerIdImpl id = TabletServerIdImpl.fromThrift(tsi);
if (currentStatus.containsKey(tsi)) {
currentStatusNew.put(id, TServerStatusImpl.fromThrift(currentStatus.get(tsi)));
servers.add(id);
} else {
LOG.debug("Dropping tserver {} from group {} as it's not in set of all servers", id,
group);
}
});
if (!servers.isEmpty()) {
tserverGroups.put(group, servers);
}
});

LOG.debug("TServer groups for balancer assignment: {}", tserverGroups);

Map<TabletId,TabletServerId> unassignedNew = new HashMap<>();
unassigned.forEach(
(ke, tsi) -> unassignedNew.put(new TabletIdImpl(ke), TabletServerIdImpl.fromThrift(tsi)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ public TabletServerIdImpl(TServerInstance tServerInstance) {
this.tServerInstance = requireNonNull(tServerInstance);
}

public TServerInstance getTServerInstance() {
return tServerInstance;
}

@Override
public String getHost() {
return tServerInstance.getHostAndPort().getHost();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Objects;

import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -82,4 +83,24 @@ private static boolean isExclusiveKey(Key key) {
return row.length() > 0 && row.byteAt(row.length() - 1) == (byte) 0x00;
}

private static String stripZeroTail(ByteSequence row) {
if (row.byteAt(row.length() - 1) == (byte) 0x00) {
return row.subSequence(0, row.length() - 1).toString();
}
return row.toString();
}

@Override
public String toMinimalString() {
if (hasRange()) {
String startRow =
range.isInfiniteStartKey() ? "-inf" : stripZeroTail(range.getStartKey().getRowData());
String endRow =
range.isInfiniteStopKey() ? "+inf" : stripZeroTail(range.getEndKey().getRowData());
return getFileName() + " (" + startRow + "," + endRow + "]";
} else {
return getFileName();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Objects;

import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.metadata.schema.DataFileValue;

public class CompactableFileImpl implements CompactableFile {
Expand All @@ -36,6 +37,11 @@ public CompactableFileImpl(URI uri, long size, long entries) {
this.dataFileValue = new DataFileValue(size, entries);
}

public CompactableFileImpl(URI uri, Range range, long size, long entries) {
this.storedTabletFile = StoredTabletFile.of(uri, range);
this.dataFileValue = new DataFileValue(size, entries);
}

public CompactableFileImpl(StoredTabletFile storedTabletFile, DataFileValue dataFileValue) {
this.storedTabletFile = Objects.requireNonNull(storedTabletFile);
this.dataFileValue = Objects.requireNonNull(dataFileValue);
Expand All @@ -46,6 +52,11 @@ public URI getUri() {
return storedTabletFile.getPath().toUri();
}

@Override
public Range getRange() {
return storedTabletFile.getRange();
}

@Override
public String getFileName() {
return storedTabletFile.getFileName();
Expand Down Expand Up @@ -91,6 +102,6 @@ public static StoredTabletFile toStoredTabletFile(CompactableFile cf) {

@Override
public String toString() {
return "[" + storedTabletFile.getFileName() + ", " + dataFileValue + "]";
return "[" + storedTabletFile.toMinimalString() + ", " + dataFileValue + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,9 @@ public interface TabletFile {
*
*/
boolean hasRange();

/**
* @return a string with the filename and row range if there is one.
*/
String toMinimalString();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,18 @@
*/
package org.apache.accumulo.core.metadata;

import java.util.Map;
import java.util.Set;

import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.dataImpl.TabletIdImpl;
import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.spi.balancer.TabletBalancer;
import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public enum TabletState {
UNASSIGNED, ASSIGNED, HOSTED, ASSIGNED_TO_DEAD_SERVER, SUSPENDED, NEEDS_REASSIGNMENT;
UNASSIGNED, ASSIGNED, HOSTED, ASSIGNED_TO_DEAD_SERVER, SUSPENDED;

private static Logger log = LoggerFactory.getLogger(TabletState.class);

public static TabletState compute(TabletMetadata tm, Set<TServerInstance> liveTServers) {
return compute(tm, liveTServers, null, null);
}

public static TabletState compute(TabletMetadata tm, Set<TServerInstance> liveTServers,
TabletBalancer balancer, Map<TabletServerId,String> tserverGroups) {
TabletMetadata.Location current = null;
TabletMetadata.Location future = null;
if (tm.hasCurrent()) {
Expand All @@ -53,42 +42,6 @@ public static TabletState compute(TabletMetadata tm, Set<TServerInstance> liveTS
: TabletState.ASSIGNED_TO_DEAD_SERVER;
} else if (current != null) {
if (liveTServers.contains(current.getServerInstance())) {
if (balancer != null) {
var tsii = new TabletServerIdImpl(current.getServerInstance());
var resourceGroup = tserverGroups.get(tsii);

if (resourceGroup != null) {
var reassign = balancer.needsReassignment(new TabletBalancer.CurrentAssignment() {
@Override
public TabletId getTablet() {
return new TabletIdImpl(tm.getExtent());
}

@Override
public TabletServerId getTabletServer() {
return tsii;
}

@Override
public String getResourceGroup() {
return resourceGroup;
}
});

if (reassign) {
return TabletState.NEEDS_REASSIGNMENT;
}
} else {
// A tablet server should always have a resource group, however there is a race
// conditions where the resource group map was read before a tablet server came into
// existence. Another possible cause for an absent resource group is a bug in accumulo.
// In either case do not call the balancer for now with the assumption that the resource
// group will be available later. Log a message in case it is a bug.
log.trace(
"Could not find resource group for tserver {}, so did not consult balancer. Assuming this is a temporary race condition.",
current.getServerInstance());
}
}
return TabletState.HOSTED;
} else {
return TabletState.ASSIGNED_TO_DEAD_SERVER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,10 @@ private static TableId busiest(Map<String,TableStatistics> tables) {

@Override
public void getAssignments(AssignmentParameters params) {
if (params.currentStatus().isEmpty()) {
log.debug("No known TabletServers, skipping tablet assignment for now.");
return;
}
params.unassignedTablets().forEach((tabletId, tserverId) -> params.addAssignment(tabletId,
getAssignment(params.currentStatus(), tserverId)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,12 @@ private SortedMap<TabletServerId,TServerStatus> getCurrentSetForTable(
tserversInGroup.forEach(tsid -> {
TServerStatus tss = allTServers.get(tsid);
if (tss == null) {
throw new IllegalStateException("TabletServer " + tsid + " in " + groupNameInUse
+ " TabletServer group, but not in set of all TabletServers");
log.warn(
"Excluding TabletServer {} from group {} because TabletServerStatus is null, likely that Manager.StatusThread.updateStatus has not discovered it yet.",
tsid, groupNameInUse);
} else {
group.put(tsid, tss);
}
group.put(tsid, tss);
});
return group;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ public void adminStopAll() throws IOException {
if (p.exitValue() != 0) {
throw new IOException("Failed to run `accumulo admin stopAll`");
}
stopAllServers(ServerType.COMPACTOR);
stopAllServers(ServerType.SCAN_SERVER);
}

@Override
Expand Down
5 changes: 2 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@
<version.opentelemetry>1.27.0</version.opentelemetry>
<version.slf4j>2.0.7</version.slf4j>
<version.thrift>0.17.0</version.thrift>
<version.zookeeper>3.8.2</version.zookeeper>
<version.zookeeper>3.8.3</version.zookeeper>
</properties>
<dependencyManagement>
<dependencies>
Expand Down Expand Up @@ -600,10 +600,9 @@
<version>2.2.1.Final</version>
</dependency>
<dependency>
<!-- stay on 3.21.0 for now due to https://github.com/apache/accumulo/issues/3446 -->
<groupId>org.jline</groupId>
<artifactId>jline</artifactId>
<version>3.21.0</version>
<version>3.24.0</version>
</dependency>
<dependency>
<groupId>org.latencyutils</groupId>
Expand Down
Loading

0 comments on commit 8a3b072

Please sign in to comment.