Data Transform Config (DTC)
The core of Blurr is the Data Transformation Configuration (DTC).

There are 2 types of DTCs - Streaming and Window DTC.
The Streaming DTC takes raw data and converts it into blocks.

The Window DTC generates data relative to a block. This is especially useful for building features for machine learning models.

Blurr is in Developer Preview, so the spec is likely to change.
The anatomy of a Streaming DTC
The raw event being processed is available as source in the DTC. Event parameters are accessed as source.<key>, such as source.user_id. DTCs are processed in a Python 3.6 environment, and the expressions used for field values are evaluated as Python expressions.
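As a rough illustration of the source.<key> access pattern, the sketch below wraps a raw event in a namespace and evaluates expression strings against it. The event keys and the evaluate helper are hypothetical, and the use of eval is an assumption made for illustration, not a description of how the DTL is implemented.

```python
from types import SimpleNamespace

# Hypothetical raw event; the keys are illustrative only.
raw_event = {"user_id": "u-42", "country": "US", "package_version": "1.0"}

# Expose the event so its keys can be read as source.<key>.
source = SimpleNamespace(**raw_event)


def evaluate(expression, source):
    """Evaluate an expression string with `source` in scope (sketch only)."""
    return eval(expression, {"__builtins__": {}}, {"source": source})


identity = evaluate("source.user_id", source)
should_process = evaluate("source.package_version == '1.0'", source)
```

Here identity holds the user id from the event, and should_process is the boolean result of the comparison.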
Header
Type: Blurr:Transform:Streaming
Version: '2018-03-01'
Description: Create blocks from streaming Raw Data
Name: offer_ai_v1
Identity: source.user_id
Time: parse_time(source.event_time, 'YYYY/mm/dd HH:MM:SS')
When: source.package_version == '1.0'
| Key | Description | Allowed values | Required |
| --- | --- | --- | --- |
| Type | The type of DTC - Streaming or Window | Blurr:Transform:Streaming or Blurr:Transform:Window | Required |
| Version | Version number of the DTC, used by the Data Transform Library to parse the template | Specific DTC versions only: 2018-03-01 | Required |
| Description | Text description for the DTC | Any string | Optional |
| Name | Unique name for the DTC | Any string | Required |
| Identity | The dimension in the raw data around which data is aggregated | source.<field> | Required |
| Time | Defines which field in the source data to use as the time of the event. This is available as time for other expressions to use. If source does not contain a time, the datetime.utcnow() expression can be used to retrieve the current UTC datetime | Any python datetime object | Optional |
| When | Boolean expression that defines which raw data should be processed | Any boolean expression | Optional |
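The way Identity, Time and When interact can be sketched in plain Python. The read_header helper and the event values are hypothetical, and datetime.strptime with the format string below is only an assumed stand-in for the parse_time call in the example header.

```python
from datetime import datetime
from types import SimpleNamespace

# Assumed strptime equivalent of the 'YYYY/mm/dd HH:MM:SS' pattern in the header.
TIME_FORMAT = "%Y/%m/%d %H:%M:%S"


def read_header(raw_event):
    """Return (identity, time) for an event, or None if When rejects it."""
    source = SimpleNamespace(**raw_event)

    # When: source.package_version == '1.0'
    if not source.package_version == "1.0":
        return None

    # Identity: source.user_id
    identity = source.user_id

    # Time: parsed from the event, falling back to the current UTC time.
    event_time = getattr(source, "event_time", None)
    if event_time:
        time = datetime.strptime(event_time, TIME_FORMAT)
    else:
        time = datetime.utcnow()
    return identity, time


accepted = read_header(
    {"user_id": "u-42", "package_version": "1.0", "event_time": "2018/03/07 10:15:00"}
)
rejected = read_header(
    {"user_id": "u-43", "package_version": "0.9", "event_time": "2018/03/07 10:16:00"}
)
```

The first event passes the When condition and yields an identity and a datetime; the second is filtered out before Identity or Time are read.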
Store
At the end of a transform, data is persisted in the store. The Data Transform Library (DTL) works with an abstraction of storage, which isolates the DTL from the actual storage tier.
Stores:
  - Type: Blurr:Store:MemoryStore
    Name: hello_world_store
| Key | Description | Allowed values | Required |
| --- | --- | --- | --- |
| Type | The destination data store | Blurr:Store:MemoryStore. More Stores such as S3 and DynamoDB coming soon | Required |
| Name | Name of the store, used for internal referencing within the DTC | Any string | Required |
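To make the idea of a storage abstraction concrete, here is a minimal sketch in which the transform code only talks to an abstract interface. The SketchStore and InMemorySketchStore classes and the persist_transform_output function are hypothetical names invented for this example; they are not Blurr's store API.

```python
from abc import ABC, abstractmethod


class SketchStore(ABC):
    """Hypothetical storage interface that transform code depends on."""

    @abstractmethod
    def save(self, key, record):
        """Persist a record under the given key."""

    @abstractmethod
    def get(self, key):
        """Return the record for a key, or None if absent."""


class InMemorySketchStore(SketchStore):
    """Dictionary-backed implementation, loosely analogous to a memory store."""

    def __init__(self, name):
        self.name = name
        self._records = {}

    def save(self, key, record):
        self._records[key] = dict(record)

    def get(self, key):
        return self._records.get(key)


def persist_transform_output(store, identity, fields):
    # The caller never needs to know which storage tier sits behind `store`.
    store.save(identity, fields)


store = InMemorySketchStore("hello_world_store")
persist_transform_output(store, "u-42", {"country": "US"})
```

Swapping in a different backend would only require another SketchStore subclass; persist_transform_output stays unchanged.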
Aggregates
Aggregates define groups of data that are in either a one-to-one or a many-to-one relationship with the Identity.
There are 3 types of Aggregates in a Streaming DTC.
IdentityAggregate
Blurr:Aggregate:IdentityAggregate. Fields in an IdentityAggregate are in a one-to-one relationship with the identity. There is a single record that stores these fields, and changes to these fields overwrite the previous value. No historical records are kept for state changes.
Aggregates:
  - Type: Blurr:Aggregate:IdentityAggregate
    Name: user
    Store: offer_ai_dynamo
    When: source.event_id in ['app_launched', 'user_updated']
    Fields:
      - Name: user_id
        Type: string
        Value: source.customer_identifier
      - Name: country
        Value: source.country
| Key | Description | Allowed values | Required |
| --- | --- | --- | --- |
| Type | Type of Aggregate | Blurr:Aggregate:IdentityAggregate, Blurr:Aggregate:BlockAggregate, Blurr:Aggregate:VariableAggregate | Required |
| Name | Name of the Aggregate | Any string, unique within the DTC | Required |
| Store | Name of the Store in which to create the Aggregate | Stores defined in the DTC | Required |
| When | Boolean expression that defines which raw events to process | Any boolean expression | Optional |
An Aggregate contains fields for the information being stored. IdentityAggregate fields are especially useful for data that is relatively static over time, like a user's country.
Each field in an IdentityAggregate has 3 properties.
| Key | Description | Allowed values | Required |
| --- | --- | --- | --- |
| Name | Name of the field | Any string | Required |
| Type | Type of data being stored | integer, boolean, string, datetime, float, map, list, set | Optional. If Type is not set, the DTL uses string as the default type |
| Value | Value of the field | Any python expression, and must match the Type | Required |
All fields in the Aggregate are encapsulated in an Aggregate object. The object is available in the DTL, which is the Python environment processing the DTC. Field values can be accessed using AggregateName.FieldName
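The overwrite behaviour of an IdentityAggregate can be sketched with a dictionary that holds one record per identity. The apply_identity_aggregate function and the sample events are hypothetical; the When condition and field expressions mirror the user example above.

```python
from types import SimpleNamespace


def apply_identity_aggregate(records, raw_event):
    """Update the single record kept for the event's identity (sketch only)."""
    source = SimpleNamespace(**raw_event)

    # When: source.event_id in ['app_launched', 'user_updated']
    if source.event_id not in ["app_launched", "user_updated"]:
        return

    # One record per identity; later values replace earlier ones.
    records[source.user_id] = {
        "user_id": source.customer_identifier,
        "country": source.country,
    }


records = {}
events = [
    {"event_id": "app_launched", "user_id": "u-42",
     "customer_identifier": "u-42", "country": "US"},
    {"event_id": "purchase", "user_id": "u-42",
     "customer_identifier": "u-42", "country": "CA"},
    {"event_id": "user_updated", "user_id": "u-42",
     "customer_identifier": "u-42", "country": "IN"},
]
for event in events:
    apply_identity_aggregate(records, event)
```

After the three events, only the latest accepted values remain: the purchase event is skipped by the When condition, and the earlier country is not kept as history.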
BlockAggregate
Blurr:Aggregate:BlockAggregate. Fields in a BlockAggregate are in a many-to-one relationship with the identity: a single identity can have many blocks. These fields are aggregated together in blocks based on the split condition specified.
- Type: Blurr:Aggregate:BlockAggregate
  Name: session
  When: source.app_version > '3.7'
  Split: source.session_id != session.id
  Fields:
    - Name: presents_wrapped
      Type: integer
      Value: session.presents_wrapped + 1
      When: source.event_name == 'wrap_present'
| Key | Description | Allowed values | Required |
| --- | --- | --- | --- |
| Type | Type of Aggregate | Blurr:Aggregate:IdentityAggregate, Blurr:Aggregate:BlockAggregate, Blurr:Aggregate:VariableAggregate | Required |
| Name | Name of the Aggregate | Any string, unique within the DTC | Required |
| When | Boolean expression that defines which raw events to process | Any boolean expression | Optional |
| Split | Boolean expression that defines when a new block should be created | Any boolean expression | Required |
All fields in the Aggregate are encapsulated in an Aggregate object. The object is available in the DTL, which is the Python environment processing the DTC. Field values can be accessed using AggregateName.FieldName
Each field in a BlockAggregate has 4 properties.
| Key | Description | Allowed values | Required |
| --- | --- | --- | --- |
| Name | Name of the field | Any string | Required |
| Type | Type of data being stored | integer, boolean, string, datetime, float, map, list, set | Optional. If Type is not set, the DTL uses string as the default type |
| Value | Value of the field | Any python expression, and must match the Type | Required |
| When | Boolean expression that defines which raw events to process | Any boolean expression | Optional |
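The effect of a Split condition can be sketched as a loop that closes the current block whenever the condition becomes true. The build_blocks function and the events are hypothetical, and tracking the session id inside the block as id is an assumption made so the split expression has something to compare against.

```python
from types import SimpleNamespace


def build_blocks(raw_events):
    """Group events into session blocks using a split condition (sketch only)."""
    blocks = []
    session = None
    for raw_event in raw_events:
        source = SimpleNamespace(**raw_event)

        # Split: source.session_id != session.id
        if session is not None and source.session_id != session["id"]:
            blocks.append(session)
            session = None

        if session is None:
            session = {"id": source.session_id, "presents_wrapped": 0}

        # Field-level condition: only count 'wrap_present' events.
        if source.event_name == "wrap_present":
            session["presents_wrapped"] += 1

    if session is not None:
        blocks.append(session)
    return blocks


blocks = build_blocks([
    {"session_id": "s1", "event_name": "wrap_present"},
    {"session_id": "s1", "event_name": "wrap_present"},
    {"session_id": "s1", "event_name": "open_shop"},
    {"session_id": "s2", "event_name": "wrap_present"},
])
```

Each change of session_id starts a new block, so one identity ends up with one block per session.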
Variable
Blurr:Aggregate:VariableAggregate. Variable Aggregates are temporary variables that can be used in other Aggregates. Variables aim to reduce code duplication and improve readability. They are useful for cleansing, modifying or typecasting source elements, and for representing complex filter conditions that evaluate to a boolean value.
- Type: Blurr:Aggregate:VariableAggregate
  Name: vars
  Fields:
    - Name: item_price_micro
      Type: integer
      Value: int(float(source.item_price) * 1000000)
This can be accessed as vars.item_price_micro anywhere in the Streaming DTC.
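The item_price_micro expression can be tried out directly in Python. In this sketch the price value is hypothetical, and evaluating the expression strings with eval is an assumption used only to show how a later expression might read vars.item_price_micro.

```python
from types import SimpleNamespace

# Only the built-ins the expressions need are exposed (an illustrative choice).
SAFE_GLOBALS = {"__builtins__": {}, "int": int, "float": float}

# Hypothetical event in which the price arrives as a string.
source = SimpleNamespace(item_price="2.50")

# Value: int(float(source.item_price) * 1000000)
item_price_micro = eval(
    "int(float(source.item_price) * 1000000)", SAFE_GLOBALS, {"source": source}
)
variables = SimpleNamespace(item_price_micro=item_price_micro)

# A later expression reuses the variable instead of repeating the conversion.
is_expensive = eval(
    "vars.item_price_micro > 1000000",
    SAFE_GLOBALS,
    {"source": source, "vars": variables},
)
```

Note that int() truncates toward zero, so a price that is not exactly representable as a float can come out one micro-unit low; wrapping the product in round() is one way to avoid that.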
Window DTC
Windowing concepts in Blurr are similar to the concepts in Apache Spark/Flink.
Once the Streaming DTC has run, the raw data is split into blocks. The Window DTC generates data relative to a block.

Say we're training a model to predict what sales offer to show to a user in a session. We want the features of this model to be purchases_prev_week, games_played_last_session and win_ratio_last_session. We'd like to predict next_7_days_revenue.
When training a model, the data needs to be organized around the decision point. We want to know the value of the features at the point an offer is shown. So if the offer is shown during session 4, the features need to be relative to session 4. And the optimization function (total revenue over next 7 days) needs to be relative to session 4 as well.

The decision point is the Anchor. A window defines segments of data relative to the anchor. In this case, we need the features and optimization function relative to when an offer is shown.
The anatomy of a Window DTC
Header
Type: Blurr:Transform:Window
Version: '2018-03-01'
Description: Generate features around an Anchor
Name: Window Example
SourceDTC: offer_ai_v1
| Key | Description | Allowed values | Required |
| --- | --- | --- | --- |
| Type | The type of DTC - Streaming or Window | Blurr:Transform:Streaming or Blurr:Transform:Window | Required |
| Version | Version number of the DTC, used by the Data Transform Library to parse the template | Specific DTC versions only: 2018-03-01 | Required |
| Description | Text description for the DTC | Any string | Optional |
| Name | Unique name for the Window DTC | Any string | Required |
| SourceDTC | The name of the Streaming DTC upon which this Window DTC will be executed | Valid Streaming DTC | Required |
Anchor
An Anchor represents a decision made at a point in time. For example, when a model is used to pick the offer price for a user, the point at which that decision is made is an Anchor.
This is a vital concept to align the way data processing is done for training data generation and for prediction. The training data is generated as if the only information that was available was up to the Anchor point. Anything after the Anchor point can only be used as a label (which is what the model will predict).
Anchor:
  Condition: offer_ai_v1.game_stats.offer_type != ''
  Max: 1
Condition is a Python expression that returns a boolean value to determine whether an anchor condition exists in the block being processed. Field values in a block are accessed as StreamingDTCName.AggregateName.FieldName.
Max defines the maximum number of output rows to be generated every time the Window DTC is run.
When more blocks satisfy the anchor condition than the Max value allows, Max of the satisfying blocks are picked at random. Max can be used to prevent oversampling of more active users.
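The combination of Condition and Max can be sketched as a filter followed by random sampling. The pick_anchor_blocks function and the block contents are hypothetical, and random.sample is just one assumed way of picking blocks at random.

```python
import random


def pick_anchor_blocks(blocks, max_anchors, rng=None):
    """Return at most `max_anchors` blocks that satisfy the anchor condition."""
    rng = rng or random.Random()

    # Condition: offer_ai_v1.game_stats.offer_type != ''
    candidates = [block for block in blocks if block["offer_type"] != ""]

    # More matches than Max: keep a random subset of size Max.
    if len(candidates) > max_anchors:
        return rng.sample(candidates, max_anchors)
    return candidates


blocks = [
    {"session": 1, "offer_type": ""},
    {"session": 2, "offer_type": "bundle"},
    {"session": 3, "offer_type": ""},
    {"session": 4, "offer_type": "discount"},
]

# A seeded generator keeps the sketch reproducible.
anchors = pick_anchor_blocks(blocks, max_anchors=1, rng=random.Random(0))
all_matches = pick_anchor_blocks(blocks, max_anchors=5)
```

With Max set to 1, a user with many qualifying sessions contributes a single output row, the same as a user with only one.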
Store
Stores:
  - Type: Blurr:Store:MemoryStore
    Name: hello_world_store
| Key | Description | Allowed values | Required |
| --- | --- | --- | --- |
| Type | The destination data store | Blurr:Store:MemoryStore. More Stores such as S3 and DynamoDB coming soon | Required |
| Name | Name of the store, used for internal referencing within the DTC | Any string | Required |
Aggregates
All Aggregate operations performed in a Window DTC can only use the following data:
- Anchor block - The block that satisfies the anchor condition. Fields from the anchor block can be accessed as anchor.FieldName.
- IdentityAggregate - Identity aggregates available from the source Streaming DTC. Fields from an IdentityAggregate can be accessed as StreamingDTCName.IdentityAggregateName.FieldName.
- A window of blocks around the anchor block - A list of blocks from a BlockAggregate before or after the anchor block, based on the window defined. A field from the list of blocks is referenced as WindowName.FieldName.
WindowAggregate
Aggregates:
  - Type: Blurr:Aggregate:WindowAggregate
    Name: last_session
    # Defines a processing window for the rollup. Supported window types are day, hour and count
    WindowType: count
    # Negative values are backward from the Anchor
    WindowValue: -1
    Source: offer_ai_v1.game_stats
    Fields:
      # The output will contain a column for last_session.games_played
      - Name: games_played
        Type: integer
        # source.games_played is a list containing the games_played information from the
        # sessions that fall in the window defined above. In this case, because of a count
        # window of 1 the source.games_played list only contains a single data point.
        Value: source.games_played[0]
| Key | Description | Allowed values | Required |
| --- | --- | --- | --- |
| Type | Type of Aggregate | Blurr:Aggregate:WindowAggregate, Blurr:Aggregate:VariableAggregate | Required |
| Name | Name of the Aggregate | Any string, unique within the DTC | Required |
| WindowType | The type of window to use around the anchor block | day, hour, count | Optional. A WindowAggregate can be defined without a Window |
| WindowValue | The number of days, hours or blocks to window around the anchor block | Integer | Optional. A WindowAggregate can be defined without a Window |
| Source | The BlockAggregate (defined in the Streaming DTC) on which the window operations should be performed | Valid BlockAggregate | Required |
All functions defined on windows work on a list of values. For example, if a session contains a games_played field and a last_week window is defined on it, then last_week.games_played represents the list of values from last week's sessions.
Important: Window operations using WindowAggregate do not include the Anchor block itself.
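A count window around the anchor can be sketched as list slicing over time-ordered blocks. The count_window function and the block values are hypothetical; the sketch assumes that negative values look backward, positive values look forward, and the anchor block is excluded in both cases.

```python
def count_window(blocks, anchor_index, window_value):
    """Return the blocks in a count window around the anchor (sketch only)."""
    if window_value < 0:
        # Backward window: the |window_value| blocks just before the anchor.
        start = max(anchor_index + window_value, 0)
        return blocks[start:anchor_index]
    # Forward window: the window_value blocks just after the anchor.
    return blocks[anchor_index + 1:anchor_index + 1 + window_value]


# Hypothetical session blocks for one user, ordered by time.
blocks = [
    {"games_played": 3},
    {"games_played": 5},
    {"games_played": 2},  # anchor block
    {"games_played": 7},
]
anchor_index = 2

last_session = count_window(blocks, anchor_index, -1)

# Window fields are lists, even when the window holds a single block.
games_played = [block["games_played"] for block in last_session]
```

With a window value of -1 the list holds one entry, which is why the example above reads the value with an index of 0.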
Each field in a WindowAggregate has 3 properties.
| Key | Description | Allowed values | Required |
| --- | --- | --- | --- |
| Name | Name of the field | Any string | Required |
| Type | Type of data being stored | integer, boolean, string, datetime, float, map, list, set | Optional. If Type is not set, the DTL uses string as the default type |
| Value | Value of the field | Any python expression, and must match the Type | Required |
Variable
The Variable Aggregate works exactly the same as in the Streaming DTC.
Order of Aggregates
Aggregates can be defined in any order, with one exception: an Aggregate whose fields are referenced by another Aggregate must be defined before the Aggregate that references it. For example, if a BlockAggregate uses a Variable, then the Variable Aggregate should be defined before the BlockAggregate so that the Variable is processed first and is available when the BlockAggregate is processed.
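The ordering rule follows naturally if aggregates are evaluated one after another, with each one added to the scope only once it has been processed. The evaluate_in_order function, the purchase aggregate and the quantity key are hypothetical, and the sequential eval loop is an assumption rather than the DTL's actual mechanism.

```python
from types import SimpleNamespace

# Only the built-ins the expressions need are exposed (an illustrative choice).
SAFE_GLOBALS = {"__builtins__": {}, "int": int, "float": float}


def evaluate_in_order(aggregates, source):
    """Evaluate aggregates in definition order, growing the scope as we go."""
    scope = {"source": source}
    for name, fields in aggregates:
        values = {}
        for field_name, expression in fields.items():
            values[field_name] = eval(expression, SAFE_GLOBALS, scope)
        # Only now does AggregateName.FieldName become visible to later ones.
        scope[name] = SimpleNamespace(**values)
    return scope


source = SimpleNamespace(item_price="2.50", quantity=2)
variable_aggregate = (
    "vars", {"item_price_micro": "int(float(source.item_price) * 1000000)"}
)
block_aggregate = (
    "purchase", {"total_micro": "vars.item_price_micro * source.quantity"}
)

result = evaluate_in_order([variable_aggregate, block_aggregate], source)

# Reversing the order means `vars` is not defined when `purchase` needs it.
try:
    evaluate_in_order([block_aggregate, variable_aggregate], source)
    wrong_order_failed = False
except NameError:
    wrong_order_failed = True
```

In this sketch the reversed order fails with a NameError, which is the kind of problem the ordering rule is meant to prevent.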
Processing field values
All field values must be valid python expressions. These expressions are executed by the Data Transform Library (DTL), which uses a Python 3.6 interpreter.
Errors
Field values are not recorded when:
- Evaluation results in an error
- A None value is returned / no values are returned
- The value returned is of a type different from the type of the field, and casting the value to the required type fails
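The three cases above can be sketched as a small evaluation helper. The evaluate_field function, the NOT_RECORDED sentinel and the CASTERS table are hypothetical names, and only three of the supported types are covered to keep the sketch short.

```python
from types import SimpleNamespace

# Hypothetical sentinel meaning "do not record this field value".
NOT_RECORDED = object()

# Python types assumed to back a few of the DTC field types.
CASTERS = {"integer": int, "float": float, "string": str}


def evaluate_field(expression, field_type, scope):
    """Evaluate a field expression, or return NOT_RECORDED (sketch only)."""
    try:
        value = eval(expression, {"__builtins__": {}}, scope)
    except Exception:
        return NOT_RECORDED  # evaluation resulted in an error

    if value is None:
        return NOT_RECORDED  # a None value was returned

    caster = CASTERS[field_type]
    if not isinstance(value, caster):
        try:
            value = caster(value)  # attempt to cast to the declared type
        except (TypeError, ValueError):
            return NOT_RECORDED  # type mismatch and the cast failed
    return value


castable = {"source": SimpleNamespace(count="7")}
uncastable = {"source": SimpleNamespace(count="seven")}

recorded = evaluate_field("source.count", "integer", castable)
bad_cast = evaluate_field("source.count", "integer", uncastable)
missing_key = evaluate_field("source.level", "integer", castable)
none_value = evaluate_field("None", "string", castable)
```

Only the first call produces a value; the other three return the sentinel, matching the three cases in the list.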
Types
If a type is not specified for a field, it is assumed to be a string. The following types are supported.
| Type | Description | Expected format | Default |
| --- | --- | --- | --- |
| integer | Whole numbers | 2 | 0 |
| boolean | Boolean values | true | false |
| string | Text; the default type | 'this is a string' | '' |
| datetime | Date and time values | any datetime object in python | - |
| float | Floating point numbers with decimals | 1.2 | 0.0 |
| map | Any type to type maps | '{"a": 1, "b": 2}' | empty map |
| list | Lists can contain any type of data | '[1,2,3]' | empty list |
| set | Contains only unique elements that are integer, long, float and strings | - | empty set |
Reserved keywords
Reserved Field Names
identity
time
start_time
end_time
name
schema
type
window
split
filter
Reserved Function Names
add_to_map
add_to_set
counter