• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Scala EncoderFactory类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中org.apache.avro.io.EncoderFactory的典型用法代码示例。如果您正苦于以下问题:Scala EncoderFactory类的具体用法?Scala EncoderFactory怎么用?Scala EncoderFactory使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了EncoderFactory类的8个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: Tip

//设置package包名称以及导入依赖的类
package com.alvin.niagara.model

import java.io.ByteArrayOutputStream
import java.util

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

import scala.collection.JavaConversions._
import scala.io.Source


/**
  * Immutable value holding the six fields of a tip record.
  *
  * The field names (including the back-quoted `type`) are the same keys that
  * `TipSerde` uses when it reads from and writes to the Avro record.
  */
case class Tip(
  business_id: String,
  date: String,
  likes: Long,
  text: String,
  `type`: String,
  user_id: String
)


/**
  * Avro binary serialization and deserialization of [[Tip]] values, driven by
  * the schema stored at `/schema/tip.avsc` on the classpath.
  */
object TipSerde {

  /** Text of the Avro schema, read once from the classpath resource. */
  val avroSchema: String = {
    val source = Source.fromInputStream(getClass.getResourceAsStream("/schema/tip.avsc"))
    // Release the underlying stream as soon as the text has been read.
    try source.mkString finally source.close()
  }

  /** Parsed form of [[avroSchema]]. */
  val schema: Schema = new Schema.Parser().parse(avroSchema)

  val reader: GenericDatumReader[GenericRecord] = new GenericDatumReader[GenericRecord](schema)
  val writer: GenericDatumWriter[GenericRecord] = new GenericDatumWriter[GenericRecord](schema)

  /**
    * Encodes `tip` as Avro binary.
    *
    * @param tip value whose six fields are copied into a generic record
    * @return the bytes produced by the binary encoder
    */
  def serialize(tip: Tip): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get.binaryEncoder(out, null)

    val avroRecord = new GenericData.Record(schema)
    avroRecord.put("business_id", tip.business_id)
    avroRecord.put("date", tip.date)
    avroRecord.put("likes", tip.likes)
    avroRecord.put("text", tip.text)
    avroRecord.put("type", tip.`type`)
    avroRecord.put("user_id", tip.user_id)

    writer.write(avroRecord, encoder)
    // The binary encoder buffers; flush before reading the bytes back.
    encoder.flush()
    out.toByteArray
  }

  /**
    * Decodes Avro binary produced for [[schema]] back into a [[Tip]].
    *
    * @param bytes Avro binary payload
    * @return the decoded value; string fields are taken via `toString`, and
    *         `likes` is cast to `Long`
    */
  def deserialize(bytes: Array[Byte]): Tip = {
    val decoder = DecoderFactory.get.binaryDecoder(bytes, null)
    val record = reader.read(null, decoder)

    Tip(
      record.get("business_id").toString,
      record.get("date").toString,
      record.get("likes").asInstanceOf[Long],
      record.get("text").toString,
      record.get("type").toString,
      record.get("user_id").toString
    )
  }
}
开发者ID:AlvinCJin,项目名称:Niagara,代码行数:60,代码来源:Tip.scala


示例2: AvroNodeSerializer

//设置package包名称以及导入依赖的类
package eventgen.launcher.core.avro

import java.io.ByteArrayOutputStream

import eventgen.launcher.core.NodeSerializer
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory


/**
  * `NodeSerializer` implementation that writes an Avro node through Avro's
  * JSON encoder.
  */
class AvroNodeSerializer extends NodeSerializer[Schema, AvroNode[_], ByteArrayOutputStream] {

  /**
    * Writes the value carried by `node` as JSON for the schema `metadata`.
    *
    * @param metadata schema given to both the datum writer and the JSON encoder
    * @param node     node to serialize; it is cast to `AvroRecord`, so any
    *                 other node type fails with a `ClassCastException`
    * @return the stream that received the flushed encoder output
    */
  override def serialize(metadata: Schema, node: AvroNode[_]): ByteArrayOutputStream = {
    val avroRecord = node.asInstanceOf[AvroRecord]

    val datumWriter = new GenericDatumWriter[GenericRecord]
    datumWriter.setSchema(metadata)

    val sink = new ByteArrayOutputStream
    // Third argument is passed as `true`, exactly as before.
    val jsonEncoder = EncoderFactory.get().jsonEncoder(metadata, sink, true)

    datumWriter.write(avroRecord.value, jsonEncoder)
    jsonEncoder.flush()
    sink
  }
}
开发者ID:gpulse,项目名称:eventgenerator,代码行数:23,代码来源:AvroNodeSerializer.scala


