Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid overwrite if the task is dirty #280

Merged
merged 3 commits into from
Oct 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ func (c *Client) PhysicalDelete(ctx context.Context, id string) error {
}

// Retrieve retrieves all tasks
func (c *Client) Retrieve(
ctx context.Context) (map[string]*service.Task, error) {
func (c *Client) Retrieve(ctx context.Context) (map[string]*service.Task, error) {
var tasks map[string]*service.Task

err := c.withClient(
Expand All @@ -114,8 +113,7 @@ func (c *Client) Retrieve(
}

// Retrieve retrieves all tasks
func (c *Client) RetrieveAll(
ctx context.Context) (map[string]*service.Task, error) {
func (c *Client) RetrieveAll(ctx context.Context) (map[string]*service.Task, error) {
var tasks map[string]*service.Task

err := c.withClient(
Expand All @@ -133,9 +131,7 @@ func (c *Client) RetrieveAll(
}

// UpdatePriority updates tasks' priorities
func (c *Client) UpdatePriority(
ctx context.Context,
priorities map[string]*service.Priority) (map[string]*service.Priority, error) {
func (c *Client) UpdatePriority(ctx context.Context, priorities map[string]*service.Priority) (map[string]*service.Priority, error) {
var ret map[string]*service.Priority

err := c.withClient(func(hc service.HashiraClient) error {
Expand All @@ -154,8 +150,7 @@ func (c *Client) UpdatePriority(
}

// RetrievePriority retrieves tasks' priorities
func (c *Client) RetrievePriority(
ctx context.Context) (map[string]*service.Priority, error) {
func (c *Client) RetrievePriority(ctx context.Context) (map[string]*service.Priority, error) {
var ret map[string]*service.Priority

err := c.withClient(func(hc service.HashiraClient) error {
Expand Down
59 changes: 15 additions & 44 deletions cmd/hashira-cui/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"os"
"os/user"
"path/filepath"
"time"

"github.com/pankona/gocui"
hashirac "github.com/pankona/hashira/client"
Expand Down Expand Up @@ -68,16 +67,6 @@ func main() {
d.Stop()
}()

// Start synchronization with cloud if HASHIRA_ACCESS_TOKEN is set
accesstoken, ok := os.LookupEnv("HASHIRA_ACCESS_TOKEN")
var isAccessTokenValid bool
if ok {
if err := startSync(context.Background(), daemonPort, accesstoken); err != nil {
log.Printf("failed to start synchronization: %v", err)
}
isAccessTokenValid = true
}

// initialize gocui
// specify false means: supportOverlaps = false
g, err := gocui.NewGui(gocui.OutputNormal, false)
Expand All @@ -90,8 +79,22 @@ func main() {
hashirac := &hashirac.Client{Address: fmt.Sprintf("localhost:%d", daemonPort)}
syncclient := &syncutil.Client{DaemonPort: daemonPort}
m := NewModel(hashirac, syncclient)
if isAccessTokenValid {

// Start synchronization with cloud if HASHIRA_ACCESS_TOKEN is set
accesstoken, ok := os.LookupEnv("HASHIRA_ACCESS_TOKEN")

if ok {
sc := syncutil.Client{DaemonPort: daemonPort}
err := sc.TestAccessToken(accesstoken)
if err != nil {
log.Printf("HASHIRA_ACCESSTOKEN is invalid. Synchronization is not started: %v", err)
}
m.SetAccessToken(accesstoken)
go func() {
if err := m.SyncOnNotify(context.Background()); err != nil {
log.Printf("sync on notify finished: %v", err)
}
}()
}

// prepare controller
Expand Down Expand Up @@ -126,35 +129,3 @@ func main() {
log.Panicln(err)
}
}

func startSync(ctx context.Context, daemonPort int, accesstoken string) error {
log.Printf("start synchronization...\n")

sc := syncutil.Client{DaemonPort: daemonPort}
err := sc.TestAccessToken(accesstoken)
if err != nil {
return fmt.Errorf("HASHIRA_ACCESSTOKEN is invalid. Synchronization is not started: %w", err)
}
log.Printf("HASHIRA_ACCESSTOKEN is valid. hashira-web will work!\n")

const syncInterval = 10 * time.Minute

go func() {
for {
select {
case <-ctx.Done():
break
default:
if err := sc.Upload(accesstoken, syncutil.UploadDirtyOnly); err != nil {
log.Printf("failed to upload: %v", err)
}
if err := sc.Download(accesstoken); err != nil {
log.Printf("failed to download: %v", err)
}
<-time.After(syncInterval)
}
}
}()

return nil
}
71 changes: 52 additions & 19 deletions cmd/hashira-cui/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package main

import (
"context"
"errors"
"fmt"
"log"
"time"

hashirac "github.com/pankona/hashira/client"
"github.com/pankona/hashira/service"
Expand All @@ -16,12 +19,14 @@ type Model struct {

// TODO: remove if accesstoken is held by syncclient
accesstoken string
syncChan chan struct{}
}

func NewModel(hc *hashirac.Client, sc *syncutil.Client) *Model {
return &Model{
hashirac: hc,
syncclient: sc,
syncChan: make(chan struct{}),
}
}

Expand All @@ -41,9 +46,7 @@ func (m *Model) UpdatePriority(ctx context.Context, p map[string]*service.Priori
if err != nil {
return nil, fmt.Errorf("failed to update priority: %w", err)
}
if err := m.sync(context.Background()); err != nil {
return nil, fmt.Errorf("failed to sync: %w", err)
}
m.NotifySync()

return p, nil
}
Expand All @@ -52,45 +55,75 @@ func (m *Model) Create(ctx context.Context, task *service.Task) error {
if err := m.hashirac.Create(ctx, task); err != nil {
return fmt.Errorf("failed to create a new task: %w", err)
}
if err := m.sync(context.Background()); err != nil {
return fmt.Errorf("failed to sync: %w", err)
}
m.NotifySync()

return nil
}

func (m *Model) Update(ctx context.Context, task *service.Task) error {
if err := m.hashirac.Update(ctx, task); err != nil {
return fmt.Errorf("failed to update a task: %w", err)
}
if err := m.sync(context.Background()); err != nil {
return fmt.Errorf("failed to sync: %w", err)
}
m.NotifySync()

return nil
}

func (m *Model) Delete(ctx context.Context, id string) error {
if err := m.hashirac.Delete(ctx, id); err != nil {
return fmt.Errorf("failed to delete a task: %w", err)
}
if err := m.sync(context.Background()); err != nil {
return fmt.Errorf("failed to sync: %w", err)
}
m.NotifySync()

return nil
}

func (m *Model) SetAccessToken(accesstoken string) {
m.accesstoken = accesstoken
}

func (m *Model) sync(ctx context.Context) error {
func (m *Model) NotifySync() {
select {
case m.syncChan <- struct{}{}:
default:
}
}

var errSyncOnNotifyCanceled = errors.New("sync on notify has been canceled")

func (m *Model) SyncOnNotify(ctx context.Context) error {
if m.accesstoken == "" {
return nil
}
if err := m.syncclient.Upload(m.accesstoken, syncutil.UploadDirtyOnly); err != nil {
return fmt.Errorf("failed to upload tasks: %w", err)
}
if err := m.syncclient.Download(m.accesstoken); err != nil {
return fmt.Errorf("failed to download tasks: %w", err)

var cancelFunc context.CancelFunc

for {
select {
case <-ctx.Done():
return errSyncOnNotifyCanceled
case <-m.syncChan:
if cancelFunc != nil {
cancelFunc()
}

ctx, cancel := context.WithCancel(ctx)
cancelFunc = cancel

go func(ctx context.Context) {
select {
case <-ctx.Done():
// do nothing
case <-time.After(1 * time.Minute):
if err := m.syncclient.Upload(m.accesstoken, syncutil.UploadDirtyOnly); err != nil {
log.Printf("failed to upload tasks: %v", err)
}
if err := m.syncclient.Download(m.accesstoken); err != nil {
log.Printf("failed to download tasks: %v", err)
}

}
}(ctx)
}
}
return nil
}
31 changes: 30 additions & 1 deletion sync/syncutil/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,28 @@ import (
)

func (c *Client) Download(accesstoken string) error {
cli := &hc.Client{Address: "localhost:" + strconv.Itoa(c.DaemonPort)}

allTasks, err := cli.RetrieveAll(context.Background())
if err != nil {
return fmt.Errorf("failed to retrieve tasks: %w", err)
}
allPriorities, err := cli.RetrievePriority(context.Background())
if err != nil {
return fmt.Errorf("failed to retrieve priorities: %w", err)
}

if dirtyTaskOrPriorityExists(allTasks, allPriorities) {
// don't download and overwrite tasks and priorities since there're some dirty task
return nil
}

sc := sync.NewClient()
result, err := sc.Download(accesstoken)
if err != nil {
return fmt.Errorf("failed to download task and priority: %w", err)
}

cli := &hc.Client{Address: "localhost:" + strconv.Itoa(c.DaemonPort)}
for _, task := range result.Tasks {
err = cli.Update(context.Background(), &service.Task{
Id: task.ID,
Expand Down Expand Up @@ -74,3 +89,17 @@ func unique(ss []string) []string {
}
return ids
}

func dirtyTaskOrPriorityExists(tasks map[string]*service.Task, priorities map[string]*service.Priority) bool {
for _, v := range tasks {
if v.IsDirty {
return true
}
}
for _, v := range priorities {
if v.IsDirty {
return true
}
}
return false
}
26 changes: 26 additions & 0 deletions sync/syncutil/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func newUploadRequest(tasks map[string]*service.Task, priorities map[string]*ser

func (c *Client) Upload(accesstoken string, uploadTarget UploadTarget) error {
cli := &hc.Client{Address: "localhost:" + strconv.Itoa(c.DaemonPort)}

allTasks, err := cli.RetrieveAll(context.Background())
if err != nil {
return fmt.Errorf("failed to retrieve tasks: %w", err)
Expand Down Expand Up @@ -75,6 +76,31 @@ func (c *Client) Upload(accesstoken string, uploadTarget UploadTarget) error {
}
}

// make all tasks and priorities clean since uploading is completed
for _, task := range allTasks {
if !task.IsDirty {
continue
}
err = cli.Update(context.Background(), &service.Task{
Id: task.Id,
Name: task.Name,
Place: task.Place,
IsDeleted: task.IsDeleted,
IsDirty: false,
})
if err != nil {
return fmt.Errorf("failed to update task: %w", err)
}
}

for _, priority := range allPriorities {
priority.IsDirty = false
}
_, err = cli.UpdatePriority(context.Background(), allPriorities)
if err != nil {
return fmt.Errorf("failed to update priority: %w", err)
}

return nil
}

Expand Down