Pipelines

If data_check finds a file named data_check_pipeline.yml in a folder, it will treat this folder as a pipeline check. Instead of running CSV checks, it will execute the steps defined in the YAML file.

Example

Example project with a pipeline:

data_check.yml
checks/
    some_test.sql                # this test will run in parallel to the pipeline test
    some_test.csv
    sample_pipeline/
        data_check_pipeline.yml  # configuration for the pipeline
        data/
            my_schema.some_table.csv       # data for a table
        data2/
            some_data.csv        # other data
        some_checks/             # folder with CSV checks
            check1.sql
            check1.csv
            ...
        run_this.sql             # a SQL file that will be executed
        cleanup.sql
    other_pipeline/              # you can have multiple pipelines that will run in parallel
        data_check_pipeline.yml
        ...

The file sample_pipeline/data_check_pipeline.yml can look like this:

steps:
    # this will truncate the table my_schema.some_table and load it with the data from data/my_schema.some_table.csv
    - load: data
    # this will execute the SQL statement in run_this.sql
    - sql: run_this.sql
    # this will append the data from data2/some_data.csv to my_schema.other_table
    - load:
        file: data2/some_data.csv
        table: my_schema.other_table
        mode: append
    # this will run a python script and pass the connection name
    - cmd: "python3 /path/to/my_pipeline.py --connection {{CONNECTION}}"
    # this will run the CSV checks in the some_checks folder
    - check: some_checks
    - always_run:
        - sql: run_this_always.sql

Pipeline checks and simple CSV checks can coexist in a project.

Pipeline configuration

data_check_pipeline.yml is a YAML file whose main element, steps, contains the list of steps to run.

steps

steps is a list of steps that are executed sequentially in the pipeline. If any step fails, the pipeline fails and the following steps are not executed.

Most steps have a list of files as a parameter. If you only need a single file/path, you can usually use a short form: step_name: path.
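
For example, the following two forms of the check step (described below) are equivalent, assuming a folder some_checks with CSV checks:

- check: some_checks

- check:
    files:
      - some_checks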

check

check will run CSV checks in a given folder or from a single file. It is like running data_check with this parameter. All checks will be performed in parallel.

Short form:

- check: some_checks

Long form:

- check:
    files:
    - some_checks
    - some/other/checks.sql

You can also omit files:

- check:
    - some_checks
    - some/other/checks.sql

run is an alias for check:

- run: some_checks

load

load is like calling data_check load .... This will load one or more tables from CSV files and infer the table name from the file name. You can also override the table name when using a single CSV file.

As with data_check load, the path before the file name has no impact on the inferred table name; only the file name itself is used.
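
For example, both of the following steps load into the table my_schema.some_table; the directory part of the path is ignored (the paths are only illustrations):

- load: data/my_schema.some_table.csv
- load: archive/my_schema.some_table.csv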

Short form:

- load: some_path/schema.table_name.csv

Long form:

- load:
    files:
      - some_path
      - some/other/path/schema.other_table.csv
    mode: append

You can omit mode. Then the default mode truncate will be used. load_mode can be used instead of mode, but it is deprecated.

You can also omit files:

- load:
    - some_path
    - some/other/path/schema.other_table.csv

Or use file as an alias for files:

- load:
    file: some_file.csv

Overriding the table name:

- load:
    table: some_table
    file: some_file.csv

You can only use a single CSV file when overriding the table name. Using multiple files will fail.

append

append is an alias for

- load:
    mode: append

Short form:

- append: some_path/schema.table_name.csv

Long form:

- append:
    files:
      - some_path
      - some/other/path/schema.other_table.csv

sql

sql is like calling data_check sql .... sql has two modes: query mode and files mode. In the short form, sql will check if the given parameter is a file and run the file in files mode. If the parameter is not a file, sql will execute in query mode.

Query mode and files mode can be explicitly specified by using the long form of sql.

query mode

The query mode is like calling data_check sql ... without --files. This will execute the SQL statement given as the parameter. If the SQL is a query, the result will be printed as CSV. If the parameter is a file path, the file will be run in files mode instead, as described for the short form above.

Short form:

- sql: select 1 as a, 'b' as t

Long form:

- sql:
    query: select 1 as a, 'b' as t

With output to write a CSV file:

- sql:
    query: select 1 as a, 'b' as t
    output: result.csv

output is relative to the pipeline path, unless an absolute path is specified, for example '{{PROJECT_PATH}}/result.csv'. output can only be used with a query, not with files.
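
For example, to write the result into the project folder instead of the pipeline folder, you can use the PROJECT_PATH parameter (see "Parameters in pipelines" below):

- sql:
    query: select 1 as a, 'b' as t
    output: '{{PROJECT_PATH}}/result.csv'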

write_check can be used to generate a CSV check from a SQL statement:

- sql:
    query: select 1 as a, 'b' as t
    write_check: some_check.sql

See here for more information.

files mode

The files mode is like calling data_check sql --files .... This will run a SQL file or all SQL files in a folder against the configured database. All SQL files are executed in parallel. If you need to execute a file after another file, you need to call sql twice.
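
For example, to make sure second.sql only runs after first.sql has finished, call sql twice (the file names are placeholders):

- sql: first.sql
- sql: second.sql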

Short form:

- sql: some_file.sql

Long form:

- sql:
    files:
      - some_file.sql
      - some_path

You can also omit files:

- sql:
    - some_file.sql
    - some_path

cmd

cmd will call any script or program. The commands will be executed sequentially. The optional print parameter can disable console output of the command.

Short form:

- cmd: echo "test"

Long form:

- cmd:
  commands:
    - echo "test"
    - script/to/start/pipeline.sh
  print: false

With print: false no output is printed from the commands.

You can also omit commands:

- cmd:
  - echo "test"
  - script/to/start/pipeline.sh

always_run

always_run is a container for other steps. These steps will always be executed, even if any other step fails. If always_run is placed between other steps, it is executed in order.

Example:

steps:
  - sql: might_fail.sql
  - always_run:
    - sql: run_after_failing.sql
    - cmd: some_script.sh
  - cmd: other_script.sh
  - always_run:
    - sql: finish.sql

In this example, if might_fail.sql fails, run_after_failing.sql, some_script.sh and finish.sql will be run in this order. If might_fail.sql does not fail, other_script.sh is executed after run_after_failing.sql and some_script.sh. finish.sql will then run at the end (even when other_script.sh fails).

fake

fake is used to generate test data. See here for more details and how to create the configuration.

Short form:

- fake: fake_conf.yml

Long form:

- fake:
    configs:
        - fake_conf.yml
        - fake_conf2.yml

With custom output path:

- fake:
    configs:
        - fake_conf.yml
    output: fake_table.csv

ping

ping is like calling data_check ping --wait. This will try to connect to the database and fail the pipeline if it doesn't succeed within the timeout. Timeout duration and time between retries can be configured.

Short form:

- ping:

With timeout and retry configuration:

- ping:
    timeout: 5
    retry: 1

This will retry the connection every second and time out after 5 seconds; these are also the default values.

sql_files

sql_files is deprecated. Use sql instead.

sql_files is like calling data_check sql --files .... This will run a SQL file or all SQL files in a folder against the configured database. All SQL files are executed in parallel. If you need to execute a file after another file, you need to call sql_files twice. sql_file is an alias for sql_files.

Short form:

- sql_files: some_file.sql

Using the alias:

- sql_file: some_file.sql

Long form:

- sql_files:
    files:
      - some_file.sql
      - some_path

You can also omit files:

- sql_files:
    - some_file.sql
    - some_path

load_table

load_table is deprecated. Use load instead.

load_table is like calling data_check load --table .... This will load a CSV file into a table.

- load_table:
    file: check/date_test.csv
    table: temp.date_test
    mode: append

Nested pipelines

Pipelines can be nested inside other pipelines. Passing a folder with a data_check_pipeline.yml to check will run the pipeline:

- check:
    - some_checks
    - folder_with_a_pipeline

Parameters in pipelines

You can use some predefined parameters in a pipeline definition (see the example after this list):

  • CONNECTION: The name of the connection used for this run.
  • CONNECTION_STRING: The connection string as defined in data_check.yml used for the connection.
  • PROJECT_PATH: The path of the data_check project (the folder containing data_check.yml).
  • PIPELINE_PATH: The absolute path to the pipeline (the folder containing data_check_pipeline.yml).
  • PIPELINE_NAME: The name of the folder containing data_check_pipeline.yml.
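
A minimal sketch of how these parameters can be combined in a cmd step (the script path is hypothetical):

steps:
  - cmd: "python3 {{PIPELINE_PATH}}/my_script.py --connection {{CONNECTION}} --name {{PIPELINE_NAME}}"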

Generating pipeline checks

Like generating expectation files, you can also run data_check gen for a pipeline. In this mode, the pipeline is executed, but each check step will generate the CSV files instead of running the actual checks. Adding --force will overwrite existing CSV files.
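
For example, assuming the project layout from above with the pipeline in checks/sample_pipeline:

data_check gen checks/sample_pipeline --force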

Debugging a pipeline

You can add a breakpoint between pipeline steps to stop the pipeline and run the Python Debugger (pdb).

Example:

steps:
  - sql: select 1 as a, 'b' as t {{from_dual}}
  - breakpoint:
  - sql: select 2 as a, 'c' as t {{from_dual}}

You have access to all debugger commands, e.g. 'c', 'cont' or 'continue' to continue execution until the next breakpoint. 'q' or 'quit' will quit the debugger and the pipeline will fail.

Additionally, you can use these variables and functions to interact with the pipeline:

  • data_check: access to the main data_check instance
  • sql: access to SQL functions
  • pipeline: access to the current pipeline
  • steps: list of all steps in the pipeline
  • current_step(): returns the current step
  • next_step(): returns the next step without executing it
  • run_next(): run the next step in the pipeline

To run a SQL statement in the debugger, use sql.run_sql("<any sql statement>"). If you want to use the result as a DataFrame, use sql.run_query("select ...") which will return a DataFrame.
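
For example, at the (Pdb) prompt you can combine the standard debugger commands with these helpers (the query is only an illustration):

(Pdb) current_step()
(Pdb) sql.run_query("select count(*) as cnt from my_schema.some_table")
(Pdb) run_next()
(Pdb) c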