示例3: Avro

//设置package包名称以及导入依赖的类
package com.lukecycon.avro

import java.io.ByteArrayOutputStream
import org.apache.avro.io.EncoderFactory
import org.apache.avro.file.BZip2Codec
import java.nio.ByteBuffer
import org.apache.avro.io.DecoderFactory

/**
  * Entry points for encoding and decoding values that have an `AvroFormat`
  * instance, either as raw bytes or as space-separated hexadecimal text.
  */
object Avro {

  /** Schema exposed by the `AvroFormat` instance of `T`. */
  def schemaFor[T: AvroFormat] = implicitly[AvroFormat[T]].schema

  /**
    * Encodes `thing` with Avro's binary encoder.
    *
    * @param thing    value to encode
    * @param compress when true, the encoded bytes are run through `BZip2Codec`
    * @return the encoded (and optionally compressed) bytes
    */
  def write[T: AvroFormat](thing: T, compress: Boolean = false): Array[Byte] = {
    val out = new ByteArrayOutputStream
    val encoder = EncoderFactory.get.binaryEncoder(out, null)

    implicitly[AvroFormat[T]].writeValue(thing, encoder)
    encoder.flush()

    val encoded = out.toByteArray
    if (compress) remainingBytes(new BZip2Codec().compress(ByteBuffer.wrap(encoded)))
    else encoded
  }

  /** Encodes `thing` uncompressed and renders it as hex pairs separated by spaces. */
  def writeHex[T: AvroFormat](thing: T): String =
    byteArrayToHexString(write(thing))

  /**
    * Decodes a value of type `T` from Avro binary.
    *
    * @param bytes      payload to decode
    * @param compressed when true, `bytes` is first decompressed with `BZip2Codec`
    * @return whatever the `AvroFormat` instance's `decodeValue` returns
    */
  def read[T: AvroFormat](bytes: Array[Byte],
                          compressed: Boolean = false): Either[String, T] = {
    val payload =
      if (compressed) remainingBytes(new BZip2Codec().decompress(ByteBuffer.wrap(bytes)))
      else bytes

    val decoder = DecoderFactory.get.binaryDecoder(payload, null)

    implicitly[AvroFormat[T]].decodeValue(Nil, decoder)
  }

  /**
    * Decodes a value of type `T` from hexadecimal text.
    *
    * Spaces are removed, then the text is consumed two characters at a time.
    *
    * @param hex hexadecimal text, optionally space-separated
    * @return `Left` with a message when the text contains non-hex characters,
    *         otherwise the result of [[read]]
    */
  def readHex[T: AvroFormat](hex: String): Either[String, T] = {
    // Only the hex parsing is guarded, so failures raised while decoding are untouched.
    val parsed: Either[String, Array[Byte]] =
      try {
        Right(
          hex
            .replace(" ", "")
            .grouped(2)
            .map(Integer.parseInt(_, 16).toByte)
            .toArray)
      } catch {
        case e: NumberFormatException => Left(s"Invalid hex input: ${e.getMessage}")
      }

    parsed match {
      case Right(bytes) => read[T](bytes)
      case Left(error)  => Left(error)
    }
  }

  /**
    * Copies exactly the readable region of `buffer` (position up to limit).
    *
    * `ByteBuffer.array` returns the whole backing array, which may be larger
    * than the region the buffer actually exposes.
    */
  private def remainingBytes(buffer: ByteBuffer): Array[Byte] = {
    val bytes = new Array[Byte](buffer.remaining)
    // Read through a duplicate so the caller's buffer position is left alone.
    buffer.duplicate().get(bytes)
    bytes
  }

  /** Formats each byte as two upper-case hex digits, joined by single spaces. */
  private def byteArrayToHexString(bb: Array[Byte]): String =
    bb.map(b => "%02X".format(b)).mkString(" ")
}
开发者ID:themattchan,项目名称:Skaro,代码行数:54,代码来源:Avro.scala


示例4: AvroFlumeEventEncoder

//设置package包名称以及导入依赖的类
package contrib.kafka.serializer

import kafka.serializer.Encoder
import kafka.utils.VerifiableProperties

import org.apache.avro.io.BinaryEncoder
import org.apache.avro.io.EncoderFactory
import org.apache.avro.specific.SpecificDatumWriter

import org.apache.flume.source.avro.AvroFlumeEvent

