Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share


Google Cloud Composer - Airflow error importing DAG using plugin - Relationships can only be set between Operators

I have written an Airflow plugin that contains a single custom operator (to support CMEK in BigQuery). A simple DAG with a single task that uses this operator loads and executes fine.

However, if I add a dependency in the DAG from a DummyOperator task to my custom operator task, the DAG fails to load in the UI with the following error, and I can't understand why:

Broken DAG: [/home/airflow/gcs/dags/js_bq_custom_plugin_v2.py] Relationships can only be set between Operators; received BQCMEKOperator

I have tested this so far on composer-1.4.2-airflow-1.9.0, composer-1.4.2-airflow-1.10.0 and composer-1.4.1-airflow-1.10.0.

Running airflow test for each of the tasks completes without error.

Using the operator in isolation in a DAG works fine (as below), so I don't believe there is anything inherently wrong with the plugin:

import datetime
import logging
from airflow.models import DAG
from airflow.operators.bq_cmek_plugin import BQCMEKOperator


default_dag_args = {
    'start_date': datetime.datetime(2019,1,1),
    'retries': 0
}


dag = DAG(
    'js_bq_custom_plugin',
    schedule_interval=None,
    catchup=False,
    concurrency=1,
    max_active_runs=1,
    default_args=default_dag_args)

run_this = BQCMEKOperator(
    task_id     = 'cmek_plugin_test',
    sql         = 'select * from ds.foo LIMIT 15',
    project     = 'xxx',
    dataset     = 'js_dev',
    table       = 'cmek_test10',
    key         = 'xxx',
    dag         = dag
)

Whereas if I introduce a DummyOperator and a dependency, the error occurs:

import datetime
import logging
from airflow.models import DAG
from airflow.operators.bq_cmek_plugin import BQCMEKOperator
from airflow.operators.dummy_operator import DummyOperator

default_dag_args = {
    'start_date': datetime.datetime(2019,1,1),
    'retries': 0
}

dag = DAG(
    'js_bq_custom_plugin_v2',
    schedule_interval=None,
    catchup=False,
    concurrency=1,
    max_active_runs=1,
    default_args=default_dag_args)

etl_start = DummyOperator(task_id='etl_start', dag=dag)

extract = BQCMEKOperator(
    task_id     = 'extract',
    sql         = 'select * from foo.bar LIMIT 15',
    project     = 'xxx',
    dataset     = 'js_dev',
    table       = 'cmek_test5',
    key         = 'xxx',
    dag         = dag
)

etl_start.set_downstream(extract)

The operator itself is straightforward, and I can reproduce the issue with the simplest possible custom operator, such as the one below:

import logging
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class TestOperator(BaseOperator):

    @apply_defaults
    def __init__(self,
                *args,
                **kwargs):
        super(TestOperator, self).__init__(*args, **kwargs)


    def execute(self, context):
        logging.info("Executed by TestOperator")

With the following plugin definition in __init__.py:

from airflow.plugins_manager import AirflowPlugin
from test_plugin.operators.test_operator import TestOperator

class TestPlugin(AirflowPlugin):
    name = "test_plugin"
    operators = [TestOperator]
    hooks = []
    executors = []
    macros = []
    admin_views = []
    flask_blueprints = []
    menu_links = []

I have also looked at the Airflow code in models.py that raises this error. It uses isinstance(t, BaseOperator), and that check returns True for a task built from my custom operator when I run it in a plain Python session, so I have no idea what is going on.

for t in task_list:
    if not isinstance(t, BaseOperator):
        raise AirflowException(
            "Relationships can only be set between "
            "Operators; received {}".format(t.__class__.__name__))