/*
 * Producer / consumer example with command line inputs.
 * Sergey V. 2021.
 *
 * Build:  gcc pc1.c -pthread -o pc1
 * Run:    ./pc1 30 supplier1.txt consumer1.txt consumer2.txt
 *
 * The first argument is the maximum number of units that may be in stock.
 * Each config file holds four lines: name, interval, repetitions, size.
 *
 *   supplier1.txt     consumer1.txt     consumer2.txt
 *   Supplier1         Jane              Alice
 *   1                 5                 5
 *   20                10                10
 *   2                 4                 4
 */

/* Request the POSIX.1-2008 API when building in a strict ISO C mode. */
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif

#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Longest accepted config line, including the terminating NUL. */
enum { CONFIG_LINE_MAX = 256 };

/* Guards workCount; the condition is broadcast whenever a supplier ran. */
pthread_mutex_t cond_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t condition = PTHREAD_COND_INITIALIZER;

int workCount = 0;   /* units currently in stock (protected by cond_mutex) */
int MAX_UNITS;       /* stock capacity, taken from argv[1] */
int NUM_CONSUMERS;
int NUM_SUPPLIERS;

typedef struct {
    char *name;
    int num;
    int interval;
    int repetitions;
    int size;
} SUPPLIER;

typedef struct {
    char *name;
    int num;
    int interval;
    int repetitions;
    int size;
} CONSUMER;

/*
 * Abort the whole program when a pthread call reports an error.
 * rc is the return value of the call, what names the call for the message.
 */
static void check_pthread(int rc, const char *what)
{
    if (rc != 0) {
        fprintf(stderr, "%s failed: %s\n", what, strerror(rc));
        exit(EXIT_FAILURE);
    }
}

/*
 * Parse a non-negative decimal int, allowing trailing whitespace.
 * Returns 0 and stores the value in *out on success, -1 on malformed,
 * negative or out-of-range input (*out is then left untouched).
 */
static int parse_non_negative_int(const char *text, int *out)
{
    char *end;
    long value;

    errno = 0;
    value = strtol(text, &end, 10);
    if (end == text || errno == ERANGE || value < 0 || value > INT_MAX) {
        return -1;
    }
    while (*end == ' ' || *end == '\t' || *end == '\r' || *end == '\n') {
        end++;
    }
    if (*end != '\0') {
        return -1;
    }
    *out = (int)value;
    return 0;
}

/*
 * Read one four-line config file (name, interval, repetitions, size).
 * On success returns 0 and stores a heap-allocated copy of the name in
 * *name (the caller frees it). On failure returns -1, prints a diagnostic
 * to stderr and leaves *name set to NULL. Lines after the fourth are ignored.
 */
static int load_config(const char *path, char **name, int *interval,
                       int *repetitions, int *size)
{
    char line[CONFIG_LINE_MAX];
    int *fields[] = { interval, repetitions, size };
    int nline = 0;
    int rc = -1;
    FILE *fp;

    *name = NULL;
    fp = fopen(path, "r");
    if (fp == NULL) {
        fprintf(stderr, "%s: %s\n", path, strerror(errno));
        return -1;
    }

    while (nline < 4 && fgets(line, sizeof line, fp) != NULL) {
        size_t len = strcspn(line, "\r\n");

        /* A full buffer without a line ending means the line was cut. */
        if (line[len] == '\0' && len == sizeof line - 1) {
            fprintf(stderr, "%s: line %d is too long\n", path, nline + 1);
            goto out;
        }
        line[len] = '\0';

        if (nline == 0) {
            *name = malloc(len + 1);
            if (*name == NULL) {
                fprintf(stderr, "%s: out of memory\n", path);
                goto out;
            }
            memcpy(*name, line, len + 1);
        } else if (parse_non_negative_int(line, fields[nline - 1]) != 0) {
            fprintf(stderr, "%s: line %d: expected a non-negative integer\n",
                    path, nline + 1);
            goto out;
        }
        nline++;
    }

    if (nline < 4) {
        fprintf(stderr,
                "%s: expected 4 lines (name, interval, repetitions, size)\n",
                path);
        goto out;
    }
    rc = 0;

out:
    fclose(fp);
    if (rc != 0) {
        free(*name);
        *name = NULL;
    }
    return rc;
}

/* Print the settings a worker thread was started with. */
static void print_settings(int num, const char *name, int interval,
                           int repetitions, int size)
{
    printf("num: %d\n", num);
    printf("name: %s\n", name);
    printf("interval: %d\n", interval);
    printf("repetitions: %d\n", repetitions);
    printf("size: %d\n", size);
}

/*
 * Supplier thread entry point; arg points to a SUPPLIER.
 * Runs `repetitions` rounds: each round adds `size` units to workCount if
 * that does not exceed MAX_UNITS, broadcasts the condition, then sleeps
 * `interval` seconds. The thread stays joinable; main() joins it.
 */
void *supplierThread(void *arg)
{
    const SUPPLIER *self = arg;
    /* NOTE(review): assumes pthread_t converts to an integer (true on Linux). */
    unsigned long tid = (unsigned long)pthread_self();

    printf("Supplier %lx (%d): Started\n", tid, self->num);
    printf("nnum: %d\n", self->num);
    print_settings(self->num, self->name, self->interval, self->repetitions,
                   self->size);

    for (int i = 0; i < self->repetitions; i++) {
        check_pthread(pthread_mutex_lock(&cond_mutex), "pthread_mutex_lock");

        assert(workCount >= 0 && workCount <= MAX_UNITS);
        printf("Supplier: Creating work (total: %d)\n", workCount);
        /* Written as a subtraction so the capacity test cannot overflow. */
        if (self->size <= MAX_UNITS - workCount) {
            workCount += self->size;
        }

        check_pthread(pthread_cond_broadcast(&condition),
                      "pthread_cond_broadcast");
        check_pthread(pthread_mutex_unlock(&cond_mutex),
                      "pthread_mutex_unlock");

        sleep((unsigned)self->interval);
    }

    printf("Supplier finished\n");
    return NULL;
}

