Tuesday, October 9, 2012

Gnuradio scheduler: Part 2 - How each block's thread works


The TPB (thread-per-block) scheduler, which is the default one, creates a thread for each block. The entry point of that thread is the constructor of the class gr_tpb_thread_body.

Let's look at the constructor in gr_tpb_thread_body.cc, which you can find at:

gnuradio-3.3.0/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc 

 gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block)
  : d_exec(block)
{
  // std::cerr << "gr_tpb_thread_body: " << block << std::endl;

  gr_block_detail *d = block->detail().get();
  gr_block_executor::state s;
  pmt_t msg;


The main loop of the thread starts here:

while(1)

First the thread checks whether it has been interrupted, then handles any queued messages:

boost::this_thread::interruption_point();

    // handle any queued up messages
    while ((msg = d->d_tpb.delete_head_nowait()))
      block->handle_msg(msg);

    d->d_tpb.clear_changed();

    s = d_exec.run_one_iteration();

run_one_iteration() is the key call here: it carries out the block's main work for one pass of the loop. Its return value s reports the outcome of that pass.

Next, the thread acts on the returned state in a switch statement:

    switch(s){
    case gr_block_executor::READY:        // Tell neighbors we made progress.
      d->d_tpb.notify_neighbors(d);
      break;

    case gr_block_executor::READY_NO_OUTPUT:    // Notify upstream only
      d->d_tpb.notify_upstream(d);
      break;

    case gr_block_executor::DONE:        // Game over.
      d->d_tpb.notify_neighbors(d);
      return;

    case gr_block_executor::BLKD_IN:        // Wait for input.
      {
        gruel::scoped_lock guard(d->d_tpb.mutex);
        while (!d->d_tpb.input_changed){

          // wait for input or message
          while(!d->d_tpb.input_changed && d->d_tpb.empty_p())
            d->d_tpb.input_cond.wait(guard);

          // handle all pending messages
          while ((msg = d->d_tpb.delete_head_nowait_already_holding_mutex())){
            guard.unlock();            // release lock while processing msg
            block->handle_msg(msg);
            guard.lock();
          }
        }
      }
      break;

     
    case gr_block_executor::BLKD_OUT:        // Wait for output buffer space.
      {
        gruel::scoped_lock guard(d->d_tpb.mutex);
        while (!d->d_tpb.output_changed){

          // wait for output room or message
          while(!d->d_tpb.output_changed && d->d_tpb.empty_p())
            d->d_tpb.output_cond.wait(guard);

          // handle all pending messages
          while ((msg = d->d_tpb.delete_head_nowait_already_holding_mutex())){
            guard.unlock();            // release lock while processing msg
            block->handle_msg(msg);
            guard.lock();
          }
        }
      }
      break;

    default:
      assert(0);
    }
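
Both blocked branches release the lock around handle_msg(). The reason is easy to miss: a message handler may itself need the same mutex, for example to post another message to the queue. The following is a minimal single-threaded sketch of that drain pattern, assuming standard C++ std::mutex in place of the gruel wrappers and ints in place of pmt_t messages. ToyMsgQueue, insert_tail, and drain are hypothetical names for illustration, not the GNU Radio classes, and the condition-variable wait is left out.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>

// Hypothetical stand-in for the message queue guarded by d_tpb.mutex.
struct ToyMsgQueue {
  std::mutex mutex;
  std::deque<int> msgs;           // ints stand in for pmt_t messages

  // Enqueue a message; takes the mutex itself.
  void insert_tail(int msg) {
    std::lock_guard<std::mutex> guard(mutex);
    msgs.push_back(msg);
  }

  // Drain the queue. The lock is released around each handler call so
  // the handler may touch the queue without deadlocking.
  // Returns the number of messages handled.
  int drain(const std::function<void(int)>& handle_msg) {
    std::unique_lock<std::mutex> guard(mutex);
    int handled = 0;
    while (!msgs.empty()) {
      int msg = msgs.front();
      msgs.pop_front();
      guard.unlock();             // release lock while processing msg
      handle_msg(msg);
      guard.lock();
      ++handled;
    }
    return handled;
  }
};

// Handler that posts one follow-up message the first time it runs.
// If drain() kept the lock held, the nested insert_tail() would deadlock.
int demo_drain_with_reentrant_handler() {
  ToyMsgQueue q;
  q.insert_tail(1);
  bool posted = false;
  return q.drain([&](int) {
    if (!posted) { posted = true; q.insert_tail(2); }
  });
}
```

In this sketch the follow-up message posted by the handler is picked up by the same drain loop, so both messages are handled in one call.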



So run_one_iteration() is the heart of the thread: it is where the block's main work gets done. Let's look at its code in gr_block_executor.cc, which you can find at:

gnuradio-3.3.0/gnuradio-core/src/lib/runtime/gr_block_executor.cc

The code is rather long, but overall it does the following:

1. Check whether there is sufficient space in the output buffers. No -> return BLKD_OUT

    if (noutput_items == 0){        // we're output blocked
      LOG(*d_log << "  BLKD_OUT\n");
      return BLKD_OUT;
    }
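
The excerpt does not show where noutput_items comes from. One plausible reading, sketched here as an assumption rather than a copy of the GNU Radio source, is that it is the smallest free space across all output buffers, rounded down to the block's output multiple. A result of 0 then means that at least one output has no room for even one multiple. The function name and its parameters are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <climits>
#include <vector>

// Hypothetical helper: space_available[i] is the free item count in
// output buffer i; output_multiple is the granularity of the block.
// Returns 0 when some output cannot hold one full multiple.
// A block with no outputs (a sink) is not modelled: the result is then
// simply INT_MAX.
int min_available_space(const std::vector<int>& space_available,
                        int output_multiple) {
  assert(output_multiple > 0);
  int min_space = INT_MAX;
  for (int space : space_available) {
    // round down to a whole number of multiples
    int usable = (space / output_multiple) * output_multiple;
    if (usable == 0)
      return 0;                   // this output is full: output blocked
    min_space = std::min(min_space, usable);
  }
  return min_space;
}
```

For example, with free space of 1000 and 37 items on two outputs and an output multiple of 8, this sketch yields 32; if the second output had only 5 free items, it would yield 0 and the block would be output blocked.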


2. Check whether sufficient input data is available. No -> return BLKD_IN

    int i;
    for (i = 0; i < d->ninputs (); i++)
      if (d_ninput_items_required[i] > d_ninput_items[i])    // not enough
        break;

    if (i < d->ninputs ()){            // not enough input on input[i]
      // if we can, try reducing the size of our output request
      if (noutput_items > m->output_multiple ()){
        noutput_items /= 2;
        noutput_items = round_up (noutput_items, m->output_multiple ());
        goto try_again;
      }

      // We're blocked on input
      LOG(*d_log << "  BLKD_IN\n");
      if (d_input_done[i])     // If the upstream block is done, we're done
        goto were_done;

      // Is it possible to ever fulfill this request?
      if (d_ninput_items_required[i] > d->input(i)->max_possible_items_available ()){
        // Nope, never going to happen...
        std::cerr << "\nsched: <gr_block " << m->name()
                  << " (" << m->unique_id() << ")>"
                  << " is requesting more input data\n"
                  << "  than we can provide.\n"
                  << "  ninput_items_required = "
                  << d_ninput_items_required[i] << "\n"
                  << "  max_possible_items_available = "
                  << d->input(i)->max_possible_items_available() << "\n"
                  << "  If this is a filter, consider reducing the number of taps.\n";
        goto were_done;
      }

      return BLKD_IN;
    }
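
The retry at try_again can be read as a small search: halve the output request, round it back up to a multiple of output_multiple, and re-check the input requirement until it fits or the request cannot shrink any further. The sketch below isolates that loop for a single input. The required_for callback is a hypothetical stand-in for whatever fills in d_ninput_items_required, which the excerpt does not show, and shrink_request is likewise a name made up for illustration.

```cpp
#include <cassert>
#include <functional>

// Round n up to the next multiple of `multiple`.
int round_up(int n, int multiple) {
  return ((n + multiple - 1) / multiple) * multiple;
}

// Repeatedly halves the request until its input requirement fits in
// `available`. Returns the first request that fits, or 0 if even a
// request of one output_multiple needs more input than is available.
int shrink_request(int noutput_items, int output_multiple, int available,
                   const std::function<int(int)>& required_for) {
  assert(output_multiple > 0);
  while (required_for(noutput_items) > available) {
    if (noutput_items <= output_multiple)
      return 0;                   // cannot shrink further: blocked on input
    noutput_items /= 2;
    noutput_items = round_up(noutput_items, output_multiple);
  }
  return noutput_items;
}
```

With a block that needs two input items per output item, 100 items available, and an initial request of 512, the request shrinks 512, 256, 128, 64, 32 and settles on 32. With a block that needs 63 extra items of history and only 10 items available, no request fits and the sketch reports 0, which corresponds to the BLKD_IN return.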


3. If there is sufficient input data and sufficient output space, the code runs the actual work of the block, i.e. general_work()

    // We've got enough data on each input to produce noutput_items.
    // Finish setting up the call to work.

    for (int i = 0; i < d->ninputs (); i++)
      d_input_items[i] = d->input(i)->read_pointer();

  setup_call_to_work:

    d->d_produce_or = 0;
    for (int i = 0; i < d->noutputs (); i++)
      d_output_items[i] = d->output(i)->write_pointer();

    // Do the actual work of the block
    int n = m->general_work (noutput_items, d_ninput_items,
                 d_input_items, d_output_items);
    LOG(*d_log << "  general_work: noutput_items = " << noutput_items
    << " result = " << n << std::endl);

    if (n == gr_block::WORK_DONE)
      goto were_done;

    if (n != gr_block::WORK_CALLED_PRODUCE)
      d->produce_each (n);    // advance write pointers
   
    if (d->d_produce_or > 0)    // block produced something
      return READY;

    // We didn't produce any output even though we called general_work.
    // We have (most likely) consumed some input.

    // If this is a source, it's broken.
    if (d->source_p()){
      std::cerr << "gr_block_executor: source " << m
        << " produced no output.  We're marking it DONE.\n";
      // FIXME maybe we ought to raise an exception...
      goto were_done;
    }
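
The handling of general_work()'s return value boils down to a small decision table. The sketch below restates it as a pure function so the cases are easy to compare. The constants are local stand-ins for gr_block::WORK_DONE and gr_block::WORK_CALLED_PRODUCE (their numeric values here are assumptions), classify_work_result is a hypothetical name, and the final READY_NO_OUTPUT case is an assumption about what follows, since the excerpt stops before it.

```cpp
#include <cassert>

enum State { READY, READY_NO_OUTPUT, DONE };

// Local stand-ins for the gr_block constants; the values are assumed.
const int WORK_CALLED_PRODUCE = -2;
const int WORK_DONE = -1;

// n                    : value returned by general_work()
// produced_via_produce : true if the block itself reported a nonzero
//                        amount of output during general_work()
// is_source            : true if the block has no inputs
State classify_work_result(int n, bool produced_via_produce, bool is_source) {
  if (n == WORK_DONE)
    return DONE;
  // When the block reported its own output, trust that; otherwise the
  // return value is the number of items produced on each output.
  bool produced = (n == WORK_CALLED_PRODUCE) ? produced_via_produce : (n > 0);
  if (produced)
    return READY;
  if (is_source)
    return DONE;                  // a source that produces nothing is marked done
  return READY_NO_OUTPUT;         // assumption: not shown in the excerpt
}
```

Read this way, a filter that consumed input but produced nothing stays alive, while a source that produced nothing is treated as finished.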



To summarize briefly how each block's thread in the gnuradio core works:

1. Each block gets its own thread, which runs a while(1) loop

2. The loop handles queued messages and then calls the key function run_one_iteration()

3. run_one_iteration() checks whether there is sufficient data at the block's inputs and sufficient space at its outputs

4. If so, general_work() is called to do the block's main work

5. If not, it returns BLKD_OUT or BLKD_IN, or marks the block as done
