Ceph refactor #1538

Draft · wants to merge 7 commits into main
8 changes: 4 additions & 4 deletions .github/workflows/tests.yml
@@ -119,7 +119,8 @@ jobs:
env:
CGO_LDFLAGS_ALLOW: "(-Wl,-wrap,pthread_create)|(-Wl,-z,now)"
INCUS_CEPH_CLUSTER: "ceph"
INCUS_CEPH_CEPHFS: "cephfs"
INCUS_CEPH_CEPHFS: "cephfs-incus"
INCUS_CEPH_CLIENT: "incus"
INCUS_CEPH_CEPHOBJECT_RADOSGW: "http://127.0.0.1"
INCUS_CONCURRENT: "1"
INCUS_VERBOSE: "1"
@@ -335,8 +336,7 @@ jobs:
sudo microceph enable rgw
sudo microceph.ceph osd pool create cephfs_meta 32
sudo microceph.ceph osd pool create cephfs_data 32
sudo microceph.ceph fs new cephfs cephfs_meta cephfs_data
sudo microceph.ceph fs ls
sudo microceph.ceph fs new ${INCUS_CEPH_CEPHFS} cephfs_meta cephfs_data
sleep 30
sudo microceph.ceph status
# Wait until there are no more "unknown" pgs
@@ -389,7 +389,7 @@ jobs:

- name: Create build directory
run: |
mkdir bin
mkdir bin

- name: Build static x86_64 incus
env:
65 changes: 41 additions & 24 deletions internal/server/device/device_utils_disk.go
@@ -30,25 +30,36 @@ const RBDFormatPrefix = "rbd"
// RBDFormatSeparator is the field separator used in disk paths for RBD devices.
const RBDFormatSeparator = " "

// DiskParseRBDFormat parses an rbd formatted string, and returns the pool name, volume name, and list of options.
func DiskParseRBDFormat(rbd string) (string, string, []string, error) {
if !strings.HasPrefix(rbd, fmt.Sprintf("%s%s", RBDFormatPrefix, RBDFormatSeparator)) {
return "", "", nil, fmt.Errorf("Invalid rbd format, missing prefix")
}

fields := strings.SplitN(rbd, RBDFormatSeparator, 3)
if len(fields) != 3 {
return "", "", nil, fmt.Errorf("Invalid rbd format, invalid number of fields")
}

opts := fields[2]
// DiskParseRBDFormat parses an rbd formatted string, and returns the pool name, volume name, and map of options.
func DiskParseRBDFormat(rbd string) (pool string, volume string, opts map[string]string, err error) {
// FIXME: This does not handle escaped strings
// Remove and check the prefix
prefix, rbd, _ := strings.Cut(rbd, RBDFormatSeparator)
if prefix != RBDFormatPrefix {
return "", "", nil, fmt.Errorf("Invalid rbd format, wrong prefix: %q", prefix)
}

// Split the path and options
path, rawOpts, _ := strings.Cut(rbd, RBDFormatSeparator)

// Check for valid RBD path
pool, volume, validPath := strings.Cut(path, "/")
if !validPath {
return "", "", nil, fmt.Errorf("Invalid rbd format, missing pool and/or volume: %q", path)
}

// Parse options
opts = make(map[string]string)
for _, o := range strings.Split(rawOpts, ":") {
k, v, isValid := strings.Cut(o, "=")
if !isValid {
return "", "", nil, fmt.Errorf("Invalid rbd format, bad option: %q", o)
}

fields = strings.SplitN(fields[1], "/", 2)
if len(fields) != 2 {
return "", "", nil, fmt.Errorf("Invalid rbd format, invalid pool or volume")
opts[k] = v
}

return fields[0], fields[1], strings.Split(opts, ":"), nil
return pool, volume, opts, nil
}

// DiskGetRBDFormat returns a rbd formatted string with the given values.
@@ -59,7 +70,6 @@ func DiskGetRBDFormat(clusterName string, userName string, poolName string, volu
opts := []string{
fmt.Sprintf("id=%s", optEscaper.Replace(userName)),
fmt.Sprintf("pool=%s", optEscaper.Replace(poolName)),
fmt.Sprintf("conf=/etc/ceph/%s.conf", optEscaper.Replace(clusterName)),
}

return fmt.Sprintf("%s%s%s/%s%s%s", RBDFormatPrefix, RBDFormatSeparator, optEscaper.Replace(poolName), optEscaper.Replace(volumeName), RBDFormatSeparator, strings.Join(opts, ":"))
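For reference, the reworked helpers treat everything after the `rbd ` prefix as `<pool>/<volume>` followed by colon-separated `key=value` options. Below is a standalone sketch of that parse, mirroring the logic in this hunk rather than importing the internal package; the pool, volume and user names are made up:

```go
package main

import (
	"fmt"
	"strings"
)

// parseRBD mirrors the DiskParseRBDFormat logic introduced in this PR for
// strings such as "rbd mypool/myvol id=admin:pool=mypool" (sample values).
func parseRBD(rbd string) (pool string, volume string, opts map[string]string, err error) {
	// Remove and check the "rbd " prefix.
	prefix, rest, _ := strings.Cut(rbd, " ")
	if prefix != "rbd" {
		return "", "", nil, fmt.Errorf("invalid rbd format, wrong prefix: %q", prefix)
	}

	// Split the "<pool>/<volume>" path from the options.
	path, rawOpts, _ := strings.Cut(rest, " ")
	pool, volume, ok := strings.Cut(path, "/")
	if !ok {
		return "", "", nil, fmt.Errorf("invalid rbd format, missing pool and/or volume: %q", path)
	}

	// Parse the colon-separated key=value options into a map.
	opts = make(map[string]string)
	for _, o := range strings.Split(rawOpts, ":") {
		k, v, ok := strings.Cut(o, "=")
		if !ok {
			return "", "", nil, fmt.Errorf("invalid rbd format, bad option: %q", o)
		}

		opts[k] = v
	}

	return pool, volume, opts, nil
}

func main() {
	pool, vol, opts, err := parseRBD("rbd mypool/myvol id=admin:pool=mypool")
	if err != nil {
		panic(err)
	}

	fmt.Println(pool, vol, opts) // mypool myvol map[id:admin pool:mypool]
}
```

Returning a map instead of the previous `[]string` lets callers such as driver_qemu.go look options up by key, at the cost of collapsing any duplicated keys.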
@@ -240,6 +250,12 @@ again:

// diskCephfsOptions returns the mntSrcPath and fsOptions to use for mounting a cephfs share.
func diskCephfsOptions(clusterName string, userName string, fsName string, fsPath string) (string, []string, error) {
// Get the FSID
fsid, err := storageDrivers.CephFsid(clusterName)
if err != nil {
return "", nil, err
}

// Get the monitor list.
monAddresses, err := storageDrivers.CephMonitors(clusterName)
if err != nil {
Expand All @@ -252,14 +268,15 @@ func diskCephfsOptions(clusterName string, userName string, fsName string, fsPat
return "", nil, err
}

// Prepare mount entry.
fsOptions := []string{
fmt.Sprintf("name=%v", userName),
fmt.Sprintf("secret=%v", secret),
fmt.Sprintf("mds_namespace=%v", fsName),
}
srcPath, fsOptions := storageDrivers.CephBuildMount(
userName,
secret,
fsid,
monAddresses,
fsName,
fsPath,
)

srcPath := strings.Join(monAddresses, ",") + ":/" + fsPath
return srcPath, fsOptions, nil
}
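The FSID is now fetched up front because the mount source is no longer just a comma-joined list of monitor addresses. The exact strings produced by CephBuildMount are not visible in this diff; as an assumption, a helper targeting the newer kernel CephFS mount syntax (`<user>@<fsid>.<fs_name>=<path>` plus a `mon_addr` option) could look roughly like this sketch, with all values made up:

```go
package main

import (
	"fmt"
	"strings"
)

// buildMount is a hypothetical stand-in for CephBuildMount, whose real
// implementation lives in internal/server/storage/drivers and is not shown
// in this diff. It illustrates the new-style kernel CephFS mount device,
// which is why the FSID is now required.
func buildMount(user string, secret string, fsid string, monitors []string, fsName string, path string) (string, []string) {
	// New-style device: "<user>@<fsid>.<fs_name>=<path>".
	src := fmt.Sprintf("%s@%s.%s=%s", user, fsid, fsName, path)

	// Monitors are passed as a single "mon_addr" option, '/'-separated.
	opts := []string{
		"mon_addr=" + strings.Join(monitors, "/"),
		"secret=" + secret,
	}

	return src, opts
}

func main() {
	src, opts := buildMount("incus", "AQD...", "b6f2c9a0-0000-0000-0000-000000000000",
		[]string{"10.0.0.1:6789", "10.0.0.2:6789"}, "cephfs-incus", "/")

	fmt.Println(src)                      // incus@b6f2c9a0-....cephfs-incus=/
	fmt.Println(strings.Join(opts, ",")) // mon_addr=10.0.0.1:6789/10.0.0.2:6789,secret=AQD...
}
```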

68 changes: 10 additions & 58 deletions internal/server/instance/drivers/driver_qemu.go
@@ -4148,7 +4148,7 @@ func (d *qemu) addDriveConfig(qemuDev map[string]any, bootIndexes map[string]int
} else if isRBDImage {
blockDev["driver"] = "rbd"

_, volName, opts, err := device.DiskParseRBDFormat(driveConf.DevPath)
poolName, volName, opts, err := device.DiskParseRBDFormat(driveConf.DevPath)
if err != nil {
return nil, fmt.Errorf("Failed parsing rbd string: %w", err)
}
@@ -4173,68 +4173,20 @@ func (d *qemu) addDriveConfig(qemuDev map[string]any, bootIndexes map[string]int
vol := storageDrivers.NewVolume(nil, "", volumeType, rbdContentType, volumeName, nil, nil)
rbdImageName := storageDrivers.CephGetRBDImageName(vol, "", false)

// Parse the options (ceph credentials).
userName := storageDrivers.CephDefaultUser
clusterName := storageDrivers.CephDefaultCluster
poolName := ""

for _, option := range opts {
fields := strings.Split(option, "=")
if len(fields) != 2 {
return nil, fmt.Errorf("Unexpected volume rbd option %q", option)
}

if fields[0] == "id" {
userName = fields[1]
} else if fields[0] == "pool" {
poolName = fields[1]
} else if fields[0] == "conf" {
baseName := filepath.Base(fields[1])
clusterName = strings.TrimSuffix(baseName, ".conf")
// scan & pass through options
blockDev["pool"] = poolName
blockDev["image"] = rbdImageName
for key, val := range opts {
// We use 'id' where qemu uses 'user'
if key == "id" {
blockDev["user"] = val
} else {
blockDev[key] = val
}
}

if poolName == "" {
return nil, fmt.Errorf("Missing pool name")
}

// The aio option isn't available when using the rbd driver.
delete(blockDev, "aio")
blockDev["pool"] = poolName
blockDev["image"] = rbdImageName
blockDev["user"] = userName
blockDev["server"] = []map[string]string{}

// Dereference ceph config path.
cephConfPath := fmt.Sprintf("/etc/ceph/%s.conf", clusterName)
target, err := filepath.EvalSymlinks(cephConfPath)
if err == nil {
cephConfPath = target
}

blockDev["conf"] = cephConfPath

// Setup the Ceph cluster config (monitors and keyring).
monitors, err := storageDrivers.CephMonitors(clusterName)
if err != nil {
return nil, err
}

for _, monitor := range monitors {
idx := strings.LastIndex(monitor, ":")
host := monitor[:idx]
port := monitor[idx+1:]

blockDev["server"] = append(blockDev["server"].([]map[string]string), map[string]string{
"host": strings.Trim(host, "[]"),
"port": port,
})
}

rbdSecret, err = storageDrivers.CephKeyring(clusterName, userName)
if err != nil {
return nil, err
}
}

readonly := slices.Contains(driveConf.Opts, "ro")
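The net effect on the QEMU side: instead of re-deriving the cluster name, monitors and keyring here, the options returned by DiskParseRBDFormat are copied straight onto the blockdev map, with only `id` renamed to `user` for QEMU. A standalone illustration with made-up values follows; in the real code `image` comes from CephGetRBDImageName:

```go
package main

import "fmt"

func main() {
	// Sample inputs as they would come out of DiskParseRBDFormat.
	poolName := "mypool"
	rbdImageName := "container_c1"
	opts := map[string]string{"id": "admin", "pool": "mypool"}

	// Base blockdev entries set explicitly by the new code.
	blockDev := map[string]any{
		"driver": "rbd",
		"pool":   poolName,
		"image":  rbdImageName,
	}

	// Pass the remaining options through, renaming "id" to QEMU's "user".
	for key, val := range opts {
		if key == "id" {
			blockDev["user"] = val
		} else {
			blockDev[key] = val
		}
	}

	fmt.Println(blockDev)
	// map[driver:rbd image:container_c1 pool:mypool user:admin]
}
```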
109 changes: 76 additions & 33 deletions internal/server/storage/drivers/driver_cephfs.go
@@ -133,12 +133,8 @@ func (d *cephfs) Create() error {
d.config["cephfs.path"] = d.config["source"]

// Parse the namespace / path.
fields := strings.SplitN(d.config["cephfs.path"], "/", 2)
fsName := fields[0]
fsPath := "/"
if len(fields) > 1 {
fsPath = fields[1]
}
fsName, fsPath, _ := strings.Cut(d.config["cephfs.path"], "/")
fsPath = "/" + fsPath

// If the filesystem already exists, disallow keys associated to creating the filesystem.
fsExists, err := d.fsExists(d.config["cephfs.cluster_name"], d.config["cephfs.user.name"], fsName)
@@ -265,15 +261,35 @@ func (d *cephfs) Create() error {
return fmt.Errorf("Failed to create directory '%s': %w", mountPoint, err)
}

// Get the credentials and host.
monAddresses, userSecret, err := d.getConfig(d.config["cephfs.cluster_name"], d.config["cephfs.user.name"])
// Collect Ceph information
clusterName := d.config["cephfs.cluster_name"]
userName := d.config["cephfs.user.name"]

fsid, err := CephFsid(clusterName)
if err != nil {
return err
}

monitors, err := CephMonitors(clusterName)
if err != nil {
return err
}

key, err := CephKeyring(clusterName, userName)
if err != nil {
return err
}

srcPath, options := CephBuildMount(
userName,
key,
fsid,
monitors,
fsName, "/",
)

// Mount the pool.
srcPath := strings.Join(monAddresses, ",") + ":/"
err = TryMount(srcPath, mountPoint, "ceph", 0, fmt.Sprintf("name=%v,secret=%v,mds_namespace=%v", d.config["cephfs.user.name"], userSecret, fsName))
err = TryMount(srcPath, mountPoint, "ceph", 0, strings.Join(options, ","))
if err != nil {
return err
}
@@ -300,12 +316,8 @@ func (d *cephfs) Create() error {
// Delete clears any local and remote data related to this driver instance.
func (d *cephfs) Delete(op *operations.Operation) error {
// Parse the namespace / path.
fields := strings.SplitN(d.config["cephfs.path"], "/", 2)
fsName := fields[0]
fsPath := "/"
if len(fields) > 1 {
fsPath = fields[1]
}
fsName, fsPath, _ := strings.Cut(d.config["cephfs.path"], "/")
fsPath = "/" + fsPath

// Create a temporary mountpoint.
mountPath, err := os.MkdirTemp("", "incus_cephfs_")
@@ -326,15 +338,35 @@ func (d *cephfs) Delete(op *operations.Operation) error {
return fmt.Errorf("Failed to create directory '%s': %w", mountPoint, err)
}

// Get the credentials and host.
monAddresses, userSecret, err := d.getConfig(d.config["cephfs.cluster_name"], d.config["cephfs.user.name"])
// Collect Ceph information
clusterName := d.config["cephfs.cluster_name"]
userName := d.config["cephfs.user.name"]

fsid, err := CephFsid(clusterName)
if err != nil {
return err
}

monitors, err := CephMonitors(clusterName)
if err != nil {
return err
}

key, err := CephKeyring(clusterName, userName)
if err != nil {
return err
}

srcPath, options := CephBuildMount(
userName,
key,
fsid,
monitors,
fsName, "/",
)

// Mount the pool.
srcPath := strings.Join(monAddresses, ",") + ":/"
err = TryMount(srcPath, mountPoint, "ceph", 0, fmt.Sprintf("name=%v,secret=%v,mds_namespace=%v", d.config["cephfs.user.name"], userSecret, fsName))
err = TryMount(srcPath, mountPoint, "ceph", 0, strings.Join(options, ","))
if err != nil {
return err
}
@@ -397,28 +429,39 @@ func (d *cephfs) Mount() (bool, error) {
}

// Parse the namespace / path.
fields := strings.SplitN(d.config["cephfs.path"], "/", 2)
fsName := fields[0]
fsPath := ""
if len(fields) > 1 {
fsPath = fields[1]
fsName, fsPath, _ := strings.Cut(d.config["cephfs.path"], "/")
fsPath = "/" + fsPath

// Collect Ceph information
clusterName := d.config["cephfs.cluster_name"]
userName := d.config["cephfs.user.name"]

fsid, err := CephFsid(clusterName)
if err != nil {
return false, err
}

// Get the credentials and host.
monAddresses, userSecret, err := d.getConfig(d.config["cephfs.cluster_name"], d.config["cephfs.user.name"])
monitors, err := CephMonitors(clusterName)
if err != nil {
return false, err
}

// Mount options.
options := fmt.Sprintf("name=%s,secret=%s,mds_namespace=%s", d.config["cephfs.user.name"], userSecret, fsName)
if util.IsTrue(d.config["cephfs.fscache"]) {
options += ",fsc"
key, err := CephKeyring(clusterName, userName)
if err != nil {
return false, err
}

srcPath, options := CephBuildMount(
userName,
key,
fsid,
monitors,
fsName,
fsPath,
)

// Mount the pool.
srcPath := strings.Join(monAddresses, ",") + ":/" + fsPath
err = TryMount(srcPath, GetPoolMountPath(d.name), "ceph", 0, options)
err = TryMount(srcPath, GetPoolMountPath(d.name), "ceph", 0, strings.Join(options, ","))
if err != nil {
return false, err
}
17 changes: 0 additions & 17 deletions internal/server/storage/drivers/driver_cephfs_utils.go
@@ -44,20 +44,3 @@ func (d *cephfs) osdPoolExists(clusterName string, userName string, osdPoolName

return true, nil
}

// getConfig parses the Ceph configuration file and returns the list of monitors and secret key.
func (d *cephfs) getConfig(clusterName string, userName string) ([]string, string, error) {
// Get the monitor list.
monitors, err := CephMonitors(clusterName)
if err != nil {
return nil, "", err
}

// Get the keyring entry.
secret, err := CephKeyring(clusterName, userName)
if err != nil {
return nil, "", err
}

return monitors, secret, nil
}
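With getConfig gone, Create, Delete and Mount each repeat the same sequence: look up the FSID, monitors and keyring, then hand them to CephBuildMount and TryMount. If that sequence were ever factored back into a single helper inside this package (hypothetical; the PR inlines the calls instead), it could look roughly like the sketch below, with signatures inferred from the call sites in this diff and the package's existing `strings` import assumed:

```go
// cephfsBuildAndMount is a hypothetical consolidation of the lookup sequence
// repeated in Create, Delete and Mount; it is not part of this PR.
func cephfsBuildAndMount(clusterName string, userName string, fsName string, fsPath string, mountPoint string) error {
	// Get the FSID.
	fsid, err := CephFsid(clusterName)
	if err != nil {
		return err
	}

	// Get the monitor list.
	monitors, err := CephMonitors(clusterName)
	if err != nil {
		return err
	}

	// Get the keyring entry.
	key, err := CephKeyring(clusterName, userName)
	if err != nil {
		return err
	}

	// Build the mount source and options, then mount.
	srcPath, options := CephBuildMount(userName, key, fsid, monitors, fsName, fsPath)

	return TryMount(srcPath, mountPoint, "ceph", 0, strings.Join(options, ","))
}
```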