<html><head><meta name="color-scheme" content="light dark"></head><body><pre style="word-wrap: break-word; white-space: pre-wrap;">"use strict";(self.webpackChunkdocs_website=self.webpackChunkdocs_website||[]).push([[86539],{3905:(e,t,a)=&gt;{a.d(t,{Zo:()=&gt;p,kt:()=&gt;m});var n=a(67294);function l(e,t,a){return t in e?Object.defineProperty(e,t,{value:a,enumerable:!0,configurable:!0,writable:!0}):e[t]=a,e}function i(e,t){var a=Object.keys(e);if(Object.getOwnPropertySymbols){var n=Object.getOwnPropertySymbols(e);t&amp;&amp;(n=n.filter((function(t){return Object.getOwnPropertyDescriptor(e,t).enumerable}))),a.push.apply(a,n)}return a}function r(e){for(var t=1;t&lt;arguments.length;t++){var a=null!=arguments[t]?arguments[t]:{};t%2?i(Object(a),!0).forEach((function(t){l(e,t,a[t])})):Object.getOwnPropertyDescriptors?Object.defineProperties(e,Object.getOwnPropertyDescriptors(a)):i(Object(a)).forEach((function(t){Object.defineProperty(e,t,Object.getOwnPropertyDescriptor(a,t))}))}return e}function o(e,t){if(null==e)return{};var a,n,l=function(e,t){if(null==e)return{};var a,n,l={},i=Object.keys(e);for(n=0;n&lt;i.length;n++)a=i[n],t.indexOf(a)&gt;=0||(l[a]=e[a]);return l}(e,t);if(Object.getOwnPropertySymbols){var i=Object.getOwnPropertySymbols(e);for(n=0;n&lt;i.length;n++)a=i[n],t.indexOf(a)&gt;=0||Object.prototype.propertyIsEnumerable.call(e,a)&amp;&amp;(l[a]=e[a])}return l}var u=n.createContext({}),s=function(e){var t=n.useContext(u),a=t;return e&amp;&amp;(a="function"==typeof e?e(t):r(r({},t),e)),a},p=function(e){var t=s(e.components);return n.createElement(u.Provider,{value:t},e.children)},d="mdxType",g={inlineCode:"code",wrapper:function(e){var t=e.children;return n.createElement(n.Fragment,{},t)}},c=n.forwardRef((function(e,t){var a=e.components,l=e.mdxType,i=e.originalType,u=e.parentName,p=o(e,["components","mdxType","originalType","parentName"]),d=s(a),c=l,m=d["".concat(u,".").concat(c)]||d[c]||g[c]||i;return 
a?n.createElement(m,r(r({ref:t},p),{},{components:a})):n.createElement(m,r({ref:t},p))}));function m(e,t){var a=arguments,l=t&amp;&amp;t.mdxType;if("string"==typeof e||l){var i=a.length,r=new Array(i);r[0]=c;var o={};for(var u in t)hasOwnProperty.call(t,u)&amp;&amp;(o[u]=t[u]);o.originalType=e,o[d]="string"==typeof e?e:l,r[1]=o;for(var s=2;s&lt;i;s++)r[s]=a[s];return n.createElement.apply(null,r)}return n.createElement.apply(null,a)}c.displayName="MDXCreateElement"},79024:(e,t,a)=&gt;{a.r(t),a.d(t,{assets:()=&gt;u,contentTitle:()=&gt;r,default:()=&gt;g,frontMatter:()=&gt;i,metadata:()=&gt;o,toc:()=&gt;s});var n=a(87462),l=(a(67294),a(3905));const i={title:"Airflow Integration",slug:"/lineage/airflow",custom_edit_url:"https://github.com/datahub-project/datahub/blob/master/docs/lineage/airflow.md"},r="Airflow Integration",o={unversionedId:"docs/lineage/airflow",id:"docs/lineage/airflow",title:"Airflow Integration",description:"If you're looking to schedule DataHub ingestion using Airflow, see the guide on scheduling ingestion with Airflow.",source:"@site/genDocs/docs/lineage/airflow.md",sourceDirName:"docs/lineage",slug:"/lineage/airflow",permalink:"/docs/next/lineage/airflow",draft:!1,editUrl:"https://github.com/datahub-project/datahub/blob/master/docs/lineage/airflow.md",tags:[],version:"current",frontMatter:{title:"Airflow Integration",slug:"/lineage/airflow",custom_edit_url:"https://github.com/datahub-project/datahub/blob/master/docs/lineage/airflow.md"},sidebar:"overviewSidebar",previous:{title:"Configuration",permalink:"/docs/next/quick-ingestion-guides/looker/configuration"},next:{title:"Spark",permalink:"/docs/next/metadata-integration/java/spark-lineage"}},u={},s=[{value:"DataHub Plugin v2",id:"datahub-plugin-v2",level:2},{value:"Installation",id:"installation",level:3},{value:"Configuration",id:"configuration",level:3},{value:"Automatic lineage extraction",id:"automatic-lineage-extraction",level:3},{value:"DataHub Plugin 
v1",id:"datahub-plugin-v1",level:2},{value:"Installation",id:"installation-1",level:3},{value:"Configuration",id:"configuration-1",level:3},{value:"Disable lazy plugin loading",id:"disable-lazy-plugin-loading",level:4},{value:"Setup a DataHub connection",id:"setup-a-datahub-connection",level:4},{value:"Configure the plugin",id:"configure-the-plugin",level:4},{value:"Validate that the plugin is working",id:"validate-that-the-plugin-is-working",level:4},{value:"Manual Lineage Annotation",id:"manual-lineage-annotation",level:2},{value:"Using &lt;code&gt;inlets&lt;/code&gt; and &lt;code&gt;outlets&lt;/code&gt;",id:"using-inlets-and-outlets",level:3},{value:"Custom Operators",id:"custom-operators",level:3},{value:"Emit Lineage Directly",id:"emit-lineage-directly",level:2},{value:"Debugging",id:"debugging",level:2},{value:"Missing lineage",id:"missing-lineage",level:3},{value:"Incorrect URLs",id:"incorrect-urls",level:3},{value:"Compatibility",id:"compatibility",level:2},{value:"Additional references",id:"additional-references",level:2}],p={toc:s},d="wrapper";function g(e){let{components:t,...a}=e;return(0,l.kt)(d,(0,n.Z)({},p,a,{components:t,mdxType:"MDXLayout"}),(0,l.kt)("h1",{id:"airflow-integration"},"Airflow Integration"),(0,l.kt)("admonition",{type:"note"},(0,l.kt)("p",{parentName:"admonition"},"If you're looking to schedule DataHub ingestion using Airflow, see the guide on ",(0,l.kt)("a",{parentName:"p",href:"/docs/next/metadata-ingestion/schedule_docs/airflow"},"scheduling ingestion with Airflow"),".")),(0,l.kt)("p",null,"The DataHub Airflow plugin supports:"),(0,l.kt)("ul",null,(0,l.kt)("li",{parentName:"ul"},"Automatic column-level lineage extraction from various operators e.g. 
",(0,l.kt)("inlineCode",{parentName:"li"},"SqlOperator"),"s (including ",(0,l.kt)("inlineCode",{parentName:"li"},"MySqlOperator"),", ",(0,l.kt)("inlineCode",{parentName:"li"},"PostgresOperator"),", ",(0,l.kt)("inlineCode",{parentName:"li"},"SnowflakeOperator"),", and more), ",(0,l.kt)("inlineCode",{parentName:"li"},"S3FileTransformOperator"),", and a few others."),(0,l.kt)("li",{parentName:"ul"},"Airflow DAG and tasks, including properties, ownership, and tags."),(0,l.kt)("li",{parentName:"ul"},"Task run information, including task successes and failures."),(0,l.kt)("li",{parentName:"ul"},"Manual lineage annotations using ",(0,l.kt)("inlineCode",{parentName:"li"},"inlets")," and ",(0,l.kt)("inlineCode",{parentName:"li"},"outlets")," on Airflow operators.")),(0,l.kt)("p",null,"There's two actively supported implementations of the plugin, with different Airflow version support."),(0,l.kt)("table",null,(0,l.kt)("thead",{parentName:"table"},(0,l.kt)("tr",{parentName:"thead"},(0,l.kt)("th",{parentName:"tr",align:null},"Approach"),(0,l.kt)("th",{parentName:"tr",align:null},"Airflow Version"),(0,l.kt)("th",{parentName:"tr",align:null},"Notes"))),(0,l.kt)("tbody",{parentName:"table"},(0,l.kt)("tr",{parentName:"tbody"},(0,l.kt)("td",{parentName:"tr",align:null},"Plugin v2"),(0,l.kt)("td",{parentName:"tr",align:null},"2.3+"),(0,l.kt)("td",{parentName:"tr",align:null},"Recommended. Requires Python 3.8+")),(0,l.kt)("tr",{parentName:"tbody"},(0,l.kt)("td",{parentName:"tr",align:null},"Plugin v1"),(0,l.kt)("td",{parentName:"tr",align:null},"2.1+"),(0,l.kt)("td",{parentName:"tr",align:null},"No automatic lineage extraction; may not extract lineage if the task fails.")))),(0,l.kt)("p",null,"If you're using Airflow older than 2.1, it's possible to use the v1 plugin with older versions of ",(0,l.kt)("inlineCode",{parentName:"p"},"acryl-datahub-airflow-plugin"),". 
See the ",(0,l.kt)("a",{parentName:"p",href:"#compatibility"},"compatibility section")," for more details."),(0,l.kt)("h2",{id:"datahub-plugin-v2"},"DataHub Plugin v2"),(0,l.kt)("h3",{id:"installation"},"Installation"),(0,l.kt)("p",null,"The v2 plugin requires Airflow 2.3+ and Python 3.8+. If you don't meet these requirements, use the v1 plugin instead."),(0,l.kt)("pre",null,(0,l.kt)("code",{parentName:"pre",className:"language-shell"},"pip install 'acryl-datahub-airflow-plugin[plugin-v2]'\n")),(0,l.kt)("h3",{id:"configuration"},"Configuration"),(0,l.kt)("p",null,"Set up a DataHub connection in Airflow."),(0,l.kt)("pre",null,(0,l.kt)("code",{parentName:"pre",className:"language-shell"},"airflow connections add  --conn-type 'datahub-rest' 'datahub_rest_default' --conn-host 'http://datahub-gms:8080' --conn-password '&lt;optional datahub auth token&gt;'\n")),(0,l.kt)("p",null,"No additional configuration is required to use the plugin. However, there are some optional configuration parameters that can be set in the ",(0,l.kt)("inlineCode",{parentName:"p"},"airflow.cfg")," file."),(0,l.kt)("pre",null,(0,l.kt)("code",{parentName:"pre",className:"language-ini",metastring:'title="airflow.cfg"',title:'"airflow.cfg"'},"[datahub]\n# Optional - additional config here.\nenabled = True  # default\n")),(0,l.kt)("table",null,(0,l.kt)("thead",{parentName:"table"},(0,l.kt)("tr",{parentName:"thead"},(0,l.kt)("th",{parentName:"tr",align:null},"Name"),(0,l.kt)("th",{parentName:"tr",align:null},"Default value"),(0,l.kt)("th",{parentName:"tr",align:null},"Description"))),(0,l.kt)("tbody",{parentName:"table"},(0,l.kt)("tr",{parentName:"tbody"},(0,l.kt)("td",{parentName:"tr",align:null},"enabled"),(0,l.kt)("td",{parentName:"tr",align:null},"true"),(0,l.kt)("td",{parentName:"tr",align:null},"If the plugin should be 
enabled.")),(0,l.kt)("tr",{parentName:"tbody"},(0,l.kt)("td",{parentName:"tr",align:null},"conn_id"),(0,l.kt)("td",{parentName:"tr",align:null},"datahub_rest_default"),(0,l.kt)("td",{parentName:"tr",align:null},"The name of the datahub rest connection.")),(0,l.kt)("tr",{parentName:"tbody"},(0,l.kt)("td",{parentName:"tr",align:null},"cluster"),(0,l.kt)("td",{parentName:"tr",align:null},"prod"),(0,l.kt)("td",{parentName:"tr",align:null},"name of the airflow cluster")),(0,l.kt)("tr",{parentName:"tbody"},(0,l.kt)("td",{parentName:"tr",align:null},"capture_ownership_info"),(0,l.kt)("td",{parentName:"tr",align:null},"true"),(0,l.kt)("td",{parentName:"tr",align:null},"Extract DAG ownership.")),(0,l.kt)("tr",{parentName:"tbody"},(0,l.kt)("td",{parentName:"tr",align:null},"capture_tags_info"),(0,l.kt)("td",{parentName:"tr",align:null},"true"),(0,l.kt)("td",{parentName:"tr",align:null},"Extract DAG tags.")),(0,l.kt)("tr",{parentName:"tbody"},(0,l.kt)("td",{parentName:"tr",align:null},"capture_executions"),(0,l.kt)("td",{parentName:"tr",align:null},"true"),(0,l.kt)("td",{parentName:"tr",align:null},'Extract task runs and success/failure statuses. 
This will show up in DataHub "Runs" tab.')),(0,l.kt)("tr",{parentName:"tbody"},(0,l.kt)("td",{parentName:"tr",align:null},"enable_extractors"),(0,l.kt)("td",{parentName:"tr",align:null},"true"),(0,l.kt)("td",{parentName:"tr",align:null},"Enable automatic lineage extraction.")),(0,l.kt)("tr",{parentName:"tbody"},(0,l.kt)("td",{parentName:"tr",align:null},"disable_openlineage_plugin"),(0,l.kt)("td",{parentName:"tr",align:null},"true"),(0,l.kt)("td",{parentName:"tr",align:null},"Disable the OpenLineage plugin to avoid duplicative processing.")),(0,l.kt)("tr",{parentName:"tbody"},(0,l.kt)("td",{parentName:"tr",align:null},"log_level"),(0,l.kt)("td",{parentName:"tr",align:null},(0,l.kt)("em",{parentName:"td"},"no change")),(0,l.kt)("td",{parentName:"tr",align:null},"[debug]"," Set the log level for the plugin.")),(0,l.kt)("tr",{parentName:"tbody"},(0,l.kt)("td",{parentName:"tr",align:null},"debug_emitter"),(0,l.kt)("td",{parentName:"tr",align:null},"false"),(0,l.kt)("td",{parentName:"tr",align:null},"[debug]"," If true, the plugin will log the emitted events.")))),(0,l.kt)("h3",{id:"automatic-lineage-extraction"},"Automatic lineage extraction"),(0,l.kt)("p",null,"To automatically extract lineage information, the v2 plugin builds on top of Airflow's built-in ",(0,l.kt)("a",{parentName:"p",href:"https://openlineage.io/docs/integrations/airflow/default-extractors"},"OpenLineage extractors"),"."),(0,l.kt)("p",null,"The SQL-related extractors have been updated to use DataHub's SQL parser, which is more robust than the built-in one and uses DataHub's metadata information to generate column-level lineage. 
We discussed the DataHub SQL parser, including why schema-aware parsing works better and how it performs on benchmarks, during the ",(0,l.kt)("a",{parentName:"p",href:"https://youtu.be/1QVcUmRQK5E?si=U27zygR7Gi_KdkzE&amp;t=2309"},"June 2023 community town hall"),"."),(0,l.kt)("h2",{id:"datahub-plugin-v1"},"DataHub Plugin v1"),(0,l.kt)("h3",{id:"installation-1"},"Installation"),(0,l.kt)("p",null,"The v1 plugin requires Airflow 2.1+ and Python 3.8+. If you're on older versions, it's still possible to use an older version of the plugin. See the ",(0,l.kt)("a",{parentName:"p",href:"#compatibility"},"compatibility section")," for more details."),(0,l.kt)("p",null,"If you're using Airflow 2.3+, we recommend using the v2 plugin instead. If you need to use the v1 plugin with Airflow 2.3+, you must also set the environment variable ",(0,l.kt)("inlineCode",{parentName:"p"},"DATAHUB_AIRFLOW_PLUGIN_USE_V1_PLUGIN=true"),"."),(0,l.kt)("pre",null,(0,l.kt)("code",{parentName:"pre",className:"language-shell"},"pip install 'acryl-datahub-airflow-plugin[plugin-v1]'\n\n# The DataHub rest connection type is included by default.\n# To use the DataHub Kafka connection type, install the plugin with the kafka extras.\npip install 'acryl-datahub-airflow-plugin[plugin-v1,datahub-kafka]'\n")),(0,l.kt)("h3",{id:"configuration-1"},"Configuration"),(0,l.kt)("h4",{id:"disable-lazy-plugin-loading"},"Disable lazy plugin loading"),(0,l.kt)("pre",null,(0,l.kt)("code",{parentName:"pre",className:"language-ini",metastring:'title="airflow.cfg"',title:'"airflow.cfg"'},"[core]\nlazy_load_plugins = False\n")),(0,l.kt)("p",null,"On MWAA you should add this config to your ",(0,l.kt)("a",{parentName:"p",href:"https://docs.aws.amazon.com/mwaa/latest/userguide/configuring-env-variables.html#configuring-2.0-airflow-override"},"Apache Airflow configuration options"),"."),(0,l.kt)("h4",{id:"setup-a-datahub-connection"},"Setup a DataHub connection"),(0,l.kt)("p",null,"You must configure an Airflow connection for 
DataHub. We support both a DataHub REST connection and a Kafka-based connection, but you only need one."),(0,l.kt)("pre",null,(0,l.kt)("code",{parentName:"pre",className:"language-shell"},"# For REST-based:\nairflow connections add  --conn-type 'datahub_rest' 'datahub_rest_default' --conn-host 'http://datahub-gms:8080' --conn-password '&lt;optional datahub auth token&gt;'\n# For Kafka-based (standard Kafka sink config can be passed via extras):\nairflow connections add  --conn-type 'datahub_kafka' 'datahub_kafka_default' --conn-host 'broker:9092' --conn-extra '{}'\n")),(0,l.kt)("h4",{id:"configure-the-plugin"},"Configure the plugin"),(0,l.kt)("p",null,"If your config doesn't align with the default values, you can configure the plugin in your ",(0,l.kt)("inlineCode",{parentName:"p"},"airflow.cfg")," file."),(0,l.kt)("pre",null,(0,l.kt)("code",{parentName:"pre",className:"language-ini",metastring:'title="airflow.cfg"',title:'"airflow.cfg"'},"[datahub]\nenabled = true\nconn_id = datahub_rest_default  # or datahub_kafka_default\n# etc.\n")),(0,l.kt)("table",null,(0,l.kt)("thead",{parentName:"table"},(0,l.kt)("tr",{parentName:"thead"},(0,l.kt)("th",{parentName:"tr",align:null},"Name"),(0,l.kt)("th",{parentName:"tr",align:null},"Default value"),(0,l.kt)("th",{parentName:"tr",align:null},"Description"))),(0,l.kt)("tbody",{parentName:"table"},(0,l.kt)("tr",{parentName:"tbody"},(0,l.kt)("td",{parentName:"tr",align:null},"enabled"),(0,l.kt)("td",{parentName:"tr",align:null},"true"),(0,l.kt)("td",{parentName:"tr",align:null},"If the plugin should be enabled.")),(0,l.kt)("tr",{parentName:"tbody"},(0,l.kt)("td",{parentName:"tr",align:null},"conn_id"),(0,l.kt)("td",{parentName:"tr",align:null},"datahub_rest_default"),(0,l.kt)("td",{parentName:"tr",align:null},"The name of the datahub connection you set in step 
1.")),(0,l.kt)("tr",{parentName:"tbody"},(0,l.kt)("td",{parentName:"tr",align:null},"cluster"),(0,l.kt)("td",{parentName:"tr",align:null},"prod"),(0,l.kt)("td",{parentName:"tr",align:null},"name of the airflow cluster")),(0,l.kt)("tr",{parentName:"tbody"},(0,l.kt)("td",{parentName:"tr",align:null},"capture_ownership_info"),(0,l.kt)("td",{parentName:"tr",align:null},"true"),(0,l.kt)("td",{parentName:"tr",align:null},"If true, the owners field of the DAG will be captured as a DataHub corpuser.")),(0,l.kt)("tr",{parentName:"tbody"},(0,l.kt)("td",{parentName:"tr",align:null},"capture_tags_info"),(0,l.kt)("td",{parentName:"tr",align:null},"true"),(0,l.kt)("td",{parentName:"tr",align:null},"If true, the tags field of the DAG will be captured as DataHub tags.")),(0,l.kt)("tr",{parentName:"tbody"},(0,l.kt)("td",{parentName:"tr",align:null},"capture_executions"),(0,l.kt)("td",{parentName:"tr",align:null},"true"),(0,l.kt)("td",{parentName:"tr",align:null},"If true, we'll capture task runs in DataHub in addition to DAG definitions.")),(0,l.kt)("tr",{parentName:"tbody"},(0,l.kt)("td",{parentName:"tr",align:null},"graceful_exceptions"),(0,l.kt)("td",{parentName:"tr",align:null},"true"),(0,l.kt)("td",{parentName:"tr",align:null},"If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions.")))),(0,l.kt)("h4",{id:"validate-that-the-plugin-is-working"},"Validate that the plugin is working"),(0,l.kt)("ol",null,(0,l.kt)("li",{parentName:"ol"},"Go and check in Airflow at Admin -&gt; Plugins menu if you can see the DataHub plugin"),(0,l.kt)("li",{parentName:"ol"},"Run an Airflow DAG. 
In the task logs, you should see Datahub related log messages like:")),(0,l.kt)("pre",null,(0,l.kt)("code",{parentName:"pre"},"Emitting DataHub ...\n")),(0,l.kt)("h2",{id:"manual-lineage-annotation"},"Manual Lineage Annotation"),(0,l.kt)("h3",{id:"using-inlets-and-outlets"},"Using ",(0,l.kt)("inlineCode",{parentName:"h3"},"inlets")," and ",(0,l.kt)("inlineCode",{parentName:"h3"},"outlets")),(0,l.kt)("p",null,"You can manually annotate lineage by setting ",(0,l.kt)("inlineCode",{parentName:"p"},"inlets")," and ",(0,l.kt)("inlineCode",{parentName:"p"},"outlets")," on your Airflow operators. This is useful if you're using an operator that doesn't support automatic lineage extraction, or if you want to override the automatic lineage extraction."),(0,l.kt)("p",null,"We have a few code samples that demonstrate how to use ",(0,l.kt)("inlineCode",{parentName:"p"},"inlets")," and ",(0,l.kt)("inlineCode",{parentName:"p"},"outlets"),":"),(0,l.kt)("ul",null,(0,l.kt)("li",{parentName:"ul"},(0,l.kt)("a",{parentName:"li",href:"https://github.com/datahub-project/datahub/blob/master/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/example_dags/lineage_backend_demo.py"},(0,l.kt)("inlineCode",{parentName:"a"},"lineage_backend_demo.py"))),(0,l.kt)("li",{parentName:"ul"},(0,l.kt)("a",{parentName:"li",href:"https://github.com/datahub-project/datahub/blob/master/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/example_dags/lineage_backend_taskflow_demo.py"},(0,l.kt)("inlineCode",{parentName:"a"},"lineage_backend_taskflow_demo.py"))," - uses the ",(0,l.kt)("a",{parentName:"li",href:"https://airflow.apache.org/docs/apache-airflow/stable/concepts/taskflow.html"},"TaskFlow API"))),(0,l.kt)("p",null,"For more information, take a look at the ",(0,l.kt)("a",{parentName:"p",href:"https://airflow.apache.org/docs/apache-airflow/stable/lineage.html"},"Airflow lineage docs"),"."),(0,l.kt)("h3",{id:"custom-operators"},"Custom Operators"),(0,l.kt)("p",null,"If 
you have created a ",(0,l.kt)("a",{parentName:"p",href:"https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html"},"custom Airflow operator")," that inherits from the BaseOperator class,\nwhen overriding the ",(0,l.kt)("inlineCode",{parentName:"p"},"execute")," function, set inlets and outlets via ",(0,l.kt)("inlineCode",{parentName:"p"},"context['ti'].task.inlets")," and ",(0,l.kt)("inlineCode",{parentName:"p"},"context['ti'].task.outlets"),".\nThe DataHub Airflow plugin will then pick up those inlets and outlets after the task runs."),(0,l.kt)("pre",null,(0,l.kt)("code",{parentName:"pre",className:"language-python"},"class DbtOperator(BaseOperator):\n    ...\n\n    def execute(self, context):\n        # do something\n        inlets, outlets = self._get_lineage()\n        # inlets/outlets are lists of either datahub_airflow_plugin.entities.Dataset or datahub_airflow_plugin.entities.Urn\n        context['ti'].task.inlets = inlets\n        context['ti'].task.outlets = outlets\n\n    def _get_lineage(self):\n        # Do some processing to get inlets/outlets\n\n        return inlets, outlets\n")),(0,l.kt)("p",null,"If you override the ",(0,l.kt)("inlineCode",{parentName:"p"},"pre_execute")," and ",(0,l.kt)("inlineCode",{parentName:"p"},"post_execute")," functions, ensure they include the ",(0,l.kt)("inlineCode",{parentName:"p"},"@prepare_lineage")," and ",(0,l.kt)("inlineCode",{parentName:"p"},"@apply_lineage")," decorators respectively. 
Reference the ",(0,l.kt)("a",{parentName:"p",href:"https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/lineage.html#lineage"},"Airflow docs")," for more details."),(0,l.kt)("h2",{id:"emit-lineage-directly"},"Emit Lineage Directly"),(0,l.kt)("p",null,"If you can't use the plugin or annotate inlets/outlets, you can also emit lineage using the ",(0,l.kt)("inlineCode",{parentName:"p"},"DatahubEmitterOperator"),"."),(0,l.kt)("p",null,"Reference ",(0,l.kt)("a",{parentName:"p",href:"https://github.com/datahub-project/datahub/blob/master/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/example_dags/lineage_emission_dag.py"},(0,l.kt)("inlineCode",{parentName:"a"},"lineage_emission_dag.py"))," for a full example."),(0,l.kt)("p",null,"In order to use this example, you must first configure the Datahub hook. Like in ingestion, we support a Datahub REST hook and a Kafka-based hook. See the plugin configuration for examples."),(0,l.kt)("h2",{id:"debugging"},"Debugging"),(0,l.kt)("h3",{id:"missing-lineage"},"Missing lineage"),(0,l.kt)("p",null,"If you're not seeing lineage in DataHub, check the following:"),(0,l.kt)("ul",null,(0,l.kt)("li",{parentName:"ul"},"Validate that the plugin is loaded in Airflow. 
Go to Admin -&gt; Plugins and check that the DataHub plugin is listed."),(0,l.kt)("li",{parentName:"ul"},"If using the v2 plugin's automatic lineage, ensure that the ",(0,l.kt)("inlineCode",{parentName:"li"},"enable_extractors")," config is set to true and that automatic lineage is supported for your operator."),(0,l.kt)("li",{parentName:"ul"},"If using manual lineage annotation, ensure that you're using the ",(0,l.kt)("inlineCode",{parentName:"li"},"datahub_airflow_plugin.entities.Dataset")," or ",(0,l.kt)("inlineCode",{parentName:"li"},"datahub_airflow_plugin.entities.Urn")," classes for your inlets and outlets.")),(0,l.kt)("h3",{id:"incorrect-urls"},"Incorrect URLs"),(0,l.kt)("p",null,"If your URLs aren't being generated correctly (usually they'll start with ",(0,l.kt)("inlineCode",{parentName:"p"},"http://localhost:8080")," instead of the correct hostname), you may need to set the webserver ",(0,l.kt)("inlineCode",{parentName:"p"},"base_url")," config."),(0,l.kt)("pre",null,(0,l.kt)("code",{parentName:"pre",className:"language-ini",metastring:'title="airflow.cfg"',title:'"airflow.cfg"'},"[webserver]\nbase_url = http://airflow.mycorp.example.com\n")),(0,l.kt)("h2",{id:"compatibility"},"Compatibility"),(0,l.kt)("p",null,"We no longer officially support Airflow &lt;2.1. However, you can use older versions of ",(0,l.kt)("inlineCode",{parentName:"p"},"acryl-datahub-airflow-plugin")," with older versions of Airflow.\nBoth of these options support Python 3.7+."),(0,l.kt)("ul",null,(0,l.kt)("li",{parentName:"ul"},"Airflow 1.10.x, use DataHub plugin v1 with acryl-datahub-airflow-plugin &lt;= 0.9.1.0."),(0,l.kt)("li",{parentName:"ul"},"Airflow 2.0.x, use DataHub plugin v1 with acryl-datahub-airflow-plugin &lt;= 0.11.0.1.")),(0,l.kt)("p",null,"DataHub also previously supported an Airflow ",(0,l.kt)("a",{parentName:"p",href:"https://airflow.apache.org/docs/apache-airflow/2.2.0/lineage.html#lineage-backend"},"lineage backend")," implementation. 
While the implementation is still in our codebase, it is deprecated and will be removed in a future release.\nNote that the lineage backend did not support automatic lineage extraction, did not capture task failures, and did not work in AWS MWAA.\nThe ",(0,l.kt)("a",{parentName:"p",href:"https://docs-website-1wmaehubl-acryldata.vercel.app/docs/lineage/airflow/#using-datahubs-airflow-lineage-backend-deprecated"},"documentation for the lineage backend")," has already been archived."),(0,l.kt)("h2",{id:"additional-references"},"Additional references"),(0,l.kt)("p",null,"Related Datahub videos:"),(0,l.kt)("ul",null,(0,l.kt)("li",{parentName:"ul"},(0,l.kt)("a",{parentName:"li",href:"https://www.youtube.com/watch?v=3wiaqhb8UR0"},"Airflow Lineage")),(0,l.kt)("li",{parentName:"ul"},(0,l.kt)("a",{parentName:"li",href:"https://www.youtube.com/watch?v=YpUOqDU5ZYg"},"Airflow Run History in DataHub"))))}g.isMDXComponent=!0}}]);</pre></body></html>