Soru Kaynaklara dinamik olarak elemanlar nasıl eklenir?


Bağlanmamış bir kaynak oluşturmak ve onunla çalışmak için örnek kodum var:

nesne Ana {

 def main(args : Array[String]): Unit = {

  implicit val system = ActorSystem("Sys")
  import system.dispatcher

  implicit val materializer = ActorFlowMaterializer()

  val source: Source[String] = Source(() => {
     Iterator.continually({ "message:" + ThreadLocalRandom.current().nextInt(10000)})
    })

  source.runForeach((item:String) => { println(item) })
  .onComplete{ _ => system.shutdown() }
 }

}

Uygulayan bir sınıf oluşturmak istiyorum:

trait MySources {
    def addToSource(item: String)
    def getSource() : Source[String]
}

Ve birden fazla iş parçacığıyla kullanmam gerekiyor, örneğin:

class MyThread(mySources: MySources) extends Thread {
  override def run(): Unit = {
    for(i <- 1 to 1000000) { // here will be infinite loop
        mySources.addToSource(i.toString)
    }
  }
} 

Ve beklenen tam kod:

object Main {
  def main(args : Array[String]): Unit = {
    implicit val system = ActorSystem("Sys")
    import system.dispatcher

    implicit val materializer = ActorFlowMaterializer()

    val sources = new MySourcesImplementation()

    for(i <- 1 to 100) {
      (new MyThread(sources)).start()
    }

    val source = sources.getSource()

    source.runForeach((item:String) => { println(item) })
    .onComplete{ _ => system.shutdown() }
  }
}

Nasıl uygulanır MySources?


21
2018-03-16 09:05


Menşei




Cevaplar:


Sonlu olmayan bir kaynağa sahip olmanın bir yolu, kaynak olarak özel bir tür aktör kullanmaktır. ActorPublisher kişisel özellik. Bu tür aktörlerden birini oluşturursanız ve daha sonra ActorPublisher.applyReaktif Akımlarla sonuçlanırsınız Publisher örnek ve bununla birlikte apply itibaren Source üretmek için Source ondan. Bundan sonra, sadece ActorPublisher sınıf, alt akış elemanlarını göndermek için Reaktif Akış protokolünü düzgün bir şekilde kullanır ve gitmekte fayda vardır. Çok önemsiz bir örnek şu şekildedir:

import akka.actor._
import akka.stream.actor._
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl._

object DynamicSourceExample extends App{

  implicit val system = ActorSystem("test")
  implicit val materializer = ActorFlowMaterializer()

  val actorRef = system.actorOf(Props[ActorBasedSource])
  val pub = ActorPublisher[Int](actorRef)

  Source(pub).
    map(_ * 2).
    runWith(Sink.foreach(println))

  for(i <- 1 until 20){
    actorRef ! i.toString
    Thread.sleep(1000)
  }

}

class ActorBasedSource extends Actor with ActorPublisher[Int]{
  import ActorPublisherMessage._
  var items:List[Int] = List.empty

  def receive = {
    case s:String =>
      if (totalDemand == 0) 
        items = items :+ s.toInt
      else
        onNext(s.toInt)    

    case Request(demand) =>  
      if (demand > items.size){
        items foreach (onNext)
        items = List.empty
      }
      else{
        val (send, keep) = items.splitAt(demand.toInt)
        items = keep
        send foreach (onNext)
      }


    case other =>
      println(s"got other $other")
  }


}

18
2018-03-16 12:47



Sanırım Request'ler else, çizgi items foreach (onNext) olmalı send foreach (onNext) - mdm
@mdm, evet haklısın. Ben düzenleyeceğim Teşekkürler. - cmbaxter
Şu anda daha güvenli (sınırlı) bir çözüm var. Source.actorPublisher ki bir ActorRef Hangi özel ne ile çok benzer bir şey yapan bir oyuncu tarafından desteklenmektedir ActorBasedSource yapar. Örneğin. aktörünüzün uygun bir yaşam döngüsü yoktur ve doğru olması zor olan çoklu materyalizasyonlarla çalışmaz. - jrudolph
@jrudolph, sen haklısın. Bu cevaptan bu yana biraz değişti. Kısa bir süre yeni yolu gösteren bir düzenleme ekleyeceğim. Teşekkürler. - cmbaxter
Teşekkürler, çok yararlı. ActorBasedSource çok genel görünüyor, varsayılan satın alma Akka-Streams tarafından sağlanan değil mi şaşırtıcı mı? - Loic


Akka Streams 2 ile bir sourceQueue kullanabilirsiniz: Daha sonra bir yöntem çağrısı yoluyla öğeler alabilecek bir Kaynak nasıl oluşturulur?


10
2018-06-19 06:33