• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Scala ActorSubscriber类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中akka.stream.actor.ActorSubscriber的典型用法代码示例。如果您正苦于以下问题:Scala ActorSubscriber类的具体用法?Scala ActorSubscriber怎么用?Scala ActorSubscriber使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了ActorSubscriber类的17个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: NakadiActorSubscriber

//设置package包名称以及导入依赖的类
package org.zalando.react.nakadi

import akka.actor.{ActorLogging, ActorRef, Props}
import akka.stream.actor.{ActorSubscriber, ActorSubscriberMessage, RequestStrategy}
import org.zalando.react.nakadi.NakadiMessages._


object NakadiActorSubscriber {

  /**
   * Builds the [[akka.actor.Props]] used to create a [[NakadiActorSubscriber]].
   *
   * @param consumerAndProps        producer handle; its `nakadiClient` actor receives the stream elements
   * @param requestStrategyProvider factory for the request strategy of the new actor
   * @return props that instantiate the subscriber with both arguments
   */
  def props(consumerAndProps: ReactiveNakadiProducer, requestStrategyProvider: () => RequestStrategy) =
    Props(new NakadiActorSubscriber(consumerAndProps, requestStrategyProvider))
}

/**
 * Stream subscriber that hands every received element to the Nakadi client actor.
 *
 * The actor stops itself when the upstream completes or fails.
 *
 * @param producerAndProps        provides the `nakadiClient` actor that receives the messages
 * @param requestStrategyProvider invoked once at construction to obtain the request strategy
 */
class NakadiActorSubscriber(producerAndProps: ReactiveNakadiProducer, requestStrategyProvider: () => RequestStrategy)
  extends ActorSubscriber
  with ActorLogging {

  // Evaluated exactly once, when the actor instance is constructed.
  override protected val requestStrategy = requestStrategyProvider()

  // Destination of every forwarded element.
  private val client: ActorRef = producerAndProps.nakadiClient

  override def receive: Receive = {
    case ActorSubscriberMessage.OnNext(element) =>
      // Unchecked cast: an element of another type fails here with a ClassCastException.
      client ! element.asInstanceOf[StringProducerMessage]

    case ActorSubscriberMessage.OnError(ex) =>
      log.error(ex, "Stopping Nakadi subscriber due to fatal error.")
      stop()

    case ActorSubscriberMessage.OnComplete =>
      stop()
  }

  /** Stops this actor. */
  def stop() = context.stop(self)
}
开发者ID:zalando-nakadi,项目名称:reactive-nakadi,代码行数:39,代码来源:NakadiActorSubscriber.scala


示例2: KafkaWriter

//设置package包名称以及导入依赖的类
package wiii

import akka.actor._
import akka.stream.actor.ActorSubscriber
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy}
import com.softwaremill.react.kafka.{ProducerMessage, ProducerProperties, ReactiveKafka}
import twitter4j.{Status, _}


object KafkaWriter {
    /**
     * Props for a [[KafkaWriter]] that publishes to the given topic.
     *
     * @param topic name of the Kafka topic handed to the new actor
     * @param mat   materializer passed on implicitly to the actor's constructor
     */
    def props(topic: String)(implicit mat: ActorMaterializer) = {
        Props(new KafkaWriter(topic))
    }
}

/**
 * Actor that wires a filtered Twitter stream into a Kafka producer.
 *
 * On start it creates a reactive-kafka producer actor as a child, opens a Twitter
 * stream filtered on "espn" and forwards every status, converted to a `Tweet`,
 * to the producer. The Twitter stream is released again in `postStop`, so stopping
 * or restarting the actor no longer leaves a consuming stream behind.
 *
 * @param topicName Kafka topic the tweets are written to
 * @param mat       materializer used to run the internal streams
 */
class KafkaWriter(topicName: String)(implicit mat: Materializer) extends Actor with ActorLogging {
    // Handle to the currently open Twitter stream, kept so postStop can release it.
    private var twitterStream: Option[TwitterStream] = None

    override def preStart(): Unit = initWriter()

    // Release the Twitter stream opened by initWriter (also runs as part of a restart).
    override def postStop(): Unit = {
        twitterStream.foreach(_.cleanUp())
        twitterStream = None
    }

    // This actor only hosts the pipeline; every incoming message is deliberately ignored.
    override def receive: Receive = {
        case _ =>
    }

