Skip to content
This repository has been archived by the owner on Sep 25, 2023. It is now read-only.

add a etcd monitor #946

Open
wants to merge 3 commits into
base: 2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions lib/monitors/etcdmonitor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
'use strict'
var Etcd = require('node-etcd');
var logger = require('pomelo-logger').getLogger('pomelo', __filename);
var constants = require('../util/constants');
var path = require('path');
var utils = require('../util/utils');

var etcd = new Etcd();

var Monitor = function(app, opts) {
if(!(this instanceof Monitor)) {
return new Monitor(app, opts);
}
this.app = app;
this.etcdNodes = opts.etcdNodes || [];
this.period = opts.period || constants.TIME.DEFAULT_ETCD_PERIOD;
this.expire = (opts.expire || 3 * this.period) / 1000;
this.etcdPrefix = opts.prefix || "pomelo";
this.etcdOpts = opts.etcdOpts || {};
this.etcdPath = path.join('/', this.etcdPrefix, 'pomelo-servers');
this.servers = {};
};

module.exports = Monitor;

function getServerId(key) {
var id = key.split('/').pop();
return id;
}

Monitor.prototype.start = function(cb) {
this.client = new Etcd(this.etcdNodes);
this.watcher = this.client.watcher(this.etcdPath, null, {recursive: true});
this.watcher.on('set', this.addServer.bind(this));
this.watcher.on('expire', this.removeServer.bind(this));
this.watcher.on('reconnect', this.handleEtcdReconnect.bind(this))
this.updateInfoTimer = setInterval(this.updateServerInfo.bind(this, true), this.period);
this.syncServers();
this.updateServerInfo(false);
this.started = true;
utils.invokeCallback(cb);
}

Monitor.prototype.handleEtcdReconnect = function(err) {
logger.warn('lost connection and reconnected to etcd:', err)
}

Monitor.prototype.removeServer = function(serverInfo) {
var id = getServerId(serverInfo.node.key);
delete this.servers[id];
logger.debug('server deleted', this.servers)
}

Monitor.prototype.addServer = function(serverInfo) {
var info = serverInfo.node ? serverInfo.node : serverInfo;
var id = getServerId(info.key)
this.servers[id] = JSON.parse(info.value);
this.app.replaceServers(this.servers);
logger.debug('server added', this.servers)
}

Monitor.prototype.syncServers = function() {
var self = this;
this.client.get(this.etcdPath, function(err, res){
if(!err){
var nodes = res.node.nodes;
if (nodes) {
logger.debug('successfully sync servers')
nodes.map(function(serverInfo){
var id = getServerId(serverInfo.key);
if (!self.servers[id]) {
self.addServer(serverInfo);
}
})
} else {
logger.debug('no pomelo server got from etcd')
}
} else {
logger.error('error getting server list from etcd:', err.message)
}
})
}

Monitor.prototype.updateServerInfo = function(refresh) {
var p = path.join('/', this.etcdPath, this.app.getCurServer().id);
// first time we set the server into etcd
if(!refresh){
this.client.set(p,
JSON.stringify(this.app.getCurServer()),
{
ttl: this.expire,
}
);
} else {
//this time we want to update the ttl but don't want to trigger wathchers to avoid unnecessary traffic
this.client.set(p,
JSON.stringify(this.app.getCurServer()),
{
prevExist: true,
ttl: this.expire
}
)
}
}
1 change: 1 addition & 0 deletions lib/pomelo.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ Pomelo.monitors = {};
Pomelo.monitors.__defineGetter__('zookeepermonitor', load.bind(null, './monitors/zookeepermonitor'));
Pomelo.monitors.__defineGetter__('redismonitor', load.bind(null, './monitors/redismonitor'));
Pomelo.monitors.__defineGetter__('redismonitorlight', load.bind(null, './monitors/redismonitorlight'));
Pomelo.monitors.__defineGetter__('etcdmonitor', load.bind(null, './monitors/etcdmonitor'));

/**
* Create an pomelo application.
Expand Down
3 changes: 2 additions & 1 deletion lib/util/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ module.exports = {
DEFAULT_REDIS_REG_UPDATE_INFO: 600 * 1000,
DEFAULT_REDIS_EXPIRE: 15 * 1000,
DEFAULT_REDIS_PING_TIMEOUT: 15 * 1000,
DEFAULT_REDIS_PING: 60 * 1000
DEFAULT_REDIS_PING: 60 * 1000,
DEFAULT_ETCD_PERIOD: 15 * 1000
},

RETRY: {
Expand Down
29 changes: 15 additions & 14 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,27 @@
"server"
],
"dependencies": {
"socket.io": "1.3.7",
"async": "0.2.5",
"seq-queue": "0.0.5",
"crc": "0.2.0",
"cliff": "0.1.8",
"commander": "2.0.0",
"crc": "0.2.0",
"ioredis": "1.10.0",
"mkdirp": "0.3.3",
"mqtt": "0.3.9",
"node-bignumber": "1.2.1",
"node-etcd": "^5.0.3",
"node-uuid": "1.4.3",
"node-zookeeper-client": "0.2.2",
"pomelo-loader": "0.0.7",
"pomelo-rpc": "1.0.3",
"pomelo-protocol": "0.1.6",
"pomelo-logger": "0.1.7",
"pomelo-scheduler": "0.3.9",
"ws": "0.8.0",
"pomelo-protobuf": "0.4.0",
"node-bignumber": "1.2.1",
"commander": "2.0.0",
"mqtt": "0.3.9",
"node-zookeeper-client": "0.2.2",
"node-uuid": "1.4.3",
"ioredis": "1.10.0",
"redis": "0.12.1"
"pomelo-protocol": "0.1.6",
"pomelo-rpc": "1.0.3",
"pomelo-scheduler": "0.3.9",
"redis": "0.12.1",
"seq-queue": "0.0.5",
"socket.io": "1.3.7",
"ws": "0.8.0"
},
"bin": {
"pomelo": "./bin/pomelo"
Expand Down