
Stream protocol

Table of Contents

Introduction

The stream protocol is used to communicate and push data through a Cloud Server Instance (STH). Streams are piped to the proper communication channels, which are grouped (multiplexed) into interleaved data packets that travel between the Host and the Runner. All communication takes place through the CommunicationHandler class.

On initialization of a new Sequence and Sequence Instance, the Cloud Server Host creates downstream and upstream stream arrays based on the passed parameters: the package stream, the application config (package.json) and the Sequence arguments. Both arrays of streams are hooked (hookUpstreamStreams, hookDownstreamStreams, hookLogStream) to the proper communication channels.

Next, it pipes the monitor streams from the Host and from the Runner so that both can talk to each other. Communication starts with a handshake: the Runner sends a ping to say 'Hello there', and the Host sends back a pong saying 'Hi, I'm ready to talk'.
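The handshake can be pictured as an exchange of [code, payload] JSON messages, the same shape the curl examples later in this document use. The codes and payload fields below are made up for illustration; the real protocol defines its own:

```javascript
// Hypothetical message codes, for illustration only; the real protocol
// defines its own numeric codes.
const PING = 3000;
const PONG = 3100;

// Each message travels as a serialized [code, payload] pair, matching
// the shape of the curl examples later in this document.
const encode = (code, payload) => JSON.stringify([code, payload]);
const decode = (raw) => JSON.parse(raw);

// The Runner opens the conversation with a ping...
const ping = encode(PING, {});

// ...and the Host replies with a pong carrying the Sequence configuration.
const [code] = decode(ping);
const reply = code === PING ? encode(PONG, { appConfig: {}, args: [] }) : null;

const [replyCode, config] = decode(reply);
```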

At the end of its initialization, the CSH creates a Net Server and an API Server.


Communication channels

List of channels:

  • [0] STDIN,
  • [1] STDOUT,
  • [2] STDERR,
  • [3] CONTROL,
  • [4] MONITORING,
  • [5] PACKAGE,
  • [6] TO_SEQ,
  • [7] FROM_SEQ,
  • [8] LOG
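The multiplexing mentioned in the introduction can be pictured as tagging each packet with its channel number. A toy demultiplexer (the real framing is binary and handled internally by the CommunicationHandler class) might look like this:

```javascript
// Channel names indexed by channel number, as listed above.
const CHANNELS = [
    "STDIN", "STDOUT", "STDERR", "CONTROL", "MONITORING",
    "PACKAGE", "TO_SEQ", "FROM_SEQ", "LOG",
];

// A toy frame is a [channelId, chunk] pair; the real framing is handled
// by the CommunicationHandler class.
function demux(frames) {
    const byChannel = Object.fromEntries(CHANNELS.map(name => [name, []]));
    for (const [id, chunk] of frames) {
        byChannel[CHANNELS[id]].push(chunk);
    }
    return byChannel;
}

// Packets from different channels interleaved on a single connection:
const streams = demux([[1, "out-a"], [8, "log-a"], [1, "out-b"]]);
```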

Type of streams

STREAM DESCRIPTION
stdin process.stdin
stdout process.stdout
stderr process.stderr
control Used for sending events (kill, stop, etc.) and information about the Sequence configuration.
monitor Contains all information about a Sequence: description, status, feedback from all performed operations, etc.
pkg Contains the Sequence stream together with a config file, which is sent to the Runner.
input An optional input stream transporting data to the Sequence for processing.
output An optional output stream transporting data processed by the Sequence, piped to the Runner; if none is passed, this.stdout is used.
log Carries all kinds of log messages, not only for development purposes but also for messages defined in the Sequence.

Read more about Node.js processes.
Streams can be managed by the user via cURL. Example:

curl -H "Content-Type: application/type_xx" localhost:8000/api/v1/xxx/

Command-line options worth knowing:

  • -H (--header) <header/@file>: pass custom header(s) to the server
  • -d (--data) <data>: HTTP POST data
  • -X POST (--request): specify POST when no data is provided with -d
  • -X GET (--request): specify GET; if no data is provided with -d, GET is the default
  • -v (--verbose): make the operation more talkative
  • jq: JSON formatting for human-readable output; install it with:

sudo apt-get install jq

Read more about jq.
To send a package, use the command below (pkg stream).

curl -H "Content-Type: application/octet-stream" --data-binary "@home/user/package.tar.gz" http://localhost:8000/api/v1/sequence -v

or

SEQ_ID=$(curl -H 'content-type: application/octet-stream' \
  --data-binary '@packages/reference-apps/hello-alice-out.tar.gz' \
  "http://localhost:8000/api/v1/sequence" | jq ".id" -r)

The sequence_id will be received in the response. Copy it and use it to start the Sequence with the command:

curl -X POST -H "Content-Type: application/json" http://localhost:8000/api/v1/sequence/:id/start -v

or, with parameters, in a way that saves you the copying:

INSTANCE_ID=$(curl -H "Content-Type: application/json" \
  --data-raw '{"appConfig": {}, "args": ["/package/data.json"]}' \
  http://localhost:8000/api/v1/sequence/$SEQ_ID/start | jq ".id" -r)

To list all uploaded Sequences, run:

curl -H "Content-Type: application/json" localhost:8000/api/v1/sequences/ -v

A JSON object will be returned in the response. Example:

{
  "va+RikUw+2u23ZtPH2fPenB1mSoxSOrl": {
    "id": "va+RikUw+2u23ZtPH2fPenB1mSoxSOrl",
    "config": {
      "image": "scramjetorg/runner:0.10.0",
      "version": "",
      "engines": {
        "node": ">=10",
        "scramjet": ">=0.9"
      },
      "sequencePath": "index",
      "packageVolumeId": "d33cf047628a03d403318f58462ac25a537aeaa94a49ec7505324b352d7ab80a"
    }
  }
}

You can also list all running Instances; a JSON object will be returned in the response.

curl -H "Content-Type: application/json" localhost:8000/api/v1/instances/ -v

Send additional input:

curl -H "Content-Type: application/octet-stream" --data-binary "@home/user/test.txt" http://localhost:8000/api/v1/stream/input -v

Check the output:

curl -X GET -H "Content-Type: application/octet-stream" "http://localhost:8000/api/v1/instance/$INSTANCE_ID/stdout"

Upstream Streams

Streams flowing from the Host to the Runner.

  • stdin: Readable
  • stdout: Writable
  • stderr: Writable
  • control: ReadableStream
  • monitor: WritableStream
  • pkg: Readable
  • input: ReadableStream
  • output: WritableStream
  • log: WritableStream

Downstream Streams

Streams flowing from the Runner to the Host.

  • stdin: Writable
  • stdout: Readable
  • stderr: Readable
  • control: WritableStream
  • monitor: ReadableStream
  • pkg: Readable
  • input: WritableStream
  • output: ReadableStream
  • log: ReadableStream

Control Messages

Control the sequence and send data to it via messages described below.

CODE DESCRIPTION TO
FORCE_CONFIRM_ALIVE Confirms that the Sequence is alive when it is not responding. Runner
KILL Sends the kill signal to the running Sequence. Runner
MONITORING_RATE Changes the Sequence monitoring rate. Runner
STOP Sends the stop signal to the running Sequence. Runner
EVENT Sends an event name and any object, array or function to the Sequence. Runner
PONG Acknowledgement message from the CSH to the Runner; includes the Sequence configuration information. Runner
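The curl examples in the following sections send these messages as [code, payload] JSON arrays. Collecting just the codes that actually appear in this document (the remaining codes are not shown here) gives a small helper, sketched below:

```javascript
// Control message codes as they appear in the curl examples in this
// document; the other codes are not listed here.
const ControlMessageCode = {
    MONITORING_RATE: 3001,
    STOP: 4001,
    KILL: 4002,
    EVENT: 5001,
};

// A control message is sent as a [code, payload] JSON array.
const controlMessage = (name, payload = {}) =>
    JSON.stringify([ControlMessageCode[name], payload]);

// e.g. the request body for the _stop endpoint:
const stopBody = controlMessage("STOP");
```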

Send event

An event contains an <eventName> and a <handler>, with an optional <message> of any type: string, number, JSON object, array, etc.

curl -H "Content-Type: application/json" -d '[5001, {"eventName": "<eventName>", "message": "<message>"}]' http://localhost:8000/api/v1/sequence/_event

Monitoring rate

Pass the <rate_number> in milliseconds as the message.

curl -H "Content-Type: application/json" -d '[3001, {"monitoringRate": 2000}]' http://localhost:8000/api/v1/sequence/_monitoring_rate

Start sequence

Not implemented.

curl -H "Content-Type: application/json" -d "[]" http://localhost:8000/api/v1/sequence/_start

Stop sequence

No message is needed; pass the proper code with an empty object.

curl -H "Content-Type: application/json" -d "[4001, {}]" http://localhost:8000/api/v1/sequence/_stop

Kill sequence

No message is needed; pass the proper code with an empty object.

curl -H "Content-Type: application/json" -d "[4002, {}]" http://localhost:8000/api/v1/sequence/_kill

Monitor Messages

CODE DESCRIPTION TO
ACKNOWLEDGE Indicates whether a command message (e.g. stop or kill) was received. Host
DESCRIBE_SEQUENCE Includes the stream mode, name, description and scalability of each subsequence. Runner
STATUS Includes the host address, instance_id, Sequence modifications, health checks, data time flow, etc. Host
ALIVE Information on how much longer the Sequence will be active (in milliseconds). Host
ERROR All errors that occur. Runner, Host
MONITORING Contains messages about Sequence health and information about Instance resource usage: CPU, memory usage, network I/O and disk size. Host
EVENT Executes a defined event in the Sequence, with an additional message as a parameter. Runner
PING Checks whether the Runner is ready to communicate. Runner
SNAPSHOT_RESPONSE Reports whether a snapshot was created without errors. Host
SEQUENCE_STOPPED Reports whether the Sequence is running or not. Host

Monitoring health check

Information about the Sequence overall health (including information about functions).

curl -H "Content-Type: application/json" -d '{}' http://localhost:8000/api/v1/sequence/health

{
  "healthy": true,
  "sequences": [
    {
      "throughput": 0.1,
      "buffer": 0,
      "processing": 3,
      "pressure": 30
    }
  ]
}
  1. Throughput: the number of items physically delivered per second.
  2. Buffer: the current state of the stream's read/write buffer (input and output).
  3. Processing: the number of items currently being collected and manipulated to produce meaningful information.
  4. Pressure: how effectively the data in the buffers is being allocated and consumed.
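Given the example response above, a consumer could evaluate the health payload like this; the field names come from the sample, while the alerting threshold below is an arbitrary example, not a platform default:

```javascript
// A payload shaped like the /health response example above.
const health = {
    healthy: true,
    sequences: [
        { throughput: 0.1, buffer: 0, processing: 3, pressure: 30 },
    ],
};

// Hypothetical check: flag sequences whose pressure looks high.
// The threshold is an arbitrary example for illustration.
function overloaded(report, maxPressure = 80) {
    return report.sequences.filter(s => s.pressure > maxPressure);
}

const alerts = overloaded(health);
```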

Status

curl -H "Content-Type: application/json" -d "{}" http://localhost:8000/api/v1/sequence/status

Event

An event contains an <eventName> with an optional <message>.

curl -H "Content-Type: application/json" http://localhost:8000/api/v1/sequence/events/<eventName>

Get the event only once:

curl -H "Content-Type: application/json" http://localhost:8000/api/v1/sequence/once/<eventName>

Manage Data Flow - Topics

Topics are categories/labels used to organize input/output data. Messages can be sent to or read from requested topics on the Scramjet Platform. Specifically, a producer Sequence writes data to a topic, and a consumer Sequence reads data from the requested topic.

You can find examples of producer/consumer Sequences in the reference-apps repository.

  1. To run a producer Sequence:

    seq start <sequence_id> --output-topic <topic>
  2. To run a consumer Sequence:

    seq start <sequence_id> --input-topic <topic>
  3. A Sequence can be both a producer and a consumer:

    seq start <sequence_id> --input-topic <topic> --output-topic <topic>
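The producer/consumer flow above can be emulated in-process with a toy topic store; the platform wires real topics for you, so this only illustrates the data flow:

```javascript
// A toy in-memory topic store: producers push, consumers read.
const topics = new Map();

function writeToTopic(topic, item) {
    if (!topics.has(topic)) topics.set(topic, []);
    topics.get(topic).push(item);
}

function readFromTopic(topic) {
    return topics.get(topic) ?? [];
}

// A producer Sequence writes its output to the "names" topic...
writeToTopic("names", { name: "Alice" });
writeToTopic("names", { name: "Bob" });

// ...and a consumer Sequence reads from the very same topic.
const consumed = readFromTopic("names");
```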

Manage the Sequence

Use data input and output

Scramjet Cloud Platform (SCP) can handle various inputs. As a developer, you are free to process any of them in your Sequence applications, such as Text, JSON, XML, SOAP, Audio, Video and more. Inputs can be:

  1. Provided to Hub via its REST API
  2. Consumed from various local or remote sources by the app; such as: Stream, STDIN, File, API, URL
  3. Generated by the app itself

si instance input <id> [<file>]
# <id>: the Instance ID, or '-' for the last one started or selected
# <file>: the input file (stdin if not given)

Our engine outputs can be managed in several ways:

  1. File - you can save your output to a local or a remote file
  2. STDOUT - output can be directed to system STDOUT (STDERR is supported as well)
  3. API - output can be consumed from our SCP REST API
  4. URL Request - you can write your app to request a URL, a webhook, etc.
  5. Stream - output can be streamed to a particular destination

You can mix multiple actions together: for example, both send data to a remote system/URL and save it locally.

To pipe a running Instance's output to stdout:

si instance output <id>
# <id>: the Instance ID, or '-' for the last one started or selected

Use Control Messages

Control the Sequence and send data to it thanks to the messages described below.

CODE DESCRIPTION
FORCE_CONFIRM_ALIVE Confirms that the Sequence is alive when it is not responding.
KILL Sends the kill signal to the running Sequence.
MONITORING_RATE Changes the Sequence monitoring rate.
STOP Sends the stop signal to the running Sequence.
EVENT Sends an event name and any object, array or function to the Sequence.
PONG Acknowledgement message from the CSH to the Runner; includes the Sequence configuration information.

Use monitoring

CODE DESCRIPTION
ACKNOWLEDGE Indicates whether a command message (e.g. stop or kill) was received.
DESCRIBE_SEQUENCE Includes the stream mode, name, description and scalability of each subsequence.
STATUS Includes the host address, instance_id, Sequence modifications, health checks, data time flow, etc.
ALIVE Information on how much longer the Sequence will be active (in milliseconds).
ERROR All errors that occur.
MONITORING Contains messages about Sequence health and information about Instance resource usage: CPU, memory usage, network I/O and disk size.
EVENT Executes a defined event in the Sequence, with an additional message as a parameter.
PING Checks whether the Runner is ready to communicate.
SNAPSHOT_RESPONSE Reports whether a snapshot was created without errors.
SEQUENCE_STOPPED Reports whether the Sequence is running or not.


Input and output

A Sequence is a program that produces, consumes or transforms data. It's a function or an array of functions. A typical one looks somewhat like this:

const { PassThrough } = require("stream");

/**
 * The function parameters are:
 * - the input stream,
 * - ...params passed to the Instance on start.
 */
module.exports = function (input, param1, param2) {
    return new PassThrough({
        transform(chunk, encoding, callback) {
            this.push(`Hello ${chunk.toString(encoding)}`);
            callback();
        },
    });
};

This converts lines of data into Hello lines of data. It's a data transformer: it uses the input and produces an output. Let's dive into what that means.

Producing data (output stream)

To stream data from a Sequence, you need to return values over time. JavaScript constructs that enable this include Node.js streams, generators and iterables. Whatever you return from your Sequence becomes your output stream. You can choose whichever solution is right for you.

The output options are as follows:

Async iterators and generators

The simplest way to expose data is to create a data generator. The generator will create new data items that are exposed on the output stream or an output topic.

Here's how such a generator may look in Python:

async def run(context, input):
    while True:
        async for result in await get_page_from_api():
            yield result
        await asyncio.sleep(1)

In this example the yield keyword is used to expose items fetched from an API. Every single result will arrive as a single item on the input of another Sequence connected via a topic, or as a single JSON line in the API protocol.

A similar construct in Node.js would look as follows:

module.exports = async function* (_stream) {
    while (true) {
        yield* await getPageFromAPI();
        await wait(1000);
    }
};

A cool feature of JavaScript generators is yield*: an iterator or an array can be passed, and therefore multiple chunks can be sent to the output stream at once. This is advisable for efficiency reasons.

Generators have one additional benefit: they will not produce more data if the data isn't read. If you start a generator in your Sequence but do not read from it, the program will run a couple of initial iterations to fill the buffers, but eventually it will stop at yield and wait until you read the data from another Sequence or through the API.
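This pull-based behaviour is easy to observe with a plain generator (async generators work the same way): the body only advances when someone asks for the next value. The counter below is just instrumentation:

```javascript
let produced = 0;

// The generator body only runs when a value is requested.
function* numbers() {
    while (true) {
        produced++;
        yield produced;
    }
}

const gen = numbers();
gen.next(); // produces 1
gen.next(); // produces 2
// No further values are produced until the next read.
```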

Stream output

Alternatively, streamed output can be used; both the Python and Node.js runners will accept a stream as a result. The code would look a little like this in Node:

const { Readable } = require("stream");

module.exports = () => {
    let n = 0;
    return new Readable({
        read() {
            this.push(`Chunk: ${n++}`);
        },
    });
};

Similarly in Python:

import io

def run():
    output = io.StringIO()
    output.write('Hello World!')
    output.seek(0)  # rewind so the data can be read back
    return output

Streams can be better suited when exposing data from HTTP requests or files. They will also be a more efficient option, although in most use cases the benefit will be marginal, as fetching the data will be the biggest bottleneck.

Consuming data (input stream)

Some Sequences you write will need to consume data created by other Sequences or sent via the API. The data will be passed to your Sequence as the first positional argument (excluding self in Python).

module.exports = input => { /* ... */ };
//               ^^^^^ - this is the input!

def run(self, input):
    #         ^^^^^ - this is the input!

Remember it's the position, not the name! ;)

Below you'll find some samples of what to use.

Async iteration

The input can be consumed with async iteration:

module.exports = async function (input) {
    for await (const chunk of input) {
        // do something with chunk
    }
};

Similarly, in Python this would be:

async def run(self, input):
    async for msg in input:
        print(f'Topic name={msg.topic}, Message={msg.value}')

Stream protocols

Another option in Node.js is to use streams:

module.exports = async function (input) {
    input.on("data", () => {
        // do something with data
    });
    // remember to resolve the promise when the sequence is done
    return new Promise(res => input.on("finish", res));
};

With streams you can control when you'd like to process more data, using pause and resume like this:

function isProcessOverloaded() {
    // check if API limits are not exceeded
}

module.exports = async function (input) {
    input.on("data", async () => {
        // do something with data
        if (isProcessOverloaded()) {
            input.pause(); // this will stop the data from being sent
            await new Promise(res => setTimeout(res, 200));
            input.resume(); // this will resume the data flow
        }
    });
    // remember to resolve the promise when the sequence is done
    return new Promise(res => input.on("finish", res));
};

This slows down the data upload to the platform so that you can run your processing at the right speed. It is also quite handy when you're dealing with API limits.

No I/O

There are some cases when you just want a program to run and you're not interested in inputs, outputs or topics. Here's how to write such a program:

JavaScript:

module.exports = async () => {
    // When this Promise is resolved, the platform will assume that the
    // program has finished and can be stopped.
    return new Promise((resolve) => {
        // call resolve() when the program is done
    });
};

And Python:

async def run(context, input):
    # When this Future is resolved, the platform will assume that the
    # program has finished and can be stopped.
    return asyncio.Future()

As you can see, all you need to do is inform the platform that the program is running and when it's done.



Host Log

Under development.

curl -H "Content-Type: application/json" -d "{}" http://localhost:8000/api/v1/stream/logs


Didn't find the information you needed?

Join our Scramjet Community on Discord, where you can get help from our engineers directly.