/*
 * Consumer thread entry point; arg points to a CONSUMER.
 * Runs `repetitions` rounds: each round takes `size` units from workCount
 * when that many are in stock (otherwise reports that nothing is left),
 * then sleeps `interval` seconds. The thread stays joinable; main() joins it.
 */
void *consumerThread(void *arg)
{
    const CONSUMER *self = arg;
    /* NOTE(review): assumes pthread_t converts to an integer (true on Linux). */
    unsigned long tid = (unsigned long)pthread_self();

    printf("Consumer %lx (%d): Started\n", tid, self->num);
    print_settings(self->num, self->name, self->interval, self->repetitions,
                   self->size);

    for (int i = 0; i < self->repetitions; i++) {
        check_pthread(pthread_mutex_lock(&cond_mutex), "pthread_mutex_lock");

        /* Check before subtracting so the stock never goes negative. */
        if (workCount >= self->size) {
            workCount -= self->size;
            assert(workCount >= 0);
            printf("Consumer %lx (%d): Took work amount %d, total left is %d\n",
                   tid, self->num, self->size, workCount);
        } else {
            printf("No more work product left\n");
        }

        check_pthread(pthread_mutex_unlock(&cond_mutex),
                      "pthread_mutex_unlock");

        sleep((unsigned)self->interval);
    }

    printf("Consumer %lx: Finished\n", tid);
    return NULL;
}

/*
 * Usage: pc1 MAX_UNITS supplier.txt consumer1.txt consumer2.txt
 * Loads the configs, starts two consumers and one supplier, waits for all
 * of them to finish and returns EXIT_SUCCESS, or EXIT_FAILURE on bad
 * arguments, unreadable/malformed config files or thread creation errors.
 */
int main(int argc, char *argv[])
{
    enum { SUPPLIER_COUNT = 1, CONSUMER_COUNT = 2 };

    SUPPLIER supplierds[SUPPLIER_COUNT] = {0};
    CONSUMER consumerds[CONSUMER_COUNT] = {0};
    pthread_t suppliers[SUPPLIER_COUNT];
    pthread_t consumers[CONSUMER_COUNT];
    int started_suppliers = 0;
    int started_consumers = 0;
    int status = EXIT_FAILURE;
    int rc;
    int i;

    NUM_SUPPLIERS = SUPPLIER_COUNT;
    NUM_CONSUMERS = CONSUMER_COUNT;

    /* Validate the command line before acquiring any resource. */
    if (argc != 2 + SUPPLIER_COUNT + CONSUMER_COUNT) {
        fprintf(stderr,
                "supply max units, supplier0, consumer0, consumer2 config files\n");
        return EXIT_FAILURE;
    }
    if (parse_non_negative_int(argv[1], &MAX_UNITS) != 0) {
        fprintf(stderr, "max units must be a non-negative integer: %s\n",
                argv[1]);
        return EXIT_FAILURE;
    }
    printf("MAX UNITS = %d\n", MAX_UNITS);

    /* Scan supplier and consumer files. */
    for (i = 0; i < SUPPLIER_COUNT; i++) {
        supplierds[i].num = i;
        if (load_config(argv[2 + i], &supplierds[i].name,
                        &supplierds[i].interval, &supplierds[i].repetitions,
                        &supplierds[i].size) != 0) {
            goto cleanup;
        }
    }
    for (i = 0; i < CONSUMER_COUNT; i++) {
        consumerds[i].num = i;
        if (load_config(argv[2 + SUPPLIER_COUNT + i], &consumerds[i].name,
                        &consumerds[i].interval, &consumerds[i].repetitions,
                        &consumerds[i].size) != 0) {
            goto cleanup;
        }
    }

    /* Spawn the consumer threads. */
    for (i = 0; i < CONSUMER_COUNT; i++) {
        printf("launch consumer %d\n", i);
        rc = pthread_create(&consumers[i], NULL, consumerThread,
                            &consumerds[i]);
        if (rc != 0) {
            fprintf(stderr, "pthread_create failed: %s\n", strerror(rc));
            goto join;
        }
        started_consumers++;
    }

    /* Spawn the supplier threads. */
    for (i = 0; i < SUPPLIER_COUNT; i++) {
        printf("launch supplier %d\n", i);
        rc = pthread_create(&suppliers[i], NULL, supplierThread,
                            &supplierds[i]);
        if (rc != 0) {
            fprintf(stderr, "pthread_create failed: %s\n", strerror(rc));
            goto join;
        }
        started_suppliers++;
    }

    status = EXIT_SUCCESS;

join:
    /* Wait for every thread that was started; they reference the arrays above. */
    for (i = 0; i < started_suppliers; i++) {
        pthread_join(suppliers[i], NULL);
    }
    for (i = 0; i < started_consumers; i++) {
        pthread_join(consumers[i], NULL);
    }

cleanup:
    for (i = 0; i < SUPPLIER_COUNT; i++) {
        free(supplierds[i].name);
    }
    for (i = 0; i < CONSUMER_COUNT; i++) {
        free(consumerds[i].name);
    }
    pthread_mutex_destroy(&cond_mutex);
    pthread_cond_destroy(&condition);

    return status;
}