When `SubscriptionContext` is used with an array of topics, the listeners are not all removed when `close()` is called.
Only the listener for the last topic is removed on close.
This issue leaks sockets, especially if you are using Redis.

    subscribe (topic, queue) {
      return new Promise((resolve, reject) => {
        function listener (value, cb) {
          queue.push(value.payload)
          cb()
        }

        const close = () => {
          this.emitter.removeListener(topic, listener)
        }

        this.emitter.on(topic, listener, (err) => {
          if (err) {
            return reject(err)
          }
          resolve()
        })

        queue.close = close // <---- queue.close is overwritten on every subscribe() call
      })
    }
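
To make the leak easier to see, here is a minimal sketch that reproduces the overwrite outside the library. It assumes Node's built-in `EventEmitter` as a stand-in for the real emitter (whose `on` takes an extra callback argument), leaves out the Promise wrapper, and uses a plain object instead of the `Readable` queue. The names `subscribeOverwriting` and `remainingListeners` are hypothetical and exist only for this illustration.

```javascript
'use strict'
const { EventEmitter } = require('events')

// Simplified stand-in for subscribe(): the Promise wrapper and the (err)
// callback are omitted, the close handling is the same as in the snippet above.
function subscribeOverwriting (emitter, topic, queue) {
  const listener = (value) => queue.items.push(value.payload)
  emitter.on(topic, listener)
  // Each call replaces the close function stored by the previous call.
  queue.close = () => emitter.removeListener(topic, listener)
}

// Subscribes one shared queue to every topic, closes it once, and returns
// the number of listeners still attached to each topic.
function remainingListeners (topics) {
  const emitter = new EventEmitter()
  const queue = { items: [] }
  topics.forEach((topic) => subscribeOverwriting(emitter, topic, queue))
  queue.close()
  return topics.map((topic) => emitter.listenerCount(topic))
}

console.log(remainingListeners(['a', 'b', 'c'])) // [ 1, 1, 0 ]
```

Only the listener registered last is detached; the listeners for `a` and `b` stay on the emitter after close.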
The solution is to keep an array of close functions:

    'use strict'

    const { Readable } = require('readable-stream')

    class PubSub {
      constructor (emitter) {
        this.emitter = emitter
      }

      subscribe (topic, queue) {
        return new Promise((resolve, reject) => {
          function listener (value, cb) {
            queue.push(value.payload)
            cb()
          }

          const close = () => {
            this.emitter.removeListener(topic, listener)
          }

          this.emitter.on(topic, listener, (err) => {
            if (err) {
              return reject(err)
            }
            resolve()
          })

          if (!queue.close) queue.close = []
          queue.close.push(close)
        })
      }

      publish (event, callback) {
        this.emitter.emit(event, callback)
      }
    }

    // One context - and queue for each subscription
    class SubscriptionContext {
      constructor ({ pubsub, fastify }) {
        this.fastify = fastify
        this.pubsub = pubsub
        this.queue = new Readable({
          objectMode: true,
          read: () => {}
        })
      }

      subscribe (topics) {
        if (typeof topics === 'string') {
          return this.pubsub.subscribe(topics, this.queue).then(() => this.queue)
        }
        return Promise.all(topics.map((topic) => this.pubsub.subscribe(topic, this.queue))).then(() => this.queue)
      }

      publish (event) {
        return new Promise((resolve, reject) => {
          this.pubsub.publish(event, (err) => {
            if (err) {
              return reject(err)
            }
            resolve()
          })
        }).catch(err => {
          this.fastify.log.error(err)
        })
      }

      close () {
        // In rare cases when `subscribe()` not called (e.g. some network error)
        // `close` will be `undefined`.
        if (Array.isArray(this.queue.close)) {
          this.queue.close.map((close) => close())
        }
        this.queue.push(null)
      }
    }

    function withFilter (subscribeFn, filterFn) {
      return async function * (root, args, context, info) {
        const subscription = await subscribeFn(root, args, context, info)
        for await (const payload of subscription) {
          try {
            if (await filterFn(payload, args, context, info)) {
              yield payload
            }
          } catch (err) {
            context.app.log.error(err)
            continue
          }
        }
      }
    }

    module.exports = {
      PubSub,
      SubscriptionContext,
      withFilter
    }
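
As a rough check of the array-based approach, the sketch below collects one close function per topic and runs them all on close. It is an illustration under assumptions, not the library code: Node's built-in `EventEmitter` stands in for the real emitter, the Promise wrapper is omitted, the queue is a plain object, and the names `subscribeCollecting`, `closeAll` and `remainingAfterClose` are hypothetical.

```javascript
'use strict'
const { EventEmitter } = require('events')

// Simplified stand-in for the proposed subscribe(): every call appends its
// own close function instead of replacing the previous one.
function subscribeCollecting (emitter, topic, queue) {
  const listener = (value) => queue.items.push(value.payload)
  emitter.on(topic, listener)
  if (!queue.close) queue.close = []
  queue.close.push(() => emitter.removeListener(topic, listener))
}

// Mirrors the guard in close(): nothing happens if subscribe() never ran.
function closeAll (queue) {
  if (Array.isArray(queue.close)) {
    queue.close.forEach((close) => close())
  }
}

// Subscribes one shared queue to every topic, closes it once, and returns
// the number of listeners still attached to each topic.
function remainingAfterClose (topics) {
  const emitter = new EventEmitter()
  const queue = { items: [] }
  topics.forEach((topic) => subscribeCollecting(emitter, topic, queue))
  closeAll(queue)
  return topics.map((topic) => emitter.listenerCount(topic))
}

console.log(remainingAfterClose(['a', 'b', 'c'])) // [ 0, 0, 0 ]
```

With one close function stored per subscription, every topic ends up with zero listeners after close, and a queue that was never subscribed is closed without throwing.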