A Detailed Look at the Data Platform

For a more gentle introduction to the data platform, please read the Pipeline Overview article.

This article goes into more depth about the architecture and flow of data in the platform.

The Entire Platform

The full detail of the platform can get quite complex, but at a high level the structure is fairly simple.

graph LR
  Producers[Data Producers] --> Ingestion
  Ingestion --> Storage[Long-term Storage]
  Ingestion --> Stream[Stream Processing]
  Stream --> Storage
  Batch[Batch Processing] --> Storage
  Storage --> Batch
  Self[Self Serve] -.- Stream
  Self -.- Batch
  Stream -.-> Visualization
  Batch -.-> Visualization
  Stream --> Export
  Batch --> Export

Each of these high-level parts of the platform are described in more detail below.

Data Producers

By far most data handled by the Data Platform is produced by Firefox. There are other producers, though, and the eventual aim is to generalize data production using a client SDK or set of standard tools.

Most data is submitted via HTTP POST, but data is also produced in the form of service logs and statsd messages.

If you would like to locally test a new data producer, the gzipServer project provides a simplified server that makes it easy to inspect submitted messages.

Ingestion

graph LR
  subgraph HTTP
    tee
    lb[Load Balancer]
    mozingest
  end
  subgraph Kafka
    kafka_unvalidated[Kafka unvalidated]
    kafka_validated[Kafka validated]
    zookeeper[ZooKeeper] -.- kafka_unvalidated
    zookeeper -.- kafka_validated
  end
  subgraph Storage
    s3_heka[S3 Heka Protobuf Storage]
    s3_parquet[S3 Parquet Storage]
  end
  subgraph Data Producers
    Firefox --> lb
    more_producers[Other Producers] --> lb
  end

  lb --> tee
  tee --> mozingest
  mozingest --> kafka_unvalidated
  mozingest --> Landfill
  kafka_unvalidated --> dwl[Data Store Loader]
  kafka_validated --> cep[Hindsight CEP]
  kafka_validated --> sparkstreaming[Spark Streaming]
  Schemas -.->|validation| dwl
  dwl --> kafka_validated
  dwl --> s3_heka
  dwl --> s3_parquet
  sparkstreaming --> s3_parquet

Data arrives as an HTTP POST of an optionally gzipped payload of JSON. See the common Edge Server specification for details.

Submissions hit a load balancer which handles the SSL connection, then forwards to a "tee" server, which may direct some or all submissions to alternate backends. In the past, the tee was used to manage the cutover between different versions of the backend infrastructure. It is implemented as an OpenResty plugin.

From there, the mozingest HTTP Server receives submissions from the tee and batches and stores data durably on Amazon S3 as a fail-safe (we call this "Landfill"). Data is then passed along via Kafka for validation and further processing. If there is a problem with decoding, validation, or any of the code described in the rest of this section, data can be re-processed from this fail-safe store. The mozingest server is implemented as an nginx module.

Validation, at a minimum, ensures that a payload is valid JSON (possibly compressed). Many document types also have a JSONSchema specification, and are further validated against that.

Invalid messages are redirected to a separate "errors" stream for debugging and inspection.

Valid messages proceed for further decoding and processing. This involves things like doing GeoIP lookup and discarding the IP address, and attaching some HTTP header info as annotated metadata.

Validated and annotated messages become available for stream processing.

They are also batched and stored durably for later batch processing and ad-hoc querying.

See also the "generic ingestion" proposal which aims to make ingestion, validation, storage, and querying available as self-serve for platform users.

Data flow for valid submissions
sequenceDiagram
    participant Fx as Firefox
    participant lb as Load Balancer
    participant mi as mozingest
    participant lf as Landfill
    participant k as Kafka
    participant dwl as Data Store Loader
    participant dl as Data Lake

    Fx->>lb: HTTPS POST
    lb->>mi: forward
    mi-->>lf: failsafe store
    mi->>k: enqueue
    k->>dwl: validate, decode
    dwl->>k: enqueue validated
    dwl->>dl: store durably

Other ingestion methods

