Skip to content

Commit

Permalink
(2.11) ADR-44: JetStream Dynamic Metadata (#5857)
Browse files Browse the repository at this point in the history
Implements
[ADR-44](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-44.md)
Extends #5850

Add dynamic JetStream stream/consumer metadata:
```
_nats.server.version
_nats.server.api_level
```

These are not persisted and are only returned in the response of a
create/update/info request.

Signed-off-by: Maurice van Veen <[email protected]>

---------

Signed-off-by: Maurice van Veen <[email protected]>
  • Loading branch information
MauriceVanVeen authored Sep 7, 2024
1 parent 2f6d8a2 commit 69c57cb
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 24 deletions.
18 changes: 10 additions & 8 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1475,10 +1475,11 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account,
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
msetCfg := mset.config()
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Config: *setDynamicStreamMetadata(&msetCfg),
TimeStamp: time.Now().UTC(),
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
Expand Down Expand Up @@ -1569,10 +1570,11 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account,
return
}

msetCfg := mset.config()
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Config: *setDynamicStreamMetadata(&msetCfg),
Domain: s.getOpts().JetStreamDomain,
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
Expand Down Expand Up @@ -1951,11 +1953,10 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
}

config := mset.config()

resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.stateWithDetail(details),
Config: config,
Config: *setDynamicStreamMetadata(&config),
Domain: s.getOpts().JetStreamDomain,
Cluster: js.clusterInfo(mset.raftGroup()),
Mirror: mset.mirrorInfo(),
Expand Down Expand Up @@ -3591,10 +3592,11 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC
s.Warnf("Restore failed for %s for stream '%s > %s' in %v",
friendlyBytes(int64(total)), streamName, acc.Name, end.Sub(start))
} else {
msetCfg := mset.config()
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Config: *setDynamicStreamMetadata(&msetCfg),
TimeStamp: time.Now().UTC(),
}
s.Noticef("Completed restore of %s for stream '%s > %s' in %v",
Expand Down Expand Up @@ -4057,7 +4059,7 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
resp.ConsumerInfo = o.initialInfo()
resp.ConsumerInfo = setDynamicConsumerInfoMetadata(o.initialInfo())
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))

if o.cfg.PauseUntil != nil && !o.cfg.PauseUntil.IsZero() && time.Now().Before(*o.cfg.PauseUntil) {
Expand Down Expand Up @@ -4403,7 +4405,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
Stream: ca.Stream,
Name: ca.Name,
Created: ca.Created,
Config: ca.Config,
Config: setDynamicConsumerMetadata(ca.Config),
TimeStamp: time.Now().UTC(),
}
b := s.jsonResponse(resp)
Expand Down Expand Up @@ -4446,7 +4448,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
return
}

if resp.ConsumerInfo = obs.info(); resp.ConsumerInfo == nil {
if resp.ConsumerInfo = setDynamicConsumerInfoMetadata(obs.info()); resp.ConsumerInfo == nil {
// This consumer returned nil which means it's closed. Respond with not found.
resp.Error = NewJSConsumerNotFoundError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
Expand Down
15 changes: 9 additions & 6 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3275,10 +3275,11 @@ func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) {
resp.Error = NewJSStreamCreateError(err, Unless(err))
s.sendAPIErrResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
} else {
msetCfg := mset.config()
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Config: *setDynamicStreamMetadata(&msetCfg),
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
Expand Down Expand Up @@ -3705,10 +3706,11 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss

// Send our response.
var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
msetCfg := mset.config()
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Config: *setDynamicStreamMetadata(&msetCfg),
Cluster: js.clusterInfo(mset.raftGroup()),
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
Expand Down Expand Up @@ -3772,10 +3774,11 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme

if !recovering {
var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
msetCfg := mset.config()
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Config: *setDynamicStreamMetadata(&msetCfg),
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
Expand Down Expand Up @@ -4485,7 +4488,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
client, subject, reply := ca.Client, ca.Subject, ca.Reply
js.mu.Unlock()
var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}
resp.ConsumerInfo = o.info()
resp.ConsumerInfo = setDynamicConsumerInfoMetadata(o.info())
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
return
}
Expand Down Expand Up @@ -4523,7 +4526,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
js.mu.RUnlock()
if !recovering {
var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}
resp.ConsumerInfo = o.info()
resp.ConsumerInfo = setDynamicConsumerInfoMetadata(o.info())
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
}
}
Expand Down Expand Up @@ -5223,7 +5226,7 @@ func (js *jetStream) processConsumerLeaderChange(o *consumer, isLeader bool) err
resp.Error = NewJSConsumerCreateError(err, Unless(err))
s.sendAPIErrResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
} else {
resp.ConsumerInfo = o.initialInfo()
resp.ConsumerInfo = setDynamicConsumerInfoMetadata(o.initialInfo())
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
o.sendCreateAdvisory()
}
Expand Down
57 changes: 57 additions & 0 deletions server/jetstream_versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,21 @@ const (
JSCreatedVersionMetadataKey = "_nats.created.server.version"
JSCreatedLevelMetadataKey = "_nats.created.server.api_level"
JSRequiredLevelMetadataKey = "_nats.server.require.api_level"
JSServerVersionMetadataKey = "_nats.server.version"
JSServerLevelMetadataKey = "_nats.server.api_level"
)

