Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
xhebox authored Aug 2, 2023
2 parents f4464ff + 0154af3 commit 492c882
Show file tree
Hide file tree
Showing 5 changed files with 778 additions and 15 deletions.
117 changes: 103 additions & 14 deletions pkg/backup/backup/backup_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
package backup

import (
"bytes"
"context"
"encoding/json"
"fmt"
"path"
"strings"
"time"

Expand All @@ -31,27 +33,39 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/cli-runtime/pkg/printers"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"
)

type backupManager struct {
deps *controller.Dependencies
backupCleaner BackupCleaner
backupTracker BackupTracker
statusUpdater controller.BackupConditionUpdaterInterface
deps *controller.Dependencies
backupCleaner BackupCleaner
backupTracker BackupTracker
statusUpdater controller.BackupConditionUpdaterInterface
manifestFetchers []ManifestFetcher
}

// NewBackupManager return backupManager
func NewBackupManager(deps *controller.Dependencies) backup.BackupManager {
statusUpdater := controller.NewRealBackupConditionUpdater(deps.Clientset, deps.BackupLister, deps.Recorder)
manifestFetchers := []ManifestFetcher{
NewTiDBClusterAutoScalerFetcher(deps.TiDBClusterAutoScalerLister),
NewTiDBDashboardFetcher(deps.TiDBDashboardLister),
NewTiDBInitializerFetcher(deps.TiDBInitializerLister),
NewTiDBMonitorFetcher(deps.TiDBMonitorLister),
NewTiDBNgMonitoringFetcher(deps.TiDBNGMonitoringLister),
}
return &backupManager{
deps: deps,
backupCleaner: NewBackupCleaner(deps, statusUpdater),
backupTracker: NewBackupTracker(deps, statusUpdater),
statusUpdater: statusUpdater,
deps: deps,
backupCleaner: NewBackupCleaner(deps, statusUpdater),
backupTracker: NewBackupTracker(deps, statusUpdater),
statusUpdater: statusUpdater,
manifestFetchers: manifestFetchers,
}
}

Expand Down Expand Up @@ -785,17 +799,92 @@ func (bm *backupManager) saveClusterMetaToExternalStorage(b *v1alpha1.Backup, cs
}

