Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 136 additions & 2 deletions pkg/controller/chi/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ import (

"github.com/juliangruber/go-intersect"
"gopkg.in/d4l3k/messagediff.v1"
apps "k8s.io/api/apps/v1"
core "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"

//"github.com/altinity/queue"

"github.com/altinity/queue"
Expand Down Expand Up @@ -753,7 +755,7 @@ func (w *worker) prepareHostStatefulSetWithStatus(ctx context.Context, host *chi

// StatefulSet for a host
_ = w.creator.CreateStatefulSet(host, shutdown)
(&host.ReconcileAttributes).SetStatus(w.getStatefulSetStatus(host))
(&host.ReconcileAttributes).SetStatus(w.getStatefulSetStatus(ctx, host))
}

// reconcileHostStatefulSet reconciles host's StatefulSet
Expand Down Expand Up @@ -1795,7 +1797,7 @@ func (w *worker) reconcileService(ctx context.Context, chi *chiv1.ClickHouseInst
}

// getStatefulSetStatus
func (w *worker) getStatefulSetStatus(host *chiv1.ChiHost) chiv1.StatefulSetStatus {
func (w *worker) getStatefulSetStatus(ctx context.Context, host *chiv1.ChiHost) chiv1.StatefulSetStatus {
statefulSet := host.StatefulSet
w.a.V(2).M(host).S().Info(util.NamespaceNameString(statefulSet.ObjectMeta))
defer w.a.V(2).M(host).E().Info(util.NamespaceNameString(statefulSet.ObjectMeta))
Expand All @@ -1808,6 +1810,36 @@ func (w *worker) getStatefulSetStatus(host *chiv1.ChiHost) chiv1.StatefulSetStat
curLabel, curHasLabel := w.creator.GetStatefulSetVersion(curStatefulSet)
newLabel, newHasLabel := w.creator.GetStatefulSetVersion(statefulSet)
if curHasLabel && newHasLabel {
if curLabel != newLabel {
// When only pvc storage changes, directly update PVCs to skip restarting StatefulSet
if restoreStatefulSet, ok := w.checkIfStorageChangedOnly(ctx, host, curStatefulSet, statefulSet, curLabel); ok {
for index := range statefulSet.Spec.VolumeClaimTemplates {
newTemplate := &statefulSet.Spec.VolumeClaimTemplates[index]
// Equal number of templates has been checked, so no error here
curTemplate := &curStatefulSet.Spec.VolumeClaimTemplates[index]

newStorage := newTemplate.Spec.Resources.Requests.Storage()
oldStorage := curTemplate.Spec.Resources.Requests.Storage()

if err := w.updatePVCs(ctx, host, newTemplate.Name, *newStorage); err != nil {
w.a.WithEvent(host.CHI, eventActionReconcile, eventReasonReconcileFailed).
WithStatusAction(host.CHI).
WithStatusError(host.CHI).
M(host).F().
Error("FAILED to update PVC Storage: %s=>%s templateName: %s CHI: %s ",
oldStorage.String(), newStorage.String(), newTemplate.Name, host.CHI.Name)
return chiv1.StatefulSetStatusModified
} else {
w.a.V(1).F().L().Info("update PVC Storage: %s=>%s templateName: %s", oldStorage.String(), newStorage.String(), newTemplate.Name)
}
}
host.StatefulSet = restoreStatefulSet

w.a.M(host).F().Info("INFO StatefulSet ARE EQUAL based on labels (Only pvc storage changes) no reconcile is actually needed %s", util.NamespaceNameString(statefulSet.ObjectMeta))
return chiv1.StatefulSetStatusSame
}
}

if curLabel == newLabel {
w.a.M(host).F().Info("INFO StatefulSet ARE EQUAL based on labels no reconcile is actually needed %s", util.NamespaceNameString(statefulSet.ObjectMeta))
return chiv1.StatefulSetStatusSame
Expand Down Expand Up @@ -2176,3 +2208,105 @@ func (w *worker) applyResource(
curResourceList[resourceName] = desiredResourceList[resourceName]
return true
}

// checkIfStorageChangedOnly reports whether the desired StatefulSet differs from
// the current one only in the storage requests of its VolumeClaimTemplates.
//
// It works on a deep copy of statefulSet: every storage request that differs
// from the current StatefulSet is reverted to the current value, the version
// label of the copy is regenerated and compared with curLabel. Equal labels mean
// that nothing but storage sizes has changed.
//
// Parameters:
//   - ctx, host: not used by this function, kept for signature compatibility.
//   - curStatefulSet: StatefulSet as it currently exists.
//   - statefulSet: desired StatefulSet; it is never modified here.
//   - curLabel: version label of curStatefulSet.
//
// Returns:
//   - (nil, false) when the templates differ in count or name, or when no
//     storage request differs.
//   - (copy, ok) otherwise, where copy is the desired StatefulSet with storage
//     requests reverted to the current ones and ok tells whether its regenerated
//     version label equals curLabel.
func (w *worker) checkIfStorageChangedOnly(ctx context.Context, host *chiv1.ChiHost,
	curStatefulSet, statefulSet *apps.StatefulSet, curLabel string) (newStatefulSet *apps.StatefulSet, ok bool) {
	// Make sure the number of pvc templates is consistent
	if len(curStatefulSet.Spec.VolumeClaimTemplates) != len(statefulSet.Spec.VolumeClaimTemplates) {
		return nil, false
	}

	changedStorage := false
	statefulSetCopy := statefulSet.DeepCopy()
	for index := range statefulSetCopy.Spec.VolumeClaimTemplates {
		newTemplate := &statefulSetCopy.Spec.VolumeClaimTemplates[index]
		curTemplate := &curStatefulSet.Spec.VolumeClaimTemplates[index]

		// Do not continue if there are differences in the pvc template
		if newTemplate.Name != curTemplate.Name {
			return nil, false
		}

		// Look the storage request up directly instead of calling Storage():
		// Storage() never returns nil, it yields a zero Quantity for a missing
		// entry, so a nil check can not detect an absent request. Reading from a
		// nil map is safe, and the write below happens only when the key exists,
		// which guarantees the map is not nil.
		newStorage, newHasStorage := newTemplate.Spec.Resources.Requests[core.ResourceStorage]
		oldStorage, oldHasStorage := curTemplate.Spec.Resources.Requests[core.ResourceStorage]
		if !newHasStorage || !oldHasStorage {
			// A request added or removed is not a resize. Leave the template
			// untouched so the label comparison below reports the difference.
			continue
		}

		if newStorage.Cmp(oldStorage) != 0 {
			// You can only modify storage without copying the entire spec.
			// Otherwise, the generated object-version will be inconsistent
			newTemplate.Spec.Resources.Requests[core.ResourceStorage] = oldStorage
			changedStorage = true
		}
	}

	if !changedStorage {
		return nil, false
	}

	// With storage reverted, the copy must produce the same version label as the
	// current StatefulSet if storage was the only thing that changed.
	w.creator.SetupStatefulSetVersion(statefulSetCopy)
	newLabel, _ := w.creator.GetStatefulSetVersion(statefulSetCopy)

	return statefulSetCopy, curLabel == newLabel
}

// updatePVCs sets the storage request of the host's PVCs that originate from the
// volume claim template templateName to the given storage value.
//
// For every volume mount of the host whose name equals templateName it:
//  1. looks up the CHI volume claim template with that name,
//  2. skips it when the template already requests the given storage,
//  3. writes the new storage request into the template,
//  4. fetches the existing PVC and passes it to reconcilePVC.
//
// A PVC that does not exist (yet) is skipped silently. A cancelled context stops
// further processing and is not reported as an error.
//
// Returns the first error met while fetching or reconciling a PVC, nil when all
// matching PVCs were processed successfully. Remaining volume mounts are still
// processed after a failure.
func (w *worker) updatePVCs(ctx context.Context, host *chiv1.ChiHost, templateName string, storage resource.Quantity) error {
	if util.IsContextDone(ctx) {
		return nil
	}

	namespace := host.Address.Namespace
	w.a.V(2).M(host).S().Info("host %s/%s", namespace, host.Name)
	defer w.a.V(2).M(host).E().Info("host %s/%s", namespace, host.Name)

	// firstErr keeps the first failure so that the caller can react to it.
	// Without it the function would always report success.
	var firstErr error

	host.WalkVolumeMounts(func(volumeMount *core.VolumeMount) {
		if util.IsContextDone(ctx) {
			return
		}

		if volumeMount.Name != templateName {
			return
		}

		volumeClaimTemplate, ok := host.CHI.GetVolumeClaimTemplate(templateName)
		if !ok {
			// The volume mount does not reference a VolumeClaimTemplate
			return
		}

		// pvc storage is the same size as expected
		if volumeClaimTemplate.Spec.Resources.Requests.Storage().Cmp(storage) == 0 {
			return
		}

		// update storage
		// A template without any resource requests has a nil map, and writing to
		// a nil map panics.
		if volumeClaimTemplate.Spec.Resources.Requests == nil {
			volumeClaimTemplate.Spec.Resources.Requests = core.ResourceList{}
		}
		volumeClaimTemplate.Spec.Resources.Requests[core.ResourceStorage] = storage

		pvcName := chopmodel.CreatePVCName(host, volumeMount, volumeClaimTemplate)
		w.a.V(2).M(host).Info("reconcile volumeMount (%s/%s/%s/%s) - start", namespace, host.Name, volumeMount.Name, pvcName)
		defer w.a.V(2).M(host).Info("reconcile volumeMount (%s/%s/%s/%s) - end", namespace, host.Name, volumeMount.Name, pvcName)

		pvc, err := w.c.kubeClient.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, pvcName, newGetOptions())
		if err != nil {
			// NotFound is not an error per se, means PVC is not created (yet)?
			if !apierrors.IsNotFound(err) {
				w.a.M(host).F().Error("ERROR unable to get PVC(%s/%s) err: %v", namespace, pvcName, err)
				if firstErr == nil {
					firstErr = err
				}
			}
			return
		}

		// Keep the fetched PVC in its own variable: reconcilePVC is not visible
		// here and may return a nil PVC together with an error, which must not be
		// dereferenced when registering the failure.
		reconciledPVC, err := w.reconcilePVC(ctx, pvc, host, volumeClaimTemplate)
		if err != nil {
			w.a.M(host).F().Error("ERROR unable to reconcile PVC(%s/%s) err: %v", namespace, pvcName, err)
			w.registryFailed.RegisterPVC(pvc.ObjectMeta)
			if firstErr == nil {
				firstErr = err
			}
			return
		}

		w.registryReconciled.RegisterPVC(reconciledPVC.ObjectMeta)
	})

	return firstErr
}
7 changes: 7 additions & 0 deletions pkg/model/creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,13 @@ func (c *Creator) setupStatefulSetVersion(statefulSet *apps.StatefulSet) {
// c.a.V(3).F().Info("StatefulSet(%s/%s)\n%s", statefulSet.Namespace, statefulSet.Name, util.Dump(statefulSet))
}

// SetupStatefulSetVersion drops any LabelObjectVersion label already present on
// statefulSet and then delegates to setupStatefulSetVersion.
// It is the exported entry point for callers outside of this package that need
// the version set up again on an already labelled StatefulSet.
// NOTE(review): presumably setupStatefulSetVersion derives the version from the
// content of the object, labels included, which would be why a stale label has
// to be removed beforehand - confirm against setupStatefulSetVersion.
func (c *Creator) SetupStatefulSetVersion(statefulSet *apps.StatefulSet) {
// Existing LabelObjectVersion label must be removed first
// (delete is a no-op when the label, or the whole label map, is absent)
delete(statefulSet.Labels, LabelObjectVersion)
c.setupStatefulSetVersion(statefulSet)
}

// GetStatefulSetVersion gets version of the StatefulSet
// TODO property of the labeler?
func (c *Creator) GetStatefulSetVersion(statefulSet *apps.StatefulSet) (string, bool) {
Expand Down