Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,32 @@ insert array of objects:
```
***

insert with format JSONEachRow:
```javascript

// rows - array of objects as in previous example

await clickhouse.insert(
`INSERT INTO test_array
(*) FORMAT JSONEachRow`,
rows
).toPromise();

// or with stream

const stream = await clickhouse.insert(
`insert into test_array
(*) FORMAT JSONEachRow`
).stream();

for (const row of rows){
stream.writeRow(row);
}
await stream.exec();

```
***

Parameterized Values:
```javascript
const rows = await clickhouse.query(
Expand Down
51 changes: 41 additions & 10 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const SEPARATORS = {
};

const ALIASES = {
TabSeparated: "TSV"
TabSeparated: "TSV",
};

var ESCAPE_STRING = {
Expand Down Expand Up @@ -66,12 +66,14 @@ const PORT = 8123;
const DATABASE = 'default';

const FORMAT_NAMES = {
JSONEachRow: 'JSONEachRow',
JSON: 'json',
TSV: 'tsv',
CSV: 'csv'
}

const FORMATS = {
[FORMAT_NAMES.JSONEachRow]: 'JSONEachRow',
[FORMAT_NAMES.JSON]: 'JSON',
[FORMAT_NAMES.TSV]: 'TabSeparatedWithNames',
[FORMAT_NAMES.CSV]: 'CSVWithNames',
Expand Down Expand Up @@ -102,6 +104,13 @@ function parseTSV(body, options = { header: true }) {
return data;
}

function parseJSONEachRow(body) {
const data = body.split('\n').map((str)=>str?JSON.parse(str):{});
data.splice(data.length - 1, 1);
return data;
}


function parseCSVStream(s = new Set()) {
let isFirst = true;
let ref = {
Expand Down Expand Up @@ -253,10 +262,12 @@ function isObject(obj) {


class Rs extends Transform {
constructor(reqParams) {
constructor(reqParams, opts) {
super();

const me = this;

me.opts = opts;

me.ws = request.post(reqParams);

Expand Down Expand Up @@ -290,7 +301,11 @@ class Rs extends Transform {
} else if (Array.isArray(data)) {
row = ClickHouse.mapRowAsArray(data);
} else if (isObject(data)) {
throw new Error('Error: Inserted data must be an array, not an object.');
if (this.opts.format === FORMAT_NAMES.JSONEachRow) {
row = ClickHouse.mapRowAsJSONObject(data);
} else {
throw new Error('Error: Inserted data must be an array, not an object.');
}
}

let isOk = this.write(
Expand Down Expand Up @@ -425,7 +440,9 @@ class QueryCursor {
if (m) {
fieldList = m[2].split(',').map(s => s.trim());
} else {
throw new Error('insert query wasnt parsed field list after TABLE_NAME');
if (me.format === FORMAT_NAMES.TSV) {
throw new Error('insert query wasnt parsed field list after TABLE_NAME');
}
}
}

Expand All @@ -434,7 +451,13 @@ class QueryCursor {
return row;
}
if (isFirstElObject) {
return ClickHouse.mapRowAsObject(fieldList, row);
if (me.format === FORMAT_NAMES.TSV) {
return ClickHouse.mapRowAsTabSeparatedObject(fieldList, row);
} else if (me.format === FORMAT_NAMES.JSONEachRow) {
return ClickHouse.mapRowAsJSONObject(row);
} else {
throw new Error('Object should be serialized as TabSeparated or JSONEachRow format');
}
} else {
return ClickHouse.mapRowAsArray(row);
}
Expand Down Expand Up @@ -552,8 +575,11 @@ class QueryCursor {
if (data && Array.isArray(data) && data.every(d => typeof d === 'string')) {
params['body'] = me._getBodyForInsert();
}
} else {
query += ' FORMAT TabSeparated';
} else {
if (!R_FORMAT_PARSER.test(query)) {
query += ' FORMAT TabSeparated';
me.format = FORMAT_NAMES.TSV;
}

if (data) {
params['body'] = me._getBodyForInsert();
Expand Down Expand Up @@ -646,6 +672,9 @@ class QueryCursor {
getBodyParser() {
if (this.format === FORMAT_NAMES.JSON) {
return JSON.parse;
}
if (this.format === FORMAT_NAMES.JSONEachRow) {
return parseJSONEachRow;
}

if (this.format === FORMAT_NAMES.TSV) {
Expand Down Expand Up @@ -699,7 +728,7 @@ class QueryCursor {
const reqParams = me._getReqParams();

if (me.isInsert) {
const rs = new Rs(reqParams);
const rs = new Rs(reqParams, me.opts);
rs.query = me.query;

me._request = rs;
Expand Down Expand Up @@ -986,14 +1015,16 @@ class ClickHouse {
.join('\t');
}

static mapRowAsObject(fieldList, row) {
static mapRowAsTabSeparatedObject(fieldList, row) {
return fieldList
.map(f => {
return encodeValue(false, row[f] != null ? row[f] : '', 'TabSeparated');
})
.join('\t');
}

static mapRowAsJSONObject(row) {
return JSON.stringify(row);
}
static getFullFormatName(format = '') {
if ( ! FORMATS[format]) {
throw new Error(`Clickhouse.getFullFormatName: unknown format "${format}`);
Expand Down
98 changes: 98 additions & 0 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,8 @@ describe('queries', () => {
it('insert field as array', async () => {
clickhouse.sessionId = Date.now();

await clickhouse.query('DROP TABLE IF EXISTS test_array',).toPromise();

const r = await clickhouse.query(`
CREATE TABLE IF NOT EXISTS test_array (
date Date,
Expand All @@ -519,6 +521,7 @@ describe('queries', () => {
id1 UUID
) ENGINE=MergeTree(date, date, 8192)
`).toPromise();

expect(r).to.be.ok();

const rows = [
Expand Down Expand Up @@ -552,6 +555,101 @@ describe('queries', () => {
expect(r3).to.eql(rows);
});

it('insert * as JSONEachRow with array', async () => {
clickhouse.sessionId = Date.now();

await clickhouse.query('DROP TABLE IF EXISTS test_array',).toPromise();
const r = await clickhouse.query(`
CREATE TABLE IF NOT EXISTS test_array (
date Date,
str String,
arr Array(String),
arr2 Array(Date),
arr3 Array(UInt8),
id1 UUID
) ENGINE=MergeTree(date, date, 8192)
`).toPromise();
expect(r).to.be.ok();

const rows = [
{
date: '2018-01-01',
str: 'Вам, проживающим за оргией оргию,',
arr: [],
arr2: ['1985-01-02', '1985-01-03'],
arr3: [1,2,3,4,5],
id1: '102a05cb-8aaf-4f11-a442-20c3558e4384'
},

{
date: '2018-02-01',
str: 'It\'s apostrophe test.',
arr: ['5670000000', 'asdas dasf. It\'s apostrophe test.'],
arr2: ['1985-02-02'],
arr3: [],
id1: 'c2103985-9a1e-4f4a-b288-b292b5209de1'
}
];

const r2 = await clickhouse.insert(
`insert into test_array
(*) FORMAT JSONEachRow`,
rows
).toPromise();
expect(r2).to.be.ok();
const r3 = await clickhouse.query('SELECT * FROM test_array ORDER BY date').toPromise();
expect(r3).to.eql(rows);
});

it('insert * as JSONEachRow with stream', async () => {
clickhouse.sessionId = Date.now();

await clickhouse.query('DROP TABLE IF EXISTS test_array',).toPromise();
const r = await clickhouse.query(`
CREATE TABLE IF NOT EXISTS test_array (
date Date,
str String,
arr Array(String),
arr2 Array(Date),
arr3 Array(UInt8),
id1 UUID
) ENGINE=MergeTree(date, date, 8192)
`).toPromise();
expect(r).to.be.ok();

const rows = [
{
date: '2018-01-01',
str: 'Вам, проживающим за оргией оргию,',
arr: [],
arr2: ['1985-01-02', '1985-01-03'],
arr3: [1,2,3,4,5],
id1: '102a05cb-8aaf-4f11-a442-20c3558e4384'
},

{
date: '2018-02-01',
str: 'It\'s apostrophe test.',
arr: ['5670000000', 'asdas dasf. It\'s apostrophe test.'],
arr2: ['1985-02-02'],
arr3: [],
id1: 'c2103985-9a1e-4f4a-b288-b292b5209de1'
}
];

const stream = await clickhouse.insert(
`insert into test_array
(*) FORMAT JSONEachRow`
).stream();
stream.writeRow(rows[0]);
stream.writeRow(rows[1]);
const r2 = await stream.exec();
expect(r2).to.be.ok();

const r3 = await clickhouse.query('SELECT * FROM test_array ORDER BY date').toPromise();
expect(r3).to.eql(rows);
});

it('insert field as raw string', async () => {
clickhouse.sessionId = Date.now();

Expand Down