import java.io.ByteArrayOutputStream
import java.io.InputStream

/**
  * Kafka `Encoder` that turns an `AvroFlumeEvent` into Avro binary.
  *
  * One output buffer and one encoder reference are kept on the instance and
  * used again on every call to [[toBytes]].
  *
  * @param props accepted by the constructor but not read by this class
  */
class AvroFlumeEventEncoder(props: VerifiableProperties = null)
  extends Encoder[AvroFlumeEvent] {

  private val datumWriter: SpecificDatumWriter[AvroFlumeEvent] =
    new SpecificDatumWriter[AvroFlumeEvent](classOf[AvroFlumeEvent])

  // Starts out null; each call hands the previous encoder back to the factory.
  private var reusableEncoder: BinaryEncoder = null

  private val buffer = new ByteArrayOutputStream()

  /**
    * Serializes `event` into a fresh byte array.
    *
    * @param event event to write
    * @return copy of the buffer contents after the event has been written
    */
  override def toBytes(event: AvroFlumeEvent): Array[Byte] = {
    // Discard whatever the previous call left in the shared buffer.
    buffer.reset()
    reusableEncoder = EncoderFactory.get.directBinaryEncoder(buffer, reusableEncoder)
    datumWriter.write(event, reusableEncoder)
    buffer.toByteArray
  }
}
开发者ID:saikocat,项目名称:spark-sql-kafka-avroflumeevent,代码行数:30,代码来源:AvroFlumeEventEncoder.scala


示例5: Util

//设置package包名称以及导入依赖的类
import java.io.{ByteArrayOutputStream, FileInputStream}
import java.util.Properties

import org.apache.avro.io.EncoderFactory
import org.apache.avro.reflect.{ReflectData, ReflectDatumWriter}
import spray.json.JsonFormat

import scala.util.{Failure, Success, Try}

import spray.json.{JsonFormat, _}
import DefaultJsonProtocol._


/** Helpers for JSON parsing, Avro reflect serialization and file loading. */
object Util {

  /**
    * Parses `data` as JSON and converts the result to `T`.
    *
    * @param data   JSON text
    * @param format spray-json format used for the conversion
    * @return `Success` with the converted value, or `Failure` holding the
    *         non-fatal exception raised while parsing or converting
    */
  def parse[T](data: String)(implicit format : JsonFormat[T]): Try[T] = {
    Try {
      val parsed = data.parseJson
      parsed.convertTo[T]
    }
  }

  /**
    * Parses `data` as JSON and converts the result to a sequence of `T`.
    *
    * @param data   JSON text
    * @param format spray-json format for the element type
    * @return `Success` with the converted sequence, or `Failure` holding the
    *         non-fatal exception raised while parsing or converting
    */
  def parseCollection[T](data: String)(implicit format : JsonFormat[T]): Try[Seq[T]] = {
    Try {
      val parsed = data.parseJson
      parsed.convertTo[Seq[T]]
    }
  }

  /**
    * Serializes `obj` to Avro binary using a schema derived by reflection from
    * its runtime class.
    *
    * The schema lookup runs before the `Try`, so exceptions from it propagate
    * to the caller; only the write itself is captured.
    *
    * @param obj value to serialize
    * @return `Success` with the bytes, or `Failure` if writing threw
    */
  def serialize[T](obj: T): Try[Array[Byte]] = {
    val schema = ReflectData.get().getSchema(obj.getClass)
    val writer = new ReflectDatumWriter[T](schema)
    val out = new ByteArrayOutputStream

    Try {
      writer.write(obj, EncoderFactory.get.directBinaryEncoder(out, null))
      out.toByteArray
    }
  }

  /**
    * Reads the whole file and splits its content on `"\n"`.
    *
    * @param fileName path of the file to read
    * @return the pieces between newline characters
    */
  def getFileLines(fileName : String): Array[String] = {
    val source = scala.io.Source.fromFile(fileName)
    // Close the file handle even when reading fails.
    try source.mkString.split("\n") finally source.close()
  }

  /**
    * Loads a Java properties file.
    *
    * If loading fails, the stack trace is printed and the JVM exits with
    * status 1.
    *
    * @param fileName path of the properties file
    * @return the populated `Properties` object
    */
  def loadPropertiesFile(fileName : String): Properties = {
    val prop = new Properties()

    Try {
      val in = new FileInputStream(fileName)
      // Close the stream whether or not loading succeeds.
      try prop.load(in) finally in.close()
    } match {
      case Failure(e) => e.printStackTrace(); System.exit(1)
      case Success(_) => ()
    }

    prop
  }

}
开发者ID:msumner91,项目名称:FlinkCEP,代码行数:58,代码来源:Util.scala


