I'm using Apache Flink (v1.11) with Scala and added an own DeserializationSchema for Kafka connector. Therefore i would like to use my own packages and versions of jackson (v2.12.0).
But i got the following error:
Exception in thread "main" java.lang.VerifyError: Cannot inherit from final class
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at com.fasterxml.jackson.dataformat.csv.CsvMapper.<init>(CsvMapper.java:108)
at de.integration_factory.datastream.types.CovidEventSchema.<init>(CovidEventSchema.scala:14)
at de.integration_factory.datastream.Aggregate_Datastream$.main(Aggregate_Datastream.scala:34)
at de.integration_factory.datastream.Aggregate_Datastream.main(Aggregate_Datastream.scala)
This is my EventSchema:
import com.fasterxml.jackson.dataformat.csv.CsvMapper
import com.fasterxml.jackson.datatype.joda.JodaModule
import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
import org.apache.flink.api.common.typeinfo.TypeInformation
@SerialVersionUID(6154188370181669758L)
class CovidEventSchema extends DeserializationSchema[CovidEvent] with SerializationSchema[CovidEvent] {
private val mapper = new CsvMapper
mapper.registerModule(new JodaModule)
val csvSchema = mapper
.schemaFor(classOf[CovidEvent])
.withLineSeparator(",")
.withoutHeader()
val reader = mapper.readerWithSchemaFor(classOf[CovidEvent])
def serialize(event: CovidEvent): Array[Byte] = mapper.writer(csvSchema).writeValueAsBytes()
@throws[IOException]
def deserialize(message: Array[Byte]): CovidEvent = reader.readValue[CovidEvent](message)
def isEndOfStream(nextElement: CovidEvent) = false
def getProducedType: TypeInformation[CovidEvent] = TypeInformation.of(classOf[CovidEvent])
}
This is my PoJo for schema:
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.joda.time.DateTime;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CovidEvent {
private long objectId;
private int bundeslandId;
private String bundesland;
private String landkreis;
private String altersgruppe;
private String geschlecht;
private int anzahlFall;
private int anzahlTodesfall;
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "UTC")
private DateTime meldedatum;
private int landkreisId;
private String datenstand;
private int neuerFall;
private int neuerTodesfall;
private String refDatum;
private int neuGenesen;
private int anzahlGenesen;
@JsonFormat(shape = JsonFormat.Shape.NUMBER)
private boolean istErkrankungsbeginn;
private String altersGruppe2;
public long getEventtime() {
return meldedatum.getMillis();
}
}
After some research I found out that the error is probably caused by different Jackson versions in the classpath.
I thought it would be possible to use own version of Jackson, because Flink shaded the own versions.
What am I doing wrong?
UPDATE: If i import the jackson classes from shaded flink package it is working
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper
But so i am dependent on the flink shaded jackson version.
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…