# Copyright 2019-2020 The Lux Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from lux.vislib.altair.AltairRenderer import AltairRenderer
from lux.utils.utils import check_import_lux_widget
from typing import List, Union, Callable, Dict
from lux.vis.Vis import Vis
from lux.vis.Clause import Clause
import warnings
import lux
class VisList:
    """VisList is a list of Vis objects.

    A VisList is built either from an explicit list of ``Vis`` objects or
    from a list of ``Clause`` objects (an intent). When a source dataframe is
    attached, the intent / visualizations are parsed, validated, compiled and
    executed against it (see ``refresh_source``).
    """

    def __init__(self, input_lst: "Union[List[Vis], List[Clause]]", source=None):
        """
        Build a VisList from a list of Vis or a list of Clause.

        Parameters
        ----------
        input_lst : Union[List[Vis], List[Clause]]
            Either the visualizations themselves or an intent describing them.
            The kind of input is decided by looking at the first element.
        source : LuxDataFrame, optional
            Dataframe to attach; when given, the VisList is populated from it.
        """
        # Overloaded constructor: the first element decides whether the input
        # is stored as the collection (Vis) or as the intent (anything else).
        self._source = source
        self._input_lst = input_lst
        self._collection = []
        self._intent = []
        if len(input_lst) > 0:
            if self._is_vis_input():
                self._collection = input_lst
            else:
                self._intent = input_lst
        self._widget = None
        self.refresh_source(self._source)
        warnings.formatwarning = lux.warning_format

    @property
    def intent(self):
        """List of clauses describing the VisList (empty for Vis input)."""
        return self._intent

    @intent.setter
    def intent(self, intent: "List[Clause]") -> None:
        self.set_intent(intent)

    def set_intent(self, intent: "List[Clause]") -> None:
        """
        Sets the intent of the VisList and refresh the source based on the new clause

        Parameters
        ----------
        intent : List[Clause]
            Query specifying the desired VisList
        """
        self._intent = intent
        self.refresh_source(self._source)

    @property
    def exported(self):
        """
        Get selected visualizations as exported Vis List

        Notes
        -----
        Convert the _selectedVisIdxs dictionary into a programmable VisList
        Example _selectedVisIdxs :

            {'Vis List': [0, 2]}

        Returns
        -------
        VisList
            return a VisList of selected visualizations. -> VisList(v1, v2...)
            An empty list is returned (with a warning) when no widget is
            attached or nothing has been selected.
        """
        # The widget is stored on ``_widget`` and stays None until the
        # VisList has been displayed, so test the value rather than the mere
        # presence of an attribute.
        widget = getattr(self, "_widget", None)
        if widget is None:
            warnings.warn(
                "\nNo widget attached to the VisList. "
                "Please assign VisList to an output variable.\n"
                "See more: https://lux-api.readthedocs.io/en/latest/source/guide/FAQ.html#troubleshooting-tips",
                stacklevel=2,
            )
            return []
        selected = widget._selectedVisIdxs
        if not selected:
            warnings.warn(
                "\nNo visualization selected to export.\n"
                "See more: https://lux-api.readthedocs.io/en/latest/source/guide/FAQ.html#troubleshooting-tips",
                stacklevel=2,
            )
            return []
        return VisList([self[idx] for idx in selected.get("Vis List", [])])

    def remove_duplicates(self) -> None:
        """
        Removes duplicate visualizations in VisList

        The first occurrence of each visualization is kept and the relative
        order of the remaining visualizations is preserved.
        """
        # dict keys deduplicate by hash/equality like a set, but keep
        # insertion order, so the result is deterministic.
        self._collection = list(dict.fromkeys(self._collection))

    def remove_index(self, index):
        """Remove the visualization stored at position ``index``."""
        self._collection.pop(index)

    def _is_vis_input(self):
        """Return True when the first element of the input list is a Vis.

        Must only be called when the input list is non-empty.
        """
        return isinstance(self._input_lst[0], Vis)

    def __getitem__(self, key):
        return self._collection[key]

    def __setitem__(self, key, value):
        self._collection[key] = value

    def __len__(self):
        return len(self._collection)

    @staticmethod
    def _clause_label(clause, use_aggregation=True):
        """Return the display label of a clause: AGG(attr), BIN(attr) or attr."""
        attr = str(clause.attribute)
        if use_aggregation and clause.aggregation != "" and clause.aggregation is not None:
            return clause._aggregation_name.upper() + f"({attr})"
        if clause.bin_size > 0:
            return f"BIN({attr})"
        return attr

    def __repr__(self):
        """Return one aligned ``<Vis (...) mark: ..., score: ... >`` line per visualization.

        When the collection is empty, the string form of the original input
        list is returned instead.
        """
        if len(self._collection) == 0:
            return str(self._input_lst)

        # First pass: measure the widest x label, y label, mark and filter so
        # that every line can be padded to the same column layout.
        x_width = 0
        y_width = 0
        mark_width = 0
        filter_width = 0
        for vis in self._collection:
            filter_clause = None
            for clause in vis._inferred_intent:
                if clause.value != "":
                    filter_clause = clause
                label = self._clause_label(clause)
                if clause.channel == "x":
                    x_width = max(x_width, len(label))
                if clause.channel == "y":
                    y_width = max(y_width, len(label))
            mark_width = max(mark_width, len(vis.mark))
            if filter_clause:
                filter_len = len(str(filter_clause.value)) + len(str(filter_clause.attribute))
                filter_width = max(filter_width, filter_len)

        # Second pass: render each visualization, padding the shorter ones.
        vis_repr = []
        for vis in self._collection:
            filter_clause = None
            x_text = ""
            y_text = ""
            extra_channels = []
            # Aggregation is not displayed for scatter marks.
            show_aggregation = vis.mark != "scatter"
            for clause in vis._inferred_intent:
                if clause.value != "":
                    filter_clause = clause
                label = self._clause_label(clause, show_aggregation)
                if clause.channel == "x":
                    x_text = label.ljust(x_width)
                elif clause.channel == "y":
                    y_text = label
                elif clause.channel != "":
                    extra_channels.append((clause.channel, label))

            if filter_clause:
                y_text = y_text.ljust(y_width)
            elif filter_width != 0:
                # No filter on this vis, but other entries have one: pad over
                # the space their filter text (and its decoration) occupies.
                y_text = y_text.ljust(y_width + filter_width + 9)
            else:
                y_text = y_text.ljust(y_width + filter_width)

            if x_text != "":
                x_text = "x: " + x_text + ", "
            if y_text != "":
                y_text = "y: " + y_text
            aligned_mark = vis.mark.ljust(mark_width)
            extras = "".join(", " + channel + ": " + label for channel, label in extra_channels)

            if filter_clause:
                aligned_filter = (
                    " -- ["
                    + str(filter_clause.attribute)
                    + filter_clause.filter_op
                    + str(filter_clause.value)
                    + "]"
                )
                aligned_filter = aligned_filter.ljust(filter_width + 8)
                vis_repr.append(
                    f" <Vis ({x_text}{y_text}{extras} {aligned_filter}) mark: {aligned_mark}, score: {vis.score:.2f} >"
                )
            else:
                vis_repr.append(
                    f" <Vis ({x_text}{y_text}{extras}) mark: {aligned_mark}, score: {vis.score:.2f} >"
                )
        # Drop the leading space of the first entry so it sits right after "[".
        return "[" + ",\n".join(vis_repr)[1:] + "]"

    def map(self, function):
        """Apply ``function`` to each visualization; returns a lazy ``map`` iterator."""
        return map(function, self._collection)

    def get(self, field_name):
        """Return an iterator over the ``field_name`` attribute of every visualization.

        ``AttributeError`` is raised during iteration if a visualization
        lacks the attribute.
        """
        return self.map(lambda vis: getattr(vis, field_name))

    def set(self, field_name, field_val):
        """Not implemented; returns the ``NotImplemented`` singleton."""
        return NotImplemented

    def sort(self, remove_invalid=True, descending=True):
        """
        Sort the collection in place by score.

        Parameters
        ----------
        remove_invalid : bool
            Drop visualizations whose score is -1 before sorting.
        descending : bool
            Sort direction; overridden by ``lux.config.sort`` when that is
            "ascending" or "descending". When ``lux.config.sort`` is "none"
            the collection is left unsorted.
        """
        # remove the items that have invalid (-1) score
        if remove_invalid:
            self._collection = [vis for vis in self._collection if vis.score != -1]
        if lux.config.sort == "none":
            return
        elif lux.config.sort == "ascending":
            descending = False
        elif lux.config.sort == "descending":
            descending = True
        # sort in-place by score
        self._collection.sort(key=lambda vis: vis.score, reverse=descending)

    def showK(self):
        """Return a VisList holding the first ``lux.config.topk`` visualizations.

        When ``topk`` is False (or 0), this VisList itself is returned.
        """
        k = lux.config.topk
        if k is False or k == 0:
            return self
        elif isinstance(k, int):
            k = abs(k)
        return VisList(self._collection[:k])

    def normalize_score(self, invert_order=False):
        """
        Scale every score by the maximum score in the collection, in place.

        Parameters
        ----------
        invert_order : bool
            When True, each score is additionally replaced by ``1 - score``.

        Notes
        -----
        An empty collection is left untouched. When the maximum score is 0
        the division is skipped (it would otherwise divide by zero).
        """
        if not self._collection:
            return
        max_score = max(self.get("score"))
        for vis in self._collection:
            if max_score != 0:
                vis.score = vis.score / max_score
            if invert_order:
                vis.score = 1 - vis.score

    def _ipython_display_(self):
        """Render the VisList as a Lux widget and keep a handle on ``_widget``."""
        self._widget = None
        from IPython.display import display
        from lux.core.frame import LuxDataFrame

        recommendation = {
            "action": "Vis List",
            "description": "Shows a vis list defined by the intent",
        }
        recommendation["collection"] = self._collection

        check_import_lux_widget()
        import luxwidget

        recJSON = LuxDataFrame.rec_to_JSON([recommendation])
        self._widget = luxwidget.LuxWidget(
            currentVis={},
            recommendations=recJSON,
            intent="",
            message="",
            config={"plottingScale": lux.config.plotting_scale},
        )
        display(self._widget)

    def refresh_source(self, ldf):
        """
        Loading the source into the visualizations in the VisList, then populating each visualization
        based on the new source data, effectively "materializing" the visualization collection.

        Parameters
        ----------
        ldf : LuxDataframe
            Input Dataframe to be attached to the VisList. Nothing happens
            when it is None.

        Returns
        -------
        None
            The VisList is updated in place.

        See Also
        --------
        lux.vis.Vis.refresh_source

        Note
        ----
        Function derives a new _inferred_intent by instantiating the intent specification on the new data
        """
        if ldf is not None:
            from lux.processor.Parser import Parser
            from lux.processor.Validator import Validator
            from lux.processor.Compiler import Compiler

            self._source = ldf
            self._source.maintain_metadata()
            if len(self._input_lst) > 0:
                approx = False
                if self._is_vis_input():
                    # Re-derive and compile each Vis against the new source.
                    compiled_collection = []
                    for vis in self._collection:
                        vis._inferred_intent = Parser.parse(vis._intent)
                        Validator.validate_intent(vis._inferred_intent, ldf)
                        Compiler.compile_vis(ldf, vis)
                        compiled_collection.append(vis)
                    self._collection = compiled_collection
                else:
                    # Expand the intent into a collection of visualizations.
                    self._inferred_intent = Parser.parse(self._intent)
                    Validator.validate_intent(self._inferred_intent, ldf)
                    self._collection = Compiler.compile_intent(ldf, self._inferred_intent)

                # Early pruning determination criteria
                width_criteria = len(self._collection) > (lux.config.topk + 3)
                length_criteria = len(ldf) > lux.config.early_pruning_sample_start
                if lux.config.early_pruning and width_criteria and length_criteria:
                    ldf._message.add_unique(
                        "Large search space detected: Lux is approximating the interestingness of recommended visualizations.",
                        priority=1,
                    )
                    approx = True
                lux.config.executor.execute(self._collection, ldf, approx=approx)