mercredi 15 juin 2016

Why am I getting java.lang.IllegalStateException on Google Dataflow?

I've upgraded to the new Google's dataflow version 1.6 and when I test in local machine I get a java.lang.IllegalStateException at the end of my pipeline. I hadn't this problem with version 1.5.1.

This doesn't occur in live environment just in local. Is it a bug of the new version? Is it necessary do changes in my code to avoid those errors?

I attached part of my pipeline to try find the problem.

private static void getTableRowAndWrite(final PCollection<KV<Integer, Iterable<byte[]>>> groupedTransactions, final String tableName) {
    // Get the tableRow element from the PCollection
    groupedTransactions
            .apply(ParDo
                    .of(((tableName.equals("avail")) ? new GetTableRowAvail() : new GetTableRowReservation())) //Get a TableRow
                    .named("Get " + tableName + " TableRows"))
            .apply(BigQueryIO
                    .Write
                    .named("Write to BigQuery " + tableName) //Write to BigQuery
                    .withSchema(createTableSchema())
                    .to((SerializableFunction<BoundedWindow, String>) window -> {
                        String date = window.toString();
                        String date2 = date.substring(1, 5) + date.substring(6, 8) + date.substring(9, 11);
                        return "travelinsights-1056:hotel." + tableName + "_full_" + (TEST ? "test_" : "") + date2;
                    })
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            );
}

The error is:

Exception in thread "main" java.lang.IllegalStateException: Cleanup time 294293-06-23T12:00:54.774Z is beyond end-of-time
at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199)
at com.google.cloud.dataflow.sdk.util.ReduceFnRunner.onTimer(ReduceFnRunner.java:642)
at com.google.cloud.dataflow.sdk.util.BatchTimerInternals.advance(BatchTimerInternals.java:134)
at com.google.cloud.dataflow.sdk.util.BatchTimerInternals.advanceInputWatermark(BatchTimerInternals.java:110)
at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaOutputBufferDoFn.processElement(GroupAlsoByWindowsViaOutputBufferDoFn.java:91)
at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138)
at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateHelper(ParDo.java:1229)
at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateSingleHelper(ParDo.java:1098)
at com.google.cloud.dataflow.sdk.transforms.ParDo.access$300(ParDo.java:457)
at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:1084)
at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:1079)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:858)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)

Aucun commentaire:

Enregistrer un commentaire