diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..42b2c72 --- /dev/null +++ b/.gitignore @@ -0,0 +1,27 @@ +# Logs +logs +*.log + +# Runtime data +pids +*.pid +*.seed + +# Directory for instrumented libs generated by jscoverage/JSCover +lib-cov + +# Coverage directory used by tools like istanbul +coverage + +# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files) +.grunt + +# node-waf configuration +.lock-wscript + +# Compiled binary addons (http://nodejs.org/api/addons.html) +build/Release + +# Dependency directory +# https://www.npmjs.org/doc/misc/npm-faq.html#should-i-check-my-node_modules-folder-into-git +node_modules \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..edc5c2f --- /dev/null +++ b/Makefile @@ -0,0 +1,13 @@ +TESTS = $(shell ls -S `find test -type f -name "*.test.js" -print`) + + +all: test + +test: + ./node_modules/.bin/mocha -r should $(TESTS) + +benchmark bench: + ./node_modules/.bin/matcha benchmark/*.js + + +.PHONY: all test benchmark bench \ No newline at end of file diff --git a/README.md b/README.md index 31b617b..c0d2b47 100644 --- a/README.md +++ b/README.md @@ -151,3 +151,27 @@ client.write({ ``` 目前easy_sock已经接入protobuf、jce、以及cmem等协议。如有任何意见欢迎交流 vicyao#tencent.com + +## 事件 + +`write_timeout` 当 write 数据超时 + +`connect_timeout` 当初始化 socket 超时 + +`connect` 当 socket 连接成功时 + +`data` 当接收到数据时 + +`error` 当错误发生时 + +`close` 当 socket 关闭时 + +`idle` 当 socket 一直闲置时 + +## development + +1. git clone this project + +2. `$ npm install` + +3. make test \ No newline at end of file diff --git a/benchmark/multi_packet.js b/benchmark/multi_packet.js new file mode 100644 index 0000000..bb6c2b3 --- /dev/null +++ b/benchmark/multi_packet.js @@ -0,0 +1,98 @@ +var net = require('net') +var socketServer; +var serverSockets = [] +var HOST = '127.0.0.1' +var PORT = 3000; +var EasySock = require('..') +var jsonstringify = JSON.stringify.bind(JSON) +var jsonparse = JSON.parse.bind(JSON) +var pedding = require('pedding') + +function createServer(onData) { + onData = onData || function (socket) { + return function (data) { + socket.write(data) + } + } + + socketServer = net.createServer(function (socket) { + serverSockets.push(socket) + socket.on('data', onData(socket)) + }) + socketServer.listen(PORT) + + return socketServer; +} + +function createEasySocket(_options) { + var options = { + ip: HOST, + port: PORT, + }; + + for (var k in _options) { + options[k] = _options[k] + } + var socket = new EasySock(options) + + socket.isReceiveComplete = function (packet) { + return packet.length + } + socket.encode = function (data, seq) { + data.seq = seq + return new Buffer(jsonstringify(data)) + } + socket.decode = function (data) { + data = jsonparse(String(data)) + data.result = data.userid + return data + } + + return socket +} + + +suite('multi packet', function () { + var socket; + before(function () { + createServer() + + socket = createEasySocket() + + socket.isReceiveComplete = function (packet) { + return packet.indexOf('}') + 1 + } + }) + + after(function (done) { + if (socketServer) { + serverSockets.forEach(function (socket) { + socket.end() + }) + socketServer.close(done) + } else { + done() + } + }) + + bench('multi packet', function (next) { + next = pedding(next, 3) + + socket.write({ + userid: 11 + }, function (err, data) { + next() + }) + socket.write({ + userid: 12 + }, function (err, data) { + next() + }) + socket.write({ + userid: 13 + }, function (err, data) { + next() + }) + }) + +}) \ No newline at end of file diff --git a/benchmark/one_packet.js b/benchmark/one_packet.js new file mode 100644 index 0000000..8b7116d --- /dev/null +++ b/benchmark/one_packet.js @@ -0,0 +1,81 @@ +var net = require('net') +var socketServer; +var serverSockets = [] +var HOST = '127.0.0.1' +var PORT = 3000; +var EasySock = require('..') +var jsonstringify = JSON.stringify.bind(JSON) +var jsonparse = JSON.parse.bind(JSON) + +function createServer(onData) { + onData = onData || function (socket) { + return function (data) { + socket.write(data) + } + } + + socketServer = net.createServer(function (socket) { + serverSockets.push(socket) + socket.on('data', onData(socket)) + }) + socketServer.listen(PORT) + + return socketServer; +} + +function createEasySocket(_options) { + var options = { + ip: HOST, + port: PORT, + }; + + for (var k in _options) { + options[k] = _options[k] + } + var socket = new EasySock(options) + + socket.isReceiveComplete = function (packet) { + return packet.length + } + socket.encode = function (data, seq) { + data.seq = seq + return new Buffer(jsonstringify(data)) + } + socket.decode = function (data) { + data = jsonparse(data) + data.result = data.userid + return data + } + + return socket +} + + +suite('one packet', function () { + var socket; + before(function () { + createServer() + + socket = createEasySocket() + }) + + after(function (done) { + if (socketServer) { + serverSockets.forEach(function (socket) { + socket.end() + }) + socketServer.close(done) + } else { + done() + } + }) + + bench('one packet', function (next) { + socket.write({ + userid: 11 + }, function (err, data) { + next() + }) + }) + +}) \ No newline at end of file diff --git a/lib/easy_sock.js b/lib/easy_sock.js index 6a953f2..c1331e1 100644 --- a/lib/easy_sock.js +++ b/lib/easy_sock.js @@ -7,341 +7,360 @@ 'use strict'; var net = require('net'); +var util = require('util') +var EventEmitter = require('events').EventEmitter; +var errors = require('./errors') +var EasySockError = errors.EasySockError; -//需要通过创建实例来使用 -var Easysock = exports = module.exports = function(conf){ - //并发请求时的会话标识 - this.seq = 0; - - //保存请求的回调函数 - this.context = {}; - - //全局唯一的一个socket - this.socket = null; - - this.between_connect = false; - this.between_close = false; - - this.calling_close = false; - - this.currentSession = 0; - this.tmpGetTaskList = []; - - this.config = { - ip:"", - port : 0, - /* - * 是否保持连接状态,如果为false,则每次socket空闲下来后就会关闭连接 - */ - keepAlive : false, - timeout : 0 - }; - - if (conf){ - this.setConfig(conf); - } - - this.isReceiveComplete = null; - this.encode = null; - this.decode = null; +var MAX_SEQ = 10000; // 用于标志每一个并发请求,当超过 MAX_SEQ 时,从 0 开始计数。 +var TIMEOUT = 60 * 1000; // 向 socket 写数据的超时时间 +var IDLE_TIMEOUT = 60 * 1000; // socket 空闲时,关闭 socket 的超时时间。 + +// 类似一个 enum。只能从一种状态,变成下一种,或者变成 NEW。 +var STATES = { + 'NEW': 1, // 全新的实例,还未初始化 socket + 'INITING': 2, // 正在初始化 socket + 'ALIVE': 3, // socket 已经建立 + 'CLOSING': 4, // 正在关闭 socket }; -/** - * 设置配置信息 - * @param {[Object]} obj [description] - */ -Easysock.prototype.setConfig = function(conf){ - this.config = this.config || {}; - - if(typeof(conf) == 'object'){ - for(var key in conf){ - this.config[key] = conf[key]; - } - } +//需要通过创建实例来使用 +var EasySock = function (config) { + var self = this; + + // use `config` + this.ip = config.ip; + this.port = Number(config.port); + + this.keepAlive = config.keepAlive !== void 0 ? !!config.keepAlive : false; + this.timeout = config.timeout !== void 0 ? Number(config.timeout) : TIMEOUT; + this.idleTimeout = config.idleTimeout !== void 0 ? Number(config.idleTimeout) : IDLE_TIMEOUT; + + this.allowIdleClose = !this.keepAlive && this.idleTimeout !== 0 // 在空闲时关闭 socket + this.allowWriteTimeout = this.timeout !== 0 + + if (!this.ip || !this.port) { + throw new EasySockError("needs config info: ip, port"); + } + // END + + this.restore() + + // 这三个函数需要在外部重写,具体请看 readme + this.isReceiveComplete = null; + this.encode = null; + this.decode = null; + + EventEmitter.call(this); + self.on('error', function (e) { + if (self.listeners('error') > 1) { + return; + } + console.error('default EasySock error handler:', e) + }) + + this.initSocket() // 初始化之后,就开始建立 socket 连接 + }; -/** - * 当前是否已连接(或正在连接) - * @type {Bool} - */ -Easysock.prototype.isAlive = false; +util.inherits(EasySock, EventEmitter); + +// 目前的会话总数 +Object.defineProperty(EasySock.prototype, 'sessionCount', { + get: function () { + return Object.keys(this.context).length; + } +}) + +// 重置所有内部状态 +EasySock.prototype.restore = function () { + //并发请求时的会话标识,会自增。 + this.seq = 0; + + //保存请求的回调函数 + this.context = {}; + + // 实例唯一的socket + this.socket = null; + + // 自身所处的状态 + this.state = STATES.NEW + + // 当连接并未建立时,缓存请求的队列 + this.taskQueue = []; + + // 记录所有 timer 的对象,关闭 socket 时需要集体释放 + this.timers = { + connect: null, // 初始化 socket 超时 + writes: {}, // 写入超时 + }; +} /** * 对外的获取数据的接口方法 - * @param {[Array]} data [任意类型,会直接传给encode函数] - * @param {[Function]} callback [回调函数(err, data)] - * @return {[void]} [void] + * @param {Array} data [任意类型,会直接传给encode函数] + * @param {Function} callback [回调函数(err, data)] + * @return {EasySock} [返回自身] */ -Easysock.prototype.write = function(data, callback){ - var self = this; - //当在这两个状态的时候,先保留请求,等连接成功后再执行 - if (this.between_connect || this.between_close){ - - this.tmpGetTaskList.push(function(err){ - if (err){ - callback(err); - } - else{ - self.write(data, callback); - } - }); - return; - } - - if (!this.config || !this.config.ip || !this.config.port){ - callback("needs config info:ip,port"); - } - else{ - - if (this.socket){ - //并发情况下靠这个序列标识哪个返回是哪个请求 - var seq = this.seq = (this.seq+1) % 10000; - - //编码 - var buf = this.encode(data, this.seq); - if (!Buffer.isBuffer(buf)){ - callback("encode error"); - return; - } - - var timer = null; - if(this.config.timeout){ - timer = setTimeout(function(){ - //返回超时 - self.context[seq] = null; - self.currentSession--; - - tryCloseSocket(self); - callback("request timeout(" + self.config.timeout + "ms)"); - }, this.config.timeout); - } - - //保存当前上下文,都是为了并发 - this.context[this.seq] = { - seq : this.seq, - cb : function(err, result){ - if (timer){ - clearTimeout(timer); - } - callback(err, result); - } - }; - this.currentSession++; - - //真正的写socket - this.socket.write(buf); - } - else{ - //第一次请求,初始化 - this.tmpGetTaskList.push(function(){ - self.write(data, callback); - }); - initSocket(self); - } - } +EasySock.prototype.write = function (data, callback) { + var self = this; + var args = arguments + + // 当连接未建立时,将请求缓存 + if (self.state !== STATES.ALIVE) { + self.taskQueue.push(function (err) { + if (err) { + return callback(err); + } + + self.write.apply(self, args); + }); + + self.initSocket(); + + return self; + } + + + //并发情况下靠这个序列标识哪个返回是哪个请求 + var seq = self.seq % MAX_SEQ + 1; + self.seq++ + + + //编码 + var buf = self.encode(data, seq); + if (!Buffer.isBuffer(buf)) { + return callback(new EasySockError("encode result is not Buffer")); + } + + // 返回超时的逻辑 + if (self.allowWriteTimeout) { + self.timers.writes[seq] = setTimeout(function () { + self.emit('write_timeout', seq) + + var ctx = self.context[seq] + + ctx.cb(new EasySockError("request timeout(" + self.timeout + "ms)")) + self.deleteTask(seq) + }, self.timeout); + } + // END 返回超时的逻辑 + + // 保存当前上下文,都是为了并发 + self.context[seq] = { + seq: seq, + cb: function (err, result) { + clearTimeout(self.timers.writes[seq]); + + callback(err, result); + } + }; + // END 保存当前上下文,都是为了并发 + + //真正的写socket + self.socket.write(buf); + + return self; }; + +// 删除某个已完成或出错的任务 +EasySock.prototype.deleteTask = function (seq) { + var self = this; + + delete self.context[seq]; +} + /** * 关闭连接 */ -Easysock.prototype.close = function(){ - if (this.socket && this.currentSession == 0 && this.tmpGetTaskList.length == 0 && (!this.between_close)){ - this.between_close = true; - this.isAlive = false; - this.socket.end(); - } - else{ - //等所有请求处理完再关闭 - this.calling_close = true; - } +EasySock.prototype.close = function (msg, callback) { + if (typeof msg == 'function') { + callback = msg; + msg = null; + } + + msg = msg || 'unknown socket close reason' + callback = callback || function () {} + + var self = this; + var state = self.state; + + + if (state === STATES.NEW) { + // 全新的实例不需要 close + return; + } + if (state === STATES.CLOSING) { + // 防止关闭两次 + return; + } + self.state = STATES.CLOSING; + + if (state === STATES.INITING) { + self.socket.destroy() + } else if (state === STATES.ALIVE) { + self.socket.end(); + } + + self.socket.on('close', onClose) + + function onClose() { + self.notifyAll(new EasySockError(msg)) + self.clearTimers() + self.restore() + callback() + } +} + +// 给外部所有在等待的函数一个交代 +EasySock.prototype.notifyAll = function (err) { + var self = this; + + self.emit('error', err) + console.error('notifyAll', err); + + // 通知所有队列中未发出的请求 + self.taskQueue.forEach(function (callback) { + callback(err) + }) + + // 通知所有已发出,正在等待结果的请求 + for (var seq in self.context) { + var ctx = self.context[seq]; + ctx.cb(err); + } +} + +// 清理所有 timers +EasySock.prototype.clearTimers = function () { + var self = this; + var timers = self.timers + + clearTimeout(timers.connect) + + for (var seq in timers.writes) { + clearTimeout(timers.writes[seq]) + } } /** * 初始化socket方法 */ -function initSocket(cur){ - var totalData = new Buffer(''); - - var socket = cur.socket = new net.Socket({ - writable : true, - readable : true - }); - - //socket.setTimeout(cur.config.timeout); - socket.setKeepAlive(cur.config.keepAlive); - - var errorCall = function(msg){ - //actually, I don't know which request is error and which cb function I shall call. So call them all. - console.log(msg); - //Timeout while connection - var cb; - while(cb = cur.tmpGetTaskList.shift()){ - cb(msg); - } - - for (var key in cur.context){ - var ctx = cur.context[key]; - if (ctx && typeof(ctx.cb) == "function"){ - ctx.cb(msg); - cur.context[key] = null; - cur.currentSession--; - } - } - - socket.destroy(); - }; - - var connect_timeout = cur.config.timeout ? cur.config.timeout * 3 : 3000; - - var connect_timer = setTimeout(function(){ - errorCall("easy_sock:TCP connect timeout(" + connect_timeout + ")"); - }, connect_timeout); - - - socket.on('connect',function(){ - //连接成功,把等待的数据发送掉 - console.log("easy_sock connected"); - clearTimeout(connect_timer); - cur.between_connect = false; - - //外部有可能会在发起连接但还没完成的时候发起请求,所以,把积累的请求都发了 - var get; - while(get = cur.tmpGetTaskList.shift()){ - get(); - } - - }).on('data', function(data) { - if (!data || !Buffer.isBuffer(data) || data.length <= 0 ){ - //error - console.log("buffer error:" + data); - errorCall("receive error, illegal data"); - socket.end(); - } - else{ - - totalData = Buffer.concat([totalData, data]); - var packageSize = cur.isReceiveComplete(totalData); - if(packageSize){ - //网络有可能一次返回n个结果包,需要做判断,是不是很bt。。 - var totalSize = totalData.length; - if (packageSize == totalSize){ - //只有一个包,这是大多数情况 - handleData(cur, totalData); - } - else{ - //存在多个包,这里要做一些buffer复制的操作,会消耗一定性能 - - while(true){ - var buf = totalData.slice(0, packageSize); - handleData(cur, buf); - totalData = totalData.slice(packageSize, totalData.length); - packageSize = cur.isReceiveComplete(totalData); - - if (packageSize >= totalData.length){ - //last one - handleData(cur, totalData); - break; - } - else if (packageSize == 0){ - //包还没接收完 - return; - } - } - } - - //清空buffer,给下一次请求使用 - totalData = new Buffer(''); - } - else{ - //没接收完的话继续接收 - //console.log("keep looking"); - } - - } - - }).on('error',function(e){ - errorCall('socket error:' + e); - socket.destroy(); - cur.socket = null; - /* - }).on('end',function(){ - //console.log("on end"); - - }).on('timeout',function(){ - //console.dir(socket); - /* do nothing because it use setTimeout instead - if (cur.currentSession > 0){ - errorCall('socket timeout'); - } - else{ - //timeout while no request is sent, just egnore it - console.log("egnore timeout"); - } - */ - - }).on('close',function(){ - cur.between_close = false; - cur.socket = null; - cur.isAlive = false; - cur.currentSession = 0; - console.log("easy_sock closed"); - if (cur.tmpGetTaskList.length){ - //刚关闭socket,又来新请求 - cur.tmpGetTaskList.shift()(); - } - }); - - //连接也有可能会超时阻塞 - socket.connect({ - port : cur.config.port, - host : cur.config.ip - }); - - cur.between_connect = true; - cur.isAlive = true; +EasySock.prototype.initSocket = function () { + var self = this; + + if (self.state === STATES.INITING) { + return; + } + self.state = STATES.INITING + + var socket = self.socket = new net.Socket({ + writable: true, + readable: true + }); + + if (self.allowIdleClose) { + socket.setTimeout(self.idleTimeout); + } + socket.setKeepAlive(self.keepAlive); + + var connectTimeout = self.timeout * 3; // 在常规的发送数据超时时间上加个倍数 + + self.timers.connect = setTimeout(function () { + self.emit('connect_timeout') + + self.close("easy_sock:TCP connect timeout(" + connectTimeout + "ms)"); + }, connectTimeout); + + + socket.on('connect', function () { + self.emit('connect') + + console.info("easy_sock connected"); + clearTimeout(self.timers.connect); + + self.state = STATES.ALIVE + + // 把积累的请求都发了 + self.taskQueue.forEach(function (task) { + task() + }) + self.taskQueue = [] + // END + }) + + var totalData = new Buffer(''); + socket.on('data', function (data) { + self.emit('data', data) + + totalData = Buffer.concat([totalData, data]); + + // 网络有可能一次返回n个结果包,需要做判断,是不是很bt。。 + while (true) { + var packageSize = self.isReceiveComplete(totalData); + if (packageSize === 0) { + return; + } + var buf = totalData.slice(0, packageSize); + self.handleData(buf); + totalData = totalData.slice(packageSize); + } + }) + + + socket.on('error', function (e) { + self.emit('error', e) + + self.close('socket error:' + e); + }) + + socket.on('close', function () { + self.emit('close') + + console.info("easy_sock closed"); + }); + + if (self.allowIdleClose) { + // 不活跃一段时间后,自动关闭 socket + socket.on('timeout', function () { + self.emit('idle') + + self.close('socket is inactivity for ' + self.idleTimeout + 'ms') + }) + } + + //连接也有可能会超时阻塞 + socket.connect({ + port: self.port, + host: self.ip + }); } /** * 处理返回数据,回调 */ -function handleData(cur, buf){ - - var obj = cur.decode(buf); - - if (typeof(obj) != "object"){ - //error - console.log("easy_sock:handle error:" + obj); - cur.socket.destroy(); - return; - } - - var ctx = cur.context[obj.seq]; - if (!ctx){ - //找不到上下文,可能是因为超时,callback已执行,直接放弃当前数据 - //console.log("Can't find context. This should never happened!" + obj.seq); - //socket.destroy(); - return; - } - - cur.context[obj.seq] = null; - cur.currentSession--; - - tryCloseSocket(cur); - - //遵循nodejs最佳实践,第一个参数是err,第二个才是返回结果 - ctx.cb(null, obj.result); -} +EasySock.prototype.handleData = function (buf) { + var self = this; + var obj = self.decode(buf); -/** - * 尝试关闭socket - */ -function tryCloseSocket(cur){ - - if ((cur.calling_close || !cur.config.keepAlive) && cur.currentSession == 0 && cur.tmpGetTaskList.length == 0){ - cur.between_close = true; - cur.calling_close = false; - cur.isAlive = false; - //调用end()之后sock会自动close,消息回调会先触发end,再触发close - cur.socket.end(); - } + if (!obj || !obj.seq || !obj.result) { + self.emit('error', new EasySockError("decode buffer error: " + util.inspect(obj))) + return; + } + + var ctx = self.context[obj.seq]; + if (!ctx) { + //找不到上下文,可能因为服务器抽风,callback已执行,直接放弃当前数据 + self.emit('error', new EasySockError("Can't find context. " + obj.seq)); + return; + } + + ctx.cb(null, obj.result); + self.deleteTask(obj.seq) } + + +exports = module.exports = EasySock + +exports.STATES = STATES \ No newline at end of file diff --git a/lib/errors.js b/lib/errors.js new file mode 100644 index 0000000..2eacc11 --- /dev/null +++ b/lib/errors.js @@ -0,0 +1,5 @@ +var customError = require('custom-error'); + +var EasySockError = customError('EasySockError'); + +exports.EasySockError = EasySockError; \ No newline at end of file diff --git a/package.json b/package.json index ef37581..8ee3941 100644 --- a/package.json +++ b/package.json @@ -4,17 +4,22 @@ "description": "A easy and foolish way to develop a socket module, which is easy to use as well.", "main": "index.js", "scripts": { - "test": "node test.js" + "test": "make test" }, "keywords": [ "socket" ], - "author": { - "name": "vicyao" + "repository": "https://github.com/ysbcc/easy_sock.git", + "author": "vicyao ", + "license": "MIT", + "devDependencies": { + "matcha": "^0.6.1", + "memwatch-next": "^0.2.10", + "mocha": "^2.3.4", + "pedding": "^1.0.0", + "should": "^8.0.2" }, - "license": "ISC", - "readme": "README.md", - "_id": "easy_sock@0.2.0", - "_shasum": "77ea2bfa6c10379f0792e3eb8116bab8d316be62", - "_from": "easy_sock@" + "dependencies": { + "custom-error": "^0.2.1" + } } diff --git a/test/lib/easy_sock.test.js b/test/lib/easy_sock.test.js new file mode 100644 index 0000000..9dbf8e5 --- /dev/null +++ b/test/lib/easy_sock.test.js @@ -0,0 +1,350 @@ +var net = require('net') +var EasySock = require('../../') +var pedding = require('pedding') + +var socketServer; +var serverSockets = [] +var HOST = '127.0.0.1' +var PORT = 3000; + +var jsonstringify = JSON.stringify.bind(JSON) +var jsonparse = JSON.parse.bind(JSON) + +function createServer(onData) { + onData = onData || function (socket) { + return function (data) { + socket.write(data) + } + } + + socketServer = net.createServer(function (socket) { + serverSockets.push(socket) + socket.on('data', onData(socket)) + }) + socketServer.listen(PORT) + + return socketServer; +} + +function createEasySocket(_options) { + var options = { + ip: HOST, + port: PORT, + }; + + for (var k in _options) { + options[k] = _options[k] + } + var socket = new EasySock(options) + + socket.isReceiveComplete = function (packet) { + return packet.indexOf('}') + 1 + } + socket.encode = function (data, seq) { + data.seq = seq + return new Buffer(jsonstringify(data)) + } + socket.decode = function (data) { + data = jsonparse(data) + data.result = data.userid + return data + } + + return socket +} + +function shouldNotBeHere() { + throw new Error('should not be here') +} + +describe('test/lib/easy_sock.test.js', function () { + beforeEach(function () { + socketServer = null; + serverSockets = [] + }) + afterEach(function (done) { + if (socketServer) { + serverSockets.forEach(function (socket) { + socket.end() + }) + socketServer.close(done) + } else { + done() + } + + }) + + it('should ok', function () { + true.should.ok() + }) + + it('should work with one packet', function (done) { + + createServer() + + var socket = createEasySocket() + + socket.write({ + userid: 11 + }, function (err, data) { + (!!err).should.false(); + + data.should.eql(11) + done() + }) + }) + + it('should work with one packet and not call one task twice', function (done) { + + createServer(function (socket) { + return function (data) { + data = jsonparse(data) + data.seq = 2; + data = new Buffer(jsonstringify(data)) + socket.write(data) + } + }) + + var socket = createEasySocket() + + socket.write({ + userid: 11 + }, function (err, data) { + (!!err).should.false(); + + data.should.eql(11) + + socket.write({ + userid: 12 + }, function (err, data) { + shouldNotBeHere() + }) + }) + + setTimeout(function () { + done() + }, 200) + }) + + it('should work with multi packets', function (done) { + done = pedding(done, 3) + + createServer() + + var socket = createEasySocket() + + socket.write({ + userid: 11 + }, function (err, data) { + (!!err).should.false(); + + data.should.eql(11) + done() + }) + + socket.write({ + userid: 12 + }, function (err, data) { + (!!err).should.false(); + + data.should.eql(12) + done() + }) + + socket.write({ + userid: 13 + }, function (err, data) { + (!!err).should.false(); + + data.should.eql(13) + done() + }) + }) + + it('should error when connect timeout', function (done) { + var socket = createEasySocket({ + ip: '1.1.1.1', + timeout: 100 + }) + + socket.write({ + userid: 11 + }, function (err, data) { + err.message.should.eql('easy_sock:TCP connect timeout(300ms)') + socket.timers.connect._called.should.true() + setImmediate(function () { + socket.taskQueue.should.length(0) + done() + }) + }) + socket.taskQueue.should.length(1) + }) + + it('should error when write timeout', function (done) { + + createServer(function (socket) { + return function (data) { + // do nothing + } + }) + + var socket = createEasySocket({ + timeout: 100 + }) + + socket.write({ + userid: 11 + }, function (err, data) { + err.message.should.eql('request timeout(100ms)') + done() + }) + }) + + it('should not call connect timeout timer when unknown error occurs', function (done) { + var socket = createEasySocket({ + ip: '1.1.1.1' + }) + + socket.timers.connect._idleTimeout.should.above(-1) + + socket.restore = function () {} + socket.close() + + setImmediate(function () { + socket.timers.connect._idleTimeout.should.eql(-1) + done() + }) + }) + + it('should not call write timeout timer when unknown error occurs', function (done) { + createServer(function (socket) { + return function (data) { + + } + }) + + var socket = createEasySocket() + + socket.write({ + userid: 11 + }, function (err, data) { + }) + + setTimeout(function () { + socket.timers.writes[1]._idleTimeout.should.above(-1) + + setTimeout(function () { + socket.restore = function () {} + socket.close() + + setTimeout(function () { + socket.timers.writes[1]._idleTimeout.should.eql(-1) + done() + }, 10) + }, 100) + }, 100) + + }) + + it('should close socket when idle, and recreate when write', function (done) { + done = pedding(done, 2); + + createServer() + + var socket = createEasySocket({ + idleTimeout: 200 + }) + + socket.once('close', function () { + done() + }) + + setTimeout(function () { + socket.write({ + userid: 11, + }, function (err, data) { + data.should.eql(11) + done() + }) + }, 300) + }) + + it('should error when no ip or port', function () { + try{ + new EasySock({}) + } catch (e) { + e.message.should.eql('needs config info: ip, port') + } + }) + + it('should not close socket when idle and keepAlive', function (done) { + createServer() + + var socket = createEasySocket({ + //idleTimeout: 200, + keepAlive: true, + }) + + var closeCount = 0; + socket.on('close', function () { + closeCount++; + closeCount.should.not.eql(2) + }) + + setTimeout(function () { + done() + }, 300) + }) + + it('should error when no ip or port', function () { + try{ + new EasySock({}) + } catch (e) { + e.message.should.eql('needs config info: ip, port') + } + }) + + it('should log error when decode not return seq or result', function (done) { + createServer() + + var socket = createEasySocket() + + socket.decode = function () { + return ({hehe: '呵呵'}) + } + + socket.write({ + userid: 11, + }, function (err, data) { + throw new Error('should not be here') + }) + + setTimeout(function () { + done() + }, 500) + }) + + it('should log error when decode return wrong req', function (done) { + createServer() + + var socket = createEasySocket() + + socket.decode = function () { + return ({req: -1}) + } + + socket.write({ + userid: 11, + }, function (err, data) { + throw new Error('should not be here') + }) + + socket.on('error', function (e) { + e.message.should.eql('decode buffer error: { req: -1 }') + }) + + setTimeout(function () { + done() + }, 500) + }) +}) \ No newline at end of file diff --git a/test/memory_leak.js b/test/memory_leak.js new file mode 100644 index 0000000..c8fa1b9 --- /dev/null +++ b/test/memory_leak.js @@ -0,0 +1,40 @@ +var memwatch = require('memwatch-next') +var EasySock = require('..') + +console.info = console.error = function () {} + + + +var arr = []; + +memwatch.gc(); +var hd = new memwatch.HeapDiff(); + +for (var i = 0; i < 10000; i++) { + var socket = new EasySock({ + ip: '127.0.0.1', + port: '3000' + }) + + socket.write({ + haha: 11 + }, function (err, data) { + + }) + + arr.push(socket) +} + +arr.forEach(function (sock) { + sock.close() +}) + +arr = null + + +setTimeout(function () { + memwatch.gc(); + var hde = hd.end(); + + console.log(JSON.stringify(hde, null, 2)); +}, 2000) diff --git a/test.js b/test/test_run.js similarity index 98% rename from test.js rename to test/test_run.js index 21556f7..1b1aff4 100644 --- a/test.js +++ b/test/test_run.js @@ -1,6 +1,6 @@ //这是用protobuf服务器的一个简单实例,其中的encode和decode没有具体实现,只是展示easy_sock用法 -var EasySock = require("./index.js"); +var EasySock = require("../"); function createSocket(){ var easysock = new EasySock();