Hindsight is used for ingestion of logs from applications and services, it supports parsing of log lines and appending similar metadata as the HTTP ingestion above (timestamp, source, and so on).

Statsd messages are ingested in the usual way.

Storage

graph TD
  subgraph RDBMS
    PostgreSQL
    Redshift
    MySQL
    BigQuery
  end
  subgraph NoSQL
    DynamoDB
  end
  subgraph S3
    landfill[Landfill]
    s3_heka[Heka Data Lake]
    s3_parquet[Parquet Data Lake]
    s3_analysis[Analysis Outputs]
    s3_public[Public Outputs]
  end

  Ingestion --> s3_heka
  Ingestion --> s3_parquet
  Ingestion --> landfill
  Ingestion -.-> stream[Stream Processing]
  stream --> s3_parquet
  batch[Batch Processing] --> s3_parquet
  batch --> PostgreSQL
  batch --> DynamoDB
  batch --> s3_public
  selfserve[Self Serve] --> s3_analysis
  s3_analysis --> selfserve
  Hive -->|Presto| redash[Re:dash]
  PostgreSQL --> redash
  Redshift --> redash
  MySQL --> redash
  BigQuery --> redash

  s3_parquet -.- Hive

Amazon S3 forms the backbone of the platform storage layer. The primary format used in the Data Lake is parquet, which is a strongly typed columnar storage format that can easily be read and written by Spark, as well as being compatible with SQL interfaces such as Hive and Presto. Some data is also stored in Heka-framed protobuf format. This custom format is usually reserved for data where we do not have a complete JSONSchema specification.

Using S3 for storage avoids the need for an always-on cluster, which means that data at rest is inexpensive. S3 also makes it very easy to automatically expire (delete) objects after a certain period of time, which is helpful for implementing data retention policies.

Once written to S3, the data is typically treated as immutable - data is not appended to existing files, nor is data normally updated in place. The exception here is when data is back-filled, in which case previous data may be overwritten.

There are a number of other types of storage used for more specialized applications, including relational databases (such as PostgreSQL for the Telemetry Aggregates) and NoSQL databases (DynamoDB is used for a backing store for the TAAR project). Reading data from a variety of RDBMS sources is also supported via Re:dash.

The data stored in Heka format is readable from Spark using libraries in Scala or Python.

Parquet data can be read and written natively from Spark, and many datasets are indexed in a Hive Metastore, making them available through a SQL interface on Re:dash and in notebooks via Spark SQL. Many other SQL data sources are also made available via Re:dash, see this article for more information on accessing data using SQL.

There is a separate data store for self-serve Analysis Outputs, intended to keep ad-hoc, temporary data out of the Data Lake. This is implemented as a separate S3 location, with personal output locations prefixed with each person's user id, similar to the layout of the /home directory on a Unix system. See the Working with Parquet data cookbook for more details.

Analysis outputs can also be made public using the Public Outputs bucket. This is a web-accessible S3 location for powering public dashboards. This public data is available at https://analysis-output.telemetry.mozilla.org/<job name>/data/<files>.

Stream Processing

Stream processing is done using Hindsight and Spark Streaming.

Hindsight allows you to run plugins written in Lua inside a sandbox. This gives a safe, performant way to do self-serve streaming analysis. See this article for an introduction. Hindsight plugins do the initial data validation and decoding, as well as writing out to long-term storage in both Heka-framed protobuf and parquet forms.

Spark Streaming is used to read from Kafka and perform low-latency ETL and aggregation tasks. These aggregates are currently used by Mission Control and are also available for querying via Re:dash.

Batch Processing

Batch processing is done using Spark. Production ETL code is written in both Python and Scala.

There are Python and Scala libraries for reading data from the Data Lake in Heka-framed protobuf form, though it is much easier and more performant to make use of a derived dataset whenever possible.

Datasets in parquet format can be read natively by Spark, either using Spark SQL or by reading data directly from S3.

Data produced by production jobs go into the Data Lake, while output from ad-hoc jobs go into Analysis Outputs.

