Skip to content

Commit

Permalink
Adjust formatting to be consistent with gofmt
Browse files Browse the repository at this point in the history
  • Loading branch information
MaXal committed Sep 11, 2024
1 parent 12573d4 commit 6343f79
Show file tree
Hide file tree
Showing 88 changed files with 7,869 additions and 7,869 deletions.
6 changes: 5 additions & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,8 @@
end_of_line = lf
indent_style = space
indent_size = 2
charset = utf-8
charset = utf-8

[*.go]
indent_style = tab
tab_width = 2
18 changes: 9 additions & 9 deletions cmd/backend/backend.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package main

import (
"github.com/JetBrains/ij-perf-report-aggregator/pkg/server"
"go.deanishe.net/env"
"log/slog"
"os"
"github.com/JetBrains/ij-perf-report-aggregator/pkg/server"
"go.deanishe.net/env"
"log/slog"
"os"
)

func main() {
err := server.Serve(env.Get("CLICKHOUSE", server.DefaultDbUrl), env.Get("NATS", ""))
if err != nil {
slog.Error("error on starting backend", "error", err)
os.Exit(1)
}
err := server.Serve(env.Get("CLICKHOUSE", server.DefaultDbUrl), env.Get("NATS", ""))
if err != nil {
slog.Error("error on starting backend", "error", err)
os.Exit(1)
}
}
396 changes: 198 additions & 198 deletions cmd/benchmark/clickhouse_perf_test.go

Large diffs are not rendered by default.

194 changes: 97 additions & 97 deletions cmd/clickhouse-backup/main.go
Original file line number Diff line number Diff line change
@@ -1,110 +1,110 @@
package main

import (
"context"
"fmt"
"github.com/Altinity/clickhouse-backup/pkg/backup"
"github.com/Altinity/clickhouse-backup/pkg/status"
clickhousebackup "github.com/JetBrains/ij-perf-report-aggregator/pkg/clickhouse-backup"
"github.com/JetBrains/ij-perf-report-aggregator/pkg/util"
"github.com/nats-io/nats.go"
"go.deanishe.net/env"
"log/slog"
"os"
"time"
"context"
"fmt"
"github.com/Altinity/clickhouse-backup/pkg/backup"
"github.com/Altinity/clickhouse-backup/pkg/status"
clickhousebackup "github.com/JetBrains/ij-perf-report-aggregator/pkg/clickhouse-backup"
"github.com/JetBrains/ij-perf-report-aggregator/pkg/util"
"github.com/nats-io/nats.go"
"go.deanishe.net/env"
"log/slog"
"os"
"time"
)

func main() {
err := start("nats://" + env.Get("NATS", "nats:4222"))
if err != nil {
slog.Error("cannot start backup", "err", err)
os.Exit(1)
}
err := start("nats://" + env.Get("NATS", "nats:4222"))
if err != nil {
slog.Error("cannot start backup", "err", err)
os.Exit(1)
}
}

