# Lineage Ingestion — Advanced Topics

This page covers entity resolution, cross-database lineage, stored procedure and view lineage, parallel processing, publishing, dialect mapping, and configuration. For the pipeline overview, SQL parsing, and graph analysis, see Lineage Ingestion.
## Entity Resolution

Before publishing, table names parsed from SQL must be resolved to OpenMetadata entities:

```python
search_table_entities(service_name, database, schema, table)
```

1. Search Elasticsearch (fast) for the table FQN
2. If not found, fall back to the API search (reliable)
3. If the schema is not found, retry without the schema qualifier
4. Cache results (1,000-entry LRU cache)

This resolution step is critical: lineage can only be created between entities that already exist in the catalog.
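The cascade above can be sketched as follows. This is a minimal illustration, not the real implementation: `ES_INDEX` and `API_INDEX` are stand-in dictionaries for the Elasticsearch and API clients, and the FQNs are invented.

```python
from functools import lru_cache

# Hypothetical sketch of the resolution cascade: Elasticsearch first,
# API search as the reliable fallback, then a retry without the schema
# qualifier. ES_INDEX / API_INDEX are stand-in dicts, not real clients.
ES_INDEX = {"svc.db.sch.orders": "uuid-1"}
API_INDEX = {"svc.db.customers": "uuid-2"}

@lru_cache(maxsize=1000)  # mirrors the 1,000-entry LRU cache
def search_table_entities(service, database, schema, table):
    candidates = [
        f"{service}.{database}.{schema}.{table}",  # full FQN
        f"{service}.{database}.{table}",           # retry without schema
    ]
    for fqn in candidates:
        for backend in (ES_INDEX, API_INDEX):  # fast path, then API
            if fqn in backend:
                return backend[fqn]
    return None  # entity not in the catalog: no lineage possible
```

The `lru_cache` decorator gives the bounded memoization for free; repeated lookups for the same table never hit the backends twice.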
## Cross-Database Lineage

When `processCrossDatabaseLineage` is enabled, a single query may reference tables from multiple services:

```sql
INSERT INTO warehouse.analytics.summary
SELECT * FROM production.sales.orders
```

Resolution then searches across all configured services:

```python
search_table_entities(
    service_names=["warehouse_service", "production_service"],
    ...
)
```

Configured via `crossDatabaseServiceNames` in the workflow config.
## Stored Procedure Lineage

`lineage_processors.py` handles lineage from stored procedures:

- Fetch stored procedure definitions
- Extract individual SQL statements from the procedure body
- Filter for lineage-relevant statements (`is_lineage_query()`)
- Build a directed graph of the procedure's data flow
- For each lineage edge, link the stored procedure entity as the pipeline reference

The resulting lineage includes `pipeline=EntityReference(type="storedProcedure")` to show which procedure creates the data flow.
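The statement-filtering step can be sketched as below. This is a deliberately crude stand-in: the real `is_lineage_query()` inspects parsed statements, while this version keeps only write-from-read DML by string matching, and the verbs and procedure body are illustrative assumptions.

```python
# Hypothetical sketch: split a procedure body into statements and keep
# only those that can produce lineage (DML that writes from a read).
LINEAGE_VERBS = ("INSERT", "MERGE", "CREATE TABLE", "UPDATE")

def is_lineage_query(statement: str) -> bool:
    stmt = statement.strip().upper()
    return stmt.startswith(LINEAGE_VERBS) and "SELECT" in stmt

def lineage_statements(procedure_body: str):
    return [s for s in procedure_body.split(";") if is_lineage_query(s)]

body = """
SET search_path = analytics;
INSERT INTO summary SELECT region, sum(amount) FROM orders GROUP BY region;
DELETE FROM staging_orders
"""
```

Only the `INSERT ... SELECT` statement survives the filter; the `SET` and `DELETE` statements move no data between tables.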
## View Lineage

Views are processed separately:

```python
view_lineage_processor(view_entity)
```

1. Get the view definition SQL
2. Parse it with `LineageParser`
3. Extract source tables from the SELECT
4. Create lineage: source tables → view
5. Override any existing view lineage (views can be redefined)
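The steps above can be sketched as follows. The regex is a crude stand-in for `LineageParser`, and the view and table names are illustrative only.

```python
import re

# Hypothetical sketch: extract source tables from a view definition and
# emit (source -> view) edges. The real pipeline parses the SQL properly;
# this FROM/JOIN regex is only for illustration.
def view_lineage(view_name: str, view_sql: str):
    sources = re.findall(r"\b(?:FROM|JOIN)\s+([\w.]+)", view_sql, re.IGNORECASE)
    # one edge per distinct source table, in first-seen order
    return [(src, view_name) for src in dict.fromkeys(sources)]

edges = view_lineage(
    "analytics.v_daily",
    "SELECT o.day, sum(o.amount) FROM sales.orders o JOIN sales.refunds r ON ...",
)
```

Because step 5 overrides rather than merges, re-running the processor after a view is redefined yields exactly the edges of the new definition.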
## External Table Lineage

`ExternalTableLineageMixin` (`source/database/external_table_lineage_mixin.py`) links external tables (S3, GCS paths) to database tables:

- Search for container entities by storage path
- Map columns between the container and the table
- Source: `LineageSource.ExternalTableLineage`
## Parallel Processing

`LineageSource` processes queries in parallel for performance:

```
Query logs → Chunk into batches of 200
                     │
      ┌──────────────┼──────────────┐
      ▼              ▼              ▼
  Thread 1       Thread 2       Thread 3
  (chunk 1)      (chunk 2)      (chunk 3)
      │              │              │
      └──────────────┼──────────────┘
                     ▼
Result queue → yield AddLineageRequest
```

The thread count is configurable (default: CPU count). Each thread gets its own parser instances.
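A minimal sketch of the chunked fan-out, assuming a `parse_chunk` stand-in for the per-thread parsing work (the real code builds a `LineageParser` per thread and yields `AddLineageRequest` objects):

```python
import os
from concurrent.futures import ThreadPoolExecutor

# Hypothetical sketch of chunked parallel parsing: split query logs into
# batches of 200 and parse each batch in its own worker thread.
CHUNK_SIZE = 200

def chunk(queries, size=CHUNK_SIZE):
    return [queries[i:i + size] for i in range(0, len(queries), size)]

def parse_chunk(batch):
    # each thread would build its own parser instance here
    return [f"lineage:{q}" for q in batch]

def process(queries, threads=os.cpu_count()):
    with ThreadPoolExecutor(max_workers=threads) as pool:
        for batch_result in pool.map(parse_chunk, chunk(queries)):
            yield from batch_result  # stand-in for AddLineageRequest items
```

Chunking bounds memory (only 200 queries per in-flight batch) while `pool.map` keeps results in input order, so consumers see a deterministic stream.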
## AddLineageRequest Structure

```python
AddLineageRequest(
    edge=EntitiesEdge(
        fromEntity=EntityReference(id=source_id, type="table"),
        toEntity=EntityReference(id=target_id, type="table"),
        lineageDetails=LineageDetails(
            sqlQuery="INSERT INTO target SELECT ...",  # Masked
            source=LineageSource.QueryLineage,
            columnsLineage=[
                ColumnLineage(
                    fromColumns=["db.schema.source.col1"],
                    toColumn="db.schema.target.col1"
                ),
            ],
            pipeline=EntityReference(...)  # Optional: stored procedure
        )
    )
)
```
## Publishing

The sink merges lineage with existing edges:

```python
add_lineage(request, check_patch=False)
```

1. Check whether the edge already exists
2. If it exists: merge column lineage (union of old and new columns)
3. If it is new: create the edge
4. Cache the result

For views, `write_override_lineage()` replaces existing lineage entirely (since view definitions can change).
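The merge-or-create behavior can be sketched as below; `edges` is a hypothetical stand-in for the sink's edge cache keyed by `(from, to)`, not the actual data structure.

```python
# Hypothetical sketch of the publish-time merge: if an edge already
# exists, union the old and new column lineage; otherwise create it.
def publish(edges, from_table, to_table, columns):
    key = (from_table, to_table)
    if key in edges:
        edges[key] = sorted(set(edges[key]) | set(columns))  # merge: union
    else:
        edges[key] = sorted(columns)                         # create
    return edges[key]

edges = {}
publish(edges, "orders", "summary", ["col1"])
merged = publish(edges, "orders", "summary", ["col1", "col2"])
```

The union semantics mean repeated runs are idempotent: republishing the same query never shrinks existing column lineage.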
## Dialect Mapping

`lineage/models.py` maps each database connection type to a SQL dialect for correct parsing:

| Connection Type | Dialect |
|---|---|
| Postgres, Redshift | POSTGRES |
| MySQL, MariaDB | MYSQL |
| Snowflake | SNOWFLAKE |
| BigQuery | BIGQUERY |
| SQL Server, Azure SQL | TSQL |
| Hive, Spark, Databricks | SPARKSQL |
| Oracle | ORACLE |
| Trino, Presto | TRINO |
| + 20 more | … |
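The lookup amounts to a simple table like the sketch below. The dictionary keys mirror the table above rather than the actual enum values in the codebase, and the ANSI fallback for unknown connection types is an assumption of this sketch.

```python
# Hypothetical sketch of the connection-type -> dialect lookup performed
# by lineage/models.py; keys and the ANSI fallback are illustrative.
DIALECT_MAP = {
    "Postgres": "POSTGRES",
    "Redshift": "POSTGRES",
    "MySQL": "MYSQL",
    "MariaDB": "MYSQL",
    "Snowflake": "SNOWFLAKE",
    "BigQuery": "BIGQUERY",
    "Mssql": "TSQL",
    "Databricks": "SPARKSQL",
}

def dialect_for(connection_type: str) -> str:
    # unknown connection types fall back to generic ANSI parsing (assumed)
    return DIALECT_MAP.get(connection_type, "ANSI")
```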
## Configuration

```yaml
source:
  type: postgres-lineage            # Database-specific lineage source
  serviceName: my_postgres
  sourceConfig:
    config:
      type: DatabaseLineage
      queryLogDuration: 1           # Days to look back
      resultLimit: 100000           # Max queries to fetch
      parsingTimeoutLimit: 30       # Seconds per query
      processCrossDatabaseLineage: false
      crossDatabaseServiceNames: []
      threads: 4                    # Parallel processing threads
      queryLogFilePath: ""          # Optional: CSV file path
```
## Key Design Patterns

| Pattern | Where | Why |
|---|---|---|
| Directed graph | NetworkX in `sql_lineage.py` | Handle intermediate tables and complex data flows |
| Cascade parsing | SqlGlot → SqlFluff → SqlParse | Maximize success across SQL dialects |
| Entity resolution | ES + API fallback with LRU cache | Fast lookup with reliability guarantee |
| Chunked parallelism | Thread pool with 200-query chunks | Bounded memory + CPU utilization |
| Query masking | QueryMasker before storage | Prevent sensitive data leakage |
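Of these patterns, query masking is the easiest to illustrate in isolation. The sketch below replaces literals with `?` via regex; the real `QueryMasker` works on parsed statements, so treat this purely as an illustration of the idea.

```python
import re

# Hypothetical sketch of query masking before storage: replace string
# and numeric literals so sensitive values never reach the catalog.
def mask_query(sql: str) -> str:
    sql = re.sub(r"'[^']*'", "'?'", sql)          # string literals
    sql = re.sub(r"\b\d+(\.\d+)?\b", "?", sql)    # numeric literals
    return sql

masked = mask_query(
    "INSERT INTO t SELECT * FROM u WHERE ssn = '123-45-6789' AND age > 21"
)
```

The masked query keeps its full structure, so lineage parsing still works, while the SSN and age values are gone before the SQL is stored in `sqlQuery`.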
## Key Files Quick Reference

| What you want to do | Start here |
|---|---|
| Understand the lineage workflow | `workflow/lineage.py` |
| See how query logs are fetched and filtered | `source/database/lineage_source.py` |
| See database-specific filters | `source/database/{dialect}/lineage.py` |
| Read the SQL parser | `lineage/parser.py` → `LineageParser` |
| Understand graph-based analysis | `lineage/sql_lineage.py` → `get_lineage_by_query()` |
| See query masking | `lineage/masker.py` |
| See dialect mappings | `lineage/models.py` |
| See stored procedure processing | `source/database/lineage_processors.py` |
| See view lineage | `source/database/lineage_processors.py` → `view_lineage_processor()` |
| See external table lineage | `source/database/external_table_lineage_mixin.py` |
| See how lineage is published | `ometa/mixins/lineage_mixin.py` → `add_lineage()` |
| See lineage sink logic | `sink/metadata_rest.py` → `write_lineage()` |
All paths above are relative to ingestion/src/metadata/ingestion/. For example, lineage/parser.py means ingestion/src/metadata/ingestion/lineage/parser.py.