Nodes are the processing units of a Sift. A node connects its input to a number of outputs: a node can have only one input, but many outputs. Nodes communicate with each other through entries in an internal key/value store.
Nodes can optionally be named, but the name is cosmetic and serves mainly for logging and debugging.

Sift inputs vs node inputs

It is worth keeping in mind that both Sifts and nodes have inputs, but they are different. Sift inputs define the data coming in from external sources, while node inputs define data coming either from the outside or from other nodes.

The nodes property in the Sift configuration file, sift.json, is an array of objects that define the computational components of our graph, for example:

"nodes": [{
  "#": "node1",
  "implementation": {
    "javascript": "server/node1_impl.js"
  },
  "input": {
    "bucket": "store1",
    "select": "*"
  },
  "outputs": {
    "store1": {}
  }
}]

Let's have a look at each of its properties now.

1. "#" property

The # is used as a comment property for nodes. In this case we use it to add a cosmetic name to the node.

2. "implementation" property

The implementation property defines where the code for your node resides and, in conjunction with the DAG definition in the sift.json file, forms the processing logic for a Sift. We currently support node implementations in the languages listed in the following subsections. We also provide an empty implementation that just copies data from an input to all output nodes so you can 'fan out' your data.

Conceptually, on each iteration an implementation consumes a list of bucket data defined in the input property that has been filtered and grouped. In turn it emits a stream of 0 to n results before closing the input stream.
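
As a rough illustration of the empty 'fan out' behaviour described above, the following Python sketch copies every input item to each output. The function name fan_out, the item shape (a dictionary with key and value) and the use of name to identify the target output are assumptions made for illustration; they mirror the result shape used in the Python example in section 2.2 and are not the platform's own code.

```python
def fan_out(items, output_names):
    """Copy every input item to every named output (hypothetical helper).

    `items` is assumed to be a list of dicts with 'key' and 'value' entries.
    Returns 0 to n result dicts, one per (item, output) pair.
    """
    return [
        dict(name=name, key=item["key"], value=item["value"])
        for item in items
        for name in output_names
    ]


results = fan_out([{"key": "k1", "value": "v1"}], ["store1", "store2"])
```

An empty input list yields an empty result list, which corresponds to the "0 results" end of the range mentioned above.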

2.1 JavaScript node implementation

You should specify the path to the node implementation in the javascript attribute:

"implementation": {
  "javascript": "server/node1.js",
  "sandbox": "quay.io/redsift/sandbox-javascript:v6.2.2"
}

If a package.json file is found at the same level as the node implementation, then an npm install will be executed to install all dependencies.

The implementation of the node should export a function which takes exactly 1 parameter:

module.exports = function (got) {
  // got = {
  //   in: ... // contains the key/value pairs that match the given query
  //   with: ... // key/value pairs selected based on the with selection
  //   lookup: ... // an array with the lookup result for a specific key
  //   query: ... // an array containing the key hierarchy from the select
  // }
  
  // Implementation goes here
  
  // Return can be an object, an array of objects, 
  // a Promise or an array of Promises
  return { name: 'result', key: 'key', value: 'value' };
}

2.2 Python node implementation

You should specify the path to the node implementation in the python attribute. If a requirements.txt file is found at the same level as the node implementation, then a pip install will be executed to install all dependencies.

"implementation": {
  "python": "server/node1.py",
  "sandbox": "quay.io/redsift/sandbox-python:v2.7.10"
}

The implementation of the node should have a function that takes exactly 1 parameter:

def compute(req):
    # Implementation goes here
    
    # Return can be a dictionary or an array of dictionaries
    out = dict(name='result', key='key', value='value')
    return out

2.3 Julia node implementation

You should specify the path to the node implementation in the julia attribute.

"implementation": {
  "julia": "server/node1.jl",
  "sandbox": "quay.io/redsift/sandbox-julia:v0.4.2"
}

The implementation of the node should have a function that takes exactly 1 parameter. You should include a userimg.jl so that a system image can be created, otherwise the execution will be slow.

module Node1

function compute(data::Dict)
    # Your implementation goes here
    Dict("name" => "default", "key" => "some/thing", "value" => Dict("something" => "BOO0"))
end

end

compute(data::Dict) = Node1.compute(data)

2.4 Java node implementation

All Java nodes are compiled and packaged into a JAR file by the sandbox. You should specify the path to the node implementation and the class name (separated by a ;) in the java attribute.

"implementation": {
  "java": "server/Node1.java;server.Node1",
  "sandbox": "quay.io/redsift/sandbox-java:v1.8.0"
}

If a pom.xml file (only the standard directory layout is supported) is found then a mvn clean and mvn install will be executed to create the jar file. Alternatively you can also specify the path and class name to a precompiled jar or zip file.

If the class name is not specified then a default one is assumed (for a Java node at server/Node1.java, the class name will be deduced to be server.Node1). However, you should provide the class name for completeness.
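
The path;class convention and the default deduction described above can be sketched in Python as follows. The helper name parse_implementation is hypothetical, and the deduction rule (drop the file extension and replace path separators with dots) is inferred from the single server/Node1.java example, so treat it as an assumption about how the sandbox behaves.

```python
from pathlib import PurePosixPath


def parse_implementation(attribute):
    """Split 'path;class.Name' and deduce the class name when it is omitted."""
    path, _, class_name = attribute.partition(";")
    if not class_name:
        # Assumed rule: server/Node1.java -> server.Node1
        class_name = ".".join(PurePosixPath(path).with_suffix("").parts)
    return path, class_name


explicit = parse_implementation("server/Node1.java;server.Node1")
deduced = parse_implementation("server/Node1.java")
```

Both calls produce the same pair here, which is why providing the class name explicitly is only a matter of completeness in this simple layout.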

The implementation should have a static function named compute which takes in exactly 1 parameter.

package server;

// Mandatory imports
import io.redsift.ComputeRequest;
import io.redsift.ComputeResponse;

public class Node1 {
  public static ComputeResponse compute(ComputeRequest req) throws Exception {
    // Implementation goes here
    
    // Response can be a ComputeResponse, 
    // an array of ComputeResponse or List<ComputeResponse>
    ComputeResponse res = new ComputeResponse("bucket-maven", "key-maven", "value-maven", 0);
    return res;
  }
}

If you are using Maven you need to add a dependency on the io.redsift.compute package which is made available through the Local Maven repository.

<dependency>
    <groupId>io.redsift</groupId>
    <artifactId>compute</artifactId>
    <version>1.0</version>
</dependency>

Maven Package (with dependencies)

If you are using Maven, please ensure that you add an assembly step to create a JAR with all dependencies packaged up. Otherwise you will notice that the node won't find the necessary dependencies when executed. We do not persist the Maven cache in the sandbox.

Note when using Maven

For implementations that use Maven you need to point your implementation to the Java file instead of the project file, e.g. "java": "server/Node1.java;server.Node1"

2.5 Scala node implementation

All Scala nodes are compiled and packaged into a JAR file by the sandbox. You should specify the path to the node implementation and the class name (separated by a ;) in the scala attribute.

"implementation": {
  "scala": "server/Node1.scala;server.Node1",
  "sandbox": "quay.io/redsift/sandbox-scala:v2.11.8"
}

If a build.sbt file (only the standard directory layout is supported) is found then a sbt package will be executed to create the jar file. Alternatively you can also specify the path and class name to a precompiled jar or zip file.

If the class name is not specified then a default one is assumed (for a Scala node at server/Node1.scala, the class name will be deduced to be server.Node1). However, you should provide the class name for completeness.

The implementation should have a static (achieved in Scala by using an object) function named compute which takes in exactly 1 parameter.

// Mandatory imports
import io.redsift.ComputeRequest;
import io.redsift.ComputeResponse;

package io.redsift {
    object SbtNode1 {
        def compute(req: ComputeRequest): ComputeResponse = {
          // Implementation goes here
          val res = new ComputeResponse("bucket-sbt", "key-sbt", "value-sbt", 0);
          // Response can be a ComputeResponse, an array of ComputeResponse or List<ComputeResponse>
          return res;
        }
    }
}

If you are using sbt you need to add a dependency on the io.redsift.compute package which is made available through the Local Maven repository.

resolvers += Resolver.mavenLocal
libraryDependencies += "io.redsift" % "compute" % "1.0"

Note when using sbt

For implementations that use sbt you need to point your implementation to the Scala file instead of the project file, e.g. "scala": "server/Node1.scala;server.Node1"

2.6 Clojure node implementation

All Clojure nodes are compiled and packaged into a JAR file by the sandbox. You should specify the path to the node implementation and the class name (separated by a ;) in the clojure attribute.

"implementation": {
  "clojure": "server/node1.clj;server.node1",
  "sandbox": "quay.io/redsift/sandbox-clojure:v1.8.0"
}

If a Leiningen project.clj file (only the standard directory layout is supported) is found then a lein uberjar will be executed to create the jar file. Alternatively you can also specify the path and class name to a precompiled jar or zip file.

If the class name is not specified then a default one is assumed (for a Clojure node at server/node1.clj, the class name will be deduced to be server.node1). However, you should provide the class name for completeness.

The implementation should have a function named compute which takes in exactly 1 parameter:

(ns server.node1
  (:import io.redsift.ComputeRequest
           io.redsift.ComputeResponse))

(defn compute [^ComputeRequest req]
  ;; Implementation goes here
  (new io.redsift.ComputeResponse "bucket-clj" "key-clj" "value-clj" 0))

If you are using Leiningen you need to add a dependency on the io.redsift/compute package which is made available through the Local Maven repository.

  :dependencies [[org.clojure/clojure "1.8.0"]
                 [io.redsift/compute, "1.0"]]
  :repositories {"project" "file:repo"}

Note for Leiningen Projects

For Leiningen-based implementations you need to point your implementation to the Clojure file instead of the project file, e.g. "clojure": "server/parser-node/src/parser_node/core.clj;parser-node.core"

Please check that namespaces with dashes use underscores in the Clojure file name.

Clojure converts dashes in namespaces to underscores and hence expects the Clojure file name (and source directory) to use underscores. For example, if your namespace is node-2.core, then the Clojure file is expected at node_2/core.clj
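
The dash-to-underscore rule can be checked with a small Python sketch. The helper name namespace_to_source_path is hypothetical; it applies the standard Clojure convention (dashes become underscores, dots become directory separators) and returns a path relative to the source root, which is assumed to be src in a Leiningen project.

```python
def namespace_to_source_path(namespace):
    """Map a Clojure namespace to its expected file path under the source root."""
    return namespace.replace("-", "_").replace(".", "/") + ".clj"


expected = namespace_to_source_path("parser-node.core")
```

For the Leiningen example above, the result corresponds to the parser_node/core.clj part of the path given in the clojure attribute.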

2.7 Swift node implementation

You should specify the path to the node implementation in the swift attribute.

"implementation": {
  "swift": "server/Node1.swift",
  "sandbox": "quay.io/redsift/sandbox-swift:v3.1.1"
}

The implementation should have a class that implements the RedsiftNode protocol shown below.

public protocol RedsiftNode{
  static func compute(req: ComputeRequest) -> Any?
}

A simple implementation should look something like the following.

import Foundation
// Mandatory import
import Redsift

public class Node1: Redsift.RedsiftNode{
  // compute can return ComputeResponse, [ComputeResponse] or nil
  // hence the compute's return type is Any?
  public static func compute(req: ComputeRequest) -> Any?{
    setbuf(stdout, nil) // disable buffer for stdout to see prints
    print("Node1.compute is running")
    return ComputeResponse(name: "bucket-swift", key: "key-swift", value: "value-swift", epoch: 0)
  }
}

Node implementation file names in Swift

The name of the .swift implementation file must be identical to the class name, and its first letter must be uppercase, e.g. class Node1 should be in Node1.swift

Logging in Swift

When you use the print call for logging, Swift writes to the system's buffered stdout, so you may not see any of your output. To change this behaviour you need to import Foundation and add the setbuf(stdout, nil) statement that you see in the example above.

2.8 Go node implementation

You should specify the path to the node implementation in the go attribute.

"implementation": {
  "go": "server/node1/node1.go",
  "sandbox": "quay.io/redsift/sandbox-go:v1.9"
}

Each implementation should have an exported Compute function of the RedsiftFunc type shown below.

import rpc "github.com/redsift/go-sandbox-rpc"

type RedsiftFunc func(rpc.ComputeRequest) ([]rpc.ComputeResponse, error)

A simple implementation should look something like the following.

Implementation files for Go nodes

Each node implementation needs to be in its own package. The compiler won't let you have more than one exported Compute function of the RedsiftFunc type in the same package.

3. "input" property

The input property defines the input for a given node; this can be either a Sift input or a store. A node can also be input-less, but this only makes sense if it is a cron node that gets invoked on a schedule. Otherwise the node will never be invoked.

3.1 input.bucket

The bucket can refer to either a store, or a Sift input. Stores can be selected, that is filtered, while Sift inputs can't (see select below).

3.2 input.select

If your input.bucket is a store you can select data from it. The supported operations are:

  • simple key selection, e.g. "input": {"bucket": "example", "select": "k1"}
  • key in hierarchy, e.g. "input": {"bucket": "example", "select": "k1/s1"}
  • wildcard all sub keys recursively and process each individually, e.g. "input": {"bucket": "example", "select": "k1/"}
  • wildcard all sub keys recursively and group them to process, e.g. "input": {"bucket": "example", "select": "k1/*"}. Note: selection is unsorted
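
The difference between processing sub keys individually and processing them as a group can be pictured with the Python sketch below. It is a toy model only: the function plan_invocations, the in-memory dictionary standing in for a store, and the group flag are all hypothetical, and the sketch deliberately does not parse the select syntax itself.

```python
def plan_invocations(store, parent, group):
    """Return the batches of (key, value) pairs a node would be invoked with.

    group=False: one invocation per matching sub key.
    group=True:  a single invocation that receives all matching sub keys.
    """
    matches = [(k, v) for k, v in store.items() if k.startswith(parent + "/")]
    if group:
        return [matches] if matches else []
    return [[match] for match in matches]


store = {"k1/a": 1, "k1/b": 2, "k2/c": 3}
individually = plan_invocations(store, "k1", group=False)
grouped = plan_invocations(store, "k1", group=True)
```

In this model the individual case produces two invocations with one entry each, while the grouped case produces one invocation with two entries. The order of entries inside a group should not be relied upon, in line with the note that selection is unsorted.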

3.3 input.with

In addition to selecting data from a single bucket, you may combine your selection with data from another bucket (join data). There are a few common use cases for this:

  • Processing data based on user supplied preferences
  • Processing events while using reference data
  • Joining data from two event streams
  • Caching expensive invariant operations such as external lookups
  • Implementing an associative/incremental operation

Joining data relies on the with operation that operates in conjunction with select. In SQL terms, the select represents the left and the with represents the right. The with operation itself is closest to an outer join. Note that you may not use wildcards in your with selection, though you may anchor values to replace wildcards.

// Example with operation using anchor
"dag": {
  "inputs": { ... },
  "nodes": [ ..., {
    "implementation" : {
      "javascript" : "do.js"
    },
    "input": {
      "bucket": "left",
      "select": "$month/*/",
      "with": {
        "bucket": "right",
        "select": "$month/lookup"
      }
    },
    "outputs": {
      "dump": {},
      "right" : {
        "#": "Essentially a 100s cache",
        "ttl": "100"
      }
    }
  }],
  "stores": {
    "left": {
      "key$schema": "string/string/string"
    },
    "right": {
      "key$schema": "string/string"
    }
  },
  "outputs": { ... }
}

Input buckets

Input buckets do not support select key space operations. The data must be written to a store if you wish to do something complex with the key space before transformation.

Limitations

with can only join data from 1 other bucket that references a store. If you require data from multiple buckets, cascade the data across multiple nodes in the graph.

Anchors

When using with, you may expand out wildcards from the left select operation in your right select operation. You can use this functionality to convert the outer join into a left join.

In the example at the beginning of this section, an event that is written to the store left with the key jan/usr/123 will be joined with data from right under the key jan/lookup. The query $month is treated as a wildcard, i.e. $month/*/ is equivalent to */*/ for the select, but it allows you to anchor the right key with the $month value of the left.

It is an error to declare an anchor multiple times in the same select, e.g. "select": "*/$a/$a" is wrong. Only one anchor per select is allowed.

You cannot anchor the grouping operation, which effectively means you cannot use it in your with selection.
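
To make the anchor mechanics concrete, here is a minimal Python sketch of the jan/usr/123 example. It is a toy model under stated assumptions: stores are plain dictionaries, anchors are bound by segment position, grouping is ignored, and the helper names bind_anchors, expand and join are hypothetical rather than part of the platform.

```python
def bind_anchors(select, key):
    """Bind each $anchor segment of `select` to the key segment at the same position."""
    bindings = {}
    for pattern, segment in zip(select.strip("/").split("/"), key.split("/")):
        if pattern.startswith("$"):
            bindings[pattern] = segment
    return bindings


def expand(select, bindings):
    """Replace anchors in the right-hand select with the values bound on the left."""
    return "/".join(bindings.get(part, part) for part in select.split("/"))


def join(left, right, left_select, right_select):
    """Pair every left entry with the anchored right entry, or None if it is absent."""
    rows = []
    for key, value in left.items():
        right_key = expand(right_select, bind_anchors(left_select, key))
        rows.append((key, value, right.get(right_key)))
    return rows


left = {"jan/usr/123": "event", "feb/usr/456": "event"}
right = {"jan/lookup": "reference"}
rows = join(left, right, "$month/*/", "$month/lookup")
```

The jan event is paired with the value stored under jan/lookup, while the feb event is kept with no right-hand value, which is the left-join behaviour that anchoring is described as producing.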

createDependency

If the property "createDependency": true (the default is false) is added to the with object, the relation between the two inputs is strengthened: a computation of the graph is triggered when either left or right has new data.
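
The effect of createDependency on triggering can be summarised in a short Python sketch. The function triggers_node is hypothetical, and the rule that a write to the left bucket always triggers the node is an assumption drawn from the description above, not a statement about the scheduler's actual implementation.

```python
def triggers_node(written_bucket, node_input):
    """Decide whether new data in `written_bucket` triggers the node (toy model)."""
    if written_bucket == node_input.get("bucket"):
        return True  # assumed: new data on the left always triggers
    with_clause = node_input.get("with") or {}
    return (written_bucket == with_clause.get("bucket")
            and bool(with_clause.get("createDependency", False)))


default_input = {
    "bucket": "left",
    "select": "$month/*/",
    "with": {"bucket": "right", "select": "$month/lookup"},
}
dependent_input = {
    "bucket": "left",
    "select": "$month/*/",
    "with": {"bucket": "right", "select": "$month/lookup", "createDependency": True},
}
```

With default_input only writes to left trigger the node; with dependent_input writes to right trigger it as well.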

Cycles

Cycles are evaluated late. You can set up a graph with a cycle that never triggers, because your implementation never writes the data to the port that would cause it to loop back on itself.

A node can reference itself. A write does not trigger a recomputation. A future event would select the data.

3.4 input.lookup

You can also look up keys from various stores using the lookup operation. You need to specify an array of bucket/key pairs.

"input": {
  "bucket": "left",
  "lookup": [
    {
      "bucket": "b1",
      "key": "jan/d1",
      "createDependency": false
    },
    {
      "bucket": "b2",
      "key": "2016/q1",
      "createDependency": true
    }]
}
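
Conceptually, each bucket/key pair in the lookup array resolves to one entry in the lookup array that the implementation receives. The Python sketch below models this with nested dictionaries; the function resolve_lookups and the shape of each result entry are assumptions for illustration only.

```python
def resolve_lookups(stores, lookups):
    """Resolve each bucket/key pair against in-memory stores, in declaration order."""
    return [
        {
            "bucket": entry["bucket"],
            "key": entry["key"],
            # None stands in for "no data stored under this key"
            "value": stores.get(entry["bucket"], {}).get(entry["key"]),
        }
        for entry in lookups
    ]


stores = {"b1": {"jan/d1": "first"}, "b2": {}}
lookups = [
    {"bucket": "b1", "key": "jan/d1", "createDependency": False},
    {"bucket": "b2", "key": "2016/q1", "createDependency": True},
]
resolved = resolve_lookups(stores, lookups)
```

The result keeps one entry per declared pair, so an implementation can rely on position even when a key has no data yet.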

createDependency & Anchors

You can use anchors and createDependency in your lookup statements in the same way as in your with statements.

4. "outputs" property

A node can output data to stores or DAG exports. For store outputs you can also override the ttl and createOnly properties declared in the store definition.

Overriding ttl and createOnly properties of a store will apply to the outputs of this node only.

// Example output overriding some properties of the "right" bucket
"outputs": {
  "dump": {},
  "right" : {
    "createOnly": true,
    "ttl": "100"
  }
}
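
The override rule can be read as a simple merge in which node-level values win over the store definition for this node's writes only. The Python sketch below illustrates that reading; the function effective_output_settings and the sample store definition values are hypothetical.

```python
OVERRIDABLE = ("ttl", "createOnly")


def effective_output_settings(store_definition, node_output):
    """Merge store-level ttl/createOnly with the node's overrides (node wins)."""
    settings = {k: store_definition[k] for k in OVERRIDABLE if k in store_definition}
    settings.update({k: v for k, v in node_output.items() if k in OVERRIDABLE})
    return settings


# Hypothetical store definition; only ttl and createOnly are considered.
store_definition = {"key$schema": "string/string", "ttl": "3600", "createOnly": False}
overridden = effective_output_settings(store_definition, {"createOnly": True, "ttl": "100"})
inherited = effective_output_settings(store_definition, {})
```

An empty output object, such as "dump": {} above, leaves the store's own settings in effect.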