func start(natsUrl string) error {
taskContext, cancel := util.CreateCommandContext()
defer cancel()

if os.Getenv("KUBERNETES_SERVICE_HOST") == "" {
clickhousebackup.SetS3EnvForLocalRun()
}

backuper := clickhousebackup.CreateBackuper()

if env.GetBool("DO_BACKUP") {
err := executeBackup(taskContext, backuper)
return err
}

slog.Info("started", "nats", natsUrl)
nc, err := nats.Connect(natsUrl)
if err != nil {
return fmt.Errorf("cannot connect to nats: %w", err)
}

sub, err := nc.SubscribeSync("db.backup")
if err != nil {
return fmt.Errorf("cannot subscribe to db.backup: %w", err)
}

lastBackupTime := time.Time{}
for taskContext.Err() == nil {
_, err = sub.NextMsgWithContext(taskContext)
if err != nil {
contextError := taskContext.Err()
if contextError != nil {
slog.Info("cancelled", "reason", contextError)
return nil
}
return fmt.Errorf("cannot receive message: %w", err)
}

if taskContext.Err() != nil {
return nil
}

if time.Since(lastBackupTime) < 24*time.Hour {
// do not create backups too often
slog.Info("backup request skipped", "reason", "time threshold", "lastBackupTime", lastBackupTime)
continue
}

slog.Info("backup requested")
err = executeBackup(taskContext, backuper)
if err != nil {
slog.Error("cannot backup", "error", err)
} else {
lastBackupTime = time.Now()
}
}

return nil
taskContext, cancel := util.CreateCommandContext()
defer cancel()

if os.Getenv("KUBERNETES_SERVICE_HOST") == "" {
clickhousebackup.SetS3EnvForLocalRun()
}

backuper := clickhousebackup.CreateBackuper()

if env.GetBool("DO_BACKUP") {
err := executeBackup(taskContext, backuper)
return err
}

slog.Info("started", "nats", natsUrl)
nc, err := nats.Connect(natsUrl)
if err != nil {
return fmt.Errorf("cannot connect to nats: %w", err)
}

sub, err := nc.SubscribeSync("db.backup")
if err != nil {
return fmt.Errorf("cannot subscribe to db.backup: %w", err)
}

lastBackupTime := time.Time{}
for taskContext.Err() == nil {
_, err = sub.NextMsgWithContext(taskContext)
if err != nil {
contextError := taskContext.Err()
if contextError != nil {
slog.Info("cancelled", "reason", contextError)
return nil
}
return fmt.Errorf("cannot receive message: %w", err)
}

if taskContext.Err() != nil {
return nil
}

if time.Since(lastBackupTime) < 24*time.Hour {
// do not create backups too often
slog.Info("backup request skipped", "reason", "time threshold", "lastBackupTime", lastBackupTime)
continue
}

slog.Info("backup requested")
err = executeBackup(taskContext, backuper)
if err != nil {
slog.Error("cannot backup", "error", err)
} else {
lastBackupTime = time.Now()
}
}

return nil
}

func executeBackup(taskContext context.Context, backuper *backup.Backuper) error {
backupName := backup.NewBackupName()
logger := slog.With("backup", backupName)

err := backuper.CreateBackup(backupName, "", nil, false, false, false, false, "unknown", status.NotFromAPI)
if err != nil {
return fmt.Errorf("cannot create backup: %w", err)
}

if taskContext.Err() != nil {
return nil
}

logger.Info("upload")
err = backuper.Upload(backupName, "", "", "", nil, false, false, status.NotFromAPI)
if err != nil {
return err
}

if taskContext.Err() != nil {
return nil
}

logger.Info("uploaded")
return nil
backupName := backup.NewBackupName()
logger := slog.With("backup", backupName)

err := backuper.CreateBackup(backupName, "", nil, false, false, false, false, "unknown", status.NotFromAPI)
if err != nil {
return fmt.Errorf("cannot create backup: %w", err)
}

if taskContext.Err() != nil {
return nil
}

logger.Info("upload")
err = backuper.Upload(backupName, "", "", "", nil, false, false, status.NotFromAPI)
if err != nil {
return err
}

if taskContext.Err() != nil {
return nil
}

logger.Info("uploaded")
return nil
}
132 changes: 66 additions & 66 deletions cmd/clickhouse-tasks/backup/backup.go
Original file line number Diff line number Diff line change
@@ -1,83 +1,83 @@
package main

import (
"context"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/JetBrains/ij-perf-report-aggregator/pkg/util"
"log"
"os"
"strings"
"time"
"context"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/JetBrains/ij-perf-report-aggregator/pkg/util"
"log"
"os"
"strings"
"time"
)

func main() {
taskContext, cancel := util.CreateCommandContext()
defer cancel()
taskContext, cancel := util.CreateCommandContext()
defer cancel()

err := execute(taskContext)
if err != nil {
log.Fatalf("%+v", err)
}
err := execute(taskContext)
if err != nil {
log.Fatalf("%+v", err)
}
}