Job scheduling and dependency management is done using Airflow. Most jobs run once a day, processing data from "yesterday" on each run. A typical job launches a cluster, which fetches the specified ETL code as part of its bootstrap on startup, runs the ETL code, then shuts down upon completion. If something goes wrong, a job may time out or fail, and in this case it is retried automatically.

Self Serve Data Analysis

graph TD
  subgraph Storage
    lake[Data Lake]
    s3_output_public[Public Outputs]
    s3_output_private[Analysis Outputs]
  end
  subgraph ATMO
    Jupyter -->|runs on| emr_cluster[Ad hoc EMR Cluster]
    Zeppelin -->|runs on| emr_cluster
    atmo_service[ATMO Service] -->|launch| emr_cluster
    atmo_service -->|schedule| emr_job[fa:fa-clock-o Scheduled EMR Job]
    emr_cluster -->|mount| EFS
    emr_cluster -->|read + write| lake
    emr_job -->|read + write| s3_output_public
    emr_job -->|read + write| s3_output_private
  end
  subgraph STMO
    redash[Re:dash] -->|read| lake
  end
  subgraph TMO
    evo[Evolution Dashboard]
    histo[Histogram Dashboard]
    agg[Telemetry Aggregates]
    evo -.- agg
    histo -.- agg
  end
  subgraph Databricks
    db_notebook[Notebook]
    db_notebook -->|read + write| lake
  end

Most of the data analysis tooling has been developed with the goal of being "self-serve". This means that people should be able to access and analyze data on their own, without involving data engineers or operations. Thus can data access scale beyond a small set of people with specialized knowledge of the entire pipeline.

The use of these self-serve tools is described in the Getting Started article. This section focuses on how these tools integrate with the platform infrastructure.

ATMO: Spark Analysis

ATMO is a service for managing Spark clusters for data analysis. Clusters can be launched on demand, or can be scheduled to run a job on an ongoing basis. These clusters can read from the Data Lake described in the Storage section above, and can write results to either public (web-accessible) or private output locations.

Jupyter or Zeppelin notebooks are the usual interface to getting work done using a cluster, though you get full SSH access to the cluster.

Clusters launched via ATMO are automatically killed after a user-defined period of time (by default, 8 hours), though their lifetime can be extended as needed. Each cluster has a user-specific EFS volume mounted on the /home/hadoop directory, which means that data stored locally on the cluster persists from one cluster to the next. This volume is shared by all clusters launched by a given ATMO user.

STMO: SQL Analysis

STMO is a customized Re:dash installation that provides self-serve access to a a variety of different datasets. From here, you can query data in the Parquet Data Lake as well as various RDBMS data sources.

STMO interfaces with the data lake using both Presto and Amazon Athena. Each has its own data source in Re:dash. Since Athena does not support user-defined functions, datasets with HyperLogLog columns, such as client_count_daily, are only available via Presto..

Different Data Sources in STMO connect to different backends, and each backend might use a slightly different flavor of SQL. You should find a link to the documentation for the expected SQL variant next to the Data Sources list.

Queries can be run just once, or scheduled to run periodically to keep data up-to-date.

There is a command-line interface to STMO called St. Mocli, if you prefer writing SQL using your own editor and tools.

Databricks: Managed Spark Analysis

Our Databricks instance (see Databricks docs) offers another notebook interface for doing analysis in Scala, SQL, Python and R.

Databricks provides an always-on shared server which is nice for quick data investigations. ATMO clusters take some time to start up, usually on the order of tens of minutes. The shared server allows you to avoid this start-up cost. Prefer ATMO for heavy analyses since you will have dedicated resources.

TMO: Aggregate Graphs

TMO provides easy visualizations of histogram and scalar measures over time. Time can be in terms of either builds or submission dates. This is the most convenient interface to the Telemetry data, as it does not require any custom code.

Visualization

There are a number of visualization libraries and tools being used to display data.

TMO Dashboards

The landing page at telemetry.mozilla.org is a good place to look for existing graphs, notably the measurement dashboard which gives a lot of information about histogram and scalar measures collected on pre-release channels.

