Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions internal/update/apt/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,12 @@ func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan u
return nil, update.ErrOperationAlreadyInProgress
}
eventsCh := make(chan update.Event, 100)

go func() {
defer s.lock.Unlock()
defer close(eventsCh)

eventsCh <- update.NewDataEvent(update.StartEvent, "Upgrade is starting")
eventsCh <- update.NewProgressEvent(0.0)
stream := runUpgradeCommand(ctx, names)
for line, err := range stream {
if err != nil {
Expand All @@ -92,16 +92,17 @@ func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan u
}
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line)
}

eventsCh <- update.NewDataEvent(update.StartEvent, "apt cleaning cache is starting")
eventsCh <- update.NewProgressEvent(80.0)
for line, err := range runAptCleanCommand(ctx) {
if err != nil {
eventsCh <- update.NewErrorEvent(fmt.Errorf("error running apt clean command: %w", err))
return
}

eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line)
}

eventsCh <- update.NewProgressEvent(85.0)
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, "Stop and destroy docker containers and images ....")
streamCleanup := cleanupDockerContainers(ctx)
for line, err := range streamCleanup {
Expand All @@ -113,6 +114,7 @@ func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan u
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line)
}
}
eventsCh <- update.NewProgressEvent(90.0)

// TODO: Remove this workaround once docker image versions are no longer hardcoded in arduino-app-cli.
// Tracking issue: https://github.com/arduino/arduino-app-cli/issues/600
Expand All @@ -128,7 +130,7 @@ func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan u
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line)
}
eventsCh <- update.NewDataEvent(update.RestartEvent, "Upgrade completed. Restarting ...")

eventsCh <- update.NewProgressEvent(100.0)
err := restartServices(ctx)
if err != nil {
eventsCh <- update.NewErrorEvent(fmt.Errorf("error restarting services after upgrade: %w", err))
Expand Down
62 changes: 45 additions & 17 deletions internal/update/arduino/arduino.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,24 +131,44 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st
}
eventsCh := make(chan update.Event, 100)

downloadProgressCB := func(curr *rpc.DownloadProgress) {
data := helpers.ArduinoCLIDownloadProgressToString(curr)
slog.Debug("Download progress", slog.String("download_progress", data))
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, data)
}
taskProgressCB := func(msg *rpc.TaskProgress) {
data := helpers.ArduinoCLITaskProgressToString(msg)
slog.Debug("Task progress", slog.String("task_progress", data))
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, data)
}

go func() {
defer a.lock.Unlock()
defer close(eventsCh)

const indexBase float32 = 0.0
const indexWeight float32 = 30.0
const upgradeBase float32 = 30.0
const upgradeWeight float32 = 60.0

makeDownloadProgressCallback := func(basePercentage, phaseWeight float32) func(*rpc.DownloadProgress) {
return func(curr *rpc.DownloadProgress) {
data := helpers.ArduinoCLIDownloadProgressToString(curr)
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, data)
if updateInfo := curr.GetUpdate(); updateInfo != nil {
if updateInfo.GetTotalSize() <= 0 {
return
}
localProgress := (float32(updateInfo.GetDownloaded()) / float32(updateInfo.GetTotalSize())) * 100.0
totalArduinoProgress := basePercentage + (localProgress/100.0)*phaseWeight
eventsCh <- update.NewProgressEvent(totalArduinoProgress)
}
}
}
makeTaskProgressCallback := func(basePercentage, phaseWeight float32) func(*rpc.TaskProgress) {
return func(msg *rpc.TaskProgress) {
data := helpers.ArduinoCLITaskProgressToString(msg)
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, data)
if !msg.GetCompleted() {
localProgress := msg.GetPercent()
totalArduinoProgress := basePercentage + (localProgress/100.0)*phaseWeight
eventsCh <- update.NewProgressEvent(totalArduinoProgress)
}
}
}

eventsCh <- update.NewDataEvent(update.StartEvent, "Upgrade is starting")

logrus.SetLevel(logrus.ErrorLevel) // Reduce the log level of arduino-cli
logrus.SetLevel(logrus.ErrorLevel)
srv := commands.NewArduinoCoreServer()

if err := setConfig(ctx, srv); err != nil {
Expand All @@ -172,21 +192,28 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st
}()

