
Framework Reference

Scramjet uses functional programming to run transformations on your data streams, in a fashion very similar to the well-known event-stream Node module. First, create a stream from a source:

Use DataStream.from(someThing) to create a new stream from an Array, Generator, AsyncGenerator, Iterator or Readable stream. See the DataStream.from documentation for more information. Here's a sample:

/* global StringStream, fs */
StringStream.from(fs.createReadStream("./log.txt")) // get from any readable stream
.lines() // split the stream by line
.use("./your-file"); // use some transforms from another file

Use DataStream.pipeline(readable, transforms) to create a pipeline of transform streams and/or stream modules. Any number of consecutive arguments will get piped one into another.

/* global StringStream, fs, gzip */
StringStream.pipeline(
// process a number of streams
fs.createReadStream("./log.txt.gz"),
gzip.unzip() // all errors here will get forwarded
)
.lines() // split the stream by line
.use("./your-file"); // use some transforms from another file

Some methods, such as from, use and flatMap, accept ES6 generators and ES2018 async generators:

const fetch = require("node-fetch");
const { StringStream } = require("scramjet");

StringStream.from(
async function* () {
// construct a stream from an async generator
yield "houses\n"; // yield - push a stream chunk
// yield - push a whole stream
yield* (await fetch("https://example.org/categories")).body;
},
{ maxParallel: 4 } // set your options
)
.lines() // split the stream by line
.flatMap(async function* (category) {
const req = await fetch(`https://example.org/posts/${category}/`);
yield* await req.json(); // yield - push a whole array
})
.catch((err) => `! Error occurred ${err.uri}`)
.toStringStream()
.append("\n")
.pipe(process.stdout); // pipe to any output
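
As a conceptual aid, the flatMap step above can be pictured in plain JavaScript. This is a sketch of the semantics only, not Scramjet's implementation, and it ignores options such as maxParallel. The names flatMap, categories, postsFor and collect are hypothetical, and the made-up data stands in for the HTTP calls.

```javascript
// Conceptual sketch: every input item may expand into any number of outputs.
async function* flatMap(source, expand) {
  for await (const item of source) {
    yield* expand(item); // delegate to the (async) generator for this item
  }
}

// Hypothetical stand-in for the stream of category names
async function* categories() {
  yield "houses";
  yield "cars";
}

// Hypothetical stand-in for fetching the posts of one category
async function* postsFor(category) {
  const posts = { houses: ["villa", "cottage"], cars: ["coupe"] };
  yield* posts[category]; // yield* pushes a whole array, item by item
}

// Gather everything an async iterable produces into an array
async function collect(iterable) {
  const out = [];
  for await (const item of iterable) out.push(item);
  return out;
}

const collected = collect(flatMap(categories(), postsFor));
collected.then((items) => console.log(items)); // [ 'villa', 'cottage', 'coupe' ]
```

Each category expands into zero or more posts, and the output order follows the input order because this sketch processes one item at a time.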

Most transformations are done by passing a transform function. You can write your function in three ways:

  1. Synchronous

Example: a simple stream transform that outputs a stream of objects, each holding the original id property and the length of the value string.

DataStream.from(items).map((item) => ({
id: item.id,
length: item.value.length,
}));
  2. Asynchronous using ES2017 async/await

Example: a simple stream that uses the Fetch API to get the contents of all entries in the stream

StringStream.from(urls)
  .map(async (url) => (await fetch(url)).text()) // fetch each URL and read the body as text
  .JSONParse(); // parse every body into an object
  3. Asynchronous using Promises

Example: a simple stream that fetches a URL mentioned in the incoming object

datastream.map(
(item) =>
new Promise((resolve, reject) => {
request(item.url, (err, res, data) => {
if (err) reject(err);
// will emit an "error" event on the stream
else resolve(data);
});
})
);

The actual logic of this transform function is as if you passed your function to the then method of a Promise resolved with the data from the input stream.
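
This rule can be illustrated without Scramjet at all. The following is a minimal sketch in plain JavaScript, not Scramjet's actual implementation; applyTransform is a hypothetical helper name. It shows why the same call site accepts synchronous functions, async functions and Promise-returning functions alike, and why a thrown error and a rejection surface in the same way.

```javascript
// Hypothetical helper, not part of Scramjet: applies a transform to one chunk
// as if it were passed to `then` on a Promise resolved with that chunk.
const applyTransform = (chunk, transform) =>
  Promise.resolve(chunk).then(transform);

const sync = (item) => item.value.length; // returns a plain value
const asyncFn = async (item) => item.value.length; // returns a Promise
const failing = () => {
  throw new Error("bad chunk"); // throws synchronously
};

const item = { id: 1, value: "abc" };

const results = Promise.all([
  applyTransform(item, sync),
  applyTransform(item, asyncFn),
  // a thrown error becomes a rejection, converted to its message here
  applyTransform(item, failing).catch((err) => err.message),
]);

results.then((values) => console.log(values)); // [ 3, 3, 'bad chunk' ]
```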

  4. Streams with multi-threading

To distribute your code among the processor cores, just use the method distribute:

datastream.distribute(
16, // number of threads
(stream) => {
// multi-threaded code goes here.
// it MUST return a valid stream back to the main thread.
}
);

Writing modules

Scramjet allows writing simple modules that are resolved in the same way as Node's require. A module is a simple JavaScript file whose default export is a function taking a stream and any number of additional arguments.

Here's an example:

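// processChunk is a placeholder for your own per-chunk logic
const processChunk = (chunk, arg1) => ({ chunk, arg1 });

module.exports = (stream, arg1) => {
  const mapper = (chunk) => processChunk(chunk, arg1);
  return stream.map(mapper);
};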

Then it can be used with the DataStream.use function like this:

myStream.use("./path/to/my-module", "arg1");

If these modules are published, you can also simply use myStream.use('published-module').

For more universal modules, you can use the helper methods createTransformModule and createReadModule that scramjet exports. See more about this in the blog post Scramjet Modules.

TypeScript support

Scramjet aims to be fully documented and to expose TypeScript declarations. The first version to include definitions in the .d.ts folder is 4.15.0. More TypeScript support will be added in upcoming versions, so feel free to report issues on GitHub.

Detailed docs

Here's the list of the exposed classes and methods; please review the specific documentation for details:

Note that:

  • Most of the methods take a Function argument that operates on the stream items.
  • The Function, unless it's stated otherwise, will receive an argument with the next chunk.
  • If you want to perform your operations asynchronously, return a Promise, otherwise just return the right value.

Quick reference of some methods

:DataStream

DataStream is the primary stream type for Scramjet. After you parse your stream, pipe it into a DataStream; you can then perform calculations on the data objects streamed through your flow.

Use as:

const { DataStream } = require("scramjet");

await DataStream.from(aStream) // create a DataStream
.map(findInFiles) // read some data asynchronously
.map(sendToAPI) // send the data somewhere
.run(); // wait until end

For details, see the :DataStream documentation.

Most popular methods:

:StringStream

A stream of string objects for further transformation on top of DataStream.

Example:

StringStream.from(async () =>
(await fetch("https://example.com/data/article.txt")).text()
)
.lines()
.append("\r\n")
.pipe(fs.createWriteStream("./path/to/file.txt"));

For details, see the :StringStream documentation.

Most popular methods:

:BufferStream

A helper stream created for easy splitting and parsing of buffers.

Useful for working on built-in Node.js streams from files, parsing binary formats etc.

A simple use case would be:

fs.createReadStream("pixels.rgba")
  .pipe(new BufferStream()) // pipe a buffer stream into scramjet
  .breakup(4) // split into 4 byte fragments
  .parse((buffer) => [
    buffer.readUInt8(0), // the output is a stream of R,G,B and Alpha
    buffer.readUInt8(1), // values from 0-255 in an array
    buffer.readUInt8(2), // (readUInt8 is unsigned; readInt8 would give -128 to 127)
    buffer.readUInt8(3),
  ]);
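
To make the byte handling concrete, here is a small sketch that uses only Node's built-in Buffer, with no Scramjet involved. splitPixels is a hypothetical helper that mirrors what breaking the data into 4-byte fragments and parsing each one would produce; trailing bytes that do not fill a whole pixel are ignored in this sketch.

```javascript
// Hypothetical helper: split raw RGBA bytes into [R, G, B, A] arrays.
function splitPixels(buffer) {
  const pixels = [];
  for (let offset = 0; offset + 4 <= buffer.length; offset += 4) {
    const fragment = buffer.subarray(offset, offset + 4); // one 4-byte fragment
    pixels.push([
      fragment.readUInt8(0), // unsigned reads give values from 0 to 255
      fragment.readUInt8(1),
      fragment.readUInt8(2),
      fragment.readUInt8(3),
    ]);
  }
  return pixels;
}

const raw = Buffer.from([255, 0, 0, 255, 0, 128, 255, 64]);
const pixels = splitPixels(raw);
console.log(pixels); // [ [ 255, 0, 0, 255 ], [ 0, 128, 255, 64 ] ]

// A signed read interprets the same byte differently
console.log(raw.readInt8(0)); // -1
```

Unsigned reads are the ones that match a 0-255 colour channel; a signed read of the byte 0xFF yields -1.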

For details, see the :BufferStream documentation.

Most popular methods:

:MultiStream

An object consisting of multiple streams that can be refined or muxed.

The idea behind a MultiStream is being able to mux and demux streams when needed.

Usage:

new MultiStream([...streams]).mux();

new MultiStream(function* () {
yield* streams;
})
.map((stream) => stream.filter(myFilter))
.mux();

For details, see the :MultiStream documentation.

Most popular methods:

:NumberStream

A simple Scramjet stream that by default contains numbers or other objects that implement a valueOf method. The stream provides simple methods such as sum and average. It derives from DataStream, so it still fully supports map, reduce, etc.
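
The role of valueOf can be shown in plain JavaScript. The sketch below is conceptual and does not use Scramjet; Price is a hypothetical class, and the sum and average are computed with an ordinary array reduce rather than with stream methods.

```javascript
// Hypothetical class whose instances convert to a number through valueOf
class Price {
  constructor(amount) {
    this.amount = amount;
  }
  valueOf() {
    return this.amount;
  }
}

// A mix of plain numbers and objects with valueOf
const items = [1, new Price(2.5), new Price(4)];

// Number(item) calls valueOf on objects and leaves numbers unchanged
const sum = items.reduce((total, item) => total + Number(item), 0);
const average = sum / items.length;

console.log(sum); // 7.5
console.log(average); // 2.5
```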

For details, see the :NumberStream documentation.

Most popular methods:

:WindowStream

A stream for moving window calculation with some simple methods.

In essence, it's a stream of arrays, each containing a list of items (a window). It's best used when created by the DataStream.window method.
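
The idea of a moving window can be sketched in plain JavaScript. This is a conceptual illustration, not Scramjet's implementation; slidingWindows and average are hypothetical helpers, and emitting the shorter windows at the start is an assumption of this sketch.

```javascript
// Conceptual sketch: for every item, yield an array holding that item
// and up to `length - 1` items that came before it.
function* slidingWindows(items, length) {
  const current = [];
  for (const item of items) {
    current.push(item);
    if (current.length > length) current.shift(); // drop the oldest item
    yield current.slice(); // copy, so later shifts don't alter emitted windows
  }
}

const average = (numbers) =>
  numbers.reduce((a, b) => a + b, 0) / numbers.length;

const windows = [...slidingWindows([10, 20, 30, 40], 3)];
console.log(windows); // [ [ 10 ], [ 10, 20 ], [ 10, 20, 30 ], [ 20, 30, 40 ] ]
console.log(windows.map(average)); // [ 10, 15, 20, 30 ]
```

Each window is an ordinary array, so per-window calculations such as a moving average reduce to simple array operations.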

For details, see the :WindowStream documentation.

Most popular methods:

:StreamWorker

StreamWorker class - intended for internal use

This class provides control over the subprocesses, including:

  • spawning
  • communicating
  • delivering streams

For details, see the :StreamWorker documentation.

Most popular methods:
