Skip to content

Commit

Permalink
Change git operations from fetch to pull (#6668)
Browse files Browse the repository at this point in the history
* Update to using FETCH_HEAD since we are using FETCH which doesnt update HEAD.

* Update to using FETCH_HEAD since we are using FETCH which doesnt update HEAD.

* Use pull and not fetch

* add comments

* fix linting

* Update CHANGELOG.md

Co-authored-by: Robert Fratto <[email protected]>

---------

Co-authored-by: Robert Fratto <[email protected]>
  • Loading branch information
mattdurham and rfratto authored Mar 13, 2024
1 parent 954f411 commit 677b687
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 15 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ Main (unreleased)

- Fix a bug where structured metadata and parsed field are not passed further in `loki.source.api` (@marchellodev)

- Change `import.git` to use Git pulls rather than fetches to fix scenarios where the local code did not get updated. (@mattdurham)

### Other changes

- Clustering for Grafana Agent in Flow mode has graduated from beta to stable.
Expand Down
12 changes: 6 additions & 6 deletions docs/sources/flow/reference/components/module.git.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ module.git "LABEL" {

The following arguments are supported:

Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`repository` | `string` | The Git repository address to retrieve the module from. | | yes
`revision` | `string` | The Git revision to retrieve the module from. | `"HEAD"` | no
`path` | `string` | The path in the repository where the module is stored. | | yes
`pull_frequency` | `duration` | The frequency to pull the repository for updates. | `"60s"` | no
Name | Type | Description | Default | Required
-----------------|------------|---------------------------------------------------------|----------|---------
`repository` | `string` | The Git repository address to retrieve the module from. | | yes
`revision` | `string` | The Git revision to retrieve the module from. | `"HEAD"` | no
`path` | `string` | The path in the repository where the module is stored. | | yes
`pull_frequency` | `duration` | The frequency to pull the repository for updates. | `"60s"` | no

The `repository` attribute must be set to a repository address that would be
recognized by Git with a `git clone REPOSITORY_ADDRESS` command, such as
Expand Down
96 changes: 96 additions & 0 deletions internal/flow/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"io/fs"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
Expand Down Expand Up @@ -250,6 +251,101 @@ func TestImportError(t *testing.T) {
}
}

func TestPullUpdating(t *testing.T) {
// Previously we used fetch instead of pull, which would set the FETCH_HEAD but not HEAD
// This caused changes not to propagate if there were changes, since HEAD was pinned to whatever it was on the initial download.
// Switching to pull removes this problem at the expense of network bandwidth.
// Tried switching to FETCH_HEAD but FETCH_HEAD is only set on fetch and not initial repo clone so we would need to
// remember to always call fetch after clone.
//
// This test ensures we can pull the correct values down if they update no matter what, it works by creating a local
// file based git repo then committing a file, running the component, then updating the file in the repo.
testRepo := t.TempDir()

contents := `declare "add" {
argument "a" {}
argument "b" {}
export "sum" {
value = argument.a.value + argument.b.value
}
}`
main := `
import.git "testImport" {
repository = "` + testRepo + `"
path = "math.river"
pull_frequency = "5s"
}
testImport.add "cc" {
a = 1
b = 1
}
`
init := exec.Command("git", "init", testRepo)
err := init.Run()
require.NoError(t, err)
math := filepath.Join(testRepo, "math.river")
err = os.WriteFile(math, []byte(contents), 0666)
require.NoError(t, err)
add := exec.Command("git", "add", ".")
add.Dir = testRepo
err = add.Run()
require.NoError(t, err)
commit := exec.Command("git", "commit", "-m \"test\"")
commit.Dir = testRepo
err = commit.Run()
require.NoError(t, err)

defer verifyNoGoroutineLeaks(t)
ctrl, f := setup(t, main)
err = ctrl.LoadSource(f, nil)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())

var wg sync.WaitGroup
defer func() {
cancel()
wg.Wait()
}()

wg.Add(1)
go func() {
defer wg.Done()
ctrl.Run(ctx)
}()

// Check for initial condition
require.Eventually(t, func() bool {
export := getExport[map[string]interface{}](t, ctrl, "", "testImport.add.cc")
return export["sum"] == 2
}, 3*time.Second, 10*time.Millisecond)

contentsMore := `declare "add" {
argument "a" {}
argument "b" {}
export "sum" {
value = argument.a.value + argument.b.value + 1
}
}`
err = os.WriteFile(math, []byte(contentsMore), 0666)
require.NoError(t, err)
add2 := exec.Command("git", "add", ".")
add2.Dir = testRepo
add2.Run()

commit2 := exec.Command("git", "commit", "-m \"test2\"")
commit2.Dir = testRepo
commit2.Run()

// Check for final condition.
require.Eventually(t, func() bool {
export := getExport[map[string]interface{}](t, ctrl, "", "testImport.add.cc")
return export["sum"] == 3
}, 20*time.Second, 1*time.Millisecond)
}

