
I'm trying to use Cloudera's Impala JDBC 2.6.17.1020 connector driver with Spark to access tables in Kudu and in Hive simultaneously.

When the query is simple, it works fine and I get the expected output.

val simpleQuery = """ SELECT *
                       FROM HIVE_TABLE
                       WHERE ABC = '123'
                   """
val simpleQueryDF: DataFrame = spark.read.jdbc(this.impalaJDBCString, "(" + simpleQuery + ") AS temp", this.impalaConnectionProps)
simpleQueryDF.show()
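For context, `impalaJDBCString` and `impalaConnectionProps` are assumed to be defined roughly as below; the host, port, and driver class are illustrative placeholders, not values from the question:

```scala
import java.util.Properties

// Hypothetical connection details; adjust host/port/schema/auth to your cluster.
val impalaJDBCString = "jdbc:impala://impala-host:21050/default"

val impalaConnectionProps = new Properties()
impalaConnectionProps.setProperty("driver", "com.cloudera.impala.jdbc.Driver")
// For authenticated clusters, additional properties (e.g. AuthMech, user) would go here.
```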

But when the query contains nested queries and joins, the column headers come back repeated as the row values, as shown below:

+----------+--------+------------------------+--------------+ ...
|as_of_date|dq_index|impacted_data_lake_table|pk_column_name| ...
+----------+--------+------------------------+--------------+ ...
|as_of_date|dq_index|    impacted_data_lak...|pk_column_name| ...
|as_of_date|dq_index|    impacted_data_lak...|pk_column_name| ...
|as_of_date|dq_index|    impacted_data_lak...|pk_column_name| ...
+----------+--------+------------------------+--------------+ ...

I don't know why this behavior occurs. Can someone advise me what to do?

The complex query is as follows:

SELECT    '2020-10-22' AS AS_OF_DATE,
          DATA_QUERY.DQ_INDEX,
          DATA_QUERY.IMPACTED_DATA_LAKE_TABLE,
          DATA_QUERY.PK_COLUMN_NAME,
          DATA_QUERY.PK_OF_BAD_RECORD,
          DATA_QUERY.DQ_ASSESSMENT_DIMENSION,
          DATA_QUERY.SOURCE_SYSTEM,
          'ABC' AS BUSINESS_SUBJECT_AREA,
          DATA_QUERY.SUB_SUBJECT_AREA_CD,
          NVL(SBJ_AREA_DESC_TABLE.SUB_SUBJECT_AREA, 'Not Recognized') AS SUB_SUBJECT_AREA
FROM (
        SELECT  'DQ_INDEX_1' AS DQ_INDEX,
                'dq_data_governance.table_1 fin_year
                 dq_data_governance.table_2
                 dq_data_governance.table_3
                 dq_data_governance.table_4 ' AS IMPACTED_DATA_LAKE_TABLE,
                NAMES AS PK_COLUMN_NAME,
                CAST(VALS AS STRING) AS PK_OF_BAD_RECORD,
                'KLM' AS DQ_ASSESSMENT_DIMENSION,
                'NOQ' AS SOURCE_SYSTEM,
                'TTV' AS SUB_SUBJECT_AREA_CD
        FROM (
                SELECT 'table_4.SEGMENT4|table_4.SEGMENT8|table_1.FIN_YEAR' AS NAMES,
                        concat(GCC.SEGMENT4,'|',GCC.SEGMENT8,'|',CAST(FIN_YEAR as STRING)) AS VALS
                FROM dq_data_governance.table_1 FIN_YEAR,
                     dq_data_governance.table_2 gjh,
                     dq_data_governance.table_3 gjl,
                     dq_data_governance.table_4 GCC
                WHERE GJH.JE_HEADER_ID = GJL.JE_HEADER_ID
                  AND GJL.CODE_COMBINATION_ID = GCC.CODE_COMBINATION_ID
                  AND ACTUAL_FLAG = '5'
                  AND GCC.SEGMENT9 = '9'
                  AND EFFECTIVE_DATE BETWEEN FIN_YEAR.from_date AND FIN_YEAR.to_date 
                GROUP BY GCC.SEGMENT4,GCC.SEGMENT8,FIN_YEAR
                HAVING sum((nvl((GJL.ACCOUNTED_DR), 0) - nvl((GJL.ACCOUNTED_CR), 0))) < -0.1 
        ) AS T_DQ_INDEX_1
        
        UNION ALL 
        
        SELECT  'DQ_INDEX_2' AS DQ_INDEX,
                'dq_data_governance.table_5' AS IMPACTED_DATA_LAKE_TABLE,
                NAMES AS PK_COLUMN_NAME,
                CAST(VALS AS STRING) AS PK_OF_BAD_RECORD,
                'BKL' AS DQ_ASSESSMENT_DIMENSION,
                'CCG' AS SOURCE_SYSTEM,
                'LMA' AS SUB_SUBJECT_AREA_CD
        FROM (
                SELECT 'table_5.contractid|table_5.name' AS NAMES,
                        concat(CAST(c.contractid as STRING),'|',c.name) AS VALS
                FROM dq_data_governance.table_5 c
                INNER JOIN dq_data_governance.table_6 ps ON ps.processinstanceid=c.processinstanceid
                WHERE c.amountinriyal<1 and ps.approvalstatusid=3
        ) AS T_DQ_INDEX_2
        
        UNION ALL
        
        -- ...
        -- <MORE QUERIES CAN BE ADDED HERE>
        -- ...
        
) AS DATA_QUERY
LEFT JOIN dq_data_governance.dq_sub_subject_area as SBJ_AREA_DESC_TABLE ON DATA_QUERY.SUB_SUBJECT_AREA_CD = SBJ_AREA_DESC_TABLE.SUB_SUBJECT_AREA_CD



1 Answer


Try the Amazon (EMR) version of the Impala driver from here.

While using this driver, please recheck that Double data types match between the table and the DataFrame.
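A minimal sketch of what that check might look like in Spark; the column name `accounted_dr` is illustrative, not taken from the answer:

```scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DoubleType

// Inspect the schema the JDBC driver inferred for the query result...
simpleQueryDF.printSchema()

// ...and explicitly cast any column whose type does not match the table's
// Double columns, so both sides agree before comparing or writing.
val alignedDF = simpleQueryDF.withColumn("accounted_dr", col("accounted_dr").cast(DoubleType))
```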

I hope it will work for you.

  • I tried it now, and it showed the same behavior. BTW, I'm using a Cloudera environment, not AWS EMR. – Matrix Oct 22 '20 at 07:00
  • These Simba libs are compatible with non-EMR environments as well. I don't know which protocol you are using, but you can try connecting to Impala with the "jdbc:hive2" protocol instead of "jdbc:impala", leaving the rest of the URL the same. – Ramil Kaliyev Oct 22 '20 at 08:02
  • I'm connecting using the Impala JDBC URL jdbc:impala://Host:Port[/Schema];Property1=Value;Property2=Value;... Replacing impala with hive2 fails to connect. – Matrix Oct 22 '20 at 08:22
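The protocol swap discussed in the comments amounts to changing only the URL prefix; the host, port, and property names below are placeholders:

```scala
// Original Impala JDBC URL (placeholder values):
val impalaUrl = "jdbc:impala://Host:Port/Schema;Property1=Value"

// Same URL with the HiveServer2 protocol instead, as suggested above:
val hive2Url = "jdbc:hive2://Host:Port/Schema;Property1=Value"
```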