func execute(taskContext context.Context) error {
backupDir := "/Volumes/data/ch-backup"
entries, err := os.ReadDir(backupDir)
if err != nil {
return fmt.Errorf("%w", err)
}
backupDir := "/Volumes/data/ch-backup"
entries, err := os.ReadDir(backupDir)
if err != nil {
return fmt.Errorf("%w", err)
}

db, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{"127.0.0.1:9000"},
ConnMaxLifetime: 6 * time.Hour,
DialTimeout: time.Hour,
ReadTimeout: time.Hour,
Settings: map[string]interface{}{
// https://github.com/ClickHouse/ClickHouse/issues/2833
// ZSTD 19+ is used, read/write timeout should be quite large (10 minutes)
"send_timeout": 30_000,
"receive_timeout": 3000,
"max_memory_usage": 100000000000,
},
})
if err != nil {
return fmt.Errorf("%w", err)
}
db, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{"127.0.0.1:9000"},
ConnMaxLifetime: 6 * time.Hour,
DialTimeout: time.Hour,
ReadTimeout: time.Hour,
Settings: map[string]interface{}{
// https://github.com/ClickHouse/ClickHouse/issues/2833
// ZSTD 19+ is used, read/write timeout should be quite large (10 minutes)
"send_timeout": 30_000,
"receive_timeout": 3000,
"max_memory_usage": 100000000000,
},
})
if err != nil {
return fmt.Errorf("%w", err)
}

for _, entry := range entries {
name := entry.Name()
if name == "backup" || name[0] == '.' {
continue
}
for _, entry := range entries {
name := entry.Name()
if name == "backup" || name[0] == '.' {
continue
}

dbAndTable := strings.SplitN(name, "_", 2)
dbName := dbAndTable[0]
tableName := dbAndTable[1]
dbAndTable := strings.SplitN(name, "_", 2)
dbName := dbAndTable[0]
tableName := dbAndTable[1]

err = db.Exec(taskContext, "create database IF NOT EXISTS "+dbName)
if err != nil {
return fmt.Errorf("%w", err)
}
err = db.Exec(taskContext, "create database IF NOT EXISTS "+dbName)
if err != nil {
return fmt.Errorf("%w", err)
}

query := "RESTORE TABLE " + dbName + "." + tableName + " FROM Disk('backups', '" + name + "')"
log.Println(query)
err = db.Exec(taskContext, query)
if err != nil {
return fmt.Errorf("%w", err)
}
query := "RESTORE TABLE " + dbName + "." + tableName + " FROM Disk('backups', '" + name + "')"
log.Println(query)
err = db.Exec(taskContext, query)
if err != nil {
return fmt.Errorf("%w", err)
}

// tableSqlFile := filepath.Join(backupDir, name, "metadata", dbName, tableName+".sql")
// sql, err := os.ReadFile(tableSqlFile)
// if err != nil {
// return fmt.Errorf("%w", err)
// }
//
// fixedSql := bytes.Replace(sql, []byte(", storage_policy = 's3'"), []byte(""), 1)
// err = os.WriteFile(tableSqlFile, fixedSql, 0666)
// if err != nil {
// return fmt.Errorf("%w", err)
// }
}
return nil
// tableSqlFile := filepath.Join(backupDir, name, "metadata", dbName, tableName+".sql")
// sql, err := os.ReadFile(tableSqlFile)
// if err != nil {
// return fmt.Errorf("%w", err)
// }
//
// fixedSql := bytes.Replace(sql, []byte(", storage_policy = 's3'"), []byte(""), 1)
// err = os.WriteFile(tableSqlFile, fixedSql, 0666)
// if err != nil {
// return fmt.Errorf("%w", err)
// }
}
return nil
}
Loading

0 comments on commit 6343f79

Please sign in to comment.