Pentaho Analysis - Mondrian / MONDRIAN-874

RFC: Parallel Evaluation Initial Implementation



    • Type: New Feature
    • Status: Open
    • Severity: Medium
    • Resolution: Unresolved
    • Affects Version/s: 3.2.1 GA (3.7.0 GA Suite Release)
    • Fix Version/s: Not Planned
    • Component/s: None


      Here's an initial cut of adding multi-thread/parallel evaluation support to mondrian. Would appreciate as much feedback as possible.

      First, some TODOs/open questions:
      -TODO #1: need to make the number of threads configurable. Should this just be a mondrian.properties setting, or should it be possible to specify it per query/function call? Relatedly, we probably want a configurable ThreadFactory passed to the ExecutorService.
      -TODO #2: need to make the batch size configurable (same question as above).
      -if those two settings result in single-threaded execution, should this just revert to the old code? I'm not sure what the overhead vs. maintainability tradeoff looks like here, but for simplicity's sake this version currently replaces the old code.
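For the thread-count and batch-size TODOs, here is a minimal sketch of one way the settings could be read. It assumes hypothetical property names (mondrian.parallel.threads and mondrian.parallel.batchSize are not existing Mondrian properties) and a hypothetical ParallelConfig class. It falls back to caller-supplied defaults such as the current 6 threads, and builds a ThreadFactory that names its threads and marks them as daemons before handing it to the ExecutorService.

```java
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical holder for parallel-evaluation settings (not an existing Mondrian class). */
final class ParallelConfig {
    // Hypothetical property names, chosen for illustration only.
    static final String THREADS_PROP = "mondrian.parallel.threads";
    static final String BATCH_SIZE_PROP = "mondrian.parallel.batchSize";

    final int threads;
    final int batchSize;

    ParallelConfig(Properties props, int defaultThreads, int defaultBatchSize) {
        this.threads = Math.max(1, intProp(props, THREADS_PROP, defaultThreads));
        this.batchSize = Math.max(1, intProp(props, BATCH_SIZE_PROP, defaultBatchSize));
    }

    private static int intProp(Properties props, String name, int fallback) {
        String value = props.getProperty(name);
        if (value == null) {
            return fallback;
        }
        try {
            return Integer.parseInt(value.trim());
        } catch (NumberFormatException e) {
            return fallback; // ignore malformed values and keep the default
        }
    }

    /** True when a caller could skip the executor and run the old sequential code. */
    boolean isSingleThreaded() {
        return threads == 1;
    }

    /** Names worker threads "prefix-N" and marks them as daemons. */
    static ThreadFactory namedDaemonFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread t = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
    }

    /** Passes the configurable ThreadFactory to the ExecutorService. */
    ExecutorService newExecutor(ThreadFactory factory) {
        return Executors.newFixedThreadPool(threads, factory);
    }
}
```

Per-query overrides could layer a query-level object created with new Properties(globalDefaults) over the global settings; isSingleThreaded() is one possible hook for the last question, letting callers stay on the old sequential path.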

      Ok, now onto the changes themselves:

      -localAggregations needs to be changed from ThreadLocal to InheritableThreadLocal, to allow the spawned threads to see any aggregations loaded by the main thread. Without this, the first query to need an aggregation fails with the "Failed to load aggregations after 10 passes" error.
      -sharedAggregations changed from HashMap to ConcurrentHashMap, and synchronization removed to prevent the spawned threads from blocking each other on reads.
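The InheritableThreadLocal change works because a child thread copies the parent's inheritable values when the Thread object is constructed, and by default (childValue) it receives the same reference rather than a copy. ThreadPoolExecutor constructs its worker threads lazily on the thread that submits work, so a pool created inside the parallel* call inherits the query thread's values. The sketch uses a hypothetical AggregationRegistry class with string placeholders instead of Mondrian's aggregation objects, and ConcurrentHashMap.computeIfAbsent (Java 8) for the shared map; on older JDKs, putIfAbsent gives a similar lock-free check-then-insert.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical stand-in for the aggregation caches; not Mondrian's actual class. */
final class AggregationRegistry {
    // Per-query aggregations. Threads constructed after the parent has a value
    // receive the parent's reference (default childValue), so they see the same set.
    static final InheritableThreadLocal<Set<String>> LOCAL =
        new InheritableThreadLocal<Set<String>>() {
            @Override
            protected Set<String> initialValue() {
                return ConcurrentHashMap.newKeySet();
            }
        };

    // Shared across queries: reads never block, and computeIfAbsent loads each key at most once.
    static final ConcurrentHashMap<String, String> SHARED = new ConcurrentHashMap<>();

    static String shared(String key) {
        return SHARED.computeIfAbsent(key, k -> "agg(" + k + ")");
    }

    /** Checks, on a freshly spawned pool thread, whether it sees the caller's LOCAL value. */
    static boolean workerSeesLocal(String key) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // The worker thread is constructed during submit(), on the calling thread,
            // so it inherits LOCAL from the caller.
            Future<Boolean> seen = pool.submit(() -> LOCAL.get().contains(key));
            return seen.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Because inheritance happens only at thread construction, a long-lived pool shared across queries would keep whatever values its threads inherited when they were created, which is one more reason to scope the executor to the call. With a plain ThreadLocal in place of LOCAL, the worker would see only its own empty initial set.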

      -currentOrdinal changed to a ThreadLocal to allow multiple threads to iterate over a named set in parallel
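A minimal sketch of why the ordinal must be per thread, using a hypothetical NamedSetCursor class rather than Mondrian's named-set evaluator: each thread sets and reads its own copy of the ordinal, so concurrent walks of the same set can't overwrite each other's position. Calling remove() at the end keeps a reused pool thread from carrying a stale ordinal into its next task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical named-set holder; the ordinal lives in a ThreadLocal, one per iterating thread. */
final class NamedSetCursor {
    private final List<String> members;
    private final ThreadLocal<Integer> currentOrdinal = ThreadLocal.withInitial(() -> -1);

    NamedSetCursor(List<String> members) {
        this.members = members;
    }

    /** Rough analogue of [Set].CurrentOrdinal, as seen by the calling thread. */
    int currentOrdinal() {
        return currentOrdinal.get();
    }

    /** Walks the whole set, recording "member@ordinal" as seen by this thread. */
    List<String> walk() {
        List<String> seen = new ArrayList<>();
        for (int i = 0; i < members.size(); i++) {
            currentOrdinal.set(i);
            // A concurrent walk() on another thread cannot change this thread's value.
            seen.add(members.get(i) + "@" + currentOrdinal());
        }
        currentOrdinal.remove(); // don't leak state into pooled threads
        return seen;
    }

    /** Runs `tasks` walks of the same cursor on a pool of `threads` threads. */
    static List<List<String>> walkInParallel(NamedSetCursor cursor, int threads, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<List<String>>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                Callable<List<String>> task = cursor::walk;
                futures.add(pool.submit(task));
            }
            List<List<String>> results = new ArrayList<>();
            for (Future<List<String>> f : futures) {
                results.add(f.get());
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```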

      -add helper methods parallel[Iterable|List]To[Map|List], which let calling code specify a Callable that operates on a batched sublist of the Iterable|List in question, and return an aggregated Map|List of the items returned by the Callable. The ListTo* versions are slightly optimized to call list.subList() instead of building up a separate list to pass each batch to the Callable. The number of concurrent threads is currently hardcoded to 6 (TODO #1). Right now, this creates an ExecutorService and shuts it down within the scope of the parallel* call. In theory this could be done at the query execution level, but I'm concerned that, depending on the exact query, one parallel evaluation could queue up (and wait for) another one, quickly exhausting the available threads in the ExecutorService/ThreadPool.
      -use parallelIterableToMap to implement evaluateMembers() with a batch size of 10,000 (TODO #2)
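A minimal sketch of the Iterable-to-Map variant, assuming a hypothetical signature: the real helper takes a Callable, while here a java.util.function.Function over each batch plays that role. Items are buffered into batches of batchSize, each batch is submitted to a pool that lives only for the duration of the call, and the per-batch maps are merged on the calling thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Hypothetical sketch of a parallelIterableToMap-style helper; not FunUtil's actual code. */
final class ParallelMaps {
    static <T, K, V> Map<K, V> parallelIterableToMap(
            Iterable<T> items, int batchSize, int threads,
            Function<List<T>, Map<K, V>> batchFn) {
        if (batchSize < 1 || threads < 1) {
            throw new IllegalArgumentException("batchSize and threads must be >= 1");
        }
        // The pool lives only for the duration of this call.
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Map<K, V>>> futures = new ArrayList<>();
            List<T> batch = new ArrayList<>(batchSize);
            for (T item : items) {
                batch.add(item);
                if (batch.size() == batchSize) {
                    final List<T> full = batch;
                    futures.add(pool.submit(() -> batchFn.apply(full)));
                    batch = new ArrayList<>(batchSize);
                }
            }
            if (!batch.isEmpty()) {
                final List<T> last = batch;
                futures.add(pool.submit(() -> batchFn.apply(last)));
            }
            // Merge on the calling thread, so the result map needs no locking.
            Map<K, V> result = new HashMap<>();
            for (Future<Map<K, V>> f : futures) {
                result.putAll(f.get());
            }
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } finally {
            pool.shutdownNow(); // also cancels outstanding batches if one failed
        }
    }
}
```

Merging on the calling thread keeps the result an ordinary HashMap; the batch function itself has to touch only thread-safe state, which is what the other changes in this list are for. Building batches from an Iterable means copying elements, the copy the ListTo* variant avoids with subList().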

      -use the FunUtil.parallel*ToList functions to implement the filtering, with a batch size of 100,000 (TODO #2)
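A sketch of filtering on top of a ListTo*-style helper, with hypothetical names (ParallelFilter, parallelFilter) and a java.util.function.Predicate standing in for the MDX filter condition. Batches are list.subList() views, as in the ListTo* variant, and results are concatenated in submission order so the filtered list keeps the input's order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Predicate;

/** Hypothetical sketch of a filter built on a parallelListToList-style helper. */
final class ParallelFilter {
    static <T> List<T> parallelFilter(List<T> list, int batchSize, int threads, Predicate<T> keep) {
        if (batchSize < 1 || threads < 1) {
            throw new IllegalArgumentException("batchSize and threads must be >= 1");
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<List<T>>> futures = new ArrayList<>();
            for (int start = 0; start < list.size(); start += batchSize) {
                // subList is a view, so no copy is made to build the batch.
                List<T> batch = list.subList(start, Math.min(start + batchSize, list.size()));
                futures.add(pool.submit(() -> {
                    List<T> kept = new ArrayList<>();
                    for (T item : batch) {
                        if (keep.test(item)) {
                            kept.add(item);
                        }
                    }
                    return kept;
                }));
            }
            // Concatenate in submission order, so the output keeps the input order.
            List<T> result = new ArrayList<>();
            for (Future<List<T>> f : futures) {
                result.addAll(f.get());
            }
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } finally {
            pool.shutdownNow();
        }
    }
}
```

The subList views are only safe to read concurrently if the backing list isn't structurally modified while the batches run, and the predicate must only touch thread-safe evaluator state.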

      -replace internal structures with their concurrent-collection equivalents to make the CellReader "threadsafe". I may have missed things in here, so please examine closely!
      -Batch#measuresList becomes a CopyOnWriteArrayList. It might be nice if there were a read/write-lock style of list somewhere, rather than copying everything on each write.
      -Batch#valueSets becomes a ConcurrentHashMap of key->key rather than a HashSet, since there is no ConcurrentHashSet and CopyOnWriteArraySet seemed pretty slow for this usage (it would be nicer if there were one!). This leads to a bunch of .keySet() calls in iterations over valueSets.
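On the measuresList question: the JDK has no read/write-locked List out of the box, but one can be sketched with ReentrantReadWriteLock. The ReadWriteList class below is hypothetical, not a JDK or Mondrian type. Readers proceed concurrently and a write blocks only while it runs, instead of copying the whole array; the tradeoff is that iteration has to hold the read lock (or copy), whereas CopyOnWriteArrayList iterators are lock-free snapshots. Which wins depends on how often measuresList is written versus read, which would need measuring.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;

/** Hypothetical read/write-locked list (not a JDK or Mondrian class). */
final class ReadWriteList<E> {
    private final List<E> items = new ArrayList<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    /** Writers are exclusive, but only for the duration of one add; nothing is copied. */
    void add(E e) {
        lock.writeLock().lock();
        try {
            items.add(e);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Any number of readers may run concurrently. */
    E get(int index) {
        lock.readLock().lock();
        try {
            return items.get(index);
        } finally {
            lock.readLock().unlock();
        }
    }

    int size() {
        lock.readLock().lock();
        try {
            return items.size();
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Iterates while holding the read lock, so writers wait until the loop ends.
     * The action must not call add(): a read lock cannot be upgraded, so that would deadlock.
     */
    void forEach(Consumer<? super E> action) {
        lock.readLock().lock();
        try {
            for (E e : items) {
                action.accept(e);
            }
        } finally {
            lock.readLock().unlock();
        }
    }
}
```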

      -PinSetImpl extends CopyOnWriteArraySet instead of HashSet, again for thread safety. As with FastBatchingCellReader#Batch#valueSets, it would be nicer if there were a ConcurrentHashSet! It's probably possible to change this to a key->key ConcurrentHashMap, but I haven't tried yet.
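On the ConcurrentHashSet wish: on Java 6 or later, Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>()) builds one from a map. That is essentially the key->key idea packaged as a Set, so callers wouldn't need the extra .keySet() calls (Java 8 also adds ConcurrentHashMap.newKeySet()). The sketch below uses a hypothetical ConcurrentSets helper; PinSetImpl or Batch#valueSets could delegate to such a set instead of extending CopyOnWriteArraySet, though that remains untested here.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper showing a concurrent Set backed by a ConcurrentHashMap. */
final class ConcurrentSets {
    /** A Set view over a ConcurrentHashMap: the key->key idea without keySet() calls. */
    static <E> Set<E> newConcurrentSet() {
        return Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>());
    }

    /** Adds 0..n-1 from several threads at once and returns the resulting set. */
    static Set<Integer> fillConcurrently(int n, int threads) {
        Set<Integer> set = newConcurrentSet();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < n; i++) {
                    set.add(i); // add() is atomic; duplicates from other threads are ignored
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return set;
    }
}
```

Iterators over such a set are weakly consistent: they never throw ConcurrentModificationException, but may or may not reflect additions made during the iteration.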

      All of mondrian's unit tests currently pass. In addition, in my tests I'm seeing up to a 20% speedup on queries of the following form (where [Dimension].[Level].Members can be large):

              MEMBER [Dimension].[Aggregated] AS IIF(COUNT([Available]) = COUNT([Requested]), [Dimension].[All Dimension],
                  { [Dimension].[Special], [Dimension].[Unattributed], [Dimension].[OtherSpecial] }
              MEMBER [Dimension].[Special] AS [Dimension].Null
              MEMBER [Dimension].[Unattributed] AS [Dimension].Null
              MEMBER [Dimension].[OtherSpecial] AS [Dimension].Null
              MEMBER [Measures].[total_available_count] AS Format(COUNT([Available]), "######")
              MEMBER [Measures].[total_result_count] AS Format(COUNT([Requested]), "######")
              MEMBER [Measures].[id] AS [Dimension].CurrentMember.Name
              MEMBER [Time].[Base] AS [Time].[Epoch].[1288483200]
              MEMBER [Time].[Slicer] AS [Time].[Epoch].[1289088000]
              MEMBER [OtherDimension].[Slicer] AS [OtherDimension].[1]
              SET [Available] AS FILTER([Dimension].[Level].Members, CAST([Dimension].CurrentMember.Key AS NUMERIC) > 0)
              SET [Filtered] AS CustomFilteringUDF([Available],
              SET [Requested] AS [Filtered]

              { [Dimension].[Aggregated], [Dimension].[Special], [Dimension].[Unattributed], [Dimension].[OtherSpecial], TopCount([Requested], 20, [Measures].[revenue]) } ON ROWS,

              { [Measures].[id], [Measures].[total_available_count], [Measures].[total_result_count], [Measures].[profit], [Measures].[roi], [Measures].[cost], [Measures].[profit_margin], [Measures].[revenue] }

              {( [Time].[Slicer], [OtherDimension].[Slicer] )}

      (Also attaching both a packChange and a patch/diff, since I've had issues with packChange capturing the right set of files before; either should be sufficient, though.)




            jbarnett Joe Barnett