Notebooks

Use of interactive notebooks has become a standard in the industry, and Mozilla makes heavy use of this approach. ATMO makes it easy to run, share, and schedule Jupyter and Zeppelin notebooks.

Databricks notebooks are also an option.

Others

Re:dash lets you query the data using SQL, but it also supports a number of useful visualizations.

Hindsight's web interface has the ability to visualize time-series data.

Mission Control gives a low-latency view into release health.

Many bespoke visualizations are built using the Metrics Graphics library as a display layer.

Monitoring and Alerting

There are multiple layers of monitoring and alerting.

At a low level, the system is monitored to ensure that it is functioning as expected. This includes things like machine-level resources (network capacity, disk space, available RAM, CPU load) which are typically monitored using DataDog.

Next, we monitor the "transport" functionality of the system. This includes monitoring incoming submission rates, payload sizes, traffic patterns, schema validation failure rates, and alerting if anomalies are detected. This type of anomaly detection and alerting is handled by Hindsight.

Once data has been safely ingested and stored, we run some automatic regression detection on all Telemetry histogram measures using Cerberus. This code looks for changes in the distribution of a measure, and emails probe owners if a significant change is observed.

Production ETL jobs are run via Airflow, which monitors batch job progress and alerts if there are failures in any job. Self-serve batch jobs run via ATMO also generate alerts upon failure.

Scheduled Re:dash queries may also be configured to generate alerts, which is used to monitor the last-mile user facing status of derived datasets. Re:dash may also be used to monitor and alert on high-level characteristics of the data, or really anything you can think of.

Data Exports

Data is exported from the pipeline to a few other tools and systems. Examples include integration with Amplitude for mobile and product analytics, publishing reports and visualizations to the Mozilla Data Collective, and shipping data to other parts of the Mozilla organization.

There are also a few data sets which are made publicly available, such as the Firefox Hardware Report.

Bringing it all together

Finally, here is a more detailed view of the entire platform. Some connections are omitted for clarity.

graph LR
 subgraph Data Producers
  Firefox
  more_producers[...]
 end
 subgraph Storage
  Landfill
  warehouse_heka[Heka Data Lake]
  warehouse_parquet[Parquet Data Lake]
  warehouse_analysis[Analysis Outputs]
  PostgreSQL
  Redshift
  MySQL
  hive[Hive] -.- warehouse_parquet
 end
 subgraph Stream Processing
  cep[Hindsight Streaming]
  dwl[Data Store Loader] --> warehouse_heka
  dwl --> warehouse_parquet
  sparkstreaming[Spark Streaming] --> warehouse_parquet
 end
 subgraph Ingestion
  Firefox --> lb[Load Balancer]
  more_producers --> lb
  lb --> tee
  tee --> mozingest
  mozingest --> kafka
  mozingest --> Landfill
  ZooKeeper -.- kafka[Kafka]
  kafka --> dwl
  kafka --> cep
  kafka --> sparkstreaming
 end
 subgraph Batch Processing
  Airflow -.->|spark|tbv[telemetry-batch-view]
  Airflow -.->|spark|python_mozetl
  warehouse_heka --> tbv
  warehouse_parquet --> tbv
  warehouse_heka --> python_mozetl
  warehouse_parquet --> python_mozetl
  tmo_agg[Telemetry Aggregates]
 end
 subgraph Visualization
  Hindsight
  Jupyter
  Zeppelin
  TMO
  redash_graphs[Re:dash]
  MissionControl
  bespoke_viz[Bespoke Viz]
 end
 subgraph Export
  tbv --> Amplitude
  sparkstreaming --> Amplitude
 end
 subgraph Self Serve
  redash[Re:dash] -.-> Presto
  Presto --> hive
  redash -.-> Athena
  Athena --> hive
  ATMO -.-> spcluster[Spark Cluster]
  warehouse_heka --> spcluster
  warehouse_parquet --> spcluster
  spcluster --> warehouse_analysis
 end
 Schemas -.->|validation| dwl