Skip to content

Commit

Permalink
Improved logging and refactored some functions and files
Browse files Browse the repository at this point in the history
  • Loading branch information
DavyLandman committed Apr 24, 2024
1 parent 109dbad commit 2f97a07
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 58 deletions.
81 changes: 31 additions & 50 deletions cmd/backup/main.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
package main

import (
"bytes"
"errors"
"fmt"
"io"
"os"
"os/exec"
"sync"

"github.com/pelletier/go-toml/v2"
log "github.com/sirupsen/logrus"

"github.com/swat-engineering/borg-backup-remotely/internal/borg"
"github.com/swat-engineering/borg-backup-remotely/internal/config"
"github.com/swat-engineering/borg-backup-remotely/internal/util"
)

func readConfig() *config.BorgBackups {
Expand Down Expand Up @@ -74,58 +73,39 @@ type done struct {
Output io.ReadWriter
}

type ThreadSafeBuffer struct {
b bytes.Buffer
m sync.Mutex
}

func (b *ThreadSafeBuffer) Read(p []byte) (n int, err error) {
b.m.Lock()
defer b.m.Unlock()
return b.b.Read(p)
}
func (b *ThreadSafeBuffer) Write(p []byte) (n int, err error) {
b.m.Lock()
defer b.m.Unlock()
return b.b.Write(p)
}
func (b *ThreadSafeBuffer) String() string {
b.m.Lock()
defer b.m.Unlock()
return b.b.String()
}

func backupServer(backupConfig config.BorgConfig, server config.Server, finished chan done) {
result := done{
Name: server.Name,
Output: new(ThreadSafeBuffer),
}
reportError := func(err error) {
result.Err = err
finished <- result
Output: new(util.ThreadSafeBuffer),
}
result.Err = runBackupServer(backupConfig, server, result.Output)
finished <- result

borg, err := borg.BuildBorg(server, backupConfig, result.Output)
}

func runBackupServer(backupConfig config.BorgConfig, server config.Server, output io.ReadWriter) error {
borg, err := borg.BuildBorg(server, backupConfig, output)
if err != nil {
reportError(err)
return
return err
}
defer borg.Close()

myLog := log.WithField("name", server.Name)

myLog.Info("Connections established")

myLog.Info("Initializing borg repo if needed")
err = borg.ExecLocal("borg init --encryption=repokey-blake2")
if err != nil {
reInit := false
var initError *exec.ExitError
log.Infof("%T", err)
if errors.As(err, &initError) {
reInit = initError.ExitCode() == 2
}
if !reInit {
result.Err = fmt.Errorf("creating borg repo: %w", err)
finished <- result
return
return fmt.Errorf("creating borg repo: %w", err)
} else {
log.Debug("The borg repo was already initialized, which is excepted for daily backups")
myLog.Debug("The borg repo was already initialized, which is excepted for daily backups")
}
}

Expand All @@ -138,28 +118,29 @@ func backupServer(backupConfig config.BorgConfig, server config.Server, finished
cmd = cmd + " '" + p + "'"
}

myLog.Info("Creating new borg archive")
err = borg.ExecRemote(cmd)

if err != nil {
result.Err = fmt.Errorf("creating borg archive failed: %w", err)
finished <- result
return
return fmt.Errorf("creating borg archive failed: %w", err)
}

err = borg.ExecLocal("borg check --verbose")

myLog.Info("Pruning old backups")
err = borg.ExecRemote(fmt.Sprintf("borg prune --lock-wait 600 --stats %s", backupConfig.PruneSetting))
if err != nil {
result.Err = fmt.Errorf("checking borg archive failed: %w", err)
finished <- result
return
return fmt.Errorf("pruning borg archive failed: %w", err)
}

err = borg.ExecLocal(fmt.Sprintf("borg prune --lock-wait 600 --stats %s", backupConfig.PruneSetting))
myLog.Info("Checking if client was maybe messing things up for us")
err = borg.ExecLocal("borg check")
if err != nil {
myLog.WithError(err).Fatal("*** Borg check failed, this tends to indicate someone is messing with the data ***")
return fmt.Errorf("borg check failed, data corruption? : %w", err)
}

myLog.Info("Applying deletes if the checks were successful")
err = borg.ExecLocal("borg compact")
if err != nil {
result.Err = fmt.Errorf("pruning borg archive failed: %w", err)
finished <- result
return
return fmt.Errorf("borg compact failed, data corruption? : %w", err)
}
finished <- result
return nil
}
15 changes: 9 additions & 6 deletions internal/borg/borg.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import (

log "github.com/sirupsen/logrus"
"github.com/swat-engineering/borg-backup-remotely/internal/config"
specialSSH "github.com/swat-engineering/borg-backup-remotely/internal/ssh"
sshCon "github.com/swat-engineering/borg-backup-remotely/internal/ssh"
)

type Borg struct {
backup *specialSSH.SshConnection
target *specialSSH.SshConnection
backup *sshCon.SshConnection
target *sshCon.SshConnection
cfg config.Server
output io.Writer
rootCfg config.BorgConfig
Expand All @@ -28,12 +28,12 @@ func (b *Borg) Close() {
}

func BuildBorg(cfg config.Server, root config.BorgConfig, output io.Writer) (*Borg, error) {
backupConnection, err := specialSSH.SetupConnection(root.Server)
backupConnection, err := sshCon.SetupConnection(root.Server)
if err != nil {
return nil, fmt.Errorf("could not connect to the backup server: %w", err)
}

targetConnection, err := specialSSH.SetupConnection(cfg.Connection)
targetConnection, err := sshCon.SetupConnection(cfg.Connection)
if err != nil {
return nil, fmt.Errorf("could not connect to the target server: %w", err)
}
Expand Down Expand Up @@ -64,6 +64,9 @@ func (b *Borg) ExecRemote(cmd string) error {
"BORG_PASSCOMMAND": "cat -", // take passphrase from stdin
"BORG_RELOCATED_REPO_ACCESS_IS_OK": "yes",
}
if _, err := b.output.Write([]byte("remote> " + cmd + "\n")); err != nil {
return fmt.Errorf("could write to output buffer: %w", err)
}

stdInPassphrase := bytes.NewReader([]byte(b.cfg.BorgTarget.Passphrase + "\r\n"))
return b.target.ExecuteSingleCommand(cmd, stdInPassphrase, b.output, b.output, env)
Expand Down Expand Up @@ -109,7 +112,7 @@ func (b *Borg) ExecLocal(cmd string) error {
borgCommand.Stderr = b.output

if _, err := b.output.Write([]byte("local> " + cmd + "\n")); err != nil {
log.WithError(err).Error("Could not write to output buffer")
return fmt.Errorf("could write to output buffer: %w", err)
}

return borgCommand.Run()
Expand Down
6 changes: 5 additions & 1 deletion internal/ssh/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ type SshConnection struct {
}

func SetupConnection(target config.Connection) (SshConnection, error) {
log.WithField("host", target.Host).Debug("Opening ssh client")
con, jumpCon, err := dialSsh(target)
if err != nil {
return SshConnection{}, err
}
log.WithField("host", target.Host).Debug("Opened ssh client")
result := SshConnection{nil, nil, nil, nil}
result.client = con
result.keepAlives = sendKeepAlive(con, 10*time.Second, 30)
Expand All @@ -43,7 +45,7 @@ func (c SshConnection) ExecuteSingleCommand(cmd string, stdIn io.Reader, stdOut
if err != nil {
return fmt.Errorf("opening ssh session: %w", err)
}
log.WithField("session", session).Info("Opened session")
log.Debug("New session opened")
defer session.Close()

for k, v := range env {
Expand All @@ -56,6 +58,8 @@ func (c SshConnection) ExecuteSingleCommand(cmd string, stdIn io.Reader, stdOut
session.Stdout = stdOut
session.Stderr = stdErr

log.WithField("cmd", cmd).Debug("Running command")

return session.Run(cmd)
}

Expand Down
7 changes: 6 additions & 1 deletion internal/ssh/ssh.go → internal/ssh/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func dialSsh(con config.Connection) (*ssh.Client, *ssh.Client, error) {
},
HostKeyCallback: khCallback,
}
log.WithField("config", config).Info("Connecting to server")
log.WithField("config", config).Debug("Connecting to server")

var client *ssh.Client
var proxyJumpClient *ssh.Client
Expand All @@ -70,10 +70,13 @@ func dialSsh(con config.Connection) (*ssh.Client, *ssh.Client, error) {
if !strings.Contains(targetJumpAddr, ":") {
targetJumpAddr += ":22"
}
log.WithField("jump", targetJumpAddr).Debug("Connecting to jump host first")
proxyJumpClient, err = ssh.Dial("tcp", targetJumpAddr, config)
if err != nil {
return nil, nil, fmt.Errorf("connecting to proxyJumpHost: %w", err)
}

log.WithField("target", targetAddr).Debug("Connecting to target via jump")
jumpedDial, err := proxyJumpClient.Dial("tcp", targetAddr)
if err != nil {
defer proxyJumpClient.Close()
Expand All @@ -88,6 +91,7 @@ func dialSsh(con config.Connection) (*ssh.Client, *ssh.Client, error) {
}
client = ssh.NewClient(c, chans, regs)
} else {
log.WithField("target", targetAddr).Debug("Connecting to target")
client, err = ssh.Dial("tcp", targetAddr, config)
if err != nil {
return nil, nil, fmt.Errorf("opening ssh connection: %w", err)
Expand Down Expand Up @@ -120,6 +124,7 @@ func sendKeepAlive(client *ssh.Client, every time.Duration, maxErrors int) chan<
fails = 0
}
case <-done:
log.Debug("Gracefully stop sending keep-alives")
return
}
}
Expand Down
27 changes: 27 additions & 0 deletions internal/util/thread_safe_buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package util

import (
"bytes"
"sync"
)

type ThreadSafeBuffer struct {
b bytes.Buffer
m sync.Mutex
}

func (b *ThreadSafeBuffer) Read(p []byte) (n int, err error) {
b.m.Lock()
defer b.m.Unlock()
return b.b.Read(p)
}
func (b *ThreadSafeBuffer) Write(p []byte) (n int, err error) {
b.m.Lock()
defer b.m.Unlock()
return b.b.Write(p)
}
func (b *ThreadSafeBuffer) String() string {
b.m.Lock()
defer b.m.Unlock()
return b.b.String()
}

0 comments on commit 2f97a07

Please sign in to comment.