func (bm *backupManager) volumeSnapshotBackup(b *v1alpha1.Backup, tc *v1alpha1.TidbCluster) (string, error) {
if s, reason, err := snapshotter.NewSnapshotterForBackup(b.Spec.Mode, bm.deps); err != nil {
if err := bm.backupManifests(b, tc); err != nil {
return "BackupManifestsFailed", err
}

s, reason, err := snapshotter.NewSnapshotterForBackup(b.Spec.Mode, bm.deps)
if err != nil {
return reason, err
} else if s != nil {
csb, reason, err := s.GenerateBackupMetadata(b, tc)
}

csb, reason, err := s.GenerateBackupMetadata(b, tc)
if err != nil {
return reason, err
}

if reason, err = bm.saveClusterMetaToExternalStorage(b, csb); err != nil {
return reason, err
}
return "", nil
}

func (bm *backupManager) backupManifests(b *v1alpha1.Backup, tc *v1alpha1.TidbCluster) error {
cred := backuputil.GetStorageCredential(b.Namespace, b.Spec.StorageProvider, bm.deps.SecretLister)
externalStorage, err := backuputil.NewStorageBackend(b.Spec.StorageProvider, cred)
if err != nil {
return err
}

manifests := make([]runtime.Object, 0, 4)
manifests = append(manifests, tc)
for _, fetcher := range bm.manifestFetchers {
objects, err := fetcher.ListByTC(tc)
if err != nil {
return err
}
manifests = append(manifests, objects...)
}

for _, manifest := range manifests {
if err := bm.saveManifest(b, manifest.DeepCopyObject(), externalStorage); err != nil {
return err
}
}
return nil
}

func (bm *backupManager) saveManifest(b *v1alpha1.Backup, manifest runtime.Object, externalStorage *backuputil.StorageBackend) error {
if manifest.GetObjectKind().GroupVersionKind().Empty() {
gvk, err := controller.InferObjectKind(manifest)
if err != nil {
return reason, err
return err
}
manifest.GetObjectKind().SetGroupVersionKind(gvk)
}
kind := manifest.GetObjectKind().GroupVersionKind().Kind
metadataAccessor, err := meta.Accessor(manifest)
if err != nil {
return err
}
namespace := metadataAccessor.GetNamespace()
name := metadataAccessor.GetName()
// remove managedFields because it is used for k8s internal housekeeping, and we don't need them in backup
metadataAccessor.SetManagedFields(nil)
klog.Infof("%s/%s get manifest meta, kind: %s, namespace: %s, name: %s", b.Namespace, b.Name, kind, namespace, name)

return bm.saveClusterMetaToExternalStorage(b, csb)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

filePath := path.Join(constants.ClusterManifests, namespace, kind, fmt.Sprintf("%s.yaml", name))
existed, err := externalStorage.Exists(ctx, filePath)
if err != nil {
return err
}
return "", nil
if existed {
return nil
}

buf := bytes.NewBuffer(make([]byte, 0, 1024))
printer := printers.YAMLPrinter{}
if err := printer.PrintObj(manifest, buf); err != nil {
return err
}
if err := externalStorage.WriteAll(ctx, filePath, buf.Bytes(), nil); err != nil {
return err
}
klog.Infof("%s/%s upload manifest %s successfully", b.Namespace, b.Name, filePath)
return nil
}

func (bm *backupManager) ensureBackupPVCExist(backup *v1alpha1.Backup) (string, error) {
Expand Down
195 changes: 195 additions & 0 deletions pkg/backup/backup/backup_manifests.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package backup

import (
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
listers "github.com/pingcap/tidb-operator/pkg/client/listers/pingcap/v1alpha1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
)

type ManifestFetcher interface {
ListByTC(tc *v1alpha1.TidbCluster) (objects []runtime.Object, err error)
}

type TiDBDashboardFetcher struct {
lister listers.TidbDashboardLister
}

func NewTiDBDashboardFetcher(lister listers.TidbDashboardLister) ManifestFetcher {
return &TiDBDashboardFetcher{
lister: lister,
}
}

func (f *TiDBDashboardFetcher) ListByTC(tc *v1alpha1.TidbCluster) (objects []runtime.Object, err error) {
emptySelector := labels.NewSelector()
dashboards, err := f.lister.List(emptySelector)
if err != nil {
return nil, err
}

for _, dashboard := range dashboards {
for _, cluster := range dashboard.Spec.Clusters {
name := cluster.Name
namespace := cluster.Namespace
if namespace == "" {
namespace = dashboard.Namespace
}
if name == tc.Name && namespace == tc.Namespace {
klog.Infof("TiDBDashboard %s/%s matches tc %s/%s",
dashboard.Namespace, dashboard.Name, tc.Namespace, tc.Name)
objects = append(objects, dashboard)
break
}
}
}
return
}

type TiDBMonitorFetcher struct {
lister listers.TidbMonitorLister
}

func NewTiDBMonitorFetcher(lister listers.TidbMonitorLister) ManifestFetcher {
return &TiDBMonitorFetcher{
lister: lister,
}
}

func (f *TiDBMonitorFetcher) ListByTC(tc *v1alpha1.TidbCluster) (objects []runtime.Object, err error) {
emptySelector := labels.NewSelector()
monitors, err := f.lister.List(emptySelector)
if err != nil {
return nil, err
}

for _, monitor := range monitors {
for _, cluster := range monitor.Spec.Clusters {
name := cluster.Name
namespace := cluster.Namespace
if namespace == "" {
namespace = monitor.Namespace
}
if name == tc.Name && namespace == tc.Namespace {
klog.Infof("TidbMonitor %s/%s matches tc %s/%s",
monitor.Namespace, monitor.Name, tc.Namespace, tc.Name)
objects = append(objects, monitor)
break
}
}
}
return
}

type TiDBClusterAutoScalerFetcher struct {
lister listers.TidbClusterAutoScalerLister
}

func NewTiDBClusterAutoScalerFetcher(lister listers.TidbClusterAutoScalerLister) ManifestFetcher {
return &TiDBClusterAutoScalerFetcher{
lister: lister,
}
}

func (f *TiDBClusterAutoScalerFetcher) ListByTC(tc *v1alpha1.TidbCluster) (objects []runtime.Object, err error) {
emptySelector := labels.NewSelector()
autoScalers, err := f.lister.List(emptySelector)
if err != nil {
return nil, err
}

for _, autoScaler := range autoScalers {
name := autoScaler.Spec.Cluster.Name
namespace := autoScaler.Spec.Cluster.Namespace
if namespace == "" {
namespace = autoScaler.Namespace
}
if name == tc.Name && namespace == tc.Namespace {
klog.Infof("TiDBClusterAutoScaler %s/%s matches tc %s/%s",
autoScaler.Namespace, autoScaler.Name, tc.Namespace, tc.Name)
objects = append(objects, autoScaler)
}
}
return
}

type TiDBInitializerFetcher struct {
lister listers.TidbInitializerLister
}

func NewTiDBInitializerFetcher(lister listers.TidbInitializerLister) ManifestFetcher {
return &TiDBInitializerFetcher{
lister: lister,
}
}

func (f *TiDBInitializerFetcher) ListByTC(tc *v1alpha1.TidbCluster) (objects []runtime.Object, err error) {
emptySelector := labels.NewSelector()
initializers, err := f.lister.List(emptySelector)
if err != nil {
return nil, err
}

for _, initializer := range initializers {
name := initializer.Spec.Clusters.Name
namespace := initializer.Spec.Clusters.Namespace
if namespace == "" {
namespace = initializer.Namespace
}
if name == tc.Name && namespace == tc.Namespace {
klog.Infof("TiDBInitializer %s/%s matches tc %s/%s",
initializer.Namespace, initializer.Name, tc.Namespace, tc.Name)
objects = append(objects, initializer)
}
}
return
}

type TiDBNgMonitoringFetcher struct {
lister listers.TidbNGMonitoringLister
}

func NewTiDBNgMonitoringFetcher(lister listers.TidbNGMonitoringLister) ManifestFetcher {
return &TiDBNgMonitoringFetcher{
lister: lister,
}
}

func (f *TiDBNgMonitoringFetcher) ListByTC(tc *v1alpha1.TidbCluster) (objects []runtime.Object, err error) {
emptySelector := labels.NewSelector()
monitorings, err := f.lister.List(emptySelector)
if err != nil {
return nil, err
}

for _, monitoring := range monitorings {
for _, cluster := range monitoring.Spec.Clusters {
name := cluster.Name
namespace := cluster.Namespace
if namespace == "" {
namespace = monitoring.Namespace
}
if name == tc.Name && namespace == tc.Namespace {
klog.Infof("TidbNGMonitoring %s/%s matches tc %s/%s",
monitoring.Namespace, monitoring.Name, tc.Namespace, tc.Name)
objects = append(objects, monitoring)
break
}
}
}
return
}
Loading

0 comments on commit 492c882

Please sign in to comment.