In 2004, engineers at Google introduced a new paradigm for large-scale parallel data processing known as MapReduce (see the original paper, and make sure to look in the citations at the end). One key aspect of MapReduce is that it makes programming such tasks on large-scale clusters easy for developers; instead of worrying about how to manage parallelism, handle machine crashes, and many other complexities common within clusters of machines, the developer can instead just focus on writing little bits of code (described below) and the infrastructure handles the rest.
In this project, you'll be building a simplified version of MapReduce for just a single machine. Although building MapReduce for a single machine is somewhat easier, there are still numerous challenges, mostly in building correct concurrency support. Thus, you'll have to think a bit about how to design the MapReduce implementation, and then build it to work efficiently and correctly. For some additional practice with concurrency, you will also modify an existing HashMap implementation to be multi-threaded by adding reader-writer locks.
There are three specific objectives to this assignment:
* To learn about the general nature of the MapReduce paradigm.
* To implement a correct and efficient MapReduce framework using threads and related functions.
* To gain more experience writing concurrent code.
Background
To understand how to make progress on any project that involves concurrency, you should understand the basics of thread creation, mutual exclusion (with locks), and signaling/waiting (with condition variables). These are described in the following book chapters:
* Intro to Threads
* Threads API
* Locks
* Using Locks
* Condition Variables
Read these chapters carefully in order to prepare yourself for this project.
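As a quick refresher on those three building blocks, here is a minimal sketch using POSIX threads. The names worker, run_workers, NUM_WORKERS, and INCREMENTS are illustrative only and are not part of the project. Several threads increment a shared counter under a mutex, and the main thread waits on a condition variable until every worker reports that it is done. The condition-variable wait is redundant with pthread_join() here; it is included only to show the usual pattern of re-checking the condition in a while loop.

```c
#include <pthread.h>
#include <stddef.h>

#define NUM_WORKERS 4
#define INCREMENTS 100000

// Shared state; every access goes through counter_lock.
static pthread_mutex_t counter_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t all_done = PTHREAD_COND_INITIALIZER;
static int counter = 0;    // incremented by every worker
static int finished = 0;   // number of workers that have finished

// Mutual exclusion: each increment happens while holding the mutex.
static void *worker(void *arg) {
    (void)arg;
    for (int i = 0; i < INCREMENTS; i++) {
        pthread_mutex_lock(&counter_lock);
        counter++;
        pthread_mutex_unlock(&counter_lock);
    }
    // Signaling: record completion and wake the waiting thread.
    pthread_mutex_lock(&counter_lock);
    finished++;
    pthread_cond_signal(&all_done);
    pthread_mutex_unlock(&counter_lock);
    return NULL;
}

// Thread creation plus waiting: returns the final counter value.
int run_workers(void) {
    pthread_t threads[NUM_WORKERS];
    for (int i = 0; i < NUM_WORKERS; i++)
        pthread_create(&threads[i], NULL, worker, NULL);

    pthread_mutex_lock(&counter_lock);
    while (finished < NUM_WORKERS)          // re-check after every wakeup
        pthread_cond_wait(&all_done, &counter_lock);
    int result = counter;
    pthread_mutex_unlock(&counter_lock);

    for (int i = 0; i < NUM_WORKERS; i++)
        pthread_join(threads[i], NULL);
    return result;
}
```

Compile anything like this with -pthread, as the project's own compile line does.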
General Idea
Let's now get into the exact code you'll have to build. The MapReduce infrastructure you will build supports the execution of user-defined Map() and Reduce() functions.
As from the original paper: "Map(), written by the user, takes an input pair and produces a set of intermediate key/value pairs. The MapReduce library groups together all intermediate values associated with the same intermediate key K and passes them to the Reduce() function."
"The Reduce() function, also written by the user, accepts an intermediate key K and a set of values for that key. It merges together these values to form a possibly smaller set of values; typically just zero or one output value is produced per Reduce() invocation. The intermediate values are supplied to the user's reduce function via an iterator."
A classic example, written here in pseudocode, shows how to count the number of occurrences of each word in a set of documents:
map(String key, String value):
    // key: document name
    // value: document contents
    for each word w in value:
        EmitIntermediate(w, "1");

reduce(String key, Iterator values):
    // key: a word
    // values: a list of counts
    int result = 0;
    for each v in values:
        result += ParseInt(v);
    print key, result;
What's fascinating about MapReduce is that so many different kinds of relevant computations can be mapped onto this framework. The original paper lists many examples, including word counting (as above), a distributed grep, a URL access frequency counter, a reverse web-link graph, a term vector per host, and others.
What's also quite interesting is how easy it is to parallelize: many mappers can be running at the same time, and later, many reducers can be running at the same time. Users don't have to worry about how to parallelize their application; rather, they just write Map() and Reduce() functions and the infrastructure does the rest.
Code Overview
We will provide several starter files for this assignment. Files are available in /p/course/cs537-swift/public/p3a on CSL machines.
* mapreduce.h - This header file specifies exactly what you must build for your MapReduce library.
* sequential_mapreduce.c - This is a functional but inefficient, non-concurrent implementation of MapReduce. We include it to help clarify how to use the functions defined in mapreduce.h.
* hashmap.h - This header file specifies the interface for the HashMap.
* hashmap.c - This is a functional implementation of a HashMap. You should modify this file to include reader-writer locks.
* main.c - runs the whole program. We are providing a simple word count application to you to test your program with, but it will also be tested with other Map() and Reduce() functions.
In summary, you should create a file mapreduce.c with your MapReduce implementation. Feel free to reference our single-threaded implementation, but keep in mind that your implementation will be substantially different. You will modify hashmap.c so that it works with multiple threads, and you may want to edit hashmap.h as well. You may want to experiment with writing your own Map() and Reduce() functions in main.c, but this will be ungraded. You should not modify mapreduce.h.
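For the HashMap part, the key idea of a reader-writer lock is that many threads may hold it for reading at once, while a writer needs it exclusively. The sketch below shows that pattern with the POSIX pthread_rwlock_t on a small chained hash table. It does not follow the provided hashmap.h interface, which is not reproduced here: RWHashMap, rwmap_init(), rwmap_put(), and rwmap_get() are hypothetical names, and error handling and cleanup are omitted.

```c
#define _POSIX_C_SOURCE 200809L   // for strdup() and pthread_rwlock_t
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

#define NUM_BUCKETS 64

// Hypothetical table layout; the real hashmap.h may differ.
typedef struct Entry {
    char *key;
    void *value;
    struct Entry *next;
} Entry;

typedef struct {
    Entry *buckets[NUM_BUCKETS];
    pthread_rwlock_t rwlock;   // many readers OR one writer at a time
} RWHashMap;

static unsigned long bucket_of(const char *key) {
    unsigned long h = 5381;
    int c;
    while ((c = (unsigned char)*key++) != '\0')
        h = h * 33 + c;
    return h % NUM_BUCKETS;
}

void rwmap_init(RWHashMap *m) {
    memset(m->buckets, 0, sizeof m->buckets);
    pthread_rwlock_init(&m->rwlock, NULL);
}

// Writers take the lock exclusively: insert a new key or replace a value.
void rwmap_put(RWHashMap *m, const char *key, void *value) {
    pthread_rwlock_wrlock(&m->rwlock);
    unsigned long b = bucket_of(key);
    Entry *e;
    for (e = m->buckets[b]; e != NULL; e = e->next) {
        if (strcmp(e->key, key) == 0) {
            e->value = value;
            break;
        }
    }
    if (e == NULL) {                 // key not present yet
        e = malloc(sizeof *e);
        e->key = strdup(key);
        e->value = value;
        e->next = m->buckets[b];
        m->buckets[b] = e;
    }
    pthread_rwlock_unlock(&m->rwlock);
}

// Readers share the lock, so concurrent lookups do not block each other.
void *rwmap_get(RWHashMap *m, const char *key) {
    pthread_rwlock_rdlock(&m->rwlock);
    void *result = NULL;
    for (Entry *e = m->buckets[bucket_of(key)]; e != NULL; e = e->next) {
        if (strcmp(e->key, key) == 0) {
            result = e->value;
            break;
        }
    }
    pthread_rwlock_unlock(&m->rwlock);
    return result;
}
```

One lock for the whole table is the simplest correct choice; per-bucket locks would allow more concurrency at the cost of more bookkeeping.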
To run the example program, compile it with gcc -o mapreduce main.c sequential_mapreduce.c hashmap.c. Run it with ./mapreduce basic.txt four, and it should print "Found four 4 times". This program runs wordcount, loads the results into the hashmap, and searches for a word in the hashmap after wordcount is complete. We will test your solution on similar programs.
mapreduce.h
We give you here the mapreduce.h header file that specifies exactly what you must build in your MapReduce library:
#ifndef __mapreduce_h__
#define __mapreduce_h__
// Different function pointer types used by MR
typedef char *(*Getter)(char *key, int partition_number);
typedef void (*Mapper)(char *file_name);
typedef void (*Reducer)(char *key, Getter get_func, int partition_number);
typedef unsigned long (*Partitioner)(char *key, int num_partitions);
// External functions: these are what you must define
void MR_Emit(char *key, char *value);
unsigned long MR_DefaultHashPartition(char *key, int num_partitions);
void MR_Run(int argc, char *argv[],
            Mapper map, int num_mappers,
            Reducer reduce, int num_reducers,
            Partitioner partition);
#endif // __mapreduce_h__
The most important function is MR_Run, which takes the command line parameters of a given program, a pointer to a Map function (type Mapper, called map), the number of mapper threads your library should create (num_mappers), a pointer to a Reduce function (type Reducer, called reduce), the number of reducers (num_reducers), and finally, a pointer to a Partition function (partition, described below).
Thus, when a user is writing a MapReduce computation with your library, they will implement a Map function, implement a Reduce function, possibly implement a Partition function, and then call MR_Run(). The infrastructure will then create threads as appropriate and run the computation.
One basic assumption is that the library will create num_mappers threads (in a thread pool) that perform the map tasks. Another is that your library will create num_reducers threads to perform the reduction tasks. Finally, your library will create some kind of internal data structure to pass keys and values from mappers to reducers; more on this below.
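One possible shape for the mapper side of MR_Run() is sketched below, assuming the file names come from argv[1] .. argv[argc-1] as described later. Instead of fixing an assignment up front (such as round robin), each thread claims the next unprocessed file from a shared, mutex-protected index, so threads that finish early simply pick up more work. MapQueue, run_map_phase(), and CountingMap() are hypothetical names; CountingMap() exists only to exercise the pool.

```c
#include <pthread.h>
#include <stddef.h>

typedef void (*Mapper)(char *file_name);   // same shape as in mapreduce.h

// Hypothetical map-phase state: input files handed out one at a time.
typedef struct {
    char **files;
    int num_files;
    int next;               // index of the next unclaimed file
    pthread_mutex_t lock;   // protects next
    Mapper map;
} MapQueue;

// Each worker claims the next file under the lock, then runs the user's
// Map() on it outside the lock, until no files remain.
static void *mapper_thread(void *arg) {
    MapQueue *q = arg;
    for (;;) {
        pthread_mutex_lock(&q->lock);
        int i = (q->next < q->num_files) ? q->next++ : -1;
        pthread_mutex_unlock(&q->lock);
        if (i < 0)
            return NULL;
        q->map(q->files[i]);
    }
}

// Runs the map phase with num_mappers threads (assumed >= 1) and returns
// only after every file has been mapped.
void run_map_phase(int argc, char *argv[], Mapper map, int num_mappers) {
    MapQueue q = { .files = argv + 1, .num_files = argc - 1, .next = 0, .map = map };
    pthread_mutex_init(&q.lock, NULL);
    pthread_t tids[num_mappers];
    for (int t = 0; t < num_mappers; t++)
        pthread_create(&tids[t], NULL, mapper_thread, &q);
    for (int t = 0; t < num_mappers; t++)
        pthread_join(tids[t], NULL);
    pthread_mutex_destroy(&q.lock);
}

// Example Map() for trying the pool out: it only counts its invocations.
static int map_calls = 0;
static pthread_mutex_t map_calls_lock = PTHREAD_MUTEX_INITIALIZER;

void CountingMap(char *file_name) {
    (void)file_name;
    pthread_mutex_lock(&map_calls_lock);
    map_calls++;
    pthread_mutex_unlock(&map_calls_lock);
}
```

A Shortest-File-First variant could sort the file list by size (for example, using stat()) before the threads start; whether that helps is something you can measure on your own inputs.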
Example: Wordcount
Here is a simple (but functional) wordcount program, written to use this infrastructure:
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "mapreduce.h"

void Map(char *file_name) {
    FILE *fp = fopen(file_name, "r");
    assert(fp != NULL);

    char *line = NULL;
    size_t size = 0;
    while (getline(&line, &size, fp) != -1) {
        char *token, *dummy = line;
        while ((token = strsep(&dummy, " \t\n\r")) != NULL) {
            // strsep() returns empty tokens between adjacent delimiters; skip them.
            if (*token != '\0')
                MR_Emit(token, "1");
        }
    }
    free(line);
    fclose(fp);
}

void Reduce(char *key, Getter get_next, int partition_number) {
    int count = 0;
    // Each non-NULL result is one emitted value for this key.
    while (get_next(key, partition_number) != NULL)
        count++;
    printf("%s %d\n", key, count);
}

int main(int argc, char *argv[]) {
    MR_Run(argc, argv, Map, 10, Reduce, 10, MR_DefaultHashPartition);
    return 0;
}
Let's walk through this code, in order to see what it is doing. First, notice that Map() is called with a file name. In general, we assume that this type of computation is being run over many files; each invocation of Map() is thus handed one file name and is expected to process that file in its entirety.
In this example, the code above just reads through the file one line at a time and uses strsep() to chop each line into tokens. Each token is then emitted using the MR_Emit() function, which takes two strings as input: a key and a value. The key here is the word itself, and the value is just a count, in this case 1 (as a string). Finally, Map() frees the line buffer and closes the file.
The MR_Emit() function is thus another key part of your library; it needs to take key/value pairs from the many different mappers and store them in a way that later reducers can access them, given constraints described below. Designing and implementing this data structure is thus a central challenge of the project.
After the mappers are finished, your library should have stored the key/value pairs in such a way that the Reduce() function can be called. Reduce() is invoked once per key, and is passed the key along with a function that enables iteration over all of the values emitted with that key. To iterate, the code just calls get_next() repeatedly until it returns NULL; each call returns a pointer to (the library's copy of) one of the values passed to MR_Emit() for that key, and NULL once all of the key's values have been processed. In the example, the output is just a count of how many times a given word has appeared, printed to standard output.
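One way to provide that iterator, sketched under the assumption that each partition's pairs have already been sorted by key into an array, is to keep a cursor per partition: get_next() returns the value under the cursor and advances it while the key still matches, and returns NULL as soon as the key changes. Because exactly one reducer thread works on a given partition, the cursor needs no lock. Pair, Partition, MAX_PARTITIONS, partitions, and reduce_partition() are hypothetical; a real implementation would size the partition array from num_reducers rather than a fixed constant.

```c
#include <stddef.h>
#include <string.h>

typedef char *(*Getter)(char *key, int partition_number);
typedef void (*Reducer)(char *key, Getter get_func, int partition_number);

// Hypothetical per-partition storage: pairs already sorted by key.
typedef struct { char *key; char *value; } Pair;
typedef struct {
    Pair *pairs;
    size_t count;
    size_t cursor;   // next unread pair; touched only by this partition's reducer
} Partition;

#define MAX_PARTITIONS 16
static Partition partitions[MAX_PARTITIONS];

// Getter: the next value for `key` in this partition, or NULL once the
// cursor has moved past the run of pairs with that key.
char *get_next(char *key, int partition_number) {
    Partition *p = &partitions[partition_number];
    if (p->cursor < p->count && strcmp(p->pairs[p->cursor].key, key) == 0)
        return p->pairs[p->cursor++].value;
    return NULL;
}

// Body of one reducer thread: call reduce() once per distinct key, in order.
void reduce_partition(Reducer reduce, int partition_number) {
    Partition *p = &partitions[partition_number];
    while (p->cursor < p->count) {
        char *key = p->pairs[p->cursor].key;
        reduce(key, get_next, partition_number);
        // Skip any values the user's Reduce() did not consume.
        while (get_next(key, partition_number) != NULL)
            ;
    }
}
```

The skip loop after each reduce() call matters: a user-defined Reduce() is free to stop calling get_next() early, and the reducer still has to move on to the next key.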
All of this computation is started off by a call to MR_Run() in the main() routine of the user program. This function is passed the argv array, and assumes that argv[1] ... argv[n-1] (with argc equal to n) all contain file names that will be passed to the mappers.
One interesting function that you also need to pass to MR_Run() is the partitioning function. In most cases, programs will use the default function (MR_DefaultHashPartition), which should be implemented by your code. Here is its implementation:
unsigned long MR_DefaultHashPartition(char *key, int num_partitions) {
    unsigned long hash = 5381;
    int c;
    while ((c = *key++) != '\0')
        hash = hash * 33 + c;
    return hash % num_partitions;
}
The function's role is to take a given key and map it to a number, from 0 to num_partitions - 1. Its use is internal to the MapReduce library, but critical. Specifically, your MR library should use this function to decide which partition (and hence, which reducer thread) gets a particular key/list of values to process. For some applications, which reducer thread processes a particular key is not important (and thus the default function above should be passed in to MR_Run()); for others, it is, and this is why the user can pass in their own partitioning function as need be.
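MR_DefaultHashPartition is the djb2 string hash (start at 5381, then multiply by 33 and add each byte) reduced modulo the number of partitions. For example, with 10 partitions the key "a" (byte value 97) hashes to 5381 * 33 + 97 = 177670, which lands in partition 0. The sketch below repeats that function next to a hypothetical FirstBytePartition() to show a case where the choice of reducer does matter: because it routes keys to partitions by ascending ranges of their first byte, concatenating the reducers' outputs in partition order gives keys in globally sorted order.

```c
// MR_DefaultHashPartition as given above (the djb2 hash, modulo n).
unsigned long MR_DefaultHashPartition(char *key, int num_partitions) {
    unsigned long hash = 5381;
    int c;
    while ((c = *key++) != '\0')
        hash = hash * 33 + c;
    return hash % num_partitions;
}

// Hypothetical range partitioner: splits the 256 possible first bytes
// into num_partitions equal, ascending ranges. Since strcmp() compares
// bytes as unsigned char, a smaller key never lands in a later partition.
unsigned long FirstBytePartition(char *key, int num_partitions) {
    unsigned long first = (unsigned char)key[0];   // 0 for the empty key
    return first * (unsigned long)num_partitions / 256;
}
```

This particular range partitioner is badly skewed for ordinary lowercase words: with four partitions, every key starting with 'a' through 'z' lands in partition 1, so it only illustrates the idea.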
One last requirement: within each partition, keys (and the list of values associated with each key) should be sorted in ascending key order; thus, when a particular reducer thread (and its associated partition) is working, the Reduce() function should be called on each key of that partition in order.
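Once all mapper threads have been joined, no thread is writing to a partition any more, so each partition can be sorted without locking, for example by its own reducer thread just before it starts calling Reduce(). A minimal sketch with the standard qsort() and strcmp(), using a hypothetical Pair type for the stored key/value copies:

```c
#include <stdlib.h>
#include <string.h>

typedef struct { char *key; char *value; } Pair;   // hypothetical

// qsort() comparator: orders pairs by key, byte-wise as strcmp() does.
static int compare_pairs(const void *a, const void *b) {
    const Pair *pa = a, *pb = b;
    return strcmp(pa->key, pb->key);
}

// Sorts one partition's pairs into ascending key order, which also
// makes all pairs with the same key adjacent.
void sort_partition(Pair *pairs, size_t count) {
    qsort(pairs, count, sizeof(Pair), compare_pairs);
}
```

qsort() is not stable, so values that share a key may come out in any order; the requirement above is only about key order.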
Considerations
Here are a few things to consider in your implementation:
* Thread Management. This part is fairly straightforward. You should create num_mappers mapping threads, and assign a file to each Map() invocation in some manner you think is best (e.g., Round Robin, Shortest-File-First, etc.). Which way might lead to the best performance? You should also create num_reducers reducer threads at some point, to work on the mapped output.
* Partitioning and Sorting. Your central data structure should be concurrent, allowing mappers to each put values into different partitions correctly and efficiently. Once the mappers have completed, a sorting phase should order the key/value-lists. Then, finally, each reducer thread should start calling the user-defined Reduce() function on the keys in sorted order per partition. You should think about what type of locking is needed throughout this process for correctness.
* Memory Management. One last concern is memory management. The MR_Emit() function is passed a key/value pair; it is the responsibility of the MR library to make copies of each of these. Then, when the entire mapping and reduction is complete, it is the responsibility of the MR library to free everything.
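A minimal sketch of the emit side follows, covering the locking and copying points above. It assumes one growable array per partition, each protected by its own mutex, so mappers that emit into different partitions do not block each other; the key and value are duplicated with strdup() before storing, and everything is freed in one cleanup pass at the end. EmitBuffer, emit_init(), emit(), emit_cleanup(), and hash_partition() are hypothetical names (your MR_Emit() would call the Partitioner passed to MR_Run()), and allocation failures are not checked.

```c
#define _POSIX_C_SOURCE 200809L   // for strdup()
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

typedef unsigned long (*Partitioner)(char *key, int num_partitions);
typedef struct { char *key; char *value; } Pair;

// Hypothetical per-partition buffer: a growable array with its own mutex.
typedef struct {
    Pair *pairs;
    size_t count, capacity;
    pthread_mutex_t lock;
} EmitBuffer;

static EmitBuffer *buffers;
static int num_buffers;
static Partitioner partition_fn;

// Same hash as MR_DefaultHashPartition, repeated so this sketch stands alone.
unsigned long hash_partition(char *key, int num_partitions) {
    unsigned long hash = 5381;
    int c;
    while ((c = *key++) != '\0')
        hash = hash * 33 + c;
    return hash % num_partitions;
}

void emit_init(int num_partitions, Partitioner partition) {
    num_buffers = num_partitions;
    partition_fn = partition;
    buffers = calloc(num_partitions, sizeof *buffers);
    for (int i = 0; i < num_partitions; i++)
        pthread_mutex_init(&buffers[i].lock, NULL);
}

// Copies key and value first: the caller may reuse its buffers (the
// word-count Map() reuses getline()'s line buffer for every line).
void emit(char *key, char *value) {
    Pair pair = { strdup(key), strdup(value) };
    EmitBuffer *b = &buffers[partition_fn(key, num_buffers)];
    pthread_mutex_lock(&b->lock);
    if (b->count == b->capacity) {
        b->capacity = b->capacity ? 2 * b->capacity : 16;
        b->pairs = realloc(b->pairs, b->capacity * sizeof *b->pairs);
    }
    b->pairs[b->count++] = pair;
    pthread_mutex_unlock(&b->lock);
}

// Frees every copied string and buffer once reduction is complete.
void emit_cleanup(void) {
    for (int i = 0; i < num_buffers; i++) {
        for (size_t j = 0; j < buffers[i].count; j++) {
            free(buffers[i].pairs[j].key);
            free(buffers[i].pairs[j].value);
        }
        free(buffers[i].pairs);
        pthread_mutex_destroy(&buffers[i].lock);
    }
    free(buffers);
    buffers = NULL;
    num_buffers = 0;
}
```

A single global lock would also be correct but would serialize every MR_Emit() call; per-partition locks are one middle ground between that and finer-grained schemes.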
Compile and Grading
Your code will be compiled with gcc -o mapreduce main.c mapreduce.c hashmap.c -Wall -Werror -pthread. Tests will be released and discussed on Piazza.
Handing in your code
* Handing it in: Copy your source files to /p/course/cs537-swift/turnin/login/p3A/ontime where login is your CS login. Do NOT use this turnin directory for your workspace. You should keep a separate copy of your project files in your own home directory and then simply copy the relevant files to this turnin directory when you are done. The permissions to this turnin directory will be turned off promptly when the deadline passes and you will no longer be able to modify files in that directory. Please turn in all files required to compile the project, including the provided mapreduce.h and hashmap.h.
* Each project partner should turn in their joint code to each of their turnin directories. Each person should place a file named partners.txt in their turnin/p3A directory, so that we can tell who worked together on this project. The format of partners.txt should be exactly as follows:
cslogin1 wiscNetid1 Lastname1 Firstname1
cslogin2 wiscNetid2 Lastname2 Firstname2
It does not matter who is 1 and who is 2. If you worked alone, your partners.txt file should have only one line. There should be no spaces within your first or last name; just use spaces to separate fields.
* You can use up to 3 slip (late) days across all the projects throughout this semester; for example, you can use 1 slip day on three separate assignments or 3 slip days on a single assignment (or other combinations adding up to a total of 3 days), but we hope you can save them! If you are using slip days, you must create a file called slip_days with the full pathname /p/course/cs537-swift/turnin/login/p3A/slip_days. The file should contain a single line containing the integer number of slip days you are using; for example, you can create this file with echo 1 > slip_days. You must copy your code into the corresponding slip directory: /p/course/cs537-swift/turnin/login/p3A/slip1, /p/course/cs537-swift/turnin/login/p3A/slip2, or /p/course/cs537-swift/turnin/login/p3A/slip3. We will grade the latest submission.