(* This is best used interactively *)
#load "str.cma";;

(**** polymorphic map/reduce computation phases ****)

(* Optional result of a reducer: `Just x' keeps the key with value `x',
   `Nothing' drops the key from the output dictionary. *)
type 'a maybe = Just of 'a | Nothing;;

(* 'map' phase: fill `d2' from `d1' processed by `mapfun'.

   fold_d1  -- fold over the input dictionary (key -> value -> acc -> acc)
   d1       -- the input dictionary
   mapfun   -- turns one input binding into a list of (k2, v2) pairs
   add_d2   -- adds a binding to the intermediate dictionary
   find_d2  -- total lookup in the intermediate dictionary; it is expected
               to return the list accumulated so far for the key (the
               solver below passes a lookup that defaults to [])
   empty_d2 -- the empty intermediate dictionary

   Every emitted v2 is consed onto the list already stored under k2, so
   the resulting dictionary maps each k2 to the list of all its v2. *)
let map_Dict_to_Dict fold_d1 d1 mapfun add_d2 find_d2 empty_d2 =
  let insert_one_d2 d2 (k2, v2) = add_d2 k2 (v2 :: find_d2 k2 d2) d2 in
  let insert_list_d2 k v d2 = List.fold_left insert_one_d2 d2 (mapfun k v) in
  fold_d1 insert_list_d2 d1 empty_d2;;

(* 'reduce' phase: fill `d3' from `d2' processing by `redfun'.

   For every binding (k, v) of `d2', `redfun k v' is evaluated; a result
   `Just x' is stored in `d3' under `k', a result `Nothing' leaves `d3'
   untouched. *)
let reduce_Dict_to_Dict fold_d2 d2 redfun add_d3 empty_d3 =
  let maybe_insert_d3 k v d3 =
    match redfun k v with
    | Nothing -> d3
    | Just x -> add_d3 k x d3
  in
  fold_d2 maybe_insert_d3 d2 empty_d3;;

(**** computation wrapped in a functor ****)

(* problem abstraction *)
module type MapReduceProblemType = sig
  module K1 : Map.OrderedType  (* input dict key *)
  type v1                      (* input dict value *)
  module K2 : Map.OrderedType  (* interm./output dict key *)
  type v2                      (* intermediate dict value *)
  type v3                      (* output dict value *)
  val mapf : K1.t -> v1 -> (K2.t * v2) list  (* interm. producer *)
  val redf : K2.t -> v2 list -> v3 maybe     (* interm. consumer *)
end

(* Wrap a raising lookup `findf' into a total one: when `findf k m' raises
   Not_found, `dflt' is returned instead. *)
let find_with_default findf dflt =
  fun k m -> try findf k m with Not_found -> dflt;;

(* solver functor *)
module MapReduceSolver (P : MapReduceProblemType) = struct
  open P

  module Dk1 = Map.Make(K1)  (* input dictionaries *)
  module Dk2 = Map.Make(K2)  (* intermediate and output dictionaries *)

  (* map phase: Dk1 dictionary -> Dk2 dictionary of v2 lists *)
  let map_input dict =
    map_Dict_to_Dict
      Dk1.fold dict mapf
      Dk2.add (find_with_default Dk2.find []) Dk2.empty

  (* reduce phase: Dk2 dictionary of v2 lists -> Dk2 dictionary of v3 *)
  let reduce_intrm dict =
    reduce_Dict_to_Dict Dk2.fold dict redf Dk2.add Dk2.empty

  (* both phases chained *)
  let solve dict = reduce_intrm (map_input dict)
end

(**** usage examples ****)

(* Split `text' into words on spaces, tabs, newlines and commas.

   The delimiter is a *run* of separator characters: Str.split yields an
   empty string between two adjacent single-character delimiters, so with
   a one-character delimiter a text such as "fold, really" would produce
   a bogus empty word (and shift the positions of the words after it).
   The regexp is compiled once, not on every call. *)
let words =
  let delimiters = Str.regexp "[ \t\n,]+" in
  fun text -> Str.split delimiters text;;

