Nodes
Nodes are the processing units of a Sift. A node connects its input to a number of outputs; note that a node can have just one input, but many outputs. Nodes communicate with each other through entries in an internal key/value store.
Nodes can optionally be named, but this is cosmetic and mainly useful for logging and debugging.
Sift inputs vs node inputs
It is worth keeping in mind that both Sifts and nodes have inputs, but they are different: Sift inputs define the data coming in from external sources, while node inputs define either data from the outside or data from other nodes.
The nodes property in the Sift configuration file, sift.json, is an array of objects that defines the computational components of our graph, for example:
"nodes": [{
"#": "node1",
"implementation": {
"javascript": "server/node1_impl.js"
},
"input": {
"bucket": "store1",
"select": "*"
},
"outputs": {
"store1": {}
}
}]
Let's have a look at each of its properties now.
1. "#" property
The # is used as a comment property for nodes. In this case we use it to add a cosmetic name to the node.
2. "implementation" property
The implementation property defines where the code for your node resides and, in conjunction with the DAG definition in the sift.json file, forms the processing logic for a Sift. We currently support node implementations in a variety of languages, listed in the following subsections. We also provide an empty implementation that simply copies data from an input to all output nodes so you can 'fan out' your data.
Conceptually, on each iteration an implementation consumes a list of bucket data, defined by the input property, that has been filtered and grouped. In turn it emits a stream of 0 to n results before closing the input stream.
2.1 JavaScript node implementation
You should specify the path to the node implementation in the javascript attribute:
"implementation": {
"javascript": "server/node1.js",
"sandbox": "quay.io/redsift/sandbox-javascript:v6.2.2"
}
If a package.json file is found at the same level as the node implementation, an npm install will be executed to install all dependencies.
The implementation of the node should export a function which takes exactly 1 parameter:
module.exports = function (got) {
// got = {
// in: ... // contains the key/value pairs that match the given query
// with: ... // key/value pairs selected based on the with selection
// get: ... // an array with the get result for a specific key
// query: ... // an array containing the key hierarchy from the select
// }
// Implementation goes here
// Return can be an object, an array of objects,
// a Promise or an array of Promises
return { name: 'result', key: 'key', value: 'value' };
}
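Because a node may return Promises, an asynchronous implementation can look like the following sketch. It assumes got.in exposes a data array of { key, value } pairs and that a bucket named store1 is declared in this Sift; verify the actual shape of got in your own node before relying on it.
// A minimal sketch of an asynchronous node. The got.in.data shape
// ([{ key, value }, ...]) is an assumption made for illustration.
module.exports = function (got) {
  // Map every matched entry to a Promise resolving to a result
  return got.in.data.map(function (datum) {
    return Promise.resolve({
      name: 'store1', // output bucket; assumed to be declared in this Sift
      key: datum.key,
      value: datum.value
    });
  }); // an array of Promises is a valid return value
};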
2.2 Python node implementation
You should specify the path to the node implementation in the python attribute. If a requirements.txt file is found at the same level as the node implementation, a pip install will be executed to install all dependencies.
"implementation": {
"python": "server/node1.py",
"sandbox": "quay.io/redsift/sandbox-python:v2.7.10"
}
The implementation of the node should have a function that takes exactly 1 parameter:
def compute(req):
# Implementation goes here
# Return can be a dictionary or an array of dictionaries
out = dict(name='result', key='key', value='value')
return out
2.3 Julia node implementation
You should specify the path to the node implementation in the julia attribute.
"implementation": {
"julia": "server/node1.jl",
"sandbox": "quay.io/redsift/sandbox-julia:v0.4.2"
}
The implementation of the node should have a function that takes exactly 1 parameter. You should include a userimg.jl so that a system image can be created; otherwise execution will be slow.
module Node1
function compute(data::Dict)
# Your implementation goes here
Dict("name" => "default", "key" => "some/thing", "value" => Dict("something" => "BOO0"))
end
end
compute(data::Dict) = Node1.compute(data)
2.4 Java node implementation
All Java nodes are compiled and packaged into a JAR file by the sandbox. You should specify the path to the node implementation and the class name (separated by a ;) in the java attribute.
"implementation": {
"java": "server/Node1.java;server.Node1",
"sandbox": "quay.io/redsift/sandbox-java:v1.8.0"
}
If a pom.xml file is found (only the standard directory layout is supported), mvn clean and mvn install will be executed to create the JAR file. Alternatively, you can specify the path and class name of a precompiled jar or zip file.
If the class name is not specified, a default one is assumed (for a Java node at server/Node1.java, the class name is deduced to be server.Node1). However, you should provide the class name for completeness.
The implementation should have a static function named compute which takes exactly 1 parameter.
package server;
// Mandatory imports
import io.redsift.ComputeRequest;
import io.redsift.ComputeResponse;
public class Node1 {
public static ComputeResponse compute(ComputeRequest req) throws Exception {
// Implementation goes here
// Response can be a ComputeResponse,
// an array of ComputeResponse or List<ComputeResponse>
ComputeResponse res = new ComputeResponse("bucket-maven", "key-maven", "value-maven", 0);
return res;
}
}
If you are using Maven you need to add a dependency on the io.redsift.compute package which is made available through the Local Maven repository.
<dependency>
<groupId>io.redsift</groupId>
<artifactId>compute</artifactId>
<version>1.0</version>
</dependency>
Maven Package (with dependencies)
If you are using Maven, please ensure that you add an assembly step to create a JAR with all dependencies packaged up. Otherwise the node won't find the necessary dependencies when executed. We do not persist the Maven cache in the sandbox.
Note when using Maven
For implementations that use Maven you need to point your implementation to the Java file instead of the project file, e.g. "java": "server/Node1.java;server.Node1".
2.5 Scala node implementation
All Scala nodes are compiled and packaged into a JAR file by the sandbox. You should specify the path to the node implementation and the class name (separated by a ;) in the scala attribute.
"implementation": {
"scala": "server/Node1.scala;server.Node1",
"sandbox": "quay.io/redsift/sandbox-scala:v2.11.8"
}
If a build.sbt file is found (only the standard directory layout is supported), an sbt package will be executed to create the JAR file. Alternatively, you can specify the path and class name of a precompiled jar or zip file.
If the class name is not specified, a default one is assumed (for a Scala node at server/Node1.scala, the class name is deduced to be server.Node1). However, you should provide the class name for completeness.
The implementation should have a static function named compute (achieved in Scala by using an object) which takes exactly 1 parameter.
// Mandatory imports
import io.redsift.ComputeRequest
import io.redsift.ComputeResponse

package io.redsift {
  object SbtNode1 {
    def compute(req: ComputeRequest): ComputeResponse = {
      // Implementation goes here
      // Response can be a ComputeResponse, an array of ComputeResponse
      // or a List<ComputeResponse>
      new ComputeResponse("bucket-sbt", "key-sbt", "value-sbt", 0)
    }
  }
}
If you are using sbt you need to add a dependency on the io.redsift.compute package which is made available through the Local Maven repository.
resolvers += Resolver.mavenLocal
libraryDependencies += "io.redsift" % "compute" % "1.0"
Note when using sbt
For implementations that use sbt you need to point your implementation to the Scala file instead of the project file, e.g. "scala": "server/Node1.scala;server.Node1".
2.6 Clojure node implementation
All Clojure nodes are compiled and packaged into a JAR file by the sandbox. You should specify the path to the node implementation and the class name (separated by a ;) in the clojure attribute.
"implementation": {
"clojure": "server/node1.clj;server.node1",
"sandbox": "quay.io/redsift/sandbox-clojure:v1.8.0"
}
If a Leiningen project.clj file is found (only the standard directory layout is supported), a lein uberjar will be executed to create the JAR file. Alternatively, you can specify the path and class name of a precompiled jar or zip file.
If the class name is not specified, a default one is assumed (for a Clojure node at server/node1.clj, the class name is deduced to be server.node1). However, you should provide the class name for completeness.
The implementation should have a function named compute which takes exactly 1 parameter:
(ns server.node1
  (:import io.redsift.ComputeRequest
           io.redsift.ComputeResponse))

(defn compute [^ComputeRequest req]
  ;; Implementation goes here
  (ComputeResponse. "bucket-clj" "key-clj" "value-clj" 0))
If you are using Leiningen you need to add a dependency on the io.redsift/compute package which is made available through the Local Maven repository.
:dependencies [[org.clojure/clojure "1.8.0"]
               [io.redsift/compute "1.0"]]
:repositories {"project" "file:repo"}
Note for Leiningen projects
For Leiningen-based implementations you need to point your implementation to the Clojure file instead of the project file, e.g. "clojure": "server/parser-node/src/parser_node/core.clj;parser-node.core".
Namespaces with dashes use underscores in the Clojure file name
Clojure converts dashes in namespaces to underscores and hence expects the Clojure file name (and source directory) to use underscores. For example, if your namespace is node-2.core then the Clojure file is expected to be named node_2.clj.
2.7 Swift node implementation
You should specify the path to the node implementation in the swift attribute.
"implementation": {
"swift": "server/Node1.swift",
"sandbox": "quay.io/redsift/sandbox-swift:v3.1.1"
}
The implementation should have a class that implements the RedsiftNode protocol shown below.
public protocol RedsiftNode {
    static func compute(req: ComputeRequest) -> Any?
}
A simple implementation should look something like the following.
import Foundation
// Mandatory import
import Redsift
public class Node1: Redsift.RedsiftNode {
    // compute can return ComputeResponse, [ComputeResponse] or nil,
    // hence compute's return type is Any?
    public static func compute(req: ComputeRequest) -> Any? {
        setbuf(stdout, nil) // disable buffering for stdout to see prints
        print("Node1.compute is running")
        return ComputeResponse(name: "bucket-swift", key: "key-swift", value: "value-swift", epoch: 0)
    }
}
Node implementation file names in Swift
The name of the .swift implementation file needs to be identical to the class name, and the first letter needs to be uppercase, e.g. class Node1 should be in Node1.swift.
Logging in Swift
When using print for logging, add the import Foundation and the setbuf(stdout, nil) statements that you see in the example above.
2.8 Go node implementation
You should specify the path to the node implementation in the go attribute.
"implementation": {
"go": "server/node1/node1.go",
"sandbox": "quay.io/redsift/sandbox-go:v1.9"
}
Each implementation should have an exported Compute function of the RedsiftFunc type shown below.
import rpc "github.com/redsift/go-sandbox-rpc"
type RedsiftFunc func(rpc.ComputeRequest) ([]rpc.ComputeResponse, error)
A simple implementation should look something like the following.
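The sketch below is a minimal skeleton that satisfies the RedsiftFunc signature above; the package name node1 and the body are illustrative only, and the actual fields of ComputeResponse should be taken from the go-sandbox-rpc package itself.
package node1

import rpc "github.com/redsift/go-sandbox-rpc"

// Compute satisfies the RedsiftFunc type shown above. It receives the
// filtered/grouped input and returns zero or more responses.
func Compute(req rpc.ComputeRequest) ([]rpc.ComputeResponse, error) {
	// Implementation goes here: inspect req, then append one
	// rpc.ComputeResponse per key/value you want to emit.
	// See go-sandbox-rpc for the ComputeResponse definition.
	responses := make([]rpc.ComputeResponse, 0)
	return responses, nil
}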
Implementation files for Go nodes
Each node implementation needs to be in its own package. The compiler won't let you have more than one exported Compute function of the RedsiftFunc type in the same package.
3. "input" property
The input property defines the inputs for a given node; this can be either a Sift input or a store. A node can also be input-less, but this only makes sense for a cron node that gets invoked on a schedule; otherwise the node will never be invoked. An input-less node is sketched below.
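A hypothetical input-less node simply omits the input property (how the cron schedule itself is declared is outside the scope of this section):
// Hypothetical input-less node: no "input" property at all
"nodes": [{
  "#": "cron-node",
  "implementation": {
    "javascript": "server/cron_node.js"
  },
  "outputs": {
    "store1": {}
  }
}]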
3.1 input.bucket
The bucket can refer to either a store or a Sift input. Stores can be selected, that is, filtered, while Sift inputs can't (see select below).
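Both sketches below use hypothetical bucket names: "store1" stands for a store and "emails" for a Sift input.
// Reading from a store: a select is allowed
"input": { "bucket": "store1", "select": "k1/*" }
// Reading from a Sift input: no select is allowed
"input": { "bucket": "emails" }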
3.2 input.select
If your input.bucket is a store you can select data from it. The supported operations are:
- simple key selection, e.g.:
"input": {"bucket": "example", "select": "k1"}
- a key in a hierarchy, e.g.:
"input": {"bucket": "example", "select": "k1/s1"}
- wildcard all sub-keys recursively and process each individually, e.g.:
"input": {"bucket": "example", "select": "k1/"}
- wildcard all sub-keys recursively and group them for processing, e.g.:
"input": {"bucket": "example", "select": "k1/*"}
Note: selection is unsorted.
3.3 input.with
In addition to selecting data from a single bucket, you may combine your selection with data from another bucket (join data). There are a few common use cases for this:
- Processing data based on user-supplied preferences
- Processing events while using reference data
- Joining data from two event streams
- Caching expensive invariant operations such as external lookups
- Implementing an associative/incremental operation
Joining data relies on the with operation, which works in conjunction with select. In SQL terms, the select represents the left side and the with represents the right. The with operation itself is closest to an outer join. Note that you may not use wildcards in your with selection, though you may anchor values to replace wildcards.
// Example with operation using anchor
"dag": {
"inputs": { ... },
"nodes": [ ..., {
"implementation" : {
"javascript" : "do.js"
},
"input": {
"bucket": "left",
"select": "$month/*/",
"with": {
"bucket": "right",
"select": "$month/get"
}
},
"outputs": {
"dump": {},
"right" : {
"#": "Essentially a 100s cache",
"ttl": "100"
}
}
}],
"stores": {
"left": {
"key$schema": "string/string/string"
},
"right": {
"key$schema": "string/string"
}
},
"outputs": { ... }
}
Input buckets
Input buckets do not support select key space operations. The data must be written to a store if you wish to do something complex with the key space before transformation.
Limitations
with can only join data from 1 other bucket, and that bucket must reference a store. If you require data from multiple buckets, cascade the data across multiple nodes in the graph, as sketched below.
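A hypothetical cascade joining three stores (all node, file and bucket names below are illustrative):
// Node 1 joins store1 with store2 and writes to an intermediate store;
// node 2 then joins that intermediate store with store3
"nodes": [{
  "#": "join store1 and store2 into an intermediate store",
  "implementation": { "javascript": "server/join12.js" },
  "input": {
    "bucket": "store1",
    "select": "*",
    "with": { "bucket": "store2", "select": "ref" }
  },
  "outputs": { "intermediate": {} }
}, {
  "#": "join the intermediate result with store3",
  "implementation": { "javascript": "server/join3.js" },
  "input": {
    "bucket": "intermediate",
    "select": "*",
    "with": { "bucket": "store3", "select": "ref" }
  },
  "outputs": { "final": {} }
}]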
Anchors
When using with, you may expand wildcards from the left select operation in your right select operation. You can use this functionality to convert the outer join into a left join.
In the example at the beginning of this section, an event that is written to the store left with the key jan/usr/123 will be joined with data from right under the key jan/get. The query $month is treated as a wildcard, i.e. $month/*/ is equivalent to /*/ for the select, but allows you to anchor the right key with the $month value of the left.
It is an error to declare an anchor multiple times in the same select, e.g. "select": "*/$a/$a" is wrong. Only one anchor per select is allowed. You cannot anchor the grouping operation, effectively meaning you cannot use * in your with selection.
createDependency
If "createDependency": true (the default is false) is added to the with object, it enhances the relation of the two inputs by triggering a computation of the graph when either the left or the right has new data, as in the sketch below.
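For example, reusing the with selection from the example at the start of this section:
// Recompute when either "left" or "right" receives new data
"input": {
  "bucket": "left",
  "select": "$month/*/",
  "with": {
    "bucket": "right",
    "select": "$month/get",
    "createDependency": true
  }
}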
Cycles
Cycles are evaluated late. You can set up a graph that never triggers because your implementation never writes data to the port that would cause the data to loop back on itself.
A node can reference itself, as in the sketch below. A write does not trigger a recomputation; a future event would select the data.
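A minimal self-referencing node might look like this (the bucket and file names are illustrative):
// "state" is both the input and an output of this node; writes to it
// do not retrigger the node immediately but are visible to future runs
{
  "implementation": { "javascript": "server/accumulate.js" },
  "input": { "bucket": "state", "select": "*" },
  "outputs": { "state": {} }
}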
3.4 input.get
You can also get keys from various stores using the get operation. You need to specify an array of bucket/key pairs.
"input": {
"bucket": "left",
"get": [
{
"bucket": "b1",
"key": "jan/d1",
"createDependency": false
},
{
"bucket": "b2",
"key": "2016/q1",
"createDependency": true
}]
}
createDependency & Anchors
You can use anchors and createDependency in your get statements in the same way as in your with statements, as sketched below.
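A hypothetical example that anchors a get key on the $month value of the left select (the bucket names and the "reference" key are illustrative):
// $month in the get key is anchored from the left select
"input": {
  "bucket": "left",
  "select": "$month/*/",
  "get": [{
    "bucket": "b1",
    "key": "$month/reference",
    "createDependency": true
  }]
}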
4. "outputs" property
A node can output data to stores or DAG exports. For store outputs you can also override the ttl and createOnly properties declared in the store definition.
Overriding the ttl and createOnly properties of a store applies to the outputs of this node only.
// Example output overriding some properties of the "right" bucket
"outputs": {
"dump": {},
"right" : {
"createOnly": true,
"ttl": "100"
}
}