Skip to content

Commit

Permalink
apply concrete implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ehsaniara committed Aug 18, 2021
1 parent 6df2c37 commit 3d72334
Show file tree
Hide file tree
Showing 5 changed files with 294 additions and 271 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# goInterLock
![Go Interval Lock](material/gointerlock_bg.png)

_known as: ⏰ Interval (Cron / Job / Task / Scheduler) Go Distributed Lock ⏱️_
_known as: ⏰ Interval (Cron / Job / Task / Scheduler) Go Centralized Lock ⏱️_

## **Go** **Interval** job timer, with distributed **Lock**
## Go Interval job timer, with centralized Lock for Distributed Systems

`goInterLock` is go job/task scheduler with distributed locking mechanism. In distributed system locking is preventing task been executed in every instant that has the scheduler,
`goInterLock` is go job/task scheduler with centralized locking mechanism. In distributed system locking is preventing task been executed in every instant that has the scheduler,

**For example:** if your application has a task of calling some external APIs or doing some DB querying every 10 minutes, the lock prevents the process been run in every instance of that application, and you ended up running that task multiple time every 10 minutes. (let say in **kubernetes**)

Expand Down
5 changes: 3 additions & 2 deletions example/redis/basic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package main
import (
"context"
"fmt"
"github.com/ehsaniara/gointerlock"
"log"
"time"

"github.com/ehsaniara/gointerlock"
)

func myJob() {
Expand All @@ -21,7 +22,7 @@ func main() {
Interval: 2 * time.Second,
Arg: myJob,
RedisHost: "localhost:6379",
RedisPassword: "MyRedisPassword",
RedisPassword: "secret",
}

//test cron
Expand Down
206 changes: 44 additions & 162 deletions goInterval.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,13 @@ package gointerlock
import (
"context"
"errors"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/go-redis/redis/v8"
"log"
"time"

"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/go-redis/redis/v8"
)

var locker Locker
var locker Lock

type LockVendor int32

Expand Down Expand Up @@ -95,9 +90,35 @@ func (t *GoInterval) Run(ctx context.Context) error {
}
}

err := t.init(ctx)
if err != nil {
return err
switch t.LockVendor {
case RedisLock:
r := &RedisLocker{
redisConnector: t.RedisConnector,
Name: t.Name,
RedisHost: t.RedisHost,
RedisPassword: t.RedisPassword,
RedisDB: t.RedisDB,
}
err := r.SetClient()
if err != nil {
return err
}

locker = r
case AwsDynamoDbLock:
d := &DynamoDbLocker{
AwsDynamoDbRegion: t.AwsDynamoDbRegion,
AwsDynamoDbEndpoint: t.AwsDynamoDbEndpoint,
AwsDynamoDbAccessKeyID: t.AwsDynamoDbAccessKeyID,
AwsDynamoDbSecretAccessKey: t.AwsDynamoDbSecretAccessKey,
AwsDynamoDbSessionToken: t.AwsDynamoDbSessionToken,
}
err := d.SetClient()
if err != nil {
return err
}

locker = d
}

t.updateTimer()
Expand Down Expand Up @@ -128,168 +149,29 @@ func (t *GoInterval) Run(ctx context.Context) error {
}
}