(* for each word, count how many times it appears in a set of documents *)
module WordOccurrenceCountProblem = struct
  module K1 = String  (* document id -- ignored *)
  type v1 = string    (* document text *)
  module K2 = String  (* word *)
  type v2 = int       (* 1 *)
  type v3 = int       (* count *)

  (* emit (word, 1) for every word of the document *)
  let mapf _doc_id text = List.map (fun x -> (x, 1)) (words text)

  (* the count is the number of 1s collected for the word *)
  let redf _word onelst = Just (List.length onelst)
end

module WOCSolver = MapReduceSolver(WordOccurrenceCountProblem);;

let in_d = WOCSolver.Dk1.add "doc1" "fold the fold" WOCSolver.Dk1.empty;;
let in_d = WOCSolver.Dk1.add "doc2" "appreciate the unfold" in_d;;
let in_d = WOCSolver.Dk1.add "doc3" "it is all just fold, really" in_d;;

(* steps exposed: map *)
let tmp_d = WOCSolver.map_input in_d;;
let () =
  WOCSolver.Dk2.iter
    (fun k v ->
       print_endline
         (k ^ (List.fold_left
                 (fun s i -> s ^ " " ^ (string_of_int i))
                 ":" v)))
    tmp_d;;

(* steps exposed: reduce *)
let out_d = WOCSolver.reduce_intrm tmp_d;;
let () =
  WOCSolver.Dk2.iter
    (fun k v -> print_endline (k ^ ": " ^ (string_of_int v)))
    out_d;;

(* for each word, return a list of (document, position) where it appears *)
module InvertedIndexProblem = struct
  module K1 = String  (* document id *)
  type v1 = string    (* document text *)
  module K2 = String  (* word *)
  type v2 = (string * int)       (* doc id, position *)
  type v3 = (string * int) list  (* doc id, pos list [no reduce] *)

  (* emit (word, (doc_id, idx)) for every word, idx counting from 0;
     the fold conses, so the pairs come out in reverse word order *)
  let mapf doc_id text =
    fst (List.fold_left
           (fun (lst, idx) word -> ((word, (doc_id, idx)) :: lst, idx + 1))
           ([], 0)
           (words text))

  (* nothing to reduce: keep the collected list as is *)
  let redf _word idxlst = Just idxlst
end

module IISolver = MapReduceSolver(InvertedIndexProblem);;

let in_d = IISolver.Dk1.add "doc1" "fold the fold" IISolver.Dk1.empty;;
let in_d = IISolver.Dk1.add "doc2" "appreciate the unfold" in_d;;
let in_d = IISolver.Dk1.add "doc3" "it is all just fold, really" in_d;;

let out_d = IISolver.solve in_d;;
let () =
  IISolver.Dk2.iter
    (fun k v ->
       print_endline
         (k ^ (List.fold_left
                 (fun s (doc, pos) ->
                    s ^ " " ^ doc ^ " @ " ^ (string_of_int pos))
                 ":" v)))
    out_d;;

(* for each document, return the number of documents linking it (excl. self) *)
module RankByLinkCountProblem = struct
  module K1 = String  (* document id *)
  type v1 = string    (* document text: contains only document links *)
  module K2 = String  (* document id that a link points to *)
  type v2 = string    (* document id containing the link *)
  type v3 = int       (* link count *)

  (* emit (target, doc_id) for every link found in the document *)
  let mapf doc_id text = List.map (fun x -> (x, doc_id)) (words text)

  (* count the distinct linking documents other than `doc_id' itself;
     a document without any such link is dropped from the output *)
  let redf doc_id links =
    let module LinkSet = Set.Make(String) in
    let set_others =
      List.fold_left
        (fun set link ->
           if link <> doc_id then LinkSet.add link set else set)
        LinkSet.empty links
    in
    let cnt = LinkSet.cardinal set_others in
    if cnt > 0 then Just cnt else Nothing
end

module RBLCSolver = MapReduceSolver(RankByLinkCountProblem);;

let in_d = RBLCSolver.Dk1.add "doc1" "doc1 doc2 doc1" RBLCSolver.Dk1.empty;;
let in_d = RBLCSolver.Dk1.add "doc2" "doc2 doc3 doc3 doc2" in_d;;
let in_d = RBLCSolver.Dk1.add "doc3" "doc2 doc3" in_d;;

let out_d = RBLCSolver.solve in_d;;
let () =
  RBLCSolver.Dk2.iter
    (fun k v -> print_endline (k ^ ": " ^ (string_of_int v)))
    out_d;;