diff --git a/internal/update/apt/service.go b/internal/update/apt/service.go index 860be68f..90492c13 100644 --- a/internal/update/apt/service.go +++ b/internal/update/apt/service.go @@ -78,12 +78,12 @@ func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan u return nil, update.ErrOperationAlreadyInProgress } eventsCh := make(chan update.Event, 100) - go func() { defer s.lock.Unlock() defer close(eventsCh) eventsCh <- update.NewDataEvent(update.StartEvent, "Upgrade is starting") + eventsCh <- update.NewProgressEvent(0.0) stream := runUpgradeCommand(ctx, names) for line, err := range stream { if err != nil { @@ -92,16 +92,17 @@ func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan u } eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line) } - eventsCh <- update.NewDataEvent(update.StartEvent, "apt cleaning cache is starting") + eventsCh <- update.NewProgressEvent(80.0) for line, err := range runAptCleanCommand(ctx) { if err != nil { eventsCh <- update.NewErrorEvent(fmt.Errorf("error running apt clean command: %w", err)) return } + eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line) } - + eventsCh <- update.NewProgressEvent(85.0) eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, "Stop and destroy docker containers and images ....") streamCleanup := cleanupDockerContainers(ctx) for line, err := range streamCleanup { @@ -113,6 +114,7 @@ func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan u eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line) } } + eventsCh <- update.NewProgressEvent(90.0) // TODO: Remove this workaround once docker image versions are no longer hardcoded in arduino-app-cli. // Tracking issue: https://github.com/arduino/arduino-app-cli/issues/600 @@ -128,7 +130,7 @@ func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan u eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line) } eventsCh <- update.NewDataEvent(update.RestartEvent, "Upgrade completed. Restarting ...") - + eventsCh <- update.NewProgressEvent(100.0) err := restartServices(ctx) if err != nil { eventsCh <- update.NewErrorEvent(fmt.Errorf("error restarting services after upgrade: %w", err)) diff --git a/internal/update/arduino/arduino.go b/internal/update/arduino/arduino.go index 0c4e5d29..b6757a47 100644 --- a/internal/update/arduino/arduino.go +++ b/internal/update/arduino/arduino.go @@ -131,24 +131,44 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st } eventsCh := make(chan update.Event, 100) - downloadProgressCB := func(curr *rpc.DownloadProgress) { - data := helpers.ArduinoCLIDownloadProgressToString(curr) - slog.Debug("Download progress", slog.String("download_progress", data)) - eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, data) - } - taskProgressCB := func(msg *rpc.TaskProgress) { - data := helpers.ArduinoCLITaskProgressToString(msg) - slog.Debug("Task progress", slog.String("task_progress", data)) - eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, data) - } - go func() { defer a.lock.Unlock() defer close(eventsCh) + const indexBase float32 = 0.0 + const indexWeight float32 = 30.0 + const upgradeBase float32 = 30.0 + const upgradeWeight float32 = 60.0 + + makeDownloadProgressCallback := func(basePercentage, phaseWeight float32) func(*rpc.DownloadProgress) { + return func(curr *rpc.DownloadProgress) { + data := helpers.ArduinoCLIDownloadProgressToString(curr) + eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, data) + if updateInfo := curr.GetUpdate(); updateInfo != nil { + if updateInfo.GetTotalSize() <= 0 { + return + } + localProgress := (float32(updateInfo.GetDownloaded()) / float32(updateInfo.GetTotalSize())) * 100.0 + totalArduinoProgress := basePercentage + (localProgress/100.0)*phaseWeight + eventsCh <- update.NewProgressEvent(totalArduinoProgress) + } + } + } + makeTaskProgressCallback := func(basePercentage, phaseWeight float32) func(*rpc.TaskProgress) { + return func(msg *rpc.TaskProgress) { + data := helpers.ArduinoCLITaskProgressToString(msg) + eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, data) + if !msg.GetCompleted() { + localProgress := msg.GetPercent() + totalArduinoProgress := basePercentage + (localProgress/100.0)*phaseWeight + eventsCh <- update.NewProgressEvent(totalArduinoProgress) + } + } + } + eventsCh <- update.NewDataEvent(update.StartEvent, "Upgrade is starting") - logrus.SetLevel(logrus.ErrorLevel) // Reduce the log level of arduino-cli + logrus.SetLevel(logrus.ErrorLevel) srv := commands.NewArduinoCoreServer() if err := setConfig(ctx, srv); err != nil { @@ -172,21 +192,28 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st }() { - stream, _ := commands.UpdateIndexStreamResponseToCallbackFunction(ctx, downloadProgressCB) + updateIndexProgressCB := makeDownloadProgressCallback(indexBase, indexWeight) + stream, _ := commands.UpdateIndexStreamResponseToCallbackFunction(ctx, updateIndexProgressCB) if err := srv.UpdateIndex(&rpc.UpdateIndexRequest{Instance: inst}, stream); err != nil { eventsCh <- update.NewErrorEvent(fmt.Errorf("error updating index: %w", err)) return } + + eventsCh <- update.NewProgressEvent(indexBase + indexWeight) + if err := srv.Init(&rpc.InitRequest{Instance: inst}, commands.InitStreamResponseToCallbackFunction(ctx, nil)); err != nil { eventsCh <- update.NewErrorEvent(fmt.Errorf("error initializing instance: %w", err)) return } } + platformDownloadCB := makeDownloadProgressCallback(upgradeBase, upgradeWeight) + platformTaskCB := makeTaskProgressCallback(upgradeBase, upgradeWeight) + stream, respCB := commands.PlatformUpgradeStreamResponseToCallbackFunction( ctx, - downloadProgressCB, - taskProgressCB, + platformDownloadCB, + platformTaskCB, ) if err := srv.PlatformUpgrade( &rpc.PlatformUpgradeRequest{ @@ -218,8 +245,8 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st }, commands.PlatformInstallStreamResponseToCallbackFunction( ctx, - downloadProgressCB, - taskProgressCB, + platformDownloadCB, + platformTaskCB, ), ) if err != nil { @@ -247,6 +274,7 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st eventsCh <- update.NewErrorEvent(fmt.Errorf("error burning bootloader: %w", err)) return } + eventsCh <- update.NewProgressEvent(100.0) }() return eventsCh, nil diff --git a/internal/update/event.go b/internal/update/event.go index 2aac04ee..d3e467cf 100644 --- a/internal/update/event.go +++ b/internal/update/event.go @@ -15,7 +15,11 @@ package update -import "go.bug.st/f" +import ( + "fmt" + + "go.bug.st/f" +) // EventType defines the type of upgrade event. type EventType int @@ -24,16 +28,17 @@ const ( UpgradeLineEvent EventType = iota StartEvent RestartEvent + ProgressEvent DoneEvent ErrorEvent ) // Event represents a single event in the upgrade process. type Event struct { - Type EventType - - data string - err error // error field for error events + Type EventType + Progress float32 + data string + err error // error field for error events } func (t EventType) String() string { @@ -44,6 +49,8 @@ func (t EventType) String() string { return "restarting" case StartEvent: return "starting" + case ProgressEvent: + return "progress" case DoneEvent: return "done" case ErrorEvent: @@ -60,6 +67,14 @@ func NewDataEvent(t EventType, data string) Event { } } +func NewProgressEvent(progress float32) Event { + return Event{ + Type: ProgressEvent, + data: fmt.Sprintf("%.2f", progress), + Progress: progress, + } +} + func NewErrorEvent(err error) Event { return Event{ Type: ErrorEvent, diff --git a/internal/update/update.go b/internal/update/update.go index 7e3bc99c..3e722526 100644 --- a/internal/update/update.go +++ b/internal/update/update.go @@ -136,24 +136,37 @@ func (m *Manager) UpgradePackages(ctx context.Context, pkgs []UpgradablePackage) // update of the cores we will end up with inconsistent state, or // we need to re run the upgrade because the orchestrator interrupted // in the middle the upgrade of the cores. + + const arduinoWeight float32 = 20.0 + const aptWeight float32 = 80.0 + arduinoEvents, err := m.arduinoPlatformUpdateService.UpgradePackages(ctx, arduinoPlatform) if err != nil { m.broadcast(NewErrorEvent(fmt.Errorf("failed to upgrade Arduino packages: %w", err))) return } for e := range arduinoEvents { - m.broadcast(e) + if e.Type == ProgressEvent { + globalProgress := (e.Progress / 100.0) * arduinoWeight + m.broadcast(NewProgressEvent(globalProgress)) + } else { + m.broadcast(e) + } } - aptEvents, err := m.debUpdateService.UpgradePackages(ctx, debPkgs) if err != nil { m.broadcast(NewErrorEvent(fmt.Errorf("failed to upgrade APT packages: %w", err))) return } for e := range aptEvents { - m.broadcast(e) + if e.Type == ProgressEvent { + globalProgress := arduinoWeight + (e.Progress/100.0)*aptWeight + m.broadcast(NewProgressEvent(globalProgress)) + } else { + m.broadcast(e) + } } - + m.broadcast(NewProgressEvent(100.0)) m.broadcast(NewDataEvent(DoneEvent, "Update completed")) }() return nil