# How to perform experiments with damast

One of the main motivations of this library is to facilitate the development and evaluation of machine learning models.
Hence, 'damast' offers a mini-framework and API to simplify this development.
This comes with a set of constraints that the authors have tried to keep minimal, so that researchers and newcomers to ML face a lower barrier to entry when running machine learning experiments.

That being said, here is an example of a minimal experiment.

In [None]:
!pip install damast

In [None]:
# The list of modules used for this example
from collections import OrderedDict
from typing import List, Optional
from pathlib import Path

# For performance reasons the underlying data handling library is 'polars'
import polars

# The current development has focused on a keras+tensorflow based Machine Learning setup
import tensorflow as tf
import keras

import damast
from damast.core.transformations import CycleTransformer
# You can define custom units to annotate data, but otherwise astropy units will be used
from damast.core.units import units
# Data ranges can be defined as a list, or bounded by a lower bound (min) and an upper bound (max)
from damast.core.datarange import MinMax, CyclicMinMax
# The AnnotatedDataFrame combines a data specification and actual 'numeric' data
from damast.core.dataframe import AnnotatedDataFrame
# An AnnotatedDataFrame contains MetaData to describe the data
from damast.core.metadata import MetaData

# Data processing is centered around a DataProcessingPipeline which consists of multiple PipelineElement being run
# in sequence
from damast.core.dataprocessing import DataProcessingPipeline, PipelineElement


# To allow the machine learning process to be simplified, we offer a 'BaseModel' that should be inherited from
from damast.ml.models.base import BaseModel

# The experiment setup
from damast.ml.experiments import Experiment, LearningTask, ForecastTask, ModelInstanceDescription, TrainingParameters

# Generate data for this particular example, which uses data from the maritime domain
from damast.domains.maritime.ais.data_generator import AISTestData, AISTestDataSpec

To illustrate a full experiment, we need to set up a data processing pipeline. This pipeline extracts all features that are necessary to train the machine learning model(s). It does so by running transformations on the data, here provided by a LatLonTransformer.

In [None]:
class LatLonTransformer(PipelineElement):
    """
    The LatLonTransformer will consume a lat(itude) and a lon(gitude) column and perform
    cyclic normalization. It will add four columns to a dataframe, namely lat_x, lat_y, lon_x, lon_y.
    """
    @damast.core.describe("Lat/Lon cyclic transformation")
    @damast.core.input({
        "lat": {"unit": units.deg},
        "lon": {"unit": units.deg}
    })
    @damast.core.output({
        "lat_x": {"value_range": MinMax(-1.0, 1.0)},
        "lat_y": {"value_range": MinMax(-1.0, 1.0)},
        "lon_x": {"value_range": MinMax(-1.0, 1.0)},
        "lon_y": {"value_range": MinMax(-1.0, 1.0)}
    })
    def transform(self, df: AnnotatedDataFrame) -> AnnotatedDataFrame:
        lat_cyclic_transformer = CycleTransformer(features=["lat"], n=180.0)
        lon_cyclic_transformer = CycleTransformer(features=["lon"], n=360.0)

        _df = lat_cyclic_transformer.fit_transform(df=df)
        _df = lon_cyclic_transformer.fit_transform(df=_df)
        df._dataframe = _df
        return df
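
To make the cyclic normalization above concrete, the following standalone sketch maps a periodic value onto the unit circle. It uses only the standard library and is not damast's implementation: the helper name `cyclic_encode` is hypothetical, and which of the two output columns holds the cosine and which the sine in `CycleTransformer` is an assumption made here.

```python
import math


def cyclic_encode(value: float, period: float) -> tuple[float, float]:
    """Map a periodic value onto the unit circle (hypothetical helper).

    Assumption: x is the cosine and y the sine component; damast's
    CycleTransformer may assign them the other way around.
    """
    angle = 2.0 * math.pi * value / period
    return math.cos(angle), math.sin(angle)


# Longitudes -180 and 180 describe the same meridian, so their encodings
# coincide (up to floating-point noise) although the raw values differ by 360.
west = cyclic_encode(-180.0, period=360.0)
east = cyclic_encode(180.0, period=360.0)
print(west, east)
```

Both components always lie in [-1, 1], which matches the `MinMax(-1.0, 1.0)` value ranges declared for the output columns.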


The example model selected here requires the features listed above as input and, for illustration purposes, produces an output of the same shape.

In [None]:
class Baseline(BaseModel):
    """
    This is a placeholder ML model that illustrates the minimal
    requirements.
    """
    input_specs = OrderedDict({
        "lat_x": {"length": 1},
        "lat_y": {"length": 1},
        "lon_x": {"length": 1},
        "lon_y": {"length": 1}
    })

    output_specs = OrderedDict({
        "lat_x": {"length": 1},
        "lat_y": {"length": 1},
        "lon_x": {"length": 1},
        "lon_y": {"length": 1}
    })

    def __init__(self,
                 name: str,
                 features: List[str],
                 timeline_length: int,
                 output_dir: Path,
                 targets: Optional[List[str]] = None):
        self.timeline_length = timeline_length

        super().__init__(name=name,
                         output_dir=output_dir,
                         features=features,
                         targets=targets)

    def _init_model(self):
        features_width = len(self.features)
        targets_width = len(self.targets)

        self.model = tf.keras.models.Sequential([
            keras.layers.Flatten(input_shape=[self.timeline_length, features_width]),
            keras.layers.Dense(targets_width)
        ])


class BaselineA(Baseline):
    """Placeholder Model to illustrate the use of multiple models"""
    pass


class BaselineB(Baseline):
    """Placeholder Model to illustrate the use of multiple models"""
    pass
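
As a rough check on what `_init_model` builds, the shape bookkeeping of a Flatten layer followed by a single Dense layer can be worked out by hand. The sketch below is plain Python and does not call keras; the helper name `baseline_shapes` is hypothetical, and the values 5, 4 and 4 are illustrative assumptions rather than values taken from damast.

```python
def baseline_shapes(timeline_length: int, features_width: int, targets_width: int) -> dict:
    """Shape bookkeeping for Flatten followed by one Dense layer (hypothetical helper)."""
    # Flatten turns a (timeline_length, features_width) sample into one vector.
    flattened = timeline_length * features_width
    # A dense layer has one weight per (input, output) pair plus one bias per output.
    parameters = flattened * targets_width + targets_width
    return {"flattened": flattened, "output": targets_width, "parameters": parameters}


# Illustrative values: 5 time steps, 4 input features, 4 target features.
shapes = baseline_shapes(timeline_length=5, features_width=4, targets_width=4)
print(shapes)  # {'flattened': 20, 'output': 4, 'parameters': 84}
```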

This example operates on synthetic, i.e. automatically generated, data that is specific to the maritime domain.
You will see a preview of the first 10 rows when running the following cell.

In [None]:
import tempfile
import shutil

tmp_path = Path(tempfile.gettempdir()) / "test-output-ais_preparation"
if tmp_path.exists():
    shutil.rmtree(tmp_path)
tmp_path.mkdir(parents=True)

pipeline = DataProcessingPipeline(name="ais_preparation",
                                  base_dir=tmp_path) \
    .add("cyclic", LatLonTransformer())
features = ["lat_x", "lat_y", "lon_x", "lon_y"]

data = AISTestData(1000)
adf = AnnotatedDataFrame(dataframe=data.dataframe,
                         metadata=MetaData.from_dict(data=AISTestDataSpec.copy()))
dataset_filename = tmp_path / "test.hdf5"
adf.save(filename=dataset_filename)

adf.head(10)

A central idea of the experiment framework is to provide consistent inputs and outputs for experiments. Hence, define a LearningTask (here: ForecastTask) that collects the parameters defining the task.

In [None]:
forecast_task = ForecastTask(
    label="forecast-ais-short-sequence",
    pipeline=pipeline, features=features,
    models=[ModelInstanceDescription(BaselineA, {}),
            ModelInstanceDescription(BaselineB, {}),
            ],
    group_column="mmsi",
    sequence_length=5,
    forecast_length=1,
    training_parameters=TrainingParameters(epochs=1,
                                           validation_steps=1)
)
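
One plausible reading of `group_column`, `sequence_length` and `forecast_length` is a sliding window applied separately to each group. The sketch below illustrates that reading with plain Python lists. It is not damast's implementation: the helper `make_windows`, the stride of one, and the assumption that rows already arrive in time order are all hypothetical.

```python
from collections import defaultdict


def make_windows(rows, group_column, features, sequence_length, forecast_length):
    """Build (inputs, targets) pairs per group with a sliding window (hypothetical helper)."""
    grouped = defaultdict(list)
    for row in rows:
        grouped[row[group_column]].append([row[name] for name in features])

    samples = []
    window = sequence_length + forecast_length
    for sequence in grouped.values():
        # Groups shorter than one full window yield no sample.
        for start in range(len(sequence) - window + 1):
            inputs = sequence[start:start + sequence_length]
            targets = sequence[start + sequence_length:start + window]
            samples.append((inputs, targets))
    return samples


# Two vessels: one with 7 observations, one with only 3.
rows = [{"mmsi": 1, "lat_x": 0.1 * i, "lon_x": -0.1 * i} for i in range(7)]
rows += [{"mmsi": 2, "lat_x": 0.5, "lon_x": 0.5} for _ in range(3)]

samples = make_windows(rows, "mmsi", ["lat_x", "lon_x"],
                       sequence_length=5, forecast_length=1)
print(len(samples))  # 2: both come from vessel 1, vessel 2 is too short
```

Under this reading, windows never mix observations from different vessels, which is the reason for grouping by `mmsi` before slicing.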

The actual experiment takes a single LearningTask as input. Running it returns the path to a report file, which the following cell prints.

In [None]:
experiment = Experiment(learning_task=forecast_task,
                        input_data=dataset_filename,
                        output_directory=tmp_path)
report = experiment.run()
    
with open(report, "r") as f:
    print(f.read())

The outputs of an experiment are collected inside a dedicated (timestamped) folder. This folder also contains a subfolder for each of the parametrized models defined in the LearningTask.

In [None]:
last_experiments = sorted([str(f) for f in Path(experiment.output_directory).glob(pattern="*") if f.is_dir()])
print("Last experiment in: ", last_experiments[-1])

experiment_folder = sorted([str(f) for f in Path(last_experiments[-1]).glob(pattern="*")])
file_listing = '\n'.join(experiment_folder)
print("Contents:\n")
print(file_listing)


Once the training is running, it can be monitored using TensorBoard:
```
    tensorboard --logdir=<experiments-directory>
```