func testConfig(t *testing.T, config string, reloadConfig string, update func()) {
defer verifyNoGoroutineLeaks(t)
ctrl, f := setup(t, config)
Expand Down
25 changes: 16 additions & 9 deletions internal/vcs/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type GitRepo struct {
// managed at storagePath.
//
// If storagePath is empty on disk, NewGitRepo initializes GitRepo by cloning
// the repository. Otherwise, NewGitRepo will do a fetch.
// the repository. Otherwise, NewGitRepo will do a pull.
//
// After GitRepo is initialized, it checks out to the Revision specified in
// GitRepoOptions.
Expand Down Expand Up @@ -58,13 +58,20 @@ func NewGitRepo(ctx context.Context, storagePath string, opts GitRepoOptions) (*
}
}

// Fetch the latest contents. This may be a no-op if we just did a clone.
fetchRepoErr := repo.FetchContext(ctx, &git.FetchOptions{
// Pulls the latest contents. This may be a no-op if we just did a clone.
wt, err := repo.Worktree()
if err != nil {
return nil, DownloadFailedError{
Repository: opts.Repository,
Inner: err,
}
}
pullRepoErr := wt.PullContext(ctx, &git.PullOptions{
RemoteName: "origin",
Force: true,
Auth: opts.Auth.Convert(),
})
if fetchRepoErr != nil && !errors.Is(fetchRepoErr, git.NoErrAlreadyUpToDate) {
if pullRepoErr != nil && !errors.Is(pullRepoErr, git.NoErrAlreadyUpToDate) {
workTree, err := repo.Worktree()
if err != nil {
return nil, err
Expand All @@ -75,7 +82,7 @@ func NewGitRepo(ctx context.Context, storagePath string, opts GitRepoOptions) (*
workTree: workTree,
}, UpdateFailedError{
Repository: opts.Repository,
Inner: fetchRepoErr,
Inner: pullRepoErr,
}
}

Expand Down Expand Up @@ -109,19 +116,19 @@ func isRepoCloned(dir string) bool {
return dirError == nil && len(fi) > 0
}

// Update updates the repository by fetching new content and re-checking out to
// Update updates the repository by pulling new content and re-checking out to
// latest version of Revision.
func (repo *GitRepo) Update(ctx context.Context) error {
var err error
fetchRepoErr := repo.repo.FetchContext(ctx, &git.FetchOptions{
pullRepoErr := repo.workTree.PullContext(ctx, &git.PullOptions{
RemoteName: "origin",
Force: true,
Auth: repo.opts.Auth.Convert(),
})
if fetchRepoErr != nil && !errors.Is(fetchRepoErr, git.NoErrAlreadyUpToDate) {
if pullRepoErr != nil && !errors.Is(pullRepoErr, git.NoErrAlreadyUpToDate) {
return UpdateFailedError{
Repository: repo.opts.Repository,
Inner: fetchRepoErr,
Inner: pullRepoErr,
}
}

Expand Down

0 comments on commit 677b687

Please sign in to comment.