# Tutorial 1: Event Aggregation : Streaming BTS

In this tutorial we'll learn how Blurr performs basic data aggregation. The following concepts will be introduced:

* The *Blurr Transform Spec* document (BTS)
* The basic blocks of a BTS: `Header`, `Store`, `Identity`, `Time` and `Aggregates`
* How events are processed and aggregated one by one by a `Block Aggregate`
* How `Identity` and `Dimensions` are used to create new records.

Try the code from this example by [launching a Jupyter Notebook](https://mybinder.org/v2/gh/productml/blurr/master?filepath=examples%2Ftutorial).

## 1. Events

Our sample application is a fairly simple game in which the player can either win or lose.

Users can play as many games as they want in one sitting, which we call a **session**. Each event has a `session_id` identifying the session in which the game was played.

This app collects 2 types of events:

* `game_start`: sent when a user starts a new game.
* `game_end`: sent when a user finishes a game. Contains a `won` field that marks whether the user won the game (`1` for a win, `0` for a loss).

Example:

```javascript
{
  "user_id": "09C1", // unique user identifier
  "session_id": "915D", // the session the game is played in
  "event_id": "game_start", // type of the event
  "country" : "US", // demographic data
  "timestamp": "2018/03/04 09:01:03" // time at which the event occurred
}
```
```

Events are stored as `JSON` entries, split by a new line character `\n`:

```javascript
{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_start", "timestamp": "2018/03/04 09:01:03" }
{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_end", "won": 1, "timestamp": "2018/03/04 09:03:04"  }
{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_start", "timestamp": "2018/03/04 09:04:31"  }
{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_end", "won": 1, "timestamp": "2018/03/04 09:10:22"  }
{ "user_id": "B6FA", "session_id": "D043", "country" : "US", "event_id": "game_start", "timestamp": "2018/03/04 09:11:03"  }
{ "user_id": "B6FA", "session_id": "D043", "country" : "US", "event_id": "game_end", "won": 1, "timestamp": "2018/03/04 09:21:55"  }
{ "user_id": "09C1", "session_id": "T8KA", "country" : "UK", "event_id": "game_start", "timestamp": "2018/03/04 09:22:13"  }
{ "user_id": "09C1", "session_id": "T8KA", "country" : "UK", "event_id": "game_end", "won": 1, "timestamp": "2018/03/04 09:25:24"  }
```
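Because each line is a self-contained JSON object, the log can be read line by line with Python's standard `json` module. The sketch below is not part of Blurr; it only illustrates the newline-delimited format, using the first two events above (`read_events` is a hypothetical helper).

```python
import json

# First two lines of the newline-delimited event log shown above
log = (
    '{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_start", "timestamp": "2018/03/04 09:01:03" }\n'
    '{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_end", "won": 1, "timestamp": "2018/03/04 09:03:04"  }\n'
)

def read_events(text):
    """Parse one JSON object per non-empty line."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]

events = read_events(log)
print(events[1]["event_id"], events[1]["won"])  # game_end 1
```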

Our goal is to **collect session statistics**, such as games played in a session by a user, or the total games won.

## 2. The Transformation

For the sequence of events listed above, we're interested in the **number of games played** and the **number of games won** per player and session.

We will transform the original sequence of events into a series of records containing the desired information:

| session\_id | user\_id | games\_played | games\_won |
| ----------- | -------- | ------------- | ---------- |
| 915D        | 09C1     | 2             | 2          |
| D043        | B6FA     | 1             | 1          |
| T8KA        | 09C1     | 1             | 1          |

To obtain this transformation, Blurr processes the events sequentially, one by one, using the following **Blurr Transform Spec (BTS)** file:

```yaml
Type: Blurr:Transform:Streaming
Version: '2018-03-01'
Name: sessions

Stores:
   - Type: Blurr:Store:Memory
     Name: hello_world_store

Identity: source.user_id

Import:
  - { Module: dateutil.parser, Identifiers: [ parse ]}

Time: parse(source.timestamp)

Aggregates:

 - Type: Blurr:Aggregate:Block
   Name: session_stats
   Store: hello_world_store

   Dimensions:

     - Name: session_id
       Type: string
       Value: source.session_id

   Fields:

     - Name: games_played
       Type: integer
       When: source.event_id == 'game_start'
       Value: session_stats.games_played + 1

     - Name: games_won
       Type: integer
       When: source.event_id == 'game_end' and source.won == 1
       Value: session_stats.games_won + 1
```

Let's have a quick look at the five main blocks of this BTS: `Header`, `Store`, `Identity`, `Time` and `Aggregates`.

### 2.1. Header

```yaml
Type: Blurr:Transform:Streaming
Version: '2018-03-01'
Name: sessions
```

`Type` and `Version` identify the capabilities of the BTS.

Later in this series of tutorials we'll introduce other types of BTS, such as the `Window` BTS. We'll also learn how BTSs are combined, which is why every BTS must have a unique `Name`.

### 2.2. Store

```yaml
Stores:
   - Type: Blurr:Store:Memory
     Name: hello_world_store
```

The output of a transformation is a collection of **records** persisted in a datastore. For this example we'll be using an in-memory datastore.

### 2.3. Identity

Every BTS has an **Identity**, which is always a property of the events being processed. In our example, the Identity is the property `user_id`:

```yaml
Identity: source.user_id
```

> In a BTS we can access the properties of the event being processed using the `source` keyword, as in `source.user_id` or `source.won`.

The Identity is the main dimension around which events are aggregated. At this stage, let's just think of the Identity as a mandatory field that is part of both the original events and the output.

### 2.4. Time

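`Time` is an expression that extracts the time of each event, here by parsing the event's `timestamp` property. The `Import` block makes `parse` from `dateutil.parser` available to BTS expressions:

```yaml
Time: parse(source.timestamp)
```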

Among other things, Blurr uses `Time` to internally generate `start_time` and `end_time` values for each session. We'll see in the next tutorial why this is critical to certain aggregation features.
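The `Time` expression is ordinary Python evaluated against each event. To stay within the standard library, the sketch below swaps in `datetime.strptime` with an explicit format, which for timestamps in this fixed `YYYY/MM/DD HH:MM:SS` layout gives the same result as `dateutil.parser.parse`. It then assumes a session's start and end times are the earliest and latest event times it aggregates; this is an illustration of the idea, not Blurr's code, and `event_time` is a hypothetical helper.

```python
from datetime import datetime

def event_time(timestamp):
    # Stdlib stand-in for dateutil's parse() for this fixed timestamp layout
    return datetime.strptime(timestamp, "%Y/%m/%d %H:%M:%S")

# Timestamps of the four events in session 915D
session_915d = [
    "2018/03/04 09:01:03",
    "2018/03/04 09:03:04",
    "2018/03/04 09:04:31",
    "2018/03/04 09:10:22",
]
times = [event_time(ts) for ts in session_915d]

# Assumption: start/end are the earliest/latest event times in the session
start_time, end_time = min(times), max(times)
print(start_time.isoformat(), end_time.isoformat())
# 2018-03-04T09:01:03 2018-03-04T09:10:22
```

These values match the `_start_time` and `_end_time` reported for session `915D` by the CLI in section 4.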

### 2.5. Aggregates

This is where the magic happens. Aggregates define the nature of the transformation. Our example has a single Aggregate of type `Block Aggregate`. Different types of Aggregates will be introduced in the next tutorials.

We'll learn how the transformation happens in the next section by examining the flow of data event by event.

## 3. Data Flow

Events are processed one by one, and then aggregated as defined in the `Block Aggregate`:

```yaml
Aggregates:
 - Type: Blurr:Aggregate:Block
   Name: session_stats
   Store: hello_world_store

   Dimensions:

     - Name: session_id
       Type: string
       Value: source.session_id

   Fields:

     - Name: games_played
       Type: integer
       When: source.event_id == 'game_start'
       Value: session_stats.games_played + 1

     - Name: games_won
       Type: integer
       When: source.event_id == 'game_end' and source.won == 1
       Value: session_stats.games_won + 1
```

In order to understand how `Block Aggregate` aggregates data we'll use the sequence of events from the initial section.

### 3.1. First Event : `game_start`

The first event is processed when the first user starts playing the game:

```javascript
{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_start" }
```

Aggregates are calculated taking into account the historical series of events. In this case, `games_played` is increased by `1` every time a new game starts:

```yaml
- Name: games_played
  Type: integer
  When: source.event_id == 'game_start'
  Value: session_stats.games_played + 1
```

Whenever a `game_start` event is received, the `games_played` field of the existing record (`session_stats.games_played`) is increased by one.

> You can always access a field in the previously saved record by using the name of the Aggregate and the name of the field, such as in `session_stats.games_played` or `session_stats.games_won`.

Since this is the first event for this user and session, the following happens:

1. A new record is created in the store with the default values for each field (`""` for `string`, `0` for `integer`).
2. The event is processed: each field whose `When` condition is true is updated using its `Value` expression. The content of `Value` can be **any Python expression**.

The resulting record is added to the store:

| session\_id | user\_id | games\_played | games\_won |
| ----------- | -------- | ------------- | ---------- |
| 915D        | 09C1     | 1             | 0          |
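The two steps above can be sketched in plain Python by treating each field as a `When` predicate and a `Value` expression evaluated against the event (`source`) and the current record (`session_stats`). `FIELDS`, `new_record` and `apply_event` are hypothetical names for illustration; Blurr evaluates the expressions written in the BTS itself.

```python
from types import SimpleNamespace

# Hypothetical encoding of the BTS fields: (name, default, When, Value)
FIELDS = [
    ("games_played", 0,
     lambda source, stats: source.event_id == "game_start",
     lambda source, stats: stats.games_played + 1),
    ("games_won", 0,
     lambda source, stats: source.event_id == "game_end" and source.won == 1,
     lambda source, stats: stats.games_won + 1),
]

def new_record(session_id):
    # Step 1: a new record with default values (0 for integer fields)
    record = {"session_id": session_id}
    record.update({name: default for name, default, _, _ in FIELDS})
    return record

def apply_event(record, event):
    # Step 2: update every field whose When condition holds
    source = SimpleNamespace(**{"won": None, **event})
    # View of the record before this event (a simplifying choice of this sketch)
    stats = SimpleNamespace(**record)
    for name, _, when, value in FIELDS:
        if when(source, stats):
            record[name] = value(source, stats)
    return record

first = {"user_id": "09C1", "session_id": "915D", "country": "US", "event_id": "game_start"}
record = apply_event(new_record(first["session_id"]), first)
print(record)  # {'session_id': '915D', 'games_played': 1, 'games_won': 0}
```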

### 3.2. Second Event : `game_end`

The user from the 1st event wins a game:

```javascript
{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_end", "won": 1 }
```

Processing this event results in the existing record having `games_won` increased by one:

```yaml
- Name: games_won
  Type: integer
  When: source.event_id == 'game_end' and source.won == 1
  Value: session_stats.games_won + 1
```

| session\_id | user\_id | games\_played | games\_won |
| ----------- | -------- | ------------- | ---------- |
| 915D        | 09C1     | 1             | 1          |
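The `When` condition compares `source.won` with the integer `1` because `won` is a JSON number in the events. Assuming events are decoded as standard JSON, the value becomes a Python `int`, and an `int` never equals a string. The snippet below, using only the standard `json` module, shows the difference:

```python
import json

event = json.loads('{ "event_id": "game_end", "won": 1 }')

# The decoded value is an int, so only the integer comparison matches
print(type(event["won"]).__name__)  # int
print(event["won"] == 1)            # True
print(event["won"] == "1")          # False
```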

### 3.3. 3rd and 4th Events : user plays a new game

The same user plays and wins a new game in the same session:

```javascript
{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_start" }
{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_end", "won": 1 }
```

After processing both events, `games_played` and `games_won` are each increased by one.

| session\_id | user\_id | games\_played | games\_won |
| ----------- | -------- | ------------- | ---------- |
| 915D        | 09C1     | 2             | 2          |

### 3.4. 5th and 6th Events : a new user plays a game

A second user starts a new game:

```javascript
{ "user_id": "B6FA", "session_id": "D043", "country" : "US", "event_id": "game_start" }
```

Previously we defined `source.user_id` as the **Identity** of the BTS:

```yaml
Identity: source.user_id
```

Here we introduce one of the roles of the Identity: whenever an event is received and the Identity value doesn't exist in the store (like when a new user plays a game), a new record is added:

| session\_id | user\_id | games\_played | games\_won |
| ----------- | -------- | ------------- | ---------- |
| 915D        | 09C1     | 2             | 2          |
| D043        | B6FA     | 1             | 0          |

After the `game_end` event is received, the record is updated with the win result:

```javascript
{ "user_id": "B6FA", "session_id": "D043", "country" : "US", "event_id": "game_end", "won": 1 }
```

| session\_id | user\_id | games\_played | games\_won |
| ----------- | -------- | ------------- | ---------- |
| 915D        | 09C1     | 2             | 2          |
| D043        | B6FA     | 1             | 1          |

### 3.5. 7th Event : a user starts a new session

After some time, user `09C1` decides to play again. From the game's perspective, this is a new session:

```javascript
{ "user_id": "09C1", "session_id": "T8KA", "country" : "UK", "event_id": "game_start" }
```

There's an element of the Aggregate we haven't covered yet, `Dimensions`:

```yaml
Dimensions:

     - Name: session_id
       Type: string
       Value: source.session_id
```

`Dimensions` is a key component of event aggregation. A `Block Aggregate` always contains a `Dimensions` section, which defines **which record in the store is updated** with each new event.

The dimension fields are evaluated first for every event. If their values differ from those of the record the `Block Aggregate` is currently updating, the matching record is retrieved from the store. If no such record exists in the store, a new record is created.

* `source.session_id` is the value of the property `session_id` in the event being processed (`T8KA`).
* `session_stats.session_id` is the value of `session_id` **in the last record saved for the same Identity** (i.e. the last session played by the user, `915D`)

```python
source.session_id == session_stats.session_id
"T8KA" == "915D" # False - New record is created because T8KA doesn't already exist in the store
```

As a result of the evaluation of `Dimensions` a new record is created in the store:

| session\_id | user\_id | games\_played | games\_won |
| ----------- | -------- | ------------- | ---------- |
| 915D        | 09C1     | 2             | 2          |
| D043        | B6FA     | 1             | 1          |
| T8KA        | 09C1     | 1             | 0          |
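The rule can be summarized as a small function: compare the `session_id` computed from the event with the stored records for the same Identity, most recent first, and create a record only when no match exists. This is a simplified sketch assuming each Identity's records are kept in a list in creation order; `records_09c1` and `select_record` are hypothetical names.

```python
# Hypothetical per-identity record list, as it stands before the 7th event
records_09c1 = [{"session_id": "915D", "games_played": 2, "games_won": 2}]

def select_record(records, session_id):
    """Pick the record to update for an event's session_id dimension."""
    # Check the most recent record first, then older ones
    for record in reversed(records):
        if record["session_id"] == session_id:
            return record
    # Dimension value not in the store: create a record with default values
    new = {"session_id": session_id, "games_played": 0, "games_won": 0}
    records.append(new)
    return new

record = select_record(records_09c1, "T8KA")  # 7th event: game_start
record["games_played"] += 1
print([r["session_id"] for r in records_09c1])  # ['915D', 'T8KA']
```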

### 3.6. 8th Event : `game_end`

The previous user finishes the game:

```javascript
{ "user_id": "09C1", "session_id": "T8KA", "country" : "UK", "event_id": "game_end", "won": 1 }
```

Since the event's `session_id` matches the one in the last record saved for the same user (created by the previous event):

```python
source.session_id == session_stats.session_id
"T8KA" == "T8KA" # True
```

No record is created. The last record for that user is updated instead:

| session\_id | user\_id | games\_played | games\_won |
| ----------- | -------- | ------------- | ---------- |
| 915D        | 09C1     | 2             | 2          |
| D043        | B6FA     | 1             | 1          |
| T8KA        | 09C1     | 1             | 1          |
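Putting the pieces together, the whole walkthrough can be replayed in a few lines of plain Python. This approximates the behaviour described in this section (records keyed by Identity and the `session_id` dimension, fields updated by their `When`/`Value` rules) for illustration only; it is not Blurr's implementation and skips `Time` handling. The events are the sample log with unused properties removed.

```python
import json

LOG = """\
{"user_id": "09C1", "session_id": "915D", "event_id": "game_start"}
{"user_id": "09C1", "session_id": "915D", "event_id": "game_end", "won": 1}
{"user_id": "09C1", "session_id": "915D", "event_id": "game_start"}
{"user_id": "09C1", "session_id": "915D", "event_id": "game_end", "won": 1}
{"user_id": "B6FA", "session_id": "D043", "event_id": "game_start"}
{"user_id": "B6FA", "session_id": "D043", "event_id": "game_end", "won": 1}
{"user_id": "09C1", "session_id": "T8KA", "event_id": "game_start"}
{"user_id": "09C1", "session_id": "T8KA", "event_id": "game_end", "won": 1}
"""

# (identity, session_id) -> record; dict order mirrors record creation
store = {}

for line in LOG.splitlines():
    source = json.loads(line)
    key = (source["user_id"], source["session_id"])  # Identity + Dimension
    record = store.setdefault(key, {"games_played": 0, "games_won": 0})
    if source["event_id"] == "game_start":
        record["games_played"] += 1
    if source["event_id"] == "game_end" and source.get("won") == 1:
        record["games_won"] += 1

for (user_id, session_id), record in store.items():
    print(session_id, user_id, record["games_played"], record["games_won"])
```

The printed rows correspond to the final table above.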

## 4. Previewing the transformation using Blurr CLI

We can preview the result of the transformation using the `blurr transform` command:

```bash
$ blurr transform --streaming-bts tutorial1-streaming-bts.yml tutorial1-data.log

["09C1/session_stats/915D/", {"_identity": "09C1", "_start_time": "2018-03-04T09:01:03", "_end_time": "2018-03-04T09:10:22", "games_played": 2, "games_won": 2, "session_id": "915D"}]
["09C1/session_stats/T8KA/", {"_identity": "09C1", "_start_time": "2018-03-04T09:22:13", "_end_time": "2018-03-04T09:25:24", "games_played": 1, "games_won": 1, "session_id": "T8KA"}]
["B6FA/session_stats/D043/", {"_identity": "B6FA", "_start_time": "2018-03-04T09:11:03", "_end_time": "2018-03-04T09:21:55", "games_played": 1, "games_won": 1, "session_id": "D043"}]
```

`transform` prints the result of the transformation in JSON format, which is slightly different from the table representation.

Each entry is an array with 2 items:

* An `identity/aggregate_name/session_id/` key string, where the last segment is the value of the `session_id` dimension. The **Identity** is represented by `user_id` in the tables.
* An object with the values of the record, plus the internal `_identity`, `_start_time` and `_end_time` fields.
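Since every line printed by `blurr transform` is a JSON array, the output can be post-processed with the standard `json` module, for example to rebuild the table from section 3. The sketch below uses the three lines copied from the output shown above; splitting the key on `/` simply follows the `identity/aggregate_name/session_id/` layout described here.

```python
import json

output = [
    '["09C1/session_stats/915D/", {"_identity": "09C1", "_start_time": "2018-03-04T09:01:03", "_end_time": "2018-03-04T09:10:22", "games_played": 2, "games_won": 2, "session_id": "915D"}]',
    '["09C1/session_stats/T8KA/", {"_identity": "09C1", "_start_time": "2018-03-04T09:22:13", "_end_time": "2018-03-04T09:25:24", "games_played": 1, "games_won": 1, "session_id": "T8KA"}]',
    '["B6FA/session_stats/D043/", {"_identity": "B6FA", "_start_time": "2018-03-04T09:11:03", "_end_time": "2018-03-04T09:21:55", "games_played": 1, "games_won": 1, "session_id": "D043"}]',
]

rows = []
for line in output:
    key, record = json.loads(line)
    # The key ends with "/", so strip it before splitting into its three parts
    identity, aggregate, session_id = key.rstrip("/").split("/")
    rows.append((session_id, identity, record["games_played"], record["games_won"]))

for row in rows:
    print(*row)
```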
