Data Stream
:DataStream : import("stream").PassThrough
DataStream is the primary stream type for Scramjet. Once you parse your stream and pipe it into a DataStream, you can perform calculations on the data objects streamed through your flow.
Use as:
```javascript
const { DataStream } = require("scramjet");

await DataStream.from(aStream) // create a DataStream
    .map(findInFiles)          // read some data asynchronously
    .map(sendToAPI)            // send the data somewhere
    .run();                    // wait until end
```
Kind: static class
Extends: import("stream").PassThrough
Test: test/methods/data-stream-constructor.js
- :DataStream : import("stream").PassThrough
- new DataStream([opts])
- dataStream.map(func, [ClassType]) ↺
- dataStream.filter(func) ↺
- dataStream.reduce(func, into)
- dataStream.do(func) ↺
- dataStream.all(functions) ↺
- dataStream.race(functions) ↺
- dataStream.unorder(func)
- dataStream.into(func, into) ↺
- dataStream.use(func) ↺
- dataStream.run()
- dataStream.tap() ↺
- dataStream.whenRead()
- dataStream.whenWrote(chunk)
- dataStream.whenEnd()
- dataStream.whenDrained()
- dataStream.whenError()
- dataStream.setOptions(options) ↺
- dataStream.copy(func) ↺
- dataStream.tee(func) ↺
- dataStream.each(func) ↺
- dataStream.while(func) ↺
- dataStream.until(func) ↺
- dataStream.catch(callback) ↺
- dataStream.raise(err)
- dataStream.bufferify(serializer) : BufferStream ↺
- dataStream.stringify([serializer]) : StringStream ↺
- dataStream.toArray([initial]) : Array.<any> ⇄
- dataStream.toGenerator() : Generator.<Promise.<any>>
- dataStream.pull(pullable) : Promise.<any> ⇄
- dataStream.shift(count, func) ↺
- dataStream.peek(count, func) ↺
- dataStream.slice([start], [length]) ↺
- dataStream.assign(func) ↺
- dataStream.empty(callback) ↺
- dataStream.unshift() ↺
- dataStream.endWith(item) ↺
- dataStream.accumulate(func, into) : Promise.<any> ⇄
- dataStream.consume(func) ⇄
- dataStream.reduceNow(func, into) : * ↺
- dataStream.remap(func, [ClassType]) ↺
- dataStream.flatMap(func, [ClassType]) ↺
- dataStream.flatten() : DataStream ↺
- dataStream.concat() ↺
- dataStream.join(item) ↺
- dataStream.keep([count]) ↺
- dataStream.rewind([count]) ↺
- dataStream.stack([count], [drop]) ↺
- dataStream.distribute([affinity], [clusterFunc], [options]) ↺
- dataStream.separateInto(streams, affinity) ↺
- dataStream.separate(affinity, [createOptions], [ClassType]) : MultiStream ↺
- dataStream.delegate(delegateFunc, worker, [plugins]) ↺
- dataStream.rate(cps, [options]) ↺
- dataStream.batch(count) ↺
- dataStream.timeBatch(ms, [count]) ↺
- dataStream.nagle([size], [ms]) ↺
- dataStream.window(length) : WindowStream ↺
- dataStream.toJSONArray([enclosure]) : StringStream ↺
- dataStream.toJSONObject([entryCallback], [enclosure]) : StringStream ↺
- dataStream.JSONStringify([endline]) : StringStream ↺
- dataStream.CSVStringify([options]) : StringStream ↺
- dataStream.exec(command, [options])
- dataStream.debug(func) : DataStream ↺
- dataStream.toBufferStream(serializer) : BufferStream ↺
- dataStream.toStringStream([serializer]) : StringStream ↺
- DataStream:from(input, [options]) : DataStream
- DataStream:pipeline(readable) : DataStream
- DataStream:fromArray(array, [options]) : DataStream
- DataStream:fromIterator(iterator, [options]) : DataStream
new DataStream([opts])
Create the DataStream.
Param | Type | Default | Description |
---|---|---|---|
[opts] | DataStreamOptions | | Stream options passed to superclass |
dataStream.map(func, [ClassType]) ↺
Transforms stream objects into new ones, just like Array.prototype.map does.
Map takes a single argument: the function operating on every element of the stream. If the function returns a Promise or is an AsyncFunction, the stream will await the outcome of the operation before pushing the data forward.
A simple example that turns a stream of URLs into a stream of responses:
```javascript
stream.map(async (url) => fetch(url));
```
Multiple subsequent map operations (as well as filter, do, each and other simple ops) will be merged together into a single operation to improve performance. Such behaviour can be suppressed by chaining `.tap()` after `.map()`.
Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-map.js
Param | Type | Default | Description |
---|---|---|---|
func | MapCallback | The function that creates the new object | |
[ClassType] | function | this.constructor | The class to be mapped to. |
dataStream.filter(func) ↺
Filters objects based on the function outcome, just like Array.prototype.filter does.
Filter takes a single argument: a Function or AsyncFunction that will be called on each stream item. If the outcome of the operation is falsy (`0`, `''`, `false`, `null` or `undefined`) the item will be filtered out from subsequent operations and will not be pushed to the output of the stream. Otherwise the item will not be affected.
A simple example that filters out non-2xx responses from a stream:
```javascript
stream.filter(({ statusCode }) => statusCode >= 200 && statusCode < 300);
```
Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-filter.js
Param | Type | Description |
---|---|---|
func | FilterCallback | The function that filters the object |
dataStream.reduce(func, into) ⇄
Reduces the stream into a given accumulator
Works similarly to Array.prototype.reduce, so whatever you return in the former operation will be the first operand to the latter. The result is a promise that's resolved with the return value of the last transform executed.
A simple example that sums values from a stream:
```javascript
stream.reduce((accumulator, { value }) => accumulator + value, 0);
```
This method is serial - meaning that any processing on an entry will occur only after the previous entry is fully processed. This does mean it's much slower than parallel functions.
Kind: instance method of DataStream
Test: test/methods/data-stream-reduce.js
Param | Type | Description |
---|---|---|
func | ReduceCallback | The into object will be passed as the first argument, the data object from the stream as the second. |
into | object | Any object passed initially to the transform function |
dataStream.do(func) ↺
Perform an asynchronous operation without changing or resuming the stream.
In essence the stream will use the call to keep the backpressure, but the resolving value has no impact on the streamed data (except for possible mutation of the chunk itself)
Kind: instance method of DataStream
Chainable
Param | Type | Description |
---|---|---|
func | DoCallback | the async function |
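A minimal sketch of `do` for an awaited side effect; `saveAudit` here is a hypothetical stand-in for any asynchronous call:

```javascript
const { DataStream } = require("scramjet");

// Hypothetical async side effect (an API call, a DB write, etc.)
const saveAudit = async (chunk) => console.log("audited", chunk.id);

DataStream.fromArray([{ id: 1 }, { id: 2 }])
    .do(saveAudit) // awaited for backpressure; chunks pass through unchanged
    .each((chunk) => console.log("forwarded", chunk))
    .run();
```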
dataStream.all(functions) ↺
Processes a number of functions in parallel, returns a stream of arrays of results.
This method is to allow running multiple asynchronous operations and receiving all the results at once, just like Promise.all behaves.
Keep in mind that if one of your methods rejects, this behaves just like Promise.all: you won't be able to receive partial results.
Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-all.js
Param | Type | Description |
---|---|---|
functions | Array.<function()> | list of async functions to run |
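A minimal sketch, assuming each listed function receives the current chunk (as in `map`) and the results are emitted as one array per chunk:

```javascript
const { DataStream } = require("scramjet");

DataStream.fromArray([1, 2, 3])
    .all([
        async (n) => n * 2, // first result in the output array
        async (n) => n * n  // second result in the output array
    ])
    .each(console.log) // [2, 1], [4, 4], [6, 9]
    .run();
```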
dataStream.race(functions) ↺
Processes a number of functions in parallel, returns the first resolved.
This method is to allow running multiple asynchronous operations awaiting just the result of the quickest to execute, just like Promise.race behaves.
Keep in mind that if one of your methods rejects, an error will only be raised if that was the first method to settle, just as with Promise.race.
Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-race.js
Param | Type | Description |
---|---|---|
functions | Array.<function()> | list of async functions to run |
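A minimal sketch, assuming each function receives the current chunk and only the quickest outcome is pushed downstream:

```javascript
const { DataStream } = require("scramjet");

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

DataStream.fromArray(["a", "b"])
    .race([
        async (x) => { await sleep(50); return `slow:${x}`; },
        async (x) => `fast:${x}` // settles first, so its result wins
    ])
    .each(console.log) // "fast:a", "fast:b"
    .run();
```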
dataStream.unorder(func)
Allows processing items without keeping order
This method is useful if you are not concerned about the order in which the chunks are being pushed out of the operation. The `maxParallel` option is still used to limit the number of simultaneous parallel operations.
Kind: instance method of DataStream
Param | Type | Description |
---|---|---|
func | MapCallback | the async function that will be unordered |
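A minimal sketch, assuming `unorder` returns the stream for further chaining: chunks with shorter processing times are pushed out first, regardless of input order:

```javascript
const { DataStream } = require("scramjet");

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

DataStream.fromArray([30, 10, 20])
    .setOptions({ maxParallel: 3 })
    .unorder(async (ms) => { await sleep(ms); return ms; })
    .each(console.log) // most likely 10, 20, 30 - completion order, not input order
    .run();
```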
dataStream.into(func, into) ↺
Allows own implementation of stream chaining.
The async function is called on every chunk and should implement writes in its own way. The resolution will be awaited for flow control. The passed `into` argument is passed as the first argument to every call.
It returns the DataStream passed as the second argument.
Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-into.js
Param | Type | Description |
---|---|---|
func | IntoCallback | the method that processes incoming chunks |
into | DataStream | the DataStream derived class |
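A minimal sketch, assuming the target stream is ended when the source ends; `into` returns the stream passed as its second argument, so the chain continues on `out`:

```javascript
const { DataStream } = require("scramjet");

const out = new DataStream();

DataStream.fromArray([1, 2, 3])
    .into(
        async (target, chunk) => target.whenWrote(chunk * 2), // awaited for flow control
        out // passed as the first argument to every call
    )
    .each(console.log) // 2, 4, 6
    .run();
```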
dataStream.use(func) ↺
Calls the passed method in place with the stream as first argument, returns result.
The main intention of this method is to run scramjet modules - transforms that allow complex transforms of streams. These modules can also be run with Scramjet-CLI directly from the command line.
Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-use.js
Param | Type | Description |
---|---|---|
func | AsyncGeneratorFunction | GeneratorFunction | UseCallback | string | Readable | if passed, the function will be called on self to add an option to inspect the stream in place, while not breaking the transform chain. Alternatively this can be a relative path to a scramjet-module. Lastly it can be a Transform stream. |
...parameters | Array.<any> | any additional parameters to be passed to the module |
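A minimal sketch of a module as a plain function that takes the stream and returns a (transformed) stream:

```javascript
const { DataStream } = require("scramjet");

// A scramjet "module": receives the stream as its first argument.
const onlyEven = (stream) => stream.filter((n) => n % 2 === 0);

DataStream.fromArray([1, 2, 3, 4])
    .use(onlyEven)
    .each(console.log) // 2, 4
    .run();
```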
dataStream.run() ⇄
Consumes all stream items doing nothing. Resolves when the stream is ended.
This is very convenient if you're looking to use up the stream in operations that work on each entry like `map`. This uncorks the stream and allows all preceding operations to be run at any speed.
All the data of the current stream will be discarded.
The function returns a promise that is resolved when the stream ends.
Kind: instance method of DataStream
dataStream.tap() ↺
Stops merging transform Functions at the current place in the command chain.
Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-tap.js
dataStream.whenRead() ⇄
Reads a chunk from the stream and resolves the promise when read.
Kind: instance method of DataStream
dataStream.whenWrote(chunk) ⇄
Writes a chunk to the stream and returns a Promise resolved when more chunks can be written.
Kind: instance method of DataStream
Param | Type | Description |
---|---|---|
chunk | * | a chunk to write |
...more | Array.<any> | more chunks to write |
dataStream.whenEnd() ⇄
Resolves when stream ends - rejects on uncaught error
Kind: instance method of DataStream
dataStream.whenDrained() ⇄
Returns a promise that resolves when the stream is drained
Kind: instance method of DataStream
dataStream.whenError() ⇄
Returns a promise that resolves (!) when the stream errors
Kind: instance method of DataStream
dataStream.setOptions(options) ↺
Allows resetting stream options.
It's much easier to use this in a chain than constructing a new stream:
```javascript
stream.map(myMapper).filter(myFilter).setOptions({ maxParallel: 2 });
```
Kind: instance method of DataStream
Chainable
Meta.conditions: keep-order,chain
Param | Type |
---|---|
options | DataStreamOptions |
dataStream.copy(func) ↺
Returns a copy of the stream
Creates a new stream and pushes all the data from the current one to the new one. This can be called several times.
Kind: instance method of DataStream
Chainable
Param | Type | Description |
---|---|---|
func | TeeCallback | Writable | The duplicate stream will be passed as first argument. |
dataStream.tee(func) ↺
Duplicate the stream
Creates a duplicate stream instance and passes it to the Function.
Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-tee.js
Param | Type | Description |
---|---|---|
func | TeeCallback | Writable | The duplicate stream will be passed as first argument. |
dataStream.each(func) ↺
Performs an operation on every chunk, without changing the stream
This is a shorthand for `stream.on("data", func)` but with flow control.
Warning: this resumes the stream!
Kind: instance method of DataStream
Chainable
Param | Type | Description |
---|---|---|
func | MapCallback | a Function called for each chunk. |
dataStream.while(func) ↺
Reads the stream while the function outcome is truthy.
Stops reading and emits end as soon as it finds the first chunk that evaluates to false. If you're processing a file until a certain point or you just need to confirm existence of some data, you can use it to end the stream before reaching end.
Keep in mind that whatever you piped to the stream will still need to be handled.
Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-while.js
Param | Type | Description |
---|---|---|
func | FilterCallback | The condition check |
dataStream.until(func) ↺
Reads the stream until the function outcome is truthy.
Works opposite of while.
Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-until.js
Param | Type | Description |
---|---|---|
func | FilterCallback | The condition check |
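A minimal sketch of `until` ending the stream at the first truthy outcome:

```javascript
const { DataStream } = require("scramjet");

DataStream.fromArray(["a", "b", "END", "c"])
    .until((chunk) => chunk === "END") // ends the stream here; "c" is never pushed
    .each(console.log) // "a", "b"
    .run();
```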
dataStream.catch(callback) ↺
Provides a way to catch errors in chained streams.
The handler will be called as asynchronous:
- if it resolves, then the error will be muted.
- if it rejects, then the error will be passed to the next handler.
If no handler resolves the error, an `error` event will be emitted.
Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-catch.js
Param | Type | Description |
---|---|---|
callback | function | Error handler (async function) |
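A minimal sketch, assuming a resolving handler mutes the error and the stream continues with the remaining chunks:

```javascript
const { DataStream } = require("scramjet");

DataStream.fromArray([1, 2, 3])
    .map((n) => {
        if (n === 2) throw new Error("bad chunk");
        return n;
    })
    .catch(async (err) => console.error("muted:", err.message)) // resolves, so the error is muted
    .each(console.log) // 1, 3
    .run();
```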
dataStream.raise(err) ⇄
Executes all error handlers and if none resolves, then emits an error.
The returned promise will always be resolved even if there are no successful handlers.
Kind: instance method of DataStream
Test: test/methods/data-stream-raise.js
Param | Type | Description |
---|---|---|
err | Error | The thrown error |
dataStream.bufferify(serializer) : BufferStream ↺
Creates a BufferStream.
The passed serializer must return a buffer.
Kind: instance method of DataStream
Chainable
Returns: BufferStream
- the resulting stream
Meta.noreadme:
Test: test/methods/data-stream-tobufferstream.js
Param | Type | Description |
---|---|---|
serializer | MapCallback | A method that converts chunks to buffers |
dataStream.stringify([serializer]) : StringStream ↺
Creates a StringStream.
The passed serializer must return a string. If no serializer is passed, the chunk's `toString` method will be used.
Kind: instance method of DataStream
Chainable
Returns: StringStream
- the resulting stream
Test: test/methods/data-stream-tostringstream.js
Param | Type | Description |
---|---|---|
[serializer] | MapCallback | never | A method that converts chunks to strings |
dataStream.toArray([initial]) : Array.<any> ⇄
Aggregates the stream into a single Array
In fact it's just a shorthand for reducing the stream into an Array.
Kind: instance method of DataStream
Param | Type | Default | Description |
---|---|---|---|
[initial] | Array | [] | Array to begin with (defaults to an empty array). |
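A minimal sketch:

```javascript
const { DataStream } = require("scramjet");

(async () => {
    const doubled = await DataStream.fromArray([1, 2, 3])
        .map((n) => n * 2)
        .toArray();
    console.log(doubled); // [2, 4, 6]
})();
```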
dataStream.toGenerator() : Generator.<Promise.<any>>
Returns an async generator
Kind: instance method of DataStream
Returns: Generator.<Promise.<any>>
- Returns an iterator that returns a promise for each item.
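A minimal sketch, assuming the generator yields one promise per stream item so it can be consumed with `for await`:

```javascript
const { DataStream } = require("scramjet");

(async () => {
    for await (const item of DataStream.fromArray([1, 2, 3]).toGenerator()) {
        console.log(item); // 1, 2, 3
    }
})();
```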
dataStream.pull(pullable) : Promise.<any> ⇄
Pulls in any readable stream, resolves when the pulled stream ends.
You can also pass anything that can be passed to `DataStream.from`.
Does not preserve order, does not end this stream.
Kind: instance method of DataStream
Returns: Promise.<any>
- resolved when incoming stream ends, rejects on incoming error
Test: test/methods/data-stream-pull.js
Param | Type | Description |
---|---|---|
pullable | Array | Iterable.<any> | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | function | string | Readable | |
...args | Array.<any> | any additional args |
dataStream.shift(count, func) ↺
Shifts the first n items from the stream and pushes out the remaining ones.
Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-shift.js
Param | Type | Description |
---|---|---|
count | number | The number of items to shift. |
func | ShiftCallback | Function that receives an array of shifted items |
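A minimal sketch:

```javascript
const { DataStream } = require("scramjet");

DataStream.fromArray([1, 2, 3, 4])
    .shift(2, (items) => console.log("shifted:", items)) // shifted: [1, 2]
    .each(console.log) // 3, 4
    .run();
```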
dataStream.peek(count, func) ↺
Allows previewing some of the stream's data without removing it from the stream.
Important: Peek does not resume the flow.
Kind: instance method of DataStream
Chainable
Param | Type | Description |
---|---|---|
count | number | The number of items to preview |
func | ShiftCallback | Function called with an array of the previewed items |
dataStream.slice([start], [length]) ↺
Slices out a part of the stream to the passed Function.
Returns a stream consisting of an array of items with `0` to `start` omitted and `length` items after `start` included. Works similarly to Array.prototype.slice.
Takes count from the moment it's called. Any previous items will not be taken into account.
Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-slice.js
Param | Type | Default | Description |
---|---|---|---|
[start] | number | 0 | omit this number of entries. |
[length] | number | Infinity | get this number of entries to the resulting stream |
dataStream.assign(func) ↺
Transforms stream objects by assigning the properties from the returned data along with data from original ones.
The original objects are unaltered.
Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-assign.js
Param | Type | Description |
---|---|---|
func | MapCallback | object | The function that returns new object properties or just the new properties |
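A minimal sketch: the returned properties are merged with the original chunk into a new object:

```javascript
const { DataStream } = require("scramjet");

DataStream.fromArray([{ id: 1 }, { id: 2 }])
    .assign(async ({ id }) => ({ name: `user-${id}` }))
    .each(console.log) // { id: 1, name: "user-1" }, { id: 2, name: "user-2" }
    .run();
```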
dataStream.empty(callback) ↺
Calls the given callback only if the stream ends without having passed any items.
Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-empty.js
Param | Type | Description |
---|---|---|
callback | function | Function called when stream ends |
dataStream.unshift() ↺
Pushes any data at call time (essentially at the beginning of the stream)
This is a synchronous only function.
Kind: instance method of DataStream
Chainable
Param | Type | Description |
---|---|---|
...item | Array.<any> | list of items to unshift (you can pass more items) |
dataStream.endWith(item) ↺
Pushes the given data at the end of the stream
Kind: instance method of DataStream
Chainable
Meta.noreadme:
Test: test/methods/data-stream-endwith.js
Param | Type | Description |
---|---|---|
item | * | list of items to push at end |
dataStream.accumulate(func, into) : Promise.<any> ⇄
Accumulates data into the object.
Works very similarly to reduce, but the result of previous operations has no influence on the accumulator in the next one.
Method works in parallel.
Kind: instance method of DataStream
Returns: Promise.<any>
- resolved with the "into" object on stream end.
Meta.noreadme:
Test: test/methods/data-stream-accumulate.js
Param | Type | Description |
---|---|---|
func | AccumulateCallback | The accumulation function |
into | * | Accumulator object |
dataStream.consume(func) ⇄
Deprecated
Consumes the stream by running each Function
Kind: instance method of DataStream
Meta.noreadme:
Param | Type | Description |
---|---|---|
func | ConsumeCallback | AsyncGeneratorFunction | GeneratorFunction | the consumer |
...args | Array.<any> | additional args will be passed to generators |
dataStream.reduceNow(func, into) : * ↺
Reduces the stream into the given object, returning it immediately.
The main difference to reduce is that only the first object will be returned at once (however the method will be called with the previous entry). If the object is an instance of EventEmitter then it will propagate the error from the previous stream.
This method is serial - meaning that any processing on an entry will occur only after the previous entry is fully processed. This does mean it's much slower than parallel functions.
Kind: instance method of DataStream
Chainable
Returns: *
- whatever was passed as into
Meta.noreadme:
Test: test/methods/data-stream-reduceNow.js
Param | Type | Description |
---|---|---|
func | ReduceCallback | The into object will be passed as the first argument, the data object from the stream as the second. |
into | * | EventEmitter | Any object passed initially to the transform function |
dataStream.remap(func, [ClassType]) ↺
Remaps the stream into a new stream.
This means that every item may emit as many other items as we like.
Kind: instance method of DataStream
Chainable
Meta.noreadme:
Test: test/methods/data-stream-remap.js
Param | Type | Default | Description |
---|---|---|---|
func | RemapCallback | A Function that is called on every chunk | |
[ClassType] | function | this.constructor | Optional DataStream subclass to be constructed |
dataStream.flatMap(func, [ClassType]) ↺
Takes any method that returns any iterable and flattens the result.
The passed Function must return an iterable (otherwise an error will be emitted). The resulting stream will consist of all the items of the returned iterables, one iterable after another.
Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-flatmap.js
Param | Type | Default | Description |
---|---|---|---|
func | FlatMapCallback | A Function that is called on every chunk | |
[ClassType] | function | this.constructor | Optional DataStream subclass to be constructed |
...args | Array.<any> | additional args will be passed to generators |
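A minimal sketch: each returned iterable is flattened into the output stream:

```javascript
const { DataStream } = require("scramjet");

DataStream.fromArray([1, 2, 3])
    .flatMap((n) => [n, n * 10])
    .each(console.log) // 1, 10, 2, 20, 3, 30
    .run();
```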
dataStream.flatten() : DataStream ↺
A shorthand for streams of arrays or iterables to flatten them.
A more efficient equivalent of `.flatMap(i => i)`.
Works on streams of async iterables too.
Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-flatten.js
dataStream.concat() ↺
Returns a new stream that will append the passed streams to the callee
Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-concat.js
Param | Type | Description |
---|---|---|
...streams | Array.<Readable> | Streams to be injected into the current stream |
dataStream.join(item) ↺
Method will put the passed object between items. It can also be a function call or generator / iterator.
If a generator or iterator is passed, no more items will be interweaved once the iteration is done.
Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-join.js
Param | Type | Description |
---|---|---|
item | * | AsyncGeneratorFunction | GeneratorFunction | JoinCallback | An object that should be interweaved between stream items |
...args | Array.<any> | additional args will be passed to generators |
dataStream.keep([count]) ↺
Keep a buffer of n-chunks for use with @see DataStream..rewind
Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-keep.js
Param | Type | Default | Description |
---|---|---|---|
[count] | number | Infinity | Number of objects or -1 for all the stream |
dataStream.rewind([count]) ↺
Rewinds the buffered chunks the specified length backwards. Requires a prior call to @see DataStream..keep
Kind: instance method of DataStream
Chainable
Param | Type | Default | Description |
---|---|---|---|
[count] | number | Infinity | Number of objects or -1 for all the buffer |
dataStream.stack([count], [drop]) ↺
Returns a stream that stacks up incoming items, always feeding out the newest items first. Older items are returned when read.
When the stack length exceeds the given `count`, the given `drop` function is awaited and used for flow control.
By default the drop function ignores and quietly disposes of items not read before overflow.
Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-stack.js
Param | Type | Default |
---|---|---|
[count] | number | 1000 |
[drop] | function |
dataStream.distribute([affinity], [clusterFunc], [options]) ↺
Distributes processing into multiple sub-processes or threads if you like.
Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-distribute.js
Todo
- Currently order is not kept.
- Example test breaks travis-ci build
Param | Type | Description |
---|---|---|
[affinity] | AffinityCallback | function | number | A Function that affixes the item to specific output stream which must exist in the object for each chunk, must return a string. A number may be passed to identify how many round-robin threads to start up. Defaults to Round Robin to twice the number of CPU threads. |
[clusterFunc] | function | DataStreamOptions | stream transforms similar to @see DataStream#use method |
[options] | DataStreamOptions | Options |
dataStream.separateInto(streams, affinity) ↺
Separates stream into a hash of streams. Does not create new streams!
Kind: instance method of DataStream
Chainable
Meta.noreadme:
Param | Type | Description |
---|---|---|
streams | object | the object hash of streams. Keys must be the outputs of the affinity function |
affinity | AffinityCallback | the Function that affixes the item to specific streams which must exist in the object for each chunk. |
dataStream.separate(affinity, [createOptions], [ClassType]) : MultiStream ↺
Separates execution to multiple streams using the hashes returned by the passed Function.
Calls the given Function for a hash, then makes sure all items with the same hash are processed within a single stream. Thanks to that streams can be distributed to multiple threads.
Kind: instance method of DataStream
Chainable
Returns: MultiStream
- separated stream
Meta.noreadme:
Test: test/methods/data-stream-separate.js
Param | Type | Default | Description |
---|---|---|---|
affinity | AffinityCallback | the affinity function | |
[createOptions] | DataStreamOptions | options to use to create the separated streams | |
[ClassType] | function | this.constructor | options to use to create the separated streams |
dataStream.delegate(delegateFunc, worker, [plugins]) ↺
Delegates work to a specified worker.
Kind: instance method of DataStream
Chainable
Meta.noreadme:
Param | Type | Default | Description |
---|---|---|---|
delegateFunc | DelegateCallback | A function to be run in the sub-thread. | |
worker | StreamWorker | ||
[plugins] | Array | [] |
dataStream.rate(cps, [options]) ↺
Limit the rate of the stream to a given number of chunks per second or given timeframe.
Kind: instance method of DataStream
Chainable
Meta.noreadme:
Param | Type | Default | Description |
---|---|---|---|
cps | number | Chunks per timeframe, the default timeframe is 1000 ms. | |
[options] | RateOptions | | Options for the limiter controlling the timeframe and time source. Both must work on same units. |
dataStream.batch(count) ↺
Aggregates chunks into arrays of the given number of items.
This can be used for micro-batch processing.
Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-batch.js
Param | Type | Description |
---|---|---|
count | number | How many items to aggregate |
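A minimal sketch: the trailing, shorter batch is still emitted at stream end:

```javascript
const { DataStream } = require("scramjet");

DataStream.fromArray([1, 2, 3, 4, 5])
    .batch(2)
    .each(console.log) // [1, 2], [3, 4], [5]
    .run();
```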
dataStream.timeBatch(ms, [count]) ↺
Aggregates chunks to arrays not delaying output by more than the given number of ms.
Kind: instance method of DataStream
Chainable
Meta.noreadme:
Test: test/methods/data-stream-timebatch.js
Param | Type | Description |
---|---|---|
ms | number | Maximum amount of milliseconds |
[count] | number | Maximum number of items in batch (otherwise no limit) |
dataStream.nagle([size], [ms]) ↺
Performs the Nagle's algorithm on the data. In essence it waits until we receive some more data and releases them in bulk.
Kind: instance method of DataStream
Chainable
Meta.noreadme:
Todo
- needs more work, for now it's simply waiting some time, not checking the queues.
Param | Type | Default | Description |
---|---|---|---|
[size] | number | 32 | maximum number of items to wait for |
[ms] | number | 10 | milliseconds to wait for more data |
dataStream.window(length) : WindowStream ↺
Returns a WindowStream of the specified length
Kind: instance method of DataStream
Chainable
Returns: WindowStream
- a stream of arrays
Meta.noreadme:
Param | Type |
---|---|
length | number |
dataStream.toJSONArray([enclosure]) : StringStream ↺
Transforms the stream to a streamed JSON array.
Kind: instance method of DataStream
Chainable
Meta.noreadme:
Test: test/methods/data-stream-tojsonarray.js
Param | Type | Default | Description |
---|---|---|---|
[enclosure] | Iterable.<any> | '[]' | Any iterable object of two items (beginning and end) |
dataStream.toJSONObject([entryCallback], [enclosure]) : StringStream ↺
Transforms the stream to a streamed JSON object.
Kind: instance method of DataStream
Chainable
Meta.noreadme:
Test: test/methods/data-stream-tojsonobject.js
Param | Type | Default | Description |
---|---|---|---|
[entryCallback] | MapCallback | async function returning an entry (array of [key, value]) | |
[enclosure] | Iterable.<any> | '' | Any iterable object of two items (beginning and end) |
dataStream.JSONStringify([endline]) : StringStream ↺
Returns a StringStream containing JSON per item with optional end line
Kind: instance method of DataStream
Chainable
Returns: StringStream
- output stream
Meta.noreadme:
Test: test/methods/data-stream-jsonstringify.js
Param | Type | Default | Description |
---|---|---|---|
[endline] | Boolean | string | os.EOL | whether to add endlines (boolean or string as delimiter) |
dataStream.CSVStringify([options]) : StringStream ↺
Stringifies the stream to CSV using the 'papaparse' module.
Kind: instance method of DataStream
Chainable
Returns: StringStream
- stream of CSV-stringified chunks
Test: test/methods/data-stream-csv.js
Param | Type | Default | Description |
---|---|---|---|
[options] | object | | options for the papaparse.unparse module. |
dataStream.exec(command, [options])
Executes a given sub-process with arguments and pipes the current stream into it while returning the output as another DataStream.
Pipes the current stream into the sub-process's stdin. The data is serialized and deserialized as JSON lines by default. You can provide your own alternative methods in the ExecOptions object.
Note: if you're piping both stderr and stdout (options.stream=3) keep in mind that chunks may get mixed up!
Kind: instance method of DataStream
Test: test/methods/data-stream-exec.js
Param | Type | Default | Description |
---|---|---|---|
command | string | command to execute | |
[options] | ExecDataOptions | any | | options to be passed to spawn and defining serialization. |
...args | Array.<string> | additional args will be passed to function |
dataStream.debug(func) : DataStream ↺
Injects a `debugger` statement when called.
Kind: instance method of DataStream
Chainable
Returns: DataStream
- self
Meta.noreadme:
Test: test/methods/data-stream-debug.js
Param | Type | Description |
---|---|---|
func | function | if passed, the function will be called on self to add an option to inspect the stream in place, while not breaking the transform chain |
dataStream.toBufferStream(serializer) : BufferStream ↺
Creates a BufferStream.
The passed serializer must return a buffer.
Kind: instance method of DataStream
Chainable
Returns: BufferStream
- the resulting stream
Meta.noreadme:
Test: test/methods/data-stream-tobufferstream.js
Param | Type | Description |
---|---|---|
serializer | MapCallback | A method that converts chunks to buffers |
dataStream.toStringStream([serializer]) : StringStream ↺
Creates a StringStream.
The passed serializer must return a string. If no serializer is passed, the chunk's `toString` method will be used.
Kind: instance method of DataStream
Chainable
Returns: StringStream
- the resulting stream
Test: test/methods/data-stream-tostringstream.js
Param | Type | Description |
---|---|---|
[serializer] | MapCallback | never | A method that converts chunks to strings |
DataStream:from(input, [options]) : DataStream
Returns a DataStream from pretty much anything sensibly possible.
Depending on type:
- `self` will return self immediately
- `Readable` stream will get piped to the current stream with errors forwarded
- `Array` will get iterated and all items will be pushed to the returned stream. The stream will also be ended in such case.
- `GeneratorFunction` will get executed to return the iterator which will be used as source for items
- `AsyncGeneratorFunction` will also work as above (including generators) in node v10.
- `Iterable`s iterator will be used as a source for streams

You can also pass a `Function` or `AsyncFunction` that will be executed and its outcome will be passed again to `from` and piped to the initially returned stream. Any additional arguments will be passed as arguments to the function.
If a `String` is passed, scramjet will attempt to resolve it as a module and use the outcome as an argument to `from` as in the Function case described above. For more information see modules.
A simple example from a generator:
```javascript
DataStream.from(function* () {
    let x = 0; // initialize the counter the generator yields from
    while (x < 100) yield { x: x++ };
}).each(console.log);
// { x: 0 }
// { x: 1 }
// ...
// { x: 99 }
```
Kind: static method of DataStream
Param | Type | Default | Description |
---|---|---|---|
input | Array | Iterable.<any> | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Promise.<any> | function | string | Readable | argument to be turned into new stream | |
[options] | DataStreamOptions | Writable | | options for creation of a new stream or the target stream |
...args | Array.<any> | additional arguments for the stream - will be passed to the function or generator |
DataStream:pipeline(readable) : DataStream
Creates a pipeline of streams and returns a scramjet stream.
This is similar to the node.js stream pipeline method, but it also accepts scramjet modules among the transforms. It may be used to run a series of non-scramjet transform streams.
The first argument is anything streamable and will be sanitized by DataStream.from.
Each following argument will be understood as a transform and can be any of:
- AsyncFunction or Function - will be executed by DataStream.use
- A transform stream that will be piped to the preceding stream
Kind: static method of DataStream
Returns: DataStream
- a new DataStream instance of the resulting pipeline
Param | Type | Description |
---|---|---|
readable | Array | Iterable.<any> | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | function | string | Readable | the initial readable argument that is streamable by scramjet.from |
...transforms | Array.<(AsyncFunction|function()|Transform)> | Transform functions (as in DataStream..use) or Transform streams (any number of these as consecutive arguments) |
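A minimal sketch mixing a plain Node.js Transform with a function transform (run as in `use`):

```javascript
const { DataStream } = require("scramjet");
const { Transform } = require("stream");

// A regular object-mode Transform stream.
const upper = new Transform({
    objectMode: true,
    transform(chunk, encoding, done) { done(null, chunk.toUpperCase()); }
});

DataStream.pipeline(
    ["a", "b"],                            // anything accepted by DataStream.from
    upper,                                 // piped to the preceding stream
    (stream) => stream.map((s) => s + "!") // a function transform, as in use
)
    .each(console.log) // "A!", "B!"
    .run();
```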
DataStream:fromArray(array, [options]) : DataStream
Create a DataStream from an Array
Kind: static method of DataStream
Test: test/methods/data-stream-fromarray.js
Param | Type | Default | Description |
---|---|---|---|
array | Array.<*> | list of chunks | |
[options] | DataStreamOptions | | the read stream options |
DataStream:fromIterator(iterator, [options]) : DataStream
Create a DataStream from an Iterator
Doesn't end the stream until it reaches end of the iterator.
Kind: static method of DataStream
Test: test/methods/data-stream-fromiterator.js
Param | Type | Default | Description |
---|---|---|---|
iterator | Iterator.<any> | the iterator object | |
[options] | DataStreamOptions | | the read stream options |