diff --git a/README.md b/README.md index 7ec739b..2c004ec 100644 --- a/README.md +++ b/README.md @@ -241,6 +241,32 @@ insert array of objects: ``` *** +insert with format JSONEachRow: +```javascript + +// rows - array of objects as in previous example + + await clickhouse.insert( + `INSERT INTO test_array + (*) FORMAT JSONEachRow`, + rows + ).toPromise(); + +// or with stream + + const stream = await clickhouse.insert( + `insert into test_array + (*) FORMAT JSONEachRow` + ).stream(); + + for (const row of rows){ + stream.writeRow(row); + } + await stream.exec(); + +``` +*** + Parameterized Values: ```javascript const rows = await clickhouse.query( diff --git a/index.js b/index.js index a86a79c..37f371e 100644 --- a/index.js +++ b/index.js @@ -30,7 +30,7 @@ const SEPARATORS = { }; const ALIASES = { - TabSeparated: "TSV" + TabSeparated: "TSV", }; var ESCAPE_STRING = { @@ -66,12 +66,14 @@ const PORT = 8123; const DATABASE = 'default'; const FORMAT_NAMES = { + JSONEachRow: 'JSONEachRow', JSON: 'json', TSV: 'tsv', CSV: 'csv' } const FORMATS = { + [FORMAT_NAMES.JSONEachRow]: 'JSONEachRow', [FORMAT_NAMES.JSON]: 'JSON', [FORMAT_NAMES.TSV]: 'TabSeparatedWithNames', [FORMAT_NAMES.CSV]: 'CSVWithNames', @@ -102,6 +104,13 @@ function parseTSV(body, options = { header: true }) { return data; } +function parseJSONEachRow(body) { + const data = body.split('\n').map((str)=>str?JSON.parse(str):{}); + data.splice(data.length - 1, 1); + return data; +} + + function parseCSVStream(s = new Set()) { let isFirst = true; let ref = { @@ -253,10 +262,12 @@ function isObject(obj) { class Rs extends Transform { - constructor(reqParams) { + constructor(reqParams, opts) { super(); const me = this; + + me.opts = opts; me.ws = request.post(reqParams); @@ -290,7 +301,11 @@ class Rs extends Transform { } else if (Array.isArray(data)) { row = ClickHouse.mapRowAsArray(data); } else if (isObject(data)) { - throw new Error('Error: Inserted data must be an array, not an object.'); + if (this.opts.format === FORMAT_NAMES.JSONEachRow) { + row = ClickHouse.mapRowAsJSONObject(data); + } else { + throw new Error('Error: Inserted data must be an array, not an object.'); + } } let isOk = this.write( @@ -425,7 +440,9 @@ class QueryCursor { if (m) { fieldList = m[2].split(',').map(s => s.trim()); } else { - throw new Error('insert query wasnt parsed field list after TABLE_NAME'); + if (me.format === FORMAT_NAMES.TSV) { + throw new Error('insert query wasnt parsed field list after TABLE_NAME'); + } } } @@ -434,7 +451,13 @@ class QueryCursor { return row; } if (isFirstElObject) { - return ClickHouse.mapRowAsObject(fieldList, row); + if (me.format === FORMAT_NAMES.TSV) { + return ClickHouse.mapRowAsTabSeparatedObject(fieldList, row); + } else if (me.format === FORMAT_NAMES.JSONEachRow) { + return ClickHouse.mapRowAsJSONObject(row); + } else { + throw new Error('Object should be serialized as TabSeparated or JSONEachRow format'); + } } else { return ClickHouse.mapRowAsArray(row); } @@ -552,8 +575,11 @@ class QueryCursor { if (data && Array.isArray(data) && data.every(d => typeof d === 'string')) { params['body'] = me._getBodyForInsert(); } - } else { - query += ' FORMAT TabSeparated'; + } else { + if (!R_FORMAT_PARSER.test(query)) { + query += ' FORMAT TabSeparated'; + me.format = FORMAT_NAMES.TSV; + } if (data) { params['body'] = me._getBodyForInsert(); @@ -646,6 +672,9 @@ class QueryCursor { getBodyParser() { if (this.format === FORMAT_NAMES.JSON) { return JSON.parse; + } + if (this.format === FORMAT_NAMES.JSONEachRow) { + return parseJSONEachRow; } if (this.format === FORMAT_NAMES.TSV) { @@ -699,7 +728,7 @@ class QueryCursor { const reqParams = me._getReqParams(); if (me.isInsert) { - const rs = new Rs(reqParams); + const rs = new Rs(reqParams, me.opts); rs.query = me.query; me._request = rs; @@ -986,14 +1015,16 @@ class ClickHouse { .join('\t'); } - static mapRowAsObject(fieldList, row) { + static mapRowAsTabSeparatedObject(fieldList, row) { return fieldList .map(f => { return encodeValue(false, row[f] != null ? row[f] : '', 'TabSeparated'); }) .join('\t'); } - + static mapRowAsJSONObject(row) { + return JSON.stringify(row); + } static getFullFormatName(format = '') { if ( ! FORMATS[format]) { throw new Error(`Clickhouse.getFullFormatName: unknown format "${format}`); diff --git a/test/test.js b/test/test.js index 4970ab8..a29958c 100644 --- a/test/test.js +++ b/test/test.js @@ -509,6 +509,8 @@ describe('queries', () => { it('insert field as array', async () => { clickhouse.sessionId = Date.now(); + await clickhouse.query('DROP TABLE IF EXISTS test_array',).toPromise(); + const r = await clickhouse.query(` CREATE TABLE IF NOT EXISTS test_array ( date Date, @@ -519,6 +521,7 @@ describe('queries', () => { id1 UUID ) ENGINE=MergeTree(date, date, 8192) `).toPromise(); + expect(r).to.be.ok(); const rows = [ @@ -552,6 +555,101 @@ describe('queries', () => { expect(r3).to.eql(rows); }); + it('insert * as JSONEachRow with array', async () => { + clickhouse.sessionId = Date.now(); + + await clickhouse.query('DROP TABLE IF EXISTS test_array',).toPromise(); + const r = await clickhouse.query(` + CREATE TABLE IF NOT EXISTS test_array ( + date Date, + str String, + arr Array(String), + arr2 Array(Date), + arr3 Array(UInt8), + id1 UUID + ) ENGINE=MergeTree(date, date, 8192) + `).toPromise(); + expect(r).to.be.ok(); + + const rows = [ + { + date: '2018-01-01', + str: 'Вам, проживающим за оргией оргию,', + arr: [], + arr2: ['1985-01-02', '1985-01-03'], + arr3: [1,2,3,4,5], + id1: '102a05cb-8aaf-4f11-a442-20c3558e4384' + }, + + { + date: '2018-02-01', + str: 'It\'s apostrophe test.', + arr: ['5670000000', 'asdas dasf. It\'s apostrophe test.'], + arr2: ['1985-02-02'], + arr3: [], + id1: 'c2103985-9a1e-4f4a-b288-b292b5209de1' + } + ]; + + const r2 = await clickhouse.insert( + `insert into test_array + (*) FORMAT JSONEachRow`, + rows + ).toPromise(); + expect(r2).to.be.ok(); + const r3 = await clickhouse.query('SELECT * FROM test_array ORDER BY date').toPromise(); + expect(r3).to.eql(rows); + }); + + it('insert * as JSONEachRow with stream', async () => { + clickhouse.sessionId = Date.now(); + + await clickhouse.query('DROP TABLE IF EXISTS test_array',).toPromise(); + const r = await clickhouse.query(` + CREATE TABLE IF NOT EXISTS test_array ( + date Date, + str String, + arr Array(String), + arr2 Array(Date), + arr3 Array(UInt8), + id1 UUID + ) ENGINE=MergeTree(date, date, 8192) + `).toPromise(); + expect(r).to.be.ok(); + + const rows = [ + { + date: '2018-01-01', + str: 'Вам, проживающим за оргией оргию,', + arr: [], + arr2: ['1985-01-02', '1985-01-03'], + arr3: [1,2,3,4,5], + id1: '102a05cb-8aaf-4f11-a442-20c3558e4384' + }, + + { + date: '2018-02-01', + str: 'It\'s apostrophe test.', + arr: ['5670000000', 'asdas dasf. It\'s apostrophe test.'], + arr2: ['1985-02-02'], + arr3: [], + id1: 'c2103985-9a1e-4f4a-b288-b292b5209de1' + } + ]; + + const stream = await clickhouse.insert( + `insert into test_array + (*) FORMAT JSONEachRow` + ).stream(); + stream.writeRow(rows[0]); + stream.writeRow(rows[1]); + const r2 = await stream.exec(); + expect(r2).to.be.ok(); + + const r3 = await clickhouse.query('SELECT * FROM test_array ORDER BY date').toPromise(); + expect(r3).to.eql(rows); + }); + it('insert field as raw string', async () => { clickhouse.sessionId = Date.now();