Skip to content

Commit

Permalink
Prevent node cache update during attach & detach (#2597) (#2601)
Browse files Browse the repository at this point in the history
* Prevent node cache update during attach & detach

* Update NodeManager interface method names
  • Loading branch information
chethanv28 authored Oct 13, 2023
1 parent dad9bb5 commit 98a3fb4
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 48 deletions.
51 changes: 38 additions & 13 deletions pkg/common/cns-lib/node/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,16 @@ type Manager interface {
// given its UUID. If datacenter is present, GetNode will search within this
// datacenter given its UUID. If not, it will search in all registered
// datacenters.
GetNode(ctx context.Context, nodeUUID string, dc *vsphere.Datacenter) (*vsphere.VirtualMachine, error)
// GetNodeByName refreshes and returns the VirtualMachine for a registered
GetNodeVMAndUpdateCache(ctx context.Context, nodeUUID string, dc *vsphere.Datacenter) (*vsphere.VirtualMachine, error)
// GetNodeVMByUuid returns the VirtualMachine for a registered node
// given its UUID.
GetNodeVMByUuid(ctx context.Context, nodeUUID string) (*vsphere.VirtualMachine, error)
// GetNodeVMByNameAndUpdateCache refreshes and returns the VirtualMachine for a registered
// node given its name.
GetNodeByName(ctx context.Context, nodeName string) (*vsphere.VirtualMachine, error)
// GetNodeByNameOrUUID refreshes and returns VirtualMachine for a registered node
GetNodeVMByNameAndUpdateCache(ctx context.Context, nodeName string) (*vsphere.VirtualMachine, error)
// GetNodeVMByNameOrUUID refreshes and returns VirtualMachine for a registered node
// using either its name or UUID.
GetNodeByNameOrUUID(ctx context.Context, nodeName string) (*vsphere.VirtualMachine, error)
GetNodeVMByNameOrUUID(ctx context.Context, nodeNameOrUuid string) (*vsphere.VirtualMachine, error)
// GetNodeNameByUUID fetches the name of the node given the VM UUID.
GetNodeNameByUUID(ctx context.Context, nodeUUID string) (string, error)
// GetAllNodes refreshes and returns VirtualMachine for all registered
Expand Down Expand Up @@ -155,13 +158,14 @@ func (m *defaultManager) DiscoverNode(ctx context.Context, nodeUUID string) erro
return nil
}

// GetNodeByName refreshes and returns the VirtualMachine for a registered node
// GetNodeVMByNameAndUpdateCache refreshes and returns the VirtualMachine for a registered node
// given its name.
func (m *defaultManager) GetNodeByName(ctx context.Context, nodeName string) (*vsphere.VirtualMachine, error) {
func (m *defaultManager) GetNodeVMByNameAndUpdateCache(ctx context.Context,
nodeName string) (*vsphere.VirtualMachine, error) {
log := logger.GetLogger(ctx)
nodeUUID, found := m.nodeNameToUUID.Load(nodeName)
if found && nodeUUID != nil && nodeUUID.(string) != "" {
return m.GetNode(ctx, nodeUUID.(string), nil)
return m.GetNodeVMAndUpdateCache(ctx, nodeUUID.(string), nil)
}
log.Infof("Empty nodeUUID observed in cache for the node: %q", nodeName)
k8snodeUUID, err := k8s.GetNodeUUID(ctx, m.k8sClient, nodeName,
Expand All @@ -171,11 +175,11 @@ func (m *defaultManager) GetNodeByName(ctx context.Context, nodeName string) (*v
return nil, err
}
m.nodeNameToUUID.Store(nodeName, k8snodeUUID)
return m.GetNode(ctx, k8snodeUUID, nil)
return m.GetNodeVMAndUpdateCache(ctx, k8snodeUUID, nil)

}

func (m *defaultManager) GetNodeByNameOrUUID(
func (m *defaultManager) GetNodeVMByNameOrUUID(
ctx context.Context, nodeNameOrUUID string) (*vsphere.VirtualMachine, error) {
log := logger.GetLogger(ctx)
nodeUUID, found := m.nodeNameToUUID.Load(nodeNameOrUUID)
Expand All @@ -184,15 +188,15 @@ func (m *defaultManager) GetNodeByNameOrUUID(
return nil, ErrNodeNotFound
}
if nodeUUID != nil && nodeUUID.(string) != "" {
return m.GetNode(ctx, nodeUUID.(string), nil)
return m.GetNodeVMAndUpdateCache(ctx, nodeUUID.(string), nil)
}
log.Infof("Empty nodeUUID observed in cache for the node: %q", nodeNameOrUUID)
k8snodeUUID, err := k8s.GetNodeUUID(ctx, m.k8sClient, nodeNameOrUUID, m.useNodeUuid)
if err != nil {
log.Errorf("failed to get node UUID from node: %q. Err: %v", nodeNameOrUUID, err)
return nil, err
}
return m.GetNode(ctx, k8snodeUUID, nil)
return m.GetNodeVMAndUpdateCache(ctx, k8snodeUUID, nil)
}

// GetNodeNameByUUID fetches the name of the node given the VM UUID.
Expand Down Expand Up @@ -225,7 +229,7 @@ func (m *defaultManager) GetK8sNode(ctx context.Context, nodename string) (*v1.N

// GetNodeVMAndUpdateCache refreshes and returns the VirtualMachine for a registered node
// given its UUID.
func (m *defaultManager) GetNode(ctx context.Context,
func (m *defaultManager) GetNodeVMAndUpdateCache(ctx context.Context,
nodeUUID string, dc *vsphere.Datacenter) (*vsphere.VirtualMachine, error) {
log := logger.GetLogger(ctx)
vmInf, discovered := m.nodeVMs.Load(nodeUUID)
Expand Down Expand Up @@ -265,6 +269,27 @@ func (m *defaultManager) GetNode(ctx context.Context,
return vm, nil
}

// GetNodeVMByUuid looks up the VirtualMachine for the node identified by
// nodeUUID. An entry already present in m.nodeVMs is returned as-is; when no
// entry exists, the VM is fetched via vsphere.GetVirtualMachineByUUID and
// returned directly. This method never stores anything into m.nodeVMs.
//
// It is used by ControllerPublishVolume and ControllerUnpublishVolume for
// attach and detach operations.
//
// Returns the error from vsphere.GetVirtualMachineByUUID unchanged when the
// lookup fails.
func (m *defaultManager) GetNodeVMByUuid(ctx context.Context,
	nodeUUID string) (*vsphere.VirtualMachine, error) {
	log := logger.GetLogger(ctx)

	// Cache hit: hand back the stored VM without refreshing it.
	if cached, ok := m.nodeVMs.Load(nodeUUID); ok {
		return cached.(*vsphere.VirtualMachine), nil
	}

	// Cache miss: look the VM up by UUID, but leave the cache untouched.
	log.Infof("Node VM not found with nodeUUID %s", nodeUUID)
	nodeVM, lookupErr := vsphere.GetVirtualMachineByUUID(ctx, nodeUUID, false)
	if lookupErr != nil {
		log.Errorf("Couldn't find VM instance with nodeUUID %s, failed to discover with err: %v",
			nodeUUID, lookupErr)
		return nil, lookupErr
	}
	log.Infof("Node was successfully found with nodeUUID %s in vm %v", nodeUUID, nodeVM)
	return nodeVM, nil
}

// GetAllNodes refreshes and returns VirtualMachine for all registered nodes.
func (m *defaultManager) GetAllNodes(ctx context.Context) ([]*vsphere.VirtualMachine, error) {
log := logger.GetLogger(ctx)
Expand Down
2 changes: 1 addition & 1 deletion pkg/common/cns-lib/node/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestDefaultManager_GetNodeByName(t *testing.T) {
k8sClient := k8sClientWithNodes(nodeName)
m.SetKubernetesClient(k8sClient)

vm, _ := m.GetNodeByName(context.TODO(), nodeName)
vm, _ := m.GetNodeVMByNameAndUpdateCache(context.TODO(), nodeName)
if vm != nil {
t.Errorf("Unexpected vm found:%v", vm)
}
Expand Down
18 changes: 9 additions & 9 deletions pkg/common/cns-lib/node/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,19 +182,19 @@ func (nodes *Nodes) csiNodeDelete(obj interface{}) {
}
}

// GetNodeByName returns VirtualMachine object for given nodeName.
// GetNodeVMByNameAndUpdateCache returns VirtualMachine object for given nodeName.
// This is called by ControllerPublishVolume and ControllerUnpublishVolume
// to perform attach and detach operations.
func (nodes *Nodes) GetNodeByName(ctx context.Context, nodeName string) (
func (nodes *Nodes) GetNodeVMByNameAndUpdateCache(ctx context.Context, nodeName string) (
*cnsvsphere.VirtualMachine, error) {
return nodes.cnsNodeManager.GetNodeByName(ctx, nodeName)
return nodes.cnsNodeManager.GetNodeVMByNameAndUpdateCache(ctx, nodeName)
}

// GetNodeByNameOrUUID returns VirtualMachine object for given nodeName
// GetNodeVMByNameOrUUID returns VirtualMachine object for the given node.
// This function can be called using either the node name or the node UUID.
func (nodes *Nodes) GetNodeByNameOrUUID(
func (nodes *Nodes) GetNodeVMByNameOrUUID(
ctx context.Context, nodeNameOrUUID string) (*cnsvsphere.VirtualMachine, error) {
return nodes.cnsNodeManager.GetNodeByNameOrUUID(ctx, nodeNameOrUUID)
return nodes.cnsNodeManager.GetNodeVMByNameOrUUID(ctx, nodeNameOrUUID)
}

// GetNodeNameByUUID fetches the name of the node given the VM UUID.
Expand All @@ -203,11 +203,11 @@ func (nodes *Nodes) GetNodeNameByUUID(ctx context.Context, nodeUUID string) (
return nodes.cnsNodeManager.GetNodeNameByUUID(ctx, nodeUUID)
}

// GetNodeByUuid returns VirtualMachine object for given nodeUuid.
// GetNodeVMByUuid returns VirtualMachine object for given nodeUuid.
// This is called by ControllerPublishVolume and ControllerUnpublishVolume
// to perform attach and detach operations.
func (nodes *Nodes) GetNodeByUuid(ctx context.Context, nodeUuid string) (*cnsvsphere.VirtualMachine, error) {
return nodes.cnsNodeManager.GetNode(ctx, nodeUuid, nil)
func (nodes *Nodes) GetNodeVMByUuid(ctx context.Context, nodeUuid string) (*cnsvsphere.VirtualMachine, error) {
return nodes.cnsNodeManager.GetNodeVMByUuid(ctx, nodeUuid)
}

// GetAllNodes returns VirtualMachine objects for all registered nodes in cluster.
Expand Down
8 changes: 4 additions & 4 deletions pkg/csi/service/common/commonco/k8sorchestrator/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -1252,10 +1252,10 @@ func (volTopology *controllerVolumeTopology) getTopologySegmentsWithMatchingNode
var nodeVM *cnsvsphere.VirtualMachine
if volTopology.isCSINodeIdFeatureEnabled &&
volTopology.clusterFlavor == cnstypes.CnsClusterFlavorVanilla {
nodeVM, err = volTopology.nodeMgr.GetNode(ctx,
nodeVM, err = volTopology.nodeMgr.GetNodeVMAndUpdateCache(ctx,
nodeTopologyInstance.Spec.NodeUUID, nil)
} else {
nodeVM, err = volTopology.nodeMgr.GetNodeByName(ctx,
nodeVM, err = volTopology.nodeMgr.GetNodeVMByNameAndUpdateCache(ctx,
nodeTopologyInstance.Spec.NodeID)
}
if err != nil {
Expand Down Expand Up @@ -1323,10 +1323,10 @@ func (volTopology *controllerVolumeTopology) getNodesMatchingTopologySegment(ctx
var nodeVM *cnsvsphere.VirtualMachine
if volTopology.isCSINodeIdFeatureEnabled &&
volTopology.clusterFlavor == cnstypes.CnsClusterFlavorVanilla {
nodeVM, err = volTopology.nodeMgr.GetNode(ctx,
nodeVM, err = volTopology.nodeMgr.GetNodeVMAndUpdateCache(ctx,
nodeTopologyInstance.Spec.NodeUUID, nil)
} else {
nodeVM, err = volTopology.nodeMgr.GetNodeByName(ctx,
nodeVM, err = volTopology.nodeMgr.GetNodeVMByNameAndUpdateCache(ctx,
nodeTopologyInstance.Spec.NodeID)
}
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/csi/service/common/placementengine/placement.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func getTopologySegmentsWithMatchingNodes(ctx context.Context, requestedSegments
}
// If there is a match, fetch the nodeVM object and add it to matchingNodeVMs.
if isMatch {
nodeVM, err := nodeMgr.GetNode(ctx, nodeTopologyInstance.Spec.NodeUUID, nil)
nodeVM, err := nodeMgr.GetNodeVMAndUpdateCache(ctx, nodeTopologyInstance.Spec.NodeUUID, nil)
if err != nil {
return nil, nil, logger.LogNewErrorf(log,
"failed to retrieve NodeVM %q. Error - %+v", nodeTopologyInstance.Spec.NodeID, err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/csi/service/common/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func AddLabelsToTopologyVCMap(ctx context.Context, nodeTopoObj csinodetopologyv1
log := logger.GetLogger(ctx)
// Get node manager instance.
nodeManager := node.GetManager(ctx)
nodeVM, err := nodeManager.GetNode(ctx, nodeTopoObj.Spec.NodeUUID, nil)
nodeVM, err := nodeManager.GetNodeVMAndUpdateCache(ctx, nodeTopoObj.Spec.NodeUUID, nil)
if err != nil {
log.Errorf("Node %q is not yet registered in the node manager. Error: %+v",
nodeTopoObj.Spec.NodeUUID, err)
Expand All @@ -267,7 +267,7 @@ func RemoveLabelsFromTopologyVCMap(ctx context.Context, nodeTopoObj csinodetopol
log := logger.GetLogger(ctx)
// Get node manager instance.
nodeManager := node.GetManager(ctx)
nodeVM, err := nodeManager.GetNode(ctx, nodeTopoObj.Spec.NodeUUID, nil)
nodeVM, err := nodeManager.GetNodeVMAndUpdateCache(ctx, nodeTopoObj.Spec.NodeUUID, nil)
if err != nil {
log.Errorf("Node %q is not yet registered in the node manager. Error: %+v",
nodeTopoObj.Spec.NodeUUID, err)
Expand Down
20 changes: 10 additions & 10 deletions pkg/csi/service/vanilla/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ import (
type NodeManagerInterface interface {
Initialize(ctx context.Context, useNodeUuid bool) error
GetSharedDatastoresInK8SCluster(ctx context.Context) ([]*cnsvsphere.DatastoreInfo, error)
GetNodeByName(ctx context.Context, nodeName string) (*cnsvsphere.VirtualMachine, error)
GetNodeByNameOrUUID(ctx context.Context, nodeName string) (*cnsvsphere.VirtualMachine, error)
GetNodeVMByNameAndUpdateCache(ctx context.Context, nodeName string) (*cnsvsphere.VirtualMachine, error)
GetNodeVMByNameOrUUID(ctx context.Context, nodeName string) (*cnsvsphere.VirtualMachine, error)
GetNodeNameByUUID(ctx context.Context, nodeUUID string) (string, error)
GetNodeByUuid(ctx context.Context, nodeUuid string) (*cnsvsphere.VirtualMachine, error)
GetNodeVMByUuid(ctx context.Context, nodeUuid string) (*cnsvsphere.VirtualMachine, error)
GetAllNodes(ctx context.Context) ([]*cnsvsphere.VirtualMachine, error)
GetAllNodesByVC(ctx context.Context, vcHost string) ([]*cnsvsphere.VirtualMachine, error)
}
Expand Down Expand Up @@ -2116,14 +2116,14 @@ func (c *controller) ControllerPublishVolume(ctx context.Context, req *csi.Contr
if commonco.ContainerOrchestratorUtility.IsFSSEnabled(ctx, common.UseCSINodeId) {
// if node is not yet updated to run the release of the driver publishing Node VM UUID as Node ID
// look up Node by name
nodevm, err = c.nodeMgr.GetNodeByNameOrUUID(ctx, req.NodeId)
nodevm, err = c.nodeMgr.GetNodeVMByNameOrUUID(ctx, req.NodeId)
if err == node.ErrNodeNotFound {
log.Infof("Performing node VM lookup using node VM UUID: %q", req.NodeId)
nodevm, err = c.nodeMgr.GetNodeByUuid(ctx, req.NodeId)
nodevm, err = c.nodeMgr.GetNodeVMByUuid(ctx, req.NodeId)
}

} else {
nodevm, err = c.nodeMgr.GetNodeByName(ctx, req.NodeId)
nodevm, err = c.nodeMgr.GetNodeVMByNameAndUpdateCache(ctx, req.NodeId)
}
if err != nil {
return nil, csifault.CSIInternalFault, logger.LogNewErrorCodef(log, codes.Internal,
Expand Down Expand Up @@ -2257,13 +2257,13 @@ func (c *controller) ControllerUnpublishVolume(ctx context.Context, req *csi.Con
if commonco.ContainerOrchestratorUtility.IsFSSEnabled(ctx, common.UseCSINodeId) {
// if node is not yet updated to run the release of the driver publishing Node VM UUID as Node ID
// look up Node by name
nodevm, err = c.nodeMgr.GetNodeByNameOrUUID(ctx, req.NodeId)
nodevm, err = c.nodeMgr.GetNodeVMByNameOrUUID(ctx, req.NodeId)
if err == node.ErrNodeNotFound {
log.Infof("Performing node VM lookup using node VM UUID: %q", req.NodeId)
nodevm, err = c.nodeMgr.GetNodeByUuid(ctx, req.NodeId)
nodevm, err = c.nodeMgr.GetNodeVMByUuid(ctx, req.NodeId)
}
} else {
nodevm, err = c.nodeMgr.GetNodeByName(ctx, req.NodeId)
nodevm, err = c.nodeMgr.GetNodeVMByNameAndUpdateCache(ctx, req.NodeId)
}
if err != nil {
if err == cnsvsphere.ErrVMNotFound {
Expand Down Expand Up @@ -2605,7 +2605,7 @@ func (c *controller) processQueryResultsListVolumes(ctx context.Context, startin
publishedNodeIds := commonco.ContainerOrchestratorUtility.GetNodesForVolumes(ctx, []string{fileVolID})
for volID, nodeName := range publishedNodeIds {
if volID == fileVolID && len(nodeName) != 0 {
nodeVMObj, err := c.nodeMgr.GetNodeByName(ctx, publishedNodeIds[fileVolID][0])
nodeVMObj, err := c.nodeMgr.GetNodeVMByNameAndUpdateCache(ctx, publishedNodeIds[fileVolID][0])
if err != nil {
log.Errorf("Failed to get node vm object from the node name, err:%v", err)
return entries, nextToken, volumeType, err
Expand Down
9 changes: 5 additions & 4 deletions pkg/csi/service/vanilla/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ func (f *FakeNodeManager) GetSharedDatastoresInK8SCluster(ctx context.Context) (
}, nil
}

func (f *FakeNodeManager) GetNodeByName(ctx context.Context, nodeName string) (*cnsvsphere.VirtualMachine, error) {
func (f *FakeNodeManager) GetNodeVMByNameAndUpdateCache(ctx context.Context,
nodeName string) (*cnsvsphere.VirtualMachine, error) {
var vm *cnsvsphere.VirtualMachine
var t *testing.T
if v := os.Getenv("VSPHERE_DATACENTER"); v != "" {
Expand All @@ -246,16 +247,16 @@ func (f *FakeNodeManager) GetNodeByName(ctx context.Context, nodeName string) (*
return vm, nil
}

func (f *FakeNodeManager) GetNodeByNameOrUUID(
func (f *FakeNodeManager) GetNodeVMByNameOrUUID(
ctx context.Context, nodeNameOrUUID string) (*cnsvsphere.VirtualMachine, error) {
return f.GetNodeByName(ctx, nodeNameOrUUID)
return f.GetNodeVMByNameAndUpdateCache(ctx, nodeNameOrUUID)
}

func (f *FakeNodeManager) GetNodeNameByUUID(ctx context.Context, nodeUUID string) (string, error) {
return "", nil
}

func (f *FakeNodeManager) GetNodeByUuid(ctx context.Context, nodeUuid string) (*cnsvsphere.VirtualMachine, error) {
func (f *FakeNodeManager) GetNodeVMByUuid(ctx context.Context, nodeUuid string) (*cnsvsphere.VirtualMachine, error) {
var vm *cnsvsphere.VirtualMachine
var t *testing.T
if v := os.Getenv("VSPHERE_DATACENTER"); v != "" {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,13 @@ func (r *ReconcileCSINodeTopology) reconcileForVanilla(ctx context.Context, requ
if r.useNodeUuid && clusterFlavor == cnstypes.CnsClusterFlavorVanilla {
nodeID = instance.Spec.NodeUUID
if nodeID != "" {
nodeVM, err = nodeManager.GetNode(ctx, nodeID, nil)
nodeVM, err = nodeManager.GetNodeVMAndUpdateCache(ctx, nodeID, nil)
} else {
return reconcile.Result{RequeueAfter: timeout}, nil
}
} else {
nodeID = instance.Spec.NodeID
nodeVM, err = nodeManager.GetNodeByName(ctx, nodeID)
nodeVM, err = nodeManager.GetNodeVMByNameAndUpdateCache(ctx, nodeID)
}
if err != nil {
if err == node.ErrNodeNotFound {
Expand Down
4 changes: 2 additions & 2 deletions pkg/syncer/metadatasyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,7 @@ func startTopologyCRInformer(ctx context.Context, cfg *restclient.Config) error
// in the MetadataSyncer.topologyVCMap parameter.
func addLabelsToTopologyVCMap(ctx context.Context, nodeTopoObj csinodetopologyv1alpha1.CSINodeTopology) {
log := logger.GetLogger(ctx)
nodeVM, err := nodeMgr.GetNode(ctx, nodeTopoObj.Spec.NodeUUID, nil)
nodeVM, err := nodeMgr.GetNodeVMAndUpdateCache(ctx, nodeTopoObj.Spec.NodeUUID, nil)
if err != nil {
log.Errorf("Node %q is not yet registered in the node manager. Error: %+v",
nodeTopoObj.Spec.NodeUUID, err)
Expand Down Expand Up @@ -854,7 +854,7 @@ func topoCRDeleted(obj interface{}) {
// instance in the MetadataSyncer.topologyVCMap parameter.
func removeLabelsFromTopologyVCMap(ctx context.Context, nodeTopoObj csinodetopologyv1alpha1.CSINodeTopology) {
log := logger.GetLogger(ctx)
nodeVM, err := nodeMgr.GetNode(ctx, nodeTopoObj.Spec.NodeUUID, nil)
nodeVM, err := nodeMgr.GetNodeVMAndUpdateCache(ctx, nodeTopoObj.Spec.NodeUUID, nil)
if err != nil {
log.Errorf("Node %q is not yet registered in the node manager. Error: %+v",
nodeTopoObj.Spec.NodeUUID, err)
Expand Down

0 comments on commit 98a3fb4

Please sign in to comment.