    /** Builds the Twitter -> Kafka pipeline and starts it. */
    def initWriter(): Unit = {
        val subscriberProps = new ReactiveKafka().producerActorProps(ProducerProperties(
            bootstrapServers = "localhost:9092",
            topic = topicName,
            valueSerializer = TweetSerializer
        ))
        val subscriber = context.actorOf(subscriberProps)
        // actorRef is the entry point for statuses; the source fails once 1000 elements are buffered.
        val (actorRef, publisher) = Source.actorRef[Status](1000, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run()

        val stream = new TwitterStreamFactory().getInstance()
        // Remember the stream before it starts consuming so postStop can always clean it up.
        twitterStream = Some(stream)
        stream.addListener(new StatusForwarder(actorRef))
        stream.filter(new FilterQuery("espn"))

        Source.fromPublisher(publisher).map(s => ProducerMessage(Tweet(s.getUser.getName, s.getText)))
        .runWith(Sink.fromSubscriber(ActorSubscriber[ProducerMessage[Array[Byte], Tweet]](subscriber)))
    }
}

/**
 * Twitter4J listener that relays every received status to an actor.
 *
 * @param publisher actor that receives each `Status`; messages are sent without a sender
 */
class StatusForwarder(publisher: ActorRef) extends StatusListener {
    def onStatus(status: Status): Unit = publisher.tell(status, ActorRef.noSender)

    // All remaining callbacks are intentionally no-ops for now.
    def onStallWarning(warning: StallWarning): Unit = ()
    def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice): Unit = ()
    def onScrubGeo(userId: Long, upToStatusId: Long): Unit = ()
    def onTrackLimitationNotice(numberOfLimitedStatuses: Int): Unit = ()
    def onException(ex: Exception): Unit = ()
}
开发者ID:jw3,项目名称:example-kafka-tweets,代码行数:50,代码来源:FeedToKafka.scala


示例3: QueueSubscriber

//设置package包名称以及导入依赖的类
package reactive.queue.router

