Systems
Systems are used for defining key configuration details shared across elements which are based on a common data store or external tool.
Systems are used for connecting 3rd party components to Rierino platform, by configuring their access details, such as:
Database systems: Including shared connection and authorization details for state and query managers
Event streaming systems: Including shared connection and authorization details for streams
Authentication systems: Including connection details for authentication and authorization systems
API based systems: Including connection and authorization details for REST API integrations
Each of these systems can be configured as elements just once and added to as many runners as required for allowing such runners communicate with an external component.
It is recommended to configure and use value lookups (e.g. from k8s ConfigMap, etcd or similar) for infrastructure related settings (e.g. server IP address) and secret lookups (e.g. from k8s Secret, HashiCorp Vault or similar) for credential settings (e.g. API authentication token).
Value lookups can be referenced using ${{VALUE_PATH}} notation whereas secret lookups can be referenced using #{{SECRET_PATH}} notation when deployments are configured to use value and secret loaders.
Some systems also support auto reconnect functionality with "ephemeral" lookups. Such systems replace $!{{VALUE_PATH}} and #!{{SECRET_PATH}} notations with their respective values on each reconnect attempt. This feature allows changing a target system IP address or connection credentials without having to restart microservices connecting to such systems.
List of applicable settings when configuring a system element depends on the type of system (e.g. a database system requiring JDBC connection details, a rest system requiring connection URL), such as:
REST
Includes settings required for connecting with an http server for standard REST API calls.
url
Base URL for REST endpoint
http://localhost:8080/api
-
contentType
Media content type for REST communications ("query" for url parameters, "none" for no content)
application/xml
application/json
trust
Whether endpoint should be trusted without SSL validation
true
false
auth.method
Method to use for authentication to endpoint (input, basic, bearer, scribe, jwt)
basic
-
Header to include generated token in
custom-auth
Authorization
Prefix to include for generated token
none (for no prefix)
Bearer (Basic for basic auth)
header.[header]
Additional header to send in system communications
special-header=value
-
clientPrefix
Client library specific prefix to use for setting client properties
jersey.config.client
-
client.[property]
Client library specific property to include
async.threadPoolSize=10
-
alternateUrls
Comma separated list of alternate URLs for fallback
http://back1.example.com,http://back2.example.com
-
retries
Maximum number of retries allowed in case of server failure
5
0
backoffMs
Milliseconds to wait between retries
500
1000
failTTLMs
Milliseconds to wait until reconsidering a base or alternate URL
10000
5000
responseCookies
Json path to return cookie details received in response (in {path, value, domain, expiry, [name]} format)
cookies
-
responseHeaders
Json path to return header details received in response (in {[name]:[]} format)
headers
-
When using XML contents, json body elements starting with to attributes. E.g.
{"soapenv:Envelope": {"-xmls:soapenv": "http://schemas.xmlsoap.org/soap/envelope/"} }
generates
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
Cookies can be sent using "header.cookie" header parameter with "[name]=[value];[name]=[value]" format as the header value, similar to regular headers.
Based on auth.method, additional system parameters are used:
Input Authentication Method
Authentication with a request body element:
auth.key
Key to include in each request body for authentication
cmVhbGx5Pw==
-
auth.element
Json path to include authentication key in body
user.apiKey
-
Basic Authentication Method
Username / password based basic authentication:
auth.username
Username to include in each request
admin
-
auth.password
Password to include in each request
pass
-
Bearer Authentication Method
Authentication with a bearer token:
auth.token
Token to include in each request
cmVhbGx5Pw==
-
auth.prefix
Bearer prefix to use in each request
Special_Token
Bearer
Scribe Authentication Method
Scribe based OAuth token authentication:
auth.prefix
Bearer prefix to use for access token in each request
Special_Token
Bearer
auth.provider.authentication
Client authentication model (if required, none, basic or input)
input
basic
auth.provider.key
Key to use for client authentication
somekey
-
auth.provider.secret
Secret to use for client authentication
somesecret
-
auth.provider.url
Url of OAuth provider
http://example.com/oauth
-
auth.provider.refreshUrl
Url for getting new tokens when useRefresh is true
http://example.com/oauth/refresh
[auth.provider.url]
JMESPath pattern for converting response from provider url to standard OAuth
{"access_token": data.accessToken}
-
auth.token.grant
Grant type (password, clientCredentials, refreshToken, custom)
password
-
auth.token.scope
Authentication scope to apply
customer.read
-
auth.token.useRefresh
Whether the refresh_token received from OAuth endpoint should be used for
true
false
auth.token.service
Class name for OAuth provider service if custom class is needed
com.github.scribejava.core.builder.api.DefaultApi20
-
Depending on the grant type parameter, additional settings are applicable as follows:
Password Grant Type
auth.username
Username for scribe grant
admin
-
auth.password
Password for scribe grant
pass
-
Refresh Token Grant Type
auth.refreshToken
Refresh token for scripe
-
-
Custom Grant Type
auth.provider.system
Name of to use for token requests
custom_auth
-
provider.custom.url
Url endpoint on custom system to call for access tokens
/GetToken
-
provider.custom.method
Http method to use for access token calls
GET
-
provider.custom.content
Content type for access token calls
query
-
provider.custom.input
Json string to use as call body/parameters for access tokens
{ username: "user", password: ${{rierinoKV.secret}} }
-
provider.custom.refreshUrl
Url endpoint on custom system to call for refreshing tokens
/RefreshToken
-
provider.custom.refreshMethod
Http method to use for token refresh calls
GET
-
provider.custom.refreshContent
Content type for token refresh calls
query
-
provider.custom.refreshInput
Json string to use as call body/parameters for token refresh
-
-
provider.custom.refreshPath
Json path to add refresh token received from access token call to call body/parameters
refresh_token
-
JWT Authentication Method
Authentication generating JWT token wiith a private key:
auth.prefix
Bearer prefix to use in each request
Special_Token
Bearer
auth.ephemeral
Whether each request should generate a new access token
true
false
auth.issuer
Issuer of the token
rierino
-
auth.subject
Subject of the token
12345
-
auth.audience
Target audience for the token
-
auth.payload
Payload of the token
foo
-
auth.id
Id of the token
12345
-
auth.ttl
TTL of token in seconds
36000
3600
auth.claim.*
Claims of the token
user=123
-
auth.header.*
Header of the token
typ=JWT
-
auth.privateKey
Private key for signing token
AAAAAAAA
-
auth.algorithm
Algorithm for signing token
RS256
RS256
In addition to parameters, it is possible to send token inputs (e.g. issuer, ttl, claim, header) using input authentication payload.
MongoDB
Includes settings required for connecting to a MongoDB database.
uri
Connection string in URI format for connecting to MongoDB system (can be multiple comma separated)
mongodb://localhost:27017
-
database
Name of the database to connect
master
-
This system requires the following dependency added to deployment contents:
File System
Includes settings required for connecting to a file system. Additional HDFS settings can be applied using site.xml files.
uri
Filesystem root address
hdfs://localhost:8020/master
-
fsspec.protocol
Fsspec protocol when using file system with a Py4J handler
sftp
-
fsspec.options
Json representation of fsspec options when using with a Py4J handler
{host:"", port:22, username:"", password:""}
-
hdfs.[parameter]
Filesystem parameters when using with an FSEventHandler
fs.s3a.impl=com.rierino.util.fs.CustomS3FileSystem
-
Custom file systems listed in "Gateway Services" can be also used with FSEventHandler.
When writing to sequence files with FSEventHandler, this system also uses the following settings:
path.writer
Full class name of the path writer to use for generating file paths
com.rierino.handler.util.helper.hdfs.DatePathWriter
com.rierino.handler.util.helper.hdfs.DatePathWriter
path.maxRows
to include in each sequence file (-1 for unlimited)
10000
-1
path.bufferSize
Buffer size for sequence file writer
1000
-1
path.blockSize
Block size for sequence file writer
100
-1
path.compression
Compression to apply on sequence file writer
BLOCK
NONE
path.asBytes
Whether to write contents as bytes or Text.class
false
true
path.format
Sequence path format to use for DatePathWriter (e.g. one folder per hour)
yyyy/MM/dd/hh
yyyy/MM/dd
CDC
Includes settings required for connecting to a database or a similar system for change data capture. CDC managers produce CDCRecord entries and publish them on a given stream, which can be consumed by a CDCRoleHandler to convert them into pulse and journal records.
Spring event runners provide support for CDC managers, where each CDC stream linked to a CDC manager can define an offset state (using offset.state parameter of the stream), which is updated based on the specified commit duration (using commitMs parameter of the runner) for managing resume tokens on restart.
Samza event runners on the other hand, provide more native support for CDC managers, treating them as consumers with input streams with a specific way of configuring access to them.
systems.$alias.consumer.manager
manager
Fully qualified class name for the CDC manager
com.rierino.state.cdc.MongoCDCManager
-
systems.$alias.consumer.dlq.suffix
dlq.suffix
Suffix to add to CDC stream names for dead letter queues
_fail
-
systems.$alias.consumer.dlq.enrich
dlq.enrich
Whether dead letter queue entries should include CDC content
true
false
systems.$alias.consumer.offset.type
offset.type
Type of resume token / offset value (long, comparable or unordered)
long
unordered
systems.$alias.consumer.pollMs
Milliseconds to wait before polling new records
5000
1000
systems.$alias.consumer.asPulse
Whether CDC should produce records as pulse instead of CDC records
false
true
systems.$alias.consumer.manager.parameter.ignoreTerminate
ignoreTerminate
Whether the system should stop listening if a TERMINATE operation is received
true
false
systems.$alias.consumer.manager.parameter.onResumeFail
onResumeFail
Type of action when CDC manager can not resume from last checkpoint (SKIP, MUTE or FATAL)
FATAL
SKIP
systems.$alias.consumer.manager.parameter.onRecordFail
onRecordFail
Type of action when CDC manager can not process current change record (SKIP, DLQ, MUTE or FATAL)
FATAL
SKIP
systems.$alias.consumer.manager.parameter.ignoreResume
ignoreResume
Whether the system should ignore current resume token and start as if it is missing
true
false
systems.$alias.consumer.manager.parameter.resumeReset
resumeReset
Type of strategy to follow on missing resume token (OLDEST or NEWEST)
NEWEST
OLDEST
systems.$alias.consumer.manager.parameter.disableReconnect
disableReconnect
Whether reconnecting on failure should be disabled or not
true
false
systems.$alias.consumer.manager.parameter.retriesPerStep
retriesPerStep
Number of reconnect retries on each backoff step
3
1
systems.$alias.consumer.manager.parameter.backoffSteps
backoffSteps
Milliseconds to wait at each backoff step as comma separated values
1000,30000
10,100,200,500,1000,1000,10000
In addition to these shared settings, the following CDC managers have additional settings, which are similar to system settings (e.g. systems.$alias.consumer.manager.parameter.uri for MongoDB uri):
com.rierino.state.cdc.NoopCDCManager: Uses "ms" setting for configuring milliseconds to wait between creating a new CDC record with an incremental aggregate ID.
com.rierino.state.cdc.ActionCDCManager: Uses "action" setting for making a call to action path on each iteration and an optional "source.stream" setting for defining source for the action call. Processed event payload can contain 3 main fields:
wait: If set to true, the action is not triggered till the CDC manager is polled again
offset: Used as the resume token, which is provided in event payload on the next action call
content: Stored in content of the produced CDC record
com.rierino.state.cdc.MongoCDCManager: Uses "uri" and "database" settings.
com.rierino.state.cdc.RedisCDCManager: Uses "uri" and "master" settings.
com.rierino.state.cdc.EtcdCDCManager: Uses "url", "namespace", "user", "password" settings.
com.rierino.state.cdc.DebeziumCDCManager: Uses all settings applicable to Debezium connectors.
This manager requires the following dependency added to deployment contents:
com.rierino.state.cdc.HDFSCDCManager: Uses "uri" and all settings applicable to HDFS file systems for file system change data capture.
com.rierino.state.cdc.odata4.OdataCDCManager: Uses "url" and "path" settings and delta logic of odata v4 endpoints for change data capture.
com.rierino.state.cdc.MailCDCManager: "mail.*" settings and UID logic of email servers to fetch new emails as change data capture.
Runners using CDC managers should be deployed with single replicas since managers consume all records coming from a CDC stream without applying any partitioning. To apply partitioning on these records, the runners should output records to Kafka topics and run business logic on runners consuming these topics.
Keycloak
Includes settings required for connecting to a Keycloak server for authentication handlers.
config
Json string for Keycloak adapter configuration
{"realm":"test", ...}
-
authServerUrl
Url endpoint for Keycloak server (if not provided as config already)
https://localhost/auth/
-
realm
Authentication realm to use (if not provided as config already)
admin-user
-
resource
Authentication client resource to use (if not provided as config already)
rierino-auth
-
credential.[key]
Keycloak server access credentials as KV pair (if not provided as config already)
provider=secret, username=admin, secret=pass
-
roles
Default roles to assign to each new user
user
-
idToken
Whether to return id_token when resolving tokens
true
false
This system requires the following dependency added to deployment contents:
Elasticsearch
Includes settings required for connecting to an Elasticsearch server.
url
HTTP endpoint to access ES through REST API (can be multiple comma separated)
http://localhost:9200
-
pathPrefix
Path prefix to include for ES rest client builder
test
-
username
Username for accessing ES with basic authentication
*****
-
password
Password for accessing ES with basic authentication
*****
-
token
Service token to access ES with token authentication
*****
-
key
API key to access ES for key authentication
*****
-
secret
API secret to access ES for key authentication
*****
-
This system requires the following dependency added to deployment contents:
Redis
Includes settings required for connecting to a Redis store.
uri
Connection string in URI format for connecting to Redis system (can be multiple comma separated)
redis://localhost:6379
-
master
Master name if using sentinel pool
master
-
This system requires the following dependency added to deployment contents:
Couchbase
Includes settings required for connecting to a Couchbase database.
uri
Connection string in URI format for connecting to Couchbase system
couchbase://localhost
-
username
Username for access
user
-
password
Password for access
pass
-
bucket
Bucket to access
bucket_1
-
bucket.wait
Bucket wait duration
30
10
This system requires the following dependency added to deployment contents:
JDBC
Includes settings required for connecting to a database using JDBC.
uri
JDBC data source uri
jdbc:sqlite:test.db
-
connectionProperties
JDBC connection properties
DriverClassName=org.sqlite.JDBC,Username=user
-
dataSourceClassName
org.postgresql.xa.PGXADataSource
-
For Jooq based JDBC state managers, the following system settings are also applicable:
dbms
Jooq DBMS name for SQLDialect
POSTGRES
DEFAULT
mergeInto
Whether the Jooq database class supports merge into or not
false
true
This system requires the following dependency added to deployment contents:
Kafka
Includes settings required for connecting to a Kafka cluster.
binary
Whether the system uses binary or text data format
true
false
key.class
Fully qualified classname for Kafka system record keys
java.lang.String
-
msg.class
Fully qualified classname for Kafka system record contents
java.lang.String
-
msg.class.inner
Fully qualified inner classname for Kafka system record contents, if msg.class is a generic
java.lang.String
-
rierino.system.$alias.consumer.*
Kafka consumer configurations (except for Samza)
bootstrap.servers=localhost:9092
-
rierino.system.$alias.producer.*
Kafka producer configurations (except for Samza)
batch.size=1
-
systems.$alias.*
Samza specific connection configurations
samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-
parameter.consumer.[property]
Kafka consumer properties
auto.offset.reset=earliest
-
parameter.producer.[property]
Kafka producer properties
acks=0
-
parameter.output.backupSystem
Name of backup system to use if a stream of this system fails
kafka_backup
-
parameter.output.backupStream
Name of backup stream to use if a stream of this system fails
journal_backup
-
etcd
Includes settings required for connecting to an etcd server.
url
Comma separated list of endpoints for etcd server(s)
http://localhost:2379, http://localhost:2378
-
user
Username for etcd connection
****
-
password
Password for etcd connection
****
-
This system requires the following dependency added to deployment contents:
Camel
Includes settings required for connecting to an Apache Camel system.
camelRoute
Uri for the Camel system endpoint
mock:out
-
This system requires the following dependency added to deployment contents:
Email
Includes settings required for connecting to an email server system.
mail.*
Jakarta mail settings to apply
mail.store.protocol=imap
-
mail.rierino.*
Rierino OAuth2Auth authentication settings (when mechanism is XOAUTH2)
-
-
This system requires the following dependency added to deployment contents:
noop
A predefined "noop" system exists in Rierino, which is considered to be a dummy system. Any stream mapping to this system name ignores all send message requests, acting as a blackhole.
Last updated