Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modular policies support #105

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 60 additions & 7 deletions mttl/models/containers/lora_containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
BatchSequenceExpertsAndWeightsSelectorOutput,
ExpertsAndWeightsSelectorOutput,
SelectorOutput,
SelectorOutputsContainer,
)
from mttl.models.library.expert import Expert
from mttl.models.modifiers.lora import LoRA, LoRAConfig, SkilledLoRA, SkilledLoRAConfig
Expand Down Expand Up @@ -106,7 +107,13 @@ def _convert_expert_names_to_indices(
indices.append(index)
return indices

def route(self, input, selection, **kwargs):
def route(self, input, selection, add_base_forward=True, **kwargs):

# We compute the base model's forward pass here, to leverage the
# contiguity of the current input (which might be split later)
if add_base_forward:
base_out = self.experts.layer(input).to(input.dtype)

"""Depending on the selection output, we and merge differently."""
from mttl.models.modifiers.lora import SkilledLoRA, SkilledLoRAView

Expand All @@ -116,7 +123,7 @@ def route(self, input, selection, **kwargs):
skilled_lora = SkilledLoRAView.from_loras(
[self.get(module) for module in selection.experts]
)
return SkilledLoRA.parallel_linear_weighted_forward(
module_output = SkilledLoRA.parallel_linear_weighted_forward(
input,
[skilled_lora],
selection.weights,
Expand All @@ -125,7 +132,7 @@ def route(self, input, selection, **kwargs):
)
elif isinstance(selection, BatchExpertsSelectorOutput):
# In this case, we have exactly one expert per example in the batch with no weights
return LoRA.parallel_linear_forward(
module_output = LoRA.parallel_linear_forward(
input, [self.get(module) for module in selection.experts]
)
elif isinstance(
Expand Down Expand Up @@ -212,7 +219,15 @@ def route(self, input, selection, **kwargs):
dim_names=selection.dim_names,
merge_after=self.lora_merge_after,
)
return module_output.view(input.shape[0], input.shape[1], -1)
module_output = module_output.view(input.shape[0], input.shape[1], -1)

if add_base_forward:
if base_out.ndim == 2:
module_output = module_output.squeeze(1)

module_output = base_out + module_output

return module_output

def forward(self, input, **kwargs):
if len(self.experts) > 0:
Expand Down Expand Up @@ -317,7 +332,39 @@ def on_add_expert(self, expert: Expert, action="route", is_default=False) -> Non

self.experts.add_skill(modifier_module)

def route(self, input, selection, **kwargs):
def route(self, input, selection, add_base_forward=True, **kwargs):

# We compute the base model's forward pass here, to leverage the
# contiguity of the current input (which might be split later)
if add_base_forward:
base_out = self.experts.layer(input).to(input.dtype)
else:
base_out = 0.0

if isinstance(selection, SelectorOutputsContainer):
assert add_base_forward
assert base_out.ndim == 3

output = base_out

for selector_output, selector_idx in zip(
selection.selector_outputs, selection.selector_indices
):
selector_route_input = input.index_select(
selection.dim_index, selector_idx
)
mod_out = self.route(
selector_route_input,
selector_output,
add_base_forward=False,
**kwargs,
)

# add the output to the right place
output = output.index_add(selection.dim_index, selector_idx, mod_out)

return output

if isinstance(selection, BatchExpertsSelectorOutput):
# in order to use this container, we need to create one-hot weights for the experts
batch_size = len(selection.experts)
Expand Down Expand Up @@ -347,7 +394,6 @@ def route(self, input, selection, **kwargs):
dim_names=["batch", "experts"],
merge_after=self.lora_merge_after,
)
return module_output
elif (
isinstance(selection, BatchSequenceExpertsAndWeightsSelectorOutput)
or isinstance(selection, BatchExpertsAndWeightsSelectorOutput)
Expand Down Expand Up @@ -383,10 +429,17 @@ def route(self, input, selection, **kwargs):
dim_names=selection.dim_names,
merge_after=self.lora_merge_after,
)
return module_output
else:
raise ValueError("Unknown selection type.")

if add_base_forward:
if base_out.ndim == 2:
module_output = module_output.squeeze(1)

module_output = base_out + module_output

return module_output

def forward(self, input, **kwargs):
if len(self.experts) > 0:
selection = self.selector(input, container=self, **kwargs)
Expand Down
140 changes: 137 additions & 3 deletions mttl/models/containers/selectors/poly_selector.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Union

Expand All @@ -15,6 +16,9 @@
SelectorOutput,
forward_with_cache,
)
from mttl.models.containers.selectors.selector_output import (
SequenceSelectorOutputsContainer,
)
from mttl.models.library.expert import ExpertInfo


Expand Down Expand Up @@ -42,6 +46,7 @@ def __init__(self, **kwargs) -> None:
self.n_tasks + 1,
self.config.n_splits,
)
self.module_logits_leading_dims = shape[:-1]
self.module_logits = nn.Parameter(torch.empty(*shape).uniform_(-1e-3, 1e-3))