import akka.actor.{ActorLogging, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.pattern.pipe
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorSubscriberMessage.{OnComplete, OnError, OnNext}
import akka.stream.actor.{ActorSubscriber, OneByOneRequestStrategy, RequestStrategy}
import akka.util.ByteString
import io.scalac.amqp.{Connection, Message}

object QueueSubscriber {
  /**
   * Props for a [[QueueSubscriber]].
   *
   * @param queueConnection    AMQP connection handed to the actor
   * @param queueName          queue name handed to the actor
   * @param queueSubscriberUri URI the actor POSTs each message body to
   */
  def props(queueConnection: Connection, queueName: String, queueSubscriberUri: Uri): Props = {
    Props(classOf[QueueSubscriber], queueConnection, queueName, queueSubscriberUri)
  }
}

/**
 * Stream subscriber that POSTs the UTF-8 body of every queue message to `queueSubscriberUri`.
 *
 * Elements are requested one at a time. When forwarding fails, either synchronously
 * while the request is built or asynchronously when the HTTP call fails, the message
 * is handed to `queueConnection.publish`.
 *
 * @param queueConnection    AMQP connection used for the republish fallback
 * @param queueName          name passed to `publish` on failure
 * @param queueSubscriberUri target of the HTTP POST
 */
class QueueSubscriber(queueConnection: Connection, queueName: String, queueSubscriberUri: Uri) extends
  ActorSubscriber with ActorLogging {
  implicit val system = context.system
  implicit val ec = context.dispatcher
  implicit val materlizer = ActorMaterializer()

  // Internal message: forwarding `message` failed asynchronously with `cause`.
  private case class RouteFailed(message: String, cause: Throwable)

  override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy

  def receive = {
    case OnNext(message: Message) => route(ByteString(message.body.toArray).decodeString("UTF-8"))
    case OnComplete => log.info("*** on complete")
    // Pass the throwable as the cause so the stack trace is not lost.
    case OnError(error) => log.error(error, s"*** on error: $error")
    case HttpResponse(status, _, _, _) => log.info(s"*** route response: ${status.intValue}")
    case RouteFailed(message, cause) => republish(message, cause)
  }

  /**
   * Sends `message` to the configured URI and pipes the outcome back to this actor.
   *
   * @param message decoded message body to forward
   */
  def route(message: String): Unit = {
    log.info(s"*** on next: $message")
    try {
      val httpResponse = for {
        request <- Marshal(message).to[RequestEntity]
        response <- Http().singleRequest(HttpRequest(method = HttpMethods.POST, uri = queueSubscriberUri, entity = request))
        entity <- Unmarshal(response).to[HttpResponse]
      } yield entity
      // A failed Future never reaches the catch block below; turn the failure into a
      // message so the fallback runs inside the actor instead of being dropped.
      httpResponse
        .recover { case scala.util.control.NonFatal(t) => RouteFailed(message, t) }
        .pipeTo(self)
    } catch {
      // Only synchronous failures while building the request end up here.
      case scala.util.control.NonFatal(t) => republish(message, t)
    }
  }

  // Logs the failure and hands the message to the queue connection.
  private def republish(message: String, cause: Throwable): Unit = {
    log.error(s"*** on next: forward to uri $queueSubscriberUri failed on: $message with error: ${cause.getMessage}")
    // NOTE(review): the result of publish(...) is discarded; confirm this call really enqueues the message.
    queueConnection.publish(queueName, message)
    log.info(s"*** on next: republished to queue $queueName")
  }
}
开发者ID:objektwerks,项目名称:reactive.queue.router,代码行数:52,代码来源:QueueSubscriber.scala


示例4: HrSenderActor

//设置package包名称以及导入依赖的类
package it.wknd.reactive.frontend

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorSubscriberMessage.OnNext
import akka.stream.actor.{ActorSubscriber, MaxInFlightRequestStrategy}
import it.wknd.reactive.Logging

import scala.concurrent.ExecutionContext

/**
 * Stream subscriber that POSTs every received JSON string to `http://localhost:2525/hr`.
 *
 * At most 10 requests are outstanding at a time. The completion of each request is
 * reported back to the actor as a message, so `inFlight` is only ever touched on the
 * actor's own thread and the request strategy is re-evaluated after every completion.
 */
class HrSenderActor(implicit val actorSystem: ActorSystem,
                    val ec: ExecutionContext,
                    val materializer: ActorMaterializer) extends ActorSubscriber with Logging {
  private var inFlight = 0
  private val http = Http()

  // Internal notification that one HTTP call has finished, successfully or not.
  private case class RequestFinished(outcome: scala.util.Try[HttpResponse])

  override protected def requestStrategy = new MaxInFlightRequestStrategy(10) {
    override def inFlightInternally: Int = inFlight
  }

  def receive: Receive = {
    case OnNext(event: String) =>
      inFlight += 1

      val request = HttpRequest(
        method = HttpMethods.POST,
        uri = Uri("http://localhost:2525/hr"),
        entity = HttpEntity(
          ContentTypes.`application/json`,
          event
        ))
      // The callback runs on another thread: it must not touch actor state, only send a message.
      http.singleRequest(request).onComplete(outcome => self ! RequestFinished(outcome))

    case RequestFinished(outcome) =>
      // NOTE(review): the response entity is never read or discarded; confirm whether
      // the akka-http version in use requires that to free the connection.
      log.info(outcome.toString)
      inFlight -= 1
  }
}
开发者ID:VlasShatokhin,项目名称:it-wknd-streams,代码行数:40,代码来源:HrSenderActor.scala


示例5: StepSenderActor

//设置package包名称以及导入依赖的类
package it.wknd.reactive.frontend

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorSubscriberMessage.OnNext
import akka.stream.actor.{ActorSubscriber, MaxInFlightRequestStrategy}
import it.wknd.reactive.Logging

import scala.concurrent.ExecutionContext

/**
 * Stream subscriber that POSTs every received JSON string to `http://localhost:2525/step`.
 *
 * At most 10 requests are outstanding at a time. The completion of each request is
 * reported back to the actor as a message, so `inFlight` is only ever touched on the
 * actor's own thread and the request strategy is re-evaluated after every completion.
 */
class StepSenderActor(implicit val actorSystem: ActorSystem,
                      val ec: ExecutionContext,
                      val materializer: ActorMaterializer) extends ActorSubscriber with Logging {
  private var inFlight = 0
  private val http = Http()

  // Internal notification that one HTTP call has finished, successfully or not.
  private case class RequestFinished(outcome: scala.util.Try[HttpResponse])

  override protected def requestStrategy = new MaxInFlightRequestStrategy(10) {
    override def inFlightInternally: Int = inFlight
  }

  def receive: Receive = {
    case OnNext(event: String) =>
      inFlight += 1

      val request = HttpRequest(
        method = HttpMethods.POST,
        uri = Uri("http://localhost:2525/step"),
        entity = HttpEntity(
          ContentTypes.`application/json`,
          event
        ))
      // The callback runs on another thread: it must not touch actor state, only send a message.
      http.singleRequest(request).onComplete(outcome => self ! RequestFinished(outcome))

    case RequestFinished(outcome) =>
      // NOTE(review): the response entity is never read or discarded; confirm whether
      // the akka-http version in use requires that to free the connection.
      log.info(outcome.toString)
      inFlight -= 1
  }
}
开发者ID:VlasShatokhin,项目名称:it-wknd-streams,代码行数:41,代码来源:StepSenderActor.scala


示例6: NotifierActor

//设置package包名称以及导入依赖的类
package it.wknd.reactive.backend

import akka.stream.actor.ActorSubscriberMessage.OnNext
import akka.stream.actor.{ActorSubscriber, MaxInFlightRequestStrategy}
import it.wknd.reactive.Logging
import it.wknd.reactive.backend.model.HealthNotification

/**
 * Stream subscriber that tracks the highest heart rate seen across all
 * `HealthNotification` elements and logs each notification.
 */
class NotifierActor extends ActorSubscriber with Logging {
  // Highest heart rate observed so far; None until the first notification arrives.
  private var worstHeartRate: Option[Int] = None
  // Number of notifications currently being processed.
  private var inFlight = 0

  override protected def requestStrategy = new MaxInFlightRequestStrategy(10) {
    override def inFlightInternally: Int = inFlight
  }

  def receive: Receive = {
    case OnNext(notification: HealthNotification) =>
      inFlight += 1
      processEvent(notification)
      log.info(notification.toString)
      inFlight -= 1
  }

  /**
   * Folds the heart rate of `decision` into the running maximum.
   *
   * @param decision notification whose `heartRate` is compared with the current maximum
   */
  def processEvent(decision: HealthNotification): Unit = {
    val highest = worstHeartRate match {
      case Some(previous) => math.max(previous, decision.heartRate)
      case None           => decision.heartRate
    }
    worstHeartRate = Some(highest)
  }
}
开发者ID:VlasShatokhin,项目名称:it-wknd-streams,代码行数:27,代码来源:NotifierActor.scala


示例7: TopicActor

//设置package包名称以及导入依赖的类
package actors

import akka.actor.{ActorRef, Terminated}
import akka.stream.actor.{ActorSubscriber, OneByOneRequestStrategy, RequestStrategy}
import models.response.View
import play.api.libs.json.{JsValue, Json}

// Companion holding the messages understood by the TopicActor class.
object TopicActor {
  // Registers `actorRef` to receive the JSON that TopicActor builds from each View.
  case class Subscribe(actorRef: ActorRef)
}

/**
 * Fan-out actor: converts every received `View` into a JSON object carrying a
 * `type` tag and the serialized `value`, and sends it to all registered subscribers.
 *
 * Subscribers register with `TopicActor.Subscribe` and are dropped again once
 * they terminate.
 */
class TopicActor extends ActorSubscriber with View.JsonWriter {
  override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy

  // Actors currently registered to receive the JSON messages.
  var subscribers: Set[ActorRef] = Set()

  override def receive: Receive = {
    case Terminated(actor) =>
      subscribers -= actor

    case TopicActor.Subscribe(sub) =>
      // Watch the subscriber so it can be removed when it terminates.
      context.watch(sub)
      subscribers += sub

    case view: View =>
      // Pick the type tag and the serialized payload for the concrete view type.
      val (kind, payload) = view match {
        case event: View.Event     => ("Event", Json.toJson(event))
        case message: View.Message => ("Message", Json.toJson(message))
        case gravity: View.Gravity => ("Gravity", Json.toJson(gravity))
      }
      val json = Json.obj("type" -> kind, "value" -> payload)
      subscribers.foreach(_ ! json)
  }
}
开发者ID:Augment8,项目名称:slot,代码行数:35,代码来源:TopicActor.scala


示例8: InsertSink

//设置package包名称以及导入依赖的类
package com.github.jeroenr.tepkin

import akka.actor.{ActorLogging, ActorRef, Props}
import akka.stream.actor.ActorSubscriberMessage.OnNext
import akka.stream.actor.{ActorSubscriber, MaxInFlightRequestStrategy, RequestStrategy}
import com.github.jeroenr.bson.{BsonDocument, Bulk}
import com.github.jeroenr.tepkin.protocol.WriteConcern
import com.github.jeroenr.tepkin.protocol.command.Insert
import com.github.jeroenr.tepkin.protocol.message.Reply

/**
 * Stream subscriber that turns every `Bulk` element into an `Insert` command sent to `pool`.
 *
 * At most `parallelism` inserts are outstanding at a time. The actor stops itself once
 * the upstream has finished (completed or failed) and every outstanding insert has been
 * answered, including the case where nothing is outstanding when the upstream finishes.
 *
 * @param databaseName   database passed to each `Insert`
 * @param collectionName collection passed to each `Insert`
 * @param pool           actor that receives the `Insert` commands and answers with `Reply`
 * @param parallelism    maximum number of unanswered inserts
 * @param ordered        passed through to each `Insert`
 * @param writeConcern   passed through to each `Insert`
 */
class InsertSink(databaseName: String,
                 collectionName: String,
                 pool: ActorRef,
                 parallelism: Int,
                 ordered: Option[Boolean],
                 writeConcern: Option[BsonDocument])
  extends ActorSubscriber with ActorLogging {

  import akka.stream.actor.ActorSubscriberMessage.{OnComplete, OnError}

  // Number of inserts sent to the pool that have not been answered yet.
  var requests = 0

  override protected def requestStrategy: RequestStrategy = new MaxInFlightRequestStrategy(parallelism) {
    override def inFlightInternally: Int = requests
  }

  override def receive: Receive = {
    case OnNext(bulk: Bulk) =>
      pool ! Insert(databaseName, collectionName, bulk.documents, ordered, writeConcern)
      requests += 1

    case OnComplete =>
      // Nothing may be in flight when the stream ends; without this check the actor would never stop.
      stopIfDrained()

    case OnError(cause) =>
      log.error(cause, "Upstream failed; stopping once outstanding inserts are answered.")
      stopIfDrained()

    case reply: Reply =>
      requests -= 1
      stopIfDrained()
  }

  // Stops the actor when the upstream is finished and no insert is awaiting a reply.
  private def stopIfDrained(): Unit =
    if (requests == 0 && canceled) {
      context.stop(self)
    }
}


object InsertSink {
  /**
   * Props for an [[InsertSink]].
   *
   * @param databaseName   target database
   * @param collectionName target collection
   * @param pool           actor that receives the insert commands
   * @param parallelism    maximum number of unanswered inserts (default 1)
   * @param ordered        optional flag passed through to each insert
   * @param writeConcern   optional write concern, converted with `toDoc` before being passed on
   */
  def props(databaseName: String,
            collectionName: String,
            pool: ActorRef,
            parallelism: Int = 1,
            ordered: Option[Boolean] = None,
            writeConcern: Option[WriteConcern] = None): Props =
    Props(
      new InsertSink(
        databaseName,
        collectionName,
        pool,
        parallelism,
        ordered,
        writeConcern.map(concern => concern.toDoc)))
}
开发者ID:dgouyette,项目名称:tepkin,代码行数:49,代码来源:InsertSink.scala


示例9: WritingActor

//设置package包名称以及导入依赖的类
package wiii

import akka.actor.{Actor, Props}
import akka.stream.actor.ActorSubscriberMessage.OnNext
import akka.stream.actor.{ActorSubscriber, OneByOneRequestStrategy, RequestStrategy}
import akka.util.ByteString
import com.typesafe.scalalogging.LazyLogging
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import wiii.Implicits._


object WritingActor {
    /**
     * Props for a [[WritingActor]].
     *
     * @param host HDFS host name
     * @param port HDFS port
     * @param path path of the file the actor writes to
     */
    def props(host: String, port: Int, path: String) = {
        Props(new WritingActor(host, port, path))
    }
}

/**
 * Stream subscriber that appends every received `ByteString` to a file on HDFS.
 *
 * The file at `path` is created (overwriting an existing one) when the actor is
 * constructed, and the output stream is closed when the actor stops.
 *
 * @param host HDFS host name
 * @param port HDFS port
 * @param path path of the file to write
 */
class WritingActor(host: String, port: Int, path: String) extends Actor with ActorSubscriber with LazyLogging {
    val filesys = {
        val conf = new Configuration()
        conf.set("fs.default.name", s"hdfs://$host:$port")
        FileSystem.get(conf)
    }

    // Second argument `true` asks create() to overwrite an existing file.
    val stream = filesys.create(path, true)

    def receive: Receive = {
        case OnNext(bs: ByteString) =>
            stream.write(bs.toArray)
            logger.trace(s"wrote ${bs.size} to $path")
    }

    // Close the output stream with the actor; previously it was never closed.
    override def postStop(): Unit = {
        try stream.close()
        finally super.postStop()
    }

    protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy
}
开发者ID:jw3,项目名称:example-hdfs-docker,代码行数:33,代码来源:WritingActor.scala


示例10: App

//设置package包名称以及导入依赖的类
package kschool.pfm.mail.dvl

import akka.actor.{Props, ActorSystem}
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorSubscriber
import akka.stream.scaladsl.{Sink, Source}
import com.softwaremill.react.kafka.KafkaMessages._
import kschool.pfm.mail.dvl.mail._
import kschool.pfm.mail.dvl.model._
import org.apache.kafka.common.serialization.StringDeserializer
import com.softwaremill.react.kafka.{ConsumerProperties, ReactiveKafka}
import org.reactivestreams.{ Publisher, Subscriber }
import scala.concurrent.Future

/**
 * Consumes alert records from Kafka, looks up the contact of each alert and
 * hands the result to a `SenderActor` through an actor-backed subscriber.
 */
object App {

  def main(args: Array[String]): Unit = {

    implicit val actorSystem = ActorSystem( "ReactiveKafka" )
    implicit val materializer = ActorMaterializer( )

    val kafka = new ReactiveKafka( )

    // Records of topic "alert_mail", read as group "sendMailKschool" with String values.
    val publisher: Publisher[StringConsumerRecord] = kafka.consume( ConsumerProperties(
      bootstrapServers = "kafka:9092",//kafka
      topic = "alert_mail",
      groupId = "sendMailKschool",
      valueDeserializer = new StringDeserializer( )
    ) )

    // Element type flowing into the sender: (contact lookup, alarm type, latitude, longitude).
    type ContactAlert = (Future[Option[Contact]], String, String, String)

    val senderActor = actorSystem.actorOf( Props[SenderActor] )
    val mailSubscriber: Subscriber[ContactAlert] = ActorSubscriber[ContactAlert]( senderActor )

    Source.fromPublisher( publisher )
      .map( retrieveContactFromMongo )
      .runWith( Sink.fromSubscriber( mailSubscriber ) )

  }

  /**
   * Splits a comma-separated record into its fields and starts the contact lookup.
   *
   * @param record Kafka record whose value is read as `user,typeAlarm,latitude,longitude`
   * @return the pending contact lookup together with alarm type, latitude and longitude
   */
  def retrieveContactFromMongo(record: StringConsumerRecord): (Future[Option[Contact]], String, String, String) = {

    val fields = record.value.split(",")
    // Read all four fields before the lookup so a short record fails before the DAO is called.
    val user = fields(0)
    val typeAlarm = fields(1)
    val latitude = fields(2)
    val longitude = fields(3)

    val contactLookup = db.KschoolDAO.findContact( user )
    (contactLookup, typeAlarm, latitude, longitude)

  }

}
开发者ID:DVentas,项目名称:PFM-Kschool-Flink-Docker,代码行数:54,代码来源:App.scala


示例11: ChatService

//设置package包名称以及导入依赖的类
package co.technius.chatty.server.service

import akka.actor.ActorSystem
import akka.stream.{ FlowShape, Materializer }
import akka.stream.scaladsl._
import akka.stream.actor.{ ActorPublisher, ActorSubscriber }
import akka.http.scaladsl.model.ws.{ BinaryMessage, Message, TextMessage }

/**
 * Builds the per-user WebSocket flows of the chat server.
 *
 * A single room actor named "defaultroom" is created at construction time; each call
 * to `flow` creates a user actor, announces it to the room and returns a flow that is
 * backed by that user actor in both directions.
 */
class ChatService(implicit system: ActorSystem, mat: Materializer) {
  val roomActor = system.actorOf(RoomActor.props("default"), "defaultroom")

  // Strings published by the room actor, wrapped as streamed text messages.
  val roomPub =
    Source
      .fromPublisher[String](ActorPublisher(roomActor))
      .map(text => TextMessage(Source.single(text)))

  /**
   * Creates the message flow for one user.
   *
   * @param name user name announced to the room with `InternalProtocol.Join`
   * @return flow whose input feeds the user actor and whose output is published by it
   */
  def flow(name: String): Flow[Message, Message, Any] = {
    val userActor = system.actorOf(UserActor.props(name, roomActor))
    // The user actor is passed as the sender of the Join message.
    roomActor.tell(InternalProtocol.Join(name), userActor)
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val outbound = builder.add(Source.fromPublisher(ActorPublisher(userActor)))
      val inbound = builder.add(Sink.fromSubscriber(ActorSubscriber(userActor)))

      val decode = builder.add(msgToStringFlow)
      val encode = builder.add(Flow[String].map(text => TextMessage(text)))

      decode ~> inbound
      outbound ~> encode

      FlowShape(decode.in, encode.out)
    })
  }

  /**
   * Extracts the text of strict text messages; streamed text messages and binary
   * messages are drained and produce no output.
   */
  def msgToStringFlow: Flow[Message, String, Any] = Flow[Message].mapConcat {
    case TextMessage.Strict(text) =>
      List(text)
    case streamed: TextMessage =>
      streamed.textStream.runWith(Sink.ignore)
      Nil
    case binary: BinaryMessage =>
      binary.dataStream.runWith(Sink.ignore)
      Nil
  }

}
开发者ID:Technius,项目名称:chatty,代码行数:46,代码来源:ChatService.scala


