From 7bfb2eb9d703387d70e27ff5e72f196dff9df888 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?p=C3=BDrus?= <kayrus@users.noreply.github.com>
Date: Thu, 14 Nov 2024 01:02:46 +0100
Subject: [PATCH] [csi-plugins] add support for PVC annotations (#2687)

* [csi-plugins] add support for PVC annotations

* Add support for volume/share name resolve

* Add a timeout to a e2e job, so we can fetch logs
---
 charts/cinder-csi-plugin/Chart.yaml           |   2 +-
 .../controllerplugin-deployment.yaml          |   3 +
 charts/cinder-csi-plugin/values.yaml          |   3 +
 charts/manila-csi-plugin/Chart.yaml           |   2 +-
 .../controllerplugin-statefulset.yaml         |   7 +-
 charts/manila-csi-plugin/values.yaml          |   3 +
 cmd/cinder-csi-plugin/main.go                 |   9 +-
 cmd/manila-csi-plugin/main.go                 |   4 +
 docs/cinder-csi-plugin/examples.md            |  49 +++++
 .../using-cinder-csi-plugin.md                |  27 +++
 .../using-manila-csi-plugin.md                |  28 ++-
 go.mod                                        |  14 +-
 go.sum                                        |  24 +--
 .../cinder-csi-controllerplugin.yaml          |   1 +
 .../csi-controllerplugin.yaml                 |   4 +-
 pkg/csi/cinder/controllerserver.go            |  91 ++++++---
 pkg/csi/cinder/controllerserver_test.go       |  47 ++---
 pkg/csi/cinder/driver.go                      |  21 +-
 pkg/csi/cinder/nodeserver.go                  |   3 +-
 pkg/csi/cinder/nodeserver_test.go             |   5 +-
 pkg/csi/cinder/openstack/openstack.go         |   4 +-
 pkg/csi/cinder/openstack/openstack_mock.go    |  37 +++-
 pkg/csi/cinder/openstack/openstack_volumes.go |  81 ++++++--
 pkg/csi/csi.go                                | 184 ++++++++++++++++++
 pkg/csi/manila/controllerserver.go            |  65 +++++--
 pkg/csi/manila/controllerserver_test.go       |  32 +--
 pkg/csi/manila/driver.go                      |   6 +
 pkg/csi/manila/manilaclient/client.go         |   8 +
 pkg/csi/manila/manilaclient/interface.go      |   3 +
 pkg/csi/manila/options/shareoptions.go        |   3 +
 pkg/csi/manila/share.go                       |  37 ++++
 pkg/csi/manila/volumesource.go                |  72 ++++---
 pkg/util/util.go                              |  59 +++---
 .../roles/install-csi-cinder/tasks/main.yaml  |   2 +
 tests/sanity/cinder/fakecloud.go              |  39 +++-
 tests/sanity/manila/fakemanilaclient.go       |   7 +
 36 files changed, 774 insertions(+), 212 deletions(-)
 create mode 100644 pkg/csi/csi.go

diff --git a/charts/cinder-csi-plugin/Chart.yaml b/charts/cinder-csi-plugin/Chart.yaml
index c21b962414..7a3437428f 100644
--- a/charts/cinder-csi-plugin/Chart.yaml
+++ b/charts/cinder-csi-plugin/Chart.yaml
@@ -2,7 +2,7 @@ apiVersion: v1
 appVersion: v1.31.0
 description: Cinder CSI Chart for OpenStack
 name: openstack-cinder-csi
-version: 2.31.3
+version: 2.31.4
 home: https://github.com/kubernetes/cloud-provider-openstack
 icon: https://github.com/kubernetes/kubernetes/blob/master/logo/logo.png
 maintainers:
diff --git a/charts/cinder-csi-plugin/templates/controllerplugin-deployment.yaml b/charts/cinder-csi-plugin/templates/controllerplugin-deployment.yaml
index 5c7ced79c0..4cc161fb1c 100644
--- a/charts/cinder-csi-plugin/templates/controllerplugin-deployment.yaml
+++ b/charts/cinder-csi-plugin/templates/controllerplugin-deployment.yaml
@@ -175,6 +175,9 @@ spec:
             {{- if .Values.csi.plugin.httpEndpoint.enabled }}
             - "--http-endpoint=:{{ .Values.csi.plugin.httpEndpoint.port }}"
             {{- end }}
+            {{- if .Values.pvcAnnotations }}
+            - "--pvc-annotations"
+            {{- end }}
             {{- if .Values.csi.plugin.extraArgs }}
             {{- with .Values.csi.plugin.extraArgs }}
             {{- tpl . $ | trim | nindent 12 }}
diff --git a/charts/cinder-csi-plugin/values.yaml b/charts/cinder-csi-plugin/values.yaml
index d73c64ff9d..3a0a17abdc 100644
--- a/charts/cinder-csi-plugin/values.yaml
+++ b/charts/cinder-csi-plugin/values.yaml
@@ -207,6 +207,9 @@ storageClass:
 # to volume metadata in newly provisioned volumes as `cinder.csi.openstack.org/cluster=<cluster ID>`.
 clusterID: "kubernetes"
 
+# Enable PVC annotations support to create PVCs with extra parameters
+pvcAnnotations: false
+
 priorityClassName: ""
 
 imagePullSecrets: []
diff --git a/charts/manila-csi-plugin/Chart.yaml b/charts/manila-csi-plugin/Chart.yaml
index 78a5e25ec6..a5a66f138c 100644
--- a/charts/manila-csi-plugin/Chart.yaml
+++ b/charts/manila-csi-plugin/Chart.yaml
@@ -2,7 +2,7 @@ apiVersion: v1
 appVersion: v1.31.0
 description: Manila CSI Chart for OpenStack
 name: openstack-manila-csi
-version: 2.31.0
+version: 2.31.1
 home: http://github.com/kubernetes/cloud-provider-openstack
 icon: https://github.com/kubernetes/kubernetes/blob/master/logo/logo.png
 maintainers:
diff --git a/charts/manila-csi-plugin/templates/controllerplugin-statefulset.yaml b/charts/manila-csi-plugin/templates/controllerplugin-statefulset.yaml
index 91a443336d..bdb3f6874b 100644
--- a/charts/manila-csi-plugin/templates/controllerplugin-statefulset.yaml
+++ b/charts/manila-csi-plugin/templates/controllerplugin-statefulset.yaml
@@ -26,7 +26,7 @@ spec:
             {{- if $.Values.csimanila.topologyAwarenessEnabled }}
             - "--feature-gates=Topology=true"
             {{- end }}
-            {{- if $.Values.controllerplugin.provisioner.extraCreateMetadata }}
+            {{- if or $.Values.controllerplugin.provisioner.extraCreateMetadata $.Values.csimanila.pvcAnnotations }}
             - "--extra-create-metadata"
             {{- end }}
           env:
@@ -101,7 +101,10 @@ spec:
             {{- if .compatibilitySettings }}
             --compatibility-settings={{ .compatibilitySettings }}
             {{- end }}
-            --cluster-id="{{ $.Values.csimanila.clusterID }}"'
+            --cluster-id="{{ $.Values.csimanila.clusterID }}"
+            {{- if $.Values.csimanila.pvcAnnotations }}
+            --pvc-annotations
+            {{- end }}'
           ]
           env:
             - name: DRIVER_NAME
diff --git a/charts/manila-csi-plugin/values.yaml b/charts/manila-csi-plugin/values.yaml
index f8523e9c53..81fd752ad1 100644
--- a/charts/manila-csi-plugin/values.yaml
+++ b/charts/manila-csi-plugin/values.yaml
@@ -43,6 +43,9 @@ csimanila:
   # to share metadata in newly provisioned shares as `manila.csi.openstack.org/cluster=<cluster ID>`.
   clusterID: ""
 
+  # Enable PVC annotations support to create PVCs with extra parameters
+  pvcAnnotations: false
+
   # Image spec
   image:
     repository: registry.k8s.io/provider-os/manila-csi-plugin