示例6: mkSerializer

//设置package包名称以及导入依赖的类
package com.cj.serialization

import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.io.{BinaryDecoder, BinaryEncoder, DatumWriter, DecoderFactory, EncoderFactory}
import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter, SpecificRecord}

/**
  * Factory functions that build byte-array serializers and deserializers for
  * Avro-generated `SpecificRecord` classes.
  */
package object avro {
  /** Function that turns a `T` into bytes. */
  type RecordSerializer[-T] = T => Array[Byte]
  /** Function that turns bytes into a `T`. */
  type RecordDeserializer[+T] = Array[Byte] => T

  /**
    * Builds a serializer for `T` that goes through the Avro-generated class `U`.
    *
    * @param f conversion from the domain type to the Avro record
    * @return a function applying `f` and then Avro binary encoding
    */
  def mkSerializer[T, U <: SpecificRecord](f: (T => U)): RecordSerializer[T] = {
    val avroSerializer = mkAvroSerializer[U]()
    record => avroSerializer(f(record))
  }

  /**
    * Builds a deserializer for `T` that goes through the Avro-generated class `U`.
    *
    * @param f      conversion from the Avro record to the domain type
    * @param schema schema used to read the bytes
    * @return a function yielding `None` when the decoded record is null,
    *         otherwise `Some` of the converted value
    */
  def mkDeserializer[T, U >: Null <: SpecificRecord](f: U => T, schema: Schema): RecordDeserializer[Option[T]] = {
    // Name U explicitly, as mkSerializer does, rather than leaving the type to inference.
    val avroDeserializer = mkAvroDeserializer[U](schema)
    bytes => {
      val avroRec: U = avroDeserializer(bytes)
      if (avroRec == null) None
      else Some(f(avroRec))
    }
  }

  /**
    * Builds a serializer for the Avro-generated class `T`.
    *
    * The returned function captures a single output buffer and encoder and
    * uses them again on every invocation.
    * NOTE(review): because of that shared state, concurrent use of one
    * returned function looks unsafe; confirm how callers share it.
    *
    * @return a function encoding a record as Avro binary using the record's own schema
    */
  def mkAvroSerializer[T <: SpecificRecord](): RecordSerializer[T] = {
    val output = new ByteArrayOutputStream()
    val writer: DatumWriter[T] = new SpecificDatumWriter[T]()
    var encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(output, null)
    record => {
      // Start from an empty buffer; the previous call's bytes were already copied out.
      output.reset()
      encoder = EncoderFactory.get().binaryEncoder(output, encoder)
      writer.setSchema(record.getSchema)
      writer.write(record, encoder)
      encoder.flush()
      output.toByteArray
    }
  }

  /**
    * Builds a deserializer for the Avro-generated class `T`.
    *
    * The returned function captures one decoder reference and passes it back
    * to the factory on each invocation.
    *
    * @param schema schema used to read the bytes
    * @return a function decoding Avro binary into a record
    */
  def mkAvroDeserializer[T >: Null <: SpecificRecord](schema: Schema): RecordDeserializer[T] = {
    val reader: SpecificDatumReader[T] = new SpecificDatumReader[T](schema)
    var decoder: BinaryDecoder = DecoderFactory.get().binaryDecoder(Array[Byte](), null)
    bytes => {
      decoder = DecoderFactory.get().binaryDecoder(bytes, decoder)
      reader.read(null, decoder)
    }
  }
}
开发者ID:cjdev,项目名称:avro-serialization,代码行数:55,代码来源:package.scala


示例7: AvroEncoder

//设置package包名称以及导入依赖的类
import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.io.{BinaryEncoder, DatumWriter, EncoderFactory}
import org.apache.avro.reflect.ReflectDatumWriter
import org.apache.avro.specific.SpecificRecordBase

/**
  * Encodes records of type `A` to Avro binary with a reflect-based datum writer.
  *
  * @param schema schema handed to the datum writer
  */
class AvroEncoder[A <: SpecificRecordBase](schema: Schema) {

  val encoderFactory: EncoderFactory = EncoderFactory.get()
  val avroWriter: DatumWriter[A] = new ReflectDatumWriter[A](schema)

