How to interleave streams (with backpressure)

How to interleave streams (with backpressure)

By : user2953517
Date : November 21 2020, 01:01 AM
may help you . The core challenge here was to understand, how to formalise fairness. In the question I already mentioned worker analogy. Turned out that the obvious fairness criteria is to pick a stream that generated less events than others, or taken even further: whom generated streams waited for less time.
After that it was quite trivial to formalise the desired output using denotational semantics: code is on GitHub
code :
function rec(f) {
  var bus = new Bacon.Bus();
  var result = f(bus);
  return result;
function stateMachine(inputs, initState, f) {
  var mapped = inputs.map(function (input, i) {
    return input.map(function (x) {
      return [i, x];
  return Bacon.mergeAll(mapped).withStateMachine(initState, function (state, p) {
    if (p.hasValue()) {
      p = p.value();
      return f(state, p[0], p[1]);
    } else {
      return [state, p];
function fairScheduler(streams, fn) {
  var streamsCount = streams.length;
  return rec(function (res) {
    return stateMachine(append(streams, res), initialFairState(streamsCount), function (state, i, x) {
      // console.log("FAIR: " + JSON.stringify(state), i, x);

      // END event
      if (i == streamsCount && x.end) {
        var additionalCost = new Date().getTime() - x.started;

        // add cost to input stream cost center
        var updatedState = _.extend({}, state, {
          costs: updateArray(
            x.idx, function (cost) { return cost + additionalCost; }),

        if (state.queues.every(function (q) { return q.length === 0; })) {
          // if queues are empty, set running: false and don't emit any events
          return [_.extend({}, updatedState, { running: false }), []];
        } else {
          // otherwise pick a stream with
          // - non-empty queue
          // - minimal cost
          var minQueueIdx = _.chain(state.queues)
            .map(function (q, i) {
              return [q, i];
            .filter(function (p) {
              return p[0].length !== 0;
            .sortBy(function (p) {
              return state.costs[p[1]];

          // emit an event from that stream
          return [
            _.extend({}, updatedState, {
              queues: updateArray(state.queues, minQueueIdx, function (q) { return q.slice(1); }),
              running: true,
            [new Bacon.Next({
              value: state.queues[minQueueIdx][0],
              idx: minQueueIdx,
      } else if (i < streamsCount) {
        // event from input stream
        if (state.running) {
          // if worker is running, just enquee the event
          return [
            _.extend({}, state, {
              queues: updateArray(state.queues, i, function (q) { return q .concat([x]); }),
        } else {
          // if worker isn't running, start it right away
          return [
            _.extend({}, state, {
              running: true,
            [new Bacon.Next({ value: x, idx: i})],
      } else {
        return [state, []];

    .flatMapConcat(function (x) {
      // map passed thru events,
      // and append special "end" event
      return fn(x).concat(Bacon.once({
        end: true,
        idx: x.idx,
        started: new Date().getTime(),
  .filter(function (x) {
    // filter out END events
    return !x.end;
  .map(".value"); // and return only value field

Share : facebook icon twitter icon
backpressure is not properly handled in akka-streams

backpressure is not properly handled in akka-streams

By : Leon de Bruin
Date : March 29 2020, 07:55 AM
it should still fix some issue I wrote a simple stream using akka-streams api assuming it will handle my source but unfortunately it doesn't. I am sure I am doing something wrong in my source. I simply created an iterator which generate very large number of elements assuming it won't matter because akka-streams api will take care of backpressure. What am I doing wrong, this is my iterator. , The problem is primarily in the line
code :
data += TimeSeriesValue(sessionId, keyName, time, fValue)
def createTimeSeries(startTime: Time, snapShotCount : Int, sessionId : UUID, keyName : String) = 
  Iterator.range(1, snapShotCount)
          .map(_ * 2)
          .map(startTime plusSeconds _)
          .map(t => TimeSeriesValue(sessionId, keyName, t, ThreadLocalRandom.current().nextLong()))

def sessionGenerator(countersPerSession : Int, sessionID : UUID) = 
  Iterator.range(1, countersPerSession)
          .map(j => s"Encoder-${sessionId.toString}-Controller.CaptureFrameCount.$j")
          .flatMap { keyName => 
    createTimeSeries(DateTime.now(), snapShotCount, sessionID, keyName)

object UUIDIterator extends Iterator[UUID] {
  def hasNext : Boolean = true
  def next() : UUID = UUID.randomUUID()

def iterateOverIDs(range : Int) = 
              .flatMap(sessionID => sessionGenerator(countersPerSession, sessionID))
Akka-streams backpressure on broadcast with async processing

Akka-streams backpressure on broadcast with async processing

By : Earl Allin
Date : March 29 2020, 07:55 AM
This might help you The reason that explains this behavior are internal buffers that are introduced by akka when async boundaries are set.
Buffers for asynchronous operators
code :
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import akka.stream.scaladsl.GraphDSL.Implicits._

  val in = Source.tick(0 seconds, 1 seconds, 1).map(x => {println(s"Produced ${x}"); x})

  val out = Sink.foreach[Int]{ o => println(s"F2 processed $o") }
  val out2 = Sink.foreach[Int]{ o => println(s"F4 processed $o") }

  val bcast = builder.add(Broadcast[Int](2))

  val batchedIn: Source[Int, Cancellable] = in.buffer(4,OverflowStrategy.backpressure)

  val f2 = Flow[Int].map(_ + 10)
  val f4 = Flow[Int].mapAsync(1) { i => Future { println("F4 Started Processing"); Thread.sleep(2000); i }(system.dispatcher) }

  batchedIn ~> bcast ~> f2 ~> out
  bcast ~> f4 ~> out2
Akka Streams - Backpressure for Source.unfoldAsync

Akka Streams - Backpressure for Source.unfoldAsync

By : CpTeja
Date : March 29 2020, 07:55 AM
I wish this help you I think I figured it out. As I already mentioned in the edit of my question, I found this comment to an issue in Akka HTTP, where the author says:
code :
def crawl(reqOption: Option[HttpRequest])(implicit actorSystem: ActorSystem, materializer: Materializer, executionContext: ExecutionContext): Future[Option[(Option[HttpRequest], (HttpResponse, Multipart.General))]] = {
  reqOption match {
    case Some(request) =>
        .flatMap(response => Unmarshal(response).to[Multipart.General].map(multipart => (response, multipart)))
        .map {
          case tuple@(response, multipart) =>
            if (response.status.isFailure()) Some((None, tuple))
            else nextRequest(response, HttpMethods.GET).map { case (req, res) => (req, (res, multipart)) }
    case None => Future.successful(None)
Implementing backpressure all the way in Node.js streams

Implementing backpressure all the way in Node.js streams

By : kishan Raj
Date : March 29 2020, 07:55 AM
this one helps. Node.js streams are very powerful and provides a lot of control on buffering in buffering and flow of the data through them.
Now to answer your questions:
Akka.net - Streams with parallelism, backpressure and ActorRef

Akka.net - Streams with parallelism, backpressure and ActorRef

By : 93593sardev
Date : March 29 2020, 07:55 AM
wish helps you About to press Post, when tried to combine previous attempts and viola!
Previous attempts with ForEachParallel failed when I tried to create the actor within, but couldn't do so in an async function. If I use an single actor previous declared, then the Tell would work, but I couldn't get the parallelism I desired.
code :
var props = new RoundRobinPool(5).Props(Props.Create<MyActor>());
var actor = Context.ActorOf(props);

flow = Source.Queue<Element>(2000,OverflowStrategy.Backpressure)            
.Select(x => {
 return new Wrapper() { Element = x, Request = ++cnt };
.To(Sink.ForEachParallel<Wrapper>(5, (s) => { actor.Tell(s); }))
Related Posts Related Posts :
  • angular 6 Please add a @NgModule annotation
  • Trying to cast element ID to class selector in jQuery
  • Swipe JS - Display 3 Slides at a time
  • Jasmine test to call function when value is null
  • Redux reducer gets string instead of object
  • Keep chrome javascript running even if page reload
  • The event.target.value is not insert the userinput into a variable
  • Exporting HTML tables to Excel (.xls) in a separate sheet
  • Jquery tooltip on dialog close button
  • access variable inside anonymous function from outside
  • How come func is called from a function?
  • How to change add and remove active class in JavaScript
  • how to toggle a specific div in javascript
  • How to get all the values from multiple keys of an array of object?
  • jQuery bind() unbind() and on() and off()
  • regular expression to find DD Month YYYY from a string
  • How to upload files from web client (HTML/jQuery form) to your Dropbox folder
  • Backbone history and require.js issue
  • Confusion over Classical and Prototypal Inheritance Javascript
  • How to take screen shot of current webpage using Javascript/JQuery?
  • Store meridiem in a variable
  • class declared inside closure vs standard class without closure
  • javascript confirm box always return true
  • dynamically changing the size of font size based on text length using css and html
  • Node.js - Getting the host IP address while on Amazon EC2
  • Confusing CSS, can anyone explain please
  • How to install v8-profiler on Windows 8 (64 bit) using npm?
  • Resolve promise based on another promise
  • google maps adding overlay layer above UI and markers
  • Style specific letter?
  • RangeError: Maximum call stack size exceeded with array.slice
  • node rest client get with local variable
  • If condition not executes while mouse move fastly
  • Count the number of displayed elements in a HTML list
  • Locate JavaScript source code in Emacs
  • Clean, Modular Code vs MV* Frameworks
  • infinite scroll without loading image
  • Backbone: reverse collection order with comparator
  • What do printers ignore?
  • jQuery UI .tabs() Contentless tab?
  • Execute Javascript alert() after page (visually) loaded
  • JavaScript - duplicating array doesn't work
  • Excessive clickable area below image
  • JavaScript Regex: Replace |b| with <b>
  • Unexpected value change in 2D array in JavaScript
  • Function doesnt see parameter as a array of objects
  • jQuery fetch keypress event on chrome and IE
  • How to enable jquery validate localization?
  • Cassandra map collection returned by node.js Helenus looks odd
  • angular ng-repeat with multiple filter options
  • Selecting Children without ID or Class names
  • How to uncheck a group of checkboxes when another checkbox is checked
  • Is hiding content by Javascript or jQuery Worth trying
  • Load XUL resource using javascript
  • XML to HTML text area from server file system
  • set focus() on textbox on form onload
  • es lint '' is assigned a value but never used' eventhough i have used it
  • Can "name" attribute be used for custom VueJS components
  • Get innerHTML of content when it is clicked
  • HTTPS causes CSS animations to not load? Very confused
  • shadow
    Privacy Policy - Terms - Contact Us © ourworld-yourmove.org