示例12: UserActor

//设置package包名称以及导入依赖的类
package co.technius.chatty.server.service

import akka.actor.{ Actor, ActorRef, Props }
import akka.stream.actor.{ ActorPublisher, ActorSubscriber, WatermarkRequestStrategy }
import akka.stream.actor.{ ActorSubscriberMessage => Sub }
import akka.stream.actor.{ ActorPublisherMessage => Pub }

import InternalProtocol._

/**
 * Actor representing one chat user; it is a stream subscriber and publisher at once.
 *
 * Incoming strings are forwarded to the room as `InboundMessage`; `OutboundMessage`s
 * are queued and published downstream as far as demand allows.
 *
 * @param name      user name attached to inbound messages and to the `Leave` notification
 * @param roomActor room that receives inbound messages
 */
class UserActor private(name: String, roomActor: ActorRef) extends Actor
    with ActorSubscriber
    with ActorPublisher[String] {

  val requestStrategy = new WatermarkRequestStrategy(100)

  // Outbound messages waiting for downstream demand.
  val msgQueue = collection.mutable.Queue[String]()

  def receive = {
    case Sub.OnNext(text: String) =>
      roomActor ! InboundMessage(name, text)

    case Pub.Request(demand) =>
      flushQueue(demand)

    case OutboundMessage(text) =>
      msgQueue.enqueue(text)
      if (isActive && totalDemand > 0) {
        flushQueue(totalDemand)
      }

    case Sub.OnComplete =>
      roomActor ! Leave(name)
  }

  /**
   * Publishes queued messages downstream.
   *
   * @param num upper bound on the number of messages to publish
   */
  def flushQueue(num: Long): Unit = {
    // Never publish more than is queued or more than was asked for.
    val deliverable = math.min(num, msgQueue.size.toLong)
    var sent = 0L
    while (sent < deliverable) {
      onNext(msgQueue.dequeue())
      sent += 1
    }
  }
}

