This repository was archived by the owner on Feb 3, 2025. It is now read-only.

J-Sek implementation (Bun workers) #2

Open · wants to merge 1 commit into main

3 changes: 3 additions & 0 deletions calculate_average_J-Sek.sh
@@ -0,0 +1,3 @@
#!/bin/sh

time bun run src/main/bun/J-Sek/index.ts measurements.txt
177 changes: 177 additions & 0 deletions src/main/bun/J-Sek/.gitignore
@@ -0,0 +1,177 @@
# Based on https://raw.githubusercontent.com/github/gitignore/main/Node.gitignore

# Logs

logs
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
lerna-debug.log*
.pnpm-debug.log*

# Caches

.cache

# Diagnostic reports (https://nodejs.org/api/report.html)

report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json

# Runtime data

pids
*.pid
*.seed
*.pid.lock

# Directory for instrumented libs generated by jscoverage/JSCover

lib-cov

# Coverage directory used by tools like istanbul

coverage
*.lcov

# nyc test coverage

.nyc_output

# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files)

.grunt

# Bower dependency directory (https://bower.io/)

bower_components

# node-waf configuration

.lock-wscript

# Compiled binary addons (https://nodejs.org/api/addons.html)

build/Release

# Dependency directories

node_modules/
jspm_packages/

# Snowpack dependency directory (https://snowpack.dev/)

web_modules/

# TypeScript cache

*.tsbuildinfo

# Optional npm cache directory

.npm

# Optional eslint cache

.eslintcache

# Optional stylelint cache

.stylelintcache

# Microbundle cache

.rpt2_cache/
.rts2_cache_cjs/
.rts2_cache_es/
.rts2_cache_umd/

# Optional REPL history

.node_repl_history

# Output of 'npm pack'

*.tgz

# Yarn Integrity file

.yarn-integrity

# dotenv environment variable files

.env
.env.development.local
.env.test.local
.env.production.local
.env.local

# parcel-bundler cache (https://parceljs.org/)

.parcel-cache

# Next.js build output

.next
out

# Nuxt.js build / generate output

.nuxt
dist

# Gatsby files

# Comment in the public line in if your project uses Gatsby and not Next.js

# https://nextjs.org/blog/next-9-1#public-directory-support

# public

# vuepress build output

.vuepress/dist

# vuepress v2.x temp and cache directory

.temp

# Docusaurus cache and generated files

.docusaurus

# Serverless directories

.serverless/

# FuseBox cache

.fusebox/

# DynamoDB Local files

.dynamodb/

# TernJS port file

.tern-port

# Stores VSCode versions used for testing VSCode extensions

.vscode-test

# yarn v2

.yarn/cache
.yarn/unplugged
.yarn/build-state.yml
.yarn/install-state.gz
.pnp.*

# IntelliJ based IDEs
.idea

# Finder (MacOS) folder config
.DS_Store

bun.lockb
15 changes: 15 additions & 0 deletions src/main/bun/J-Sek/README.md
@@ -0,0 +1,15 @@
# 1brc

To install dependencies:

```bash
bun install
```

To run:

```bash
bun run index.ts
```
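
In this repository the benchmark entry point is invoked through `calculate_average_J-Sek.sh`, which passes the measurements file as a command-line argument:

```bash
time bun run src/main/bun/J-Sek/index.ts measurements.txt
```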

This project was created using `bun init` in bun v1.0.21. [Bun](https://bun.sh) is a fast all-in-one JavaScript runtime.
90 changes: 90 additions & 0 deletions src/main/bun/J-Sek/index.ts
@@ -0,0 +1,90 @@
import * as os from "node:os";
import { createReadStream } from "node:fs";

const fileName = Bun.argv[2];

const fileSize = Bun.file(fileName).size;

const workerCount = os.cpus().length - 1;

// Split the file into roughly equal byte ranges, one per worker, aligning each
// range's end to a line break so that no record straddles two workers.
let lastOffset = 0;
const tasks = [];
while (lastOffset < fileSize) {
  const start = lastOffset;
  let end = Math.min(fileSize, start + 100 + fileSize / workerCount);
  if (end < fileSize) {
    end = await moveToClosestLineBreak(end);
  }
  tasks.push(
    new Promise<Map<string, CityValues>>((resolve, reject) => {
      const worker = new Worker(new URL("worker.ts", import.meta.url).href);
      worker.postMessage({ fileName, start, end });
      worker.onmessage = (e) => resolve(e.data);
      worker.onerror = (e) => { console.error(e); reject(); };
    })
  );
  lastOffset = end;
}

const groups = await Promise.all(tasks);
const cities = aggregate(groups);
printResults();

type CityValues = { min: number; max: number; avg: number; count: number };

// Merge the per-worker maps: keep the global min/max per city and combine
// averages weighted by their respective counts.
function aggregate(groups: Map<string, CityValues>[]) {
  const [firstGroup, ...rest] = groups;
  const allCities = firstGroup;

  rest.forEach(g => {
    [...g.entries()].forEach(([city, gv]) => {
      if (!allCities.has(city)) {
        allCities.set(city, gv);
      } else {
        const values = allCities.get(city)!;
        if (values.min > gv.min) values.min = gv.min;
        if (values.max < gv.max) values.max = gv.max;
        const newCount = values.count + gv.count;
        if ((values.count + gv.count) < (Number.MAX_VALUE / 100)) {
          // the condition above guards against numeric overflow of the running count
          values.avg = (values.avg * values.count + gv.avg * gv.count) / newCount;
          values.count = newCount;
        }
      }
    })
  })

  return allCities;
}

function printResults() {
  const result =
    "{" +
    [...cities.keys()]
      .toSorted()
      .map((city) => {
        const values = cities.get(city)!;
        const out = [
          values.min.toFixed(1),
          values.avg.toFixed(1),
          values.max.toFixed(1),
        ].join("/");
        return [city, out].join("=");
      })
      .join(", ") +
    "}";

  console.log(result);
}

// Returns the offset of the first line break at or after `start` (scanning the
// next ~128 bytes), so that chunk boundaries land on record boundaries.
function moveToClosestLineBreak(start: number): Promise<number> {
  return new Promise((resolve) => {
    const stream = createReadStream(fileName, { start, end: start + 128 });
    stream.on('readable', () => {
      const chunk = stream.read(128);
      const offset = chunk.indexOf("\n".charCodeAt(0));
      resolve(start + offset);
      stream.close();
    });
  });
}
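
For illustration, here is a minimal sketch of how the weighted-average merge in `aggregate` combines two per-worker results; the values below are made up for the example:

```ts
// Minimal sketch (hypothetical data) of the weighted-average merge used in aggregate().
type CityValues = { min: number; max: number; avg: number; count: number };

// Partial results for the same city from two different workers (made-up numbers).
const a: CityValues = { min: -3.1, max: 8.4, avg: 2.0, count: 4 };
const b: CityValues = { min: -5.0, max: 6.2, avg: 1.0, count: 2 };

// Same rule as aggregate(): global min/max, average weighted by count.
const merged: CityValues = {
  min: Math.min(a.min, b.min),                                    // -5.0
  max: Math.max(a.max, b.max),                                    //  8.4
  avg: (a.avg * a.count + b.avg * b.count) / (a.count + b.count), // 10 / 6 ≈ 1.67
  count: a.count + b.count,                                       // 6
};

console.log(merged);
```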
11 changes: 11 additions & 0 deletions src/main/bun/J-Sek/package.json
@@ -0,0 +1,11 @@
{
  "name": "1brc",
  "module": "index.ts",
  "type": "module",
  "devDependencies": {
    "@types/bun": "latest"
  },
  "peerDependencies": {
    "typescript": "^5.0.0"
  }
}
22 changes: 22 additions & 0 deletions src/main/bun/J-Sek/tsconfig.json
@@ -0,0 +1,22 @@
{
  "compilerOptions": {
    "lib": ["ESNext"],
    "target": "ESNext",
    "module": "ESNext",
    "moduleDetection": "force",
    "jsx": "react-jsx",
    "allowJs": true,

    /* Bundler mode */
    "moduleResolution": "bundler",
    "allowImportingTsExtensions": true,
    "verbatimModuleSyntax": true,
    "noEmit": true,

    /* Linting */
    "skipLibCheck": true,
    "strict": true,
    "noFallthroughCasesInSwitch": true,
    "forceConsistentCasingInFileNames": true
  }
}
72 changes: 72 additions & 0 deletions src/main/bun/J-Sek/worker.ts
@@ -0,0 +1,72 @@
import * as fs from 'node:fs';

declare var self: Worker;

const cities = new Map<string, CityValues>();
type CityValues = { min: number; max: number; avg: number; count: number };

// Each worker streams its assigned byte range, aggregates per-city statistics,
// then posts the resulting map back to the main thread and exits.
self.onmessage = async (event: MessageEvent) => {
  const { fileName, start, end } = event.data;
  await readStream(
    fs.createReadStream(fileName, { start, end }),
    acceptLine,
    () => {
      postMessage(cities);
      process.exit();
    }
  );
};

function acceptLine(line: string) {
  if (line.length < 3) return; // skip empty lines

  const [city, value] = line.split(";");
  const v = Number(value);

  if (!cities.has(city)) {
    cities.set(city, { min: v, max: v, avg: v, count: 1 });
  } else {
    const values = cities.get(city)!;

    if (values.min > v) values.min = v;
    if (values.max < v) values.max = v;

    // past a very large number of measurements the running average no longer
    // changes meaningfully, so skip the update to avoid numeric overflow
    if (values.count < Number.MAX_VALUE / 100) {
      values.avg = (values.avg * values.count + v) / (values.count + 1);
    }
    values.count += 1;
  }
}

// Reads the byte range as a stream and emits complete lines to `accept`,
// buffering partial lines across chunk boundaries.
async function readStream(
  stream: fs.ReadStream,
  accept: (line: string) => void,
  done: () => void
) {
  const lineBreak = "\n".charCodeAt(0);
  const textDecoder = new TextDecoder();

  // max line length is 128
  let lineBuffer = new Uint8Array(128);
  let lineBufferLen = 0;

  for await (const chunk of stream) {
    for (let i = 0; i < chunk.length; i++) {
      if (chunk[i] === lineBreak) {
        const line = textDecoder.decode(lineBuffer.slice(0, lineBufferLen));
        lineBufferLen = 0;
        accept(line);
      } else {
        lineBuffer[lineBufferLen] = chunk[i];
        lineBufferLen++;
      }
    }
  }

  if (lineBufferLen > 0) {
    accept(textDecoder.decode(lineBuffer.slice(0, lineBufferLen)));
  }

  done();
}