diff --git a/cmd/cinder-csi-plugin/main.go b/cmd/cinder-csi-plugin/main.go
index 1331f5f587..b12c562e38 100644
--- a/cmd/cinder-csi-plugin/main.go
+++ b/cmd/cinder-csi-plugin/main.go
@@ -22,6 +22,7 @@ import (
 
 	"github.com/spf13/cobra"
 	"github.com/spf13/pflag"
+	"k8s.io/cloud-provider-openstack/pkg/csi"
 	"k8s.io/cloud-provider-openstack/pkg/csi/cinder"
 	"k8s.io/cloud-provider-openstack/pkg/csi/cinder/openstack"
 	"k8s.io/cloud-provider-openstack/pkg/util/metadata"
@@ -72,6 +73,8 @@ func main() {
 		Version: version.Version,
 	}
 
+	csi.AddPVCFlags(cmd)
+
 	cmd.PersistentFlags().StringVar(&nodeID, "nodeid", "", "node id")
 	if err := cmd.PersistentFlags().MarkDeprecated("nodeid", "This flag would be removed in future. Currently, the value is ignored by the driver"); err != nil {
 		klog.Fatalf("Unable to mark flag nodeid to be deprecated: %v", err)
@@ -103,7 +106,11 @@ func main() {
 
 func handle() {
 	// Initialize cloud
-	d := cinder.NewDriver(&cinder.DriverOpts{Endpoint: endpoint, ClusterID: cluster})
+	d := cinder.NewDriver(&cinder.DriverOpts{
+		Endpoint:  endpoint,
+		ClusterID: cluster,
+		PVCLister: csi.GetPVCLister(),
+	})
 
 	openstack.InitOpenStackProvider(cloudConfig, httpEndpoint)
 
diff --git a/cmd/manila-csi-plugin/main.go b/cmd/manila-csi-plugin/main.go
index b68217dce0..716a544062 100644
--- a/cmd/manila-csi-plugin/main.go
+++ b/cmd/manila-csi-plugin/main.go
@@ -22,6 +22,7 @@ import (
 	"strings"
 
 	"github.com/spf13/cobra"
+	"k8s.io/cloud-provider-openstack/pkg/csi"
 	"k8s.io/cloud-provider-openstack/pkg/csi/manila"
 	"k8s.io/cloud-provider-openstack/pkg/csi/manila/csiclient"
 	"k8s.io/cloud-provider-openstack/pkg/csi/manila/manilaclient"
@@ -86,6 +87,7 @@ func main() {
 				ManilaClientBuilder: manilaClientBuilder,
 				CSIClientBuilder:    csiClientBuilder,
 				ClusterID:           clusterID,
+				PVCLister:           csi.GetPVCLister(),
 			}
 
 			if provideNodeService {
@@ -119,6 +121,8 @@ func main() {
 		Version: version.Version,
 	}
 
+	csi.AddPVCFlags(cmd)
+
 	cmd.PersistentFlags().StringVar(&endpoint, "endpoint", "unix://tmp/csi.sock", "CSI endpoint")
 
 	cmd.PersistentFlags().StringVar(&driverName, "drivername", "manila.csi.openstack.org", "name of the driver")
diff --git a/docs/cinder-csi-plugin/examples.md b/docs/cinder-csi-plugin/examples.md
index 6a7e84d5b1..3cbf2e8999 100644
--- a/docs/cinder-csi-plugin/examples.md
+++ b/docs/cinder-csi-plugin/examples.md
@@ -10,6 +10,7 @@
   - [Snapshot Create and Restore](#snapshot-create-and-restore)
   - [Use Topology](#use-topology)
   - [Disaster recovery of PV and PVC](#disaster-recovery-of-pv-and-pvc)
+  - [Use scheduler hints annotations](#use-scheduler-hints-annotations)
 
 <!-- END doctoc generated TOC please keep comment here to allow auto update -->
 
@@ -453,3 +454,51 @@ spec:
   storageClassName: sata
   volumeMode: Filesystem
 ```
+
+## Use scheduler hints annotations
+
+Cinder CSI driver supports the use of scheduler hints to influence the
+placement of volumes. Scheduler hints can be specified in the
+PersistentVolumeClaim (PVC) annotations:
+
+* `cinder.csi.openstack.org/affinity`
+* `cinder.csi.openstack.org/anti-affinity`
+
+In order to use scheduler hints, the Cinder CSI controller plugin must be
+started with the `--pvc-annotations` flag. The PVC annotations take effect only
+when the PVC is created. The scheduler hints are not updated when the PVC is
+updated. The following example demonstrates how to use scheduler hints to
+influence the placement of volumes:
+
+```
+$ kubectl apply -f scheduler-hints-pvc.yaml
+```
+
+```
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: csi-pvc-cinderplugin
+  annotations:
+    cinder.csi.openstack.org/affinity: "1b4e28ba-2fa1-11ec-8d3d-0242ac130003"
+    cinder.csi.openstack.org/anti-affinity: "1b4e28ba-2fa1-11ec-8d3d-0242ac130004,pv-k8s--cluster-1b5f47bf-0119-442e-8529-254c36e43644"
+spec:
+  accessModes:
+    - ReadWriteOnce
+  resources:
+    requests:
+      storage: 1Gi
+  storageClassName: csi-sc-cinderplugin
+```
+
+where `1b4e28ba-2fa1-11ec-8d3d-0242ac130003`,
+`1b4e28ba-2fa1-11ec-8d3d-0242ac130004` and
+`pv-k8s--cluster-1b5f47bf-0119-442e-8529-254c36e43644` are names or UUIDs of
+the already provisioned volumes in the OpenStack cloud. The scheduler will try
+to place the volume in the same block storage server as the volume with the
+`1b4e28ba-2fa1-11ec-8d3d-0242ac130003` UUID and avoid placing the volume in the
+same block storage server as the volumes with the
+`1b4e28ba-2fa1-11ec-8d3d-0242ac130004` UUID and the
+`pv-k8s--cluster-1b5f47bf-0119-442e-8529-254c36e43644` volume name. If the
+scheduler hints are not satisfied, the volume will not be provisioned with an
+error message in the controller logs.
diff --git a/docs/cinder-csi-plugin/using-cinder-csi-plugin.md b/docs/cinder-csi-plugin/using-cinder-csi-plugin.md
index 3e1a3c3e03..dbbed195b0 100644
--- a/docs/cinder-csi-plugin/using-cinder-csi-plugin.md
+++ b/docs/cinder-csi-plugin/using-cinder-csi-plugin.md
@@ -17,6 +17,7 @@
   - [Supported Features](#supported-features)
   - [Sidecar Compatibility](#sidecar-compatibility)
   - [Supported Parameters](#supported-parameters)
+  - [Supported PVC Annotations](#supported-pvc-annotations)
   - [Local Development](#local-development)
     - [Build](#build)
     - [Testing](#testing)
@@ -110,6 +111,14 @@ In addition to the standard set of klog flags, `cinder-csi-plugin` accepts the f
   If set to true then the CSI driver does provide the node service.
 
   The default is to provide the node service.
+
+  <dt>--pvc-annotations &lt;disabled&gt;</dt>
+  <dd>
+  If set to true then the CSI driver will use PVC annotations to provide volume
+  scheduler hints. See [Supported PVC Annotations](#supported-pvc-annotations)
+  for more information.
+
+  The default is not to provide the PVC annotations support.
   </dd>
 </dl>
 
@@ -273,6 +282,24 @@ helm install --namespace kube-system --name cinder-csi ./charts/cinder-csi-plugi
 | Inline Volume `volumeAttributes`   | `capacity`              | `1Gi`       | volume size for creating inline volumes|
 | Inline Volume `VolumeAttributes`   | `type`              | Empty String  | Name/ID of Volume type. Corresponding volume type should exist in cinder |
 
+## Supported PVC Annotations
+
+The PVC annotations support must be enabled in the Cinder CSI controller with
+the `--pvc-annotations` flag. The PVC annotations take effect only when the PVC
+is created. The scheduler hints are not updated when the PVC is updated. The
+following PVC annotations are supported:
+
+| Annotation Name            | Description      | Example |
+|-------------------------   |-----------------|----------|
+| `cinder.csi.openstack.org/affinity` | Volume affinity to existing volume or volumes names/UUIDs. The value should be a comma-separated list of volume names/UUIDs. | `cinder.csi.openstack.org/affinity: "1b4e28ba-2fa1-11ec-8d3d-0242ac130003"` |
+| `cinder.csi.openstack.org/anti-affinity` | Volume anti-affinity to existing volume or volumes names/UUIDs. The value should be a comma-separated list of volume names/UUIDs. | `cinder.csi.openstack.org/anti-affinity: "1b4e28ba-2fa1-11ec-8d3d-0242ac130004,pv-k8s--cluster-1b5f47bf-0119-442e-8529-254c36e43644"` |
+
+If the PVC annotation is set, the volume will be created according to the
+existing volume names/UUIDs placements, i.e. on the same host as the
+`1b4e28ba-2fa1-11ec-8d3d-0242ac130003` volume and not on the same host as the
+`1b4e28ba-2fa1-11ec-8d3d-0242ac130004` and
+`pv-k8s--cluster-1b5f47bf-0119-442e-8529-254c36e43644` volumes.
+
 ## Local Development
 
 ### Build
diff --git a/docs/manila-csi-plugin/using-manila-csi-plugin.md b/docs/manila-csi-plugin/using-manila-csi-plugin.md
index f9e02636eb..f9e0301ad8 100644
--- a/docs/manila-csi-plugin/using-manila-csi-plugin.md
+++ b/docs/manila-csi-plugin/using-manila-csi-plugin.md
@@ -15,6 +15,7 @@
       - [Verifying the deployment](#verifying-the-deployment)
       - [Enabling topology awareness](#enabling-topology-awareness)
   - [Share protocol support matrix](#share-protocol-support-matrix)
+  - [Supported PVC annotations](#supported-pvc-annotations)
   - [For developers](#for-developers)
 
 <!-- END doctoc generated TOC please keep comment here to allow auto update -->
@@ -42,6 +43,7 @@ Option | Default value | Description
 `--cluster-id` | _none_ | The identifier of the cluster that the plugin is running in. If set then the plugin will add "manila.csi.openstack.org/cluster: \<clusterID\>" to metadata of created shares.
 `--provide-controller-service` | `true` | If set to true then the CSI driver does provide the controller service.
 `--provide-node-service` | `true` | If set to true then the CSI driver does provide the node service.
+`--pvc-annotations` | `false` | If set to true then the CSI driver will use PVC annotations as an additional information when creating shares. See [Supported PVC annotations](#supported-pvc-annotations) for more info.
 
 ### Controller Service volume parameters
 
@@ -53,6 +55,7 @@ Parameter | Required | Description
 `shareNetworkID` | _no_ | Manila [share network ID](https://wiki.openstack.org/wiki/Manila/Concepts#share_network)
 `availability` | _no_ | Manila availability zone of the provisioned share. If none is provided, the default Manila zone will be used. Note that this parameter is opaque to the CO and does not influence placement of workloads that will consume this share, meaning they may be scheduled onto any node of the cluster. If the specified Manila AZ is not equally accessible from all compute nodes of the cluster, use [Topology-aware dynamic provisioning](#topology-aware-dynamic-provisioning).
 `autoTopology` | _no_ | When set to "true" and the `availability` parameter is empty, the Manila CSI controller will map the Manila availability zone to the target compute node availability zone.
+`groupID` | _no_ | The UUID of the share group to which the provisioned share belongs. If not empty, the share will be created in the specified share group. The share group must be created in advance before the PVC is created.
 `appendShareMetadata` | _no_ | Append user-defined metadata to the provisioned share. If not empty, this field must be a string with a valid JSON object. The object must consist of key-value pairs of type string. Example: `"{..., \"key\": \"value\"}"`.
 `cephfs-mounter` | _no_ | Relevant for CephFS Manila shares. Specifies which mounting method to use with the CSI CephFS driver. Available options are `kernel` and `fuse`, defaults to `fuse`. See [CSI CephFS docs](https://github.com/ceph/ceph-csi/blob/csi-v1.0/docs/deploy-cephfs.md#configuration) for further information.
 `cephfs-kernelMountOptions` | _no_ | Relevant for CephFS Manila shares. Specifies mount options for CephFS kernel client. See [CSI CephFS docs](https://github.com/ceph/ceph-csi/blob/csi-v1.0/docs/deploy-cephfs.md#configuration) for further information.
@@ -272,7 +275,30 @@ Manila share protocol | CSI Node Plugin
 `CEPHFS` | [CSI CephFS](https://github.com/ceph/ceph-csi) : v1.0.0
 `NFS` | [CSI NFS](https://github.com/kubernetes-csi/csi-driver-nfs) : v1.0.0
 
+## Supported PVC Annotations
+
+The PVC annotations support must be enabled in the Manila CSI controller with
+the `--pvc-annotations` flag. The PVC annotations take effect only when the PVC
+is created. The scheduler hints are not updated when the PVC is updated. The
+minimum Manila API microversion required for scheduler hints is 2.65. Make sure
+that the Manila API microversion is supported by the Manila backend. The
+following PVC annotations are supported:
+
+| Annotation Name            | Description      | Example |
+|-------------------------   |-----------------|----------|
+| `manila.csi.openstack.org/affinity` | Share affinity to existing share or shares names/UUIDs. The value should be a comma-separated list of share names or UUIDs. | `manila.csi.openstack.org/affinity: "1b4e28ba-2fa1-11ec-8d3d-0242ac130003"` |
+| `manila.csi.openstack.org/anti-affinity` | Share anti-affinity to existing share or shares names/UUIDs. The value should be a comma-separated list of share names/UUIDs. | `manila.csi.openstack.org/anti-affinity: "1b4e28ba-2fa1-11ec-8d3d-0242ac130004,pv-default-50c5a3b3-e0b5-48d6-a163-4e68956aeb54"` |
+| `manila.csi.openstack.org/group-id` | The UUID of the share group to which the provisioned share must belong. The share group must be created before the PVC is created. | `manila.csi.openstack.org/group-id: "1b4e28ba-2fa1-11ec-8d3d-0242ac130006"` |
+
+If the PVC annotation is set, the share will be created according to the
+existing share names/UUIDs placements, i.e. on the same host as the
+`1b4e28ba-2fa1-11ec-8d3d-0242ac130003` share and not on the same host as the
+`1b4e28ba-2fa1-11ec-8d3d-0242ac130004` and
+`pv-default-50c5a3b3-e0b5-48d6-a163-4e68956aeb54` shares.
+
+The `manila.csi.openstack.org/group-id` annotation value overrides the storage
+class `groupID` parameter if both are set.
+
 ## For developers
 
 If you'd like to contribute to CSI Manila, check out `docs/manila-csi-plugin/developers-csi-manila.md` to get you started.
-
diff --git a/go.mod b/go.mod
index 2bb58d8c49..166fb15668 100644
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,8 @@ go 1.22.0
 require (
 	github.com/container-storage-interface/spec v1.9.0
 	github.com/go-chi/chi/v5 v5.0.8
-	github.com/gophercloud/gophercloud/v2 v2.0.0
+	github.com/google/uuid v1.6.0
+	github.com/gophercloud/gophercloud/v2 v2.1.2-0.20241016132526-1c4dd03733fe
 	github.com/gophercloud/utils/v2 v2.0.0-20240701101423-2401526caee5
 	github.com/hashicorp/go-version v1.6.0
 	github.com/kubernetes-csi/csi-lib-utils v0.13.0
@@ -22,8 +23,8 @@ require (
 	github.com/stretchr/testify v1.9.0
 	go.uber.org/goleak v1.3.0
 	golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc
-	golang.org/x/sys v0.21.0
-	golang.org/x/term v0.21.0
+	golang.org/x/sys v0.26.0
+	golang.org/x/term v0.25.0
 	google.golang.org/grpc v1.65.0
 	google.golang.org/protobuf v1.34.2
 	gopkg.in/gcfg.v1 v1.2.3
@@ -91,7 +92,6 @@ require (
 	github.com/google/go-cmp v0.6.0 // indirect
 	github.com/google/gofuzz v1.2.0 // indirect
 	github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af // indirect
-	github.com/google/uuid v1.6.0 // indirect
 	github.com/gorilla/websocket v1.5.0 // indirect
 	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
 	github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect
@@ -142,11 +142,11 @@ require (
 	go.opentelemetry.io/proto/otlp v1.3.1 // indirect
 	go.uber.org/multierr v1.11.0 // indirect
 	go.uber.org/zap v1.26.0 // indirect
-	golang.org/x/crypto v0.24.0 // indirect
+	golang.org/x/crypto v0.28.0 // indirect
 	golang.org/x/net v0.26.0 // indirect
 	golang.org/x/oauth2 v0.21.0 // indirect
-	golang.org/x/sync v0.7.0 // indirect
-	golang.org/x/text v0.16.0 // indirect
+	golang.org/x/sync v0.8.0 // indirect
+	golang.org/x/text v0.19.0 // indirect
 	golang.org/x/time v0.3.0 // indirect
 	golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
 	google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect
diff --git a/go.sum b/go.sum
index cce5a6f9d0..1299736f82 100644
--- a/go.sum
+++ b/go.sum
@@ -228,8 +228,8 @@ github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
 github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
 github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
 github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g=
-github.com/gophercloud/gophercloud/v2 v2.0.0 h1:iH0x0Ji79a/ULzmq95fvOBAyie7+M+wUAEu+JrRMsCk=
-github.com/gophercloud/gophercloud/v2 v2.0.0/go.mod h1:ZKbcGNjxFTSaP5wlvtLDdsppllD/UGGvXBPqcjeqA8Y=
+github.com/gophercloud/gophercloud/v2 v2.1.2-0.20241016132526-1c4dd03733fe h1:cbowqLRC8NQOuuJl/CkO7C0cCWJ0hAGt+V0Cz+XDWfI=
+github.com/gophercloud/gophercloud/v2 v2.1.2-0.20241016132526-1c4dd03733fe/go.mod h1:f2hMRC7Kakbv5vM7wSGHrIPZh6JZR60GVHryJlF/K44=
 github.com/gophercloud/utils/v2 v2.0.0-20240701101423-2401526caee5 h1:/mLIQMTyjIVfiwQkknJS9XxEPLFuB70ss+ZrofChBf8=
 github.com/gophercloud/utils/v2 v2.0.0-20240701101423-2401526caee5/go.mod h1:3tI9DoiOJFBkqbOeAPqPns/QUnMCiflwYBvgR6KJdM4=
 github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
@@ -454,8 +454,8 @@ golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm
 golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
 golang.org/x/crypto v0.0.0-20220331220935-ae2d96664a29/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
 golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
-golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
-golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
+golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw=
+golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
 golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -559,8 +559,8 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
-golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
+golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
+golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -612,12 +612,12 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc
 golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220731174439-a90be440212d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
-golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
+golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
-golang.org/x/term v0.21.0 h1:WVXCp+/EBEHOj53Rvu+7KiT/iElMrO8ACK16SMZ3jaA=
-golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0=
+golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24=
+golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M=
 golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -626,8 +626,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
-golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
-golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
+golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
+golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
 golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
diff --git a/manifests/cinder-csi-plugin/cinder-csi-controllerplugin.yaml b/manifests/cinder-csi-plugin/cinder-csi-controllerplugin.yaml
index 8b871d7988..7e3cb641e9 100644
--- a/manifests/cinder-csi-plugin/cinder-csi-controllerplugin.yaml
+++ b/manifests/cinder-csi-plugin/cinder-csi-controllerplugin.yaml
@@ -99,6 +99,7 @@ spec:
             - "--endpoint=$(CSI_ENDPOINT)"
             - "--cloud-config=$(CLOUD_CONFIG)"
             - "--cluster=$(CLUSTER_NAME)"
+            - "--pvc-annotations"
             - "--v=1"
           env:
             - name: CSI_ENDPOINT
diff --git a/manifests/manila-csi-plugin/csi-controllerplugin.yaml b/manifests/manila-csi-plugin/csi-controllerplugin.yaml
index fda9e63d95..caa0c970ac 100644
--- a/manifests/manila-csi-plugin/csi-controllerplugin.yaml
+++ b/manifests/manila-csi-plugin/csi-controllerplugin.yaml
@@ -39,6 +39,7 @@ spec:
           image: "registry.k8s.io/sig-storage/csi-provisioner:v3.0.0"
           args:
             - "--csi-address=$(ADDRESS)"
+            - "--extra-create-metadata"
             # To enable topology awareness in csi-provisioner, uncomment the following line:
             # - "--feature-gates=Topology=true"
           env:
@@ -84,7 +85,8 @@ spec:
             --endpoint=$(CSI_ENDPOINT)
             --drivername=$(DRIVER_NAME)
             --share-protocol-selector=$(MANILA_SHARE_PROTO)
-            --fwdendpoint=$(FWD_CSI_ENDPOINT)'
+            --fwdendpoint=$(FWD_CSI_ENDPOINT)
+            --pvc-annotations'
             # To enable topology awareness and retrieve compute node AZs from the OpenStack Metadata Service, add the following flags:
             # --with-topology
             # --nodeaz=$(curl http://169.254.169.254/openstack/latest/meta_data.json | jq -r .availability_zone)
diff --git a/pkg/csi/cinder/controllerserver.go b/pkg/csi/cinder/controllerserver.go
index c69171aada..43b2ba1941 100644
--- a/pkg/csi/cinder/controllerserver.go
+++ b/pkg/csi/cinder/controllerserver.go
@@ -33,6 +33,7 @@ import (
 	"google.golang.org/grpc/status"
 	"google.golang.org/protobuf/types/known/timestamppb"
 
+	sharedcsi "k8s.io/cloud-provider-openstack/pkg/csi"
 	"k8s.io/cloud-provider-openstack/pkg/csi/cinder/openstack"
 	"k8s.io/cloud-provider-openstack/pkg/util"
 	cpoerrors "k8s.io/cloud-provider-openstack/pkg/util/errors"
@@ -46,6 +47,8 @@ type controllerServer struct {
 
 const (
 	cinderCSIClusterIDKey = "cinder.csi.openstack.org/cluster"
+	affinityKey           = "cinder.csi.openstack.org/affinity"
+	antiAffinityKey       = "cinder.csi.openstack.org/anti-affinity"
 )
 
 func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
@@ -61,6 +64,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
 	// Volume Name
 	volName := req.GetName()
 	volCapabilities := req.GetVolumeCapabilities()
+	volParams := req.GetParameters()
 
 	if len(volName) == 0 {
 		return nil, status.Error(codes.InvalidArgument, "[CreateVolume] missing Volume Name")
@@ -78,43 +82,48 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
 	volSizeGB := int(util.RoundUpSize(volSizeBytes, 1024*1024*1024))
 
 	// Volume Type
-	volType := req.GetParameters()["type"]
+	volType := volParams["type"]
 
 	// First check if volAvailability is already specified, if not get preferred from Topology
 	// Required, incase vol AZ is different from node AZ
-	volAvailability := req.GetParameters()["availability"]
+	volAvailability := volParams["availability"]
 	if volAvailability == "" {
 		// Check from Topology
 		if req.GetAccessibilityRequirements() != nil {
-			volAvailability = util.GetAZFromTopology(topologyKey, req.GetAccessibilityRequirements())
+			volAvailability = sharedcsi.GetAZFromTopology(topologyKey, req.GetAccessibilityRequirements())
 		}
 	}
 
 	ignoreVolumeAZ := cloud.GetBlockStorageOpts().IgnoreVolumeAZ
 
+	// get the PVC annotation
+	pvcAnnotations := sharedcsi.GetPVCAnnotations(cs.Driver.pvcLister, volParams)
+	for k, v := range pvcAnnotations {
+		klog.V(4).Infof("CreateVolume: retrieved %q pvc annotation: %s: %s", k, v, volName)
+	}
+
 	// Verify a volume with the provided name doesn't already exist for this tenant
-	volumes, err := cloud.GetVolumesByName(volName)
+	vols, err := cloud.GetVolumesByName(volName)
 	if err != nil {
 		klog.Errorf("Failed to query for existing Volume during CreateVolume: %v", err)
 		return nil, status.Errorf(codes.Internal, "Failed to get volumes: %v", err)
 	}
 
-	if len(volumes) == 1 {
-		if volSizeGB != volumes[0].Size {
+	if len(vols) == 1 {
+		if volSizeGB != vols[0].Size {
 			return nil, status.Error(codes.AlreadyExists, "Volume Already exists with same name and different capacity")
 		}
-		klog.V(4).Infof("Volume %s already exists in Availability Zone: %s of size %d GiB", volumes[0].ID, volumes[0].AvailabilityZone, volumes[0].Size)
-		return getCreateVolumeResponse(&volumes[0], ignoreVolumeAZ, req.GetAccessibilityRequirements()), nil
-	} else if len(volumes) > 1 {
+		klog.V(4).Infof("Volume %s already exists in Availability Zone: %s of size %d GiB", vols[0].ID, vols[0].AvailabilityZone, vols[0].Size)
+		return getCreateVolumeResponse(&vols[0], nil, ignoreVolumeAZ, req.GetAccessibilityRequirements()), nil
+	} else if len(vols) > 1 {
 		klog.V(3).Infof("found multiple existing volumes with selected name (%s) during create", volName)
 		return nil, status.Error(codes.Internal, "Multiple volumes reported by Cinder with same name")
-
 	}
 
 	// Volume Create
 	properties := map[string]string{cinderCSIClusterIDKey: cs.Driver.cluster}
 	//Tag volume with metadata if present: https://github.com/kubernetes-csi/external-provisioner/pull/399
-	for _, mKey := range []string{"csi.storage.k8s.io/pvc/name", "csi.storage.k8s.io/pvc/namespace", "csi.storage.k8s.io/pv/name"} {
+	for _, mKey := range sharedcsi.RecognizedCSIProvisionerParams {
 		if v, ok := req.Parameters[mKey]; ok {
 			properties[mKey] = v
 		}
@@ -177,20 +186,59 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
 		}
 	}
 
-	vol, err := cloud.CreateVolume(volName, volSizeGB, volType, volAvailability, snapshotID, sourceVolID, sourceBackupID, properties)
-	// When creating a volume from a backup, the response does not include the backupID.
-	if sourceBackupID != "" {
-		vol.BackupID = &sourceBackupID
+	opts := &volumes.CreateOpts{
+		Name:             volName,
+		Size:             volSizeGB,
+		VolumeType:       volType,
+		AvailabilityZone: volAvailability,
+		SnapshotID:       snapshotID,
+		SourceVolID:      sourceVolID,
+		BackupID:         sourceBackupID,
+		Metadata:         properties,
+	}
+
+	// Set scheduler hints if affinity or anti-affinity is set in PVC annotations
+	var schedulerHints volumes.SchedulerHintOptsBuilder
+	var volCtx map[string]string
+	affinity := pvcAnnotations[affinityKey]
+	antiAffinity := pvcAnnotations[antiAffinityKey]
+	if affinity != "" || antiAffinity != "" {
+		klog.V(4).Infof("CreateVolume: Getting scheduler hints: affinity=%s, anti-affinity=%s", affinity, antiAffinity)
+
+		// resolve volume names to UUIDs
+		affinity, err = cloud.ResolveVolumeListToUUIDs(affinity)
+		if err != nil {
+			return nil, status.Errorf(codes.InvalidArgument, "failed to resolve affinity volume UUIDs: %v", err)
+		}
+		antiAffinity, err = cloud.ResolveVolumeListToUUIDs(antiAffinity)
+		if err != nil {
+			return nil, status.Errorf(codes.InvalidArgument, "failed to resolve anti-affinity volume UUIDs: %v", err)
+		}
+
+		volCtx = util.SetMapIfNotEmpty(volCtx, "affinity", affinity)
+		volCtx = util.SetMapIfNotEmpty(volCtx, "anti-affinity", antiAffinity)
+		schedulerHints = &volumes.SchedulerHintOpts{
+			SameHost:      util.SplitTrim(affinity, ','),
+			DifferentHost: util.SplitTrim(antiAffinity, ','),
+		}
+
+		klog.V(4).Infof("CreateVolume: Resolved scheduler hints: affinity=%s, anti-affinity=%s", affinity, antiAffinity)
 	}
 
+	vol, err := cloud.CreateVolume(opts, schedulerHints)
 	if err != nil {
 		klog.Errorf("Failed to CreateVolume: %v", err)
 		return nil, status.Errorf(codes.Internal, "CreateVolume failed with error %v", err)
 	}
 
+	// When creating a volume from a backup, the response does not include the backupID.
+	if sourceBackupID != "" {
+		vol.BackupID = &sourceBackupID
+	}
+
 	klog.V(4).Infof("CreateVolume: Successfully created volume %s in Availability Zone: %s of size %d GiB", vol.ID, vol.AvailabilityZone, vol.Size)
 
-	return getCreateVolumeResponse(vol, ignoreVolumeAZ, req.GetAccessibilityRequirements()), nil
+	return getCreateVolumeResponse(vol, volCtx, ignoreVolumeAZ, req.GetAccessibilityRequirements()), nil
 }
 
 func (d *controllerServer) ControllerModifyVolume(ctx context.Context, req *csi.ControllerModifyVolumeRequest) (*csi.ControllerModifyVolumeResponse, error) {
@@ -686,7 +734,6 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
 }
 
 func (cs *controllerServer) createSnapshot(cloud openstack.IOpenStack, name string, volumeID string, parameters map[string]string) (snap *snapshots.Snapshot, err error) {
-
 	filters := map[string]string{}
 	filters["Name"] = name
 
@@ -723,7 +770,7 @@ func (cs *controllerServer) createSnapshot(cloud openstack.IOpenStack, name stri
 	// Also, we don't want to tag every param but we still want to send the
 	// 'force-create' flag to openstack layer so that we will honor the
 	// force create functions
-	for _, mKey := range []string{"csi.storage.k8s.io/volumesnapshot/name", "csi.storage.k8s.io/volumesnapshot/namespace", "csi.storage.k8s.io/volumesnapshotcontent/name", openstack.SnapshotForceCreate} {
+	for _, mKey := range append(sharedcsi.RecognizedCSISnapshotterParams, openstack.SnapshotForceCreate) {
 		if v, ok := parameters[mKey]; ok {
 			properties[mKey] = v
 		}
@@ -742,7 +789,6 @@ func (cs *controllerServer) createSnapshot(cloud openstack.IOpenStack, name stri
 }
 
 func (cs *controllerServer) createBackup(cloud openstack.IOpenStack, name string, volumeID string, snap *snapshots.Snapshot, parameters map[string]string) (*backups.Backup, error) {
-
 	// Add cluster ID to the snapshot metadata
 	properties := map[string]string{cinderCSIClusterIDKey: cs.Driver.cluster}
 
@@ -750,7 +796,7 @@ func (cs *controllerServer) createBackup(cloud openstack.IOpenStack, name string
 	// Also, we don't want to tag every param but we still want to send the
 	// 'force-create' flag to openstack layer so that we will honor the
 	// force create functions
-	for _, mKey := range []string{"csi.storage.k8s.io/volumesnapshot/name", "csi.storage.k8s.io/volumesnapshot/namespace", "csi.storage.k8s.io/volumesnapshotcontent/name", openstack.SnapshotForceCreate, openstack.SnapshotType} {
+	for _, mKey := range append(sharedcsi.RecognizedCSISnapshotterParams, openstack.SnapshotForceCreate, openstack.SnapshotType) {
 		if v, ok := parameters[mKey]; ok {
 			properties[mKey] = v
 		}
@@ -806,7 +852,6 @@ func (cs *controllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
 }
 
 func (cs *controllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
-
 	// Volume cloud
 	volCloud := req.GetSecrets()["cloud"]
 	cloud, cloudExist := cs.Clouds[volCloud]
@@ -901,7 +946,6 @@ func (cs *controllerServer) ControllerGetCapabilities(ctx context.Context, req *
 }
 
 func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
-
 	// Volume cloud
 	volCloud := req.GetSecrets()["cloud"]
 	cloud, cloudExist := cs.Clouds[volCloud]
@@ -1058,8 +1102,7 @@ func (cs *controllerServer) ControllerExpandVolume(ctx context.Context, req *csi
 	}, nil
 }
 
-func getCreateVolumeResponse(vol *volumes.Volume, ignoreVolumeAZ bool, accessibleTopologyReq *csi.TopologyRequirement) *csi.CreateVolumeResponse {
-
+func getCreateVolumeResponse(vol *volumes.Volume, volCtx map[string]string, ignoreVolumeAZ bool, accessibleTopologyReq *csi.TopologyRequirement) *csi.CreateVolumeResponse {
 	var volsrc *csi.VolumeContentSource
 	volCnx := map[string]string{}
 
diff --git a/pkg/csi/cinder/controllerserver_test.go b/pkg/csi/cinder/controllerserver_test.go
index e65d8c661d..d04131ccae 100644
--- a/pkg/csi/cinder/controllerserver_test.go
+++ b/pkg/csi/cinder/controllerserver_test.go
@@ -24,6 +24,7 @@ import (
 	"github.com/gophercloud/gophercloud/v2/openstack/blockstorage/v3/volumes"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
+	sharedcsi "k8s.io/cloud-provider-openstack/pkg/csi"
 	openstack "k8s.io/cloud-provider-openstack/pkg/csi/cinder/openstack"
 )
 
@@ -53,7 +54,7 @@ func init() {
 // Test CreateVolume
 func TestCreateVolume(t *testing.T) {
 	// mock OpenStack
-	properties := map[string]string{"cinder.csi.openstack.org/cluster": FakeCluster}
+	properties := map[string]string{cinderCSIClusterIDKey: FakeCluster}
 	// CreateVolume(name string, size int, vtype, availability string, snapshotID string, sourceVolID string, sourceBackupID string, tags map[string]string) (string, string, int, error)
 	osmock.On("CreateVolume", FakeVolName, mock.AnythingOfType("int"), FakeVolType, FakeAvailability, "", "", "", properties).Return(&FakeVol, nil)
 
@@ -75,7 +76,7 @@ func TestCreateVolume(t *testing.T) {
 		AccessibilityRequirements: &csi.TopologyRequirement{
 			Requisite: []*csi.Topology{
 				{
-					Segments: map[string]string{"topology.cinder.csi.openstack.org/zone": FakeAvailability},
+					Segments: map[string]string{topologyKey: FakeAvailability},
 				},
 			},
 		},
@@ -99,7 +100,7 @@ func TestCreateVolume(t *testing.T) {
 // Test CreateVolume with additional param
 func TestCreateVolumeWithParam(t *testing.T) {
 	// mock OpenStack
-	properties := map[string]string{"cinder.csi.openstack.org/cluster": FakeCluster}
+	properties := map[string]string{cinderCSIClusterIDKey: FakeCluster}
 	// CreateVolume(name string, size int, vtype, availability string, snapshotID string, sourceVolID string, sourceBackupID string, tags map[string]string) (string, string, int, error)
 	// Vol type and availability comes from CreateVolumeRequest.Parameters
 	osmock.On("CreateVolume", FakeVolName, mock.AnythingOfType("int"), "dummyVolType", "cinder", "", "", "", properties).Return(&FakeVol, nil)
@@ -127,7 +128,7 @@ func TestCreateVolumeWithParam(t *testing.T) {
 		AccessibilityRequirements: &csi.TopologyRequirement{
 			Requisite: []*csi.Topology{
 				{
-					Segments: map[string]string{"topology.cinder.csi.openstack.org/zone": FakeAvailability},
+					Segments: map[string]string{topologyKey: FakeAvailability},
 				},
 			},
 		},
@@ -151,10 +152,10 @@ func TestCreateVolumeWithParam(t *testing.T) {
 func TestCreateVolumeWithExtraMetadata(t *testing.T) {
 	// mock OpenStack
 	properties := map[string]string{
-		"cinder.csi.openstack.org/cluster": FakeCluster,
-		"csi.storage.k8s.io/pv/name":       FakePVName,
-		"csi.storage.k8s.io/pvc/name":      FakePVCName,
-		"csi.storage.k8s.io/pvc/namespace": FakePVCNamespace,
+		cinderCSIClusterIDKey:     FakeCluster,
+		sharedcsi.PvNameKey:       FakePVName,
+		sharedcsi.PvcNameKey:      FakePVCName,
+		sharedcsi.PvcNamespaceKey: FakePVCNamespace,
 	}
 	// CreateVolume(name string, size int, vtype, availability string, snapshotID string, sourceVolID string, sourceBackupID string, tags map[string]string) (string, string, int, error)
 	osmock.On("CreateVolume", FakeVolName, mock.AnythingOfType("int"), FakeVolType, FakeAvailability, "", "", "", properties).Return(&FakeVol, nil)
@@ -165,9 +166,9 @@ func TestCreateVolumeWithExtraMetadata(t *testing.T) {
 	fakeReq := &csi.CreateVolumeRequest{
 		Name: FakeVolName,
 		Parameters: map[string]string{
-			"csi.storage.k8s.io/pv/name":       FakePVName,
-			"csi.storage.k8s.io/pvc/name":      FakePVCName,
-			"csi.storage.k8s.io/pvc/namespace": FakePVCNamespace,
+			sharedcsi.PvNameKey:       FakePVName,
+			sharedcsi.PvcNameKey:      FakePVCName,
+			sharedcsi.PvcNamespaceKey: FakePVCNamespace,
 		},
 		VolumeCapabilities: []*csi.VolumeCapability{
 			{
@@ -180,7 +181,7 @@ func TestCreateVolumeWithExtraMetadata(t *testing.T) {
 		AccessibilityRequirements: &csi.TopologyRequirement{
 			Requisite: []*csi.Topology{
 				{
-					Segments: map[string]string{"topology.cinder.csi.openstack.org/zone": FakeAvailability},
+					Segments: map[string]string{topologyKey: FakeAvailability},
 				},
 			},
 		},
@@ -195,7 +196,7 @@ func TestCreateVolumeWithExtraMetadata(t *testing.T) {
 }
 
 func TestCreateVolumeFromSnapshot(t *testing.T) {
-	properties := map[string]string{"cinder.csi.openstack.org/cluster": FakeCluster}
+	properties := map[string]string{cinderCSIClusterIDKey: FakeCluster}
 	// CreateVolume(name string, size int, vtype, availability string, snapshotID string, sourceVolID string, sourceBackupID string, tags map[string]string) (string, string, int, error)
 	osmock.On("CreateVolume", FakeVolName, mock.AnythingOfType("int"), FakeVolType, "", FakeSnapshotID, "", "", properties).Return(&FakeVolFromSnapshot, nil)
 	osmock.On("GetVolumesByName", FakeVolName).Return(FakeVolListEmpty, nil)
@@ -242,7 +243,7 @@ func TestCreateVolumeFromSnapshot(t *testing.T) {
 }
 
 func TestCreateVolumeFromSourceVolume(t *testing.T) {
-	properties := map[string]string{"cinder.csi.openstack.org/cluster": FakeCluster}
+	properties := map[string]string{cinderCSIClusterIDKey: FakeCluster}
 	// CreateVolume(name string, size int, vtype, availability string, snapshotID string, sourceVolID string, sourceBackupID string, tags map[string]string) (string, string, int, error)
 	osmock.On("CreateVolume", FakeVolName, mock.AnythingOfType("int"), FakeVolType, "", "", FakeVolID, "", properties).Return(&FakeVolFromSourceVolume, nil)
 	osmock.On("GetVolumesByName", FakeVolName).Return(FakeVolListEmpty, nil)
@@ -1347,11 +1348,11 @@ func TestCreateSnapshot(t *testing.T) {
 // Test CreateSnapshot with extra metadata
 func TestCreateSnapshotWithExtraMetadata(t *testing.T) {
 	properties := map[string]string{
-		"cinder.csi.openstack.org/cluster":              FakeCluster,
-		"csi.storage.k8s.io/volumesnapshot/name":        FakeSnapshotName,
-		"csi.storage.k8s.io/volumesnapshotcontent/name": FakeSnapshotContentName,
-		"csi.storage.k8s.io/volumesnapshot/namespace":   FakeSnapshotNamespace,
-		openstack.SnapshotForceCreate:                   "true",
+		cinderCSIClusterIDKey:               FakeCluster,
+		sharedcsi.VolSnapshotNameKey:        FakeSnapshotName,
+		sharedcsi.VolSnapshotContentNameKey: FakeSnapshotContentName,
+		sharedcsi.VolSnapshotNamespaceKey:   FakeSnapshotNamespace,
+		openstack.SnapshotForceCreate:       "true",
 	}
 
 	osmock.On("CreateSnapshot", FakeSnapshotName, FakeVolID, properties).Return(&FakeSnapshotRes, nil)
@@ -1366,10 +1367,10 @@ func TestCreateSnapshotWithExtraMetadata(t *testing.T) {
 		Name:           FakeSnapshotName,
 		SourceVolumeId: FakeVolID,
 		Parameters: map[string]string{
-			"csi.storage.k8s.io/volumesnapshot/name":        FakeSnapshotName,
-			"csi.storage.k8s.io/volumesnapshotcontent/name": FakeSnapshotContentName,
-			"csi.storage.k8s.io/volumesnapshot/namespace":   FakeSnapshotNamespace,
-			openstack.SnapshotForceCreate:                   "true",
+			sharedcsi.VolSnapshotNameKey:        FakeSnapshotName,
+			sharedcsi.VolSnapshotContentNameKey: FakeSnapshotContentName,
+			sharedcsi.VolSnapshotNamespaceKey:   FakeSnapshotNamespace,
+			openstack.SnapshotForceCreate:       "true",
 		},
 	}
 
diff --git a/pkg/csi/cinder/driver.go b/pkg/csi/cinder/driver.go
index 1040a09bdb..bd5180a03e 100644
--- a/pkg/csi/cinder/driver.go
+++ b/pkg/csi/cinder/driver.go
@@ -22,6 +22,7 @@ import (
 	"github.com/container-storage-interface/spec/lib/go/csi"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+	"k8s.io/client-go/listers/core/v1"
 	"k8s.io/cloud-provider-openstack/pkg/csi/cinder/openstack"
 	"k8s.io/cloud-provider-openstack/pkg/util/metadata"
 	"k8s.io/cloud-provider-openstack/pkg/util/mount"
@@ -67,9 +68,10 @@ type Driver struct {
 	endpoint  string
 	cluster   string
 
-	ids *identityServer
-	cs  *controllerServer
-	ns  *nodeServer
+	ids       *identityServer
+	cs        *controllerServer
+	ns        *nodeServer
+	pvcLister v1.PersistentVolumeClaimLister
 
 	vcap  []*csi.VolumeCapability_AccessMode
 	cscap []*csi.ControllerServiceCapability
@@ -79,14 +81,17 @@ type Driver struct {
 type DriverOpts struct {
 	ClusterID string
 	Endpoint  string
+	PVCLister v1.PersistentVolumeClaimLister
 }
 
 func NewDriver(o *DriverOpts) *Driver {
-	d := &Driver{}
-	d.name = driverName
-	d.fqVersion = fmt.Sprintf("%s@%s", Version, version.Version)
-	d.endpoint = o.Endpoint
-	d.cluster = o.ClusterID
+	d := &Driver{
+		name:      driverName,
+		fqVersion: fmt.Sprintf("%s@%s", Version, version.Version),
+		endpoint:  o.Endpoint,
+		cluster:   o.ClusterID,
+		pvcLister: o.PVCLister,
+	}
 
 	klog.Info("Driver: ", d.name)
 	klog.Info("Driver version: ", d.fqVersion)
diff --git a/pkg/csi/cinder/nodeserver.go b/pkg/csi/cinder/nodeserver.go
index d9ed3142d6..49d56e521b 100644
--- a/pkg/csi/cinder/nodeserver.go
+++ b/pkg/csi/cinder/nodeserver.go
@@ -30,6 +30,7 @@ import (
 	"k8s.io/klog/v2"
 	utilpath "k8s.io/utils/path"
 
+	sharedcsi "k8s.io/cloud-provider-openstack/pkg/csi"
 	"k8s.io/cloud-provider-openstack/pkg/csi/cinder/openstack"
 	"k8s.io/cloud-provider-openstack/pkg/util/blockdevice"
 	"k8s.io/cloud-provider-openstack/pkg/util/metadata"
@@ -63,7 +64,7 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
 		return nil, status.Error(codes.InvalidArgument, "NodePublishVolume Volume Capability must be provided")
 	}
 
-	ephemeralVolume := req.GetVolumeContext()["csi.storage.k8s.io/ephemeral"] == "true"
+	ephemeralVolume := req.GetVolumeContext()[sharedcsi.VolEphemeralKey] == "true"
 	if ephemeralVolume {
 		// See https://github.com/kubernetes/cloud-provider-openstack/issues/2599
 		return nil, status.Error(codes.Unimplemented, "CSI inline ephemeral volumes support is removed in 1.31 release.")
diff --git a/pkg/csi/cinder/nodeserver_test.go b/pkg/csi/cinder/nodeserver_test.go
index d3fe32dc32..b3fae1e9c7 100644
--- a/pkg/csi/cinder/nodeserver_test.go
+++ b/pkg/csi/cinder/nodeserver_test.go
@@ -26,6 +26,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+	sharedcsi "k8s.io/cloud-provider-openstack/pkg/csi"
 	"k8s.io/cloud-provider-openstack/pkg/csi/cinder/openstack"
 	"k8s.io/cloud-provider-openstack/pkg/util/metadata"
 	"k8s.io/cloud-provider-openstack/pkg/util/mount"
@@ -134,7 +135,7 @@ func TestNodePublishVolume(t *testing.T) {
 
 func TestNodePublishVolumeEphemeral(t *testing.T) {
 
-	properties := map[string]string{"cinder.csi.openstack.org/cluster": FakeCluster}
+	properties := map[string]string{cinderCSIClusterIDKey: FakeCluster}
 	fvolName := fmt.Sprintf("ephemeral-%s", FakeVolID)
 
 	omock.On("CreateVolume", fvolName, 2, "test", "nova", "", "", "", properties).Return(&FakeVol, nil)
@@ -159,7 +160,7 @@ func TestNodePublishVolumeEphemeral(t *testing.T) {
 		TargetPath:       FakeTargetPath,
 		VolumeCapability: stdVolCap,
 		Readonly:         false,
-		VolumeContext:    map[string]string{"capacity": "2Gi", "csi.storage.k8s.io/ephemeral": "true", "type": "test"},
+		VolumeContext:    map[string]string{"capacity": "2Gi", sharedcsi.VolEphemeralKey: "true", "type": "test"},
 	}
 
 	// Invoke NodePublishVolume
diff --git a/pkg/csi/cinder/openstack/openstack.go b/pkg/csi/cinder/openstack/openstack.go
index 8b4a028452..26f96dd01b 100644
--- a/pkg/csi/cinder/openstack/openstack.go
+++ b/pkg/csi/cinder/openstack/openstack.go
@@ -45,7 +45,7 @@ func AddExtraFlags(fs *pflag.FlagSet) {
 }
 
 type IOpenStack interface {
-	CreateVolume(name string, size int, vtype, availability string, snapshotID string, sourceVolID string, sourceBackupID string, tags map[string]string) (*volumes.Volume, error)
+	CreateVolume(*volumes.CreateOpts, volumes.SchedulerHintOptsBuilder) (*volumes.Volume, error)
 	DeleteVolume(volumeID string) error
 	AttachVolume(instanceID, volumeID string) (string, error)
 	ListVolumes(limit int, startingToken string) ([]volumes.Volume, string, error)
@@ -56,6 +56,7 @@ type IOpenStack interface {
 	GetAttachmentDiskPath(instanceID, volumeID string) (string, error)
 	GetVolume(volumeID string) (*volumes.Volume, error)
 	GetVolumesByName(name string) ([]volumes.Volume, error)
+	GetVolumeByName(name string) (*volumes.Volume, error)
 	CreateSnapshot(name, volID string, tags map[string]string) (*snapshots.Snapshot, error)
 	ListSnapshots(filters map[string]string) ([]snapshots.Snapshot, string, error)
 	DeleteSnapshot(snapID string) error
@@ -72,6 +73,7 @@ type IOpenStack interface {
 	GetMaxVolLimit() int64
 	GetMetadataOpts() metadata.Opts
 	GetBlockStorageOpts() BlockStorageOpts
+	ResolveVolumeListToUUIDs(volumes string) (string, error)
 }
 
 type OpenStack struct {
diff --git a/pkg/csi/cinder/openstack/openstack_mock.go b/pkg/csi/cinder/openstack/openstack_mock.go
index 53263e1f8e..2fa01876b1 100644
--- a/pkg/csi/cinder/openstack/openstack_mock.go
+++ b/pkg/csi/cinder/openstack/openstack_mock.go
@@ -17,11 +17,14 @@ limitations under the License.
 package openstack
 
 import (
+	"fmt"
+
 	"github.com/gophercloud/gophercloud/v2/openstack/blockstorage/v3/backups"
 	"github.com/gophercloud/gophercloud/v2/openstack/blockstorage/v3/snapshots"
 	"github.com/gophercloud/gophercloud/v2/openstack/blockstorage/v3/volumes"
 	"github.com/gophercloud/gophercloud/v2/openstack/compute/v2/servers"
 	"github.com/stretchr/testify/mock"
+	"k8s.io/cloud-provider-openstack/pkg/util/errors"
 	"k8s.io/cloud-provider-openstack/pkg/util/metadata"
 )
 
@@ -85,7 +88,16 @@ func (_m *OpenStackMock) AttachVolume(instanceID string, volumeID string) (strin
 }
 
 // CreateVolume provides a mock function with given fields: name, size, vtype, availability, tags
-func (_m *OpenStackMock) CreateVolume(name string, size int, vtype string, availability string, snapshotID string, sourceVolID string, sourceBackupID string, tags map[string]string) (*volumes.Volume, error) {
+func (_m *OpenStackMock) CreateVolume(opts *volumes.CreateOpts, _ volumes.SchedulerHintOptsBuilder) (*volumes.Volume, error) {
+	name := opts.Name
+	size := opts.Size
+	vtype := opts.VolumeType
+	availability := opts.AvailabilityZone
+	snapshotID := opts.SnapshotID
+	sourceVolID := opts.SourceVolID
+	sourceBackupID := opts.BackupID
+	tags := opts.Metadata
+
 	ret := _m.Called(name, size, vtype, availability, snapshotID, sourceVolID, sourceBackupID, tags)
 
 	var r0 *volumes.Volume
@@ -225,6 +237,24 @@ func (_m *OpenStackMock) GetVolumesByName(name string) ([]volumes.Volume, error)
 	return r0, r1
 }
 
+// GetVolumeByName provides a mock function with given fields: name
+func (_m *OpenStackMock) GetVolumeByName(name string) (*volumes.Volume, error) {
+	vols, err := _m.GetVolumesByName(name)
+	if err != nil {
+		return nil, err
+	}
+
+	if len(vols) == 0 {
+		return nil, errors.ErrNotFound
+	}
+
+	if len(vols) > 1 {
+		return nil, fmt.Errorf("found %d volumes with name %q", len(vols), name)
+	}
+
+	return &vols[0], nil
+}
+
 // ListSnapshots provides a mock function with given fields: limit, offset, filters
 func (_m *OpenStackMock) ListSnapshots(filters map[string]string) ([]snapshots.Snapshot, string, error) {
 	ret := _m.Called(filters)
@@ -492,3 +522,8 @@ func (_m *OpenStackMock) GetMetadataOpts() metadata.Opts {
 func (_m *OpenStackMock) GetBlockStorageOpts() BlockStorageOpts {
 	return BlockStorageOpts{}
 }
+
+// ResolveVolumeListToUUIDs provides a mock function to return volume UUIDs
+func (_m *OpenStackMock) ResolveVolumeListToUUIDs(v string) (string, error) {
+	return v, nil
+}
diff --git a/pkg/csi/cinder/openstack/openstack_volumes.go b/pkg/csi/cinder/openstack/openstack_volumes.go
index 5ac62fc73f..5f810e7416 100644
--- a/pkg/csi/cinder/openstack/openstack_volumes.go
+++ b/pkg/csi/cinder/openstack/openstack_volumes.go
@@ -20,14 +20,18 @@ import (
 	"context"
 	"fmt"
 	"net/url"
+	"strings"
 	"time"
 
 	"github.com/gophercloud/gophercloud/v2/openstack"
 	"github.com/gophercloud/gophercloud/v2/openstack/blockstorage/v3/volumes"
 	"github.com/gophercloud/gophercloud/v2/openstack/compute/v2/volumeattach"
 	"github.com/gophercloud/gophercloud/v2/pagination"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/cloud-provider-openstack/pkg/metrics"
+	"k8s.io/cloud-provider-openstack/pkg/util"
 	cpoerrors "k8s.io/cloud-provider-openstack/pkg/util/errors"
 
 	"k8s.io/klog/v2"
@@ -51,22 +55,7 @@ const (
 var volumeErrorStates = [...]string{"error", "error_extending", "error_deleting"}
 
 // CreateVolume creates a volume of given size
-func (os *OpenStack) CreateVolume(name string, size int, vtype, availability string, snapshotID string, sourceVolID string, sourceBackupID string, tags map[string]string) (*volumes.Volume, error) {
-
-	opts := &volumes.CreateOpts{
-		Name:             name,
-		Size:             size,
-		VolumeType:       vtype,
-		AvailabilityZone: availability,
-		Description:      volumeDescription,
-		SnapshotID:       snapshotID,
-		SourceVolID:      sourceVolID,
-		BackupID:         sourceBackupID,
-	}
-	if tags != nil {
-		opts.Metadata = tags
-	}
-
+func (os *OpenStack) CreateVolume(opts *volumes.CreateOpts, schedulerHints volumes.SchedulerHintOptsBuilder) (*volumes.Volume, error) {
 	blockstorageClient, err := openstack.NewBlockStorageV3(os.blockstorage.ProviderClient, os.epOpts)
 	if err != nil {
 		return nil, err
@@ -74,12 +63,13 @@ func (os *OpenStack) CreateVolume(name string, size int, vtype, availability str
 
 	// creating volumes from backups and backups cross-az is available since 3.51 microversion
 	// https://docs.openstack.org/cinder/latest/contributor/api_microversion_history.html#id47
-	if !os.bsOpts.IgnoreVolumeMicroversion && sourceBackupID != "" {
+	if !os.bsOpts.IgnoreVolumeMicroversion && opts.BackupID != "" {
 		blockstorageClient.Microversion = "3.51"
 	}
 
 	mc := metrics.NewMetricContext("volume", "create")
-	vol, err := volumes.Create(context.TODO(), blockstorageClient, opts, nil).Extract()
+	opts.Description = volumeDescription
+	vol, err := volumes.Create(context.TODO(), blockstorageClient, opts, schedulerHints).Extract()
 	if mc.ObserveRequest(err) != nil {
 		return nil, err
 	}
@@ -154,6 +144,25 @@ func (os *OpenStack) GetVolumesByName(n string) ([]volumes.Volume, error) {
 	return vols, nil
 }
 
+// GetVolumeByName is a wrapper around GetVolumesByName that returns a single Volume reference
+// with the specified name
+func (os *OpenStack) GetVolumeByName(n string) (*volumes.Volume, error) {
+	vols, err := os.GetVolumesByName(n)
+	if err != nil {
+		return nil, err
+	}
+
+	if len(vols) == 0 {
+		return nil, cpoerrors.ErrNotFound
+	}
+
+	if len(vols) > 1 {
+		return nil, fmt.Errorf("found %d volumes with name %q", len(vols), n)
+	}
+
+	return &vols[0], nil
+}
+
 // DeleteVolume delete a volume
 func (os *OpenStack) DeleteVolume(volumeID string) error {
 	used, err := os.diskIsUsed(volumeID)
@@ -428,3 +437,39 @@ func (os *OpenStack) diskIsUsed(volumeID string) (bool, error) {
 func (os *OpenStack) GetBlockStorageOpts() BlockStorageOpts {
 	return os.bsOpts
 }
+
+// ResolveVolumeListToUUIDs resolves a list of volume names or UUIDs to a
+// string of UUIDs
+func (os *OpenStack) ResolveVolumeListToUUIDs(affinityList string) (string, error) {
+	list := util.SplitTrim(affinityList, ',')
+	if len(list) == 0 {
+		return "", nil
+	}
+
+	affinityUUIDs := make([]string, 0, len(list))
+	for _, v := range list {
+		var volume *volumes.Volume
+		var err error
+
+		if id, e := util.UUID(v); e == nil {
+			// First try to get volume by ID
+			volume, err = os.GetVolume(id)
+			if err != nil && cpoerrors.IsNotFound(err) {
+				volume, err = os.GetVolumeByName(v)
+			}
+		} else {
+			// If not a UUID, try to get volume by name
+			volume, err = os.GetVolumeByName(v)
+		}
+		if err != nil {
+			if cpoerrors.IsNotFound(err) {
+				return "", status.Errorf(codes.NotFound, "referenced volume %s not found: %v", v, err)
+			}
+			return "", status.Errorf(codes.Internal, "failed to resolve volume %s: %v", v, err)
+		}
+
+		affinityUUIDs = append(affinityUUIDs, volume.ID)
+	}
+
+	return strings.Join(affinityUUIDs, ","), nil
+}
diff --git a/pkg/csi/csi.go b/pkg/csi/csi.go
new file mode 100644
index 0000000000..6d43c5a754
--- /dev/null
+++ b/pkg/csi/csi.go
@@ -0,0 +1,184 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package csi
+
+import (
+	"context"
+	"math/rand"
+	"os"
+	"time"
+
+	"github.com/container-storage-interface/spec/lib/go/csi"
+	"github.com/spf13/cobra"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/listers/core/v1"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/clientcmd"
+	"k8s.io/klog/v2"
+)
+
+const (
+	// https://github.com/kubernetes-csi/external-snapshotter/pull/375
+	VolSnapshotNameKey        = "csi.storage.k8s.io/volumesnapshot/name"
+	VolSnapshotNamespaceKey   = "csi.storage.k8s.io/volumesnapshot/namespace"
+	VolSnapshotContentNameKey = "csi.storage.k8s.io/volumesnapshotcontent/name"
+	// https://github.com/kubernetes-csi/external-provisioner/pull/399
+	PvcNameKey      = "csi.storage.k8s.io/pvc/name"
+	PvcNamespaceKey = "csi.storage.k8s.io/pvc/namespace"
+	PvNameKey       = "csi.storage.k8s.io/pv/name"
+	// https://github.com/kubernetes/kubernetes/pull/79983
+	VolEphemeralKey = "csi.storage.k8s.io/ephemeral"
+)
+
+var (
+	// Recognized volume parameters passed by Kubernetes csi-snapshotter sidecar
+	// when run with --extra-create-metadata flag. These are added to metadata
+	// of newly created snapshots if present.
+	RecognizedCSISnapshotterParams = []string{
+		VolSnapshotNameKey,
+		VolSnapshotNamespaceKey,
+		VolSnapshotContentNameKey,
+	}
+	// Recognized volume parameters passed by Kubernetes csi-provisioner sidecar
+	// when run with --extra-create-metadata flag. These are added to metadata
+	// of newly created shares if present.
+	RecognizedCSIProvisionerParams = []string{
+		PvcNameKey,
+		PvcNamespaceKey,
+		PvNameKey,
+	}
+)
+
+var (
+	// CSI controller options
+	pvcAnnotations bool
+	// k8s client options
+	master          string
+	kubeconfig      string
+	kubeAPIQPS      float32
+	kubeAPIBurst    int
+	minResyncPeriod time.Duration
+)
+
+func AddPVCFlags(cmd *cobra.Command) {
+	cmd.PersistentFlags().StringVar(&master, "master", "", "Master URL to build a client config from. Either this or kubeconfig needs to be set if the provisioner is being run out of cluster.")
+	cmd.PersistentFlags().StringVar(&kubeconfig, "kubeconfig", "", "Absolute path to the kubeconfig file. Either this or master needs to be set if the provisioner is being run out of cluster.")
+	cmd.PersistentFlags().Float32Var(&kubeAPIQPS, "kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver.")
+	cmd.PersistentFlags().IntVar(&kubeAPIBurst, "kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver.")
+	cmd.PersistentFlags().DurationVar(&minResyncPeriod, "min-resync-period", 12*time.Hour, "The resync period in reflectors will be random between MinResyncPeriod and 2*MinResyncPeriod.")
+
+	cmd.PersistentFlags().BoolVar(&pvcAnnotations, "pvc-annotations", false, "Enable support for PVC annotations in the controller's CreateVolume CSI method (enabling this flag requires enabling the --extra-create-metadata flag in csi-provisioner)")
+}
+
+func GetAZFromTopology(topologyKey string, requirement *csi.TopologyRequirement) string {
+	var zone string
+	var exists bool
+
+	defer func() { klog.V(1).Infof("detected AZ from the topology: %s", zone) }()
+	klog.V(4).Infof("preferred topology requirement: %+v", requirement.GetPreferred())
+	klog.V(4).Infof("requisite topology requirement: %+v", requirement.GetRequisite())
+
+	for _, topology := range requirement.GetPreferred() {
+		zone, exists = topology.GetSegments()[topologyKey]
+		if exists {
+			return zone
+		}
+	}
+
+	for _, topology := range requirement.GetRequisite() {
+		zone, exists = topology.GetSegments()[topologyKey]
+		if exists {
+			return zone
+		}
+	}
+
+	return zone
+}
+
+func GetPVCLister() v1.PersistentVolumeClaimLister {
+	if !pvcAnnotations {
+		return nil
+	}
+
+	// get the KUBECONFIG from env if specified (useful for local/debug cluster)
+	kubeconfigEnv := os.Getenv("KUBECONFIG")
+
+	if kubeconfigEnv != "" {
+		klog.Infof("Found KUBECONFIG environment variable set, using that..")
+		kubeconfig = kubeconfigEnv
+	}
+
+	config, err := clientcmd.BuildConfigFromFlags(master, kubeconfig)
+	if err != nil {
+		klog.Fatalf("Failed to create config: %v", err)
+	}
+
+	config.QPS = kubeAPIQPS
+	config.Burst = kubeAPIBurst
+
+	config.ContentType = runtime.ContentTypeProtobuf
+	clientset, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		klog.Fatalf("Failed to create client: %v", err)
+	}
+
+	factory := informers.NewSharedInformerFactory(clientset, resyncPeriod(minResyncPeriod))
+	ctx := context.TODO()
+	pvcInformer := factory.Core().V1().PersistentVolumeClaims().Informer()
+	go pvcInformer.Run(ctx.Done())
+	if !cache.WaitForCacheSync(ctx.Done(), pvcInformer.HasSynced) {
+		klog.Fatal("Error syncing PVC informer cache")
+	}
+
+	klog.Info("Successully created PVC Annotations Lister")
+
+	return factory.Core().V1().PersistentVolumeClaims().Lister()
+}
+
+// GetPVCAnnotations returns PVC annotations for the given PVC name and
+// namespace stored in the params map.
+func GetPVCAnnotations(pvcLister v1.PersistentVolumeClaimLister, params map[string]string) map[string]string {
+	if pvcLister == nil {
+		return nil
+	}
+
+	namespace := params[PvcNamespaceKey]
+	pvcName := params[PvcNameKey]
+	if namespace == "" || pvcName == "" {
+		klog.Errorf("Invalid namespace or PVC name (%s/%s), check whether the --extra-create-metadata flag is set in csi-provisioner", namespace, pvcName)
+		return nil
+	}
+
+	pvc, err := pvcLister.PersistentVolumeClaims(namespace).Get(pvcName)
+	if err != nil {
+		klog.Errorf("Failed to get PVC %s/%s: %v", namespace, pvcName, err)
+		return nil
+	}
+
+	return pvc.Annotations
+}
+
+// resyncPeriod generates a random duration so that multiple controllers don't
+// get into lock-step and all hammer the apiserver with list requests
+// simultaneously. Copied from the
+// k8s.io/cloud-provider/app/controllermanager.go
+func resyncPeriod(s time.Duration) time.Duration {
+	factor := rand.Float64() + 1
+	return time.Duration(float64(s.Nanoseconds()) * factor)
+}
diff --git a/pkg/csi/manila/controllerserver.go b/pkg/csi/manila/controllerserver.go
index c3e8541613..ea891f926e 100644
--- a/pkg/csi/manila/controllerserver.go
+++ b/pkg/csi/manila/controllerserver.go
@@ -27,6 +27,7 @@ import (
 	"google.golang.org/grpc/status"
 	"google.golang.org/protobuf/types/known/timestamppb"
 	"k8s.io/apimachinery/pkg/util/wait"
+	sharedcsi "k8s.io/cloud-provider-openstack/pkg/csi"
 	"k8s.io/cloud-provider-openstack/pkg/csi/manila/options"
 	"k8s.io/cloud-provider-openstack/pkg/csi/manila/shareadapters"
 	"k8s.io/cloud-provider-openstack/pkg/util"
@@ -34,7 +35,12 @@ import (
 	"k8s.io/klog/v2"
 )
 
-const clusterMetadataKey = "manila.csi.openstack.org/cluster"
+const (
+	clusterMetadataKey = "manila.csi.openstack.org/cluster"
+	affinityKey        = "manila.csi.openstack.org/affinity"
+	antiAffinityKey    = "manila.csi.openstack.org/anti-affinity"
+	groupIDKey         = "manila.csi.openstack.org/group-id"
+)
 
 type controllerServer struct {
 	d *Driver
@@ -43,15 +49,6 @@ type controllerServer struct {
 var (
 	pendingVolumes   = sync.Map{}
 	pendingSnapshots = sync.Map{}
-
-	// Recognized volume parameters passed by Kubernetes csi-provisioner sidecar
-	// when run with --extra-create-metadata flag. These are added to metadata
-	// of newly created shares if present.
-	recognizedCSIProvisionerParams = []string{
-		"csi.storage.k8s.io/pvc/name",
-		"csi.storage.k8s.io/pvc/namespace",
-		"csi.storage.k8s.io/pv/name",
-	}
 )
 
 func getVolumeCreator(source *csi.VolumeContentSource) (volumeCreator, error) {
@@ -63,8 +60,8 @@ func getVolumeCreator(source *csi.VolumeContentSource) (volumeCreator, error) {
 		return nil, status.Error(codes.Unimplemented, "volume cloning is not supported yet")
 	}
 
-	if source.GetSnapshot() != nil {
-		return &volumeFromSnapshot{}, nil
+	if s := source.GetSnapshot(); s != nil {
+		return &volumeFromSnapshot{s.SnapshotId}, nil
 	}
 
 	return nil, status.Error(codes.InvalidArgument, "invalid volume content source")
@@ -88,7 +85,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
 	}
 
 	// Configuration
-
+	shareName := req.GetName()
 	params := req.GetParameters()
 	if params == nil {
 		params = make(map[string]string)
@@ -139,13 +136,42 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
 
 		// When "autoTopology" is enabled and "availability" is empty, obtain the AZ from the target node.
 		if shareOpts.AvailabilityZone == "" && strings.EqualFold(shareOpts.AutoTopology, "true") {
-			shareOpts.AvailabilityZone = util.GetAZFromTopology(topologyKey, accessibleTopologyReq)
+			shareOpts.AvailabilityZone = sharedcsi.GetAZFromTopology(topologyKey, accessibleTopologyReq)
 			accessibleTopology = []*csi.Topology{{
 				Segments: map[string]string{topologyKey: shareOpts.AvailabilityZone},
 			}}
 		}
 	}
 
+	// get the PVC annotation
+	pvcAnnotations := sharedcsi.GetPVCAnnotations(cs.d.pvcLister, params)
+	for k, v := range pvcAnnotations {
+		klog.V(4).Infof("CreateVolume: retrieved %q pvc annotation: %s: %s", k, v, shareName)
+	}
+	affinity := pvcAnnotations[affinityKey]
+	antiAffinity := pvcAnnotations[antiAffinityKey]
+	if affinity != "" || antiAffinity != "" {
+		klog.V(4).Infof("CreateVolume: Getting scheduler hints: affinity=%s, anti-affinity=%s", affinity, antiAffinity)
+
+		// resolve share names to UUIDs
+		shareOpts.Affinity, err = resolveShareListToUUIDs(manilaClient, affinity)
+		if err != nil {
+			return nil, status.Errorf(codes.InvalidArgument, "failed to resolve affinity share UUIDs: %v", err)
+		}
+		shareOpts.AntiAffinity, err = resolveShareListToUUIDs(manilaClient, antiAffinity)
+		if err != nil {
+			return nil, status.Errorf(codes.InvalidArgument, "failed to resolve anti-affinity share UUIDs: %v", err)
+		}
+
+		klog.V(4).Infof("CreateVolume: Resolved scheduler hints: affinity=%v, anti-affinity=%v", shareOpts.Affinity, shareOpts.AntiAffinity)
+	}
+
+	// override the storage class group ID if it is set in the PVC annotation
+	if v, ok := pvcAnnotations[groupIDKey]; ok {
+		shareOpts.GroupID = v
+		klog.V(4).Infof("CreateVolume: Overriding share group ID: %s", v)
+	}
+
 	// Retrieve an existing share or create a new one
 
 	volCreator, err := getVolumeCreator(req.GetVolumeContentSource())
@@ -153,7 +179,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
 		return nil, err
 	}
 
-	share, err := volCreator.create(manilaClient, req, req.GetName(), sizeInGiB, shareOpts, shareMetadata)
+	share, err := volCreator.create(manilaClient, shareName, sizeInGiB, shareOpts, shareMetadata)
 	if err != nil {
 		return nil, err
 	}
@@ -177,8 +203,11 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
 	}
 
 	volCtx := filterParametersForVolumeContext(params, options.NodeVolumeContextFields())
-	volCtx["shareID"] = share.ID
-	volCtx["shareAccessID"] = accessRight.ID
+	volCtx = util.SetMapIfNotEmpty(volCtx, "shareID", share.ID)
+	volCtx = util.SetMapIfNotEmpty(volCtx, "shareAccessID", accessRight.ID)
+	volCtx = util.SetMapIfNotEmpty(volCtx, "groupID", share.ShareGroupID)
+	volCtx = util.SetMapIfNotEmpty(volCtx, "affinity", shareOpts.Affinity)
+	volCtx = util.SetMapIfNotEmpty(volCtx, "antiAffinity", shareOpts.AntiAffinity)
 
 	return &csi.CreateVolumeResponse{
 		Volume: &csi.Volume{
@@ -508,7 +537,7 @@ func prepareShareMetadata(appendShareMetadata, clusterID string, volumeParams ma
 	shareMetadata := make(map[string]string)
 
 	// Get extra metadata provided by csi-provisioner sidecar if present.
-	for _, k := range recognizedCSIProvisionerParams {
+	for _, k := range sharedcsi.RecognizedCSIProvisionerParams {
 		if v, ok := volumeParams[k]; ok {
 			shareMetadata[k] = v
 		}
diff --git a/pkg/csi/manila/controllerserver_test.go b/pkg/csi/manila/controllerserver_test.go
index 584015c834..58fd464214 100644
--- a/pkg/csi/manila/controllerserver_test.go
+++ b/pkg/csi/manila/controllerserver_test.go
@@ -16,6 +16,8 @@ package manila
 import (
 	"fmt"
 	"testing"
+
+	"k8s.io/cloud-provider-openstack/pkg/csi"
 )
 
 func TestPrepareShareMetadata(t *testing.T) {
@@ -76,15 +78,15 @@ func TestPrepareShareMetadata(t *testing.T) {
 		{
 			// csi-provisioner PV/PVC metadata
 			allVolumeParams: map[string]string{
-				"csi.storage.k8s.io/pvc/name":      "pvc-name",
-				"csi.storage.k8s.io/pvc/namespace": "pvc-namespace",
-				"csi.storage.k8s.io/pv/name":       "pv-name",
+				csi.PvcNameKey:      "pvc-name",
+				csi.PvcNamespaceKey: "pvc-namespace",
+				csi.PvNameKey:       "pv-name",
 			},
 			cluster: "",
 			expectedResult: map[string]string{
-				"csi.storage.k8s.io/pvc/name":      "pvc-name",
-				"csi.storage.k8s.io/pvc/namespace": "pvc-namespace",
-				"csi.storage.k8s.io/pv/name":       "pv-name",
+				csi.PvcNameKey:      "pvc-name",
+				csi.PvcNamespaceKey: "pvc-namespace",
+				csi.PvNameKey:       "pv-name",
 			},
 			appendShareMetadata: "",
 			expectedError:       false,
@@ -92,18 +94,18 @@ func TestPrepareShareMetadata(t *testing.T) {
 		{
 			// csi-provisioner PV/PVC metadata with conflicting appendShareMetadata
 			allVolumeParams: map[string]string{
-				"csi.storage.k8s.io/pvc/name":      "pvc-name",
-				"csi.storage.k8s.io/pvc/namespace": "pvc-namespace",
-				"csi.storage.k8s.io/pv/name":       "pv-name",
-				"appendShareMetadata":              `{"csi.storage.k8s.io/pvc/name": "SomeValue", "keyX": "valueX"}`,
+				csi.PvcNameKey:        "pvc-name",
+				csi.PvcNamespaceKey:   "pvc-namespace",
+				csi.PvNameKey:         "pv-name",
+				"appendShareMetadata": `{"` + csi.PvcNameKey + `": "SomeValue", "keyX": "valueX"}`,
 			},
-			appendShareMetadata: `{"csi.storage.k8s.io/pvc/name": "SomeValue", "keyX": "valueX"}`,
+			appendShareMetadata: `{"` + csi.PvcNameKey + `": "SomeValue", "keyX": "valueX"}`,
 			cluster:             "",
 			expectedResult: map[string]string{
-				"csi.storage.k8s.io/pvc/name":      "pvc-name",
-				"csi.storage.k8s.io/pvc/namespace": "pvc-namespace",
-				"csi.storage.k8s.io/pv/name":       "pv-name",
-				"keyX":                             "valueX",
+				csi.PvcNameKey:      "pvc-name",
+				csi.PvcNamespaceKey: "pvc-namespace",
+				csi.PvNameKey:       "pv-name",
+				"keyX":              "valueX",
 			},
 			expectedError: false,
 		},
diff --git a/pkg/csi/manila/driver.go b/pkg/csi/manila/driver.go
index 80e3d849aa..8b888d0907 100644
--- a/pkg/csi/manila/driver.go
+++ b/pkg/csi/manila/driver.go
@@ -29,6 +29,7 @@ import (
 	"github.com/container-storage-interface/spec/lib/go/csi"
 	"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
 	"google.golang.org/grpc"
+	"k8s.io/client-go/listers/core/v1"
 	"k8s.io/cloud-provider-openstack/pkg/csi/manila/csiclient"
 	"k8s.io/cloud-provider-openstack/pkg/csi/manila/manilaclient"
 	"k8s.io/cloud-provider-openstack/pkg/version"
@@ -48,6 +49,8 @@ type DriverOpts struct {
 
 	ManilaClientBuilder manilaclient.Builder
 	CSIClientBuilder    csiclient.Builder
+
+	PVCLister v1.PersistentVolumeClaimLister
 }
 
 type Driver struct {
@@ -72,6 +75,8 @@ type Driver struct {
 
 	manilaClientBuilder manilaclient.Builder
 	csiClientBuilder    csiclient.Builder
+
+	pvcLister v1.PersistentVolumeClaimLister
 }
 
 type nonBlockingGRPCServer struct {
@@ -122,6 +127,7 @@ func NewDriver(o *DriverOpts) (*Driver, error) {
 		manilaClientBuilder: o.ManilaClientBuilder,
 		csiClientBuilder:    o.CSIClientBuilder,
 		clusterID:           o.ClusterID,
+		pvcLister:           o.PVCLister,
 	}
 
 	klog.Info("Driver: ", d.name)
diff --git a/pkg/csi/manila/manilaclient/client.go b/pkg/csi/manila/manilaclient/client.go
index 1e0962b79e..7e441bae7d 100644
--- a/pkg/csi/manila/manilaclient/client.go
+++ b/pkg/csi/manila/manilaclient/client.go
@@ -33,6 +33,14 @@ type Client struct {
 	c *gophercloud.ServiceClient
 }
 
+func (c Client) GetMicroversion() string {
+	return c.c.Microversion
+}
+
+func (c Client) SetMicroversion(version string) {
+	c.c.Microversion = version
+}
+
 func (c Client) GetShareByID(shareID string) (*shares.Share, error) {
 	return shares.Get(context.TODO(), c.c, shareID).Extract()
 }
diff --git a/pkg/csi/manila/manilaclient/interface.go b/pkg/csi/manila/manilaclient/interface.go
index a6be597bcd..5ed276f2e7 100644
--- a/pkg/csi/manila/manilaclient/interface.go
+++ b/pkg/csi/manila/manilaclient/interface.go
@@ -25,6 +25,9 @@ import (
 )
 
 type Interface interface {
+	GetMicroversion() string
+	SetMicroversion(version string)
+
 	GetShareByID(shareID string) (*shares.Share, error)
 	GetShareByName(shareName string) (*shares.Share, error)
 	CreateShare(opts shares.CreateOptsBuilder) (*shares.Share, error)
diff --git a/pkg/csi/manila/options/shareoptions.go b/pkg/csi/manila/options/shareoptions.go
index f3dd6c0d5a..fd4dadc149 100644
--- a/pkg/csi/manila/options/shareoptions.go
+++ b/pkg/csi/manila/options/shareoptions.go
@@ -27,6 +27,9 @@ type ControllerVolumeContext struct {
 	AutoTopology        string `name:"autoTopology" value:"default:false" matches:"(?i)^true|false$"`
 	AvailabilityZone    string `name:"availability" value:"optional"`
 	AppendShareMetadata string `name:"appendShareMetadata" value:"optional"`
+	Affinity            string `name:"affinity" value:"optional"`
+	AntiAffinity        string `name:"antiAffinity" value:"optional"`
+	GroupID             string `name:"groupID" value:"optional"`
 
 	// Adapter options
 
diff --git a/pkg/csi/manila/share.go b/pkg/csi/manila/share.go
index f3f68f5982..9e2cca3b3d 100644
--- a/pkg/csi/manila/share.go
+++ b/pkg/csi/manila/share.go
@@ -18,6 +18,7 @@ package manila
 
 import (
 	"fmt"
+	"strings"
 	"time"
 
 	"github.com/gophercloud/gophercloud/v2/openstack/sharedfilesystems/v2/shares"
@@ -25,6 +26,7 @@ import (
 	"google.golang.org/grpc/status"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/cloud-provider-openstack/pkg/csi/manila/manilaclient"
+	"k8s.io/cloud-provider-openstack/pkg/util"
 	clouderrors "k8s.io/cloud-provider-openstack/pkg/util/errors"
 	"k8s.io/klog/v2"
 )
@@ -197,3 +199,38 @@ func waitForShareStatus(manilaClient manilaclient.Interface, shareID string, val
 		return false, fmt.Errorf("share %s is in an unexpected state: wanted either %v or %s, got %s", shareID, validTransientStates, desiredStatus, share.Status)
 	})
 }
+
+func resolveShareListToUUIDs(manilaClient manilaclient.Interface, affinityList string) (string, error) {
+	list := util.SplitTrim(affinityList, ',')
+	if len(list) == 0 {
+		return "", nil
+	}
+
+	affinityUUIDs := make([]string, 0, len(list))
+	for _, v := range list {
+		var share *shares.Share
+		var err error
+
+		if id, e := util.UUID(v); e == nil {
+			// First try to get share by ID
+			share, err = manilaClient.GetShareByID(id)
+			if err != nil && clouderrors.IsNotFound(err) {
+				// If not found by ID, try to get share by ID as name
+				share, err = manilaClient.GetShareByName(v)
+			}
+		} else {
+			// If not a UUID, try to get share by name
+			share, err = manilaClient.GetShareByName(v)
+		}
+		if err != nil {
+			if clouderrors.IsNotFound(err) {
+				return "", status.Errorf(codes.NotFound, "referenced share %s not found: %v", v, err)
+			}
+			return "", status.Errorf(codes.Internal, "failed to resolve share %s: %v", v, err)
+		}
+
+		affinityUUIDs = append(affinityUUIDs, share.ID)
+	}
+
+	return strings.Join(affinityUUIDs, ","), nil
+}
diff --git a/pkg/csi/manila/volumesource.go b/pkg/csi/manila/volumesource.go
index c7d20f4554..e2f82c9ced 100644
--- a/pkg/csi/manila/volumesource.go
+++ b/pkg/csi/manila/volumesource.go
@@ -17,7 +17,6 @@ limitations under the License.
 package manila
 
 import (
-	"github.com/container-storage-interface/spec/lib/go/csi"
 	"github.com/gophercloud/gophercloud/v2/openstack/sharedfilesystems/v2/shares"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -28,21 +27,33 @@ import (
 )
 
 type volumeCreator interface {
-	create(manilaClient manilaclient.Interface, req *csi.CreateVolumeRequest, shareName string, sizeInGiB int, shareOpts *options.ControllerVolumeContext, shareMetadata map[string]string) (*shares.Share, error)
+	create(manilaClient manilaclient.Interface, shareName string, sizeInGiB int, shareOpts *options.ControllerVolumeContext, shareMetadata map[string]string) (*shares.Share, error)
 }
 
-type blankVolume struct{}
-
-func (blankVolume) create(manilaClient manilaclient.Interface, req *csi.CreateVolumeRequest, shareName string, sizeInGiB int, shareOpts *options.ControllerVolumeContext, shareMetadata map[string]string) (*shares.Share, error) {
+func create(manilaClient manilaclient.Interface, shareName string, sizeInGiB int, shareOpts *options.ControllerVolumeContext, shareMetadata map[string]string, snapshotID string) (*shares.Share, error) {
 	createOpts := &shares.CreateOpts{
 		AvailabilityZone: shareOpts.AvailabilityZone,
 		ShareProto:       shareOpts.Protocol,
 		ShareType:        shareOpts.Type,
 		ShareNetworkID:   shareOpts.ShareNetworkID,
+		ShareGroupID:     shareOpts.GroupID,
 		Name:             shareName,
 		Description:      shareDescription,
 		Size:             sizeInGiB,
 		Metadata:         shareMetadata,
+		SnapshotID:       snapshotID,
+	}
+
+	// Set scheduler hints if affinity or anti-affinity is set in PVC annotations
+	if shareOpts.Affinity != "" || shareOpts.AntiAffinity != "" {
+		// Set microversion to 2.65 to use scheduler hints
+		v := manilaClient.GetMicroversion()
+		manilaClient.SetMicroversion("2.65")
+		defer manilaClient.SetMicroversion(v)
+		createOpts.SchedulerHints = &shares.SchedulerHints{
+			DifferentHost: shareOpts.AntiAffinity,
+			SameHost:      shareOpts.Affinity,
+		}
 	}
 
 	share, manilaErrCode, err := getOrCreateShare(manilaClient, shareName, createOpts)
@@ -56,28 +67,37 @@ func (blankVolume) create(manilaClient manilaclient.Interface, req *csi.CreateVo
 			tryDeleteShare(manilaClient, share)
 		}
 
+		if snapshotID != "" {
+			return nil, status.Errorf(manilaErrCode.toRPCErrorCode(), "failed to restore snapshot %s into volume %s: %v", snapshotID, shareName, err)
+		}
 		return nil, status.Errorf(manilaErrCode.toRPCErrorCode(), "failed to create volume %s: %v", shareName, err)
 	}
 
 	return share, err
 }
 
-type volumeFromSnapshot struct{}
+type blankVolume struct{}
 
-func (volumeFromSnapshot) create(manilaClient manilaclient.Interface, req *csi.CreateVolumeRequest, shareName string, sizeInGiB int, shareOpts *options.ControllerVolumeContext, shareMetadata map[string]string) (*shares.Share, error) {
-	snapshotSource := req.GetVolumeContentSource().GetSnapshot()
+func (blankVolume) create(manilaClient manilaclient.Interface, shareName string, sizeInGiB int, shareOpts *options.ControllerVolumeContext, shareMetadata map[string]string) (*shares.Share, error) {
+	return create(manilaClient, shareName, sizeInGiB, shareOpts, shareMetadata, "")
+}
 
-	if snapshotSource.GetSnapshotId() == "" {
+type volumeFromSnapshot struct {
+	snapshotID string
+}
+
+func (v volumeFromSnapshot) create(manilaClient manilaclient.Interface, shareName string, sizeInGiB int, shareOpts *options.ControllerVolumeContext, shareMetadata map[string]string) (*shares.Share, error) {
+	if v.snapshotID == "" {
 		return nil, status.Error(codes.InvalidArgument, "snapshot ID cannot be empty")
 	}
 
-	snapshot, err := manilaClient.GetSnapshotByID(snapshotSource.GetSnapshotId())
+	snapshot, err := manilaClient.GetSnapshotByID(v.snapshotID)
 	if err != nil {
 		if clouderrors.IsNotFound(err) {
-			return nil, status.Errorf(codes.NotFound, "source snapshot %s not found: %v", snapshotSource.GetSnapshotId(), err)
+			return nil, status.Errorf(codes.NotFound, "source snapshot %s not found: %v", v.snapshotID, err)
 		}
 
-		return nil, status.Errorf(codes.Internal, "failed to retrieve snapshot %s: %v", snapshotSource.GetSnapshotId(), err)
+		return nil, status.Errorf(codes.Internal, "failed to retrieve snapshot %s: %v", v.snapshotID, err)
 	}
 
 	if snapshot.Status != snapshotAvailable {
@@ -88,31 +108,5 @@ func (volumeFromSnapshot) create(manilaClient manilaclient.Interface, req *csi.C
 		return nil, status.Errorf(codes.FailedPrecondition, "snapshot %s is in invalid state: expected 'available', got '%s'", snapshot.ID, snapshot.Status)
 	}
 
-	createOpts := &shares.CreateOpts{
-		AvailabilityZone: shareOpts.AvailabilityZone,
-		SnapshotID:       snapshot.ID,
-		ShareProto:       shareOpts.Protocol,
-		ShareType:        shareOpts.Type,
-		ShareNetworkID:   shareOpts.ShareNetworkID,
-		Name:             shareName,
-		Description:      shareDescription,
-		Size:             sizeInGiB,
-		Metadata:         shareMetadata,
-	}
-
-	share, manilaErrCode, err := getOrCreateShare(manilaClient, shareName, createOpts)
-	if err != nil {
-		if wait.Interrupted(err) {
-			return nil, status.Errorf(codes.DeadlineExceeded, "deadline exceeded while waiting for volume %s to become available", share.Name)
-		}
-
-		if manilaErrCode != 0 {
-			// An error has occurred, try to roll-back the share
-			tryDeleteShare(manilaClient, share)
-		}
-
-		return nil, status.Errorf(manilaErrCode.toRPCErrorCode(), "failed to restore snapshot %s into volume %s: %v", snapshotSource.GetSnapshotId(), shareName, err)
-	}
-
-	return share, err
+	return create(manilaClient, shareName, sizeInGiB, shareOpts, shareMetadata, snapshot.ID)
 }
diff --git a/pkg/util/util.go b/pkg/util/util.go
index d3bce47d79..3497f8189e 100644
--- a/pkg/util/util.go
+++ b/pkg/util/util.go
@@ -9,14 +9,13 @@ import (
 	"time"
 	"unicode"
 
-	"github.com/container-storage-interface/spec/lib/go/csi"
+	"github.com/google/uuid"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/strategicpatch"
 	clientset "k8s.io/client-go/kubernetes"
-	"k8s.io/klog/v2"
 )
 
 // CutString255 makes sure the string length doesn't exceed 255, which is usually the maximum string length in OpenStack.
@@ -135,31 +134,6 @@ func PatchService(ctx context.Context, client clientset.Interface, cur, mod *v1.
 	return nil
 }
 
-func GetAZFromTopology(topologyKey string, requirement *csi.TopologyRequirement) string {
-	var zone string
-	var exists bool
-
-	defer func() { klog.V(1).Infof("detected AZ from the topology: %s", zone) }()
-	klog.V(4).Infof("preferred topology requirement: %+v", requirement.GetPreferred())
-	klog.V(4).Infof("requisite topology requirement: %+v", requirement.GetRequisite())
-
-	for _, topology := range requirement.GetPreferred() {
-		zone, exists = topology.GetSegments()[topologyKey]
-		if exists {
-			return zone
-		}
-	}
-
-	for _, topology := range requirement.GetRequisite() {
-		zone, exists = topology.GetSegments()[topologyKey]
-		if exists {
-			return zone
-		}
-	}
-
-	return zone
-}
-
 func SanitizeLabel(input string) string {
 	// Replace non-alphanumeric characters (except '-', '_', '.') with '-'
 	reg := regexp.MustCompile(`[^-a-zA-Z0-9_.]+`)
@@ -176,6 +150,28 @@ func SanitizeLabel(input string) string {
 	return sanitized
 }
 
+// SetMapIfNotEmpty sets the value of the key in the provided map if the value
+// is not empty (i.e., it is not the zero value for that type) and returns a
+// pointer to the new map. If the map is nil, it will be initialized with a new
+// map.
+func SetMapIfNotEmpty[K comparable, V comparable](m map[K]V, key K, value V) map[K]V {
+	// Check if the value is the zero value for its type
+	var zeroValue V
+	if value == zeroValue {
+		return m
+	}
+
+	// Initialize the map if it's nil
+	if m == nil {
+		m = make(map[K]V)
+	}
+
+	// Set the value in the map
+	m[key] = value
+
+	return m
+}
+
 // SplitTrim splits a string of values separated by sep rune into a slice of
 // strings with trimmed spaces.
 func SplitTrim(s string, sep rune) []string {
@@ -184,3 +180,12 @@ func SplitTrim(s string, sep rune) []string {
 	}
 	return strings.FieldsFunc(s, f)
 }
+
+// UUID converts a string to a valid UUID string.
+func UUID(s string) (string, error) {
+	u, err := uuid.Parse(s)
+	if err != nil {
+		return "", err
+	}
+	return u.String(), nil
+}
diff --git a/tests/playbooks/roles/install-csi-cinder/tasks/main.yaml b/tests/playbooks/roles/install-csi-cinder/tasks/main.yaml
index 9c28cb0c64..1ddc9c2818 100644
--- a/tests/playbooks/roles/install-csi-cinder/tasks/main.yaml
+++ b/tests/playbooks/roles/install-csi-cinder/tasks/main.yaml
@@ -195,6 +195,8 @@
         -report-dir="/var/log/csi-pod" | tee "/var/log/csi-pod/cinder-csi-e2e.log"
   register: functional_test_result
   ignore_errors: true
+  async: 5400 # wait 1h30m then fail and fetch the logs
+  poll: 15
 
 - name: Collect pod logs for debug purpose
   shell:
diff --git a/tests/sanity/cinder/fakecloud.go b/tests/sanity/cinder/fakecloud.go
index 53c0dcba4f..85d8216402 100644
--- a/tests/sanity/cinder/fakecloud.go
+++ b/tests/sanity/cinder/fakecloud.go
@@ -13,6 +13,7 @@ import (
 	"github.com/gophercloud/gophercloud/v2/openstack/compute/v2/servers"
 	"k8s.io/cloud-provider-openstack/pkg/csi/cinder"
 	"k8s.io/cloud-provider-openstack/pkg/csi/cinder/openstack"
+	"k8s.io/cloud-provider-openstack/pkg/util/errors"
 	"k8s.io/cloud-provider-openstack/pkg/util/metadata"
 )
 
@@ -34,18 +35,17 @@ func getfakecloud() *cloud {
 var _ openstack.IOpenStack = &cloud{}
 
 // Fake Cloud
-func (cloud *cloud) CreateVolume(name string, size int, vtype, availability string, snapshotID string, sourceVolID string, sourceBackupID string, tags map[string]string) (*volumes.Volume, error) {
-
+func (cloud *cloud) CreateVolume(opts *volumes.CreateOpts, _ volumes.SchedulerHintOptsBuilder) (*volumes.Volume, error) {
 	vol := &volumes.Volume{
 		ID:               randString(10),
-		Name:             name,
+		Name:             opts.Name,
+		Size:             opts.Size,
 		Status:           "available",
-		Size:             size,
-		VolumeType:       vtype,
-		AvailabilityZone: availability,
-		SnapshotID:       snapshotID,
-		SourceVolID:      sourceVolID,
-		BackupID:         &sourceBackupID,
+		VolumeType:       opts.VolumeType,
+		AvailabilityZone: opts.AvailabilityZone,
+		SnapshotID:       opts.SnapshotID,
+		SourceVolID:      opts.SourceVolID,
+		BackupID:         &opts.BackupID,
 	}
 
 	cloud.volumes[vol.ID] = vol
@@ -134,6 +134,23 @@ func (cloud *cloud) GetVolumesByName(name string) ([]volumes.Volume, error) {
 	return vlist, nil
 }
 
+func (cloud *cloud) GetVolumeByName(n string) (*volumes.Volume, error) {
+	vols, err := cloud.GetVolumesByName(n)
+	if err != nil {
+		return nil, err
+	}
+
+	if len(vols) == 0 {
+		return nil, errors.ErrNotFound
+	}
+
+	if len(vols) > 1 {
+		return nil, fmt.Errorf("found %d volumes with name %q", len(vols), n)
+	}
+
+	return &vols[0], nil
+}
+
 func (cloud *cloud) GetVolume(volumeID string) (*volumes.Volume, error) {
 	vol, ok := cloud.volumes[volumeID]
 
@@ -341,3 +358,7 @@ func (cloud *cloud) GetMetadataOpts() metadata.Opts {
 func (cloud *cloud) GetBlockStorageOpts() openstack.BlockStorageOpts {
 	return openstack.BlockStorageOpts{}
 }
+
+func (cloud *cloud) ResolveVolumeListToUUIDs(v string) (string, error) {
+	return v, nil
+}
diff --git a/tests/sanity/manila/fakemanilaclient.go b/tests/sanity/manila/fakemanilaclient.go
index b9f69599fe..1d86fae09c 100644
--- a/tests/sanity/manila/fakemanilaclient.go
+++ b/tests/sanity/manila/fakemanilaclient.go
@@ -66,6 +66,13 @@ func shareExists(shareID string) bool {
 	return ok
 }
 
+func (c fakeManilaClient) GetMicroversion() string {
+	return ""
+}
+
+func (c fakeManilaClient) SetMicroversion(_ string) {
+}
+
 func (c fakeManilaClient) GetShareByID(shareID string) (*shares.Share, error) {
 	s, ok := fakeShares[strToInt(shareID)]
 	if !ok {