Issue
I want to import BigQueryTableExistenceAsyncSensor from airflow.providers.google.cloud.sensors.bigquery
here is my code:
from airflow import DAG
from util.dags_hourly import create_dag_write_append  # a class that I created; no issues with my other DAGs
from airflow.providers.google.cloud.sensors.bigquery import BigQueryTableExistenceAsyncSensor
def __init__(self, dataset=None, table_name=None):
    # Remember which BigQuery dataset/table this helper should check.
    self.dataset, self.table_name = dataset, table_name
def check_table_exists(self):
    """Build an async sensor task that waits until the configured table exists.

    Returns:
        BigQueryTableExistenceAsyncSensor: a sensor wired to this helper's
        dataset and table, with a fixed task id.
    """
    return BigQueryTableExistenceAsyncSensor(
        task_id="check_table_exists_async",
        project_id='x-staging',  # TODO: consider parameterizing the project id
        dataset_id=self.dataset,
        table_id=self.table_name,  # bug fix: was self.table, which __init__ never sets
    )
with create_dag_write_append('test') as dag:
    # NOTE(review): this is the failing code from the question.
    # BigQueryTableExistenceAsyncSensor normally requires a task_id (it is an
    # Airflow operator), and it has no check_table_exists() method — the
    # author's own helper class (defined above with __init__/check_table_exists)
    # was presumably intended here. TODO confirm which class should be used.
    a = BigQueryTableExistenceAsyncSensor(
        dataset_id='data_lake_staging',
        table_id='test_table'
    )
    task1 = a.check_table_exists()
    # Bare expression: registers nothing extra; the task object is already
    # bound to the DAG context above (if construction succeeds).
    task1
However, it returns a DAG import error in Airflow:
Broken DAG: [/home/airflow/gcs/dags/manatal/test_dag.py] Traceback (most recent call last):
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/home/airflow/gcs/dags/test/test_dag.py", line 4, in <module>
from airflow.providers.google.cloud.sensors.bigquery import BigQueryTableExistenceAsyncSensor
ImportError: cannot import name 'BigQueryTableExistenceAsyncSensor' from 'airflow.providers.google.cloud.sensors.bigquery' (/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/sensors/bigquery.py)
I read the documentation, but I don't understand why the class cannot be imported.
My final objective is to check whether the table exists in my dataset.
Solution
I figured this out with a workaround. It turns out I can use
from google.cloud import bigquery
from google.cloud import storage
from google.cloud.exceptions import NotFound
class ExistsOperator(GCSToBigQuery, PostgresToGCS):
    """Decide which downstream task id to follow based on whether a GCS
    object and/or a BigQuery table exists.

    NOTE(review): super().__init__() is never called, so the parent operator
    initialisers do not run — confirm this is intentional before using this
    class as a real Airflow operator.
    """

    def __init__(self, filenames, table_name, task_id=None, task_id_pgcs=None,
                 task_id_pgcs_init=None, task_id_gb=None):
        # task_id defaults to a name derived from the file being checked.
        self.task_id = task_id or f"check_exists_{filenames}_from_gcs"
        self.task_id_pgcs = task_id_pgcs
        self.task_id_pgcs_init = task_id_pgcs_init
        self.task_id_gb = task_id_gb
        self.bucket = constants.GCS_BUCKET
        self.source_objects = filenames
        self.table_name = table_name

    def is_exists_storage(self):
        """Return task_id_gb if the object exists in the bucket, else 'no_upsert_task'."""
        client = storage.Client()
        bucket = client.bucket(self.bucket)
        blob = bucket.get_blob(self.source_objects)
        # bug fix: get_blob() returns None for a missing object. The original
        # wrapped an ignored exists() call in a bare ``except:``, which both
        # discarded the boolean result and swallowed every unrelated error.
        if blob is not None and blob.exists(client):
            return self.task_id_gb
        return 'no_upsert_task'

    def is_exists_table(self):
        """Return task_id_pgcs if the BigQuery table exists, else task_id_pgcs_init."""
        client = bigquery.Client()
        try:
            client.get_table(self.table_name)  # raises NotFound when absent
        except NotFound:
            return self.task_id_pgcs_init
        # get_table() returned a Table, so the table exists; the original's
        # ``if table_name:`` truthiness check could fall through and return None.
        return self.task_id_pgcs
I also created an additional function to check the existence of a file in the Cloud Storage bucket. Hope this helps anyone in need!
Answered By - Mohammad Iqbal
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.