Data Stream

:DataStream : import("stream").PassThrough

DataStream is the primary stream type for Scramjet. Once you have parsed your stream, just pipe it to a DataStream; you can then perform calculations on the data objects streamed through your flow.

Use as:

const { DataStream } = require("scramjet");

await DataStream.from(aStream) // create a DataStream
.map(findInFiles) // read some data asynchronously
.map(sendToAPI) // send the data somewhere
.run(); // wait until end

Kind: static class Extends: import("stream").PassThrough Test: test/methods/data-stream-constructor.js

new DataStream([opts])

Create the DataStream.

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [opts] | DataStreamOptions |  | Stream options passed to superclass |

dataStream.map(func, [ClassType]) ↺

Transforms stream objects into new ones, just like Array.prototype.map does.

Map takes a Function argument that operates on every element of the stream. If the function returns a Promise or is an AsyncFunction, the stream will await the outcome of the operation before pushing the data forwards.

A simple example that turns a stream of URLs into a stream of responses:

stream.map(async (url) => fetch(url));

Multiple subsequent map operations (as well as filter, do, each and other simple ops) will be merged together into a single operation to improve performance. Such behaviour can be suppressed by chaining .tap() after .map().

Kind: instance method of DataStream Chainable Test: test/methods/data-stream-map.js

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| func | MapCallback |  | The function that creates the new object |
| [ClassType] | function | this.constructor | The class to be mapped to. |

dataStream.filter(func) ↺

Filters objects based on the function outcome, just like Array.prototype.filter.

Filter takes a Function or AsyncFunction argument that is called on each stream item. If the outcome of the operation is falsy (for example 0, '', false, null or undefined), the item will be filtered out of subsequent operations and will not be pushed to the output of the stream. Otherwise the item passes through unchanged.

A simple example that keeps only the non-2xx responses from a stream (2xx responses are dropped because the callback returns false for them):

stream.filter(({ statusCode }) => !(statusCode >= 200 && statusCode < 300));

Kind: instance method of DataStream Chainable Test: test/methods/data-stream-filter.js

| Param | Type | Description |
| --- | --- | --- |
| func | FilterCallback | The function that filters the object |

dataStream.reduce(func, into) ⇄

Reduces the stream into a given accumulator

Works similarly to Array.prototype.reduce, so whatever you return in the former operation will be the first operand to the latter. The result is a promise that's resolved with the return value of the last transform executed.

A simple example that sums values from a stream (the initial value 0 is passed as into, otherwise the first accumulator would be undefined):

stream.reduce((accumulator, { value }) => accumulator + value, 0);

This method is serial - meaning that any processing on an entry will occur only after the previous entry is fully processed. This does mean it's much slower than parallel functions.

Kind: instance method of DataStream Test: test/methods/data-stream-reduce.js

| Param | Type | Description |
| --- | --- | --- |
| func | ReduceCallback | The into object will be passed as the first argument, the data object from the stream as the second. |
| into | object | Any object passed initially to the transform function |

dataStream.do(func) ↺

Perform an asynchronous operation without changing or resuming the stream.

In essence the stream will use the call to keep the backpressure, but the resolving value has no impact on the streamed data (except for possible mutation of the chunk itself)

Kind: instance method of DataStream Chainable

| Param | Type | Description |
| --- | --- | --- |
| func | DoCallback | the async function |

dataStream.all(functions) ↺

Processes a number of functions in parallel, returns a stream of arrays of results.

This method allows running multiple asynchronous operations and receiving all the results at once, just like Promise.all behaves.

Keep in mind that if one of your methods rejects, this behaves just like Promise.all: you won't be able to receive partial results.
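
The per-chunk behaviour described above can be sketched with plain promises. This is a hypothetical model that does not call scramjet; the names runAll, double, square and fail are made up for illustration, and passing the chunk to every function is an assumption of the sketch.

```javascript
// Hypothetical model: run every function on one chunk and gather the results.
async function runAll(chunk, functions) {
  // Promise.all resolves with results in the order of `functions`
  // and rejects as soon as any of them rejects (no partial results).
  return Promise.all(functions.map((fn) => fn(chunk)));
}

const double = async (n) => n * 2;
const square = async (n) => n * n;
const fail = async () => { throw new Error("boom"); };

runAll(3, [double, square]).then((results) => console.log(results)); // [ 6, 9 ]
runAll(3, [double, fail]).catch((err) => console.log(err.message)); // boom
```

In the second call the result of double is lost, which is the "no partial results" caveat mentioned above.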

Kind: instance method of DataStream Chainable Test: test/methods/data-stream-all.js

| Param | Type | Description |
| --- | --- | --- |
| functions | Array.<function()> | list of async functions to run |

dataStream.race(functions) ↺

Processes a number of functions in parallel, returns the first resolved.

This method is to allow running multiple asynchronous operations awaiting just the result of the quickest to execute, just like Promise.race behaves.

Keep in mind that if one of your methods rejects, an error will be raised only if that method was the first one to settle.
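
The Promise.race analogy can be sketched the same way. This is a hypothetical model using only built-ins, not scramjet's implementation; runRace, after and failAfter are illustrative names.

```javascript
// Hypothetical model: settle with whichever function settles first.
async function runRace(chunk, functions) {
  return Promise.race(functions.map((fn) => fn(chunk)));
}

// Helper: returns a function that resolves with `value` after `ms` milliseconds.
const after = (ms, value) => () =>
  new Promise((resolve) => setTimeout(resolve, ms, value));
// Helper: returns a function that rejects after `ms` milliseconds.
const failAfter = (ms) => () =>
  new Promise((_, reject) => setTimeout(reject, ms, new Error("late failure")));

// The slow rejection is ignored because the fast function settles first.
const winner = runRace("chunk", [after(5, "fast"), failAfter(50)]);
winner.then((value) => console.log(value)); // fast
```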

Kind: instance method of DataStream Chainable Test: test/methods/data-stream-race.js

| Param | Type | Description |
| --- | --- | --- |
| functions | Array.<function()> | list of async functions to run |

dataStream.unorder(func)

Allows processing items without keeping order

This method is useful if you are not concerned about the order in which the chunks are pushed out of the operation. The maxParallel option is still used to limit the number of parallel operations happening at the same time.

Kind: instance method of DataStream

| Param | Type | Description |
| --- | --- | --- |
| func | MapCallback | the async function that will be unordered |

dataStream.into(func, into) ↺

Allows own implementation of stream chaining.

The async Function is called on every chunk and should implement writes in its own way. The resolution will be awaited for flow control. The passed into argument is passed as the first argument to every call.

It returns the DataStream passed as the second argument.

Kind: instance method of DataStream Chainable Test: test/methods/data-stream-into.js

| Param | Type | Description |
| --- | --- | --- |
| func | IntoCallback | the method that processes incoming chunks |
| into | DataStream | the DataStream derived class |

dataStream.use(func) ↺

Calls the passed method in place with the stream as first argument, returns result.

The main intention of this method is to run scramjet modules - transforms that allow complex transforms of streams. These modules can also be run with Scramjet-CLI directly from the command line.

Kind: instance method of DataStream Chainable Test: test/methods/data-stream-use.js

| Param | Type | Description |
| --- | --- | --- |
| func | AsyncGeneratorFunction \| GeneratorFunction \| UseCallback \| string \| Readable | if passed, the function will be called on self to add an option to inspect the stream in place, while not breaking the transform chain. Alternatively this can be a relative path to a scramjet-module. Lastly it can be a Transform stream. |
| ...parameters | Array.<any> | any additional parameters to be passed to the module |

dataStream.run() ⇄

Consumes all stream items doing nothing. Resolves when the stream is ended.

This is very convenient if you're looking to use up the stream in operations that work on each entry like map. This uncorks the stream and allows all preceding operations to be run at any speed.

All the data of the current stream will be discarded.

The function returns a promise that is resolved when the stream ends.

Kind: instance method of DataStream

dataStream.tap() ↺

Stops merging transform Functions at the current place in the command chain.

Kind: instance method of DataStream Chainable Test: test/methods/data-stream-tap.js

dataStream.whenRead() ⇄

Reads a chunk from the stream and resolves the promise when read.

Kind: instance method of DataStream

dataStream.whenWrote(chunk) ⇄

Writes a chunk to the stream and returns a Promise resolved when more chunks can be written.
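
Since DataStream extends Node's PassThrough, the contract described here resembles the usual write-then-wait-for-drain pattern. The sketch below uses Node core only; writeWhenReady and demo are hypothetical helpers and not scramjet's implementation.

```javascript
const { PassThrough } = require("stream");
const { once } = require("events");

// Hypothetical helper: write a chunk and resolve once more can be written.
async function writeWhenReady(stream, chunk) {
  // write() returns false when the internal buffer is full...
  if (!stream.write(chunk)) {
    // ...in which case we wait for the "drain" event before continuing.
    await once(stream, "drain");
  }
}

async function demo() {
  // A tiny highWaterMark forces backpressure after a single object.
  const stream = new PassThrough({ objectMode: true, highWaterMark: 1 });
  const seen = [];
  stream.on("data", (chunk) => seen.push(chunk));

  for (const chunk of ["a", "b", "c"]) {
    await writeWhenReady(stream, chunk);
  }
  stream.end();
  await once(stream, "end");
  return seen;
}

demo().then((seen) => console.log(seen)); // [ 'a', 'b', 'c' ]
```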

Kind: instance method of DataStream

| Param | Type | Description |
| --- | --- | --- |
| chunk | * | a chunk to write |
| ...more | Array.<any> | more chunks to write |

dataStream.whenEnd() ⇄

Resolves when stream ends - rejects on uncaught error

Kind: instance method of DataStream

dataStream.whenDrained() ⇄

Returns a promise that resolves when the stream is drained

Kind: instance method of DataStream

dataStream.whenError() ⇄

Returns a promise that resolves (!) when the stream errors

Kind: instance method of DataStream

dataStream.setOptions(options) ↺

Allows resetting stream options.

It's much easier to use this in chain than constructing new stream:

stream.map(myMapper).filter(myFilter).setOptions({ maxParallel: 2 });

Kind: instance method of DataStream Chainable Meta.conditions: keep-order,chain

| Param | Type |
| --- | --- |
| options | DataStreamOptions |

dataStream.copy(func) ↺

Returns a copy of the stream

Creates a new stream and pushes all the data from the current one to the new one. This can be called several times.

Kind: instance method of DataStream Chainable

| Param | Type | Description |
| --- | --- | --- |
| func | TeeCallback \| Writable | The duplicate stream will be passed as first argument. |

dataStream.tee(func) ↺

Duplicate the stream

Creates a duplicate stream instance and passes it to the Function.

Kind: instance method of DataStream Chainable Test: test/methods/data-stream-tee.js

| Param | Type | Description |
| --- | --- | --- |
| func | TeeCallback \| Writable | The duplicate stream will be passed as first argument. |

dataStream.each(func) ↺

Performs an operation on every chunk, without changing the stream

This is a shorthand for stream.on("data", func) but with flow control. Warning: this resumes the stream!

Kind: instance method of DataStream Chainable

| Param | Type | Description |
| --- | --- | --- |
| func | MapCallback | a Function called for each chunk. |

dataStream.while(func) ↺

Reads the stream while the function outcome is truthy.

Stops reading and emits end as soon as it finds the first chunk that evaluates to false. If you're processing a file until a certain point or you just need to confirm existence of some data, you can use it to end the stream before reaching end.

Keep in mind that whatever you piped to the stream will still need to be handled.
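
The stopping rule can be modelled with an async generator. This is a hypothetical sketch in plain JavaScript, not scramjet's implementation; takeWhile and collect are illustrative names.

```javascript
// Hypothetical model: pass chunks through while `condition` is truthy,
// then end the output at the first falsy outcome.
async function* takeWhile(source, condition) {
  for await (const chunk of source) {
    if (!(await condition(chunk))) return; // stop reading, end the output
    yield chunk;
  }
}

// Helper: gather everything an (async) iterable yields into an array.
async function collect(iterable) {
  const out = [];
  for await (const item of iterable) out.push(item);
  return out;
}

const lines = ["header", "row 1", "row 2", "", "footer"];
// Read until the first empty line; "footer" is never reached.
const body = collect(takeWhile(lines, (line) => line.length > 0));
body.then((result) => console.log(result)); // [ 'header', 'row 1', 'row 2' ]
```

The same model with the condition negated describes until.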

Kind: instance method of DataStream Chainable Test: test/methods/data-stream-while.js

| Param | Type | Description |
| --- | --- | --- |
| func | FilterCallback | The condition check |

dataStream.until(func) ↺

Reads the stream until the function outcome is truthy.

Works opposite of while.

Kind: instance method of DataStream Chainable Test: test/methods/data-stream-until.js

| Param | Type | Description |
| --- | --- | --- |
| func | FilterCallback | The condition check |

dataStream.catch(callback) ↺

Provides a way to catch errors in chained streams.

The handler will be called asynchronously:

  • if it resolves then the error will be muted.
  • if it rejects then the error will be passed to the next handler

If no handler resolves the error, an error event will be emitted.
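
The handler chain can be modelled in a few lines. This is a hypothetical sketch, not scramjet's implementation: runHandlers is an illustrative name, and passing the rejection value (rather than the original error) on to the next handler is an assumption of the sketch.

```javascript
const { EventEmitter } = require("events");

// Hypothetical model: handlers run in order; the first one that resolves
// mutes the error, otherwise an "error" event is emitted at the end.
async function runHandlers(emitter, handlers, err) {
  let current = err;
  for (const handler of handlers) {
    try {
      await handler(current);
      return true; // resolved: the error is muted
    } catch (next) {
      current = next; // rejected: pass the error on (assumption, see above)
    }
  }
  emitter.emit("error", current); // no handler resolved it
  return false;
}

const emitter = new EventEmitter();
const emitted = [];
emitter.on("error", (e) => emitted.push(e.message));

const rethrow = async (e) => { throw e; };
const swallow = async () => {};

// The second handler resolves, so nothing is emitted.
const muted = runHandlers(emitter, [rethrow, swallow], new Error("bad chunk"));
// The only handler rejects, so "fatal" reaches the error event.
const unhandled = muted.then(() =>
  runHandlers(emitter, [rethrow], new Error("fatal"))
);
```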

Kind: instance method of DataStream Chainable Test: test/methods/data-stream-catch.js

| Param | Type | Description |
| --- | --- | --- |
| callback | function | Error handler (async function) |

dataStream.raise(err) ⇄

Executes all error handlers and if none resolves, then emits an error.

The returned promise will always be resolved even if there are no successful handlers.

Kind: instance method of DataStream Test: test/methods/data-stream-raise.js

| Param | Type | Description |
| --- | --- | --- |
| err | Error | The thrown error |

dataStream.bufferify(serializer) : BufferStream ↺

Creates a BufferStream.

The passed serializer must return a buffer.

Kind: instance method of DataStream Chainable Returns: BufferStream - the resulting stream Meta.noreadme: Test: test/methods/data-stream-tobufferstream.js

| Param | Type | Description |
| --- | --- | --- |
| serializer | MapCallback | A method that converts chunks to buffers |

dataStream.stringify([serializer]) : StringStream ↺

Creates a StringStream.

The passed serializer must return a string. If no serializer is passed, each chunk's toString method will be used.

Kind: instance method of DataStream Chainable Returns: StringStream - the resulting stream Test: test/methods/data-stream-tostringstream.js

| Param | Type | Description |
| --- | --- | --- |
| [serializer] | MapCallback \| never | A method that converts chunks to strings |

dataStream.toArray([initial]) : Array.<any> ⇄

Aggregates the stream into a single Array

In fact it's just a shorthand for reducing the stream into an Array.

Kind: instance method of DataStream

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [initial] | Array | [] | Array to begin with (defaults to an empty array). |

dataStream.toGenerator() : Generator.<Promise.<any>>

Returns an async generator

Kind: instance method of DataStream Returns: Generator.<Promise.<any>> - Returns an iterator that returns a promise for each item.

dataStream.pull(pullable) : Promise.<any> ⇄

Pulls in any readable stream, resolves when the pulled stream ends.

You can also pass anything that can be passed to DataStream.from.

Does not preserve order, does not end this stream.

Kind: instance method of DataStream Returns: Promise.<any> - resolved when incoming stream ends, rejects on incoming error Test: test/methods/data-stream-pull.js

| Param | Type | Description |
| --- | --- | --- |
| pullable | Array \| Iterable.<any> \| AsyncGeneratorFunction \| GeneratorFunction \| AsyncFunction \| function \| string \| Readable |  |
| ...args | Array.<any> | any additional args |

dataStream.shift(count, func) ↺

Shifts the first n items from the stream and pushes out the remaining ones.

Kind: instance method of DataStream Chainable Test: test/methods/data-stream-shift.js

| Param | Type | Description |
| --- | --- | --- |
| count | number | The number of items to shift. |
| func | ShiftCallback | Function that receives an array of shifted items |

dataStream.peek(count, func) ↺

Allows previewing some of the stream's data without removing it from the stream.

Important: Peek does not resume the flow.

Kind: instance method of DataStream Chainable

| Param | Type | Description |
| --- | --- | --- |
| count | number | The number of items to view before |
| func | ShiftCallback | Function called before other streams |

dataStream.slice([start], [length]) ↺

Slices out a part of the stream to the passed Function.

Returns a stream that omits the first start items and then includes the next length items. Works similarly to Array.prototype.slice.

Takes count from the moment it's called. Any previous items will not be taken into account.
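
One difference from the array method is worth spelling out: per the parameter list, the second argument is a length, whereas Array.prototype.slice takes an end index. The array analogy below is a hypothetical illustration (sliceLike is not part of scramjet).

```javascript
// Array analogy for slice(start, length): the second argument is a count
// of items, so the equivalent array call uses start + length as the end.
function sliceLike(items, start = 0, length = Infinity) {
  return items.slice(start, start + length);
}

const items = ["a", "b", "c", "d", "e"];
console.log(sliceLike(items, 1, 3)); // [ 'b', 'c', 'd' ]
console.log(sliceLike(items, 2)); // [ 'c', 'd', 'e' ]
```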

Kind: instance method of DataStream Chainable Test: test/methods/data-stream-slice.js

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [start] | number | 0 | omit this number of entries. |
| [length] | number | Infinity | get this number of entries to the resulting stream |

dataStream.assign(func) ↺

Transforms stream objects by assigning the properties from the returned data along with data from original ones.

The original objects are unaltered.
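
The merge described above can be modelled with Object.assign. This is a hypothetical sketch in plain JavaScript, not scramjet's implementation; assignLike is an illustrative name.

```javascript
// Hypothetical model: merge the properties returned by `func` (or given
// directly as an object) with the chunk into a new object.
async function assignLike(chunk, func) {
  const extra = typeof func === "function" ? await func(chunk) : func;
  // Copying into a fresh object leaves the original chunk untouched.
  return Object.assign({}, chunk, extra);
}

const original = { id: 1, name: "alpha" };
const enriched = assignLike(original, async ({ name }) => ({
  length: name.length,
}));
enriched.then((result) => console.log(result, original));
// { id: 1, name: 'alpha', length: 5 } { id: 1, name: 'alpha' }
```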

Kind: instance method of DataStream Chainable Test: test/methods/data-stream-assign.js

| Param | Type | Description |
| --- | --- | --- |
| func | MapCallback \| object | The function that returns new object properties or just the new properties |

dataStream.empty(callback) ↺

Called only before the stream ends without passing any items

Kind: instance method of DataStream Chainable Test: test/methods/data-stream-empty.js

| Param | Type | Description |
| --- | --- | --- |
| callback | function | Function called when stream ends |

dataStream.unshift() ↺

Pushes any data at call time (essentially at the beginning of the stream)

This is a synchronous only function.

Kind: instance method of DataStream Chainable

| Param | Type | Description |
| --- | --- | --- |
| ...item | Array.<any> | list of items to unshift (you can pass more items) |

dataStream.endWith(item) ↺

Pushes any data at end of stream

Kind: instance method of DataStream Chainable Meta.noreadme: Test: test/methods/data-stream-endwith.js

| Param | Type | Description |
| --- | --- | --- |
| item | * | list of items to push at end |

dataStream.accumulate(func, into) : Promise.<any> ⇄

Accumulates data into the object.

Works very similarly to reduce, but the result of previous operations has no influence over the accumulator in the next one.

Method works in parallel.
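
The difference from reduce can be modelled as follows: the callback mutates the accumulator and its return value is ignored, so the calls do not have to wait for each other. This is a hypothetical sketch, not scramjet's implementation; accumulateLike is an illustrative name.

```javascript
// Hypothetical model: `func` mutates `into`; return values are discarded,
// which is what allows the calls to run in parallel.
async function accumulateLike(items, func, into) {
  await Promise.all(items.map((item) => func(into, item)));
  return into; // resolved with the same "into" object
}

const seen = accumulateLike(
  ["a", "b", "a"],
  async (counts, key) => {
    counts[key] = (counts[key] || 0) + 1;
  },
  {}
);
seen.then((counts) => console.log(counts)); // { a: 2, b: 1 }
```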

Kind: instance method of DataStream Returns: Promise.<any> - resolved with the "into" object on stream end. Meta.noreadme: Test: test/methods/data-stream-accumulate.js

| Param | Type | Description |
| --- | --- | --- |
| func | AccumulateCallback | The accumulation function |
| into | * | Accumulator object |

dataStream.consume(func) ⇄

Deprecated

Consumes the stream by running each Function

Kind: instance method of DataStream Meta.noreadme:

| Param | Type | Description |
| --- | --- | --- |
| func | ConsumeCallback \| AsyncGeneratorFunction \| GeneratorFunction | the consumer |
| ...args | Array.<any> | additional args will be passed to generators |

dataStream.reduceNow(func, into) : * ↺

Reduces the stream into the given object, returning it immediately.

The main difference to reduce is that only the first object will be returned at once (however the method will be called with the previous entry). If the object is an instance of EventEmitter then it will propagate the error from the previous stream.

This method is serial - meaning that any processing on an entry will occur only after the previous entry is fully processed. This does mean it's much slower than parallel functions.

Kind: instance method of DataStream Chainable Returns: * - whatever was passed as into Meta.noreadme: Test: test/methods/data-stream-reduceNow.js

| Param | Type | Description |
| --- | --- | --- |
| func | ReduceCallback | The into object will be passed as the first argument, the data object from the stream as the second. |
| into | * \| EventEmitter | Any object passed initially to the transform function |

dataStream.remap(func, [ClassType]) ↺

Remaps the stream into a new stream.

This means that every item may emit as many other items as we like.

Kind: instance method of DataStream Chainable Meta.noreadme: Test: test/methods/data-stream-remap.js

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| func | RemapCallback |  | A Function that is called on every chunk |
| [ClassType] | function | this.constructor | Optional DataStream subclass to be constructed |

dataStream.flatMap(func, [ClassType]) ↺

Takes any method that returns any iterable and flattens the result.

The passed Function must return an iterable (otherwise an error will be emitted). The resulting stream will consist of all the items of the returned iterables, one iterable after another.
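
The flattening rule can be modelled with an async generator. This is a hypothetical sketch in plain JavaScript, not scramjet's implementation; flatMapLike and collect are illustrative names, and throwing a TypeError stands in for the emitted error.

```javascript
// Hypothetical model: call `func` on every chunk and emit each item of the
// returned iterable, one iterable after another.
async function* flatMapLike(source, func) {
  for await (const chunk of source) {
    const iterable = await func(chunk);
    if (iterable == null || typeof iterable[Symbol.iterator] !== "function") {
      throw new TypeError("func must return an iterable");
    }
    yield* iterable;
  }
}

// Helper: gather everything an (async) iterable yields into an array.
async function collect(iterable) {
  const out = [];
  for await (const item of iterable) out.push(item);
  return out;
}

const words = collect(
  flatMapLike(["to be", "or not"], (line) => line.split(" "))
);
words.then((result) => console.log(result)); // [ 'to', 'be', 'or', 'not' ]
```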

Kind: instance method of DataStream Chainable Test: test/methods/data-stream-flatmap.js

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| func | FlatMapCallback |  | A Function that is called on every chunk |
| [ClassType] | function | this.constructor | Optional DataStream subclass to be constructed |
| ...args | Array.<any> |  | additional args will be passed to generators |

dataStream.flatten() : DataStream ↺

A shorthand for streams of arrays or iterables to flatten them.

A more efficient equivalent of .flatMap(i => i). Works on streams of async iterables too.

Kind: instance method of DataStream Chainable Test: test/methods/data-stream-flatten.js

dataStream.concat() ↺

Returns a new stream that will append the passed streams to the callee

Kind: instance method of DataStream Chainable Test: test/methods/data-stream-concat.js

| Param | Type | Description |
| --- | --- | --- |
| ...streams | Array.<Readable> | Streams to be injected into the current stream |

dataStream.join(item) ↺

Method will put the passed object between items. It can also be a function call or generator / iterator.

If a generator or iterator is passed, when the iteration is done no items will be interweaved. Generator receives

Kind: instance method of DataStream Chainable Test: test/methods/data-stream-join.js

| Param | Type | Description |
| --- | --- | --- |
| item | * \| AsyncGeneratorFunction \| GeneratorFunction \| JoinCallback | An object that should be interweaved between stream items |
| ...args | Array.<any> | additional args will be passed to generators |

dataStream.keep([count]) ↺

Keep a buffer of n chunks for use with dataStream.rewind.

Kind: instance method of DataStream Chainable Test: test/methods/data-stream-keep.js

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [count] | number | Infinity | Number of objects or -1 for all the stream |

dataStream.rewind([count]) ↺

Rewinds the buffered chunks the specified length backwards. Requires a prior call to dataStream.keep.

Kind: instance method of DataStream Chainable

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [count] | number | Infinity | Number of objects or -1 for all the buffer |

dataStream.stack([count], [drop]) ↺

Returns a stream that stacks up incoming items always feeding out the newest items first. It returns the older items when read

When the stack length exceeds the given count the given drop function is awaited and used for flow control.

By default the drop function ignores and quietly disposes of items not read before overflow.

Kind: instance method of DataStream Chainable Test: test/methods/data-stream-stack.js

| Param | Type | Default |
| --- | --- | --- |
| [count] | number | 1000 |
| [drop] | function |  |

dataStream.distribute([affinity], [clusterFunc], [options]) ↺

Distributes processing into multiple sub-processes or threads if you like.

Kind: instance method of DataStream Chainable Test: test/methods/data-stream-distribute.js

Todo:

  • Currently order is not kept.
  • Example test breaks travis-ci build

| Param | Type | Description |
| --- | --- | --- |
| [affinity] | AffinityCallback \| function \| number | A Function that affixes the item to specific output stream which must exist in the object for each chunk, must return a string. A number may be passed to identify how many round-robin threads to start up. Defaults to Round Robin to twice the number of CPU threads. |
| [clusterFunc] | function \| DataStreamOptions | stream transforms similar to the dataStream.use method |
| [options] | DataStreamOptions | Options |

dataStream.separateInto(streams, affinity) ↺

Separates stream into a hash of streams. Does not create new streams!

Kind: instance method of DataStream Chainable Meta.noreadme:

| Param | Type | Description |
| --- | --- | --- |
| streams | object | the object hash of streams. Keys must be the outputs of the affinity function |
| affinity | AffinityCallback | the Function that affixes the item to specific streams which must exist in the object for each chunk. |

dataStream.separate(affinity, [createOptions], [ClassType]) : MultiStream ↺

Separates execution to multiple streams using the hashes returned by the passed Function.

Calls the given Function for a hash, then makes sure all items with the same hash are processed within a single stream. Thanks to that streams can be distributed to multiple threads.
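
The routing rule can be modelled by grouping chunks under the key the affinity function returns. This is a hypothetical sketch that uses arrays in place of streams and is not scramjet's implementation; separateLike and the sample events are illustrative.

```javascript
// Hypothetical model: route each chunk to a bucket named by the affinity
// function, so that chunks with equal hashes always end up together.
async function separateLike(items, affinity) {
  const buckets = new Map();
  for (const item of items) {
    const key = await affinity(item);
    if (!buckets.has(key)) buckets.set(key, []);
    buckets.get(key).push(item);
  }
  return buckets;
}

const events = [
  { user: "ann", action: "login" },
  { user: "bob", action: "login" },
  { user: "ann", action: "logout" },
];
const byUser = separateLike(events, (event) => event.user);
byUser.then((buckets) => console.log([...buckets.keys()])); // [ 'ann', 'bob' ]
```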

Kind: instance method of DataStream Chainable Returns: MultiStream - separated stream Meta.noreadme: Test: test/methods/data-stream-separate.js

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| affinity | AffinityCallback |  | the affinity function |
| [createOptions] | DataStreamOptions |  | options to use to create the separated streams |
| [ClassType] | function | this.constructor | options to use to create the separated streams |

dataStream.delegate(delegateFunc, worker, [plugins]) ↺

Delegates work to a specified worker.

Kind: instance method of DataStream Chainable Meta.noreadme:

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| delegateFunc | DelegateCallback |  | A function to be run in the sub-thread. |
| worker | StreamWorker |  |  |
| [plugins] | Array | [] |  |

dataStream.rate(cps, [options]) ↺

Limit the rate of the stream to a given number of chunks per second or given timeframe.

Kind: instance method of DataStream Chainable Meta.noreadme:

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| cps | number |  | Chunks per timeframe, the default timeframe is 1000 ms. |
| [options] | RateOptions |  | Options for the limiter controlling the timeframe and time source. Both must work on same units. |

dataStream.batch(count) ↺

Aggregates chunks into arrays of the given number of items.

This can be used for micro-batch processing.
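
Micro-batching can be modelled with a small generator. This is a hypothetical sketch in plain JavaScript, not scramjet's implementation; batchLike is an illustrative name, and emitting the trailing, shorter batch is an assumption of the sketch.

```javascript
// Hypothetical model: group chunks into arrays of `count` items.
function* batchLike(items, count) {
  let batch = [];
  for (const item of items) {
    batch.push(item);
    if (batch.length === count) {
      yield batch;
      batch = [];
    }
  }
  // Assumption: leftover items are emitted as a final, shorter batch.
  if (batch.length > 0) yield batch;
}

const batches = [...batchLike([1, 2, 3, 4, 5], 2)];
console.log(batches); // [ [ 1, 2 ], [ 3, 4 ], [ 5 ] ]
```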

Kind: instance method of DataStream Chainable Test: test/methods/data-stream-batch.js

| Param | Type | Description |
| --- | --- | --- |
| count | number | How many items to aggregate |

dataStream.timeBatch(ms, [count]) ↺

Aggregates chunks to arrays not delaying output by more than the given number of ms.

Kind: instance method of DataStream Chainable Meta.noreadme: Test: test/methods/data-stream-timebatch.js

| Param | Type | Description |
| --- | --- | --- |
| ms | number | Maximum amount of milliseconds |
| [count] | number | Maximum number of items in batch (otherwise no limit) |

dataStream.nagle([size], [ms]) ↺

Performs Nagle's algorithm on the data. In essence it waits until we receive some more data and releases it in bulk.

Kind: instance method of DataStream Chainable Meta.noreadme:

Todo:

  • needs more work, for now it's simply waiting some time, not checking the queues.

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [size] | number | 32 | maximum number of items to wait for |
| [ms] | number | 10 | milliseconds to wait for more data |

dataStream.window(length) : WindowStream ↺

Returns a WindowStream of the specified length

Kind: instance method of DataStream Chainable Returns: WindowStream - a stream of arrays Meta.noreadme:

| Param | Type |
| --- | --- |
| length | number |

dataStream.toJSONArray([enclosure]) : StringStream ↺

Transforms the stream to a streamed JSON array.

Kind: instance method of DataStream Chainable Meta.noreadme: Test: test/methods/data-stream-tojsonarray.js

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [enclosure] | Iterable.<any> | '[]' | Any iterable object of two items (beginning and end) |

dataStream.toJSONObject([entryCallback], [enclosure]) : StringStream ↺

Transforms the stream to a streamed JSON object.

Kind: instance method of DataStream Chainable Meta.noreadme: Test: test/methods/data-stream-tojsonobject.js

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [entryCallback] | MapCallback |  | async function returning an entry (array of [key, value]) |
| [enclosure] | Iterable.<any> | '' | Any iterable object of two items (beginning and end) |

dataStream.JSONStringify([endline]) : StringStream ↺

Returns a StringStream containing JSON per item with optional end line
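
The output format can be modelled as one JSON document per item followed by a delimiter. This is a hypothetical sketch in plain JavaScript, not scramjet's implementation; jsonLines is an illustrative name, and the handling of true and false is an assumption based on the parameter description.

```javascript
const os = require("os");

// Hypothetical model: serialize each item and append the delimiter.
// Assumption: `true` selects os.EOL, a string is used as-is, and `false`
// adds nothing.
function jsonLines(items, endline = os.EOL) {
  const eol = endline === true ? os.EOL : endline || "";
  return items.map((item) => JSON.stringify(item) + eol);
}

const out = jsonLines([{ id: 1 }, { id: 2 }], "\n");
console.log(JSON.stringify(out)); // ["{\"id\":1}\n","{\"id\":2}\n"]
```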

Kind: instance method of DataStream Chainable Returns: StringStream - output stream Meta.noreadme: Test: test/methods/data-stream-jsonstringify.js

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [endline] | Boolean \| string | os.EOL | whether to add endlines (boolean or string as delimiter) |

dataStream.CSVStringify([options]) : StringStream ↺

Stringifies the stream's items to CSV, returned as a StringStream, using the 'papaparse' module.

Kind: instance method of DataStream Chainable Returns: StringStream - stream of parsed items Test: test/methods/data-stream-csv.js

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [options] | object |  | options for the papaparse.unparse module. |

dataStream.exec(command, [options])

Executes a given sub-process with arguments and pipes the current stream into it while returning the output as another DataStream.

Pipes the current stream into the sub-processes stdin. The data is serialized and deserialized as JSON lines by default. You can provide your own alternative methods in the ExecOptions object.

Note: if you're piping both stderr and stdout (options.stream=3) keep in mind that chunks may get mixed up!

Kind: instance method of DataStream Test: test/methods/data-stream-exec.js

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| command | string |  | command to execute |
| [options] | ExecDataOptions \| any |  | options to be passed to spawn and defining serialization. |
| ...args | Array.<string> |  | additional args will be passed to function |

dataStream.debug(func) : DataStream ↺

Injects a debugger statement when called.

Kind: instance method of DataStream Chainable Returns: DataStream - self Meta.noreadme: Test: test/methods/data-stream-debug.js

| Param | Type | Description |
| --- | --- | --- |
| func | function | if passed, the function will be called on self to add an option to inspect the stream in place, while not breaking the transform chain |

dataStream.toBufferStream(serializer) : BufferStream ↺

Creates a BufferStream.

The passed serializer must return a buffer.

Kind: instance method of DataStream Chainable Returns: BufferStream - the resulting stream Meta.noreadme: Test: test/methods/data-stream-tobufferstream.js

| Param | Type | Description |
| --- | --- | --- |
| serializer | MapCallback | A method that converts chunks to buffers |

dataStream.toStringStream([serializer]) : StringStream ↺

Creates a StringStream.

The passed serializer must return a string. If no serializer is passed, each chunk's toString method will be used.

Kind: instance method of DataStream Chainable Returns: StringStream - the resulting stream Test: test/methods/data-stream-tostringstream.js

| Param | Type | Description |
| --- | --- | --- |
| [serializer] | MapCallback \| never | A method that converts chunks to strings |

DataStream:from(input, [options]) : DataStream

Returns a DataStream from pretty much anything sensibly possible.

Depending on type:

  • self will return self immediately
  • Readable stream will get piped to the current stream with errors forwarded
  • Array will get iterated and all items will be pushed to the returned stream. The stream will also be ended in such case.
  • GeneratorFunction will get executed to return the iterator which will be used as source for items
  • AsyncGeneratorFunction will also work as above (including generators) in node v10.
  • Iterable's iterator will be used as a source for streams

You can also pass a Function or AsyncFunction that will be executed; its outcome will be passed again to from and piped to the initially returned stream. Any additional arguments will be passed as arguments to the function.

If a String is passed, scramjet will attempt to resolve it as a module and use the outcome as an argument to from as in the Function case described above. For more information see modules.

A simple example from a generator:

DataStream.from(function* () {
  let x = 0; // the counter must be declared before it is used
  while (x < 100) yield { x: x++ };
}).each(console.log);
// {x: 0}
// {x: 1}
// ...
// {x: 99}

Kind: static method of DataStream

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| input | Array \| Iterable.<any> \| AsyncGeneratorFunction \| GeneratorFunction \| AsyncFunction \| Promise.<any> \| function \| string \| Readable |  | argument to be turned into new stream |
| [options] | DataStreamOptions \| Writable |  | options for creation of a new stream or the target stream |
| ...args | Array.<any> |  | additional arguments for the stream - will be passed to the function or generator |

DataStream:pipeline(readable) : DataStream

Creates a pipeline of streams and returns a scramjet stream.

This is similar to node.js stream pipeline method, but also takes scramjet modules as possibilities in an array of transforms. It may be used to run a series of non-scramjet transform streams.

The first argument is anything streamable and will be sanitized by DataStream.from.

Each following argument will be understood as a transform and can be any of:

  • AsyncFunction or Function - will be executed by DataStream.use
  • A transform stream that will be piped to the preceding stream

Kind: static method of DataStream Returns: DataStream - a new DataStream instance of the resulting pipeline

| Param | Type | Description |
| --- | --- | --- |
| readable | Array \| Iterable.<any> \| AsyncGeneratorFunction \| GeneratorFunction \| AsyncFunction \| function \| string \| Readable | the initial readable argument that is streamable by scramjet.from |
| ...transforms | Array.<(AsyncFunction\|function()\|Transform)> | Transform functions (as in dataStream.use) or Transform streams (any number of these as consecutive arguments) |

DataStream:fromArray(array, [options]) : DataStream

Create a DataStream from an Array

Kind: static method of DataStream Test: test/methods/data-stream-fromarray.js

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| array | Array.<*> |  | list of chunks |
| [options] | DataStreamOptions |  | the read stream options |

DataStream:fromIterator(iterator, [options]) : DataStream

Create a DataStream from an Iterator

Doesn't end the stream until it reaches end of the iterator.

Kind: static method of DataStream Test: test/methods/data-stream-fromiterator.js

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| iterator | Iterator.<any> |  | the iterator object |
| [options] | DataStreamOptions |  | the read stream options |
