
Staging

The goal of staging is to bring the data closer to the database engine so the modification of the destination (final) dataset happens faster and without errors. dlt, when asked, creates two staging areas:

  1. A staging dataset used by the merge and replace loads to deduplicate and merge data with the destination.
  2. A staging storage, typically an S3/GCS bucket, where loader files are copied before they are loaded by the destination.

Staging dataset

dlt creates a staging dataset when the write disposition of any loaded resource requires it. It creates and migrates the required tables exactly as it does for the main dataset. Data in staging tables is truncated when the load step begins, and only for the tables that participate in that load. By default, the staging dataset has the same name as the dataset passed to dlt.pipeline, with a _staging suffix. Alternatively, you can provide your own staging dataset name pattern or use a fixed name that is identical for all configured datasets.

[destination.postgres]
staging_dataset_name_layout="staging_%s"

The entry above switches the pattern to a staging_ prefix. For example, for a dataset named github_data, dlt will create staging_github_data.

To configure a static staging dataset name, you can do the following (here we use the destination factory):

import dlt

dest_ = dlt.destinations.postgres(staging_dataset_name_layout="_dlt_staging")

All pipelines using dest_ as the destination will use the _dlt_staging dataset to store staging tables. Make sure that your pipelines do not overwrite each other's tables.
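
The naming rule described above can be sketched in plain Python. This is an illustration only, not dlt's internal code: the helper staging_dataset_name is hypothetical, and the default layout "%s_staging" is an assumption inferred from the suffix behavior described in this section.

```python
def staging_dataset_name(dataset_name: str, layout: str = "%s_staging") -> str:
    """Return the staging dataset name for a layout (illustrative helper, not a dlt API)."""
    # A layout containing %s is a pattern: the dataset name is substituted into it.
    if "%s" in layout:
        return layout % dataset_name
    # A layout without %s is a fixed name shared by every dataset.
    return layout


print(staging_dataset_name("github_data"))                  # assumed default suffix pattern
print(staging_dataset_name("github_data", "staging_%s"))    # prefix pattern
print(staging_dataset_name("github_data", "_dlt_staging"))  # fixed name
```

With a fixed name, every dataset maps to the same staging dataset, which is why pipelines sharing it can collide on table names.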

Clean up the staging dataset automatically

dlt does not truncate tables in the staging dataset at the end of the load. The data left behind contains all the extracted data and may be useful for debugging. If you prefer to truncate it, put the following lines in config.toml:

[load]
truncate_staging_dataset=true
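
If you prefer environment variables over config.toml, the same setting can be supplied from Python before the pipeline runs. The sketch below assumes dlt's usual convention of upper-casing a config key and joining its sections with double underscores; the helper to_env_key is hypothetical and exists only for this example.

```python
import os


def to_env_key(toml_key: str) -> str:
    """Convert a dotted config key into an environment variable name (assumed convention)."""
    return "__".join(part.upper() for part in toml_key.split("."))


# Equivalent of [load] truncate_staging_dataset=true in config.toml
env_key = to_env_key("load.truncate_staging_dataset")
os.environ[env_key] = "true"
print(env_key)
```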

Staging storage

dlt allows you to chain destinations, where the first one (staging) is responsible for uploading the files from the local filesystem to remote storage. It then generates follow-up jobs for the second destination that (typically) copy the files from remote storage into the destination.

Currently, only one destination, the filesystem, can be used as staging. The following destinations can copy remote files:

  1. Redshift.
  2. BigQuery.
  3. Snowflake.
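
Because only these combinations are supported, a small pre-flight check can catch a misconfigured pair before a pipeline is created. The sketch below simply encodes the lists from this section; the function check_staging_pair and the lower-case names for BigQuery and Snowflake are assumptions for illustration, not part of dlt.

```python
# Names taken from the lists above; lower-case spellings are assumed.
STAGING_DESTINATIONS = {"filesystem"}
COPY_CAPABLE_DESTINATIONS = {"redshift", "bigquery", "snowflake"}


def check_staging_pair(staging: str, destination: str) -> None:
    """Raise ValueError if the staging/destination pair is not listed as supported."""
    if staging not in STAGING_DESTINATIONS:
        raise ValueError(f"{staging!r} cannot be used as staging")
    if destination not in COPY_CAPABLE_DESTINATIONS:
        raise ValueError(f"{destination!r} cannot copy files from remote staging")


check_staging_pair("filesystem", "redshift")  # supported pair, passes silently
```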

How to use

In essence, you need to set up two destinations and then pass them to dlt.pipeline. Below, we use filesystem staging with parquet files to load into the Redshift destination.

  1. Set up the s3 bucket and filesystem staging.

    Please follow our guide in the filesystem destination documentation. Test the staging as a standalone destination to make sure that files go where you want them. Your secrets.toml should now contain a working filesystem configuration:

    [destination.filesystem]
    bucket_url = "s3://[your_bucket_name]" # replace with your bucket name

    [destination.filesystem.credentials]
    aws_access_key_id = "please set me up!" # copy the access key here
    aws_secret_access_key = "please set me up!" # copy the secret access key here
  2. Set up the Redshift destination.

    Please follow our guide in the Redshift destination documentation. Your secrets.toml should now also contain:

    # keep it at the top of your toml file! before any section starts
    destination.redshift.credentials="redshift://loader:<password>@localhost/dlt_data?connect_timeout=15"
  3. Authorize Redshift cluster to access the staging bucket.

    By default, dlt forwards the credentials configured for the filesystem to the Redshift COPY command. If you are fine with this, move to the next step.

  4. Chain staging to destination and request parquet file format.

    Pass the staging argument to dlt.pipeline. It works like the destination argument:

    # Create a dlt pipeline that will load
    # chess player data to the redshift destination
    # via staging on s3
    pipeline = dlt.pipeline(
        pipeline_name='chess_pipeline',
        destination='redshift',
        staging='filesystem',  # add this to activate the staging location
        dataset_name='player_data'
    )

    dlt will automatically select an appropriate loader file format for the staging files. Below we explicitly specify parquet file format (just to demonstrate how to do it):

    info = pipeline.run(chess(), loader_file_format="parquet")
  5. Run the pipeline script.

    Run the pipeline script as usual.

💡 Please note that dlt does not delete loaded files from the staging storage after the load is complete.

