
I'm facing issues with the insertion of timestamps into a PostgreSQL database using the NiFi PutSQL processor.

More specifically, when trying to insert a date in the format '2018-01-31T19:01:09+00:00' into a timestamptz column, I get the following error message:

2018-04-01 19:29:40,091 ERROR [Timer-Driven Process Thread-5] o.apache.nifi.processors.standard.PutSQL PutSQL[id=7997503a-0162-1000-ee81-a0361cad5e0c] Failed to update database for StandardFlowFileRecord[uuid=d02e8b39-e564-4c37-a08a-dab8931e9890,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1522615075930-15, container=default, section=15], offset=11492, length=163],offset=0,name=32836401373126,size=163] due to java.sql.SQLDataException: The value of the sql.args.5.value is '2018-01-31T20:19:35+00:00', which cannot be converted to a timestamp; routing to failure: java.sql.SQLDataException: The value of the sql.args.5.value is '2018-01-31T20:19:35+00:00', which cannot be converted to a timestamp
java.sql.SQLDataException: The value of the sql.args.5.value is '2018-01-31T20:19:35+00:00', which cannot be converted to a timestamp
    at org.apache.nifi.processors.standard.PutSQL.setParameters(PutSQL.java:711)
    at org.apache.nifi.processors.standard.PutSQL.lambda$null$5(PutSQL.java:313)
    at org.apache.nifi.processor.util.pattern.ExceptionHandler.execute(ExceptionHandler.java:127)
    at org.apache.nifi.processors.standard.PutSQL.lambda$new$6(PutSQL.java:311)
    at org.apache.nifi.processors.standard.PutSQL.lambda$new$9(PutSQL.java:354)
    at org.apache.nifi.processor.util.pattern.PutGroup.putFlowFiles(PutGroup.java:91)
    at org.apache.nifi.processor.util.pattern.Put.onTrigger(Put.java:101)
    at org.apache.nifi.processors.standard.PutSQL.lambda$onTrigger$20(PutSQL.java:574)
    at org.apache.nifi.processor.util.pattern.PartialFunctions.onTrigger(PartialFunctions.java:114)
    at org.apache.nifi.processor.util.pattern.RollbackOnFailure.onTrigger(RollbackOnFailure.java:184)
    at org.apache.nifi.processors.standard.PutSQL.onTrigger(PutSQL.java:574)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1122)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.text.ParseException: Unparseable date: "2018-01-31T20:19:35+00:00"
    at java.text.DateFormat.parse(DateFormat.java:366)
    at org.apache.nifi.processors.standard.PutSQL.setParameter(PutSQL.java:911)
    at org.apache.nifi.processors.standard.PutSQL.setParameters(PutSQL.java:707)
    ... 21 common frames omitted

I have tested the insertion of '2018-01-31T19:01:09+00:00' into the timestamptz column from the command line and it works perfectly. I have tried various alternative formats, such as:

  • '2018-01-31 19:01:09+00:00'
  • '2018-01-31 19:01:09+00'
  • '2018-01-31T19:01:09+00'
  • '2018-01-31 19:01:09'

They all fail with the same error in NiFi, even though they are all inserted just fine when performing the INSERT from the command line.

Please find a screenshot of my flow attached. Let me know if you need any more details.

[Screenshot: part of the NiFi flow]

To be honest, I would prefer to avoid the Java conversion altogether, as leaving the datetime as a string and inserting it directly into the Postgres DB works just fine. I have tried forcing this by using an UpdateAttribute processor, but that led to additional errors.

I have come across various questions regarding this topic, but I still don't get what is going on.

vcovo
  • Reading [this](https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java), it appears to me that by default it uses hard-coded format patterns for Date, Time, and Timestamp: `yyyy-MM-dd`, `HH:mm:ss.SSS`, and `yyyy-MM-dd HH:mm:ss.SSS`, respectively. None of the strings you've tried actually match these patterns; try appending `.000` to the seconds. Alternatively, define a date pattern in your flowfile. – M. Prokhorov Apr 02 '18 at 18:30
  • Thank you for pointing that out. It seems like none of those formats support time zone. How would you go about defining your own date pattern? This seems like the better approach for this scenario. – vcovo Apr 02 '18 at 18:38
  • It's a little higher in the same file I've linked. The pattern is `"sql.args." + parameterIndex + ".format"`. – M. Prokhorov Apr 02 '18 at 18:40
  • Perhaps I'm overlooking something obvious, but what would I be adding to the ConvertJSONToSQL processor? Would this be an UpdateAttribute processor? I fail to see how a custom timestamp pattern can be passed... – vcovo Apr 02 '18 at 18:47
  • Possibly your database could automatically convert the string in its current format to timestamptz, so try defining this parameter as a string, or use some string-to-date conversion function at the database level. – daggett Apr 02 '18 at 18:48
  • Right, that's what I was hoping for, but I can't seem to override this from within the ConvertJSONToSQL processor (by setting the attribute type from '93' to '12'). If I define the parameter as `text` in my database then it won't be converted to the timestamptz. – vcovo Apr 02 '18 at 18:53
  • @vcovo, I don't *actually* know how you can do that, because I never worked with this library. I suspect that there can be something inside `ConvertJSONToSQL` – that just makes sense to me. Or indeed, just try defining it as a string; most databases do implicit conversions. – M. Prokhorov Apr 02 '18 at 18:53
  • @vcovo, I think converting the database column is not the way to do it – you'll lose too much on the database side, including specific validations and SQL-related query niceties. Since the problem is at the statement level, you should still try to fix it at the statement level. – M. Prokhorov Apr 02 '18 at 18:55
  • @M.Prokhorov Agreed. I suppose I could use an ExecuteScript/ExecuteStreamCommand processor to do the JSON-to-SQL statement conversion with a Python script, although I suspect that this might slow the flow down a little (and I was hoping to use the pre-built processors). – vcovo Apr 02 '18 at 19:00
  • @vcovo, from what [I've gathered](https://nifi.apache.org/docs/nifi-docs/html/nifi-in-depth.html), you should be able to set an attribute with a date format within a `FlowFile`, the same one which holds your content. I'm just hoping that you understand what I just said better than I do, and that it's actually a helpful comment. – M. Prokhorov Apr 03 '18 at 11:38
  • Thank you for the pointers, @M.Prokhorov! I have resolved it for now with an ExecuteStreamCommand processor. I might update this if I face any bottlenecks (none so far). – vcovo Apr 03 '18 at 20:59
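
Pulling the comments together: the failure is apparently a mismatch between the ISO-8601 strings (offset written as +00:00) and the hard-coded `yyyy-MM-dd HH:mm:ss.SSS` pattern. As a minimal sketch of the required conversion, written in Python purely for illustration (PutSQL does its parsing in Java, and the format strings here are assumptions based on the comments above):

from datetime import datetime

# ISO-8601 timestamp with a colon in the UTC offset, as produced upstream
raw = '2018-01-31T19:01:09+00:00'

# %z accepts the +00:00 form of the offset on Python 3.7+
parsed = datetime.strptime(raw, '%Y-%m-%dT%H:%M:%S%z')

# Reformat to the yyyy-MM-dd HH:mm:ss.SSS pattern the comments point to;
# %f yields microseconds, so the last three digits are trimmed
print(parsed.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])  # 2018-01-31 19:01:09.000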

3 Answers


I resolved this by using an ExecuteStreamCommand processor which calls a Python script that converts a JSON line into its respective SQL INSERT statement. The table of interest in this case is reddit_post.

Code of the Python script (I'm aware that there is no need for the INSERT argument, but it is there because I plan on adding an UPDATE option later on):

import json
import argparse
import sys

# For command line arguments
parser = argparse.ArgumentParser(description='Converts JSON to respective SQL statement')
parser.add_argument('statement_type', type=str, nargs=1)
parser.add_argument('table_name', type=str, nargs=1)

# Reading the command line arguments (parse once and reuse the result)
args = parser.parse_args()
statement_type = args.statement_type[0]
table_name = args.table_name[0]

for line in sys.stdin:
  # Load JSON line
  json_line = json.loads(line)

  # Initialize the SQL statement for this line; resetting it inside the
  # loop prevents statements from accumulating across input lines
  statement = ''

  # Add table name and SQL syntax
  if statement_type == 'INSERT':
    statement += 'INSERT INTO {} '.format(table_name)

  # Add table parameters and SQL syntax
  statement += '({}) '.format(', '.join(json_line.keys()))

  # Add table values and SQL syntax
  # Note that strings are single-quoted (with embedded single quotes doubled
  # for SQL escaping); other data types are converted to strings for the join
  statement += "VALUES ({});".format(', '.join(
      "'{0}'".format(value.replace("'", "''")) if isinstance(value, str) else str(value)
      for value in json_line.values()))

  # Send statement to stdout
  print(statement)
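
For illustration, a hypothetical invocation (the script name json_to_sql.py is an assumption here) and the statement it would emit:

echo '{"id": "abc123", "score": 42}' | python json_to_sql.py INSERT reddit_post
# -> INSERT INTO reddit_post (id, score) VALUES ('abc123', 42);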

Configuration of ExecuteStreamCommand (note that the Argument Delimiter is set to a single space): [screenshot of the processor configuration]

Flow snippet: [screenshot of the flow]

I hope this can help someone who comes across a similar issue. If you have any advice on how to improve the script, the flow, or anything else, please don't hesitate to let me know!

vcovo

You may be able to use UpdateAttribute before PutSQL, along with the toDate() and format() Expression Language functions, to convert your timestamp value into something that the database will accept.
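
For example (a sketch only; the parameter index 5 is taken from the error message above, and the exact format strings are assumptions), an UpdateAttribute property could rewrite the value in place using those functions:

sql.args.5.value = ${sql.args.5.value:toDate("yyyy-MM-dd'T'HH:mm:ssXXX"):format("yyyy-MM-dd HH:mm:ss.SSS")}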

Alternatively, you may be able to skip the SplitText, ConvertJSONToSQL, and PutSQL steps entirely by using PutDatabaseRecord: you might be able to configure a RecordReader that will accept your timestamp format and convert it accordingly. If that works, it's a much better solution, as it will handle the whole flow file at once (instead of individual lines).

mattyb
  • Thank you for your insights. I have actually already tried using UpdateAttribute as a workaround, but I'm pretty sure that the error is coming from the ConvertJSONToSQL processor, and thus putting an UpdateAttribute processor after it doesn't make much of a difference. I will check out PutDatabaseRecord – your description sounds promising. – vcovo Apr 02 '18 at 21:43

Just adding the three digits of milliseconds solved the problem for me with the ExecuteSQL processor – "yyyy-MM-dd HH:mm:ss.SSS" (i.e. sending '2018-01-31 19:01:09.000' rather than '2018-01-31 19:01:09').

Bill