

Lineage Ingestion — Advanced Topics

This page covers entity resolution, cross-database lineage, stored procedure and view lineage, parallel processing, publishing, dialect mapping, and configuration. For the pipeline overview, SQL parsing, and graph analysis, see Lineage Ingestion.

Entity Resolution

Before publishing, table names from SQL must be resolved to OpenMetadata entities:
search_table_entities(service_name, database, schema, table)
  1. Search Elasticsearch (fast) for table FQN
  2. If not found, try API search (reliable)
  3. If schema not found, retry without schema qualifier
  4. Cache results (1,000-entry LRU cache)
This resolution step is critical — lineage can only be created between entities that exist in the catalog.
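The cascade above can be sketched as follows. This is a hedged, self-contained illustration: `search_es` and `search_api` are stand-in stubs, not the real OpenMetadata helpers, and the real `search_table_entities` returns entity objects rather than FQN strings.

```python
from functools import lru_cache
from typing import Optional

def search_es(fqn: str) -> Optional[str]:
    """Fast Elasticsearch lookup (stub: pretend ES missed)."""
    return None

def search_api(fqn: str) -> Optional[str]:
    """Slower but reliable API search (stub: pretend it found the entity)."""
    return fqn

@lru_cache(maxsize=1000)  # 1,000-entry LRU cache, as in step 4
def search_table_entities(service: str, database: str,
                          schema: Optional[str], table: str) -> Optional[str]:
    fqn = ".".join(p for p in (service, database, schema, table) if p)
    # 1. Try Elasticsearch first (fast)
    entity = search_es(fqn)
    # 2. Fall back to API search (reliable)
    if entity is None:
        entity = search_api(fqn)
    # 3. Retry without the schema qualifier if nothing matched
    if entity is None and schema is not None:
        entity = search_table_entities(service, database, None, table)
    return entity
```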

Cross-Database Lineage

When processCrossDatabaseLineage is enabled:
# Query references tables from multiple services:
INSERT INTO warehouse.analytics.summary
SELECT * FROM production.sales.orders

# Resolution searches across multiple services:
search_table_entities(
    service_names=["warehouse_service", "production_service"],
    ...
)
Configured via crossDatabaseServiceNames in the workflow config.

Stored Procedure Lineage

lineage_processors.py handles lineage from stored procedures:
  1. Fetch stored procedure definitions
  2. Extract individual SQL statements from the procedure body
  3. Filter for lineage-relevant statements (is_lineage_query())
  4. Build a directed graph of the procedure’s data flow
  5. For each lineage edge, link to the stored procedure entity as the pipeline reference
The resulting lineage includes pipeline=EntityReference(type="storedProcedure") to show which procedure creates the data flow.
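Steps 2 and 3 can be illustrated with a toy splitter and filter. The real `is_lineage_query()` in `lineage_processors.py` is more thorough; the statement list and semicolon split here are deliberate simplifications.

```python
# Statements that can move data between tables (simplified list)
LINEAGE_STARTERS = ("INSERT", "UPDATE", "MERGE", "CREATE TABLE", "CREATE VIEW")

def is_lineage_query(statement: str) -> bool:
    """Keep only statements that can produce lineage edges (step 3)."""
    return statement.lstrip().upper().startswith(LINEAGE_STARTERS)

def split_procedure_body(body: str) -> list[str]:
    """Naive split of a procedure body into statements (step 2)."""
    return [s.strip() for s in body.split(";") if s.strip()]

body = """
SET search_path = analytics;
INSERT INTO summary SELECT * FROM orders;
SELECT 1;
"""
# Only the INSERT survives the filter; SET and the bare SELECT are dropped
statements = [s for s in split_procedure_body(body) if is_lineage_query(s)]
```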

View Lineage

Views are processed separately:
view_lineage_processor(view_entity)
  1. Get view definition SQL
  2. Parse with LineageParser
  3. Extract source tables from the SELECT
  4. Create lineage: source tables → view
  5. Override existing view lineage (views can be redefined)
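Steps 2–4 can be sketched with a toy source-table extractor. This regex over FROM/JOIN clauses is a stand-in for `LineageParser`, which additionally handles subqueries, CTEs, and quoted identifiers; it only illustrates the source tables → view flow.

```python
import re

def extract_source_tables(view_sql: str) -> list[str]:
    """Toy extraction of table names following FROM or JOIN keywords."""
    pattern = r"\b(?:FROM|JOIN)\s+([A-Za-z_][\w.]*)"
    return sorted(set(re.findall(pattern, view_sql, flags=re.IGNORECASE)))

view_sql = "SELECT o.id, c.name FROM orders o JOIN customers c ON o.cid = c.id"
sources = extract_source_tables(view_sql)
# Lineage edges would then be created as: orders -> view, customers -> view
```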

External Table Lineage

ExternalTableLineageMixin (source/database/external_table_lineage_mixin.py) links external tables (S3, GCS paths) to database tables:
  1. Search for container entities by storage path
  2. Map columns between container and table
  3. Source: LineageSource.ExternalTableLineage
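Step 1 amounts to matching a table's storage location against known container paths. A minimal sketch, assuming a prefix match; the container registry and lookup helper below are illustrative, not the mixin's real API.

```python
from typing import Optional

# Hypothetical registry of container entities keyed by storage path
containers = {
    "s3://lake/raw/orders/": "container-uuid-1",
    "s3://lake/raw/events/": "container-uuid-2",
}

def find_container(table_location: str) -> Optional[str]:
    """Return the container whose storage path prefixes the table's location."""
    for path, container_id in containers.items():
        if table_location.startswith(path):
            return container_id
    return None

cid = find_container("s3://lake/raw/orders/2024/part-0.parquet")
```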

Parallel Processing

LineageSource processes queries in parallel for performance:
Query logs → Chunk into batches of 200

     ┌──────────────┼──────────────┐
     ▼              ▼              ▼
  Thread 1       Thread 2       Thread 3
  (chunk 1)      (chunk 2)      (chunk 3)
     │              │              │
     └──────────────┼──────────────┘

            Result queue → yield AddLineageRequest
Thread count is configurable (default: CPU count). Each thread gets its own parser instances.
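The chunk-and-fan-out pattern in the diagram can be sketched with a standard thread pool. `parse_chunk` is an illustrative stand-in for the per-thread parsing work; in the real source each worker owns its own parser instances.

```python
import os
from concurrent.futures import ThreadPoolExecutor

CHUNK_SIZE = 200  # queries per chunk, as in the diagram

def chunks(items, size):
    """Split a list into consecutive batches of at most `size` items."""
    for i in range(0, len(items), size):
        yield items[i:i + size]

def parse_chunk(chunk):
    # Stand-in for per-thread parsing; here we just count the queries
    return len(chunk)

queries = [f"INSERT INTO t SELECT {i}" for i in range(450)]
# Default worker count mirrors the CPU-count default described above
with ThreadPoolExecutor(max_workers=os.cpu_count()) as pool:
    results = list(pool.map(parse_chunk, chunks(queries, CHUNK_SIZE)))
# 450 queries -> chunks of 200, 200, and 50, processed in parallel
```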

AddLineageRequest Structure

AddLineageRequest(
    edge=EntitiesEdge(
        fromEntity=EntityReference(id=source_id, type="table"),
        toEntity=EntityReference(id=target_id, type="table"),
        lineageDetails=LineageDetails(
            sqlQuery="INSERT INTO target SELECT ...",  # Masked
            source=LineageSource.QueryLineage,
            columnsLineage=[
                ColumnLineage(
                    fromColumns=["db.schema.source.col1"],
                    toColumn="db.schema.target.col1"
                ),
            ],
            pipeline=EntityReference(...)  # Optional: stored procedure
        )
    )
)

Publishing

The sink merges lineage with existing edges:
add_lineage(request, check_patch=False)
  1. Check if edge already exists
  2. If exists: merge column lineage (union of old + new columns)
  3. If new: create edge
  4. Cache the result
For views, write_override_lineage() replaces existing lineage entirely (since view definitions can change).
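The merge in step 2 is a union of column-lineage entries. A minimal sketch with simplified tuple types (the real sink works with `LineageDetails` models, not tuples):

```python
def merge_column_lineage(existing, incoming):
    """Union of column-lineage entries, keyed by (fromColumns, toColumn)."""
    seen = {(tuple(f), t) for f, t in existing}
    merged = list(existing)
    for f, t in incoming:
        key = (tuple(f), t)
        if key not in seen:
            merged.append((f, t))
            seen.add(key)
    return merged

old = [(["src.a"], "tgt.a")]
new = [(["src.a"], "tgt.a"), (["src.b"], "tgt.b")]
# Keeps one copy of src.a -> tgt.a and adds src.b -> tgt.b
merged = merge_column_lineage(old, new)
```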

Dialect Mapping

lineage/models.py maps each database connection type to a SQL dialect for correct parsing:
| Connection Type | Dialect |
| --- | --- |
| Postgres, Redshift | POSTGRES |
| MySQL, MariaDB | MYSQL |
| Snowflake | SNOWFLAKE |
| BigQuery | BIGQUERY |
| SQL Server, Azure SQL | TSQL |
| Hive, Spark, Databricks | SPARKSQL |
| Oracle | ORACLE |
| Trino, Presto | TRINO |

…plus roughly 20 more connection types.
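The mapping is conceptually a lookup table. An illustrative sketch of its shape (the real map in `lineage/models.py` uses the framework's enum types and covers many more connection types):

```python
# Illustrative connection-type -> dialect map; keys and the fallback
# dialect name are assumptions for this sketch
CONNECTION_TYPE_TO_DIALECT = {
    "Postgres": "POSTGRES",
    "Redshift": "POSTGRES",
    "Mysql": "MYSQL",
    "Snowflake": "SNOWFLAKE",
    "BigQuery": "BIGQUERY",
    "Mssql": "TSQL",
    "Databricks": "SPARKSQL",
    "Trino": "TRINO",
}

def dialect_for(connection_type: str, default: str = "ANSI") -> str:
    # Fall back to a generic dialect for unmapped connection types
    return CONNECTION_TYPE_TO_DIALECT.get(connection_type, default)
```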

Configuration

source:
  type: postgres-lineage          # Database-specific lineage source
  serviceName: my_postgres
  sourceConfig:
    config:
      type: DatabaseLineage
      queryLogDuration: 1           # Days to look back
      resultLimit: 100000           # Max queries to fetch
      parsingTimeoutLimit: 30       # Seconds per query
      processCrossDatabaseLineage: false
      crossDatabaseServiceNames: []
      threads: 4                    # Parallel processing threads
      queryLogFilePath: ""          # Optional: CSV file path

Key Design Patterns

| Pattern | Where | Why |
| --- | --- | --- |
| Directed graph | NetworkX in sql_lineage.py | Handles intermediate tables and complex data flows |
| Cascade parsing | SqlGlot → SqlFluff → SqlParse | Maximizes parse success across SQL dialects |
| Entity resolution | ES + API fallback with LRU cache | Fast lookup with a reliability guarantee |
| Chunked parallelism | Thread pool with 200-query chunks | Bounded memory and CPU utilization |
| Query masking | QueryMasker before storage | Prevents sensitive data leakage |

Key Files Quick Reference

| What you want to do | Start here |
| --- | --- |
| Understand the lineage workflow | workflow/lineage.py |
| See how query logs are fetched and filtered | source/database/lineage_source.py |
| See database-specific filters | source/database/{dialect}/lineage.py |
| Read the SQL parser | lineage/parser.py → LineageParser |
| Understand graph-based analysis | lineage/sql_lineage.py → get_lineage_by_query() |
| See query masking | lineage/masker.py |
| See dialect mappings | lineage/models.py |
| See stored procedure processing | source/database/lineage_processors.py |
| See view lineage | source/database/lineage_processors.py → view_lineage_processor() |
| See external table lineage | source/database/external_table_lineage_mixin.py |
| See how lineage is published | ometa/mixins/lineage_mixin.py → add_lineage() |
| See lineage sink logic | sink/metadata_rest.py → write_lineage() |
All paths above are relative to ingestion/src/metadata/ingestion/. For example, lineage/parser.py means ingestion/src/metadata/ingestion/lineage/parser.py.