

Lineage Ingestion

Lineage ingestion extracts data flow relationships from SQL queries — which tables feed into which other tables, and at what column granularity. It parses query logs, view definitions, and stored procedures to build a lineage graph published to OpenMetadata.

Pipeline Overview

Lineage uses the same query log sources as usage ingestion, but processes queries differently — it analyzes directional data flow (source → target) rather than counting references.
┌──────────────────────────────────────────────────────────┐
│  1. LineageSource                                        │
│  source/database/lineage_source.py                       │
│                                                          │
│  Fetches query logs filtered for lineage-relevant types: │
│  CREATE TABLE AS SELECT, INSERT...SELECT, MERGE,         │
│  UPDATE...FROM, view definitions, stored procedures      │
│                                                          │
│  Yields: Either[AddLineageRequest]                       │
└──────────────────────┬───────────────────────────────────┘
                       │
                       ▼
┌──────────────────────────────────────────────────────────┐
│  MetadataRestSink                                        │
│  sink/metadata_rest.py                                   │
│                                                          │
│  PUT /api/v1/lineage                                     │
│  (merges column lineage with existing edges)             │
└──────────────────────────────────────────────────────────┘
Unlike the 5-stage usage pipeline, lineage processing happens inside the source — the LineageSource parses queries, builds the lineage graph, resolves entities, and yields ready-to-publish AddLineageRequest objects.
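The edge body the sink PUTs can be sketched as follows. This is an illustrative assembly of the payload shape, not the actual OpenMetadata client code; the field values ("src-uuid", the SQL text) are placeholders.

```python
# Hypothetical sketch of the edge body sent via PUT /api/v1/lineage.
# Field names follow the OpenMetadata lineage schema in spirit; the
# IDs and query text here are illustrative stand-ins.
def build_lineage_edge(from_id: str, to_id: str, sql: str) -> dict:
    """Assemble one table-to-table lineage edge with its originating query."""
    return {
        "edge": {
            "fromEntity": {"id": from_id, "type": "table"},
            "toEntity": {"id": to_id, "type": "table"},
            "lineageDetails": {"sqlQuery": sql, "source": "QueryLineage"},
        }
    }

edge = build_lineage_edge("src-uuid", "tgt-uuid", "INSERT INTO t SELECT * FROM s")
```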

Lineage Extraction Call Chain

# 1. SOURCE fetches and chunks query logs
LineageSource._iter()
    └── yield_query_lineage()
        ├── Fetch query logs from database (or file)
        ├── Chunk into batches of 200
        └── Process chunks in parallel threads

# 2. PROCESSOR parses each query (runs in thread pool)
    └── query_lineage_processor(query)           # lineage_processors.py
        └── get_lineage_by_query(query, ...)     # lineage/sql_lineage.py
            ├── LineageParser(query, dialect)     # lineage/parser.py
            │   ├── Try SqlGlot (30s timeout)
            │   ├── Fallback SqlFluff (30s timeout)
            │   └── Fallback SqlParse

            ├── Extract: source_tables, target_tables, intermediate_tables
            ├── Extract: column_lineage (column-to-column mappings)

            └── For each (source, target) pair:
                ├── search_table_entities(source)  # ES + API lookup
                ├── search_table_entities(target)
                ├── Build column lineage mappings
                └── yield AddLineageRequest

# 3. VIEW LINEAGE (separate pass)
    └── yield_view_lineage()
        └── view_lineage_processor(view)
            └── get_view_lineage(view_definition, ...)
                ├── Parse CREATE VIEW ... AS SELECT ...
                ├── Extract source tables from SELECT
                └── yield AddLineageRequest (source tables → view)
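The chunk-and-parallelize step at the top of the chain can be sketched as below. This is a minimal stand-in, not the actual LineageSource code: process_chunk is a placeholder for query_lineage_processor, and only the batch size (200) comes from the text.

```python
# Sketch: split query logs into batches of 200 and process them in
# worker threads, mirroring step 1 of the call chain above.
from concurrent.futures import ThreadPoolExecutor
from typing import Iterator, List

CHUNK_SIZE = 200

def chunk(queries: List[str], size: int = CHUNK_SIZE) -> Iterator[List[str]]:
    """Yield consecutive batches of at most `size` queries."""
    for i in range(0, len(queries), size):
        yield queries[i : i + size]

def process_chunk(batch: List[str]) -> List[str]:
    # Placeholder for query_lineage_processor(); real code parses each
    # query and yields AddLineageRequest objects.
    return [q.upper() for q in batch]

def run(queries: List[str], workers: int = 4) -> List[str]:
    results: List[str] = []
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for parsed in pool.map(process_chunk, chunk(queries)):
            results.extend(parsed)
    return results
```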

SQL Parsing

LineageParser

LineageParser (lineage/parser.py) is the core SQL analysis engine shared with usage ingestion. For lineage, it extracts directional information:
parser = LineageParser(query="INSERT INTO target SELECT * FROM source", dialect=Dialect.ANSI)

parser.source_tables         # ["source"]       — tables being read
parser.target_tables         # ["target"]       — tables being written
parser.intermediate_tables   # []               — staging/temp tables
parser.column_lineage        # [(source.col1, target.col1), ...]
parser.table_aliases         # {"s": "source"}

Cascade Parsing Strategy

Three parsers are tried in order, with 30-second timeouts and 100MB memory limits each:
  1. SqlGlot — preferred for accuracy, handles most dialects
  2. SqlFluff — fallback, good for complex SQL
  3. SqlParse — final fallback, always succeeds (less accurate)
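The cascade can be sketched as a try-in-order loop. The parser callables below are stand-ins, not the real SqlGlot/SqlFluff/SqlParse APIs, and the per-parser timeout and memory limits are elided.

```python
# Sketch of the cascade strategy: try parsers from most to least strict,
# falling through on failure. Real parsers are swapped in for the stubs.
from typing import Callable, List, Optional

def cascade_parse(query: str, parsers: List[Callable[[str], list]]) -> Optional[list]:
    for parser in parsers:
        try:
            return parser(query)
        except Exception:
            continue  # fall through to the next, more lenient parser
    return None  # every parser failed

def strict(q):   # stands in for an accurate parser that may reject dialects
    raise ValueError("unsupported dialect")

def lenient(q):  # stands in for a best-effort final fallback
    return q.split()

tables = cascade_parse("INSERT INTO t SELECT * FROM s", [strict, lenient])
```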

Query Cleaning

Before parsing, queries are cleaned:
  • Remove COPY GRANTS (Snowflake)
  • Remove MERGE...WHEN MATCHED clauses (too complex for parsers)
  • Filter out CREATE TRIGGER/FUNCTION/PROCEDURE (no lineage value)
  • Normalize escape sequences
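Two of the cleaning rules above can be sketched with regexes. These patterns are illustrative assumptions, not the shipped cleaning rules.

```python
# Illustrative cleaning pass: strip Snowflake's COPY GRANTS and drop
# DDL statements that carry no lineage value.
import re
from typing import Optional

COPY_GRANTS = re.compile(r"\bCOPY\s+GRANTS\b", re.IGNORECASE)
NO_LINEAGE = re.compile(r"^\s*CREATE\s+(TRIGGER|FUNCTION|PROCEDURE)\b", re.IGNORECASE)

def clean_query(sql: str) -> Optional[str]:
    if NO_LINEAGE.match(sql):
        return None  # filtered out entirely
    return COPY_GRANTS.sub("", sql).strip()
```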

Query Masking

QueryMasker (lineage/masker.py) replaces literal values with ? before storing queries in lineage details — prevents sensitive data leakage while preserving query structure.
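A toy version of literal masking looks like this. It is not the real QueryMasker, just a regex sketch showing the replace-literals-with-? idea.

```python
# Toy literal masker: replace string and numeric literals with ? so the
# stored query keeps its structure but leaks no values.
import re

STRING_LIT = re.compile(r"'(?:[^']|'')*'")        # 'bob', 'it''s'
NUMBER_LIT = re.compile(r"\b\d+(?:\.\d+)?\b")     # 42, 3.14

def mask_query(sql: str) -> str:
    return NUMBER_LIT.sub("?", STRING_LIT.sub("?", sql))
```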

Lineage-Relevant Query Types

Not all queries produce lineage. Each database connector filters for specific query types:
  Query Pattern                    Lineage Produced
  ──────────────────────────────   ─────────────────────────────
  CREATE TABLE AS SELECT ...       source tables → new table
  INSERT INTO ... SELECT ...       source tables → target table
  UPDATE ... FROM ...              source tables → target table
  MERGE INTO ... USING ...         source table → target table
  CREATE VIEW AS ...               source tables → view
  Stored procedure body            varies (parsed recursively)
Queries like plain SELECT, DROP, or DDL without data movement are filtered out.
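A connector-side filter along these lines could look as follows. The regexes are hypothetical approximations of the query shapes listed above, not the patterns any specific connector ships.

```python
# Sketch of a lineage-relevance filter: keep only query shapes that move
# data between tables; plain SELECT and bare DDL fall through.
import re

LINEAGE_PATTERNS = [
    re.compile(p, re.IGNORECASE | re.DOTALL)
    for p in (
        r"^\s*CREATE\s+(?:OR\s+REPLACE\s+)?TABLE\b.*\bAS\s+SELECT\b",
        r"^\s*INSERT\s+INTO\b.*\bSELECT\b",
        r"^\s*UPDATE\b.*\bFROM\b",
        r"^\s*MERGE\s+INTO\b.*\bUSING\b",
        r"^\s*CREATE\s+(?:OR\s+REPLACE\s+)?VIEW\b",
    )
]

def produces_lineage(sql: str) -> bool:
    return any(p.search(sql) for p in LINEAGE_PATTERNS)
```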

Graph-Based Lineage Analysis

lineage/sql_lineage.py uses NetworkX directed graphs to handle complex lineage scenarios.

Direct Lineage

For simple queries (INSERT INTO target SELECT FROM source):
source ──→ target
A single AddLineageRequest is created.

Intermediate Table Lineage

For queries involving temp/staging tables:
CREATE TEMP TABLE staging AS SELECT * FROM source;
INSERT INTO target SELECT * FROM staging;
The graph captures:
source ──→ staging ──→ target
get_lineage_by_graph() traces paths through the graph:
  1. Find weakly connected components
  2. Extract root-to-leaf paths (max depth: 20)
  3. Create lineage request for each hop
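The three steps above can be sketched with networkx directly. This is a simplified walk, not the actual get_lineage_by_graph() implementation; only the cutoff depth of 20 comes from the text.

```python
# Sketch of the graph walk: find weakly connected components, then emit
# one (source, target) hop per edge on each root-to-leaf path.
import networkx as nx

def lineage_edges(edges):
    g = nx.DiGraph(edges)
    hops = set()
    for component in nx.weakly_connected_components(g):
        sub = g.subgraph(component)
        roots = [n for n in sub if sub.in_degree(n) == 0]
        leaves = [n for n in sub if sub.out_degree(n) == 0]
        for root in roots:
            for leaf in leaves:
                for path in nx.all_simple_paths(sub, root, leaf, cutoff=20):
                    hops.update(zip(path, path[1:]))  # one request per hop
    return hops

hops = lineage_edges([("source", "staging"), ("staging", "target")])
```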

Column-Level Lineage

For each (source, target) edge, the parser maps individual columns:
column_lineage = [
    ColumnLineage(
        fromColumns=["service.db.schema.source.col1"],
        toColumn="service.db.schema.target.col1"
    ),
    ColumnLineage(
        fromColumns=["service.db.schema.source.col2"],
        toColumn="service.db.schema.target.col2"
    ),
]
Handles:
  • Simple column mappings (col1 → col1)
  • Renamed columns (source.old_name → target.new_name)
  • Star selects (* → individual columns)
  • Expressions (source.a + source.b → target.sum_ab)
  • Intermediate column mappings through staging tables
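For instance, a star select expands into one mapping per target column once the target schema is known. A minimal sketch, assuming the column list has already been fetched from the entity (names are illustrative):

```python
# Expand SELECT * into per-column lineage mappings, mirroring the
# ColumnLineage shape shown above.
from typing import Dict, List

def expand_star(source_fqn: str, target_fqn: str, columns: List[str]) -> List[Dict]:
    return [
        {"fromColumns": [f"{source_fqn}.{c}"], "toColumn": f"{target_fqn}.{c}"}
        for c in columns
    ]

mappings = expand_star(
    "service.db.schema.source", "service.db.schema.target", ["col1", "col2"]
)
```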

For entity resolution, cross-database lineage, stored procedures, parallel processing, publishing, dialect mapping, and configuration, see Lineage Ingestion — Advanced Topics.