if self.n_tasks == 0:
Expand All @@ -62,11 +67,12 @@ def _convert_task_names_to_ids(self, task_names: List[str]) -> torch.LongTensor:
],
).to(self.module_logits.device)

def _get_weights(self, task_names: List[str] = None) -> torch.Tensor:
"""Gets the routing weights for the corresponding task names.
def _get_task_ids(self, task_names: List[str] = None) -> torch.LongTensor:
"""Get the task ids for the corresponding task names.

If `task_names` is None, read task names from the routing infos structure.
"""

# Poly used for finetuning a single task
if self.n_tasks == 0:
task_ids = [0]
Expand Down Expand Up @@ -107,6 +113,11 @@ def _get_weights(self, task_names: List[str] = None) -> torch.Tensor:

assert not self.training, "Unknown tasks during training"

return task_ids

def _get_weights(self, task_names: List[str] = None) -> torch.Tensor:
"""Gets the routing weights for the corresponding task names."""
task_ids = self._get_task_ids(task_names)
module_logits = torch.sigmoid(self.module_logits[task_ids])
module_logits = module_logits.view(
module_logits.size(0), self.config.n_splits, self.n_experts
Expand Down Expand Up @@ -148,7 +159,7 @@ def on_add_expert(
):
# we need additional space in the routing to accomodate the incoming expert
self.module_logits.data = torch.empty(
self.n_tasks + 1, self.config.n_splits * self.n_experts
*self.module_logits_leading_dims, self.config.n_splits * self.n_experts
).uniform_(-1e-3, 1e-3)

# Last expert is exactly uniform
Expand Down Expand Up @@ -256,3 +267,126 @@ def on_add_expert(
for name in self.module_logits_dict.keys():
self.module_logits_dict[name].data = torch.ones(1).to(self.device)
self.module_logits_dict[name].data /= len(self.module_logits_dict)


@dataclass
class VectorSelectorConfig(SelectorConfig):
task_names: List[str] = None


@Selector.register("vector_router", VectorSelectorConfig)
class VectorSelector(Selector):
"""
User can specify a distribution over the skills.
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

self.register_buffer("dummy", torch.ones(1))

@forward_with_cache
def forward(self, input, **kwargs) -> ExpertsSplitsAndWeightsSelectorOutput:

routing_infos = self.routing_infos
assert (
routing_infos.skill_mixing_coefs is not None
), "No skill mixing coefs found"

mixing_coefs = routing_infos.skill_mixing_coefs.to(self.dummy.device)

if mixing_coefs.ndim == 1:
mixing_coefs = mixing_coefs.unsqueeze(0)

return ExpertsSplitsAndWeightsSelectorOutput(
SelectorOutput.ALL_EXPERTS, mixing_coefs
)


@dataclass
class MultimodalPolySelectorConfig(PolySelectorConfig):
modality_names: List[str] = None


@Selector.register("multimodal_poly_router", MultimodalPolySelectorConfig)
class MultimodalPolySelector(PolySelector):
pass

"""
Implements routing at a per-layer or per-model level
"""

avg_selector_warned: bool = False

def __init__(self, **kwargs) -> None:
# Call the grandparent constructor
Selector.__init__(self, **kwargs)

self.n_modalities = len(self.config.modality_names)
self.n_tasks = len(self.config.task_names) if self.config.task_names else 0

assert (
self.n_modalities > 0
), "MultimodalPolySelector requires n_modalities >= 1"

# We add an extra task for the default (average) expert if not found
shape = (
self.n_tasks + 1,
self.n_modalities,
self.config.n_splits,
)
self.module_logits_leading_dims = shape[:-1]
self.module_logits = nn.Parameter(torch.empty(*shape).uniform_(-1e-3, 1e-3))