object UserActor {
  /**
   * Props for a [[UserActor]].
   *
   * @param name      user name handed to the actor
   * @param roomActor room actor handed to the actor
   */
  def props(name: String, roomActor: ActorRef): Props = {
    Props(new UserActor(name, roomActor))
  }
}
开发者ID:Technius,项目名称:chatty,代码行数:44,代码来源:UserActor.scala


示例13: SummarizerPersistent

//设置package包名称以及导入依赖的类
package frdomain.ch7
package streams

import akka.persistence.PersistentActor
import akka.stream.actor.ActorSubscriberMessage.OnNext
import akka.stream.actor.{ActorSubscriber, MaxInFlightRequestStrategy}

import scala.collection.mutable.{ Map => MMap }

import scalaz._
import Scalaz._

/**
 * Persistent stream subscriber that keeps a running balance per account.
 *
 * Every received `Transaction` is persisted asynchronously and then folded into the
 * in-memory balance map; on recovery the persisted transactions are replayed into it.
 */
class SummarizerPersistent extends PersistentActor with ActorSubscriber with Logging {
  // Running balance keyed by account number.
  private val balance = MMap.empty[String, Balance]

  override def persistenceId = "transaction-netter"

  // Transactions handed to persistAsync whose handler has not run yet.
  private var inFlight = 0

