Skip to content Skip to sidebar Skip to footer

MapReduce in PyMongo

My MongoDB collection `impressions` has documents in the following format (the sample is truncated in the original post): `{ _uid: 10, 'impressions': [ { 'pos': 6, 'id': 1 …`

Solution 1:

You can use the aggregation framework:

import pymongo

conn = pymongo.MongoClient()
db = conn.test
# Placeholder collection name; point this at your impressions collection.
col = db.collection

# Count furniture impressions per impression id:
#   1. $unwind emits one document per element of the impressions array,
#   2. $match keeps only the furniture entries,
#   3. $group counts the remaining entries per impression id.
for doc in col.aggregate([
    {"$unwind": "$impressions"},
    {"$match": {"impressions.service": "furniture"}},
    {"$group": {"_id": "$impressions.id", "impressions_count": {"$sum": 1}}},
]):
    print(doc)

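Alternatively, you can filter each document's `impressions` array before unwinding it, using the `$map` and `$setDifference` operators. This avoids unwinding entries that would be discarded anyway. Note that `$setDifference` treats arrays as sets, so an id that appears more than once within a single document is counted only once.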

# Filter each document's impressions array inside a single $project:
# $map turns every impression into its id, or into a False placeholder when
# it is not a furniture impression, and $setDifference then removes the
# placeholder. False is used instead of 0 so that a real id of 0 is not
# discarded along with the placeholders.
# NOTE: $setDifference treats arrays as sets, so an id repeated within one
# document is counted once for that document.
for doc in col.aggregate([
    {"$project": {"impressions": {"$setDifference": [
        {"$map": {
            "input": "$impressions",
            "as": "imp",
            "in": {"$cond": {
                "if": {"$eq": ["$$imp.service", "furniture"]},
                "then": "$$imp.id",
                "else": False,
            }},
        }},
        [False],
    ]}}},
    {"$unwind": "$impressions"},
    {"$group": {"_id": "$impressions", "impressions_count": {"$sum": 1}}},
]):
    print(doc)

Which yields:

{'_id': 122.0, 'impressions_count': 1}
{'_id': 124.0, 'impressions_count': 1}
{'_id': 127.0, 'impressions_count': 1}
{'_id': 123.0, 'impressions_count': 2}

Solution 2:

I made a tool that lets you run MongoDB map/reduce jobs in Python:

https://mreduce.com

import random
import threading

import bson
import pymongo

import mreduce


# MongoDB client later handed to mreduce.API; replace the placeholder URI
# with the address of your own server.
mongo_client = pymongo.MongoClient(host="mongodb://your_mongodb_server")

def map_func(document):
    """Map step: emit ``(impression_id, 1)`` for each impression in *document*.

    Documents without an ``impressions`` field emit nothing. This mirrors
    ``$unwind`` in the aggregation pipelines, which skips such documents.
    """
    for impression in document.get("impressions", []):
        # Key on the impression's own id so the reduce step counts
        # impressions per impression id.
        yield impression["id"], 1


def reduce_func(id, prices):
    """Reduce step: return the sum of the values emitted for one key.

    The parameter names are kept unchanged in case the caller passes them
    by keyword; ``id`` (the key) is not needed to compute the total.
    """
    return sum(prices)

# Worker functions registered under the names that submit_job refers to.
worker_functions = {
    "exampleMap": map_func,
    "exampleReduce": reduce_func,
}

api = mreduce.API(
    api_key="...",
    mongo_client=mongo_client,
)

project_id = "..."

# Run api.run with the worker functions in a background thread, so this
# script can go on to submit a job and wait for its result.
# NOTE(review): the thread is never joined or marked daemon; whether api.run
# returns on its own is not visible here. Confirm this against the mreduce docs.
thread = threading.Thread(
    target=api.run,
    args=[project_id, worker_functions],
)
thread.start()

# Submit the job under the same project id the worker thread was started
# with. The name `project` was never defined, so `project_id` is used.
job = api.submit_job(
    projectId=project_id,
    mapFunctionName="exampleMap",
    reduceFunctionName="exampleReduce",
    inputDatabase="db",
    inputCollection="impressions",
    outputDatabase="db",
    outputCollection="impressions_results",
)
result = job.wait_for_result()
for key, value in result:
    # Keys come from map_func and need not be strings (ids are ints), so
    # format them instead of concatenating.
    print(f"Key: {key}, Value: {value}")

Post a Comment for "Mapreduce In Pymongo"