diff --git a/.gitignore b/.gitignore index 66c303089..b04d422d8 100644 --- a/.gitignore +++ b/.gitignore @@ -16,6 +16,7 @@ deploy/ # Configurations configurations/* !configurations/default +!configurations/test # Secret config files .env diff --git a/configurations/default/server.yml.tmp b/configurations/default/server.yml.tmp index 9750daca5..760ed9cd8 100644 --- a/configurations/default/server.yml.tmp +++ b/configurations/default/server.yml.tmp @@ -1,61 +1,44 @@ application: + title: Data Tools + logo: https://d2tyb7byn1fef9.cloudfront.net/ibi_group-128x128.png + logo_large: https://d2tyb7byn1fef9.cloudfront.net/ibi_group_black-512x512.png client_assets_url: https://example.com shortcut_icon_url: https://d2tyb7byn1fef9.cloudfront.net/ibi-logo-original%402x.png public_url: http://localhost:9966 - notifications_enabled: true - cors: - enabled: true - origins: https://google.com - methods: - headers: + notifications_enabled: false + docs_url: http://conveyal-data-tools.readthedocs.org + support_email: support@ibigroup.com port: 4000 data: - editor_mapdb: /tmp/editor/mapdb - mapdb: /tmp/mapdb gtfs: /tmp use_s3_storage: false s3_region: us-east-1 gtfs_s3_bucket: bucket-name modules: - dump: - enabled: true enterprise: enabled: false - deployment: - enabled: true editor: enabled: true - url: http://localhost:9001 - alerts: - enabled: true - use_extension: mtc - url: /alerts - sign_config: + deployment: enabled: true - use_extension: mtc - url: /signs # eventually remove this + ec2: + enabled: false + default_ami: ami-your-ami-id + # Note: using a cloudfront URL for these download URLs will greatly + # increase download/deploy speed. + otp_download_url: https://optional-otp-repo.com + r5_download_url: https://optional-r5-repo.com user_admin: enabled: true - # Enable GTFS+ module for testing purposes - gtfsplus: - enabled: true gtfsapi: enabled: true load_on_fetch: false # use_extension: mtc # update_frequency: 30 # in seconds - extensions: - mtc: - enabled: true - rtd_api: http://localhost:9876/ - s3_bucket: bucket-name - s3_prefix: waiting/ - s3_download_prefix: waiting/ transitland: enabled: true api: https://transit.land/api/v1/feeds transitfeeds: enabled: true api: http://api.transitfeeds.com/v1/getFeeds - key: your-api-key diff --git a/configurations/test/env.yml.tmp b/configurations/test/env.yml.tmp new file mode 100644 index 000000000..eb5769962 --- /dev/null +++ b/configurations/test/env.yml.tmp @@ -0,0 +1,19 @@ +# This client ID refers to the UI client in Auth0. +AUTH0_CLIENT_ID: your-auth0-client-id +AUTH0_DOMAIN: your-auth0-domain +# Note: One of AUTH0_SECRET or AUTH0_PUBLIC_KEY should be used depending on the signing algorithm set on the client. +# It seems that newer Auth0 accounts (2017 and later) might default to RS256 (public key). +AUTH0_SECRET: your-auth0-secret # uses HS256 signing algorithm +# AUTH0_PUBLIC_KEY: /path/to/auth0.pem # uses RS256 signing algorithm +# This client/secret pair refer to a machine-to-machine Auth0 application used to access the Management API. +AUTH0_API_CLIENT: your-api-client-id +AUTH0_API_SECRET: your-api-secret-id +DISABLE_AUTH: false +OSM_VEX: http://localhost:1000 +SPARKPOST_KEY: your-sparkpost-key +SPARKPOST_EMAIL: email@example.com +GTFS_DATABASE_URL: jdbc:postgresql://localhost/catalogue +# GTFS_DATABASE_USER: +# GTFS_DATABASE_PASSWORD: +#MONGO_URI: mongodb://mongo-host:27017 +MONGO_DB_NAME: catalogue diff --git a/configurations/test/server.yml.tmp b/configurations/test/server.yml.tmp new file mode 100644 index 000000000..be50409b8 --- /dev/null +++ b/configurations/test/server.yml.tmp @@ -0,0 +1,50 @@ +application: + title: Data Tools + logo: https://d2tyb7byn1fef9.cloudfront.net/ibi_group-128x128.png + logo_large: https://d2tyb7byn1fef9.cloudfront.net/ibi_group_black-512x512.png + client_assets_url: https://example.com + shortcut_icon_url: https://d2tyb7byn1fef9.cloudfront.net/ibi-logo-original%402x.png + public_url: http://localhost:9966 + notifications_enabled: false + docs_url: http://conveyal-data-tools.readthedocs.org + support_email: support@ibigroup.com + port: 4000 + data: + gtfs: /tmp + use_s3_storage: false + s3_region: us-east-1 + gtfs_s3_bucket: bucket-name +modules: + enterprise: + enabled: false + editor: + enabled: true + deployment: + enabled: true + ec2: + enabled: false + default_ami: ami-your-ami-id + user_admin: + enabled: true + # Enable GTFS+ module for testing purposes + gtfsplus: + enabled: true + gtfsapi: + enabled: true + load_on_fetch: false + # use_extension: mtc + # update_frequency: 30 # in seconds +extensions: + # Enable MTC extension so MTC-specific feed merge tests + mtc: + enabled: true + rtd_api: http://localhost:9876/ + s3_bucket: bucket-name + s3_prefix: waiting/ + s3_download_prefix: waiting/ + transitland: + enabled: true + api: https://transit.land/api/v1/feeds + transitfeeds: + enabled: true + api: http://api.transitfeeds.com/v1/getFeeds diff --git a/pom.xml b/pom.xml index daa17f867..4f7b0e06d 100644 --- a/pom.xml +++ b/pom.xml @@ -43,6 +43,7 @@ 17.5 + 1.11.625 @@ -394,6 +395,22 @@ super-csv 2.4.0 + + + com.amazonaws + aws-java-sdk-ec2 + ${awsjavasdk.version} + + + com.amazonaws + aws-java-sdk-iam + ${awsjavasdk.version} + + + com.amazonaws + aws-java-sdk-elasticloadbalancingv2 + ${awsjavasdk.version} + diff --git a/src/main/java/com/conveyal/datatools/common/status/MonitorableJob.java b/src/main/java/com/conveyal/datatools/common/status/MonitorableJob.java index 660b21b68..7b9516515 100644 --- a/src/main/java/com/conveyal/datatools/common/status/MonitorableJob.java +++ b/src/main/java/com/conveyal/datatools/common/status/MonitorableJob.java @@ -7,6 +7,7 @@ import org.slf4j.LoggerFactory; import java.io.File; +import java.io.Serializable; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.ArrayList; @@ -18,7 +19,8 @@ /** * Created by landon on 6/13/16. */ -public abstract class MonitorableJob implements Runnable { +public abstract class MonitorableJob implements Runnable, Serializable { + private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(MonitorableJob.class); public final String owner; @@ -60,6 +62,7 @@ public enum JobType { EXPORT_SNAPSHOT_TO_GTFS, CONVERT_EDITOR_MAPDB_TO_SQL, VALIDATE_ALL_FEEDS, + MONITOR_SERVER_STATUS, MERGE_FEED_VERSIONS } @@ -128,7 +131,6 @@ public void run () { boolean parentJobErrored = false; boolean subTaskErrored = false; String cancelMessage = ""; - long startTimeNanos = System.nanoTime(); try { // First execute the core logic of the specific MonitorableJob subclass jobLogic(); @@ -187,8 +189,7 @@ public void run () { LOG.error("Job failed", ex); status.update(true, ex.getMessage(), 100, true); } - status.startTime = TimeUnit.NANOSECONDS.toMillis(startTimeNanos); - status.duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos); + status.duration = System.currentTimeMillis() - status.startTime; LOG.info("{} {} {} in {} ms", type, jobId, status.error ? "errored" : "completed", status.duration); } @@ -242,7 +243,7 @@ public static class Status { /** How much of task is complete? */ public double percentComplete; - public long startTime; + public long startTime = System.currentTimeMillis(); public long duration; // When was the job initialized? diff --git a/src/main/java/com/conveyal/datatools/common/utils/SparkUtils.java b/src/main/java/com/conveyal/datatools/common/utils/SparkUtils.java index f69c11f5e..d27ffc5c5 100644 --- a/src/main/java/com/conveyal/datatools/common/utils/SparkUtils.java +++ b/src/main/java/com/conveyal/datatools/common/utils/SparkUtils.java @@ -112,7 +112,7 @@ public static void logMessageAndHalt( ) throws HaltException { // Note that halting occurred, also print error stacktrace if applicable if (e != null) e.printStackTrace(); - LOG.info("Halting with status code {}. Error message: {}.", statusCode, message); + LOG.info("Halting with status code {}. Error message: {}", statusCode, message); if (statusCode >= 500) { LOG.error(message); @@ -122,7 +122,7 @@ public static void logMessageAndHalt( if (bugsnag != null && e != null) { // create report to send to bugsnag Report report = bugsnag.buildReport(e); - Auth0UserProfile userProfile = request.attribute("user"); + Auth0UserProfile userProfile = request != null ? request.attribute("user") : null; String userEmail = userProfile != null ? userProfile.getEmail() : "no-auth"; report.setUserEmail(userEmail); bugsnag.notify(report); @@ -218,11 +218,16 @@ public static void logRequestOrResponse( String bodyString, int statusCode ) { + // If request is null, log warning and exit. We do not want to hit an NPE in this method. + if (request == null) { + LOG.warn("Request object is null. Cannot log."); + return; + } Auth0UserProfile userProfile = request.attribute("user"); String userEmail = userProfile != null ? userProfile.getEmail() : "no-auth"; String queryString = request.queryParams().size() > 0 ? "?" + request.queryString() : ""; LOG.info( - "{} {} {}: {}{}{}{}", + "{} {} {}: {}{}{} {}", logRequest ? "req" : String.format("res (%s)", statusCode), userEmail, request.requestMethod(), diff --git a/src/main/java/com/conveyal/datatools/editor/controllers/api/SnapshotController.java b/src/main/java/com/conveyal/datatools/editor/controllers/api/SnapshotController.java index 0ef5be411..0069f5093 100644 --- a/src/main/java/com/conveyal/datatools/editor/controllers/api/SnapshotController.java +++ b/src/main/java/com/conveyal/datatools/editor/controllers/api/SnapshotController.java @@ -219,7 +219,7 @@ private static Snapshot deleteSnapshot(Request req, Response res) { if (snapshot == null) logMessageAndHalt(req, 400, "Must provide valid snapshot ID."); try { // Remove the snapshot and then renumber the snapshots - Persistence.snapshots.removeById(snapshot.id); + snapshot.delete(); feedSource.renumberSnapshots(); // FIXME Are there references that need to be removed? E.g., what if the active buffer snapshot is deleted? // FIXME delete tables from database? diff --git a/src/main/java/com/conveyal/datatools/editor/datastore/FeedTx.java b/src/main/java/com/conveyal/datatools/editor/datastore/FeedTx.java index 04760ee2f..ddaea5764 100644 --- a/src/main/java/com/conveyal/datatools/editor/datastore/FeedTx.java +++ b/src/main/java/com/conveyal/datatools/editor/datastore/FeedTx.java @@ -32,8 +32,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; -import static com.conveyal.datatools.editor.jobs.ProcessGtfsSnapshotExport.toGtfsDate; - /** a transaction in an agency database */ public class FeedTx extends DatabaseTx { private static final Logger LOG = LoggerFactory.getLogger(FeedTx.class); @@ -124,6 +122,10 @@ public FeedTx(DB tx, boolean buildSecondaryIndices) { // editedSinceSnapshot = tx.getAtomicBoolean("editedSinceSnapshot") == null ? tx.createAtomicBoolean("editedSinceSnapshot", false) : editedSinceSnapshot; } + private static int toGtfsDate (LocalDate date) { + return date.getYear() * 10000 + date.getMonthValue() * 100 + date.getDayOfMonth(); + } + public void commit () { try { // editedSinceSnapshot.set(true); diff --git a/src/main/java/com/conveyal/datatools/editor/jobs/ProcessGtfsSnapshotExport.java b/src/main/java/com/conveyal/datatools/editor/jobs/ProcessGtfsSnapshotExport.java deleted file mode 100755 index d4450cc59..000000000 --- a/src/main/java/com/conveyal/datatools/editor/jobs/ProcessGtfsSnapshotExport.java +++ /dev/null @@ -1,94 +0,0 @@ -package com.conveyal.datatools.editor.jobs; - -import com.beust.jcommander.internal.Lists; -import com.conveyal.datatools.common.status.MonitorableJob; -import com.conveyal.gtfs.GTFSFeed; -import com.conveyal.datatools.editor.datastore.FeedTx; -import com.conveyal.datatools.editor.datastore.GlobalTx; -import com.conveyal.datatools.editor.datastore.VersionedDataStore; -import com.conveyal.datatools.editor.models.Snapshot; - -import java.time.LocalDate; - -import org.mapdb.Fun.Tuple2; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.util.Arrays; -import java.util.Collection; - -public class ProcessGtfsSnapshotExport extends MonitorableJob { - public static final Logger LOG = LoggerFactory.getLogger(ProcessGtfsSnapshotExport.class); - private Collection> snapshots; - private File output; -// private LocalDate startDate; -// private LocalDate endDate; - - /** Export the named snapshots to GTFS */ - public ProcessGtfsSnapshotExport(Collection> snapshots, File output, LocalDate startDate, LocalDate endDate) { - super("application", "Exporting snapshots to GTFS", JobType.PROCESS_SNAPSHOT_EXPORT); - this.snapshots = snapshots; - this.output = output; -// this.startDate = startDate; -// this.endDate = endDate; - } - - /** - * Export the master branch of the named feeds to GTFS. The boolean variable can be either true or false, it is only to make this - * method have a different erasure from the other - */ - public ProcessGtfsSnapshotExport(Collection agencies, File output, LocalDate startDate, LocalDate endDate, boolean isagency) { - super("application", "Exporting snapshots to GTFS", JobType.PROCESS_SNAPSHOT_EXPORT); - this.snapshots = Lists.newArrayList(agencies.size()); - - for (String agency : agencies) { - // leaving version null will cause master to be used - this.snapshots.add(new Tuple2(agency, null)); - } - - this.output = output; -// this.startDate = startDate; -// this.endDate = endDate; - } - - /** - * Export this snapshot to GTFS, using the validity range in the snapshot. - */ - public ProcessGtfsSnapshotExport (Snapshot snapshot, File output) { - this(Arrays.asList(new Tuple2[] { snapshot.id }), output, snapshot.validFrom, snapshot.validTo); - } - - @Override - public void jobLogic() { - GTFSFeed feed = null; - - GlobalTx gtx = VersionedDataStore.getGlobalTx(); - FeedTx feedTx = null; - - try { - for (Tuple2 ssid : snapshots) { - String feedId = ssid.a; - - // retrieveById present feed database if no snapshot version provided - if (ssid.b == null) { - feedTx = VersionedDataStore.getFeedTx(feedId); - } - // else retrieveById snapshot version data - else { - feedTx = VersionedDataStore.getFeedTx(feedId, ssid.b); - } - feed = feedTx.toGTFSFeed(false); - } - feed.toFile(output.getAbsolutePath()); - } finally { - gtx.rollbackIfOpen(); - if (feedTx != null) feedTx.rollbackIfOpen(); - } - } - - public static int toGtfsDate (LocalDate date) { - return date.getYear() * 10000 + date.getMonthValue() * 100 + date.getDayOfMonth(); - } -} - diff --git a/src/main/java/com/conveyal/datatools/editor/jobs/ProcessGtfsSnapshotMerge.java b/src/main/java/com/conveyal/datatools/editor/jobs/ProcessGtfsSnapshotMerge.java deleted file mode 100755 index 23816dc5f..000000000 --- a/src/main/java/com/conveyal/datatools/editor/jobs/ProcessGtfsSnapshotMerge.java +++ /dev/null @@ -1,537 +0,0 @@ -package com.conveyal.datatools.editor.jobs; - -import com.conveyal.datatools.common.status.MonitorableJob; -import com.conveyal.datatools.editor.datastore.FeedTx; -import com.conveyal.datatools.editor.models.Snapshot; -import com.conveyal.datatools.editor.models.transit.Agency; -import com.conveyal.datatools.editor.models.transit.EditorFeed; -import com.conveyal.datatools.editor.models.transit.GtfsRouteType; -import com.conveyal.datatools.editor.models.transit.Route; -import com.conveyal.datatools.editor.models.transit.RouteType; -import com.conveyal.datatools.editor.models.transit.ServiceCalendar; -import com.conveyal.datatools.editor.models.transit.Stop; -import com.conveyal.datatools.manager.models.FeedVersion; -import com.conveyal.gtfs.loader.Feed; -import com.google.common.collect.Maps; -import com.vividsolutions.jts.geom.Envelope; -import com.vividsolutions.jts.geom.GeometryFactory; -import com.vividsolutions.jts.geom.PrecisionModel; -import com.conveyal.datatools.editor.datastore.GlobalTx; -import com.conveyal.datatools.editor.datastore.VersionedDataStore; -import gnu.trove.map.TIntObjectMap; -import gnu.trove.map.hash.TIntObjectHashMap; - -import java.awt.geom.Rectangle2D; - -import org.mapdb.Fun.Tuple2; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; - -public class ProcessGtfsSnapshotMerge extends MonitorableJob { - public static final Logger LOG = LoggerFactory.getLogger(ProcessGtfsSnapshotMerge.class); - /** map from GTFS agency IDs to Agencies */ - private Map agencyIdMap = new HashMap<>(); - private Map routeIdMap = new HashMap<>(); - /** map from (gtfs stop ID, database agency ID) -> stop */ - private Map, Stop> stopIdMap = Maps.newHashMap(); - private TIntObjectMap routeTypeIdMap = new TIntObjectHashMap<>(); - - private Feed inputFeedTables; - private EditorFeed editorFeed; - - public FeedVersion feedVersion; - - /*public ProcessGtfsSnapshotMerge (File gtfsFile) { - this(gtfsFile, null); - }*/ - - public ProcessGtfsSnapshotMerge (FeedVersion feedVersion, String owner) { - super(owner, "Creating snapshot for " + feedVersion.parentFeedSource().name, JobType.PROCESS_SNAPSHOT_MERGE); - this.feedVersion = feedVersion; - status.update(false, "Waiting to begin job...", 0); - LOG.info("GTFS Snapshot Merge for feedVersion {}", feedVersion.id); - } - - public void jobLogic () { - long agencyCount = 0; - long routeCount = 0; - long stopCount = 0; - long stopTimeCount = 0; - long tripCount = 0; - long shapePointCount = 0; - long serviceCalendarCount = 0; - long fareCount = 0; - - GlobalTx gtx = VersionedDataStore.getGlobalTx(); - - // create a new feed based on this version - FeedTx feedTx = VersionedDataStore.getFeedTx(feedVersion.feedSourceId); - - editorFeed = new EditorFeed(); - editorFeed.setId(feedVersion.feedSourceId); - Rectangle2D bounds = feedVersion.validationResult.fullBounds.toRectangle2D(); - if (bounds != null) { - editorFeed.defaultLat = bounds.getCenterY(); - editorFeed.defaultLon = bounds.getCenterX(); - } - - - try { - synchronized (status) { - status.message = "Wiping old data..."; - status.percentComplete = 2; - } - // clear the existing data - for(String key : feedTx.agencies.keySet()) feedTx.agencies.remove(key); - for(String key : feedTx.routes.keySet()) feedTx.routes.remove(key); - for(String key : feedTx.stops.keySet()) feedTx.stops.remove(key); - for(String key : feedTx.calendars.keySet()) feedTx.calendars.remove(key); - for(String key : feedTx.exceptions.keySet()) feedTx.exceptions.remove(key); - for(String key : feedTx.fares.keySet()) feedTx.fares.remove(key); - for(String key : feedTx.tripPatterns.keySet()) feedTx.tripPatterns.remove(key); - for(String key : feedTx.trips.keySet()) feedTx.trips.remove(key); - LOG.info("Cleared old data"); - - synchronized (status) { - status.message = "Loading GTFS file..."; - status.percentComplete = 5; - } - - // retrieveById Feed connection to SQL tables for the feed version - inputFeedTables = feedVersion.retrieveFeed(); - if(inputFeedTables == null) return; - - LOG.info("GtfsImporter: importing feed..."); - synchronized (status) { - status.message = "Beginning feed import..."; - status.percentComplete = 8; - } - // load feed_info.txt - // FIXME add back in feed info!! -// if(inputFeedTables.feedInfo.size() > 0) { -// FeedInfo feedInfo = input.feedInfo.values().iterator().next(); -// editorFeed.feedPublisherName = feedInfo.feed_publisher_name; -// editorFeed.feedPublisherUrl = feedInfo.feed_publisher_url; -// editorFeed.feedLang = feedInfo.feed_lang; -// editorFeed.feedEndDate = feedInfo.feed_end_date; -// editorFeed.feedStartDate = feedInfo.feed_start_date; -// editorFeed.feedVersion = feedInfo.feed_version; -// } - gtx.feeds.put(feedVersion.feedSourceId, editorFeed); - - // load the GTFS agencies - Iterator agencyIterator = inputFeedTables.agencies.iterator(); - while (agencyIterator.hasNext()) { - com.conveyal.gtfs.model.Agency gtfsAgency = agencyIterator.next(); - Agency agency = new Agency(gtfsAgency, editorFeed); - - // don't save the agency until we've come up with the stop centroid, below. - agencyCount++; - - // we do want to use the modified agency ID here, because everything that refers to it has a reference - // to the agency object we updated. - feedTx.agencies.put(agency.id, agency); - agencyIdMap.put(gtfsAgency.agency_id, agency); - } - synchronized (status) { - status.message = "Agencies loaded: " + agencyCount; - status.percentComplete = 10; - } - LOG.info("Agencies loaded: " + agencyCount); - - LOG.info("GtfsImporter: importing stops..."); - synchronized (status) { - status.message = "Importing stops..."; - status.percentComplete = 15; - } - // TODO: remove stop ownership inference entirely? - // infer agency ownership of stops, if there are multiple feeds -// SortedSet> stopsByAgency = inferAgencyStopOwnership(); - - // build agency centroids as we go - // note that these are not actually centroids, but the center of the extent of the stops . . . - Map stopEnvelopes = Maps.newHashMap(); - - for (Agency agency : agencyIdMap.values()) { - stopEnvelopes.put(agency.id, new Envelope()); - } - - GeometryFactory geometryFactory = new GeometryFactory(new PrecisionModel(), 4326); - for (com.conveyal.gtfs.model.Stop gtfsStop : inputFeedTables.stops) { - Stop stop = new Stop(gtfsStop, geometryFactory, editorFeed); - feedTx.stops.put(stop.id, stop); - stopIdMap.put(new Tuple2(gtfsStop.stop_id, editorFeed.id), stop); - stopCount++; - } - - LOG.info("Stops loaded: " + stopCount); - synchronized (status) { - status.message = "Stops loaded: " + stopCount; - status.percentComplete = 25; - } - LOG.info("GtfsImporter: importing routes..."); - synchronized (status) { - status.message = "Importing routes..."; - status.percentComplete = 30; - } - // import routes - for (com.conveyal.gtfs.model.Route gtfsRoute : inputFeedTables.routes) { - Agency agency = agencyIdMap.get(gtfsRoute.agency_id); - - if (!routeTypeIdMap.containsKey(gtfsRoute.route_type)) { - RouteType rt = new RouteType(); - rt.gtfsRouteType = GtfsRouteType.fromGtfs(gtfsRoute.route_type); - gtx.routeTypes.put(rt.id, rt); - routeTypeIdMap.put(gtfsRoute.route_type, rt.id); - } - - Route route = new Route(gtfsRoute, editorFeed, agency); - - feedTx.routes.put(route.id, route); - routeIdMap.put(gtfsRoute.route_id, route); - routeCount++; - } - - LOG.info("Routes loaded: " + routeCount); - synchronized (status) { - status.message = "Routes loaded: " + routeCount; - status.percentComplete = 35; - } - - LOG.info("GtfsImporter: importing Service Calendars..."); - synchronized (status) { - status.message = "Importing service calendars..."; - status.percentComplete = 38; - } - // we don't put service calendars in the database just yet, because we don't know what agency they're associated with - // we copy them into the agency database as needed - // GTFS service ID -> ServiceCalendar - Map calendars = Maps.newHashMap(); - - // FIXME: add back in services! -// for (Service svc : input.services.values()) { -// -// ServiceCalendar cal; -// -// if (svc.calendar != null) { -// // easy case: don't have to infer anything! -// cal = new ServiceCalendar(svc.calendar, feed); -// } else { -// // infer a calendar -// // number of mondays, etc. that this calendar is active -// int monday, tuesday, wednesday, thursday, friday, saturday, sunday; -// monday = tuesday = wednesday = thursday = friday = saturday = sunday = 0; -// LocalDate startDate = null; -// LocalDate endDate = null; -// -// for (CalendarDate cd : svc.calendar_dates.values()) { -// if (cd.exception_type == 2) -// continue; -// -// if (startDate == null || cd.date.isBefore(startDate)) -// startDate = cd.date; -// -// if (endDate == null || cd.date.isAfter(endDate)) -// endDate = cd.date; -// -// int dayOfWeek = cd.date.getDayOfWeek().getValue(); -// -// switch (dayOfWeek) { -// case DateTimeConstants.MONDAY: -// monday++; -// break; -// case DateTimeConstants.TUESDAY: -// tuesday++; -// break; -// case DateTimeConstants.WEDNESDAY: -// wednesday++; -// break; -// case DateTimeConstants.THURSDAY: -// thursday++; -// break; -// case DateTimeConstants.FRIDAY: -// friday++; -// break; -// case DateTimeConstants.SATURDAY: -// saturday++; -// break; -// case DateTimeConstants.SUNDAY: -// sunday++; -// break; -// } -// } -// -// // infer the calendar. if there is service on more than half as many as the maximum number of -// // a particular day that has service, assume that day has service in general. -// int maxService = Ints.max(monday, tuesday, wednesday, thursday, friday, saturday, sunday); -// -// cal = new ServiceCalendar(); -// cal.feedId = feed.id; -// -// if (startDate == null) { -// // no service whatsoever -// LOG.warn("Service ID " + svc.service_id + " has no service whatsoever"); -// startDate = LocalDate.now().minusMonths(1); -// endDate = startDate.plusYears(1); -// cal.monday = cal.tuesday = cal.wednesday = cal.thursday = cal.friday = cal.saturday = cal.sunday = false; -// } -// else { -// // infer parameters -// -// int threshold = (int) Math.round(Math.ceil((double) maxService / 2)); -// -// cal.monday = monday >= threshold; -// cal.tuesday = tuesday >= threshold; -// cal.wednesday = wednesday >= threshold; -// cal.thursday = thursday >= threshold; -// cal.friday = friday >= threshold; -// cal.saturday = saturday >= threshold; -// cal.sunday = sunday >= threshold; -// -// cal.startDate = startDate; -// cal.endDate = endDate; -// } -// -// cal.inferName(); -// cal.gtfsServiceId = svc.service_id; -// } -// -// calendars.put(svc.service_id, cal); -// -// serviceCalendarCount++; -// } - - LOG.info("Service calendars loaded: " + serviceCalendarCount); - synchronized (status) { - status.message = "Service calendars loaded: " + serviceCalendarCount; - status.percentComplete = 45; - } - LOG.info("GtfsImporter: importing trips..."); - synchronized (status) { - status.message = "Importing trips..."; - status.percentComplete = 50; - } - // FIXME need to load patterns and trips - // import trips, stop times and patterns all at once -// Map patterns = input.patterns; -// Set processedTrips = new HashSet<>(); -// for (Entry pattern : patterns.entrySet()) { -// // it is possible, though unlikely, for two routes to have the same stopping pattern -// // we want to ensure they retrieveById different trip patterns -// Map tripPatternsByRoute = Maps.newHashMap(); -// for (String tripId : pattern.getValue().associatedTrips) { -// -// // TODO: figure out why trips are being added twice. This check prevents that. -// if (processedTrips.contains(tripId)) { -// continue; -// } -// synchronized (status) { -// status.message = "Importing trips... (id: " + tripId + ") " + tripCount + "/" + input.trips.size(); -// status.percentComplete = 50 + 45 * tripCount / input.trips.size(); -// } -// com.conveyal.gtfs.model.Trip gtfsTrip = input.trips.retrieveById(tripId); -// -// if (!tripPatternsByRoute.containsKey(gtfsTrip.route_id)) { -// TripPattern pat = createTripPatternFromTrip(gtfsTrip, feedTx); -// feedTx.tripPatterns.put(pat.id, pat); -// tripPatternsByRoute.put(gtfsTrip.route_id, pat); -// } -// -// // there is more than one pattern per route, but this map is specific to only this pattern -// // generally it will contain exactly one entry, unless there are two routes with identical -// // stopping patterns. -// // (in DC, suppose there were trips on both the E2/weekday and E3/weekend from Friendship Heights -// // that short-turned at Missouri and 3rd). -// TripPattern pat = tripPatternsByRoute.retrieveById(gtfsTrip.route_id); -// -// ServiceCalendar cal = calendars.retrieveById(gtfsTrip.service_id); -// -// // if the service calendar has not yet been imported, import it -// if (feedTx.calendars != null && !feedTx.calendars.containsKey(cal.id)) { -// // no need to clone as they are going into completely separate mapdbs -// feedTx.calendars.put(cal.id, cal); -// } -// -// Trip trip = new Trip(gtfsTrip, routeIdMap.retrieveById(gtfsTrip.route_id), pat, cal); -// -// // TODO: query ordered stopTimes for a given trip id -// // FIXME: add back in stopTimes -// Collection stopTimes = new ArrayList<>(); -// input.stopTimes.subMap(new Tuple2(gtfsTrip.trip_id, null), new Tuple2(gtfsTrip.trip_id, Fun.HI)).values(); -// -// for (com.conveyal.gtfs.model.StopTime st : stopTimes) { -// trip.stopTimes.add(new StopTime(st, stopIdMap.retrieveById(new Tuple2<>(st.stop_id, feed.id)).id)); -// stopTimeCount++; -// } -// -// feedTx.trips.put(trip.id, trip); -// processedTrips.add(tripId); -// tripCount++; -// -// // FIXME add back in total number of trips for QC -// if (tripCount % 1000 == 0) { -// LOG.info("Loaded {} / {} trips", tripCount); // input.trips.size() -// } -// } -// } - - LOG.info("Trips loaded: " + tripCount); - synchronized (status) { - status.message = "Trips loaded: " + tripCount; - status.percentComplete = 90; - } - - LOG.info("GtfsImporter: importing fares..."); - // FIXME add in fares -// Map fares = input.fares; -// for (com.conveyal.gtfs.model.Fare f : fares.values()) { -// Fare fare = new Fare(f.fare_attribute, f.fare_rules, feed); -// feedTx.fares.put(fare.id, fare); -// fareCount++; -// } - LOG.info("Fares loaded: " + fareCount); - synchronized (status) { - status.message = "Fares loaded: " + fareCount; - status.percentComplete = 92; - } - LOG.info("Saving snapshot..."); - synchronized (status) { - status.message = "Saving snapshot..."; - status.percentComplete = 95; - } - // commit the feed TXs first, so that we have orphaned data rather than inconsistent data on a commit failure - feedTx.commit(); - gtx.commit(); - Snapshot.deactivateSnapshots(feedVersion.feedSourceId, null); - // create an initial snapshot for this FeedVersion - Snapshot snapshot = VersionedDataStore.takeSnapshot(editorFeed.id, feedVersion.id, "Snapshot of " + feedVersion.name, "none"); - - - LOG.info("Imported GTFS file: " + agencyCount + " agencies; " + routeCount + " routes;" + stopCount + " stops; " + stopTimeCount + " stopTimes; " + tripCount + " trips;" + shapePointCount + " shapePoints"); - synchronized (status) { - status.message = "Import complete!"; - status.percentComplete = 100; - } - } - catch (Exception e) { - e.printStackTrace(); - synchronized (status) { - status.message = "Failed to process GTFS snapshot."; - status.error = true; - } - } - finally { - feedTx.rollbackIfOpen(); - gtx.rollbackIfOpen(); - - // FIXME: anything we need to do at the end of using Feed? -// inputFeedTables.close(); - - } - } - - /** infer the ownership of stops based on what stops there - * Returns a set of tuples stop ID, agency ID with GTFS IDs */ -// private SortedSet> inferAgencyStopOwnership() { -// SortedSet> ret = Sets.newTreeSet(); -// -// for (com.conveyal.gtfs.model.StopTime st : input.stop_times.values()) { -// String stopId = st.stop_id; -// com.conveyal.gtfs.model.Trip trip = input.trips.retrieveById(st.trip_id); -// if (trip != null) { -// String routeId = trip.route_id; -// String agencyId = input.routes.retrieveById(routeId).agency_id; -// Tuple2 key = new Tuple2(stopId, agencyId); -// ret.add(key); -// } -// } -// -// return ret; -// } - - /** - * Create a trip pattern from the given trip. - * Neither the TripPattern nor the TripPatternStops are saved. - */ -// public TripPattern createTripPatternFromTrip (com.conveyal.gtfs.model.Trip gtfsTrip, FeedTx tx) { -// TripPattern patt = new TripPattern(); -// com.conveyal.gtfs.model.Route gtfsRoute = input.routes.retrieveById(gtfsTrip.route_id); -// patt.routeId = routeIdMap.retrieveById(gtfsTrip.route_id).id; -// patt.feedId = feed.id; -// -// String patternId = input.tripPatternMap.retrieveById(gtfsTrip.trip_id); -// Pattern gtfsPattern = input.patterns.retrieveById(patternId); -// patt.shape = gtfsPattern.geometry; -// patt.id = gtfsPattern.pattern_id; -// -// patt.patternStops = new ArrayList<>(); -// patt.patternDirection = TripDirection.fromGtfs(gtfsTrip.direction_id); -// -// com.conveyal.gtfs.model.StopTime[] stopTimes = -// input.stop_times.subMap(new Tuple2(gtfsTrip.trip_id, 0), new Tuple2(gtfsTrip.trip_id, Fun.HI)).values().toArray(new com.conveyal.gtfs.model.StopTime[0]); -// -// if (gtfsTrip.trip_headsign != null && !gtfsTrip.trip_headsign.isEmpty()) -// patt.name = gtfsTrip.trip_headsign; -// else -// patt.name = gtfsPattern.name; -// -// for (com.conveyal.gtfs.model.StopTime st : stopTimes) { -// TripPatternStop tps = new TripPatternStop(); -// -// Stop stop = stopIdMap.retrieveById(new Tuple2(st.stop_id, patt.feedId)); -// tps.stopId = stop.id; -// -// // set timepoint according to first gtfs value and then whether arrival and departure times are present -// if (st.timepoint != Entity.INT_MISSING) -// tps.timepoint = st.timepoint == 1; -// else if (st.arrival_time != Entity.INT_MISSING && st.departure_time != Entity.INT_MISSING) { -// tps.timepoint = true; -// } -// else -// tps.timepoint = false; -// -// if (st.departure_time != Entity.INT_MISSING && st.arrival_time != Entity.INT_MISSING) -// tps.defaultDwellTime = st.departure_time - st.arrival_time; -// else -// tps.defaultDwellTime = 0; -// -// patt.patternStops.add(tps); -// } -// -// patt.calcShapeDistTraveled(tx); -// -// // infer travel times -// if (stopTimes.length >= 2) { -// int startOfBlock = 0; -// // start at one because the first stop has no travel time -// // but don't put nulls in the data -// patt.patternStops.retrieveById(0).defaultTravelTime = 0; -// for (int i = 1; i < stopTimes.length; i++) { -// com.conveyal.gtfs.model.StopTime current = stopTimes[i]; -// -// if (current.arrival_time != Entity.INT_MISSING) { -// // interpolate times -// -// int timeSinceLastSpecifiedTime = current.arrival_time - stopTimes[startOfBlock].departure_time; -// -// double blockLength = patt.patternStops.retrieveById(i).shapeDistTraveled - patt.patternStops.retrieveById(startOfBlock).shapeDistTraveled; -// -// // go back over all of the interpolated stop times and interpolate them -// for (int j = startOfBlock + 1; j <= i; j++) { -// TripPatternStop tps = patt.patternStops.retrieveById(j); -// double distFromLastStop = patt.patternStops.retrieveById(j).shapeDistTraveled - patt.patternStops.retrieveById(j - 1).shapeDistTraveled; -// tps.defaultTravelTime = (int) Math.round(timeSinceLastSpecifiedTime * distFromLastStop / blockLength); -// } -// -// startOfBlock = i; -// } -// } -// } -// -// return patt; -// } - -} - diff --git a/src/main/java/com/conveyal/datatools/editor/jobs/ProcessGtfsSnapshotUpload.java b/src/main/java/com/conveyal/datatools/editor/jobs/ProcessGtfsSnapshotUpload.java deleted file mode 100755 index c30be3030..000000000 --- a/src/main/java/com/conveyal/datatools/editor/jobs/ProcessGtfsSnapshotUpload.java +++ /dev/null @@ -1,73 +0,0 @@ -package com.conveyal.datatools.editor.jobs; - -//import play.jobs.Job; - -public class ProcessGtfsSnapshotUpload implements Runnable { - @Override - public void run() { - - } - /* - private Long _gtfsSnapshotMergeId; - - private Map agencyIdMap = new HashMap(); - - public ProcessGtfsSnapshotUpload(Long gtfsSnapshotMergeId) { - this._gtfsSnapshotMergeId = gtfsSnapshotMergeId; - } - - public void doJob() { - - GtfsSnapshotMerge snapshotMerge = null; - while(snapshotMerge == null) - { - snapshotMerge = GtfsSnapshotMerge.findById(this._gtfsSnapshotMergeId); - LOG.warn("Waiting for snapshotMerge to save..."); - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - - GtfsReader reader = new GtfsReader(); - GtfsDaoImpl store = new GtfsDaoImpl(); - - Long agencyCount = new Long(0); - - try { - - File gtfsFile = new File(Play.configuration.getProperty("application.publicGtfsDataDirectory"), snapshotMerge.snapshot.getFilename()); - - reader.setInputLocation(gtfsFile); - reader.setEntityStore(store); - reader.run(); - - LOG.info("GtfsImporter: listing feeds..."); - - for (org.onebusaway.gtfs.model.Agency gtfsAgency : reader.getAgencies()) { - - GtfsAgency agency = new GtfsAgency(gtfsAgency); - agency.snapshot = snapshotMerge.snapshot; - agency.save(); - - } - - snapshotMerge.snapshot.agencyCount = store.getAllAgencies().size(); - snapshotMerge.snapshot.routeCount = store.getAllRoutes().size(); - snapshotMerge.snapshot.stopCount = store.getAllStops().size(); - snapshotMerge.snapshot.tripCount = store.getAllTrips().size(); - - snapshotMerge.snapshot.save(); - - } - catch (Exception e) { - - LOG.error(e.toString()); - - snapshotMerge.failed(e.toString()); - } - }*/ -} - diff --git a/src/main/java/com/conveyal/datatools/editor/models/Snapshot.java b/src/main/java/com/conveyal/datatools/editor/models/Snapshot.java index ada896941..99d71a6c2 100644 --- a/src/main/java/com/conveyal/datatools/editor/models/Snapshot.java +++ b/src/main/java/com/conveyal/datatools/editor/models/Snapshot.java @@ -2,7 +2,6 @@ import com.conveyal.datatools.editor.datastore.GlobalTx; import com.conveyal.datatools.editor.datastore.VersionedDataStore; -import com.conveyal.datatools.editor.jobs.ProcessGtfsSnapshotExport; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize; @@ -89,32 +88,6 @@ public String generateFileName () { return this.feedId + "_" + this.snapshotTime + ".zip"; } - /** Write snapshot to disk as GTFS */ - public static boolean writeSnapshotAsGtfs (Tuple2 decodedId, File outFile) { - GlobalTx gtx = VersionedDataStore.getGlobalTx(); - Snapshot local; - try { - if (!gtx.snapshots.containsKey(decodedId)) { - return false; - } - local = gtx.snapshots.get(decodedId); - new ProcessGtfsSnapshotExport(local, outFile).run(); - } finally { - gtx.rollbackIfOpen(); - } - return true; - } - - public static boolean writeSnapshotAsGtfs (String id, File outFile) { - Tuple2 decodedId; - try { - decodedId = JacksonSerializers.Tuple2IntDeserializer.deserialize(id); - } catch (IOException e1) { - return false; - } - return writeSnapshotAsGtfs(decodedId, outFile); - } - @JsonIgnore public static Collection getSnapshots (String feedId) { GlobalTx gtx = VersionedDataStore.getGlobalTx(); diff --git a/src/main/java/com/conveyal/datatools/manager/DataManager.java b/src/main/java/com/conveyal/datatools/manager/DataManager.java index 74d48bce6..96e59d1c6 100644 --- a/src/main/java/com/conveyal/datatools/manager/DataManager.java +++ b/src/main/java/com/conveyal/datatools/manager/DataManager.java @@ -17,6 +17,7 @@ import com.conveyal.datatools.manager.controllers.api.NoteController; import com.conveyal.datatools.manager.controllers.api.OrganizationController; import com.conveyal.datatools.manager.controllers.api.ProjectController; +import com.conveyal.datatools.manager.controllers.api.ServerController; import com.conveyal.datatools.manager.controllers.api.StatusController; import com.conveyal.datatools.manager.controllers.api.UserController; import com.conveyal.datatools.manager.extensions.ExternalFeedResource; @@ -220,6 +221,7 @@ static void registerRoutes() throws IOException { NoteController.register(API_PREFIX); StatusController.register(API_PREFIX); OrganizationController.register(API_PREFIX); + ServerController.register(API_PREFIX); // Register editor API routes if (isModuleEnabled("editor")) { @@ -231,6 +233,7 @@ static void registerRoutes() throws IOException { gtfsConfig = yamlMapper.readTree(gtfs); new EditorControllerImpl(EDITOR_API_PREFIX, Table.AGENCY, DataManager.GTFS_DATA_SOURCE); new EditorControllerImpl(EDITOR_API_PREFIX, Table.CALENDAR, DataManager.GTFS_DATA_SOURCE); + // NOTE: fare_attributes controller handles updates to nested table fare_rules. new EditorControllerImpl(EDITOR_API_PREFIX, Table.FARE_ATTRIBUTES, DataManager.GTFS_DATA_SOURCE); new EditorControllerImpl(EDITOR_API_PREFIX, Table.FEED_INFO, DataManager.GTFS_DATA_SOURCE); new EditorControllerImpl(EDITOR_API_PREFIX, Table.ROUTES, DataManager.GTFS_DATA_SOURCE); diff --git a/src/main/java/com/conveyal/datatools/manager/auth/Auth0Connection.java b/src/main/java/com/conveyal/datatools/manager/auth/Auth0Connection.java index 80c5942ce..6e8ac8787 100644 --- a/src/main/java/com/conveyal/datatools/manager/auth/Auth0Connection.java +++ b/src/main/java/com/conveyal/datatools/manager/auth/Auth0Connection.java @@ -1,5 +1,6 @@ package com.conveyal.datatools.manager.auth; +import com.auth0.jwt.JWTExpiredException; import com.auth0.jwt.JWTVerifier; import com.auth0.jwt.pem.PemReader; import com.conveyal.datatools.manager.DataManager; @@ -90,6 +91,9 @@ public static void checkUser(Request req) { // The user attribute is used on the server side to check user permissions and does not have all of the // fields that the raw Auth0 profile string does. req.attribute("user", profile); + } catch (JWTExpiredException e) { + LOG.warn("JWT token has expired for user."); + logMessageAndHalt(req, 401, "User's authentication token has expired. Please re-login."); } catch (Exception e) { LOG.warn("Login failed to verify with our authorization provider.", e); logMessageAndHalt(req, 401, "Could not verify user's token"); diff --git a/src/main/java/com/conveyal/datatools/manager/controllers/api/DeploymentController.java b/src/main/java/com/conveyal/datatools/manager/controllers/api/DeploymentController.java index 385db49c1..dafd81ba1 100644 --- a/src/main/java/com/conveyal/datatools/manager/controllers/api/DeploymentController.java +++ b/src/main/java/com/conveyal/datatools/manager/controllers/api/DeploymentController.java @@ -1,15 +1,25 @@ package com.conveyal.datatools.manager.controllers.api; +import com.amazonaws.services.ec2.AmazonEC2; +import com.amazonaws.services.ec2.AmazonEC2Client; +import com.amazonaws.services.ec2.model.DescribeInstancesRequest; +import com.amazonaws.services.ec2.model.DescribeInstancesResult; +import com.amazonaws.services.ec2.model.Filter; +import com.amazonaws.services.ec2.model.Instance; +import com.amazonaws.services.ec2.model.Reservation; +import com.amazonaws.services.s3.AmazonS3URI; import com.conveyal.datatools.common.utils.SparkUtils; import com.conveyal.datatools.manager.DataManager; import com.conveyal.datatools.manager.auth.Auth0UserProfile; import com.conveyal.datatools.manager.jobs.DeployJob; import com.conveyal.datatools.manager.models.Deployment; +import com.conveyal.datatools.manager.models.EC2InstanceSummary; import com.conveyal.datatools.manager.models.FeedSource; import com.conveyal.datatools.manager.models.FeedVersion; import com.conveyal.datatools.manager.models.JsonViews; import com.conveyal.datatools.manager.models.OtpServer; import com.conveyal.datatools.manager.models.Project; +import com.conveyal.datatools.manager.persistence.FeedStore; import com.conveyal.datatools.manager.persistence.Persistence; import com.conveyal.datatools.manager.utils.json.JsonManager; import org.bson.Document; @@ -23,12 +33,15 @@ import java.io.FileInputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import static com.conveyal.datatools.common.utils.S3Utils.downloadFromS3; import static com.conveyal.datatools.common.utils.SparkUtils.logMessageAndHalt; import static spark.Spark.delete; import static spark.Spark.get; @@ -44,12 +57,13 @@ public class DeploymentController { private static JsonManager json = new JsonManager<>(Deployment.class, JsonViews.UserInterface.class); private static final Logger LOG = LoggerFactory.getLogger(DeploymentController.class); private static Map deploymentJobsByServer = new HashMap<>(); + private static final AmazonEC2 ec2 = AmazonEC2Client.builder().build(); /** * Gets the deployment specified by the request's id parameter and ensure that user has access to the * deployment. If the user does not have permission the Spark request is halted with an error. */ - private static Deployment checkDeploymentPermissions (Request req, Response res) { + private static Deployment getDeploymentWithPermissions(Request req, Response res) { Auth0UserProfile userProfile = req.attribute("user"); String deploymentId = req.params("id"); Deployment deployment = Persistence.deployments.getById(deploymentId); @@ -65,22 +79,62 @@ private static Deployment checkDeploymentPermissions (Request req, Response res) } private static Deployment getDeployment (Request req, Response res) { - return checkDeploymentPermissions(req, res); + return getDeploymentWithPermissions(req, res); } private static Deployment deleteDeployment (Request req, Response res) { - Deployment deployment = checkDeploymentPermissions(req, res); + Deployment deployment = getDeploymentWithPermissions(req, res); deployment.delete(); return deployment; } + /** + * HTTP endpoint for downloading a build artifact (e.g., otp build log or Graph.obj) from S3. + */ + private static String downloadBuildArtifact (Request req, Response res) { + Deployment deployment = getDeploymentWithPermissions(req, res); + DeployJob.DeploySummary summaryToDownload = null; + String uriString = null; + // If a jobId query param is provided, find the matching job summary. + String jobId = req.queryParams("jobId"); + if (jobId != null) { + for (DeployJob.DeploySummary summary : deployment.deployJobSummaries) { + if (summary.jobId.equals(jobId)) { + summaryToDownload = summary; + break; + } + } + } else { + summaryToDownload = deployment.latest(); + } + if (summaryToDownload == null) { + // Try to construct the URI string + OtpServer server = Persistence.servers.getById(deployment.deployedTo); + if (server == null) { + uriString = String.format("s3://%s/bundles/%s/%s/%s", "S3_BUCKET", deployment.projectId, deployment.id, jobId); + logMessageAndHalt(req, 400, "The deployment does not have job history or associated server information to construct URI for build artifact. " + uriString); + return null; + } + uriString = String.format("s3://%s/bundles/%s/%s/%s", server.s3Bucket, deployment.projectId, deployment.id, jobId); + LOG.warn("Could not find deploy summary for job. Attempting to use {}", uriString); + } else { + uriString = summaryToDownload.buildArtifactsFolder; + } + AmazonS3URI uri = new AmazonS3URI(uriString); + String filename = req.queryParams("filename"); + if (filename == null) { + logMessageAndHalt(req, HttpStatus.BAD_REQUEST_400, "Must provide filename query param for build artifact."); + } + return downloadFromS3(FeedStore.s3Client, uri.getBucket(), String.join("/", uri.getKey(), filename), false, res); + } + /** * Download all of the GTFS files in the feed. * * TODO: Should there be an option to download the OSM network as well? */ private static FileInputStream downloadDeployment (Request req, Response res) throws IOException { - Deployment deployment = checkDeploymentPermissions(req, res); + Deployment deployment = getDeploymentWithPermissions(req, res); // Create temp file in order to generate input stream. File temp = File.createTempFile("deployment", ".zip"); // just include GTFS, not any of the ancillary information @@ -197,8 +251,8 @@ private static Deployment createDeploymentFromFeedSource (Request req, Response * Update a single deployment. If the deployment's feed versions are updated, checks to ensure that each * version exists and is a part of the same parent project are performed before updating. */ - private static Object updateDeployment (Request req, Response res) { - Deployment deploymentToUpdate = checkDeploymentPermissions(req, res); + private static Deployment updateDeployment (Request req, Response res) { + Deployment deploymentToUpdate = getDeploymentWithPermissions(req, res); Document updateDocument = Document.parse(req.body()); // FIXME use generic update hook, also feedVersions is getting serialized into MongoDB (which is undesirable) // Check that feed versions in request body are OK to add to deployment, i.e., they exist and are a part of @@ -241,6 +295,100 @@ private static Object updateDeployment (Request req, Response res) { return updatedDeployment; } + // TODO: Add some point it may be useful to refactor DeployJob to allow adding an EC2 instance to an existing job, + // but for now that can be achieved by using the AWS EC2 console: choose an EC2 instance to replicate and select + // "Run more like this". Then follow the prompts to replicate the instance. +// private static Object addEC2InstanceToDeployment(Request req, Response res) { +// Deployment deployment = getDeploymentWithPermissions(req, res); +// List currentEC2Instances = deployment.retrieveEC2Instances(); +// EC2InstanceSummary ec2ToClone = currentEC2Instances.get(0); +// RunInstancesRequest request = new RunInstancesRequest(); +// ec2.runInstances() +// ec2ToClone. +// DeployJob.DeploySummary latestDeployJob = deployment.latest(); +// +// } + + /** + * HTTP endpoint to deregister and terminate a set of instance IDs that are associated with a particular deployment. + * The intent here is to give the user a device by which they can terminate an EC2 instance that has started up, but + * is not responding or otherwise failed to successfully become an OTP instance as part of an ELB deployment (or + * perhaps two people somehow kicked off a deploy job for the same deployment simultaneously and one of the EC2 + * instances has out-of-date data). + */ + private static boolean terminateEC2InstanceForDeployment(Request req, Response res) { + Deployment deployment = getDeploymentWithPermissions(req, res); + String instanceIds = req.queryParams("instanceIds"); + if (instanceIds == null) { + logMessageAndHalt(req, 400, "Must provide one or more instance IDs."); + return false; + } + List idsToTerminate = Arrays.asList(instanceIds.split(",")); + // Ensure that request does not contain instance IDs which are not associated with this deployment. + List instances = deployment.retrieveEC2Instances(); + List instanceIdsForDeployment = instances.stream() + .map(ec2InstanceSummary -> ec2InstanceSummary.instanceId) + .collect(Collectors.toList()); + // Get the target group ARN from the latest deployment. Surround in a try/catch in case of NPEs. + // TODO: Perhaps provide some other way to provide the target group ARN. + String targetGroupArn; + try { + targetGroupArn = deployment.latest().ec2Info.targetGroupArn; + } catch (Exception e) { + logMessageAndHalt(req, 400, "Latest deploy job does not exist or is missing target group ARN."); + return false; + } + for (String id : idsToTerminate) { + if (!instanceIdsForDeployment.contains(id)) { + logMessageAndHalt(req, HttpStatus.UNAUTHORIZED_401, "It is not permitted to terminate an instance that is not associated with deployment " + deployment.id); + return false; + } + int code = instances.get(instanceIdsForDeployment.indexOf(id)).state.getCode(); + // 48 indicates instance is terminated, 32 indicates shutting down. Prohibit terminating an already + if (code == 48 || code == 32) { + logMessageAndHalt(req, 400, "Instance is already terminated/shutting down: " + id); + return false; + } + } + // If checks are ok, terminate instances. + boolean success = ServerController.deRegisterAndTerminateInstances(targetGroupArn, idsToTerminate); + if (!success) { + logMessageAndHalt(req, 400, "Could not complete termination request"); + return false; + } + return true; + } + + /** + * HTTP controller to fetch information about provided EC2 machines that power ELBs running a trip planner. + */ + private static List fetchEC2InstanceSummaries(Request req, Response res) { + Deployment deployment = getDeploymentWithPermissions(req, res); + return deployment.retrieveEC2Instances(); + } + + /** + * Fetches list of {@link EC2InstanceSummary} for all instances matching the provided filters. + */ + public static List fetchEC2InstanceSummaries(Filter... filters) { + return fetchEC2Instances(filters).stream().map(EC2InstanceSummary::new).collect(Collectors.toList()); + } + + /** + * Fetch EC2 instances from AWS that match the provided set of filters (e.g., tags, instance ID, or other properties). + */ + public static List fetchEC2Instances(Filter... filters) { + List instances = new ArrayList<>(); + DescribeInstancesRequest request = new DescribeInstancesRequest().withFilters(filters); + DescribeInstancesResult result = ec2.describeInstances(request); + for (Reservation reservation : result.getReservations()) { + instances.addAll(reservation.getInstances()); + } + // Sort by launch time (most recent first). + instances.sort(Comparator.comparing(Instance::getLaunchTime).reversed()); + return instances; + } + /** * Create a deployment bundle, and send it to the specified OTP target servers (or the specified s3 bucket). */ @@ -248,18 +396,16 @@ private static String deploy (Request req, Response res) { // Check parameters supplied in request for validity. Auth0UserProfile userProfile = req.attribute("user"); String target = req.params("target"); - Deployment deployment = checkDeploymentPermissions(req, res); + Deployment deployment = getDeploymentWithPermissions(req, res); Project project = Persistence.projects.getById(deployment.projectId); if (project == null) logMessageAndHalt(req, 400, "Internal reference error. Deployment's project ID is invalid"); - - // FIXME: Currently the otp server to deploy to is determined by the string name field (with special characters - // replaced with underscores). This should perhaps be replaced with an immutable server ID so that there is - // no risk that these values can overlap. This may be over engineering this system though. The user deploying - // a set of feeds would likely not create two deployment targets with the same name (and the name is unlikely - // to change often). - OtpServer otpServer = project.retrieveServer(target); - if (otpServer == null) logMessageAndHalt(req, 400, "Must provide valid OTP server target ID."); + // Get server by ID + OtpServer otpServer = Persistence.servers.getById(target); + if (otpServer == null) { + logMessageAndHalt(req, 400, "Must provide valid OTP server target ID."); + return null; + } // Check that permissions of user allow them to deploy to target. boolean isProjectAdmin = userProfile.canAdministerProject(deployment.projectId, deployment.organizationId()); @@ -318,10 +464,14 @@ public static void register (String apiPrefix) { }), json::write); options(apiPrefix + "secure/deployments", (q, s) -> ""); get(apiPrefix + "secure/deployments/:id/download", DeploymentController::downloadDeployment); + get(apiPrefix + "secure/deployments/:id/artifact", DeploymentController::downloadBuildArtifact); + get(apiPrefix + "secure/deployments/:id/ec2", DeploymentController::fetchEC2InstanceSummaries, json::write); + delete(apiPrefix + "secure/deployments/:id/ec2", DeploymentController::terminateEC2InstanceForDeployment, json::write); get(apiPrefix + "secure/deployments/:id", DeploymentController::getDeployment, json::write); delete(apiPrefix + "secure/deployments/:id", DeploymentController::deleteDeployment, json::write); get(apiPrefix + "secure/deployments", DeploymentController::getAllDeployments, json::write); post(apiPrefix + "secure/deployments", DeploymentController::createDeployment, json::write); +// post(apiPrefix + "secure/deployments/:id/ec2", DeploymentController::addEC2InstanceToDeployment, json::write); put(apiPrefix + "secure/deployments/:id", DeploymentController::updateDeployment, json::write); post(apiPrefix + "secure/deployments/fromfeedsource/:id", DeploymentController::createDeploymentFromFeedSource, json::write); } diff --git a/src/main/java/com/conveyal/datatools/manager/controllers/api/OrganizationController.java b/src/main/java/com/conveyal/datatools/manager/controllers/api/OrganizationController.java index ddf712093..046c0be32 100644 --- a/src/main/java/com/conveyal/datatools/manager/controllers/api/OrganizationController.java +++ b/src/main/java/com/conveyal/datatools/manager/controllers/api/OrganizationController.java @@ -68,8 +68,8 @@ public static Organization createOrganization (Request req, Response res) { public static Organization updateOrganization (Request req, Response res) throws IOException { String organizationId = req.params("id"); - requestOrganizationById(req); - Organization organization = Persistence.organizations.update(organizationId, req.body()); + Organization updatedOrganization = requestOrganizationById(req); + Persistence.organizations.replace(organizationId, updatedOrganization); // FIXME: Add back in hook after organization is updated. // See https://github.com/catalogueglobal/datatools-server/issues/111 @@ -101,7 +101,7 @@ public static Organization updateOrganization (Request req, Response res) throws // p.save(); // } - return organization; + return updatedOrganization; } public static Organization deleteOrganization (Request req, Response res) { diff --git a/src/main/java/com/conveyal/datatools/manager/controllers/api/ProjectController.java b/src/main/java/com/conveyal/datatools/manager/controllers/api/ProjectController.java index cc4db1c1c..5bc91691b 100644 --- a/src/main/java/com/conveyal/datatools/manager/controllers/api/ProjectController.java +++ b/src/main/java/com/conveyal/datatools/manager/controllers/api/ProjectController.java @@ -120,10 +120,7 @@ private static Project updateProject(Request req, Response res) { private static Project deleteProject(Request req, Response res) { // Fetch project first to check permissions, and so we can return the deleted project after deletion. Project project = requestProjectById(req, "manage"); - boolean successfullyDeleted = project.delete(); - if (!successfullyDeleted) { - logMessageAndHalt(req, 500, "Did not delete project.", new Exception("Delete unsuccessful")); - } + project.delete(); return project; } diff --git a/src/main/java/com/conveyal/datatools/manager/controllers/api/ServerController.java b/src/main/java/com/conveyal/datatools/manager/controllers/api/ServerController.java new file mode 100644 index 000000000..93d13ba33 --- /dev/null +++ b/src/main/java/com/conveyal/datatools/manager/controllers/api/ServerController.java @@ -0,0 +1,454 @@ +package com.conveyal.datatools.manager.controllers.api; + +import com.amazonaws.services.ec2.AmazonEC2; +import com.amazonaws.services.ec2.AmazonEC2Client; +import com.amazonaws.services.ec2.model.AmazonEC2Exception; +import com.amazonaws.services.ec2.model.DescribeImagesRequest; +import com.amazonaws.services.ec2.model.DescribeImagesResult; +import com.amazonaws.services.ec2.model.DescribeKeyPairsResult; +import com.amazonaws.services.ec2.model.DescribeSecurityGroupsRequest; +import com.amazonaws.services.ec2.model.DescribeSecurityGroupsResult; +import com.amazonaws.services.ec2.model.DescribeSubnetsRequest; +import com.amazonaws.services.ec2.model.DescribeSubnetsResult; +import com.amazonaws.services.ec2.model.Image; +import com.amazonaws.services.ec2.model.Instance; +import com.amazonaws.services.ec2.model.InstanceType; +import com.amazonaws.services.ec2.model.KeyPairInfo; +import com.amazonaws.services.ec2.model.SecurityGroup; +import com.amazonaws.services.ec2.model.Subnet; +import com.amazonaws.services.ec2.model.TerminateInstancesRequest; +import com.amazonaws.services.ec2.model.TerminateInstancesResult; +import com.amazonaws.services.elasticloadbalancingv2.AmazonElasticLoadBalancing; +import com.amazonaws.services.elasticloadbalancingv2.AmazonElasticLoadBalancingClient; +import com.amazonaws.services.elasticloadbalancingv2.model.AmazonElasticLoadBalancingException; +import com.amazonaws.services.elasticloadbalancingv2.model.DeregisterTargetsRequest; +import com.amazonaws.services.elasticloadbalancingv2.model.DescribeTargetGroupsRequest; +import com.amazonaws.services.elasticloadbalancingv2.model.TargetDescription; +import com.amazonaws.services.elasticloadbalancingv2.model.TargetGroup; +import com.amazonaws.services.identitymanagement.AmazonIdentityManagement; +import com.amazonaws.services.identitymanagement.AmazonIdentityManagementClientBuilder; +import com.amazonaws.services.identitymanagement.model.InstanceProfile; +import com.amazonaws.services.identitymanagement.model.ListInstanceProfilesResult; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.conveyal.datatools.manager.auth.Auth0UserProfile; +import com.conveyal.datatools.manager.jobs.MonitorServerStatusJob; +import com.conveyal.datatools.manager.models.Deployment; +import com.conveyal.datatools.manager.models.JsonViews; +import com.conveyal.datatools.manager.models.OtpServer; +import com.conveyal.datatools.manager.models.Project; +import com.conveyal.datatools.manager.persistence.FeedStore; +import com.conveyal.datatools.manager.persistence.Persistence; +import com.conveyal.datatools.manager.utils.json.JsonManager; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.jetty.http.HttpStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import spark.HaltException; +import spark.Request; +import spark.Response; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +import static com.conveyal.datatools.common.utils.SparkUtils.logMessageAndHalt; +import static com.conveyal.datatools.manager.jobs.DeployJob.DEFAULT_INSTANCE_TYPE; +import static spark.Spark.delete; +import static spark.Spark.get; +import static spark.Spark.options; +import static spark.Spark.post; +import static spark.Spark.put; + +/** + * Handlers for HTTP API requests that affect deployment Servers. + * These methods are mapped to API endpoints by Spark. + */ +public class ServerController { + private static JsonManager json = new JsonManager<>(OtpServer.class, JsonViews.UserInterface.class); + private static final Logger LOG = LoggerFactory.getLogger(ServerController.class); + private static final ObjectMapper mapper = new ObjectMapper(); + private static final AmazonEC2 ec2 = AmazonEC2Client.builder().build(); + private static final AmazonIdentityManagement iam = AmazonIdentityManagementClientBuilder.defaultClient(); + private static final AmazonElasticLoadBalancing elb = AmazonElasticLoadBalancingClient.builder().build(); + + /** + * Gets the server specified by the request's id parameter and ensure that user has access to the + * deployment. If the user does not have permission the Spark request is halted with an error. + */ + private static OtpServer getServerWithPermissions(Request req, Response res) { + Auth0UserProfile userProfile = req.attribute("user"); + String serverId = req.params("id"); + OtpServer server = Persistence.servers.getById(serverId); + if (server == null) { + logMessageAndHalt(req, HttpStatus.BAD_REQUEST_400, "Server does not exist."); + } + boolean isProjectAdmin = userProfile.canAdministerProject(server.projectId, server.organizationId()); + if (!isProjectAdmin && !userProfile.getUser_id().equals(server.user())) { + // If user is not a project admin and did not create the deployment, access to the deployment is denied. + logMessageAndHalt(req, HttpStatus.UNAUTHORIZED_401, "User not authorized for deployment."); + } + return server; + } + + /** HTTP endpoint for deleting an {@link OtpServer}. */ + private static OtpServer deleteServer(Request req, Response res) { + OtpServer server = getServerWithPermissions(req, res); + // Ensure that there are no active EC2 instances associated with server. Halt deletion if so. + List activeInstances = server.retrieveEC2Instances().stream() + .filter(instance -> "running".equals(instance.getState().getName())) + .collect(Collectors.toList()); + if (activeInstances.size() > 0) { + logMessageAndHalt(req, HttpStatus.BAD_REQUEST_400, "Cannot delete server with active EC2 instances: " + getIds(activeInstances)); + } + server.delete(); + return server; + } + + /** HTTP method for terminating EC2 instances associated with an ELB OTP server. */ + private static OtpServer terminateEC2InstancesForServer(Request req, Response res) { + OtpServer server = getServerWithPermissions(req, res); + List instances = server.retrieveEC2Instances(); + List ids = getIds(instances); + terminateInstances(ids); + for (Deployment deployment : Deployment.retrieveDeploymentForServerAndRouterId(server.id, null)) { + Persistence.deployments.updateField(deployment.id, "deployedTo", null); + } + return server; + } + + /** + * Shorthand method for getting list of string identifiers from a list of EC2 instances. + */ + public static List getIds (List instances) { + return instances.stream().map(Instance::getInstanceId).collect(Collectors.toList()); + } + + /** Terminate the list of EC2 instance IDs. */ + public static TerminateInstancesResult terminateInstances(Collection instanceIds) throws AmazonEC2Exception { + if (instanceIds.size() == 0) { + LOG.warn("No instance IDs provided in list. Skipping termination request."); + return null; + } + LOG.info("Terminating EC2 instances {}", instanceIds); + TerminateInstancesRequest request = new TerminateInstancesRequest().withInstanceIds(instanceIds); + return ec2.terminateInstances(request); + } + + /** Convenience method to override {@link #terminateInstances(Collection)}. */ + public static TerminateInstancesResult terminateInstances(String... instanceIds) throws AmazonEC2Exception { + return terminateInstances(Arrays.asList(instanceIds)); + } + + /** Convenience method to override {@link #terminateInstances(Collection)}. */ + public static TerminateInstancesResult terminateInstances(List instances) throws AmazonEC2Exception { + return terminateInstances(getIds(instances)); + } + + /** + * De-register instances from the specified target group/load balancer and terminate the instances. + * + */ + public static boolean deRegisterAndTerminateInstances(String targetGroupArn, List instanceIds) { + LOG.info("De-registering instances from load balancer {}", instanceIds); + TargetDescription[] targetDescriptions = instanceIds.stream() + .map(id -> new TargetDescription().withId(id)) + .toArray(TargetDescription[]::new); + try { + DeregisterTargetsRequest request = new DeregisterTargetsRequest() + .withTargetGroupArn(targetGroupArn) + .withTargets(targetDescriptions); + AmazonElasticLoadBalancing elb = AmazonElasticLoadBalancingClient.builder().build(); + elb.deregisterTargets(request); + ServerController.terminateInstances(instanceIds); + } catch (AmazonEC2Exception | AmazonElasticLoadBalancingException e) { + LOG.warn("Could not terminate EC2 instances: " + String.join(",", instanceIds), e); + return false; + } + return true; + } + + /** + * Create a new server for the project. All feed sources with a valid latest version are added to the new + * deployment. + */ + private static OtpServer createServer(Request req, Response res) { + Auth0UserProfile userProfile = req.attribute("user"); + OtpServer newServer = getServerFromRequestBody(req); + // If server has no project ID specified, user must be an application admin to create it. Otherwise, they must + // be a project admin. + boolean allowedToCreate = newServer.projectId == null + ? userProfile.canAdministerApplication() + : userProfile.canAdministerProject(newServer.projectId, newServer.organizationId()); + if (allowedToCreate) { + validateFields(req, newServer); + Persistence.servers.create(newServer); + return newServer; + } else { + logMessageAndHalt(req, 403, "Not authorized to create a server for project " + newServer.projectId); + return null; + } + } + + /** Utility method to parse OtpServer object from Spark request body. */ + private static OtpServer getServerFromRequestBody(Request req) { + try { + return mapper.readValue(req.body(), OtpServer.class); + } catch (IOException e) { + logMessageAndHalt(req, HttpStatus.BAD_REQUEST_400, "Error parsing OTP server JSON.", e); + return null; + } + } + + /** + * HTTP controller to fetch all servers or servers assigned to a particular project. This should only be used for the + * management of these servers. For checking servers that a project can deploy to, use {@link Project#availableOtpServers()}. + */ + private static List fetchServers (Request req, Response res) { + String projectId = req.queryParams("projectId"); + Auth0UserProfile userProfile = req.attribute("user"); + if (projectId != null) { + Project project = Persistence.projects.getById(projectId); + if (project == null) logMessageAndHalt(req, 400, "Must provide a valid project ID."); + else if (userProfile.canAdministerProject(projectId, null)) return project.availableOtpServers(); + } + else if (userProfile.canAdministerApplication()) return Persistence.servers.getAll(); + return Collections.emptyList(); + } + + /** + * Update a single OTP server. + */ + private static OtpServer updateServer(Request req, Response res) { + OtpServer serverToUpdate = getServerWithPermissions(req, res); + OtpServer updatedServer = getServerFromRequestBody(req); + Auth0UserProfile user = req.attribute("user"); + if ((serverToUpdate.admin || serverToUpdate.projectId == null) && !user.canAdministerApplication()) { + logMessageAndHalt(req, HttpStatus.UNAUTHORIZED_401, "User cannot modify admin-only or application-wide server."); + } + validateFields(req, updatedServer); + Persistence.servers.replace(serverToUpdate.id, updatedServer); + return Persistence.servers.getById(updatedServer.id); + } + + /** + * Validate certain fields found in the document representing a server. This also currently modifies the document by + * removing problematic date fields. + */ + private static void validateFields(Request req, OtpServer server) throws HaltException { + try { + // Check that projectId is valid. + if (server.projectId != null) { + Project project = Persistence.projects.getById(server.projectId); + if (project == null) + logMessageAndHalt(req, HttpStatus.BAD_REQUEST_400, "Must specify valid project ID."); + } + // If a server's ec2 info object is not null, it must pass a few validation checks on various fields related to + // AWS. (e.g., target group ARN and instance type). + if (server.ec2Info != null) { + validateTargetGroup(server.ec2Info.targetGroupArn, req); + validateInstanceType(server.ec2Info.instanceType, req); + validateSubnetId(server.ec2Info.subnetId, req); + validateSecurityGroupId(server.ec2Info.securityGroupId, req); + validateIamInstanceProfileArn(server.ec2Info.iamInstanceProfileArn, req); + validateKeyName(server.ec2Info.keyName, req); + validateAmiId(server.ec2Info.amiId, req); + if (server.ec2Info.instanceCount < 0) server.ec2Info.instanceCount = 0; + } + // Server must have name. + if (isEmpty(server.name)) + logMessageAndHalt(req, HttpStatus.BAD_REQUEST_400, "Server must have valid name."); + // Server must have an internal URL (for build graph over wire) or an s3 bucket (for auto deploy ec2). + if (isEmpty(server.s3Bucket)) { + if (server.internalUrl == null || server.internalUrl.size() == 0) { + logMessageAndHalt(req, HttpStatus.BAD_REQUEST_400, "Server must contain either internal URL(s) or s3 bucket name."); + } + } else { + verifyS3WritePermissions(server, req); + } + } catch (Exception e) { + if (e instanceof HaltException) throw e; + else logMessageAndHalt(req, 400, "Error encountered while validating server field", e); + } + } + + /** + * Verify that application has permission to write to/delete from S3 bucket. We're following the recommended + * approach from https://stackoverflow.com/a/17284647/915811, but perhaps there is a way to do this + * effectively without incurring AWS costs (although writing/deleting an empty file to S3 is probably + * miniscule). + * @param s3Bucket + */ + private static boolean verifyS3WritePermissions(AmazonS3 s3Client, String s3Bucket, Request req) { + String key = UUID.randomUUID().toString(); + try { + s3Client.putObject(s3Bucket, key, File.createTempFile("test", ".zip")); + s3Client.deleteObject(s3Bucket, key); + } catch (IOException | AmazonS3Exception e) { + LOG.warn("S3 client cannot write to bucket" + s3Bucket, e); + return false; + } + return true; + } + + /** + * Verify that application can write to S3 bucket. + * + * TODO: Also verify that, with AWS credentials, application can assume instance profile + */ + private static void verifyS3WritePermissions(OtpServer server, Request req) { + // Verify first that this application can write to the S3 bucket, which is needed to write the transit bundle + // file to S3. + if (!verifyS3WritePermissions(FeedStore.s3Client, server.s3Bucket, req)) { + String message = "Application cannot write to specified S3 bucket: " + server.s3Bucket; + logMessageAndHalt(req, 400, message); + } + // TODO: If EC2 info is not null, check that the IAM role ARN is able to write to the S3 bucket. I keep running + // into errors with this code, but will leave it commented out for now. LTR 2019/09/20 +// if (server.ec2Info != null) { +//// InstanceProfile iamInstanceProfile = getIamInstanceProfile(server.ec2Info.iamInstanceProfileArn); +// AWSSecurityTokenServiceClient tokenServiceClient = new +// AWSSecurityTokenServiceClient(FeedStore.getAWSCreds().getCredentials()); +//// AWSSecurityTokenServiceClient tokenServiceClient = new AWSSecurityTokenServiceClient(); +// AssumeRoleRequest request = new AssumeRoleRequest() +// .withRoleArn(server.ec2Info.iamInstanceProfileArn) +// .withDurationSeconds(900) +// .withRoleSessionName("test"); +// AssumeRoleResult result = tokenServiceClient.assumeRole(request); +// Credentials credentials = result.getCredentials(); +// BasicSessionCredentials basicSessionCredentials = new BasicSessionCredentials( +// credentials.getAccessKeyId(), credentials.getSecretAccessKey(), +// credentials.getSessionToken()); +// AmazonS3 temporaryS3Client = AmazonS3ClientBuilder.standard() +// .withCredentials(new AWSStaticCredentialsProvider(basicSessionCredentials)) +//// .withRegion(clientRegion) +// .build(); +// if (!verifyS3WritePermissions(temporaryS3Client, server.s3Bucket, req)) { +// String message = "EC2 IAM role cannot write to specified S3 bucket " + server.s3Bucket; +// logMessageAndHalt(req, 400, message); +// } +// } + } + + /** Validate that AMI exists and value is not empty. */ + private static void validateAmiId(String amiId, Request req) { + String message = "Server must have valid AMI ID (or field must be empty)"; + if (isEmpty(amiId)) return; + if (!amiExists(amiId)) logMessageAndHalt(req, HttpStatus.BAD_REQUEST_400, message); + } + + /** Determine if AMI ID exists (and is gettable by the application's AWS credentials). */ + public static boolean amiExists(String amiId) { + try { + DescribeImagesRequest request = new DescribeImagesRequest().withImageIds(amiId); + DescribeImagesResult result = ec2.describeImages(request); + // Iterate over AMIs to find a matching ID. + for (Image image : result.getImages()) if (image.getImageId().equals(amiId)) return true; + } catch (AmazonEC2Exception e) { + LOG.warn("AMI does not exist or some error prevented proper checking of the AMI ID.", e); + } + return false; + } + + /** Validate that AWS key name (the first part of a .pem key) exists and is not empty. */ + private static void validateKeyName(String keyName, Request req) { + String message = "Server must have valid key name"; + if (isEmpty(keyName)) logMessageAndHalt(req, HttpStatus.BAD_REQUEST_400, message); + DescribeKeyPairsResult response = ec2.describeKeyPairs(); + for (KeyPairInfo key_pair : response.getKeyPairs()) if (key_pair.getKeyName().equals(keyName)) return; + logMessageAndHalt(req, HttpStatus.BAD_REQUEST_400, message); + } + + /** Get IAM instance profile for the provided role ARN. */ + private static InstanceProfile getIamInstanceProfile (String iamInstanceProfileArn) { + ListInstanceProfilesResult result = iam.listInstanceProfiles(); + // Iterate over instance profiles. If a matching ARN is found, silently return. + for (InstanceProfile profile: result.getInstanceProfiles()) if (profile.getArn().equals(iamInstanceProfileArn)) return profile; + return null; + } + + /** Validate IAM instance profile ARN exists and is not empty. */ + private static void validateIamInstanceProfileArn(String iamInstanceProfileArn, Request req) { + String message = "Server must have valid IAM instance profile ARN (e.g., arn:aws:iam::123456789012:instance-profile/otp-ec2-role)."; + if (isEmpty(iamInstanceProfileArn)) logMessageAndHalt(req, HttpStatus.BAD_REQUEST_400, message); + if (getIamInstanceProfile(iamInstanceProfileArn) == null) logMessageAndHalt(req, HttpStatus.BAD_REQUEST_400, message); + } + + /** Validate that EC2 security group exists and is not empty. */ + private static void validateSecurityGroupId(String securityGroupId, Request req) { + String message = "Server must have valid security group ID"; + if (isEmpty(securityGroupId)) logMessageAndHalt(req, HttpStatus.BAD_REQUEST_400, message); + DescribeSecurityGroupsRequest request = new DescribeSecurityGroupsRequest().withGroupIds(securityGroupId); + DescribeSecurityGroupsResult result = ec2.describeSecurityGroups(request); + // Iterate over groups. If a matching ID is found, silently return. + for (SecurityGroup group : result.getSecurityGroups()) if (group.getGroupId().equals(securityGroupId)) return; + logMessageAndHalt(req, HttpStatus.BAD_REQUEST_400, message); + } + + /** Validate that subnet exists and is not empty. */ + private static void validateSubnetId(String subnetId, Request req) { + String message = "Server must have valid subnet ID"; + if (isEmpty(subnetId)) logMessageAndHalt(req, HttpStatus.BAD_REQUEST_400, message); + try { + DescribeSubnetsRequest request = new DescribeSubnetsRequest().withSubnetIds(subnetId); + DescribeSubnetsResult result = ec2.describeSubnets(request); + // Iterate over subnets. If a matching ID is found, silently return. + for (Subnet subnet : result.getSubnets()) if (subnet.getSubnetId().equals(subnetId)) return; + } catch (AmazonEC2Exception e) { + logMessageAndHalt(req, HttpStatus.BAD_REQUEST_400, message, e); + } + logMessageAndHalt(req, HttpStatus.BAD_REQUEST_400, message); + } + + /** + * Validate that EC2 instance type (e.g., t2-medium) exists. This value can be empty and will default to + * {@link com.conveyal.datatools.manager.jobs.DeployJob#DEFAULT_INSTANCE_TYPE} at deploy time. + */ + private static void validateInstanceType(String instanceType, Request req) { + if (instanceType == null) return; + try { + InstanceType.fromValue(instanceType); + } catch (IllegalArgumentException e) { + String message = String.format("Must provide valid instance type (if none provided, defaults to %s).", DEFAULT_INSTANCE_TYPE); + logMessageAndHalt(req, HttpStatus.BAD_REQUEST_400, message, e); + } + } + + /** Validate that ELB target group exists and is not empty. */ + private static void validateTargetGroup(String targetGroupArn, Request req) { + if (isEmpty(targetGroupArn)) logMessageAndHalt(req, HttpStatus.BAD_REQUEST_400, "Invalid value for Target Group ARN."); + try { + DescribeTargetGroupsRequest request = new DescribeTargetGroupsRequest().withTargetGroupArns(targetGroupArn); + List targetGroups = elb.describeTargetGroups(request).getTargetGroups(); + if (targetGroups.size() == 0) { + logMessageAndHalt(req, HttpStatus.BAD_REQUEST_400, "Invalid value for Target Group ARN. Could not locate Target Group."); + } + } catch (AmazonElasticLoadBalancingException e) { + logMessageAndHalt(req, HttpStatus.BAD_REQUEST_400, "Invalid value for Target Group ARN.", e); + } + } + + /** + * @return false if string value is empty or null + */ + public static boolean isEmpty(String val) { + return val == null || "".equals(val); + } + + /** + * Register HTTP methods with handler methods. + */ + public static void register (String apiPrefix) { + options(apiPrefix + "secure/servers", (q, s) -> ""); + delete(apiPrefix + "secure/servers/:id", ServerController::deleteServer, json::write); + delete(apiPrefix + "secure/servers/:id/ec2", ServerController::terminateEC2InstancesForServer, json::write); + get(apiPrefix + "secure/servers", ServerController::fetchServers, json::write); + post(apiPrefix + "secure/servers", ServerController::createServer, json::write); + put(apiPrefix + "secure/servers/:id", ServerController::updateServer, json::write); + } +} diff --git a/src/main/java/com/conveyal/datatools/manager/controllers/api/StatusController.java b/src/main/java/com/conveyal/datatools/manager/controllers/api/StatusController.java index ae701864f..94873786a 100644 --- a/src/main/java/com/conveyal/datatools/manager/controllers/api/StatusController.java +++ b/src/main/java/com/conveyal/datatools/manager/controllers/api/StatusController.java @@ -18,6 +18,7 @@ import java.util.stream.Collectors; import static com.conveyal.datatools.common.utils.SparkUtils.logMessageAndHalt; +import static spark.Spark.delete; import static spark.Spark.get; /** @@ -55,6 +56,20 @@ private static MonitorableJob getOneJobRoute(Request req, Response res) { return getJobById(userId, jobId, true); } + /** + * API route that cancels a single job by ID. + */ + // TODO Add ability to cancel job. This requires some changes to how these jobs are executed. It appears that + // only scheduled jobs can be canceled. +// private static MonitorableJob cancelJob(Request req, Response res) { +// String jobId = req.params("jobId"); +// Auth0UserProfile userProfile = req.attribute("user"); +// // FIXME: refactor underscore in user_id methods +// String userId = userProfile.getUser_id(); +// MonitorableJob job = getJobById(userId, jobId, true); +// return job; +// } + /** * Gets a job by user ID and job ID. * @param clearCompleted if true, remove requested job if it has completed or errored @@ -132,5 +147,7 @@ public static void register (String apiPrefix) { // FIXME Change endpoint for all jobs (to avoid overlap with jobId param)? get(apiPrefix + "secure/status/jobs/all", StatusController::getAllJobsRoute, json::write); get(apiPrefix + "secure/status/jobs/:jobId", StatusController::getOneJobRoute, json::write); + // TODO Add ability to cancel job +// delete(apiPrefix + "secure/status/jobs/:jobId", StatusController::cancelJob, json::write); } } diff --git a/src/main/java/com/conveyal/datatools/manager/controllers/api/UserController.java b/src/main/java/com/conveyal/datatools/manager/controllers/api/UserController.java index 4931fca31..3f0ffe967 100644 --- a/src/main/java/com/conveyal/datatools/manager/controllers/api/UserController.java +++ b/src/main/java/com/conveyal/datatools/manager/controllers/api/UserController.java @@ -24,6 +24,7 @@ import org.apache.http.entity.ByteArrayEntity; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.util.EntityUtils; +import org.eclipse.jetty.http.HttpStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import spark.Request; diff --git a/src/main/java/com/conveyal/datatools/manager/jobs/DeployJob.java b/src/main/java/com/conveyal/datatools/manager/jobs/DeployJob.java index abb2c4ef9..119b2d38f 100644 --- a/src/main/java/com/conveyal/datatools/manager/jobs/DeployJob.java +++ b/src/main/java/com/conveyal/datatools/manager/jobs/DeployJob.java @@ -2,6 +2,23 @@ import com.amazonaws.AmazonClientException; import com.amazonaws.event.ProgressListener; +import com.amazonaws.services.ec2.AmazonEC2; +import com.amazonaws.services.ec2.AmazonEC2Client; +import com.amazonaws.services.ec2.model.AmazonEC2Exception; +import com.amazonaws.services.ec2.model.CreateTagsRequest; +import com.amazonaws.services.ec2.model.DescribeInstanceStatusRequest; +import com.amazonaws.services.ec2.model.DescribeInstancesRequest; +import com.amazonaws.services.ec2.model.IamInstanceProfileSpecification; +import com.amazonaws.services.ec2.model.Instance; +import com.amazonaws.services.ec2.model.InstanceNetworkInterfaceSpecification; +import com.amazonaws.services.ec2.model.Reservation; +import com.amazonaws.services.ec2.model.RunInstancesRequest; +import com.amazonaws.services.ec2.model.Tag; +import com.amazonaws.services.elasticloadbalancingv2.AmazonElasticLoadBalancing; +import com.amazonaws.services.elasticloadbalancingv2.AmazonElasticLoadBalancingClient; +import com.amazonaws.services.elasticloadbalancingv2.model.DeregisterTargetsRequest; +import com.amazonaws.services.elasticloadbalancingv2.model.TargetDescription; +import com.amazonaws.services.s3.AmazonS3URI; import com.amazonaws.services.s3.model.CopyObjectRequest; import com.amazonaws.services.s3.transfer.TransferManager; import com.amazonaws.services.s3.transfer.TransferManagerBuilder; @@ -19,25 +36,42 @@ import java.nio.channels.Channels; import java.nio.channels.FileChannel; import java.nio.channels.WritableByteChannel; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Scanner; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import com.amazonaws.waiters.Waiter; +import com.amazonaws.waiters.WaiterParameters; import com.conveyal.datatools.common.status.MonitorableJob; import com.conveyal.datatools.manager.DataManager; +import com.conveyal.datatools.manager.controllers.api.ServerController; import com.conveyal.datatools.manager.models.Deployment; +import com.conveyal.datatools.manager.models.EC2Info; +import com.conveyal.datatools.manager.models.EC2InstanceSummary; import com.conveyal.datatools.manager.models.OtpServer; import com.conveyal.datatools.manager.persistence.FeedStore; import com.conveyal.datatools.manager.persistence.Persistence; +import com.conveyal.datatools.manager.utils.StringUtils; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.commons.codec.binary.Base64; +import org.eclipse.jetty.http.HttpStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static com.mongodb.client.model.Filters.and; -import static com.mongodb.client.model.Filters.eq; -import static com.mongodb.client.model.Filters.not; -import static com.mongodb.client.model.Updates.pull; -import static com.mongodb.client.model.Updates.set; +import static com.conveyal.datatools.manager.controllers.api.ServerController.getIds; +import static com.conveyal.datatools.manager.models.Deployment.DEFAULT_OTP_VERSION; +import static com.conveyal.datatools.manager.models.Deployment.DEFAULT_R5_VERSION; /** * Deploy the given deployment to the OTP servers specified by targets. @@ -47,7 +81,33 @@ public class DeployJob extends MonitorableJob { private static final Logger LOG = LoggerFactory.getLogger(DeployJob.class); - private static final String bundlePrefix = "bundles/"; + private static final String bundlePrefix = "bundles"; + public static final String DEFAULT_INSTANCE_TYPE = "t2.medium"; + private static final String AMI_CONFIG_PATH = "modules.deployment.ec2.default_ami"; + private static final String DEFAULT_AMI_ID = DataManager.getConfigPropertyAsText(AMI_CONFIG_PATH); + private static final String OTP_GRAPH_FILENAME = "Graph.obj"; + // Use txt at the end of these filenames so that these can easily be viewed in a web browser. + public static final String BUNDLE_DOWNLOAD_COMPLETE_FILE = "BUNDLE_DOWNLOAD_COMPLETE.txt"; + public static final String GRAPH_STATUS_FILE = "GRAPH_STATUS.txt"; + private static final long TEN_MINUTES_IN_MILLISECONDS = 10 * 60 * 1000; + // Note: using a cloudfront URL for these download repo URLs will greatly increase download/deploy speed. + private static final String R5_REPO_URL = DataManager.hasConfigProperty("modules.deployment.r5_download_url") + ? DataManager.getConfigPropertyAsText("modules.deployment.r5_download_url") + : "https://r5-builds.s3.amazonaws.com"; + private static final String OTP_REPO_URL = DataManager.hasConfigProperty("modules.deployment.otp_download_url") + ? DataManager.getConfigPropertyAsText("modules.deployment.otp_download_url") + : "https://opentripplanner-builds.s3.amazonaws.com"; + /** + * S3 bucket to upload deployment to. If not null, uses {@link OtpServer#s3Bucket}. Otherwise, defaults to + * {@link DataManager#feedBucket} + * */ + private final String s3Bucket; + private final int targetCount; + private int tasksCompleted = 0; + private int totalTasks; + + private AmazonEC2 ec2; + private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z"); /** The deployment to deploy */ private Deployment deployment; @@ -61,30 +121,60 @@ public class DeployJob extends MonitorableJob { /** This hides the status field on the parent class, providing additional fields. */ public DeployStatus status; + private String statusMessage; + private int serverCounter = 0; + private String dateString = DATE_FORMAT.format(new Date()); + @JsonProperty public String getDeploymentId () { return deployment.id; } + /** Increment the completed servers count (for use during ELB deployment) and update the job status. */ + public void incrementCompletedServers() { + status.numServersCompleted++; + int totalServers = otpServer.ec2Info.instanceCount; + if (totalServers < 1) totalServers = 1; + int numRemaining = totalServers - status.numServersCompleted; + double newStatus = status.percentComplete + (100 - status.percentComplete) * numRemaining / totalServers; + status.update(String.format("Completed %d servers. %d remaining...", status.numServersCompleted, numRemaining), newStatus); + } + + @JsonProperty + public String getServerId () { + return otpServer.id; + } + + public Deployment getDeployment() { + return deployment; + } + + public OtpServer getOtpServer() { + return otpServer; + } + public DeployJob(Deployment deployment, String owner, OtpServer otpServer) { // TODO add new job type or get rid of enum in favor of just using class names super(owner, "Deploying " + deployment.name, JobType.DEPLOY_TO_OTP); this.deployment = deployment; this.otpServer = otpServer; + this.s3Bucket = otpServer.s3Bucket != null ? otpServer.s3Bucket : DataManager.feedBucket; // Use a special subclass of status here that has additional fields this.status = new DeployStatus(); + this.targetCount = otpServer.internalUrl != null ? otpServer.internalUrl.size() : 0; + this.totalTasks = 1 + targetCount; status.message = "Initializing..."; status.built = false; status.numServersCompleted = 0; status.totalServers = otpServer.internalUrl == null ? 0 : otpServer.internalUrl.size(); + // CONNECT TO EC2 + // FIXME Should this ec2 client be longlived? + ec2 = AmazonEC2Client.builder().build(); } public void jobLogic () { - int targetCount = otpServer.internalUrl != null ? otpServer.internalUrl.size() : 0; - int totalTasks = 1 + targetCount; - int tasksCompleted = 0; - String statusMessage; - + if (otpServer.s3Bucket != null) totalTasks++; + if (otpServer.ec2Info != null) totalTasks++; try { deploymentTempFile = File.createTempFile("deployment", ".zip"); } catch (IOException e) { @@ -99,7 +189,7 @@ public void jobLogic () { // Dump the deployment bundle to the temp file. try { - status.message = "Creating OTP Bundle"; + status.message = "Creating transit bundle (GTFS and OSM)"; this.deployment.dump(deploymentTempFile, true, true, true); tasksCompleted++; } catch (Exception e) { @@ -114,59 +204,87 @@ public void jobLogic () { LOG.info("Deployment pctComplete = {}", status.percentComplete); status.built = true; - // Upload to S3, if applicable - if(otpServer.s3Bucket != null) { + // Upload to S3, if specifically required by the OTPServer or needed for servers in the target group to fetch. + if (otpServer.s3Bucket != null || otpServer.ec2Info != null) { if (!DataManager.useS3) { String message = "Cannot upload deployment to S3. Application not configured for s3 storage."; LOG.error(message); status.fail(message); return; } - status.message = "Uploading to S3"; - status.uploadingS3 = true; - LOG.info("Uploading deployment {} to s3", deployment.name); - String key = null; try { - TransferManager tx = TransferManagerBuilder.standard().withS3Client(FeedStore.s3Client).build(); - key = bundlePrefix + deployment.parentProject().id + "/" + deployment.name + ".zip"; - final Upload upload = tx.upload(otpServer.s3Bucket, key, deploymentTempFile); - - upload.addProgressListener((ProgressListener) progressEvent -> { - status.percentUploaded = upload.getProgress().getPercentTransferred(); - }); - - upload.waitForCompletion(); - - // Shutdown the Transfer Manager, but don't shut down the underlying S3 client. - // The default behavior for shutdownNow shut's down the underlying s3 client - // which will cause any following s3 operations to fail. - tx.shutdownNow(false); - - // copy to [name]-latest.zip - String copyKey = bundlePrefix + deployment.parentProject().id + "/" + deployment.parentProject().name.toLowerCase() + "-latest.zip"; - CopyObjectRequest copyObjRequest = new CopyObjectRequest( - otpServer.s3Bucket, key, otpServer.s3Bucket, copyKey); - FeedStore.s3Client.copyObject(copyObjRequest); - } catch (AmazonClientException|InterruptedException e) { - statusMessage = String.format("Error uploading (or copying) deployment bundle to s3://%s/%s", otpServer.s3Bucket, key); - LOG.error(statusMessage); - e.printStackTrace(); + uploadBundleToS3(); + } catch (AmazonClientException | InterruptedException e) { + statusMessage = String.format("Error uploading (or copying) deployment bundle to s3://%s", s3Bucket); + LOG.error(statusMessage, e); status.fail(statusMessage); - return; } - status.uploadingS3 = false; + } + + // Handle spinning up new EC2 servers for the load balancer's target group. + if (otpServer.ec2Info != null) { + if ("true".equals(DataManager.getConfigPropertyAsText("modules.deployment.ec2.enabled"))) { + replaceEC2Servers(); + // If creating a new server, there is no need to deploy to an existing one. + return; + } else { + String message = "Cannot complete deployment. EC2 deployment disabled in server configuration."; + LOG.error(message); + status.fail(message); + return; + } } // If there are no OTP targets (i.e. we're only deploying to S3), we're done. - if(otpServer.internalUrl == null) { - status.completed = true; - return; + if(otpServer.internalUrl != null) { + // If we come to this point, there are internal URLs we need to deploy to (i.e., build graph over the wire). + boolean sendOverWireSuccessful = buildGraphOverWire(); + if (!sendOverWireSuccessful) return; + // Set baseUrl after success. + status.baseUrl = otpServer.publicUrl; } + status.completed = true; + } - // figure out what router we're using - String router = deployment.routerId != null ? deployment.routerId : "default"; + /** + * Upload to S3 the transit data bundle zip that contains GTFS zip files, OSM data, and config files. + */ + private void uploadBundleToS3() throws InterruptedException, AmazonClientException { + AmazonS3URI uri = new AmazonS3URI(getS3BundleURI()); + String bucket = uri.getBucket(); + status.message = "Uploading bundle to " + getS3BundleURI(); + status.uploadingS3 = true; + LOG.info("Uploading deployment {} to {}", deployment.name, uri.toString()); + TransferManager tx = TransferManagerBuilder.standard().withS3Client(FeedStore.s3Client).build(); + final Upload upload = tx.upload(bucket, uri.getKey(), deploymentTempFile); + + upload.addProgressListener( + (ProgressListener) progressEvent -> status.percentUploaded = upload.getProgress().getPercentTransferred() + ); + + upload.waitForCompletion(); + + // Shutdown the Transfer Manager, but don't shut down the underlying S3 client. + // The default behavior for shutdownNow shut's down the underlying s3 client + // which will cause any following s3 operations to fail. + tx.shutdownNow(false); + + // copy to [name]-latest.zip + String copyKey = getLatestS3BundleKey(); + CopyObjectRequest copyObjRequest = new CopyObjectRequest(bucket, uri.getKey(), uri.getBucket(), copyKey); + FeedStore.s3Client.copyObject(copyObjRequest); + LOG.info("Copied to s3://{}/{}", bucket, copyKey); + LOG.info("Uploaded to {}", getS3BundleURI()); + status.update("Upload to S3 complete.", status.percentComplete + 10); + status.uploadingS3 = false; + } + /** + * Builds the OTP graph over wire, i.e., send the data over an HTTP POST request to boot/replace the existing graph + * using the OTP Routers#buildGraphOverWire endpoint. + */ + private boolean buildGraphOverWire() { // Send the deployment file over the wire to each OTP server. for (String rawUrl : otpServer.internalUrl) { status.message = "Deploying to " + rawUrl; @@ -175,7 +293,7 @@ public void jobLogic () { URL url; try { - url = new URL(rawUrl + "/routers/" + router); + url = new URL(rawUrl + "/routers/" + getRouterId()); } catch (MalformedURLException e) { statusMessage = String.format("Malformed deployment URL %s", rawUrl); LOG.error(statusMessage); @@ -217,7 +335,7 @@ public void jobLogic () { LOG.error(statusMessage); e.printStackTrace(); status.fail(statusMessage); - return; + return false; } // retrieveById the input file @@ -227,7 +345,7 @@ public void jobLogic () { } catch (FileNotFoundException e) { LOG.error("Internal error: could not read dumped deployment!"); status.fail("Internal error: could not read dumped deployment!"); - return; + return false; } try { @@ -236,7 +354,7 @@ public void jobLogic () { statusMessage = String.format("Unable to open connection to OTP server %s", url); LOG.error(statusMessage); status.fail(statusMessage); - return; + return false; } // copy @@ -247,7 +365,7 @@ public void jobLogic () { LOG.error(statusMessage); e.printStackTrace(); status.fail(statusMessage); - return; + return false; } try { @@ -257,7 +375,7 @@ public void jobLogic () { LOG.error(message); e.printStackTrace(); status.fail(message); - return; + return false; } try { @@ -276,8 +394,8 @@ public void jobLogic () { if (code != HttpURLConnection.HTTP_CREATED) { // Get input/error stream from connection response. InputStream stream = code < HttpURLConnection.HTTP_BAD_REQUEST - ? conn.getInputStream() - : conn.getErrorStream(); + ? conn.getInputStream() + : conn.getErrorStream(); String response; try (Scanner scanner = new Scanner(stream)) { scanner.useDelimiter("\\Z"); @@ -288,7 +406,7 @@ public void jobLogic () { status.fail(statusMessage); // Skip deploying to any other servers. // There is no reason to take out the rest of the servers, it's going to have the same result. - return; + return false; } } catch (IOException e) { statusMessage = String.format("Could not finish request to server %s", url); @@ -300,9 +418,16 @@ public void jobLogic () { tasksCompleted++; status.percentComplete = 100.0 * (double) tasksCompleted / totalTasks; } + return true; + } - status.completed = true; - status.baseUrl = otpServer.publicUrl; + private String getS3BundleURI() { + return joinToS3FolderURI("bundle.zip"); + } + + private String getLatestS3BundleKey() { + String name = StringUtils.getCleanName(deployment.parentProject().name.toLowerCase()); + return String.format("%s/%s/%s-latest.zip", bundlePrefix, deployment.projectId, name); } @Override @@ -317,19 +442,384 @@ public void jobFinished () { if (!status.error) { // Update status with successful completion state only if no error was encountered. status.update(false, "Deployment complete!", 100, true); - // Store the target server in the deployedTo field. - LOG.info("Updating deployment target to {} id={}", otpServer.target(), deployment.id); - Persistence.deployments.updateField(deployment.id, "deployedTo", otpServer.target()); - // Update last deployed field. - Persistence.deployments.updateField(deployment.id, "lastDeployed", new Date()); - message = String.format("Deployment %s successfully deployed to %s", deployment.name, otpServer.publicUrl); + // Store the target server in the deployedTo field and set last deployed time. + LOG.info("Updating deployment target and deploy time."); + deployment.deployedTo = otpServer.id; + deployment.deployJobSummaries.add(0, new DeploySummary(this)); + Persistence.deployments.replace(deployment.id, deployment); + long durationMinutes = TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis() - status.startTime); + message = String.format("Deployment %s successfully deployed to %s in %s minutes.", deployment.name, otpServer.publicUrl, durationMinutes); } else { - message = String.format("WARNING: Deployment %s failed to deploy to %s", deployment.name, otpServer.publicUrl); + message = String.format("WARNING: Deployment %s failed to deploy to %s. Error: %s", deployment.name, otpServer.publicUrl, status.message); } // Send notification to those subscribed to updates for the deployment. NotifyUsersForSubscriptionJob.createNotification("deployment-updated", deployment.id, message); } + /** + * Start up EC2 instances as trip planning servers running on the provided ELB. After monitoring the server statuses + * and verifying that they are running, remove the previous EC2 instances that were assigned to the ELB and terminate + * them. + */ + private void replaceEC2Servers() { + try { + // Track any previous instances running for the server we're deploying to in order to de-register and + // terminate them later. + List previousInstances = otpServer.retrieveEC2InstanceSummaries(); + // First start graph-building instance and wait for graph to successfully build. + status.message = "Starting up graph building EC2 instance"; + List instances = startEC2Instances(1, false); + // Exit if an error was encountered. + if (status.error || instances.size() == 0) { + ServerController.terminateInstances(instances); + return; + } + status.message = "Waiting for graph build to complete..."; + MonitorServerStatusJob monitorInitialServerJob = new MonitorServerStatusJob(owner, this, instances.get(0), false); + monitorInitialServerJob.run(); + + status.update("Graph build is complete!", 50); + // If only building graph, job is finished. Note: the graph building EC2 instance should automatically shut + // itself down if this flag is turned on (happens in user data). We do not want to proceed with the rest of + // the job which would shut down existing servers running for the deployment. + if (deployment.buildGraphOnly) { + status.update("Graph build is complete!", 100); + return; + } + Persistence.deployments.replace(deployment.id, deployment); + if (monitorInitialServerJob.status.error) { + // If an error occurred while monitoring the initial server, fail this job and instruct user to inspect + // build logs. + statusMessage = "Error encountered while building graph. Inspect build logs."; + LOG.error(statusMessage); + status.fail(statusMessage); + ServerController.terminateInstances(instances); + return; + } + // Spin up remaining servers which will download the graph from S3. + status.numServersRemaining = otpServer.ec2Info.instanceCount <= 0 ? 0 : otpServer.ec2Info.instanceCount - 1; + List remainingServerMonitorJobs = new ArrayList<>(); + List remainingInstances = new ArrayList<>(); + if (status.numServersRemaining > 0) { + // Spin up remaining EC2 instances. + status.message = String.format("Spinning up remaining %d instance(s).", status.numServersRemaining); + remainingInstances.addAll(startEC2Instances(status.numServersRemaining, true)); + if (remainingInstances.size() == 0 || status.error) { + ServerController.terminateInstances(remainingInstances); + return; + } + // Create new thread pool to monitor server setup so that the servers are monitored in parallel. + ExecutorService service = Executors.newFixedThreadPool(status.numServersRemaining); + for (Instance instance : remainingInstances) { + // Note: new instances are added + MonitorServerStatusJob monitorServerStatusJob = new MonitorServerStatusJob(owner, this, instance, true); + remainingServerMonitorJobs.add(monitorServerStatusJob); + service.submit(monitorServerStatusJob); + } + // Shutdown thread pool once the jobs are completed and wait for its termination. Once terminated, we can + // consider the servers up and running (or they have failed to initialize properly). + service.shutdown(); + service.awaitTermination(4, TimeUnit.HOURS); + } + // Check if any of the monitor jobs encountered any errors and terminate the job's associated instance. + for (MonitorServerStatusJob job : remainingServerMonitorJobs) { + if (job.status.error) { + String id = job.getInstanceId(); + LOG.warn("Error encountered while monitoring server {}. Terminating.", id); + remainingInstances.removeIf(instance -> instance.getInstanceId().equals(id)); + ServerController.terminateInstances(id); + } + } + // Add all servers that did not encounter issues to list for registration with ELB. + instances.addAll(remainingInstances); + String finalMessage = "Server setup is complete!"; + // Get EC2 servers running that are associated with this server. + List previousInstanceIds = previousInstances.stream() + .filter(instance -> "running".equals(instance.state.getName())) + .map(instance -> instance.instanceId) + .collect(Collectors.toList()); + if (previousInstanceIds.size() > 0) { + boolean success = ServerController.deRegisterAndTerminateInstances(otpServer.ec2Info.targetGroupArn, previousInstanceIds); + // If there was a problem during de-registration/termination, notify via status message. + if (!success) { + finalMessage = String.format("Server setup is complete! (WARNING: Could not terminate previous EC2 instances: %s", previousInstanceIds); + } + } + // Job is complete. + status.update(false, finalMessage, 100, true); + } catch (Exception e) { + LOG.error("Could not deploy to EC2 server", e); + status.fail("Could not deploy to EC2 server", e); + } + } + + /** + * Start the specified number of EC2 instances based on the {@link OtpServer#ec2Info}. + * @param count number of EC2 instances to start + * @return a list of the instances is returned once the public IP addresses have been assigned + * + * TODO: Booting up R5 servers has not been fully tested. + */ + private List startEC2Instances(int count, boolean graphAlreadyBuilt) { + String instanceType = otpServer.ec2Info.instanceType == null ? DEFAULT_INSTANCE_TYPE : otpServer.ec2Info.instanceType; + // User data should contain info about: + // 1. Downloading GTFS/OSM info (s3) + // 2. Time to live until shutdown/termination (for test servers) + // 3. Hosting / nginx + String userData = constructUserData(graphAlreadyBuilt); + // Failure was encountered while constructing user data. + if (userData == null) { + // Fail job if it is not already failed. + if (!status.error) status.fail("Error constructing EC2 user data."); + return Collections.EMPTY_LIST; + } + // The subnet ID should only change if starting up a server in some other AWS account. This is not + // likely to be a requirement. + // Define network interface so that a public IP can be associated with server. + InstanceNetworkInterfaceSpecification interfaceSpecification = new InstanceNetworkInterfaceSpecification() + .withSubnetId(otpServer.ec2Info.subnetId) + .withAssociatePublicIpAddress(true) + .withGroups(otpServer.ec2Info.securityGroupId) + .withDeviceIndex(0); + // If AMI not defined, use the default AMI ID. + String amiId = otpServer.ec2Info.amiId; + if (amiId == null) { + amiId = DEFAULT_AMI_ID; + // Verify that AMI is correctly defined. + if (amiId == null || !ServerController.amiExists(amiId)) { + statusMessage = String.format( + "Default AMI ID (%s) is missing or bad. Should be provided in config at %s", + amiId, + AMI_CONFIG_PATH); + LOG.error(statusMessage); + status.fail(statusMessage); + } + } + RunInstancesRequest runInstancesRequest = new RunInstancesRequest() + .withNetworkInterfaces(interfaceSpecification) + .withInstanceType(instanceType) + .withMinCount(count) + .withMaxCount(count) + .withIamInstanceProfile(new IamInstanceProfileSpecification().withArn(otpServer.ec2Info.iamInstanceProfileArn)) + .withImageId(amiId) + .withKeyName(otpServer.ec2Info.keyName) + // This will have the instance terminate when it is shut down. + .withInstanceInitiatedShutdownBehavior("terminate") + .withUserData(Base64.encodeBase64String(userData.getBytes())); + final List instances = ec2.runInstances(runInstancesRequest).getReservation().getInstances(); + + List instanceIds = getIds(instances); + Map instanceIpAddresses = new HashMap<>(); + // Wait so that create tags request does not fail because instances not found. + try { + Waiter waiter = ec2.waiters().instanceStatusOk(); + long beginWaiting = System.currentTimeMillis(); + waiter.run(new WaiterParameters<>(new DescribeInstanceStatusRequest().withInstanceIds(instanceIds))); + LOG.info("Instance status is OK after {} ms", (System.currentTimeMillis() - beginWaiting)); + } catch (Exception e) { + statusMessage = "Waiter for instance status check failed. You may need to terminate the failed instances."; + LOG.error(statusMessage, e); + status.fail(statusMessage); + return Collections.EMPTY_LIST; + } + for (Instance instance : instances) { + // The public IP addresses will likely be null at this point because they take a few seconds to initialize. + instanceIpAddresses.put(instance.getInstanceId(), instance.getPublicIpAddress()); + String serverName = String.format("%s %s (%s) %d %s", deployment.r5 ? "r5" : "otp", deployment.name, dateString, serverCounter++, graphAlreadyBuilt ? "clone" : "builder"); + LOG.info("Creating tags for new EC2 instance {}", serverName); + ec2.createTags(new CreateTagsRequest() + .withTags(new Tag("Name", serverName)) + .withTags(new Tag("projectId", deployment.projectId)) + .withTags(new Tag("deploymentId", deployment.id)) + .withTags(new Tag("jobId", this.jobId)) + .withTags(new Tag("serverId", otpServer.id)) + .withTags(new Tag("routerId", getRouterId())) + .withTags(new Tag("user", this.owner)) + .withResources(instance.getInstanceId()) + ); + } + List updatedInstances = new ArrayList<>(); + while (instanceIpAddresses.values().contains(null)) { + LOG.info("Checking that public IP addresses have initialized for EC2 instances."); + // Reset instances list so that updated instances have the latest state information (e.g., public IP has + // been assigned). + updatedInstances.clear(); + // Check that all of the instances have public IPs. + DescribeInstancesRequest request = new DescribeInstancesRequest().withInstanceIds(instanceIds); + List reservations = ec2.describeInstances(request).getReservations(); + for (Reservation reservation : reservations) { + for (Instance instance : reservation.getInstances()) { + instanceIpAddresses.put(instance.getInstanceId(), instance.getPublicIpAddress()); + updatedInstances.add(instance); + } + } + try { + int sleepTimeMillis = 10000; + LOG.info("Waiting {} seconds to perform another public IP address check...", sleepTimeMillis / 1000); + Thread.sleep(sleepTimeMillis); + } catch (InterruptedException e) { + e.printStackTrace(); + } + if (System.currentTimeMillis() - status.startTime > TEN_MINUTES_IN_MILLISECONDS) { + status.fail("Job timed out due to public IP assignment taking longer than ten minutes!"); + return updatedInstances; + } + } + LOG.info("Public IP addresses have all been assigned. {}", instanceIpAddresses.values().toString()); + return updatedInstances; + } + + /** + * @return the router ID for this deployment (defaults to "default") + */ + private String getRouterId() { + return deployment.routerId == null ? "default" : deployment.routerId; + } + + /** + * Construct the user data script (as string) that should be provided to the AMI and executed upon EC2 instance + * startup. + */ + private String constructUserData(boolean graphAlreadyBuilt) { + // Prefix/name of JAR file (WITHOUT .jar) + String jarName = deployment.r5 ? deployment.r5Version : deployment.otpVersion; + if (jarName == null) { + if (deployment.r5) deployment.r5Version = DEFAULT_R5_VERSION; + else deployment.otpVersion = DEFAULT_OTP_VERSION; + // If there is no version specified, use the default (and persist value). + jarName = deployment.r5 ? deployment.r5Version : deployment.otpVersion; + Persistence.deployments.replace(deployment.id, deployment); + } + // Construct URL for trip planner jar and check that it exists with a lightweight HEAD request. + String s3JarKey = jarName + ".jar"; + String repoUrl = deployment.r5 ? R5_REPO_URL : OTP_REPO_URL; + String s3JarUrl = String.join("/", repoUrl, s3JarKey); + try { + final URL url = new URL(s3JarUrl); + HttpURLConnection huc = (HttpURLConnection) url.openConnection(); + huc.setRequestMethod("HEAD"); + int responseCode = huc.getResponseCode(); + if (responseCode != HttpStatus.OK_200) { + statusMessage = String.format("Requested trip planner jar does not exist at %s", s3JarUrl); + LOG.error(statusMessage); + status.fail(statusMessage); + return null; + } + } catch (IOException e) { + statusMessage = String.format("Error checking for trip planner jar: %s", s3JarUrl); + LOG.error(statusMessage, e); + status.fail(statusMessage); + return null; + } + String jarDir = String.format("/opt/%s", getTripPlannerString()); + List lines = new ArrayList<>(); + String routerName = "default"; + String routerDir = String.format("/var/%s/graphs/%s", getTripPlannerString(), routerName); + String graphPath = String.join("/", routerDir, OTP_GRAPH_FILENAME); + //////////////// BEGIN USER DATA + lines.add("#!/bin/bash"); + // Send trip planner logs to LOGFILE + lines.add(String.format("BUILDLOGFILE=/var/log/%s", getBuildLogFilename())); + lines.add(String.format("LOGFILE=/var/log/%s.log", getTripPlannerString())); + lines.add("USERDATALOG=/var/log/user-data.log"); + // Log user data setup to /var/log/user-data.log + lines.add("exec > >(tee $USERDATALOG|logger -t user-data -s 2>/dev/console) 2>&1"); + // Create the directory for the graph inputs. + lines.add(String.format("mkdir -p %s", routerDir)); + lines.add(String.format("chown ubuntu %s", routerDir)); + // Remove the current inputs from router directory. + lines.add(String.format("rm -rf %s/*", routerDir)); + // Download trip planner JAR. + lines.add(String.format("mkdir -p %s", jarDir)); + // Add client static file directory for uploading deploy stage status files. + // TODO: switch to AMI that uses /usr/share/nginx/html as static file dir so we don't have to create this new dir. + lines.add("WEB_DIR=/usr/share/nginx/client"); + lines.add("sudo mkdir $WEB_DIR"); + lines.add(String.format("wget %s -O %s/%s.jar", s3JarUrl, jarDir, jarName)); + if (graphAlreadyBuilt) { + lines.add("echo 'downloading graph from s3'"); + // Download Graph from S3. + lines.add(String.format("aws s3 --region us-east-1 cp %s %s ", getS3GraphURI(), graphPath)); + } else { + // Download data bundle from S3. + lines.add(String.format("aws s3 --region us-east-1 cp %s /tmp/bundle.zip", getS3BundleURI())); + // Determine if bundle download was successful. + lines.add("[ -f /tmp/bundle.zip ] && BUNDLE_STATUS='SUCCESS' || BUNDLE_STATUS='FAILURE'"); + // Create file with bundle status in web dir to notify Data Tools that download is complete. + lines.add(String.format("sudo echo $BUNDLE_STATUS > $WEB_DIR/%s", BUNDLE_DOWNLOAD_COMPLETE_FILE)); + // Put unzipped bundle data into router directory. + lines.add(String.format("unzip /tmp/bundle.zip -d %s", routerDir)); + // FIXME: Add ability to fetch custom bikeshare.xml file (CarFreeAtoZ) + if (false) { + lines.add(String.format("wget -O %s/bikeshare.xml ${config.bikeshareFeed}", routerDir)); + lines.add(String.format("printf \"{\\n bikeRentalFile: \"bikeshare.xml\"\\n}\" >> %s/build-config.json\"", routerDir)); + } + lines.add("echo 'starting graph build'"); + // Build the graph. + if (deployment.r5) lines.add(String.format("sudo -H -u ubuntu java -Xmx6G -jar %s/%s.jar point --build %s", jarDir, jarName, routerDir)); + else lines.add(String.format("sudo -H -u ubuntu java -jar %s/%s.jar --build %s > $BUILDLOGFILE 2>&1", jarDir, jarName, routerDir)); + // Upload the build log file and graph to S3. + if (!deployment.r5) { + String s3BuildLogPath = joinToS3FolderURI(getBuildLogFilename()); + lines.add(String.format("aws s3 --region us-east-1 cp $BUILDLOGFILE %s ", s3BuildLogPath)); + lines.add(String.format("aws s3 --region us-east-1 cp %s %s ", graphPath, getS3GraphURI())); + } + } + // Determine if graph build/download was successful (and that Graph.obj is not zero bytes). + lines.add(String.format("FILESIZE=$(wc -c <%s)", graphPath)); + lines.add(String.format("[ -f %s ] && (($FILESIZE > 0)) && GRAPH_STATUS='SUCCESS' || GRAPH_STATUS='FAILURE'", graphPath)); + // Create file with bundle status in web dir to notify Data Tools that download is complete. + lines.add(String.format("sudo echo $GRAPH_STATUS > $WEB_DIR/%s", GRAPH_STATUS_FILE)); + // Get the instance's instance ID from the AWS metadata endpoint. + lines.add("instance_id=`curl http://169.254.169.254/latest/meta-data/instance-id`"); + // Upload user data log associated with instance to a log file on S3. + lines.add(String.format("aws s3 --region us-east-1 cp $USERDATALOG %s/${instance_id}.log", getS3FolderURI().toString())); + if (deployment.buildGraphOnly) { + // If building graph only, tell the instance to shut itself down after the graph build (and log upload) is + // complete. + lines.add("echo 'shutting down server (build graph only specified in deployment target)'"); + lines.add("sudo poweroff"); + } else { + // Otherwise, kick off the application. + lines.add("echo 'kicking off trip planner (logs at $LOGFILE)'"); + if (deployment.r5) lines.add(String.format("sudo -H -u ubuntu nohup java -Xmx6G -Djava.util.Arrays.useLegacyMergeSort=true -jar %s/%s.jar point --isochrones %s > /var/log/r5.out 2>&1&", jarDir, jarName, routerDir)); + else lines.add(String.format("sudo -H -u ubuntu nohup java -jar %s/%s.jar --server --bindAddress 127.0.0.1 --router default > $LOGFILE 2>&1 &", jarDir, jarName)); + } + // Return the entire user data script as a single string. + return String.join("\n", lines); + } + + private String getBuildLogFilename() { + return String.format("%s-build.log", getTripPlannerString()); + } + + private String getTripPlannerString() { + return deployment.r5 ? "r5" : "otp"; + } + + @JsonIgnore + public String getJobRelativePath() { + return String.join("/", bundlePrefix, deployment.projectId, deployment.id, this.jobId); + } + + @JsonIgnore + public AmazonS3URI getS3FolderURI() { + return new AmazonS3URI(String.format("s3://%s/%s", otpServer.s3Bucket, getJobRelativePath())); + } + + @JsonIgnore + public String getS3GraphURI() { + return joinToS3FolderURI(OTP_GRAPH_FILENAME); + } + + /** Join list of paths to S3 URI for job folder to create a fully qualified URI (e.g., s3://bucket/path/to/file). */ + private String joinToS3FolderURI(CharSequence... paths) { + List pathList = new ArrayList<>(); + pathList.add(getS3FolderURI().toString()); + pathList.addAll(Arrays.asList(paths)); + return String.join("/", pathList); + } + /** * Represents the current status of this job. */ @@ -347,6 +837,8 @@ public static class DeployStatus extends Status { /** To how many servers have we successfully deployed thus far? */ public int numServersCompleted; + public int numServersRemaining; + /** How many servers are we attempting to deploy to? */ public int totalServers; @@ -354,4 +846,36 @@ public static class DeployStatus extends Status { public String baseUrl; } + + /** + * Contains details about a specific deployment job in order to preserve and recall this info after the job has + * completed. + */ + public static class DeploySummary implements Serializable { + private static final long serialVersionUID = 1L; + public String serverId; + public long duration; + public String s3Bucket; + public String jobId; + /** URL for build log file from latest deploy job. */ + public String buildArtifactsFolder; + public String otpVersion; + public EC2Info ec2Info; + public long startTime; + public long finishTime = System.currentTimeMillis(); + + /** Empty constructor for serialization */ + public DeploySummary () { } + + public DeploySummary (DeployJob job) { + this.serverId = job.otpServer.id; + this.ec2Info = job.otpServer.ec2Info; + this.otpVersion = job.deployment.otpVersion; + this.jobId = job.jobId; + this.s3Bucket = job.s3Bucket; + this.startTime = job.status.startTime; + this.duration = job.status.duration; + this.buildArtifactsFolder = job.getS3FolderURI().toString(); + } + } } diff --git a/src/main/java/com/conveyal/datatools/manager/jobs/MonitorServerStatusJob.java b/src/main/java/com/conveyal/datatools/manager/jobs/MonitorServerStatusJob.java new file mode 100644 index 000000000..7afcd4a4a --- /dev/null +++ b/src/main/java/com/conveyal/datatools/manager/jobs/MonitorServerStatusJob.java @@ -0,0 +1,267 @@ +package com.conveyal.datatools.manager.jobs; + +import com.amazonaws.services.ec2.AmazonEC2; +import com.amazonaws.services.ec2.AmazonEC2Client; +import com.amazonaws.services.ec2.model.Instance; +import com.amazonaws.services.ec2.model.InstanceStateChange; +import com.amazonaws.services.ec2.model.TerminateInstancesRequest; +import com.amazonaws.services.ec2.model.TerminateInstancesResult; +import com.amazonaws.services.elasticloadbalancingv2.AmazonElasticLoadBalancing; +import com.amazonaws.services.elasticloadbalancingv2.AmazonElasticLoadBalancingClient; +import com.amazonaws.services.elasticloadbalancingv2.model.RegisterTargetsRequest; +import com.amazonaws.services.elasticloadbalancingv2.model.TargetDescription; +import com.amazonaws.services.s3.AmazonS3URI; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.conveyal.datatools.common.status.MonitorableJob; +import com.conveyal.datatools.manager.models.Deployment; +import com.conveyal.datatools.manager.models.OtpServer; +import com.conveyal.datatools.manager.persistence.FeedStore; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +import static com.conveyal.datatools.manager.jobs.DeployJob.BUNDLE_DOWNLOAD_COMPLETE_FILE; +import static com.conveyal.datatools.manager.jobs.DeployJob.GRAPH_STATUS_FILE; + +/** + * Job that is dispatched during a {@link DeployJob} that spins up EC2 instances. This handles waiting for the server to + * come online and for the OTP application/API to become available. + */ +public class MonitorServerStatusJob extends MonitorableJob { + private static final Logger LOG = LoggerFactory.getLogger(MonitorServerStatusJob.class); + private final DeployJob deployJob; + private final Deployment deployment; + private final Instance instance; + private final boolean graphAlreadyBuilt; + private final OtpServer otpServer; + private final AmazonEC2 ec2 = AmazonEC2Client.builder().build(); + private final CloseableHttpClient httpClient = HttpClients.createDefault(); + // If the job takes longer than XX seconds, fail the job. + private static final int TIMEOUT_MILLIS = 60 * 60 * 1000; // One hour + private static final int DELAY_SECONDS = 5; + private final long startTime; + public long graphBuildSeconds; + + public MonitorServerStatusJob(String owner, DeployJob deployJob, Instance instance, boolean graphAlreadyBuilt) { + super( + owner, + String.format("Monitor server setup %s", instance.getPublicIpAddress()), + JobType.MONITOR_SERVER_STATUS + ); + this.deployJob = deployJob; + this.deployment = deployJob.getDeployment(); + this.otpServer = deployJob.getOtpServer(); + this.instance = instance; + this.graphAlreadyBuilt = graphAlreadyBuilt; + status.message = "Checking server status..."; + startTime = System.currentTimeMillis(); + } + + @JsonProperty + public String getInstanceId () { + return instance != null ? instance.getInstanceId() : null; + } + + @JsonProperty + public String getDeploymentId () { + return deployJob.getDeploymentId(); + } + + @Override + public void jobLogic() { + String message; + String ipUrl = "http://" + instance.getPublicIpAddress(); + // Get OTP URL for instance to check for availability. + boolean routerIsAvailable = false, graphIsAvailable = false; + // If graph was not already built by a previous server, wait for it to build. + if (!graphAlreadyBuilt) { + boolean bundleIsDownloaded = false; + // Progressively check status of OTP server + if (deployment.buildGraphOnly) { + // No need to check that OTP is running. Just check to see that the graph is built. + bundleIsDownloaded = true; + routerIsAvailable = true; + } + // First, check that OTP has started up. + status.update("Prepping for graph build...", 20); + String bundleUrl = String.join("/", ipUrl, BUNDLE_DOWNLOAD_COMPLETE_FILE); + long bundleDownloadStartTime = System.currentTimeMillis(); + while (!bundleIsDownloaded) { + // If the request is successful, the OTP instance has started. + wait("bundle download check:" + bundleUrl); + bundleIsDownloaded = checkForSuccessfulRequest(bundleUrl); + if (jobHasTimedOut()) { + status.fail(String.format("Job timed out while checking for server bundle download status (%s)", instance.getInstanceId())); + return; + } + } + // Check status of bundle download and fail job if there was a failure. + String bundleStatus = getUrlAsString(bundleUrl); + if (bundleStatus == null || !bundleStatus.contains("SUCCESS")) { + status.fail("Failure encountered while downloading transit bundle."); + return; + } + long bundleDownloadSeconds = (System.currentTimeMillis() - bundleDownloadStartTime) / 1000; + message = String.format("Bundle downloaded in %d seconds!", bundleDownloadSeconds); + LOG.info(message); + status.update("Building graph...", 30); + } + status.update("Loading graph...", 40); + long graphBuildStartTime = System.currentTimeMillis(); + String graphStatusUrl = String.join("/", ipUrl, GRAPH_STATUS_FILE); + while (!graphIsAvailable) { + // If the request is successful, the OTP instance has started. + wait("graph build/download check: " + graphStatusUrl); + graphIsAvailable = checkForSuccessfulRequest(graphStatusUrl); + if (jobHasTimedOut()) { + message = String.format("Job timed out while waiting for graph build/download (%s)", instance.getInstanceId()); + LOG.error(message); + status.fail(message); + return; + } + } + // Check status of bundle download and fail job if there was a failure. + String graphStatus = getUrlAsString(graphStatusUrl); + if (graphStatus == null || !graphStatus.contains("SUCCESS")) { + message = String.format("Failure encountered while building/downloading graph (%s).", instance.getInstanceId()); + LOG.error(message); + status.fail(message); + return; + } + graphBuildSeconds = (System.currentTimeMillis() - graphBuildStartTime) / 1000; + message = String.format("Graph build/download completed in %d seconds!", graphBuildSeconds); + LOG.info(message); + // If only task is to build graph, this machine's job is complete and we can consider this job done. + if (deployment.buildGraphOnly) { + status.update(false, message, 100); + return; + } + // Once this is confirmed, check for the existence of the router, which will indicate that the graph build is + // complete. + String routerUrl = String.join("/", ipUrl, "otp/routers/default"); + while (!routerIsAvailable) { + // If the request was successful, the graph build is complete! + // TODO: Substitute in specific router ID? Or just default to... default. + wait("router to become available: " + routerUrl); + routerIsAvailable = checkForSuccessfulRequest(routerUrl); + if (jobHasTimedOut()) { + message = String.format("Job timed out while waiting for trip planner to start up (%s)", instance.getInstanceId()); + status.fail(message); + LOG.error(message); + return; + } + } + status.update("Graph loaded!", 90); + if (otpServer.ec2Info != null && otpServer.ec2Info.targetGroupArn != null) { + // After the router is available, the EC2 instance can be registered with the load balancer. + // REGISTER INSTANCE WITH LOAD BALANCER + AmazonElasticLoadBalancing elbClient = AmazonElasticLoadBalancingClient.builder().build(); + RegisterTargetsRequest registerTargetsRequest = new RegisterTargetsRequest() + .withTargetGroupArn(otpServer.ec2Info.targetGroupArn) + .withTargets(new TargetDescription().withId(instance.getInstanceId())); + elbClient.registerTargets(registerTargetsRequest); + // FIXME how do we know it was successful? + message = String.format("Server successfully registered with load balancer %s. OTP running at %s", otpServer.ec2Info.targetGroupArn, routerUrl); + LOG.info(message); + status.update(false, message, 100, true); + deployJob.incrementCompletedServers(); + } else { + message = String.format("There is no load balancer under which to register ec2 instance %s.", instance.getInstanceId()); + LOG.error(message); + status.fail(message); + } + } + + /** Determine if job has passed time limit for its run time. */ + private boolean jobHasTimedOut() { + long runTime = System.currentTimeMillis() - startTime; + return runTime > TIMEOUT_MILLIS; + } + + /** + * Checks for Graph object on S3. + */ + private boolean isGraphBuilt() { + AmazonS3URI uri = new AmazonS3URI(deployJob.getS3GraphURI()); + LOG.info("Checking for graph at {}", uri.toString()); + // Surround with try/catch (exception thrown if object does not exist). + try { + return FeedStore.s3Client.doesObjectExist(uri.getBucket(), uri.getKey()); + } catch (AmazonS3Exception e) { + LOG.warn("Object not found for key " + uri.getKey(), e); + return false; + } + } + + /** Have the current thread sleep for a few seconds in order to pause during a while loop. */ + private void wait(String waitingFor) { + try { + LOG.info("Waiting {} seconds for {}", DELAY_SECONDS, waitingFor); + Thread.sleep(1000 * DELAY_SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + /** + * Get the S3 key for the bundle status file, which is uploaded by the graph-building EC2 instance after the graph + * build completes. The file contains either "SUCCESS" or "FAILURE". + */ + private String getBundleStatusKey () { + return String.join("/", deployJob.getJobRelativePath(), BUNDLE_DOWNLOAD_COMPLETE_FILE); + } + + private String getUrlAsString(String url) { + HttpGet httpGet = new HttpGet(url); + try (CloseableHttpResponse response = httpClient.execute(httpGet)) { + return EntityUtils.toString(response.getEntity()); + } catch (IOException e) { + LOG.error("Could not complete request to {}", url); + e.printStackTrace(); + return null; + } + } + + /** + * Checks the provided URL for a successful response (i.e., HTTP status code is 200). + */ + private boolean checkForSuccessfulRequest(String url) { + HttpGet httpGet = new HttpGet(url); + try (CloseableHttpResponse response = httpClient.execute(httpGet)) { + HttpEntity entity = response.getEntity(); + int statusCode = response.getStatusLine().getStatusCode(); + // Ensure the response body is fully consumed + EntityUtils.consume(entity); + return statusCode == 200; + } catch (IOException e) { + LOG.error("Could not complete request to {}", url); + e.printStackTrace(); + } + return false; + } + + @Override + public void jobFinished() { + if (status.error) { + // Terminate server. + TerminateInstancesResult terminateInstancesResult = ec2.terminateInstances( + new TerminateInstancesRequest().withInstanceIds(instance.getInstanceId()) + ); + InstanceStateChange instanceStateChange = terminateInstancesResult.getTerminatingInstances().get(0); + // If instance state code is 48 that means it has been terminated. + if (instanceStateChange.getCurrentState().getCode() == 48) { + // FIXME: this message will not make it to the client because the status has already been failed. Also, + // I'm not sure if this is even the right way to handle the instance state check. + status.update("Instance is terminated!", 100); + } + } + } +} diff --git a/src/main/java/com/conveyal/datatools/manager/jobs/NotifyUsersForSubscriptionJob.java b/src/main/java/com/conveyal/datatools/manager/jobs/NotifyUsersForSubscriptionJob.java index a72e01e57..481cb1144 100644 --- a/src/main/java/com/conveyal/datatools/manager/jobs/NotifyUsersForSubscriptionJob.java +++ b/src/main/java/com/conveyal/datatools/manager/jobs/NotifyUsersForSubscriptionJob.java @@ -35,7 +35,7 @@ private NotifyUsersForSubscriptionJob(String subscriptionType, String target, St /** * Convenience method to create and schedule a notification job to notify subscribed users. */ - public static void createNotification(String subscriptionType, String target, String message) { + public static void createNotification(String subscriptionType, String target, String message) { if (APPLICATION_URL == null || !(APPLICATION_URL.startsWith("https://") || APPLICATION_URL.startsWith("http://"))) { LOG.error("application.public_url (value={}) property must be set to a valid URL in order to send notifications to users.", APPLICATION_URL); return; diff --git a/src/main/java/com/conveyal/datatools/manager/models/Deployment.java b/src/main/java/com/conveyal/datatools/manager/models/Deployment.java index 94156e21f..c6b939af6 100644 --- a/src/main/java/com/conveyal/datatools/manager/models/Deployment.java +++ b/src/main/java/com/conveyal/datatools/manager/models/Deployment.java @@ -1,6 +1,9 @@ package com.conveyal.datatools.manager.models; +import com.amazonaws.services.ec2.model.Filter; import com.conveyal.datatools.manager.DataManager; +import com.conveyal.datatools.manager.controllers.api.DeploymentController; +import com.conveyal.datatools.manager.jobs.DeployJob; import com.conveyal.datatools.manager.persistence.Persistence; import com.conveyal.datatools.manager.utils.StringUtils; import com.conveyal.datatools.manager.utils.json.JsonManager; @@ -29,6 +32,7 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Date; import java.util.List; import java.util.Locale; @@ -55,9 +59,14 @@ public class Deployment extends Model implements Serializable { public String name; + public static final String DEFAULT_OTP_VERSION = "otp-v1.4.0"; + public static final String DEFAULT_R5_VERSION = "v2.4.1-9-g3be6daa"; + /** What server is this currently deployed to? */ public String deployedTo; + public List deployJobSummaries = new ArrayList<>(); + @JsonView(JsonViews.DataDump.class) public String projectId; @@ -105,6 +114,13 @@ public List retrieveFeedVersions() { return ret; } + /** All of the feed versions used in this deployment, summarized so that the Internet won't break */ + @JsonProperty("ec2Instances") + public List retrieveEC2Instances() { + Filter deploymentFilter = new Filter("tag:deploymentId", Collections.singletonList(id)); + return DeploymentController.fetchEC2InstanceSummaries(deploymentFilter); + } + public void storeFeedVersions(Collection versions) { feedVersionIds = new ArrayList<>(versions.size()); @@ -116,11 +132,34 @@ public void storeFeedVersions(Collection versions) { // future use public String osmFileId; - /** The commit of OTP being used on this deployment */ - public String otpCommit; + /** + * The version (according to git describe) of OTP being used on this deployment This should default to + * {@link Deployment#DEFAULT_OTP_VERSION}. + */ + public String otpVersion; + + public boolean buildGraphOnly; + + /** + * The version (according to git describe) of R5 being used on this deployment. This should default to + * {@link Deployment#DEFAULT_R5_VERSION}. + */ + public String r5Version; + + /** Whether this deployment should build an r5 server (false=OTP) */ + public boolean r5; /** Date when the deployment was last deployed to a server */ - public Date lastDeployed; + @JsonProperty("lastDeployed") + public Date retrieveLastDeployed () { + return latest() != null ? new Date(latest().finishTime) : null; + } + + /** Get latest deployment summary. */ + @JsonProperty("latest") + public DeployJob.DeploySummary latest () { + return deployJobSummaries.size() > 0 ? deployJobSummaries.get(0) : null; + } /** * The routerId of this deployment @@ -226,7 +265,8 @@ public Deployment() { // do nothing. } - /** Dump this deployment to the given file + /** + * Dump this deployment to the given output file. * @param output the output file * @param includeOsm should an osm.pbf file be included in the dump? * @param includeOtpConfig should OTP build-config.json and router-config.json be included? @@ -418,9 +458,9 @@ public Rectangle2D retrieveProjectBounds() { /** * Get the deployments currently deployed to a particular server and router combination. */ - public static FindIterable retrieveDeploymentForServerAndRouterId(String server, String routerId) { + public static FindIterable retrieveDeploymentForServerAndRouterId(String serverId, String routerId) { return Persistence.deployments.getMongoCollection().find(and( - eq("deployedTo", server), + eq("deployedTo", serverId), eq("routerId", routerId) )); } diff --git a/src/main/java/com/conveyal/datatools/manager/models/EC2Info.java b/src/main/java/com/conveyal/datatools/manager/models/EC2Info.java new file mode 100644 index 000000000..fcbd37aa7 --- /dev/null +++ b/src/main/java/com/conveyal/datatools/manager/models/EC2Info.java @@ -0,0 +1,36 @@ +package com.conveyal.datatools.manager.models; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import java.io.Serializable; +import java.util.List; + +/** + * Contains the fields specific to starting up new EC2 servers for an ELB target group. If null, at least one internal + * URLs must be provided. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class EC2Info implements Serializable { + private static final long serialVersionUID = 1L; + /** Empty constructor for serialization. */ + public EC2Info () {} + /** + * The AWS-style instance type (e.g., t2.medium) to use for new EC2 machines. Defaults to + * {@link com.conveyal.datatools.manager.jobs.DeployJob#DEFAULT_INSTANCE_TYPE} if null during deployment. + */ + public String instanceType; + /** Number of instances to spin up and add to target group. If zero, defaults to 1. */ + public int instanceCount; + /** The subnet ID associated with the target group. */ + public String subnetId; + /** The security group ID associated with the target group. */ + public String securityGroupId; + /** The Amazon machine image (AMI) to be used for the OTP EC2 machines. */ + public String amiId; + /** The IAM instance profile ARN that the OTP EC2 server should assume. For example, arn:aws:iam::123456789012:instance-profile/otp-ec2-role */ + public String iamInstanceProfileArn; + /** The AWS key file (.pem) that should be used to set up OTP EC2 servers (gives a way for admins to SSH into machine). */ + public String keyName; + /** The target group to deploy new EC2 instances to. */ + public String targetGroupArn; +} \ No newline at end of file diff --git a/src/main/java/com/conveyal/datatools/manager/models/EC2InstanceSummary.java b/src/main/java/com/conveyal/datatools/manager/models/EC2InstanceSummary.java new file mode 100644 index 000000000..ff53ac5f1 --- /dev/null +++ b/src/main/java/com/conveyal/datatools/manager/models/EC2InstanceSummary.java @@ -0,0 +1,54 @@ +package com.conveyal.datatools.manager.models; + +import com.amazonaws.services.ec2.model.Instance; +import com.amazonaws.services.ec2.model.InstanceState; +import com.amazonaws.services.ec2.model.Tag; + +import java.io.Serializable; +import java.util.Date; +import java.util.List; + +/** + * Summarizes information derived from an EC2 instance for consumption by a user interface. + */ +public class EC2InstanceSummary implements Serializable { + private static final long serialVersionUID = 1L; + public String privateIpAddress; + public String publicIpAddress; + public String publicDnsName; + public String instanceType; + public String instanceId; + public String imageId; + public String projectId; + public String jobId; + public String deploymentId; + public String name; + public InstanceState state; + public String availabilityZone; + public Date launchTime; + public String stateTransitionReason; + + /** Empty constructor for serialization */ + public EC2InstanceSummary () { } + + public EC2InstanceSummary (Instance ec2Instance) { + publicIpAddress = ec2Instance.getPublicIpAddress(); + privateIpAddress = ec2Instance.getPrivateIpAddress(); + publicDnsName = ec2Instance.getPublicDnsName(); + instanceType = ec2Instance.getInstanceType(); + instanceId = ec2Instance.getInstanceId(); + imageId = ec2Instance.getImageId(); + List tags = ec2Instance.getTags(); + // Set project and deployment ID if they exist. + for (Tag tag : tags) { + if (tag.getKey().equals("projectId")) projectId = tag.getValue(); + if (tag.getKey().equals("deploymentId")) deploymentId = tag.getValue(); + if (tag.getKey().equals("jobId")) jobId = tag.getValue(); + if (tag.getKey().equals("Name")) name = tag.getValue(); + } + state = ec2Instance.getState(); + availabilityZone = ec2Instance.getPlacement().getAvailabilityZone(); + launchTime = ec2Instance.getLaunchTime(); + stateTransitionReason = ec2Instance.getStateTransitionReason(); + } +} diff --git a/src/main/java/com/conveyal/datatools/manager/models/FeedSource.java b/src/main/java/com/conveyal/datatools/manager/models/FeedSource.java index fa18e3aae..4c2148824 100644 --- a/src/main/java/com/conveyal/datatools/manager/models/FeedSource.java +++ b/src/main/java/com/conveyal/datatools/manager/models/FeedSource.java @@ -9,6 +9,8 @@ import com.conveyal.datatools.manager.persistence.FeedStore; import com.conveyal.datatools.manager.persistence.Persistence; import com.conveyal.datatools.manager.utils.HashUtils; +import com.conveyal.gtfs.GTFS; +import com.conveyal.gtfs.validator.ValidationResult; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonInclude; @@ -537,10 +539,16 @@ public enum FeedRetrievalMethod { * * FIXME: Use a Mongo transaction to handle the deletion of these related objects. */ - public boolean delete() { + public void delete() { try { + // Remove all feed version records for this feed source retrieveFeedVersions().forEach(FeedVersion::delete); - + // Remove all snapshot records for this feed source + retrieveSnapshots().forEach(Snapshot::delete); + // Delete active editor buffer if exists. + if (this.editorNamespace != null) { + GTFS.delete(this.editorNamespace, DataManager.GTFS_DATA_SOURCE); + } // Delete latest copy of feed source on S3. if (DataManager.useS3) { DeleteObjectsRequest delete = new DeleteObjectsRequest(DataManager.feedBucket); @@ -554,10 +562,9 @@ public boolean delete() { // editor snapshots)? // Finally, delete the feed source mongo document. - return Persistence.feedSources.removeById(this.id); + Persistence.feedSources.removeById(this.id); } catch (Exception e) { LOG.error("Could not delete feed source", e); - return false; } } diff --git a/src/main/java/com/conveyal/datatools/manager/models/FeedVersion.java b/src/main/java/com/conveyal/datatools/manager/models/FeedVersion.java index 50f69f695..3405fc060 100644 --- a/src/main/java/com/conveyal/datatools/manager/models/FeedVersion.java +++ b/src/main/java/com/conveyal/datatools/manager/models/FeedVersion.java @@ -397,6 +397,8 @@ public void delete() { Persistence.feedSources.update(fs.id, "{lastFetched:null}"); } feedStore.deleteFeed(id); + // Delete feed version tables in GTFS database + GTFS.delete(this.namespace, DataManager.GTFS_DATA_SOURCE); // Remove this FeedVersion from all Deployments associated with this FeedVersion's FeedSource's Project // TODO TEST THOROUGHLY THAT THIS UPDATE EXPRESSION IS CORRECT // Although outright deleting the feedVersion from deployments could be surprising and shouldn't be done anyway. diff --git a/src/main/java/com/conveyal/datatools/manager/models/Organization.java b/src/main/java/com/conveyal/datatools/manager/models/Organization.java index 381d7cf07..585f311d6 100644 --- a/src/main/java/com/conveyal/datatools/manager/models/Organization.java +++ b/src/main/java/com/conveyal/datatools/manager/models/Organization.java @@ -30,8 +30,8 @@ public class Organization extends Model implements Serializable { public boolean active; public UsageTier usageTier; public Set extensions = new HashSet<>(); - public Date subscriptionBeginDate; - public Date subscriptionEndDate; + public long subscriptionBeginDate; + public long subscriptionEndDate; public Organization () {} diff --git a/src/main/java/com/conveyal/datatools/manager/models/OtpServer.java b/src/main/java/com/conveyal/datatools/manager/models/OtpServer.java index b9065ce4d..de50a0c6c 100644 --- a/src/main/java/com/conveyal/datatools/manager/models/OtpServer.java +++ b/src/main/java/com/conveyal/datatools/manager/models/OtpServer.java @@ -1,27 +1,81 @@ package com.conveyal.datatools.manager.models; -import java.io.Serializable; +import com.amazonaws.services.ec2.model.Filter; +import com.amazonaws.services.ec2.model.Instance; +import com.conveyal.datatools.manager.controllers.api.DeploymentController; +import com.conveyal.datatools.manager.persistence.Persistence; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Collections; import java.util.List; /** + * An OtpServer represents a deployment target for deploying transit and OSM data to. This can take the shape of a number + * of things: + * 1. Simply writing a data bundle to S3. + * 2. Deploying to an internal URL for a build graph over wire request. + * 3. Spinning up an EC2 instance to build the graph, write it to S3, and have a collection of instances start up, become + * part of an Elastic Load Balancer (ELB) target group, and download/read in the OTP graph. + * read in that graph. + * 4. Spinning up an EC2 instance to only build the OTP graph and write it to S3 (dependent on {@link Deployment#buildGraphOnly} + * value). + * * Created by landon on 5/20/16. */ -public class OtpServer implements Serializable { +@JsonIgnoreProperties(ignoreUnknown = true) +public class OtpServer extends Model { private static final long serialVersionUID = 1L; public String name; + /** URL to direct build graph over wire requests to (if not using ELB target group). */ public List internalUrl; + /** Optional project to associate this server with (server can also be made available to entire application). */ + public String projectId; + /** Contains all of the information needed to commission EC2 instances for an AWS Elastic Load Balancer (ELB) target group. */ + public EC2Info ec2Info; + /** + * URL location of the publicly-available user interface asssociated with either the {@link #internalUrl} or the + * load balancer/target group. + */ public String publicUrl; - public Boolean admin; + /** Whether deploying to this server is limited to admins only. */ + public boolean admin; + /** S3 bucket name to upload deployment artifacts to (e.g., Graph.obj and/or transit + OSM data). */ public String s3Bucket; - public String s3Credentials; + + /** Empty constructor for serialization. */ + public OtpServer () {} + + /** The EC2 instances that are associated with this serverId. */ + @JsonProperty("ec2Instances") + public List retrieveEC2InstanceSummaries() { + // Prevent calling EC2 method on servers that do not have EC2 info defined because this is a JSON property. + if (ec2Info == null) return Collections.EMPTY_LIST; + Filter serverFilter = new Filter("tag:serverId", Collections.singletonList(id)); + return DeploymentController.fetchEC2InstanceSummaries(serverFilter); + } + + public List retrieveEC2Instances() { + Filter serverFilter = new Filter("tag:serverId", Collections.singletonList(id)); + return DeploymentController.fetchEC2Instances(serverFilter); + } + + @JsonProperty("organizationId") + public String organizationId() { + Project project = parentProject(); + return project == null ? null : project.organizationId; + } + + public Project parentProject() { + return Persistence.projects.getById(projectId); + } /** - * Convert the name field into a string with no special characters. + * Nothing fancy here. Just delete the Mongo record. * - * FIXME: This is currently used to keep track of which deployments have been deployed to which servers (it is used - * for the {@link Deployment#deployedTo} field), but we should likely. + * TODO should this also check refs in deployments? */ - public String target() { - return name != null ? name.replaceAll("[^a-zA-Z0-9]", "_") : null; + public void delete () { + Persistence.servers.removeById(this.id); } } diff --git a/src/main/java/com/conveyal/datatools/manager/models/Project.java b/src/main/java/com/conveyal/datatools/manager/models/Project.java index e3b849db1..203fb004f 100644 --- a/src/main/java/com/conveyal/datatools/manager/models/Project.java +++ b/src/main/java/com/conveyal/datatools/manager/models/Project.java @@ -2,6 +2,7 @@ import com.conveyal.datatools.manager.persistence.Persistence; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -10,6 +11,7 @@ import java.util.stream.Collectors; import static com.mongodb.client.model.Filters.eq; +import static com.mongodb.client.model.Filters.or; /** * Represents a collection of feed sources that can be made into a deployment. @@ -34,23 +36,19 @@ public class Project extends Model { public OtpRouterConfig routerConfig; - public Collection otpServers; - public String organizationId; /** - * Locate and return an OTP server contained within the project that matches the name argument. + * A list of servers that are available to deploy project feeds/OSM to. This includes servers assigned to this + * project as well as those that belong to no project. + * @return */ - public OtpServer retrieveServer(String name) { - if (name == null) return null; - for (OtpServer otpServer : otpServers) { - if (otpServer.name == null) continue; - if (name.equals(otpServer.name) || name.equals(otpServer.target())) { - return otpServer; - } - } - LOG.warn("Could not find OTP server with name {}", name); - return null; + @JsonProperty("otpServers") + public List availableOtpServers() { + return Persistence.servers.getFiltered(or( + eq("projectId", this.id), + eq("projectId", null) + )); } public String defaultTimeZone; @@ -92,9 +90,7 @@ public Collection retrieveProjectFeedSources() { * Get all the deployments for this project. */ public Collection retrieveDeployments() { - List deployments = Persistence.deployments - .getFiltered(eq("projectId", this.id)); - return deployments; + return Persistence.deployments.getFiltered(eq("projectId", this.id)); } // TODO: Does this need to be returned with JSON API response @@ -106,16 +102,13 @@ public Organization retrieveOrganization() { } } - public boolean delete() { + public void delete() { // FIXME: Handle this in a Mongo transaction. See https://docs.mongodb.com/master/core/transactions/#transactions-and-mongodb-drivers -// ClientSession clientSession = Persistence.startSession(); -// clientSession.startTransaction(); - // Delete each feed source in the project (which in turn deletes each feed version). retrieveProjectFeedSources().forEach(FeedSource::delete); // Delete each deployment in the project. retrieveDeployments().forEach(Deployment::delete); // Finally, delete the project. - return Persistence.projects.removeById(this.id); + Persistence.projects.removeById(this.id); } } diff --git a/src/main/java/com/conveyal/datatools/manager/models/Snapshot.java b/src/main/java/com/conveyal/datatools/manager/models/Snapshot.java index e9401979c..365853eb7 100644 --- a/src/main/java/com/conveyal/datatools/manager/models/Snapshot.java +++ b/src/main/java/com/conveyal/datatools/manager/models/Snapshot.java @@ -1,9 +1,16 @@ package com.conveyal.datatools.manager.models; +import com.conveyal.datatools.manager.DataManager; +import com.conveyal.datatools.manager.persistence.Persistence; +import com.conveyal.gtfs.GTFS; import com.conveyal.gtfs.loader.FeedLoadResult; +import com.conveyal.gtfs.util.InvalidNamespaceException; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonAlias; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.sql.SQLException; import java.util.Date; /** @@ -15,6 +22,7 @@ public class Snapshot extends Model { public static final long serialVersionUID = 1L; public static final String FEED_SOURCE_REF = "feedSourceId"; + private static final Logger LOG = LoggerFactory.getLogger(Snapshot.class); /** Is this snapshot the current snapshot - the most recently created or restored (i.e. the most current view of what's in master */ public boolean current; @@ -70,6 +78,17 @@ public Snapshot(String feedSourceId, String snapshotOf) { generateName(); } + public void delete () { + try { + // Delete snapshot tables in GTFS database + GTFS.delete(this.namespace, DataManager.GTFS_DATA_SOURCE); + // If SQL delete is successful, delete Mongo record. + Persistence.snapshots.removeById(this.id); + } catch (InvalidNamespaceException | SQLException e) { + LOG.error("Could not delete snapshot", e); + } + } + public void generateName() { this.name = "New snapshot " + new Date().toString(); } diff --git a/src/main/java/com/conveyal/datatools/manager/persistence/FeedStore.java b/src/main/java/com/conveyal/datatools/manager/persistence/FeedStore.java index f5dfe21de..08ba90d0a 100644 --- a/src/main/java/com/conveyal/datatools/manager/persistence/FeedStore.java +++ b/src/main/java/com/conveyal/datatools/manager/persistence/FeedStore.java @@ -158,7 +158,7 @@ public Long getFeedSize (String id) { } } - private static AWSCredentialsProvider getAWSCreds () { + public static AWSCredentialsProvider getAWSCreds () { if (S3_CREDENTIALS_FILENAME != null) { return new ProfileCredentialsProvider(S3_CREDENTIALS_FILENAME, "default"); } else { diff --git a/src/main/java/com/conveyal/datatools/manager/persistence/Persistence.java b/src/main/java/com/conveyal/datatools/manager/persistence/Persistence.java index 9359340d3..66a266192 100644 --- a/src/main/java/com/conveyal/datatools/manager/persistence/Persistence.java +++ b/src/main/java/com/conveyal/datatools/manager/persistence/Persistence.java @@ -11,6 +11,7 @@ import com.conveyal.datatools.manager.models.FeedVersion; import com.conveyal.datatools.manager.models.Note; import com.conveyal.datatools.manager.models.Organization; +import com.conveyal.datatools.manager.models.OtpServer; import com.conveyal.datatools.manager.models.Project; import com.conveyal.datatools.manager.models.Snapshot; import com.mongodb.MongoClient; @@ -47,12 +48,14 @@ public class Persistence { public static TypedPersistence notes; public static TypedPersistence organizations; public static TypedPersistence externalFeedSourceProperties; - public static TypedPersistence tokens; + public static TypedPersistence servers; public static TypedPersistence snapshots; + public static TypedPersistence tokens; public static void initialize () { PojoCodecProvider pojoCodecProvider = PojoCodecProvider.builder() + .register("com.conveyal.datatools.manager.jobs") .register("com.conveyal.datatools.manager.models") .register("com.conveyal.gtfs.loader") .register("com.conveyal.gtfs.validator") @@ -90,8 +93,9 @@ public static void initialize () { notes = new TypedPersistence(mongoDatabase, Note.class); organizations = new TypedPersistence(mongoDatabase, Organization.class); externalFeedSourceProperties = new TypedPersistence(mongoDatabase, ExternalFeedSourceProperty.class); - tokens = new TypedPersistence(mongoDatabase, FeedDownloadToken.class); + servers = new TypedPersistence(mongoDatabase, OtpServer.class); snapshots = new TypedPersistence(mongoDatabase, Snapshot.class); + tokens = new TypedPersistence(mongoDatabase, FeedDownloadToken.class); // TODO: Set up indexes on feed versions by feedSourceId, version #? deployments, feedSources by projectId. // deployments.getMongoCollection().createIndex(Indexes.descending("projectId")); diff --git a/src/main/java/com/conveyal/datatools/manager/persistence/TypedPersistence.java b/src/main/java/com/conveyal/datatools/manager/persistence/TypedPersistence.java index 2b1231322..8d02bbc6d 100644 --- a/src/main/java/com/conveyal/datatools/manager/persistence/TypedPersistence.java +++ b/src/main/java/com/conveyal/datatools/manager/persistence/TypedPersistence.java @@ -141,7 +141,7 @@ public List getAll () { * We should really have a bit more abstraction here. */ public List getFiltered (Bson filter) { - return mongoCollection.find(filter).into(new ArrayList()); + return mongoCollection.find(filter).into(new ArrayList<>()); } /** diff --git a/src/test/java/com/conveyal/datatools/DatatoolsTest.java b/src/test/java/com/conveyal/datatools/DatatoolsTest.java index 0f39ff761..7ce63b3fe 100644 --- a/src/test/java/com/conveyal/datatools/DatatoolsTest.java +++ b/src/test/java/com/conveyal/datatools/DatatoolsTest.java @@ -43,7 +43,7 @@ public static void setUp() throws RuntimeException, IOException { // Travis CI, these files will automatically be setup. String[] args = getBooleanEnvVar("RUN_E2E") ? new String[] { "configurations/default/env.yml", "configurations/default/server.yml" } - : new String[] { "configurations/default/env.yml.tmp", "configurations/default/server.yml.tmp" }; + : new String[] { "configurations/test/env.yml.tmp", "configurations/test/server.yml.tmp" }; // fail this test and others if the above files do not exist for (String arg : args) { diff --git a/src/test/java/com/conveyal/datatools/editor/ProcessGtfsSnapshotMergeTest.java b/src/test/java/com/conveyal/datatools/editor/ProcessGtfsSnapshotMergeTest.java deleted file mode 100644 index 7038ccdc1..000000000 --- a/src/test/java/com/conveyal/datatools/editor/ProcessGtfsSnapshotMergeTest.java +++ /dev/null @@ -1,177 +0,0 @@ -package com.conveyal.datatools.editor; - -import com.conveyal.datatools.UnitTest; -import com.conveyal.datatools.editor.jobs.ProcessGtfsSnapshotMerge; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Created by landon on 2/24/17. - */ -public class ProcessGtfsSnapshotMergeTest extends UnitTest { - private static final Logger LOG = LoggerFactory.getLogger(ProcessGtfsSnapshotMergeTest.class); - static ProcessGtfsSnapshotMerge snapshotMerge; - private static boolean setUpIsDone = false; - - // TODO: add back in test once editor load is working -// @Before -// public void setUp() { -// if (setUpIsDone) { -// return; -// } -// super.setUp(); -// LOG.info("ProcessGtfsSnapshotMergeTest setup"); -// -// snapshotMerge = new ProcessGtfsSnapshotMerge(super.version, "test@conveyal.com"); -// snapshotMerge.run(); -// setUpIsDone = true; -// } -// -// @Test -// public void countRoutes() { -// FeedTx feedTx = VersionedDataStore.getFeedTx(source.id); -// assertEquals(feedTx.routes.size(), 3); -// } -// -// @Test -// public void countStops() { -// FeedTx feedTx = VersionedDataStore.getFeedTx(source.id); -// assertEquals(feedTx.stops.size(), 31); -// } -// -// @Test -// public void countTrips() { -// FeedTx feedTx = VersionedDataStore.getFeedTx(source.id); -// assertEquals(feedTx.trips.size(), 252); -// } -// -// @Test -// public void countFares() { -// FeedTx feedTx = VersionedDataStore.getFeedTx(source.id); -// assertEquals(feedTx.fares.size(), 6); -// } - -// @Test -// public void duplicateStops() { -// ValidationResult result = new ValidationResult(); -// -// result = gtfsValidation1.duplicateStops(); -// Assert.assertEquals(result.invalidValues.size(), 0); -// -// -// // try duplicate stop test to confirm that stops within the buffer limit are found -// result = gtfsValidation1.duplicateStops(25.0); -// Assert.assertEquals(result.invalidValues.size(), 1); -// -// // try same test to confirm that buffers below the limit don't detect duplicates -// result = gtfsValidation1.duplicateStops(5.0); -// Assert.assertEquals(result.invalidValues.size(), 0); -// } -// -// @Test -// public void reversedTripShapes() { -// -// ValidationResult result = gtfsValidation1.listReversedTripShapes(); -// -// Assert.assertEquals(result.invalidValues.size(), 1); -// -// // try again with an unusually high distanceMultiplier value -// result = gtfsValidation1.listReversedTripShapes(50000.0); -// -// Assert.assertEquals(result.invalidValues.size(), 0); -// -// } -// -// -// @Test -// public void validateTrips() { -// ValidationResult result = gtfsValidation2.validateTrips(); -// -// Assert.assertEquals(result.invalidValues.size(), 9); -// -// } -// -// @Test -// public void completeBadGtfsTest() { -// -// GtfsDaoImpl gtfsStore = new GtfsDaoImpl(); -// -// GtfsReader gtfsReader = new GtfsReader(); -// -// File gtfsFile = new File("src/test/resources/st_gtfs_bad.zip"); -// -// try { -// -// gtfsReader.setInputLocation(gtfsFile); -// -// } catch (IOException e) { -// e.printStackTrace(); -// } -// -// gtfsReader.setEntityStore(gtfsStore); -// -// -// try { -// gtfsReader.run(); -// } catch (Exception e) { -// e.printStackTrace(); -// } -// -// try { -// GtfsValidationService gtfsValidation = new GtfsValidationService(gtfsStore); -// -// ValidationResult results = gtfsValidation.validateRoutes(); -// results.add(gtfsValidation.validateTrips()); -// -// Assert.assertEquals(results.invalidValues.size(), 5); -// -// System.out.println(results.invalidValues.size()); -// -// } catch (Exception e) { -// e.printStackTrace(); -// } -// -// } -// -// @Test -// public void completeGoodGtfsTest() { -// -// GtfsDaoImpl gtfsStore = new GtfsDaoImpl(); -// -// GtfsReader gtfsReader = new GtfsReader(); -// -// File gtfsFile = new File("src/test/resources/st_gtfs_good.zip"); -// -// try { -// -// gtfsReader.setInputLocation(gtfsFile); -// -// } catch (IOException e) { -// e.printStackTrace(); -// } -// -// gtfsReader.setEntityStore(gtfsStore); -// -// -// try { -// gtfsReader.run(); -// } catch (Exception e) { -// e.printStackTrace(); -// } -// -// try { -// GtfsValidationService gtfsValidation = new GtfsValidationService(gtfsStore); -// -// ValidationResult results = gtfsValidation.validateRoutes(); -// results.add(gtfsValidation.validateTrips()); -// -// Assert.assertEquals(results.invalidValues.size(), 0); -// -// System.out.println(results.invalidValues.size()); -// -// } catch (Exception e) { -// e.printStackTrace(); -// } -// -// } -}