diff --git a/.goreleaser.yml b/.goreleaser.yml index f11eabb3..0b719c36 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -14,7 +14,7 @@ builds: - main: ./nats-streaming-server.go binary: nats-streaming-server ldflags: - - -w -X github.com/nats-io/nats-streaming-server/server.gitCommit={{.ShortCommit}} -X github.com/nats-io/nats-streaming-server/vendor/github.com/nats-io/nats-server/v2/server.gitCommit=a27de5a + - -w -X github.com/nats-io/nats-streaming-server/server.gitCommit={{.ShortCommit}} -X github.com/nats-io/nats-streaming-server/vendor/github.com/nats-io/nats-server/v2/server.gitCommit=ea48105 env: - GO111MODULE=off - CGO_ENABLED=0 diff --git a/README.md b/README.md index 42e42811..6585ffaa 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,6 @@ under the Apache Version 2.0 license found in the LICENSE file. [Coverage-image]: https://coveralls.io/repos/github/nats-io/nats-streaming-server/badge.svg?branch=main&t=kIxrDE [ReportCard-Url]: http://goreportcard.com/report/nats-io/nats-streaming-server [ReportCard-Image]: http://goreportcard.com/badge/github.com/nats-io/nats-streaming-server -[Release-Url]: https://github.com/nats-io/nats-streaming-server/releases/tag/v0.23.1 -[Release-image]: https://img.shields.io/badge/release-v0.23.1-1eb0fc.svg +[Release-Url]: https://github.com/nats-io/nats-streaming-server/releases/tag/v0.23.2 +[Release-image]: https://img.shields.io/badge/release-v0.23.2-1eb0fc.svg [github-release]: https://github.com/nats-io/nats-streaming-server/releases/ diff --git a/go.mod b/go.mod index e95c1b1b..6c1dde44 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/hashicorp/go-msgpack v1.1.5 github.com/hashicorp/raft v1.3.2 github.com/lib/pq v1.10.4 - github.com/nats-io/nats-server/v2 v2.6.4 + github.com/nats-io/nats-server/v2 v2.6.5 github.com/nats-io/nats.go v1.13.1-0.20211018182449-f2416a8b1483 github.com/nats-io/nuid v1.0.1 github.com/nats-io/stan.go v0.10.2 diff --git a/go.sum b/go.sum index 03e23b37..cb5d27d7 100644 --- a/go.sum +++ b/go.sum @@ -60,8 +60,8 @@ github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt/v2 v2.1.0 h1:1UbfD5g1xTdWmSeRV8bh/7u+utTiBsRtWhLl1PixZp4= github.com/nats-io/jwt/v2 v2.1.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= -github.com/nats-io/nats-server/v2 v2.6.4 h1:WjR1ylV/5Urth88K8U78wEEnWFYEJ9DNM0Q5DTlTx0g= -github.com/nats-io/nats-server/v2 v2.6.4/go.mod h1:LlMieumxNUnCloOTVFv7Wog0YnasScxARUMXVXv9/+M= +github.com/nats-io/nats-server/v2 v2.6.5 h1:VTG8gdSw4bEqMwKudOHkBLqGwNpNaJOwruj3+rquQlQ= +github.com/nats-io/nats-server/v2 v2.6.5/go.mod h1:LlMieumxNUnCloOTVFv7Wog0YnasScxARUMXVXv9/+M= github.com/nats-io/nats.go v1.13.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nats.go v1.13.1-0.20211018182449-f2416a8b1483 h1:GMx3ZOcMEVM5qnUItQ4eJyQ6ycwmIEB/VC/UxvdevE0= github.com/nats-io/nats.go v1.13.1-0.20211018182449-f2416a8b1483/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= diff --git a/server/server.go b/server/server.go index 793c8a15..093dcaba 100644 --- a/server/server.go +++ b/server/server.go @@ -47,7 +47,7 @@ import ( // Server defaults. const ( // VERSION is the current version for the NATS Streaming server. - VERSION = "0.23.2-beta01" + VERSION = "0.23.2" DefaultClusterID = "test-cluster" DefaultDiscoverPrefix = "_STAN.discover" diff --git a/vendor/github.com/nats-io/nats-server/v2/conf/fuzz.go b/vendor/github.com/nats-io/nats-server/v2/conf/fuzz.go index 9dce7f7e..99faf999 100644 --- a/vendor/github.com/nats-io/nats-server/v2/conf/fuzz.go +++ b/vendor/github.com/nats-io/nats-server/v2/conf/fuzz.go @@ -11,6 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build gofuzz // +build gofuzz package conf diff --git a/vendor/github.com/nats-io/nats-server/v2/logger/syslog.go b/vendor/github.com/nats-io/nats-server/v2/logger/syslog.go index 4023da88..6b5c429b 100644 --- a/vendor/github.com/nats-io/nats-server/v2/logger/syslog.go +++ b/vendor/github.com/nats-io/nats-server/v2/logger/syslog.go @@ -11,6 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build !windows // +build !windows package logger diff --git a/vendor/github.com/nats-io/nats-server/v2/server/accounts.go b/vendor/github.com/nats-io/nats-server/v2/server/accounts.go index f6292b6e..73479dfa 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/accounts.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/accounts.go @@ -1175,7 +1175,11 @@ func (m1 *ServiceLatency) NATSTotalTime() time.Duration { // m1 TotalLatency is correct, so use that. // Will use those to back into NATS latency. func (m1 *ServiceLatency) merge(m2 *ServiceLatency) { - m1.SystemLatency = m1.ServiceLatency - (m2.ServiceLatency + m2.Responder.RTT) + rtt := time.Duration(0) + if m2.Responder != nil { + rtt = m2.Responder.RTT + } + m1.SystemLatency = m1.ServiceLatency - (m2.ServiceLatency + rtt) m1.ServiceLatency = m2.ServiceLatency m1.Responder = m2.Responder sanitizeLatencyMetric(m1) diff --git a/vendor/github.com/nats-io/nats-server/v2/server/auth.go b/vendor/github.com/nats-io/nats-server/v2/server/auth.go index 3ee51576..5f5c9298 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/auth.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/auth.go @@ -42,14 +42,16 @@ type Authentication interface { // ClientAuthentication is an interface for client authentication type ClientAuthentication interface { - // Get options associated with a client + // GetOpts gets options associated with a client GetOpts() *ClientOpts - // If TLS is enabled, TLS ConnectionState, nil otherwise + // GetTLSConnectionState if TLS is enabled, TLS ConnectionState, nil otherwise GetTLSConnectionState() *tls.ConnectionState - // Optionally map a user after auth. + // RegisterUser optionally map a user after auth. RegisterUser(*User) // RemoteAddress expose the connection information of the client RemoteAddress() net.Addr + // GetNonce is the nonce presented to the user in the INFO line + GetNonce() []byte // Kind indicates what type of connection this is matching defined constants like CLIENT, ROUTER, GATEWAY, LEAF etc Kind() int } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/client.go b/vendor/github.com/nats-io/nats-server/v2/server/client.go index 812b3b73..11fe8759 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/client.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/client.go @@ -427,6 +427,14 @@ func (c *client) String() (id string) { return _EMPTY_ } +// GetNonce returns the nonce that was presented to the user on connection +func (c *client) GetNonce() []byte { + c.mu.Lock() + defer c.mu.Unlock() + + return c.nonce +} + // GetName returns the application supplied name for the connection. func (c *client) GetName() string { c.mu.Lock() @@ -1026,17 +1034,6 @@ func (c *client) writeLoop() { // sent to during processing. We pass in a budget as a time.Duration // for how much time to spend in place flushing for this client. func (c *client) flushClients(budget time.Duration) time.Time { - return c.flushClientsWithCheck(budget, false) -} - -// flushClientsWithCheck will make sure to flush any clients we may have -// sent to during processing. We pass in a budget as a time.Duration -// for how much time to spend in place flushing for this client. -// The 'clientsKindOnly' boolean indicates whether to check kind of client -// and pending client to run flushOutbound in flushClientsWithCheck. -// flushOutbound() could block the caller up to the write deadline when -// the receiving client cannot drain data from the socket fast enough. -func (c *client) flushClientsWithCheck(budget time.Duration, clientsKindOnly bool) time.Time { last := time.Now().UTC() // Check pending clients for flush. @@ -1057,7 +1054,7 @@ func (c *client) flushClientsWithCheck(budget time.Duration, clientsKindOnly boo continue } - if budget > 0 && (!clientsKindOnly || c.kind == CLIENT && cp.kind == CLIENT) && cp.out.lft < 2*budget && cp.flushOutbound() { + if budget > 0 && cp.out.lft < 2*budget && cp.flushOutbound() { budget -= cp.out.lft } else { cp.flushSignal() @@ -1199,17 +1196,8 @@ func (c *client) readLoop(pre []byte) { atomic.AddInt64(&s.inBytes, int64(c.in.bytes)) } - // Budget to spend in place flushing outbound data. - // Client will be checked on several fronts to see - // if applicable. Routes and Gateways will never - // spend time flushing outbound in place. - var budget time.Duration - if c.kind == CLIENT { - budget = time.Millisecond - } - - // Flush, or signal to writeLoop to flush to socket. - last := c.flushClientsWithCheck(budget, true) + // Signal to writeLoop to flush to socket. + last := c.flushClients(0) // Update activity, check read buffer size. c.mu.Lock() @@ -3863,7 +3851,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt // Send tracking info here if we are tracking this response. // This is always a response. var didSendTL bool - if si.tracking { + if si.tracking && !si.didDeliver { // Stamp that we attempted delivery. si.didDeliver = true didSendTL = acc.sendTrackingLatency(si, c) diff --git a/vendor/github.com/nats-io/nats-server/v2/server/const.go b/vendor/github.com/nats-io/nats-server/v2/server/const.go index a9ce21f6..86327e2f 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/const.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.6.4" + VERSION = "2.6.5" // PROTO is the currently supported protocol. // 0 was the original diff --git a/vendor/github.com/nats-io/nats-server/v2/server/disk_avail.go b/vendor/github.com/nats-io/nats-server/v2/server/disk_avail.go index c7c7f2c6..20bb090a 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/disk_avail.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/disk_avail.go @@ -11,8 +11,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -// +build !windows -// +build !openbsd +//go:build !windows && !openbsd +// +build !windows,!openbsd package server diff --git a/vendor/github.com/nats-io/nats-server/v2/server/disk_avail_openbsd.go b/vendor/github.com/nats-io/nats-server/v2/server/disk_avail_openbsd.go index 1349fe67..1dd4c0d2 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/disk_avail_openbsd.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/disk_avail_openbsd.go @@ -11,6 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build openbsd // +build openbsd package server diff --git a/vendor/github.com/nats-io/nats-server/v2/server/disk_avail_windows.go b/vendor/github.com/nats-io/nats-server/v2/server/disk_avail_windows.go index 0e057087..4c7cf749 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/disk_avail_windows.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/disk_avail_windows.go @@ -11,6 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build windows // +build windows package server diff --git a/vendor/github.com/nats-io/nats-server/v2/server/filestore.go b/vendor/github.com/nats-io/nats-server/v2/server/filestore.go index 19bdfde3..dd6d795e 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/filestore.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/filestore.go @@ -1953,6 +1953,7 @@ func (mb *msgBlock) compact() { } // Always set last. mb.last.seq = seq &^ ebit + // Advance to next record. index += rl } @@ -2605,7 +2606,7 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte writeIndex := ts-mb.lwits > int64(2*time.Second) // Accounting - mb.updateAccounting(seq, ts, rl) + mb.updateAccounting(seq&^ebit, ts, rl) // Check if we are tracking per subject for our simple state. if len(subj) > 0 && mb.fss != nil { @@ -3840,6 +3841,7 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) { } fs.lmb.first.seq = fs.state.FirstSeq fs.lmb.last.seq = fs.state.LastSeq + fs.lmb.writeIndexInfo() cb := fs.scb diff --git a/vendor/github.com/nats-io/nats-server/v2/server/fuzz.go b/vendor/github.com/nats-io/nats-server/v2/server/fuzz.go index 03f98ad1..86109cd3 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/fuzz.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/fuzz.go @@ -11,6 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build gofuzz // +build gofuzz package server diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go index c6252c30..d605b98a 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go @@ -351,6 +351,12 @@ type JSApiStreamNamesResponse struct { const JSApiStreamNamesResponseType = "io.nats.jetstream.api.v1.stream_names_response" +type JSApiStreamListRequest struct { + ApiPagedRequest + // These are filters that can be applied to the list. + Subject string `json:"subject,omitempty"` +} + // JSApiStreamListResponse list of detailed stream information. // A nil request is valid and means all streams. type JSApiStreamListResponse struct { @@ -1541,27 +1547,38 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s } var offset int + var filter string + if !isEmptyRequest(msg) { - var req JSApiStreamNamesRequest + var req JSApiStreamListRequest if err := json.Unmarshal(msg, &req); err != nil { resp.Error = NewJSInvalidJSONError() s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } offset = req.Offset + if req.Subject != _EMPTY_ { + filter = req.Subject + } } // Clustered mode will invoke a scatter and gather. if s.JetStreamIsClustered() { // Need to copy these off before sending.. msg = append(msg[:0:0], msg...) - s.startGoRoutine(func() { s.jsClusteredStreamListRequest(acc, ci, offset, subject, reply, msg) }) + s.startGoRoutine(func() { s.jsClusteredStreamListRequest(acc, ci, filter, offset, subject, reply, msg) }) return } // TODO(dlc) - Maybe hold these results for large results that we expect to be paged. // TODO(dlc) - If this list is long maybe do this in a Go routine? - msets := acc.streams() + var msets []*stream + if filter == _EMPTY_ { + msets = acc.streams() + } else { + msets = acc.filteredStreams(filter) + } + sort.Slice(msets, func(i, j int) bool { return strings.Compare(msets[i].cfg.Name, msets[j].cfg.Name) < 0 }) diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go index 4939af57..5e3fd72c 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go @@ -1679,10 +1679,8 @@ func (mset *stream) resetClusteredState(err error) bool { // This will reset the stream and consumers. // Should be done in separate go routine. func (js *jetStream) restartClustered(acc *Account, sa *streamAssignment) { - js.processClusterCreateStream(acc, sa) - - // Check consumers. - js.mu.Lock() + // Check and collect consumers first. + js.mu.RLock() var consumers []*consumerAssignment if cc := js.cluster; cc != nil && cc.meta != nil { ourID := cc.meta.ID() @@ -1693,8 +1691,11 @@ func (js *jetStream) restartClustered(acc *Account, sa *streamAssignment) { } } } - js.mu.Unlock() + js.mu.RUnlock() + // Reset stream. + js.processClusterCreateStream(acc, sa) + // Reset consumers. for _, ca := range consumers { js.processClusterCreateConsumer(ca, nil, false) } @@ -1718,12 +1719,18 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco subject, reply, hdr, msg, lseq, ts, err := decodeStreamMsg(buf[1:]) if err != nil { + if node := mset.raftNode(); node != nil { + s.Errorf("JetStream cluster could not decode stream msg for '%s > %s' [%s]", + mset.account(), mset.name(), node.Group()) + } panic(err.Error()) } // Check for flowcontrol here. - if !isRecovering && len(msg) == 0 && len(hdr) > 0 && reply != _EMPTY_ && isControlHdr(hdr) { - mset.sendFlowControlReply(reply) + if len(msg) == 0 && len(hdr) > 0 && reply != _EMPTY_ && isControlHdr(hdr) { + if !isRecovering { + mset.sendFlowControlReply(reply) + } continue } @@ -1761,6 +1768,11 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco case deleteMsgOp: md, err := decodeMsgDelete(buf[1:]) if err != nil { + if node := mset.raftNode(); node != nil { + s := js.srv + s.Errorf("JetStream cluster could not decode delete msg for '%s > %s' [%s]", + mset.account(), mset.name(), node.Group()) + } panic(err.Error()) } s, cc := js.server(), js.cluster @@ -1802,6 +1814,11 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco case purgeStreamOp: sp, err := decodeStreamPurge(buf[1:]) if err != nil { + if node := mset.raftNode(); node != nil { + s := js.srv + s.Errorf("JetStream cluster could not decode purge msg for '%s > %s' [%s]", + mset.account(), mset.name(), node.Group()) + } panic(err.Error()) } // Ignore if we are recovering and we have already processed. @@ -2896,6 +2913,16 @@ func (cc *jetStreamCluster) isConsumerAssigned(a *Account, stream, consumer stri return false } +// Returns our stream and underlying raft node. +func (o *consumer) streamAndNode() (*stream, RaftNode) { + if o == nil { + return nil, nil + } + o.mu.RLock() + defer o.mu.RUnlock() + return o.mset, o.node +} + func (o *consumer) raftGroup() *raftGroup { if o == nil { return nil @@ -3002,6 +3029,11 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea // No-op needed? state, err := decodeConsumerState(e.Data) if err != nil { + if mset, node := o.streamAndNode(); mset != nil && node != nil { + s := js.srv + s.Errorf("JetStream cluster could not decode consumer snapshot for '%s > %s > %s' [%s]", + mset.account(), mset.name(), o, node.Group()) + } panic(err.Error()) } o.store.Update(state) @@ -3026,6 +3058,11 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea if !isLeader { dseq, sseq, dc, ts, err := decodeDeliveredUpdate(buf[1:]) if err != nil { + if mset, node := o.streamAndNode(); mset != nil && node != nil { + s := js.srv + s.Errorf("JetStream cluster could not decode consumer delivered update for '%s > %s > %s' [%s]", + mset.account(), mset.name(), o, node.Group()) + } panic(err.Error()) } if err := o.store.UpdateDelivered(dseq, sseq, dc, ts); err != nil { @@ -3039,6 +3076,11 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea case updateAcksOp: dseq, sseq, err := decodeAckUpdate(buf[1:]) if err != nil { + if mset, node := o.streamAndNode(); mset != nil && node != nil { + s := js.srv + s.Errorf("JetStream cluster could not decode consumer ack update for '%s > %s > %s' [%s]", + mset.account(), mset.name(), o, node.Group()) + } panic(err.Error()) } o.processReplicatedAck(dseq, sseq) @@ -3836,7 +3878,7 @@ func (s *Server) allPeersOffline(rg *raftGroup) bool { // This will do a scatter and gather operation for all streams for this account. This is only called from metadata leader. // This will be running in a separate Go routine. -func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offset int, subject, reply string, rmsg []byte) { +func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filter string, offset int, subject, reply string, rmsg []byte) { defer s.grWG.Done() js, cc := s.getJetStreamCluster() @@ -3848,8 +3890,29 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offs var streams []*streamAssignment for _, sa := range cc.streams[acc.Name] { - streams = append(streams, sa) + if IsNatsErr(sa.err, JSClusterNotAssignedErr) { + continue + } + + if filter != _EMPTY_ { + // These could not have subjects auto-filled in since they are raw and unprocessed. + if len(sa.Config.Subjects) == 0 { + if SubjectsCollide(filter, sa.Config.Name) { + streams = append(streams, sa) + } + } else { + for _, subj := range sa.Config.Subjects { + if SubjectsCollide(filter, subj) { + streams = append(streams, sa) + break + } + } + } + } else { + streams = append(streams, sa) + } } + // Needs to be sorted for offsets etc. if len(streams) > 1 { sort.Slice(streams, func(i, j int) bool { @@ -4501,6 +4564,7 @@ type streamSnapshot struct { Bytes uint64 `json:"bytes"` FirstSeq uint64 `json:"first_seq"` LastSeq uint64 `json:"last_seq"` + Failed uint64 `json:"clfs"` Deleted []uint64 `json:"deleted,omitempty"` } @@ -4515,6 +4579,7 @@ func (mset *stream) stateSnapshot() []byte { Bytes: state.Bytes, FirstSeq: state.FirstSeq, LastSeq: state.LastSeq, + Failed: mset.clfs, Deleted: state.Deleted, } b, _ := json.Marshal(snap) @@ -4617,7 +4682,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ // Do proposal. err := mset.node.Propose(esm) - if err != nil { + if err != nil && mset.clseq > 0 { mset.clseq-- } mset.clMu.Unlock() @@ -4665,6 +4730,7 @@ func (mset *stream) processSnapshotDeletes(snap *streamSnapshot) { if snap.FirstSeq > state.FirstSeq { mset.store.Compact(snap.FirstSeq) state = mset.store.State() + mset.setLastSeq(snap.LastSeq) } // Range the deleted and delete if applicable. for _, dseq := range snap.Deleted { @@ -4752,6 +4818,7 @@ func (mset *stream) processSnapshot(snap *streamSnapshot) error { mset.processSnapshotDeletes(snap) mset.mu.Lock() + mset.clfs = snap.Failed state := mset.store.State() sreq := mset.calculateSyncRequest(&state, snap) s, js, subject, n := mset.srv, mset.js, mset.sa.Sync, mset.node diff --git a/vendor/github.com/nats-io/nats-server/v2/server/memstore.go b/vendor/github.com/nats-io/nats-server/v2/server/memstore.go index ebbd830f..7f7f4709 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/memstore.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/memstore.go @@ -681,10 +681,10 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) { return } // TODO(dlc) - Might want to optimize this. - for tseq := seq + 1; tseq < ss.Last; tseq++ { + for tseq := seq + 1; tseq <= ss.Last; tseq++ { if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj { ss.First = tseq - return + break } } } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/nkey.go b/vendor/github.com/nats-io/nats-server/v2/server/nkey.go index 87b20a4f..61eac1af 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/nkey.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/nkey.go @@ -33,7 +33,7 @@ func (s *Server) NonceRequired() bool { // nonceRequired tells us if we should send a nonce. // Lock should be held on entry. func (s *Server) nonceRequired() bool { - return len(s.nkeys) > 0 || s.trustedKeys != nil + return s.opts.AlwaysEnableNonce || len(s.nkeys) > 0 || s.trustedKeys != nil } // Generate a nonce for INFO challenge. diff --git a/vendor/github.com/nats-io/nats-server/v2/server/ocsp.go b/vendor/github.com/nats-io/nats-server/v2/server/ocsp.go index 59fd4871..7511aa7b 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/ocsp.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/ocsp.go @@ -137,7 +137,7 @@ func (oc *OCSPMonitor) getLocalStatus() ([]byte, *ocsp.Response, error) { resp, err := ocsp.ParseResponse(raw, oc.Issuer) if err != nil { - return nil, nil, err + return nil, nil, fmt.Errorf("failed to get local status: %w", err) } if err := validOCSPResponse(resp); err != nil { return nil, nil, err @@ -206,7 +206,7 @@ func (oc *OCSPMonitor) getRemoteStatus() ([]byte, *ocsp.Response, error) { resp, err := ocsp.ParseResponse(raw, oc.Issuer) if err != nil { - return nil, nil, err + return nil, nil, fmt.Errorf("failed to get remote status: %w", err) } if err := validOCSPResponse(resp); err != nil { return nil, nil, err @@ -389,7 +389,7 @@ func (srv *Server) NewOCSPMonitor(config *tlsConfigKind) (*tls.Config, *OCSPMoni } // TODO: Add OCSP 'responder_cert' option in case CA cert not available. - issuer, err := getOCSPIssuer(caFile, cert.Certificate) + issuers, err := getOCSPIssuer(caFile, cert.Certificate) if err != nil { return nil, nil, err } @@ -402,7 +402,7 @@ func (srv *Server) NewOCSPMonitor(config *tlsConfigKind) (*tls.Config, *OCSPMoni certFile: certFile, stopCh: make(chan struct{}, 1), Leaf: cert.Leaf, - Issuer: issuer, + Issuer: issuers[len(issuers)-1], } // Get the certificate status from the memory, then remote OCSP responder. @@ -446,13 +446,33 @@ func (srv *Server) NewOCSPMonitor(config *tlsConfigKind) (*tls.Config, *OCSPMoni if len(s.VerifiedChains) == 0 { return fmt.Errorf("%s client missing TLS verified chains", kind) } + chain := s.VerifiedChains[0] - resp, err := ocsp.ParseResponseForCert(oresp, chain[0], issuer) + leaf := chain[0] + parent := issuers[len(issuers)-1] + + resp, err := ocsp.ParseResponseForCert(oresp, leaf, parent) if err != nil { return fmt.Errorf("failed to parse OCSP response from %s client: %w", kind, err) } - if err := resp.CheckSignatureFrom(issuer); err != nil { - return err + if resp.Certificate == nil { + if err := resp.CheckSignatureFrom(parent); err != nil { + return fmt.Errorf("OCSP staple not issued by issuer: %w", err) + } + } else { + if err := resp.Certificate.CheckSignatureFrom(parent); err != nil { + return fmt.Errorf("OCSP staple's signer not signed by issuer: %w", err) + } + ok := false + for _, eku := range resp.Certificate.ExtKeyUsage { + if eku == x509.ExtKeyUsageOCSPSigning { + ok = true + break + } + } + if !ok { + return fmt.Errorf("OCSP staple's signer missing authorization by CA to act as OCSP signer") + } } if resp.Status != ocsp.Good { return fmt.Errorf("bad status for OCSP Staple from %s client: %s", kind, ocspStatusString(resp.Status)) @@ -739,47 +759,61 @@ func (oc *OCSPMonitor) writeOCSPStatus(storeDir, file string, data []byte) error return nil } -func parseCertPEM(name string) (*x509.Certificate, error) { +func parseCertPEM(name string) ([]*x509.Certificate, error) { data, err := ioutil.ReadFile(name) if err != nil { return nil, err } - // Ignoring left over byte slice. - block, _ := pem.Decode(data) - if block == nil { - return nil, fmt.Errorf("failed to parse PEM cert %s", name) - } - if block.Type != "CERTIFICATE" { - return nil, fmt.Errorf("unexpected PEM certificate type: %s", block.Type) + var pemBytes []byte + + var block *pem.Block + for len(data) != 0 { + block, data = pem.Decode(data) + if block == nil { + break + } + if block.Type != "CERTIFICATE" { + return nil, fmt.Errorf("unexpected PEM certificate type: %s", block.Type) + } + + pemBytes = append(pemBytes, block.Bytes...) } - return x509.ParseCertificate(block.Bytes) + return x509.ParseCertificates(pemBytes) } // getOCSPIssuer returns a CA cert from the given path. If the path is empty, // then this checks a given cert chain. If both are empty, then it returns an // error. -func getOCSPIssuer(issuerCert string, chain [][]byte) (*x509.Certificate, error) { - var issuer *x509.Certificate +func getOCSPIssuer(issuerCert string, chain [][]byte) ([]*x509.Certificate, error) { + var issuers []*x509.Certificate var err error switch { case len(chain) == 1 && issuerCert == _EMPTY_: err = fmt.Errorf("ocsp ca required in chain or configuration") case issuerCert != _EMPTY_: - issuer, err = parseCertPEM(issuerCert) + issuers, err = parseCertPEM(issuerCert) case len(chain) > 1 && issuerCert == _EMPTY_: - issuer, err = x509.ParseCertificate(chain[1]) + issuers, err = x509.ParseCertificates(chain[1]) default: err = fmt.Errorf("invalid ocsp ca configuration") } if err != nil { return nil, err - } else if !issuer.IsCA { - return nil, fmt.Errorf("%s invalid ca basic constraints: is not ca", issuerCert) } - return issuer, nil + if len(issuers) == 0 { + return nil, fmt.Errorf("no issuers found") + } + + for _, issuer := range issuers { + if !issuer.IsCA { + return nil, fmt.Errorf("%s invalid ca basic constraints: is not ca", issuer.Subject) + } + } + + return issuers, nil } func ocspStatusString(n int) string { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/opts.go b/vendor/github.com/nats-io/nats-server/v2/server/opts.go index 276040ce..0d82f14d 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/opts.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/opts.go @@ -262,6 +262,11 @@ type Options struct { AccountResolver AccountResolver `json:"-"` AccountResolverTLSConfig *tls.Config `json:"-"` + // AlwaysEnableNonce will always present a nonce to new connections + // typically used by custom Authentication implementations who embeds + // the server and so not presented as a configuration option + AlwaysEnableNonce bool + CustomClientAuthentication Authentication `json:"-"` CustomRouterAuthentication Authentication `json:"-"` @@ -1247,12 +1252,12 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error o.Tags.Add(v...) case []interface{}: for _, t := range v { - if t, ok := t.(token); ok { - if t, ok := t.Value().(string); ok { - o.Tags.Add(t) + if token, ok := t.(token); ok { + if ts, ok := token.Value().(string); ok { + o.Tags.Add(ts) continue } else { - err = &configErr{tk, fmt.Sprintf("error parsing tags: unsupported type %T where string is expected", t)} + err = &configErr{tk, fmt.Sprintf("error parsing tags: unsupported type %T where string is expected", token)} } } else { err = &configErr{tk, fmt.Sprintf("error parsing tags: unsupported type %T", t)} diff --git a/vendor/github.com/nats-io/nats-server/v2/server/pse/pse_freebsd.go b/vendor/github.com/nats-io/nats-server/v2/server/pse/pse_freebsd.go index 849440fc..8884e529 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/pse/pse_freebsd.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/pse/pse_freebsd.go @@ -11,6 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build !amd64 // +build !amd64 package pse diff --git a/vendor/github.com/nats-io/nats-server/v2/server/pse/pse_rumprun.go b/vendor/github.com/nats-io/nats-server/v2/server/pse/pse_rumprun.go index 48e80fca..9f0fb01d 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/pse/pse_rumprun.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/pse/pse_rumprun.go @@ -11,6 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build rumprun // +build rumprun package pse diff --git a/vendor/github.com/nats-io/nats-server/v2/server/pse/pse_windows.go b/vendor/github.com/nats-io/nats-server/v2/server/pse/pse_windows.go index e159ebb7..5ee9645f 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/pse/pse_windows.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/pse/pse_windows.go @@ -11,6 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build windows // +build windows package pse diff --git a/vendor/github.com/nats-io/nats-server/v2/server/service.go b/vendor/github.com/nats-io/nats-server/v2/server/service.go index a44cbac3..dd01c970 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/service.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/service.go @@ -11,6 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build !windows // +build !windows package server diff --git a/vendor/github.com/nats-io/nats-server/v2/server/signal.go b/vendor/github.com/nats-io/nats-server/v2/server/signal.go index 2b42d924..f59b424d 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/signal.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/signal.go @@ -11,6 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build !windows // +build !windows package server diff --git a/vendor/github.com/nats-io/nats-server/v2/server/stream.go b/vendor/github.com/nats-io/nats-server/v2/server/stream.go index fd869afe..0722d14c 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/stream.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/stream.go @@ -2686,7 +2686,7 @@ func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, _ *Ac hdr, msg := c.msgParts(rmsg) - // If we are not receiving directly from a client we should move this this Go routine. + // If we are not receiving directly from a client we should move this to another Go routine. if c.kind != CLIENT { mset.queueInboundMsg(subject, reply, hdr, msg) return diff --git a/vendor/github.com/nats-io/nats-server/v2/server/sysmem/mem_bsd.go b/vendor/github.com/nats-io/nats-server/v2/server/sysmem/mem_bsd.go index f62a7d27..ddab47ba 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/sysmem/mem_bsd.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/sysmem/mem_bsd.go @@ -11,6 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build freebsd || openbsd || dragonfly || netbsd // +build freebsd openbsd dragonfly netbsd package sysmem diff --git a/vendor/github.com/nats-io/nats-server/v2/server/sysmem/mem_darwin.go b/vendor/github.com/nats-io/nats-server/v2/server/sysmem/mem_darwin.go index e5cce831..28944b33 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/sysmem/mem_darwin.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/sysmem/mem_darwin.go @@ -11,6 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build darwin // +build darwin package sysmem diff --git a/vendor/github.com/nats-io/nats-server/v2/server/sysmem/mem_linux.go b/vendor/github.com/nats-io/nats-server/v2/server/sysmem/mem_linux.go index 6842a157..f4ed0587 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/sysmem/mem_linux.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/sysmem/mem_linux.go @@ -11,6 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build linux // +build linux package sysmem diff --git a/vendor/github.com/nats-io/nats-server/v2/server/sysmem/mem_windows.go b/vendor/github.com/nats-io/nats-server/v2/server/sysmem/mem_windows.go index 8bf88977..cc4c43dc 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/sysmem/mem_windows.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/sysmem/mem_windows.go @@ -11,6 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build windows // +build windows package sysmem diff --git a/vendor/github.com/nats-io/nats-server/v2/server/sysmem/sysctl.go b/vendor/github.com/nats-io/nats-server/v2/server/sysmem/sysctl.go index e70e037c..a7fa266a 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/sysmem/sysctl.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/sysmem/sysctl.go @@ -11,6 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build darwin || freebsd || openbsd || dragonfly || netbsd // +build darwin freebsd openbsd dragonfly netbsd package sysmem diff --git a/vendor/modules.txt b/vendor/modules.txt index 34ae407a..e51d5e07 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -38,7 +38,7 @@ github.com/mattn/go-isatty github.com/minio/highwayhash # github.com/nats-io/jwt/v2 v2.1.0 github.com/nats-io/jwt/v2 -# github.com/nats-io/nats-server/v2 v2.6.4 +# github.com/nats-io/nats-server/v2 v2.6.5 ## explicit github.com/nats-io/nats-server/v2/conf github.com/nats-io/nats-server/v2/internal/ldap