001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.scheme.hadoop;
022    
023    import java.beans.ConstructorProperties;
024    import java.io.IOException;
025    
026    import cascading.flow.FlowProcess;
027    import cascading.scheme.Scheme;
028    import cascading.scheme.SinkCall;
029    import cascading.scheme.SourceCall;
030    import cascading.tap.Tap;
031    import cascading.tuple.Fields;
032    import cascading.tuple.Tuple;
033    import cascading.tuple.TupleEntry;
034    import org.apache.hadoop.mapred.JobConf;
035    import org.apache.hadoop.mapred.OutputCollector;
036    import org.apache.hadoop.mapred.RecordReader;
037    import org.apache.hadoop.mapred.SequenceFileInputFormat;
038    import org.apache.hadoop.mapred.SequenceFileOutputFormat;
039    
040    /**
041     * A SequenceFile is a type of {@link cascading.scheme.Scheme}, which is a flat file consisting of
042     * binary key/value pairs. This is a space and time efficient means to store data.
043     */
044    public class SequenceFile extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Void>
045      {
046      /** Protected for use by TempDfs and other subclasses. Not for general consumption. */
047      protected SequenceFile()
048        {
049        super( Fields.UNKNOWN, Fields.ALL );
050        }
051    
052      /**
053       * Creates a new SequenceFile instance that stores the given field names.
054       *
055       * @param fields
056       */
057      @ConstructorProperties({"fields"})
058      public SequenceFile( Fields fields )
059        {
060        super( fields, fields );
061        }
062    
063      @Override
064      public void sourceConfInit( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf )
065        {
066        conf.setInputFormat( SequenceFileInputFormat.class );
067        }
068    
069      @Override
070      public void sinkConfInit( FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf )
071        {
072        conf.setOutputKeyClass( Tuple.class ); // supports TapCollector
073        conf.setOutputValueClass( Tuple.class ); // supports TapCollector
074        conf.setOutputFormat( SequenceFileOutputFormat.class );
075        }
076    
077      @Override
078      public void sourcePrepare( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall )
079        {
080        Object[] pair = new Object[]{
081          sourceCall.getInput().createKey(),
082          sourceCall.getInput().createValue()
083        };
084    
085        sourceCall.setContext( pair );
086        }
087    
088      @Override
089      public boolean source( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall ) throws IOException
090        {
091        Tuple key = (Tuple) sourceCall.getContext()[ 0 ];
092        Tuple value = (Tuple) sourceCall.getContext()[ 1 ];
093        boolean result = sourceCall.getInput().next( key, value );
094    
095        if( !result )
096          return false;
097    
098        TupleEntry entry = sourceCall.getIncomingEntry();
099    
100        if( entry.hasTypes() )
101          entry.setCanonicalTuple( value );
102        else
103          entry.setTuple( value );
104    
105        return true;
106        }
107    
108      @Override
109      public void sourceCleanup( FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall )
110        {
111        sourceCall.setContext( null );
112        }
113    
114      @Override
115      public void sink( FlowProcess<JobConf> flowProcess, SinkCall<Void, OutputCollector> sinkCall ) throws IOException
116        {
117        sinkCall.getOutput().collect( Tuple.NULL, sinkCall.getOutgoingEntry().getTuple() );
118        }
119      }