Skip to content

Commit

Permalink
Merge branch 'main' into elasticity
Browse files Browse the repository at this point in the history
  • Loading branch information
ctubbsii committed Dec 14, 2023
2 parents 14b23a6 + 3061ff0 commit 2c8947b
Show file tree
Hide file tree
Showing 11 changed files with 45 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,31 +33,31 @@

public class LogEntry {

private final String logReference;
private final String filePath;

public LogEntry(String logReference) {
validateLogReference(logReference);
this.logReference = logReference;
public LogEntry(String filePath) {
validateFilePath(filePath);
this.filePath = filePath;
}

public String getLogReference() {
return this.logReference;
public String getFilePath() {
return this.filePath;
}

/**
* Validates the expected format of the file path. We expect the path to contain a tserver
* (host:port) followed by a UUID as the file name. For example,
* localhost:1234/927ba659-d109-4bce-b0a5-bcbbcb9942a2 is a valid file path.
*
* @param logReference path to validate
* @throws IllegalArgumentException if the filepath is invalid
* @param filePath path to validate
* @throws IllegalArgumentException if the filePath is invalid
*/
private static void validateLogReference(String logReference) {
String[] parts = logReference.split("/");
private static void validateFilePath(String filePath) {
String[] parts = filePath.split("/");

if (parts.length < 2) {
throw new IllegalArgumentException(
"Invalid logReference format. The path should at least contain tserver/UUID.");
"Invalid filePath format. The path should at least contain tserver/UUID.");
}

String tserverPart = parts[parts.length - 2];
Expand All @@ -67,8 +67,8 @@ private static void validateLogReference(String logReference) {
HostAndPort.fromString(tserverPart);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
"Invalid tserver format in logReference. Expected format: host:port. Found '"
+ tserverPart + "'");
"Invalid tserver format in filePath. Expected format: host:port. Found '" + tserverPart
+ "'");
}

try {
Expand All @@ -89,7 +89,7 @@ public void addToMutation(Mutation mutation) {

@Override
public String toString() {
return logReference;
return filePath;
}

@Override
Expand All @@ -101,12 +101,12 @@ public boolean equals(Object other) {
return false;
}
LogEntry logEntry = (LogEntry) other;
return this.logReference.equals(logEntry.logReference);
return this.filePath.equals(logEntry.filePath);
}

@Override
public int hashCode() {
return Objects.hash(logReference);
return Objects.hash(filePath);
}

public static LogEntry fromMetaWalEntry(Entry<Key,Value> entry) {
Expand All @@ -118,12 +118,12 @@ public static LogEntry fromMetaWalEntry(Entry<Key,Value> entry) {
}

public String getUniqueID() {
String[] parts = logReference.split("/");
String[] parts = filePath.split("/");
return parts[parts.length - 1];
}

public Text getColumnQualifier() {
return new Text("-/" + logReference);
return new Text("-/" + filePath);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void test() throws Exception {
// test from constructor
LogEntry one = new LogEntry(validFilename);
assertEquals(validFilename, one.toString());
assertEquals(validFilename, one.getLogReference());
assertEquals(validFilename, one.getFilePath());
assertEquals(new Text("-/" + validFilename), one.getColumnQualifier());
assertEquals(validUUID.toString(), one.getUniqueID());

Expand All @@ -61,7 +61,7 @@ public void test() throws Exception {
new Key(new Text("1<"), new Text("log"), one.getColumnQualifier()), new Value("unused")));
assertNotSame(one, two);
assertEquals(one.toString(), two.toString());
assertEquals(one.getLogReference(), two.getLogReference());
assertEquals(one.getFilePath(), two.getFilePath());
assertEquals(one.getColumnQualifier(), two.getColumnQualifier());
assertEquals(one.getUniqueID(), two.getUniqueID());
assertEquals(one, two);
Expand All @@ -74,7 +74,7 @@ public void testEquals() {

assertNotSame(one, two);
assertEquals(one.toString(), two.toString());
assertEquals(one.getLogReference(), two.getLogReference());
assertEquals(one.getFilePath(), two.getFilePath());
assertEquals(one.getColumnQualifier(), two.getColumnQualifier());
assertEquals(one.getUniqueID(), two.getUniqueID());
assertEquals(one, two);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,10 @@ public List<Short> check(Environment env, Mutation mutation) {
continue;
}

if (columnUpdate.getValue().length == 0 && !columnFamily.equals(ScanFileColumnFamily.NAME)
&& !HostingColumnFamily.REQUESTED_COLUMN.equals(columnFamily, columnQualifier)
&& !columnFamily.equals(CompactedColumnFamily.NAME)) {
if (columnUpdate.getValue().length == 0 && !(columnFamily.equals(ScanFileColumnFamily.NAME)
|| columnFamily.equals(LogColumnFamily.NAME)
|| HostingColumnFamily.REQUESTED_COLUMN.equals(columnFamily, columnQualifier)
|| columnFamily.equals(CompactedColumnFamily.NAME))) {
violations = addViolation(violations, 6);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,14 @@ public static Path switchVolume(Path path, FileType ft, List<Pair<Path,Path>> re
}

protected static LogEntry switchVolumes(LogEntry le, List<Pair<Path,Path>> replacements) {
Path switchedPath = switchVolume(new Path(le.getLogReference()), FileType.WAL, replacements);
Path switchedPath = switchVolume(new Path(le.getFilePath()), FileType.WAL, replacements);
String switchedString;
int numSwitched = 0;
if (switchedPath != null) {
switchedString = switchedPath.toString();
numSwitched++;
} else {
switchedString = le.getLogReference();
switchedString = le.getFilePath();
}

if (numSwitched == 0) {
Expand Down Expand Up @@ -181,8 +181,8 @@ public static void volumeReplacementEvaluation(final List<Pair<Path,Path>> repla
if (switchedLogEntry != null) {
logsToRemove.accept(logEntry);
logsToAdd.accept(switchedLogEntry);
log.trace("Replacing volume {} : {} -> {}", tm.getExtent(), logEntry.getLogReference(),
switchedLogEntry.getLogReference());
log.trace("Replacing volume {} : {} -> {}", tm.getExtent(), logEntry.getFilePath(),
switchedLogEntry.getFilePath());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ private static String getLogURI(String logEntry) {
}

private static void getLogURIs(TreeSet<String> volumes, LogEntry logEntry) {
volumes.add(getLogURI(logEntry.getLogReference()));
volumes.add(getLogURI(logEntry.getFilePath()));
}

private static void listTable(Ample.DataLevel level, ServerContext context) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,16 +181,16 @@ public void testWalVolumeReplacment() {
String fileName = "hdfs://nn1/accumulo/wal/localhost+9997/" + walUUID;
LogEntry le = new LogEntry(fileName);
LogEntry fixedVolume = VolumeUtil.switchVolumes(le, replacements);
assertEquals("viewfs:/a/accumulo/wal/localhost+9997/" + walUUID, fixedVolume.getLogReference());
assertEquals("viewfs:/a/accumulo/wal/localhost+9997/" + walUUID, fixedVolume.getFilePath());

fileName = "hdfs://nn1:9000/accumulo/wal/localhost+9997/" + walUUID;
le = new LogEntry(fileName);
fixedVolume = VolumeUtil.switchVolumes(le, replacements);
assertEquals("viewfs:/a/accumulo/wal/localhost+9997/" + walUUID, fixedVolume.getLogReference());
assertEquals("viewfs:/a/accumulo/wal/localhost+9997/" + walUUID, fixedVolume.getFilePath());

fileName = "hdfs://nn2/accumulo/wal/localhost+9997/" + walUUID;
le = new LogEntry(fileName);
fixedVolume = VolumeUtil.switchVolumes(le, replacements);
assertEquals("viewfs:/b/accumulo/wal/localhost+9997/" + walUUID, fixedVolume.getLogReference());
assertEquals("viewfs:/b/accumulo/wal/localhost+9997/" + walUUID, fixedVolume.getFilePath());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ private Map<UUID,TServerInstance> removeEntriesInUse(Map<TServerInstance,Set<UUI
// Tablet is being recovered and has WAL references, remove all the WALs for the dead server
// that made the WALs.
for (LogEntry wals : tabletMetadata.getLogs()) {
String wal = wals.getLogReference();
String wal = wals.getFilePath();
UUID walUUID = path2uuid(new Path(wal));
TServerInstance dead = result.get(walUUID);
// There's a reference to a log file, so skip that server's logs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public boolean recoverLogs(KeyExtent extent, Collection<LogEntry> walogs) throws
boolean recoveryNeeded = false;

for (LogEntry entry : walogs) {
String walog = entry.getLogReference();
String walog = entry.getFilePath();

Path switchedWalog = VolumeUtil.switchVolume(new Path(walog), FileType.WAL,
manager.getContext().getVolumeReplacements());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,7 @@ public void recover(VolumeManager fs, KeyExtent extent, Collection<LogEntry> log
List<Path> recoveryDirs = new ArrayList<>();
for (LogEntry entry : logEntries) {
Path recovery = null;
Path finished = RecoveryPath.getRecoveryPath(new Path(entry.getLogReference()));
Path finished = RecoveryPath.getRecoveryPath(new Path(entry.getFilePath()));
finished = SortedLogState.getFinishedMarkerPath(finished);
TabletServer.log.debug("Looking for " + finished);
if (fs.exists(finished)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ public Tablet(final TabletServer tabletServer, final KeyExtent extent,
currentLogs = new HashSet<>();
for (LogEntry logEntry : logEntries) {
currentLogs.add(new DfsLogger(tabletServer.getContext(), tabletServer.getServerConfig(),
logEntry.getLogReference(), logEntry.getColumnQualifier().toString()));
logEntry.getFilePath(), logEntry.getColumnQualifier().toString()));
}

rebuildReferencedLogs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.easymock.EasyMock.replay;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

Expand Down Expand Up @@ -238,7 +239,10 @@ public void testCancelWhileNew() throws Exception {
assertTrue(FAILED_IN_PROGRESS == getTxStatus(zk, txid) || FAILED == getTxStatus(zk, txid));
fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), true, "Test Op");
Wait.waitFor(() -> FAILED == getTxStatus(zk, txid));
// nothing should have run
assertEquals(1, callStarted.getCount());
fate.delete(txid);
assertThrows(KeeperException.NoNodeException.class, () -> getTxStatus(zk, txid));
} finally {
fate.shutdown();
}
Expand Down Expand Up @@ -272,15 +276,17 @@ public void testCancelWhileSubmittedAndRunning() throws Exception {
long txid = fate.startTransaction();
LOG.debug("Starting test testCancelWhileSubmitted with {}", FateTxId.formatTid(txid));
assertEquals(NEW, getTxStatus(zk, txid));
fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), true, "Test Op");
assertEquals(SUBMITTED, getTxStatus(zk, txid));
fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), false, "Test Op");
Wait.waitFor(() -> IN_PROGRESS == getTxStatus(zk, txid));
// This is false because the transaction runner has reserved the FaTe
// transaction.
Wait.waitFor(() -> IN_PROGRESS == getTxStatus(zk, txid));
assertFalse(fate.cancel(txid));
callStarted.await();
finishCall.countDown();
Wait.waitFor(() -> IN_PROGRESS != getTxStatus(zk, txid));
fate.delete(txid);
assertThrows(KeeperException.NoNodeException.class, () -> getTxStatus(zk, txid));
} finally {
fate.shutdown();
}
Expand Down

0 comments on commit 2c8947b

Please sign in to comment.