  override protected def requestStrategy = new MaxInFlightRequestStrategy(10) {
    override def inFlightInternally = inFlight
  }

  def receiveCommand = {
    case OnNext(transaction: Transaction) =>
      inFlight += 1
      persistAsync(transaction) { _ =>
        updateBalance(transaction)
        inFlight -= 1
      }

    case LogSummaryBalance =>
      logger.info("Balance so far: " + balance)
  }

  def receiveRecover = {
    case replayed: Transaction => updateBalance(replayed)
  }

  /**
   * Adds the transaction's amount to the balance of its account, creating the
   * entry when the account has not been seen before.
   */
  def updateBalance(data: Transaction) = {
    val delta = Balance(data.amount, data.debitCredit)
    val merged = balance.get(data.accountNo) match {
      case Some(existing) => existing |+| delta
      case None           => delta
    }
    balance += ((data.accountNo, merged))
  }
}
开发者ID:ricardo8504,项目名称:frdomain-master,代码行数:45,代码来源:SummarizerPersistent.scala


示例14: TransactionProcessor

//设置package包名称以及导入依赖的类
package frdomain.ch7
package streams

import akka.actor.{ActorSystem, Props}
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorSubscriber
import akka.stream.scaladsl.{Tcp, Source, Sink, Framing}
import akka.util.ByteString

