Skip to content

Commit

Permalink
add warn logs about invalid property versions (#958)
Browse files Browse the repository at this point in the history
* add warn logs about invalid property versions

* move the log about merged uri properties version

* style fix

* minor cleanup

* modify the log msg

* add xDS flow uri version log
  • Loading branch information
bohhyang authored Dec 20, 2023
1 parent fbaef37 commit d376000
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 54 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.48.8] - 2023-12-19
- add warn logs about invalid property versions

## [29.48.7] - 2023-12-13
- fix publishing uri and cluster properties for symlink clusters

Expand Down Expand Up @@ -5584,7 +5587,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.48.7...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.48.8...master
[29.48.8]: https://github.com/linkedin/rest.li/compare/v29.48.7...v29.48.8
[29.48.7]: https://github.com/linkedin/rest.li/compare/v29.48.6...v29.48.7
[29.48.6]: https://github.com/linkedin/rest.li/compare/v29.48.5...v29.48.6
[29.48.5]: https://github.com/linkedin/rest.li/compare/v29.48.4...v29.48.5
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,14 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class UriPropertiesMerger implements ZooKeeperPropertyMerger<UriProperties>
{
private static final Logger LOG = LoggerFactory.getLogger(UriPropertiesMerger.class);

@Override
public UriProperties merge(String propertyName, Collection<UriProperties> propertiesToMerge)
{
Expand All @@ -48,6 +53,10 @@ public UriProperties merge(String propertyName, Collection<UriProperties> proper
}
}

if (maxVersion == -1)
{
LOG.warn("Merged Uri properties for cluster {} has invalid version -1. It should be > -1.", propertyName);
}
return new UriProperties(clusterName, partitionData, uriSpecificProperties, maxVersion);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@
*/
public class ZooKeeperEphemeralStore<T> extends ZooKeeperStore<T>
{
private static final Logger _log =
LoggerFactory.getLogger(ZooKeeperEphemeralStore.class);
private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperEphemeralStore.class);
private static final Pattern PATH_PATTERN = Pattern.compile("(.*)/(.*)$");
public static final String DEFAULT_PREFIX = "ephemoral";
public static final String PUT_FAILURE_PATH_SUFFIX = "FAILURE";
Expand Down Expand Up @@ -206,7 +205,7 @@ public ZooKeeperEphemeralStore(ZKConnection client,

if (ephemeralNodesFilePath != null && !useNewWatcher)
{
_log.warn("Forcing enabling useNewWatcher with ephemeralNodesFilePath!=null");
LOG.warn("Forcing enabling useNewWatcher with ephemeralNodesFilePath!=null");
useNewWatcher = true;
}

Expand All @@ -228,7 +227,7 @@ public void put(final String prop, final T value, final Callback<None> callback)
{
_putStats.inc();

trace(_log, "put ", prop, ": ", value);
trace(LOG, "put ", prop, ": ", value);

final String path = getPath(prop);
_zkConn.ensurePersistentNodeExists(path, new Callback<None>()
Expand Down Expand Up @@ -284,7 +283,7 @@ public void remove(String prop, Callback<None> callback)
{
_removeStats.inc();

trace(_log, "remove: ", prop);
trace(LOG, "remove: ", prop);

String path = getPath(prop);
_zkConn.removeNodeUnsafeRecursive(path, callback);
Expand All @@ -302,7 +301,7 @@ public void removePartial(final String prop, final T value, final Callback<None>
{
final String path = getPath(prop);

trace(_log, "remove partial ", prop, ": ", value);
trace(LOG, "remove partial ", prop, ": ", value);

final Callback<Map<String, T>> childrenCallback = new Callback<Map<String, T>>()
{
Expand Down Expand Up @@ -352,7 +351,7 @@ public void processResult(int rc, String path, Object ctx, List<String> children
}
else
{
_log.warn("Ignoring request to removePartial with no children: {}", path);
LOG.warn("Ignoring request to removePartial with no children: {}", path);
callback.onSuccess(None.none());
}
break;
Expand Down Expand Up @@ -401,7 +400,7 @@ private void getMergedChildren(String path, List<String> children, ZKStoreWatche
final String propertyName = getPropertyForPath(path);
if (children.size() > 0)
{
_log.debug("getMergedChildren: collecting {}", children);
LOG.debug("getMergedChildren: collecting {}", children);
ChildCollector collector = new ChildCollector(children.size(), new CallbackAdapter<T, Map<String, T>>(callback)
{
@Override
Expand All @@ -417,7 +416,7 @@ protected T convertResponse(Map<String, T> response) throws Exception
}
else
{
_log.debug("getMergedChildren: no children");
LOG.debug("getMergedChildren: no children");
callback.onSuccess(_merger.merge(propertyName, Collections.emptyList()));
}
}
Expand All @@ -430,21 +429,21 @@ private void getChildrenData(String path, Collection<String> children, Callback<
{
if (children.size() > 0)
{
_log.debug("getChildrenData: collecting {}", children);
LOG.debug("getChildrenData: collecting {}", children);
ChildCollector collector = new ChildCollector(children.size(), callback);
children.forEach(child -> _zk.getData(path + "/" + child, null, collector, null));
}
else
{
_log.debug("getChildrenData: no children");
LOG.debug("getChildrenData: no children");
callback.onSuccess(Collections.emptyMap());
}
}

@Override
public void startPublishing(final String prop)
{
trace(_log, "register: ", prop);
trace(LOG, "register: ", prop);

if (_eventBus == null)
{
Expand Down Expand Up @@ -483,7 +482,7 @@ public void startPublishing(final String prop)
@Override
public void stopPublishing(String prop)
{
trace(_log, "unregister: ", prop);
trace(LOG, "unregister: ", prop);

if (_useNewWatcher)
{
Expand Down Expand Up @@ -609,7 +608,7 @@ public void processWatch(final String propertyName, WatchedEvent watchedEvent)
public void processResult(int rc, final String path, Object ctx, List<String> children)
{
KeeperException.Code code = KeeperException.Code.get(rc);
_log.debug("{}: getChildren returned {}: {}", new Object[]{path, code, children});
LOG.debug("{}: getChildren returned {}: {}", new Object[]{path, code, children});
final boolean init = (Boolean)ctx;
final String property = getPropertyForPath(path);
switch (code)
Expand All @@ -623,23 +622,23 @@ public void onSuccess(T value)
if (init)
{
_eventBus.publishInitialize(property, value);
_log.debug("{}: published init", path);
LOG.debug("{}: published init", path);
}
else
{
_eventBus.publishAdd(property, value);
_log.debug("{}: published add", path);
LOG.debug("{}: published add", path);
}
}

@Override
public void onError(Throwable e)
{
_log.error("Failed to merge children for path " + path, e);
LOG.error("Failed to merge children for path " + path, e);
if (init)
{
_eventBus.publishInitialize(property, null);
_log.debug("{}: published init", path);
LOG.debug("{}: published init", path);
}
}
});
Expand All @@ -648,22 +647,22 @@ public void onError(Throwable e)

case NONODE:
// The node whose children we are monitoring is gone; set an exists watch on it
_log.debug("{}: node is not present, calling exists", path);
LOG.debug("{}: node is not present, calling exists", path);
_zk.exists(path, this, this, false);
if (init)
{
_eventBus.publishInitialize(property, null);
_log.debug("{}: published init", path);
LOG.debug("{}: published init", path);
}
else
{
_eventBus.publishRemove(property);
_log.debug("{}: published remove", path);
LOG.debug("{}: published remove", path);
}
break;

default:
_log.error("getChildren: unexpected error: {}: {}", code, path);
LOG.error("getChildren: unexpected error: {}: {}", code, path);
break;
}
}
Expand All @@ -675,22 +674,22 @@ public void onError(Throwable e)
public void processResult(int rc, String path, Object ctx, Stat stat)
{
KeeperException.Code code = KeeperException.Code.get(rc);
_log.debug("{}: exists returned {}", path, code);
LOG.debug("{}: exists returned {}", path, code);
switch (code)
{
case OK:
// The node is back, get children and set child watch
_log.debug("{}: calling getChildren", path);
LOG.debug("{}: calling getChildren", path);
_zk.getChildren(path, this, this, false);
break;

case NONODE:
// The node doesn't exist; OK, the watch is set so now we wait.
_log.debug("{}: set exists watch", path);
LOG.debug("{}: set exists watch", path);
break;

default:
_log.error("exists: unexpected error: {}: {}", code, path);
LOG.error("exists: unexpected error: {}: {}", code, path);
break;
}

Expand Down Expand Up @@ -758,7 +757,7 @@ protected void processWatch(String propertyName, WatchedEvent event)
public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat)
{
KeeperException.Code code = KeeperException.Code.get(rc);
_log.debug("{}: getChildren returned {}: {}", new Object[]{path, code, children});
LOG.debug("{}: getChildren returned {}: {}", new Object[]{path, code, children});
final boolean init = (Boolean)ctx;
final String property = getPropertyForPath(path);
switch (code)
Expand All @@ -785,17 +784,17 @@ public void processResult(int rc, String path, Object ctx, List<String> children
}
_isInitialFetchRef.set(true); // set isInitialFetch to true so that when the exists watch is triggered, it's an initial fetch.
_initialFetchStartAtNanosRef.set(System.nanoTime());
_log.debug("{}: node is not present, calling exists", path);
LOG.debug("{}: node is not present, calling exists", path);
_zk.exists(path, this, this, false);
if (init)
{
_eventBus.publishInitialize(property, null);
_log.debug("{}: published init", path);
LOG.debug("{}: published init", path);
}
else
{
_eventBus.publishRemove(property);
_log.debug("{}: published remove", path);
LOG.debug("{}: published remove", path);
}
if (_fileStore != null)
{
Expand All @@ -804,7 +803,7 @@ public void processResult(int rc, String path, Object ctx, List<String> children
break;

default:
_log.error("getChildren: unexpected error: {}: {}", code, path);
LOG.error("getChildren: unexpected error: {}: {}", code, path);
break;
}
}
Expand All @@ -816,12 +815,12 @@ private Callback<Map<String, T>> getChildrenDataCallback(String path, boolean in
@Override
public void onError(Throwable e)
{
_log.error("Failed to merge children for path " + path, e);
LOG.error("Failed to merge children for path " + path, e);
if (init)
{
_eventBus.publishInitialize(property, null);
}
_log.debug("{}: published init", path);
LOG.debug("{}: published init", path);
}

@Override
Expand All @@ -843,12 +842,12 @@ public void onSuccess(Map<String, T> result)
if (init)
{
_eventBus.publishInitialize(property, mergedProperty);
_log.debug("{}: published init", path);
LOG.debug("{}: published init", path);
}
else
{
_eventBus.publishAdd(property, mergedProperty);
_log.debug("{}: published add", path);
LOG.debug("{}: published add", path);
}
}
};
Expand Down Expand Up @@ -920,22 +919,22 @@ private Set<String> calculateChildrenDeltaAndUpdateState(List<String> children,
public void processResult(int rc, String path, Object ctx, Stat stat)
{
KeeperException.Code code = KeeperException.Code.get(rc);
_log.debug("{}: exists returned {}", path, code);
LOG.debug("{}: exists returned {}", path, code);
switch (code)
{
case OK:
// The node is back, get children and set child watch
_log.debug("{}: calling getChildren", path);
LOG.debug("{}: calling getChildren", path);
_zk.getChildren(path, this, this, false);
break;

case NONODE:
// The node doesn't exist; OK, the watch is set so now we wait.
_log.debug("{}: set exists watch", path);
LOG.debug("{}: set exists watch", path);
break;

default:
_log.error("exists: unexpected error: {}: {}", code, path);
LOG.error("exists: unexpected error: {}: {}", code, path);
break;
}
}
Expand All @@ -944,15 +943,15 @@ private void emitSDStatusInitialRequestEvent(String property, boolean succeeded)
{
if (_eventEmitter == null)
{
_log.info("Service discovery event emitter in ZookeeperEphemeralStore is null. Skipping emitting events.");
LOG.info("Service discovery event emitter in ZookeeperEphemeralStore is null. Skipping emitting events.");
return;
}

// measure request duration and convert to milli-seconds
long initialFetchDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - _initialFetchStartAtNanosRef.get());
if (initialFetchDurationMillis < 0)
{
_log.warn("Failed to log ServiceDiscoveryStatusInitialRequest event, initialFetchStartAt time is greater than current time.");
LOG.warn("Failed to log ServiceDiscoveryStatusInitialRequest event, initialFetchStartAt time is greater than current time.");
return;
}
// emit service discovery status initial request event for success
Expand All @@ -963,7 +962,7 @@ private void emitSDStatusUpdateReceiptEvents(Map<String, T> updates, boolean isM
{
if (_eventEmitter == null)
{
_log.info("Service discovery event emitter in ZookeeperEphemeralStore is null. Skipping emitting events.");
LOG.info("Service discovery event emitter in ZookeeperEphemeralStore is null. Skipping emitting events.");
return;
}

Expand All @@ -972,7 +971,7 @@ private void emitSDStatusUpdateReceiptEvents(Map<String, T> updates, boolean isM
{
if (!(uriProperty instanceof UriProperties))
{
_log.error("Unknown type of URI data, ignored: " + uriProperty.toString());
LOG.error("Unknown type of URI data, ignored: " + uriProperty.toString());
return;
}
UriProperties properties = (UriProperties) uriProperty;
Expand Down Expand Up @@ -1027,7 +1026,12 @@ public void processResult(int rc, String s, Object o, byte[] bytes, Stat stat)
try
{
String childPath = s.substring(s.lastIndexOf('/') + 1);
T value = _serializer.fromBytes(bytes, stat.getMzxid());
long version = stat.getMzxid();
if (version <= 0)
{
LOG.warn("ZK data from {} has invalid version: {}", s, version);
}
T value = _serializer.fromBytes(bytes, version);
_properties.put(childPath, value);
if (_count == 0)
{
Expand All @@ -1046,7 +1050,7 @@ public void processResult(int rc, String s, Object o, byte[] bytes, Stat stat)
{
_callback.onSuccess(_properties);
}
_log.debug("{} doesn't exist, count={}", s, _count);
LOG.debug("{} doesn't exist, count={}", s, _count);
break;

default:
Expand Down
Loading

0 comments on commit d376000

Please sign in to comment.