Verifying Flink serialization types

Published: 2023-10-07 14:53:35  Author: Choice7

Serialization types supported by Flink

  • Officially supported

    1. Java Tuples and Scala case classes

    2. Java POJOs

    3. Primitive types

    4. Regular classes

    5. Values

    6. Hadoop Writables

    7. Special types
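
As an illustration of item 2, here is a minimal sketch of a class shaped the way Flink's POJO rules expect: a public class with a public no-argument constructor, whose fields are either public or reachable through a getter and setter. The class name `SensorReading` and its fields are hypothetical and only serve the example.

```java
// Hypothetical POJO used only for illustration; not part of the original test.
public class SensorReading {
    // A public field needs no accessors.
    public double temperature;

    // A private field needs a matching getter and setter.
    private String id;

    // A public no-argument constructor is required.
    public SensorReading() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }
}
```

A class like this can be analyzed field by field, so it should not need the generic (Kryo) fallback that the collection types in the test below run into.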

  • Verification code

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setParallelism(1);
      // Disable generic types so that any type that would fall back to Kryo fails fast
      env.getConfig().disableGenericTypes();

      Tuple2<String, String> tuple2 = new Tuple2<>();
      tuple2.f0 = "a";
      tuple2.f1 = "b";
      DataStreamSource<Tuple2<String, String>> tupleStream = env.fromElements(tuple2);
      tupleStream.print();

      // 1. List test
      List<String> list = new ArrayList<>();
      list.add("hadoop");
      list.add("hive");
      list.add("spark");
      list.add("flink");

      DataStreamSource<List<String>> listStream = env.fromElements(list);
      listStream.print("listStream");
      // 2. Map test
      Map<String, String> map = new HashMap<>();
      map.put("a","java");
      map.put("b","python");
      map.put("c","scala");
      DataStreamSource<Map<String, String>> mapStream = env.fromElements(map);
      mapStream.print("mapStream");

      // 3. JSON data test
      JSONObject json = new JSONObject();
      json.put("a","java");
      json.put("b","python");
      json.put("c", 123);
      DataStreamSource<JSONObject> jsonStream = env.fromElements(json);
      jsonStream.print("jsonStream");

      env.execute();
  • Test results

    class java.util.ArrayList does not contain a setter for field size
    Class class java.util.ArrayList cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.

    class java.util.HashMap does not contain a getter for field threshold
    class java.util.HashMap does not contain a setter for field threshold
    Class class java.util.HashMap cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.

    class com.alibaba.fastjson2.JSONObject does not contain a getter for field accessOrder
    class com.alibaba.fastjson2.JSONObject does not contain a setter for field accessOrder
    Class class com.alibaba.fastjson2.JSONObject cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.

    Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.ArrayList is treated as a generic type.
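
The "does not contain a getter/setter for field ..." messages above come from Flink checking each field of the class against its POJO rules. The sketch below is a simplified approximation of that check written with plain Java reflection, not Flink's actual implementation; the class name `PojoFieldCheck` and its message format are assumptions made for the example. It skips static, transient and public fields and looks for a public method named like a getter (`getX`, `isX` or `x`) and a setter (`setX`).

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

// Hypothetical helper: a rough imitation of a POJO field check, for illustration only.
public class PojoFieldCheck {

    // Lists the fields of clazz that lack a public getter or setter.
    public static List<String> problems(Class<?> clazz) {
        List<String> out = new ArrayList<>();
        for (Field field : clazz.getDeclaredFields()) {
            int mod = field.getModifiers();
            // Static and transient fields are ignored; public fields need no accessors.
            if (Modifier.isStatic(mod) || Modifier.isTransient(mod) || Modifier.isPublic(mod)) {
                continue;
            }
            String name = field.getName();
            if (!hasPublicMethod(clazz, "get" + name, "is" + name, name)) {
                out.add("no getter for field " + name);
            }
            if (!hasPublicMethod(clazz, "set" + name)) {
                out.add("no setter for field " + name);
            }
        }
        return out;
    }

    // True if clazz has a public method whose name matches a candidate, ignoring case.
    private static boolean hasPublicMethod(Class<?> clazz, String... candidates) {
        for (Method method : clazz.getMethods()) {
            for (String candidate : candidates) {
                if (method.getName().equalsIgnoreCase(candidate)) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(problems(ArrayList.class));
        System.out.println(problems(HashMap.class));
    }
}
```

Under this approximation `ArrayList` is rejected because `size` has a `size()` method but no setter, and `HashMap` because `threshold` has neither accessor, which lines up with the log. Once a class fails this kind of check it can only be handled as a generic type.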
  • Analysis

    • With generic types disabled, the program fails immediately with an error

    • Flink serializes all generic types with the Kryo serializer; the Map stream is rejected in the same way:

      Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.HashMap is treated as a generic type.

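    • Because of Java type erasure, the element types of these collections are not visible at runtime, so they are recognized as generic types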

    • Supplying the corresponding type information explicitly fixes this, for example dataStream.returns(Types.MAP(Types.STRING, Types.STRING))
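
To make the type-erasure point concrete, the following plain-Java sketch shows what is left of a `Map<String, String>` at runtime and how an explicitly declared type can still be recovered. `ErasureDemo` and `TypeToken` are hypothetical names introduced for this example. The anonymous-subclass trick is the same general idea that explicit type hints rely on, but the code is not Flink's implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class, for illustration only.
public class ErasureDemo {

    // An anonymous subclass of TypeToken records T in its generic superclass,
    // which survives erasure and can be read back via reflection.
    public abstract static class TypeToken<T> {
        public Type captured() {
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    // What a framework can learn from the object alone: the raw class, no type arguments.
    public static String runtimeView(Object value) {
        return value.getClass().getName();
    }

    // What it can learn when the caller declares the type explicitly.
    public static String declaredView(TypeToken<?> token) {
        return token.captured().getTypeName();
    }

    public static void main(String[] args) {
        Map<String, String> map = new HashMap<>();
        map.put("a", "java");
        // Only the raw class name is available here.
        System.out.println(runtimeView(map));
        // The declared type still carries both String arguments.
        System.out.println(declaredView(new TypeToken<Map<String, String>>() {}));
    }
}
```

The first call only sees `java.util.HashMap`, which is why a value passed in as a plain object ends up treated as a generic type, while the second call keeps the key and value types. Passing `Types.MAP(Types.STRING, Types.STRING)` plays the role of the explicit declaration.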