# Tutorial 1: Event Aggregation : Streaming BTS

In this tutorial we'll learn how Blurr performs basic data aggregation. The following concepts will be introduced:

* The *Blurr Transform Spec* document (BTS)
* The basic blocks of a BTS: `Header`, `Store`, `Identity`, `Time` and `Aggregates`
* How events are processed and aggregated one by one by a `Block Aggregate`
* How `Identity` and `Dimensions` are used to create new records.

Try the code from this example by [launching a Jupyter Notebook](https://mybinder.org/v2/gh/productml/blurr/master?filepath=examples%2Ftutorial).

## 1. Events

Our sample application is a fairly simple game in which the player can either win or lose.

Users can play as many games as they want in one sitting, what we call a **session**. Each event will have a `session_id` to identify the session in which the game was played.

This app collects 2 types of events:

* `game_start`: sent when a user starts a new game.
* `game_end`: sent when a user finishes a game. Contains a `won` field that marks whether the user won the game (`1` for a win, `0` for a loss).

Example:

```javascript
{
  "user_id": "09C1", // unique user identifier
  "session_id": "915D", // the session the game is played in
  "event_id": "game_start", // type of the event
  "country" : "US", // demographic data
  "timestamp": "2018/03/04 09:01:03" // time at which the event occurred
}
```

Events are stored as `JSON` entries, one per line, separated by the newline character `\n`:

```javascript
{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_start", "timestamp": "2018/03/04 09:01:03" }
{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_end", "won": 1, "timestamp": "2018/03/04 09:03:04"  }
{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_start", "timestamp": "2018/03/04 09:04:31"  }
{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_end", "won": 1, "timestamp": "2018/03/04 09:10:22"  }
{ "user_id": "B6FA", "session_id": "D043", "country" : "US", "event_id": "game_start", "timestamp": "2018/03/04 09:11:03"  }
{ "user_id": "B6FA", "session_id": "D043", "country" : "US", "event_id": "game_end", "won": 1, "timestamp": "2018/03/04 09:21:55"  }
{ "user_id": "09C1", "session_id": "T8KA", "country" : "UK", "event_id": "game_start", "timestamp": "2018/03/04 09:22:13"  }
{ "user_id": "09C1", "session_id": "T8KA", "country" : "UK", "event_id": "game_end", "won": 1, "timestamp": "2018/03/04 09:25:24"  }
```
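
Since each line is a standalone JSON document, the file can be read with nothing more than Python's standard `json` module. The sketch below only illustrates the input format and is not part of Blurr; `parse_events` is a hypothetical helper name, and the two inline lines are copied from the sample above.

```python
import json

def parse_events(text):
    """Parse newline-delimited JSON into a list of dicts, skipping blank lines."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]

# Two lines copied from the sample data above.
sample = '''
{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_start", "timestamp": "2018/03/04 09:01:03" }
{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_end", "won": 1, "timestamp": "2018/03/04 09:03:04"  }
'''

events = parse_events(sample)
print(len(events))        # 2
print(events[1]["won"])   # 1
```

Note that only `game_end` events carry the `won` property, so code reading the events should not assume it is always present.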

Our goal is to **collect session statistics**, such as games played in a session by a user, or the total games won.

## 2. The Transformation

For the sequence of events listed before we're interested in the **number of games played** and **number of games won** by player and session.

We will transform the original sequence of events into a series of records containing the desired information:

| session\_id | user\_id | games\_played | games\_won |
| ----------- | -------- | ------------- | ---------- |
| 915D        | 09C1     | 2             | 2          |
| D043        | B6FA     | 1             | 1          |
| T8KA        | 09C1     | 1             | 1          |

To produce this result, Blurr processes the events one at a time, following this **Blurr Transform Spec (BTS)** file:

```yaml
Type: Blurr:Transform:Streaming
Version: '2018-03-01'
Name: sessions

Stores:
   - Type: Blurr:Store:Memory
     Name: hello_world_store

Identity: source.user_id

Import:
  - { Module: dateutil.parser, Identifiers: [ parse ]}

Time: parse(source.timestamp)

Aggregates:

 - Type: Blurr:Aggregate:Block
   Name: session_stats
   Store: hello_world_store

   Dimensions:

     - Name: session_id
       Type: string
       Value: source.session_id

   Fields:

     - Name: games_played
       Type: integer
       When: source.event_id == 'game_start'
       Value: session_stats.games_played + 1

     - Name: games_won
       Type: integer
       When: source.event_id == 'game_end' and source.won == 1
       Value: session_stats.games_won + 1
```
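
For comparison, the result this spec is meant to produce can be sketched in a few lines of plain Python. This is an illustration of the target aggregation, not of how Blurr works internally; the `EVENTS` tuples are a condensed form of the eight sample events, and `aggregate` is a hypothetical helper name.

```python
from collections import defaultdict

# (user_id, session_id, event_id, won) for the eight sample events, in order.
EVENTS = [
    ("09C1", "915D", "game_start", None),
    ("09C1", "915D", "game_end", 1),
    ("09C1", "915D", "game_start", None),
    ("09C1", "915D", "game_end", 1),
    ("B6FA", "D043", "game_start", None),
    ("B6FA", "D043", "game_end", 1),
    ("09C1", "T8KA", "game_start", None),
    ("09C1", "T8KA", "game_end", 1),
]

def aggregate(events):
    """Count games played and games won per (user_id, session_id)."""
    records = defaultdict(lambda: {"games_played": 0, "games_won": 0})
    for user_id, session_id, event_id, won in events:
        record = records[(user_id, session_id)]
        if event_id == "game_start":
            record["games_played"] += 1
        if event_id == "game_end" and won == 1:
            record["games_won"] += 1
    return dict(records)

for (user_id, session_id), stats in aggregate(EVENTS).items():
    print(session_id, user_id, stats["games_played"], stats["games_won"])
# 915D 09C1 2 2
# D043 B6FA 1 1
# T8KA 09C1 1 1
```

The BTS expresses the same two conditions declaratively, through the `When` and `Value` entries of each field.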

Let's have a quick look at the five main blocks of this BTS: `Header`, `Store`, `Time`, `Identity` and `Aggregates`.

### 2.1. Header

```yaml
Type: Blurr:Transform:Streaming
Version: '2018-03-01'
Name: sessions
```

`Type` and `Version` identify the capabilities of the BTS.

Later in this series of tutorials we'll introduce different types of BTSs, such as the `Window` BTS. We'll also learn how BTSs are combined, which is why every BTS must have a unique `Name`.

### 2.2. Store

```yaml
Stores:
   - Type: Blurr:Store:Memory
     Name: hello_world_store
```

The output of a transformation is a collection of **records** persisted in a datastore. For this example we'll be using an in-memory datastore.

### 2.3. Identity

Every BTS has an **Identity**, which is always a property of the events being processed. In our example, the Identity is the property `user_id`:

```yaml
Identity: source.user_id
```

> In a BTS we can access the properties of the event being processed using the `source` keyword, as in `source.user_id` or `source.won`

The Identity is the main dimension around which events are aggregated. At this stage, let's just think of the Identity as a mandatory field that is part of both the original events and the output.

### 2.4. Time

`Time` is an expression that extracts the timestamp of each event, here by parsing the `timestamp` property:

```yaml
Time: parse(source.timestamp)
```
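
The timestamps in the sample events use a year/month/day layout followed by a 24-hour time. As a rough illustration of the value a `Time` expression yields, the sketch below uses the standard library's `datetime.strptime` with an explicit format as a stand-in for `dateutil.parser.parse`. That both produce the same `datetime` for this input is an assumption of the sketch, and `parse_timestamp` is a hypothetical helper name.

```python
from datetime import datetime

# Format assumed from the sample events, e.g. "2018/03/04 09:01:03".
TIMESTAMP_FORMAT = "%Y/%m/%d %H:%M:%S"

def parse_timestamp(value):
    """Turn an event timestamp string into a datetime object."""
    return datetime.strptime(value, TIMESTAMP_FORMAT)

# First and last event of session 915D in the sample data.
first = parse_timestamp("2018/03/04 09:01:03")
last = parse_timestamp("2018/03/04 09:10:22")

print(first.isoformat())   # 2018-03-04T09:01:03
print(last - first)        # 0:09:19
```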

Among other things, Blurr uses `Time` to internally generate the `_start_time` and `_end_time` values for each session. We'll see in the next tutorial why this is critical to certain aggregation features.

### 2.5. Aggregates

This is where the magic happens. Aggregates define the nature of the transformation. Our example has a single Aggregate of type `Block Aggregate`. Different types of Aggregates will be introduced in the next tutorials.

We'll learn how the transformation happens in the next section by examining the flow of data event by event.

## 3. Data Flow

Events are processed one by one, and then aggregated as defined in the `Block Aggregate`:

```yaml
Aggregates:
 - Type: Blurr:Aggregate:Block
   Name: session_stats
   Store: hello_world_store

   Dimensions:

     - Name: session_id
       Type: string
       Value: source.session_id

   Fields:

     - Name: games_played
       Type: integer
       When: source.event_id == 'game_start'
       Value: session_stats.games_played + 1

     - Name: games_won
       Type: integer
       When: source.event_id == 'game_end' and source.won == 1
       Value: session_stats.games_won + 1
```

In order to understand how `Block Aggregate` aggregates data we'll use the sequence of events from the initial section.

### 3.1. First Event : `game_start`

The first event is processed when the first user starts playing the game:

```javascript
{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_start" }
```

Aggregates are calculated taking into account the historical series of events. In this case, `games_played` is increased by `1` every time a new game starts:

```yaml
- Name: games_played
  Type: integer
  When: source.event_id == 'game_start'
  Value: session_stats.games_played + 1
```

Whenever a `game_start` event is received, the `games_played` field of the existing `session_stats` record is increased by one.

> You can always access a field in the previously saved record by using the name of the Aggregate and the name of the field, such as in `session_stats.games_played` or `session_stats.games_won`.

Since this is the first event ever processed, the following happens:

1. A new record is created in the store with the default values for each field (`""` for `string`, `0` for `integer`)
2. The event is processed, updating the record using the `Value` expressions for the field. The content of `Value` can be **any Python expression**.
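
The two steps above can be sketched in plain Python. This is a simplified illustration rather than Blurr's actual implementation: `DEFAULTS`, `FIELDS`, `new_record` and `process` are hypothetical names, and the built-in `eval` stands in for whatever expression evaluation Blurr performs internally.

```python
from types import SimpleNamespace

# Default value per field type, as described in step 1.
DEFAULTS = {"string": "", "integer": 0}

# The two fields of the `session_stats` aggregate, copied from the BTS.
FIELDS = [
    {"Name": "games_played", "Type": "integer",
     "When": "source.event_id == 'game_start'",
     "Value": "session_stats.games_played + 1"},
    {"Name": "games_won", "Type": "integer",
     "When": "source.event_id == 'game_end' and source.won == 1",
     "Value": "session_stats.games_won + 1"},
]

def new_record():
    """Step 1: create a record holding the default value of each field."""
    return SimpleNamespace(**{f["Name"]: DEFAULTS[f["Type"]] for f in FIELDS})

def process(event, record):
    """Step 2: update every field whose `When` condition holds."""
    scope = {"source": SimpleNamespace(**event), "session_stats": record}
    for field in FIELDS:
        if eval(field["When"], {}, scope):
            setattr(record, field["Name"], eval(field["Value"], {}, scope))
    return record

record = new_record()
process({"user_id": "09C1", "session_id": "915D", "event_id": "game_start"}, record)
print(record.games_played, record.games_won)   # 1 0
```

Exposing the event and the record as objects is what lets the expressions use dotted names such as `source.event_id` and `session_stats.games_played`.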

The resulting record is added to the store:

| session\_id | user\_id | games\_played | games\_won |
| ----------- | -------- | ------------- | ---------- |
| 915D        | 09C1     | 1             | 0          |

### 3.2. Second Event : `game_end`

The user from the 1st event wins a game:

```javascript
{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_end", "won": 1 }
```

Processing this event results in the existing record having `games_won` increased by one:

```yaml
- Name: games_won
  Type: integer
  When: source.event_id == 'game_end' and source.won == 1
  Value: session_stats.games_won + 1
```

| session\_id | user\_id | games\_played | games\_won |
| ----------- | -------- | ------------- | ---------- |
| 915D        | 09C1     | 1             | 1          |

### 3.3. 3rd and 4th Event : user plays a new game

The same user plays and wins a new game in the same session:

```javascript
{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_start" }
{ "user_id": "09C1", "session_id": "915D", "country" : "US", "event_id": "game_end", "won": 1 }
```

After processing both events, `games_played` and `games_won` are each increased by one.

| session\_id | user\_id | games\_played | games\_won |
| ----------- | -------- | ------------- | ---------- |
| 915D        | 09C1     | 2             | 2          |

### 3.4. 5th and 6th Event : a new user plays a game

A second user starts a new game:

```javascript
{ "user_id": "B6FA", "session_id": "D043", "country" : "US", "event_id": "game_start" }
```

Previously we defined `source.user_id` as the **Identity** of the BTS:

```yaml
Identity: source.user_id
```

Here we introduce one of the roles of the Identity: whenever an event is received and the Identity value doesn't exist in the store (like when a new user plays a game), a new record is added:

| session\_id | user\_id | games\_played | games\_won |
| ----------- | -------- | ------------- | ---------- |
| 915D        | 09C1     | 2             | 2          |
| D043        | B6FA     | 1             | 0          |

After the `game_end` event is received, the record is updated with the win result:

```javascript
{ "user_id": "B6FA", "session_id": "D043", "country" : "US", "event_id": "game_end", "won": 1 }
```

| session\_id | user\_id | games\_played | games\_won |
| ----------- | -------- | ------------- | ---------- |
| 915D        | 09C1     | 2             | 2          |
| D043        | B6FA     | 1             | 1          |

### 3.5. 7th Event : a user starts a new session

After some time, the first user (`09C1`) decides to play again. From the game's perspective this is a new session:

```javascript
{ "user_id": "09C1", "session_id": "T8KA", "country" : "UK", "event_id": "game_start" }
```

There's an element of the Aggregate we haven't covered yet, `Dimensions`:

```yaml
Dimensions:

     - Name: session_id
       Type: string
       Value: source.session_id
```

`Dimensions` is a key component of event aggregation. A `Block Aggregate` always contains a `Dimensions` section, which defines **which record in the store has to be updated with new events**.

The dimension fields are evaluated first for every event. If the `Block Aggregate` is not already working on a record with these dimension values, the matching record is retrieved from the store. If no such record is found in the store, a new record is created.

* `source.session_id` is the value of the property `session_id` in the event being processed (`T8KA`).
* `session_stats.session_id` is the value of `session_id` **in the last record saved for the same Identity** (i.e. the last session played by the user, `915D`)

```python
source.session_id == session_stats.session_id
"T8KA" == "915D" # False - New record is created because T8KA doesn't already exist in the store
```
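
One way to picture this lookup is a store keyed by the Identity together with the dimension values. The sketch below is a simplification under that assumption and not Blurr's implementation; `store` and `get_or_create` are hypothetical names, and a plain dictionary stands in for the datastore.

```python
# Records keyed by (identity, session_id), holding the state after the 6th event.
store = {
    ("09C1", "915D"): {"games_played": 2, "games_won": 2},
    ("B6FA", "D043"): {"games_played": 1, "games_won": 1},
}

def get_or_create(store, event):
    """Return the record for the event's Identity and dimension, creating it if absent."""
    key = (event["user_id"], event["session_id"])
    if key not in store:
        store[key] = {"games_played": 0, "games_won": 0}
    return store[key]

# 7th event: same user, new session.
event = {"user_id": "09C1", "session_id": "T8KA", "event_id": "game_start"}
record = get_or_create(store, event)
record["games_played"] += 1   # the `games_played` field reacts to `game_start`

print(len(store))                # 3
print(store[("09C1", "T8KA")])   # {'games_played': 1, 'games_won': 0}
```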

As a result of the evaluation of `Dimensions` a new record is created in the store:

| session\_id | user\_id | games\_played | games\_won |
| ----------- | -------- | ------------- | ---------- |
| 915D        | 09C1     | 2             | 2          |
| D043        | B6FA     | 1             | 1          |
| T8KA        | 09C1     | 1             | 0          |

### 3.6. 8th Event : `game_end`

The previous user finishes the game:

```javascript
{ "user_id": "09C1", "session_id": "T8KA", "country" : "UK", "event_id": "game_end", "won": 1 }
```

The `session_id` of this event matches the one in the last record saved for the same user (created after the previous event):

```python
source.session_id == session_stats.session_id
"T8KA" == "T8KA" # True
```

No record is created. The last record for that user is updated instead:

| session\_id | user\_id | games\_played | games\_won |
| ----------- | -------- | ------------- | ---------- |
| 915D        | 09C1     | 2             | 2          |
| D043        | B6FA     | 1             | 1          |
| T8KA        | 09C1     | 1             | 1          |

## 4. Previewing the transformation using Blurr CLI

We can preview the result of the transformation using the `blurr transform` command:

```bash
$ blurr transform --streaming-bts tutorial1-streaming-bts.yml tutorial1-data.log

["09C1/session_stats/915D/", {"_identity": "09C1", "_start_time": "2018-03-04T09:01:03", "_end_time": "2018-03-04T09:10:22", "games_played": 2, "games_won": 2, "session_id": "915D"}]
["09C1/session_stats/T8KA/", {"_identity": "09C1", "_start_time": "2018-03-04T09:22:13", "_end_time": "2018-03-04T09:25:24", "games_played": 1, "games_won": 1, "session_id": "T8KA"}]
["B6FA/session_stats/D043/", {"_identity": "B6FA", "_start_time": "2018-03-04T09:11:03", "_end_time": "2018-03-04T09:21:55", "games_played": 1, "games_won": 1, "session_id": "D043"}]
```

`transform` prints the result of the transform in JSON format, which is slightly different from the table representation.

Each entry consists of an array with 2 items:

* An `identity/aggregate_name/session_id/` string. The **Identity** is represented by `user_id` in the tables.
* An object with the remaining values of the record.
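
Because every printed line is a JSON array, the output is easy to post-process. The following sketch, which is not part of Blurr, loads the three lines shown above with the standard `json` module and rebuilds the table rows; `to_row` is a hypothetical helper name.

```python
import json

# The three entries printed by the command above.
output = '''
["09C1/session_stats/915D/", {"_identity": "09C1", "_start_time": "2018-03-04T09:01:03", "_end_time": "2018-03-04T09:10:22", "games_played": 2, "games_won": 2, "session_id": "915D"}]
["09C1/session_stats/T8KA/", {"_identity": "09C1", "_start_time": "2018-03-04T09:22:13", "_end_time": "2018-03-04T09:25:24", "games_played": 1, "games_won": 1, "session_id": "T8KA"}]
["B6FA/session_stats/D043/", {"_identity": "B6FA", "_start_time": "2018-03-04T09:11:03", "_end_time": "2018-03-04T09:21:55", "games_played": 1, "games_won": 1, "session_id": "D043"}]
'''

def to_row(line):
    """Split one printed entry into its key parts and the table columns."""
    key, record = json.loads(line)
    identity, aggregate_name, session_id = key.rstrip("/").split("/")
    return {
        "session_id": session_id,
        "user_id": identity,
        "games_played": record["games_played"],
        "games_won": record["games_won"],
    }

rows = [to_row(line) for line in output.splitlines() if line.strip()]
for row in rows:
    print(row["session_id"], row["user_id"], row["games_played"], row["games_won"])
# 915D 09C1 2 2
# T8KA 09C1 1 1
# D043 B6FA 1 1
```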

