merge-sort_threaded.pl
Stable threaded merge sort of a text file with tab separated values.

One feature of Perl's threading implementation that caught my attention is its support for thread-safe queues (see http://perldoc.perl.org/Thread/Queue.html). Their key property is that a thread trying to read the next entry from an empty queue can be made to wait until an entry appears before continuing. This makes coordinating thread execution rather simple: in an event-driven scenario, a thread can just stand by, ready to leap into action as soon as an event is handed off to it.
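For readers new to Thread::Queue, here is a minimal sketch, separate from the script below, of a worker thread standing by on an empty queue. It assumes a Perl built with thread support and a Thread::Queue release that provides `end()` (newer releases do). The event names are purely illustrative.

```perl
use strict;
use warnings;
use threads;            # requires a Perl built with thread support
use Thread::Queue;

my $queue = Thread::Queue->new();

# The worker stands by: dequeue() blocks while the queue is empty.
my $worker = threads->create(sub {
    my @handled;
    while (defined(my $event = $queue->dequeue())) {
        push @handled, uc $event;       # stand-in for real event processing
    }
    return join ',', @handled;          # thread created in scalar context
});

$queue->enqueue($_) for qw(alpha beta gamma);   # hand illustrative events to the worker
$queue->end();                                  # no more items: dequeue() returns undef once empty

my $result = $worker->join();                   # wait for the worker to finish
print "$result\n";                              # prints ALPHA,BETA,GAMMA
```

Note that `end()` suits this one-way hand-off only; once a queue is ended, nothing more can be enqueued, so it cannot be used when the consumer feeds results back into the same queue.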

Sorting is a fundamental task in computing, and the problem of sorting items that can't all be held in memory at the same time has been around since the early days of computing. It confronted the mathematician John von Neumann back in 1945. Using a stack of playing cards as an analogue (if memory serves me right), he devised the merge-sort algorithm.

The algorithm follows a divide-and-conquer paradigm and is usually implemented today using recursion: the recursion stack relieves the programmer of having to keep explicit track of all the sorted data subsets. Moreover, to sort very large data sets efficiently, variants of the algorithm, known as tiled algorithms, have been devised. They split the data into subsets small enough to be sorted in memory, sort each one, and then merge the sorted subsets.
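To make the recursive formulation concrete, here is a short sketch of a textbook top-down merge sort on an in-memory list of strings. The sub name `merge_sort` is hypothetical and the code is illustrative only; it is not taken from the script below.

```perl
use strict;
use warnings;

# Hypothetical top-down merge sort of strings in lexically ascending order.
sub merge_sort {
    my @items = @_;
    return @items if @items < 2;                       # 0 or 1 item: already sorted
    my $mid   = int(@items / 2);
    my @left  = merge_sort(@items[0 .. $mid - 1]);     # divide: sort each half
    my @right = merge_sort(@items[$mid .. $#items]);   #   recursively
    my @merged;
    while (@left && @right) {                          # conquer: merge the halves
        # take from the right only if strictly smaller, so ties keep their order
        push @merged, ($right[0] lt $left[0]) ? shift(@right) : shift(@left);
    }
    return (@merged, @left, @right);                   # one half may have leftovers
}

my @sorted = merge_sort(qw(pear apple fig banana cherry));
print "@sorted\n";    # prints: apple banana cherry fig pear
```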

Such a tiled algorithm can easily be implemented using a queue. The main program splits the data set into subsets and sorts each one in memory. These subsets are written back to disk, and each filename is placed in a queue. Concurrently, a helper thread reads a pair of filenames from the queue and merges the two files into a new file, whose name is in turn placed in the queue for processing at a later stage. Execution continues until only one filename is left in the queue; that file is the desired end product.
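The queue-driven scheme can be tried out in memory before worrying about files and threads. The sketch below assumes that array references stand in for the temporary files and that a plain Perl array stands in for the thread-safe queue. Sorted tiles are repeatedly dequeued in pairs, merged, and re-enqueued until a single list remains. The subs `merge_two` and `tiled_sort` are hypothetical names.

```perl
use strict;
use warnings;

# Hypothetical helper: merge two array refs already in lexically ascending order.
sub merge_two {
    my ($left, $right) = @_;
    my ($i, $j) = (0, 0);
    my @out;
    while ($i < @$left && $j < @$right) {
        if ($right->[$j] lt $left->[$i]) { push @out, $right->[$j++] }
        else                             { push @out, $left->[$i++]  }
    }
    # at most one of these slices is non-empty
    push @out, @{$left}[$i .. $#$left], @{$right}[$j .. $#$right];
    return \@out;
}

# Hypothetical tiled sort: sort tiles of at most $tile_size items in memory,
# queue them, then merge pairs from the head of the queue until one is left.
sub tiled_sort {
    my ($tile_size, @data) = @_;
    my @queue;                                          # plain array used as a FIFO queue
    while (my @tile = splice(@data, 0, $tile_size)) {
        push @queue, [ sort @tile ];                    # "write" one sorted tile
    }
    return () unless @queue;
    while (@queue > 1) {
        my ($first, $second) = splice(@queue, 0, 2);    # dequeue a pair
        push @queue, merge_two($first, $second);        # enqueue their merge
    }
    return @{ $queue[0] };
}

my @sorted = tiled_sort(2, qw(delta alpha echo charlie bravo));
print "@sorted\n";    # prints: alpha bravo charlie delta echo
```

Because the queue can pair a later tile with the merged result of earlier ones, this merge on its own does not guarantee that equal items keep their original order. The script sidesteps the issue by appending each record's file pointer to its key, which makes every key unique.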

Doing the merge concurrently is not a big time saver, as the whole operation is heavily IO-bound: the aim is rather to exploit the built-in queueing mechanism. One can reduce the read-write times by storing, sorting and merging only the key and the file pointer (the byte offset) of each record. Once the merge is complete, a final sorted version of the original data set can be constructed by seeking to each record in turn. Appending the file pointer to the key also makes this variant stable: if records A and B have the same key and A precedes B in the original data, A has the smaller file pointer, so their order is preserved in the merged set. For a plain lexical comparison to respect this, the pointer is zero-padded to a fixed width.
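The key-plus-pointer idea can be sketched as follows. The sketch assumes a tiny hypothetical data set held in an in-memory file, so that `tell` and `seek` behave as they would on disk. Each composite key is the sort field followed by the record's zero-padded byte offset, so a plain lexical sort breaks ties by original position. The records are then fetched with `seek`, much as in section 4 of the script.

```perl
use strict;
use warnings;

# Hypothetical tab-separated records; the first field is the sort key and
# the duplicate keys "a" and "b" let us check that ties keep file order.
my $data = "b\tfirst b\na\tfirst a\nb\tsecond b\na\tsecond a\n";
open(my $in, '<', \$data) or die "Can't open in-memory file: $!";

my $width = length(length $data);      # digits in the largest possible offset
my @keys;
my $offset = 0;                        # byte offset of the current record
while (my $record = <$in>) {
    my ($key) = split /\t/, $record;
    push @keys, sprintf("%s\t%0*d", $key, $width, $offset);   # key + padded pointer
    $offset = tell $in;                                       # start of next record
}

# A plain lexical sort: equal keys are ordered by their padded offsets.
my @out;
for my $composite (sort @keys) {
    my (undef, $pos) = split /\t/, $composite;
    seek $in, $pos, 0;                 # jump back to the record...
    push @out, scalar <$in>;           # ...and read it in full
}
close $in;
print @out;
```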

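The Perl script merge-sort_threaded.pl, displayed below, implements this approach. It is geared towards sorting very large, tab-separated data sets consisting of many fields into lexically ascending order. As supplied, the composite sort key defined at lines 026 to 030 combines the first field, formatted as a zero-padded five-digit integer, and the ninth field, followed by the record's file pointer. The progress display relies on Term::ANSIScreen and Win32::Console::ANSI, so as written the script targets the Windows console. The code is released for personal, non-commercial and non-profit use only. The listing includes line numbers so that they can be referenced in the following general remarks.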

Our test case takes an hour to process on a Pentium IV machine. As stated previously, one shouldn't expect any massive speed gains here because of the IO bottleneck. However, the thread queueing mechanism is certainly a handy feature for inter-thread communication, and one which we intend to explore further.

As always, if you have any questions regarding the code or my explanations, please do not hesitate to contact me.


merge-sort_threaded.pl
001 use strict;
002 use Time::HiRes qw(gettimeofday tv_interval);
003 my $Start_time = [gettimeofday];
004 use File::Temp;
005 use Term::ANSIScreen qw|:color :cursor :screen|;
006 use Win32::Console::ANSI qw|Cursor|;
007 use threads;
008 use threads::shared;
009 use Thread::Queue;
010 use constant FATAL => "\aFATAL ERROR: ";
011 use vars qw|$RecordPtrLen|;
012 #===== Copyright 2009, Webpraxis Consulting Ltd. - ALL RIGHTS RESERVED - Email: webpraxis@gmail.com ============================
013 # merge-sort_threaded.pl: Stable threaded merge sort of a text file with tab separated values.
014 #===============================================================================================================================
015 #           Usage : perl merge-sort_threaded.pl INFILE OUTFILE MAXRECORDS
016 #       Arguments : INFILE     = path of the data text file to be sorted
017 #                   OUTFILE    = path of sorted data file
018 #                   MAXRECORDS = maximum number of records per initial file
019 #     Input Files : See Arguments.
020 #    Output Files : See Arguments.
021 # Temporary Files : Filename templates are mergeXXXXX.dat
022 #         Remarks : See http://www.webpraxis.ab.ca/merge/merge-sort_threaded.shtml for details.
023 #         History : v1.0.0 - February 26, 2009 - Original release.
024 #===============================================================================================================================
025 # 0) DEFINE SORT CHARACTERISTICS:
026 my $MakeCompositeKey    = sub{  chomp( my $record = shift );                            #anonymous sub for creating a composite key; chomp keeps the newline out of the last field
027                                 my $recordPtr   = shift;
028                                 sprintf( "%5.5d\t%s", ( split /\t/, $record )[0,8] ) .
029                                 sprintf( "\t%${RecordPtrLen}.${RecordPtrLen}lu", $recordPtr );
030                              };
031 #-------------------------------------------------------------------------------------------------------------------------------
032 # 1) INITIALIZE:
033 $| = 1;                                                                                 #set STDOUT buffer to auto-flush
034 cls();                                                                                  #clear screen
035 print colored ['black on white'], "$0\n\n\n", color 'reset';                            #display program name
036
037 my $InFile          = shift || die FATAL, 'Input file not specified';                   #get path of input file
038 my $OutFile         = shift || die FATAL, 'Output file not specified';                  #get path of output file
039 my $MaxRecordCount  = shift || die FATAL, 'Max number of records not specified';        #maximum number of records per initial file
040 $RecordPtrLen       = length( -s $InFile );                                             #establish maximum length of a record pointer
041
042 print " Problem Details:\n\n",                                                          #echo problem details
043       "\t Input File: $InFile\n",
044       "\tOutput File: $OutFile\n\n",
045       '-' x 80, "\n\n";
046 #-------------------------------------------------------------------------------------------------------------------------------
047 # 2) CREATE FILE QUEUE & LAUNCH THREAD FOR MERGING:
048 my $CursorY     :shared;                                                                #cursor row coordinate
049 my $KeysCreated :shared;                                                                #boolean flag indicating all keys created
050 my $FileQueue   = Thread::Queue->new();                                                 #create file queue of record keys
051 my $MergeThread = threads->create( \&merge );                                           #launch thread for merging keys in sorted order
052 #-------------------------------------------------------------------------------------------------------------------------------
053 # 3) CREATE FILES OF COMPOSITE RECORD KEYS AND POPULATE MERGE FILE QUEUE:
054 my $recordStart = 0;                                                                    #init file pointer for start of a data record
055 my $recordCount;                                                                        #record counter
056 my $noFiles;
057 my @keys;                                                                               #array of composite record keys
058
059 print " Process Details:\n\n";                                                          #inform user of processing start
060 $CursorY = ( Cursor() )[1];                                                             #record cursor row position
061 print   locate( $CursorY,   3 ), 'File Queue: ',
062         locate( $CursorY+2, 3 ), '      Main: ',
063         locate( $CursorY+4, 3 ), '    Thread: ', colored['bold white'], 'Enqueued ',
064         locate( $CursorY+5, 3 ), '            ', colored['bold white'], 'Dequeued ',
065         locate( $CursorY+6, 3 ), '            ', colored['bold white'], 'Dequeued ';
066
067 open( INFILE, '<', $InFile ) || die FATAL, "Can't open data file '$InFile' - $!";       #open the data file for read
068 while( my $record = <INFILE> ) {                                                        #repeat for each input data record
069     push @keys, &$MakeCompositeKey( $record, $recordStart );                            # create & store composite key
070     $recordStart = tell;                                                                # store start of next record
071
072     next unless ( ++$recordCount == $MaxRecordCount ) or eof;                           # next record unless enough records or end of file
073     my $tmpKeys     = new File::Temp(   TEMPLATE    => 'mergeXXXXX',                    # create temp file for the keys
074                                         DIR         => $ENV{TEMP},
075                                         SUFFIX      => '.dat',
076                                         UNLINK      => 0
077                                     )
078                         || die FATAL, "Can't create temporary file in Directory '$ENV{TEMP}' - $!";
079     print $tmpKeys "$_\n" for sort @keys;                                               # output lexically ascending composite keys
080     close( $tmpKeys );                                                                  # close temp file for the keys
081     ++$noFiles;                                                                         # update file counter
082     $FileQueue->enqueue( $tmpKeys->filename );                                          # add keys file to file queue
083     print   locate( $CursorY,   15 ), clline,                                           # inform user
084             colored ['bold yellow'], $FileQueue->pending,
085             locate( $CursorY+2, 15 ), clline,
086             colored['bold white'], 'Enqueued ',
087             colored ['bold green'], "$tmpKeys";
088     @keys           = ();                                                               # clear array for the keys
089     $recordCount    = 0;                                                                # reset record count
090 }                                                                                       #until all input records processed
091 die locate( $CursorY+8, 3 ),                                                            #abort if not enough files
092     FATAL, "Specified max number of records too large!" unless $noFiles > 1;
093 $KeysCreated = 1;                                                                       #flag end of keys creation
094 #-------------------------------------------------------------------------------------------------------------------------------
095 # 4) READ SORTED KEYS & OUTPUT CORRESPONDING DATA RECORDS:
096 print   locate( $CursorY+2, 15 ), clline,                                               #inform user
097         colored ['bold red'], 'Waiting...';
098 my $sortedKeysFile = $MergeThread->join();                                              #get filename of resulting sorted keys
099 print   locate( $CursorY+2, 15 ), clline,                                               #inform user
100         colored ['bold red'], "Creating sorted output file...";
101 open( OUTFILE, '>', $OutFile )                                                      #open output file for write
102  || die FATAL, "Can't create output file '$OutFile' - $!";
103 open( KEYS, '<', $sortedKeysFile )                                                  #open keys file for read
104  || die FATAL, "Can't read last temporary file '$sortedKeysFile' - $!";
105 while( <KEYS> ) {                                                                       #repeat for each key
106     seek INFILE, ( split /\t/ )[2], 0;                                                  # position file pointer for INFILE
107     print OUTFILE scalar <INFILE>;                                                      # output data record
108 }                                                                                       #until all keys processed
109 close( OUTFILE );                                                                       #close sorted data file
110 close( INFILE );                                                                        #close input data file
111 close( KEYS );                                                                          #close keys file
112 unlink $sortedKeysFile;                                                                 #delete keys file
113 print   locate( $CursorY+2, 15 ), clline,                                               #inform user
114         colored ['bold red'], "\aDone!",
115         locate( $CursorY+8, 3 ),
116         colored ['bold white'],
117         "Elapsed time: ", tv_interval( $Start_time ), "\n";                             #report total elapsed time
118 exit;
119 #===== SUBROUTINES =============================================================================================================
120 #            Usage : &merge()
121 #          Purpose : Merge enqueued sorted key files. Returns name of last remaining enqueued file.
122 #        Arguments : None.
123 #  Temporary Files : Filename templates are mergeXXXXX.dat
124 #        Externals : $FileQueue
125 # Shared Externals : $CursorY, $KeysCreated
126 #             Subs : None.
127 #          Remarks : None.
128 #          History : v1.0.0 - February 26, 2009 - Original release.
129
130 sub merge {                                                                             #begin sub
131     my $sourceKeys1;                                                                    # filename for 1st keys file to be merged
132     my $sourceKeys2;                                                                    # filename for 2nd keys file to be merged
133     my $tmpKeys;                                                                        # filename for merged keys file
134
135     until( $KeysCreated && $FileQueue->pending == 1 ) {                                 # repeat until all keys created & only 1 file left in queue
136         unless( $FileQueue->pending > 1 ) { sleep 1; next }                             #  poll for a pair of files instead of blocking forever in dequeue
137         ( $sourceKeys1, $sourceKeys2 ) = $FileQueue->dequeue( 2 );                      #  get 1st & 2nd filenames
138         print   locate( $CursorY+5, 24 ), clline,                                       #  inform user
139                 colored ['bold yellow'], "$sourceKeys1",
140                 locate( $CursorY+6, 24 ), clline,
141                 colored ['bold yellow'], "$sourceKeys2",
142                 locate( $CursorY,   15 ), clline,
143                 colored ['bold yellow'], $FileQueue->pending;
144         open( SOURCEKEYS1, '<', $sourceKeys1 )                                          #  open 1st keys file for read
145          || die FATAL, "Can't read temporary file '$sourceKeys1' - $!";
146         open( SOURCEKEYS2, '<', $sourceKeys2 )                                          #  open 2nd keys file for read
147          || die FATAL, "Can't read temporary file '$sourceKeys2' - $!";
148
149         $tmpKeys    = new File::Temp(   TEMPLATE    => 'mergeXXXXX',                    #  create temp file for the merged keys
150                                         DIR         => $ENV{TEMP},
151                                         SUFFIX      => '.dat',
152                                         UNLINK      => 0
153                                     )
154                         || die FATAL, "Can't create temporary file in Directory '$ENV{TEMP}' - $!";
155
156         my ( $key1, $key2 ) = ( undef, undef );                                         #  clear the key from each file
157         until(  ( !defined $key1 && eof SOURCEKEYS1 ) ||                                #  repeat
158                 ( !defined $key2 && eof SOURCEKEYS2 )
159              ) {
160             $key1   = <SOURCEKEYS1> unless defined $key1;                               #   get the next key in 1st file
161             $key2   = <SOURCEKEYS2> unless defined $key2;                               #   get the next key in 2nd file
162             if( $key1 lt $key2 ) {                                                      #   case of 1st key less than 2nd one
163                 print $tmpKeys  $key1;                                                  #    add key from 1st file to new temp key file
164                 undef $key1;                                                            #    clear the current key from 1st file
165             } else {                                                                    #   case of 2nd key less than 1st one
166                 print $tmpKeys  $key2;                                                  #    add key from 2nd file to new temp key file
167                 undef $key2;                                                            #    clear the current key from 2nd file
168             }                                                                           #   end case of keys ordering
169         }                                                                               #  until no more keys to process from either file
170         unless( !defined $key1 && eof SOURCEKEYS1 ) {                                   #  if the 1st file has some unprocessed keys
171             print $tmpKeys $key1 if defined $key1;                                      #   add any unprocessed read key to new temp file
172             print $tmpKeys $_ while <SOURCEKEYS1>;                                      #   copy any unread keys line by line (no slurping)
173         } else {                                                                        #  else the 2nd file has some unprocessed keys
174             print $tmpKeys $key2 if defined $key2;                                      #   add any unprocessed read key to new temp file
175             print $tmpKeys $_ while <SOURCEKEYS2>;                                      #   copy any unread keys line by line (no slurping)
176         }                                                                               #  end if-else
177         close SOURCEKEYS1;                                                              #  close the 1st file
178         close SOURCEKEYS2;                                                              #  close the 2nd file
179         close $tmpKeys;                                                                 #  close the new temp file
180
181         unlink $sourceKeys1, $sourceKeys2;                                              #  delete source temp files
182         $FileQueue->enqueue( $tmpKeys->filename );                                      #  add merged keys file to file queue
183         print   locate( $CursorY,   15 ), clline,                                       #  inform user
184                 colored ['bold yellow'], $FileQueue->pending,
185                 locate( $CursorY+4, 24 ), clline,
186                 colored ['bold green'], "$tmpKeys",
187                 locate( $CursorY+5, 24 ), clline,
188                 locate( $CursorY+6, 24 ), clline;
189     }                                                                                   # until all keys created & only 1 file left in queue
190     $tmpKeys->filename;                                                                 # return filepath of resulting file
191 }                                                                                       #end sub merge
192 #===== Copyright 2009, Webpraxis Consulting Ltd. - ALL RIGHTS RESERVED - Email: webpraxis@gmail.com ============================
193 # end of merge-sort_threaded.pl

© 2024 Webpraxis Consulting Ltd. – ALL RIGHTS RESERVED.