assert self.n_tasks > 0, "No task names found in the config"

def _get_mod_idxs(self) -> torch.LongTensor:
"""Converts modality names to modality ids (indices in the module_logits routing tensor)."""
# we fetch the info from RoutingInfos
routing_info = self.routing_infos
if hasattr(routing_info, "mod_ids_from_name"):
mod_idxs = routing_info.mod_ids_from_name
else:
mod_names = routing_info.modality_names
mod_idxs = defaultdict(list)
for i, mod_name in enumerate(mod_names):
if mod_name: # skip None
mod_id = self.config.modality_names.index(mod_name)
mod_idxs[mod_id].append(i)

for mod_id in mod_idxs.keys():
mod_idxs[mod_id] = torch.LongTensor(mod_idxs[mod_id]).to(
self.module_logits.device
)

# cache the computation for future use
self.routing_infos.mod_ids_from_name = mod_idxs

return mod_idxs

@forward_with_cache
def forward(self, input, **kwargs) -> SequenceSelectorOutputsContainer:
"""Gets the routing weights for the corresponding task names."""
task_ids = self._get_task_ids()
mod_idxs = self._get_mod_idxs()

module_logits = torch.sigmoid(
self.module_logits[task_ids]
) # [bs, n_modalities, n_splits]
module_logits = module_logits.view(
*module_logits.shape[:2], self.config.n_splits, self.n_experts
)
module_weights = module_logits / (module_logits.sum(dim=-1, keepdim=True) + EPS)

# Build the per-modality SelectorOutput
selector_outputs = []
selector_indices = []

for mod_id, mod_seq_indices in mod_idxs.items():
selector_output = BatchExpertsSplitsAndWeightsSelectorOutput(
SelectorOutput.ALL_EXPERTS, module_weights[:, mod_id]
)
selector_outputs.append(selector_output)
selector_indices.append(mod_seq_indices)

return SequenceSelectorOutputsContainer(selector_outputs, selector_indices)
43 changes: 42 additions & 1 deletion mttl/models/containers/selectors/selector_output.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import List, Union
from typing import Dict, List, Union

import torch

Expand Down Expand Up @@ -128,3 +128,44 @@ class BatchSequenceExpertsSplitsAndWeightsSelectorOutput(
@property
def dim_names(self):
return ["batch", "sequence", "splits", "experts"]


@dataclass
class SelectorOutputsContainer:
"""A container for multiple SelectorOutputs."""

selector_outputs: List[SelectorOutput]
selector_indices: Dict[int, torch.Tensor] = None

def __post_init__(self):
# make sure that all selector outputs are of the same type
if len(set(type(so) for so in self.selector_outputs)) != 1:
raise ValueError("All selector outputs should be of the same type.")

@property
def dim_index(self):
raise NotImplementedError(
"dim_index needs to be specified in order to know which dimension to split across SelectorOutputs"
)


class SequenceSelectorOutputsContainer(SelectorOutputsContainer):

def __post_init__(self):
super().__post_init__()
if any(
so.dim_names[self.dim_index] == "sequence" for so in self.selector_outputs
):
raise ValueError(
"All selector outputs should not have 'sequence', as we are splitting across this dimension"
)

# make sure that indices don't overlap across different selector outputs
all_indices = torch.cat(self.selector_indices)
assert all_indices.unique().size(0) == all_indices.size(
0
), "Indices should not overlap across different selector outputs"

@property
def dim_index(self):
return 1
25 changes: 25 additions & 0 deletions mttl/models/expert_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,28 @@ def wrapper_func(model, **kwargs):
return results

return wrapper_func

@classmethod
def wrap_with_context(cls, f):
"""
Decorator method that wraps a general function of a model class
(We may want to wrap other methods than just forward and generate).
Use `create_context` whenever possible
"""
from mttl.models.modifiers.routing import RoutingInfo

@functools.wraps(f)
def wrapper_func(model, *args, **kwargs):

return_context = kwargs.pop("return_context", False)
with cls(model, RoutingInfo.from_batch(args[0])) as context:
results = f(model, *args, **kwargs)
if return_context:
context_returns = {
"routing_infos": context.routing_infos,
"routing_gates": context.routing_gates,
}
return results, context_returns
return results

return wrapper_func
Loading
Loading