Skip to content

Commit

Permalink
Merge pull request #282 from 0xff-dev/dataset-fix
Browse files Browse the repository at this point in the history
fix: supports synchronization of already uploaded data
  • Loading branch information
bjwswang authored Nov 23, 2023
2 parents ed2a584 + dbf173d commit 9aad9ce
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 26 deletions.
15 changes: 10 additions & 5 deletions api/v1alpha1/versioneddataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,26 +38,31 @@ var (
const InheritedFromVersionName = "inheritfrom-"

func generateInheriedFileStatus(minioClient *minio.Client, instance *VersionedDataset) []FileStatus {
srcBucket := instance.Spec.Dataset.Namespace
prefix := fmt.Sprintf("dataset/%s/%s/", instance.Spec.Dataset.Name, instance.Spec.InheritedFrom)
name := InheritedFromVersionName + instance.Spec.InheritedFrom
phase := FileProcessPhaseProcessing
if instance.Spec.InheritedFrom == "" {
return nil
prefix = fmt.Sprintf("dataset/%s/%s/", instance.Spec.Dataset.Name, instance.Spec.Version)
name = InheritedFromVersionName + instance.Spec.Version
phase = FileProcessPhaseSucceeded
}

srcBucket := instance.Spec.Dataset.Namespace
prefix := fmt.Sprintf("dataset/%s/%s/", instance.Spec.Dataset.Name, instance.Spec.InheritedFrom)
filePaths := minioutils.ListObjects(context.TODO(), *srcBucket, prefix, minioClient, -1)
status := make([]FileDetails, len(filePaths))
sort.Strings(filePaths)

for idx, fp := range filePaths {
status[idx] = FileDetails{
Path: strings.TrimPrefix(fp, prefix),
Phase: FileProcessPhaseProcessing,
Phase: phase,
}
}

return []FileStatus{
{
TypedObjectReference: TypedObjectReference{
Name: InheritedFromVersionName + instance.Spec.InheritedFrom,
Name: name,
Namespace: &instance.Namespace,
Kind: "VersionedDataset",
},
Expand Down
44 changes: 23 additions & 21 deletions pkg/scheduler/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,28 +59,30 @@ func (e *executor) generateJob(ctx context.Context, jobCh chan<- JobPayload, dat
dstPrefix := fmt.Sprintf("dataset/%s/%s/", e.instance.Spec.Dataset.Name, e.instance.Spec.Version)

var srcBucket, srcPrefix string
switch fs.Kind {
case "Datasource":
// A data source can be configured with a different MinIO address, so copying
// may involve data from different MinIO instances, which can increase the
// operator's memory usage until it runs out of memory (OOM).
// For now, all operations are therefore assumed to take place in the same MinIO.
ds := &v1alpha1.Datasource{}
if err := e.client.Get(ctx, types.NamespacedName{Namespace: *fs.Namespace, Name: fs.Name}, ds); err != nil {
klog.Errorf("generateJob: failed to get datasource %s", err)
return err
}
srcBucket = *fs.Namespace
if ds.Spec.OSS != nil {
srcBucket = ds.Spec.OSS.Bucket
if !removeAction {
switch fs.Kind {
case "Datasource":
// A data source can be configured with a different MinIO address, so copying
// may involve data from different MinIO instances, which can increase the
// operator's memory usage until it runs out of memory (OOM).
// For now, all operations are therefore assumed to take place in the same MinIO.
ds := &v1alpha1.Datasource{}
if err := e.client.Get(ctx, types.NamespacedName{Namespace: *fs.Namespace, Name: fs.Name}, ds); err != nil {
klog.Errorf("generateJob: failed to get datasource %s", err)
return err
}
srcBucket = *fs.Namespace
if ds.Spec.OSS != nil {
srcBucket = ds.Spec.OSS.Bucket
}
case "VersionedDataset":
srcVersion := fs.Name[len(v1alpha1.InheritedFromVersionName):]
srcBucket = e.instance.Namespace
srcPrefix = fmt.Sprintf("dataset/%s/%s/", e.instance.Spec.Dataset.Name, srcVersion)
default:
klog.Errorf("currently, copying data from a data source of the type %s is not supported", fs.Kind)
continue
}
case "VersionedDataset":
srcVersion := fs.Name[len(v1alpha1.InheritedFromVersionName):]
srcBucket = e.instance.Namespace
srcPrefix = fmt.Sprintf("dataset/%s/%s/", e.instance.Spec.Dataset.Name, srcVersion)
default:
klog.Errorf("currently, copying data from a data source of the type %s is not supported", fs.Kind)
continue
}

bucketExists, err := e.minioClient.BucketExists(ctx, dstBucket)
Expand Down

0 comments on commit 9aad9ce

Please sign in to comment.