{
stream, _ := commands.UpdateIndexStreamResponseToCallbackFunction(ctx, downloadProgressCB)
updateIndexProgressCB := makeDownloadProgressCallback(indexBase, indexWeight)
stream, _ := commands.UpdateIndexStreamResponseToCallbackFunction(ctx, updateIndexProgressCB)
if err := srv.UpdateIndex(&rpc.UpdateIndexRequest{Instance: inst}, stream); err != nil {
eventsCh <- update.NewErrorEvent(fmt.Errorf("error updating index: %w", err))
return
}

eventsCh <- update.NewProgressEvent(indexBase + indexWeight)

if err := srv.Init(&rpc.InitRequest{Instance: inst}, commands.InitStreamResponseToCallbackFunction(ctx, nil)); err != nil {
eventsCh <- update.NewErrorEvent(fmt.Errorf("error initializing instance: %w", err))
return
}
}

platformDownloadCB := makeDownloadProgressCallback(upgradeBase, upgradeWeight)
platformTaskCB := makeTaskProgressCallback(upgradeBase, upgradeWeight)

stream, respCB := commands.PlatformUpgradeStreamResponseToCallbackFunction(
ctx,
downloadProgressCB,
taskProgressCB,
platformDownloadCB,
platformTaskCB,
)
if err := srv.PlatformUpgrade(
&rpc.PlatformUpgradeRequest{
Expand Down Expand Up @@ -218,8 +245,8 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st
},
commands.PlatformInstallStreamResponseToCallbackFunction(
ctx,
downloadProgressCB,
taskProgressCB,
platformDownloadCB,
platformTaskCB,
),
)
if err != nil {
Expand Down Expand Up @@ -247,6 +274,7 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st
eventsCh <- update.NewErrorEvent(fmt.Errorf("error burning bootloader: %w", err))
return
}
eventsCh <- update.NewProgressEvent(100.0)
}()

return eventsCh, nil
Expand Down
25 changes: 20 additions & 5 deletions internal/update/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@

package update

import "go.bug.st/f"
import (
"fmt"

"go.bug.st/f"
)

// EventType defines the type of upgrade event.
type EventType int
Expand All @@ -24,16 +28,17 @@ const (
UpgradeLineEvent EventType = iota
StartEvent
RestartEvent
ProgressEvent
DoneEvent
ErrorEvent
)

// Event represents a single event in the upgrade process.
type Event struct {
Type EventType

data string
err error // error field for error events
Type EventType
Progress float32
data string
err error // error field for error events
}

func (t EventType) String() string {
Expand All @@ -44,6 +49,8 @@ func (t EventType) String() string {
return "restarting"
case StartEvent:
return "starting"
case ProgressEvent:
return "progress"
case DoneEvent:
return "done"
case ErrorEvent:
Expand All @@ -60,6 +67,14 @@ func NewDataEvent(t EventType, data string) Event {
}
}

func NewProgressEvent(progress float32) Event {
return Event{
Type: ProgressEvent,
data: fmt.Sprintf("%.2f", progress),
Progress: progress,
}
}

func NewErrorEvent(err error) Event {
return Event{
Type: ErrorEvent,
Expand Down
21 changes: 17 additions & 4 deletions internal/update/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,24 +136,37 @@ func (m *Manager) UpgradePackages(ctx context.Context, pkgs []UpgradablePackage)
// update of the cores we will end up with inconsistent state, or
// we need to re run the upgrade because the orchestrator interrupted
// in the middle the upgrade of the cores.

const arduinoWeight float32 = 20.0
const aptWeight float32 = 80.0

arduinoEvents, err := m.arduinoPlatformUpdateService.UpgradePackages(ctx, arduinoPlatform)
if err != nil {
m.broadcast(NewErrorEvent(fmt.Errorf("failed to upgrade Arduino packages: %w", err)))
return
}
for e := range arduinoEvents {
m.broadcast(e)
if e.Type == ProgressEvent {
globalProgress := (e.Progress / 100.0) * arduinoWeight
m.broadcast(NewProgressEvent(globalProgress))
} else {
m.broadcast(e)
}
}

aptEvents, err := m.debUpdateService.UpgradePackages(ctx, debPkgs)
if err != nil {
m.broadcast(NewErrorEvent(fmt.Errorf("failed to upgrade APT packages: %w", err)))
return
}
for e := range aptEvents {
m.broadcast(e)
if e.Type == ProgressEvent {
globalProgress := arduinoWeight + (e.Progress/100.0)*aptWeight
m.broadcast(NewProgressEvent(globalProgress))
} else {
m.broadcast(e)
}
}

m.broadcast(NewProgressEvent(100.0))
m.broadcast(NewDataEvent(DoneEvent, "Update completed"))
}()
return nil
Expand Down