IBM Books

Embedded SQL Programming Guide


Example: Extracting Large Volume of Data (largevol.c)

Although DB2 Universal Database provides excellent features for parallel query processing, the single point of connection of an application or an EXPORT command can become a bottleneck if you are extracting large volumes of data. This occurs because the passing of data from the database manager to the application is a CPU-intensive process that executes on a single node (typically a single processor as well).

DB2 Universal Database provides several methods to overcome the bottleneck, so that the volume of extracted data scales linearly per unit of time with an increasing number of processors. The following example, describes the basic idea behind these methods.

Assume that you have a table called EMPLOYEE which is stored on 20 nodes, and you generate a mailing list (FIRSTNME, LASTNAME, JOB) of all employees who are in a legitimate department (that is, WORKDEPT is not NULL).

The following query is run on each node in parallel, and then generates the entire answer set at a single node (the coordinator node):

  SELECT FIRSTNME, LASTNAME, JOB FROM EMPLOYEE WHERE WORKDEPT IS NOT NULL

But, the following query could be run on each partition in the database (that is, if there are five partitions, five separate queries are required, one at each partition). Each query generates the set of all the employee names whose record is on the particular partition where the query runs. Each local result set can be redirected to a file. The result sets then need to be merged into a single result set. (On AIX, you can use a property of Network File System (NFS) files to automate that merging. If all the partitions direct their answer sets to the same file on an NFS mount, the results are merged.)

  SELECT FIRSTNME, LASTNAME, JOB FROM EMPLOYEE WHERE WORKDEPT IS NOT NULL
                           AND NODENUMBER(NAME) = CURRENT NODE

The result can either be stored in a local file (meaning that the final result would be 20 files, each containing a portion of the complete answer set), or in a single NFS-mounted file.

The following example uses the second method, so that the result is in a single file that is NFS mounted across the 20 nodes. The NFS locking mechanism ensures serialization of writes into the result file from the different partitions. Note that this example, as presented, runs on the AIX platform with an NFS file system installed.

#define _POSIX_SOURCE
#define INCL_32
 
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <sqlenv.h>
#include <errno.h>
#include <sys/access.h>
#include <sys/flock.h>
#include <unistd.h>
 
#define BUF_SIZE 1500000  /* Local buffer to store the fetched records */
#define MAX_RECORD_SIZE 80 /* >= size of one written record */
 
