From 37cd3c62175dffb8e6596bd9585d68f463ef9bc1 Mon Sep 17 00:00:00 2001 From: alsotang Date: Thu, 31 Dec 2015 22:31:12 +0800 Subject: [PATCH 01/10] =?UTF-8?q?=E9=87=8D=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/easy_sock.js | 474 +++++++++++++++++++++-------------------------- package.json | 3 - 2 files changed, 210 insertions(+), 267 deletions(-) diff --git a/lib/easy_sock.js b/lib/easy_sock.js index 6a953f2..223018e 100644 --- a/lib/easy_sock.js +++ b/lib/easy_sock.js @@ -8,340 +8,286 @@ var net = require('net'); -//需要通过创建实例来使用 -var Easysock = exports = module.exports = function(conf){ - //并发请求时的会话标识 - this.seq = 0; +var MAX_SEQ = 10000; // 用于标志每一个并发请求,当超过 MAX_SEQ 时,从 0 开始计数。 +var TIMEOUT = 60 * 1000; // 向 socket 写数据的超时时间 +var IDLE_TIMEOUT = 60 * 1000; // socket 空闲时,关闭 socket 的超时时间。 - //保存请求的回调函数 - this.context = {}; +//需要通过创建实例来使用 +var EasySock = exports = module.exports = function(config){ - //全局唯一的一个socket - this.socket = null; + // use `config` + this.ip = config.ip; + this.port = Number(config.port); - this.between_connect = false; - this.between_close = false; + this.keepAlive = config.keepAlive !== void 0 ? !!config.keepAlive : false; + this.timeout = config.timeout !== void 0 ? Number(config.timeout) : TIMEOUT; + this.idleTimeout = config.idleTimeout !== void 0 ? Number(config.idleTimeout) : IDLE_TIMEOUT; - this.calling_close = false; + if (!this.ip || !this.port){ + throw new Error("needs config info: ip, port"); + } + // END - this.currentSession = 0; - this.tmpGetTaskList = []; + this.restore() - this.config = { - ip:"", - port : 0, - /* - * 是否保持连接状态,如果为false,则每次socket空闲下来后就会关闭连接 - */ - keepAlive : false, - timeout : 0 - }; - - if (conf){ - this.setConfig(conf); - } - + // 这三个函数需要在外部重写,具体请看 readme this.isReceiveComplete = null; this.encode = null; this.decode = null; -}; -/** - * 设置配置信息 - * @param {[Object]} obj [description] - */ -Easysock.prototype.setConfig = function(conf){ - this.config = this.config || {}; - - if(typeof(conf) == 'object'){ - for(var key in conf){ - this.config[key] = conf[key]; - } - } }; -/** - * 当前是否已连接(或正在连接) - * @type {Bool} - */ -Easysock.prototype.isAlive = false; +// 目前的会话总数 +Object.defineProperty(EasySock.prototype, 'sessionCount', { + get: function () { + return Object.keys(this.context).length; + } +}) + +// 重置所有内部状态 +EasySock.prototype.restore = function () { + //并发请求时的会话标识,会自增。 + this.seq = 0; + + //保存请求的回调函数 + this.context = {}; + + //全局唯一的一个socket + this.socket = null; + + //当前是否已连接 + this.isAlive = false; + + // .close 方法的锁 + this.isClosing = false; + + + // 当连接并未建立时,缓存请求的队列 + this.taskQueue = []; +} /** * 对外的获取数据的接口方法 - * @param {[Array]} data [任意类型,会直接传给encode函数] - * @param {[Function]} callback [回调函数(err, data)] - * @return {[void]} [void] + * @param {Array} data [任意类型,会直接传给encode函数] + * @param {Function} callback [回调函数(err, data)] + * @return {EasySock} [返回自身] */ -Easysock.prototype.write = function(data, callback){ +EasySock.prototype.write = function(data, callback){ var self = this; - //当在这两个状态的时候,先保留请求,等连接成功后再执行 - if (this.between_connect || this.between_close){ + var args = arguments - this.tmpGetTaskList.push(function(err){ + // 当连接未建立时,将请求缓存 + if (!self.isAlive){ + self.taskQueue.push(function(err){ if (err){ - callback(err); + return callback(err); } - else{ - self.write(data, callback); - } + + self.write.apply(self, args); }); - return; + + self.initSocket(); + + return self; } - if (!this.config || !this.config.ip || !this.config.port){ - callback("needs config info:ip,port"); + + //并发情况下靠这个序列标识哪个返回是哪个请求 + var seq = (++self.seq) % MAX_SEQ; + + //编码 + var buf = self.encode(data, self.seq); + if (!Buffer.isBuffer(buf)){ + return callback(new Error("encode error, return value is not Buffer")); } - else{ - - if (this.socket){ - //并发情况下靠这个序列标识哪个返回是哪个请求 - var seq = this.seq = (this.seq+1) % 10000; - - //编码 - var buf = this.encode(data, this.seq); - if (!Buffer.isBuffer(buf)){ - callback("encode error"); - return; - } - - var timer = null; - if(this.config.timeout){ - timer = setTimeout(function(){ - //返回超时 - self.context[seq] = null; - self.currentSession--; - - tryCloseSocket(self); - callback("request timeout(" + self.config.timeout + "ms)"); - }, this.config.timeout); - } - - //保存当前上下文,都是为了并发 - this.context[this.seq] = { - seq : this.seq, - cb : function(err, result){ - if (timer){ - clearTimeout(timer); - } - callback(err, result); - } - }; - this.currentSession++; - - //真正的写socket - this.socket.write(buf); - } - else{ - //第一次请求,初始化 - this.tmpGetTaskList.push(function(){ - self.write(data, callback); - }); - initSocket(self); - } + + // 返回超时的逻辑 + var timeoutTimer; + if(self.timeout !== 0){ + timeoutTimer = setTimeout(function(){ + self.deleteTask(seq) + + callback(new Error("request timeout(" + self.timeout + "ms)")); + }, self.timeout); } + // END 返回超时的逻辑 + + // 保存当前上下文,都是为了并发 + self.context[seq] = { + seq : seq, + cb : function(err, result){ + clearTimeout(timeoutTimer); + + callback(err, result); + } + }; + // END 保存当前上下文,都是为了并发 + + //真正的写socket + self.socket.write(buf); + + return self; }; + +// 删除某个已完成或出错的任务 +EasySock.prototype.deleteTask = function (seq) { + var self = this; + + delete self.context[seq]; +} + /** * 关闭连接 */ -Easysock.prototype.close = function(){ - if (this.socket && this.currentSession == 0 && this.tmpGetTaskList.length == 0 && (!this.between_close)){ - this.between_close = true; - this.isAlive = false; +EasySock.prototype.close = function(msg, callback){ + callback = callback || function () {} + + var self = this; + + if (this.isClosing) { + return; + } + + this.isClosing = true; + + if (this.isAlive) { this.socket.end(); + this.socket.on('close', onClose) + } else { + onClose() + } + + function onClose() { + self.isClosing = false; + this.notifyAll(new Error(msg)) + this.restore() + callback() } - else{ - //等所有请求处理完再关闭 - this.calling_close = true; + +} + +// 给外部所有在等待的函数一个交代 +EasySock.prototype.notifyAll = function (err) { + var self = this; + + console.error('notifyAll', err); + + // 通知所有队列中未发出的请求 + self.taskQueue.forEach(function (callback) { + callback(err) + }) + + // 通知所有已发出,正在等待结果的请求 + for (var seq in self.context){ + var ctx = self.context[seq]; + ctx.cb(err); } } /** * 初始化socket方法 */ -function initSocket(cur){ - var totalData = new Buffer(''); - - var socket = cur.socket = new net.Socket({ +EasySock.prototype.initSocket = function (){ + var self = this; + + var socket = self.socket = new net.Socket({ writable : true, readable : true }); + + if (!self.keepAlive) { + socket.setTimeout(self.idleTimeout); + } + + socket.setKeepAlive(self.keepAlive); - //socket.setTimeout(cur.config.timeout); - socket.setKeepAlive(cur.config.keepAlive); - - var errorCall = function(msg){ - //actually, I don't know which request is error and which cb function I shall call. So call them all. - console.log(msg); - //Timeout while connection - var cb; - while(cb = cur.tmpGetTaskList.shift()){ - cb(msg); - } - - for (var key in cur.context){ - var ctx = cur.context[key]; - if (ctx && typeof(ctx.cb) == "function"){ - ctx.cb(msg); - cur.context[key] = null; - cur.currentSession--; - } - } - - socket.destroy(); - }; - - var connect_timeout = cur.config.timeout ? cur.config.timeout * 3 : 3000; + var connectTimeout = self.timeout * 3; // 在常规的发送数据超时时间上加个倍数 - var connect_timer = setTimeout(function(){ - errorCall("easy_sock:TCP connect timeout(" + connect_timeout + ")"); - }, connect_timeout); + var connectTimer = setTimeout(function () { + self.close("easy_sock:TCP connect timeout(" + connectTimeout + "ms)"); + }, connectTimeout); socket.on('connect',function(){ //连接成功,把等待的数据发送掉 console.log("easy_sock connected"); - clearTimeout(connect_timer); - cur.between_connect = false; - - //外部有可能会在发起连接但还没完成的时候发起请求,所以,把积累的请求都发了 - var get; - while(get = cur.tmpGetTaskList.shift()){ - get(); - } + clearTimeout(connectTimer); + self.isAlive = true; - }).on('data', function(data) { + // 把积累的请求都发了 + var task; + while(task = self.taskQueue.shift()){ + task(); + } + }) + + var totalData = new Buffer(''); + socket.on('data', function(data) { if (!data || !Buffer.isBuffer(data) || data.length <= 0 ){ - //error - console.log("buffer error:" + data); - errorCall("receive error, illegal data"); - socket.end(); + socket.emit('error', new Error('receive error, illegal data')) + return; } - else{ - - totalData = Buffer.concat([totalData, data]); - var packageSize = cur.isReceiveComplete(totalData); - if(packageSize){ - //网络有可能一次返回n个结果包,需要做判断,是不是很bt。。 - var totalSize = totalData.length; - if (packageSize == totalSize){ - //只有一个包,这是大多数情况 - handleData(cur, totalData); - } - else{ - //存在多个包,这里要做一些buffer复制的操作,会消耗一定性能 - - while(true){ - var buf = totalData.slice(0, packageSize); - handleData(cur, buf); - totalData = totalData.slice(packageSize, totalData.length); - packageSize = cur.isReceiveComplete(totalData); - - if (packageSize >= totalData.length){ - //last one - handleData(cur, totalData); - break; - } - else if (packageSize == 0){ - //包还没接收完 - return; - } - } - } - - //清空buffer,给下一次请求使用 - totalData = new Buffer(''); - } - else{ - //没接收完的话继续接收 - //console.log("keep looking"); + + totalData = Buffer.concat([totalData, data]); + + // 网络有可能一次返回n个结果包,需要做判断,是不是很bt。。 + while(true){ + var packageSize = self.isReceiveComplete(totalData); + if (packageSize === 0) { + return; } - - } - - }).on('error',function(e){ - errorCall('socket error:' + e); - socket.destroy(); - cur.socket = null; - /* - }).on('end',function(){ - //console.log("on end"); - - }).on('timeout',function(){ - //console.dir(socket); - /* do nothing because it use setTimeout instead - if (cur.currentSession > 0){ - errorCall('socket timeout'); - } - else{ - //timeout while no request is sent, just egnore it - console.log("egnore timeout"); - } - */ - - }).on('close',function(){ - cur.between_close = false; - cur.socket = null; - cur.isAlive = false; - cur.currentSession = 0; - console.log("easy_sock closed"); - if (cur.tmpGetTaskList.length){ - //刚关闭socket,又来新请求 - cur.tmpGetTaskList.shift()(); + var buf = totalData.slice(0, packageSize); + self.handleData(buf); + totalData = totalData.slice(packageSize); } + }) + + + socket.on('error', function(e){ + self.close('socket error:' + e); + }) + + + socket.on('close', function () { + console.log("easy_sock closed"); }); + + if (!self.keepAlive) { + // 不活跃一段时间后,自动关闭 socket + socket.on('timeout', function () { + self.close('socket is inactivity for ' + self.idleTimeout + 'ms') + }) + } //连接也有可能会超时阻塞 socket.connect({ - port : cur.config.port, - host : cur.config.ip + port : self.config.port, + host : self.config.ip }); - - cur.between_connect = true; - cur.isAlive = true; } /** * 处理返回数据,回调 */ -function handleData(cur, buf){ - - var obj = cur.decode(buf); +EasySock.prototype.handleData = function (self, buf){ + var self = this; - if (typeof(obj) != "object"){ - //error - console.log("easy_sock:handle error:" + obj); - cur.socket.destroy(); + var obj = self.decode(buf); + + if (!obj) { + console.error("decode buffer error:", obj); return; } - var ctx = cur.context[obj.seq]; + var ctx = self.context[obj.seq]; if (!ctx){ //找不到上下文,可能是因为超时,callback已执行,直接放弃当前数据 - //console.log("Can't find context. This should never happened!" + obj.seq); - //socket.destroy(); + console.error("Can't find context. " + obj.seq); return; } - cur.context[obj.seq] = null; - cur.currentSession--; - - tryCloseSocket(cur); - - //遵循nodejs最佳实践,第一个参数是err,第二个才是返回结果 - ctx.cb(null, obj.result); -} + self.deleteTask(obj.seq) -/** - * 尝试关闭socket - */ -function tryCloseSocket(cur){ - - if ((cur.calling_close || !cur.config.keepAlive) && cur.currentSession == 0 && cur.tmpGetTaskList.length == 0){ - cur.between_close = true; - cur.calling_close = false; - cur.isAlive = false; - //调用end()之后sock会自动close,消息回调会先触发end,再触发close - cur.socket.end(); - } -} + ctx.cb(null, obj.result); +} \ No newline at end of file diff --git a/package.json b/package.json index ef37581..935bab7 100644 --- a/package.json +++ b/package.json @@ -14,7 +14,4 @@ }, "license": "ISC", "readme": "README.md", - "_id": "easy_sock@0.2.0", - "_shasum": "77ea2bfa6c10379f0792e3eb8116bab8d316be62", - "_from": "easy_sock@" } From c76deb98f47e95d1dc9c3b86c8618bf44e4389ac Mon Sep 17 00:00:00 2001 From: alsotang Date: Fri, 1 Jan 2016 02:16:53 +0800 Subject: [PATCH 02/10] =?UTF-8?q?=E5=8A=A0=E5=85=A5=E7=8A=B6=E6=80=81?= =?UTF-8?q?=E7=AE=A1=E7=90=86=EF=BC=8C=E7=BB=9F=E4=B8=80=E7=AE=A1=E7=90=86?= =?UTF-8?q?=20timers?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/easy_sock.js | 456 +++++++++++++++++++++++++---------------------- 1 file changed, 246 insertions(+), 210 deletions(-) diff --git a/lib/easy_sock.js b/lib/easy_sock.js index 223018e..adb92e8 100644 --- a/lib/easy_sock.js +++ b/lib/easy_sock.js @@ -12,58 +12,71 @@ var MAX_SEQ = 10000; // 用于标志每一个并发请求,当超过 MAX_SEQ var TIMEOUT = 60 * 1000; // 向 socket 写数据的超时时间 var IDLE_TIMEOUT = 60 * 1000; // socket 空闲时,关闭 socket 的超时时间。 +// 类似一个 enum。只能从一种状态,变成下一种,或者变成 NEW。 +var STATES = { + 'NEW': 1, // 全新的实例,还未初始化 socket + 'INITING': 2, // 正在初始化 socket + 'ALIVE': 3, // socket 已经建立 + 'CLOSING': 4, // 正在关闭 socket +}; + //需要通过创建实例来使用 -var EasySock = exports = module.exports = function(config){ +var EasySock = function (config) { + + // use `config` + this.ip = config.ip; + this.port = Number(config.port); - // use `config` - this.ip = config.ip; - this.port = Number(config.port); + this.keepAlive = config.keepAlive !== void 0 ? !!config.keepAlive : false; + this.timeout = config.timeout !== void 0 ? Number(config.timeout) : TIMEOUT; + this.idleTimeout = config.idleTimeout !== void 0 ? Number(config.idleTimeout) : IDLE_TIMEOUT; - this.keepAlive = config.keepAlive !== void 0 ? !!config.keepAlive : false; - this.timeout = config.timeout !== void 0 ? Number(config.timeout) : TIMEOUT; - this.idleTimeout = config.idleTimeout !== void 0 ? Number(config.idleTimeout) : IDLE_TIMEOUT; + this.allowIdleClose = !this.keepAlive && this.idleTimeout !== 0 // 在空闲时关闭 socket + this.allowWriteTimeout = this.timeout !== 0 - if (!this.ip || !this.port){ - throw new Error("needs config info: ip, port"); - } - // END + if (!this.ip || !this.port) { + throw new Error("needs config info: ip, port"); + } + // END - this.restore() + this.restore() - // 这三个函数需要在外部重写,具体请看 readme - this.isReceiveComplete = null; - this.encode = null; - this.decode = null; + // 这三个函数需要在外部重写,具体请看 readme + this.isReceiveComplete = null; + this.encode = null; + this.decode = null; }; // 目前的会话总数 Object.defineProperty(EasySock.prototype, 'sessionCount', { - get: function () { - return Object.keys(this.context).length; - } + get: function () { + return Object.keys(this.context).length; + } }) // 重置所有内部状态 EasySock.prototype.restore = function () { - //并发请求时的会话标识,会自增。 - this.seq = 0; + //并发请求时的会话标识,会自增。 + this.seq = 0; - //保存请求的回调函数 - this.context = {}; + //保存请求的回调函数 + this.context = {}; - //全局唯一的一个socket - this.socket = null; + // 实例唯一的socket + this.socket = null; - //当前是否已连接 - this.isAlive = false; + // 自身所处的状态 + this.state = STATES.NEW - // .close 方法的锁 - this.isClosing = false; + // 当连接并未建立时,缓存请求的队列 + this.taskQueue = []; - - // 当连接并未建立时,缓存请求的队列 - this.taskQueue = []; + // 记录所有 timer 的对象,关闭 socket 时需要集体释放 + this.timers = { + connect: null, // 初始化 socket 超时 + writes: {}, // 写入超时 + }; } /** @@ -72,222 +85,245 @@ EasySock.prototype.restore = function () { * @param {Function} callback [回调函数(err, data)] * @return {EasySock} [返回自身] */ -EasySock.prototype.write = function(data, callback){ - var self = this; - var args = arguments +EasySock.prototype.write = function (data, callback) { + var self = this; + var args = arguments + + // 当连接未建立时,将请求缓存 + if (self.state !== STATES.ALIVE) { + self.taskQueue.push(function (err) { + if (err) { + return callback(err); + } - // 当连接未建立时,将请求缓存 - if (!self.isAlive){ - self.taskQueue.push(function(err){ - if (err){ - return callback(err); - } + self.write.apply(self, args); + }); - self.write.apply(self, args); - }); + self.initSocket(); - self.initSocket(); + return self; + } - return self; - } + //并发情况下靠这个序列标识哪个返回是哪个请求 + self.seq++ + var seq = self.seq % MAX_SEQ + 1; - //并发情况下靠这个序列标识哪个返回是哪个请求 - var seq = (++self.seq) % MAX_SEQ; - //编码 - var buf = self.encode(data, self.seq); - if (!Buffer.isBuffer(buf)){ - return callback(new Error("encode error, return value is not Buffer")); - } + //编码 + var buf = self.encode(data, self.seq); + if (!Buffer.isBuffer(buf)) { + return callback(new Error("encode result is not Buffer")); + } - // 返回超时的逻辑 - var timeoutTimer; - if(self.timeout !== 0){ - timeoutTimer = setTimeout(function(){ - self.deleteTask(seq) + // 返回超时的逻辑 + if (self.allowWriteTimeout) { + self.timers.writes[seq] = setTimeout(function () { + var ctx = self.context[seq] - callback(new Error("request timeout(" + self.timeout + "ms)")); - }, self.timeout); - } - // END 返回超时的逻辑 + ctx.cb(new Error("request timeout(" + self.timeout + "ms)")) + self.deleteTask(seq) + }, self.timeout); + } + // END 返回超时的逻辑 - // 保存当前上下文,都是为了并发 - self.context[seq] = { - seq : seq, - cb : function(err, result){ - clearTimeout(timeoutTimer); + // 保存当前上下文,都是为了并发 + self.context[seq] = { + seq: seq, + cb: function (err, result) { + clearTimeout(self.timers.writes[seq]); - callback(err, result); - } - }; - // END 保存当前上下文,都是为了并发 + callback(err, result); + } + }; + // END 保存当前上下文,都是为了并发 - //真正的写socket - self.socket.write(buf); + //真正的写socket + self.socket.write(buf); - return self; + return self; }; // 删除某个已完成或出错的任务 EasySock.prototype.deleteTask = function (seq) { - var self = this; + var self = this; - delete self.context[seq]; + delete self.context[seq]; } /** * 关闭连接 */ -EasySock.prototype.close = function(msg, callback){ - callback = callback || function () {} - - var self = this; - - if (this.isClosing) { - return; - } - - this.isClosing = true; - - if (this.isAlive) { - this.socket.end(); - this.socket.on('close', onClose) - } else { - onClose() - } - - function onClose() { - self.isClosing = false; - this.notifyAll(new Error(msg)) - this.restore() - callback() - } - +EasySock.prototype.close = function (msg, callback) { + callback = callback || function () { + } + + var self = this; + var state = self.state; + + + if (state === STATES.NEW) { + // 全新的实例不需要 close + return; + } + if (state === STATES.CLOSING) { + // 防止关闭两次 + return; + } + self.state = STATES.CLOSING; + + if (state === STATES.INITING) { + self.socket.destroy() + } else if (state === STATES.ALIVE) { + self.socket.end(); + } + + self.socket.on('close', onClose) + + function onClose() { + self.notifyAll(new Error(msg)) + self.clearTimers() + self.restore() + callback() + } } // 给外部所有在等待的函数一个交代 EasySock.prototype.notifyAll = function (err) { - var self = this; + var self = this; - console.error('notifyAll', err); + console.error('notifyAll', err); - // 通知所有队列中未发出的请求 - self.taskQueue.forEach(function (callback) { - callback(err) - }) + // 通知所有队列中未发出的请求 + self.taskQueue.forEach(function (callback) { + callback(err) + }) - // 通知所有已发出,正在等待结果的请求 - for (var seq in self.context){ - var ctx = self.context[seq]; - ctx.cb(err); - } + // 通知所有已发出,正在等待结果的请求 + for (var seq in self.context) { + var ctx = self.context[seq]; + ctx.cb(err); + } +} + +// 清理所有 timers +EasySock.prototype.clearTimers = function () { + var self = this; + var timers = self.timers + + clearTimeout(timers.connect) + timers.writes.forEach(function (timer) { + clearTimeout(timer) + }) } /** * 初始化socket方法 */ -EasySock.prototype.initSocket = function (){ - var self = this; - - var socket = self.socket = new net.Socket({ - writable : true, - readable : true - }); - - if (!self.keepAlive) { - socket.setTimeout(self.idleTimeout); - } - - socket.setKeepAlive(self.keepAlive); - - var connectTimeout = self.timeout * 3; // 在常规的发送数据超时时间上加个倍数 - - var connectTimer = setTimeout(function () { - self.close("easy_sock:TCP connect timeout(" + connectTimeout + "ms)"); - }, connectTimeout); - - - socket.on('connect',function(){ - //连接成功,把等待的数据发送掉 - console.log("easy_sock connected"); - clearTimeout(connectTimer); - self.isAlive = true; - - // 把积累的请求都发了 - var task; - while(task = self.taskQueue.shift()){ - task(); - } - }) - - var totalData = new Buffer(''); - socket.on('data', function(data) { - if (!data || !Buffer.isBuffer(data) || data.length <= 0 ){ - socket.emit('error', new Error('receive error, illegal data')) - return; - } - - totalData = Buffer.concat([totalData, data]); - - // 网络有可能一次返回n个结果包,需要做判断,是不是很bt。。 - while(true){ - var packageSize = self.isReceiveComplete(totalData); - if (packageSize === 0) { - return; - } - var buf = totalData.slice(0, packageSize); - self.handleData(buf); - totalData = totalData.slice(packageSize); - } - }) - - - socket.on('error', function(e){ - self.close('socket error:' + e); - }) - - - socket.on('close', function () { - console.log("easy_sock closed"); - }); - - if (!self.keepAlive) { - // 不活跃一段时间后,自动关闭 socket - socket.on('timeout', function () { - self.close('socket is inactivity for ' + self.idleTimeout + 'ms') - }) - } - - //连接也有可能会超时阻塞 - socket.connect({ - port : self.config.port, - host : self.config.ip - }); +EasySock.prototype.initSocket = function () { + var self = this; + + if (self.state === STATES.INITING) { + return; + } + self.state = STATES.INITING + + var socket = self.socket = new net.Socket({ + writable: true, + readable: true + }); + + if (self.allowIdleClose) { + socket.setTimeout(self.idleTimeout); + } + socket.setKeepAlive(self.keepAlive); + + var connectTimeout = self.timeout * 3; // 在常规的发送数据超时时间上加个倍数 + + self.timers.connect = setTimeout(function () { + self.close("easy_sock:TCP connect timeout(" + connectTimeout + "ms)"); + }, connectTimeout); + + + socket.on('connect', function () { + console.log("easy_sock connected"); + clearTimeout(self.timers.connect); + + self.state = STATES.ALIVE + + // 把积累的请求都发了 + self.taskQueue.forEach(function (task) { + task() + }) + self.taskQueue = [] + // END + }) + + var totalData = new Buffer(''); + socket.on('data', function (data) { + totalData = Buffer.concat([totalData, data]); + + // 网络有可能一次返回n个结果包,需要做判断,是不是很bt。。 + while (true) { + var packageSize = self.isReceiveComplete(totalData); + if (packageSize === 0) { + return; + } + var buf = totalData.slice(0, packageSize); + self.handleData(buf); + totalData = totalData.slice(packageSize); + } + }) + + + socket.on('error', function (e) { + self.close('socket error:' + e); + }) + + socket.on('close', function () { + console.log("easy_sock closed"); + }); + + if (self.allowIdleClose) { + // 不活跃一段时间后,自动关闭 socket + socket.on('timeout', function () { + self.close('socket is inactivity for ' + self.idleTimeout + 'ms') + }) + } + + //连接也有可能会超时阻塞 + socket.connect({ + port: self.config.port, + host: self.config.ip + }); } /** * 处理返回数据,回调 */ -EasySock.prototype.handleData = function (self, buf){ - var self = this; - - var obj = self.decode(buf); - - if (!obj) { - console.error("decode buffer error:", obj); - return; - } - - var ctx = self.context[obj.seq]; - if (!ctx){ - //找不到上下文,可能是因为超时,callback已执行,直接放弃当前数据 - console.error("Can't find context. " + obj.seq); - return; - } - - self.deleteTask(obj.seq) - - ctx.cb(null, obj.result); -} \ No newline at end of file +EasySock.prototype.handleData = function (self, buf) { + var self = this; + + var obj = self.decode(buf); + + if (!obj) { + console.error("decode buffer error:", obj); + return; + } + + var ctx = self.context[obj.seq]; + if (!ctx) { + //找不到上下文,可能因为服务器抽风,callback已执行,直接放弃当前数据 + console.error("Can't find context. " + obj.seq); + return; + } + + ctx.cb(null, obj.result); + self.deleteTask(obj.seq) +} + + +exports = module.exports = EasySock + +exports.STATES = STATES \ No newline at end of file From 3e107349975c80ec2209afc7aa8c5ce38d11f6b8 Mon Sep 17 00:00:00 2001 From: alsotang Date: Fri, 1 Jan 2016 02:24:04 +0800 Subject: [PATCH 03/10] =?UTF-8?q?=E5=8A=A0=E5=85=A5=E4=BA=8B=E4=BB=B6?= =?UTF-8?q?=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 16 ++++++++++++++++ lib/easy_sock.js | 21 ++++++++++++++++++++- 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 31b617b..9b6aaab 100644 --- a/README.md +++ b/README.md @@ -151,3 +151,19 @@ client.write({ ``` 目前easy_sock已经接入protobuf、jce、以及cmem等协议。如有任何意见欢迎交流 vicyao#tencent.com + +## 事件 + +`write_timeout` 当 write 数据超时 + +`connect_timeout` 当初始化 socket 超时 + +`connect` 当 socket 连接成功时 + +`data` 当接收到数据时 + +`error` 当错误发生时 + +`close` 当 socket 关闭时 + +`idle` 当 socket 一直闲置时 \ No newline at end of file diff --git a/lib/easy_sock.js b/lib/easy_sock.js index adb92e8..f77a76e 100644 --- a/lib/easy_sock.js +++ b/lib/easy_sock.js @@ -7,6 +7,7 @@ 'use strict'; var net = require('net'); +var EventEmitter = require('events').EventEmitter; var MAX_SEQ = 10000; // 用于标志每一个并发请求,当超过 MAX_SEQ 时,从 0 开始计数。 var TIMEOUT = 60 * 1000; // 向 socket 写数据的超时时间 @@ -46,8 +47,12 @@ var EasySock = function (config) { this.encode = null; this.decode = null; + EventEmitter.call(this); + }; +util.inherits(EasySock, EventEmitter); + // 目前的会话总数 Object.defineProperty(EasySock.prototype, 'sessionCount', { get: function () { @@ -119,6 +124,8 @@ EasySock.prototype.write = function (data, callback) { // 返回超时的逻辑 if (self.allowWriteTimeout) { self.timers.writes[seq] = setTimeout(function () { + self.emit('write_timeout', seq) + var ctx = self.context[seq] ctx.cb(new Error("request timeout(" + self.timeout + "ms)")) @@ -242,11 +249,15 @@ EasySock.prototype.initSocket = function () { var connectTimeout = self.timeout * 3; // 在常规的发送数据超时时间上加个倍数 self.timers.connect = setTimeout(function () { + self.emit('connect_timeout') + self.close("easy_sock:TCP connect timeout(" + connectTimeout + "ms)"); }, connectTimeout); socket.on('connect', function () { + self.emit('connect') + console.log("easy_sock connected"); clearTimeout(self.timers.connect); @@ -262,6 +273,8 @@ EasySock.prototype.initSocket = function () { var totalData = new Buffer(''); socket.on('data', function (data) { + self.emit('data', data) + totalData = Buffer.concat([totalData, data]); // 网络有可能一次返回n个结果包,需要做判断,是不是很bt。。 @@ -278,16 +291,22 @@ EasySock.prototype.initSocket = function () { socket.on('error', function (e) { + self.emit('error', e) + self.close('socket error:' + e); }) - + socket.on('close', function () { + self.emit('close') + console.log("easy_sock closed"); }); if (self.allowIdleClose) { // 不活跃一段时间后,自动关闭 socket socket.on('timeout', function () { + self.emit('idle') + self.close('socket is inactivity for ' + self.idleTimeout + 'ms') }) } From 4f0f0445cc569beaa328e29e82157cc603f72f3d Mon Sep 17 00:00:00 2001 From: alsotang Date: Fri, 1 Jan 2016 02:30:59 +0800 Subject: [PATCH 04/10] add gitignore and makefile --- .gitignore | 27 +++++++++++++++++++++++++++ Makefile | 5 +++++ package.json | 14 ++++++++------ 3 files changed, 40 insertions(+), 6 deletions(-) create mode 100644 .gitignore create mode 100644 Makefile diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..42b2c72 --- /dev/null +++ b/.gitignore @@ -0,0 +1,27 @@ +# Logs +logs +*.log + +# Runtime data +pids +*.pid +*.seed + +# Directory for instrumented libs generated by jscoverage/JSCover +lib-cov + +# Coverage directory used by tools like istanbul +coverage + +# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files) +.grunt + +# node-waf configuration +.lock-wscript + +# Compiled binary addons (http://nodejs.org/api/addons.html) +build/Release + +# Dependency directory +# https://www.npmjs.org/doc/misc/npm-faq.html#should-i-check-my-node_modules-folder-into-git +node_modules \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..129b2f1 --- /dev/null +++ b/Makefile @@ -0,0 +1,5 @@ +all: test + +test: + +.PHONY: all test \ No newline at end of file diff --git a/package.json b/package.json index 935bab7..e531114 100644 --- a/package.json +++ b/package.json @@ -4,14 +4,16 @@ "description": "A easy and foolish way to develop a socket module, which is easy to use as well.", "main": "index.js", "scripts": { - "test": "node test.js" + "test": "make test" }, "keywords": [ "socket" ], - "author": { - "name": "vicyao" - }, - "license": "ISC", - "readme": "README.md", + "repository": "https://github.com/ysbcc/easy_sock.git", + "author": "vicyao ", + "license": "MIT", + "devDependencies": { + "mocha": "^2.3.4", + "should": "^8.0.2" + } } From 561f44932a2d5fb52c13cbf2bee267c83445f838 Mon Sep 17 00:00:00 2001 From: alsotang Date: Fri, 1 Jan 2016 03:51:58 +0800 Subject: [PATCH 05/10] =?UTF-8?q?=E6=B5=8B=E8=AF=95=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Makefile | 5 ++ lib/easy_sock.js | 15 ++-- package.json | 1 + test/lib/easy_sock.test.js | 161 ++++++++++++++++++++++++++++++++++++ test.js => test/test_run.js | 2 +- 5 files changed, 175 insertions(+), 9 deletions(-) create mode 100644 test/lib/easy_sock.test.js rename test.js => test/test_run.js (98%) diff --git a/Makefile b/Makefile index 129b2f1..56545b0 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,10 @@ +TESTS = $(shell ls -S `find test -type f -name "*.test.js" -print`) + + all: test test: + ./node_modules/.bin/mocha -r should $(TESTS) + .PHONY: all test \ No newline at end of file diff --git a/lib/easy_sock.js b/lib/easy_sock.js index f77a76e..c7ece02 100644 --- a/lib/easy_sock.js +++ b/lib/easy_sock.js @@ -7,6 +7,7 @@ 'use strict'; var net = require('net'); +var util = require('util') var EventEmitter = require('events').EventEmitter; var MAX_SEQ = 10000; // 用于标志每一个并发请求,当超过 MAX_SEQ 时,从 0 开始计数。 @@ -23,7 +24,6 @@ var STATES = { //需要通过创建实例来使用 var EasySock = function (config) { - // use `config` this.ip = config.ip; this.port = Number(config.port); @@ -116,7 +116,7 @@ EasySock.prototype.write = function (data, callback) { //编码 - var buf = self.encode(data, self.seq); + var buf = self.encode(data, seq); if (!Buffer.isBuffer(buf)) { return callback(new Error("encode result is not Buffer")); } @@ -258,7 +258,7 @@ EasySock.prototype.initSocket = function () { socket.on('connect', function () { self.emit('connect') - console.log("easy_sock connected"); + console.info("easy_sock connected"); clearTimeout(self.timers.connect); self.state = STATES.ALIVE @@ -299,7 +299,7 @@ EasySock.prototype.initSocket = function () { socket.on('close', function () { self.emit('close') - console.log("easy_sock closed"); + console.info("easy_sock closed"); }); if (self.allowIdleClose) { @@ -313,17 +313,16 @@ EasySock.prototype.initSocket = function () { //连接也有可能会超时阻塞 socket.connect({ - port: self.config.port, - host: self.config.ip + port: self.port, + host: self.ip }); } /** * 处理返回数据,回调 */ -EasySock.prototype.handleData = function (self, buf) { +EasySock.prototype.handleData = function (buf) { var self = this; - var obj = self.decode(buf); if (!obj) { diff --git a/package.json b/package.json index e531114..ed53dd8 100644 --- a/package.json +++ b/package.json @@ -14,6 +14,7 @@ "license": "MIT", "devDependencies": { "mocha": "^2.3.4", + "pedding": "^1.0.0", "should": "^8.0.2" } } diff --git a/test/lib/easy_sock.test.js b/test/lib/easy_sock.test.js new file mode 100644 index 0000000..150328e --- /dev/null +++ b/test/lib/easy_sock.test.js @@ -0,0 +1,161 @@ +var net = require('net') +var EasySock = require('../../') +var pedding = require('pedding') + +var socketServer; +var serverSockets = [] +var HOST = '127.0.0.1' +var PORT = 3000; + +var jsonstringify = JSON.stringify.bind(JSON) +var jsonparse = JSON.parse.bind(JSON) + +describe('test/lib/easy_sock.test.js', function () { + beforeEach(function () { + socketServer = null; + serverSockets = [] + }) + afterEach(function (done) { + if (socketServer) { + socketServer.close(done) + serverSockets.forEach(function (socket) { + socket.end() + }) + } else { + done() + } + + }) + + it('should ok', function () { + true.should.ok() + }) + + it('should work with one packet', function (done) { + socketServer = net.createServer(function (socket) { + serverSockets.push(socket) + socket.on('data', function (data) { + socket.write(data) + }) + }) + socketServer.listen(PORT) + + var socket = new EasySock({ + ip: HOST, + port: PORT, + }) + + socket.isReceiveComplete = function (packet) { + return packet.length + } + socket.encode = function (data, seq) { + data.seq = seq + return new Buffer(jsonstringify(data)) + } + socket.decode = function (data) { + data = jsonparse(String(data)) + data.result = data.userid + return data + } + + socket.write({ + userid: 11 + }, function (err, data) { + (!!err).should.false(); + + data.should.eql(11) + done() + }) + }) + + it('should work with multi packets', function (done) { + done = pedding(done, 3) + + socketServer = net.createServer(function (socket) { + serverSockets.push(socket) + socket.on('data', function (data) { + socket.write(data) // 三份数据都在一次 data 事件里面,因为客户端那边缓存了 + }) + }) + socketServer.listen(PORT) + + var socket = new EasySock({ + ip: HOST, + port: PORT, + }) + + var onePacketLength; + socket.isReceiveComplete = function (packet) { + return onePacketLength + } + socket.encode = function (data, seq) { + data.seq = seq + var buf = new Buffer(jsonstringify(data)) + onePacketLength = buf.length + return buf + } + socket.decode = function (data) { + data = jsonparse(String(data)) + data.result = data.userid + return data + } + + socket.write({ + userid: 11 + }, function (err, data) { + (!!err).should.false(); + + data.should.eql(11) + done() + }) + + socket.write({ + userid: 12 + }, function (err, data) { + (!!err).should.false(); + + data.should.eql(12) + done() + }) + + socket.write({ + userid: 13 + }, function (err, data) { + (!!err).should.false(); + + data.should.eql(13) + done() + }) + }) + + it('should error when connect timeout', function (done) { + var socket = new EasySock({ + ip: '8.8.8.8', + port: PORT, + timeout: 100 + }) + + socket.isReceiveComplete = function (packet) { + return packet.length + } + socket.encode = function (data, seq) { + data.seq = seq + return new Buffer(jsonstringify(data)) + } + socket.decode = function (data) { + data = jsonparse(String(data)) + data.result = data.userid + return data + } + + socket.write({ + userid: 11 + }, function (err, data) { + err.message.should.eql('easy_sock:TCP connect timeout(300ms)') + + done() + }) + }) + + +}) \ No newline at end of file diff --git a/test.js b/test/test_run.js similarity index 98% rename from test.js rename to test/test_run.js index 21556f7..1b1aff4 100644 --- a/test.js +++ b/test/test_run.js @@ -1,6 +1,6 @@ //这是用protobuf服务器的一个简单实例,其中的encode和decode没有具体实现,只是展示easy_sock用法 -var EasySock = require("./index.js"); +var EasySock = require("../"); function createSocket(){ var easysock = new EasySock(); From 01c52ba4f60cbfd48fd97882c6de9866cc8b87d9 Mon Sep 17 00:00:00 2001 From: alsotang Date: Fri, 1 Jan 2016 04:50:19 +0800 Subject: [PATCH 06/10] =?UTF-8?q?=E9=87=8D=E6=9E=84=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=EF=BC=8C=E5=B9=B6=E5=8A=A0=E5=85=A5=E6=9B=B4?= =?UTF-8?q?=E5=A4=9A=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 10 +- lib/easy_sock.js | 18 +++- test/lib/easy_sock.test.js | 197 ++++++++++++++++++++++++++----------- 3 files changed, 163 insertions(+), 62 deletions(-) diff --git a/README.md b/README.md index 9b6aaab..c0d2b47 100644 --- a/README.md +++ b/README.md @@ -166,4 +166,12 @@ client.write({ `close` 当 socket 关闭时 -`idle` 当 socket 一直闲置时 \ No newline at end of file +`idle` 当 socket 一直闲置时 + +## development + +1. git clone this project + +2. `$ npm install` + +3. make test \ No newline at end of file diff --git a/lib/easy_sock.js b/lib/easy_sock.js index c7ece02..bf4a1be 100644 --- a/lib/easy_sock.js +++ b/lib/easy_sock.js @@ -49,6 +49,8 @@ var EasySock = function (config) { EventEmitter.call(this); + this.initSocket() // 初始化之后,就开始建立 socket 连接 + }; util.inherits(EasySock, EventEmitter); @@ -163,8 +165,13 @@ EasySock.prototype.deleteTask = function (seq) { * 关闭连接 */ EasySock.prototype.close = function (msg, callback) { - callback = callback || function () { - } + if (typeof msg == 'function') { + callback = msg; + msg = null; + } + + msg = msg || 'unknown socket close reason' + callback = callback || function () {} var self = this; var state = self.state; @@ -220,9 +227,10 @@ EasySock.prototype.clearTimers = function () { var timers = self.timers clearTimeout(timers.connect) - timers.writes.forEach(function (timer) { - clearTimeout(timer) - }) + + for (var seq in timers.writes) { + clearTimeout(timers.writes[seq]) + } } /** diff --git a/test/lib/easy_sock.test.js b/test/lib/easy_sock.test.js index 150328e..e7e96b3 100644 --- a/test/lib/easy_sock.test.js +++ b/test/lib/easy_sock.test.js @@ -10,6 +10,49 @@ var PORT = 3000; var jsonstringify = JSON.stringify.bind(JSON) var jsonparse = JSON.parse.bind(JSON) +function createServer(onData) { + onData = onData || function (socket) { + return function (data) { + socket.write(data) + } + } + + socketServer = net.createServer(function (socket) { + serverSockets.push(socket) + socket.on('data', onData(socket)) + }) + socketServer.listen(PORT) + + return socketServer; +} + +function createEasySocket(_options) { + var options = { + ip: HOST, + port: PORT, + }; + + for (var k in _options) { + options[k] = _options[k] + } + var socket = new EasySock(options) + + socket.isReceiveComplete = function (packet) { + return packet.length + } + socket.encode = function (data, seq) { + data.seq = seq + return new Buffer(jsonstringify(data)) + } + socket.decode = function (data) { + data = jsonparse(String(data)) + data.result = data.userid + return data + } + + return socket +} + describe('test/lib/easy_sock.test.js', function () { beforeEach(function () { socketServer = null; @@ -17,10 +60,10 @@ describe('test/lib/easy_sock.test.js', function () { }) afterEach(function (done) { if (socketServer) { - socketServer.close(done) serverSockets.forEach(function (socket) { socket.end() }) + socketServer.close(done) } else { done() } @@ -32,31 +75,10 @@ describe('test/lib/easy_sock.test.js', function () { }) it('should work with one packet', function (done) { - socketServer = net.createServer(function (socket) { - serverSockets.push(socket) - socket.on('data', function (data) { - socket.write(data) - }) - }) - socketServer.listen(PORT) - var socket = new EasySock({ - ip: HOST, - port: PORT, - }) + createServer() - socket.isReceiveComplete = function (packet) { - return packet.length - } - socket.encode = function (data, seq) { - data.seq = seq - return new Buffer(jsonstringify(data)) - } - socket.decode = function (data) { - data = jsonparse(String(data)) - data.result = data.userid - return data - } + var socket = createEasySocket() socket.write({ userid: 11 @@ -71,18 +93,9 @@ describe('test/lib/easy_sock.test.js', function () { it('should work with multi packets', function (done) { done = pedding(done, 3) - socketServer = net.createServer(function (socket) { - serverSockets.push(socket) - socket.on('data', function (data) { - socket.write(data) // 三份数据都在一次 data 事件里面,因为客户端那边缓存了 - }) - }) - socketServer.listen(PORT) + createServer() - var socket = new EasySock({ - ip: HOST, - port: PORT, - }) + var socket = createEasySocket() var onePacketLength; socket.isReceiveComplete = function (packet) { @@ -94,11 +107,6 @@ describe('test/lib/easy_sock.test.js', function () { onePacketLength = buf.length return buf } - socket.decode = function (data) { - data = jsonparse(String(data)) - data.result = data.userid - return data - } socket.write({ userid: 11 @@ -129,33 +137,110 @@ describe('test/lib/easy_sock.test.js', function () { }) it('should error when connect timeout', function (done) { - var socket = new EasySock({ - ip: '8.8.8.8', - port: PORT, + var socket = createEasySocket({ + ip: '1.1.1.1', timeout: 100 }) - socket.isReceiveComplete = function (packet) { - return packet.length - } - socket.encode = function (data, seq) { - data.seq = seq - return new Buffer(jsonstringify(data)) - } - socket.decode = function (data) { - data = jsonparse(String(data)) - data.result = data.userid - return data - } - socket.write({ userid: 11 }, function (err, data) { err.message.should.eql('easy_sock:TCP connect timeout(300ms)') + socket.timers.connect._called.should.true() + setImmediate(function () { + socket.taskQueue.should.length(0) + done() + }) + }) + socket.taskQueue.should.length(1) + }) + + it('should error when write timeout', function (done) { + + createServer(function (socket) { + return function (data) { + // do nothing + } + }) + + var socket = createEasySocket({ + timeout: 100 + }) + + socket.write({ + userid: 11 + }, function (err, data) { + err.message.should.eql('request timeout(100ms)') + done() + }) + }) + it('should not call connect timeout timer when unknown error occurs', function (done) { + var socket = createEasySocket({ + ip: '1.1.1.1' + }) + + socket.timers.connect._idleTimeout.should.above(-1) + + socket.restore = function () {} + socket.close() + + setImmediate(function () { + socket.timers.connect._idleTimeout.should.eql(-1) done() }) }) + it('should not call write timeout timer when unknown error occurs', function (done) { + createServer(function (socket) { + return function (data) { + } + }) + + var socket = createEasySocket() + + socket.write({ + userid: 11 + }, function (err, data) { + }) + + setTimeout(function () { + socket.timers.writes[2]._idleTimeout.should.above(-1) + + setTimeout(function () { + socket.restore = function () {} + socket.close() + + setTimeout(function () { + socket.timers.writes[2]._idleTimeout.should.eql(-1) + done() + }, 10) + }, 100) + }, 100) + + }) + + it('should close socket when idle, and recreate when write', function (done) { + done = pedding(done, 2); + + createServer() + + var socket = createEasySocket({ + idleTimeout: 200 + }) + + socket.once('close', function () { + done() + }) + + setTimeout(function () { + socket.write({ + userid: 11, + }, function (err, data) { + data.should.eql(11) + done() + }) + }, 300) + }) }) \ No newline at end of file From c8c557689baee634719aadef98b1900d54e964c2 Mon Sep 17 00:00:00 2001 From: alsotang Date: Fri, 1 Jan 2016 05:08:32 +0800 Subject: [PATCH 07/10] =?UTF-8?q?=E6=9B=B4=E5=A4=9A=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/easy_sock.js | 4 +- test/lib/easy_sock.test.js | 88 +++++++++++++++++++++++++++++++++++++- 2 files changed, 88 insertions(+), 4 deletions(-) diff --git a/lib/easy_sock.js b/lib/easy_sock.js index bf4a1be..65c1c53 100644 --- a/lib/easy_sock.js +++ b/lib/easy_sock.js @@ -113,8 +113,8 @@ EasySock.prototype.write = function (data, callback) { //并发情况下靠这个序列标识哪个返回是哪个请求 - self.seq++ var seq = self.seq % MAX_SEQ + 1; + self.seq++ //编码 @@ -333,7 +333,7 @@ EasySock.prototype.handleData = function (buf) { var self = this; var obj = self.decode(buf); - if (!obj) { + if (!obj || !obj.seq || !obj.result) { console.error("decode buffer error:", obj); return; } diff --git a/test/lib/easy_sock.test.js b/test/lib/easy_sock.test.js index e7e96b3..ce648d9 100644 --- a/test/lib/easy_sock.test.js +++ b/test/lib/easy_sock.test.js @@ -53,6 +53,10 @@ function createEasySocket(_options) { return socket } +function shouldNotBeHere() { + throw new Error('should not be here') +} + describe('test/lib/easy_sock.test.js', function () { beforeEach(function () { socketServer = null; @@ -90,6 +94,38 @@ describe('test/lib/easy_sock.test.js', function () { }) }) + it('should work with one packet and not call one task twice', function (done) { + + createServer(function (socket) { + return function (data) { + data = jsonparse(data) + data.seq = 2; + data = new Buffer(jsonstringify(data)) + socket.write(data) + } + }) + + var socket = createEasySocket() + + socket.write({ + userid: 11 + }, function (err, data) { + (!!err).should.false(); + + data.should.eql(11) + + socket.write({ + userid: 12 + }, function (err, data) { + shouldNotBeHere() + }) + }) + + setTimeout(function () { + done() + }, 200) + }) + it('should work with multi packets', function (done) { done = pedding(done, 3) @@ -206,14 +242,14 @@ describe('test/lib/easy_sock.test.js', function () { }) setTimeout(function () { - socket.timers.writes[2]._idleTimeout.should.above(-1) + socket.timers.writes[1]._idleTimeout.should.above(-1) setTimeout(function () { socket.restore = function () {} socket.close() setTimeout(function () { - socket.timers.writes[2]._idleTimeout.should.eql(-1) + socket.timers.writes[1]._idleTimeout.should.eql(-1) done() }, 10) }, 100) @@ -243,4 +279,52 @@ describe('test/lib/easy_sock.test.js', function () { }) }, 300) }) + + it('should error when no ip or port', function () { + try{ + new EasySock({}) + } catch (e) { + e.message.should.eql('needs config info: ip, port') + } + }) + + it('should log error when decode not return seq or result', function (done) { + createServer() + + var socket = createEasySocket() + + socket.decode = function () { + return ({hehe: '呵呵'}) + } + + socket.write({ + userid: 11, + }, function (err, data) { + throw new Error('should not be here') + }) + + setTimeout(function () { + done() + }, 500) + }) + + it('should log error when decode return wrong req', function (done) { + createServer() + + var socket = createEasySocket() + + socket.decode = function () { + return ({req: -1}) + } + + socket.write({ + userid: 11, + }, function (err, data) { + throw new Error('should not be here') + }) + + setTimeout(function () { + done() + }, 500) + }) }) \ No newline at end of file From 0776dce581ffe2e74665323efd5261bb10087650 Mon Sep 17 00:00:00 2001 From: alsotang Date: Fri, 1 Jan 2016 14:38:42 +0800 Subject: [PATCH 08/10] use custom error type, and more error test --- lib/easy_sock.js | 23 +++++++++++++++++------ lib/errors.js | 5 +++++ package.json | 3 +++ test/lib/easy_sock.test.js | 31 +++++++++++++++++++++++++++++++ 4 files changed, 56 insertions(+), 6 deletions(-) create mode 100644 lib/errors.js diff --git a/lib/easy_sock.js b/lib/easy_sock.js index 65c1c53..c1331e1 100644 --- a/lib/easy_sock.js +++ b/lib/easy_sock.js @@ -9,6 +9,8 @@ var net = require('net'); var util = require('util') var EventEmitter = require('events').EventEmitter; +var errors = require('./errors') +var EasySockError = errors.EasySockError; var MAX_SEQ = 10000; // 用于标志每一个并发请求,当超过 MAX_SEQ 时,从 0 开始计数。 var TIMEOUT = 60 * 1000; // 向 socket 写数据的超时时间 @@ -24,6 +26,8 @@ var STATES = { //需要通过创建实例来使用 var EasySock = function (config) { + var self = this; + // use `config` this.ip = config.ip; this.port = Number(config.port); @@ -36,7 +40,7 @@ var EasySock = function (config) { this.allowWriteTimeout = this.timeout !== 0 if (!this.ip || !this.port) { - throw new Error("needs config info: ip, port"); + throw new EasySockError("needs config info: ip, port"); } // END @@ -48,6 +52,12 @@ var EasySock = function (config) { this.decode = null; EventEmitter.call(this); + self.on('error', function (e) { + if (self.listeners('error') > 1) { + return; + } + console.error('default EasySock error handler:', e) + }) this.initSocket() // 初始化之后,就开始建立 socket 连接 @@ -120,7 +130,7 @@ EasySock.prototype.write = function (data, callback) { //编码 var buf = self.encode(data, seq); if (!Buffer.isBuffer(buf)) { - return callback(new Error("encode result is not Buffer")); + return callback(new EasySockError("encode result is not Buffer")); } // 返回超时的逻辑 @@ -130,7 +140,7 @@ EasySock.prototype.write = function (data, callback) { var ctx = self.context[seq] - ctx.cb(new Error("request timeout(" + self.timeout + "ms)")) + ctx.cb(new EasySockError("request timeout(" + self.timeout + "ms)")) self.deleteTask(seq) }, self.timeout); } @@ -196,7 +206,7 @@ EasySock.prototype.close = function (msg, callback) { self.socket.on('close', onClose) function onClose() { - self.notifyAll(new Error(msg)) + self.notifyAll(new EasySockError(msg)) self.clearTimers() self.restore() callback() @@ -207,6 +217,7 @@ EasySock.prototype.close = function (msg, callback) { EasySock.prototype.notifyAll = function (err) { var self = this; + self.emit('error', err) console.error('notifyAll', err); // 通知所有队列中未发出的请求 @@ -334,14 +345,14 @@ EasySock.prototype.handleData = function (buf) { var obj = self.decode(buf); if (!obj || !obj.seq || !obj.result) { - console.error("decode buffer error:", obj); + self.emit('error', new EasySockError("decode buffer error: " + util.inspect(obj))) return; } var ctx = self.context[obj.seq]; if (!ctx) { //找不到上下文,可能因为服务器抽风,callback已执行,直接放弃当前数据 - console.error("Can't find context. " + obj.seq); + self.emit('error', new EasySockError("Can't find context. " + obj.seq)); return; } diff --git a/lib/errors.js b/lib/errors.js new file mode 100644 index 0000000..2eacc11 --- /dev/null +++ b/lib/errors.js @@ -0,0 +1,5 @@ +var customError = require('custom-error'); + +var EasySockError = customError('EasySockError'); + +exports.EasySockError = EasySockError; \ No newline at end of file diff --git a/package.json b/package.json index ed53dd8..353c8f0 100644 --- a/package.json +++ b/package.json @@ -16,5 +16,8 @@ "mocha": "^2.3.4", "pedding": "^1.0.0", "should": "^8.0.2" + }, + "dependencies": { + "custom-error": "^0.2.1" } } diff --git a/test/lib/easy_sock.test.js b/test/lib/easy_sock.test.js index ce648d9..b8a1847 100644 --- a/test/lib/easy_sock.test.js +++ b/test/lib/easy_sock.test.js @@ -288,6 +288,33 @@ describe('test/lib/easy_sock.test.js', function () { } }) + it('should not close socket when idle and keepAlive', function (done) { + createServer() + + var socket = createEasySocket({ + //idleTimeout: 200, + keepAlive: true, + }) + + var closeCount = 0; + socket.on('close', function () { + closeCount++; + closeCount.should.not.eql(2) + }) + + setTimeout(function () { + done() + }, 300) + }) + + it('should error when no ip or port', function () { + try{ + new EasySock({}) + } catch (e) { + e.message.should.eql('needs config info: ip, port') + } + }) + it('should log error when decode not return seq or result', function (done) { createServer() @@ -323,6 +350,10 @@ describe('test/lib/easy_sock.test.js', function () { throw new Error('should not be here') }) + socket.on('error', function (e) { + e.message.should.eql('decode buffer error: { req: -1 }') + }) + setTimeout(function () { done() }, 500) From 2519ebfa43e3f066cbeb03fc0aab02b455a50acf Mon Sep 17 00:00:00 2001 From: alsotang Date: Fri, 1 Jan 2016 20:12:15 +0800 Subject: [PATCH 09/10] =?UTF-8?q?=E5=86=85=E5=AD=98=E6=B3=84=E9=9C=B2?= =?UTF-8?q?=E6=A3=80=E6=B5=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- package.json | 1 + test/memory_leak.js | 40 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+) create mode 100644 test/memory_leak.js diff --git a/package.json b/package.json index 353c8f0..8f0b10f 100644 --- a/package.json +++ b/package.json @@ -13,6 +13,7 @@ "author": "vicyao ", "license": "MIT", "devDependencies": { + "memwatch-next": "^0.2.10", "mocha": "^2.3.4", "pedding": "^1.0.0", "should": "^8.0.2" diff --git a/test/memory_leak.js b/test/memory_leak.js new file mode 100644 index 0000000..2988a08 --- /dev/null +++ b/test/memory_leak.js @@ -0,0 +1,40 @@ +var memwatch = require('memwatch-next') +var EasySock = require('..') + +console.info = console.error = function () {} + + + +var arr = []; + +memwatch.gc(); +var hd = new memwatch.HeapDiff(); + +for (var i = 0; i < 10000; i++) { + var socket = new EasySock({ + ip: '127.0.0.1', + port: '3000' + }) + + socket.write({ + haha: 11 + }, function (err, data) { + + }) + + arr.push(socket) +} + +arr.forEach(function (sock) { + sock.close() +}) + +arr = null + + +setTimeout(function () { + memwatch.gc(); + var hde = hd.end(); + + console.log(JSON.stringify(hde, null, 2)); +}, 2000) From 51ecd3a2054b0e0dca8db19f6ffef44af8cbf828 Mon Sep 17 00:00:00 2001 From: alsotang Date: Fri, 1 Jan 2016 20:45:12 +0800 Subject: [PATCH 10/10] add benchmark --- Makefile | 5 +- benchmark/multi_packet.js | 98 ++++++++++++++++++++++++++++++++++++++ benchmark/one_packet.js | 81 +++++++++++++++++++++++++++++++ package.json | 1 + test/lib/easy_sock.test.js | 15 +----- test/memory_leak.js | 2 +- 6 files changed, 187 insertions(+), 15 deletions(-) create mode 100644 benchmark/multi_packet.js create mode 100644 benchmark/one_packet.js diff --git a/Makefile b/Makefile index 56545b0..edc5c2f 100644 --- a/Makefile +++ b/Makefile @@ -6,5 +6,8 @@ all: test test: ./node_modules/.bin/mocha -r should $(TESTS) +benchmark bench: + ./node_modules/.bin/matcha benchmark/*.js -.PHONY: all test \ No newline at end of file + +.PHONY: all test benchmark bench \ No newline at end of file diff --git a/benchmark/multi_packet.js b/benchmark/multi_packet.js new file mode 100644 index 0000000..bb6c2b3 --- /dev/null +++ b/benchmark/multi_packet.js @@ -0,0 +1,98 @@ +var net = require('net') +var socketServer; +var serverSockets = [] +var HOST = '127.0.0.1' +var PORT = 3000; +var EasySock = require('..') +var jsonstringify = JSON.stringify.bind(JSON) +var jsonparse = JSON.parse.bind(JSON) +var pedding = require('pedding') + +function createServer(onData) { + onData = onData || function (socket) { + return function (data) { + socket.write(data) + } + } + + socketServer = net.createServer(function (socket) { + serverSockets.push(socket) + socket.on('data', onData(socket)) + }) + socketServer.listen(PORT) + + return socketServer; +} + +function createEasySocket(_options) { + var options = { + ip: HOST, + port: PORT, + }; + + for (var k in _options) { + options[k] = _options[k] + } + var socket = new EasySock(options) + + socket.isReceiveComplete = function (packet) { + return packet.length + } + socket.encode = function (data, seq) { + data.seq = seq + return new Buffer(jsonstringify(data)) + } + socket.decode = function (data) { + data = jsonparse(String(data)) + data.result = data.userid + return data + } + + return socket +} + + +suite('multi packet', function () { + var socket; + before(function () { + createServer() + + socket = createEasySocket() + + socket.isReceiveComplete = function (packet) { + return packet.indexOf('}') + 1 + } + }) + + after(function (done) { + if (socketServer) { + serverSockets.forEach(function (socket) { + socket.end() + }) + socketServer.close(done) + } else { + done() + } + }) + + bench('multi packet', function (next) { + next = pedding(next, 3) + + socket.write({ + userid: 11 + }, function (err, data) { + next() + }) + socket.write({ + userid: 12 + }, function (err, data) { + next() + }) + socket.write({ + userid: 13 + }, function (err, data) { + next() + }) + }) + +}) \ No newline at end of file diff --git a/benchmark/one_packet.js b/benchmark/one_packet.js new file mode 100644 index 0000000..8b7116d --- /dev/null +++ b/benchmark/one_packet.js @@ -0,0 +1,81 @@ +var net = require('net') +var socketServer; +var serverSockets = [] +var HOST = '127.0.0.1' +var PORT = 3000; +var EasySock = require('..') +var jsonstringify = JSON.stringify.bind(JSON) +var jsonparse = JSON.parse.bind(JSON) + +function createServer(onData) { + onData = onData || function (socket) { + return function (data) { + socket.write(data) + } + } + + socketServer = net.createServer(function (socket) { + serverSockets.push(socket) + socket.on('data', onData(socket)) + }) + socketServer.listen(PORT) + + return socketServer; +} + +function createEasySocket(_options) { + var options = { + ip: HOST, + port: PORT, + }; + + for (var k in _options) { + options[k] = _options[k] + } + var socket = new EasySock(options) + + socket.isReceiveComplete = function (packet) { + return packet.length + } + socket.encode = function (data, seq) { + data.seq = seq + return new Buffer(jsonstringify(data)) + } + socket.decode = function (data) { + data = jsonparse(data) + data.result = data.userid + return data + } + + return socket +} + + +suite('one packet', function () { + var socket; + before(function () { + createServer() + + socket = createEasySocket() + }) + + after(function (done) { + if (socketServer) { + serverSockets.forEach(function (socket) { + socket.end() + }) + socketServer.close(done) + } else { + done() + } + }) + + bench('one packet', function (next) { + socket.write({ + userid: 11 + }, function (err, data) { + next() + }) + }) + +}) \ No newline at end of file diff --git a/package.json b/package.json index 8f0b10f..8ee3941 100644 --- a/package.json +++ b/package.json @@ -13,6 +13,7 @@ "author": "vicyao ", "license": "MIT", "devDependencies": { + "matcha": "^0.6.1", "memwatch-next": "^0.2.10", "mocha": "^2.3.4", "pedding": "^1.0.0", diff --git a/test/lib/easy_sock.test.js b/test/lib/easy_sock.test.js index b8a1847..9dbf8e5 100644 --- a/test/lib/easy_sock.test.js +++ b/test/lib/easy_sock.test.js @@ -38,14 +38,14 @@ function createEasySocket(_options) { var socket = new EasySock(options) socket.isReceiveComplete = function (packet) { - return packet.length + return packet.indexOf('}') + 1 } socket.encode = function (data, seq) { data.seq = seq return new Buffer(jsonstringify(data)) } socket.decode = function (data) { - data = jsonparse(String(data)) + data = jsonparse(data) data.result = data.userid return data } @@ -133,17 +133,6 @@ describe('test/lib/easy_sock.test.js', function () { var socket = createEasySocket() - var onePacketLength; - socket.isReceiveComplete = function (packet) { - return onePacketLength - } - socket.encode = function (data, seq) { - data.seq = seq - var buf = new Buffer(jsonstringify(data)) - onePacketLength = buf.length - return buf - } - socket.write({ userid: 11 }, function (err, data) { diff --git a/test/memory_leak.js b/test/memory_leak.js index 2988a08..c8fa1b9 100644 --- a/test/memory_leak.js +++ b/test/memory_leak.js @@ -19,7 +19,7 @@ for (var i = 0; i < 10000; i++) { socket.write({ haha: 11 }, function (err, data) { - + }) arr.push(socket)