Skip to content

Commit

Permalink
Support Consul namespaces/partitions/peering
Browse files Browse the repository at this point in the history
  • Loading branch information
lkysow committed Oct 13, 2023
1 parent 4e1d5b9 commit 6bac956
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 14 deletions.
31 changes: 31 additions & 0 deletions dependency/dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
keyRe = `/?(?P<key>[^@]+)`
filterRe = `(\|(?P<filter>[[:word:]\,]+))?`
serviceNameRe = `(?P<name>[[:word:]\-\_]+)`
queryRe = `(?P<query>\?[[:word:]\-\_\=\&]+)`
nodeNameRe = `(?P<name>[[:word:]\.\-\_]+)`
nearRe = `(~(?P<near>[[:word:]\.\-\_]+))?`
prefixRe = `/?(?P<prefix>[^@]+)`
Expand Down Expand Up @@ -66,6 +67,9 @@ type QueryOptions struct {
VaultGrace time.Duration
WaitIndex uint64
WaitTime time.Duration
Peer string
Partition string
Namespace string
}

func (q *QueryOptions) Merge(o *QueryOptions) *QueryOptions {
Expand Down Expand Up @@ -117,13 +121,28 @@ func (q *QueryOptions) Merge(o *QueryOptions) *QueryOptions {
r.WaitTime = o.WaitTime
}

if o.Namespace != "" {
r.Namespace = o.Namespace
}

if o.Partition != "" {
r.Partition = o.Partition
}

if o.Peer != "" {
r.Peer = o.Peer
}

return &r
}

func (q *QueryOptions) ToConsulOpts() *consulapi.QueryOptions {
return &consulapi.QueryOptions{
AllowStale: q.AllowStale,
Datacenter: q.Datacenter,
Namespace: q.Namespace,
Partition: q.Partition,
Peer: q.Peer,
Near: q.Near,
RequireConsistent: q.RequireConsistent,
WaitIndex: q.WaitIndex,
Expand Down Expand Up @@ -162,6 +181,18 @@ func (q *QueryOptions) String() string {
u.Add("region", q.Region)
}

if q.Namespace != "" {
u.Add("namespace", q.Namespace)
}

if q.Peer != "" {
u.Add("peer", q.Peer)
}

if q.Partition != "" {
u.Add("partition", q.Partition)
}

if q.Near != "" {
u.Add("near", q.Near)
}
Expand Down
66 changes: 52 additions & 14 deletions dependency/health_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ const (
HealthCritical = "critical"
HealthMaint = "maintenance"

QueryNamespace = "ns"
QueryPartition = "partition"
QueryPeer = "peer"

NodeMaint = "_node_maintenance"
ServiceMaint = "_service_maintenance:"
)
Expand All @@ -32,7 +36,7 @@ var (
_ Dependency = (*HealthServiceQuery)(nil)

// HealthServiceQueryRe is the regular expression to use.
HealthServiceQueryRe = regexp.MustCompile(`\A` + tagRe + serviceNameRe + dcRe + nearRe + filterRe + `\z`)
HealthServiceQueryRe = regexp.MustCompile(`\A` + tagRe + serviceNameRe + queryRe + dcRe + nearRe + filterRe + `\z`)
)

func init() {
Expand Down Expand Up @@ -62,12 +66,15 @@ type HealthService struct {
type HealthServiceQuery struct {
stopCh chan struct{}

dc string
filters []string
name string
near string
tag string
connect bool
dc string
filters []string
name string
near string
tag string
connect bool
partition string
peer string
namespace string
}

// NewHealthServiceQuery processes the strings to build a service dependency.
Expand Down Expand Up @@ -110,14 +117,42 @@ func healthServiceQuery(s string, connect bool) (*HealthServiceQuery, error) {
filters = []string{HealthPassing}
}

// Parse optional query into key pairs.
queryParams := make(map[string]string)
if queryRaw := m["query"]; queryRaw != "" {
queryRawStripped := strings.TrimPrefix(queryRaw, "?")
keyPairs := strings.Split(queryRawStripped, "&")
for _, keyPair := range keyPairs {
split := strings.Split(keyPair, "=")
if len(split) != 2 {
return nil, fmt.Errorf(
"health.service: invalid query: %q expected key pair %q to have two parts when split on \"=\"", queryRawStripped, keyPair)
}

key, val := split[0], split[1]
switch key {
case QueryNamespace,
QueryPeer,
QueryPartition:
queryParams[key] = val
default:
return nil,
fmt.Errorf("health.service: invalid query parameter key %q in query %q: supported keys: %s,%s,%s", key, queryRawStripped, QueryNamespace, QueryPeer, QueryPartition)
}
}
}

return &HealthServiceQuery{
stopCh: make(chan struct{}, 1),
dc: m["dc"],
filters: filters,
name: m["name"],
near: m["near"],
tag: m["tag"],
connect: connect,
stopCh: make(chan struct{}, 1),
dc: m["dc"],
filters: filters,
name: m["name"],
near: m["near"],
tag: m["tag"],
connect: connect,
namespace: queryParams[QueryNamespace],
peer: queryParams[QueryPeer],
partition: queryParams[QueryPartition],
}, nil
}

Expand All @@ -133,6 +168,9 @@ func (d *HealthServiceQuery) Fetch(clients *ClientSet, opts *QueryOptions) (inte
opts = opts.Merge(&QueryOptions{
Datacenter: d.dc,
Near: d.near,
Namespace: d.namespace,
Partition: d.partition,
Peer: d.peer,
})

u := &url.URL{
Expand Down

0 comments on commit 6bac956

Please sign in to comment.