int main(int argc, char *argv[]) {
    
    EXEC SQL INCLUDE SQLCA;
    EXEC SQL BEGIN DECLARE SECTION;
       char dbname[10];  /* Database name (argument of the program) */
       char userid[9];
       char passwd[19];
       char first_name[21];
       char last_name[21];
       char job_code[11];
    EXEC SQL END DECLARE SECTION;
 
       struct flock unlock ;  /* structures and variables for handling */
       struct flock lock ;  /* the NFS locking mechanism */
       int lock_command ; 
       int lock_rc ; 
       int iFileHandle ;  /* output file */
       int iOpenOptions = 0 ; 
       int iPermissions ; 
       char * file_buf ;  /* pointer to the buffer where the fetched
                             records are accumulated */
       char * write_ptr ;  /* position where the next record is written */
       int buffer_len = 0 ;  /* length of used portion of the buffer */
    
    /* Initialization */
 
       lock.l_type = F_WRLCK;  /* An exclusive write lock request */
       lock.l_start = 0;  /* To lock the entire file */
       lock.l_whence = SEEK_SET;
       lock.l_len = 0;
       unlock.l_type = F_UNLCK;  /* An release lock request */
       unlock.l_start = 0;  /* To unlock the entire file */
       unlock.l_whence = SEEK_SET;
       unlock.l_len = 0;
       lock_command = F_SETLKW;  /* Set the lock */
       iOpenOptions = O_CREAT;  /* Create the file if not exist */
       iOpenOptions |= O_WRONLY;  /* Open for writing only */
	
    /* Connect to the database */
 
       if (argc == 3) {
          strcpy( dbname, argv[2] ); /* get database name from the argument */
          EXEC SQL CONNECT TO :dbname IN SHARE MODE ;
          if ( SQLCODE != 0 ) {
             printf( "Error: CONNECT TO the database failed. SQLCODE = %ld\n",
	              SQLCODE );
	      exit(1);
	  }
       }
       else if ( argc == 5 ) {
         strcpy( dbname, argv[2] ); /* get  database name from the argument */
	 strcpy (userid, argv[3]);
	 strcpy (passwd, argv[4]);
	 EXEC SQL CONNECT TO :dbname IN SHARE MODE USER :userid USING :passwd;
	 if ( SQLCODE != 0 ) {
	    printf( "Error: CONNECT TO the database failed. SQLCODE = %ld\n",
	            SQLCODE );
	      exit( 1 );
         }
       }
       else {
	   printf ("\nUSAGE: largevol txt_file database [userid passwd]\n\n");
	   exit( 1 ) ;
       } /* endif */
	
      /* Open the input file with the specified access permissions */
    
     if ( ( iFileHandle = open(argv[1], iOpenOptions, 0666 ) ) == -1 ) {
       printf( "Error: Could not open %s.\n", argv[2] ) ;
       exit( 2 ) ;
    }
    
    /* Set up error and end of table escapes */
    
    EXEC SQL WHENEVER SQLERROR GO TO ext ;
    EXEC SQL WHENEVER NOT FOUND GO TO cls ;
    
    /* Declare and open the cursor */
    
    EXEC SQL DECLARE c1 CURSOR FOR
             SELECT firstnme, lastname, job FROM employee
             WHERE workdept IS NOT NULL
             AND NODENUMBER(lastname) = CURRENT NODE;
    EXEC SQL OPEN c1 ; 
    
    /* Set up the temporary buffer for storing the fetched result */
    
    if ( ( file_buf = ( char * ) malloc( BUF_SIZE ) ) == NULL ) {
       printf( "Error: Allocation of buffer failed.\n" ) ;
       exit( 3 ) ;
    }
    memset( file_buf, 0, BUF_SIZE ) ; /* reset the buffer */
    buffer_len = 0 ;  /* reset the buffer length */
    write_ptr = file_buf ;  /* reset the write pointer */
    /* For each fetched record perform the following    */
    /*  - insert it into the buffer following the       */
    /*    previously stored record                      */
    /*  - check if there is still enough space in  the  */
    /*    buffer for the next record and lock/write/    */
    /*    unlock the file and initialize the buffer     */
    /*    if not                                        */
    
    do {
       EXEC SQL FETCH c1 INTO :first_name, :last_name, :job_code;
        buffer_len += sprintf( write_ptr, "%s %s %s\n", 
                               first_name, last_name, job_code ); 
        buffer_len = strlen( file_buf ) ;
       /* Write the content of the buffer to the file if */
       /* the buffer reaches the limit                   */
       if ( buffer_len >= ( BUF_SIZE - MAX_RECORD_SIZE ) ) {
        /*  get excl. write lock */
        lock_rc = fcntl( iFileHandle, lock_command, &lock );
		 if ( lock_rc != 0 ) goto file_lock_err;
		 /*  position at the end of file */
		 lock_rc = lseek( iFileHandle, 0, SEEK_END ); 
		if ( lock_rc < 0 ) goto file_seek_err;
		/* write the buffer */
		lock_rc = write( iFileHandle, 
                               ( void * ) file_buf, buffer_len );
		if ( lock_rc < 0 ) goto file_write_err; 
		  /* release the lock */
		 lock_rc = fcntl( iFileHandle, lock_command, &unlock );
		 if ( lock_rc != 0 ) goto file_unlock_err;
		 file_buf[0] = '\0' ;  /* reset the buffer */
		  buffer_len = 0 ;  /* reset the buffer length */
		  write_ptr = file_buf ;  /* reset the write pointer */
       }
       else {
          write_ptr = file_buf + buffer_len ;  /* next write position */
       }
    } while (1) ;
    
cls:
    /* Write the last piece of data out to the file */
    if (buffer_len > 0) {
       lock_rc = fcntl(iFileHandle, lock_command, &lock);
       if (lock_rc != 0) goto file_lock_err;
      lock_rc = lseek(iFileHandle, 0, SEEK_END);
       if (lock_rc < 0) goto file_seek_err;
       lock_rc = write(iFileHandle, (void *)file_buf, buffer_len);
       if (lock_rc < 0) goto file_write_err;
       lock_rc = fcntl(iFileHandle, lock_command, &unlock);
      if (lock_rc != 0) goto file_unlock_err;
    }
    free(file_buf);
	close(iFileHandle);
    EXEC SQL CLOSE c1;
    exit (0);
 ext:
    if ( SQLCODE != 0 )
       printf( "Error:  SQLCODE = %ld.\n", SQLCODE );
    EXEC SQL WHENEVER SQLERROR CONTINUE;
    EXEC SQL CONNECT RESET;
    if ( SQLCODE != 0 ) {
       printf( "CONNECT RESET Error:  SQLCODE = %ld\n", SQLCODE );
       exit(4);
    }
    exit (5);
 file_lock_err:
    printf("Error: file lock error = %ld.\n",lock_rc); 
    /* unconditional unlock of the file */
    fcntl(iFileHandle, lock_command, &unlock); 
    exit(6);
file_seek_err:
    printf("Error: file seek error = %ld.\n",lock_rc);
    fcntl(iFileHandle, lock_command, &unlock);  /* unconditional unlock of the file */
   exit(7);
file_write_err: 
    printf("Error: file write error = %ld.\n",lock_rc); 
    fcntl(iFileHandle, lock_command, &unlock);  /* unconditional unlock of the file */
    exit(8);
file_unlock_err: 
    printf("Error: file unlock error = %ld.\n",lock_rc);
   fcntl(iFileHandle, lock_command, &unlock);  /* unconditional unlock of the file */
    exit(9);
}

This method is applicable not only to a select from a single table, but also for more complex queries. If, however, the query requires noncollocated operations (that is, the Explain shows more than one subsection besides the Coordinator subsection), this can result in too many processes on some partitions if the query is run in parallel on all partitions. In this situation, you can store the query result in a temporary table TEMP on as many partitions as required, then do the final extract in parallel from TEMP.

If you want to extract all employees, but only for selected job classifications, you can define the TEMP table with the column names, FIRSTNME, LASTNAME, and JOB, as follows:

  INSERT INTO TEMP
  SELECT FIRSTNME, LASTNAME, JOB FROM EMPLOYEE WHERE WORKDEPT IS NOT NULL
         AND EMPNO NOT IN (SELECT EMPNO FROM EMP_ACT WHERE
                           EMPNO<200)

Then you would do the parallel extract on TEMP.

When defining the TEMP table, consider the following:

If you require the final answer set (which is the merged partial answer set from all nodes) to be sorted, you can:


[ Top of Page | Previous Page | Next Page | Table of Contents | Index ]

[ DB2 List of Books | Search the DB2 Books ]