Thursday, January 28, 2010

New blog posts on ddj.com

I am writing today to mention that two new blog posts are available at www.ddj.com/go-parallel. They cover using a QuickThread two-stage parallel pipeline to improve the performance of the input end and the output end of an application (as opposed to a single 3-stage design).

The typical 3-stage pipeline is:

MyPipelineIOContext pio;
pio.Init(/* your init args here */);
qtPipeline< MyPipelineIOContext, MyPipelineBuffer > pipeline;
pipeline.addPipe(InputPipe);
pipeline.addPipe(DoWorkPipe);
pipeline.addPipe(OutputPipe);
pipeline.run(&pio);
pipeline.WaitTillDone();
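
For readers new to the pattern, the idea behind such a pipeline can be sketched in standard C++ (C++17). This is not QuickThread code and says nothing about how qtPipeline is implemented; StageQueue and runThreeStagePipeline are hypothetical names made up for the illustration, and each stage here is just one thread handing integers to the next.

```cpp
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical blocking queue that hands items from one stage to the next.
template <typename T>
class StageQueue {
public:
    void push(T item) {
        {
            std::lock_guard<std::mutex> lock(m_);
            q_.push(std::move(item));
        }
        cv_.notify_one();
    }
    // Signals that no more items will be pushed.
    void close() {
        {
            std::lock_guard<std::mutex> lock(m_);
            closed_ = true;
        }
        cv_.notify_all();
    }
    // Blocks until an item is available; returns std::nullopt once the
    // queue has been closed and drained.
    std::optional<T> pop() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return !q_.empty() || closed_; });
        if (q_.empty()) return std::nullopt;
        T item = std::move(q_.front());
        q_.pop();
        return item;
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<T> q_;
    bool closed_ = false;
};

// Runs input -> work -> output as three concurrent stages and returns
// what the output stage collected (assumption: squaring stands in for work).
std::vector<int> runThreeStagePipeline(const std::vector<int>& source) {
    StageQueue<int> toWork, toOutput;
    std::vector<int> sink;

    std::thread input([&] {
        for (int v : source) toWork.push(v);  // stands in for reading a buffer
        toWork.close();
    });
    std::thread work([&] {
        while (auto v = toWork.pop()) toOutput.push(*v * *v);  // compute
        toOutput.close();
    });
    std::thread output([&] {
        while (auto v = toOutput.pop()) sink.push_back(*v);  // stands in for writing
    });

    input.join();
    work.join();
    output.join();
    return sink;
}
```

Because each stage has a single producer and a single consumer, items leave the pipeline in the order they entered it. A real pipeline library would typically add buffer reuse and more than one worker in the compute stage.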

The 3-stage pipeline works quite well for most applications. However, there are certain classes of applications where a single linear pipeline is unsuitable. One such example was referenced in the ddj.com/go-parallel blog. For these, you might find it useful to sandwich your work phase between two 2-stage pipelines:

// 2-stage pipeline to suck-in and prep data
MyInputPipelineIOContext pio_in;
pio_in.Init(/* your init args here */);
qtPipeline< MyInputPipelineIOContext, MyInputPipelineBuffer > pipeline_in;
pipeline_in.addPipe(InputPipe); // I/O
pipeline_in.addPipe(DoInputPrepWorkPipe); // compute
pipeline_in.run(&pio_in);
pipeline_in.WaitTillDone();
// input complete (back to serial code)

// start work (which has parallel constructs)
DoWorkOutsideOfPipeline();
// done with work

// use 2-stage pipeline to output results
MyOutputPipelineIOContext pio_out;
pio_out.Init(/* your init args here */);
qtPipeline< MyOutputPipelineIOContext, MyOutputPipelineBuffer > pipeline_out;
pipeline_out.addPipe(DoOutputPrepWorkPipe); // compute
pipeline_out.addPipe(OutputPipe); // I/O
pipeline_out.run(&pio_out);
pipeline_out.WaitTillDone();

The prep work for input might consist of converting ASCII text input into binary format. The prep work for output might be converting binary format back to ASCII text. You are, of course, free to use the prep stages as you see fit.
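
As a rough illustration of that kind of prep work, here is a minimal sketch in standard C++. The helper names asciiToBinary and binaryToAscii are hypothetical and are not part of QuickThread, and the sketch assumes the data is whitespace-separated floating point numbers. Functions like these are the sort of thing you might call from your own prep pipes.

```cpp
#include <cstdio>
#include <cstdlib>
#include <string>
#include <vector>

// Hypothetical input-prep helper: parse whitespace-separated ASCII
// numbers into doubles. Parsing stops at the first token that is not
// a number (or at the end of the text).
std::vector<double> asciiToBinary(const std::string& text) {
    std::vector<double> values;
    const char* p = text.c_str();
    char* end = nullptr;
    for (double v = std::strtod(p, &end); p != end; v = std::strtod(p, &end)) {
        values.push_back(v);
        p = end;  // continue after the number just parsed
    }
    return values;
}

// Hypothetical output-prep helper: format doubles as ASCII, one per line.
// "%.17g" keeps enough digits for a double to survive a round trip.
std::string binaryToAscii(const std::vector<double>& values) {
    std::string text;
    char buf[32];
    for (double v : values) {
        std::snprintf(buf, sizeof buf, "%.17g\n", v);
        text += buf;
    }
    return text;
}
```

The conversions are pure compute, which is why they fit the compute stage and leave the I/O stage free to do nothing but read or write.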

Jim

3 comments:

  1. I forgot to mention: typically a 3-stage pipeline assumes (requires) that every buffer passes through the whole pipeline, i.e. the number of buffers read equals the number of buffers written (although the data varies). When this is not the case and you are using a 3-stage pipeline, you can accommodate a differing number of writes and reads by use of flags, e.g. bool nothingToOutput.

    When the input is substantially different from the output you will find it easier to code using two 2-stage pipelines.

    Also note, the "DoWorkOutsideOfPipeline();" can be contained within the compute pipe of one of the two 2-stage pipelines.

    Jim Dempsey
    www.quickthreadprogramming.com

  2. This comment has been removed by the author.

  3. hey i really need help in the code of producer consumer in open mp to work i have written a code as follows if u get time please correct it ok

    // producerconsumeropenmp.cpp : Defines the entry point for the console application.
    //

    #include "stdafx.h"
    #include <stdio.h>
    #include <time.h>
    #include <omp.h>
    #define NUM 10 //buffer size
    #define NUM_DATA 20000 //number of data

    int main()
    {

    int buffer[NUM];
    int start; //index of the first element a consumer has to consume
    int end; //index of the first free position in the buffer
    int PTemp; //the new element the producer has to insert in the buffer
    int CTemp;
    int full,empty;
    int dataNum; //number of data in the buffer
    int np,nc,finished;
    double time_start;

    PTemp=CTemp=start=end=full=dataNum=np=nc=finished=0;
    empty=1;

    np=1; //number of producers
    nc=1; //number of consumers
    printf("np = %d ; nc = %d\n",np,nc);

    omp_set_nested(2);

    time_start=clock();

    #pragma omp parallel num_threads(nc+np)
    {
    #pragma omp sections nowait
    {
    #pragma omp section //producer's code
    while(PTemp0)
    {
    while(empty)
    {
    #pragma omp critical (control)
    if(dataNum>0){
    empty=0;
    }
    }
    #pragma omp critical (control)
    if(dataNum>0){
    CTemp=buffer[start];
    if(CTemp>=NUM_DATA){finished=1;}
    dataNum--;
    printf("CONSUMER %d: consumed data = %d (dataNum= %d)\n",omp_get_thread_num(),CTemp,dataNum);
    start=(start+1)%NUM;
    }
    else{
    empty=1;
    }
    }
    }
    }
    printf("Time = %.0f millisec\n\n",(clock()-time_start)*1000.0/CLOCKS_PER_SEC);
    return 0;
    }

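
Returning to the flag approach from the first comment: a minimal sketch of how a nothingToOutput flag might be carried through the stages is shown here. The Buffer type and the workStage, outputStage and runWithFlags functions are hypothetical names for illustration only, not QuickThread's API, and the stages are driven serially so the flag logic is easy to follow. The sketch assumes that lines starting with '#' are the buffers that produce no output.

```cpp
#include <string>
#include <vector>

// Hypothetical buffer type; the field names are illustrative only.
struct Buffer {
    std::string line;
    bool nothingToOutput = false;  // set when this buffer produces no write
};

// Work stage: empty lines and lines starting with '#' produce no output.
void workStage(Buffer& buf) {
    buf.nothingToOutput = buf.line.empty() || buf.line[0] == '#';
}

// Output stage: every buffer still arrives here, but flagged ones are skipped.
void outputStage(const Buffer& buf, std::vector<std::string>& sink) {
    if (buf.nothingToOutput) return;
    sink.push_back(buf.line);
}

// Drives the stages one buffer at a time, the way a pipeline would pass
// each buffer from the work stage to the output stage.
std::vector<std::string> runWithFlags(const std::vector<std::string>& input) {
    std::vector<std::string> sink;
    for (const std::string& line : input) {
        Buffer buf;
        buf.line = line;
        workStage(buf);
        outputStage(buf, sink);
    }
    return sink;
}
```

The buffer count stays the same at every stage, which is what the 3-stage design expects; only the flag decides whether a write actually happens.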