import scala.concurrent.duration._


/**
 * TCP front end that parses comma-separated transaction lines and feeds them to a
 * `Summarizer` actor, which is asked to log its balance every 5 seconds.
 *
 * @param host   interface to bind to
 * @param port   port to bind to
 * @param system actor system used for the actor, the streams and the scheduler
 */
class TransactionProcessor(host: String, port: Int)(implicit val system: ActorSystem) extends Logging {

  /** Binds the TCP listener and schedules the periodic balance logging. */
  def run(): Unit = {
    implicit val mat = ActorMaterializer()

    val summarizer = system.actorOf(Props[Summarizer])

    logger.info(s"Receiver: binding to $host:$port")

    Tcp().bind(host, port).runForeach { connection =>
      // Split the byte stream into lines of at most 4000 bytes.
      val lines = Framing.delimiter(ByteString(System.lineSeparator), maximumFrameLength = 4000, allowTruncation = true)
      // A new subscriber handle for the summarizer is created for every connection.
      val summarizerSink = Sink.fromSubscriber(ActorSubscriber[Transaction](summarizer))

      connection.flow
        .via(lines)
        .map(_.utf8String.split(","))
        .mapConcat(Transaction(_).toList)
        .to(summarizerSink)
        .runWith(Source.maybe)
    }

    import system.dispatcher
    system.scheduler.schedule(0.seconds, 5.seconds, summarizer, LogSummaryBalance)
  }
}

// Application entry point: starts a TransactionProcessor on 127.0.0.1:9982.
object TransactionProcessor extends App {
  // Implicit actor system picked up by the TransactionProcessor constructor below.
  implicit val system = ActorSystem("processor")
  new TransactionProcessor("127.0.0.1", 9982).run()
}
开发者ID:ricardo8504,项目名称:frdomain-master,代码行数:43,代码来源:TransactionProcessor.scala


示例15: Summarizer

//设置package包名称以及导入依赖的类
package frdomain.ch7
package streams

import akka.actor.Actor
import akka.stream.actor.ActorSubscriberMessage.OnNext
import akka.stream.actor.{ActorSubscriber, MaxInFlightRequestStrategy}

import scala.collection.mutable.{ Map => MMap }

import scalaz._
import Scalaz._

/**
 * Stream subscriber that keeps a running, in-memory balance per account and logs
 * it whenever it receives `LogSummaryBalance`.
 */
class Summarizer extends Actor with ActorSubscriber with Logging {
  // Running balance keyed by account number.
  private val balance = MMap.empty[String, Balance]

