diff --git a/services/datamanager/builtin/config.go b/services/datamanager/builtin/config.go index 526429d9cdbc..10b4dea85459 100644 --- a/services/datamanager/builtin/config.go +++ b/services/datamanager/builtin/config.go @@ -49,6 +49,7 @@ type Config struct { ScheduledSyncDisabled bool `json:"sync_disabled"` SelectiveSyncerName string `json:"selective_syncer_name"` SyncIntervalMins float64 `json:"sync_interval_mins"` + Flag bool `json:"flag"` } // Validate returns components which will be depended upon weakly due to the above matcher. @@ -119,6 +120,7 @@ func (c *Config) syncConfig(syncSensor sensor.Sensor, syncSensorEnabled bool, lo } return datasync.Config{ + Flag: c.Flag, AdditionalSyncPaths: c.AdditionalSyncPaths, Tags: c.Tags, CaptureDir: c.getCaptureDir(), diff --git a/services/datamanager/builtin/sync/config.go b/services/datamanager/builtin/sync/config.go index 833bce9622f4..fd9f085c3f21 100644 --- a/services/datamanager/builtin/sync/config.go +++ b/services/datamanager/builtin/sync/config.go @@ -10,6 +10,7 @@ import ( // Config is the sync config from builtin. type Config struct { + Flag bool // AdditionalSyncPaths defines the file system paths // that should be synced in addition to the CaptureDir. // Generally 3rd party programs will write arbitrary diff --git a/services/datamanager/builtin/sync/sync.go b/services/datamanager/builtin/sync/sync.go index 80568551462e..fbf6cce01220 100644 --- a/services/datamanager/builtin/sync/sync.go +++ b/services/datamanager/builtin/sync/sync.go @@ -388,7 +388,7 @@ func (s *Sync) syncDataCaptureFile(f *os.File, captureDir string, logger logging retry := newExponentialRetry(s.configCtx, s.clock, s.logger, f.Name(), func(ctx context.Context) (uint64, error) { msg := "error uploading data capture file %s, size: %s, md: %s" errMetadata := fmt.Sprintf(msg, captureFile.GetPath(), data.FormatBytesI64(captureFile.Size()), captureFile.ReadMetadata()) - bytesUploaded, err := uploadDataCaptureFile(ctx, captureFile, s.cloudConn, logger) + bytesUploaded, err := uploadDataCaptureFile(ctx, captureFile, s.cloudConn, s.config.Flag, logger) if err != nil { return 0, errors.Wrap(err, errMetadata) } diff --git a/services/datamanager/builtin/sync/upload_data_capture_file.go b/services/datamanager/builtin/sync/upload_data_capture_file.go index c913d5f478f7..f6bc28737d65 100644 --- a/services/datamanager/builtin/sync/upload_data_capture_file.go +++ b/services/datamanager/builtin/sync/upload_data_capture_file.go @@ -3,6 +3,7 @@ package sync import ( "context" "fmt" + "io" "github.com/docker/go-units" "github.com/go-viper/mapstructure/v2" @@ -25,34 +26,32 @@ var MaxUnaryFileSize = int64(units.MB) // uses StreamingDataCaptureUpload API so as to not exceed the unary response size. // Otherwise, uploads data over DataCaptureUpload API. // Note: the bytes size returned is the size of the input file. It only returns a non 0 value in the success case. -func uploadDataCaptureFile(ctx context.Context, f *data.CaptureFile, conn cloudConn, logger logging.Logger) (uint64, error) { +func uploadDataCaptureFile(ctx context.Context, f *data.CaptureFile, conn cloudConn, flag bool, logger logging.Logger) (uint64, error) { logger.Debugf("preparing to upload data capture file: %s, size: %d", f.GetPath(), f.Size()) md := f.ReadMetadata() + + // camera.GetImages is a special case. For that API we make 2 binary data upload requests + if md.GetType() == v1.DataType_DATA_TYPE_BINARY_SENSOR && md.GetMethodName() == data.GetImages { + return uint64(f.Size()), uploadGetImages(ctx, conn, md, f, logger) + } + + metaData := uploadMetadata(conn.partID, md, md.GetFileExtension()) + if md.GetType() == v1.DataType_DATA_TYPE_BINARY_SENSOR && flag { + return uint64(f.Size()), uploadChunkedBinaryData(ctx, conn.client, metaData, f, logger) + } + sensorData, err := data.SensorDataFromCaptureFile(f) if err != nil { return 0, errors.Wrap(err, "error reading sensor data") } - // Do not attempt to upload a file without any sensor readings. if len(sensorData) == 0 { logger.Warnf("ignoring and deleting empty capture file without syncing it: %s", f.GetPath()) // log here as this will delete a .capture file without uploading it and without moving it to the failed directory return 0, nil } - if md.GetType() == v1.DataType_DATA_TYPE_BINARY_SENSOR && len(sensorData) > 1 { - return 0, fmt.Errorf("binary sensor data file with more than one sensor reading is not supported: %s", f.GetPath()) - } - - // camera.GetImages is a special case. For that API we make 2 binary data upload requests - if md.GetType() == v1.DataType_DATA_TYPE_BINARY_SENSOR && md.GetMethodName() == data.GetImages { - logger.Debugf("attemping to upload camera.GetImages data: %s", f.GetPath()) - - return uint64(f.Size()), uploadGetImages(ctx, conn, md, sensorData[0], f.Size(), f.GetPath(), logger) - } - - metaData := uploadMetadata(conn.partID, md, md.GetFileExtension()) return uint64(f.Size()), uploadSensorData(ctx, conn.client, metaData, sensorData, f.Size(), f.GetPath(), logger) } @@ -73,11 +72,26 @@ func uploadGetImages( ctx context.Context, conn cloudConn, md *v1.DataCaptureMetadata, - sd *v1.SensorData, - size int64, - path string, + f *data.CaptureFile, logger logging.Logger, ) error { + logger.Debugf("attemping to upload camera.GetImages data: %s", f.GetPath()) + + sensorData, err := data.SensorDataFromCaptureFile(f) + if err != nil { + return errors.Wrap(err, "error reading sensor data") + } + + if len(sensorData) == 0 { + logger.Warnf("ignoring and deleting empty capture file without syncing it: %s", f.GetPath()) + // log here as this will delete a .capture file without uploading it and without moving it to the failed directory + return nil + } + + if len(sensorData) > 1 { + return fmt.Errorf("binary sensor data file with more than one sensor reading is not supported: %s", f.GetPath()) + } + sd := sensorData[0] var res pb.GetImagesResponse if err := mapstructure.Decode(sd.GetStruct().AsMap(), &res); err != nil { return errors.Wrap(err, "failed to decode camera.GetImagesResponse") @@ -100,7 +114,7 @@ func uploadGetImages( metadata := uploadMetadata(conn.partID, md, getFileExtFromImageFormat(img.GetFormat())) // TODO: This is wrong as the size describes the size of the entire GetImages response, but we are only // uploading one of the 2 images in that response here. - if err := uploadSensorData(ctx, conn.client, metadata, newSensorData, size, path, logger); err != nil { + if err := uploadSensorData(ctx, conn.client, metadata, newSensorData, f.Size(), f.GetPath(), logger); err != nil { return errors.Wrapf(err, "failed uploading GetImages image index: %d", i) } } @@ -123,6 +137,45 @@ func getImagesTimestamps(res *pb.GetImagesResponse, sensorData *v1.SensorData) ( return timeRequested, timeReceived } +func uploadChunkedBinaryData( + ctx context.Context, + client v1.DataSyncServiceClient, + uploadMD *v1.UploadMetadata, + f *data.CaptureFile, + logger logging.Logger, +) error { + // If it's a large binary file, we need to upload it in chunks. + logger.Debugf("attempting to upload large binary file using StreamingDataCaptureUpload, file: %s", f.GetPath()) + var smd v1.SensorMetadata + r, err := f.BinaryReader(&smd) + if err != nil { + return err + } + c, err := client.StreamingDataCaptureUpload(ctx) + if err != nil { + return errors.Wrap(err, "error creating StreamingDataCaptureUpload client") + } + + // First send metadata. + streamMD := &v1.StreamingDataCaptureUploadRequest_Metadata{ + Metadata: &v1.DataCaptureUploadMetadata{ + UploadMetadata: uploadMD, + SensorMetadata: &smd, + }, + } + if err := c.Send(&v1.StreamingDataCaptureUploadRequest{UploadPacket: streamMD}); err != nil { + return errors.Wrap(err, "StreamingDataCaptureUpload failed sending metadata") + } + + // Then call the function to send the rest. + if err := sendChunkedStreamingDCRequests(ctx, c, r, f.GetPath(), logger); err != nil { + return errors.Wrap(err, "StreamingDataCaptureUpload failed to sync") + } + + _, err = c.CloseAndRecv() + return errors.Wrap(err, "StreamingDataCaptureUpload CloseAndRecv failed") +} + func uploadSensorData( ctx context.Context, client v1.DataSyncServiceClient, @@ -171,6 +224,52 @@ func uploadSensorData( return errors.Wrap(err, "DataCaptureUpload failed") } +func sendChunkedStreamingDCRequests( + ctx context.Context, + stream v1.DataSyncService_StreamingDataCaptureUploadClient, + r io.Reader, + path string, + logger logging.Logger, +) error { + chunk := make([]byte, UploadChunkSize) + // Loop until there is no more content to send. + chunkCount := 0 + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + n, errRead := r.Read(chunk) + if n > 0 { + // if there is data, send it + // Build request with contents. + uploadReq := &v1.StreamingDataCaptureUploadRequest{ + UploadPacket: &v1.StreamingDataCaptureUploadRequest_Data{ + Data: chunk[:n], + }, + } + + // Send request + logger.Debugf("datasync.StreamingDataCaptureUpload sending chunk %d for file: %s", chunkCount, path) + if errSend := stream.Send(uploadReq); errSend != nil { + return errSend + } + } + + // if we reached the end of the file return nil err (success) + if errors.Is(errRead, io.EOF) { + return nil + } + + // if Read hit an unexpected error, return the error + if errRead != nil { + return errRead + } + chunkCount++ + } + } +} + func sendStreamingDCRequests( ctx context.Context, stream v1.DataSyncService_StreamingDataCaptureUploadClient,