diff --git a/README.md b/README.md index 8580103..61d8920 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ const clickhouse = new ClickHouse({ port: 8123, debug: false, basicAuth: null, + post: false, isUseGzip: false, format: "json", // "json" || "csv" || "tsv" config: { diff --git a/index.js b/index.js index 7140558..9864dcf 100644 --- a/index.js +++ b/index.js @@ -8,10 +8,8 @@ const stream = require('stream'), querystring = require('querystring'), JSONStream = require('JSONStream'), - through = require('through'), stream2asynciter = require('stream2asynciter'), - { URL } = require('url'), - tsv = require('tsv'); + { URL } = require('url'); /** @@ -55,86 +53,6 @@ const PORT = 8123; const DATABASE = 'default'; const USERNAME = 'default'; -function parseCSV(body, options = { header: true }) { - const data = new tsv.Parser(SEPARATORS.CSV, options).parse(body); - data.splice(data.length - 1, 1); - return data; -} - -function parseJSON(body) { - return JSON.parse(body); -} - -function parseTSV(body, options = { header: true }) { - const data = new tsv.Parser(SEPARATORS.TSV, options).parse(body); - data.splice(data.length - 1, 1); - return data; -} - -function parseCSVStream(s) { - let isFirst = true; - let ref = { - fields: [] - }; - return through(function (chunk) { - let str = chunk.toString(); - let parsed = parseCSV(str, {header: isFirst}); - let strarr = str.split("\n"); - let plen = (isFirst && strarr.length - 1 || strarr.length) - parsed.length; - - if (!isFirst) { - chunk = Buffer.concat([Buffer.from([...s].join("\n")), chunk]).toString(); - parsed = parseCSV(str, {header: isFirst}); - s = new Set(); - } - strarr.splice(strarr.length - plen).forEach((value => s.add(value))); - chunkBuilder.call(this, isFirst, ref, str, parsed); - isFirst = false; - }) -} - -function parseJSONStream() { - return JSONStream.parse(['data', true]); -} - -function parseTSVStream(s) { - let isFirst = true; - let ref = { - fields: [] - }; - return through(function (chunk) { - let str = chunk.toString(); - let parsed = parseTSV(str, {header: isFirst}); - let strarr = str.split("\n"); - let plen = (isFirst && strarr.length - 1 || strarr.length) - parsed.length; - - if (!isFirst) { - chunk = Buffer.concat([Buffer.from([...s].join("\n")), chunk]).toString(); - parsed = parseTSV(str, {header: isFirst}); - s = new Set(); - } - strarr.splice(strarr.length - plen).forEach((value => s.add(value))); - chunkBuilder.call(this, isFirst, ref, str, parsed); - isFirst = false; - }); -} - -function chunkBuilder(isFirst, ref, chunk, parsed) { - if (isFirst) { - ref.fields = Object.keys(parsed[0]); - parsed.forEach((value) => { - this.queue(value); - }); - } else { - parsed.forEach((value) => { - let result = {}; - ref.fields.forEach((field, index) => (result[field] = value[index])); - this.queue(result); - result = null; - }); - } -} - function encodeValue(quote, v, format, isArray) { format = ALIASES[format] || format; @@ -313,7 +231,9 @@ class QueryCursor { if (me.opts.debug) { console.log('exec req headers', me.reqParams.headers); } - + console.log(me.reqParams) + //me.reqParams.query + //delete me.reqParams.query; me._request = request.post(me.reqParams, (err, res) => { if (me.opts.debug) { console.log('exec', err, _.pick(res, [ @@ -338,39 +258,17 @@ class QueryCursor { if (me.opts.debug) { console.log('exec res headers', res.headers); } - + try { - const result = this._parseRowsByFormat(res.body); - cb(null, me.useTotals ? result : result.data || result) - } catch (e) { - cb(e); + let json = JSON.parse(res.body); + + cb(null, me.useTotals ? json : json.data); + } catch (err2) { + cb(err2); } }); } - - _parseRowsByFormat(body, isStream = false) { - let result = null; - let ws; - switch (this._getFormat()) { - case "json": - result = !isStream && parseJSON(body) || parseJSONStream(); - break; - case "tsv": - result = !isStream && parseTSV(body) || parseTSVStream(new Set()); - break; - case "csv": - result = !isStream && parseCSV(body) || parseCSVStream(new Set()); - break; - default: - result = body; - } - return result; - }; - - _getFormat() { - return this.opts.sessionFormat || this.opts.format; - } - + withTotals() { this.useTotals = true; return this; @@ -406,7 +304,7 @@ class QueryCursor { return rs; } else { - const streamParser = this._parseRowsByFormat(null, true); + const toJSON = JSONStream.parse(['data', true]); const rs = new stream.Readable({ objectMode: true }); rs._read = () => {}; @@ -415,17 +313,14 @@ class QueryCursor { const tf = new stream.Transform({ objectMode: true }); let isFirstChunck = true; tf._transform = function (chunk, encoding, cb) { + // Если для первого chuck первый символ блока данных не '{', тогда: // 1. в теле ответа не JSON // 2. сервер нашел ошибку в данных запроса - if (isFirstChunck && ( - (me._getFormat() === "json" && chunk[0] !== 123) && - (me._getFormat() === "csv" && chunk[0] !== 110) && - (me._getFormat() === "tsv" && chunk[0] !== 110) - )) { + if (isFirstChunck && chunk[0] !== 123) { this.error = new Error(chunk.toString()); - streamParser.emit("error", this.error); + toJSON.emit("error", this.error); rs.emit('close'); return cb(); @@ -445,9 +340,9 @@ class QueryCursor { let s = null; if (me.opts.isUseGzip) { const z = zlib.createGunzip(); - s = requestStream.pipe(z).pipe(tf).pipe(streamParser) + s = requestStream.pipe(z).pipe(tf).pipe(toJSON) } else { - s = requestStream.pipe(tf).pipe(streamParser) + s = requestStream.pipe(tf).pipe(toJSON) } @@ -475,14 +370,14 @@ class QueryCursor { rs.pause = () => { rs.__pause(); requestStream.pause(); - streamParser.pause(); + toJSON.pause(); }; rs.__resume = rs.resume; rs.resume = () => { rs.__resume(); requestStream.resume(); - streamParser.resume(); + toJSON.resume(); }; me._request = rs; @@ -519,13 +414,13 @@ class ClickHouse { password: '', basicAuth: null, isUseGzip: false, + post: false, config: { // session_id : Date.now(), session_timeout : 60, output_format_json_quote_64bit_integers : 0, enable_http_compression : 0 - }, - format: "json", // "json" || "csv" || "tsv" + } }, opts ); @@ -623,40 +518,8 @@ class ClickHouse { return encodeValue(false, value, 'TabSeparated'); }).join('\t'); } - - _getFormat(query) { - let format = ""; - switch (this.opts.format) { - case "json": - format = this._parseFormat(query, " format JSON"); - break; - case "tsv": - format = this._parseFormat(query, " format TabSeparatedWithNames"); - break; - case "csv": - format = this._parseFormat(query, " format CSVWithNames"); - break; - default: - format = " "; - } - return format; - }; - - _parseFormat(query, def) { - if (query.match(/format/mg) === null) { - this.opts.sessionFormat = this.opts.format; - return def; - } - if (query.match(/format JSON/mg) !== null) { - this.opts.sessionFormat = "json"; - } else if (query.match(/format TabSeparated/mg) !== null) { - this.opts.sessionFormat = "tsv"; - } else if (query.match(/format CSV/mg) !== null) { - this.opts.sessionFormat = "csv"; - } - return ""; - } - + + _mapRowAsObject(fieldList, row) { return fieldList.map(f => encodeValue(false, row[f] != null ? row[f] : '', 'TabSeparated')).join('\t'); } @@ -712,10 +575,18 @@ class ClickHouse { let sql = query.trim(); // Hack for Sequelize ORM - sql = sql.trimEnd().replace(/;$/gm, ""); - + if (sql.charAt(sql.length - 1) === ';') { + sql = sql.substr(0, sql.length - 1); + } + if (sql.match(/^(select|show|exists)/i)) { - reqParams['url'] = me.url + '?query=' + encodeURIComponent(sql + this._getFormat(sql) + ';') + '&' + querystring.stringify(configQS); + reqParams['url'] = me.url + '?' + querystring.stringify(configQS); + if(me.opts.post){ + reqParams['body'] = sql + ' FORMAT JSON'; + }else{ + reqParams['url'] = me.url + '&query=' + encodeURIComponent(sql + ' FORMAT JSON'); + } + if (me.opts.username) { reqParams['url'] = reqParams['url'] + '&user=' + me.opts.username; } @@ -742,8 +613,13 @@ class ClickHouse { reqParams['formData'] = formData; } } else if (query.match(/^insert/i)) { - reqParams['url'] = me.url + '?query=' + encodeURIComponent(sql + ' FORMAT TabSeparated') + '&' + querystring.stringify(configQS); - + reqParams['url'] = me.url + '?' + querystring.stringify(configQS); + if(me.opts.post){ + reqParams['body'] = sql + ' FORMAT TabSeparated'; + }else{ + reqParams['url'] = me.url + '&query=' + encodeURIComponent(sql + ' FORMAT TabSeparated'); + } + if (me.opts.username) { reqParams['url'] = reqParams['url'] + '&user=' + me.opts.username; } @@ -753,10 +629,15 @@ class ClickHouse { } if (data) { - reqParams['body'] = me._getBodyForInsert(sql, data); + reqParams['body'] = me._getBodyForInsert(query, data); } } else { - reqParams['url'] = me.url + '?query=' + encodeURIComponent(sql + ";") + '&' + querystring.stringify(configQS); + reqParams['url'] = me.url + '?' + querystring.stringify(configQS); + if(me.opts.post){ + reqParams['body'] = sql; + }else{ + reqParams['url'] = me.url + '&query=' + encodeURIComponent(sql); + } if (me.opts.username) { reqParams['url'] = reqParams['url'] + '&user=' + me.opts.username; @@ -801,6 +682,5 @@ class ClickHouse { } module.exports = { - ClickHouse + ClickHouse }; -