One of the features in Perl's threading implementation that caught my attention was its support for thread-safe queues (see http://perldoc.perl.org/Thread/Queue.html). Its major feature is that, if a thread tries to read the next entry in a queue, but the queue is empty, then it can be instructed to wait for an entry to appear before continuing. This makes coordinating thread execution rather simple. In an event-driven scenario, a thread can just stand by, ready to leap into action as soon as an event is handed off to it.
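The blocking behaviour described above can be illustrated with a minimal sketch. The event names and the use of an `undef` sentinel to tell the worker to stop are assumptions made for this illustration only; they are not taken from the script discussed later.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $queue = Thread::Queue->new();

# Worker thread: dequeue() blocks while the queue is empty, so the thread
# idles until the main thread hands it an event.
my $worker = threads->create( sub {
    my @handled;
    while ( defined( my $event = $queue->dequeue() ) ) {   # undef is our (assumed) stop sentinel
        push @handled, "handled $event";
    }
    return \@handled;                                       # return a reference: thread was created in scalar context
} );

$queue->enqueue( $_ ) for qw( open read close );            # hand off three hypothetical events
$queue->enqueue( undef );                                   # tell the worker there is nothing more to come

my $log = $worker->join();                                  # wait for the worker and collect its log
print "$_\n" for @$log;
```

The worker never polls: it sleeps inside `dequeue()` and wakes only when an item arrives.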
Sorting is a fundamental task in computing, and the problem of sorting items that can't all be held simultaneously in memory has been around since the early days of the field. Such a task confronted the mathematician John von Neumann back in 1945. Using a stack of playing cards as an analogue (if memory serves me right), he devised the merge-sort algorithm.
The algorithm follows a divide-and-conquer paradigm and is usually implemented today using recursion. The recursion stack obviates the programmer's need to keep track explicitly of all the sorted data subsets. Moreover, to tackle the problem of efficiently sorting very large data sets, variants of the algorithm, known as tiled algorithms, have been devised. They deal with subsets of the data, first sorting each in memory and then merge-sorting the new subsets.
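As a reminder of what the recursive form looks like, here is a minimal in-memory sketch. The sub names `merge_sorted` and `merge_sort` are hypothetical, and the sketch sorts a small list of strings rather than records on disk.

```perl
use strict;
use warnings;

# Merge two lists that are already in lexically ascending order.
sub merge_sorted {
    my ( $left, $right ) = @_;
    my @merged;
    my ( $i, $j ) = ( 0, 0 );
    while ( $i < @$left && $j < @$right ) {
        # "le" takes from the left list on ties, which keeps the sort stable
        push @merged, $left->[$i] le $right->[$j] ? $left->[ $i++ ] : $right->[ $j++ ];
    }
    # one of the two slices below is empty; the other holds the leftovers
    push @merged, @{$left}[ $i .. $#$left ], @{$right}[ $j .. $#$right ];
    return \@merged;
}

# Divide the list in two, sort each half recursively, then merge the halves.
sub merge_sort {
    my @items = @_;
    return \@items if @items < 2;                   # 0 or 1 item: already sorted
    my $mid   = int( @items / 2 );
    my $left  = merge_sort( @items[ 0 .. $mid - 1 ] );
    my $right = merge_sort( @items[ $mid .. $#items ] );
    return merge_sorted( $left, $right );
}

my $sorted = merge_sort( qw( pear apple fig banana cherry ) );
print "@$sorted\n";
```

The recursion stack is what remembers which sorted halves are still waiting to be merged; the queue-based variant discussed next makes that bookkeeping explicit instead.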
Such a tiled algorithm can easily be implemented using a queue. The main program splits the data set into subsets and sorts each one in memory. These subsets are written back to disk, with each filename being placed in a queue. Concurrently, a threaded helper reads a pair of filenames from the queue and merge-sorts them to a new file. This file's name is in turn placed in the queue for processing at a later stage. Execution continues until only one filename is left in the queue. This file is the desired end product.
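The queue-driven scheme can be sketched in memory as follows. This is a simplified illustration, not the script's method: sorted array references stand in for the sorted files on disk, the data and `$chunkSize` are made up, and the helper is told the number of runs in advance (N runs need exactly N-1 merges) instead of watching a shared flag.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my @data      = qw( kiwi apple mango fig pear banana cherry lime date plum );
my $chunkSize = 3;                                            # hypothetical in-memory limit
my $runCount  = int( ( @data + $chunkSize - 1 ) / $chunkSize );   # number of initial sorted runs

my $queue = Thread::Queue->new();

# Helper thread: repeatedly take two sorted runs off the queue, merge them,
# and put the merged run back on the queue.
my $helper = threads->create( sub {
    for ( 2 .. $runCount ) {                                  # N runs need N-1 merges
        my $first  = $queue->dequeue();                       # blocks until a run is available
        my $second = $queue->dequeue();
        my @merged;
        my ( $i, $j ) = ( 0, 0 );
        while ( $i < @$first && $j < @$second ) {
            push @merged, $first->[$i] le $second->[$j] ? $first->[ $i++ ] : $second->[ $j++ ];
        }
        push @merged, @{$first}[ $i .. $#$first ], @{$second}[ $j .. $#$second ];
        $queue->enqueue( \@merged );                          # merged run awaits a later merge
    }
    return [ @{ $queue->dequeue() } ];                        # the single remaining run is the result
} );

# Main thread: split the data into chunks, sort each in memory, enqueue it.
while ( my @chunk = splice @data, 0, $chunkSize ) {
    $queue->enqueue( [ sort @chunk ] );
}

my $sorted = $helper->join();
print "@$sorted\n";
```

Fixing the number of merges up front means the helper never has to decide whether a lone entry in the queue is the final result or merely the first of a pair still to come.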
Performing the merges concurrently saves little time, because the whole operation is heavily I/O-bound; what is being exploited here is the built-in queueing mechanism. One can reduce the read-write times by storing, sorting and merging only the key and the file pointer for each record. Once the merge-sort is complete, the final sorted version of the original data set is constructed by following the file pointers. Appending the file pointer to each key also makes this variant stable: if Records A and B have the same key and A precedes B in the original data, their order is preserved in the merge-sorted set.
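The key-plus-pointer idea can be tried out on a tiny in-memory "file". The three sample records and the variable names below are invented for this sketch; the key layout (a zero-padded numeric field followed by a zero-padded byte offset) only loosely mirrors the composite key built in the script.

```perl
use strict;
use warnings;

# Three tab-separated records; the two with key 20 test stability.
my $data = "20\tfirst twenty\n" . "10\tonly ten\n" . "20\tsecond twenty\n";
open( my $in, '<', \$data ) or die "Can't open in-memory file - $!";

my $ptrLen = length( length $data );            # digits needed for the largest byte offset
my @keys;
my $offset = 0;                                 # byte offset of the record about to be read
while ( my $record = <$in> ) {
    my ($field) = split /\t/, $record;
    # composite key: zero-padded sort field, then zero-padded record offset
    push @keys, sprintf( "%5.5d\t%0${ptrLen}d", $field, $offset );
    $offset = tell $in;                         # start of the next record
}

# Sort only the small keys, then fetch each full record via its offset.
my @sorted;
for my $key ( sort @keys ) {
    seek $in, ( split /\t/, $key )[1], 0;       # jump to the record's start
    push @sorted, scalar <$in>;                 # read exactly one record
}
close $in;
print @sorted;
```

Because the offset is part of the key, two records with the same sort field compare by their position in the input, so "first twenty" is emitted before "second twenty".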
The Perl script merge-sort_threaded.pl, displayed below, implements this approach. It is geared towards sorting, in lexically ascending order, very large, tab-separated data sets consisting of many fields. The code is released for personal, non-commercial and non-profit use only. The listing includes the line numbers in order to reference them in the following general remarks.
For example, the command
perl merge-sort_threaded.pl K:/temp/test.txt C:/temp/test.out 100000
requests that the 1.53 GB input file "K:/temp/test.txt" be sorted to the output file "C:/temp/test.out" using at most 100,000 records for each initial key file. Our test case takes an hour to process on a Pentium IV box. As stated previously, one doesn't expect any massive speed gains here due to the I/O bottleneck. However, the thread queueing mechanism is certainly a handy feature for inter-thread communication, which we intend to explore further.
As always, if you have any questions regarding the code or my explanations, please do not hesitate to contact me.
001 use strict;
002 use Time::HiRes qw(gettimeofday tv_interval);
003 my $Start_time = [gettimeofday];
004 use File::Temp;
005 use Term::ANSIScreen qw|:color :cursor :screen|;
006 use Win32::Console::ANSI qw|Cursor|;
007 use threads;
008 use threads::shared;
009 use Thread::Queue;
010 use constant FATAL => "\aFATAL ERROR: ";
011 use vars qw|$RecordPtrLen|;
012 #===== Copyright 2009, Webpraxis Consulting Ltd. - ALL RIGHTS RESERVED - Email: webpraxis@gmail.com ============================
013 # merge-sort_threaded.pl: Stable threaded merge sort of a text file with tab separated values.
014 #===============================================================================================================================
015 # Usage           : perl merge-sort_threaded.pl INFILE OUTFILE MAXRECORDS
016 # Arguments       : INFILE     = path of the data text file to be sorted
017 #                   OUTFILE    = path of sorted data file
018 #                   MAXRECORDS = maximum number of records per initial file
019 # Input Files     : See Arguments.
020 # Output Files    : See Arguments.
021 # Temporary Files : Filename templates are mergeXXXXX.dat
022 # Remarks         : See http://www.webpraxis.ab.ca/merge/merge-sort_threaded.shtml for details.
023 # History         : v1.0.0 - February 26, 2009 - Original release.
024 #===============================================================================================================================
025 # 0) DEFINE SORT CHARACTERISTICS:
026 my $MakeCompositeKey = sub{ my $record    = shift;   #anonymous sub for creating a composite key
027                             my $recordPtr = shift;
028                             sprintf( "%5.5d\t%s", ( split /\t/, $record )[0,8] ) .
029                             sprintf( "\t%${RecordPtrLen}.${RecordPtrLen}lu", $recordPtr );
030                           };
031 #-------------------------------------------------------------------------------------------------------------------------------
032 # 1) INITIALIZE:
033 $| = 1;   #set STDOUT buffer to auto-flush
034 cls();    #clear screen
035 print colored ['black on white'], "$0\n\n\n", color 'reset';   #display program name
036
037 my $InFile         = shift || die FATAL, 'Input file not specified';              #get path of input file
038 my $OutFile        = shift || die FATAL, 'Output file not specified';             #get path of output file
039 my $MaxRecordCount = shift || die FATAL, 'Max number of records not specified';   #maximum number of records per initial file
040 $RecordPtrLen      = length( -s $InFile );                                        #establish maximum length of a record pointer
041
042 print " Problem Details:\n\n",   #echo problem details
043       "\t Input File: $InFile\n",
044       "\tOutput File: $OutFile\n\n",
045       '-' x 80, "\n\n";
046 #-------------------------------------------------------------------------------------------------------------------------------
047 # 2) CREATE FILE QUEUE & LAUNCH THREAD FOR MERGING:
048 my $CursorY     :shared;                          #cursor row coordinate
049 my $KeysCreated :shared;                          #boolean flag indicating all keys created
050 my $FileQueue   = Thread::Queue->new();           #create file queue of record keys
051 my $MergeThread = threads->create( \&merge );     #launch thread for merging keys in sorted order
052 #-------------------------------------------------------------------------------------------------------------------------------
053 # 3) CREATE FILES OF COMPOSITE RECORD KEYS AND POPULATE MERGE FILE QUEUE:
054 my $recordStart = 0;   #init file pointer for start of a data record
055 my $recordCount;       #record counter
056 my $noFiles;
057 my @keys;              #array of composite record keys
058
059 print " Process Details:\n\n";   #inform user of processing start
060 $CursorY = ( Cursor() )[1];      #record cursor row position
061 print locate( $CursorY,   3 ), 'File Queue: ',
062       locate( $CursorY+2, 3 ), ' Main: ',
063       locate( $CursorY+4, 3 ), ' Thread: ', colored['bold white'], 'Enqueued ',
064       locate( $CursorY+5, 3 ), ' ', colored['bold white'], 'Dequeued ',
065       locate( $CursorY+6, 3 ), ' ', colored['bold white'], 'Dequeued ';
066
067 open( INFILE, $InFile ) || die FATAL, "Can't open data file '$InFile' - $!";   #open the data file for read
068 while( my $record = <INFILE> ) {                                               #repeat for each input data record
069   push @keys, &$MakeCompositeKey( $record, $recordStart );                     # create & store composite key
070   $recordStart = tell;                                                         # store start of next record
071
072   next unless ( ++$recordCount == $MaxRecordCount ) or eof;                    # next record unless enough records or end of file
073   my $tmpKeys = new File::Temp( TEMPLATE => 'mergeXXXXX',                      # create temp file for the keys
074                                 DIR      => $ENV{TEMP},
075                                 SUFFIX   => '.dat',
076                                 UNLINK   => 0
077                               )
078     || die FATAL, "Can't create temporary file in Directory '$ENV{TEMP}' - $!";
079   print( $tmpKeys $keys[$_], "\n" ) for sort { $keys[$a] cmp $keys[$b] } 0..$#keys;   # output lexically ascending composite keys
080   close( $tmpKeys );                                                           # close temp file for the keys
081   ++$noFiles;                                                                  # update file counter
082   $FileQueue->enqueue( $tmpKeys->filename );                                   # add keys file to file queue
083   print locate( $CursorY, 15 ), clline,                                        # inform user
084         colored ['bold yellow'], $FileQueue->pending,
085         locate( $CursorY+2, 15 ), clline,
086         colored['bold white'], 'Enqueued ',
087         colored ['bold green'], "$tmpKeys";
088   @keys = ();                                                                  # clear array for the keys
089   $recordCount = 0;                                                            # reset record count
090 }                                                                              #until all input records processed
091 die locate( $CursorY+8, 3 ),                                                   #abort if not enough files
092     FATAL, "Specified max number of records too large!" unless $noFiles > 1;
093 $KeysCreated = 1;                                                              #flag end of keys creation
094 #-------------------------------------------------------------------------------------------------------------------------------
095 # 4) READ SORTED KEYS & OUTPUT CORRESPONDING DATA RECORDS:
096 print locate( $CursorY+2, 15 ), clline,              #inform user
097       colored ['bold red'], 'Waiting...';
098 my $sortedKeysFile = $MergeThread->join();           #get filename of resulting sorted keys
099 print locate( $CursorY+2, 15 ), clline,              #inform user
100       colored ['bold red'], "Creating sorted output file...";
101 open( OUTFILE, ">$OutFile" )                         #open output file for write
102   || die FATAL, "Can't create output file '$OutFile' - $!";
103 open( KEYS, $sortedKeysFile )                        #open keys file for read
104   || die FATAL, "Can't read last temporary file '$sortedKeysFile' - $!";
105 while( <KEYS> ) {                                    #repeat for each key
106   seek INFILE, ( split /\t/ )[2], 0;                 # position file pointer for INFILE
107   print OUTFILE scalar <INFILE>;                     # output data record
108 }                                                    #until all keys processed
109 close( OUTFILE );                                    #close sorted data file
110 close( INFILE );                                     #close input data file
111 close( KEYS );                                       #close keys file
112 unlink $sortedKeysFile;                              #delete keys file
113 print locate( $CursorY+2, 15 ), clline,              #inform user
114       colored ['bold red'], "\aDone!",
115       locate( $CursorY+8, 3 ),
116       colored ['bold white'],
117       "Elapsed time: ", tv_interval( $Start_time ), "\n";   #report total elapsed time
118 exit;
119 #===== SUBROUTINES =============================================================================================================
120 # Usage            : &merge()
121 # Purpose          : Merge enqueued sorted key files. Returns name of last remaining enqueued file.
122 # Arguments        : None.
123 # Temporary Files  : Filename templates are mergeXXXXX.dat
124 # Externals        : $FileQueue
125 # Shared Externals : $CursorY, $KeysCreated
126 # Subs             : None.
127 # Remarks          : None.
128 # History          : v1.0.0 - February 26, 2009 - Original release.
129
130 sub merge {                                                       #begin sub
131   my $sourceKeys1;                                                # filename for 1st keys file to be merged
132   my $sourceKeys2;                                                # filename for 2nd keys file to be merged
133   my $tmpKeys;                                                    # filename for merged keys file
134
135   do {
136     $sourceKeys1 = $FileQueue->dequeue();                         # get 1st filename
137     $sourceKeys2 = $FileQueue->dequeue();                         # get 2nd filename
138     print locate( $CursorY+5, 24 ), clline,                       # inform user
139           colored ['bold yellow'], "$sourceKeys1",
140           locate( $CursorY+6, 24 ), clline,
141           colored ['bold yellow'], "$sourceKeys2",
142           locate( $CursorY, 15 ), clline,
143           colored ['bold yellow'], $FileQueue->pending;
144     open( SOURCEKEYS1, $sourceKeys1 )                             # open 1st keys file for read
145       || die FATAL, "Can't read temporary file '$sourceKeys1' - $!";
146     open( SOURCEKEYS2, $sourceKeys2 )                             # open 2nd keys file for read
147       || die FATAL, "Can't read temporary file '$sourceKeys2' - $!";
148
149     $tmpKeys = new File::Temp( TEMPLATE => 'mergeXXXXX',          # create temp file for the merged keys
150                                DIR      => $ENV{TEMP},
151                                SUFFIX   => '.dat',
152                                UNLINK   => 0
153                              )
154       || die FATAL, "Can't create temporary file in Directory '$ENV{TEMP}' - $!";
155
156     my ( $key1, $key2 ) = ( undef, undef );                       # clear the key from each file
157     until( ( !defined $key1 && eof SOURCEKEYS1 ) ||               # repeat
158            ( !defined $key2 && eof SOURCEKEYS2 )
159          ) {
160       $key1 = <SOURCEKEYS1> unless defined $key1;                 # get the next key in 1st file
161       $key2 = <SOURCEKEYS2> unless defined $key2;                 # get the next key in 2nd file
162       if( $key1 lt $key2 ) {                                      # case of 1st key less than 2nd one
163         print $tmpKeys $key1;                                     # add key from 1st file to new temp key file
164         undef $key1;                                              # clear the current key from 1st file
165       } else {                                                    # case of 2nd key less than 1st one
166         print $tmpKeys $key2;                                     # add key from 2nd file to new temp key file
167         undef $key2;                                              # clear the current key from 2nd file
168       }                                                           # end case of keys ordering
169     }                                                             # until no more keys to process from either file
170     unless( !defined $key1 && eof SOURCEKEYS1 ) {                 # if the 1st file has some unprocessed keys
171       print $tmpKeys $key1 if defined $key1;                      # add any unprocessed read key to new temp file
172       print $tmpKeys <SOURCEKEYS1>;                               # add any unread keys to new temp file
173     } else {                                                      # else the 2nd file has some unprocessed keys
174       print $tmpKeys $key2 if defined $key2;                      # add any unprocessed read key to new temp file
175       print $tmpKeys <SOURCEKEYS2>;                               # add any unread keys to new temp file
176     }                                                             # end if-else
177     close SOURCEKEYS1;                                            # close the 1st file
178     close SOURCEKEYS2;                                            # close the 2nd file
179     close $tmpKeys;                                               # close the new temp file
180
181     unlink $sourceKeys1, $sourceKeys2;                            # delete source temp files
182     $FileQueue->enqueue( $tmpKeys->filename );
183     print locate( $CursorY, 15 ), clline,                         # inform user
184           colored ['bold yellow'], $FileQueue->pending,
185           locate( $CursorY+4, 24 ), clline,
186           colored ['bold green'], "$tmpKeys",
187           locate( $CursorY+5, 24 ), clline,
188           locate( $CursorY+6, 24 ), clline;
189   } until ($KeysCreated == 1) && ($FileQueue->pending == 1);      # until only 1 file left in queue
190   $tmpKeys->filename;                                             # return filepath of resulting file
191 }                                                                 #end sub merge
192 #===== Copyright 2009, Webpraxis Consulting Ltd. - ALL RIGHTS RESERVED - Email: webpraxis@gmail.com ============================
193 # end of merge-sort_threaded.pl
© 2024 Webpraxis Consulting Ltd. – ALL RIGHTS RESERVED.