Posts

Schema for Table API

  Schema schema = Schema.newBuilder()
          .column("id", "INT")
  //        .column("end_date", "date")
          .column("name", "VARCHAR")
  //        .column("start_date", "date")
  //        .column("page_number", "INT")
  //        .column("page_type", "VARCHAR")
          // Add more columns as needed
          .build();
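
To sanity-check a schema like this without a real source, one option is to bind it to Flink's built-in datagen connector and print a few rows. This is a minimal sketch, assuming a recent Table API (Flink 1.14+); the table name "people" and the row count are arbitrary choices of mine, not from the original post:

  import org.apache.flink.table.api.EnvironmentSettings;
  import org.apache.flink.table.api.Schema;
  import org.apache.flink.table.api.TableDescriptor;
  import org.apache.flink.table.api.TableEnvironment;

  public class SchemaSmokeTest {
      public static void main(String[] args) {
          TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

          Schema schema = Schema.newBuilder()
                  .column("id", "INT")
                  .column("name", "VARCHAR") // note: a bare VARCHAR parses as VARCHAR(1)
                  .build();

          // bind the schema to the datagen connector (no external system needed)
          tEnv.createTemporaryTable("people", TableDescriptor.forConnector("datagen")
                  .schema(schema)
                  .option("number-of-rows", "5") // bounded output for a quick check
                  .build());

          tEnv.from("people").execute().print();
      }
  }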

Datastream timezone type serialization

  env.getConfig().registerTypeWithKryoSerializer(TimestampTz.class, JavaSerializer.class);
  env.getConfig().registerTypeWithKryoSerializer(TimeTz.class, JavaSerializer.class);
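
For context, here is roughly where that registration sits in a job. Two caveats, both assumptions on my part: TimestampTz and TimeTz are presumably the timezone-aware types shipped with the Vertica JDBC driver (the exact import depends on the driver version, so it is left as a comment), and JavaSerializer should be Flink's org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer, which the Flink docs recommend over Kryo's own JavaSerializer:

  import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  // import for TimestampTz/TimeTz omitted: the package depends on your Vertica driver version

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  // these driver classes are not Flink POJOs, so route them through Kryo with
  // Java serialization (which requires them to implement java.io.Serializable)
  env.getConfig().registerTypeWithKryoSerializer(TimestampTz.class, JavaSerializer.class);
  env.getConfig().registerTypeWithKryoSerializer(TimeTz.class, JavaSerializer.class);

  // ... build and execute the pipeline as usual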

Join

  package Dataset;

  import com.vertica.flink.dataStream.VerticaDataStreamInputFormat;
  import org.apache.flink.api.common.functions.JoinFunction;
  import org.apache.flink.api.java.functions.KeySelector;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
  import org.apache.flink.streaming.api.windowing.time.Time;
  import org.apache.flink.types.Row;

  public class Join {
      public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          VerticaDataStreamInputFormat custDim = VerticaDataStreamInputFormat.buildVerticaSource()
                  .setSourceDBUrl("jdbc:vertica://10.20.73.64:5433/vertica_db?user=dbadmin&password=vdb")
                  .setSo // ... the post excerpt is cut off here
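
The excerpt is cut off before the join itself, but the imports (JoinFunction, KeySelector, TumblingProcessingTimeWindows) indicate a windowed stream join. Below is a self-contained sketch of that pattern with in-memory stand-ins for the two Vertica-backed streams; the field layout (field 0 as the join key) is an assumption, not taken from the original post:

  import org.apache.flink.api.common.functions.JoinFunction;
  import org.apache.flink.api.common.typeinfo.Types;
  import org.apache.flink.api.java.functions.KeySelector;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
  import org.apache.flink.streaming.api.windowing.time.Time;
  import org.apache.flink.types.Row;

  public class WindowedJoinSketch {
      public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

          // stand-ins for the Vertica-backed customer and sales streams
          DataStream<Row> custDim = env
                  .fromElements(Row.of(1, "Alice"), Row.of(2, "Bob"))
                  .returns(Types.ROW(Types.INT, Types.STRING));
          DataStream<Row> sales = env
                  .fromElements(Row.of(1, 100.0), Row.of(2, 250.0))
                  .returns(Types.ROW(Types.INT, Types.DOUBLE));

          // join on field 0 within 5-second processing-time windows; with a
          // bounded demo source the job can finish before the window fires,
          // so expect output only against a continuous source
          DataStream<Row> joined = custDim.join(sales)
                  .where(new KeySelector<Row, Integer>() {
                      @Override
                      public Integer getKey(Row r) { return (Integer) r.getField(0); }
                  })
                  .equalTo(new KeySelector<Row, Integer>() {
                      @Override
                      public Integer getKey(Row r) { return (Integer) r.getField(0); }
                  })
                  .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                  .apply(new JoinFunction<Row, Row, Row>() {
                      @Override
                      public Row join(Row c, Row s) {
                          return Row.of(c.getField(0), c.getField(1), s.getField(1));
                      }
                  });

          joined.print();
          env.execute("Windowed join sketch");
      }
  }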

Schema Creation in Flink

  Schema schema = Schema.newBuilder()
          .column("online_page_key", "INT")
          .column("end_date", "date")
          .column("page_description", "VARCHAR")
          .column("start_date", "date")
          .column("page_number", "INT")
          .column("page_type", "VARCHAR")
          .build();
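
One way to put such a schema to work in a DataStream-based job like the others on this page is StreamTableEnvironment.fromDataStream(stream, schema), available since Flink 1.13. In this sketch the stream contents and field names are made up, the date columns are dropped for brevity, and STRING is used instead of a bare VARCHAR (which would parse as VARCHAR(1)):

  import org.apache.flink.api.common.typeinfo.Types;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.table.api.Schema;
  import org.apache.flink.table.api.Table;
  import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  import org.apache.flink.types.Row;

  public class SchemaBridgeSketch {
      public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

          // a made-up pages stream standing in for a real source
          DataStream<Row> pages = env
                  .fromElements(Row.of(1, "home page", 1, "landing"))
                  .returns(Types.ROW_NAMED(
                          new String[]{"online_page_key", "page_description", "page_number", "page_type"},
                          Types.INT, Types.STRING, Types.INT, Types.STRING));

          // declare the physical columns matching the stream's fields
          Schema schema = Schema.newBuilder()
                  .column("online_page_key", "INT")
                  .column("page_description", "STRING") // a bare VARCHAR would mean VARCHAR(1)
                  .column("page_number", "INT")
                  .column("page_type", "STRING")
                  .build();

          Table pageTable = tEnv.fromDataStream(pages, schema);
          pageTable.execute().print();
      }
  }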

Filter Function

  package org.example;

  import com.vertica.flink.dataStream.CustomDataStreamInputFormat;
  import com.vertica.flink.dataStream.CustomSinkFunction;
  import org.apache.flink.api.common.functions.FilterFunction;
  import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
  import org.apache.flink.api.java.typeutils.RowTypeInfo;
  import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
  import org.apache.flink.connector.jdbc.JdbcInputFormat;
  import org.apache.flink.connector.jdbc.JdbcSink;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.api.functions.sink.SinkFunction;
  import org.apache.flink.types.Row;

  public class DataTypeTesting {
      public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          CustomDataStreamInputFormat dataStreamInputFormat = CustomD // ... the post excerpt is cut off here
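
The excerpt stops before any filtering happens. Here is a self-contained sketch of the FilterFunction pattern the imports point to, with an in-memory Row stream standing in for the custom Vertica input format (the field layout is an assumption):

  import org.apache.flink.api.common.functions.FilterFunction;
  import org.apache.flink.api.common.typeinfo.Types;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.types.Row;

  public class FilterSketch {
      public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

          // stand-in for the stream produced by CustomDataStreamInputFormat
          DataStream<Row> rows = env
                  .fromElements(Row.of(1, "keep"), Row.of(2, "drop"), Row.of(3, "keep"))
                  .returns(Types.ROW(Types.INT, Types.STRING));

          // keep only rows whose second field equals "keep"
          DataStream<Row> filtered = rows.filter(new FilterFunction<Row>() {
              @Override
              public boolean filter(Row row) {
                  return "keep".equals(row.getField(1));
              }
          });

          filtered.print();
          env.execute("Filter sketch");
      }
  }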

1) Setup of Flink

 1. Download the Flink tar archive.
 2. Extract the tar archive.
 3. Go to the bin folder and run ./start-cluster.sh.
 4. Open localhost:8081 in a browser.

If you want to access Flink through an IP address, replace localhost with the IP address in flink-conf.yaml. I basically replaced every occurrence of localhost with the IP address using the vim substitution :%s/localhost/IPAddress/g (the % applies it to the whole file). Now you can access Flink in the browser using the IP address as well.

Error Resolution

 If you hit java.lang.NoClassDefFoundError: org/apache/flink/connector/jdbc/JdbcStatementBuilder when running from IntelliJ IDEA, select the menu item "Run" > "Edit Configurations...", then in the "Build and run" section select "Modify options" => Java => "Add dependencies with 'Provided' scope to classpath" in your local run configuration. This way you don't have to remove <scope>provided</scope> from the Flink dependencies in your pom.xml.