  /**
    * Writes `bidPackage` into a new byte array.
    *
    * @param bidPackage   record to encode
    * @param reuseEncoder encoder offered to the factory for reuse; when empty,
    *                     `null` is passed to the factory instead
    * @return the bytes written after the encoder has been flushed
    */
  def encode(bidPackage: A, reuseEncoder: Option[BinaryEncoder] = Option.empty): Array[Byte] = {
    val buffer = new ByteArrayOutputStream
    val encoder = reuseEncoder match {
      case Some(existing) => encoderFactory.binaryEncoder(buffer, existing)
      case None           => encoderFactory.binaryEncoder(buffer, null)
    }
    avroWriter.write(bidPackage, encoder)
    encoder.flush()
    buffer.toByteArray
  }
}
开发者ID:defndaines,项目名称:ScalaBits,代码行数:21,代码来源:AvroEncoder.scala


示例8: GenericRecordEventJsonConverter

//设置package包名称以及导入依赖的类
package com.pragmasoft.eventaggregator

import java.io.ByteArrayOutputStream

import com.pragmasoft.eventaggregator.model.KafkaAvroEvent
import com.sksamuel.elastic4s.source.Indexable
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory
import org.joda.time.format.ISODateTimeFormat

/**
  * Provides the `Indexable` instance that renders a `KafkaAvroEvent` carrying a
  * `GenericRecord` as a JSON document.
  */
object GenericRecordEventJsonConverter {

  /**
    * Optional field paths used to pull header values out of an event record.
    *
    * @param eventIdPath path of the event id field, if any
    * @param eventTsPath path of the event timestamp field, if any
    */
  case class EventHeaderDescriptor(eventIdPath: Option[String], eventTsPath: Option[String]) {
    import com.pragmasoft.eventaggregator.GenericRecordFieldExtractionSupport._

    /** Value found at `eventIdPath`, rendered with `toString`; `None` when no path is set or nothing is found. */
    def extractEventId[T <: GenericRecord](event: T): Option[String] = eventIdPath.flatMap(event.getField[Any]).map(_.toString)

    /** Value found at `eventTsPath` as a `Long`; `None` when no path is set or nothing is found. */
    def extractEventTs[T <: GenericRecord](event: T): Option[Long] = eventTsPath.flatMap(event.getField[Long])
  }

  /**
    * Renders `event` as JSON text using Avro's JSON encoder and the record's own schema.
    */
  private def asJsonString(event: GenericRecord): String = {
    val out = new ByteArrayOutputStream()
    val jsonEncoder = EncoderFactory.get().jsonEncoder(event.getSchema, out)
    val writer = new GenericDatumWriter[GenericRecord](event.getSchema)

    writer.write(event, jsonEncoder)
    jsonEncoder.flush()

    // Avro's JSON encoder emits UTF-8; decode with that charset rather than
    // the platform default so non-ASCII field values survive.
    out.toString("UTF-8")
  }

  /**
    * `Indexable` instance producing a JSON document with an optional
    * `@timestamp` attribute, the event location, the schema name and the
    * record data.
    *
    * @param headerDescriptor tells where to find the event timestamp in the record
    */
  implicit def kafkaAvroEventIndexable(implicit headerDescriptor: EventHeaderDescriptor): Indexable[KafkaAvroEvent[GenericRecord]] = new Indexable[KafkaAvroEvent[GenericRecord]] {
    val timestampFormat = ISODateTimeFormat.dateTime().withZoneUTC

    override def json(event: KafkaAvroEvent[GenericRecord]): String = {
      // Present only when the descriptor yields a timestamp for this event.
      val timestampJsonAttributeMaybe =
        headerDescriptor.extractEventTs(event.data)
          .map(ts => s""" "@timestamp" : "${timestampFormat.print(ts)}",""")

      s"""{
          |  ${timestampJsonAttributeMaybe.getOrElse("")}
          |  "location" : { "topic" : "${event.location.topic}", "partition" : ${event.location.partition}, "offset" : ${event.location.offset} },
          |  "schemaName" : "${event.schemaName}",
          |  "data" : ${asJsonString(event.data)}
          |} """.stripMargin

    }
  }

}
开发者ID:galarragas,项目名称:event-aggregator,代码行数:52,代码来源:GenericRecordEventJsonConverter.scala



注:本文中的org.apache.avro.io.EncoderFactory类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala CharSequenceReader类代码示例发布时间:2022-05-23
下一篇:
Scala AuthenticatorService类代码示例发布时间:2022-05-23
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap