Source code for lux.processor.Compiler

#  Copyright 2019-2020 The Lux Authors.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

from lux.vis import Clause
from typing import List, Dict, Union
from lux.vis.Vis import Vis
from lux.core.frame import LuxDataFrame
from lux.vis.VisList import VisList
from lux.utils import date_utils
from lux.utils import utils
import pandas as pd
import numpy as np
import warnings
import lux


class Compiler:
    """
    Compile an intent with underspecified inputs into fully specified
    visualizations.

    All compilation steps are exposed as static methods; the class carries
    no per-instance state besides ``name``.
    """

    def __init__(self):
        self.name = "Compiler"
        # ``warnings.formatwarning`` is a module-level hook, so this assignment
        # affects every warning emitted afterwards, not just Compiler's.
        warnings.formatwarning = lux.warning_format

    def __repr__(self):
        return "<Compiler>"

    @staticmethod
    def compile_vis(ldf: LuxDataFrame, vis: Vis) -> Vis:
        """
        Root method for compiling a single visualization.

        Parameters
        ----------
        ldf : LuxDataFrame
            Dataframe whose metadata is used to fill in the Vis.
        vis : Vis
            Visualization to compile; it is modified in place.

        Returns
        -------
        Vis
            The same Vis object after compilation, or ``None`` when ``vis``
            is falsy (in which case ``ldf`` is left untouched).
        """
        if not vis:
            return None
        # autofill data type/model information
        Compiler.populate_data_type_model(ldf, [vis])
        # NOTE(review): the filtered list returned here is discarded, so an
        # invalid Vis is still encoded and returned below. Kept as-is to
        # preserve behavior for existing callers.
        Compiler.remove_all_invalid([vis])
        # autofill viz related information
        Compiler.determine_encoding(ldf, vis)
        ldf._compiled = True
        return vis

    @staticmethod
    def compile_intent(ldf: LuxDataFrame, _inferred_intent: List[Clause]) -> VisList:
        """
        Compile the clauses of an intent into a collection of Vis objects.

        1) Enumerate the visualizations described by the intent.
        2) Fill in the underspecified details of every Clause.
        3) Drop invalid visualizations.
        4) Determine encoding properties for each remaining Vis.

        Parameters
        ----------
        ldf : LuxDataFrame
            Dataframe with underspecified intent.
        _inferred_intent : List[Clause]
            Clauses describing the intent to compile.

        Returns
        -------
        VisList
            Compiled visualizations, or ``None`` when ``_inferred_intent`` is
            empty (in which case ``ldf`` is left untouched).
        """
        if not _inferred_intent:
            return None
        vis_collection = Compiler.enumerate_collection(_inferred_intent, ldf)
        # autofill data type/model information
        Compiler.populate_data_type_model(ldf, vis_collection)
        # remove invalid visualizations from collection
        if len(vis_collection) >= 1:
            vis_collection = Compiler.remove_all_invalid(vis_collection)
        for vis in vis_collection:
            # autofill viz related information
            Compiler.determine_encoding(ldf, vis)
        ldf._compiled = True
        return vis_collection

    @staticmethod
    def enumerate_collection(_inferred_intent: List[Clause], ldf: LuxDataFrame) -> VisList:
        """
        Expand the wildcard options of an intent into one Vis per combination.

        Every combination picks one option from each attribute clause; when
        filters are present, each combination is additionally paired with
        every filter option.

        Parameters
        ----------
        _inferred_intent : List[Clause]
            Clauses describing the intent.
        ldf : LuxDataFrame
            Dataframe with underspecified intent.

        Returns
        -------
        VisList
            The enumerated visualizations. A plain empty list is returned
            when the intent contains no attribute clauses.
        """
        import copy

        intent = Compiler.populate_wildcard_options(_inferred_intent, ldf)
        attributes = intent["attributes"]
        filters = intent["filters"]
        # Without attribute clauses there is nothing to combine. This guard
        # also covers the "no attributes and no filters" case, which used to
        # fall through to ``col_attrs[0]`` and raise IndexError.
        if len(attributes) == 0:
            return []

        collection = []

        def combine(col_attrs, accum):
            # Recursively pick one option per attribute clause, accumulating
            # the picks in ``accum`` until the last clause is reached.
            last = len(col_attrs) == 1
            for option in col_attrs[0]:
                column_list = copy.deepcopy(accum + [option])
                if not last:
                    combine(col_attrs[1:], column_list)
                elif len(filters) > 0:
                    # if we have filters, generate combinations for each row.
                    for row in filters:
                        clauses = copy.deepcopy(column_list + [row])
                        collection.append(Vis(clauses))
                else:
                    collection.append(Vis(column_list))

        combine(attributes, [])
        return VisList(collection)

    @staticmethod
    def populate_data_type_model(ldf, vlist):
        """
        Fill in the data_type and data_model of every underspecified Clause,
        derive a default title from filter clauses, and count the dimensions
        and measures of each Vis.

        Parameters
        ----------
        ldf : LuxDataFrame
            Dataframe with underspecified intent.
        vlist : list of Vis
            Visualizations whose clauses are completed in place. Each Vis
            also gets ``_ndim`` and ``_nmsr`` set.
        """
        data_model_lookup = lux.config.executor.compute_data_model_lookup(ldf.data_type)

        for vis in vlist:
            for clause in vis._inferred_intent:
                if clause.description == "?":
                    clause.description = ""
                # TODO: an additional "not is_datetime_string(clause.attribute)"
                # guard was tried here; it is a temporary hack and breaks the
                # `test_row_column_group` example.
                if clause.attribute != "" and clause.attribute != "Record":
                    if clause.data_type == "":
                        clause.data_type = ldf.data_type[clause.attribute]
                    if clause.data_type == "id":
                        clause.data_type = "nominal"
                    if clause.data_model == "":
                        clause.data_model = data_model_lookup[clause.attribute]
                if clause.value != "":
                    # If user provided title for Vis, then don't override.
                    if vis.title == "":
                        if isinstance(clause.value, np.datetime64):
                            chart_title = date_utils.date_formatter(clause.value, ldf)
                        else:
                            chart_title = clause.value
                        vis.title = f"{clause.attribute} {clause.filter_op} {chart_title}"

            # Count only attribute clauses (empty value); filters are skipped
            # and the synthetic "Record" measure is not counted as a measure.
            vis._ndim = 0
            vis._nmsr = 0
            for clause in vis._inferred_intent:
                if clause.value == "":
                    if clause.data_model == "dimension":
                        vis._ndim += 1
                    elif clause.data_model == "measure" and clause.attribute != "Record":
                        vis._nmsr += 1

    @staticmethod
    def remove_all_invalid(vis_collection: VisList) -> VisList:
        """
        Given an expanded vis list, remove all visualizations that are invalid.

        A visualization is dropped when any of the following holds:
        - two or more of its clauses carry the temporal data type,
        - two of its clauses share the same attribute (this includes a
          filter on an attribute that is also visualized),
        - it has exactly one temporal clause together with exactly two
          measures.

        Parameters
        ----------
        vis_collection : list of Vis
            Visualizations to validate; ``_nmsr`` must already be populated.

        Returns
        -------
        VisList
            A new VisList holding only the valid visualizations.
        """
        new_vc = []
        for vis in vis_collection:
            num_temporal_specs = 0
            attribute_set = set()
            for clause in vis._inferred_intent:
                attribute_set.add(clause.attribute)
                if clause.data_type == "temporal":
                    num_temporal_specs += 1
            # Every clause must reference a distinct attribute.
            all_distinct_specs = len(vis._inferred_intent) == len(attribute_set)
            if (
                num_temporal_specs < 2
                and all_distinct_specs
                and not (vis._nmsr == 2 and num_temporal_specs == 1)
            ):
                new_vc.append(vis)
        return VisList(new_vc)

    @staticmethod
    def determine_encoding(ldf: LuxDataFrame, vis: Vis):
        """
        Populate a Vis with the appropriate mark type and channel information
        based on ShowMe logic. Currently supports up to 3 dimensions or
        measures.

        Parameters
        ----------
        ldf : LuxDataFrame
            Dataframe with underspecified intent.
        vis : Vis
            Visualization to encode; ``_ndim`` and ``_nmsr`` must already be
            populated. It is modified in place.

        Returns
        -------
        None

        Notes
        -----
        Implementing automatic encoding from Tableau's VizQL
        Mackinlay, J. D., Hanrahan, P., & Stolte, C. (2007).
        Show Me: Automatic presentation for visual analysis.
        IEEE Transactions on Visualization and Computer Graphics, 13(6), 1137–1144.
        https://doi.org/10.1109/TVCG.2007.70594
        """
        # Count number of measures and dimensions
        ndim = vis._ndim
        nmsr = vis._nmsr
        # preserve to add back to _inferred_intent later
        filters = utils.get_filter_specs(vis._inferred_intent)

        # Helper function (TODO: Move this into utils)
        def line_or_bar(ldf, dimension: Clause, measure: Clause):
            dim_type = dimension.data_type
            # If no aggregation function is specified, then default as average
            if measure.aggregation == "":
                measure.set_aggregation("mean")
            # Ordered dimensions are drawn as a line. The original compared
            # against the misspelling "oridinal", which never matched.
            if dim_type in ("temporal", "ordinal"):
                return "line", {"x": dimension, "y": measure}
            # unordered categorical
            # if cardinality large than 5 then sort bars
            if ldf.cardinality[dimension.attribute] > 5:
                dimension.sort = "ascending"
            return "bar", {"x": measure, "y": dimension}

        # ShowMe logic + additional heuristics
        count_col = Clause(
            attribute="Record",
            aggregation="count",
            data_model="measure",
            data_type="quantitative",
        )
        auto_channel = {}
        if ndim == 0 and nmsr == 1:
            # Histogram with Count
            measure = vis.get_attr_by_data_model("measure", exclude_record=True)[0]
            # Add the count clause only when no "Record" clause exists yet.
            # The original tested ``len(...) < 0``, which can never be true.
            if len(vis.get_attr_by_attr_name("Record")) == 0:
                vis._inferred_intent.append(count_col)
            # If no bin specified, then default as 10
            if measure.bin_size == 0:
                measure.bin_size = 10
            auto_channel = {"x": measure, "y": count_col}
            vis._mark = "histogram"
        elif ndim == 1 and (nmsr == 0 or nmsr == 1):
            # Line or Bar Chart
            if nmsr == 0:
                vis._inferred_intent.append(count_col)
            dimension = vis.get_attr_by_data_model("dimension")[0]
            measure = vis.get_attr_by_data_model("measure")[0]
            vis._mark, auto_channel = line_or_bar(ldf, dimension, measure)
        elif ndim == 2 and (nmsr == 0 or nmsr == 1):
            # Line or Bar chart broken down by the dimension
            dimensions = vis.get_attr_by_data_model("dimension")
            d1 = dimensions[0]
            d2 = dimensions[1]
            # The lower-cardinality dimension goes to the color channel.
            if ldf.cardinality[d1.attribute] < ldf.cardinality[d2.attribute]:
                vis.remove_column_from_spec(d1.attribute)
                dimension = d2
                color_attr = d1
            else:
                # if same attribute then remove_column_from_spec will remove
                # both dims, we only want to remove one
                if d1.attribute == d2.attribute:
                    vis._inferred_intent.pop(0)
                else:
                    vis.remove_column_from_spec(d2.attribute)
                dimension = d1
                color_attr = d2
            # Colored Bar/Line chart with Count as default measure.
            # For pre-aggregated data no mark or channel is assigned here.
            if not ldf.pre_aggregated:
                if nmsr == 0:
                    vis._inferred_intent.append(count_col)
                measure = vis.get_attr_by_data_model("measure")[0]
                vis._mark, auto_channel = line_or_bar(ldf, dimension, measure)
                auto_channel["color"] = color_attr
        elif ndim == 0 and nmsr == 2:
            # Scatterplot
            vis._mark = "scatter"
            vis._inferred_intent[0].set_aggregation(None)
            vis._inferred_intent[1].set_aggregation(None)
            auto_channel = {"x": vis._inferred_intent[0], "y": vis._inferred_intent[1]}
        elif ndim == 1 and nmsr == 2:
            # Scatterplot broken down by the dimension
            measures = vis.get_attr_by_data_model("measure")
            m1 = measures[0]
            m2 = measures[1]
            # Clear aggregation on the two plotted measures themselves. The
            # original cleared the first two clauses by position, which hits
            # the dimension instead whenever it is listed before a measure.
            m1.set_aggregation(None)
            m2.set_aggregation(None)

            color_attr = vis.get_attr_by_data_model("dimension")[0]
            # NOTE(review): the other call sites pass the attribute name
            # (e.g. ``d1.attribute``) while this one passes the Clause itself;
            # confirm which one remove_column_from_spec expects.
            vis.remove_column_from_spec(color_attr)
            vis._mark = "scatter"
            auto_channel = {"x": m1, "y": m2, "color": color_attr}
        elif ndim == 0 and nmsr == 3:
            # Scatterplot with color
            vis._mark = "scatter"
            auto_channel = {
                "x": vis._inferred_intent[0],
                "y": vis._inferred_intent[1],
                "color": vis._inferred_intent[2],
            }

        # Must be computed before enforce_specified_channel, which removes
        # entries from auto_channel in place.
        relevant_attributes = [clause.attribute for clause in auto_channel.values()]
        vis._min_max = {
            attr: ldf._min_max[attr]
            for attr in relevant_attributes
            if attr != "Record" and attr in ldf._min_max
        }
        if auto_channel:
            vis = Compiler.enforce_specified_channel(vis, auto_channel)
            vis._inferred_intent.extend(filters)  # add back the preserved filters

    @staticmethod
    def enforce_specified_channel(vis: Vis, auto_channel: Dict[str, Clause]) -> Vis:
        """
        Enforce that channels specified by the user in the Vis override the
        automatically chosen ShowMe channels.

        Parameters
        ----------
        vis : Vis
            Input Vis; its ``_inferred_intent`` is replaced with the final
            per-channel clauses.
        auto_channel : Dict[str, Clause]
            Mapping of channel name to the clause ShowMe recommends for it.
            Entries matched by a user-specified channel are removed from this
            dictionary in place.

        Returns
        -------
        Vis
            The same Vis, with channel specification combining both the
            user-specified and the automatic channels.

        Raises
        ------
        ValueError
            If more than one attribute is specified for the same channel.
        """
        # result of enforcing specified channel will be stored in result_dict
        result_dict = {}
        # specified_dict = {"x": [], "y": [clauses with y specified as channel]}
        specified_dict = {}
        # create a dictionary of specified channels in the given vis
        for channel in auto_channel.keys():
            specified_dict[channel] = vis.get_attr_by_channel(channel)
            result_dict[channel] = ""
        # for every element, replace with what's in specified_dict if specified
        for sVal, sAttr in specified_dict.items():
            if len(sAttr) == 1:  # the user specified this channel
                # remove the specified channel from auto_channel (matching by
                # value, since channel key may not be same)
                for key in list(auto_channel.keys()):
                    # need to ensure that the channel is the same (edge case
                    # when duplicate Cols with same attribute name)
                    if (
                        auto_channel[key].attribute == sAttr[0].attribute
                        and auto_channel[key].channel == sVal
                    ):
                        auto_channel.pop(key)
                        break
                sAttr[0].channel = sVal
                result_dict[sVal] = sAttr[0]
            elif len(sAttr) > 1:
                raise ValueError(
                    "There should not be more than one attribute specified in the same channel."
                )

        # For the leftover channels that are still unspecified in result_dict,
        # and the leftovers in the auto_channel specification,
        # step through them together and fill it automatically.
        leftover_channels = list(filter(lambda x: result_dict[x] == "", result_dict))
        for leftover_channel, leftover_encoding in zip(leftover_channels, auto_channel.values()):
            leftover_encoding.channel = leftover_channel
            result_dict[leftover_channel] = leftover_encoding
        vis._inferred_intent = list(result_dict.values())
        return vis

    @staticmethod
    def populate_wildcard_options(_inferred_intent: List[Clause], ldf: LuxDataFrame) -> dict:
        """
        Given wildcards and constraints in the intent, return the available
        values that satisfy the data_type or data_model constraints.

        Parameters
        ----------
        _inferred_intent : List[Clause]
            Clauses describing the intent. A filter clause whose value is the
            wildcard "?" is replaced in this list, in place, by a clause
            listing the concrete values.
        ldf : LuxDataFrame
            Dataframe providing the columns, data types and unique values.

        Returns
        -------
        intent : Dict[str, list]
            ``"attributes"`` holds one list of candidate clauses per attribute
            clause; ``"filters"`` holds a flat list of candidate filter
            clauses.
        """
        import copy
        from lux.utils.utils import convert_to_list

        inverted_data_type = lux.config.executor.invert_data_type(ldf.data_type)
        data_model = lux.config.executor.compute_data_model(ldf.data_type)

        intent = {"attributes": [], "filters": []}
        # enumerate gives the position of the clause directly, so replacing a
        # wildcard filter does not need a list.index() lookup (which fails
        # once the clause has already been replaced).
        for spec_ind, clause in enumerate(_inferred_intent):
            spec_options = []
            if clause.value == "":  # attribute
                if clause.attribute == "?":
                    # All attributes, de-duplicated and kept in column order
                    # so the enumeration order does not depend on hash order.
                    options = list(dict.fromkeys(ldf.columns))
                    if clause.data_type != "":
                        allowed = set(inverted_data_type[clause.data_type])
                        options = [opt for opt in options if opt in allowed]
                    if clause.data_model != "":
                        allowed = set(data_model[clause.data_model])
                        options = [opt for opt in options if opt in allowed]
                else:
                    options = convert_to_list(clause.attribute)
                for optStr in options:
                    if str(optStr) not in clause.exclude:
                        spec_copy = copy.copy(clause)
                        spec_copy.attribute = optStr
                        spec_options.append(spec_copy)
                intent["attributes"].append(spec_options)
            else:  # filters
                attr_lst = convert_to_list(clause.attribute)
                for attr in attr_lst:
                    options = []
                    if clause.value == "?":
                        options = ldf.unique_values[attr]
                        _inferred_intent[spec_ind] = Clause(
                            attribute=clause.attribute,
                            filter_op="=",
                            value=list(options),
                        )
                    else:
                        options.extend(convert_to_list(clause.value))
                    for optStr in options:
                        if str(optStr) not in clause.exclude:
                            spec_copy = copy.copy(clause)
                            spec_copy.attribute = attr
                            spec_copy.value = optStr
                            spec_options.append(spec_copy)
                intent["filters"].extend(spec_options)
        return intent