One of the features in Perl's threading implementation that caught my attention was its support for thread-safe queues (see http://perldoc.perl.org/Thread/Queue.html). Its major feature is that, if a thread tries to read the next entry in a queue, but the queue is empty, then it can be instructed to wait for an entry to appear before continuing. This makes coordinating thread execution rather simple. In an event-driven scenario, a thread can just stand by, ready to leap into action as soon as an event is handed off to it.
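To make the blocking behaviour concrete, here is a minimal sketch, assuming a Perl built with thread support. The worker's dequeue() call simply waits whenever the queue is empty and resumes once an entry arrives. Using an undef entry as a stop marker is a convention of this sketch, not a requirement of Thread::Queue.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $queue = Thread::Queue->new();

# Worker: dequeue() blocks while the queue is empty and returns as soon
# as an entry is available. An undef entry ends the loop (sketch convention).
my $worker = threads->create( sub {
    my @seen;
    while ( defined( my $item = $queue->dequeue() ) ) {
        push @seen, uc $item;
    }
    return join ',', @seen;
} );

$queue->enqueue( $_ ) for qw( alpha beta gamma );    # hand off some work
$queue->enqueue( undef );                            # tell the worker to stop

my $result = $worker->join();                        # collect the worker's return value
print "$result\n";                                   # prints ALPHA,BETA,GAMMA
```

The main thread never has to poll or sleep: all the waiting is done inside dequeue().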
Sorting is a fundamental task in computing, and the problem of sorting items that cannot all be held in memory simultaneously has been around since the field's early days. Such a task confronted the mathematician John von Neumann back in 1945. Using a stack of playing cards as an analogue (if memory serves me right), he devised the merge-sort algorithm.
The algorithm follows a divide-and-conquer paradigm and is usually implemented today using recursion. The recursion stack obviates the programmer's need to keep track explicitly of all the sorted data subsets. Moreover, to tackle the problem of efficiently sorting very large data sets, variants of the algorithm, known as tiled algorithms, have been devised. They deal with subsets of the data, first sorting each in memory and then merge-sorting the new subsets.
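For illustration, a recursive merge sort over a small in-memory list might look like the following. The sub names merge_lists and merge_sort are made up for this sketch, and it sorts strings in lexically ascending order.

```perl
use strict;
use warnings;

# Merge two already-sorted lists. Taking from the left list on ties
# keeps equal items in their original relative order.
sub merge_lists {
    my ( $left, $right ) = @_;
    my @merged;
    my ( $i, $j ) = ( 0, 0 );
    while ( $i < @$left && $j < @$right ) {
        if ( $left->[$i] le $right->[$j] ) { push @merged, $left->[ $i++ ] }
        else                               { push @merged, $right->[ $j++ ] }
    }
    # One list is exhausted: append whatever remains of the other.
    push @merged, @{$left}[ $i .. $#$left ], @{$right}[ $j .. $#$right ];
    return \@merged;
}

# Divide the list in two, sort each half recursively, then merge.
sub merge_sort {
    my @items = @_;
    return \@items if @items < 2;
    my $mid   = int( @items / 2 );
    my $left  = merge_sort( @items[ 0 .. $mid - 1 ] );
    my $right = merge_sort( @items[ $mid .. $#items ] );
    return merge_lists( $left, $right );
}

my $sorted = merge_sort( qw( pear apple fig banana cherry ) );
print "@$sorted\n";    # prints apple banana cherry fig pear
```

Here the call stack holds the pending halves, which is exactly the bookkeeping that a queue has to take over once the subsets live in files.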
Such a tiled algorithm can easily be implemented using a queue. The main program splits the data set into subsets and sorts each one in memory. These subsets are written back to disk, with each filename being placed in a queue. Concurrently, a threaded helper reads a pair of filenames from the queue and merge-sorts them to a new file. This file's name is in turn placed in the queue for processing at a later stage. Execution continues until only one filename is left in the queue. This file is the desired end product.
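The queue-driven scheme can be sketched on a toy data set as follows, assuming a Perl built with thread support. The helper names write_lines and merge_files, the 'sketchXXXXX' temp-file template and the chunk size of 3 are all assumptions of this illustration. It also marks the end of the initial files with an undef queue entry rather than a shared flag, which is a simplification chosen for the sketch.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;
use File::Temp qw( tempfile );

my $queue = Thread::Queue->new();

# Write the given lines to a new temp file and return its name.
sub write_lines {
    my @lines = @_;
    my ( $fh, $name ) = tempfile( 'sketchXXXXX', TMPDIR => 1, SUFFIX => '.dat' );
    print {$fh} "$_\n" for @lines;
    close $fh or die "Can't close '$name' - $!";
    return $name;
}

# Merge two sorted files into a new temp file, then delete the sources.
sub merge_files {
    my ( $name1, $name2 ) = @_;
    open( my $in1, '<', $name1 ) or die "Can't read '$name1' - $!";
    open( my $in2, '<', $name2 ) or die "Can't read '$name2' - $!";
    my ( $out, $merged ) = tempfile( 'sketchXXXXX', TMPDIR => 1, SUFFIX => '.dat' );
    my $line1 = <$in1>;
    my $line2 = <$in2>;
    while ( defined $line1 && defined $line2 ) {
        if ( $line1 le $line2 ) { print {$out} $line1; $line1 = <$in1>; }
        else                    { print {$out} $line2; $line2 = <$in2>; }
    }
    # One file is exhausted: copy whatever remains of the other.
    while ( defined $line1 ) { print {$out} $line1; $line1 = <$in1>; }
    while ( defined $line2 ) { print {$out} $line2; $line2 = <$in2>; }
    close $_ for $in1, $in2, $out;
    unlink $name1, $name2;
    return $merged;
}

# Helper thread: pair up filenames from the queue and merge them.
my $helper = threads->create( sub {
    my ( $held, $done );
    while (1) {
        last if $done && !$queue->pending();    # nothing more can arrive
        my $name = $queue->dequeue();           # blocks while the queue is empty
        if ( !defined $name ) { $done = 1;     next; }    # end marker from main
        if ( !defined $held ) { $held = $name; next; }    # first file of a pair
        $queue->enqueue( merge_files( $held, $name ) );   # merged file goes back in
        undef $held;
    }
    return $held;                               # the one remaining file
} );

# Main: split the data into chunks, sort each in memory, enqueue the files.
my @data = qw( kiwi apple mango fig date cherry banana );
while ( my @chunk = splice @data, 0, 3 ) {
    $queue->enqueue( write_lines( sort @chunk ) );
}
$queue->enqueue( undef );                       # no more initial files

my $result_file = $helper->join();
open( my $fh, '<', $result_file ) or die "Can't read '$result_file' - $!";
chomp( my @sorted = <$fh> );
close $fh;
unlink $result_file;
print "@sorted\n";    # prints apple banana cherry date fig kiwi mango
```

The helper holds at most one unpaired filename at a time, so it can tell it is finished once the end marker has been seen and the queue is empty.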
Doing the merge-sort concurrently is not a big time saver, as the whole operation is very IO-bound: it is the built-in queueing mechanism that is being exploited. One can reduce the read-write times by storing, sorting and merging only the key and the file pointer for each record. Once the merge-sort is accomplished, a final sorted version of the original data set can be constructed by looking up each record through its file pointer. Because the file pointer forms the tail end of each composite key, this also makes the variant stable: if Records A and B have the same key and A precedes B in the original data, their order will be preserved in the merge-sorted set.
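The key-and-pointer idea can be illustrated on a four-record sample. In this sketch the sort key is assumed to be the first tab-separated field, and the sample records and the 'sketchXXXXX' temp-file template are made up. The zero-padded byte offset appended to each key is what breaks ties in favour of the original order.

```perl
use strict;
use warnings;
use File::Temp qw( tempfile );

# Create a small tab-separated sample file with duplicate keys.
my ( $fh, $name ) = tempfile( 'sketchXXXXX', TMPDIR => 1, SUFFIX => '.txt', UNLINK => 1 );
binmode $fh;
print {$fh} "pear\tfirst\n", "apple\tsecond\n", "pear\tthird\n", "apple\tfourth\n";
close $fh or die "Can't close '$name' - $!";

open( my $in, '<', $name ) or die "Can't read '$name' - $!";
binmode $in;

# Pad every offset to the same width so that string comparison of the
# composite keys orders equal fields by their position in the file.
my $width = length( -s $name );

my @keys;
my $offset = 0;
while ( my $record = <$in> ) {
    my ($field) = split /\t/, $record;
    push @keys, sprintf( "%s\t%0${width}d", $field, $offset );
    $offset = tell $in;                      # start of the next record
}

# Only the keys are sorted; the records themselves stay on disk.
my @sorted;
for my $key ( sort @keys ) {
    my $pos = ( split /\t/, $key )[1];       # recover the byte offset
    seek( $in, $pos, 0 ) or die "Can't seek in '$name' - $!";
    push @sorted, scalar <$in>;              # fetch the full record
}
close $in;
print @sorted;
```

The two "apple" records come out as second, fourth and the two "pear" records as first, third, matching their order in the input.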
The Perl script merge-sort_threaded.pl, displayed below, implements this approach. It is geared towards sorting, in lexically ascending order, very large, tab-separated data sets consisting of many fields. The code is released for personal, non-commercial and non-profit use only. The listing includes the line numbers in order to reference them in the following general remarks.
A typical invocation is
perl merge-sort_threaded.pl K:/temp/test.txt C:/temp/test.out 100000
which requests that the 1.53 Gbyte input file "K:/temp/test.txt" be sorted to the output file "C:/temp/test.out" using at most 100,000 records for each initial key file. Our test case takes an hour to be processed on a Pentium IV box. As stated previously, one doesn't expect any massive speed gains here due to the IO bottleneck. However, the thread queueing mechanism is certainly a handy feature for inter-thread communication, which we intend to explore further.
As always, if you have any questions regarding the code or my explanations, please do not hesitate to contact me.
001 use strict;
002 use Time::HiRes qw(gettimeofday tv_interval);
003 my $Start_time = [gettimeofday];
004 use File::Temp;
005 use Term::ANSIScreen qw|:color :cursor :screen|;
006 use Win32::Console::ANSI qw|Cursor|;
007 use threads;
008 use threads::shared;
009 use Thread::Queue;
010 use constant FATAL => "\aFATAL ERROR: ";
011 our $RecordPtrLen; #package variable: width of a zero-padded record pointer
012 #===== Copyright 2009, Webpraxis Consulting Ltd. - ALL RIGHTS RESERVED - Email: webpraxis@gmail.com ============================
013 # merge-sort_threaded.pl: Stable threaded merge sort of a text file with tab separated values.
014 #===============================================================================================================================
015 # Usage : perl merge-sort_threaded.pl INFILE OUTFILE MAXRECORDS
016 # Arguments : INFILE = path of the data text file to be sorted
017 # OUTFILE = path of sorted data file
018 # MAXRECORDS = maximum number of records per initial file
019 # Input Files : See Arguments.
020 # Output Files : See Arguments.
021 # Temporary Files : Filename templates are mergeXXXXX.dat
022 # Remarks : See http://www.webpraxis.ab.ca/merge/merge-sort_threaded.shtml for details.
023 # History : v1.0.0 - February 26, 2009 - Original release.
024 #===============================================================================================================================
025 # 0) DEFINE SORT CHARACTERISTICS:
026 my $MakeCompositeKey = sub{ my $record = shift; #anonymous sub for creating a composite key
027 my $recordPtr = shift;
028 sprintf( "%5.5d\t%s", ( split /\t/, $record )[0,8] ) .
029 sprintf( "\t%${RecordPtrLen}.${RecordPtrLen}lu", $recordPtr );
030 };
031 #-------------------------------------------------------------------------------------------------------------------------------
032 # 1) INITIALIZE:
033 $| = 1; #set STDOUT buffer to auto-flush
034 cls(); #clear screen
035 print colored ['black on white'], "$0\n\n\n", color 'reset'; #display program name
036
037 my $InFile = shift || die FATAL, 'Input file not specified'; #get path of input file
038 my $OutFile = shift || die FATAL, 'Output file not specified'; #get path of output file
039 my $MaxRecordCount = shift || die FATAL, 'Max number of records not specified'; #maximum number of records per initial file
040 $RecordPtrLen = length( -s $InFile ); #establish maximum length of a record pointer
041
042 print " Problem Details:\n\n", #echo problem details
043 "\t Input File: $InFile\n",
044 "\tOutput File: $OutFile\n\n",
045 '-' x 80, "\n\n";
046 #-------------------------------------------------------------------------------------------------------------------------------
047 # 2) CREATE FILE QUEUE & LAUNCH THREAD FOR MERGING:
048 my $CursorY :shared; #cursor row coordinate
049 my $KeysCreated :shared; #boolean flag indicating all keys created
050 my $FileQueue = Thread::Queue->new(); #create file queue of record keys
051 my $MergeThread = threads->create( \&merge ); #launch thread for merging keys in sorted order
052 #-------------------------------------------------------------------------------------------------------------------------------
053 # 3) CREATE FILES OF COMPOSITE RECORD KEYS AND POPULATE MERGE FILE QUEUE:
054 my $recordStart = 0; #init file pointer for start of a data record
055 my $recordCount; #record counter
056 my $noFiles;
057 my @keys; #array of composite record keys
058
059 print " Process Details:\n\n"; #inform user of processing start
060 $CursorY = ( Cursor() )[1]; #record cursor row position
061 print locate( $CursorY, 3 ), 'File Queue: ',
062 locate( $CursorY+2, 3 ), ' Main: ',
063 locate( $CursorY+4, 3 ), ' Thread: ', colored['bold white'], 'Enqueued ',
064 locate( $CursorY+5, 3 ), ' ', colored['bold white'], 'Dequeued ',
065 locate( $CursorY+6, 3 ), ' ', colored['bold white'], 'Dequeued ';
066
067 open( INFILE, '<', $InFile ) || die FATAL, "Can't open data file '$InFile' - $!"; #open the data file for read
068 while( my $record = <INFILE> ) { #repeat for each input data record
069 push @keys, &$MakeCompositeKey( $record, $recordStart ); # create & store composite key
070 $recordStart = tell; # store start of next record
071
072 next unless ( ++$recordCount == $MaxRecordCount ) or eof; # next record unless enough records or end of file
073 my $tmpKeys = File::Temp->new( TEMPLATE => 'mergeXXXXX', # create temp file for the keys
074 DIR => $ENV{TEMP},
075 SUFFIX => '.dat',
076 UNLINK => 0
077 )
078 || die FATAL, "Can't create temporary file in Directory '$ENV{TEMP}' - $!";
079 print( $tmpKeys $keys[$_], "\n" ) for sort { $keys[$a] cmp $keys[$b] } 0..$#keys; # output lexically ascending composite keys
080 close( $tmpKeys ); # close temp file for the keys
081 ++$noFiles; # update file counter
082 $FileQueue->enqueue( $tmpKeys->filename ); # add keys file to file queue
083 print locate( $CursorY, 15 ), clline, # inform user
084 colored ['bold yellow'], $FileQueue->pending,
085 locate( $CursorY+2, 15 ), clline,
086 colored['bold white'], 'Enqueued ',
087 colored ['bold green'], "$tmpKeys";
088 @keys = (); # clear array for the keys
089 $recordCount = 0; # reset record count
090 } #until all input records processed
091 die locate( $CursorY+8, 3 ), #abort if not enough files
092 FATAL, "Specified max number of records too large!" unless $noFiles > 1;
093 $KeysCreated = 1; #flag end of keys creation
094 #-------------------------------------------------------------------------------------------------------------------------------
095 # 4) READ SORTED KEYS & OUTPUT CORRESPONDING DATA RECORDS:
096 print locate( $CursorY+2, 15 ), clline, #inform user
097 colored ['bold red'], 'Waiting...';
098 my $sortedKeysFile = $MergeThread->join(); #get filename of resulting sorted keys
099 print locate( $CursorY+2, 15 ), clline, #inform user
100 colored ['bold red'], "Creating sorted output file...";
101 open( OUTFILE, '>', $OutFile ) #open output file for write
102 || die FATAL, "Can't create output file '$OutFile' - $!";
103 open( KEYS, '<', $sortedKeysFile ) #open keys file for read
104 || die FATAL, "Can't read last temporary file '$sortedKeysFile' - $!";
105 while( <KEYS> ) { #repeat for each key
106 seek INFILE, ( split /\t/ )[2], 0; # position file pointer for INFILE
107 print OUTFILE scalar <INFILE>; # output data record
108 } #until all keys processed
109 close( OUTFILE ); #close sorted data file
110 close( INFILE ); #close input data file
111 close( KEYS ); #close keys file
112 unlink $sortedKeysFile; #delete keys file
113 print locate( $CursorY+2, 15 ), clline, #inform user
114 colored ['bold red'], "\aDone!",
115 locate( $CursorY+8, 3 ),
116 colored ['bold white'],
117 "Elapsed time: ", tv_interval( $Start_time ), "\n"; #report total elapsed time
118 exit;
119 #===== SUBROUTINES =============================================================================================================
120 # Usage : &merge()
121 # Purpose : Merge enqueued sorted key files. Returns name of last remaining enqueued file.
122 # Arguments : None.
123 # Temporary Files : Filename templates are mergeXXXXX.dat
124 # Externals : $FileQueue
125 # Shared Externals : $CursorY, $KeysCreated
126 # Subs : None.
127 # Remarks : None.
128 # History : v1.0.0 - February 26, 2009 - Original release.
129
130 sub merge { #begin sub
131 my $sourceKeys1; # filename for 1st keys file to be merged
132 my $sourceKeys2; # filename for 2nd keys file to be merged
133 my $tmpKeys; # filename for merged keys file
134
135 do {
136 $sourceKeys1 = $FileQueue->dequeue(); # get 1st filename
137 $sourceKeys2 = $FileQueue->dequeue(); # get 2nd filename
138 print locate( $CursorY+5, 24 ), clline, # inform user
139 colored ['bold yellow'], "$sourceKeys1",
140 locate( $CursorY+6, 24 ), clline,
141 colored ['bold yellow'], "$sourceKeys2",
142 locate( $CursorY, 15 ), clline,
143 colored ['bold yellow'], $FileQueue->pending;
144 open( SOURCEKEYS1, '<', $sourceKeys1 ) # open 1st keys file for read
145 || die FATAL, "Can't read temporary file '$sourceKeys1' - $!";
146 open( SOURCEKEYS2, '<', $sourceKeys2 ) # open 2nd keys file for read
147 || die FATAL, "Can't read temporary file '$sourceKeys2' - $!";
148
149 $tmpKeys = File::Temp->new( TEMPLATE => 'mergeXXXXX', # create temp file for the merged keys
150 DIR => $ENV{TEMP},
151 SUFFIX => '.dat',
152 UNLINK => 0
153 )
154 || die FATAL, "Can't create temporary file in Directory '$ENV{TEMP}' - $!";
155
156 my ( $key1, $key2 ) = ( undef, undef ); # clear the key from each file
157 until( ( !defined $key1 && eof SOURCEKEYS1 ) || # repeat
158 ( !defined $key2 && eof SOURCEKEYS2 )
159 ) {
160 $key1 = <SOURCEKEYS1> unless defined $key1; # get the next key in 1st file
161 $key2 = <SOURCEKEYS2> unless defined $key2; # get the next key in 2nd file
162 if( $key1 lt $key2 ) { # case of 1st key less than 2nd one
163 print $tmpKeys $key1; # add key from 1st file to new temp key file
164 undef $key1; # clear the current key from 1st file
165 } else { # case of 2nd key less than 1st one
166 print $tmpKeys $key2; # add key from 2nd file to new temp key file
167 undef $key2; # clear the current key from 2nd file
168 } # end case of keys ordering
169 } # until no more keys to process from either file
170 unless( !defined $key1 && eof SOURCEKEYS1 ) { # if the 1st file has some unprocessed keys
171 print $tmpKeys $key1 if defined $key1; # add any unprocessed read key to new temp file
172 print $tmpKeys <SOURCEKEYS1>; # add any unread keys to new temp file
173 } else { # else the 2nd file has some unprocessed keys
174 print $tmpKeys $key2 if defined $key2; # add any unprocessed read key to new temp file
175 print $tmpKeys <SOURCEKEYS2>; # add any unread keys to new temp file
176 } # end if-else
177 close SOURCEKEYS1; # close the 1st file
178 close SOURCEKEYS2; # close the 2nd file
179 close $tmpKeys; # close the new temp file
180
181 unlink $sourceKeys1, $sourceKeys2; # delete source temp files
182 $FileQueue->enqueue( $tmpKeys->filename );
183 print locate( $CursorY, 15 ), clline, # inform user
184 colored ['bold yellow'], $FileQueue->pending,
185 locate( $CursorY+4, 24 ), clline,
186 colored ['bold green'], "$tmpKeys",
187 locate( $CursorY+5, 24 ), clline,
188 locate( $CursorY+6, 24 ), clline;
189 } until ($KeysCreated == 1) && ($FileQueue->pending == 1); # until only 1 file left in queue
190 $tmpKeys->filename; # return filepath of resulting file
191 } #end sub merge
192 #===== Copyright 2009, Webpraxis Consulting Ltd. - ALL RIGHTS RESERVED - Email: webpraxis@gmail.com ============================
193 # end of merge-sort_threaded.pl
© 2012 Webpraxis Consulting Ltd. – ALL RIGHTS RESERVED.