func (t *GoInterval) init(ctx context.Context) error {
// distributed mod is enabled
switch t.LockVendor {
case RedisLock:

//if given connection is null the use the built-in one
if t.RedisConnector == nil {

log.Printf("Job %s started in distributed mode!", t.Name)

//if Redis host missed, use the default one
if t.RedisHost == "" {
t.RedisHost = "localhost:6379"
}

locker.redisConnector = redis.NewClient(&redis.Options{
Addr: t.RedisHost,
Password: t.RedisPassword, // no password set
DB: 0, // use default DB
})

} else {
// set the connection
locker.redisConnector = t.RedisConnector
}

//validate the connection
if locker.redisConnector.Conn(ctx) == nil {
return errors.New("`Redis Connection Failed!`")
}

log.Printf("Job %s started in distributed mode by provided redis connection", t.Name)

case AwsDynamoDbLock:

// override the AWS profile credentials
if aws.String(t.AwsDynamoDbEndpoint) == nil {
// Initialize a session that the SDK will use to load
// credentials from the shared credentials file ~/.aws/credentials
// and region from the shared configuration file ~/.aws/config.
sess := session.Must(session.NewSessionWithOptions(session.Options{
SharedConfigState: session.SharedConfigEnable,
}))
// Create DynamoDB client
locker.dynamoClient = dynamodb.New(sess)
} else {

if aws.String(t.AwsDynamoDbRegion) == nil {
return errors.New("`AwsDynamoDbRegion is missing (AWS Region)`")
}

//setting StaticCredentials
awsConfig := &aws.Config{
Credentials: credentials.NewStaticCredentials(t.AwsDynamoDbAccessKeyID, t.AwsDynamoDbSecretAccessKey, t.AwsDynamoDbSessionToken),
Region: aws.String(t.AwsDynamoDbRegion),
Endpoint: aws.String(t.AwsDynamoDbEndpoint),
}
sess, err := session.NewSession(awsConfig)
if err != nil {
return err
}
// Create DynamoDB client
locker.dynamoClient = dynamodb.New(sess)
}

//sess, err := session.NewSession(&aws.Config{
// Region: aws.String("us-west-2"),
// Credentials: credentials.NewStaticCredentials(conf.AWS_ACCESS_KEY_ID, conf.AWS_SECRET_ACCESS_KEY, ""),
//})

if locker.dynamoClient == nil {
return errors.New("`DynamoDb Connection Failed!`")
}

//check if table exist, if not create one
tableInput := &dynamodb.CreateTableInput{
AttributeDefinitions: []*dynamodb.AttributeDefinition{
{
AttributeName: aws.String("id"),
AttributeType: aws.String("S"),
},
},
KeySchema: []*dynamodb.KeySchemaElement{
{
AttributeName: aws.String("id"),
KeyType: aws.String("HASH"),
},
},
ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(10),
WriteCapacityUnits: aws.Int64(10),
},
//TimeToLiveDescription: &dynamodb.TimeToLiveDescription{
// AttributeName: aws.String("ttl"),
// TimeToLiveStatus: aws.String("enable"),
//},
TableName: aws.String(Prefix),
}

_, err := locker.dynamoClient.CreateTable(tableInput)
if err != nil {
log.Printf("Got error calling CreateTable: %s", err)
} else {
fmt.Println("Created the table", Prefix)
}

default:

}
return nil
}

func (t *GoInterval) isNotLockThenLock(ctx context.Context) (bool, error) {

// distributed mod is enabled
switch t.LockVendor {
case RedisLock:

locked, err := locker.RedisLock(ctx, t.Name, t.Interval)

if err != nil {
return false, err
}
return locked, nil

case AwsDynamoDbLock:

locked, err := locker.DynamoDbLock(ctx, t.Name, t.Interval)

if err != nil {
return false, err
}
return locked, nil

default:

// no distributed lock
//lock
if t.LockVendor == SingleApp {
return true, nil
}
locked, err := locker.Lock(ctx, t.Name, t.Interval)

if err != nil {
log.Fatalf("err:%v", err)
return false, err
}
return locked, nil
}

func (t *GoInterval) UnLock(ctx context.Context) {
//unlock
switch t.LockVendor {
case RedisLock:

err := locker.RedisUnlock(ctx, t.Name)
if err != nil {
return
}

case AwsDynamoDbLock:

err := locker.DynamoDbUnlock(ctx, t.Name)
if err != nil {
return
}

default:
if t.LockVendor == SingleApp {
return
}

// no distributed lock
err := locker.UnLock(ctx, t.Name)
if err != nil {
return
}
}
Expand Down
104 changes: 0 additions & 104 deletions goIntervalLock.go

This file was deleted.

Loading

0 comments on commit 3d72334

Please sign in to comment.