// setStaticStreamMetadata sets JetStream stream metadata, like the server version and API level.
// Given:
// - cfg!=nil, prevCfg==nil add stream: adds created and required metadata
// - cfg!=nil, prevCfg!=nil update stream: created metadata is preserved, required metadata is updated
//
// Any dynamic metadata is removed, it must not be stored and only be added for responses.
func setStaticStreamMetadata(cfg *StreamConfig, prevCfg *StreamConfig) {
if cfg.Metadata == nil {
cfg.Metadata = make(map[string]string)
} else {
deleteDynamicMetadata(cfg.Metadata)
}

var prevMetadata map[string]string
Expand All @@ -47,13 +53,29 @@ func setStaticStreamMetadata(cfg *StreamConfig, prevCfg *StreamConfig) {
cfg.Metadata[JSRequiredLevelMetadataKey] = strconv.Itoa(requiredApiLevel)
}

// setDynamicStreamMetadata adds dynamic fields into the (copied) metadata.
func setDynamicStreamMetadata(cfg *StreamConfig) *StreamConfig {
newCfg := *cfg
newCfg.Metadata = make(map[string]string)
for key, value := range cfg.Metadata {
newCfg.Metadata[key] = value
}
newCfg.Metadata[JSServerVersionMetadataKey] = VERSION
newCfg.Metadata[JSServerLevelMetadataKey] = strconv.Itoa(JSApiLevel)
return &newCfg
}

// setStaticConsumerMetadata sets JetStream consumer metadata, like the server version and API level.
// Given:
// - cfg!=nil, prevCfg==nil add consumer: adds created and required metadata
// - cfg!=nil, prevCfg!=nil update consumer: created metadata is preserved, required metadata is updated
//
// Any dynamic metadata is removed, it must not be stored and only be added for responses.
func setStaticConsumerMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig) {
if cfg.Metadata == nil {
cfg.Metadata = make(map[string]string)
} else {
deleteDynamicMetadata(cfg.Metadata)
}

var prevMetadata map[string]string
Expand All @@ -78,12 +100,41 @@ func setStaticConsumerMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig) {
cfg.Metadata[JSRequiredLevelMetadataKey] = strconv.Itoa(requiredApiLevel)
}

// setDynamicConsumerMetadata adds dynamic fields into the (copied) metadata.
func setDynamicConsumerMetadata(cfg *ConsumerConfig) *ConsumerConfig {
newCfg := *cfg
newCfg.Metadata = make(map[string]string)
for key, value := range cfg.Metadata {
newCfg.Metadata[key] = value
}
newCfg.Metadata[JSServerVersionMetadataKey] = VERSION
newCfg.Metadata[JSServerLevelMetadataKey] = strconv.Itoa(JSApiLevel)
return &newCfg
}

// setDynamicConsumerInfoMetadata adds dynamic fields into the (copied) metadata.
func setDynamicConsumerInfoMetadata(info *ConsumerInfo) *ConsumerInfo {
if info == nil {
return nil
}

newInfo := *info
cfg := setDynamicConsumerMetadata(info.Config)
newInfo.Config = cfg
return &newInfo
}

// copyConsumerMetadata copies versioning fields from metadata of prevCfg into cfg.
// Removes versioning fields if no previous metadata, updates if set, and removes fields if it doesn't exist in prevCfg.
// Any dynamic metadata is removed, it must not be stored and only be added for responses.
//
// Note: useful when doing equality checks on cfg and prevCfg, but ignoring any versioning metadata differences.
// MUST be followed up with a call to setStaticConsumerMetadata to fix potentially lost metadata.
func copyConsumerMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig) {
if cfg.Metadata != nil {
deleteDynamicMetadata(cfg.Metadata)
}

// Remove fields when no previous metadata.
if prevCfg == nil || prevCfg.Metadata == nil {
if cfg.Metadata != nil {
Expand Down Expand Up @@ -136,3 +187,9 @@ func preserveCreatedMetadata(metadata, prevMetadata map[string]string) {
delete(metadata, JSCreatedLevelMetadataKey)
}
}

// deleteDynamicMetadata deletes dynamic fields from the metadata.
func deleteDynamicMetadata(metadata map[string]string) {
delete(metadata, JSServerVersionMetadataKey)
delete(metadata, JSServerLevelMetadataKey)
}
Loading

0 comments on commit 69c57cb

Please sign in to comment.