  // Transactions currently being processed.
  private var inFlight = 0

  override protected def requestStrategy = new MaxInFlightRequestStrategy(10) {
    override def inFlightInternally = inFlight
  }

  def receive = {
    case OnNext(transaction: Transaction) =>
      inFlight += 1
      updateBalance(transaction)
      inFlight -= 1

    case LogSummaryBalance =>
      logger.info("Balance so far: " + balance)
  }

  /**
   * Adds the transaction's amount to the balance of its account, creating the
   * entry when the account has not been seen before.
   */
  def updateBalance(data: Transaction) = {
    val delta = Balance(data.amount, data.debitCredit)
    val merged = balance.get(data.accountNo) match {
      case Some(existing) => existing |+| delta
      case None           => delta
    }
    balance += ((data.accountNo, merged))
  }
}
开发者ID:ricardo8504,项目名称:frdomain-master,代码行数:37,代码来源:Summarizer.scala


示例16: MessageReceiver

//设置package包名称以及导入依赖的类
package server

import akka.actor.{ActorRef, Props}
import akka.stream.actor.ActorSubscriberMessage.OnNext
import akka.stream.actor.{MaxInFlightRequestStrategy, ActorSubscriber}
import akka.util.ByteString

object MessageReceiver {
  /**
   * Props for a [[MessageReceiver]].
   *
   * @param router actor that receives the translated messages
   */
  def props(router: ActorRef): Props = {
    Props(new MessageReceiver(router))
  }
}

/**
 * Stream subscriber that translates protocol elements into messages for `router`.
 *
 * The uid announced with `MyUid` is remembered and attached to the control
 * messages sent for `Disconnect` and `ListUsers`.
 *
 * @param router actor that receives chat and control messages
 */
class MessageReceiver(router: ActorRef) extends ActorSubscriber {
  import akka.stream.actor.ActorPublisherMessage._
  import MessageReceiver._

  val MaxBufferSize = 100
  // uid of the connected user; empty until a MyUid element arrives.
  var uid = ""

  // Reports zero in-flight elements, so demand is always topped up to MaxBufferSize.
  override val requestStrategy = new MaxInFlightRequestStrategy(max = MaxBufferSize) {
    override def inFlightInternally: Int = 0 //not safe :)
  }

  def receive = {
    case OnNext(MyUid(announcedUid)) =>
      uid = announcedUid

    case OnNext(chat: Message) =>
      router ! chat

    case OnNext(_: Disconnect) =>
      router ! CtrlDisconnect(uid)

    case OnNext(_: ListUsers) =>
      router ! CtrlListUsers(uid)
  }
}
开发者ID:tg44,项目名称:akka-streams-simple-chat,代码行数:34,代码来源:MessageReceiver.scala


示例17: MessageReceiver

//设置package包名称以及导入依赖的类
package gatling.protocol.protoclient

import akka.actor.{ActorRef, Props}
import akka.stream.actor.ActorSubscriberMessage.OnNext
import akka.stream.actor.{ActorSubscriber, MaxInFlightRequestStrategy}
import akka.util.ByteString
import gatling.protocol.protoclient.MessageReceiver.{ReceivedMsg, ReceivedUserList}

object MessageReceiver {

	/** Sent to the router for payloads containing ">". */
	case class ReceivedMsg(payLoad: String)

	/** Sent to the router for payloads containing ";" but no ">". */
	case class ReceivedUserList(payLoad: String)

	/**
	 * Props for a [[MessageReceiver]].
	 *
	 * @param router actor that receives the classified payloads
	 */
	def props(router: ActorRef): Props = {
		Props(new MessageReceiver(router))
	}

}

/**
 * Stream subscriber that decodes each `ByteString` as UTF-8 and classifies it:
 * text containing ">" is sent to `router` as `ReceivedMsg`, otherwise text containing
 * ";" is sent as `ReceivedUserList`; anything else is dropped.
 *
 * @param router actor that receives the classified payloads
 */
class MessageReceiver(router: ActorRef) extends ActorSubscriber {
	val MaxBufferSize = 100

	// Reports zero in-flight elements, so demand is always topped up to MaxBufferSize.
	override val requestStrategy = new MaxInFlightRequestStrategy(max = MaxBufferSize) {
		override def inFlightInternally: Int = 0 //not safe :)
	}

	def receive = {
		case OnNext(bytes: ByteString) =>
			bytes.utf8String match {
				case text if text.contains(">") => router ! ReceivedMsg(text)
				case text if text.contains(";") => router ! ReceivedUserList(text)
				case _                          => ()
			}
	}
}
开发者ID:tg44,项目名称:akka-streams-simple-chat,代码行数:37,代码来源:MessageReceiver.scala



注:本文中的akka.stream.actor.ActorSubscriber类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala Collector类代码示例发布时间:2022-05-23
下一篇:
Scala LoggingMDCFilter类代码示例发布时间:2022-05-23
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap