diff --git a/apis/apps/v1alpha1/zz_generated.deepcopy.go b/apis/apps/v1alpha1/zz_generated.deepcopy.go
index e2e319f7..0db12acc 100644
--- a/apis/apps/v1alpha1/zz_generated.deepcopy.go
+++ b/apis/apps/v1alpha1/zz_generated.deepcopy.go
@@ -27,6 +27,22 @@ import (
     "k8s.io/apimachinery/pkg/runtime"
 )
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *AgentContainerSpec) DeepCopyInto(out *AgentContainerSpec) {
+    *out = *in
+    in.Resources.DeepCopyInto(&out.Resources)
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AgentContainerSpec.
+func (in *AgentContainerSpec) DeepCopy() *AgentContainerSpec {
+    if in == nil {
+        return nil
+    }
+    out := new(AgentContainerSpec)
+    in.DeepCopyInto(out)
+    return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *BRConfig) DeepCopyInto(out *BRConfig) {
     *out = *in
@@ -482,6 +498,11 @@ func (in *NebulaClusterSpec) DeepCopyInto(out *NebulaClusterSpec) {
         *out = new(SSLCertsSpec)
         (*in).DeepCopyInto(*out)
     }
+    if in.Agent != nil {
+        in, out := &in.Agent, &out.Agent
+        *out = new(AgentContainerSpec)
+        (*in).DeepCopyInto(*out)
+    }
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NebulaClusterSpec.
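The generated pair above implies a spec type shaped roughly like the following. This is a hand-written sketch for orientation only; the field names are inferred from the agent properties added to the CRD below, not copied from apis/apps/v1alpha1:

```go
package v1alpha1

import corev1 "k8s.io/api/core/v1"

// AgentContainerSpec (inferred sketch): only Resources needs a deep-copy
// helper, because ResourceRequirements contains maps and slices; the two
// string fields are already covered by the plain assignment *out = *in.
type AgentContainerSpec struct {
	Image     string                      `json:"image,omitempty"`
	Version   string                      `json:"version,omitempty"`
	Resources corev1.ResourceRequirements `json:"resources,omitempty"`
}
```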
diff --git a/config/crd/bases/apps.nebula-graph.io_nebulaclusters.yaml b/config/crd/bases/apps.nebula-graph.io_nebulaclusters.yaml
index 5c5c06f9..2ae648ef 100644
--- a/config/crd/bases/apps.nebula-graph.io_nebulaclusters.yaml
+++ b/config/crd/bases/apps.nebula-graph.io_nebulaclusters.yaml
@@ -424,6 +424,44 @@ spec:
                     type: array
                 type: object
             type: object
+          agent:
+            properties:
+              image:
+                type: string
+              resources:
+                properties:
+                  claims:
+                    items:
+                      properties:
+                        name:
+                          type: string
+                      required:
+                      - name
+                      type: object
+                    type: array
+                    x-kubernetes-list-map-keys:
+                    - name
+                    x-kubernetes-list-type: map
+                  limits:
+                    additionalProperties:
+                      anyOf:
+                      - type: integer
+                      - type: string
+                      pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
+                      x-kubernetes-int-or-string: true
+                    type: object
+                  requests:
+                    additionalProperties:
+                      anyOf:
+                      - type: integer
+                      - type: string
+                      pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
+                      x-kubernetes-int-or-string: true
+                    type: object
+                type: object
+              version:
+                type: string
+            type: object
           enableBR:
             type: boolean
           enablePVReclaim:
@@ -11139,51 +11177,21 @@ spec:
             type: array
           graphd:
             properties:
-              conditions:
-                items:
-                  properties:
-                    lastTransitionTime:
-                      format: date-time
-                      type: string
-                    message:
-                      maxLength: 32768
-                      type: string
-                    observedGeneration:
-                      format: int64
-                      minimum: 0
-                      type: integer
-                    reason:
-                      maxLength: 1024
-                      minLength: 1
-                      pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$
-                      type: string
-                    status:
-                      enum:
-                      - "True"
-                      - "False"
-                      - Unknown
-                      type: string
-                    type:
-                      maxLength: 316
-                      pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$
-                      type: string
-                  required:
-                  - lastTransitionTime
-                  - message
-                  - reason
-                  - status
-                  - type
-                  type: object
-                type: array
               phase:
                 type: string
               version:
                 type: string
               workload:
                 properties:
+                  availableReplicas:
+                    format: int32
+                    type: integer
                   collisionCount:
                     format: int32
                     type: integer
+                  currentReplicas:
+                    format: int32
+                    type: integer
                   currentRevision:
                     type: string
                   observedGeneration:
@@ -11197,9 +11205,6 @@ spec:
                     type: integer
                   updateRevision:
                     type: string
-                  updatedReadyReplicas:
-                    format: int32
-                    type: integer
                   updatedReplicas:
                     format: int32
                     type: integer
@@ -11211,51 +11216,21 @@ spec:
             type: object
           metad:
             properties:
-              conditions:
-                items:
-                  properties:
-                    lastTransitionTime:
-                      format: date-time
-                      type: string
-                    message:
-                      maxLength: 32768
-                      type: string
-                    observedGeneration:
-                      format: int64
-                      minimum: 0
-                      type: integer
-                    reason:
-                      maxLength: 1024
-                      minLength: 1
-                      pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$
-                      type: string
-                    status:
-                      enum:
-                      - "True"
-                      - "False"
-                      - Unknown
-                      type: string
-                    type:
-                      maxLength: 316
-                      pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$
-                      type: string
-                  required:
-                  - lastTransitionTime
-                  - message
-                  - reason
-                  - status
-                  - type
-                  type: object
-                type: array
               phase:
                 type: string
               version:
                 type: string
               workload:
                 properties:
+                  availableReplicas:
+                    format: int32
+                    type: integer
                   collisionCount:
                     format: int32
                     type: integer
+                  currentReplicas:
+                    format: int32
+                    type: integer
                   currentRevision:
                     type: string
                   observedGeneration:
@@ -11269,9 +11244,6 @@ spec:
                     type: integer
                   updateRevision:
                     type: string
-                  updatedReadyReplicas:
-                    format: int32
-                    type: integer
                   updatedReplicas:
                     format: int32
                     type: integer
@@ -11281,6 +11253,9 @@ spec:
                 - updatedReplicas
                 type: object
             type: object
+          observedGeneration:
+            format: int64
+            type: integer
           storaged:
             properties:
               balancedSpaces:
@@ -11287,43 +11262,7 @@ spec:
                 items:
                   format: int32
                   type: integer
                 type: array
-              conditions:
-                items:
-                  properties:
-                    lastTransitionTime:
-                      format: date-time
-                      type: string
-                    message:
-                      maxLength: 32768
-                      type: string
-                    observedGeneration:
-                      format: int64
-                      minimum: 0
-                      type: integer
-                    reason:
-                      maxLength: 1024
-                      minLength: 1
-                      pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$
-                      type: string
-                    status:
-                      enum:
-                      - "True"
-                      - "False"
-                      - Unknown
-                      type: string
-                    type:
-                      maxLength: 316
-                      pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$
-                      type: string
-                  required:
-                  - lastTransitionTime
-                  - message
-                  - reason
-                  - status
-                  - type
-                  type: object
-                type: array
               hostsAdded:
                 type: boolean
               lastBalanceJob:
@@ -11341,9 +11280,15 @@ spec:
                 type: string
               workload:
                 properties:
+                  availableReplicas:
+                    format: int32
+                    type: integer
                   collisionCount:
                     format: int32
                     type: integer
+                  currentReplicas:
+                    format: int32
+                    type: integer
                   currentRevision:
                     type: string
                   observedGeneration:
@@ -11357,9 +11302,6 @@ spec:
                     type: integer
                   updateRevision:
                     type: string
-                  updatedReadyReplicas:
-                    format: int32
-                    type: integer
                   updatedReplicas:
                     format: int32
                     type: integer
diff --git a/config/samples/full-backup-job.yaml b/config/samples/full-backup-job.yaml
index 0e71c6ba..c827f979 100644
--- a/config/samples/full-backup-job.yaml
+++ b/config/samples/full-backup-job.yaml
@@ -4,12 +4,12 @@ metadata:
   name: nebula-full-backup
 spec:
   parallelism: 1
-  ttlSecondsAfterFinished: 60
+  ttlSecondsAfterFinished: 600
   template:
     spec:
       restartPolicy: OnFailure
       containers:
-      - image: vesoft/br-ent:v3.4.0
+      - image: vesoft/br-ent:v3.5.0
         imagePullPolicy: Always
         name: backup
         command:
diff --git a/config/samples/incremental-backup-job.yaml b/config/samples/incremental-backup-job.yaml
index ee6bbf82..3456ece9 100644
--- a/config/samples/incremental-backup-job.yaml
+++ b/config/samples/incremental-backup-job.yaml
@@ -4,8 +4,8 @@ metadata:
   name: aws-s3-secret
 type: Opaque
 data:
-  access-key: QVNJQVE0WFlSWFE1TlhVMlczNlUK
-  secret-key: ZFJ6OEdNcDdxenMwVGFGdExVM2RpYkk4b2hHRWRSamgvNTdzWkg3Ugo=
+  access-key:
+  secret-key:
 ---
 apiVersion: batch/v1
 kind: Job
@@ -13,12 +13,12 @@ metadata:
   name: nebula-incr-backup
 spec:
   parallelism: 1
-  ttlSecondsAfterFinished: 60
+  ttlSecondsAfterFinished: 600
   template:
     spec:
       restartPolicy: OnFailure
       containers:
-      - image: vesoft/br-ent:v3.4.0
+      - image: vesoft/br-ent:v3.5.0
         imagePullPolicy: Always
         name: backup
         command:
diff --git a/config/samples/restore-pod.yaml b/config/samples/restore-pod.yaml
index a879df95..cf5ed60c 100644
--- a/config/samples/restore-pod.yaml
+++ b/config/samples/restore-pod.yaml
@@ -13,7 +13,7 @@ metadata:
   name: nebula-restore
 spec:
   containers:
-  - image: vesoft/br-ent:v3.4.0
+  - image: vesoft/br-ent:v3.5.0
     imagePullPolicy: Always
     name: restore
     command:
diff --git a/doc/user/br_guide.md b/doc/user/br_guide.md
index 11594ca5..1d09cef1 100644
--- a/doc/user/br_guide.md
+++ b/doc/user/br_guide.md
@@ -66,8 +66,8 @@ metadata:
   name: aws-s3-secret
 type: Opaque
 data:
-  access-key: QVNJQVE0WFlSXFE1TlhVMlczNlUK
-  secret-key: ZFJ6OEdNcDdxenMwVGFGdFxVM2RpYkk4b2hHRWRSamgvNTdzWkg3Ugo=
+  access-key:
+  secret-key:
 ---
 apiVersion: apps.nebula-graph.io/v1alpha1
 kind: NebulaRestore
diff --git a/doc/user/ssl_guide.md b/doc/user/ssl_guide.md
index 22e74dee..9f67c2d5 100644
--- a/doc/user/ssl_guide.md
+++ b/doc/user/ssl_guide.md
@@ -83,19 +83,19 @@ sslCerts:
   # Name of the server cert secret
   serverSecret: "server-cert"
   # The key to server PEM encoded public key certificate, default name is tls.crt
-  serverPublicKey: ""
+  serverCert: ""
   # The key to server private key associated with given certificate, default name is tls.key
-  serverPrivateKey: ""
+  serverKey: ""
   # Name of the client cert secret
   clientSecret: "client-cert"
   # The key to server PEM encoded public key certificate, default name is tls.crt
-  clientPublicKey: ""
+  clientCert: ""
   # The key to client private key associated with given certificate, default name is tls.key
-  clientPrivateKey: ""
+  clientKey: ""
   # Name of the CA cert secret
   caSecret: "ca-cert"
   # The key to CA PEM encoded public key certificate, default name is ca.crt
-  caPublicKey: ""
+  caCert: ""
   # InsecureSkipVerify controls whether a client verifies the server's certificate chain and host name
   insecureSkipVerify: false
 ```
\ No newline at end of file
diff --git a/go.mod b/go.mod
index b106cee2..724c2538 100644
--- a/go.mod
+++ b/go.mod
@@ -23,8 +23,8 @@ require (
     k8s.io/client-go v0.26.5
     k8s.io/code-generator v0.26.5
     k8s.io/klog/v2 v2.90.1
-    k8s.io/kube-scheduler v0.0.0
-    k8s.io/kubectl v0.0.0
+    k8s.io/kube-scheduler v0.26.5
+    k8s.io/kubectl v0.26.5
     k8s.io/kubernetes v1.26.5
     k8s.io/utils v0.0.0-20230209194617-a36077c30491
     sigs.k8s.io/controller-runtime v0.14.6
diff --git a/pkg/controller/component/reclaimer/meta_reconciler.go b/pkg/controller/component/reclaimer/meta_reconciler.go
index 9726bbd9..8ab26dc4 100644
--- a/pkg/controller/component/reclaimer/meta_reconciler.go
+++ b/pkg/controller/component/reclaimer/meta_reconciler.go
@@ -103,7 +103,7 @@ func (m *meta) resolvePVCFromPod(pod *corev1.Pod) ([]*corev1.PersistentVolumeCla
         }
         pvc, err := m.clientSet.PVC().GetPVC(pod.Namespace, pvcName)
         if err != nil {
-            klog.Error(err, "pod [%s/%s] get PVC %s failed: %v", pod.Namespace, pod.Name, pvcName, err)
+            klog.Errorf("pod [%s/%s] get PVC %s failed: %v", pod.Namespace, pod.Name, pvcName, err)
             continue
         }
         pvcs = append(pvcs, pvc)
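This and the klog fixes in the next few files correct the same mistake: klog.Error joins its arguments like fmt.Sprint, so a format string passed to it is logged verbatim with the verbs unexpanded, while klog.Errorf formats like fmt.Sprintf. A standalone illustration; the message and values here are made up:

```go
package main

import (
	"errors"

	"k8s.io/klog/v2"
)

func main() {
	err := errors.New("not found")
	// Wrong: Error concatenates its arguments, so the %s/%v verbs
	// below are printed literally with the values tacked on.
	klog.Error("get PVC %s failed: %v", "pvc-data", err)
	// Right: Errorf expands the verbs, which is what these call
	// sites intended.
	klog.Errorf("get PVC %s failed: %v", "pvc-data", err)
}
```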
diff --git a/pkg/controller/component/storaged_scaler.go b/pkg/controller/component/storaged_scaler.go
index 7f065461..432d5a96 100644
--- a/pkg/controller/component/storaged_scaler.go
+++ b/pkg/controller/component/storaged_scaler.go
@@ -140,7 +140,7 @@ func (ss *storageScaler) ScaleIn(nc *v1alpha1.NebulaCluster, oldReplicas, newRep
     defer func() {
         err := metaClient.Disconnect()
         if err != nil {
-            klog.Error("meta client disconnect failed: %v", err)
+            klog.Errorf("meta client disconnect failed: %v", err)
         }
     }()
 
diff --git a/pkg/controller/nebularestore/nebula_restore_control.go b/pkg/controller/nebularestore/nebula_restore_control.go
index ac566435..31a78dad 100644
--- a/pkg/controller/nebularestore/nebula_restore_control.go
+++ b/pkg/controller/nebularestore/nebula_restore_control.go
@@ -108,7 +108,7 @@ func (c *defaultRestoreControl) UpdateNebulaRestore(rt *v1alpha1.NebulaRestore)
         }
         updated, err := c.clientSet.NebulaRestore().GetNebulaRestore(ns, rt.Name)
         if err != nil {
-            klog.Error("Fail to get NebulaRestore [%s/%s], %v", ns, name, err)
+            klog.Errorf("Fail to get NebulaRestore [%s/%s], %v", ns, name, err)
         }
         if err := c.deleteRestoredCluster(ns, updated.Status.ClusterName); err != nil {
             klog.Errorf("Fail to delete NebulaCluster %v", err)
diff --git a/pkg/controller/nebularestore/nebula_restore_controller.go b/pkg/controller/nebularestore/nebula_restore_controller.go
index 876e0b7f..218263b6 100644
--- a/pkg/controller/nebularestore/nebula_restore_controller.go
+++ b/pkg/controller/nebularestore/nebula_restore_controller.go
@@ -70,7 +70,7 @@ func NewRestoreReconciler(mgr ctrl.Manager) (*Reconciler, error) {
 func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (res reconcile.Result, retErr error) {
     var restore v1alpha1.NebulaRestore
     key := req.NamespacedName.String()
-    subCtx, cancel := context.WithTimeout(ctx, reconcileTimeOut)
+    subCtx, cancel := context.WithTimeout(ctx, time.Minute*1)
     defer cancel()
 
     startTime := time.Now()
@@ -115,6 +115,6 @@ func (r *Reconciler) syncNebulaRestore(restore *v1alpha1.NebulaRestore) error {
 func (r *Reconciler) SetupWithManager(mgr ctrl.Manager, opts controller.Options) error {
     return ctrl.NewControllerManagedBy(mgr).
         For(&v1alpha1.NebulaRestore{}).
-        WithOptions(opts).
+        WithOptions(controller.Options{MaxConcurrentReconciles: 5}).
         Complete(r)
 }
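For reference, the two changed call sites combine a per-reconcile deadline with a bounded worker pool: controller-runtime runs at most MaxConcurrentReconciles invocations of Reconcile in parallel, and the context deadline caps how long each invocation may run. A minimal sketch of the same wiring, with a placeholder reconciler and watched type standing in for NebulaRestore:

```go
package demo

import (
	"context"
	"time"

	corev1 "k8s.io/api/core/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/controller"
)

// demoReconciler stands in for the NebulaRestore Reconciler.
type demoReconciler struct{}

func (r *demoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// Give each reconcile a hard deadline, mirroring the change above.
	subCtx, cancel := context.WithTimeout(ctx, time.Minute*1)
	defer cancel()
	_ = subCtx // hand subCtx to the actual sync logic
	return ctrl.Result{}, nil
}

func setup(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&corev1.Pod{}). // any watched type; NebulaRestore in the real code
		WithOptions(controller.Options{MaxConcurrentReconciles: 5}).
		Complete(&demoReconciler{})
}
```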
diff --git a/pkg/controller/nebularestore/nebula_restore_manager.go b/pkg/controller/nebularestore/nebula_restore_manager.go
index 5303dba4..da3d045d 100644
--- a/pkg/controller/nebularestore/nebula_restore_manager.go
+++ b/pkg/controller/nebularestore/nebula_restore_manager.go
@@ -49,7 +49,7 @@ const (
     S3SecretKey = "secret-key"
 )
 
-type Restore struct {
+type RestoreAgent struct {
     ctx context.Context
     cfg *rtutil.Config
     sto storage.ExternalStorage
@@ -60,7 +60,6 @@
     rootURI    string
     backupName string
-    backSuffix string
 
     metaDir      *ng.DirInfo
     storageHosts []*meta.ServiceInfo
@@ -78,7 +77,6 @@ var _ Manager = (*restoreManager)(nil)
 
 type restoreManager struct {
     clientSet kube.ClientSet
-    restore   *Restore
 }
 
 func NewRestoreManager(clientSet kube.ClientSet) Manager {
@@ -127,7 +125,7 @@ func (rm *restoreManager) syncRestoreProcess(rt *v1alpha1.NebulaRestore) error {
         }
         nc.Annotations = nil
         if err := rm.clientSet.NebulaCluster().UpdateNebulaCluster(nc); err != nil {
-            klog.Error("remove cluster [%s/%s] annotations failed: %v", ns, restoredName, err)
+            klog.Errorf("remove cluster [%s/%s] annotations failed: %v", ns, restoredName, err)
         }
 
         return rm.clientSet.NebulaRestore().UpdateNebulaRestoreStatus(rt,
@@ -157,11 +155,12 @@
         return err
     }
 
-    if err := rm.initRestore(rt); err != nil {
+    restoreAgent, err := initRestoreAgent(rm.clientSet, rt)
+    if err != nil {
         return err
     }
 
-    if err := rm.loadCluster(original, restored, options); err != nil {
+    if err := rm.loadCluster(original, restored, restoreAgent, options); err != nil {
         return err
     }
 
@@ -169,21 +168,21 @@
         return err
     }
 
-    if err := rm.restore.loadBakMetas(rm.restore.backupName); err != nil {
+    if err := restoreAgent.loadBakMetas(restoreAgent.backupName); err != nil {
         return err
     }
 
     klog.Infof("[%s/%s] load backup metad file successfully", ns, restoredName)
 
-    if err := rm.restore.checkTopology(rm.restore.bakMetas[0], rm.restore.storageHosts); err != nil {
+    if err := restoreAgent.checkTopology(); err != nil {
         return err
     }
 
     if !condition.IsRestoreMetadComplete(rt) {
-        if !rm.endpointsConnected(restored.GetMetadEndpoints(v1alpha1.MetadPortNameThrift)) {
+        if !rm.endpointsConnected(restoreAgent, restored.GetMetadEndpoints(v1alpha1.MetadPortNameThrift)) {
             return utilerrors.ReconcileErrorf("restoring [%s/%s] in stage1, waiting for metad init agent are connected", ns, restoredName)
         }
 
-        if err := rm.restore.downloadMetaData(restored.GetMetadEndpoints(v1alpha1.MetadPortNameThrift)); err != nil {
+        if err := restoreAgent.downloadMetaData(restored.GetMetadEndpoints(v1alpha1.MetadPortNameThrift)); err != nil {
             klog.Errorf("download metad files failed: %v", err)
             return err
         }
@@ -198,9 +197,9 @@
             return utilerrors.ReconcileErrorf("restoring [%s/%s] in stage1, waiting for metad running", ns, restoredName)
         }
 
-        hostPairs := rm.restore.genHostPairs(rm.restore.bakMetas[0], restored.GetStoragedEndpoints(v1alpha1.StoragedPortNameThrift))
+        hostPairs := restoreAgent.genHostPairs(restoreAgent.bakMetas[0], restored.GetStoragedEndpoints(v1alpha1.StoragedPortNameThrift))
         metaEndpoints := restored.GetMetadEndpoints(v1alpha1.MetadPortNameThrift)
-        restoreResp, err := rm.restore.restoreMeta(rm.restore.bakMetas[0], hostPairs, metaEndpoints, options)
+        restoreResp, err := restoreAgent.restoreMeta(restoreAgent.bakMetas[0], hostPairs, metaEndpoints, options)
         if err != nil {
             klog.Errorf("restore metad data [%s/%s] failed, error: %v", ns, restoredName, err)
             return err
@@ -231,11 +230,11 @@
     }
 
     if !condition.IsRestoreStoragedComplete(rt) {
-        if !rm.endpointsConnected(restored.GetStoragedEndpoints(v1alpha1.StoragedPortNameThrift)) {
+        if !rm.endpointsConnected(restoreAgent, restored.GetStoragedEndpoints(v1alpha1.StoragedPortNameThrift)) {
             return utilerrors.ReconcileErrorf("restoring [%s/%s] in stage1, waiting for storaged init agent are connected", ns, restoredName)
         }
 
-        checkpoints, err := rm.restore.downloadStorageData(rt.Status.Partitions, rm.restore.storageHosts)
+        checkpoints, err := restoreAgent.downloadStorageData(rt.Status.Partitions, restoreAgent.storageHosts)
         if err != nil {
             klog.Errorf("download storaged files failed: %v", err)
             return err
@@ -243,7 +242,7 @@
 
         klog.Infof("restoring [%s/%s] in stage1, download storaged files successfully", ns, restoredName)
 
-        if err := rm.restore.playBackStorageData(restored.GetMetadEndpoints(v1alpha1.MetadPortNameThrift), rm.restore.storageHosts); err != nil {
+        if err := restoreAgent.playBackStorageData(restored.GetMetadEndpoints(v1alpha1.MetadPortNameThrift), restoreAgent.storageHosts); err != nil {
             return err
         }
 
@@ -275,11 +274,11 @@
         return utilerrors.ReconcileErrorf("restoring [%s/%s] in stage1, waiting for cluster ready", ns, restoredName)
     }
 
-    if !rm.endpointsConnected(restored.GetStoragedEndpoints(v1alpha1.StoragedPortNameThrift)) {
+    if !rm.endpointsConnected(restoreAgent, restored.GetStoragedEndpoints(v1alpha1.StoragedPortNameThrift)) {
         return utilerrors.ReconcileErrorf("restoring [%s/%s] in stage1, waiting for storaged sidecar agent are connected", ns, restoredName)
     }
 
-    if err := rm.restore.removeDownloadCheckpoints(rt.Status.Checkpoints); err != nil {
+    if err := restoreAgent.removeDownloadCheckpoints(rt.Status.Checkpoints); err != nil {
         klog.Errorf("remove downloaded checkpoints failed: %v", err)
         return err
     }
@@ -294,7 +293,7 @@
     return utilerrors.ReconcileErrorf("restoring [%s/%s] in stage2, waiting for cluster ready", ns, restoredName)
 }
 
-func (rm *restoreManager) loadCluster(original, restored *v1alpha1.NebulaCluster, options []nebula.Option) error {
+func (rm *restoreManager) loadCluster(original, restored *v1alpha1.NebulaCluster, restoreAgent *RestoreAgent, options []nebula.Option) error {
     mc, err := nebula.NewMetaClient([]string{original.GetMetadThriftConnAddress()}, options...)
     if err != nil {
         return err
     }
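The remainder of this file applies one mechanical change: per-restore state that used to be cached on the long-lived restoreManager (rm.restore) is now built once per sync by initRestoreAgent and passed down the call chain, so nothing leaks between reconciles of different NebulaRestore objects. A self-contained toy version of the pattern, with hypothetical names:

```go
package demo

import "fmt"

// manager is long-lived and stateless, like restoreManager after this change.
type manager struct{}

// agent holds per-job state, like RestoreAgent: constructed per call,
// passed explicitly, and eligible for GC when the job finishes.
type agent struct{ job string }

func newAgent(job string) (*agent, error) {
	if job == "" {
		return nil, fmt.Errorf("empty job")
	}
	return &agent{job: job}, nil
}

func (m *manager) sync(job string) error {
	a, err := newAgent(job) // fresh state on every sync; nothing cached on m
	if err != nil {
		return err
	}
	return m.step(a)
}

func (m *manager) step(a *agent) error {
	fmt.Println("processing", a.job)
	return nil
}
```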
@@ -310,19 +309,19 @@
         return err
     }
 
-    rm.restore.metaDir = hosts.GetMetas()[0].Dir
-    rm.replaceStorageHosts(hosts.GetStorages(), restored)
+    restoreAgent.metaDir = hosts.GetMetas()[0].Dir
+    rm.replaceStorageHosts(hosts.GetStorages(), restored, restoreAgent)
 
-    klog.Infof("restore metad dir info, root: %s data: %s", string(rm.restore.metaDir.Root), string(rm.restore.metaDir.Data[0]))
+    klog.Infof("restore metad dir info, root: %s data: %s", string(restoreAgent.metaDir.Root), string(restoreAgent.metaDir.Data[0]))
 
     return nil
 }
 
-func (rm *restoreManager) replaceStorageHosts(original []*meta.ServiceInfo, restored *v1alpha1.NebulaCluster) {
+func (rm *restoreManager) replaceStorageHosts(original []*meta.ServiceInfo, restored *v1alpha1.NebulaCluster, restoreAgent *RestoreAgent) {
     for i := range original {
         original[i].Addr.Host = restored.StoragedComponent().GetPodFQDN(int32(i))
     }
-    rm.restore.storageHosts = original
+    restoreAgent.storageHosts = original
 }
 
 func (rm *restoreManager) genNebulaCluster(restoredName string, rt *v1alpha1.NebulaRestore, original *v1alpha1.NebulaCluster) *v1alpha1.NebulaCluster {
@@ -350,17 +349,16 @@
     return nc
 }
 
-func (rm *restoreManager) initRestore(restore *v1alpha1.NebulaRestore) error {
-    var err error
+func initRestoreAgent(clientSet kube.ClientSet, restore *v1alpha1.NebulaRestore) (*RestoreAgent, error) {
     backend := &pb.Backend{}
     if err := backend.SetUri(fmt.Sprintf("s3://%s", restore.Spec.BR.S3.Bucket)); err != nil {
-        return err
+        return nil, err
     }
     backend.GetS3().Region = restore.Spec.BR.S3.Region
     backend.GetS3().Endpoint = restore.Spec.BR.S3.Endpoint
-    accessKey, secretKey, err := rm.getS3Key(restore.Namespace, restore.Spec.BR.S3.SecretName)
+    accessKey, secretKey, err := getS3Key(clientSet, restore.Namespace, restore.Spec.BR.S3.SecretName)
     if err != nil {
-        return fmt.Errorf("get S3 key failed: %v", err)
+        return nil, fmt.Errorf("get S3 key failed: %v", err)
     }
     backend.GetS3().AccessKey = accessKey
     backend.GetS3().SecretKey = secretKey
@@ -371,22 +369,21 @@ func initRestoreAgent(clientSet kube.ClientSet, restore *v1alpha1.NebulaRestore)
         Backend: backend,
     }
 
-    if rm.restore == nil {
-        if rm.restore, err = newRestore(cfg); err != nil {
-            return err
-        }
+    ra, err := newRestoreAgent(cfg)
+    if err != nil {
+        return nil, err
     }
 
-    return nil
+    return ra, nil
 }
 
-func newRestore(cfg *rtutil.Config) (*Restore, error) {
+func newRestoreAgent(cfg *rtutil.Config) (*RestoreAgent, error) {
     sto, err := storage.New(cfg.Backend)
     if err != nil {
         return nil, err
     }
 
-    return &Restore{
+    return &RestoreAgent{
         ctx: context.TODO(),
         cfg: cfg,
         sto: sto,
@@ -396,15 +393,15 @@
     }, nil
 }
 
-func (r *Restore) checkTopology(bak *meta.BackupMeta, storageHosts []*meta.ServiceInfo) error {
-    if len(storageHosts) != len(bak.GetStorageHosts()) {
+func (r *RestoreAgent) checkTopology() error {
+    if len(r.bakMetas[0].GetStorageHosts()) != len(r.storageHosts) {
         return fmt.Errorf("the cluster topology of storaged count must be consistent")
     }
 
     return nil
 }
 
-func (r *Restore) loadBakMetas(backupName string) error {
+func (r *RestoreAgent) loadBakMetas(backupName string) error {
     // check backup dir existence
     rootURI, err := rtutil.UriJoin(r.cfg.Backend.Uri(), backupName)
     if err != nil {
@@ -437,7 +434,7 @@
     return nil
 }
 
-func (r *Restore) downloadMetaData(metaEndpoints []string) error {
+func (r *RestoreAgent) downloadMetaData(metaEndpoints []string) error {
     // {backupRoot}/{backupName}/meta/*.sst
     externalUri, _ := rtutil.UriJoin(r.rootURI, r.backupName, "meta")
     backend, err := r.sto.GetDir(r.ctx, externalUri)
@@ -473,7 +470,7 @@
     return nil
 }
 
-func (r *Restore) downloadStorageData(parts map[string][]*ng.HostAddr, storageHosts []*meta.ServiceInfo) (map[string]map[string]string, error) {
+func (r *RestoreAgent) downloadStorageData(parts map[string][]*ng.HostAddr, storageHosts []*meta.ServiceInfo) (map[string]map[string]string, error) {
     // checkpoints save the download checkpoint paths, make cleanup restore data easier
     checkpoints := make(map[string]map[string]string)
 
@@ -543,7 +540,7 @@
     return checkpoints, group.Wait()
 }
 
-func (r *Restore) restoreMeta(backup *meta.BackupMeta, storageMap map[string]string, metaEndpoints []string, options []nebula.Option) (*meta.RestoreMetaResp, error) {
+func (r *RestoreAgent) restoreMeta(backup *meta.BackupMeta, storageMap map[string]string, metaEndpoints []string, options []nebula.Option) (*meta.RestoreMetaResp, error) {
     addrMap := make([]*meta.HostPair, 0, len(storageMap))
     for from, to := range storageMap {
         fromAddr, err := rtutil.ParseAddr(from)
@@ -593,7 +590,7 @@
 }
 
 // genHostPairs generate old:new storage host pairs
-func (r *Restore) genHostPairs(backup *meta.BackupMeta, restoreHosts []string) map[string]string {
+func (r *RestoreAgent) genHostPairs(backup *meta.BackupMeta, restoreHosts []string) map[string]string {
     hostPairs := make(map[string]string)
     backupHosts := make([]string, 0)
 
@@ -611,7 +608,7 @@
     return hostPairs
 }
 
-func (r *Restore) playBackStorageData(metaEndpoints []string, storageHosts []*meta.ServiceInfo) error {
+func (r *RestoreAgent) playBackStorageData(metaEndpoints []string, storageHosts []*meta.ServiceInfo) error {
     group := async.NewGroup(context.TODO(), r.cfg.Concurrency)
     for _, s := range storageHosts {
         agent, err := r.agentMgr.GetAgent(s.GetAddr())
@@ -661,7 +658,7 @@
     return nil
 }
 
-func (r *Restore) removeDownloadCheckpoints(checkpoints map[string]map[string]string) error {
+func (r *RestoreAgent) removeDownloadCheckpoints(checkpoints map[string]map[string]string) error {
     for addr, paths := range checkpoints {
         host, err := rtutil.ParseAddr(addr)
         if err != nil {
@@ -688,7 +685,7 @@ func (rm *restoreManager) removeInitAgentContainer(namespace, ncName string) err
     }
 
     updated.SetAnnotations(map[string]string{annotation.AnnRestoreStageKey: annotation.AnnRestoreStage2Val})
-    updated.Spec.Storaged.EnableForceUpdate = pointer.Bool(false)
+    updated.Spec.Storaged.EnableForceUpdate = nil
 
     m := make([]corev1.Container, 0)
     for _, c := range updated.Spec.Metad.InitContainers {
@@ -740,14 +737,14 @@ func (rm *restoreManager) updateClusterAnnotations(namespace, ncName string, ann
     return nil
 }
 
-func (rm *restoreManager) endpointsConnected(endpoints []string) bool {
+func (rm *restoreManager) endpointsConnected(restoreAgent *RestoreAgent, endpoints []string) bool {
     for _, ep := range endpoints {
         host, err := rtutil.ParseAddr(ep)
         if err != nil {
             klog.Error(err)
             return false
         }
-        agent, err := rm.restore.agentMgr.GetAgent(host)
+        agent, err := restoreAgent.agentMgr.GetAgent(host)
         if err != nil {
             klog.Error(err)
             return false
@@ -798,9 +795,9 @@ func (rm *restoreManager) getRestoredName(rt *v1alpha1.NebulaRestore) (string, e
     return rt.Status.ClusterName, nil
 }
 
-func (rm *restoreManager) getS3Key(namespace, secretName string) (accessKey string, secretKey string, err error) {
+func getS3Key(clientSet kube.ClientSet, namespace, secretName string) (accessKey string, secretKey string, err error) {
     var secret *corev1.Secret
-    secret, err = rm.clientSet.Secret().GetSecret(namespace, secretName)
+    secret, err = clientSet.Secret().GetSecret(namespace, secretName)
     if err != nil {
         return
     }
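Putting the CRD and deepcopy changes together, the new field should be settable on a NebulaCluster spec roughly as follows. This is a hand-written sample; the image name, tag, and resource figures are illustrative assumptions, not values taken from this change:

```yaml
apiVersion: apps.nebula-graph.io/v1alpha1
kind: NebulaCluster
metadata:
  name: nebula
spec:
  # New in this change: overrides for the BR agent container.
  agent:
    image: vesoft/nebula-agent   # assumed repository
    version: latest              # image tag
    resources:
      requests:
        cpu: 100m
        memory: 128Mi
      limits:
        cpu: 200m
        memory: 256Mi
```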