Map Process with State
Sum data rows' value with map's state.
| productName | dollar_sales |
|
1937 Lincoln Berline |
3,726 |
|
1936 Mercedes-Benz 500K Special Roadster |
1,768 |
|
1952 Alpine Renault 1300 |
5,572 |
|
1962 LanciaA Delta 16V |
5,026 |
|
1958 Setra Bus |
3,284 |
|
1940 Ford Pickup Truck |
3,308 |
|
1926 Ford Fire Engine |
1,283 |
|
1913 Ford Model T Speedster |
2,489 |
|
1934 Ford V8 Coupe |
2,164 |
|
18th Century Vintage Horse Carriage |
2,173 |
->pipe(new Map([
'{value}' => function($row, $meta, $index, $mapState) {
$numTopRows = 2;
//If a row is among the first 2 rows
if ($index < $numTopRows) {
$mappedRows = [$row];
//return it to send to next process or datastore
return ['{rows}' => $mappedRows];
}
//Otherwise,
//initialise a key of this Map's state to use for sum
$sum = Util::init($mapState, 'sumOthers', []);
foreach ($row as $columnName => $value) {
Util::init($sum, $columnName, 0);
//if column name = 'dollar_sales', sum it
$sum[$columnName] = $columnName === 'dollar_sales' ?
$sum[$columnName] + $value : 'Other Customers';
}
//Save the sum to this Map's state
$mapState['sumOthers'] = $sum;
$mappedRows = [];
//Skip rows after the first 2 rows (they won't be sent to next process or datastore)
//return this Map's state to save it
return ['{rows}' => $mappedRows, '{state}' => $mapState];
},
'{end}' => function($count, $mapState) {
//After all rows had been sent
//retrieve this Map's state and send it at the end of Map process
$rowsToSend = [$mapState['sumOthers']];
return $rowsToSend;
},
]))
| productName | dollar_sales |
|
1937 Lincoln Berline |
3,726 |
|
1936 Mercedes-Benz 500K Special Roadster |
1,768 |
|
Other Customers |
25,300 |